# Benchmark: Koalas (PySpark) and Dask - Local execution

The benchmark was performed against the 2009 - 2013 Yellow Taxi Trip Records (157 GB) from NYC Taxi and Limousine Commission (TLC) Trip Record Data. We identified common operations from our pandas workloads such as basic calculations of statistics, join, filtering and grouping on this dataset.

The operations were measured with/without filter operations to consider real world workloads.

### Columns of the dataset:

```Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge', 'airport_fee'],
      dtype='object')
```

## Set-up

In [1]:
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("YourAppName") \
    .getOrCreate()

In [2]:
spark.conf.set("spark.databricks.io.cache.enabled", "false")
print("spark.databricks.io.cache.enabled is %s" % spark.conf.get("spark.databricks.io.cache.enabled"))

spark.databricks.io.cache.enabled is false


In [3]:
%pip install -U koalas dask[complete] numpy pandas pyarrow

Collecting koalas
  Downloading koalas-1.8.2-py3-none-any.whl.metadata (6.4 kB)
Collecting numpy
  Using cached numpy-1.26.4-cp39-cp39-win_amd64.whl.metadata (61 kB)
Collecting pandas
  Downloading pandas-2.2.2-cp39-cp39-win_amd64.whl.metadata (19 kB)
Collecting pyarrow
  Downloading pyarrow-16.0.0-cp39-cp39-win_amd64.whl.metadata (3.1 kB)
Collecting dask[complete]
  Using cached dask-2024.5.0-py3-none-any.whl.metadata (3.8 kB)
Collecting click>=8.1 (from dask[complete])
  Using cached click-8.1.7-py3-none-any.whl.metadata (3.0 kB)
Collecting cloudpickle>=1.5.0 (from dask[complete])
  Using cached cloudpickle-3.0.0-py3-none-any.whl.metadata (7.0 kB)
Collecting fsspec>=2021.09.0 (from dask[complete])
  Using cached fsspec-2024.3.1-py3-none-any.whl.metadata (6.8 kB)
Collecting partd>=1.2.0 (from dask[complete])
  Downloading partd-1.4.2-py3-none-any.whl.metadata (4.6 kB)
Collecting pyyaml>=5.3.1 (from dask[complete])
  Using cached PyYAML-6.0.1-cp39-cp39-win_amd64.whl.metadata (2.1 kB)
C

In [1]:
import pandas as pd
import numpy as np
import databricks.koalas as ks
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster

print('pandas version: %s' % pd.__version__)
print('numpy version: %s' % np.__version__)
print('koalas version: %s' % ks.__version__)
import dask
print('dask version: %s' % dask.__version__)

import time

def benchmark(f, df, benchmarks, name, **kwargs):
    """Benchmark the given function against the given DataFrame.
    
    Parameters
    ----------
    f: function to benchmark
    df: data frame
    benchmarks: container for benchmark results
    name: task name
    
    Returns
    -------
    Duration (in seconds) of the given operation
    """
    start_time = time.time()
    ret = f(df, **kwargs)
    benchmarks['duration'].append(time.time() - start_time)
    benchmarks['task'].append(name)
    print(f"{name} took: {benchmarks['duration'][-1]} seconds")
    return benchmarks['duration'][-1]

def get_results(benchmarks):
    """Return a pandas DataFrame containing benchmark results."""
    return pd.DataFrame.from_dict(benchmarks)



pandas version: 2.2.2
numpy version: 1.23.0
koalas version: 1.8.2
dask version: 2024.5.0


# Dask

## Preparation

In [None]:
client = Client()

dask_data = dd.read_parquet('../../Data/yellow_tripdata_2013-06.parquet',index='VendorID')

dask_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

In [44]:
import pandas as pd

# Path to your Parquet file
file_path = '../../Data/yellow_tripdata_2023-02.parquet'

# Read the first few rows of the Parquet file
df_sample = pd.read_parquet(file_path, engine='pyarrow')

