# Install packages

In [5]:
%%capture
%%bash
pip install -U pip dask[complete] numpy fsspec>=0.3.3 tqdm pyarrow

In [1]:
import gc
import numpy as np
import pandas as pd
import warnings
import time
import gc
import os

instance_type = 'c5d4xlarge' # change this
results_bucket = f"s3://xdss-benchmarks/benchmarks" # change this

name = 'dask'
data_path = 'datasets/taxi_parquet/'
output_file = f'{name}_{instance_type}.csv'
results_path = f"results/{output_file}"
results_bucket = f"{results_bucket}/{output_file}" 
benchmarks = {
    'run':[],
    'duration': [],
    'task': []   
}

long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90


def get_results(benchmarks=benchmarks):
    return pd.DataFrame.from_dict(benchmarks, orient='index').T

def persist():
    gc.collect()
    get_results(benchmarks).to_csv(results_path)
    os.system(f"aws s3 cp {results_path} {results_bucket}")
    
def benchmark(f, df, name, **kwargs):    
    for i in range(2):
        start_time = time.time()
        ret = f(df, **kwargs)
        benchmarks['duration'].append(time.time()-start_time)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]
          
def add_nan(name):
    for i in range(2):
        benchmarks['duration'].append(np.nan)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]

def reload():
    df = pd.read_csv(f"results/{output_file}").drop(['Unnamed: 0'],axis=1)
    benchmarks = df.to_dict(orient='list')
    return benchmarks

print(f"We test every benchmark twice and save both results")

We test every benchmark twice and save both results


# Benchmark

In [2]:
import dask.dataframe as dd
import numpy as np
from dask.distributed import Client, LocalCluster 

# I found that running this line makes for better results
client = Client(threads_per_worker=1)



# Load data
data = dd.read_parquet(data_path, engine='pyarrow')
size = len(data.vendor_id)
print(f"size: {size} with {len(data.columns)} columns and {data.npartitions} partitions")

size: 1173057928 with 18 columns and 1174 partitions


In [3]:
def read_file_parquet(df=None):
    return dd.read_parquet(data_path, engine='pyarrow')

benchmark(read_file_parquet, df=data, name='read_file')

read_file took: 0.8108847141265869


0.8108847141265869

In [4]:
def count(df=None):
    # there is a dask bug - len(df) takes 20X time longer
    return len(df.vendor_id) 

benchmark(count, df=data, name='count')

count took: 7.938486099243164


7.938486099243164

In [5]:
def mean(df):
    return df.fare_amount.mean().compute()

benchmark(mean, df=data, name='mean')

mean took: 5.056631803512573


5.056631803512573

In [6]:
def standard_deviation(df):
    return df.fare_amount.std().compute()

benchmark(standard_deviation, df=data, name='standard deviation')

standard deviation took: 5.378662586212158


5.378662586212158

In [7]:
def mean_of_sum(df):
    return (df.fare_amount + df.trip_distance).mean().compute()

benchmark(mean_of_sum, df=data, name='sum columns mean')

sum columns mean took: 6.816420078277588


6.816420078277588

In [8]:
# lazy evaulation - instant
def sum_columns(df):
    return (df.fare_amount + df.trip_distance)

benchmark(sum_columns, df=data, name='sum columns')

sum columns took: 0.0010232925415039062


0.0010232925415039062

In [9]:
def mean_of_product(df):
    return (df.fare_amount * df.trip_distance).mean().compute()

benchmark(mean_of_product, df=data, name='product columns mean')

product columns mean took: 6.626446723937988


6.626446723937988

In [10]:
# lazy evaulation - instant
def product_columns(df):
    return df.fare_amount * df.trip_distance

benchmark(product_columns, df=data, name='product columns')

product columns took: 0.0011837482452392578


0.0011837482452392578

In [12]:
def lazy_mean(df):
    df['lazy'] = df.fare_amount * df.trip_distance
    return df['lazy'].mean().compute()
    
benchmark(lazy_mean, df=data, name='lazy evaluation')  

lazy evaluation took: 153.01910591125488


153.01910591125488

# Restart and reload the data is much faster... 
I got results between 4X - 75X faster by using this.    
`client.restart()` did not match the effect of retarting the kenrel.

