In [1]:
import pandas as pd
import numpy as np
from numpy.random import randint, shuffle
import random

def get_multiple_dfs(data_frame_rows):
    """
    Get a complete code str that creates a DF with random value
    """
    sizes_before_join = int(data_frame_rows * 1.1)
    start_with_offset = int(data_frame_rows * 0.1)
    end_with_offset = start_with_offset + sizes_before_join
    assert sizes_before_join - start_with_offset == data_frame_rows

    id_a = np.arange(sizes_before_join)
    shuffle(id_a)
    a = randint(0,100,size=(sizes_before_join))
    b = randint(0,100,size=(sizes_before_join))
    categories = ['cat_a', 'cat_b', 'cat_c']
    group_col_1 = pd.Series(random.choices(categories, k=sizes_before_join))
    group_col_2 = pd.Series(random.choices(categories, k=sizes_before_join))
    group_col_3 = pd.Series(random.choices(categories, k=sizes_before_join))

    id_b = np.arange(start_with_offset, end_with_offset)
    shuffle(id_b)
    c = randint(0,100,size=(sizes_before_join)) 
    d = randint(0,100,size=(sizes_before_join))

    df_a = pd.DataFrame(zip(id_a, a, b, group_col_1, group_col_2, group_col_3), columns=['id', 'A', 'B', 
        'group_col_1', 'group_col_2', 'group_col_3'])
    df_b = pd.DataFrame(zip(id_b, c, d), columns=['id', 'C', 'D'])
        
    return df_a, df_b

In [2]:
df_a, df_b = get_multiple_dfs(3000000)

# Without instrumentation

In [3]:
%%time
test = df_a.merge(df_b, on='id')

CPU times: user 1.46 s, sys: 371 ms, total: 1.83 s
Wall time: 1.84 s


# Without instrumentation, with duckdb

In [4]:
import duckdb
con = duckdb.connect(database=':memory:', read_only=False)

In [5]:
%%time
con.register('df_a', df_a)
con.register('df_b', df_b)
test = con.execute('SELECT * FROM df_a JOIN df_b ON df_a.id = df_b.id').fetchdf()

CPU times: user 1.36 s, sys: 377 ms, total: 1.73 s
Wall time: 1.73 s


In [6]:
from mlinspect.inspections._lineage import LineageId, JoinLineageId

lineage_id_list_a = [LineageId(0, row_id) for row_id in range(len(df_a))]
lineage_ids_a = pd.DataFrame({"RowLineage(10)": pd.Series(lineage_id_list_a, dtype="object")})
lineage_id_list_b = [LineageId(0, row_id) for row_id in range(len(df_b))]
lineage_ids_b = pd.DataFrame({"RowLineage(10)": pd.Series(lineage_id_list_b, dtype="object")})

In [7]:
from mlinspect.backends._pandas_backend import PandasBackend, execute_inspection_visits_join
from mlinspect.backends._pandas_backend_frame_wrapper import MlinspectDataFrame
from mlinspect.inspections import RowLineage
from mlinspect.inspections._inspection_input import OperatorContext
from mlinspect.instrumentation._dag_node import OperatorType, CodeReference

pandas_backend = PandasBackend()
pandas_backend.inspections = [RowLineage(10)]
function_info = ('pandas.core.frame', 'merge')
operator_context = OperatorContext(OperatorType.JOIN, function_info)
code_reference = CodeReference(0,0,0,10)

mlinspect_df_a = MlinspectDataFrame(df_a.copy())
mlinspect_df_a.annotations = lineage_ids_a
mlinspect_df_b = MlinspectDataFrame(df_b.copy())
mlinspect_df_b.annotations = lineage_ids_b

# With current instrumentation

In [8]:
%%time
mlinspect_df_a['mlinspect_index_x'] = range(0, len(mlinspect_df_a))
mlinspect_df_b['mlinspect_index_y'] = range(0, len(mlinspect_df_b))
original_return_value = mlinspect_df_a.merge(mlinspect_df_b, on='id')


return_value = execute_inspection_visits_join(pandas_backend, operator_context, code_reference,
                                              mlinspect_df_a,
                                              mlinspect_df_a.annotations,
                                              mlinspect_df_b,
                                              mlinspect_df_b.annotations,
                                              original_return_value)

# list(pandas_backend.dag_node_identifier_to_inspection_output.values())
# return_value.annotations

CPU times: user 26.9 s, sys: 2.97 s, total: 29.9 s
Wall time: 30 s


# Calculating directly using apply (apply is not a fast function)

In [9]:
%%time
df_a['lineage_left'] = lineage_ids_a
df_b['lineage_right'] = lineage_ids_b

join_result = df_a.merge(df_b, on='id')

