In [None]:
# Required packages for processing
import pandas as pd
import numpy as np
import pytz, time
import matplotlib.pyplot as plt

%matplotlib inline

In [None]:
def preprocess_parquet_to_df(name, timezone="Europe/Amsterdam"):
    """Load a metric table from parquet and index it by naive local time.

    The parquet index is interpreted as Unix epoch seconds in UTC, converted
    to ``timezone`` and then stripped of its tz information.

    Parameters
    ----------
    name : str
        Path of the parquet file to read.
    timezone : str or datetime.tzinfo, optional
        Time zone the index is converted to before the tz information is
        dropped. Defaults to ``"Europe/Amsterdam"``.

    Returns
    -------
    pandas.DataFrame
        The columns of the file with ``-1`` and missing values replaced by
        ``0``, sorted on a tz-naive ``DatetimeIndex`` named ``dt``, plus a
        ``Total`` column holding the row-wise sum of all columns.
    """
    df = pd.read_parquet(name).replace(-1, 0).fillna(0)
    # NOTE(review): dropping the tz information makes local times that occur
    # twice (end of daylight saving time) indistinguishable - confirm this is
    # acceptable for the analysis.
    local_time = (
        pd.to_datetime(df.index, utc=True, unit="s")
        .tz_convert(timezone)
        .tz_localize(None)
    )
    df.index = local_time.rename("dt")
    df = df.sort_index()
    # Summed after the index swap, so the timestamps are not part of the sum.
    df["Total"] = df.sum(axis=1)
    return df

def preprocess_jobdata_to_df(name):
    """Parse a fixed-width job accounting dump into a DataFrame.

    The text ``'None assigned'`` is rewritten to ``'NoneAssigned'`` and the
    result is saved as ``processed_<file name>`` in the same directory as the
    input. That copy is then parsed with ``pandas.read_fwf``.

    Parameters
    ----------
    name : str
        Path of the raw dump.

    Returns
    -------
    pandas.DataFrame
        One row per record. Column names come from the first line of the
        file, the line after it is discarded, text cells are stripped, and
        ``ElapsedRaw``, ``CPUTimeRAW`` and ``NCPUS`` are cast to ``int``.

    Raises
    ------
    KeyError
        If one of the three integer columns is missing from the header.
    """
    import os

    with open(name, 'r') as source:
        filedata = source.read()
    # Remove the space so the placeholder stays a single token.
    filedata = filedata.replace('None assigned', 'NoneAssigned')

    # Prefix only the file name: prefixing the whole path would point into a
    # "processed_<dir>" directory whenever `name` contains a directory part.
    directory, basename = os.path.split(name)
    processed_name = os.path.join(directory, 'processed_' + basename)
    with open(processed_name, 'w') as target:
        target.write(filedata)

    # NOTE(review): read_fwf documents `delimiter` as a set of filler
    # characters rather than a regular expression - confirm this call parses
    # the dump as intended. The call itself is left unchanged.
    jobdata = pd.read_fwf(processed_name, delimiter=r"\s+", header=None)

    def _strip_text(column):
        # Cover both object columns and dedicated string dtypes; otherwise
        # padded header cells would never match the column names used below.
        is_text = (
            pd.api.types.is_object_dtype(column)
            or pd.api.types.is_string_dtype(column)
        )
        return column.str.strip() if is_text else column

    jobdata = jobdata.apply(_strip_text)
    # Row 0 carries the column names.
    jobdata = jobdata.rename(columns=jobdata.iloc[0]).drop(jobdata.index[0])
    # The next row is dropped unconditionally; presumably the ruler line of
    # dashes under the header - verify against the dump format.
    jobdata = jobdata.iloc[1:]
    jobdata = jobdata.astype({"ElapsedRaw": int, "CPUTimeRAW": int, "NCPUS": int})
    return jobdata

