In [131]:
from __future__ import division
import pandas as pd
import matplotlib.pyplot as plt
import datetime
import time
import math
import glob
import numpy as np
from decimal import *
getcontext().prec = 6
%matplotlib

Using matplotlib backend: Qt5Agg


In [159]:
# Experiment configuration read by the main loop further down.
# `trials`: the main loop iterates t = 1..trials for every entry of `cores`,
# substitutes t into the input directory name and averages over the trials.
trials = 1    # Number of trials for each data point
# `cores`: one entry per data point; each value is substituted into the input
# directory name, and the main loop divides it by 8 to obtain the row label.
cores = [64, 128, 256, 512, 1024]      # List of number of total cores at various data points

In [160]:
def extract_exec_time(df_cu):
    """
    Return the total time covered by the compute units' execution intervals.

    For every row of ``df_cu`` the columns 'uid', 'Executing' and
    'AgentStagingOutputPending' are read; the two timestamps are converted
    with float(). The rows are folded into a per-unit mapping of the form

        {
            'unit.00000': {
                'Executing': timestamp1,
                'AgentStagingOutputPending': timestamp2
            }
        }

    If a uid appears in several rows, only the values of its first row are
    kept. The mapping is then handed to get_Toverlap(), whose result (the
    summed length of the merged [Executing, AgentStagingOutputPending]
    intervals) is returned unchanged.
    """
    begin_state = 'Executing'
    finish_state = 'AgentStagingOutputPending'

    per_unit = {}
    for _, record in df_cu.iterrows():
        unit_id = record['uid']
        began = float(record[begin_state])
        finished = float(record[finish_state])

        # setdefault keeps whatever was stored first for this unit/state
        stamps = per_unit.setdefault(unit_id, {})
        stamps.setdefault(begin_state, began)
        stamps.setdefault(finish_state, finished)

    return get_Toverlap(per_unit, begin_state, finish_state)

In [161]:
def plot(data_df, err_df):
    """
    Draw the overhead bar chart and save it as a PDF.

    Parameters
    ----------
    data_df : DataFrame with the columns 'EnTK overhead' and 'RP overhead';
              its index supplies the x-axis categories.
    err_df  : DataFrame with the same two columns, used as the error bars
              (``yerr``) of the corresponding bars.

    'EnTK overhead' is drawn in red against the left y-axis (limits 0..10),
    'RP overhead' in blue against a right-hand y-axis (limits 0..1000). The
    figure is written to 'weak_scaling_null_workload_bw.pdf' in the current
    working directory. Nothing is returned.
    """

    # Fontsize for all text in the plot
    FONTSIZE = 18

    ov_data = data_df['EnTK overhead']
    ov_err = err_df['EnTK overhead']

    exec_data = data_df['RP overhead']
    exec_err = err_df['RP overhead']

    # position=1 / position=0 shift the two bar series to either side of the
    # tick so that they sit next to each other instead of overlapping.
    ov_ax = ov_data.plot(kind='bar',
                         yerr=ov_err,
                         position=1,
                         width=0.15,
                         color='red',
                         fontsize=FONTSIZE)

    exec_ax = ov_ax.twinx()

    # exec_ax is rebound to whatever axes pandas returns for the secondary
    # series; all later right-hand-axis calls go to that object.
    exec_ax = exec_data.plot(ax=exec_ax,
                             kind='bar',
                             yerr=exec_err,
                             secondary_y=True,
                             position=0,
                             width=0.15,
                             color='blue',
                             fontsize=FONTSIZE)

    ov_ax.set_xlabel('Total number of pipelines', fontsize=FONTSIZE)
    ov_ax.set_ylabel('EnTK Overhead (seconds)', fontsize=FONTSIZE)
    ov_ax.set_ylim(0, 10)

    exec_ax.set_ylabel('RP Overhead (seconds)', fontsize=FONTSIZE)
    exec_ax.set_ylim(0, 1000)

    # Size the tick labels of the secondary y axis. The previous per-tick loop
    # went through Tick.label, which addresses the left-hand label and so had
    # no visible effect on a right-hand axis; tick_params covers both sides.
    exec_ax.tick_params(axis='y', labelsize=FONTSIZE)

    # get_legend_handles_labels() returns (handles, labels), in that order.
    handles, labels = ov_ax.get_legend_handles_labels()
    sec_handles, sec_labels = exec_ax.get_legend_handles_labels()

    # One combined legend for both axes, attached to the primary axes.
    ov_ax.legend(handles + sec_handles, labels + sec_labels, fontsize=FONTSIZE)
    ov_ax.set_title('Weak scaling behaviour of HT-BAC workflow on NCSA Blue Waters \n with null workload varying number of pipelines, number of cores per pipeline = 8')
    plt.savefig('weak_scaling_null_workload_bw.pdf')

