In [1]:
import os.path

import pandas as pd
import numpy as np
import random

In [2]:
DATA_DIR = "~/datasets/ali-cluster/cluster-trace-gpu-v2020/data"

def get_df(file, header=None):
    df = pd.read_csv(file, header=None)
    # df.columns = DF_HEADER.get(key, df.columns)
    df.columns = pd.read_csv("{}.header".format(file.split('.csv')[0])).columns if header is None else header
    return df

In [46]:
def get_df_one_inst(dft, dfj):
    # 过滤掉太短的任务，它们有可能是inference任务。
    min_run_time = 1000.
    dfa = dft.merge(dfj, on=['job_name'], suffixes = ['','_j'])
    dfa.loc[dfa.start_time==0, 'start_time'] = np.nan
    dfa.loc[dfa.start_time==0, 'end_time'] = np.nan
    dfa['runtime'] = dfa.end_time - dfa.start_time
    df_target = dfa[
        (dfa['status'] == 'Terminated') &
        (dfa['gpu_type'] != 'MISC') &
        (dfa['plan_gpu'] == 100.0) &
        (dfa['inst_num'] == 1.0) &
        (dfa['end_time'] - dfa['start_time'] >= min_run_time) &
        (dfa['task_name'].isin(['tensorflow', 'PyTorchWorker', 'worker']))]
    df_target = df_target.sort_values(['start_time_j'])
    df_target['norm_job_submit_time'] = df_target.start_time_j - df_target.iloc[0]['start_time_j'].item()
    df_target = df_target.drop_duplicates(['job_name'], keep='first')
    print(df_target[df_target['gpu_type'] == 'V100M32'].shape)
    print("T4 shape", df_target[df_target['gpu_type'] == 'T4'].shape)
    df_target.loc[df_target.gpu_type == 'V100M32', 'gpu_type'] = 'V100'
    return df_target

In [47]:
def gen_ddl_and_gpu_runtimes(df_one_inst):
    gpu_types = [str(t) for t in df_one_inst['gpu_type'].unique()]
    """
    configs
    """
    runtimes = {
        'T4': (1, 1),
        'P100': (1.4, 2.),
        'V100': (2.4, 2.66),
    }
    assert set(runtimes.keys()) == set(list(gpu_types))
    ddl_ratio = 10
    ddl_range = (1.2, 3.0)
    jobs_count = 5000
    submit_together = True
    df_one_inst = df_one_inst[['job_name', 'gpu_type', 'runtime', 'norm_job_submit_time']]
    df_one_inst = df_one_inst[:jobs_count]

    print(df_one_inst.shape)
    print("print", df_one_inst.iloc[0:100, :])
    """
    end configs
    """

    def gen_runtime(from_gpu, to_gpu, origin_runtime):
        if from_gpu == to_gpu:
            return origin_runtime
        if from_gpu not in gpu_types:
            print("not in gpu_types:", from_gpu)
        to_rand = random.uniform(*runtimes[to_gpu])
        from_rand = random.uniform(*runtimes[from_gpu])
        return int(origin_runtime * to_rand / from_rand)

    def gen_ddl(norm_submit_time, runtime):
        if random.randint(0, 100) < ddl_ratio:
            return int(norm_submit_time + runtime * random.uniform(*ddl_range))
        return np.inf

    for gpu_type in gpu_types:
        df_one_inst.loc[:, gpu_type] = df_one_inst.apply(lambda x: gen_runtime(x.gpu_type, gpu_type, x.runtime), axis=1)
    df_one_inst.loc[:, 'ddl'] = df_one_inst.apply(lambda x: gen_ddl(x.norm_job_submit_time, x.runtime), axis=1)

    if submit_together:
        df_one_inst = df_one_inst.iloc[np.random.permutation(len(df_one_inst))]
        df_one_inst.loc[:, 'ddl'] = df_one_inst.apply(lambda x: x['ddl'] - x['norm_job_submit_time'], axis=1)
        df_one_inst.loc[:, 'norm_job_submit_time'] = df_one_inst.apply(lambda x: 0, axis=1)
    df_output = df_one_inst[['job_name', 'norm_job_submit_time', 'ddl', *gpu_types]]
    print("output: ", df_output.shape)
    to_csv(df_output, f"case_{jobs_count}_all_{ddl_ratio}_ddl.csv")

In [48]:
def to_csv(df, name):
    df = df.reset_index(drop=True)
    df.to_csv(name)

In [49]:
def main():
    dft = get_df(os.path.join(DATA_DIR, "pai_task_table.csv"))
    dfj = get_df(os.path.join(DATA_DIR, "pai_job_table.csv"))
    df = get_df_one_inst(dft, dfj)
    gen_ddl_and_gpu_runtimes(df)

if __name__ == '__main__':
    main()

V100M32 shape (3603, 17)
V100 shape (3851, 17)
T4 shape (10988, 17)
P100 shape (17795, 17)
(5000, 4)
print                         job_name gpu_type   runtime  norm_job_submit_time
273363  34b3d819023ea21e28afd50f       T4  176371.0                   0.0
200169  af3b0f5e810838d6c33840e1       T4  109920.0               65724.0
88329   8b3581e39780f46e28e48c4a     P100  240371.0               66246.0
296404  e3789c5e546bbbe82007127f     P100  232288.0               66313.0
388556  8dc5b00629f823cf8d30cade     P100  157663.0               66385.0
...                          ...      ...       ...                   ...
343013  dbbc7b184fb67cf1ff4254b5       T4    1190.0              184504.0
426284  ea3bd3de4cb78d2bd1a06619     P100    1188.0              185251.0
413142  5487ca4ba4dbb22cda463042     P100    1157.0              185313.0
277222  493650beeaaa64eb0504c911     P100    1476.0              185338.0
453788  4f0246b30625281fa3d0b537     P100    2081.0              185372.0

[100

In [49]:

dft = get_df(os.path.join(DATA_DIR, "pai_task_table.csv"))
dfj = get_df(os.path.join(DATA_DIR, "pai_job_table.csv"))
df_one_inst = get_df_one_inst(dft, dfj)

(6278, 17)


In [50]:
gen_ddl_and_gpu_runtimes(df_one_inst)

Empty DataFrame
Columns: [job_name, gpu_type, runtime, norm_job_submit_time]
Index: []
Empty DataFrame
Columns: [job_name, norm_job_submit_time, ddl, T4, P100, V100]
Index: []
