In [None]:
# Load Spark run metrics (run-output CSV files) and spark-stats JSON files, then plot job comparisons
import re
import matplotlib.pyplot as plt
import json
import os
import datetime
import numpy as np
import seaborn as sns
import pandas as pd
pd.set_option('display.max_columns', 500)

cwd = os.getcwd()

# All data directories live directly in the parent of the notebook's working directory (base_dir).
# The spark-stats and metrics directories must sit in base_dir itself; do NOT use the older
# copies under the "all" directory (pull from master to get the new metrics directory).

stats_dir = "spark-stats"
metrics_dir = "metrics"
cluster_stats="cluster-spark-stats"
cluster_metrics="cluster_metrics"
base_dir = os.path.dirname(cwd)

# os.path.join uses the platform's separator, so this works on Windows (any drive letter,
# not only C:) and on POSIX without a platform check.
#absolute path to the local spark-stats directory
spark_stats_dir = os.path.join(base_dir, stats_dir)
#absolute path to the local metrics directory (the run-output files)
spark_metrics_dir = os.path.join(base_dir, metrics_dir)
#absolute paths to the cluster spark-stats and cluster metrics directories
cluster_stats_dir = os.path.join(base_dir, cluster_stats)
cluster_metrics_dir = os.path.join(base_dir, cluster_metrics)

# Local run-output file name (in the metrics directory) -> job name used as the appName prefix.
metric_output_mapping = {
    "basic-dataframe-run-output": "BasicDataframe",
    "basic-dataset-run-output": "BasicDataset",
    "basic-rdd-run-output": "BasicRDD",
    "cache-dataframe-run-output": "CacheDataframe",
    "cache-dataset-run-output": "CacheDataset",
    "cache-rdd-run-output": "CacheRDD",
    "kmeans-dataframe-run-output": "KMeansDataframe",
    "kmeans-dataset-run-output": "KMeansDataset",
    "map-partitions-rdd-run-output": "MapPartitionsRDD",
    "map-rdd-run-output": "MapRDD",
    "partition-dataframe-run-output": "PartitionDataframe",
    # Spelled like the file loaded by get_metrics("partition-dataset-run-output") elsewhere.
    "partition-dataset-run-output": "PartitionDataset",
    "partition-rdd-run-output": "PartitionRDD"
}

#Invert the above mapping: job name -> run-output file name
metric_output_mapping_inv = {v: k for k, v in metric_output_mapping.items()}

#Given spark-stats app name, get the file name
def get_spark_stats_file(appName):
    """Return the path of the local spark-stats file named ``appName``.

    os.path.join inserts the platform's path separator, so this works on Windows
    regardless of drive letter as well as on POSIX systems.
    """
    return os.path.join(spark_stats_dir, appName)

#Given the metrics output name, get the file name
def get_metrics_file(fileName):
    """Return the path of the local run-output file ``fileName`` in the metrics directory.

    os.path.join inserts the platform's path separator, so this works on Windows
    regardless of drive letter as well as on POSIX systems.
    """
    return os.path.join(spark_metrics_dir, fileName)

#Given a run-output file name (in metrics directory), get the appNames associated with that run
def get_associated_appnames(run_output_name):
    """Return the spark-stats file names that belong to a local run-output file.

    Args:
        run_output_name: name of a CSV in the metrics directory with an ``appName`` column.

    Returns:
        The entries of the spark-stats directory that appear in that ``appName`` column,
        in ``os.listdir`` order.
    """
    # A set makes each membership test O(1) instead of scanning the whole column per file.
    valid_files = set(pd.read_csv(get_metrics_file(run_output_name))["appName"])
    return [a_file for a_file in os.listdir(spark_stats_dir) if a_file in valid_files]

#Given an appName (also a file name in the spark-stats directory), get the run parameters associated with that run
def get_run_params(appName):
    """Return the run-output row(s) describing the local run ``appName``.

    The job is found by checking which job name (a value of ``metric_output_mapping``)
    ``appName`` starts with; the first match in mapping order wins. That job's run-output
    CSV is then filtered to the rows whose ``appName`` equals ``appName``.

    Returns:
        A DataFrame of matching rows, or None when no job name prefixes ``appName``.
    """
    job_name = next(
        (candidate for candidate in metric_output_mapping_inv if appName.startswith(candidate)),
        None,
    )
    if job_name is None:
        return None
    run_output_df = pd.read_csv(get_metrics_file(metric_output_mapping_inv[job_name]))
    return run_output_df[run_output_df["appName"] == appName]

