In [334]:
from plotnine import *
import pandas
from sklearn import preprocessing
import itertools
from statistics import mean
from solarized import *
pandas.options.display.max_rows=10

df=pandas.read_feather('reports/nomodin_dorian.feather')
factors= ['wflow', 'optimizer', 'net', 'scale', 'index']
df.columns

Index(['index', 'bytes_memory', 'cpu_percent', 'bytes_sent', 'bytes_recv',
       'wall_time', 'exitcode', 'db_time', 'rep', 'net', 'scale', 'wflow',
       'optimizer'],
      dtype='object')

In [None]:
1.{5end_to_end} tpch 1,2,3,4,5 with scale 1, 10
2 {5module4} and 3. module4 | also {5module4mem} and {5module4net}
3.{5tpchmodin} tpch 1, 4, 5 with modin on scale 10
4.{5micro_scales} micros with scale 1, 10 | also {5micro_traffic}
5.{5micro_net} micro join, selection with net=wan and scale 1

In [367]:
df['rep']

0         2
1         2
2         2
3         2
4         2
         ..
208220    2
208221    2
208222    2
208223    2
208224    2
Name: rep, Length: 208225, dtype: int64

# CHECK

In [335]:
df['exitcode'].value_counts()

0    186285
1     21940
Name: exitcode, dtype: int64

In [336]:
df.query("exitcode == 1")['wflow'].unique()

array(['tpch1.py', 'q07.sql', 'q08.sql', 'q09.sql'], dtype=object)

# PREPROCESS

In [337]:
df['gb_memory'] = df['bytes_memory']/10**9
df.drop(columns=['bytes_memory'], inplace=True)

In [338]:
df['cpu'] = df['cpu_percent'].apply(lambda x: mean(x))
df.drop(columns=['cpu_percent'], inplace=True)

In [339]:
df['gb_net'] = df['bytes_recv']/10**9
df.drop(columns=['bytes_recv'], inplace=True)

In [341]:
df['procedural_time']=df['wall_time']-df['db_time']

In [384]:
micro = df[df['wflow'].isin(['micro_join.py','micro_sel.py','micro_proj.py','micro_max.py'])]

In [385]:
micro['rep'].value_counts()

2    106013
Name: rep, dtype: int64

In [386]:
# add id to interprocess measurements. Remove if snapshot_idx exists
micro['snapshot_idx'] = micro.groupby(factors)['wall_time'].rank(method='first').astype('int') # wall_time could be any column

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [387]:
interproc=micro[micro['rep']==2]
interproc=micro.drop(columns=['rep'])

In [399]:
overview = interproc.groupby(by=factors).first().reset_index().drop(columns=['exitcode', 'snapshot_idx'])
overview['scale'] = overview['scale'].astype('str')
overview.columns

Index(['wflow', 'optimizer', 'net', 'scale', 'index', 'bytes_sent',
       'wall_time', 'db_time', 'gb_memory', 'cpu', 'gb_net',
       'procedural_time'],
      dtype='object')

# PROCEDURAL AND DB TIME

In [402]:
def prep_for_plot(origin_df: pandas.DataFrame, measurements:list):
    prj = origin_df[factors+measurements]
    return prj.melt(id_vars=factors, value_vars=measurements,
                    value_name='measurement', var_name='var')


In [403]:
addon_factors=['net', 'scale', 'index']
def cycle_factors():
    factor_values=[]
    for factor in addon_factors:
        factor_values.append(list(exec_time_melt[factor].unique()))
    return list(itertools.product(*factor_values))

def gen_query(factors):
    combzip = zip(addon_factors, factors)
    combzip_quote = map(lambda x: [x[0], "'"+x[1]+"'"], combzip)
    eq = [' == '.join(f) for f in combzip_quote]
    return ' and '.join(eq)

    
def gen_name(factors:tuple)->str:
    print(factors)
    names, values = zip(*factors)
    print(names)
    print(values)

In [417]:
def plot_time(plot_df, plot_title):
    return (
        ggplot(plot_df, aes('optimizer', y='measurement', fill='var'))
        + geom_col(width=0.3)
        #+ scale_fill_manual(values=cdict) 
        + facet_wrap('wflow', scales='free')
        + ggtitle(plot_title)
        + xlab("Optimization Method")
        + ylab("Time (in sec.)")
        + theme(figure_size=(5, 2.5),
               subplots_adjust={'hspace': 1, 'wspace': 0.25})
    )