def _expand_nodelist(nodes):
    """Expand a compressed node list such as ``"r10n[1-3,7],r11n2"`` into node names."""
    # Split on the commas that separate hosts, ignoring commas inside brackets.
    groups, current, depth = [], [], 0
    for char in nodes:
        if char == "[":
            depth += 1
        elif char == "]":
            depth -= 1
        if char == "," and depth == 0:
            groups.append("".join(current))
            current = []
        else:
            current.append(char)
    groups.append("".join(current))

    expanded = []
    for group in groups:
        group = group.strip()
        if not group:
            continue
        open_bracket = group.find("[")
        close_bracket = group.find("]", open_bracket)
        if open_bracket == -1 or close_bracket == -1:
            # Plain host name, nothing to expand.
            expanded.append(group)
            continue
        prefix = group[:open_bracket]
        suffix = group[close_bracket + 1:]
        for item in group[open_bracket + 1:close_bracket].split(","):
            first, dash, last = item.partition("-")
            if dash:
                # Pad to the width of the lower bound so "01-03" gives 01, 02, 03.
                for value in range(int(first), int(last) + 1):
                    expanded.append(prefix + str(value).zfill(len(first)) + suffix)
            else:
                expanded.append(prefix + item + suffix)
    return expanded


def calculate_perjob(jobdata, data_type, name):
    """Sum a per-node metric over the nodes and time span of every job.

    Parameters
    ----------
    jobdata : pandas.DataFrame
        Job table with ``Start``, ``End`` and ``NodeList`` columns. It is not
        modified; the result is a copy.
    data_type : pandas.DataFrame
        Metric table with one column per node. It is sliced with
        ``data_type.loc[Start:End]`` for every job.
    name : str
        Name of the column that receives the per-job total.

    Returns
    -------
    pandas.DataFrame
        Copy of ``jobdata`` with an extra object column ``name``. A job whose
        ``NodeList`` is a single known node, or a comma/bracket list, gets the
        sum over its known nodes (``0`` if none of them is a column of
        ``data_type``). All other jobs keep the empty string ``""``.

    Raises
    ------
    ValueError
        If a bracketed range in ``NodeList`` is not numeric.
    """
    output = jobdata.copy()
    totals = []
    for _, row in output.iterrows():
        nodes = row["NodeList"]
        total = ""
        if isinstance(nodes, str):
            selection = data_type.loc[row["Start"]:row["End"]]
            if nodes in selection.columns:
                # Job ran on exactly one node.
                total = selection[nodes].sum()
            elif "," in nodes or "[" in nodes:
                # Always index the full time slice, never a previously picked
                # column, so every node of the job contributes to the total.
                total = sum(
                    selection[node].sum()
                    for node in _expand_nodelist(nodes)
                    if node in selection.columns
                )
        totals.append(total)
    # Assigned in one go as an object column so "" and numbers can coexist.
    output[name] = pd.Series(totals, index=output.index, dtype=object)
    return output

In [None]:
# Read the three per-node metric tables and the job accounting dump
(
    node_sockstat_TCP_mem,
    node_sockstat_UDP_mem,
    node_network_transmit_packets,
) = [
    preprocess_parquet_to_df(metric_file)
    for metric_file in (
        "node_sockstat_TCP_mem",
        "node_sockstat_UDP_mem",
        "node_network_transmit_packets",
    )
]
jobdata = preprocess_jobdata_to_df("jobdata.csv")

In [None]:
# Show only the first rows; as the last expression of the cell it is rendered
# by the notebook instead of being dumped as plain text.
node_sockstat_TCP_mem.head()

In [None]:
# Check if preprocessing was executed as expected
# "Start" is compared against text bounds; this assumes the column holds
# "YYYY-MM-DD HH:MM:SS"-style strings that sort chronologically - TODO confirm.
filtered_jobdata = jobdata[
    (jobdata["Start"] >= '2020-01-01 00:00:00') & (jobdata["Start"] <= '2020-08-05 00:00:00')
]
# Drop jobs whose node list mentions "None", "software" or "login".
# The explicit copy keeps the column assignment below from writing to a
# filtered view of `jobdata` (chained-assignment warning).
filtered_jobdata = filtered_jobdata[
    ~filtered_jobdata["NodeList"].str.contains("None|software|login")
].copy()
# Divide the runtime by 60 twice (seconds -> hours if ElapsedRaw is in seconds).
filtered_jobdata["ElapsedRaw"] = filtered_jobdata["ElapsedRaw"] / 60 / 60
filtered_jobdata.head()

