In [1]:
# Small sample of raw event records, one dict per record. Every field is a string:
# statis_date (yyyymmdd), serv_number (phone number), video_id, env_video_time
# (numeric text) and oper_time ("YYYY-MM-DD HH:MM:SS").
data = [
    {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "45211", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:29"},
    {"statis_date": "20241106", "serv_number": "13457692971", "video_id": "44963", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:37"},
    {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "48948", "env_video_time": "100.0", "oper_time": "2024-11-06 00:47:47"},
    {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "48082", "env_video_time": "72.0", "oper_time": "2024-11-07 00:23:46"},
    {"statis_date": "20241107", "serv_number": "13457692971", "video_id": "44963", "env_video_time": "0.0", "oper_time": "2024-11-07 00:24:58"}
]

In [2]:
import pandas as pd

# Build a DataFrame from the list of record dicts (one row per dict)
# and print its plain-text representation.
df = pd.DataFrame(data=data)
print(df)

  statis_date  serv_number video_id env_video_time            oper_time
0    20241106  13487028369    45211           38.0  2024-11-06 00:32:29
1    20241106  13457692971    44963           38.0  2024-11-06 00:32:37
2    20241106  13485401573    48948          100.0  2024-11-06 00:47:47
3    20241106  13548466410    48082           72.0  2024-11-06 00:23:46
4    20241106  17837240647    44963            0.0  2024-11-06 00:24:58


In [None]:
def sample_data(df, sample_size=3000000):
    """
    Take the first ``sample_size`` records in (user, time) order without cutting
    the last sampled user's calendar day in half.

    The frame is sorted by ``serv_number`` then ``oper_time`` and the first
    ``sample_size`` rows are kept. If the cut falls inside a run of records
    belonging to the same user on the same calendar day, the rest of that run
    is appended.

    Args:
        df: Raw DataFrame with at least ``serv_number`` and ``oper_time``
            columns. It is not modified.
        sample_size: Target number of rows (default 3,000,000).

    Returns:
        The sampled DataFrame in sorted order. ``oper_time`` is parsed to
        datetime, ``serv_number`` is a stripped string, and a ``date`` column
        (calendar day of ``oper_time``) is added.
    """
    # Work on a copy so the caller's frame is not silently mutated.
    df = df.copy()

    # Normalise timestamps and derive the calendar day.
    df['oper_time'] = pd.to_datetime(df['oper_time'], format='mixed')
    df['date'] = df['oper_time'].dt.date

    # Phone numbers as stripped strings so sorting/comparison is consistent.
    df['serv_number'] = df['serv_number'].astype(str).str.strip()

    # serv_number is already str, so a plain multi-column sort gives string order.
    df_sorted = df.sort_values(['serv_number', 'oper_time'])

    sampled = df_sorted.head(sample_size)
    cut = len(sampled)

    # Only extend when something was sampled and rows remain after the cut.
    if 0 < cut < len(df_sorted):
        last_user = sampled['serv_number'].iloc[-1]
        last_date = sampled['date'].iloc[-1]

        # Select the rows after the cut by *position*. The sorted frame keeps
        # the original index labels, which no longer reflect sorted order, so
        # comparing labels would drop or include the wrong rows. Because the
        # frame is sorted by user then time, the matching rows form the
        # contiguous continuation of the last user's day.
        tail = df_sorted.iloc[cut:]
        remaining_records = tail[
            (tail['serv_number'] == last_user) &
            (tail['date'] == last_date)
        ]

        sampled = pd.concat([sampled, remaining_records])

    return sampled

# Example usage
if __name__ == "__main__":
    # Read the raw data.
    # NOTE(review): 'your_data.csv' is a placeholder path; point it at the real export before running.
    # serv_number is read as str. If the column has any missing value, pandas would
    # otherwise parse it as float64, and the str conversion inside sample_data
    # would then produce '13487028369.0'-style keys.
    df = pd.read_csv('your_data.csv', dtype={'serv_number': str})

    # Inspect the column dtypes before sampling
    print("采样前数据类型：")
    print(df.dtypes)

    # Run the sampling
    sampled_df = sample_data(df, sample_size=3000000)

    print(f"\n原始数据大小: {len(df)}")
    print(f"采样后数据大小: {len(sampled_df)}")

    print("\n采样后数据类型：")
    print(sampled_df.dtypes)

    # Save the sampled result
    sampled_df.to_csv('sampled_data.csv', index=False)

In [24]:
from datetime import datetime
import numpy as np
from tqdm import tqdm


def generate_watch_count_labels_vectorized(df):
    """
    Label each record with how many records the same user has on the same
    calendar day at or after that record's ``oper_time`` (itself included).

    Args:
        df: DataFrame with ``serv_number`` and ``oper_time`` columns
            (``oper_time`` as datetimes or parseable strings). Not modified.

    Returns:
        A copy of ``df`` with ``oper_time`` parsed to datetime, plus a ``date``
        column (calendar day of ``oper_time``) and an integer ``label`` column.
        Row order and index are those of ``df``.
    """
    print(f"开始处理数据，总数据量: {len(df):,} 条")
    start_time = datetime.now()

    # Work on a copy so the caller's frame is left untouched.
    result = df.copy()

    # Normalise timestamps
    print("正在处理时间戳...")
    result['oper_time'] = pd.to_datetime(result['oper_time'], format='mixed')
    result['date'] = result['oper_time'].dt.date

    def count_remaining(times):
        # For each time t: n minus the number of entries strictly before t,
        # i.e. the count of entries >= t (ties count as "at or after").
        # This matches the pairwise `t_i <= t_j` count without building an
        # n x n matrix.
        sorted_times = np.sort(times)
        return len(times) - np.searchsorted(sorted_times, times, side='left')

    # dropna=False so rows with a missing key still form a group and get a label.
    groups = result.groupby(['serv_number', 'date'], dropna=False)
    group_count = len(groups)
    print(f"总共需要处理 {group_count:,} 个用户日期组")

    oper_times = result['oper_time'].to_numpy()
    labels = np.zeros(len(result), dtype=np.int64)
    # `groups.indices` maps each (serv_number, date) key to that group's row
    # *positions*. Writing results back by position keeps every label on its
    # own row, however `result` is ordered. This avoids GroupBy.apply's result
    # reshaping (e.g. a single group being stacked into a DataFrame).
    for positions in tqdm(groups.indices.values(), total=group_count, desc="处理进度", ncols=100):
        labels[positions] = count_remaining(oper_times[positions])
    result['label'] = labels

    # Report the elapsed time
    end_time = datetime.now()
    process_time = end_time - start_time
    print(f"\n处理完成！总耗时: {process_time}")

    return result

# Example usage
if __name__ == "__main__":
    # Build sample data: two (serv_number, day) groups of three records each.
    # The trailing comment on each record is its expected label: the number of
    # records of the same user/day at or after it, itself included.
    data = [
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "45211", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:29.130"},  # expected label: 3
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "45211", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:30.130"},  # expected label: 2
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "48948", "env_video_time": "100.0", "oper_time": "2024-11-06 00:47:47"},  # expected label: 1
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "48082", "env_video_time": "72.0", "oper_time": "2024-11-07 00:23:46.269"},  # expected label: 3
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "44963", "env_video_time": "0.0", "oper_time": "2024-11-07 00:24:58"},  # expected label: 2
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "44964", "env_video_time": "0.0", "oper_time": "2024-11-07 00:24:59"}  # expected label: 1
    ]
    
    df = pd.DataFrame(data)
    
    # Generate the labels
    labeled_df = generate_watch_count_labels_vectorized(df)
    
    print("\n处理结果示例:")
    print(labeled_df)

