In [0]:
from pyspark.sql import SparkSession

# Obtain a Spark session; getOrCreate reuses an already-active session if there is one
# (presumably the notebook's built-in session when running on Databricks).
spark = (
    SparkSession.builder
    .appName("YourAppName")
    .getOrCreate()
)

In [0]:
# Turn off the Databricks IO (disk) cache, presumably so repeated reads are not served
# from cache and the timings below stay comparable -- NOTE(review): confirm intent.
disk_cache_key = "spark.databricks.io.cache.enabled"
spark.conf.set(disk_cache_key, "false")
print("spark.databricks.io.cache.enabled is %s" % spark.conf.get(disk_cache_key))

spark.databricks.io.cache.enabled is false


In [0]:
%pip install dask[complete]==2024.5.2

Python interpreter will be restarted.
Collecting dask[complete]
  Downloading dask-2024.5.2-py3-none-any.whl (1.2 MB)
Collecting toolz>=0.10.0
  Downloading toolz-0.12.1-py3-none-any.whl (56 kB)
Collecting importlib-metadata>=4.13.0
  Downloading importlib_metadata-7.1.0-py3-none-any.whl (24 kB)
Collecting click>=8.1
  Downloading click-8.1.7-py3-none-any.whl (97 kB)
Collecting fsspec>=2021.09.0
  Downloading fsspec-2024.5.0-py3-none-any.whl (316 kB)
Collecting partd>=1.2.0
  Downloading partd-1.4.2-py3-none-any.whl (18 kB)
Collecting cloudpickle>=1.5.0
  Downloading cloudpickle-3.0.0-py3-none-any.whl (20 kB)
Collecting pyyaml>=5.3.1
  Downloading PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (738 kB)
Collecting lz4>=4.3.2
  Downloading lz4-4.3.3-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
Collecting zipp>=0.5
  Downloading zipp-3.19.1-py3-none-any.whl (9.0 kB)
Collecting locket
  Downloading locket-1.0.0-py2.py3-none-any.whl (4.4 kB)
Colle

In [0]:
%pip install pyarrow==10.0.1

Python interpreter will be restarted.
Collecting pyarrow==10.0.1
  Downloading pyarrow-10.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (35.9 MB)
Installing collected packages: pyarrow
  Attempting uninstall: pyarrow
    Found existing installation: pyarrow 7.0.0
    Not uninstalling pyarrow at /databricks/python3/lib/python3.9/site-packages, outside environment /local_disk0/.ephemeral_nfs/envs/pythonEnv-e5fa1c31-e351-4056-a9dc-f3f4a9c54411
    Can't uninstall 'pyarrow'. No files were found to uninstall.
Successfully installed pyarrow-10.0.1
Python interpreter will be restarted.


In [0]:
# Library imports for the Dask benchmark (the Koalas comparison is currently disabled).
import time

import dask
import dask.dataframe as dd
import numpy as np
import pandas as pd
import pyarrow
import pyspark
from dask.distributed import Client, LocalCluster
#import databricks.koalas as ks

# Record library versions in the output so the timings can be reproduced later.
for _label, _module in (
    ('pandas', pd),
    ('numpy', np),
    ('dask', dask),
    ('pyarrow', pyarrow),
    ('pyspark', pyspark),
):
    print('%s version: %s' % (_label, _module.__version__))
#print('koalas version: %s' % ks.__version__)

# Start a Dask distributed client with default settings.
# NOTE(review): the "This may cause some slowdown / Consider scattering data" warnings in
# the benchmark output presumably come from the pandas frames that dd.from_pandas embeds
# in the task graph -- confirm before comparing these timings with other engines.
client = Client()

