In [2]:
import os.path
from typing import Optional

import numpy as np
import pandas as pd


In [3]:
# Directory holding the trace tables read by this notebook (e.g. pai_task_table.csv
# and its .header file). NOTE(review): this is a machine-specific absolute path —
# adjust it before running the notebook elsewhere.
DATA_DIR = "/data/l1hy/ali-cluster/cluster-trace-gpu-v2020/data"

def read_csv_with_header(
    file_path: str,
    header: Optional[list[str]] = None
) -> pd.DataFrame:
    """Read a header-less CSV file and attach column names to it.

    When ``header`` is not supplied, the column names are taken from the
    header row of a sibling ``.header`` file that lives in the same directory
    as the CSV. Its name is the CSV file name up to the first ``.csv``
    followed by ``.header`` (``/x/pai_task_table.csv`` ->
    ``/x/pai_task_table.header``).

    Args:
        file_path: Path of the CSV file. The file is read with
            ``header=None``, i.e. every line is treated as data.
        header: Optional column names. If ``None``, they are read from the
            sibling ``.header`` file.

    Returns:
        pd.DataFrame: The CSV content with the column names applied.

    Raises:
        FileNotFoundError: If the CSV file, or the ``.header`` file when it is
            needed, does not exist.
        pd.errors.EmptyDataError: If the CSV file is empty.
        ValueError: If the number of column names does not match the data.
    """
    df = pd.read_csv(file_path, header=None)

    if header is None:
        # Derive the header path from the file name only. Splitting the whole
        # path on '.csv' breaks as soon as a directory name contains '.csv'.
        directory, filename = os.path.split(file_path)
        stem = filename.split('.csv')[0]
        header_path = os.path.join(directory, "{}.header".format(stem))
        header = pd.read_csv(header_path).columns

    df.columns = header
    return df

In [4]:
def get_model_configs():
    """Return the predefined model/dataset configurations.

    Each entry is a dict with the keys ``model_name``, ``task_type``,
    ``batch_sizes``, ``dataset_name``, ``dataset_size`` and ``model_size``.
    According to the original annotations both sizes are expressed in MB; the
    figures are the author's approximations. A fresh list of fresh dicts is
    built on every call.
    """
    field_names = (
        "model_name",
        "task_type",
        "batch_sizes",
        "dataset_name",
        "dataset_size",
        "model_size",
    )
    image_classification = "Image Classification"
    common_batches = (32, 64, 128)

    # One tuple per model, in the same order as ``field_names``.
    catalogue = (
        # ImageNet raw dataset ~150GB; standard ResNet50 ~98MB
        ("ResNet50", image_classification, common_batches, "ImageNet", 150000, 98),
        # lightweight MobileNetV3, typical size
        ("MobileNetV3", image_classification, common_batches, "ImageNet", 150000, 22),
        # CIFAR-10 archive ~170MB; ResNet18 ~45MB
        ("ResNet18", image_classification, common_batches, "CIFAR-10", 170, 45),
        # lightweight MobileNetV2, typical size
        ("MobileNetV2", image_classification, common_batches, "CIFAR-10", 170, 14),
        # EfficientNet-B0 baseline size
        ("EfficientNet", image_classification, common_batches, "CIFAR-10", 170, 29),
        # VGG11 ~507MB
        ("VGG11", image_classification, common_batches, "CIFAR-10", 170, 507),
        # LSUN official release ~42GB; basic DCGAN architecture
        ("DCGAN", "Image Generation", (64, 128, 256), "LSUN", 42000, 45),
        # ShapeNet Core55 ~30GB; basic PointNet ~40MB
        ("PointNet", "3D Point Cloud Processing", common_batches, "ShapeNet", 30000, 40),
        # SQuAD v2.0 preprocessed ~35GB; BERT-Base (English) ~1.2GB per the author
        ("BERT", "Question Answering", (32,), "SQuAD", 35000, 1200),
        # Wikitext2 raw text ~5GB per the author; single-layer LSTM ~35MB
        ("LSTM", "Language Modeling", common_batches, "Wikitext2", 5000, 35),
        # Multi30k standard release ~3GB; base Transformer ~85MB
        ("Transformer", "Machine Translation", (16, 32, 64), "Multi30k", 3000, 85),
    )

    configs = []
    for name, task, batches, dataset, dataset_mb, model_mb in catalogue:
        # list(batches) gives every config its own mutable batch-size list.
        values = (name, task, list(batches), dataset, dataset_mb, model_mb)
        configs.append(dict(zip(field_names, values)))
    return configs


