In [1]:
import pandas as pd
import numpy as np
import glob
import os

from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))
display(HTML("<style>div.output_scroll { height: 44em; }</style>"))

In [2]:
def get_meta(path):
    props = os.path.splitext(os.path.basename(path))[0].split('_')
    values = [int(x[1:]) for x in props[1:]]

    return {'max_value':values[0],
            'threads':values[1],
            'input_size_approx':values[2],
            'repeats':values[3]}

In [10]:
def load(file):
    """Load the experiment data from a CSV file with converter metrics."""
        
    schema = {
        'num_threads': np.int64(),
        'num_jsons_converted': np.int64(),
        'num_json_bytes_converted': np.int64(),
        'num_recordbatch_bytes': np.int64(),
        'num_ipc': np.int64(),
        'ipc_bytes': np.int64(),
        'num_buffers_converted': np.int64(),
        't_parse': np.float64(),
        't_resize': np.float64(),
        't_serialize': np.float64(),
        't_thread': np.float64(),
        't_enqueue': np.float64(),
        'status': np.int64()
    }
    
    display('Reading: {}'.format(file))
    
    df = pd.read_csv(file, dtype=schema)
    
    meta = get_meta(file)

    for key, value in meta.items():  
        df.insert(0, key, value)
        
    # Make sure there were no errors for converters.
    assert(df['status'].sum() == len(df.index))
        
    return df

In [11]:
def analyze(df):
    """Analyze the experiment data, deriving various metrics such as throughput."""
    # Calculate time spent within the thread as 'other'.
    df['t_other'] = df['t_thread'] - df[['t_parse', 't_resize', 't_serialize', 't_enqueue']].sum(axis=1)
    
    # Calculate the throughput per thread
    df['Parse throughput (in)'] = df['num_json_bytes_converted'] / df['t_parse']
    df['Parse throughput (out)'] = df['num_recordbatch_bytes'] / df['t_parse']
    
    return df

In [12]:
def summarize(df):
    """Summarize the data from one run into one row with averages."""
    
    assert(len(pd.unique(df['max_value'])==1))
    assert(len(pd.unique(df['threads'])==1))
    assert(len(pd.unique(df['input_size_approx'])==1))
    assert(df['num_threads'].sum()==pd.unique(df['threads'])[0])
    repeats = pd.unique(df['repeats'])[0]
    
    row = {'Max. value': pd.unique(df['max_value'])[0],
           'Input size': pd.unique(df['input_size_approx'])[0],
           'Repeats': pd.unique(df['repeats'])[0],
           'Threads': df['num_threads'].sum(),
           'JSONs': df['num_jsons_converted'].sum() / repeats,
           'Bytes (in)': df['num_json_bytes_converted'].sum() / repeats,
           'RecordBatch bytes': df['num_recordbatch_bytes'].sum() / repeats,
           'IPC messages': df['num_ipc'].sum() / repeats,
           'IPC bytes': df['ipc_bytes'].sum() / repeats,
           'Buffers converted': df['num_buffers_converted'].sum() / repeats,
           # For time, we use the max time of all threads, 
           # since the throughput is determined by the slowest thread in the pool,
           # and they all start operating simultaneously
           'Parse time': df['t_parse'].max(),
           'Resize time': df['t_resize'].max(),
           'Serialize time': df['t_serialize'].max(),
           'Enqueue time': df['t_enqueue'].max(),
           'Other time': df['t_other'].max(),
           'Thread time': df['t_thread'].max(),
           'Parse throughput (in)': df['num_json_bytes_converted'].sum() / df['t_parse'].max(),
           'Parse throughput (out)': df['num_recordbatch_bytes'].sum() / df['t_parse'].max()}
              
    return row;

In [13]:
def get_all_data(data_path, schema, impl):
    path = '{}/{}/latency/threads/metrics/{}/'.format(data_path, schema, impl.lower())
    csv_files = []
    for file in glob.glob("{}*.csv".format(path)):
        csv_files.append(file)
    print("Found {} files in {}".format(len(csv_files), path))

    records = []
    for file in csv_files:
        records.append(summarize(analyze(load(file))))


    df = pd.DataFrame.from_records(records)
    df.sort_values(by=['Threads', 'JSONs'], inplace=True)
    df.insert(0,'Implementation', impl)
    
    # Use only best value
    df = df[df['Max. value'] == 18446744073709551615]
    
    # Print max throughput
    display(df['Parse throughput (in)'].max())
    
    return df