def benchmark(f, df, benchmarks, name, **kwargs):
    """Benchmark the given function against the given DataFrame.

    Parameters
    ----------
    f: function to benchmark; called as ``f(df, **kwargs)`` and its result discarded
    df: data frame passed as the first positional argument to ``f``
    benchmarks: container for benchmark results; a dict with ``'duration'`` and
        ``'task'`` lists, each of which gets exactly one entry appended per call
    name: task name, recorded in ``benchmarks['task']`` and printed
    **kwargs: extra keyword arguments forwarded to ``f`` (e.g. ``other=`` for joins)

    Returns
    -------
    Duration (in seconds) of the given operation
    """
    # perf_counter is monotonic and high-resolution; time.time() follows the wall clock
    # and can jump if the system clock is adjusted mid-measurement.
    start_time = time.perf_counter()
    f(df, **kwargs)
    duration = time.perf_counter() - start_time
    benchmarks['duration'].append(duration)
    benchmarks['task'].append(name)
    print(f"{name} took: {duration} seconds")
    return duration

def get_results(benchmarks):
    """Build a pandas DataFrame from the benchmark container, one column per key."""
    results = pd.DataFrame(benchmarks)
    return results

pandas version: 2.2.2
numpy version: 1.26.4
dask version: 2024.5.2
pyarrow version: 10.0.1
pyspark version: 3.3.2.dev0


In [0]:
# Load the monthly 2023 yellow-taxi files through Spark, convert each month to pandas,
# and wrap it in a Dask DataFrame; the months are then concatenated into `dask_data`.
N_MONTHS = 5                  # January .. May
N_PARTITIONS_PER_MONTH = 3    # Dask partitions created per monthly pandas frame

# Zero-padded month so the pattern stays correct for months 10-12 as well.
filenames = [
    f"/FileStore/tables/yellow_tripdata_2023_{month:02d}.parquet"
    for month in range(1, N_MONTHS + 1)
]

dfs = []
for filename in filenames:
    # NOTE(review): Spark resolves "/FileStore/..." against DBFS; a pandas/Dask reader
    # would need a locally visible path instead, presumably why Spark is used here.
    df = spark.read.parquet(filename).toPandas()
    # Some months spell the column `airport_fee`; use one name for every month before
    # concatenating. rename() ignores keys that are absent, so no membership check is needed.
    df = df.rename(columns={'airport_fee': 'Airport_fee'})
    dfs.append(dd.from_pandas(df, npartitions=N_PARTITIONS_PER_MONTH))

# pandas_data = pd.concat(dfs, ignore_index=True)
dask_data = dd.concat(dfs)

In [0]:
# Total number of rows across all loaded months (displayed as the cell output).
row_count = len(dask_data)
row_count

Out[3]: 16186386

In [0]:

# Accumulator filled by benchmark(): one entry per benchmarked task in each list.
dask_benchmarks = dict(
    duration=[],  # in seconds
    task=[],
)

In [0]:
def read_file_parquet(df=None, path="/FileStore/tables/yellow_tripdata_2023_01.parquet"):
    """Open a Parquet file with Dask.

    Parameters
    ----------
    df: ignored; accepted only so the function fits ``benchmark(f, df, ...)``
    path: Parquet file to read; defaults to the January 2023 file used by this notebook

    Returns
    -------
    The collection returned by ``dd.read_parquet``. It is returned without calling
    ``.compute()``, so timing this call presumably measures graph construction and
    metadata access rather than a full data load.
    """
    # NOTE(review): Spark resolves "/FileStore/..." against DBFS, but Dask reads through
    # the local filesystem; confirm `path` is locally visible (e.g. a FUSE-mounted DBFS
    # path) before re-enabling the 'read file' benchmark.
    return dd.read_parquet(path)
  
def count(df=None):
    """Return the number of rows in ``df`` as reported by ``len``."""
    n_rows = len(df)
    return n_rows

def count_index_length(df=None):
    """Return the row count measured through the length of ``df.index``."""
    index = df.index
    return len(index)

def mean(df):
    """Return the mean of ``fare_amount``, forcing evaluation with ``.compute()``."""
    fare = df.fare_amount
    return fare.mean().compute()

def standard_deviation(df):
    """Return the standard deviation of ``fare_amount`` as a computed value."""
    lazy_std = df.fare_amount.std()
    return lazy_std.compute()

def mean_of_sum(df):
    """Return the mean of the row-wise sum ``fare_amount + tip_amount``."""
    row_totals = df.fare_amount + df.tip_amount
    return row_totals.mean().compute()