#Sort the metrics dataframe by threadCount, then executorMem, then overheadMem.  This will only apply to local jobs as is
def metrics_df_sort(metrics_df):
    """Return a copy of ``metrics_df`` ordered by threadCount, executorMem, overheadMem (ascending)."""
    sort_keys = ["threadCount", "executorMem", "overheadMem"]
    return metrics_df.sort_values(by=sort_keys)

#Get the metrics dataframe for the specified local run-output file name and clean up its memory columns.
def get_metrics(run_output_name):
    """Load a local run-output CSV and make ``executorMem`` / ``overheadMem`` integers.

    Each value in those columns has its last character dropped before ``int()`` parsing
    (presumably a unit suffix such as "g" — confirm against the CSVs). Rows are NOT
    sorted here; use ``metrics_df_sort`` for that.
    """
    raw_metrics = pd.read_csv(get_metrics_file(run_output_name))
    for mem_column in ("executorMem", "overheadMem"):
        raw_metrics[mem_column] = raw_metrics[mem_column].apply(lambda value: int(value[:-1]))
    return raw_metrics

#Given an app name (file name in spark-stats since they match), get the dataframe for that app name
def get_spark_stats(appName):
    """Parse the local spark-stats JSON file for ``appName`` into a DataFrame."""
    stats_path = get_spark_stats_file(appName)
    return pd.read_json(stats_path)

# Sanity check: show the resolved directories and the apps found for one run-output file
print(cwd)
print(base_dir)
basic_dataframe_apps = get_associated_appnames("basic-dataframe-run-output")
print(basic_dataframe_apps)

In [None]:
#Just messing around. The output of this cell is the list of stats (column names) available to be used
map_partitions_files = get_associated_appnames("map-partitions-rdd-run-output")
map_files = get_associated_appnames("map-rdd-run-output")
print(map_partitions_files)
print(map_files)
sample = pd.read_json(get_spark_stats_file(map_partitions_files[0]))
list(sample.columns)

In [None]:
# Number of rows per stageId in the sample app's stats
sample.loc[:, "stageId"].value_counts()

In [None]:
# Given the spark-stats dataframe, find the start time, end time, and run duration time in millis for the dataframe
def get_full_execution_time(df):
    """Return ``(start_time, end_time, duration)`` for one app's spark-stats DataFrame.

    start_time is the earliest ``launchTime``, end_time the latest ``finishTime`` and
    duration is their difference, in the columns' own units (callers divide by 1000.0
    to get seconds).
    """
    start_time, end_time = min(df["launchTime"]), max(df["finishTime"])
    return start_time, end_time, end_time - start_time

# Given the spark-stats dataframe, calculate the total cpu time for the job
def get_full_cpu_time(df):
    """Sum ``executorCpuTime`` over every row of a spark-stats DataFrame (units as stored)."""
    cpu_times = df["executorCpuTime"]
    return cpu_times.sum()

# Given the spark-stats dataframe, calculate the max peakExecutionMemory for the job
def get_max_peakExecutionMemory(df):
    """Largest ``peakExecutionMemory`` value in ``df``; 0 when ``df`` is None (stats not loaded)."""
    return 0 if df is None else max(df["peakExecutionMemory"])

# Given the spark-stats dataframe, calculate the min peakExecutionMemory for the job
def get_min_peakExecutionMemory(df):
    """Smallest ``peakExecutionMemory`` value in ``df``.

    Returns 0 when ``df`` is None, matching get_max_peakExecutionMemory, because the
    cluster stats loader returns None for unreadable files.
    """
    if df is None:
        return 0
    return min(df["peakExecutionMemory"])

In [None]:
# (start_time, end_time, duration) for the sample app
t = tuple(get_full_execution_time(sample))

In [None]:
# Display the (start_time, end_time, duration) tuple computed in the previous cell
t

In [None]:
# Run parameters (matching run-output rows) for the first map-partitions app
first_map_partitions_app = map_partitions_files[0]
get_run_params(first_map_partitions_app)