In [None]:
# Save the filtered job table to disk as a pickle file
filtered_jobdata.to_pickle("filtered_jobdata.pkl")

In [None]:
# Compute the TCP socket memory total for the nodes of every job and store it as CSV.
# NOTE(review): the unfiltered `jobdata` is passed here, not `filtered_jobdata` - confirm this is intended.
node_sockstat_TCP_mem_perjob = calculate_perjob(
    jobdata,
    node_sockstat_TCP_mem,
    "node_sockstat_TCP_mem",
)
node_sockstat_TCP_mem_perjob.to_csv("node_sockstat_TCP_mem_perjob.csv")

In [None]:
# Compute the UDP socket memory total for the nodes of every job and store it as CSV.
# NOTE(review): the unfiltered `jobdata` is passed here, not `filtered_jobdata` - confirm this is intended.
node_sockstat_UDP_mem_perjob = calculate_perjob(
    jobdata,
    node_sockstat_UDP_mem,
    "node_sockstat_UDP_mem",
)
node_sockstat_UDP_mem_perjob.to_csv("node_sockstat_UDP_mem_perjob.csv")

In [None]:
# Compute the transmitted-packet total for the nodes of every job and store it as CSV.
# NOTE(review): the unfiltered `jobdata` is passed here, not `filtered_jobdata` - confirm this is intended.
node_network_transmit_packets_perjob = calculate_perjob(
    jobdata,
    node_network_transmit_packets,
    "node_network_transmit_packets",
)
node_network_transmit_packets_perjob.to_csv("node_network_transmit_packets_perjob.csv")

In [None]:
# Exclude 0 values, since in this instance we only want to analyze jobs with traffic
# (rows whose value is missing do not satisfy "> 0" and are dropped as well).
node_sockstat_TCP_mem_perjob = pd.read_csv('node_sockstat_TCP_mem_perjob.csv')
# The explicit copy keeps the column assignment below from writing to a
# filtered view (chained-assignment warning).
node_sockstat_TCP_mem_perjob = node_sockstat_TCP_mem_perjob[
    node_sockstat_TCP_mem_perjob["node_sockstat_TCP_mem"] > 0
].copy()
# Transform runtime to minutes, assuming ElapsedRaw is stored in seconds
# (the duration bins computed from it use minute thresholds: 5, 60, 360, 1440).
node_sockstat_TCP_mem_perjob["ElapsedRaw"] = node_sockstat_TCP_mem_perjob["ElapsedRaw"] / 60
node_sockstat_TCP_mem_perjob.head()

In [None]:
# Mean TCP socket memory per job-duration bin; bin edges are ElapsedRaw values 5, 60, 360 and 1440
_tcp_runtime = node_sockstat_TCP_mem_perjob["ElapsedRaw"]
_tcp_metric = node_sockstat_TCP_mem_perjob["node_sockstat_TCP_mem"]

average_5min_TCP = np.mean(_tcp_metric[_tcp_runtime < 5])
average_1hr_TCP = np.mean(_tcp_metric[(_tcp_runtime >= 5) & (_tcp_runtime < 60)])
average_6hrs_TCP = np.mean(_tcp_metric[(_tcp_runtime >= 60) & (_tcp_runtime < 360)])
average_1day_TCP = np.mean(_tcp_metric[(_tcp_runtime >= 360) & (_tcp_runtime < 1440)])
average_1to5days_TCP = np.mean(_tcp_metric[_tcp_runtime >= 1440])

