In [None]:
# Path to the Spark event log (history log) analyzed by this notebook.
# Replace it with an event log of your own, or parameterize it with Papermill
# (tag this cell "parameters" so Papermill can inject an override after it).

metrics_file = "metrics/application_1601392010735_0030"

In [None]:
import pandas as pd
# Lift pandas' display truncation so the tables below render every row and column.
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)

In [None]:
import pyspark
import pyspark.sql.functions as F
import json

# Reuse the active SparkSession if the kernel already has one; otherwise create
# one with default settings.
spark = pyspark.sql.SparkSession.builder.getOrCreate()

In [None]:
# Load the event log (one JSON event per line). The inferred schema spans all
# event types, so most columns are null on any given row -- hence the frequent
# .dropna() calls below.
metrics = spark.read.json(metrics_file)

In [None]:
# Application id and name from the first row where both are non-null (presumably
# the application-start event -- confirm). Raises IndexError if no such row exists.
app_id, app_name = metrics.select("App ID", "App Name").dropna().collect()[0]

In [None]:
def collect_and_dictify(df):
    """Collect ``df`` to the driver and return each row as a parsed JSON dict.

    Rows are serialized by Spark SQL's ``to_json`` and parsed with ``json.loads``.
    ``to_json(*)`` expands to every column of ``df``; the callers below always
    pass a single-column frame.
    """
    json_rows = df.selectExpr("to_json(*)").collect()
    parsed = []
    for json_row in json_rows:
        parsed.append(json.loads(json_row[0]))
    return parsed

def executor_info(df):
    """Return the non-null ``Executor Info`` structs of an event-log frame as dicts."""
    return collect_and_dictify(df.select("Executor Info").dropna())

def plan_dicts(df):
    """Return the non-null ``sparkPlanInfo`` structs of an event-log frame as dicts."""
    plans = df.select("sparkPlanInfo")
    return collect_and_dictify(plans.dropna())


In [None]:
from collections import namedtuple

# One row per SQL-metric accumulator attached to a physical-plan node.
MetricNode = namedtuple("MetricNode", "plan_node accumulatorId metricType name")
# One row per physical-plan node; `parent` is -1 for a root.
PlanInfoNode = namedtuple("PlanInfoNode", "plan_node parent nodeName simpleString")

def nextid():
    """Yield 0, 1, 2, ... forever; used to number physical-plan nodes."""
    i = 0
    while True:
        yield i
        i = i + 1

# Kept for code that draws ids from it directly; flatplan uses a per-call counter.
node_ctr = nextid()

# plan_dicts(df) is defined in the helper cell alongside collect_and_dictify.

def flatplan(dicts, parent=-1, plan_nodes=None, metric_nodes=None, counter=None):
    """Flatten a forest of ``sparkPlanInfo`` dicts into node and metric lists.

    Nodes are numbered in pre-order: a node gets its id before its children.

    Args:
        dicts: iterable of plan-info dicts, each with ``nodeName``, ``simpleString``,
            ``metrics`` (dicts with ``accumulatorId``, ``metricType``, ``name``) and
            ``children`` (a list of dicts of the same shape).
        parent: id recorded as the ``parent`` of each dict in ``dicts``; -1 for roots.
        plan_nodes, metric_nodes: lists to append to; fresh lists when omitted.
        counter: iterator supplying node ids. When omitted, a fresh counter
            starting at 0 is used, so calling flatplan again on the same input
            yields the same ids (pass one counter explicitly to keep ids unique
            across several calls that share output lists).

    Returns:
        ``(plan_nodes, metric_nodes)``: lists of ``PlanInfoNode`` / ``MetricNode``.
    """
    if plan_nodes is None:
        plan_nodes = []
    if metric_nodes is None:
        metric_nodes = []
    if counter is None:
        # Per-call counter instead of the global node_ctr: ids no longer depend on
        # how many times this function (or the cell) has already run.
        counter = nextid()

    for plan in dicts:  # `plan`, not `pd`, so pandas' `pd` is not shadowed
        pid = next(counter)
        for m in plan['metrics']:
            metric_nodes.append(MetricNode(pid, m['accumulatorId'], m['metricType'], m['name']))

        plan_nodes.append(PlanInfoNode(pid, parent, plan['nodeName'], plan['simpleString']))

        flatplan(plan['children'], pid, plan_nodes, metric_nodes, counter)

    return plan_nodes, metric_nodes

