In [24]:
#!/usr/bin/python

import json
import sys
from operator import itemgetter
from os import listdir
from os.path import isfile, join, isdir
import pandas as pd

In [44]:
def collectStatistics(analyze_app, event_dir='/tmp/spark-events', memory_dir='/tmp/spark-memory'):   
    captions = ["Iteration",
                "Incremental Start",
                "Incremental End",
                "Incremental Duration",
                "Batch Start",
                "Batch End",
                "Batch Duration", 'Memory Min (bytes)', 'Memory Max (bytes)', 'Memory Mean (bytes)']

    data_frame = pd.DataFrame(columns=captions)
    data_frame.set_index('Iteration')

    onlyfiles = [f for f in listdir(event_dir) if isfile(join(event_dir, f))]

    for file in onlyfiles:
        with open(join(event_dir, file)) as f:
            
            content = f.readlines()
            batch_computation = False
            iteration_number = -1
            for line in content:
                event = json.loads(line)

                if event['Event'] == 'SparkListenerEnvironmentUpdate':
                    app_name = event['Spark Properties']['spark.app.name']
                    if app_name.startswith(analyze_app):
                        version = app_name.replace(analyze_app, '')
                        if not version:
                            version = '0'
                        if '_batch_' in version:
                            batch_computation = True
                            iteration_number = int(version.replace('_batch_', ''))
                            if data_frame.loc[data_frame['Iteration'] == iteration_number].empty:
                                data_frame.loc[len(data_frame)] = iteration_number

                        else:
                            batch_computation = False
                            iteration_number = int(version)
                            if data_frame.loc[data_frame['Iteration'] == iteration_number].empty:
                                data_frame.loc[len(data_frame)] = iteration_number




                if event['Event'] == 'SparkListenerJobStart':
                    start = event['Submission Time']
                    caption = 'Incremental Start'
                    if batch_computation:
                        caption = 'Batch Start'

                    data_frame.loc[data_frame['Iteration'] == iteration_number, caption] = start

                if event['Event'] == 'SparkListenerJobEnd':
                    end = event['Completion Time']
                    caption = 'Incremental End'
                    if batch_computation:
                        caption = 'Batch End'
                    data_frame.loc[data_frame['Iteration'] == iteration_number, caption] = end
                    
            if isdir(memory_dir):
                mem_df = pd.read_csv(join(memory_dir, file + '.driver.jvm.heap.used.csv'))
                data_frame.loc[data_frame['Iteration'] == iteration_number, 'Memory Min (bytes)'] = mem_df['value'].min()
                data_frame.loc[data_frame['Iteration'] == iteration_number, 'Memory Max (bytes)'] = mem_df['value'].max()
                data_frame.loc[data_frame['Iteration'] == iteration_number, 'Memory Mean (bytes)'] = mem_df['value'].mean()


    data_frame['Incremental Duration'] = data_frame['Incremental End'] - data_frame['Incremental Start']
    data_frame['Batch Duration'] = data_frame['Batch End'] - data_frame['Batch Start']

    data_frame = data_frame.sort_values(['Iteration'], ascending=[True])
    return data_frame




In [45]:
memory_dir = '/tmp/spark-memory'
event_dir = '/tmp/spark-events/'

app_id = 'local-1607283405250'

app_name = 'SchemEX-Manual-Test0'
res_frame = collectStatistics(app_name)

display(res_frame)



Unnamed: 0,Iteration,Incremental Start,Incremental End,Incremental Duration,Batch Start,Batch End,Batch Duration,Memory Min (bytes),Memory Max (bytes),Memory Mean (bytes)
0,0,1607283401812,1607283402924,1112,0,0,0,194100968,198295272,196198000.0


In [None]:
if not isdir('experiments/' + app_name):
    makedirs('experiments/' + app_name)

data_frame.to_csv('experiments/' + app_name + '/' + app_name + '-performance.csv', index=False)
print("Collected statistics for " + app_name)