# Philly data preparation

Weifan Jiang, weifanjiang@g.harvard.edu

In [1]:
import datetime
import csv
import os
import json
import random
import numpy as np
from tqdm.notebook import tqdm
import pandas as pd

## utility functions

In [2]:
def parse_date(date_str):
    """Parse a 'YYYY-MM-DD HH:MM:SS' timestamp string into a naive datetime.

    A trailing "PST"/"PDT" marker (and the character preceding it) is dropped
    before parsing. Returns None when the input is None, '' or the literal
    string 'None'.
    """
    if date_str in (None, '', 'None'):
        return None
    # str.endswith accepts a tuple, so both zone markers are tested at once
    has_zone = date_str.endswith(("PST", "PDT"))
    stamp = date_str[:-4] if has_zone else date_str
    return datetime.datetime.strptime(stamp, '%Y-%m-%d %H:%M:%S')


def timedelta_to_minutes(timedelta):
    """Convert a datetime.timedelta into a (possibly fractional) number of minutes.

    Args:
        timedelta: object exposing the normalized ``days``, ``seconds`` and
            ``microseconds`` attributes of ``datetime.timedelta``.

    Returns:
        float: the duration expressed in minutes (negative for negative deltas).
    """
    minutes = 0.0
    minutes += timedelta.days * 24 * 60
    minutes += timedelta.seconds / 60.0
    # one minute is 60 * 1,000,000 microseconds (not 60 * 1,000)
    minutes += timedelta.microseconds / (60 * 1000 * 1000)
    return minutes


def datetime_to_str(t):
    """Render a datetime as a 'YYYY-MM-DD HH:MM:SS' string."""
    layout = '%Y-%m-%d %H:%M:%S'
    return t.strftime(layout)


def change_time_by_min(t, minutes):
    """Return ``t`` shifted by ``minutes`` minutes (negative values move back in time)."""
    shift = datetime.timedelta(seconds=60*minutes)
    return t + shift


def count_machines(detail):
    """Count the machines and GPUs a job attempt is scheduled on.

    ``detail`` is a sequence with one entry per machine; each entry's "gpus"
    value is a sized collection of the GPUs used on that machine.

    Returns a ``(cpu count, gpu count)`` tuple, where the cpu count is the
    number of entries in ``detail`` and the gpu count sums the "gpus" lengths.
    """
    total_gpus = sum(len(entry["gpus"]) for entry in detail)
    return len(detail), total_gpus


def round_to_nearest_minute(t):
    """Truncate a datetime to the start of its minute.

    Despite the name, this always rounds *down*: the seconds and microseconds
    components of ``t`` are subtracted off.
    """
    sub_minute_part = datetime.timedelta(
        seconds=t.second,
        microseconds=t.microsecond,
    )
    return t - sub_minute_part


def get_time_interval(center, lo, hi):
    """List minute-spaced timestamp strings around ``center``.

    ``center`` is a timestamp string; it is parsed and truncated to its
    minute. One string is produced for every integer minute offset from
    ``lo`` to ``hi`` (both inclusive), in increasing order.
    """
    anchor = round_to_nearest_minute(parse_date(center))
    offsets = range(lo, hi + 1)
    return [
        datetime_to_str(change_time_by_min(anchor, offset))
        for offset in offsets
    ]


def philly_read_csv(fpath, max_lines, desc='loading'):
    """Read a Philly trace CSV into a DataFrame (the format is not compatible
    with pandas.read_csv).

    Args:
        fpath: path of the CSV file; its first line is the header.
        max_lines: expected number of data rows; only used as the total of
            the progress bar.
        desc: label shown on the progress bar.

    Returns:
        pandas.DataFrame with one column per (whitespace-stripped) header
        name. Cell values are kept as read from the file, except that "NA"
        cells become the integer -1 and cells missing from short rows
        become None.

    Raises:
        IndexError: if a row still has more fields than the header after the
            special-case handling below.
    """
    # newline="" is the csv-module recommended way to open a file for csv.reader
    with open(fpath, "r", newline="") as fin:
        reader = csv.reader(fin)
        columns = [x.strip() for x in next(reader)]
        data_lists = [list() for _ in columns]
        # the context manager closes the progress bar even if a row raises
        with tqdm(total=max_lines, desc=desc) as pbar:
            for row_raw in reader:
                pbar.update(1)

                # csv.reader yields [] for blank lines; there is nothing to record
                if not row_raw:
                    continue

                # handle missing data: replace "NA" with the sentinel -1
                row = [x if x != "NA" else -1 for x in row_raw]

                # special case for file misformat in gpu utilization trace
                if len(row) != len(columns):
                    # drop an empty trailing field (line ended with a comma)
                    if row[-1] == "":
                        row = row[:-1]
                    # 18 fields for a 10-column header: keep the first two
                    # fields and every other one of the remaining fields
                    if len(row) == 18 and len(columns) == 10:
                        row = row[0:2] + row[2:][::2]
                    # pad short rows with None so every column gets a value
                    row = row + [None, ] * (len(columns) - len(row))

                # remove the time zone (the last 4 characters of the time field)
                if columns[0] == 'time':
                    row[0] = row[0][:-4]

                for idx, element in enumerate(row):
                    data_lists[idx].append(element)
    return pd.DataFrame(data=dict(zip(columns, data_lists)))

