# SPARK METRIC EXTRACTION

In [53]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col 

### Input EventLog JSON file & Output directory path

In [54]:
# Absolute path of the Spark event log JSON file that the notebook reads.
file_absolute_path = "C:/Users/shivam.verma/Desktop/event_logs/application_1587909807967_89833"
# Directory under which the generated CSV report and Excel chart are written.
output_dir = "C:/Users/shivam.verma/Desktop/event_logs/"

### Create spark session & read json as dataframe

In [55]:
# getOrCreate() reuses an already running session if there is one; Hive support is enabled on the builder.
spark = SparkSession.builder.appName("Spark Event Log Metrics Collector").enableHiveSupport().getOrCreate()
# Load the event log as a DataFrame through the file:// scheme; every later cell filters `metrics` by its `Event` column.
metrics = spark.read.json("file:///" + file_absolute_path)

### Query execution details
* query: contains query text. To show query in event log, set sparkContext.setLocalProperty("callSite.long", query) where query is variable containing the query text.
* execution_start_time: execution start time of query in millisec.
* execution_end_time: execution end time of query in millisec.
* execution_time: total time taken in executing the query in millisec.
* numStages: number of stages used in executing query
* numTasks: number of tasks used in executing query
* executorCpuTime: total executor CPU time used in executing query in millisec.
* memoryBytesSpilled: total memory spilled while executing query in bytes.
* peakExecutionMemory: max execution memory used while executing query in bytes.
* recordsRead: number of records read while executing query.
* bytesRead: bytes read while executing query.
* recordsWritten: number of records written while executing query.
* bytesWritten: bytes written while executing query. 

In [56]:
class QueryExecutionDetail:
    """Holder for the details and metrics of one SQL query execution.

    Every field defaults to an empty string and is overwritten by the
    notebook once the corresponding value has been extracted from the
    event log.
    """
    # Query text taken from the SQL execution start event.
    query = ""
    # Start / end timestamps of the execution and their difference.
    execution_start_time = ""
    execution_end_time = ""
    execution_time = ""
    # Stage and task counts attributed to the query.
    numStages = ""
    numTasks= ""
    # Resource metrics aggregated over the stages attributed to the query.
    executorCpuTime = ""
    memoryBytesSpilled = ""
    peakExecutionMemory = ""
    # Input / output volume of the query.
    recordsRead = ""
    bytesRead = ""
    recordsWritten = ""
    bytesWritten = ""

### Application details
* application_name: name of the application in spark.
* application_id: application id in spark.
* application_start_time: execution start time of application in millisec.
* application_end_time: execution end time of application in millisec.
* application_execution_time: total time taken in executing the application in millisec.
* application_history_url: URL to access the Spark UI for the application.
* numStages: number of stages used in executing application
* numTasks: number of tasks used in executing application
* executorCpuTime: total executor CPU time used in executing application in millisec.
* memoryBytesSpilled: total memory spilled while executing application in bytes.
* peakExecutionMemory: max execution memory used while executing application in bytes.
* recordsRead: number of records read while executing application.
* bytesRead: bytes read while executing application.
* recordsWritten: number of records written while executing application.
* bytesWritten: bytes written while executing application.

In [57]:
class ApplicationDetail:
    """Holder for the details and aggregated metrics of one Spark application.

    The scalar fields default to an empty string and are overwritten once
    the corresponding value has been extracted from the event log.
    ``query_details`` collects one query detail object per executed query.
    """
    # Identity and timing of the application.
    application_name = ""
    application_id = ""
    application_start_time = ""
    application_end_time = ""
    application_execution_time = ""
    application_history_url = ""
    # Totals aggregated over all queries of the application.
    numStages = ""
    numTasks= ""
    executorCpuTime = ""
    memoryBytesSpilled = ""
    peakExecutionMemory = ""
    recordsRead = ""
    bytesRead = ""
    recordsWritten = ""
    bytesWritten = ""

    def __init__(self):
        # A list defined on the class would be shared by every instance, so
        # creating a second ApplicationDetail (e.g. re-running a notebook
        # cell) would keep appending to the previous one's queries. Give each
        # instance its own list instead.
        self.query_details = []