In [1]:
import gc
import numpy as np
import pandas as pd
import warnings
import time
import gc
import os
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster 


instance_type = 'c5d4xlarge' # change this
results_bucket = f"s3://xdss-benchmarks/benchmarks" # change this

name = 'dask'
data_path = 'datasets/taxi_parquet/'
output_file = f'{name}_{instance_type}.csv'
results_path = f"results/{output_file}"
results_bucket = f"{results_bucket}/{output_file}" 
benchmarks = {
    'run':[],
    'duration': [],
    'task': []   
}

long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90


def get_results(benchmarks=benchmarks):
    return pd.DataFrame.from_dict(benchmarks, orient='index').T

def persist():
    gc.collect()
    get_results(benchmarks).to_csv(results_path)
    os.system(f"aws s3 cp {results_path} {results_bucket}")
    
def benchmark(f, df, name, **kwargs):    
    for i in range(2):
        start_time = time.time()
        ret = f(df, **kwargs)
        benchmarks['duration'].append(time.time()-start_time)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]
          
def add_nan(name):
    for i in range(2):
        benchmarks['duration'].append(np.nan)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]

def reload():
    df = pd.read_csv(f"results/{output_file}").drop(['Unnamed: 0'],axis=1)
    benchmarks = df.to_dict(orient='list')
    return benchmarks


benchmarks = reload()
client = Client(threads_per_worker=1)

# Load data
data = dd.read_parquet(data_path, engine='pyarrow')
size = len(data.vendor_id)
print(f"size: {size} with {len(data.columns)} columns and {data.npartitions} partitions")

size: 1173057928 with 18 columns and 1174 partitions


In [2]:
def value_counts(df):
    return df.passenger_count.value_counts().compute()

benchmark(value_counts, df=data, name='value counts')

value counts took: 5.161530494689941


5.161530494689941

In [2]:
def mean_of_complicated_arithmetic_operation(df):
    theta_1 = df.pickup_longitude
    phi_1 = df.pickup_latitude
    theta_2 = df.dropoff_longitude
    phi_2 = df.dropoff_latitude
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean().compute()

benchmark(mean_of_complicated_arithmetic_operation, df=data, name='arithmetic operation mean')

arithmetic operation mean took: 36.519623041152954


36.519623041152954

In [3]:
def complicated_arithmetic_operation(df):
    theta_1 = df.pickup_longitude
    phi_1 = df.pickup_latitude
    theta_2 = df.dropoff_longitude
    phi_2 = df.dropoff_latitude
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean()

benchmark(complicated_arithmetic_operation, df=data, name='arithmetic operation')

arithmetic operation took: 0.019608259201049805


0.019608259201049805

# Restart for best results
This restart got me a 7X faster results for group-by

In [1]:
import gc
import numpy as np
import pandas as pd
import warnings
import time
import gc
import os
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster 


instance_type = 'c5d4xlarge' # change this
results_bucket = f"s3://xdss-benchmarks/benchmarks" # change this

name = 'dask'
data_path = 'datasets/taxi_parquet/'
output_file = f'{name}_{instance_type}.csv'
results_path = f"results/{output_file}"
results_bucket = f"{results_bucket}/{output_file}" 
benchmarks = {
    'run':[],
    'duration': [],
    'task': []   
}

long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90


def get_results(benchmarks=benchmarks):
    return pd.DataFrame.from_dict(benchmarks, orient='index').T

def persist():
    gc.collect()
    get_results(benchmarks).to_csv(results_path)
    os.system(f"aws s3 cp {results_path} {results_bucket}")
    
def benchmark(f, df, name, **kwargs):    
    for i in range(2):
        start_time = time.time()
        ret = f(df, **kwargs)
        benchmarks['duration'].append(time.time()-start_time)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]
          
def add_nan(name):
    for i in range(2):
        benchmarks['duration'].append(np.nan)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]

def reload():
    df = pd.read_csv(f"results/{output_file}").drop(['Unnamed: 0'],axis=1)
    benchmarks = df.to_dict(orient='list')
    return benchmarks


benchmarks = reload()
client = Client(threads_per_worker=1)

# Load data
data = dd.read_parquet(data_path, engine='pyarrow')
size = len(data.vendor_id)
print(f"size: {size} with {len(data.columns)} columns and {data.npartitions} partitions")