# Print the column names
print(df_sample.columns)


Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
       'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag',
       'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra',
       'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge',
       'total_amount', 'congestion_surcharge', 'Airport_fee'],
      dtype='object')


In [46]:
df_sample.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,1,2023-02-01 00:32:53,2023-02-01 00:34:34,2.0,0.3,1.0,N,142,163,2,4.4,3.5,0.5,0.0,0.0,1.0,9.4,2.5,0.0
1,2,2023-02-01 00:35:16,2023-02-01 00:35:30,1.0,0.0,1.0,N,71,71,4,-3.0,-1.0,-0.5,0.0,0.0,-1.0,-5.5,0.0,0.0
2,2,2023-02-01 00:35:16,2023-02-01 00:35:30,1.0,0.0,1.0,N,71,71,4,3.0,1.0,0.5,0.0,0.0,1.0,5.5,0.0,0.0
3,1,2023-02-01 00:29:33,2023-02-01 01:01:38,0.0,18.8,1.0,N,132,26,1,70.9,2.25,0.5,0.0,0.0,1.0,74.65,0.0,1.25
4,2,2023-02-01 00:12:28,2023-02-01 00:25:46,1.0,3.22,1.0,N,161,145,1,17.0,1.0,0.5,3.3,0.0,1.0,25.3,2.5,0.0


## Standard operations

In [47]:
def read_file_parquet(df=None):
    return dd.read_parquet('../../Data/yellow_tripdata_2013-06.parquet')
  
def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean().compute()

def standard_deviation(df):
    return df.fare_amount.std().compute()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean().compute()

def sum_columns(df):
    return (df.fare_amount + df.tip_amount).compute()

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean().compute()

def product_columns(df):
    return (df.fare_amount * df.tip_amount).compute()
  
def value_counts(df):
    return df.fare_amount.value_counts().compute()
  
def mean_of_complicated_arithmetic_operation(df):
    theta_1 = df.start_lon
    phi_1 = df.start_lat
    theta_2 = df.end_lon
    phi_2 = df.end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean().compute()
  
def complicated_arithmetic_operation(df):
    theta_1 = df.start_lon
    phi_1 = df.start_lat
    theta_2 = df.end_lon
    phi_2 = df.end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.compute()
  
def groupby_statistics(df):
    return df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'], 
        'tip_amount': ['mean', 'std']
      }
    ).compute()
other = groupby_statistics(dask_data)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True).compute()

In [48]:
benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')
benchmark(count, df=dask_data, benchmarks=dask_benchmarks, name='count')
benchmark(count_index_length, df=dask_data, benchmarks=dask_benchmarks, name='count index length')
benchmark(mean, df=dask_data, benchmarks=dask_benchmarks, name='mean')
benchmark(standard_deviation, df=dask_data, benchmarks=dask_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=dask_data, benchmarks=dask_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=dask_data, benchmarks=dask_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=dask_data, benchmarks=dask_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=dask_data, benchmarks=dask_benchmarks, name='value counts')
# No column for this
#benchmark(mean_of_complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='mean of complex arithmetic ops')
#benchmark(complicated_arithmetic_operation, df=dask_data, benchmarks=dask_benchmarks, name='complex arithmetic ops')
benchmark(groupby_statistics, df=dask_data, benchmarks=dask_benchmarks, name='groupby statistics')
benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
#benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)  cant join

read file took: 0.017640352249145508 seconds
count took: 0.10663700103759766 seconds
count index length took: 0.869950532913208 seconds
mean took: 2.00138783454895 seconds
standard deviation took: 1.8177976608276367 seconds
mean of columns addition took: 2.728876829147339 seconds
addition of columns took: 6.491725444793701 seconds
mean of columns multiplication took: 2.517961263656616 seconds
multiplication of columns took: 6.112968921661377 seconds
value counts took: 2.2724311351776123 seconds
groupby statistics took: 3.4179625511169434 seconds
join count took: 1.939040184020996 seconds