In [None]:
def generate_graph(data, xlabel, ylabel, title, savefig="graph.png", outdir="images"):
    """Draw a wide-form line plot of ``data`` and save it as ``outdir/savefig``.

    Each column of ``data`` becomes one line and the index is the x axis.

    Args:
        data: DataFrame whose columns are plotted as separate lines.
        xlabel: x-axis label.
        ylabel: y-axis label.
        title: plot title.
        savefig: output image file name, written inside ``outdir``.
        outdir: output directory; created if it does not exist (default "images").
    """
    # A dedicated figure per graph means consecutive calls never draw onto each other,
    # and closing it afterwards frees it instead of leaving a blank figure open.
    fig, ax = plt.subplots()
    sns.lineplot(data=data, linewidth=2.5, ax=ax)
    ax.set(xlabel=xlabel, ylabel=ylabel, title=title)
    os.makedirs(outdir, exist_ok=True)
    fig.savefig(os.path.join(outdir, savefig))
    plt.close(fig)

In [None]:
# Map Partitions RDD vs Map RDD
# Line plot of the job num vs execution time
# Note that basic-rdd and map-partitions are the exact same job.  They were just labelled differently.
# I was curious to see if they exhibited the same run times, but they don't.  I don't know how to explain the differences
map_exec_run_outputs = {
    "BasicRDD": "basic-rdd-run-output",
    "MapPartitions": "map-partitions-rdd-run-output",
    "Map": "map-rdd-run-output",
}
# Wall-clock duration per app, divided by 1000.0 to get seconds
mapPartitions_vs_map_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_execution_time(get_spark_stats(appName))[2] / 1000.0)
    for label, run_output in map_exec_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)

#Create a line plot for all 3 jobs.  This is what shows there are weird anomalies in the execution time for the
# mapPartitions/basicRDD jobs
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When\nUsing Map vs MapPartitions vs BasicRDD", savefig="mapVsMapPartitionsAllExecutionTime.png")
# generate_graph cleans up its own plot, so no extra plt.cla() is needed here.

#Now do cleanup. Since BasicRDD and MapPartitions are the same job, let's just take the minimum execution time
#to compare to the execution time of the map job.
data["MapPartitions"] = data[["MapPartitions", "BasicRDD"]].min(axis=1)
data = data.drop(["BasicRDD"], axis=1)

generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Using Map vs MapPartitions", savefig="mapVsMapPartitionsBestExecutionTime.png")

In [None]:
# I wanted to see what comparing the CPU Time would look like.
# NOTE(review): Spark reports executorCpuTime in nanoseconds, so the summed value is divided
# by NANOS_PER_SEC to match the "(sec)" axis label. Confirm the stats come from Spark task metrics.
NANOS_PER_SEC = 1e9
cpu_run_outputs = {
    "BasicRDD": "basic-rdd-run-output",
    "MapPartitions": "map-partitions-rdd-run-output",
    "Map": "map-rdd-run-output",
}
mapPartitions_vs_map_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_cpu_time(get_spark_stats(appName)) / NANOS_PER_SEC)
    for label, run_output in cpu_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)

#Again, just seeing how BasicRDD and MapPartitions jobs compare.
generate_graph(data, "Job Number", "Total CPU Time of Executors (sec)",
               title="CPU Execution Time Comparison When\nUsing Map vs MapPartitions vs BasicRDD", savefig="mapVsMapPartitionsAllCpuTime.png")

#Now, just seeing how minimum CPU Time of BasicRDD and MapPartitions jobs compare to the Map job.
data["MapPartitions"] = data[["MapPartitions", "BasicRDD"]].min(axis=1)
data = data.drop(["BasicRDD"], axis=1)

generate_graph(data, "Job Number", "Total CPU Time of Executors (sec)",
               title="CPU Execution Time Comparison When Using Map vs MapPartitions", savefig="mapVsMapPartitionsBestCpuTime.png")


In [None]:
# Now I'm comparing the max peak execution memory of the mapPartitions and the map jobs.
max_peak_run_outputs = {
    "MapPartitions": "map-partitions-rdd-run-output",
    "Map": "map-rdd-run-output",
}
mapPartitions_vs_map_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in max_peak_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison When\nUsing Map vs MapPartitions", savefig="mapVsMapPartitionsMaxPeakExecutionMemory.png")