开始处理数据，总数据量: 6 条
正在处理时间戳...
总共需要处理 2 个用户日期组


处理进度: 100%|██████████████████████████████████████████████████████| 2/2 [00:00<00:00, 460.20it/s]


处理完成！总耗时: 0:00:00.025781

处理结果示例:
  statis_date  serv_number video_id env_video_time               oper_time  \
0    20241106  13487028369    45211           38.0 2024-11-06 00:32:29.130   
1    20241106  13487028369    45211           38.0 2024-11-06 00:32:30.130   
2    20241106  13487028369    48948          100.0 2024-11-06 00:47:47.000   
3    20241107  13548466410    48082           72.0 2024-11-07 00:23:46.269   
4    20241107  13548466410    44963            0.0 2024-11-07 00:24:58.000   
5    20241107  13548466410    44964            0.0 2024-11-07 00:24:59.000   

         date  label  
0  2024-11-06      3  
1  2024-11-06      2  
2  2024-11-06      1  
3  2024-11-07      3  
4  2024-11-07      2  
5  2024-11-07      1  





In [3]:
import pandas as pd 
from datetime import datetime
import numpy as np
from tqdm import tqdm

def generate_watch_count_labels_vectorized(df):
    """
    Label each record with the number of *valid* records the same user has on
    the same calendar day at or after that record's ``oper_time``.

    A record is valid when its ``env_watch_time`` is > 0. Non-numeric values
    are coerced to NaN and treated as invalid. Invalid records are not counted,
    but they still receive a label: the number of valid records at or after them.

    Args:
        df: DataFrame with ``serv_number``, ``oper_time`` and
            ``env_watch_time`` columns. Not modified.

    Returns:
        A copy of ``df`` with ``oper_time`` parsed to datetime,
        ``env_watch_time`` coerced to numeric, plus a ``date`` column (calendar
        day of ``oper_time``) and an integer ``label`` column. Row order and
        index are those of ``df``.
    """
    print(f"开始处理数据，总数据量: {len(df):,} 条")
    start_time = datetime.now()

    # Work on a copy so the caller's frame is left untouched.
    result = df.copy()

    # Normalise timestamps
    print("正在处理时间戳...")
    result['oper_time'] = pd.to_datetime(result['oper_time'], format='mixed')
    result['date'] = result['oper_time'].dt.date

    # Make env_watch_time numeric; unparseable values become NaN (never valid).
    result['env_watch_time'] = pd.to_numeric(result['env_watch_time'], errors='coerce')

    def count_remaining(times, valid):
        # For each time t, count valid entries whose time is >= t.
        order = np.argsort(times)
        sorted_times = times[order]
        # valid_after[k] = number of valid entries at sorted positions k..n-1;
        # the trailing 0 makes a (defensive) lookup at position n safe.
        valid_after = np.append(np.cumsum(valid[order][::-1])[::-1], 0)
        # First sorted position whose time equals t, so tied entries all count.
        first_pos = np.searchsorted(sorted_times, times, side='left')
        return valid_after[first_pos]

    # dropna=False so rows with a missing key still form a group and get a label.
    groups = result.groupby(['serv_number', 'date'], dropna=False)
    group_count = len(groups)
    print(f"总共需要处理 {group_count:,} 个用户日期组")

    oper_times = result['oper_time'].to_numpy()
    # NaN / NA watch times compare as not > 0, i.e. invalid.
    valid_all = result['env_watch_time'].gt(0).to_numpy(dtype=bool, na_value=False)
    labels = np.zeros(len(result), dtype=np.int64)
    # `groups.indices` maps each (serv_number, date) key to that group's row
    # *positions*. Writing results back by position keeps every label on its
    # own row, however `result` is ordered.
    for positions in tqdm(groups.indices.values(), total=group_count, desc="处理进度", ncols=100):
        labels[positions] = count_remaining(oper_times[positions], valid_all[positions])
    result['label'] = labels

    # Report the elapsed time
    end_time = datetime.now()
    process_time = end_time - start_time
    print(f"\n处理完成！总耗时: {process_time}")

    return result

