# Getting up to speed with Dask

## Part 2: Dask to the rescue!

We will repeat the analysis from Part 1 on the same laptop-sized machine, this time with a roughly 10x larger dataset.

AWS EC2 instance type: r5.large (2 vCPU, 16 GiB RAM)

Dask will work without initializing a distributed backend, but it's [recommended to do so](https://docs.dask.org/en/latest/setup/single-distributed.html) even on a laptop. To initialize, start a `Client` like so:

In [2]:
from dask.distributed import Client

client = Client()
client

Client,Cluster
Scheduler: tcp://127.0.0.1:46179; Dashboard: http://127.0.0.1:8787/status,Workers: 2; Cores: 2; Memory: 16.25 GB


In [3]:
import pandas as pd
import numpy as np
import datetime
import s3fs
from pathlib import Path

# change this path if you changed it in Part 0
data_path = Path('data')
seed = 42

# Load and explore data

In [4]:
taxi_dtypes = {
    'store_and_fwd_flag': str,
    'RatecodeID': 'float64',
    'VendorID': 'float64',
    'passenger_count': 'float64',
    'payment_type': 'float64',
}

def load_csv(f):
    """ Same as Part 1"""
    return pd.read_csv(
        f,
        dtype=taxi_dtypes, 
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

In [5]:
# new dask imports
from dask import delayed
import dask.dataframe as dd

## The hard way

We could parallelize our for-loop that blew up in Part 1 using [dask.delayed](https://docs.dask.org/en/latest/delayed.html).

NOTE: There is a better way below! This is just for educational purposes.

In [6]:
%%time

dfs = []
for f in data_path.iterdir():
    df = delayed(load_csv)(f)  # see the delayed() call
    dfs.append(df)
taxi = dd.from_delayed(dfs)  # from_delayed rather than pd.concat

CPU times: user 436 ms, sys: 64 ms, total: 500 ms
Wall time: 15.2 s


In [7]:
dfs

[Delayed('load_csv-863adce6-516b-46c6-b156-fdea6e5a113f'),
 Delayed('load_csv-0b46a36e-f5ff-4755-a3df-463e3dafba00'),
 Delayed('load_csv-556f630c-7049-4274-8328-10c83792566e'),
 Delayed('load_csv-eafa309f-5067-422f-9441-e77b9c6f9034'),
 Delayed('load_csv-ef165669-ca8f-4db4-9108-76ad14adf30f'),
 Delayed('load_csv-26533d43-621b-47d0-b99d-9581a00f689a'),
 Delayed('load_csv-5cca96e7-58b3-4fa5-ac9e-c0a104a1621b'),
 Delayed('load_csv-458953cf-ddc5-4f2b-a03c-882b3c5b12db'),
 Delayed('load_csv-7c92b38c-6f20-4ea9-a945-66936763b159'),
 Delayed('load_csv-4f8194eb-a35e-47d0-9bad-bfb35d915164'),
 Delayed('load_csv-9f0ba641-eb6d-4691-b9dd-be8a9c35d325'),
 Delayed('load_csv-5cf9bd5f-65e2-4505-a0b5-5c39d0fc1da0')]

In [8]:
%%time
df0 = dfs[0].compute()

CPU times: user 2.04 s, sys: 2.62 s, total: 4.65 s
Wall time: 18.7 s


In [9]:
df0.head()

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
0,1.0,2019-12-01 00:26:58,2019-12-01 00:41:45,1.0,4.2,1.0,N,142,116,2.0,14.5,3.0,0.5,0.0,0.0,0.3,18.3,2.5
1,1.0,2019-12-01 00:12:08,2019-12-01 00:12:14,1.0,0.0,1.0,N,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
2,1.0,2019-12-01 00:25:53,2019-12-01 00:26:04,1.0,0.0,1.0,N,145,145,2.0,2.5,0.5,0.5,0.0,0.0,0.3,3.8,0.0
3,1.0,2019-12-01 00:12:03,2019-12-01 00:33:19,2.0,9.4,1.0,N,138,25,1.0,28.5,0.5,0.5,10.0,0.0,0.3,39.8,0.0
4,1.0,2019-12-01 00:05:27,2019-12-01 00:16:32,2.0,1.6,1.0,N,161,237,2.0,9.0,3.0,0.5,0.0,0.0,0.3,12.8,2.5


In [10]:
type(df0)

pandas.core.frame.DataFrame

In [11]:
taxi

,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge
npartitions=12,float64,datetime64[ns],datetime64[ns],float64,float64,float64,object,int64,int64,float64,float64,float64,float64,float64,float64,float64,float64,float64
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...


In [12]:
%%time
len(taxi)

CPU times: user 4.8 s, sys: 139 ms, total: 4.94 s
Wall time: 2min 22s


84399019

## Okay, let's do it the better way

`dask.dataframe.read_csv` mirrors the `pandas.read_csv` API, and it accepts a glob pattern so that all the monthly files load as one Dask DataFrame.

In [13]:
%%time

taxi = dd.read_csv(
    'data/yellow_tripdata_2019-*.csv',
    dtype=taxi_dtypes, 
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'])

CPU times: user 20.5 ms, sys: 950 µs, total: 21.4 ms
Wall time: 21.1 ms
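
To see which files a glob pattern like the one above would pick up, here is a small sketch using only the standard library's `fnmatch`. The file names are assumptions inferred from the pattern (one CSV per month of 2019, plus two decoys that should not match); nothing is read from disk.

```python
from fnmatch import fnmatch

pattern = 'yellow_tripdata_2019-*.csv'

# Hypothetical directory listing: twelve monthly files plus two decoys.
candidates = [f'yellow_tripdata_2019-{month:02d}.csv' for month in range(1, 13)]
candidates += ['yellow_tripdata_2018-12.csv', 'green_tripdata_2019-01.csv']

# Keep only the names the glob pattern matches.
matched = [name for name in candidates if fnmatch(name, pattern)]

print(len(matched))   # 12
print(matched[0])     # yellow_tripdata_2019-01.csv
```

Twelve matches lines up with the twelve `Delayed` objects created in the loop version.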


In [14]:
%%time
len(taxi)

CPU times: user 4.89 s, sys: 400 ms, total: 5.29 s
Wall time: 2min 17s


84399019


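Note the calls to `.compute()` in the cells below. Dask executes _lazily_, meaning it only builds up a task graph and does no real work until you explicitly ask for a result (`len(taxi)` above triggers a computation implicitly). Peek at the dashboard while these are executing to see the progress!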
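
As a rough mental model of lazy execution, the toy sketch below records work without running it until `compute()` is called. The `Lazy` class and the `load` function are hypothetical and written only for illustration; this is not how Dask is implemented, and it runs serially rather than in parallel.

```python
class Lazy:
    """Toy stand-in for a task: remembers a function and its inputs, runs nothing."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Resolve any lazy inputs first, then run this task.
        resolved = [a.compute() if isinstance(a, Lazy) else a for a in self.args]
        return self.func(*resolved)


calls = []  # records when the "expensive" work actually runs


def load(n):
    calls.append(n)
    return list(range(n))


# Building the graph is instant: no call to load() has happened yet.
parts = [Lazy(load, n) for n in (2, 3)]
total = Lazy(lambda *ps: sum(len(p) for p in ps), *parts)
calls_before = list(calls)

# Only compute() walks the graph and does the work.
result = total.compute()

print(calls_before)  # []
print(calls)         # [2, 3]
print(result)        # 5
```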

In [15]:
%%time
taxi.memory_usage(deep=True).sum().compute() / 1e9

CPU times: user 4.95 s, sys: 624 ms, total: 5.58 s
Wall time: 2min 26s


16.367014316
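
To put that number in context, a bit of arithmetic with the figures reported in this notebook (the row count from `len(taxi)`, the size above, and the cluster memory from the client summary) suggests the data would not fit in RAM all at once. This is only a back-of-the-envelope sketch; the variable names are made up for illustration.

```python
dataset_gb = 16.367014316   # memory_usage(deep=True) result above
cluster_gb = 16.25          # total memory reported by the Client
n_rows = 84_399_019         # len(taxi)

# Average in-memory footprint of one row, in bytes.
bytes_per_row = dataset_gb * 1e9 / n_rows

# Does the whole dataset fit in cluster memory at once?
fits_in_memory = dataset_gb <= cluster_gb

print(round(bytes_per_row))  # 194
print(fits_in_memory)        # False
```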

This takes a while (about 18 minutes of wall time on this machine):

In [16]:
%%time
np.round(taxi.describe().compute(), 3).T

CPU times: user 3min 9s, sys: 10.4 s, total: 3min 19s
Wall time: 17min 55s


Unnamed: 0,count,mean,std,min,25%,50%,75%,max
VendorID,84152418.0,1.645,0.498,1.0,1.0,2.0,2.0,4.0
passenger_count,84152418.0,1.563,1.208,0.0,1.0,1.0,2.0,9.0
trip_distance,84399019.0,3.001,8.091,-37264.53,1.07,1.93,8.82,45977.22
RatecodeID,84152418.0,1.061,0.76,1.0,1.0,1.0,1.0,99.0
PULocationID,84399019.0,163.158,66.016,1.0,132.0,162.0,234.0,265.0
DOLocationID,84399019.0,161.353,70.251,1.0,116.0,163.0,236.0,265.0
payment_type,84152418.0,1.289,0.479,1.0,1.0,1.0,2.0,5.0
fare_amount,84399019.0,13.344,174.375,-1856.0,7.0,11.0,32.04,943274.8
extra,84399019.0,1.087,1.249,-60.0,0.0,1.0,3.0,535.38
mta_tax,84399019.0,0.495,0.067,-0.5,0.5,0.5,0.5,212.42


# Feature engineering

In [17]:
def make_features(df):
    """ Same code from Part 1 """
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.isocalendar().week.astype(int)
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
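    # NOTE: .dt.seconds is only the seconds component of the timedelta (0 to 86399),
    # not the total elapsed time; .dt.total_seconds() would give seconds since Jan 1.
    df['pickup_year_seconds'] = (df.tpep_pickup_datetime - datetime.datetime(2019, 1, 1, 0, 0, 0)).dt.seconds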
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['store_and_fwd_flag'] = (df.store_and_fwd_flag == 'Y').astype(int)
    df['VendorID'] = df.VendorID.fillna(-1)
    df['RatecodeID'] = df.RatecodeID.fillna(-1)

In [18]:
%%time

make_features(taxi)

CPU times: user 54.2 ms, sys: 4.33 ms, total: 58.5 ms
Wall time: 75.1 ms


In [19]:
%%time

taxi.head()

CPU times: user 101 ms, sys: 18.5 ms, total: 120 ms
Wall time: 2.06 s


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,...,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,pickup_weekday,pickup_weekofyear,pickup_hour,pickup_minute,pickup_year_seconds,pickup_week_hour
0,1.0,2019-01-01 00:46:40,2019-01-01 00:53:20,1.0,1.5,1.0,0,151,239,1.0,...,0.0,0.3,9.95,,1,1,0,46,2800,24
1,1.0,2019-01-01 00:59:47,2019-01-01 01:18:59,1.0,2.6,1.0,0,239,246,1.0,...,0.0,0.3,16.3,,1,1,0,59,3587,24
2,2.0,2018-12-21 13:48:30,2018-12-21 13:52:40,3.0,0.0,1.0,0,236,236,1.0,...,0.0,0.3,5.8,,4,51,13,48,49710,109
3,2.0,2018-11-28 15:52:25,2018-11-28 15:55:45,5.0,0.0,1.0,0,193,193,2.0,...,0.0,0.3,7.55,,2,48,15,52,57145,63
4,2.0,2018-11-28 15:56:57,2018-11-28 15:58:33,5.0,0.0,2.0,0,193,193,2.0,...,0.0,0.3,55.55,,2,48,15,56,57417,63
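
As a sanity check, the features in the third row of the `head()` output can be reproduced with the standard library's `datetime`, whose `timedelta.seconds` attribute behaves like the `.dt.seconds` accessor used in `make_features`. This is a sketch for one hand-copied timestamp, not part of the pipeline. It shows that `pickup_year_seconds` holds the seconds component within the day rather than the elapsed seconds since the start of 2019.

```python
import datetime

pickup = datetime.datetime(2018, 12, 21, 13, 48, 30)  # third row of head() above
year_start = datetime.datetime(2019, 1, 1)

weekday = pickup.weekday()              # Monday=0 ... Sunday=6
weekofyear = pickup.isocalendar()[1]    # ISO week number
week_hour = weekday * 24 + pickup.hour  # hour of the week

delta = pickup - year_start
seconds_component = delta.seconds      # seconds part only, always 0..86399
total_seconds = delta.total_seconds()  # signed elapsed seconds

print(weekday, weekofyear, week_hour)    # 4 51 109
print(seconds_component, total_seconds)  # 49710 -900690.0
```

The first three values and `49710` match the notebook output; `total_seconds` is negative because this pickup predates 2019.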


# Machine learning

In [20]:
# same as Part 1
numeric_feat = [
    'pickup_weekday', 
    'pickup_weekofyear', 
    'pickup_hour', 
    'pickup_minute', 
    'pickup_year_seconds',
    'pickup_week_hour', 
    'passenger_count',
]
categorical_feat = [
    'VendorID', 
    'RatecodeID', 
    'store_and_fwd_flag',
    'PULocationID',
    'DOLocationID',
]
features = numeric_feat + categorical_feat
y_col = 'total_amount'

In [21]:
# note the dask_ml imports rather than sklearn
from dask_ml.model_selection import train_test_split
from dask_ml.metrics import mean_squared_error
from xgboost.dask import DaskXGBRegressor

In [22]:
%%time

X_train, X_test, y_train, y_test = train_test_split(
    taxi[features], taxi[y_col], test_size=0.33, random_state=seed, shuffle=True)

CPU times: user 4.68 ms, sys: 3.7 ms, total: 8.38 ms
Wall time: 10 ms



Remember, nothing has been computed yet because of lazy evaluation. If you have the RAM, you can call `df = df.persist()` to keep a collection in cluster memory and avoid re-reading the CSVs for every computation.

(but if you had the RAM, you could do without Dask)

In [23]:
# persist() returns a new collection, so assign the result
# X_train = X_train.persist()
# y_train = y_train.persist()

In [24]:
xgb = DaskXGBRegressor(
    n_estimators=10, 
    max_depth=3, 
    learning_rate=0.1, 
    random_state=seed, 
    n_jobs=-1,
)

In [None]:
%%time

_ = xgb.fit(X_train, y_train)

In [None]:
%%time

# get test RMSE
preds = xgb.predict(X_test)
# argument order follows the (y_true, y_pred) convention
np.sqrt(mean_squared_error(y_test.to_dask_array(), preds))
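
For reference, RMSE is the square root of the mean squared difference between true and predicted values. The plain-Python sketch below spells that out on three made-up fares (the numbers are hypothetical, not taken from the taxi data) and shows that swapping the two arguments does not change the result.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error of two equal-length sequences."""
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


y_true = [10.0, 20.0, 30.0]  # made-up fares
y_pred = [12.0, 18.0, 33.0]  # made-up predictions

score = rmse(y_true, y_pred)    # errors are 2, -2, 3, so MSE = 17/3
swapped = rmse(y_pred, y_true)  # squaring makes the order irrelevant

print(round(score, 4))  # 2.3805
```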