In [None]:
# Now I'm comparing the min peak execution memory of the mapPartitions and the map jobs, out of curiosity.
min_peak_run_outputs = {
    "MapPartitions": "map-partitions-rdd-run-output",
    "Map": "map-rdd-run-output",
}
mapPartitions_vs_map_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_min_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in min_peak_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)

generate_graph(data, "Job Number", "Min Peak Execution Memory (bytes)",
               title="Min Peak Execution Memory Comparison When\nUsing Map vs MapPartitions", savefig="mapVsMapPartitionsMinPeakExecutionMemory.png")


In [None]:
# Input metrics of the job: bytes read by stage 0 of the first map-partitions app
sample_mapPartitionsJob_appName = get_metrics("map-partitions-rdd-run-output").at[0, "appName"]
df = get_spark_stats(sample_mapPartitionsJob_appName)
stage0_rows = df["stageId"] == 0
sum(df.loc[stage0_rows, "bytesRead"])


In [None]:
# Records read by stage 0 of the same sample app
sum(df.loc[df["stageId"] == 0, "recordsRead"])

In [None]:
# Basic count job: RDD vs Dataset vs DataFrame (local runs)
basic_run_outputs = {
    "BasicRDD": "basic-rdd-run-output",
    "BasicDataset": "basic-dataset-run-output",
    "BasicDataframe": "basic-dataframe-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
basic_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_execution_time(get_spark_stats(appName))[2] / 1000.0)
    for label, run_output in basic_run_outputs.items()
}
data = pd.DataFrame(basic_jobs_df)
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Running\nBasic Count Job for RDD vs Dataset vs DataFrame", savefig="basicJobAllExecutionTime.png")

# Largest peakExecutionMemory value per app for the same 3 jobs
basic_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in basic_run_outputs.items()
}
data = pd.DataFrame(basic_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison Running\nBasic Count Job for RDD vs Dataset vs DataFrame", savefig="basicJobMaxPeakExecutionMemory.png")




In [None]:
# Partition job: RDD vs Dataset vs DataFrame (local runs)
partition_run_outputs = {
    "PartitionRDD": "partition-rdd-run-output",
    "PartitionDataset": "partition-dataset-run-output",
    "PartitionDataframe": "partition-dataframe-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
partition_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_execution_time(get_spark_stats(appName))[2] / 1000.0)
    for label, run_output in partition_run_outputs.items()
}
data = pd.DataFrame(partition_jobs_df)
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Running\nPartition Job for RDD vs Dataset vs DataFrame", savefig="partitionJobAllExecutionTime.png")

# Largest peakExecutionMemory value per app for the same 3 jobs
partition_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in partition_run_outputs.items()
}
data = pd.DataFrame(partition_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison Running\nPartition Job for RDD vs Dataset vs DataFrame", savefig="partitionJobMaxPeakExecutionMemory.png")


In [None]:
# Cache job: RDD vs Dataset vs DataFrame (local runs)
cache_run_outputs = {
    "CacheRDD": "cache-rdd-run-output",
    "CacheDataset": "cache-dataset-run-output",
    "CacheDataframe": "cache-dataframe-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
cache_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_execution_time(get_spark_stats(appName))[2] / 1000.0)
    for label, run_output in cache_run_outputs.items()
}
data = pd.DataFrame(cache_jobs_df)
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Running\nCache Job for RDD vs Dataset vs DataFrame", savefig="cacheJobAllExecutionTime.png")

# Largest peakExecutionMemory value per app for the same 3 jobs
cache_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in cache_run_outputs.items()
}
data = pd.DataFrame(cache_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison Running\nCache Job for RDD vs Dataset vs DataFrame", savefig="cacheJobMaxPeakExecutionMemory.png")


In [None]:
# KMeans job: Dataset vs DataFrame (local runs).
# metric_output_mapping has no kmeans-rdd run-output, so there is no RDD series and the
# titles name only the two jobs actually plotted.
kmeans_run_outputs = {
    "KMeansDataset": "kmeans-dataset-run-output",
    "KMeansDataframe": "kmeans-dataframe-run-output",
}

# Wall-clock seconds per app for both jobs
kmeans_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_full_execution_time(get_spark_stats(appName))[2] / 1000.0)
    for label, run_output in kmeans_run_outputs.items()
}
data = pd.DataFrame(kmeans_jobs_df)
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Running\nKMeans Job for Dataset vs DataFrame", savefig="kmeansJobAllExecutionTime.png")

