In [1]:
import pandas as pd
import numpy as np
import datetime
import json
import os
import itertools
import glob
import copy
import ray


out_dir="./log/"
ray.init()

def replace_str(target):
    target = target.replace('\n', '')
    target = target.replace(',', '')
    return target

def find_value(arr, target, jumpto=1):
    try:
        num = replace_str(arr[arr.index(target)+jumpto])
    except:
        raise ValueError(
            f'{arr}, {target}')
    return num

with open('./sim_conf.json') as json_file:
    conf = json.load(json_file)


conf_share=ray.put(conf)
# TODO: fix directory finding as input
# origin_filename = "fsNprepNloadNtrain_Imagenet_default_resnet50_epoch5_b2048_worker12_thread0"
# origin_filename = "fsNprepNloadNtrain_Imagenet_randaugment_resnet50_epoch5_b2048_worker12_thread0"

origin_filename = conf["FILE"]
# origin_filename = "fsNprepNloadNtrain_openimage_default_resnet50_epoch1_b2048_worker12_thread0"

simp_filename = f'{origin_filename}_simp'
fetch_filename = f"{simp_filename}_fetchdifftime"
startdiff_filename = f"{simp_filename}_fetchstartdifftime"
parse_dir = "../dsanalyzer_parsed/DDP4GPUFULLTRACE/{suffix}/{filename}.csv"

perf_datafile = parse_dir.format(suffix="", filename= origin_filename)

simp_datafile = parse_dir.format(suffix="simp", filename= simp_filename)
fetch_datafile = parse_dir.format(suffix="simp", filename= fetch_filename)
startdiff_datafile = parse_dir.format(suffix="simp", filename= startdiff_filename)

perf_df= pd.read_csv(perf_datafile, index_col=0)
simp_df = pd.read_csv(simp_datafile, index_col=0)
fetch_df = pd.read_csv(fetch_datafile, index_col=0)
start_df = pd.read_csv(startdiff_datafile, index_col=0)

2021-08-27 13:36:03,375	INFO services.py:1245 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


In [2]:
# Train type and setup
info = perf_datafile.split('/')

trainType = find_value(info, "dsanalyzer_parsed")

if trainType.find("2GPU") != -1: 
    gpu_num = 2
if trainType.find("4GPU") != -1:
    gpu_num = 4
elif trainType.find("DDP") != -1:
    gpu_num = 8
else:
    gpu_num = 1
    
trainType_share = ray.put(trainType)

# Fetch time align

In [3]:
# Make gpu columns for simulation
gpu_start_col = []
gpu_fetch_col = []
gpu_fetch_done_col = []
gpu_training_stall_col = []

single_gpu_prefetch_count = []
train_col= ["Epoch", "Index number"]

for i in range(gpu_num):
    gpu_start_col.append(f"Start time_gpu{i}")
    gpu_fetch_col.append(f"Fetch time (sec)_gpu{i}")
    gpu_fetch_done_col.append(f"Fetch done time (sec)_gpu{i}")
    gpu_training_stall_col.append(f"Training Stall time (sec)_gpu{i}")
    
    start_df[gpu_start_col[i]] = pd.to_datetime(
                                    start_df[gpu_start_col[i]], format='%Y-%m-%d %H:%M:%S.%f', errors='ignore')

# Init simulation informations
simulation_df = pd.DataFrame()

simulation_df[train_col] = fetch_df[train_col]

simulation_df = simulation_df.merge(right=fetch_df[train_col+gpu_fetch_col], 
                                    on = train_col)
simulation_df = simulation_df.merge(right=start_df[train_col+gpu_start_col], 
                                    on = train_col)

simulation_df["Min fetch time (sec)"] = fetch_df[gpu_fetch_col].min(axis=1)
simulation_df["Avg fetch time (sec)"] = fetch_df[gpu_fetch_col].mean(axis=1)
simulation_df["Min start time"] = start_df[gpu_start_col].min(axis=1)

simulation_df.sort_values(by=train_col, inplace = True)
simulation_df.reset_index(inplace = True, drop = True)
simulation_df = simulation_df[simulation_df["Epoch"] == 2]
max_index_number = simulation_df["Index number"].max()
max_index_number_share = ray.put(max_index_number)

simulation_df_share = ray.put(simulation_df)

In [4]:
columns =  ["Epoch", "Index number"] + gpu_start_col + gpu_fetch_col + gpu_fetch_done_col

# Preprocessing time align
FETCH_ALIGNs = conf["FETCH_ALIGN"]
# Preprocessing start time align
START_ALIGNs = conf["START_ALIGN"]



