In [None]:
from df_cereal.df_widget import DFWidget, BaseWidget
import pandas as pd
import pyarrow as pa
import numpy as np
import pyarrow.feather as feather
import base64
import json
import io
import time
from buckaroo import BuckarooWidget

In [None]:
from traitlets import Unicode, List, Dict, observe, Any, Bytes, observe, Bool

class Base64Widget(BaseWidget):
    """Widget that syncs a single ``df_base64`` trait to the frontend.

    ``_view_name`` selects the ``Base64WidgetView`` frontend view.
    NOTE(review): judging by the name, ``df_base64`` presumably carries the
    output of ``df_to_base64`` -- confirm against the frontend code.
    """

    _view_name = Unicode(default_value='Base64WidgetView').tag(sync=True)
    df_base64 = Any(default_value="").tag(sync=True)

class BytesWidget(BaseWidget):
    """Widget that syncs a single binary ``df_arrow_bytes`` trait.

    ``_view_name`` selects the ``BytesWidgetView`` frontend view.
    NOTE(review): judging by the name, ``df_arrow_bytes`` presumably carries
    the output of ``df_to_arrow_bytes`` -- confirm against the frontend code.
    """

    _view_name = Unicode(default_value='BytesWidgetView').tag(sync=True)
    df_arrow_bytes = Bytes().tag(sync=True)

class SimpleBytesBenchmarkWidget(BaseWidget):
    """Stand-alone bytes benchmark widget with no driver logic of its own.

    Declares three synced traits: the binary payload (``df_arrow_bytes``),
    a ``timing_info`` dict and a ``do_calc`` flag. It uses the
    ``BytesBenchmarkWidgetView`` frontend view.
    """

    _view_name = Unicode(default_value='BytesBenchmarkWidgetView').tag(sync=True)
    df_arrow_bytes = Bytes().tag(sync=True)
    timing_info = Dict(default_value={}).tag(sync=True)
    do_calc = Bool(default_value=True).tag(sync=True)

In [None]:
def df_to_arrow_bytes(df):
    """Serialize a pandas DataFrame to uncompressed Feather bytes.

    The frame is converted to a pyarrow Table, written in Feather format to
    an in-memory buffer, and the complete buffer contents are returned.
    """
    arrow_table = pa.Table.from_pandas(df)
    buffer = io.BytesIO()
    feather.write_feather(arrow_table, buffer, compression='uncompressed')
    # getvalue() yields the whole buffer regardless of the stream position.
    return buffer.getvalue()

def df_to_base64(df):
    """Return the Feather serialization of ``df`` as a base64 text string."""
    arrow_bytes = df_to_arrow_bytes(df)
    encoded = base64.b64encode(arrow_bytes)
    return encoded.decode('utf8')