def plan_dfs(df):
    """Build Spark DataFrames of physical-plan nodes and their metric accumulators.

    Args:
        df: event-log DataFrame whose ``sparkPlanInfo`` column holds the plans.

    Returns:
        ``(pndf, mndf)``: one row per ``PlanInfoNode`` and per ``MetricNode`` from a
        single ``flatplan`` call, so both frames share ``plan_node`` ids.
    """
    # Flatten the plans of `df` itself rather than a global frame.
    pn, mn = flatplan(plan_dicts(df))

    pndf = spark.createDataFrame(data=pn)
    mndf = spark.createDataFrame(data=mn)

    return pndf, mndf

In [None]:
# Flatten the plans once and build both DataFrames from that one result, so the
# local lists `pn`/`mn` carry exactly the same plan_node ids as the DataFrames
# (two separate flatplan calls may number the nodes differently).
pn, mn = flatplan(plan_dicts(metrics))
plan_nodes = spark.createDataFrame(data=pn)
accumulable_nodes = spark.createDataFrame(data=mn)

In [None]:
def tasks_to_stages(df):
    """Map each task to its stage using the SparkListenerTaskStart rows of ``df``.

    Returns a DataFrame with columns ``Task ID`` and ``Stage ID``.
    """
    task_starts = df.where(F.col('Event') == 'SparkListenerTaskStart')
    task_id = F.col("Task Info.Task ID").alias('Task ID')
    return task_starts.select(task_id, 'Stage ID')

In [None]:
def accumulables(df, noun='Task', extra_cols=None):
    """Explode the accumulables of ``<noun> Info`` structs into one row per metric.

    Args:
        df: event-log DataFrame, typically already filtered to one event type.
        noun: 'Task' or 'Stage'; selects the ``<noun> Info`` struct and ``<noun> ID``.
        extra_cols: additional top-level columns of ``df`` (e.g. ``["Stage ID"]``)
            carried through to the result.

    Returns:
        DataFrame with ``<noun> ID``, the remaining columns (other ``<noun> Info``
        fields plus ``extra_cols``, sorted by name), and ``accumulatorId``,
        ``Metric Name`` and ``Metric Value``. ``Metric Value`` is cast to double, so
        non-numeric values become null (in Spark's default, non-ANSI cast mode).
    """
    extra_cols = list(extra_cols) if extra_cols is not None else []
    mcol = '%s Info' % noun
    idcol = '%s ID' % noun

    acc_cols = [F.col('Accumulable.%s' % s).alias('Metric %s' % s) for s in ['ID', 'Name', 'Value']]
    obs = df.select(mcol, *extra_cols).select('%s.*' % mcol, *extra_cols)
    cols = [F.col(elt) for elt in sorted(set(obs.columns) - {idcol, 'Accumulables'})]

    return obs.select(
        idcol,
        F.explode('Accumulables').alias('Accumulable'),
        *cols
    ).select(
        idcol,
        *(cols + acc_cols)
    ).withColumnRenamed(
        "Metric ID", "accumulatorId"
    ).withColumn(
        # double, not float: a 32-bit float keeps only ~7 significant digits, which
        # truncates nanosecond timings and large byte counts.
        "Metric Value", F.col("Metric Value").cast("double")
    )

def tidy_metrics(df, noun='Task', event=None, interesting_metrics=None, extra_cols=None):
    """Return the exploded ``<noun> Info`` accumulables of the rows for one event type.

    Args:
        df: event-log DataFrame.
        noun: 'Task' or 'Stage'; passed through to ``accumulables``.
        event: value of the ``Event`` column to keep; ``None`` keeps every row.
        interesting_metrics: accepted for API compatibility but currently unused --
            no filtering on metric names is performed.
        extra_cols: extra top-level columns carried through to the result.
    """
    # TODO(review): apply interesting_metrics as a filter once its intended type is settled.
    filtered = df if event is None else df.where(F.col('Event') == event)
    return accumulables(filtered, noun, extra_cols if extra_cols is not None else [])