1.939040184020996

## Operations with filtering

In [49]:
expr_filter = (dask_data.tip_amount >= 1) & (dask_data.tip_amount <= 5)

def filter_data(df):
    return df[expr_filter]
  
dask_filtered = filter_data(dask_data)

In [50]:
benchmark(count, dask_filtered, benchmarks=dask_benchmarks, name='filtered count')
benchmark(count_index_length, dask_filtered, benchmarks=dask_benchmarks, name='filtered count index length')
benchmark(mean, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean')
benchmark(standard_deviation, dask_filtered, benchmarks=dask_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, dask_filtered, benchmarks=dask_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=dask_filtered, benchmarks=dask_benchmarks, name='filtered multiplication of columns')
#benchmark(mean_of_complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered mean of complex arithmetic ops')
#benchmark(complicated_arithmetic_operation, dask_filtered, benchmarks=dask_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, dask_filtered, benchmarks=dask_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, dask_filtered, benchmarks=dask_benchmarks, name='filtered groupby statistics')

other = groupby_statistics(dask_filtered)
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
#benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

filtered count took: 1.571927785873413 seconds
filtered count index length took: 1.6500351428985596 seconds
filtered mean took: 2.652149200439453 seconds
filtered standard deviation took: 3.2502121925354004 seconds
filtered mean of columns addition took: 2.445396661758423 seconds
filtered addition of columns took: 4.6458117961883545 seconds
filtered mean of columns multiplication took: 2.4689013957977295 seconds
filtered multiplication of columns took: 4.786742210388184 seconds
filtered value counts took: 2.9803638458251953 seconds
filtered groupby statistics took: 3.429048538208008 seconds
filtered join count took: 2.0234270095825195 seconds


2.0234270095825195

In [51]:
client.restart()

# Koalas

## Preparation

In [4]:
koalas_data = ks.read_parquet('../../Data/yellow_tripdata_2023-02.parquet')

koalas_benchmarks = {
    'duration': [],  # in seconds
    'task': [],
}

AssertionError: [dtype('int32'), dtype('<M8[us]'), dtype('<M8[us]'), dtype('int64'), dtype('float64'), dtype('int64'), dtype('O'), dtype('int32'), dtype('int32'), dtype('int64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64'), dtype('float64')]

## Standard operations

In [None]:
def read_file_parquet(df=None):
    return ks.read_parquet('../../Data/yellow_tripdata_2013-06.parquet')
  
def count(df=None):
    return len(df)

def count_index_length(df=None):
    return len(df.index)

def mean(df):
    return df.fare_amount.mean()

def standard_deviation(df):
    return df.fare_amount.std()

def mean_of_sum(df):
    return (df.fare_amount + df.tip_amount).mean()

def sum_columns(df):
    x = df.fare_amount + df.tip_amount
    x.to_pandas()
    return x

def mean_of_product(df):
    return (df.fare_amount * df.tip_amount).mean()

def product_columns(df):
    x = df.fare_amount * df.tip_amount
    x.to_pandas()
    return x

def value_counts(df):
    val_counts = df.fare_amount.value_counts()
    val_counts.to_pandas()
    return val_counts
  
def complicated_arithmetic_operation(df):
    theta_1 = df.start_lon
    phi_1 = df.start_lat
    theta_2 = df.end_lon
    phi_2 = df.end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2)
    ret.to_pandas()
    return ret
  
def mean_of_complicated_arithmetic_operation(df):
    theta_1 = df.start_lon
    phi_1 = df.start_lat
    theta_2 = df.end_lon
    phi_2 = df.end_lat
    temp = (np.sin((theta_2 - theta_1) / 2 * np.pi / 180) ** 2
           + np.cos(theta_1 * np.pi / 180) * np.cos(theta_2 * np.pi / 180) * np.sin((phi_2 - phi_1) / 2 * np.pi / 180) ** 2)
    ret = np.multiply(np.arctan2(np.sqrt(temp), np.sqrt(1-temp)),2) 
    return ret.mean()
  
