# Imports

In [47]:
import pandas as pd
from scipy import stats
from google.colab import drive
drive.mount('/content/drive')
import glob
import os
import matplotlib.pyplot as plt

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Load result metrics files from Google Drive

In [38]:
folder_path = '/content/drive/MyDrive/cdle'
# glob returns matches in arbitrary, filesystem-dependent order. Sort them so the
# run order is reproducible: the timestamped names
# (e.g. local_benchmark_20250510_140935.csv) then sort chronologically.
csv_files = sorted(glob.glob(os.path.join(folder_path, '*.csv')))

for file in csv_files:
    print(os.path.basename(file))

local_benchmark_20250510_140935.csv
local_benchmark_20250511_124246.csv
local_benchmark_20250511_124435.csv


# Functions

In [61]:
def annotate(ax):
  """Label every bar of `ax` with its height, formatted as seconds (e.g. "1.23s")."""
  for bar in ax.patches:
    height = bar.get_height()
    top_center = (bar.get_x() + bar.get_width() / 2., height)
    # Offset the label 10 points above the top of the bar.
    ax.annotate(f"{height:.2f}s", top_center, ha='center', va='center',
                xytext=(0, 10), textcoords='offset points')



def annotate_x_times_faster(ax, x_times_list):
  """Write each speed-up factor (e.g. "2.5x") above one bar of the plot.

  Factors are paired with `ax.patches` in order, so only the first
  len(x_times_list) bars are labelled. Any extra bars or extra factors are ignored.
  NOTE(review): for a grouped pandas bar chart these are presumably the bars of
  the first plotted column — confirm against the caller's column order.
  """
  for bar, factor in zip(ax.patches, x_times_list):
    top_center = (bar.get_x() + bar.get_width() / 2., bar.get_height())
    # Nudge slightly right/up so the label does not collide with the bar top.
    ax.annotate(f"{factor:.1f}x", top_center, ha='center', va='center',
                xytext=(4, 10), textcoords='offset points',
                fontsize=8, weight='bold', color="#585858")



def create_dual_comparison_plots(plot_title, df, first_tool, second_tool, figsize=(20, 8)):
  """Plot elapsed times of two processing tools on linear and log scales.

  The linear-scale chart is annotated with the per-operation ratio
  df[first_tool] / df[second_tool]. Both columns hold elapsed times, so a
  value above 1 means `first_tool` took longer than `second_tool`.

  Args:
    plot_title: Title of the linear-scale chart; the log chart appends " - log scaling".
    df: Frame indexed by operation name that contains the columns `first_tool`
      and `second_tool` (elapsed seconds). Every column in `df` is plotted.
    first_tool: Column used as the numerator of the ratio.
    second_tool: Column used as the denominator of the ratio.
    figsize: Matplotlib figure size used for both charts.
  """
  # Sort once so the bars and the ratio annotations share the same row order.
  sorted_df = df.sort_index()

  ax = sorted_df.plot.bar(title=plot_title, figsize=figsize)
  ax.set_ylabel("Elapsed time (sec)")

  # Use the requested tool columns, not fixed 'dask'/'koalas', so the
  # annotations describe the tools actually being compared.
  ratios = sorted_df[first_tool] / sorted_df[second_tool]
  annotate_x_times_faster(ax, x_times_list=ratios.to_list())

  # Log scale keeps fast operations visible next to very slow ones.
  log_ax = sorted_df.plot.bar(logy=True, title=f'{plot_title} - log scaling', figsize=figsize)
  log_ax.set_ylabel("Elapsed time (sec)")



def create_full_comparison_plots(plot_title, df, figsize=(20, 8)):
  """Plot elapsed times of every tool column in `df` on linear and log scales.

  Args:
    plot_title: Title of the linear-scale chart; the log chart appends " - log scaling".
    df: Frame indexed by operation name, one column per processing tool,
      holding elapsed seconds.
    figsize: Matplotlib figure size used for both charts.
  """
  # Sort once so both charts share the same operation order.
  sorted_df = df.sort_index()

  ax = sorted_df.plot.bar(title=plot_title, figsize=figsize)
  ax.set_ylabel("Elapsed time (sec)")

  # Log scale keeps fast operations visible next to very slow ones.
  log_ax = sorted_df.plot.bar(logy=True, title=f'{plot_title} - log scaling', figsize=figsize)
  log_ax.set_ylabel("Elapsed time (sec)")