def tidy_tasks(df, event='SparkListenerTaskEnd', interesting_metrics=None):
    """Per-task metrics (one row per task accumulable) from ``event`` rows, with ``Stage ID``."""
    # Compare with None explicitly: `interesting_metrics or ...` calls bool() on a
    # pyspark Column, which raises ValueError.
    if interesting_metrics is None:
        interesting_metrics = F.lit(True)
    return tidy_metrics(df, 'Task', event=event, interesting_metrics=interesting_metrics, extra_cols=["Stage ID"])

def tidy_stages(df, event='SparkListenerStageCompleted', interesting_metrics=None):
    """Per-stage metrics (one row per stage accumulable) from ``event`` rows."""
    # Forward the caller's value instead of discarding it; default as tidy_tasks does.
    if interesting_metrics is None:
        interesting_metrics = F.lit(True)
    return tidy_metrics(df, 'Stage', event=event, interesting_metrics=interesting_metrics)



In [None]:
# Unit metadata for metric names seen in the event log (task accumulables and SQL
# plan metrics). `kind` is 'time', 'size' or 'count'; `unit` is what later cells
# filter on ('bytes' for byte charts, 'ms'/'ns' for time charts, with ns converted
# to ms). Metric names missing from this table get no unit and so are left out of
# those charts -- extend the list when new names appear.
MetricMeta = namedtuple('MetricMeta', 'MetricName kind unit')

metric_metas = [
    MetricMeta('GPU decode time', 'time', 'ms'),
    MetricMeta('GPU time', 'time', 'ms'),
    MetricMeta('avg hash probe bucket list iters', 'count', 'iterations'),
    MetricMeta('buffer time', 'time', 'ms'),
    MetricMeta('build side size', 'size', 'bytes'),
    MetricMeta('build time', 'time', 'ms'),
    MetricMeta('collect batch time', 'time', 'ms'),
    MetricMeta('concat batch time', 'time', 'ms'),
    MetricMeta('data size', 'size', 'bytes'),
    MetricMeta('duration', 'time', 'ms'),
    MetricMeta('fetch wait time', 'time', 'ms'),
    MetricMeta('internal.metrics.diskBytesSpilled', 'size', 'bytes'),
    MetricMeta('internal.metrics.executorCpuTime', 'time', 'ns'),
    MetricMeta('internal.metrics.executorDeserializeCpuTime', 'time', 'ns'),
    MetricMeta('internal.metrics.executorDeserializeTime', 'time', 'ms'),
    MetricMeta('internal.metrics.executorRunTime', 'time', 'ms'),
    MetricMeta('internal.metrics.input.bytesRead', 'size', 'bytes'),
    MetricMeta('internal.metrics.input.recordsRead', 'count', 'records'),
    MetricMeta('internal.metrics.jvmGCTime', 'time', 'ms'),
    MetricMeta('internal.metrics.memoryBytesSpilled', 'size', 'bytes'),
    MetricMeta('internal.metrics.output.bytesWritten', 'size', 'bytes'),
    MetricMeta('internal.metrics.output.recordsWritten', 'count', 'records'),
    MetricMeta('internal.metrics.peakExecutionMemory', 'size', 'bytes'),
    MetricMeta('internal.metrics.resultSerializationTime', 'time', 'ms'),
    MetricMeta('internal.metrics.resultSize', 'size', 'bytes'),
    MetricMeta('internal.metrics.shuffle.read.fetchWaitTime', 'time', 'ms'),
    MetricMeta('internal.metrics.shuffle.read.localBlocksFetched', 'count', 'blocks'),
    MetricMeta('internal.metrics.shuffle.read.localBytesRead', 'size', 'bytes'),
    MetricMeta('internal.metrics.shuffle.read.recordsRead', 'count', 'records'),
    MetricMeta('internal.metrics.shuffle.read.remoteBlocksFetched', 'count', 'blocks'),
    MetricMeta('internal.metrics.shuffle.read.remoteBytesRead', 'size', 'bytes'),
    MetricMeta('internal.metrics.shuffle.read.remoteBytesReadToDisk', 'size', 'bytes'),
    MetricMeta('internal.metrics.shuffle.write.bytesWritten', 'size', 'bytes'),
    MetricMeta('internal.metrics.shuffle.write.recordsWritten', 'count', 'records'),
    MetricMeta('internal.metrics.shuffle.write.writeTime', 'time', 'ms'),
    MetricMeta('join output rows', 'count', 'rows'),
    MetricMeta('join time', 'time', 'ms'),
    MetricMeta('local blocks read', 'count', 'blocks'),
    MetricMeta('local bytes read', 'size', 'bytes'),
    MetricMeta('number of input columnar batches', 'count', 'batches'),
    MetricMeta('number of input rows', 'count', 'rows'),
    MetricMeta('number of output columnar batches', 'count', 'batches'),
    MetricMeta('number of output rows', 'count', 'rows'),
    MetricMeta('peak device memory', 'size', 'bytes'),
    MetricMeta('peak memory', 'size', 'bytes'),
    MetricMeta('records read', 'count', 'records'),
    MetricMeta('remote blocks read', 'count', 'blocks'),
    MetricMeta('remote bytes read', 'size', 'bytes'),
    MetricMeta('scan time', 'time', 'ms'),
    MetricMeta('shuffle bytes written', 'size', 'bytes'),
    MetricMeta('shuffle records written', 'count', 'records'),
    MetricMeta('shuffle write time', 'time', 'ms'),
    MetricMeta('spill size', 'size', 'bytes'),
    MetricMeta('sort time', 'time', 'ms'),
    MetricMeta('time in aggregation build', 'time', 'ms'),
    MetricMeta('time in batch concat', 'time', 'ms'),
    MetricMeta('time in compute agg', 'time', 'ms'),
    MetricMeta('total time', 'time', 'ns'),
    MetricMeta('write time', 'time', 'ms')
]

