In [4]:
# Required preprocessing/parsing of the job data

import os
import numpy as np
import pandas as pd
import matplotlib.patches as mpatches
import matplotlib.pyplot as plt
import time, datetime, pytz
from matplotlib.ticker import MultipleLocator, FixedLocator, LogLocator, NullFormatter, ScalarFormatter
from datetime import date, datetime, time, timedelta
import re


In [5]:
# Load and preprocess raw slurmdata collected from SLURM:
#   convert data tpye -> int 
#   Unknown -> NaN values

def preprocess_df(file_name):
    with open(os.path.join(file_path, file_name),'r') as file:
        filedata = file.read()
        filedata = filedata.replace('None assigned','NoneAssigned')

    data = pd.read_fwf(os.path.join(file_path, file_name), delimiter = r"\s", header = None)
    data = data.apply(lambda x: x.str.strip() if x.dtype == "object" else x)
    data = data.rename(columns=data.iloc[0]).drop(data.index[0])
    data = data.iloc[1:]
    data = data.astype({"ElapsedRaw": int, "CPUTimeRAW": int, "NCPUS": int, "NNode":int, "AllocCPUS":int, "AllocNode":int, "ReqCPUS":int})
    data.replace(to_replace = r'^\s*$', value = np.nan, regex = True, inplace = True)  # Blank value->NaN value
    data.replace(to_replace = r'Unknown', value = np.nan, regex = True, inplace = True)  # Unknown value->NaN value
    return(data)


In [7]:
file_path = "/Users/yzang/Documents/Slurm data/Raw data/"
data_0 = preprocess_df("slurm_data_0322-0223.csv")
data_0.to_csv(file_path + "raw_data_cleaned.csv")

In [9]:
# Read data of Jan 2022 and Feb 2022
slurm_0122 = preprocess_df("slurm_data_0122.csv")
slurm_0222 = preprocess_df("slurm_data_0222.csv")
slurm_0322 = preprocess_df("slurm_data_0322.csv")

In [10]:
# Merge data of Jan 2022 and Feb 2022
data_1 = pd.merge(slurm_0122, slurm_0222, how = 'outer')

In [11]:
data_2 = pd.merge(data_1, slurm_0322, how = "outer")

In [12]:
# Merge the two datasets
data = data_2.append(data_0)

  data = data_2.append(data_0)


In [13]:
# Drop NaN values in Start, End
data = data.dropna(subset = ["Start", "End"])  

In [14]:
# Unify the "State" metrics
data.State = data.State.apply(lambda x: "CANCELLED" if 'CANCELLED' in x else x)
data.State = data.State.apply(lambda x: "OUT_OF_MEMORY" if 'OUT_OF_ME' in x else x)

In [15]:
data.State.value_counts()

COMPLETED        1768608
FAILED            424160
CANCELLED         152336
TIMEOUT            67168
OUT_OF_MEMORY      13390
NODE_FAIL             56
REQUEUED               6
Name: State, dtype: int64

In [16]:
# parses node strings like r12n[1-30,32] to r12n1, r12n2 ... r12n30, r12n32
def split_nodes(s):
    if s is None or len(s) == 0:
        return set()
    
    s = s.replace(",+", "").replace("+", "").replace("\r\n", "").replace("\n", "").replace("\t", "")

    start = 0
    index = 0
    rack_chunks = []
    in_bracket = False
    while index < len(s):  # Separate them in parts like r12n[1-30,32] or r13n1
        if s[index] == "[":
            in_bracket = True
        elif s[index] == "]":
            in_bracket = False
        elif s[index] == "," and not in_bracket:
            rack_chunks.append(s[start: index])
            start = index + 1
        index += 1
    rack_chunks.append(s[start: index])  # Add the last line

    node_names = set()

    for rack_chunk in rack_chunks:
        if "[" in rack_chunk:
            prefix, postfix = rack_chunk.split("[")
            postfix = postfix[:-1]  # Remove the last bracket
            nodes = postfix.split(",")
            for node in nodes:
                if "-" in node:
                    start, end = node.split("-")
                    if not start.isnumeric() or not end.isnumeric():
                        continue
                    for i in range(int(start), int(end) + 1):
                        node_names.add("{}{}".format(prefix, i))
                else:
                    node_names.add("{}{}".format(prefix, node))
        else:
            node_names.add(rack_chunk)

    return node_names