def sum_columns(df):
    """Return the computed row-wise sum ``fare_amount + tip_amount``."""
    row_totals = df.fare_amount + df.tip_amount
    return row_totals.compute()

def mean_of_product(df):
    """Return the mean of the row-wise product ``fare_amount * tip_amount``."""
    row_products = df.fare_amount * df.tip_amount
    return row_products.mean().compute()

def product_columns(df):
    """Return the computed row-wise product ``fare_amount * tip_amount``."""
    row_products = df.fare_amount * df.tip_amount
    return row_products.compute()
  
def value_counts(df):
    """Return the computed frequency of each distinct ``fare_amount`` value."""
    fare_counts = df.fare_amount.value_counts()
    return fare_counts.compute()
  
def _complicated_arithmetic_expr(df):
    """Build the uncomputed arithmetic expression shared by the two benchmarks below.

    Reads ``start_lon``, ``start_lat``, ``end_lon`` and ``end_lat`` from ``df`` and
    converts them from degrees to radians (``* np.pi / 180``) inside a haversine-style
    central-angle formula.

    NOTE(review): in the textbook haversine formula the first sine term uses the
    latitude difference and the cosine factors use latitudes. Here those roles are
    filled by longitude, so the result is not a true great-circle angle. The arithmetic
    is kept unchanged because it only serves as a benchmark of compute cost.
    """
    theta_1 = df.start_lon
    phi_1 = df.start_lat
    theta_2 = df.end_lon
    phi_2 = df.end_lat
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    return 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))


def mean_of_complicated_arithmetic_operation(df):
    """Return the computed mean of the shared arithmetic expression over ``df``."""
    return _complicated_arithmetic_expr(df).mean().compute()


def complicated_arithmetic_operation(df):
    """Return the computed per-row values of the shared arithmetic expression."""
    return _complicated_arithmetic_expr(df).compute()
  
def groupby_statistics(df):
    """Return mean and std of fare and tip amounts per ``passenger_count``, computed."""
    aggregations = {
        'fare_amount': ['mean', 'std'],
        'tip_amount': ['mean', 'std'],
    }
    grouped = df.groupby(by='passenger_count')
    return grouped.agg(aggregations).compute()
# other = groupby_statistics(dask_data)
# other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])

def join_count(df, other):
    """Return the row count of ``df`` merged with ``other`` on both indexes (default ``how``)."""
    merged = dd.merge(df, other, left_index=True, right_index=True)
    return len(merged)

def join_data(df, other):
    """Return the computed result of merging ``df`` with ``other`` on both indexes."""
    merged = dd.merge(df, other, left_index=True, right_index=True)
    return merged.compute()

In [0]:
# Benchmark every task on the full dataset, in this order. Each benchmark() call appends
# one entry to dask_benchmarks and prints its duration.
#benchmark(read_file_parquet, df=None, benchmarks=dask_benchmarks, name='read file')
full_data_tasks = [
    (count, 'count'),
    (count_index_length, 'count index length'),
    (mean, 'mean'),
    (standard_deviation, 'standard deviation'),
    (mean_of_sum, 'mean of columns addition'),
    (sum_columns, 'addition of columns'),
    (mean_of_product, 'mean of columns multiplication'),
    (product_columns, 'multiplication of columns'),
    (value_counts, 'value counts'),
    # Disabled: this dataset has no start/end lon/lat columns.
    # (mean_of_complicated_arithmetic_operation, 'mean of complex arithmetic ops'),
    # (complicated_arithmetic_operation, 'complex arithmetic ops'),
    (groupby_statistics, 'groupby statistics'),
]
for task_fn, task_name in full_data_tasks:
    benchmark(task_fn, df=dask_data, benchmarks=dask_benchmarks, name=task_name)

# Join benchmarks are disabled: they need an `other` frame built from groupby_statistics
# output, and the join could not be run on this data.
# benchmark(join_count, dask_data, benchmarks=dask_benchmarks, name='join count', other=other)
# benchmark(join_data, dask_data, benchmarks=dask_benchmarks, name='join', other=other)