In [None]:
# Spark DataFrame of the unit table (columns MetricName, kind, unit).
metric_meta = spark.createDataFrame(data=metric_metas)
# One row per (task, accumulable) from SparkListenerTaskEnd events.
task_metrics = tidy_tasks(metrics)

In [None]:
# Full task-metric table. Display limits were lifted at the top, so every row is
# rendered; consider .limit(n) first for large event logs.
task_metrics.toPandas()

In [None]:
# Attach each task metric to the SQL plan node owning its accumulator. These are
# inner joins: task metrics with no matching plan accumulator are dropped.
tasks_to_plans = task_metrics.join(accumulable_nodes, "accumulatorId").join(plan_nodes, "plan_node")


In [None]:
# Number of (task metric, plan node) rows produced by the joins above.
tasks_to_plans.count()

In [None]:
import altair as alt
# Write chart data to JSON files referenced by URL instead of embedding it inline,
# which avoids Altair's default row limit for large task tables.
alt.data_transformers.enable('json')

def stage_metric_totals(task_metrics_df):
    """Sum ``Metric Value`` per (``Stage ID``, ``Metric Name``) as a flat pandas frame.

    Only ``Metric Value`` is aggregated: a bare ``.sum()`` would also add up
    ``Task ID``, which is meaningless at stage level.
    """
    return (
        task_metrics_df
        .groupby(['Stage ID', 'Metric Name'])[['Metric Value']]
        .sum()
        .reset_index()
    )

def stage_and_task_charts(task_metrics_df, noun="Time"):
    """Stacked bar charts of per-stage and per-task metric totals, linked by a stage selection.

    Selecting stages in the upper chart filters the lower, per-task chart.

    Args:
        task_metrics_df: pandas frame with ``Stage ID``, ``Task ID``, ``Metric Name``
            and ``Metric Value`` columns.
        noun: y-axis title for both charts.
    """
    selection = alt.selection_multi(name="SelectorName", fields=['Stage ID'])

    stages = alt.Chart(
        stage_metric_totals(task_metrics_df)
    ).mark_bar().encode(
        x='Stage ID:N',
        y=alt.Y('sum(Metric Value):Q', title=noun),
        color='Metric Name:N',
        # No 'Task ID': a stage bar aggregates many tasks.
        tooltip=['Metric Name', 'Metric Value']
    ).add_selection(selection).interactive()

    tasks = alt.Chart(
        task_metrics_df.reset_index()
    ).mark_bar().encode(
        x='Task ID:N',
        y=alt.Y('sum(Metric Value):Q', title=noun),
        color='Metric Name:N',
        tooltip=['Metric Name', 'Metric Value', 'Task ID']
    ).transform_filter(
        selection
    ).interactive()

    return alt.vconcat(stages, tasks)