join_result['annotations'] = join_result.apply(lambda row: JoinLineageId([row.lineage_left, row.lineage_right]), axis=1)
join_result.drop('lineage_left', inplace=True, axis=1)
join_result.drop('lineage_right', inplace=True, axis=1)
dag_annotation = join_result.head(10)
annotation = join_result.pop('annotations')

# dag_annotation
# annotation

CPU times: user 1min 6s, sys: 1.56 s, total: 1min 8s
Wall time: 1min 8s


In [10]:
def lineage_iter(lineage_left, lineage_right):
    zipped_lineage = zip(lineage_left, lineage_right)
    join_ids_iter = map(lambda input_tuple: JoinLineageId([*input_tuple]), zipped_lineage)
    return list(join_ids_iter)

# Calculating directly using zip and map on numpy arrays

In [11]:
%%time
df_a['lineage_left'] = lineage_ids_a
df_b['lineage_right'] = lineage_ids_b

join_result = df_a.merge(df_b, on='id')

join_result['annotations'] = lineage_iter(join_result['lineage_left'].values, join_result['lineage_right'].values)
join_result.drop('lineage_left', inplace=True, axis=1)
join_result.drop('lineage_right', inplace=True, axis=1)
dag_annotation = join_result.head(10)
annotation = join_result.pop('annotations')

# dag_annotation
# annotation

CPU times: user 15.2 s, sys: 996 ms, total: 16.2 s
Wall time: 16.2 s


In [12]:
def lineage_row_compute(lineage_left, lineage_right):
    return JoinLineageId([lineage_left, lineage_right])

# Np vectorize is a bit faster

In [13]:
%%time
df_a['lineage_left'] = lineage_ids_a
df_b['lineage_right'] = lineage_ids_b

join_result = df_a.merge(df_b, on='id')

lineage_vectorized = np.vectorize(lineage_row_compute)
join_result['annotations'] = lineage_vectorized(join_result['lineage_left'].values, join_result['lineage_right'].values)
join_result.drop('lineage_left', inplace=True, axis=1)
join_result.drop('lineage_right', inplace=True, axis=1)
dag_annotation = join_result.head(10)
annotation = join_result.pop('annotations')

# dag_annotation
# annotation

CPU times: user 11.2 s, sys: 782 ms, total: 12 s
Wall time: 12 s


In [14]:
from mlinspect.inspections._lineage import LineageId, JoinLineageId

lineage_id_list_a = ["LineageId(0, " + str(row_id) + ")" for row_id in range(len(df_a))]
lineage_ids_a = pd.DataFrame({"RowLineage(10)": pd.Series(lineage_id_list_a, dtype="object")})
lineage_id_list_b = ["LineageId(1, " + str(row_id) + ")" for row_id in range(len(df_a))]
lineage_ids_b = pd.DataFrame({"RowLineage(10)": pd.Series(lineage_id_list_b, dtype="object")})

# Calculating directly by using strings for the lineage id, this can likely be done much smarter using uints

In [15]:
%%time
df_a['lineage_left'] = lineage_ids_a
df_b['lineage_right'] = lineage_ids_b

join_result = df_a.merge(df_b, on='id')

join_result['annotations'] = "JoinLineageId([" + join_result["lineage_left"] + ", " + join_result["lineage_right"] + "])"
join_result.drop('lineage_left', inplace=True, axis=1)
join_result.drop('lineage_right', inplace=True, axis=1)
dag_annotation = join_result.head(10)
annotation = join_result.pop('annotations')

# dag_annotation
# annotation

CPU times: user 6.16 s, sys: 1.16 s, total: 7.32 s
Wall time: 7.35 s


# Calculating directly by using strings with duckdb (still WIP)

In [16]:
%%time

df_a['lineage_left'] = lineage_ids_a
df_b['lineage_right'] = lineage_ids_b

con.register('df_a', df_a)
con.register('df_b', df_b)

join_result = con.execute("SELECT *, 'JoinLineageID([' || df_a.lineage_left || ', ' || df_b.lineage_right || '])' as annotations FROM df_a JOIN df_b ON df_a.id = df_b.id").fetchdf()
dag_annotation = join_result.head(10)
annotation = join_result.pop('annotations')

# dag_annotation
# annotation

CPU times: user 3.35 s, sys: 684 ms, total: 4.04 s
Wall time: 4.1 s


# Todos

* Think about alternative lineage representation not using custom Python objects 
* Then try to find a pandas/numpy way to use this to benefit from vectorization
* Test how fast this can be executed in other ways, e.g., with DuckDB

# Learnings
* Don't let inspections use arbitrary data types for the inspection annotations
* It seems like the current performance overhead mainly comes from the end of the pipeline, not the earlier parts with pandas
* In the current implementation, we probably should try to use numpy arrays directly that back the pd columns instead of the modfied itertuples stuff