# Largest peakExecutionMemory value per app for both jobs
kmeans_jobs_df = {
    label: get_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_spark_stats(appName)))
    for label, run_output in kmeans_run_outputs.items()
}
data = pd.DataFrame(kmeans_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison Running\nKMeans Job for Dataset vs DataFrame", savefig="kmeansJobMaxPeakExecutionMemory.png")


In [None]:
# Find the largest peakExecutionMemory across all map-partitions apps and keep the stats of
# the app that contains it, so the next cell can show the row(s) with that value.
# (Previously sample_df was always the first app, which usually does not contain the maximum.)
maxPeakExecutionMemory = 0
max_peak_appName = None
for appName in get_metrics("map-partitions-rdd-run-output")["appName"]:
    app_peak = get_max_peakExecutionMemory(get_spark_stats(appName))
    # Strict ">" keeps the earliest app on ties; the None check guarantees one app is chosen.
    if max_peak_appName is None or app_peak > maxPeakExecutionMemory:
        maxPeakExecutionMemory = app_peak
        max_peak_appName = appName
sample_df = get_spark_stats(max_peak_appName)

In [None]:
# Row(s) of sample_df whose peakExecutionMemory equals maxPeakExecutionMemory
sample_df.loc[sample_df["peakExecutionMemory"] == maxPeakExecutionMemory]

In [None]:
#Okay, this is prep for the two graphics below.  Basically, take all of the stats for the map Partitions jobs
#and union them together (stack the rows into one dataframe).
apps = get_metrics("map-partitions-rdd-run-output")["appName"]
# DataFrame.append was removed in pandas 2.0; one concat over all frames is also linear
# instead of re-copying the growing frame on every iteration. Row indices are kept as-is,
# as append did.
df = pd.concat([get_spark_stats(appName) for appName in apps])

In [None]:
#I wanted to see if the peak execution memory related to the bytes input/output.  This is running a linear regression
#(a best fit line) comparing the peakExecutionMemory for a task and the bytes read into the task.
# It looks like they are somewhat related, but I wouldn't say they are strongly related.
# lmplot returns a FacetGrid (not an Axes), hence the name.
grid = sns.lmplot(data=df, x="peakExecutionMemory", y="bytesRead", truncate=True)
peak_upper = max(df["peakExecutionMemory"])
bytes_read_upper = max(df["bytesRead"])
grid.set(xlabel='Peak Execution Memory (bytes)',
         xlim=(0, peak_upper),
         ylabel='Bytes Read (bytes)',
         ylim=(0, bytes_read_upper),
         title="Peak Execution Memory vs Bytes Read for MapPartitions RDD")

In [None]:
# This is interesting.  It really looks like the peakExecutionMemory and the shuffleBytesWritten (the output of a task)
# are strongly related.  I'd like to see if this pattern continues through the other comparisons. Might be a good
# talking point.
# lmplot returns a FacetGrid (not an Axes), hence the name.
grid = sns.lmplot(x="peakExecutionMemory", y="shuffleBytesWritten", truncate=True, data=df)
grid.set(xlabel='Peak Execution Memory (bytes)',
         xlim=(0, max(df["peakExecutionMemory"])),
         ylabel='Shuffle Bytes Written (bytes)',
         ylim=(0, max(df["shuffleBytesWritten"])),
         # Title names the y variable actually plotted (it previously said "Bytes Read").
         title="Peak Execution Memory vs Shuffle Bytes Written for MapPartitions RDD")

In [None]:
# Cluster run-output file name (in the cluster metrics directory) -> job name used as the appName prefix.
# Names follow the "<job>-cluster-run-output" pattern used by the get_cluster_metrics calls below.
cluster_metric_output_mapping = {
    "basic-dataframe-cluster-run-output": "BasicDataframe",
    "basic-dataset-cluster-run-output": "BasicDataset",
    "basic-rdd-cluster-run-output": "BasicRDD",
    "cache-dataframe-cluster-run-output": "CacheDataframe",
    "cache-dataset-cluster-run-output": "CacheDataset",
    "cache-rdd-cluster-run-output": "CacheRDD",
    "kmeans-dataframe-cluster-run-output": "KMeansDataframe",
    "kmeans-dataset-cluster-run-output": "KMeansDataset",
    "map-partitions-rdd-cluster-run-output": "MapPartitionsRDD",
    "map-rdd-cluster-run-output": "MapRDD",
    "partition-dataframe-cluster-run-output": "PartitionDataframe",
    "partition-dataset-cluster-run-output": "PartitionDataset",
    "partition-rdd-cluster-run-output": "PartitionRDD"
}

#Invert the above mapping: job name -> cluster run-output file name
cluster_metric_output_mapping_inv = {v: k for k, v in cluster_metric_output_mapping.items()}

#Given cluster spark-stats app name, get the file name
def get_cluster_spark_stats_file(appName):
    """Return the path of the cluster spark-stats file named ``appName``.

    os.path.join inserts the platform's path separator, so this works on Windows
    regardless of drive letter as well as on POSIX systems.
    """
    return os.path.join(cluster_stats_dir, appName)

#Given the cluster metrics output name, get the file name
def get_cluster_metrics_file(fileName):
    """Return the path of the cluster run-output file ``fileName``.

    os.path.join inserts the platform's path separator, so this works on Windows
    regardless of drive letter as well as on POSIX systems.
    """
    return os.path.join(cluster_metrics_dir, fileName)

#Given a cluster run-output file name (in the cluster metrics directory), get the appNames associated with that run
def get_cluster_associated_appnames(run_output_name):
    """Return the cluster spark-stats file names that belong to a cluster run-output file.

    Args:
        run_output_name: name of a CSV in the cluster metrics directory with an ``appName`` column.

    Returns:
        The entries of the cluster spark-stats directory that appear in that ``appName``
        column, in ``os.listdir`` order.
    """
    # A set makes each membership test O(1) instead of scanning the whole column per file.
    valid_files = set(pd.read_csv(get_cluster_metrics_file(run_output_name))["appName"])
    return [a_file for a_file in os.listdir(cluster_stats_dir) if a_file in valid_files]

#Given an appName (also a file name in the cluster spark-stats directory), get the run parameters associated with that run
def get_cluster_run_params(appName):
    """Return the cluster run-output row(s) describing the run ``appName``.

    The job is found by checking which job name (a value of
    ``cluster_metric_output_mapping``) ``appName`` starts with; the first match in mapping
    order wins. That job's cluster run-output CSV is then filtered to rows whose ``appName``
    equals ``appName``.

    Returns:
        A DataFrame of matching rows, or None when no job name prefixes ``appName``.
    """
    job_name = next(
        (candidate for candidate in cluster_metric_output_mapping_inv if appName.startswith(candidate)),
        None,
    )
    if job_name is None:
        return None
    run_output_df = pd.read_csv(get_cluster_metrics_file(cluster_metric_output_mapping_inv[job_name]))
    return run_output_df[run_output_df["appName"] == appName]

#Given an app name (file name in cluster-spark-stats since they match), get the dataframe for that app name
def get_cluster_spark_stats(appName):
    """Read the cluster spark-stats JSON for ``appName``.

    Returns:
        The parsed DataFrame, or None when the file is missing or cannot be parsed.
        get_max_peakExecutionMemory and get_cluster_full_execution_time check for None.
    """
    filetoUse = get_cluster_spark_stats_file(appName)
    try:
        return pd.read_json(filetoUse)
    except (OSError, ValueError):
        # Missing file (FileNotFoundError is an OSError) or empty/malformed JSON (ValueError):
        # keep the best-effort None result, but stop swallowing KeyboardInterrupt,
        # SystemExit and genuine programming errors like the old bare except did.
        return None

#Get the metrics dataframe for the specified cluster run-output file name and clean up its memory columns.
def get_cluster_metrics(run_output_name):
    """Load a cluster run-output CSV and make ``executorMem`` / ``overheadMem`` integers.

    Each value in those columns has its last character dropped before ``int()`` parsing
    (presumably a unit suffix such as "g" — confirm against the CSVs). Rows are not sorted.
    """
    raw_metrics = pd.read_csv(get_cluster_metrics_file(run_output_name))
    for mem_column in ("executorMem", "overheadMem"):
        raw_metrics[mem_column] = raw_metrics[mem_column].apply(lambda value: int(value[:-1]))
    return raw_metrics

def get_cluster_full_execution_time(df):
    """Return ``(start_time, end_time, duration)`` for one app's cluster spark-stats DataFrame.

    start_time is the earliest ``submissionTime`` and end_time the latest ``completionTime``;
    duration is their difference (callers divide by 1000.0 to get seconds).

    When ``df`` is None (the stats file could not be read), all three values are NaN. A
    missing run then shows as a gap in plots and is skipped by pandas ``min``/``max``,
    instead of appearing as a bogus -1 s duration that also wins ``min(axis=1)``.
    """
    if df is None:
        missing = float("nan")
        return (missing, missing, missing)
    start_time = min(df["submissionTime"])
    end_time = max(df["completionTime"])
    return (start_time, end_time, end_time - start_time)


In [None]:
# Map Partitions RDD vs Map RDD on the cluster
# Line plot of the job num vs execution time
# Note that basic-rdd and map-partitions are the exact same job.  They were just labelled differently.
# I was curious to see if they exhibited the same run times, but they don't.  I don't know how to explain the differences
cluster_map_exec_run_outputs = {
    "BasicRDD": "basic-rdd-cluster-run-output",
    "MapPartitions": "map-partitions-rdd-cluster-run-output",
    "Map": "map-rdd-cluster-run-output",
}
mapPartitions_vs_map_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_cluster_full_execution_time(get_cluster_spark_stats(appName))[2] / 1000.0)
    for label, run_output in cluster_map_exec_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)