def layered_stage_and_task_charts(task_layers, noun="Time"):
    """Layered variant of ``stage_and_task_charts``: one bar layer per frame in ``task_layers``.

    Args:
        task_layers: list of pandas frames shaped like ``stage_and_task_charts``'s
            input; each becomes one overlaid layer.
        noun: y-axis title for both charts.
    """
    selection = alt.selection_multi(name="selector_SelectorName", fields=['Stage ID'])
    stage_totals = [stage_metric_totals(tdf) for tdf in task_layers]

    stages = alt.layer(*[alt.Chart(
        sdf
    ).mark_bar().encode(
        x='Stage ID:N',
        y=alt.Y('sum(Metric Value):Q', title=noun),
        color='Metric Name:N',
        tooltip=['Metric Name', 'Metric Value']
    ) for sdf in stage_totals]).add_selection(selection).interactive()

    tasks = alt.layer(*[alt.Chart(
        tdf.reset_index()
    ).mark_bar().encode(
        x='Task ID:N',
        y=alt.Y('sum(Metric Value):Q', title=noun),
        color='Metric Name:N',
        tooltip=['Metric Name', 'Metric Value', 'Task ID']
    ).transform_filter(
        selection
    ) for tdf in task_layers]).interactive()

    return alt.vconcat(stages, tasks)


In [None]:
# Distinct metric names reported by tasks -- handy when extending metric_metas.
task_metrics.select("Metric Name").distinct().collect()

In [None]:
# Byte-valued task metrics, summed per (stage, task, metric), as pandas.
# A left join keeps every task metric; an outer join would also emit metadata-only
# rows (null Stage ID / Task ID / value) for byte metrics that no task reported.
task_byte_metrics = (
    tidy_tasks(metrics)
    .join(metric_meta.withColumnRenamed("MetricName", "Metric Name"), "Metric Name", how="left")
    .where(F.col("unit") == "bytes")
    .groupBy("Stage ID", "Task ID", "Metric Name")
    .sum("Metric Value")
    .withColumnRenamed("sum(Metric Value)", "Metric Value")
    .toPandas()
)


# Shuffle metrics

In [None]:
task_shuffle_metrics = (
    task_byte_metrics[task_byte_metrics['Metric Name'].str.contains('internal.metrics.shuffle')]
    .sort_values('Task ID')
)

def shuffle_replacer(match):
    """'internal.metrics.shuffle.read.remoteBytesRead' -> 'Shuffle remoteBytesRead'."""
    return "Shuffle %s" % match.group('metric')

# assign() returns a new frame, so no write goes through a slice of task_byte_metrics.
task_shuffle_metrics = task_shuffle_metrics.assign(**{
    'Metric Name': task_shuffle_metrics['Metric Name'].str.replace(
        r'internal\.metrics\.shuffle\.(?P<kind>read|write)\.(?P<metric>.*)$',
        shuffle_replacer,
        regex=True,  # a callable replacement requires regex=True (pandas >= 2.0 defaults to False)
    )
})

In [None]:
# Shuffle bytes per stage (top) and per task of the selected stages (bottom).
stage_and_task_charts(task_shuffle_metrics, "bytes")

# Executor time metrics

In [None]:
# Task metrics joined with their unit metadata. The left join keeps every task
# metric (unknown names get a null unit) without adding metadata-only rows that
# have no Stage ID / Task ID. Values are doubles to keep full precision.
task_metrics = tidy_tasks(metrics).join(
    metric_meta.withColumnRenamed("MetricName", "Metric Name"),
    "Metric Name",
    how="left"
).withColumn("Metric Value", F.col("Metric Value").cast("double"))

def _sum_per_task(df, unit):
    """Sum ``Metric Value`` per (stage, task, metric) over rows whose unit is ``unit``."""
    return (
        df.where(F.col("unit") == unit)
        .groupBy("Stage ID", "Task ID", "Metric Name")
        .sum("Metric Value")
        .withColumnRenamed("sum(Metric Value)", "Metric Value")
    )

task_ms_metrics = _sum_per_task(task_metrics, "ms")
# Convert nanoseconds to milliseconds so both frames share one unit.
task_ns_metrics = _sum_per_task(task_metrics, "ns").withColumn("Metric Value", F.col("Metric Value") / 1000000)

task_time_metrics = task_ms_metrics.union(task_ns_metrics).toPandas()

