# Dask Acceleration vs Pandas

This notebook explores speed advantages realized by using the Dask library over Pandas for dataframe operations in Python.

| Component | Version / Spec |
| --- | --- |
| Python | 3.6.9 |
| Jupyter Notebook | 6.0.1 |
| pandas | 1.0.3 |
| Dask | 2.17.2 |
| CPU | Intel Core i7, 2.60 GHz (x64-based PC) |
| RAM | 16.0 GB |

Hat tip to Saturn Cloud's [Your Practical Guide to Dask](https://www.saturncloud.io/s/practical-guide-to-dask/).

In [1]:
# importing library to measure execution time
import timeit
from timeit import default_timer as timer

# starting timer for complete notebook execution
notebook_start = timer()
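A single `timeit` run can be skewed by other processes on the machine. A common pattern, sketched here as an assumption rather than what this notebook does, is to repeat each measurement several times and keep the minimum. The `sum_squares` workload is a hypothetical stand-in for a dataframe operation.

```python
import timeit


def sum_squares(n=10_000):
    """Hypothetical stand-in workload for a dataframe operation."""
    return sum(i * i for i in range(n))


# run the workload 100 times per trial, for 5 independent trials
trials = timeit.repeat(sum_squares, number=100, repeat=5)

# the minimum trial is usually the least noisy estimate of the true cost
best_per_call = min(trials) / 100
print(f"best time per call: {best_per_call:.6f} s")
```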

In [2]:
# importing required libraries
import random
import pandas as pd
import os # for directory operations

__Generate random stock data in Python and store it in a pandas dataframe.__

In [3]:
# instantiating 1M random stock records
num_rows = 1000000

symbols = ["AAPL", "AMD", "GOOG", "MSFT", "NVDA"]
prices = [random.randint(1, 500) for _ in range(50)]

In [4]:
def get_stock_data(symbols, prices):
    '''
    function to generate random stock data from the
    `symbols` list and the randomized `prices` list
    '''
    return {"symbol": random.choice(symbols),
            "price": random.choice(prices)}
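Calling `get_stock_data` once per row makes a million Python function calls. An alternative, sketched here as an assumption rather than the notebook's method, draws each column in a single `random.choices` call (sampling with replacement, like the per-row draw) and builds a column-oriented dict that `pd.DataFrame` accepts directly.

```python
import random

symbols = ["AAPL", "AMD", "GOOG", "MSFT", "NVDA"]
prices = [random.randint(1, 500) for _ in range(50)]
num_rows = 1_000_000

# one bulk draw per column instead of one function call per row
stock_columns = {
    "symbol": random.choices(symbols, k=num_rows),
    "price": random.choices(prices, k=num_rows),
}
```

The result can be passed straight to `pd.DataFrame(stock_columns)`, producing the same `symbol` and `price` columns.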

In [5]:
# using the function to generate stock data for the
# number of rows instantiated in `num_rows`
stock_data = [get_stock_data(symbols, prices) for _ in range(num_rows)]

# instantiate data as a pandas dataframe
stock_df = pd.DataFrame(stock_data,
                        columns=["symbol", "price"])

__Export stock data to a csv file.__

In [6]:
# save `stock_df` as a csv file in the data subdirectory,
# creating the directory first if it does not exist
os.makedirs('data', exist_ok=True)

# prefix filename with '_rc_' so it is matched by .gitignore
stock_df.to_csv("data/_rc_stock_data.csv")

# preview the dataframe
stock_df.head()

|   | symbol | price |
| --- | --- | --- |
| 0 | GOOG | 12 |
| 1 | NVDA | 166 |
| 2 | AMD | 403 |
| 3 | MSFT | 388 |
| 4 | NVDA | 103 |
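Because `to_csv` writes the dataframe index by default, reading the file back (as `pandas_df` and `dask_df` do below) yields an extra `Unnamed: 0` column. A minimal sketch, assuming an in-memory buffer in place of `data/_rc_stock_data.csv`, shows how `index=False` avoids it:

```python
import io

import pandas as pd

df = pd.DataFrame({"symbol": ["GOOG", "NVDA"], "price": [12, 166]})

# default: the index is written as an unnamed first column
with_index = pd.read_csv(io.StringIO(df.to_csv()))

# index=False: only the data columns are written
without_index = pd.read_csv(io.StringIO(df.to_csv(index=False)))

print(with_index.columns.tolist())     # ['Unnamed: 0', 'symbol', 'price']
print(without_index.columns.tolist())  # ['symbol', 'price']
```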


## Load CSV Data into a Dask Dataframe

In [7]:
import dask.dataframe as dd

# loading csv data to a dask dataframe
dask_df = dd.read_csv("data/_rc_stock_data.csv")

__Repartition the Dask dataframe into partitions of 100MB or less, per the [official documentation](https://docs.dask.org/en/latest/dataframe-best-practices.html#repartition-to-reduce-overhead)__.

In [8]:
# repartitioning dask dataframe from csv
dask_df = dask_df.repartition(partition_size="100MB")

# loading pandas dataframe from csv
pandas_df = pd.read_csv("data/_rc_stock_data.csv")

## Compute CPU Acceleration with Dask vs Pandas

### Calculating Mean

__Create functions to calculate mean for the same dataframe in each of the two libraries.__

In [9]:
def pandas_mean():
    '''Calculates mean using pandas
    ''' 
    pandas_df["price"].mean()
    
def dask_mean():
    '''Calculates mean using dask
    '''
    dask_df["price"].mean()

__Compare Mean Calculation (single iteration)__

In [10]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_mean, number=1), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_mean, number=1), 5)
     )


pandas execution time:  0.0012
------------------------------

dask execution time:  0.00146


In [11]:
# computing and comparing task execution
start = timer()

# Calculate mean using pandas
pandas_df["price"].mean()

end = timer()

# print time elapsed in seconds
print("pandas execution time: ", round(end - start, 5))
print("-"*30)

start = timer()

# Calculate mean using Dask
dask_df["price"].mean()

end = timer()

# print time elapsed in seconds
print("\ndask execution time: ", round(end - start, 5))

pandas execution time:  0.00166
------------------------------

dask execution time:  0.00214
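The start/end timer pattern above is repeated for every operation. One way to factor it out, sketched with a hypothetical `timed` context manager that is not part of the original notebook, is:

```python
from contextlib import contextmanager
from timeit import default_timer as timer


@contextmanager
def timed(label, results):
    """Time the enclosed block and store the elapsed seconds in `results`."""
    start = timer()
    try:
        yield
    finally:
        results[label] = timer() - start


timings = {}
with timed("list_sum", timings):
    total = sum(list(range(100_000)))

print(f"list_sum execution time: {timings['list_sum']:.5f}")
```

Wrapping each pandas or Dask call in `with timed("...", timings):` keeps all measurements in one dict for later comparison.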


__Compare Mean Calculation (10K iterations)__

In [12]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_mean, number=10000), 2)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_mean, number=10000), 2)
     )


pandas execution time:  8.72
------------------------------

dask execution time:  10.12


__Compare Mean Calculation (100K iterations)__

In [13]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_mean, number=100000), 2)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_mean, number=100000), 2)
     )


pandas execution time:  84.52
------------------------------

dask execution time:  99.38


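> Over 100K iterations, the Dask call took about 17.6% longer than pandas (99.38 s vs 84.52 s). Note that `dask_df["price"].mean()` is lazy: without `.compute()` it only builds a task graph, so this compares graph construction in Dask with the actual calculation in pandas.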
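The percentages in these summaries can be checked against the recorded timings. A small helper, hypothetical and not part of the original notebook, computes both the relative change in execution time and the speedup ratio, two figures that are easy to conflate:

```python
def compare_times(baseline_s, candidate_s):
    """Return (% change in time, speedup) of `candidate_s` relative to `baseline_s`."""
    pct_change = (candidate_s - baseline_s) / baseline_s * 100
    speedup = baseline_s / candidate_s
    return round(pct_change, 1), round(speedup, 2)


# recorded 100K-iteration timings (pandas, dask) from this notebook
print(compare_times(84.52, 99.38))        # (17.6, 0.85)
print(compare_times(1322.65833, 69.662))  # (-94.7, 18.99)
```

A negative percentage means less time than pandas; a speedup below 1 means the Dask call was slower.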

### Filtering

__Create functions to filter dataframe in each of the two libraries.__

In [14]:
def pandas_filter():
    '''Filters by price using pandas
    ''' 
    pandas_df[pandas_df["price"] > 250]
    
def dask_filter():
    '''Filters by price using dask
    ''' 
    dask_df[dask_df["price"] > 250]

__Compare Filtering Dataframes (single iteration)__

In [15]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_filter, number=1), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_filter, number=1), 5)
     )


pandas execution time:  0.01715
------------------------------

dask execution time:  0.00097


In [16]:
start_p = timer()
# Filtering by price in Pandas
pandas_df[pandas_df["price"] > 250]
end_p = timer()
pandas_time = round(end_p - start_p, 5)
print("pandas execution time: ", pandas_time)
print("-"*30)

start_d = timer()
# Filtering by price in Dask
dask_df[dask_df["price"] > 250]
end_d = timer()
dask_time = round(end_d - start_d, 5)
print("\ndask execution time: ", dask_time)

pandas execution time:  0.01508
------------------------------

dask execution time:  0.00107


__Compare Filtering Dataframes (10K iterations)__

In [18]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_filter, number=10000), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_filter, number=10000), 5)
     )


pandas execution time:  134.14251
------------------------------

dask execution time:  6.87882


__Compare Filtering Dataframes (100K iterations)__

In [19]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_filter, number=100000), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_filter, number=100000), 5)
     )


pandas execution time:  1322.65833
------------------------------

dask execution time:  69.662


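> Over 100K iterations, the Dask filter call took about 94.7% less time than pandas (69.66 s vs 1322.66 s). Because `dask_df[dask_df["price"] > 250]` is never passed to `.compute()`, this compares building a lazy task graph with actually filtering in pandas, so it does not by itself show that Dask filters faster.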

### Adding

__Create functions to add dataframes in each of the two libraries.__

In [20]:
def pandas_add():
    '''Adds dataframes together using pandas
    ''' 
    pandas_df + pandas_df + pandas_df + pandas_df + pandas_df
    
def dask_add():
    '''Adds dataframes together using dask
    ''' 
    dask_df + dask_df + dask_df + dask_df + dask_df
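Because `pandas_df` is read back from the CSV, it includes the string `symbol` column (and the `Unnamed: 0` index column), so `+` concatenates the ticker strings element-wise in addition to summing the numbers. A minimal sketch on a small hypothetical dataframe shows the effect, and one way to restrict the addition to numeric columns with `select_dtypes`:

```python
import pandas as pd

df = pd.DataFrame({"symbol": ["AAPL", "AMD"], "price": [10, 20]})

# string columns are concatenated, numeric columns are summed
summed = df + df
print(summed["symbol"].tolist())  # ['AAPLAAPL', 'AMDAMD']
print(summed["price"].tolist())   # [20, 40]

# keep only numeric columns before adding
numeric = df.select_dtypes(include="number")
numeric_sum = numeric + numeric
print(numeric_sum.columns.tolist())  # ['price']
```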

__Compare Adding Dataframes (single iteration)__

In [21]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_add, number=1), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_add, number=1), 5)
     )


pandas execution time:  0.41359
------------------------------

dask execution time:  0.02015


In [27]:
start_p = timer()
# adding big dataframes in pandas
pandas_df + pandas_df + pandas_df + pandas_df + pandas_df
end_p = timer()
pandas_time = round(end_p - start_p, 5)
print("pandas execution time: ", pandas_time)
print("-"*30)

start_d = timer()
# adding big dataframes in dask
dask_df + dask_df + dask_df + dask_df + dask_df
end_d = timer()
dask_time = round(end_d - start_d, 5)
print("\ndask execution time: ", dask_time)

# calculating dask speed improvement
print("-"*30, "\n")

print(round((pandas_time - dask_time) / pandas_time*100, 3), 
      "% less execution time")

pandas execution time:  0.40339
------------------------------

dask execution time:  0.01968
------------------------------ 

95.121 % less execution time


__Compare Adding Dataframes (10K iterations)__

In [24]:
print("\npandas execution time: ", round(
    timeit.timeit(pandas_add, number=10000), 5)
     )
print("-"*30)

print("\ndask execution time: ", round(
    timeit.timeit(dask_add, number=10000), 5)
     )


pandas execution time:  4088.36484
------------------------------

dask execution time:  200.67291


__Compare Adding Dataframes (100K iterations)__

In [None]:
# --time prohibitive--
# print("\npandas execution time: ", round(
#     timeit.timeit(pandas_add, number=100000), 5)
#      )
# print("-"*30)

# print("\ndask execution time: ", round(
#     timeit.timeit(dask_add, number=100000), 5)
#      )

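> Over 10K iterations, the Dask call that adds 5 dataframes took about 95% less time than pandas (200.67 s vs 4088.36 s, roughly a 20x difference). As with the other Dask timings, `.compute()` is never called, so this measures lazy graph construction rather than the additions themselves.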

======================================================

## Compute GPU Acceleration with Dask vs Pandas

__The cuDF library (part of NVIDIA RAPIDS) provides a GPU-accelerated, pandas-like dataframe, and Dask can distribute cuDF dataframes across its partitions.__

In [28]:
# import cudf

# # reinstantiate `dask_df` for use with GPU backend
# dask_df = dask_df.map_partitions(cudf.from_pandas) 

### _cuDF could not be installed via pip in this environment, and RAPIDS does not support native Windows_

__Resume in a containerized environment.__

In [29]:
# ending timer for complete notebook execution
notebook_end = timer()

# calculating time for complete notebook execution
notebook_duration = notebook_end - notebook_start
print(round(notebook_duration, 2))

7413.23


In [30]:
# converting notebook duration from seconds to minutes
round(notebook_duration / 60, 2)

123.55
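For a more readable duration, the standard library's `datetime.timedelta` can format seconds as hours, minutes, and seconds. A small sketch using the duration recorded above:

```python
from datetime import timedelta

notebook_duration = 7413.23  # seconds, as recorded above

# timedelta prints as H:MM:SS; round first to drop the fractional seconds
print(timedelta(seconds=round(notebook_duration)))  # 2:03:33
```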