# node_names = split_nodes("r26n[4,7-16,29]")
# node_names

In [17]:
# Unify data: 
# ->AveCPUFreq M/K-> M, 
# ->AveDiskRead -> M, 
# ->AveDiskWrite -> M, 
# ->MaxDiskRead -> M, 
# ->MaxDiskWrite -> M, 
# ->MaxVMSize -> M, 
# ->ReqMem M/G-> M,
# ->NodeList -> r1n[1-3], r2n[4-5] sepereate to a list: [r1n1, r1n2, r1n3, r2n4, r2n5]

def unify_ave_cpu_freq(df):
    ave_cpu_freq_l = []
    for i in df["AveCPUFreq"]:
        if pd.isnull(i):
            ave_cpu_freq_num = i
        else:
            ave_cpu_freq = re.findall(r"\d+\.?\d*", i)
            ave_cpu_freq_num = ave_cpu_freq[0]
            if 'K' in i:
                ave_cpu_freq_num = format(float(ave_cpu_freq_num)/1024.00, '.2f')
            else:
                ave_cpu_freq_num = format(float(ave_cpu_freq_num)/1.00, '.2f')
        ave_cpu_freq_l.append(ave_cpu_freq_num)
    df["ave_cpu_freq"] = ave_cpu_freq_l

def unify_ave_disk_read(df):
    ave_disk_read_l = []
    for i in df["AveDiskRead"]:
        if pd.isnull(i):
            ave_disk_read_num = i
        else:
            ave_disk_read = re.findall(r"\d+\.?\d*", i)
            ave_disk_read_num = ave_disk_read[0]
            if 'G' in i:
                ave_disk_read_num = format(float(ave_disk_read_num)*1024.00, '.2f')
            else:
                ave_disk_read_num = format(float(ave_disk_read_num)*1.00, '.2f')
        ave_disk_read_l.append(ave_disk_read_num)
    df["ave_disk_read"] = ave_disk_read_l
    
def unify_ave_disk_write(df):
    ave_disk_write_l = []
    for i in df["AveDiskWrite"]:
        if pd.isnull(i):
            ave_disk_write_num = i
        else:
            ave_disk_write = re.findall(r"\d+\.?\d*", i)
            ave_disk_write_num = ave_disk_write[0]
            if 'G' in i:
                ave_disk_write_num = format(float(ave_disk_write_num)*1024.00, '.2f')
            else:
                ave_disk_write_num = format(float(ave_disk_write_num)*1.00, '.2f')
        ave_disk_write_l.append(ave_disk_write_num)
    df["ave_disk_write"] = ave_disk_write_l

def unify_max_disk_read(df):
    max_disk_read_l = []
    for i in df["MaxDiskRead"]:
        if pd.isnull(i):
            max_disk_read_num = i
        else:
            max_disk_read = re.findall(r"\d+\.?\d*", i)
            max_disk_read_num = max_disk_read[0]
            if 'G' in i:
                max_disk_read_num = format(float(max_disk_read_num)*1024.00, '.2f')
            else:
                max_disk_read_num = format(float(max_disk_read_num)*1.00, '.2f')
        max_disk_read_l.append(max_disk_read_num)
    df["max_disk_read"] = max_disk_read_l

def unify_max_disk_write(df):
    max_disk_write_l = []
    for i in df["MaxDiskWrite"]:
        if pd.isnull(i):
            max_disk_write_num = i
        else:
            max_disk_write = re.findall(r"\d+\.?\d*", i)
            max_disk_write_num = max_disk_write[0]
            if 'G' in i:
                max_disk_write_num = format(float(max_disk_write_num)*1024.00, '.2f')
            else:
                max_disk_write_num = format(float(max_disk_write_num)*1.00, '.2f')
        max_disk_write_l.append(max_disk_write_num)
    df["max_disk_write"] = max_disk_write_l
    