# Example usage
if __name__ == "__main__":
    # Build sample data with an env_watch_time field. Records with
    # env_watch_time <= 0 are not counted by the labeler.
    # The trailing comment on each record is its expected label: the number of
    # same-user/same-day records at or after it with env_watch_time > 0.
    data = [
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "45211", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:29.130","env_watch_time":7.0},  # expected label: 2
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "45211", "env_video_time": "38.0", "oper_time": "2024-11-06 00:32:30.130","env_watch_time":0.0},  # expected label: 1
        {"statis_date": "20241106", "serv_number": "13487028369", "video_id": "48948", "env_video_time": "100.0", "oper_time": "2024-11-06 00:47:47","env_watch_time":5.0},  # expected label: 1
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "48082", "env_video_time": "72.0", "oper_time": "2024-11-07 00:23:46.269","env_watch_time":6.0},  # expected label: 2
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "44963", "env_video_time": "0.0", "oper_time": "2024-11-07 00:24:58","env_watch_time":0.0},  # expected label: 1
        {"statis_date": "20241107", "serv_number": "13548466410", "video_id": "44964", "env_video_time": "0.0", "oper_time": "2024-11-07 00:24:59","env_watch_time":2.0}  # expected label: 1
    ]
    
    df = pd.DataFrame(data)
    
    # Generate the labels
    labeled_df = generate_watch_count_labels_vectorized(df)
    
    print("\n处理结果示例:")
    print(labeled_df)