### Extract application details from metrics
* **SparkListenerApplicationStart** event contains information such as the app name, the app id and the timestamp when the application started.
* **SparkListenerApplicationEnd** event contains the timestamp when the application ended.
* Application details are extracted and stored in the application detail object.

In [58]:
# Pull the single application start / end events out of the event log.
app_start_metrics = (
    metrics
    .filter("Event='SparkListenerApplicationStart'")
    .select("App ID", "App Name", "Timestamp")
)
app_end_metrics = (
    metrics
    .filter("Event='SparkListenerApplicationEnd'")
    .select("Timestamp")
)

# Only the first matching row of each event type is used.
app_start_metric_dict = app_start_metrics.first().asDict()
app_end_metric_dict = app_end_metrics.first().asDict()

# Copy the extracted values onto a fresh application detail object.
app_detail = ApplicationDetail()
app_detail.application_id = app_start_metric_dict["App ID"]
app_detail.application_name = app_start_metric_dict["App Name"]
app_detail.application_start_time = app_start_metric_dict["Timestamp"]
app_detail.application_end_time = app_end_metric_dict["Timestamp"]

# Total run time is the difference between the end and start timestamps.
app_detail.application_execution_time = (
    int(app_detail.application_end_time) - int(app_detail.application_start_time)
)

### Extract query details executed in application
* **SparkListenerSQLExecutionStart** event contains information such as the query execution start time, the execution id, and the query text if the local property "callSite.long" is set to the query.
* **SparkListenerSQLExecutionEnd** event contains the execution end timestamp of each query together with its execution id.
* By joining the query execution start info and the query execution end info on the execution id, we get all the information about the queries executed in the application.
* The details of each query are saved in a separate object, and all query detail objects are saved in the application detail object.

In [59]:
# Start events carry the query text and start time of each SQL execution.
query_start_metrics = (
    metrics
    .filter("Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionStart'")
    .select(
        col("details").alias("query_text"),
        col("executionId"),
        col("time").alias("exec_start_time"),
    )
)
# End events carry the end time of each SQL execution.
query_end_metrics = (
    metrics
    .filter("Event='org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd'")
    .select(col("executionId"), col("time").alias("exec_end_time"))
)
# Pair every start with its end; executions without both events are dropped by the inner join.
joined_query_metrics = query_start_metrics.join(query_end_metrics, on=['executionId'], how='inner')

joined_query_metrics_lst = joined_query_metrics.sort('executionId').collect()

# Build one detail object per execution, in execution id order.
for row in joined_query_metrics_lst:
    started_at = row["exec_start_time"]
    ended_at = row["exec_end_time"]

    query_detail = QueryExecutionDetail()
    query_detail.query = row["query_text"]
    query_detail.execution_start_time = started_at
    query_detail.execution_end_time = ended_at
    query_detail.execution_time = int(ended_at) - int(started_at)
    app_detail.query_details.append(query_detail)

### Extract stages and its metrics used in application 
* **SparkListenerStageCompleted** event contains information about each stage that completed during the execution of the application.
* The following information is extracted from this event:
    * Stage ID: Id to identify stage.
    * Submission Time: Timestamp when stage execution started for the stage.
    * Completion Time: Timestamp when stage execution completed for the stage.
    * Number of Tasks: Total number of tasks created while execution of stage.