## preparation

In [3]:
# seed both random number generators for reproducibility
random.seed(10)
np.random.seed(10)


# data location: raw traces in, per-job outputs out
trace_dir = "philly-traces/trace-data/"
output_dir = "data/philly"
job_log_path = os.path.join(trace_dir, "cluster_job_log")
sampled_jobs_path = os.path.join(output_dir, "sampled_jobs.json")

# one output sub-directory per kind of utilization trace
os.makedirs(output_dir, exist_ok=True)
for name in ('gpu_util', 'cpu_util', 'mem_util'):
    job_data_dir = os.path.join(output_dir, name)
    os.makedirs(job_data_dir, exist_ok=True)


# schema: three descriptive columns followed by one column per minute
# offset in [time_lo, time_hi]
time_lo = -10
time_hi = 3
output_columns = ["name", "machine_type", "trace", *range(time_lo, time_hi + 1)]

## sample jobs

In [4]:
# Build the sampled-jobs file once; later runs reuse the cached JSON.
if not os.path.isfile(sampled_jobs_path):

    # start-time window for which cpu/gpu/mem logs are available
    min_start_time = "2017-10-03 01:00:00"
    max_start_time = "2017-12-15 17:42:00"

    # read full data
    with open(job_log_path, "r") as fin:
        job_log = json.load(fin)

    # reduce every job to its last attempt and keep the Pass/Failed jobs
    # that have exactly one attempt left
    jobs_single_attempt = list()
    for job in job_log:
        attempts = job["attempts"]
        if len(attempts) > 1:
            job["attempts"] = attempts[-1:]
        if job['status'] in ('Pass', 'Failed') and len(job["attempts"]) == 1:
            jobs_single_attempt.append(job)

    # runtime (in minutes) of the remaining attempt; None when either
    # timestamp is missing
    for job in jobs_single_attempt:
        attempt = job["attempts"][0]
        start_time = parse_date(attempt["start_time"])
        end_time = parse_date(attempt["end_time"])
        if start_time is None or end_time is None:
            job["runtime_min"] = None
        else:
            job["runtime_min"] = timedelta_to_minutes(end_time - start_time)

    # keep jobs with a known runtime of at least 5 minutes that were
    # scheduled on more than one GPU
    jobs_single_attempt = [
        x for x in jobs_single_attempt
        if x['runtime_min'] is not None
        and 5 <= x['runtime_min']
        and count_machines(x["attempts"][0]["detail"])[1] > 1
    ]

    # format output
    output_json = list()
    for job in tqdm(jobs_single_attempt, desc="extract sampled jobs"):
        output_job = {
            key: job[key]
            for key in ("status", "vc", "jobid", "submitted_time", "user", "runtime_min")
        }
        output_job.update(
            (key, job["attempts"][0][key])
            for key in ("start_time", "end_time", "detail")
        )

        # filter for time interval with cpu/gpu/mem logs
        # (timestamps are compared as strings)
        if min_start_time <= output_job["start_time"] <= max_start_time:
            output_json.append(output_job)

    with open(sampled_jobs_path, "w") as fout:
        json.dump(output_json, fout, indent=2)


# always go through the file so fresh and cached runs see the same data
with open(sampled_jobs_path, "r") as fin:
    sampled_jobs = json.load(fin)

total_job_count = len(sampled_jobs)
pass_job_count = sum(1 for x in sampled_jobs if x['status'] == 'Pass')
failed_job_count = sum(1 for x in sampled_jobs if x['status'] == 'Failed')

print('total job count {}'.format(total_job_count))
print('Pass jobs {}, failed jobs {}'.format(pass_job_count, failed_job_count))

total job count 5910
Pass jobs 3172, failed jobs 2738


## per-job gpu trace collection

In [5]:
# load the cluster-wide gpu trace and index it by (time, machineId);
# 44750640 is only the progress-bar total
gpu_trace_path = os.path.join(trace_dir, "cluster_gpu_util")
gpu_df = philly_read_csv(gpu_trace_path, 44750640, "load gpu utilization traces")
gpu_df = gpu_df.set_index(["time", "machineId"]).sort_index()

load gpu utilization traces:   0%|          | 0/44750640 [00:00<?, ?it/s]