def simulate_dataload(FETCH_ALIGN, START_ALIGN, idx, row, sim_dataload_df, sim_train_df):
    simulated_gpu_info = {}
    
    for i in range(gpu_num):
        # Simulate based on the number of workers
        if idx < conf["WORKER_NUM"]:
            next_fetch_start_time = row["Min start time"] if START_ALIGN else row[gpu_start_col[i]]
        else:
            # [NOTE]: 
            # If fetch align, align with the fastest previous fetch time 
            # regardless each previous time (can be overlapped)
            # If not, follow the previous fetch done time
            next_fetch_start_time = sim_dataload_df.loc[idx-conf["WORKER_NUM"]][gpu_fetch_done_col].min() if START_ALIGN else sim_dataload_df.loc[idx-conf["WORKER_NUM"], f"Fetch done time (sec)_gpu{i}"]
            next_fetch_start_time += pd.Timedelta(conf["FETCH_DELAY"], 's')
            
            prefetch_step = conf["WORKER_NUM"]*conf["PREFETCH_FACTOR"]
            if prefetch_step <= idx:
                prefetch_bound_time = sim_train_df.loc[idx-prefetch_step+1]["Training end time"]
                
                if next_fetch_start_time < prefetch_bound_time:
                    next_fetch_start_time = prefetch_bound_time + pd.Timedelta(conf["PREFETCH_DELAY"], 's')
                    
        simulated_gpu_info[gpu_start_col[i]] = next_fetch_start_time
#         print(next_fetch_start_time)
        if FETCH_ALIGN == "avg":
            fetch_time = row["Avg fetch time (sec)"]
        elif FETCH_ALIGN == "min":
            fetch_time = row["Min fetch time (sec)"]
        else:
            fetch_time = row[gpu_fetch_col[i]]
            
        simulated_gpu_info[gpu_fetch_col[i]] = fetch_time
                                                                                                                 
        simulated_gpu_info[gpu_fetch_done_col[i]] = simulated_gpu_info[gpu_start_col[i]] + datetime.timedelta(seconds = simulated_gpu_info[gpu_fetch_col[i]])
    return simulated_gpu_info

def simulate_training(idx, sim_dataload_df, sim_train_df):
    # FIXME: Manually tune with idx
    slowest_fetch_time = sim_dataload_df.loc[idx][gpu_fetch_done_col].max()

    if idx == 0:
        training_end_time = slowest_fetch_time +  pd.Timedelta(conf["TRAINING_TIME"], 's')
        pre_training_end_time = training_end_time
    else:
        pre_training_end_time = sim_train_df.loc[idx, "Training end time"]
        if pre_training_end_time > slowest_fetch_time:
            training_end_time = pre_training_end_time + pd.Timedelta(conf["PADDING_TIME"] + conf["TRAINING_TIME"], 's')
        else:
            training_end_time = slowest_fetch_time + pd.Timedelta(conf["PADDING_TIME"] + conf["TRAINING_TIME"], 's')
    
    gpu_fetchs = sim_dataload_df.loc[idx][gpu_fetch_done_col]
    training_stall_times = gpu_fetchs - np.array([pre_training_end_time])
#     print(training_stall_times)
   
    done_row = {"Slowest end time":slowest_fetch_time, "Training end time":training_end_time }
    training_stall_max = 0
    training_stall_min = 9999999.0
    for i in range(len(training_stall_times)):
        training_stall_seconds = training_stall_times[i].total_seconds()
#         print(training_stall_seconds)
        if training_stall_seconds <= 0:
            training_stall_new_val = 0
        else:
            training_stall_new_val = training_stall_seconds
        
        done_row[f"Training Stall time (sec)_gpu{i}"] = training_stall_new_val
        
        if training_stall_max < training_stall_new_val:
            training_stall_max = training_stall_new_val
            
        if training_stall_min == 9999999.0 or training_stall_max < training_stall_new_val:
            training_stall_min = training_stall_new_val

    delay_time = training_stall_max - training_stall_min
    done_row["Delay time (sec)"] = delay_time
    
    done_row["Training stall time (sec)"] = training_stall_max
    
    return done_row

@ray.remote
def simulation(FETCH_ALIGN,START_ALIGN, conf, trainType, simulation_df, max_index_number):
    simulated_df = pd.DataFrame(columns=columns)
    done_df = pd.DataFrame(columns=["Slowest end time", "Training end time", "Delay time (sec)","Training stall time (sec)"]+gpu_training_stall_col)
    
    for idx, row in simulation_df.iterrows():
        simulated_row = {}
        simulated_row[train_col[0]] = row[train_col[0]]
        simulated_row[train_col[1]] = row[train_col[1]]
        
        simulated_row.update(simulate_dataload(FETCH_ALIGN, START_ALIGN, idx, row, simulated_df, done_df))

        simulated_df.loc[len(simulated_df)] = simulated_row