# An app whose stats file could not be read may get a negative placeholder duration. Real
# durations are never negative, so mask those to NaN: they plot as gaps instead of -1 s and
# cannot win the min() below.
data = data.mask(data < 0)

#Create a line plot for all 3 jobs.  This is what shows there are weird anomalies in the execution time for the
# mapPartitions/basicRDD jobs
generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When\nUsing Map vs MapPartitions vs BasicRDD On Cluster", savefig="mapVsMapPartitionsAllExecutionTimeCluster.png")

#Now do cleanup. Since BasicRDD and MapPartitions are the same job, let's just take the minimum execution time
#to compare to the execution time of the map job.
data["MapPartitions"] = data[["MapPartitions", "BasicRDD"]].min(axis=1)
data = data.drop(["BasicRDD"], axis=1)

generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Using Map vs MapPartitions On Cluster", savefig="mapVsMapPartitionsBestExecutionTimeCluster.png")


In [None]:
# Now I'm comparing the max peak execution memory of the mapPartitions and the map jobs on the cluster.
# (A BasicRDD series was tried here at one point and left disabled.)
cluster_max_peak_run_outputs = {
    "MapPartitions": "map-partitions-rdd-cluster-run-output",
    "Map": "map-rdd-cluster-run-output",
}
mapPartitions_vs_map_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_cluster_spark_stats(appName)))
    for label, run_output in cluster_max_peak_run_outputs.items()
}
data = pd.DataFrame(mapPartitions_vs_map_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison When\nUsing Map vs MapPartitions on Cluster", savefig="mapVsMapPartitionsMaxPeakExecutionMemoryCluster.png")


In [None]:
# appName of the first cluster basic-rdd run
get_cluster_metrics("basic-rdd-cluster-run-output").at[0, "appName"]

In [None]:
# Input metrics of the job on the cluster: bytes read by stage 0 of the first map-partitions app
# (get_cluster_spark_stats returns None when the stats file is unreadable)
sample_mapPartitionsJob_appName = get_cluster_metrics("map-partitions-rdd-cluster-run-output").at[0, "appName"]
df = get_cluster_spark_stats(sample_mapPartitionsJob_appName)
stage0_rows = df["stageId"] == 0
sum(df.loc[stage0_rows, "bytesRead"])


In [None]:
# Records read by stage 0 of the same cluster sample app
sum(df.loc[df["stageId"] == 0, "recordsRead"])

In [None]:
# Basic count job on the cluster: RDD vs Dataset vs DataFrame
cluster_basic_run_outputs = {
    "BasicRDD": "basic-rdd-cluster-run-output",
    "BasicDataset": "basic-dataset-cluster-run-output",
    "BasicDataFrame": "basic-dataframe-cluster-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
basic_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_cluster_full_execution_time(get_cluster_spark_stats(appName))[2] / 1000.0)
    for label, run_output in cluster_basic_run_outputs.items()
}
data = pd.DataFrame(basic_jobs_df)
# Negative durations can only be placeholders for unreadable stats files; show them as gaps.
data = data.mask(data < 0)

generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Using Count Jobs of\nRDD vs DataFrame vs Dataset On Cluster", savefig="basicJobsAllExecutionTimeCluster.png")

# Largest peakExecutionMemory value per app. Same column order as the execution-time graph,
# so each job keeps the same line colour in both plots.
basic_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_cluster_spark_stats(appName)))
    for label, run_output in cluster_basic_run_outputs.items()
}
data = pd.DataFrame(basic_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison When Using Count Jobs of\nRDD vs DataFrame vs Dataset On Cluster", savefig="basicJobsMaxPeakExecutionMemoryCluster.png")



In [None]:
# Repartition job on the cluster: RDD vs Dataset vs DataFrame
cluster_partition_run_outputs = {
    "PartitionRDD": "partition-rdd-cluster-run-output",
    "PartitionDataset": "partition-dataset-cluster-run-output",
    "PartitionDataFrame": "partition-dataframe-cluster-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
partition_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_cluster_full_execution_time(get_cluster_spark_stats(appName))[2] / 1000.0)
    for label, run_output in cluster_partition_run_outputs.items()
}
data = pd.DataFrame(partition_jobs_df)
# Negative durations can only be placeholders for unreadable stats files; show them as gaps.
data = data.mask(data < 0)

generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Repartitioning\nRDD vs DataFrame vs Dataset On Cluster", savefig="partitionJobsAllExecutionTimeCluster.png")