def plot_net(plot_df, plot_title):
    return (
    ggplot(plot_df, aes('optimizer', y='measurement'))
    + geom_col(width=0.3)
    #+ scale_fill_manual(values=cdict)
    + ggtitle(plot_title)
    + xlab("Optimization Method")
    + ylab("Time (in sec.)")
    + facet_wrap('wflow', scales='free')
    + theme(figure_size=(5, 2.5),
           subplots_adjust={'hspace': 1, 'wspace': 0.25})
)
def plot_cpu(plot_df, plot_title):
    plot_df['color']=C_BLUE
    dummy_df=plot_df.copy()
    dummy_df['color'] = C_GREEN
    return (
    ggplot(plot_df, aes('snapshot_idx', y='Memory usage in %'))
    + geom_area(aes(fill='color'))
    + geom_rect(aes(xmax='db_ratio', fill='color'), dummy_df, xmin=0,ymin=0,ymax=100,
               alpha=0.2)
    + facet_grid('optimizer ~ wflow')
    + theme(figure_size=(8, 2.5))
    + labs(y=None)
    + scale_fill_identity(name = 'Area colors', guide = 'legend',labels = ('Global Memory Consumption in %', 'Execution inside the RDBMS')) 
    # + scale_colour_manual(name = 'the colour', 
    #     values ={'black':'white','red':'blue'}, labels = ('c2','c1'))
)
def plot_mem(plot_df, plot_title):
    plot_df['color']=C_BLUE
    dummy_df=plot_df.copy()
    dummy_df['color'] = C_GREEN
    return (
    ggplot(plot_df, aes('snapshot_idx', y='Memory usage in %'))
    + geom_area(aes(fill='color'))
    + geom_rect(aes(xmax='db_ratio', fill='color'), dummy_df, xmin=0,ymin=0,ymax=100,
               alpha=0.2)
    + facet_grid('optimizer ~ wflow')
    + theme(figure_size=(8, 2.5))
    + labs(y=None)
    + scale_fill_identity(name = 'Area colors', guide = 'legend',labels = ('Global Memory Consumption in %', 'Execution inside the RDBMS')) 
    # + scale_colour_manual(name = 'the colour', 
    #     values ={'black':'white','red':'blue'}, labels = ('c2','c1'))
)


In [418]:
plot_time.__qualname__

'plot_time'

In [419]:
def gen_plots(all_factors):
    for bigdf, plot_f in [(df_mem, plot_mem), (df_cpu, plot_cpu),
                          (df_time, plot_time), (df_net, plot_net)]:
        for factors in all_factors:
            query = gen_query(factors)
            print(query)
            qdf = bigdf.query(query)
            qdf = qdf.drop(columns=addon_factors)
            name = ''.join(factors)
            folder = plot_f.__qualname__

            curplot = plot_f(qdf, query)
            curplot.save(f"plots/{folder}/{name}.png")
            display(curplot)
        

In [420]:
# pandas.options.display.max_rows=200
# exec_time_melt.query("optimizer=='optimized' and net=='loc' and scale=='1' and index=='false'")

In [421]:
df_time =prep_for_plot(overview, ['procedural_time', 'db_time'])
df_net = prep_for_plot(overview, ['gb_net'])
df_cpu = prep_for_plot(interproc, ['cpu'])
df_mem = prep_for_plot(interproc, ['gb_net'])


In [422]:
all_factors = cycle_factors()

In [425]:
gen_plots(all_factors)

net == 'lan' and scale == '1' and index == 'false'




SyntaxError: unexpected EOF while parsing (<string>, line 1)

In [56]:
# variable_cat = pandas.CategoricalDtype(categories=['python_time', 'db_time'], ordered=True)
# df['variable'] = df['variable'].astype(variable_cat)
# cdict={'python_time': C_BLUE,
#        'db_time': C_GREEN
# }
# df.rename(columns={'value':'Execution time %'}, inplace=True)

In order to explain these results, we also ran microbenchmarks: workflows consisting of a single operation on the dataset. The following list provides a quick overview of the microbenchmarks, all of which were expressed in pandas:
- join (Two database tables are pulled from the RDBMS and an inner join is performed)
- max (A database table is pulled from the RDBMS and a maximum of a column is calculated)
- projection (A database table is pulled from the RDBMS and a subset of the columns are removed inside the python runtime)
- selection (A database table is pulled from the RDBMS and a subset of the rows are removed inside the python runtime)
For a detailed look at the microbenchmarks, one can see the workflows inside the "benchmarks" folder in the project's git repositoty. It should be noted that none of the microbenchmarks modify tables inside the RDBMS.

Since the various microbenchmark workflows have different base runtimes (e.g. aggregations were 10 times faster in this specific measurement), the values we see in the above graphic are normalized, such that the slowest optimization method has a value of 1. Lower values are better.  We can see that our optimizer generally performed better than both modin and standard pandas "base".

To gain better insight about this, we split the overall wall time between the time it took the database to deliver the queried data "DB time" and the time it took the python environment to execute the operation "Python time". As expected, our performing operator pushdown to RDBMS practically eliminates "Python time". What is arguably more interesting, is that in most cases performing the pushdown also decreased "DB time". 