size: 1173057928 with 18 columns and 1174 partitions


In [3]:
def groupby_statistics(df):
    return df.groupby(by='passenger_count').agg({'fare_amount': ['mean', 'std'], 
                                               'tip_amount': ['mean', 'std']
                                              }).compute()

benchmark(groupby_statistics, df=data, name='groupby statistics')

groupby statistics took: 104.00652551651001


104.00652551651001

In [None]:
# For join before restarting
other = data.groupby(by='passenger_count').agg({'fare_amount': ['mean', 'std'], 'tip_amount': ['mean', 'std']}).compute()
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
other.to_parquet('datasets/other.parquet')

# Restart again was the only way to finish this task
* Removing this line `client = Client(threads_per_worker=1)` is also important

### [Join best practice](https://docs.dask.org/en/latest/dataframe-best-practices.html#joins)

In [1]:
import gc
import numpy as np
import pandas as pd
import warnings
import time
import gc
import os
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster 


instance_type = 'c5d4xlarge' # change this
results_bucket = f"s3://xdss-benchmarks/benchmarks" # change this

name = 'dask'
data_path = 'datasets/taxi_parquet/'
output_file = f'{name}_{instance_type}.csv'
results_path = f"results/{output_file}"
results_bucket = f"{results_bucket}/{output_file}" 
benchmarks = {
    'run':[],
    'duration': [],
    'task': []   
}

long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90


def get_results(benchmarks=benchmarks):
    return pd.DataFrame.from_dict(benchmarks, orient='index').T

def persist():
    gc.collect()
    get_results(benchmarks).to_csv(results_path)
    os.system(f"aws s3 cp {results_path} {results_bucket}")
    
def benchmark(f, df, name, **kwargs):    
    for i in range(2):
        start_time = time.time()
        ret = f(df, **kwargs)
        benchmarks['duration'].append(time.time()-start_time)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]
          
def add_nan(name):
    for i in range(2):
        benchmarks['duration'].append(np.nan)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]

def reload():
    df = pd.read_csv(f"results/{output_file}").drop(['Unnamed: 0'],axis=1)
    benchmarks = df.to_dict(orient='list')
    return benchmarks


benchmarks = reload()
#client = Client(threads_per_worker=1)

# Load data
data = dd.read_parquet(data_path, engine='pyarrow')
size = len(data.vendor_id)
other = pd.read_parquet('datasets/other.parquet')
print(f"size: {size} with {len(data.columns)} columns and {data.npartitions} partitions")

size: 1173057928 with 18 columns and 1174 partitions


In [2]:
def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

benchmark(join_count, data, name='join count', other=other)

join count took: 646.4160277843475


646.4160277843475

In [3]:
# crashes
def join_data(df, other):
    return dd.merge(df, other, left_index=True, right_index=True)

add_nan('join')
# benchmark(join_count, data, name='join', other=other)

join took: nan


nan

## Filtered data (and restart the kernel again)

In [1]:
import gc
import numpy as np
import pandas as pd
import warnings
import time
import gc
import os
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster 


instance_type = 'c5d4xlarge' # change this
results_bucket = f"s3://xdss-benchmarks/benchmarks" # change this

name = 'dask'
data_path = 'datasets/taxi_parquet/'
output_file = f'{name}_{instance_type}.csv'
results_path = f"results/{output_file}"
results_bucket = f"{results_bucket}/{output_file}" 
benchmarks = {
    'run':[],
    'duration': [],
    'task': []   
}

long_min = -74.05
long_max = -73.75
lat_min = 40.58
lat_max = 40.90


def get_results(benchmarks=benchmarks):
    return pd.DataFrame.from_dict(benchmarks, orient='index').T

def persist():
    gc.collect()
    get_results(benchmarks).to_csv(results_path)
    os.system(f"aws s3 cp {results_path} {results_bucket}")
    
def benchmark(f, df, name, **kwargs):    
    for i in range(2):
        start_time = time.time()
        ret = f(df, **kwargs)
        benchmarks['duration'].append(time.time()-start_time)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]
          