# Largest peakExecutionMemory value per app. Same column order as the execution-time graph,
# so each job keeps the same line colour in both plots.
partition_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_cluster_spark_stats(appName)))
    for label, run_output in cluster_partition_run_outputs.items()
}
data = pd.DataFrame(partition_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison When Repartitioning\nRDD vs DataFrame vs Dataset On Cluster", savefig="partitionJobsMaxPeakExecutionMemoryCluster.png")


In [None]:
# This cell was an exact copy of the previous cluster repartition cell: same inputs, same
# titles and the same two output images. Re-running it only recomputed everything and
# overwrote those images with identical ones, so its body has been removed.
# partition_jobs_df / data still hold the values produced by the previous cell.


In [None]:
# Cache job on the cluster: RDD vs Dataset vs DataFrame
cluster_cache_run_outputs = {
    "CacheRDD": "cache-rdd-cluster-run-output",
    "CacheDataset": "cache-dataset-cluster-run-output",
    "CacheDataFrame": "cache-dataframe-cluster-run-output",
}

# Wall-clock seconds per app for each of the 3 jobs
cache_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_cluster_full_execution_time(get_cluster_spark_stats(appName))[2] / 1000.0)
    for label, run_output in cluster_cache_run_outputs.items()
}
data = pd.DataFrame(cache_jobs_df)
# Negative durations can only be placeholders for unreadable stats files; show them as gaps.
data = data.mask(data < 0)

generate_graph(data, "Job Number", "Total Execution Time (sec)", title="Execution Time Comparison When Caching\nRDD vs DataFrame vs Dataset On Cluster", savefig="cacheJobsAllExecutionTimeCluster.png")

# Largest peakExecutionMemory value per app. Same column order as the execution-time graph,
# so each job keeps the same line colour in both plots.
cache_jobs_df = {
    label: get_cluster_metrics(run_output)["appName"].apply(
        lambda appName: get_max_peakExecutionMemory(get_cluster_spark_stats(appName)))
    for label, run_output in cluster_cache_run_outputs.items()
}
data = pd.DataFrame(cache_jobs_df)

generate_graph(data, "Job Number", "Max Peak Execution Memory (bytes)",
               title="Max Peak Execution Memory Comparison When Caching\nRDD vs DataFrame vs Dataset On Cluster", savefig="cacheJobsMaxPeakExecutionMemoryCluster.png")