# Report the bin means, one per line
for _label, _mean in (
    ("mean_5min", average_5min_TCP),
    ("mean_1hr", average_1hr_TCP),
    ("mean_6hrs", average_6hrs_TCP),
    ("mean_1day", average_1day_TCP),
    ("mean_1to5days", average_1to5days_TCP),
):
    print(_label, _mean)

In [None]:
# Bar chart of the mean TCP socket memory per job-duration bin, saved as PDF
cat = ['<5 minutes', '<1 hour', '<6 hours', '<1 day', '1 to 5 days']
vals = [average_5min_TCP, average_1hr_TCP, average_6hrs_TCP, average_1day_TCP, average_1to5days_TCP]

fig, ax = plt.subplots()
# Symmetric-log y axis, limited to the range [1, 10**5.8]
ax.set_yscale("symlog")
ax.set_ylim(bottom=1)
ax.set_ylim(top=10**5.8)
ax.locator_params(axis='y', numticks=12)
ax.bar(cat, vals, width=1, edgecolor="black")
ax.set_xlabel("Job duration - Each previous category is excluded in the next")
ax.set_ylabel("Mean node_sockstat_TCP_mem")
fig.tight_layout()
fig.savefig("TCP_mem_job_duration.pdf")

In [None]:
# Exclude 0 values, since in this instance we only want to analyze jobs with traffic
# (rows whose value is missing do not satisfy "> 0" and are dropped as well).
node_sockstat_UDP_mem_perjob = pd.read_csv('node_sockstat_UDP_mem_perjob.csv')
# The explicit copy keeps the column assignment below from writing to a
# filtered view (chained-assignment warning).
node_sockstat_UDP_mem_perjob = node_sockstat_UDP_mem_perjob[
    node_sockstat_UDP_mem_perjob["node_sockstat_UDP_mem"] > 0
].copy()
# Transform runtime to minutes, assuming ElapsedRaw is stored in seconds
# (the duration bins computed from it use minute thresholds: 5, 60, 360, 1440).
node_sockstat_UDP_mem_perjob["ElapsedRaw"] = node_sockstat_UDP_mem_perjob["ElapsedRaw"] / 60
node_sockstat_UDP_mem_perjob.head()

In [None]:
# Mean UDP socket memory per job-duration bin; bin edges are ElapsedRaw values 5, 60, 360 and 1440
_udp_runtime = node_sockstat_UDP_mem_perjob["ElapsedRaw"]
_udp_metric = node_sockstat_UDP_mem_perjob["node_sockstat_UDP_mem"]

average_5min_UDP = np.mean(_udp_metric[_udp_runtime < 5])
average_1hr_UDP = np.mean(_udp_metric[(_udp_runtime >= 5) & (_udp_runtime < 60)])
average_6hrs_UDP = np.mean(_udp_metric[(_udp_runtime >= 60) & (_udp_runtime < 360)])
average_1day_UDP = np.mean(_udp_metric[(_udp_runtime >= 360) & (_udp_runtime < 1440)])
average_1to5days_UDP = np.mean(_udp_metric[_udp_runtime >= 1440])

# Report the bin means, one per line
for _label, _mean in (
    ("mean_5min", average_5min_UDP),
    ("mean_1hr", average_1hr_UDP),
    ("mean_6hrs", average_6hrs_UDP),
    ("mean_1day", average_1day_UDP),
    ("mean_1to5days", average_1to5days_UDP),
):
    print(_label, _mean)

In [None]:
# Bar chart of the mean UDP socket memory per job-duration bin, saved as PDF
cat = ['<5 minutes', '<1 hour', '<6 hours', '<1 day', '1 to 5 days']
vals = [average_5min_UDP, average_1hr_UDP, average_6hrs_UDP, average_1day_UDP, average_1to5days_UDP]

fig, ax = plt.subplots()
# Symmetric-log y axis, limited to the range [1, 10**5.2]
ax.set_yscale("symlog")
ax.set_ylim(bottom=1)
ax.set_ylim(top=10**5.2)
ax.locator_params(axis='y', numticks=12)
ax.bar(cat, vals, width=1, edgecolor="black")
ax.set_xlabel("Job duration - Each previous category is excluded in the next")
ax.set_ylabel("Mean node_sockstat_UDP_mem")
fig.tight_layout()
fig.savefig("UDP_mem_job_duration.pdf")