def groupby_statistics(df):
    gb = df.groupby(by='passenger_count').agg(
      {
        'fare_amount': ['mean', 'std'], 
        'tip_amount': ['mean', 'std']
      }
    )
    gb.to_pandas()
    return gb
  
other = ks.DataFrame(groupby_statistics(koalas_data).to_pandas())
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    return len(df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True))

def join_data(df, other):
    ret = df.merge(other.spark.hint("broadcast"), left_index=True, right_index=True)
    ret.to_pandas()
    return ret

In [None]:
benchmark(read_file_parquet, df=None, benchmarks=koalas_benchmarks, name='read file')
benchmark(count, df=koalas_data, benchmarks=koalas_benchmarks, name='count')
benchmark(count_index_length, df=koalas_data, benchmarks=koalas_benchmarks, name='count index length')
benchmark(mean, df=koalas_data, benchmarks=koalas_benchmarks, name='mean')
benchmark(standard_deviation, df=koalas_data, benchmarks=koalas_benchmarks, name='standard deviation')
benchmark(mean_of_sum, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns addition')
benchmark(sum_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='addition of columns')
benchmark(mean_of_product, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of columns multiplication')
benchmark(product_columns, df=koalas_data, benchmarks=koalas_benchmarks, name='multiplication of columns')
benchmark(value_counts, df=koalas_data, benchmarks=koalas_benchmarks, name='value counts')
benchmark(complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='complex arithmetic ops')
benchmark(mean_of_complicated_arithmetic_operation, df=koalas_data, benchmarks=koalas_benchmarks, name='mean of complex arithmetic ops')
benchmark(groupby_statistics, df=koalas_data, benchmarks=koalas_benchmarks, name='groupby statistics')
benchmark(join_count, koalas_data, benchmarks=koalas_benchmarks, name='join count', other=other)
benchmark(join_data, koalas_data, benchmarks=koalas_benchmarks, name='join', other=other)

## Operations with filtering

In [None]:
expr_filter = (koalas_data.tip_amt >= 1) & (koalas_data.tip_amt <= 5)

def filter_data(df):
    return df[expr_filter]

koalas_filtered = filter_data(koalas_data)

In [None]:
benchmark(count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count')
benchmark(count_index_length, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered count index length')
benchmark(mean, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean')
benchmark(standard_deviation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered standard deviation')
benchmark(mean_of_sum, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns addition')
benchmark(sum_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered addition of columns')
benchmark(mean_of_product, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered mean of columns multiplication')
benchmark(product_columns, df=koalas_filtered, benchmarks=koalas_benchmarks, name='filtered multiplication of columns')
benchmark(mean_of_complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered mean of complex arithmetic ops')
benchmark(complicated_arithmetic_operation, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered complex arithmetic ops')
benchmark(value_counts, koalas_filtered, benchmarks=koalas_benchmarks, name ='filtered value counts')
benchmark(groupby_statistics, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered groupby statistics')

other = ks.DataFrame(groupby_statistics(koalas_filtered).to_pandas())
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
benchmark(join_data, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join', other=other)
benchmark(join_count, koalas_filtered, benchmarks=koalas_benchmarks, name='filtered join count', other=other)

# Result

In [None]:
koalas_res_temp = get_results(koalas_benchmarks).set_index('task')
dask_res_temp = get_results(dask_benchmarks).set_index('task')
df = pd.concat([koalas_res_temp.duration, dask_res_temp.duration], axis=1, keys=['koalas', 'dask'])

In [None]:
from datetime import datetime

filename = '/dbfs/FileStore/koalas-benchmark-no-parquet-cache/single_node_' + datetime.now().strftime("%H%M%S")
print(filename)

df.to_parquet(filename)