def df_gen_func(exp):
    """Build a two-column synthetic DataFrame with ``2 * 10**exp`` rows.

    Columns:
        str        -- the four sample words repeated cyclically.
        log_normal -- random draws from ``np.random.lognormal(25, .3)``.

    Args:
        exp: non-negative integer exponent controlling the row count.

    Returns:
        pandas.DataFrame with exactly ``N = 4 * 10**exp // 2`` rows.

    Note: a later cell in this notebook redefines ``df_gen_func`` with more
    columns; after that cell runs, this definition is shadowed.
    """
    N = 4 * (10**exp) // 2
    words = ["foo", "barr", "bazz", "bofffff"]
    # Repeat the pattern past N and trim, so the column length always equals
    # N even when N is not a multiple of len(words) (e.g. exp=0 -> N=2).
    str_col = (words * (N // len(words) + 1))[:N]
    return pd.DataFrame({
        'str': str_col,
        'log_normal': np.random.lognormal(25, .3, N)})

def second_convert(ser):
    """Convert a timedelta Series to float seconds.

    Uses ``.dt.total_seconds()`` so that the days component and the sign are
    honoured. Summing the ``.dt.seconds`` and ``.dt.microseconds`` fields
    drops whole days and turns a small negative delta such as -0.5 s into
    86399.5, because those fields describe the normalized
    (days, seconds, microseconds) representation.

    Args:
        ser: pandas Series of timedelta64 values.

    Returns:
        pandas Series of floats, one signed duration in seconds per element.
    """
    return ser.dt.total_seconds()

In [None]:
class BaseBenchmarkWidget(BaseWidget):
    """Drive a series of DataFrame transfer benchmarks of increasing size.

    For each exponent from ``min_n`` to ``max_n`` (inclusive) the widget
    generates a frame with ``df_gen_func``, converts it with ``convert_df``
    and assigns the payload to the synced trait named by ``target_prop``.
    When ``timing_info`` comes back non-empty, the timestamps are recorded
    and the next size is started. After the last size the results are
    formatted and shown in the companion ``BuckarooWidget`` (``self.bw``).

    ``format_results`` reads the keys ``t1``, ``t2`` and ``t3``, which are
    not set on the Python side, so they must arrive through ``timing_info``
    (presumably written by the frontend view -- TODO confirm).

    Subclasses must define ``target_prop``, the matching synced trait, and
    ``convert_df``.
    """
    _view_name = Unicode('BytesBenchmarkWidgetView').tag(sync=True)
    timing_info = Dict({}).tag(sync=True)
    do_calc = Bool(True).tag(sync=True)

    def convert_df(self, df):
        """Convert ``df`` into the payload to sync; subclasses override."""
        # Fail with a clear message instead of returning None, which would
        # only surface later as a TypeError in set_from_gen.
        raise NotImplementedError(
            "subclasses of BaseBenchmarkWidget must implement convert_df")

    # Name of the synced trait that receives the payload; set by subclasses.
    target_prop = None

    def __init__(self, df_gen_func, min_n, max_n, **kwargs):
        """Create the widget and immediately start the first benchmark.

        Args:
            df_gen_func: callable taking an integer exponent and returning
                a DataFrame.
            min_n: first exponent to benchmark.
            max_n: last exponent to benchmark (inclusive).
            **kwargs: forwarded to the parent widget constructor.
        """
        self.results_arr = []
        self.max_n = max_n
        self.current_n = min_n
        self.df_gen_func = df_gen_func
        self.bw = BuckarooWidget(pd.DataFrame(), pinned_rows=[])
        display(self.bw)
        # Forward kwargs so standard widget options are not silently dropped.
        super().__init__(**kwargs)
        self.set_from_gen()

    def __del__(self):
        """Close the companion results widget when this object is collected."""
        print("in __del__")
        # ``bw`` may already have been removed by the caller (or never set if
        # __init__ failed early); a missing attribute must not raise here.
        bw = getattr(self, 'bw', None)
        if bw is not None:
            bw.close()
            del self.bw
        # Only chain up when a parent class actually defines __del__.
        parent_del = getattr(super(), '__del__', None)
        if parent_del is not None:
            return parent_del()

    @staticmethod
    def _utc_now():
        """Return the current time as a timezone-aware UTC Timestamp."""
        # Timestamp.utcnow is deprecated in recent pandas releases.
        return pd.Timestamp.now(tz='UTC')

    def format_results(self):
        """Turn the collected timestamps into durations and publish them.

        Builds a DataFrame from ``results_arr``, derives one duration column
        (in seconds) per phase, and assigns the selected columns to
        ``self.bw.raw_df``.
        """
        tdf = pd.DataFrame(self.results_arr)
        tdf['klass'] = self.__class__.__name__
        # derived column -> (end timestamp column, start timestamp column)
        phases = {
            'py_serialize': ('serial_t2', 'serial_t1'),
            'widget_xfer': ('t1', 'serial_t2'),
            'js_deserialize': ('t2', 't1'),
            'js_iterate': ('t3', 't2'),
            'py_notify': ('t4', 't3'),
            'roundtrip': ('t4', 'serial_t3'),
        }
        for name, (end_col, start_col) in phases.items():
            tdf[name] = second_convert(tdf[end_col] - tdf[start_col])
        self.bw.raw_df = tdf[['rows', 'bytes', 'elements', 'klass', 'py_serialize', 'widget_xfer', 'js_deserialize', 'js_iterate', 'py_notify', 'roundtrip']]

    def set_from_gen(self):
        """Generate, convert and send the frame for ``self.current_n``.

        Resets ``do_calc`` and ``timing_info``, records Python-side
        timestamps in ``self.current_timing``, then assigns the converted
        payload to the trait named by ``target_prop``.
        """
        self.do_calc = False
        self.timing_info = {}
        self.current_timing = {'gen_t1': self._utc_now()}
        gen_df = self.df_gen_func(self.current_n)
        self.current_timing['serial_t1'] = self._utc_now()
        self.current_timing['rows'] = len(gen_df)
        self.current_timing['elements'] = len(gen_df) * len(gen_df.columns)
        byts = self.convert_df(gen_df)
        # NOTE(review): len() is a byte count only for bytes/ASCII-str
        # payloads; for a list payload (DFDataBenchmarkWidget) it is the
        # number of records, so the 'bytes' column is not comparable there.
        self.current_timing['bytes'] = len(byts)

        self.current_timing['serial_t2'] = self._utc_now()
        self.do_calc = True
        setattr(self, self.target_prop, byts)
        self.current_timing['serial_t3'] = self._utc_now()

    @observe('timing_info')
    def _timing_info_change(self, change):
        """Record one finished run and start the next size, if any."""
        if len(self.timing_info) == 0:
            # Fired by our own reset of timing_info; nothing to record.
            print("exit early because empty timing_info")
            return
        new_res = {k: pd.Timestamp(v) for k, v in self.timing_info.items()}
        new_res['t4'] = self._utc_now()
        new_res.update(self.current_timing)
        self.results_arr.append(new_res)
        if self.current_n >= self.max_n:
            print("done")
            self.format_results()
            return
        # Advance BEFORE generating the next frame. Incrementing after
        # set_from_gen() re-ran the first size and recorded it twice.
        self.current_n += 1
        # set_from_gen() itself resets do_calc and timing_info first.
        self.set_from_gen()

In [None]:
class Base64BenchmarkWidget(BaseBenchmarkWidget):
    """Benchmark variant that sends each frame as a base64 text string.

    The payload produced by ``convert_df`` is assigned to the synced
    ``df_base64`` trait (named by ``target_prop``).
    """

    _view_name = Unicode(default_value='Base64BenchmarkWidgetView').tag(sync=True)
    df_base64 = Any(default_value="").tag(sync=True)
    target_prop = 'df_base64'

    def convert_df(self, df):
        """Return ``df`` serialized with ``df_to_base64``."""
        encoded_text = df_to_base64(df)
        return encoded_text
class BytesBenchmarkWidget(BaseBenchmarkWidget):
    """Benchmark variant that sends each frame as raw Feather bytes.

    The payload produced by ``convert_df`` is assigned to the synced
    ``df_arrow_bytes`` trait (named by ``target_prop``).
    """

    _view_name = Unicode(default_value='BytesBenchmarkWidgetView').tag(sync=True)
    df_arrow_bytes = Bytes().tag(sync=True)
    target_prop = 'df_arrow_bytes'

    def convert_df(self, df):
        """Return ``df`` serialized with ``df_to_arrow_bytes``."""
        raw_bytes = df_to_arrow_bytes(df)
        return raw_bytes

class DFDataBenchmarkWidget(BaseBenchmarkWidget):
    """Benchmark variant that sends each frame as a list of JSON records.

    The payload produced by ``convert_df`` is assigned to the synced
    ``df_data`` trait (named by ``target_prop``).
    """

    _view_name = Unicode(default_value='DFDataBenchmarkWidgetView').tag(sync=True)
    df_data = Any(default_value=[]).tag(sync=True)
    target_prop = 'df_data'

    def convert_df(self, df):
        """Return ``df`` as parsed JSON records (a list with one dict per row)."""
        records_json = df.to_json(orient='records')
        return json.loads(records_json)

In [None]:
def df_gen_func(exp):
    """Build a four-column synthetic DataFrame with ``10 * 10**exp`` rows.

    Columns:
        str        -- the four sample words repeated cyclically.
        log_normal -- random draws from ``np.random.lognormal(25, .3)``.
        bool       -- alternating True / False.
        randint    -- random integers from ``np.random.randint(0, 100)``.

    Args:
        exp: non-negative integer exponent controlling the row count.

    Returns:
        pandas.DataFrame with exactly ``N = 20 * 10**exp // 2`` rows.
    """
    N = 20 * (10**exp) // 2

    def _cycle(values):
        # Repeat the pattern past N and trim, so the column length always
        # equals N even when N is not a multiple of len(values)
        # (e.g. exp=0 -> N=10 with a 4-word pattern).
        return (values * (N // len(values) + 1))[:N]

    return pd.DataFrame({
        'str': _cycle(["foo", "barr", "bazz", "bofffff"]),
        'log_normal': np.random.lognormal(25, .3, N),
        'bool': _cycle([True, False]),
        'randint': np.random.randint(0, 100, N)
    })

In [None]:
# Collects one results frame per benchmark class. Later cells append to this
# list, so re-running them without re-running this cell adds duplicates.
all_obs_dfs = []
# Constructing the widget starts the benchmark right away (__init__ calls
# set_from_gen); exponents run from min_n up to max_n.
dfd_bn = DFDataBenchmarkWidget(df_gen_func, min_n=1, max_n=4)
# Display the widget as the cell output.
dfd_bn

In [None]:
# bw.raw_df is only filled with results by format_results(), which runs when
# the benchmark prints "done" -- wait for that before running this cell.
all_obs_dfs.append(dfd_bn.bw.raw_df)
dfd_bn.close()
dfd_bn.bw.close()
# Do not `del dfd_bn.bw` here: the widget's __del__ accesses self.bw, so
# removing the attribute first makes finalization fail with AttributeError.
del dfd_bn
# Constructing the next widget starts its benchmark immediately.
b64_bn = Base64BenchmarkWidget(df_gen_func, min_n=1, max_n=6)
b64_bn

In [None]:
# bw.raw_df is only filled with results by format_results(), which runs when
# the benchmark prints "done" -- wait for that before running this cell.
all_obs_dfs.append(b64_bn.bw.raw_df)

# Close the finished benchmark widget and drop the notebook-level name.
b64_bn.close()
del b64_bn
# Constructing the next widget starts its benchmark immediately.
byt_bn = BytesBenchmarkWidget(df_gen_func, min_n=1, max_n=6)
# Display the widget as the cell output.
byt_bn

In [None]:
# bw.raw_df is only filled with results by format_results(), which runs when
# the benchmark prints "done" -- wait for that before running this cell.
all_obs_dfs.append(byt_bn.bw.raw_df)
# Close the finished benchmark widget and drop the notebook-level name.
byt_bn.close()
del byt_bn

In [None]:
# Stack the per-class result frames collected by the earlier cells.
full_df = pd.concat(all_obs_dfs)
# Throughput is rows divided by round-trip seconds; the inverse ratio
# (roundtrip / rows) would be seconds per row, contradicting the column name.
full_df['rows_per_second'] = full_df['rows'] / full_df['roundtrip']
full_df