def add_nan(name):
    for i in range(2):
        benchmarks['duration'].append(np.nan)
        benchmarks['task'].append(name)
        benchmarks['run'].append(i+1)
    persist()
    print(f"{name} took: {benchmarks['duration'][-1]}")
    return benchmarks['duration'][-1]

def reload():
    df = pd.read_csv(f"results/{output_file}").drop(['Unnamed: 0'],axis=1)
    benchmarks = df.to_dict(orient='list')
    return benchmarks
          
def mean(df):
    return df.fare_amount.mean().compute()

def standard_deviation(df):
    return df.fare_amount.std().compute()

def mean_of_sum(df):
    return (df.fare_amount + df.trip_distance).mean().compute()

def sum_columns(df):
    return (df.fare_amount + df.trip_distance)

def mean_of_product(df):
    return (df.fare_amount * df.trip_distance).mean().compute()


def product_columns(df):
    return df.fare_amount * df.trip_distance
          
def mean_of_complicated_arithmetic_operation(df):
    theta_1 = df.pickup_longitude
    phi_1 = df.pickup_latitude
    theta_2 = df.dropoff_longitude
    phi_2 = df.dropoff_latitude
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean().compute()

def complicated_arithmetic_operation(df):
    theta_1 = df.pickup_longitude
    phi_1 = df.pickup_latitude
    theta_2 = df.dropoff_longitude
    phi_2 = df.dropoff_latitude
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    ret = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return ret.mean()

def value_counts(df):
    return df.passenger_count.value_counts().compute()

def groupby_statistics(df):
    return df.groupby(by='passenger_count').agg({'fare_amount': ['mean', 'std'], 
                                               'tip_amount': ['mean', 'std']
                                              }).compute()

def join_count(df, other):
    return len(dd.merge(df, other, left_index=True, right_index=True))

          
benchmarks = reload()
client = Client(threads_per_worker=1)

# Load data
data = dd.read_parquet(data_path, engine='pyarrow')
size = len(data.vendor_id)
other = pd.read_parquet('datasets/other.parquet')
print(f"size: {size} with {len(data.columns)} columns and {data.npartitions} partitions")

size: 1173057928 with 18 columns and 1174 partitions


In [3]:
print(f"Prepare filtered data and deleted {gc.collect()} MB")
expr_filter = (data.pickup_longitude > long_min)  & (data.pickup_longitude < long_max) & \
                  (data.pickup_latitude > lat_min)    & (data.pickup_latitude < lat_max) & \
                  (data.dropoff_longitude > long_min) & (data.dropoff_longitude < long_max) & \
                  (data.dropoff_latitude > lat_min)   & (data.dropoff_latitude < lat_max)

def filter_data(df):
    return df[expr_filter]

benchmark(filter_data, data, name='filter data')

Prepare filtered data and deleted 388 MB
filter data took: 0.00021028518676757812


0.00021028518676757812

In [4]:
# https://docs.dask.org/en/latest/dataframe-best-practices.html
filtered = filter_data(data)
nb_partitions = int(data.npartitions//(len(filtered.vendor_id)/size))

filtered = filtered.repartition(npartitions=nb_partitions)#.persist()
del data
print(f"cleaned {gc.collect()} mb")

cleaned 248 mb


In [None]:
benchmark(mean, filtered, name='filtered mean')
benchmark(standard_deviation, filtered, name='filtered standard deviation')
benchmark(mean_of_sum, filtered, name ='filtered sum columns mean')
benchmark(sum_columns, df=filtered, name='filtered sum columns')
benchmark(mean_of_product, filtered, name ='filtered product columns mean')
benchmark(product_columns, df=filtered, name='filtered product columns')
benchmark(mean_of_complicated_arithmetic_operation, filtered, name='filtered arithmetic operation mean')
benchmark(complicated_arithmetic_operation, filtered, name='filtered arithmetic operation')
benchmark(value_counts, filtered, name ='filtered value counts')
benchmark(groupby_statistics, filtered, name='filtered groupby statistics')
other = filtered.groupby(by='passenger_count').agg({'fare_amount': ['mean', 'std'], 'tip_amount': ['mean', 'std']}).compute()
other.columns = pd.Index([e[0]+'_' + e[1] for e in other.columns.tolist()])
add_nan('filtered join')
benchmark(join_count, filtered, name='filtered join count', other=other)
print(name)
get_results(benchmarks)