In [162]:
# Magic function

def get_Toverlap(d, start_state, stop_state):
    '''
    Return the total length covered by the [start_state, stop_state]
    intervals of all entries in ``d``, counting overlapping stretches once.

    d           : mapping of an arbitrary key to a mapping of state name to
                  timestamp; only the values are used.
    start_state : state name whose timestamp opens each interval.
    stop_state  : state name whose timestamp closes each interval.

    Every entry must provide both states; the lookup fails otherwise.
    An empty ``d`` yields 0.
    '''

    # The keys of d are irrelevant here; values() also works on both
    # Python 2 and Python 3, unlike iteritems().
    ranges = [[states[start_state], states[stop_state]]
              for states in d.values()]

    # Nothing to merge -- avoid handing an empty list to collapse_ranges.
    if not ranges:
        return 0

    overlap = 0
    for crange in collapse_ranges(ranges):
        overlap += crange[1] - crange[0]

    return overlap

def collapse_ranges(ranges):
    """
    Given a collection of ranges (pairs of numbers [start, end] with
    'start <= end'), collapse it into the smallest possible list of ranges
    which cover the same, but not more nor less, of the domain.

    We first sort the ranges by their starting point. We then start with the
    range with the smallest starting point [start_1, end_1], and compare to the
    next following range [start_2, end_2], where we now know that start_1 <=
    start_2. We have now two cases:

    a) when start_2 <= end_1, then the ranges overlap (or touch), and we
    collapse them into range_1: range_1 = [start_1, max(end_1, end_2)]

    b) when start_2 > end_1, then the ranges don't overlap. Importantly, none
    of the other later ranges can ever overlap range_1. So we move range_1 to
    the list of final ranges, and restart the algorithm with range_2 being
    the smallest one.

    Termination condition is if only one range is left -- it is also moved to
    the list of final ranges then, and that list is returned.

    The input is never modified: merging is done on private copies, so the
    pairs may be lists or tuples. The result is a list of two-element lists
    sorted by start; an empty input gives an empty list.
    """

    START = 0
    END = 1

    # Sort private [start, end] copies. Extending a range in place on the
    # caller's own lists would silently change the caller's data.
    _ranges = sorted(([r[START], r[END]] for r in ranges),
                     key=lambda x: x[START])

    if not _ranges:
        return []

    final = []
    base = _ranges[0] # smallest range

    for _range in _ranges[1:]:

        if _range[START] <= base[END]:
            # ranges overlap -- extend the base
            base[END] = max(base[END], _range[END])

        else:
            # ranges don't overlap -- move base to final, and current _range
            # becomes the new base
            final.append(base)
            base = _range

    # termination: push last base to final
    final.append(base)

    return final