def get_micro_task_configs():
    """Return the model configurations used for micro tasks.

    The entries have the same keys as the ones from ``get_model_configs`` but
    only cover ResNet50 on ImageNet and ResNet18 on CIFAR-10. Sizes are in MB
    according to the original annotations.
    """
    shared = {
        "task_type": "Image Classification",
        "batch_sizes": (32, 64, 128),
    }
    # (model_name, dataset_name, dataset_size, model_size)
    variants = (
        ("ResNet50", "ImageNet", 150000, 98),  # ImageNet ~150GB, ResNet50 ~98MB
        ("ResNet18", "CIFAR-10", 170, 45),     # CIFAR-10 ~170MB, ResNet18 ~45MB
    )
    return [
        {
            "model_name": model,
            "task_type": shared["task_type"],
            "batch_sizes": list(shared["batch_sizes"]),
            "dataset_name": dataset,
            "dataset_size": dataset_mb,
            "model_size": model_mb,
        }
        for model, dataset, dataset_mb, model_mb in variants
    ]


In [5]:
def preprocess_training_data(task_df) -> pd.DataFrame:
    """Filter the raw task table down to usable training tasks.

    The input frame is left untouched; all work happens on a copy. Steps:

    1. Treat ``start_time == 0`` as a missing timestamp (start and end time
       both become NaN).
    2. Compute ``runtime = end_time - start_time``.
    3. Keep only rows that are terminated, are not of GPU type ``MISC``, have
       ``plan_gpu == 100``, ran for at least ``MIN_RUNTIME_SECONDS``, use at
       most ``MAX_INSTANCES`` instances and whose ``task_name`` is one of
       ``VALID_TASK_TYPES``.
    4. Sort the remaining rows by ``start_time``.
    5. Rename GPU type ``V100M32`` to ``V100``.

    Args:
        task_df: Task table. Must provide the columns ``start_time``,
            ``end_time``, ``status``, ``gpu_type``, ``plan_gpu``,
            ``inst_num`` and ``task_name``.

    Returns:
        pd.DataFrame: The filtered rows (original index labels preserved) with
        an additional ``runtime`` column.
    """
    MIN_RUNTIME_SECONDS = 1000.0  # minimum runtime for a task to be kept
    MAX_INSTANCES = 8             # tasks with more instances are dropped
    VALID_TASK_TYPES = ['tensorflow', 'PyTorchWorker', 'worker']

    # Work on a copy so the caller's frame is not silently modified.
    tasks = task_df.copy()

    # A start_time of 0 marks an invalid timestamp; blank out both ends so the
    # derived runtime is NaN and the row fails the runtime filter below.
    tasks.loc[tasks['start_time'] == 0, ['start_time', 'end_time']] = np.nan
    tasks['runtime'] = tasks['end_time'] - tasks['start_time']

    is_valid = (
        (tasks['status'] == 'Terminated') &              # finished tasks only
        (tasks['gpu_type'] != 'MISC') &                  # drop miscellaneous GPUs
        (tasks['plan_gpu'] == 100) &                     # a full GPU was requested
        (tasks['runtime'] >= MIN_RUNTIME_SECONDS) &      # ran long enough
        (tasks['inst_num'] <= MAX_INSTANCES) &           # at most 8 instances
        (tasks['task_name'].isin(VALID_TASK_TYPES))      # known training roles
    )

    # sort_values returns a new frame, so the assignment below is safe.
    valid_tasks = tasks[is_valid].sort_values(['start_time'])

    # Unify the GPU type naming.
    valid_tasks.loc[valid_tasks['gpu_type'] == 'V100M32', 'gpu_type'] = 'V100'

    return valid_tasks

In [6]:
import random


def sample_tasks_random(
    task_df: pd.DataFrame,
    jobs_count: int,
    random_state: Optional[int] = None,
) -> pd.DataFrame:
    """Draw a uniformly random sample of tasks and print a short summary.

    Args:
        task_df: Task table; must contain an ``inst_num`` column.
        jobs_count: Number of tasks wanted. If the table holds fewer rows,
            all of them are returned (in random order).
        random_state: Optional seed forwarded to ``DataFrame.sample`` so the
            draw can be reproduced. ``None`` keeps the unseeded behaviour.

    Returns:
        pd.DataFrame: The sampled rows (original index labels preserved). The
        result is empty when ``jobs_count`` is 0 or ``task_df`` has no rows.
    """
    # Never ask for more rows than are available.
    sample_size = min(jobs_count, len(task_df))

    if sample_size == 0:
        # Nothing to draw: return an empty frame that keeps the columns.
        sampled_tasks = task_df.iloc[0:0].copy()
    else:
        sampled_tasks = task_df.sample(n=sample_size, random_state=random_state)

    # Summary split at 4 instances, matching sample_tasks_with_ratio.
    total = len(sampled_tasks)
    single_gpu_count = int((sampled_tasks['inst_num'] <= 4).sum())
    multi_gpu_count = int((sampled_tasks['inst_num'] > 4).sum())

    # Guard the percentages: an empty sample must not divide by zero.
    single_share = single_gpu_count / total if total else 0.0
    multi_share = multi_gpu_count / total if total else 0.0

    print(f"采样统计: 总任务数 {total}, "
          f"单卡任务 {single_gpu_count} ({single_share:.1%}), "
          f"多卡任务 {multi_gpu_count} ({multi_share:.1%})")

    return sampled_tasks