#         print(done_row)
        done_df.loc[len(simulated_df)] = simulate_training(idx, simulated_df, done_df)
#     print(done_df)   
    sim_train_df = pd.DataFrame()
    iterTime = done_df["Training end time"][1:].reset_index(drop = True) - done_df["Training end time"][:max_index_number].reset_index(drop = True)
    iterTime = iterTime / np.timedelta64(1, 's')
    out = simulated_df.loc[0][gpu_fetch_col].max()+ conf["TRAINING_TIME"]
    first = pd.Series(out)
    iterTime= pd.concat([first,iterTime]).reset_index(drop=True)
    sim_train_df[train_col] = simulated_df[train_col]
    sim_train_df["Iteration time (sec)"] = iterTime
    sim_train_df[gpu_training_stall_col] = done_df[gpu_training_stall_col].reset_index(drop=True)
    sim_train_df["Training stall time (sec)"] = done_df["Training stall time (sec)"].reset_index(drop=True)
    sim_train_df["Delay time (sec)"] = done_df["Delay time (sec)"].reset_index(drop=True)
    sim_train_df["Batch fetch time diff (sec)"]  = (simulated_df[gpu_fetch_done_col].max(axis=1) - simulated_df[gpu_fetch_done_col].min(axis=1)).dt.total_seconds()
    sim_train_df["Batch fetch start time diff (sec)"] = (simulated_df[gpu_start_col].max(axis=1) - simulated_df[gpu_start_col].min(axis=1)).dt.total_seconds()
    
    os.makedirs(f"./{trainType}/FETCH_ALIGN_{FETCH_ALIGN}_START_ALIGN_{START_ALIGN}/debug", exist_ok=True)
    
    sim_train_df.to_csv(f"./{trainType}/FETCH_ALIGN_{FETCH_ALIGN}_START_ALIGN_{START_ALIGN}/{origin_filename}.csv")
    
    # For debugging
    sanity_check_df = done_df[done_df["Slowest end time"] > done_df["Training end time"]]
    if not sanity_check_df.empty:
        print(f"Error in this FETCH_ALIGN_{FETCH_ALIGN}_START_ALIGN_{START_ALIGN}")
        print(done_df[done_df["Slowest end time"] > done_df["Training end time"]])
    simulated_df.to_csv(f"./{trainType}/FETCH_ALIGN_{FETCH_ALIGN}_START_ALIGN_{START_ALIGN}/debug/dataload_sim.csv")
    done_df.to_csv(f"./{trainType}/FETCH_ALIGN_{FETCH_ALIGN}_START_ALIGN_{START_ALIGN}/debug/train_sim.csv")
    return simulated_df

# Initialization
conf_combinations = list(itertools.product(FETCH_ALIGNs, START_ALIGNs))
result_ids = []
for FETCH_ALIGN, START_ALIGN in conf_combinations:
    result_ids.append(simulation.remote(FETCH_ALIGN,START_ALIGN, conf_share, 
                                        trainType_share, simulation_df_share, max_index_number_share))
    
while len(result_ids):
    done_id, result_ids = ray.wait(result_ids)
    ray.get(done_id)
#     # Debug break
#     break