In [None]:
# Exclude 0 values, since in this instance we only want to analyze jobs with traffic
# (rows whose value is missing do not satisfy "> 0" and are dropped as well).
node_network_transmit_packets_perjob = pd.read_csv('node_network_transmit_packets_perjob.csv')
# The explicit copy keeps the column assignment below from writing to a
# filtered view (chained-assignment warning).
node_network_transmit_packets_perjob = node_network_transmit_packets_perjob[
    node_network_transmit_packets_perjob["node_network_transmit_packets"] > 0
].copy()
# Transform runtime to minutes, assuming ElapsedRaw is stored in seconds
# (the duration bins computed from it use minute thresholds: 5, 60, 360, 1440).
# Column labels are case-sensitive: the column is "ElapsedRaw", as used by the
# binning and scatter cells; a lowercase key raises KeyError.
node_network_transmit_packets_perjob["ElapsedRaw"] = node_network_transmit_packets_perjob["ElapsedRaw"] / 60
node_network_transmit_packets_perjob.head()

In [None]:
# Mean transmitted packets per job-duration bin; bin edges are ElapsedRaw values 5, 60, 360 and 1440
_transmit_runtime = node_network_transmit_packets_perjob["ElapsedRaw"]
_transmit_metric = node_network_transmit_packets_perjob["node_network_transmit_packets"]

average_5min_transmit = np.mean(_transmit_metric[_transmit_runtime < 5])
average_1hr_transmit = np.mean(_transmit_metric[(_transmit_runtime >= 5) & (_transmit_runtime < 60)])
average_6hrs_transmit = np.mean(_transmit_metric[(_transmit_runtime >= 60) & (_transmit_runtime < 360)])
average_1day_transmit = np.mean(_transmit_metric[(_transmit_runtime >= 360) & (_transmit_runtime < 1440)])
average_1to5days_transmit = np.mean(_transmit_metric[_transmit_runtime >= 1440])

# Report the bin means, one per line
for _label, _mean in (
    ("mean_5min", average_5min_transmit),
    ("mean_1hr", average_1hr_transmit),
    ("mean_6hrs", average_6hrs_transmit),
    ("mean_1day", average_1day_transmit),
    ("mean_1to5days", average_1to5days_transmit),
):
    print(_label, _mean)

In [None]:
# Bar chart of the mean transmitted packets per job-duration bin, saved as PDF
cat = ['<5 minutes', '<1 hour', '<6 hours', '<1 day', '1 to 5 days']
vals = [average_5min_transmit, average_1hr_transmit, average_6hrs_transmit, average_1day_transmit, average_1to5days_transmit]

fig, ax = plt.subplots()
# Symmetric-log y axis, limited to the range [1, 10**13.5]
ax.set_yscale("symlog")
ax.set_ylim(bottom=1)
ax.set_ylim(top=10**13.5)
ax.locator_params(axis='y', numticks=12)
ax.bar(cat, vals, width=1, edgecolor="black")
ax.set_xlabel("Job duration - Each previous category is excluded in the next")
ax.set_ylabel("Mean transmitted packets")
fig.tight_layout()
fig.savefig("node_network_transmit_packets_job_duration.pdf")

In [None]:
# Scatter plot of transmitted packets against job duration, saved as PDF
fig, ax = plt.subplots()
# ElapsedRaw is divided by 60 once more so the x axis matches its "hours" label
ax.scatter(
    node_network_transmit_packets_perjob["ElapsedRaw"] / 60,
    node_network_transmit_packets_perjob["node_network_transmit_packets"],
    marker='o',
)
ax.set_xlabel("Job duration in hours")
ax.set_ylabel("Transmitted packets")
fig.tight_layout()
fig.savefig("node_network_transmit_packets_job_scatter.pdf")