def fair_avg(durations):
  """Average run durations after discarding the first run and the best run.

  The first element is dropped (treated as a warm-up run). Then the single
  fastest (minimum) of the remaining runs is dropped, and the rest are averaged.
  With too few runs to drop both, only as many are dropped as still leaves at
  least one value: 1 run -> that run; 2 runs -> the second run.

  Args:
    durations: Iterable of numeric durations in run order. It is not modified.

  Returns:
    The mean of the retained durations.

  Raises:
    ValueError: If `durations` is empty.
  """
  remaining = list(durations)  # copy: never mutate the caller's data
  if not remaining:
    raise ValueError("fair_avg() requires at least one duration")
  # Drop the first (warm-up) run, but never discard the only measurement.
  if len(remaining) > 1:
    remaining = remaining[1:]
  # Drop the best run as an outlier, again only if something else remains.
  if len(remaining) > 1:
    remaining.remove(min(remaining))
  return sum(remaining) / len(remaining)



def rename_index(df):
    """Index `df` by its 'task' column, rewriting task names into shorter labels.

    The substrings "filtered ", "cache " and "local " are removed wherever they
    occur, and a few verbose phrases are shortened. The rules run in the listed
    order, so later rules see the output of earlier ones (e.g.
    "addition of columns" -> "addition of series" -> "series addition").
    The resulting index has no name.
    """
    replacements = (
        ("filtered ", ""),
        ("cache ", ""),
        ("local ", ""),
        ("of columns", "of series"),
        ("addition of series", "series addition"),
        ("multiplication of series", "series multiplication"),
        ("arithmetic ops", "arithmetic"),
        ("count index length", "count index"),
    )

    def shorten(label):
        for old, new in replacements:
            label = label.replace(old, new)
        return label

    indexed = df.set_index('task')
    indexed.index = pd.Index([shorten(label) for label in indexed.index])
    return indexed



def avg_result_df(file_name_prefix, files=None,
                  libs=('koalas', 'dask', 'modin', 'joblib', 'spark')):
  """Average the benchmark result CSVs whose file name starts with a prefix.

  Every matching CSV is read into a DataFrame. For each row and each tool
  column in `libs`, the per-run values are combined with `fair_avg`. The
  first matching file (in sorted name order) serves as the template, so columns
  not in `libs` are copied from it.

  Args:
    file_name_prefix: Keep only files whose basename starts with this string
      (e.g. 'local' or 'distributed').
    files: Iterable of CSV paths to search. Defaults to the notebook-level
      `csv_files` list.
    libs: Tool columns to average.

  Returns:
    A DataFrame shaped like the first matching CSV, with the `libs` columns
    replaced by their fair averages (as floats).

  Raises:
    FileNotFoundError: If no file matches `file_name_prefix`.
  """
  if files is None:
    files = csv_files
  # fair_avg drops the *first* run, so the run order must not depend on glob's
  # arbitrary ordering. The timestamped names sort chronologically.
  matched = sorted(
      (path for path in files if os.path.basename(path).startswith(file_name_prefix)),
      key=os.path.basename,
  )
  if not matched:
    raise FileNotFoundError(f'No CSV files starting with {file_name_prefix!r} found')

  dfs = [pd.read_csv(path) for path in matched]
  print(f'{file_name_prefix} has {len(dfs)} runs')

  # Cast tool columns to float up front so assigning averaged values never
  # silently upcasts (or warns about) an integer column. astype returns a copy.
  avg_df = dfs[0].astype({lib: float for lib in libs})
  for op in avg_df.index:
    for lib in libs:
      avg_df.loc[op, lib] = fair_avg([df.loc[op, lib] for df in dfs])

  print(f"Loaded {len(dfs)} CSV files into DataFrames.")
  # List only the files that were averaged, not every CSV in the folder.
  for i, path in enumerate(matched, start=1):
    print(f"{i}. {os.path.basename(path)}")

  return avg_df

# Create the results dataframe with the average values

In [None]:
# File-name prefix of the result CSVs to average: 'local' or 'distributed'.
benchmark_mode = 'local'
benchmark_df = avg_result_df(benchmark_mode).pipe(rename_index)
benchmark_df.head(10)

# Separate dataframes

Split the metrics dataframe into 3 new dataframes: standard operations, standard operations with filtering, and standard operations with filtering and caching.

In [None]:
# Rows come in consecutive blocks of OPS_PER_GROUP: standard operations, then
# operations with filtering, then operations with filtering and cache.
# NOTE(review): this relies on the CSV row order — confirm if the benchmark changes.
OPS_PER_GROUP = 15

standard_ops = benchmark_df.iloc[:OPS_PER_GROUP]
ops_with_filtering = benchmark_df.iloc[OPS_PER_GROUP:2 * OPS_PER_GROUP]
ops_with_filtering_and_cache = benchmark_df.iloc[2 * OPS_PER_GROUP:]