def sample_tasks_with_ratio(
    task_df: pd.DataFrame,
    jobs_count: int,
    multi_gpu_ratio: float = 0.5,
    random_state: Optional[int] = None,
) -> pd.DataFrame:
    """Sample tasks with a target share of tasks that have more than 4 instances.

    Rows are split into two groups by ``inst_num`` (``<= 4`` and ``> 4``). The
    function tries to draw ``int(jobs_count * multi_gpu_ratio)`` rows from the
    second group and the rest from the first. If one group is too small, the
    shortfall is taken from the other group as far as that group allows, so
    the result may hold fewer than ``jobs_count`` rows.

    Args:
        task_df: Task table; must contain an ``inst_num`` column.
        jobs_count: Total number of tasks wanted.
        multi_gpu_ratio: Target share of the ``inst_num > 4`` group.
        random_state: Optional seed forwarded to ``DataFrame.sample`` for both
            draws. ``None`` keeps the unseeded behaviour.

    Returns:
        pd.DataFrame: Rows of the ``<= 4`` group followed by rows of the
        ``> 4`` group (original index labels preserved).
    """
    single_gpu_tasks = task_df[task_df['inst_num'] <= 4]
    multi_gpu_tasks = task_df[task_df['inst_num'] > 4]

    # Start from the requested split, limited by the multi-instance group.
    multi_gpu_count = min(int(jobs_count * multi_gpu_ratio), len(multi_gpu_tasks))
    single_gpu_count = jobs_count - multi_gpu_count

    if single_gpu_count > len(single_gpu_tasks):
        # Not enough small tasks: take them all and shift the remainder back
        # to the other group, but never beyond what that group contains
        # (sampling more rows than exist raises ValueError).
        single_gpu_count = len(single_gpu_tasks)
        multi_gpu_count = min(jobs_count - single_gpu_count, len(multi_gpu_tasks))

    def _draw(group: pd.DataFrame, count: int) -> pd.DataFrame:
        # An empty slice keeps the columns, unlike a bare pd.DataFrame().
        if count <= 0:
            return group.iloc[0:0]
        return group.sample(n=count, random_state=random_state)

    sampled_single = _draw(single_gpu_tasks, single_gpu_count)
    sampled_multi = _draw(multi_gpu_tasks, multi_gpu_count)

    sampled_tasks = pd.concat([sampled_single, sampled_multi])

    # Guard the percentages: an empty sample must not divide by zero.
    total = len(sampled_tasks)
    single_share = len(sampled_single) / total if total else 0.0
    multi_share = len(sampled_multi) / total if total else 0.0

    print(f"采样统计: 总任务数 {total}, 单卡任务 {len(sampled_single)} ({single_share:.1%}), "
          f"多卡任务 {len(sampled_multi)} ({multi_share:.1%})")

    return sampled_tasks