In [60]:
# Expand the nested `Stage Info` struct of every stage-completed event into columns.
stage_completed_metrics = metrics.filter("Event='SparkListenerStageCompleted'").select("`Stage Info`.*")
stage_completed_metrics.createOrReplaceTempView("stage_completed_metrics")
# Collapse to exactly one row per stage id: earliest submission, latest
# completion and the summed task count (a stage id may appear on more than
# one event - presumably one per stage attempt; confirm against the log).
stage_completed_metrics_filtered = spark.sql("""
          select `Stage ID`,
                 min(`Submission Time`) as `Submission Time`, 
                 max(`Completion Time`) as `Completion Time`, 
                 sum(`Number of Tasks`) as `Number of Tasks`
           from stage_completed_metrics group by `Stage ID` order by `Stage ID`
           """)
# Register the AGGREGATED frame under the "_filtered" view name. Registering
# the raw frame here would expose duplicate stage rows to later joins on
# `Stage ID` and multiply the task metrics of those stages.
stage_completed_metrics_filtered.createOrReplaceTempView("stage_completed_metrics_filtered")

### Extract task & its metrics used in application
* **SparkListenerTaskEnd** event contains information about each task that was created and finished during the execution of the application.
* The following information is extracted from this event:
    * Stage ID: Id of the stage under which this task was created.
    * Task ID: Task identification number.
    * Executor CPU Time: total executor CPU time used in executing the task, in nanoseconds (converted to millisec when the query metrics are populated).
    * Memory Bytes Spilled: total memory spilled while executing the task in bytes.
    * Peak Execution Memory: max execution memory used while executing the task in bytes.
    * Records Read: number of records read while executing the task.
    * Bytes Read: bytes read while executing the task.
    * Records Written: number of records written while executing the task.
    * Bytes Written: bytes written while executing the task.
* Stage metrics & task metrics are joined based on stage id and metrics for each stage is calculated.

In [61]:
# One row per task-end event: the stage id plus the expanded `Task Info` and `Task Metrics` structs.
task_end_metrics = metrics.filter("Event='SparkListenerTaskEnd'").select("Stage ID", "Task Info.*", "Task Metrics.*")
task_end_metrics.createOrReplaceTempView("task_end_metrics")

# Keep only the columns needed below; expanding `Output Metrics` / `Input Metrics`
# supplies the written / read byte and record columns used in the aggregation.
task_end_metrics_filtered = task_end_metrics.select("Task ID", "Stage ID", "Executor CPU Time", "Memory Bytes Spilled",
                                                    "Output Metrics.*","Input Metrics.*", "Finish Time")
task_end_metrics_filtered.createOrReplaceTempView("task_end_metrics_filtered")

# Peak execution memory is not a plain column: it is read from the task's
# `Accumulables` array by exploding it and keeping the entry with the matching name.
task_end_metrics_acccumulables = spark.sql("""
    select t1.`Task ID`, t2.Name, cast(t2.Value as bigint) as `Peak Execution Memory` from task_end_metrics as t1 
    lateral view explode(t1.Accumulables) as t2 
    where t2.Name = 'internal.metrics.peakExecutionMemory'
    """)
task_end_metrics_acccumulables.createOrReplaceTempView("task_end_metrics_accumulables")
# Aggregate task metrics per stage. The left join keeps tasks that have no
# peak-execution-memory accumulable so they still count towards the sums.
task_metrics_aggregated = spark.sql("""
          select t1.`Stage ID`,  
                  count(t1.`Task ID`) as `No. of Tasks`, 
                  sum(t1.`Executor CPU Time`) as `Total Executor CPU Time`,
                  sum(t1.`Memory Bytes Spilled`) as `Total Memory Bytes Spilled`,
                  max(t2.`Peak Execution Memory`) as `Peak Execution Memory`,
                  sum(t1.`Bytes Written`) as `Total Bytes Written`,
                  sum(t1.`Records Written`) as `Total Records Written`,
                  sum(t1.`Bytes Read`) as `Total Bytes Read`,
                  sum(t1.`Records Read`) as `Total Records Read`
           from task_end_metrics_filtered t1 left join task_end_metrics_accumulables t2
           on (t1.`Task ID` = t2.`Task ID`) 
           group by `Stage ID` order by `Stage ID`
          """)