standard_ops.head(OPS_PER_GROUP)

# Create insights for comparison between 2 tools



In [None]:
selected_columns = ['dask', 'koalas']  # any two of: 'dask', 'koalas', 'modin', 'joblib', 'spark'
df = standard_ops  # or: ops_with_filtering, ops_with_filtering_and_cache
# Keep the title in sync with `df`: 'Standard Operations',
# 'Operations with filtering', 'Operations with filtering and cache'.
plot_title = f'Standard Operations ({benchmark_mode})'

# Plot `df` (not a fixed group) so switching `df` above changes the chart too.
create_dual_comparison_plots(plot_title, df[selected_columns], selected_columns[0], selected_columns[1])

# Show metrics

In [None]:
# Geometric mean of the per-operation elapsed times, one bar per selected tool.
# Indexing by `selected_columns` works for any number of tools.
gmean_ax = pd.Series(stats.gmean(df[selected_columns]), index=selected_columns).plot.bar(title='Geometric mean')
gmean_ax.set_ylabel("Elapsed time (sec)")
annotate(gmean_ax)

In [None]:
# Sum of elapsed times over all operations in `df`, one bar per selected tool.
total_ax = df[selected_columns].sum().plot.bar(title='Total execution time')
total_ax.set_ylabel("Elapsed time (sec)")
annotate(total_ax)

In [None]:
# Per-operation elapsed-time ratio of the two selected tools. Values above 1
# mean the first tool took longer than the second.
first_tool, second_tool = selected_columns[0], selected_columns[1]
speedup_df = df.sort_index().copy()
speedup_df[f'{first_tool} / {second_tool}'] = speedup_df[first_tool] / speedup_df[second_tool]
speedup_df

# Create insights for comparison between all tools


In [None]:
df = standard_ops # 'standard_ops', 'ops_with_filtering', 'ops_with_filtering_and_cache'
plot_title=f'Operations with filtering ({benchmark_mode})' # 'Operations with filtering',  'Operations with filtering and cache', 'Standard Operations'

create_full_comparison_plots(plot_title, df)

In [70]:
def print_ratio_summary(label, frame, first_tool, second_tool):
  """Print the simple and geometric mean of per-operation elapsed-time ratios.

  Each ratio is frame[first_tool] / frame[second_tool] for one operation, so a
  value above 1 means `first_tool` took longer. The results are ratios
  ("x times"), not percentages.

  Args:
    label: Suffix naming the subset of operations, e.g. " (arithmetic)".
    frame: Operations (rows) to summarize. Must contain both tool columns.
    first_tool: Numerator column.
    second_tool: Denominator column.
  """
  prefix = f"Performance ratio {first_tool}/{second_tool}{label}"
  if frame.empty:
    # Avoid an empty mean / geometric mean when no operation name matched.
    print(f"{prefix}: no matching operations")
    return
  ratios = frame[first_tool] / frame[second_tool]
  print(f"{prefix} (simple avg): {ratios.mean()}")
  print(f"{prefix} (geomean): {stats.gmean(ratios)}")


first_tool, second_tool = selected_columns[0], selected_columns[1]
print_ratio_summary("", df, first_tool, second_tool)

arithmetic_ops = df.filter(items=['complex arithmetic', 'series multiplication', 'series addition'], axis=0)
print_ratio_summary(" (arithmetic)", arithmetic_ops, first_tool, second_tool)

basic_stats_ops = df.filter(items=['count', 'mean', 'standard deviation', 'count index', 'join', 'join count'], axis=0)
print_ratio_summary(" (basic stats)", basic_stats_ops, first_tool, second_tool)

Performance diff % (simple avg): 2.529296137793361
Performance diff % (geomean): 0.5960303014107625
Performance diff (arthemetic) % (simple avg): 8.694845825478447
Performance diff (arthemetic) % (geomean): 7.584173893794975
Performance diff (basic stats) % (simple avg): 1.0945918566471413
Performance diff (basic stats) % (geomean): 0.20501112863815132


# Overall comparison across all operation groups


In [None]:
# Combine all three operation groups and summarize the elapsed-time ratio
# first tool / second tool (values above 1 mean the first tool took longer).
overall_df = pd.concat([standard_ops, ops_with_filtering, ops_with_filtering_and_cache])
overall_ratios = overall_df[selected_columns[0]] / overall_df[selected_columns[1]]
print(f"Total performance ratio {selected_columns[0]}/{selected_columns[1]} (simple avg): {overall_ratios.mean()}")
print(f"Total performance ratio {selected_columns[0]}/{selected_columns[1]} (geomean): {stats.gmean(overall_ratios)}")

NameError: name 'standard_ops' is not defined