# Epsilon TSP Job Analysis

The following notebook contains code and plots analysing Epsilon's job configurations for their **AggregateTSPMetricsJob-Resources** CDE job.

Cloudera has analysed the following job configurations:

- 4 CPU Cores, 8GB Memory
- 2 CPU Cores, 8GB Memory
- 5 CPU Cores, 8GB Memory

The analysis of each configuration is based on a minimum of 200 job runs that used that configuration.

**The data for these jobs is fetched via the CDE API. Because the logs of these jobs are also parsed to identify errors, the sample size has been kept small.**
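As a minimal sketch of the two CDE API calls this notebook makes through `curl`, the helpers below build the same `job-runs` and `logs` requests with Python's standard-library `urllib.request` instead. The helper names are hypothetical; only the endpoint paths and the `offset`, `limit`, `type` and `tailLines` query parameters are taken from the cells below. The sketch assumes the server decodes `%2F` in the `type` value back to `/`, as standard URL decoding does.

```python
import urllib.parse
import urllib.request

def build_request(api_url, token, path, **params):
    """Build (but do not send) an authenticated GET request for a CDE API path."""
    query = urllib.parse.urlencode(params)
    url = f"{api_url}{path}?{query}" if query else f"{api_url}{path}"
    return urllib.request.Request(
        url,
        headers={"Authorization": f"Bearer {token}", "Accept": "application/json"},
        method="GET",
    )

def job_runs_request(api_url, token, offset, limit=100):
    # Same endpoint and paging parameters as the curl call in get_data()
    return build_request(api_url, token, "/job-runs", offset=offset, limit=limit)

def driver_log_request(api_url, token, job_run_id, log_type="driver/stderr", tail_lines=10000):
    # Same endpoint as the curl call in add_error_column(); urlencode escapes "/" as %2F
    return build_request(api_url, token, f"/job-runs/{job_run_id}/logs",
                         type=log_type, tailLines=tail_lines)

def fetch_text(request, timeout=60):
    # Send the request and return the body as text (needs network access and a valid token)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.read().decode("utf-8")
```

For example, `json.loads(fetch_text(job_runs_request(JOBS_API_URL, TOKEN, offset=20200)))["runs"]` would return one page of job runs, while the log request returns plain text to search for error messages.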

In [1]:
# pip install xmltodict
# pip install requests-toolbelt
# pip install yagmail

## Importing the required Python Modules

In [2]:
import os, json, requests
import pandas as pd
import python_cde.cdeconnection as cde
from datetime import datetime as dt
import time
import subprocess as sp
#import yagmail
import sys
import warnings
warnings.filterwarnings('ignore')
import datetime

import matplotlib.pyplot as plt
import numpy as np
import itertools
import seaborn as sns
import matplotlib.colors as colors
import matplotlib.cm as cmx
from matplotlib.pyplot import figure
import epsilon_analysis as ea

## CDE API Token Generation

In [3]:
# Setting variables for script
JOBS_API_URL = "https://gxjqflc9.cde-q8k5xhqz.cdp-pcm.cpx9-02nn.cloudera.site/dex/api/v1"
WORKLOAD_USER = "csso_adh_bigdata_devs" #"cdpusername"
WORKLOAD_PASSWORD = "password" #"cdppwd"

# Instantiate the Connection to CDE
cde_connection = cde.CdeConnection(JOBS_API_URL, WORKLOAD_USER, WORKLOAD_PASSWORD)
TOKEN = cde_connection.set_cde_token()

#headers for API Call
headers = {
    'Authorization': f"Bearer {TOKEN}",
    'accept': 'application/json',
    'Content-Type': 'application/json'
}

## Issues

In [4]:
# job_runs = requests.get(JOBS_API_URL+"/jobs", headers=headers)
# print(job_runs)
# print(type(job_runs))
# job_runs.json()

In [5]:
#Query Filter not working as expected
# output = sp.getoutput(f'curl -s -H "Authorization: Bearer {TOKEN}" -H "Content-Type: application/json" -X GET "https://gxjqflc9.cde-q8k5xhqz.cdp-pcm.cpx9-02nn.cloudera.site/dex/api/v1/job-runs?offset=20200&limit=100"')
# convert_to_json = json.loads(output)

## Function Definitions

The following functions have been defined:

- **get_data()**:- Takes the Job Run ID (API offset) to start from and a number of batches. Each batch fetches 100 job runs, because the CDE API fetches job runs in batches of 100. This function returns a dataframe with the raw data collected from the API.
- **filter_data()**:- Takes the raw API data, keeps only TSP jobs and cleans it: drops rows with missing values or an empty "spark" object, and converts columns to categorical, integer and datetime types where needed.
- **cores_memory()**:- Takes the filtered jobs data and extracts the executor CPU cores, executor memory, initial executor count and organisation ID from the "spark" object column.
- **add_error_column()**:- Most Epsilon job failures are caused by one of these errors:
  - No Eligible Deployments found from cmd view s3 files
  - OOMKilled
  - java.lang.String cannot be cast to java.sql.Timestamp

  This function creates a new column, "Error_Type". It parses the driver logs of each failed job, searching for the above errors. When an error is found, it is recorded in the Error_Type column for that Job ID; successful jobs are labelled "Successful Job" and all other jobs "Misc".
- **add_runtimes()**:- Calculates the runtime of each job in minutes from the "started" and "ended" columns.
- **waittime()**:- Extracts the Spark application start and end times (app_start_time and app_stop_time) from each job's event log and calculates the wait time in minutes by subtracting "started" from app_start_time.
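As a minimal sketch, the labelling rule described for `add_error_column()` can be written as a pure function of a run's status and its driver log text, which makes it testable without calling the API. `classify_run` and `ERROR_SIGNATURES` are hypothetical names; the error messages and labels are the ones this notebook uses. Because the signatures are checked in order, a log containing several of them is counted only under the first match.

```python
# Known failure signatures, checked in the same order as add_error_column()
ERROR_SIGNATURES = [
    ("S3_Error", "No Eligible Deployments found from cmd view s3 files"),
    ("OOM_Error", "OOMKilled"),
    ("Cast_Error", "java.lang.String cannot be cast to java.sql.Timestamp"),
]

def classify_run(status, driver_log):
    """Return the Error_Type label for one job run (hypothetical pure helper)."""
    if status == "succeeded":
        # Logs of successful runs are never searched, so they are never labelled with an error
        return "Successful Job"
    if status == "failed":
        for label, signature in ERROR_SIGNATURES:
            if signature in driver_log:
                return label
    # Failed runs without a known signature, and any other status (e.g. killed)
    return "Misc"
```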

In [6]:
# Calling the CDE Jobs API and storing the data into a data frame
def get_data(offset_start, batches):
    # One API call per batch; each call returns up to 100 job runs
    offsets = range(offset_start, offset_start + batches * 100, 100)
    frames = []
    for offset in offsets:
        output = sp.getoutput(f'curl -s -H "Authorization: Bearer {TOKEN}" -H "Content-Type: application/json" -X GET "{JOBS_API_URL}/job-runs?offset={offset}&limit=100"')
        convert_to_json = json.loads(output)
        frames.append(pd.DataFrame(convert_to_json["runs"]))
    # DataFrame.append was removed in pandas 2.0, so concatenate all batches once at the end
    return pd.concat(frames, ignore_index=True)

#Cleaning and Filtering
def filter_data(raw_data):
    raw_data = raw_data[['id','job', 'status', 'started', 'ended', 'spark']]
    TSP_filter = raw_data["job"] == "AggregateTSPMetricsJob" #Only TSP Jobs
    raw_data = raw_data.where(TSP_filter)
    issues = raw_data[raw_data["spark"] == {}].index
    raw_data = raw_data.drop(issues)
    raw_data = raw_data.dropna() # Drop any NA
    raw_data["status"] = raw_data["status"].astype('category')
    raw_data["id"] = raw_data["id"].astype('Int64') # Convert "Id" to Int
    raw_data["started"] = pd.to_datetime(raw_data["started"]) #Convert "started" to DateTime
    raw_data["ended"] = pd.to_datetime(raw_data["ended"]) #Convert "ended" to DateTime
    raw_data = raw_data.reset_index() # Reset dataframe index
    return raw_data

In [7]:
# Get the cores, memory, initial executor count and organisation ID of each job from the "spark" complex object
def cores_memory(clean_data):
    cores_list = []
    memory_list = []
    org_list = []
    init_exec = []

    for idx in clean_data.index:
        spec = clean_data["spark"][idx]["spec"]
        cores_list.append(spec["executorCores"])
        memory_list.append(spec["executorMemory"])
        # Record 15 when the job does not set the property (the default this notebook assumes)
        init_exec.append(spec["conf"].get("spark.dynamicAllocation.initialExecutors", 15))
        # The first job argument is a JSON payload containing the organisation ID
        result_json_df = pd.DataFrame(json.loads(spec["args"][0]))
        org_list.append(result_json_df["orgId"][0])

    # Create new dataframe with reduced columns (copy to avoid SettingWithCopyWarning)
    inter_df = clean_data[['id', 'job', 'status', 'started', 'ended']].copy()

    # Add the extracted columns to the dataframe (assigned by position)
    inter_df["cores"] = cores_list
    inter_df["memory"] = memory_list
    inter_df["orgId"] = org_list
    inter_df["orgId"] = inter_df["orgId"].astype('category')
    inter_df["init_exec"] = init_exec
    inter_df["init_exec"] = inter_df["init_exec"].astype("category")
    return inter_df

In [8]:
# Label every job run with an "Error_Type" by parsing the driver logs of failed runs for known errors
def add_error_column(inter_df):
    s3_error = []
    oom_error = []
    cast_error = []
    misc = []

    # Successful runs are labelled without fetching their logs
    success = inter_df.loc[inter_df.status == "succeeded", "id"].tolist()

    # Search the last 10,000 lines of each failed run's driver stderr for the known error messages
    for job_run_id in inter_df.loc[inter_df.status == "failed", "id"]:
        logs = sp.getoutput(f'curl -s -H "Authorization: Bearer {TOKEN}" -H "Content-Type: application/json" -X GET "{JOBS_API_URL}/job-runs/{job_run_id}/logs?type=driver/stderr&tailLines=10000"')
        if "No Eligible Deployments found from cmd view s3 files" in logs:
            s3_error.append(job_run_id)
        elif "OOMKilled" in logs:
            oom_error.append(job_run_id)
        elif "java.lang.String cannot be cast to java.sql.Timestamp" in logs:
            cast_error.append(job_run_id)
        else:
            misc.append(job_run_id)

    # Map each job run ID to its label; anything unmatched (including killed runs) becomes "Misc".
    # A single column assignment replaces inter_df["Error_Type"][index] = ..., a chained
    # assignment that does not update the dataframe under pandas Copy-on-Write.
    labels = {}
    for ids, label in [(s3_error, "S3_Error"), (oom_error, "OOM_Error"),
                       (cast_error, "Cast_Error"), (success, "Successful Job")]:
        labels.update(dict.fromkeys(ids, label))
    inter_df["Error_Type"] = [labels.get(i, "Misc") for i in inter_df["id"]]
    inter_df["Error_Type"] = inter_df["Error_Type"].astype('category')

    print(f'Jobs with S3 error:{s3_error}')
    print(f'Total: {len(s3_error)}')
    print(f'Jobs with OOM error:{oom_error}')
    print(f'Total: {len(oom_error)}')
    print(f'Jobs with Casting error:{cast_error}')
    print(f'Total: {len(cast_error)}')
    print(f'Successful jobs:{success}')
    print(f'Total: {len(success)}')
    print(f'Failed due to other reasons:{misc}')
    print(f'Total: {len(misc)}')

    return inter_df

In [9]:
def add_runtimes(inter_df):
    # Add a "runtime" column: the time between "started" and "ended", converted to minutes
    inter_df["runtime"] = (inter_df["ended"] - inter_df["started"]) / pd.Timedelta(minutes=1)
    return inter_df

In [10]:
# Extract the Spark application start/end times from each job's driver event log and calculate
# the wait time: minutes between the CDE job run starting and the Spark application starting
def waittime(inter_df):
    start_time = []
    stop_time = []
    for job_run_id in inter_df["id"]:
        logs = sp.getoutput(f'curl -s -H "Authorization: Bearer {TOKEN}" -H "Content-Type: application/json" -X GET "{JOBS_API_URL}/job-runs/{job_run_id}/logs?type=driver/event"')
        start, stop, decode_error = None, None, False
        for line in logs.splitlines():
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                decode_error = True
                continue
            if not isinstance(event, dict):
                continue
            if event.get("Event") == "SparkListenerApplicationStart":
                start = event["Timestamp"]
            elif event.get("Event") == "SparkListenerApplicationEnd":
                stop = event["Timestamp"]
        if decode_error:
            print(f"JSONDecodeError in event log of job run {job_run_id}")
        # Append exactly one value per job run (None if the event is missing) so the lists stay aligned with inter_df
        start_time.append(start)
        stop_time.append(stop)

    # Spark event-log timestamps are milliseconds since the Unix epoch
    inter_df["app_start_time"] = pd.to_datetime(start_time, unit="ms", utc=True)
    inter_df["app_stop_time"] = pd.to_datetime(stop_time, unit="ms", utc=True)
    inter_df["waitTime"] = (inter_df["app_start_time"] - inter_df["started"]) / pd.Timedelta(minutes=1)
    print(f'App start times found: {sum(t is not None for t in start_time)} of {len(start_time)}')
    print(f'App stop times found: {sum(t is not None for t in stop_time)} of {len(stop_time)}')
    return inter_df
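The event-log parsing inside `waittime()` can also be isolated as a pure function. This sketch assumes, as the notebook does, that the driver event log holds one JSON object per line and that `Timestamp` is in epoch milliseconds; `app_start_from_event_log` and `wait_minutes` are hypothetical names, and `job_started` must be a timezone-aware UTC `datetime`.

```python
import json
from datetime import datetime, timezone

def app_start_from_event_log(event_log_text):
    """Return the SparkListenerApplicationStart time as an aware UTC datetime, or None."""
    for line in event_log_text.splitlines():
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip truncated or non-JSON lines instead of abandoning the whole log
        if isinstance(event, dict) and event.get("Event") == "SparkListenerApplicationStart":
            # Spark event-log timestamps are milliseconds since the Unix epoch
            return datetime.fromtimestamp(event["Timestamp"] / 1000, tz=timezone.utc)
    return None

def wait_minutes(job_started, event_log_text):
    """Minutes between the CDE job run starting and the Spark application starting."""
    app_start = app_start_from_event_log(event_log_text)
    if app_start is None:
        return None
    return (app_start - job_started).total_seconds() / 60
```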

## Analysis for 6vCPU 24GB with 1 Executor

In [11]:
offset_start = 34676 #Jobs will be retrieved starting from this Job Run ID
batches = 10 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs (originally 6)

In [12]:
%%time
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_error_column(clean_data)
clean_data = add_runtimes(clean_data)

Jobs with S3 error:[34679, 34681, 34683, 34686, 34687, 34688, 34690, 34692, 34695, 34697, 34700, 34703, 34704, 34705, 34708, 34711, 34713, 34715, 34717, 34718, 34719, 34720, 34721, 34722, 34724, 34726, 34729, 34731, 34734, 34737, 34738, 34739, 34740, 34743, 34744, 34747, 34749, 34752, 34755, 34756, 34757, 34758, 34761, 34764, 34766, 34769, 34771, 34772, 34773, 34774, 34776, 34778, 34781, 34784, 34786, 34787, 34788, 34789, 34790, 34791, 34794, 34797, 34799, 34801, 34803, 34804, 34805, 34806, 34808, 34815, 34816, 34818, 34820, 34822, 34823, 34824, 34825, 34826, 34827, 34831, 34834, 34836, 34839, 34841, 34842, 34843, 34844, 34847, 34848, 34851, 34854, 34856, 34857, 34858, 34859, 34860, 34861, 34862, 34865, 34868, 34870, 34873, 34874, 34875, 34876, 34877, 34885, 34887, 34890, 34892, 34895, 34898, 34899, 34900, 34901, 34904, 34905, 34908, 34910, 34913, 34915, 34916, 34917, 34918, 34921, 34922, 34924, 34926, 34928, 34930, 34931, 34932, 34933, 34936, 34937, 34940, 34942, 34945, 34948, 34949, 

In [13]:
clean_data = waittime(clean_data)

Total: 397
Total: 397


In [14]:
final_df24_exec_1 = clean_data

In [15]:
final_df24_exec_1.to_csv("six_core_24_exec_1_Jan19.csv", index=False)

## Analysis for 6vCPU 24GB

In [None]:
offset_start = 21802 #Jobs will be retrieved starting from this Job Run ID
batches = 12 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs (originally 6)

In [None]:
%%time
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_error_column(clean_data)
clean_data = add_runtimes(clean_data)

In [None]:
clean_data = waittime(clean_data)

In [None]:
final_df24 = clean_data

In [None]:
final_df24.to_csv("six_core_24.csv", index=False)

## Analysis for 6vCPU 14GB

Job ID 21174 was the first job run with the configuration of 6 cores and 14GB of memory. It was run on Dec 21st at 06:00 PM. A batch size of 4 fetches 400 job runs, which are filtered and reduced further to create our sample data of **354 TSP Jobs**; the cell below now uses a batch size of 6 (600 job runs).
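The batch arithmetic used by `get_data()` can be sketched as a small pure function: each batch is one API call starting at the next multiple of 100 after the start offset. `batch_offsets` and its `page_size` parameter are hypothetical names; `page_size=100` mirrors the `limit=100` query parameter in `get_data()`.

```python
def batch_offsets(offset_start, batches, page_size=100):
    """Offsets requested by get_data(): one API call per batch of page_size job runs."""
    return list(range(offset_start, offset_start + batches * page_size, page_size))
```

For example, `batch_offsets(21174, 4)` gives four offsets, i.e. up to 400 job runs before filtering, and a batch size of 6 gives up to 600.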

In [None]:
offset_start = 21174 #Jobs will be retrieved starting from this Job Run ID
batches = 6 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs (originally 4)

In [None]:
%%time
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_error_column(clean_data)
clean_data = add_runtimes(clean_data)

In [None]:
%%time
clean_data = waittime(clean_data)

In [None]:
final_df14 = clean_data


In [None]:
final_df14.to_csv("six_core_14.csv", index=False)

In [None]:
final_df14.head()

# Analysis for 4vCPU 8GB

Job ID 20243 was the first job run with the configuration of 4 cores and 8GB of memory. It was run on … We have used a batch size of 4, which fetches 400 job runs; these are filtered and reduced further to create our sample data of **354 TSP Jobs**.

In [None]:
offset_start = 20242 #Jobs will be retrieved starting from this Job Run ID
batches = 4 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs (originally 4)

In [None]:
%%time
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_error_column(clean_data)

In [None]:
clean_data = add_runtimes(clean_data)

In [None]:
%%time
clean_data = waittime(clean_data)

In [None]:
#TODO: investigate Run ID 20301. It succeeded, but had an OOM error

In [None]:
final_df4 = clean_data
final_df4.tail(10)

In [None]:
final_df4.to_csv("four_core_8.csv", index=False)

## Visualizations

The following visualizations show the runtime of each job with 4 cores and 8GB memory configured. **The average runtime of these jobs is ~40 minutes.**

In [None]:
jobs = final_df4
jobs_by_org = jobs.groupby(['orgId']).count()
jobs_by_org = jobs_by_org["id"]
jobs_by_org.sort_values(ascending=False)

In [None]:
jobs = final_df4
jobs_by_org = jobs.groupby(['orgId',"status"]).count()
jobs_by_org = jobs_by_org["id"]
jobs_by_org.sort_values(ascending=False)

## Out of Memory Error by Organisations

In [None]:
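oom_error = final_df4[final_df4["Error_Type"] == "OOM_Error"]
# observed=True keeps only the organisation/error combinations that actually occur
error_by_org = oom_error.groupby(['orgId','Error_Type'], observed=True).count()
error_by_org = error_by_org["id"]
error_by_org = error_by_org.sort_values(ascending=False)
error_by_org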


In [None]:
error_by_org.tail(40)

In [None]:
# Plot the number of OOM errors per organisation
ax = error_by_org.unstack(level=1).plot(kind='bar', rot=90, figsize=(16,10), xlabel = "Organisation ID", ylabel = "Count", title = "OOM Errors by Organisation" )

## Scatter Plot of Job Runtimes

In [None]:
# Get required data
x = final_df4["id"]
y = final_df4["runtime"]

# Get the unique job statuses
uniq = list(set(final_df4["status"]))

# Set the color map to match the number of status types
hot = plt.get_cmap('hot')
cnorm = colors.Normalize(vmin=0, vmax=len(uniq))
scalarMap = cmx.ScalarMappable(norm=cnorm, cmap=hot)

plt.figure(figsize=(15,8))

# Plot each status
for i, status in enumerate(uniq):
    indx = final_df4["status"] == status
    plt.scatter(x[indx], y[indx], s=50, color=scalarMap.to_rgba(i), label=status)

# Calculate the mean runtime and draw it as a horizontal line
runtime_mean = final_df4["runtime"].mean()
print(f"Average Runtime: {runtime_mean}")
plt.axhline(runtime_mean, label='Mean', linestyle='-', color='blue')

# Add plot details; the legend is added last so that it includes the mean line
plt.title("Runtime of Jobs", size=15)
plt.xlabel("Job ID", size=15)
plt.ylabel("Minutes", size=15)
plt.legend(loc='upper right', fontsize=15)
plt.show()

We can see from the bar plot below that out of 354 job runs:

- **167 failed with an S3 error**
- **120 failed with an Out of Memory Error**
- **55 failed with a casting error**

The remaining runs either succeeded or were classified as Misc.
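The counts above can also be expressed as shares of the sample. A minimal sketch, assuming pandas and the `Error_Type` labels produced by `add_error_column()`; `error_breakdown` is a hypothetical helper name.

```python
import pandas as pd

def error_breakdown(error_types):
    """Count and percentage of job runs per Error_Type label, largest first."""
    counts = pd.Series(error_types).value_counts()
    return pd.DataFrame({"count": counts, "percent": (counts / counts.sum() * 100).round(1)})
```

Calling `error_breakdown(final_df4["Error_Type"])` would give one row per label; with a categorical column, labels that never occur appear with a count of 0.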

In [None]:
#Error Distribution
sns.set(rc={'figure.figsize':(15,7)})
ax = sns.countplot(x=final_df4["Error_Type"], order=final_df4["Error_Type"].value_counts().index)
ax.set_title("Error Type Distribution", size = 15)
ax.set_xlabel("Error_Type", size=15)
ax.set_ylabel("Count", size=15)
ax.bar_label(ax.containers[0])

# Analysis for 2 vCPU 8GB Jobs

Job ID 12916 was run on December 2nd with 2 cores and 8GB of memory. We use a batch size of 8 to collect a sufficient sample. **The sample size for this configuration is 212.**

In [None]:
offset_start = 12916 #Jobs will be retrieved starting from this Job Run ID
batches = 8 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs

In [None]:
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_runtimes(clean_data)
clean_data = add_error_column(clean_data)

In [None]:
%%time
clean_data = waittime(clean_data)

In [None]:
final_df2 = clean_data
final_df2.head(10)

In [None]:
final_df2.to_csv("two_core_8.csv", index=False)

## Visualizations

## Out of Memory Error by Organisations

In [None]:
oom_error = final_df2[final_df2["Error_Type"] == "OOM_Error"]
# observed=True keeps only the organisation/error combinations that actually occur
error_by_org = oom_error.groupby(['orgId','Error_Type'], observed=True).count()
error_by_org = error_by_org["id"]
error_by_org

In [None]:
# Plot the number of OOM errors per organisation
ax = error_by_org.unstack(level=1).plot(kind='bar', rot=90, figsize=(16,10), xlabel = "Organisation ID", ylabel = "Count", title = "OOM Errors by Organisation" )

## Scatter Plot of Job Runtimes

In [None]:
# Get required data
x = final_df2["id"]
y = final_df2["runtime"]

# Get the unique job statuses
uniq = list(set(final_df2["status"]))

# Set the color map to match the number of status types
hot = plt.get_cmap('hot')
cnorm = colors.Normalize(vmin=0, vmax=len(uniq))
scalarMap = cmx.ScalarMappable(norm=cnorm, cmap=hot)

plt.figure(figsize=(15,8))

# Plot each status
for i, status in enumerate(uniq):
    indx = final_df2["status"] == status
    plt.scatter(x[indx], y[indx], s=50, color=scalarMap.to_rgba(i), label=status)

# Calculate the mean runtime and draw it as a horizontal line
runtime_mean = final_df2["runtime"].mean()
print(f"Average Runtime: {runtime_mean}")
plt.axhline(runtime_mean, label='Mean', linestyle='-', color='blue')

# Add plot details; the legend is added last so that it includes the mean line
plt.title("Runtime of Jobs", size=15)
plt.xlabel("Job ID", size=15)
plt.ylabel("Minutes", size=15)
plt.legend(loc='upper right', fontsize=15)
plt.show()

We can see from the bar plot below that out of 212 job runs:

- **162 failed with an S3 error**
- **31 failed with a casting error**
- **12 failed with an Out of Memory Error**

The remaining runs either succeeded or were classified as Misc.

In [None]:
#Error Distribution
sns.set(rc={'figure.figsize':(15,7)})
ax = sns.countplot(x=final_df2["Error_Type"], order=final_df2["Error_Type"].value_counts().index)
ax.set_title("Error Type Distribution", size = 15)
ax.set_xlabel("Error_Type", size=15)
ax.set_ylabel("Count", size=15)
ax.bar_label(ax.containers[0])

# Analysis for 5 vCPU and 8 GB Jobs

Job ID 10678 was run on November 24th with 5 cores and 8GB of memory. We use a batch size of 12 to collect a sufficient sample. **The sample size for this configuration is 232.**

In [None]:
offset_start = 10678 #Jobs will be retrieved starting from this Job Run ID
batches = 12 # Number of batches of 100 job runs to fetch, e.g. 12 -> 12*100 = 1200 job runs

In [None]:
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_runtimes(clean_data)
clean_data = add_error_column(clean_data)

In [None]:
clean_data = waittime(clean_data)

In [None]:
clean_data.tail(120)

In [None]:
final_df5 = clean_data
final_df5.head(10)

In [None]:
final_df5.to_csv("five_core_8.csv", index=False)

## Visualizations

The following visualizations show the runtime of each job with 5 cores and 8GB memory configured. **The average runtime of these jobs is ~15 minutes.**

## Out of Memory Error by Organisations

In [None]:
oom_error = final_df5[final_df5["Error_Type"] == "OOM_Error"]
# observed=True keeps only the organisation/error combinations that actually occur
error_by_org = oom_error.groupby(['orgId','Error_Type'], observed=True).count()
error_by_org = error_by_org["id"]
error_by_org

In [None]:
# Plot the number of OOM errors per organisation
ax = error_by_org.unstack(level=1).plot(kind='bar', rot=90, figsize=(16,10), xlabel = "Organisation ID", ylabel = "Count", title = "OOM Errors by Organisation" )

## Scatter Plot of Job Runtimes

In [None]:
# Get required data
x = final_df5["id"]
y = final_df5["runtime"]

# Get the unique job statuses
uniq = list(set(final_df5["status"]))

# Set the color map to match the number of status types
hot = plt.get_cmap('hot')
cnorm = colors.Normalize(vmin=0, vmax=len(uniq))
scalarMap = cmx.ScalarMappable(norm=cnorm, cmap=hot)

plt.figure(figsize=(15,8))

# Plot each status
for i, status in enumerate(uniq):
    indx = final_df5["status"] == status
    plt.scatter(x[indx], y[indx], s=50, color=scalarMap.to_rgba(i), label=status)

# Calculate the mean runtime and draw it as a horizontal line
runtime_mean = final_df5["runtime"].mean()
print(f"Average Runtime: {runtime_mean}")
plt.axhline(runtime_mean, label='Mean', linestyle='-', color='blue')

# Add plot details; the legend is added last so that it includes the mean line
plt.title("Runtime of Jobs", size=15)
plt.xlabel("Job ID", size=15)
plt.ylabel("Minutes", size=15)
plt.legend(loc='upper right', fontsize=15)
plt.show()

We can see from the bar plot below that out of 232 job runs:

- **117 failed with an Out of Memory Error**
- **102 failed with an S3 error**

The remaining runs either succeeded or were classified as Misc.

In [None]:
#Error Distribution
sns.set(rc={'figure.figsize':(15,7)})
ax = sns.countplot(x=final_df5["Error_Type"], order=final_df5["Error_Type"].value_counts().index)
ax.set_title("Error Type Distribution", size = 15)
ax.set_xlabel("Error_Type", size=15)
ax.set_ylabel("Count", size=15)
ax.bar_label(ax.containers[0])

# All Jobs

This section analyses all the jobs that have been run on the PCM Prod cluster. We do not perform an error type analysis here, because parsing the logs of over 20,000 jobs is not practical. This data (up to Job ID 20500) is used for additional visualizations.

**Close to 7,500 TSP jobs have been run on the PCM Prod cluster.**

In [11]:
offset_start = 10 #Jobs will be retrieved starting from this Job Run ID
batches = 363 # Number of batches of 100 job runs to fetch, e.g. 4 -> 4*100 = 400 job runs

In [12]:
%%time
#Creating new dataframe for Clean Data
raw_data = get_data(offset_start, batches)
clean_data = filter_data(raw_data)
clean_data = cores_memory(clean_data)
clean_data = add_runtimes(clean_data)

CPU times: total: 36.9 s
Wall time: 1min 57s


In [13]:
all_jobs = clean_data

In [14]:
all_jobs.tail()

| | id | job | status | started | ended | cores | memory | orgId | init_exec | runtime |
|---|---|---|---|---|---|---|---|---|---|---|
| 13553 | 36283 | AggregateTSPMetricsJob | failed | 2023-01-18 12:30:01+00:00 | 2023-01-18 12:34:34+00:00 | 6 | 24g | TRAINING_HDS_APAC_DEMO_BU_HDS | 1 | 4.55 |
| 13554 | 36285 | AggregateTSPMetricsJob | failed | 2023-01-18 12:36:02+00:00 | 2023-01-18 12:37:04+00:00 | 6 | 24g | P2DEMXLOYALTYBU_P2DEMX_Loyalty_BU_1559207031463 | 1 | 1.033333 |
| 13555 | 36288 | AggregateTSPMetricsJob | failed | 2023-01-18 12:42:03+00:00 | 2023-01-18 12:45:39+00:00 | 6 | 24g | TRAINING_HDS_HNY_ENV1_HDS | 1 | 3.6 |
| 13556 | 36291 | AggregateTSPMetricsJob | failed | 2023-01-18 12:48:05+00:00 | 2023-01-18 12:49:09+00:00 | 6 | 24g | EPS_SALES_HDS_DIG_INCORP_HDS | 1 | 1.066667 |
| 13557 | 36295 | AggregateTSPMetricsJob | failed | 2023-01-18 12:54:08+00:00 | 2023-01-18 12:58:29+00:00 | 6 | 24g | CDP_AUTO_QEPROD_1_CDP_SUB_AUTO_QEPROD_1 | 1 | 4.35 |


In [15]:
all_jobs.to_csv("All_Jobs_Data.csv", index=False)

In [None]:
# Label each pie slice with its percentage and the absolute number of jobs
def autopct_format(values):
    def my_format(pct):
        total = sum(values)
        val = int(round(pct*total/100.0))
        return '{:.1f}%\n({v:d})'.format(pct, v=val)
    return my_format


s = all_jobs['status'].value_counts()
explode = (0,0.7,0.5)  # one value per status, in value_counts() order
plt.pie(s, labels=s.index, explode=explode, autopct=autopct_format(s))

## Breakdown of Jobs based on Cores

Breakdown of Epsilon job runs by CPU core count

**5 Cores** | **4 Cores** | **2 Cores**
--- | --- | ---
Failed: 4157 | Failed: 315 | Failed: 2716
Killed: 11 | Killed: NA | Killed: 2
Succeeded: 219 | Succeeded: 13 | Succeeded: 54
**Total: 4387** | **Total: 328** | **Total: 2772** 
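A table in the same shape as the one above can be generated directly from the `all_jobs` dataframe with `pandas.crosstab`. A minimal sketch; `status_by_cores` is a hypothetical helper, and its counts reflect whatever job runs are in `all_jobs` when it is run, which may differ from the figures above.

```python
import pandas as pd

def status_by_cores(jobs):
    """Job-run counts per status (rows) and executor core count (columns), with totals."""
    # Cast status to str so the "Total" margin label can be added even if the column is categorical
    return pd.crosstab(jobs["status"].astype(str), jobs["cores"],
                       margins=True, margins_name="Total")
```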

In [None]:
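job_distr = all_jobs.groupby(['status','cores']).count()
job_distr = job_distr["id"]
print(job_distr)
# One subplot per core count; derive the layout from the data so that every configuration fits
job_distr_wide = job_distr.unstack(level=1)
ax = job_distr_wide.plot(kind='bar', subplots=True, rot=0, figsize=(16,10), layout=(1, job_distr_wide.shape[1]), xlabel = "Status", ylabel = "Count", title = "Individual Cores Distribution" )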

## Runtime Statistics

In [None]:
# Calculate runtime statistics per status and core count
# (select "runtime" before aggregating: recent pandas versions raise on non-numeric columns)
runtime_by_group = all_jobs.groupby(['status','cores'])["runtime"]
runtime_mean = runtime_by_group.mean()
runtime_max = runtime_by_group.max()
runtime_min = runtime_by_group.min()
runtime_median = runtime_by_group.median()
runtime_std = runtime_by_group.std()
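The five separate statistics above can also be computed in a single aggregation, giving one table instead of five series. A sketch assuming the `status`, `cores` and `runtime` columns produced by `cores_memory()` and `add_runtimes()`; `runtime_summary` is a hypothetical name.

```python
import pandas as pd

def runtime_summary(jobs):
    """Mean, median, min, max and standard deviation of runtime per status and core count."""
    return (
        jobs.groupby(["status", "cores"], observed=True)["runtime"]
        .agg(["mean", "median", "min", "max", "std"])
    )
```

`runtime_summary(all_jobs)` returns one row per (status, cores) pair; groups with a single run have a standard deviation of NaN.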

## Mean Runtime of Jobs

In [None]:
print(f'Mean Runtime of Jobs per Core\n {runtime_mean}')

## Max Runtime of Jobs

In [None]:
print(f'Max Runtime of Jobs per Core\n {runtime_max}')

## Minimum Runtime of Jobs

In [None]:
print(f'Min Runtime of Jobs per Core\n {runtime_min}')

## Median Runtime of Jobs

In [None]:
print(f'Median Runtime of Jobs per Core\n {runtime_median}')

## Runtime Standard Deviation

In [None]:
print(f'Runtime Standard Deviation per Core\n {runtime_std}')

## Histogram of Job Runtimes per core

In [None]:
Success_filter = all_jobs["status"] == "succeeded" # Only successful jobs
filtered_data = all_jobs[Success_filter].dropna()
filtered_data

In [None]:
core_filter_5 = filtered_data["cores"] == 5 # Only 5-core jobs
core_data_5 = filtered_data[core_filter_5]

plt.figure(figsize=(15,8))
plt.hist(core_data_5["runtime"])
plt.xlabel("Runtime (minutes)")
plt.ylabel("Number of jobs")
plt.title("Runtime of successful 5-core jobs")
plt.show()

In [None]:
core_filter_4 = filtered_data["cores"] == 4 # Only 4-core jobs
core_data_4 = filtered_data[core_filter_4]

plt.figure(figsize=(15,8))
plt.hist(core_data_4["runtime"])
plt.xlabel("Runtime (minutes)")
plt.ylabel("Number of jobs")
plt.title("Runtime of successful 4-core jobs")
plt.show()

In [None]:
core_filter_2 = filtered_data["cores"] == 2 # Only 2-core jobs
core_data_2 = filtered_data[core_filter_2]

plt.figure(figsize=(15,8))
plt.hist(core_data_2["runtime"])
plt.xlabel("Runtime (minutes)")
plt.ylabel("Number of jobs")
plt.title("Runtime of successful 2-core jobs")
plt.show()

In [None]:
core_filter_2 = filtered_data["cores"] == 2 # Only 2-core jobs
core_data_2 = filtered_data[core_filter_2]

# Kernel density estimate of the runtime distribution of successful 2-core jobs
sns.set(rc={'figure.figsize':(15,7)})
sns.kdeplot(x=core_data_2["runtime"])