开始处理数据，总数据量: 6 条
正在处理时间戳...
总共需要处理 2 个用户日期组


处理进度: 100%|██████████████████████████████████████████████████████| 2/2 [00:00<00:00, 217.37it/s]


处理完成！总耗时: 0:00:00.068279

处理结果示例:
  statis_date  serv_number video_id env_video_time               oper_time  \
0    20241106  13487028369    45211           38.0 2024-11-06 00:32:29.130   
1    20241106  13487028369    45211           38.0 2024-11-06 00:32:30.130   
2    20241106  13487028369    48948          100.0 2024-11-06 00:47:47.000   
3    20241107  13548466410    48082           72.0 2024-11-07 00:23:46.269   
4    20241107  13548466410    44963            0.0 2024-11-07 00:24:58.000   
5    20241107  13548466410    44964            0.0 2024-11-07 00:24:59.000   

   env_watch_time        date  label  
0             7.0  2024-11-06      2  
1             0.0  2024-11-06      1  
2             5.0  2024-11-06      1  
3             6.0  2024-11-07      2  
4             0.0  2024-11-07      1  
5             2.0  2024-11-07      1  





In [21]:
# Display the labeled result frame.
labeled_df

Unnamed: 0,statis_date,serv_number,video_id,env_video_time,oper_time,date,label
0,20241106,13487028369,45211,38.0,2024-11-06 00:32:29.130,2024-11-06,1
1,20241106,13457692971,44963,38.0,2024-11-06 00:32:37.000,2024-11-06,2
2,20241106,13487028369,48948,100.0,2024-11-06 00:47:47.000,2024-11-06,1
3,20241107,13548466410,48082,72.0,2024-11-07 00:23:46.269,2024-11-07,3
4,20241107,13548466410,44963,0.0,2024-11-07 00:24:58.000,2024-11-07,2
5,20241107,13548466410,44964,0.0,2024-11-07 00:24:59.000,2024-11-07,1


In [None]:
import os
from tqdm import tqdm
import pandas as pd

def _write_split(split_df, split_num, save_dir):
    """Write one chunk to ``save_dir/sample_data_XX.csv`` and return its summary dict."""
    save_path = os.path.join(save_dir, f'sample_data_{split_num:02d}.csv')
    split_df.to_csv(save_path, index=False)
    return {
        'split_num': split_num,
        'size': len(split_df),
        'file_path': save_path,
    }