In [None]:
# Spark DataFrame repr: shows the joined schema (not rows, unless eager evaluation
# is enabled); use .toPandas() or .show() to inspect the data.
task_metrics

In [None]:
# Time metrics other than the internal.metrics.shuffle.* ones, ordered by task.
task_executor_metrics = task_time_metrics[~task_time_metrics['Metric Name'].str.contains('internal.metrics.shuffle.')].sort_values('Task ID')

stage_and_task_charts(task_executor_metrics)

# Plotting wall-clock vs CPU time with layered charts

In [None]:
# CPU time (executorCpuTime, recorded in ns and converted to ms above) overlaid
# on wall-clock run time (executorRunTime, ms), per stage and per task.
cputime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorCpuTime')]
runtime = task_time_metrics[task_time_metrics['Metric Name'].str.contains('executorRunTime')]
layered_stage_and_task_charts([runtime, cputime])

# Memory and spill metrics

In [None]:
# Memory, size and spill byte metrics: 'pill' matches both 'spill size' and the
# '...BytesSpilled' internal metrics. Null names count as non-matching.
stage_and_task_charts(
    task_byte_metrics[task_byte_metrics['Metric Name'].str.contains('memory|size|pill', na=False)],
    "bytes",
)

In [None]:
# Every byte-valued task metric together.
stage_and_task_charts(task_byte_metrics, "bytes")


# Configuration information

In [None]:
def melt(df, id_vars=None, value_vars=None, var_name='variable', value_name='value'):
    """Unpivot a Spark DataFrame from wide to long format (like ``pandas.melt``).

    Args:
        df: Spark DataFrame.
        id_vars: columns kept as-is on every output row (default: none).
        value_vars: columns to unpivot (default: every column not in ``id_vars``).
            They are packed into one array, so their types must be coercible to a
            common type.
        var_name: output column holding the source column's name.
        value_name: output column holding the value.

    Returns:
        DataFrame with ``id_vars`` + ``[var_name, value_name]``: one row per input
        row per value column.
    """
    # Accept any sequence (e.g. a tuple); `id_vars + [...]` below needs a list.
    id_vars = list(id_vars) if id_vars is not None else []

    if value_vars is None:
        value_vars = [c for c in df.columns if c not in id_vars]

    def quoted(name):
        # Backtick-quote so names with dots (e.g. "spark.app.name") are not parsed
        # as nested-field paths; embedded backticks are escaped by doubling.
        return "`%s`" % name.replace("`", "``")

    pairs = F.array(*[
        F.struct(F.lit(vv).alias(var_name), F.col(quoted(vv)).alias(value_name))
        for vv in value_vars
    ])

    return df.withColumn("value_tuple", F.explode(pairs)).select(
        *(id_vars + [F.col("value_tuple")[cn].alias(cn) for cn in [var_name, value_name]])
    )

In [None]:
def meltmetrics(raw_df, event):
    """Collect configuration properties from an event log as ``variable``/``value`` rows.

    Args:
        raw_df: event-log DataFrame.
        event: an ``Event`` name, a list of names, or ``None`` for all rows.

    Returns:
        Distinct ``variable``/``value`` rows taken from whichever of the
        ``Properties``, ``System Properties`` and ``Hadoop Properties`` struct
        columns exist in ``raw_df``.

    Raises:
        ValueError: if none of those columns exists.
    """
    if event is None:
        df = raw_df
    elif isinstance(event, list):
        df = raw_df.where(F.col("Event").isin(event))
    else:
        df = raw_df.where(F.col("Event") == event)

    def helper(frame, field):
        # Non-null rows of one struct column, unpivoted to one row per property.
        return melt(frame.select(field).dropna().select("%s.*" % field))

    # Skip property groups absent from this log's inferred schema instead of
    # failing on a missing column.
    fields = [f for f in ("Properties", "System Properties", "Hadoop Properties") if f in df.columns]
    if not fields:
        raise ValueError("no property columns found in event log")

    result = helper(df, fields[0])
    for field in fields[1:]:
        result = result.union(helper(df, field))
    return result.distinct()
    

In [None]:
# Configuration properties from environment-update and job-start events, as a
# two-column (variable, value) table.
meltmetrics(metrics, ["SparkListenerEnvironmentUpdate","SparkListenerJobStart"]).toPandas()