def wrap_task_runtimes(task_df: pd.DataFrame) -> pd.DataFrame:
    """Add one estimated-runtime column per supported GPU type.

    For every GPU type ``G`` in (T4, P100, V100) a column ``runtime_G`` is
    added. Rows already recorded on ``G`` keep their ``runtime`` unchanged;
    other rows get ``int(runtime * scale[G] / scale[row gpu_type])``, where
    the scale of a GPU is the T4 performance figure divided by that GPU's
    performance figure.

    The frame is modified in place and also returned.

    Args:
        task_df: Task table with the columns ``gpu_type`` and ``runtime``.

    Returns:
        pd.DataFrame: The same ``task_df`` object with the new columns.

    Raises:
        KeyError: If a row has a ``gpu_type`` other than T4, P100 or V100.
    """
    # Relative performance figures used to scale runtimes between GPU types.
    t4_performance = 8.1
    p100_performance = 9.3
    v100_performance = 15.7

    # (low, high) bounds of the runtime scale per GPU type, relative to T4.
    # Both bounds are currently equal, so the drawn scale is a constant;
    # widening a range would add random jitter.
    scale_ranges = {
        'T4': (1, 1),
        'P100': (t4_performance / p100_performance, t4_performance / p100_performance),
        'V100': (t4_performance / v100_performance, t4_performance / v100_performance),
    }

    def gen_runtime(from_gpu, to_gpu, origin_runtime):
        if from_gpu == to_gpu:
            return origin_runtime
        if from_gpu not in scale_ranges:
            # Fail with a descriptive message instead of a bare KeyError.
            raise KeyError(
                f"unsupported gpu_type {from_gpu!r}; expected one of {list(scale_ranges)}"
            )
        to_scale = random.uniform(*scale_ranges[to_gpu])
        from_scale = random.uniform(*scale_ranges[from_gpu])
        return int(origin_runtime * to_scale / from_scale)

    for target_gpu in scale_ranges:
        # A plain list works for empty frames too, where DataFrame.apply with
        # axis=1 does not return a Series that can be assigned to a column.
        task_df[f'runtime_{target_gpu}'] = [
            gen_runtime(source_gpu, target_gpu, runtime)
            for source_gpu, runtime in zip(task_df['gpu_type'], task_df['runtime'])
        ]

    return task_df

def wrap_task_name(task_df: pd.DataFrame, model_configs: list[dict]) -> pd.DataFrame:
    """Overwrite ``task_name`` with a randomly chosen model name per row.

    One ``random.choice`` over ``model_configs`` is made for every row, in row
    order, and the ``model_name`` of the chosen config is stored. The frame is
    modified in place and also returned.

    Args:
        task_df: Task table to label.
        model_configs: Config dicts, each providing a ``model_name`` key.

    Returns:
        pd.DataFrame: The same ``task_df`` object with ``task_name`` replaced.
    """
    row_count = len(task_df)
    chosen_models = [
        random.choice(model_configs)['model_name']
        for _ in range(row_count)
    ]
    task_df['task_name'] = chosen_models
    return task_df

def to_csv(df, name):
    """Write ``df`` to the CSV file ``name`` with rows renumbered from 0.

    The existing index is discarded; the fresh 0..n-1 index is written as the
    first, unnamed column. The frame passed in is not modified.
    """
    df.reset_index(drop=True).to_csv(name)

In [7]:
def sample_micro_tasks_random(
    task_df: pd.DataFrame,
    jobs_count: int,
    ratio: float = 0.5,
    random_state: Optional[int] = None,
) -> pd.DataFrame:
    """Sample short "micro" tasks that use one or two instances.

    Only rows with ``inst_num == 1`` or ``inst_num == 2`` are considered. The
    function tries to draw ``int(jobs_count * ratio)`` two-instance rows and
    the rest from the one-instance rows. If one group is too small, the
    shortfall is taken from the other group as far as that group allows, so
    the result may hold fewer than ``jobs_count`` rows. The sample is
    shuffled, re-indexed from 0, and its ``runtime`` column is replaced by
    random integers in ``[100, 600)``.

    Args:
        task_df: Task table; must contain an ``inst_num`` column.
        jobs_count: Total number of tasks wanted.
        ratio: Target share of two-instance tasks.
        random_state: Optional seed that makes the draws, the shuffle and the
            generated runtimes reproducible. ``None`` keeps the unseeded
            behaviour (global NumPy random state).

    Returns:
        pd.DataFrame: The shuffled sample with a fresh 0..n-1 index.
    """
    rng = None if random_state is None else np.random.RandomState(random_state)

    single_gpu_tasks = task_df[task_df['inst_num'] == 1]
    two_gpu_tasks = task_df[task_df['inst_num'] == 2]

    # Start from the requested split, limited by the two-instance group.
    two_gpu_count = min(int(jobs_count * ratio), len(two_gpu_tasks))
    single_gpu_count = jobs_count - two_gpu_count

    if single_gpu_count > len(single_gpu_tasks):
        # Not enough one-instance tasks: take them all and shift the remainder
        # back, but never beyond what the other group contains (sampling more
        # rows than exist raises ValueError).
        single_gpu_count = len(single_gpu_tasks)
        two_gpu_count = min(jobs_count - single_gpu_count, len(two_gpu_tasks))

    def _draw(group: pd.DataFrame, count: int) -> pd.DataFrame:
        # An empty slice keeps the columns, unlike a bare pd.DataFrame().
        if count <= 0:
            return group.iloc[0:0]
        return group.sample(n=count, random_state=rng)

    sampled_micro_tasks = pd.concat([
        _draw(single_gpu_tasks, single_gpu_count),
        _draw(two_gpu_tasks, two_gpu_count),
    ])

    # Shuffle so the two groups are interleaved, then renumber the rows.
    if len(sampled_micro_tasks) > 0:
        sampled_micro_tasks = sampled_micro_tasks.sample(frac=1, random_state=rng)
    sampled_micro_tasks = sampled_micro_tasks.reset_index(drop=True)

    # Micro tasks get a synthetic runtime between 100 and 599 seconds.
    randint = np.random.randint if rng is None else rng.randint
    sampled_micro_tasks['runtime'] = randint(100, 600, size=len(sampled_micro_tasks))

    return sampled_micro_tasks