count took: 0.058249711990356445 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


count index length took: 36.14887094497681 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


mean took: 2.394890069961548 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


standard deviation took: 1.5536243915557861 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


mean of columns addition took: 2.125427722930908 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


addition of columns took: 3.7850420475006104 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


mean of columns multiplication took: 1.7951135635375977 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


multiplication of columns took: 3.9497592449188232 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


value counts took: 2.5369818210601807 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


groupby statistics took: 4.098268747329712 seconds
Out[29]: 4.098268747329712

Operations with filtering: the same benchmarks re-run on the trips whose tip_amount is between 1 and 5 (inclusive).

In [0]:
# Boolean mask over dask_data: tips between 1 and 5, both ends inclusive.
expr_filter = (1 <= dask_data.tip_amount) & (dask_data.tip_amount <= 5)

def filter_data(df, low=1, high=5):
    """Return the rows of ``df`` whose ``tip_amount`` lies in ``[low, high]`` (inclusive).

    The mask is built from ``df`` itself rather than from a global mask. A mask derived
    from a different frame would not line up with ``df``'s rows, so this works for any
    frame with a ``tip_amount`` column.

    Parameters
    ----------
    df: data frame with a ``tip_amount`` column
    low: lower bound on ``tip_amount`` (inclusive)
    high: upper bound on ``tip_amount`` (inclusive)
    """
    tips = df.tip_amount
    return df[(tips >= low) & (tips <= high)]
  
# Filtered view of the full dataset used by the "filtered" benchmarks.
dask_filtered = filter_data(df=dask_data)

In [0]:
# Re-run the benchmark tasks on the filtered frame; each task name gets a 'filtered '
# prefix so these entries sit alongside the full-data ones in dask_benchmarks.
filtered_tasks = [
    (count, 'count'),
    (count_index_length, 'count index length'),
    (mean, 'mean'),
    (standard_deviation, 'standard deviation'),
    (mean_of_sum, 'mean of columns addition'),
    (sum_columns, 'addition of columns'),
    (mean_of_product, 'mean of columns multiplication'),
    (product_columns, 'multiplication of columns'),
    # Disabled: this dataset has no start/end lon/lat columns.
    # (mean_of_complicated_arithmetic_operation, 'mean of complex arithmetic ops'),
    # (complicated_arithmetic_operation, 'complex arithmetic ops'),
    (value_counts, 'value counts'),
    (groupby_statistics, 'groupby statistics'),
]
for task_fn, base_name in filtered_tasks:
    benchmark(task_fn, df=dask_filtered, benchmarks=dask_benchmarks, name=f'filtered {base_name}')

# Join benchmarks are disabled; to re-enable, build `other` first:
# other = groupby_statistics(dask_filtered)
# other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
# benchmark(join_count, dask_filtered, benchmarks=dask_benchmarks, name='filtered join count', other=other)
# benchmark(join_data, dask_filtered, benchmarks=dask_benchmarks, name='filtered join', other=other)

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered count took: 44.74642515182495 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered count index length took: 50.05057716369629 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered mean took: 3.3873913288116455 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered standard deviation took: 2.5518338680267334 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered mean of columns addition took: 2.100897789001465 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered addition of columns took: 4.536018371582031 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered mean of columns multiplication took: 2.053715705871582 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered multiplication of columns took: 3.2464606761932373 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered value counts took: 3.4203221797943115 seconds


This may cause some slowdown.
Consider scattering data ahead of time and using futures.


filtered groupby statistics took: 4.089606046676636 seconds
Out[31]: 4.089606046676636

In [0]:
# Benchmark timings as a table indexed by task name.
dask_res_temp = get_results(dask_benchmarks).set_index(keys='task')
dask_res_temp

Unnamed: 0_level_0,duration
task,Unnamed: 1_level_1
count,0.05825
count index length,36.148871
mean,2.39489
standard deviation,1.553624
mean of columns addition,2.125428
addition of columns,3.785042
mean of columns multiplication,1.795114
multiplication of columns,3.949759
value counts,2.536982
groupby statistics,4.098269