In [6]:
# Write one csv per sampled job with the utilization of each GPU it used,
# one row per GPU and one column per minute offset around the job start.
for job in tqdm(sampled_jobs, desc='collect job gpu utilizations'):
    job_trace_path = os.path.join(output_dir, "gpu_util", "{}.csv".format(job["jobid"]))
    # traces already on disk are not recomputed
    if os.path.isfile(job_trace_path):
        continue

    data_lists = [list() for _ in output_columns]
    name_col, type_col, trace_col = data_lists[:3]
    minute_cols = data_lists[3:]
    selected_times = get_time_interval(job["start_time"], time_lo, time_hi)

    for assignment in job["detail"]:
        ip = assignment["ip"]
        for gpu in assignment["gpus"]:
            name_col.append("{}_{}".format(ip, gpu))
            type_col.append("gpu")
            trace_col.append("utilization")

            util_column = "{}_util".format(gpu)
            for minute_col, stime in zip(minute_cols, selected_times):
                # None marks both "no trace row" and "no value in the row"
                val = None
                if (stime, ip) in gpu_df.index:
                    val = gpu_df.loc[(stime, ip), util_column]
                minute_col.append(None if val is None else float(val))

    out_df = pd.DataFrame(data=dict(zip(output_columns, data_lists)))
    out_df.to_csv(job_trace_path, index=False)

collect job gpu utilizations:   0%|          | 0/5910 [00:00<?, ?it/s]

## per-job cpu trace collection

In [5]:
# load the cluster-wide cpu trace and index it by (time, machine_id);
# 45028260 is only the progress-bar total
cpu_trace_path = os.path.join(trace_dir, "cluster_cpu_util")
cpu_df = philly_read_csv(cpu_trace_path, 45028260, "load cpu utilization traces")
cpu_df = cpu_df.set_index(["time", "machine_id"]).sort_index()

load cpu utilization traces:   0%|          | 0/45028260 [00:00<?, ?it/s]

In [7]:
# Write one csv per sampled job with the cpu utilization of each machine it
# ran on, one row per machine and one column per minute offset around the
# job start.
for job in tqdm(sampled_jobs, desc='collect job cpu utilizations'):
    job_trace_path = os.path.join(output_dir, "cpu_util", "{}.csv".format(job["jobid"]))
    # traces already on disk are not recomputed
    if os.path.isfile(job_trace_path):
        continue

    data_lists = [list() for _ in output_columns]
    name_col, type_col, trace_col = data_lists[:3]
    minute_cols = data_lists[3:]
    selected_times = get_time_interval(job["start_time"], time_lo, time_hi)

    for assignment in job["detail"]:
        ip = assignment["ip"]
        name_col.append(ip)
        type_col.append("cpu")
        trace_col.append("utilization")

        for minute_col, stime in zip(minute_cols, selected_times):
            # None marks a minute without a trace row for this machine
            val = None
            if (stime, ip) in cpu_df.index:
                # NOTE(review): the chained .loc lookup is kept exactly as
                # written; it looks redundant for a unique index, so confirm
                # whether the cpu trace has duplicate (time, machine_id) rows
                val = cpu_df.loc[(stime, ip)].loc[(stime, ip), "cpu_util"]
            # the value is stored as found (no float conversion here)
            minute_col.append(val)

    out_df = pd.DataFrame(data=dict(zip(output_columns, data_lists)))
    out_df.to_csv(job_trace_path, index=False)

collect job cpu utilizations:   0%|          | 0/5910 [00:00<?, ?it/s]

## per-job memory utilization

In [8]:
# load the cluster-wide memory trace and index it by (time, machine_id);
# 45003060 is only the progress-bar total
mem_trace_path = os.path.join(trace_dir, "cluster_mem_util")
mem_df = philly_read_csv(mem_trace_path, 45003060, "load memory utilization traces")
mem_df = mem_df.set_index(["time", "machine_id"]).sort_index()

load memory utilization traces:   0%|          | 0/45003060 [00:00<?, ?it/s]

In [10]:
# Write one csv per sampled job with a memory figure for each machine it ran
# on, one row per machine and one column per minute offset around the job
# start.
for job in tqdm(sampled_jobs, desc='collect job memory utilizations'):
    job_trace_path = os.path.join(output_dir, "mem_util", "{}.csv".format(job["jobid"]))
    # traces already on disk are not recomputed
    if os.path.isfile(job_trace_path):
        continue

    data_lists = [list() for _ in output_columns]
    name_col, type_col, trace_col = data_lists[:3]
    minute_cols = data_lists[3:]
    selected_times = get_time_interval(job["start_time"], time_lo, time_hi)

    for assignment in job["detail"]:
        ip = assignment["ip"]
        name_col.append(ip)
        type_col.append("cpu")
        trace_col.append("memory")

        for minute_col, stime in zip(minute_cols, selected_times):
            # None marks a minute without a trace row for this machine
            percent = None
            if (stime, ip) in mem_df.index:
                mtotal = mem_df.loc[(stime, ip), "mem_total"]
                mfree = mem_df.loc[(stime, ip), "mem_free"]
                if mtotal is None or mfree is None:
                    percent = None
                elif mtotal == -1 or mfree == -1:
                    # -1 in either field is passed through as -1
                    percent = -1
                else:
                    # NOTE(review): this is the *free* share of memory in
                    # percent; confirm that is what downstream code expects
                    percent = 100 * float(mfree) / float(mtotal)
            minute_col.append(percent)

    out_df = pd.DataFrame(data=dict(zip(output_columns, data_lists)))
    out_df.to_csv(job_trace_path, index=False)

collect job memory utilizations:   0%|          | 0/5910 [00:00<?, ?it/s]