task_metrics_aggregated.createOrReplaceTempView("task_metrics_aggregated")
# Attach the per-stage task aggregates to the stage timing rows read from the
# `stage_completed_metrics_filtered` view; the inner join drops stages that
# have no task rows (and task aggregates without a matching stage row).
joined_metrics = spark.sql("""
      select sm.`Stage ID` as `Stage ID`,
             sm.`Submission Time` as `Submission Time`,
             sm.`Completion Time` as `Completion Time`,
             sm.`Number of Tasks` as `Number of Tasks`,
             tm.`Total Executor CPU Time` as `Total Executor CPU Time`,
             tm.`Total Memory Bytes Spilled` as `Total Memory Bytes Spilled`,
             tm.`Peak Execution Memory` as `Peak Execution Memory`,
             tm.`Total Bytes Written` as `Total Bytes Written`,
             tm.`Total Records Written` as `Total Records Written`,
             tm.`Total Bytes Read` as `Total Bytes Read`,
             tm.`Total Records Read` as `Total Records Read`
       from stage_completed_metrics_filtered as sm inner join task_metrics_aggregated as tm 
       on (sm.`Stage ID` = tm.`Stage ID`) order by `Stage ID`
      """)
# The view is queried once per query by the time-window lookup defined further down.
joined_metrics.createOrReplaceTempView("joined_metrics")

### Calculate Query metrics
* Based on the submission and completion times of the stages and the start and end times of a query, we can calculate the metrics for each query.
* Submission time of the stage is mapped to execution start time of the query.
* Completion time of the stage is mapped to execution end time of the query.
* All stages and tasks created and executed b/w start time & end time of the query are for that particular query. 

In [62]:
def get_query_metrics(execution_start_time, execution_end_time):
    """Aggregate the stage metrics that fall inside a query's time window.

    Selects from the ``joined_metrics`` view every stage whose submission
    time is not earlier than ``execution_start_time`` and whose completion
    time is not later than ``execution_end_time``, and reduces them to a
    single row of counts, sums and maxima.

    Args:
        execution_start_time: lower bound for the stage submission time.
        execution_end_time: upper bound for the stage completion time.

    Returns:
        The DataFrame produced by ``spark.sql`` for the aggregate query.
    """
    aggregate_sql = """
      select count(*) as `Number of Stages`,
             max(`Completion Time`) - min(`Submission Time`) as `Execution Time`,
             sum(`Number of Tasks`) as `Number of Tasks`,
             sum(`Total Executor CPU Time`) as `Total Executor CPU Time`,
             sum(`Total Memory Bytes Spilled`) as `Total Memory Bytes Spilled`,
             max(`Peak Execution Memory`) as `Peak Execution Memory`,
             sum(`Total Bytes Written`) as `Total Bytes Written`,
             sum(`Total Records Written`) as `Total Records Written`,
             sum(`Total Bytes Read`) as `Total Bytes Read`,
             sum(`Total Records Read`) as `Total Records Read`
       from joined_metrics where `Submission Time` >= """
    upper_bound_sql = """ and `Completion Time` <= """
    # Both bounds are rendered with str() and spliced into the statement text.
    statement = "".join(
        (aggregate_sql, str(execution_start_time), upper_bound_sql, str(execution_end_time))
    )
    return spark.sql(statement)

In [63]:
def getValue(dict, key, isString):
    """Read ``key`` from ``dict`` with a type-appropriate fallback.

    Args:
        dict: mapping to read from.
        key: key to look up.
        isString: when true the value is returned via ``str()`` and the
            fallback is ``""``; otherwise it is returned via ``int()`` and
            the fallback is ``0``.

    Returns:
        The converted value, or the fallback when the key is absent or its
        value is ``None`` or the literal string ``'None'``.
    """
    fallback = "" if isString else 0
    if key not in dict.keys():
        return fallback

    raw = dict[key]
    usable = raw != 'None' and raw is not None
    if not usable:
        return fallback

    converter = str if isString else int
    return converter(raw)