RayTaskError(TypeError): [36mray::simulation()[39m (pid=64126, ip=10.20.22.156)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-4-9b1befbba0b7>", line 107, in simulation
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/series.py", line 966, in __getitem__
    return self._get_with(key)
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/series.py", line 973, in _get_with
    slobj = self.index._convert_slice_indexer(key, kind="getitem")
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3753, in _convert_slice_indexer
    indexer = self.slice_indexer(start, stop, step)
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 5686, in slice_indexer
    start_slice, end_slice = self.slice_locs(start, end, step=step)
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 5894, in slice_locs
    end_slice = self.get_slice_bound(end, "right")
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 5798, in get_slice_bound
    label = self._maybe_cast_slice_bound(label, side)
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 5750, in _maybe_cast_slice_bound
    raise self._invalid_indexer("slice", label)
TypeError: cannot do slice indexing on Index with these indexers [nan] of type float

In [None]:
dir_names = glob.glob(
               f"./{trainType}/*/*.csv")

dataset_col_name = ["Align","Log type", "Dataset size(GB)", "Model", "Augmentation", "Worker", "Worker batch size", "Epoch", "Batch size", "Avg iteration time (sec)",
                                "Avg throughput (images/sec)", "Avg training stall time (sec)", "Avg delay time (sec)", "Avg preprocessing start diff (sec)", "Avg preprocessing diff (sec)"]
total_log = [] 

for logdir in dir_names:
    split_logdir = logdir.split('/')
    align_type = split_logdir[-2]
    parse_filename = split_logdir[-1]
    logdir_list = parse_filename.split("_")
#         print(logdir_list)
    logtype = logdir_list[0]
    dataset = logdir_list[1]
    aug = find_value(logdir_list, dataset)
    model = find_value(logdir_list, aug)
    epoch = find_value(logdir_list, model)
    batchsize = find_value(logdir_list, epoch)
    worker = find_value(logdir_list, batchsize)
    thread = find_value(logdir_list, worker).replace(".csv","")
    model = model.replace("_", "")

    epoch_num = int(epoch.replace("epoch", ""))
    batchsize_num = int(batchsize.replace("b", ""))
    single_batchsize_num = batchsize_num/gpu_num
    conf["WORKER_NUM"] = int(worker.replace("worker", ""))
    thread_num = int(thread.replace("thread", ""))

    df = pd.read_csv(logdir, index_col=None)
    df = df[df["Index number"]>10]
#     gpu_log = []

#     for i in range(gpu_num):
#         gpu_df = df[df["GPU"] == i].dropna()
#         gpu_epoch_time = f'{round(gpu_df["Iteration time (sec)"].mean(),4)}±{round(gpu_df["Iteration time (sec)"].std(),4)}'
#         gpu_log.append(gpu_epoch_time)
#         gpu_stall_time = f'{round(gpu_df["Training stall time (sec)"].mean(),4)}±{round(gpu_df["Training stall time (sec)"].std(),4)}'
#         gpu_log.append(gpu_stall_time)


#     df = df[df["Step"] > 10]
#     filtered_avg_epoch_time = round(
#         df["Iteration time (sec)"].astype(float).sum()/(int(epoch_num)-1), 2)
#     print(df.describe())
#     print("\n")
    # print(parse_filename, df["Iteration time (sec)"])
    iter_origin = df["Iteration time (sec)"].mean()
    iter_avg = f'{round(iter_origin,4)}±{round(df["Iteration time (sec)"].std(),4)}'
    data_avg = f'{round(df["Training stall time (sec)"].mean(),4)}±{round(df["Training stall time (sec)"].std(),4)}'
    delay_avg = f'{round(df["Delay time (sec)"].mean(),4)}±{round(df["Delay time (sec)"].std(),4)}'
    fetch_avg = f'{round(df["Batch fetch time diff (sec)"].mean(),4)}±{round(df["Batch fetch time diff (sec)"].std(),4)}'
    fetch_start_avg = f'{round(df["Batch fetch start time diff (sec)"].mean(),4)}±{round(df["Batch fetch start time diff (sec)"].std(),4)}'

    throughput_origin = single_batchsize_num/iter_origin
    throughput_avg = round(throughput_origin, 4)
    # # [Note] :
    # # Deprecated throughput avg,
    # # theoretically wrong in mathematic, Please check below link
    # # https://fxloader.com/inverse_of_an_average_compared_to_averages_of_inverses/
    # throughput_avg = f'{round(df["Throughput (image/sec)"].mean(),2)}±{round(df["Throughput (image/sec)"].std(),4)}'

    # FIXME: Hard coded as imagenet avg size (MB)
    processed_data_avg = throughput_origin * 105.53 / 1024

    total_loglet = [align_type, logtype, dataset, model, aug, conf["WORKER_NUM"], thread_num, epoch_num, batchsize_num,
                    iter_avg, throughput_avg, data_avg, delay_avg, fetch_start_avg, fetch_avg]
#     total_loglet.extend(gpu_log)
    total_log.append(total_loglet)

avg_df = pd.DataFrame(total_log,
                      columns=dataset_col_name)
avg_df.dropna().to_csv(f"./{trainType}/total_summary.csv",
                       sep=',', na_rep='NA')

2021-08-27 13:36:10,239	ERROR worker.py:78 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): [36mray::simulation()[39m (pid=64124, ip=10.20.22.156)
  File "python/ray/_raylet.pyx", line 534, in ray._raylet.execute_task
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-4-9b1befbba0b7>", line 107, in simulation
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/series.py", line 966, in __getitem__
    return self._get_with(key)
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/series.py", line 973, in _get_with
    slobj = self.index._convert_slice_indexer(key, kind="getitem")
  File "/home/chanho/anaconda3/envs/torchtest/lib/python3.8/site-packages/pandas/core/indexes/base.py", line 3753, in _convert_slice_indexer
    indexer = self.slice_indexe