In [1]:
import pandas as pd
import os, glob

def get_header_from_schema(schema: pd.DataFrame, subdir: str) -> list[str]:
    names = schema[schema['file pattern'].str.contains(f'^{subdir}')]['content'].tolist()
    return [s.replace(' ', '_') for s in names]


def get_df_from_multiple_csv_in_folder(folder: os.PathLike, names, limit_no_of_file=None, usecols=[], dtype=None, converters=None) -> pd.DataFrame:
    all_csv = glob.glob(os.path.join(folder, '*.csv'))
    if limit_no_of_file:
        all_csv = all_csv[:limit_no_of_file]

    try:
        return pd.concat((pd.read_csv(f, names=names, usecols=usecols, dtype=dtype, converters=converters, on_bad_lines='skip', verbose=True)\
                          for f in all_csv), ignore_index=True)
    except MemoryError:
        retry_files = len(all_csv) // 2
        print(f"Out of memory, retry with litmiting of {retry_files} files")
        get_df_from_multiple_csv_in_folder(folder, names, limit_no_of_file=retry_files, usecols=usecols, dtype=dtype)

In [12]:
import configparser, numpy as np

config = configparser.ConfigParser()
config.read('data.ini')
default = config['DEFAULT']

TRACES_PATH = default.get('traces_path')
MACHINE_EVENTS_SUBDIR = default.get('machine_events_subdir')
TASK_EVENTS_SUBDIR = default.get('task_events_subdir')

schema = pd.read_csv(os.path.join(TRACES_PATH, 'schema.csv'))

In [11]:
schema[schema['file pattern'].str.contains(f'^{MACHINE_EVENTS_SUBDIR}')]

Unnamed: 0,file pattern,field number,content,format,mandatory
21,machine_events/part-00000-of-00001.csv.gz,1,time,INTEGER,YES
22,machine_events/part-00000-of-00001.csv.gz,2,machine ID,INTEGER,YES
23,machine_events/part-00000-of-00001.csv.gz,3,event type,INTEGER,YES
24,machine_events/part-00000-of-00001.csv.gz,4,platform ID,STRING_HASH,NO
25,machine_events/part-00000-of-00001.csv.gz,5,CPUs,FLOAT,NO
26,machine_events/part-00000-of-00001.csv.gz,6,Memory,FLOAT,NO


In [13]:
schema[schema['file pattern'].str.contains(f'^{TASK_EVENTS_SUBDIR}')]

Unnamed: 0,file pattern,field number,content,format,mandatory
8,task_events/part-?????-of-?????.csv.gz,1,time,INTEGER,YES
9,task_events/part-?????-of-?????.csv.gz,2,missing info,INTEGER,NO
10,task_events/part-?????-of-?????.csv.gz,3,job ID,INTEGER,YES
11,task_events/part-?????-of-?????.csv.gz,4,task index,INTEGER,YES
12,task_events/part-?????-of-?????.csv.gz,5,machine ID,INTEGER,NO
13,task_events/part-?????-of-?????.csv.gz,6,event type,INTEGER,YES
14,task_events/part-?????-of-?????.csv.gz,7,user,STRING_HASH,NO
15,task_events/part-?????-of-?????.csv.gz,8,scheduling class,INTEGER,NO
16,task_events/part-?????-of-?????.csv.gz,9,priority,INTEGER,YES
17,task_events/part-?????-of-?????.csv.gz,10,CPU request,FLOAT,NO


In [6]:
machines_info = get_df_from_multiple_csv_in_folder(os.path.join(TRACES_PATH, MACHINE_EVENTS_SUBDIR),\
                                                names=get_header_from_schema(schema, MACHINE_EVENTS_SUBDIR),\
                                                usecols=['machine_ID', 'CPUs', 'Memory'],
                                                dtype={'machine_ID': np.int64})
