# High Performance Jupyter

## Introducing...Dask!

<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="400">

We will do the same analysis as [laptop.ipynb](laptop.ipynb) on the same machine, except we will be using a roughly 10x larger dataset. This notebook should execute on any machine with >4GB RAM.

Outputs here are from a 2019 MacBook Pro (6 cores, 32GB RAM).

Dask will work without initializing a distributed backend, but it's [recommended to do so](https://docs.dask.org/en/latest/setup/single-distributed.html) even on a laptop. To initialize, start a `Client` like so:

In [1]:
from dask.distributed import Client

client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:64865  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 4  Cores: 12  Memory: 34.36 GB


This creates a `LocalCluster`, which sets up a cluster environment using the cores on your machine.
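
If you would rather choose the cluster size yourself than accept the defaults, you can build the `LocalCluster` explicitly and hand it to the `Client`. The sketch below is only an illustration: the worker and thread counts are arbitrary, and `processes=False` (workers as threads inside the current process) is an assumption made to keep the example light, not what the cell above does.

```python
from dask.distributed import Client, LocalCluster

# Illustrative sizes only; tune them to your own machine.
cluster = LocalCluster(
    n_workers=2,             # number of workers to start
    threads_per_worker=1,    # threads inside each worker
    processes=False,         # run workers as threads in this process
    dashboard_address=":0",  # let the OS pick a free dashboard port
)
client = Client(cluster)
client.wait_for_workers(2)   # block until both workers have registered

n_workers = len(client.scheduler_info()["workers"])
print(f"workers: {n_workers}")

# Shut everything down when finished
client.close()
cluster.close()
```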

It's also possible to create a cluster from the Dask JupyterLab extension. Click on the Dask logo on the left sidebar, then press "+NEW" under clusters. If you ran the cell above already, the extension can also discover your cluster if you press the magnifying glass at the top.

Now it's fun window arranging time! Click on all the different tiles in the Dask JupyterLab panel to see all the things you can monitor on your "cluster".

In [2]:
import pandas as pd
import numpy as np
import datetime
import s3fs
import warnings
warnings.simplefilter("ignore")

data_path = 's3://nyc-tlc/trip data'
seed = 42

# Load and explore data

We'll load data for all of 2019 and show how Dask is able to process the data even though it does not all fit in RAM.


In [3]:
# new dask imports
from dask import delayed
import dask.dataframe as dd

fs = s3fs.S3FileSystem(anon=True)
files_2019 = fs.glob('s3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv')

## The hard way

We could parallelize our for-loop that blew up in Part 1 using [dask.delayed](https://docs.dask.org/en/latest/delayed.html). `delayed` is really useful when you have some computation or functions that don't fit well into a dataframe or array processing paradigm.

> NOTE: There is a better way below! This is just for educational purposes.
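
Before applying `delayed` to the CSV files, it may help to see it on a toy problem. This is a minimal sketch with made-up functions (`square` and `total` are hypothetical, not part of this notebook); it uses the synchronous scheduler so it runs without a cluster.

```python
from dask import delayed

@delayed
def square(x):
    return x * x

@delayed
def total(values):
    return sum(values)

# Calling delayed functions builds a task graph; nothing runs yet
lazy_result = total([square(i) for i in range(5)])
print(type(lazy_result).__name__)

# compute() executes the graph and returns a plain Python value
result = lazy_result.compute(scheduler="synchronous")
print(result)  # 30
```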

In [4]:
%%time

# the @delayed decorator tells Dask to evaluate this function lazily
@delayed
def load_csv(f):
    return pd.read_csv(
        fs.open(f),
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    )

dfs = []
for f in files_2019:
    df = load_csv(f's3://{f}')
    dfs.append(df)
taxi = dd.from_delayed(dfs)  # from_delayed rather than pd.concat

CPU times: user 4.53 s, sys: 541 ms, total: 5.07 s
Wall time: 1min 28s


In [5]:
dfs

[Delayed('load_csv-b0f5af10-cb61-4713-a125-7bb63c254868'),
 Delayed('load_csv-1aff350c-d6f5-456c-bcae-81d157dc3d58'),
 Delayed('load_csv-83ad3bc3-c1f6-4ddb-b620-1eb8f1e3fb16'),
 Delayed('load_csv-7f665dd6-8fb9-448f-8683-3f8849e97244'),
 Delayed('load_csv-6ed5c24a-d8b8-488c-92d0-26db0e30c2c1'),
 Delayed('load_csv-a627c4ac-f77f-4800-b993-1156f18ed437'),
 Delayed('load_csv-3c07f5d9-921e-4ba6-8b48-f1e25de18c89'),
 Delayed('load_csv-bf56fc20-14d6-4181-a215-d1f7585231e4'),
 Delayed('load_csv-39942d6b-775e-4778-868b-59f6735b6421'),
 Delayed('load_csv-7346e05b-d3d0-4cc6-b092-79513324f4d3'),
 Delayed('load_csv-776c6fb8-729c-4d5c-8fdb-65d35116cc01'),
 Delayed('load_csv-3db35a97-0da9-4943-9079-04879ca6bef7')]

`compute()` tells Dask to execute the delayed function and return its result, which in this case is a Pandas dataframe.

Notice that this takes around the same amount of time it took to load one CSV into Pandas - that's because it is doing exactly the same thing!

In [6]:
%%time
df0 = dfs[0].compute()

CPU times: user 5.58 s, sys: 1.99 s, total: 7.57 s
Wall time: 1min 27s


In [7]:
df0.head()

| | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1 | 2019-01-01 00:46:40 | 2019-01-01 00:53:20 | 1 | 1.5 | 1 | N | 151 | 239 | 1 | 7.0 | 0.5 | 0.5 | 1.65 | 0.0 | 0.3 | 9.95 | |
| 1 | 1 | 2019-01-01 00:59:47 | 2019-01-01 01:18:59 | 1 | 2.6 | 1 | N | 239 | 246 | 1 | 14.0 | 0.5 | 0.5 | 1.0 | 0.0 | 0.3 | 16.3 | |
| 2 | 2 | 2018-12-21 13:48:30 | 2018-12-21 13:52:40 | 3 | 0.0 | 1 | N | 236 | 236 | 1 | 4.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 5.8 | |
| 3 | 2 | 2018-11-28 15:52:25 | 2018-11-28 15:55:45 | 5 | 0.0 | 1 | N | 193 | 193 | 2 | 3.5 | 0.5 | 0.5 | 0.0 | 0.0 | 0.3 | 7.55 | |
| 4 | 2 | 2018-11-28 15:56:57 | 2018-11-28 15:58:33 | 5 | 0.0 | 2 | N | 193 | 193 | 2 | 52.0 | 0.0 | 0.5 | 0.0 | 0.0 | 0.3 | 55.55 | |


In [8]:
type(df0)

pandas.core.frame.DataFrame

The `taxi` object was created from `dd.from_delayed`, which is a handy function that will take a collection of delayed objects that return Pandas dataframes, and combine them into one big Dask dataframe.

In [9]:
taxi

| npartitions=12 | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | int64 | datetime64[ns] | datetime64[ns] | int64 | float64 | int64 | object | int64 | int64 | int64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


Notice that printing `taxi` looks very different from printing a Pandas dataframe. That is because a Dask dataframe is a lazy collection of partitions, each of which becomes a Pandas dataframe when computed. Nothing is loaded until you ask Dask for a concrete result, such as the row count.

## Okay, let's do it the better way

Dask already has a function for loading a collection of CSVs into one big dataframe, and it mirrors the API of `pandas.read_csv`.

Notice the glob syntax for the filename argument, which tells Dask to load all files with this pattern.

In [10]:
%%time

taxi = dd.read_csv(
    f'{data_path}/yellow_tripdata_2019-*.csv',
    assume_missing=True,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
)

CPU times: user 358 ms, sys: 43.7 ms, total: 402 ms
Wall time: 1.94 s


In [11]:
taxi

| npartitions=127 | VendorID | tpep_pickup_datetime | tpep_dropoff_datetime | passenger_count | trip_distance | RatecodeID | store_and_fwd_flag | PULocationID | DOLocationID | payment_type | fare_amount | extra | mta_tax | tip_amount | tolls_amount | improvement_surcharge | total_amount | congestion_surcharge |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| | float64 | datetime64[ns] | datetime64[ns] | float64 | float64 | float64 | object | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 | float64 |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |


<br>The cells above execute super fast because Dask really hasn't done anything yet. Notice too how `dd.read_csv` produced more partitions (127) than when we manually constructed the dataframe with `dask.delayed` (12, one per file), because it splits each CSV into smaller blocks. This is good! More partitions generally means more parallelism and more opportunity for faster computation.

Also note the calls to `.compute()` in the cells below. This tells Dask to trigger computation of the task graph. `len()` is one exception where Dask does trigger execution and gives you results immediately.

In [12]:
%%time
print(f"Row count: {len(taxi)}")
print(f"Size in GB: {taxi.memory_usage(deep=True).sum().compute() / 1e9}")

Row count: 84399019
Size in GB: 16.367014316
CPU times: user 1min 1s, sys: 7.08 s, total: 1min 8s
Wall time: 21min 51s


<br>
This takes a while, but it finishes! This would likely run out of memory when trying to do it with Pandas.

In [13]:
%%time
np.round(taxi.describe().compute(), 3).T

CPU times: user 2min 6s, sys: 7.21 s, total: 2min 14s
Wall time: 11min 18s


| | count | mean | std | min | 25% | 50% | 75% | max |
|---|---|---|---|---|---|---|---|---|
| VendorID | 84152418.0 | 1.645 | 0.498 | 1.0 | 1.0 | 2.0 | 2.0 | 4.0 |
| passenger_count | 84152418.0 | 1.563 | 1.208 | 0.0 | 1.0 | 1.0 | 2.0 | 9.0 |
| trip_distance | 84399019.0 | 3.001 | 8.091 | -37264.53 | 1.07 | 1.93 | 8.82 | 45977.22 |
| RatecodeID | 84152418.0 | 1.061 | 0.76 | 1.0 | 1.0 | 1.0 | 1.0 | 99.0 |
| PULocationID | 84399019.0 | 163.158 | 66.016 | 1.0 | 132.0 | 162.0 | 234.0 | 265.0 |
| DOLocationID | 84399019.0 | 161.353 | 70.251 | 1.0 | 116.0 | 163.0 | 236.0 | 265.0 |
| payment_type | 84152418.0 | 1.289 | 0.479 | 1.0 | 1.0 | 1.0 | 2.0 | 5.0 |
| fare_amount | 84399019.0 | 13.344 | 174.375 | -1856.0 | 7.0 | 11.0 | 32.04 | 943274.8 |
| extra | 84399019.0 | 1.087 | 1.249 | -60.0 | 0.0 | 1.0 | 3.0 | 535.38 |
| mta_tax | 84399019.0 | 0.495 | 0.067 | -0.5 | 0.5 | 0.5 | 0.5 | 212.42 |


# Feature engineering

Same feature engineering from [laptop.ipynb](laptop.ipynb), using the same code!

In [14]:
numeric_feat = [
    'pickup_weekday', 
    'pickup_hour', 
    'pickup_week_hour', 
    'pickup_minute', 
    'passenger_count',
]
categorical_feat = [
    'PULocationID', 
    'DOLocationID',
]
features = numeric_feat + categorical_feat
y_col = 'high_tip'

In [15]:
def prep_df(df: dd.DataFrame) -> dd.DataFrame:
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = df[df.fare_amount > 0]  # avoid divide-by-zero
    df['tip_fraction'] = df.tip_amount / df.fare_amount
    df['high_tip'] = (df['tip_fraction'] > 0.2) # class label
    
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
    # NOTE: pandas 2.0 removed .dt.weekofyear in favor of .dt.isocalendar().week
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.weekofyear
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [y_col]].astype(float).fillna(-1)
    
    return df
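
To make the derived columns concrete, here is a toy walk-through in plain Pandas. The two rows borrow values from the `df0.head()` output above; `toy` is a hypothetical name, and the sketch covers only the label and time features, not the full `prep_df`.

```python
import pandas as pd

toy = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(
        ["2019-01-01 00:46:40", "2019-01-01 00:59:47"]
    ),
    "fare_amount": [7.0, 14.0],
    "tip_amount": [1.65, 1.0],
})

# Class label: did the tip exceed 20% of the fare?
toy["tip_fraction"] = toy.tip_amount / toy.fare_amount
toy["high_tip"] = toy.tip_fraction > 0.2

# Time features (weekday: Monday=0, so Tuesday 2019-01-01 is 1)
toy["pickup_weekday"] = toy.tpep_pickup_datetime.dt.weekday
toy["pickup_hour"] = toy.tpep_pickup_datetime.dt.hour
toy["pickup_week_hour"] = toy.pickup_weekday * 24 + toy.pickup_hour
toy["pickup_minute"] = toy.tpep_pickup_datetime.dt.minute

print(toy[["high_tip", "pickup_weekday", "pickup_week_hour", "pickup_minute"]])
```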

# Hyperparameter tuning

Use the same sample as [laptop.ipynb](laptop.ipynb) for comparison purposes.

In [16]:
taxi_sample = dd.read_csv(
    f'{data_path}/yellow_tripdata_2019-01.csv',
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},  # anonymous access, as in the full-year load above
)
taxi_sample = prep_df(taxi_sample)
taxi_sample = taxi_sample.sample(frac=0.05, replace=False, random_state=seed)

`persist()` tells Dask to execute the task graph up to this point and keep the results in cluster memory, so the grid search doesn't trigger repeated CSV downloads.

In [17]:
%%time
from dask.distributed import wait

taxi_sample = taxi_sample.persist()
_ = wait(taxi_sample)

len(taxi_sample)

CPU times: user 3.53 s, sys: 409 ms, total: 3.94 s
Wall time: 1min 1s


382912

Note that several of the imports here are different from those in [laptop.ipynb](laptop.ipynb). This is because Dask-ML has its own implementations of certain scikit-learn classes that are optimized for cluster computing. Of particular note is Dask-ML's `GridSearchCV`. It optimizes the task graph by [skipping repeated operations](https://ml.dask.org/hyper-parameter-search.html#avoid-repeated-work), which can yield significant performance gains over scikit-learn's `GridSearchCV`.

In [18]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, DummyEncoder, Categorizer
from dask_ml.model_selection import GridSearchCV

lr = LogisticRegression(
    solver='saga',
    penalty='elasticnet', 
    l1_ratio=0.5,
    max_iter=100, 
    random_state=seed,
)
pipeline = Pipeline(steps=[
    ('categorize', Categorizer(columns=categorical_feat)),
    ('onehot', DummyEncoder(columns=categorical_feat)),
    ('scale', ColumnTransformer(transformers=[('num', StandardScaler(), numeric_feat)])),
    ('clf', lr),
])

params = {
    'clf__l1_ratio': [0.2, 0.3, 0.5, 0.7, 0.9],
}

grid_search = GridSearchCV(
    pipeline, 
    params,
    cv=3, 
    scoring='accuracy',
)

In [19]:
%%time
_ = grid_search.fit(taxi_sample[features], taxi_sample[y_col])
grid_search.best_score_

CPU times: user 4.93 s, sys: 548 ms, total: 5.47 s
Wall time: 27.5 s


0.5216890564933979

# Getting there...

It's great that Dask can be used on a laptop to analyze datasets that don't fit in memory. However, some parts of this notebook take quite a while to execute, because Dask can only pull a few partitions into memory at one time. It can also be pretty slow pulling down data from S3 if your internet connection is slow.

Check out [dask-cluster.ipynb](dask-cluster.ipynb) to see the same code scaled across a cluster of machines in AWS. Spoiler: it will be a lot faster!