In [14]:
def get_max_throughput_for_max_size(df):
    df = df[df.JSONs == df.JSONs.max()]
    #df.set_index('Threads', inplace=True)

    result = df[df['Parse throughput (in)'] == df['Parse throughput (in)'].max()]

    return result

In [15]:
import matplotlib.pyplot as plt
from utils import lighten_color

plt.rcParams.update({
    "text.usetex": True,
    "font.family": "serif",
    "font.serif": ["Palatino"],
    "font.size": 14
})

colors = ['#4878d0', '#6acc64', '#d65f5f', '#d5bb67', '#dc7ec0', '#8c613c']
markers = ['o', 's', 'd']

In [16]:
d_impls = []

d_impls.append(get_all_data('../experiments/data-sigmax', 'trip', 'Arrow'))
d_impls.append(get_all_data('../experiments/data-sigmax', 'trip', 'Custom'))
d_impls.append(get_all_data('../experiments/data-sigmax', 'trip', 'FPGA'))

df = pd.concat(d_impls)

#with pd.option_context('display.max_rows', None, 'display.max_columns', None):  
display(df)

# Get all dimensions for plots
#max_values = pd.unique(df['Max. value'])
#max_num_values = pd.unique(df['Max. number of values'])
value_bytes = np.sort(pd.unique(df['Value bytes']))
input_sizes = np.sort(pd.unique(df['Input size']))
threads = np.sort(pd.unique(df['Threads']))
impls = pd.unique(df['Implementation'])

print("Value bytes    :", value_bytes)
print("Input sizes    :", input_sizes)
print("Threads        :", threads)
print("Impls          :", impls)

Found 12 files in ../experiments/data-sigmax/trip/latency/threads/metrics/arrow/


'Reading: ../experiments/data-sigmax/trip/latency/threads/metrics/arrow/metrics_m18446744073709551615_t1_s16777216_r8.csv'

'Reading: ../experiments/data-sigmax/trip/latency/threads/metrics/arrow/metrics_m18446744073709551615_t12_sNone_r8.csv'

ValueError: invalid literal for int() with base 10: 'None'

In [None]:
fig, axs = plt.subplots(nrows=len(value_bytes), ncols=len(input_sizes), figsize=(2.5 * len(input_sizes), 2.5 * len(value_bytes)), sharey=True, sharex=True)

handles = {}

for xa, inps in enumerate(input_sizes):
    for ya, valb in enumerate(value_bytes):
        ax = axs[ya][xa]
                
        for i, impl in enumerate(impls):
            # Prepare plotting data
            dl = df[(df['Value bytes'] == valb) & (df['Input size'] == inps) & (df['Implementation'] == impl)]
            y = dl['Parse throughput (in)'] * 1e-9
            x = dl['Threads']
            
            # Plot FPGA data
            handles[impl], = ax.plot(x, y, c=lighten_color(colors[i],0.3), marker=markers[i], mfc=colors[i], mec=colors[i], linewidth=3)
            
            if impl == 'FPGA':
                ax.axhline(y=y.to_numpy()[-1], color=lighten_color(colors[i],0.7))
            
            
            
        # Set inline 
        ax.annotate("V:{:.1f} B\nS:{:.0f} MiB".format(valb, inps / (1<<20)), 
                    xycoords='axes fraction', 
                    xy=(0.05, 0.775), 
                    fontsize=12,
                    backgroundcolor='#FFFFFF80')
        
        ax.set_xticks(threads)
        ax.set_xticklabels(threads, rotation=90, fontsize=8)
        
        ax.set_yticks(range(0, 25,4))
        ax.set_ylim(0, 25)
        
        ax.grid(which='both')
        
        if (ya == len(value_bytes) - 1) and (xa == 0):
            ax.set_xlabel('Threads / Parser instances')
            ax.set_ylabel('Throughput (GB/s)')
                        
leg_handles = [v for k,v in handles.items()]
leg_labels = [k for k,v in handles.items()]
fig.legend(leg_handles, leg_labels, ncol=3, bbox_to_anchor=(-0.25, 0.83, 1.0, 0.1))
plt.subplots_adjust(hspace = .1, wspace = .075)

fig.savefig("throughput.pdf")