machines_info.dropna(inplace=True)
machines_info.drop_duplicates(subset='machine_ID', inplace=True)
machines_info.to_csv(default.get('machines_info_path'), index=False)

Tokenization took: 20.10 ms
Type conversion took: 1.06 ms
Parser memory cleanup took: 0.00 ms


In [15]:
task_events = get_df_from_multiple_csv_in_folder(os.path.join(TRACES_PATH, TASK_EVENTS_SUBDIR),\
                                                    names=get_header_from_schema(schema, TASK_EVENTS_SUBDIR),\
                                                    usecols=['time', 'job_ID', 'task_index', 'event_type', 'CPU_request', 'memory_request'],\
                                                    dtype={'event type': 'category'})
task_events.dropna(inplace=True)
KEY = ['job_ID', 'task_index']

SUBMIT = 0
SCHEDULE = 1
FINISH = 4

submitted_tasks = task_events[task_events['event_type'] == SUBMIT]
submitted_tasks.drop(columns=['event_type'], inplace=True)
# tasks might be scheduled multiple time but might be fail/evicted/lost and scheduled again, keep only the last scheduled timestamp
submitted_tasks.drop_duplicates(subset=KEY, keep='last', inplace=True)
submitted_tasks.rename(columns={'time': 'start_time'}, inplace=True)

scheduled_tasks = task_events[task_events['event_type'] == SCHEDULE]
scheduled_tasks.drop(columns=['event_type', 'CPU_request', 'memory_request'], inplace=True)
scheduled_tasks.drop_duplicates(subset=KEY, keep='last', inplace=True)
scheduled_tasks.rename(columns={'time': 'scheduled_time'}, inplace=True)

submitted_tasks.set_index(KEY, inplace=True)
scheduled_tasks.set_index(KEY, inplace=True)
submitted_tasks = submitted_tasks.join(scheduled_tasks, on=KEY, how='inner')

task_events = task_events[task_events['event_type'] == FINISH]
task_events.drop(columns=['event_type', 'CPU_request', 'memory_request'], inplace=True)
task_events.drop_duplicates(subset=KEY, keep='last', inplace=True)
task_events.rename(columns={'time': 'finished_time'}, inplace=True)

task_events.set_index(KEY, inplace=True)
tasks = task_events.join(submitted_tasks, on=KEY, how='inner')

tasks.start_time = round(tasks.start_time / 1e+6) # time is in microsecond, convert to second
tasks.scheduled_time = round(tasks.scheduled_time / 1e+6) # time is in microsecond, convert to second
tasks.finished_time = round(tasks.finished_time / 1e+6) # time is in microsecond, convert to second
tasks['runtime'] = tasks.finished_time - tasks.scheduled_time
tasks.drop(tasks[tasks['runtime'] < 0].index, inplace=True) # drop tasks that have negative runtime
tasks.drop(columns=['scheduled_time', 'finished_time'], inplace=True)
tasks.sort_values(by=['start_time'], inplace=True)

tasks.reset_index(drop=True, inplace=True)
tasks.to_csv(default['tasks_path'], index=False)

Tokenization took: 30.93 ms
Type conversion took: 5.37 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 20.43 ms
Type conversion took: 28.21 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 21.17 ms
Type conversion took: 8.80 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 21.61 ms
Type conversion took: 8.42 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 21.20 ms
Type conversion took: 12.20 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 20.74 ms
Type conversion took: 10.14 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 19.06 ms
Type conversion took: 4.58 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 27.22 ms
Type conversion took: 9.00 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 4.30 ms
Type conversion took: 2.79 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 30.75 ms
Type conversion took: 5.66 ms
Parser memory cleanup took: 0.00 ms
Tokenization took: 20.97 ms
Type conversion took: 15.37 ms

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  submitted_tasks.drop(columns=['event_type'], inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  submitted_tasks.drop_duplicates(subset=KEY, keep='last', inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  submitted_tasks.rename(columns={'time': 'start_time'}, inplace=True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#re