def unify_max_vm_size(df):
    max_vm_size_l = []
    for i in df["MaxDiskWrite"]:
        if pd.isnull(i):
            max_vm_size_num = i
        else:
            max_vm_size = re.findall(r"\d+\.?\d*", i)
            max_vm_size_num = max_vm_size[0]
            if 'G' in i:
                max_vm_size_num = format(float(max_vm_size_num)*1024.00, '.2f')
            else:
                max_vm_size_num = format(float(max_vm_size_num)*1.00, '.2f')
        max_vm_size_l.append(max_vm_size_num)
    df["max_vm_size"] = max_vm_size_l
    
def unify_req_mem(df):
    req_mem_l = []
    for i in df["ReqMem"]:
        if pd.isnull(i):
            req_mem_num = i
        else:
            req_mem = re.findall(r"\d+\.?\d*", i)
            req_mem_num = req_mem[0]
            if 'G' in i:
                req_mem_num = format(float(req_mem_num)*1024.00, '.2f')
            else:
                req_mem_num = format(float(req_mem_num)*1.00, '.2f')
        req_mem_l.append(req_mem_num)
    df["req_mem"] = req_mem_l

def unify_node_list(df):
    node_list = []
    for node_name in df["NodeList"]:
        node_name_list = split_nodes(node_name)
        node_list.append(node_name_list)
    df["node_list"] = node_list

def unify_data(df):
    unify_ave_cpu_freq(df)
    unify_ave_disk_read(df)
    unify_ave_disk_write(df)
    unify_max_disk_read(df)
    unify_max_disk_write(df)
    unify_max_vm_size(df)
    unify_req_mem(df)
    unify_node_list(df)
    

In [18]:
unify_data(data)
# data.head(10)

In [19]:
data.drop(["ExitCode", "AveCPUFreq", "AveDiskRead", "AveDiskWrite", "MaxDiskRead", "MaxDiskWrite", "MaxVMSize", "ReqMem"], axis = 1, inplace = True)
# data.head(10)


In [20]:
num_partition = data["Partition"].value_counts()
display(num_partition)
print(len(num_partition))

normal                        1372932
shared                         581609
gpu_shared                     104591
gpu_titanrtx_shared             63530
gpu_shared_course               45517
gpu_titan+                      21019
gpu_titanrtx                    14695
gpu                             11950
shared_52c_384g                 11351
gpu_titanrtx_shared_course      11242
fat_soil_shared                  7049
gpu_short                        6159
gpu_share+                       4507
sw                               4394
shared_jupyter                   2903
fat                              2621
gpu_titanrtx_short               2501
short                            1701
fat_soil_+                       1567
gpu_shared_jupyter               1563
hared_ju+                         344
Name: Partition, dtype: int64

21


In [22]:
# Construct new fields: 
#   Submit, Start, End
#   submit_hour_of_day, submit_day_of_week, submit_date
#   waiting_time=Start-Submit, running_time=End-Start

data['Submit'] = pd.to_datetime(data['Submit'], utc=False, format="%Y-%m-%dT%H:%M:%S")
data['Start'] = pd.to_datetime(data['Start'], utc=False, format="%Y-%m-%dT%H:%M:%S")
data['End'] = pd.to_datetime(data['End'], utc=False, format="%Y-%m-%dT%H:%M:%S")

data["submit_hour_of_day"] = data["Submit"].dt.hour
data["submit_day_of_week"] = data["Submit"].apply(lambda x:x.weekday())
data['sumbit_day_of_month'] = data['Submit'].dt.day
data["submit_date"] = data["Submit"].dt.date

data["waiting_time"] = data["Start"] - data["Submit"]
data["waiting_time"] = data["waiting_time"].apply(lambda x:timedelta.total_seconds(x))

data["running_time"] = data["End"] - data["Start"]
data["running_time"] = data["running_time"].apply(lambda x:timedelta.total_seconds(x))

# data