In [None]:
def generate_task_config(valid_task_df, jobs_count: int, task_type: str):
    """Sample a workload of the given type and write it to a CSV file.

    The sampled tasks get per-GPU runtime columns and a randomly assigned
    model name, and are written to ``case_{task_type}_{jobs_count}_tasks.csv``
    in the current working directory.

    Args:
        valid_task_df: Pre-filtered task table to sample from.
        jobs_count: Number of tasks to sample.
        task_type: Workload flavour:
            ``"random"`` - uniform random sample;
            ``"light"``  - ratio sample with a multi-GPU ratio of 0.1;
            ``"heavy"``  - ratio sample with a multi-GPU ratio of 0.5;
            ``"micro"``  - micro-task sample, labelled from the micro configs.

    Raises:
        ValueError: If ``task_type`` is not one of the values above.
    """
    # Samplers are wrapped in lambdas so only the selected one runs.
    samplers = {
        "random": lambda: sample_tasks_random(valid_task_df, jobs_count),
        "light": lambda: sample_tasks_with_ratio(valid_task_df, jobs_count, 0.1),
        "heavy": lambda: sample_tasks_with_ratio(valid_task_df, jobs_count, 0.5),
        "micro": lambda: sample_micro_tasks_random(valid_task_df, jobs_count),
    }
    if task_type not in samplers:
        # Fail loudly instead of silently producing no file.
        raise ValueError(
            f"unknown task_type {task_type!r}; expected one of {sorted(samplers)}"
        )

    # Micro workloads are labelled from the reduced model set.
    if task_type == "micro":
        model_configs = get_micro_task_configs()
    else:
        model_configs = get_model_configs()

    sampled_tasks = samplers[task_type]()
    tasks_with_runtimes = wrap_task_runtimes(sampled_tasks)
    labelled_tasks = wrap_task_name(tasks_with_runtimes, model_configs)

    output_file = f"case_{task_type}_{jobs_count}_tasks.csv"
    to_csv(labelled_tasks, output_file)
    print(f"task_file:{output_file} generated")

def generate_simulator_task_config():
    """Generate the random/light/heavy simulator workloads for several sizes.

    Loads ``pai_task_table.csv`` from ``DATA_DIR``, filters it, draws one base
    pool of 2500 tasks per workload flavour and then, for every job count,
    writes one task file per flavour sampled from the matching pool.
    """
    raw_tasks = read_csv_with_header(os.path.join(DATA_DIR, "pai_task_table.csv"))
    training_tasks = preprocess_training_data(raw_tasks)

    # Base pools: every job count below is sampled from these, not from the
    # full filtered table.
    pool_size = 2500
    base_pools = {
        "random": sample_tasks_random(training_tasks, pool_size),
        "light": sample_tasks_with_ratio(training_tasks, pool_size, 0.1),
        "heavy": sample_tasks_with_ratio(training_tasks, pool_size, 0.5),
    }

    for jobs_count in (1000, 1300, 1600, 1900, 2200, 2500):
        # dict order is insertion order: random, light, heavy
        for workload, pool in base_pools.items():
            generate_task_config(pool, jobs_count, workload)

def generate_micro_task_config():
    """Generate micro-task workload files for 10, 20, ..., 100 jobs.

    Loads ``pai_task_table.csv`` from ``DATA_DIR``, filters it and writes one
    "micro" task file per job count.
    """
    source_path = os.path.join(DATA_DIR, "pai_task_table.csv")
    training_tasks = preprocess_training_data(read_csv_with_header(source_path))

    # Job counts 10..100 in steps of 10.
    for jobs_count in range(10, 101, 10):
        generate_task_config(training_tasks, jobs_count, "micro")

# Entry point: only the micro-task workloads are generated by default.
if __name__ == '__main__':
    # The simulator workloads are disabled; uncomment the next line to
    # regenerate them as well.
    # generate_simulator_task_config()
    generate_micro_task_config()