### Calculate Application metrics
* After the metrics for each query are calculated, we aggregate them to calculate the metrics for the application.

In [64]:
def populate_application_metrics(app_detail):
    """Roll the per-query metrics up onto the application detail object.

    Every counter is the sum over ``app_detail.query_details``; the peak
    execution memory is the largest per-query value (0 when there are no
    queries). The results are written back onto ``app_detail``.

    Args:
        app_detail: object exposing ``query_details`` whose items carry the
            per-query metric attributes; it is updated in place.
    """
    queries = list(app_detail.query_details)

    app_detail.numStages = sum(q.numStages for q in queries)
    app_detail.numTasks = sum(q.numTasks for q in queries)
    app_detail.executorCpuTime = sum(q.executorCpuTime for q in queries)
    app_detail.memoryBytesSpilled = sum(q.memoryBytesSpilled for q in queries)
    # Seed with 0 so an application without queries reports a peak of 0.
    app_detail.peakExecutionMemory = max([0, *(q.peakExecutionMemory for q in queries)])
    app_detail.recordsRead = sum(q.recordsRead for q in queries)
    app_detail.bytesRead = sum(q.bytesRead for q in queries)
    app_detail.recordsWritten = sum(q.recordsWritten for q in queries)
    app_detail.bytesWritten = sum(q.bytesWritten for q in queries)


### Calculating metrics for each query
* For every query we separate its metrics based on its execution start time and execution end time.
* Extracted metrics are then populated into respective query object.

In [65]:
# Peak execution memory of every query, in query order (used for the chart).
query_peak_memories = []
for query in app_detail.query_details:
    # Aggregate the stages that ran inside this query's time window.
    query_metrics = get_query_metrics(query.execution_start_time, query.execution_end_time)
    if query_metrics.count() > 0:
        query_metrics_dict = query_metrics.first().asDict()
    else:
        query_metrics_dict = {}

    # Plain counters are copied as integers; missing / null values become 0.
    for attribute_name, column_name in (
        ("numStages", "Number of Stages"),
        ("numTasks", "Number of Tasks"),
        ("memoryBytesSpilled", "Total Memory Bytes Spilled"),
        ("peakExecutionMemory", "Peak Execution Memory"),
        ("recordsRead", "Total Records Read"),
        ("bytesRead", "Total Bytes Read"),
        ("recordsWritten", "Total Records Written"),
        ("bytesWritten", "Total Bytes Written"),
    ):
        setattr(query, attribute_name, getValue(query_metrics_dict, column_name, False))

    # Scale the summed CPU time down by 1,000,000 (nanoseconds -> millisec,
    # assuming the event log reports CPU time in nanoseconds).
    query.executorCpuTime = getValue(query_metrics_dict, "Total Executor CPU Time", False) // 1000000

    query_peak_memories.append(query.peakExecutionMemory)

In [66]:
# Roll the per-query metrics up into the application-level totals on app_detail.
populate_application_metrics(app_detail)

In [67]:
def append_row(content, row):
    """Return ``content`` with ``row`` added on a new line.

    Args:
        content: text accumulated so far.
        row: line to add after a newline separator.

    Returns:
        The combined text; neither argument is modified.
    """
    return "\n".join((content, row))

### Report in CSV
* After metrics for application is calculated, we prepare a CSV report for the application and queries executed in it.
* The report file is written with a name in the format application name followed by application id, and saved to the given output path.

In [68]:
# Build the CSV report: an application summary section followed by one row per query.
filename = "{}/{}_{}.csv".format(output_dir, app_detail.application_name, app_detail.application_id)