In [23]:
# Extract ml jobs and tasks
gpu_nodes = {
    "r28n1", "r28n2", "r28n3", "r28n4", "r28n5",
    "r29n1", "r29n2", "r29n3", "r29n4", "r29n5",
    "r30n1", "r30n2", "r30n3", "r30n4", "r30n5", "r30n6", "r30n7",
    "r31n1", "r31n2", "r31n3", "r31n4", "r31n5", "r31n6"
    "r32n1", "r32n2", "r32n3", "r32n4", "r32n5", "r32n6", "r32n7",
    "r33n2", "r33n3", "r33n5", "r33n6",
    "r34n1", "r34n2", "r34n3", "r34n4", "r34n5", "r34n6", "r34n7",
    "r35n1", "r35n2", "r35n3", "r35n4", "r35n5",
    "r36n1", "r36n2", "r36n3", "r36n4", "r36n5",
    "r38n1", "r38n2", "r38n3", "r38n4", "r38n5",
}

def mark_ml_data(df):
    node_type_l = []
    for i in df["node_list"]:
        if any(n in gpu_nodes for n in i):
            node_type = 0
        else: 
            node_type = 1
        node_type_l.append(node_type)
    df["node_type"] = node_type_l

mark_ml_data(data)
data.drop("NodeList", axis = 1, inplace = True)

ml_data = data.loc[data["node_type"] == 0]
generic_data = data.loc[data["node_type"] == 1]

In [24]:
# Extract jobs and tasks run on the node "gpu_titanrtx_shared_course"
def mark_course_data(df):
    for i in df.index:
        if "course" in df["Partition"].iloc[i]: df["node_type"].iloc[i] = 2
        if "education" in df["Partition"].iloc[i]: df["node_type"].iloc[i] = 2

In [25]:
mark_course_data(data)
course_data = data.loc[data["node_type"] == 2]

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  if "course" in df["Partition"].iloc[i]: df["node_type"].iloc[i] = 2


In [None]:
course_data

In [26]:
data['node_type'].value_counts()

1    2143897
0     228290
2      53537
Name: node_type, dtype: int64

In [27]:
ml_data.drop(columns = "node_type", inplace = True)
generic_data.drop(columns = "node_type", inplace = True)
course_data.drop(columns = "node_type", inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  ml_data.drop(columns = "node_type", inplace = True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  generic_data.drop(columns = "node_type", inplace = True)
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  course_data.drop(columns = "node_type", inplace = True)


In [None]:
# Extract jobs and tasks run on the node "gpu_titanrtx_shared_course"
# course_data = ml_data.loc[ml_data['Partition'].str.contains("course|education", na = False)]
# course_data

In [28]:
# Save the cleaned ml data and generic data
file_path_2 = "/Users/yzang/Documents/Slurm data/Clean data/"

data.to_csv(file_path_2 + "slurm_data_cleaned.csv")
ml_data.to_csv(file_path_2 + "ml_data_cleaned.csv")
course_data.to_csv(file_path_2 + "course_data_cleaned.csv")
generic_data.to_csv(file_path_2 + "generic_data_cleaned.csv")

In [29]:
# data.isna().sum()
missing_per = round(data.isnull().mean() * 100, 2)
missing_per

JobID                   0.00
GID                     6.27
UID                     6.27
Partition               6.27
Submit                  0.00
Start                   0.00
End                     0.00
ElapsedRaw              0.00
CPUTimeRAW              0.00
NCPUS                   0.00
NNode                   0.00
State                   0.00
AllocCPUS               0.00
AllocNode               0.00
NTask                  93.73
ReqCPUS                 0.00
ave_cpu_freq           93.73
ave_disk_read          93.79
ave_disk_write         93.79
max_disk_read          93.79
max_disk_write         93.79
max_vm_size            93.79
req_mem                 6.27
node_list               0.00
submit_hour_of_day      0.00
submit_day_of_week      0.00
sumbit_day_of_month     0.00
submit_date             0.00
waiting_time            0.00
running_time            0.00
node_type               0.00
dtype: float64