def split_dataframe(df, num_splits=20, save_dir='/data/zhujianhao/split_data'):
    """
    Split ``df`` into at most ``num_splits`` consecutive chunks and save each
    as CSV, never letting one user's rows straddle two chunks.

    Each chunk aims for ``len(df) // num_splits`` rows. Its end is moved
    forward to the first position at or after that target where
    ``serv_number`` changes. The no-straddle guarantee holds only if all rows
    of a user are contiguous, i.e. ``df`` is already sorted by ``serv_number``
    (and ``oper_time``). Fewer than ``num_splits`` files are written when the
    data runs out early; an empty ``df`` yields one empty file.

    Args:
        df: DataFrame already sorted by serv_number and oper_time.
        num_splits: Maximum number of chunks (default 20).
        save_dir: Directory for the ``sample_data_XX.csv`` files and
            ``split_info.txt``; created if missing.

    Returns:
        A list of dicts with keys ``split_num``, ``size`` and ``file_path``,
        one per written file.

    Raises:
        ValueError: if ``num_splits`` < 1.
    """
    from bisect import bisect_left

    if num_splits < 1:
        raise ValueError(f"num_splits must be >= 1, got {num_splits}")

    print(f"开始分割数据集，总数据量: {len(df):,} 条")

    # Make sure the output directory exists
    os.makedirs(save_dir, exist_ok=True)

    n_rows = len(df)
    # Ideal number of rows per chunk
    base_size = n_rows // num_splits

    users = df['serv_number'].to_numpy()
    # Positions p where row p belongs to a different user than row p-1, plus
    # the end of the frame: the only places a chunk may end without splitting
    # a user (given contiguous users). Computed once instead of scanning rows.
    cut_points = ((users[1:] != users[:-1]).nonzero()[0] + 1).tolist() + [n_rows]

    start_idx = 0
    split_info = []  # one summary dict per written file

    for split_num in tqdm(range(num_splits), desc="分割进度"):
        # Everything has been written: stop instead of emitting empty files.
        if split_info and start_idx >= n_rows:
            break

        if split_num == num_splits - 1:
            # The last chunk takes everything that is left.
            end_idx = n_rows
        else:
            # Aim for base_size rows (at least one), then move forward to the
            # first user boundary at or after that target.
            target = min(start_idx + max(base_size, 1), n_rows)
            end_idx = cut_points[bisect_left(cut_points, target)]

        split_info.append(_write_split(df.iloc[start_idx:end_idx], split_num + 1, save_dir))
        start_idx = end_idx

    # Print a summary of the chunks
    print("\n分割完成！分割结果概要：")
    for info in split_info:
        print(f"第 {info['split_num']:02d} 份: {info['size']:,} 条记录 -> {info['file_path']}")

    # Save the split summary to a log file. UTF-8 is explicit because the text
    # is Chinese and the platform default encoding may not be able to encode it.
    log_path = os.path.join(save_dir, 'split_info.txt')
    with open(log_path, 'w', encoding='utf-8') as f:
        f.write(f"数据分割信息 - 总数据量: {len(df):,} 条\n")
        f.write("=" * 50 + "\n")
        for info in split_info:
            f.write(f"分片 {info['split_num']:02d}:\n")
            f.write(f"  记录数: {info['size']:,}\n")
            f.write(f"  文件路径: {info['file_path']}\n")
            f.write("-" * 30 + "\n")

    return split_info

# Example usage
if __name__ == "__main__":
    # `df` is expected to be sorted by serv_number and oper_time already;
    # split it into 20 chunks under the default output directory.
    split_info = split_dataframe(df, num_splits=20)

    print(
        f"\n分割信息已保存到: "
        f"{os.path.join('/data/zhujianhao/split_data', 'split_info.txt')}"
    )