# (label, value) pairs of the application summary section, in output order.
summary_fields = [
    ("Application Name:", app_detail.application_name),
    ("No. of Queries:", len(app_detail.query_details)),
    ("Start Time:", app_detail.application_start_time),
    ("End Time:", app_detail.application_end_time),
    ("Execution Time:", app_detail.application_execution_time),
    ("Total No. of Stages:", app_detail.numStages),
    ("Total No. of Tasks:", app_detail.numTasks),
    ("Executor CPU Time:", app_detail.executorCpuTime),
    ("Total Memory Bytes Spilled:", app_detail.memoryBytesSpilled),
    ("Peak Execution Memory:", app_detail.peakExecutionMemory),
    ("Total Records Read:", app_detail.recordsRead),
    ("Total Bytes Read:", app_detail.bytesRead),
    ("Total Records Written:", app_detail.recordsWritten),
    ("Total Bytes Written:", app_detail.bytesWritten),
    ("Spark Job History URL:", app_detail.application_history_url),
]

report_rows = ['"Action Detail"']
report_rows.extend('"{}","{}"'.format(label, value) for label, value in summary_fields)
# The leading newline leaves a blank line between the two sections.
report_rows.append('\n"Query Wise Detail"')

header = (
    '"S. No.","Query","Query Execution Start Time","Query Execution End Time",'
    '"Total Execution Time","No. of Stages","No. of Tasks",'
    '"Executor CPU Time","Memory Bytes Spilled","Peak Execution Memory","Records Read",'
    '"Bytes Read","Records Written","Bytes Written"'
)
report_rows.append(header)

for query_index, query_detail in enumerate(app_detail.query_details):
    # Double quotes inside the query text would terminate the quoted CSV field.
    query_text = query_detail.query.replace('"', "'").strip()
    row_values = (
        query_index + 1, query_text, query_detail.execution_start_time, query_detail.execution_end_time,
        query_detail.execution_time, query_detail.numStages, query_detail.numTasks,
        query_detail.executorCpuTime, query_detail.memoryBytesSpilled, query_detail.peakExecutionMemory,
        query_detail.recordsRead, query_detail.bytesRead, query_detail.recordsWritten,
        query_detail.bytesWritten,
    )
    report_rows.append(",".join('"{}"'.format(value) for value in row_values))
report_rows.append("\n")

content = "\n".join(report_rows)

# The context manager closes the file even when the write fails.
with open(filename, 'w') as workflow_summary_file:
    workflow_summary_file.write(content)

In [85]:
from openpyxl import Workbook
from openpyxl.chart import ScatterChart, Reference , Series

# Write the per-query peak execution memory to a worksheet and plot it as a scatter chart.
filename = output_dir + "/chart.xlsx"
workbook = Workbook()
worksheet = workbook.active

# Header row.
for column_number, heading in enumerate(("Action ID", "Query ID", "Peak Execution Memory"), start=1):
    worksheet.cell(column=column_number, row=1, value=heading)

# One data row per query, starting directly below the header.
for i, item in enumerate(query_peak_memories):
    row = i + 2
    for column_number, cell_value in enumerate((1, i, int(item)), start=1):
        worksheet.cell(column=column_number, row=row, value=cell_value)

# X values skip the header; Y values include it so the series title is taken from the header cell.
x_data = Reference(worksheet, min_col=2, min_row=2, max_row=len(worksheet['B']))
y_data = Reference(worksheet, min_col=3, min_row=1, max_row=len(worksheet['C']))
series = Series(y_data, x_data, title_from_data=True)

chart = ScatterChart()
chart.title = "Peak Memory Graph"
chart.style = 13
chart.x_axis.title = 'Query ID'
chart.y_axis.title = 'Peak Execution Memory'
chart.width = 28
chart.height = 11
chart.series.append(series)

worksheet.add_chart(chart, 'E3')  
workbook.save(filename=filename)
workbook.close()