In [163]:
def extract_entk_overhead(df_pat):
    """
    Return the summed length of the merged [start_time, wait_time] intervals
    plus the merged [res_time, done_time] intervals, divided by 10**6.

    Each row of ``df_pat`` supplies the columns 'stage', 'pipeline', 'probe'
    and 'timestamp'. The timestamp string is parsed with the format
    "%Y-%m-%d %H:%M:%S.%f" and turned into whole microseconds since
    1970-01-01. Rows are grouped under the key '<stage>-<pipeline>':

        {
            'stage1-pipeline1': {
                'start_time': timestamp1,
                'wait_time': timestamp2,
                'res_time': timestamp3,
                'done_time': timestamp4
            }
        }

    For a given key, only the first timestamp seen for each probe is kept.
    The final division converts the microsecond total back to seconds.
    """
    from datetime import datetime

    unix_epoch = datetime(1970, 1, 1)

    def to_microseconds(moment):
        # Integer arithmetic on the timedelta fields keeps full microsecond
        # precision (no float rounding as with total_seconds()).
        delta = moment - unix_epoch
        whole_seconds = delta.seconds + delta.days * 86400
        return delta.microseconds + whole_seconds * 10**6

    per_task = {}
    for _, record in df_pat.iterrows():
        stage = record['stage']
        pipeline = record['pipeline']
        probe = record['probe']
        parsed = datetime.strptime(record['timestamp'], "%Y-%m-%d %H:%M:%S.%f")
        stamp_us = to_microseconds(parsed)

        # First value wins for every (task, probe) combination.
        task_key = '%s-%s' % (stage, pipeline)
        per_task.setdefault(task_key, {}).setdefault(probe, stamp_us)

    first_span_us = get_Toverlap(per_task, 'start_time', 'wait_time')
    second_span_us = get_Toverlap(per_task, 'res_time', 'done_time')
    return (first_span_us + second_span_us) / 10**6

In [164]:
#MAIN

# Number of cores used by one pipeline; a data point with c cores therefore
# corresponds to c // CORES_PER_PIPELINE pipelines (the plot's x axis).
CORES_PER_PIPELINE = 8

# Directory holding the csv files of one run: {0} = trial number, {1} = cores.
RUN_DIR_TEMPLATE = 'weak_scaling_null_workload_data/null-ws-{1}cores-trial{0}'

RESULT_COLUMNS = ['EnTK overhead', 'RP overhead']

# Rows are collected in plain lists and turned into DataFrames once at the
# end, which yields numeric columns instead of growing an empty frame row by row.
pipeline_counts = []
mean_rows = []
err_rows = []

for c in cores:

    # These lists collect the per-trial results of this data point so that
    # the average and error can be computed across trials.
    entk_ov_list = list()
    exec_list = list()

    for t in range(1, trials+1):

        run_dir = RUN_DIR_TEMPLATE.format(t, c)

        # Read csv files into DataFrames
        df_pat = pd.read_csv('{0}/enmd_pat_overhead.csv'.format(run_dir),
                             header=0,
                             sep=',',
                             skipinitialspace=True)

        matches = glob.glob('{0}/execution*.csv'.format(run_dir))
        if not matches:
            # Fail with the offending directory instead of a bare IndexError.
            raise IOError('no execution*.csv file found in {0}'.format(run_dir))
        fname = matches[0]
        df_cu = pd.read_csv('{0}'.format(fname), header=0, sep=',', skipinitialspace=True)

        # Methods to get the EnTK overhead, execution time
        entk_ov = extract_entk_overhead(df_pat)
        exec_time = extract_exec_time(df_cu)

        # Add overhead and execution time from current iteration to list
        entk_ov_list.append(entk_ov)
        exec_list.append(exec_time)

    # Integer division: the row label is a pipeline count, not a float.
    pipeline_counts.append(c // CORES_PER_PIPELINE)

    # Average and standard error over the trials of this data point
    mean_rows.append([np.average(entk_ov_list),
                      np.average(exec_list)])
    err_rows.append([np.std(entk_ov_list)/math.sqrt(trials),
                     np.std(exec_list)/math.sqrt(trials)])

data_df = pd.DataFrame(mean_rows, index=pipeline_counts, columns=RESULT_COLUMNS)
err_df = pd.DataFrame(err_rows, index=pipeline_counts, columns=RESULT_COLUMNS)

# Bar plot of the results
plot(data_df, err_df)