# Scaling Machine Learning with Python and Dask

<img src="https://docs.dask.org/en/latest/_images/dask_horizontal.svg" width="400">

The code in this notebook uses [`dask-cloudprovider`](https://github.com/dask/dask-cloudprovider) to create a Dask cluster in Amazon ECS, a container orchestration service from AWS.

This notebook assumes that you've already configured access to AWS. See [the AWS docs](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/quickstart.html#configuration) for details on how to do so.

In [None]:
# !pip install blosc==1.9.2 lz4==3.1.0 dask==2.27.0 dask-ml==1.6.0 numpy==1.18.1 scikit-learn==0.23.2

In [None]:
import os
import time

import dask.dataframe as dd
import datetime
import numpy as np
import pandas as pd
import s3fs
import warnings

from dask_cloudprovider import FargateCluster
from dask.distributed import Client, wait

from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.ensemble import RandomForestRegressor
from dask_ml.compose import ColumnTransformer
from dask_ml.preprocessing import StandardScaler, DummyEncoder, Categorizer
from dask_ml.model_selection import GridSearchCV

In [None]:
numeric_feat = [
    'pickup_weekday',
    'pickup_weekofyear',
    'pickup_hour',
    'pickup_week_hour',
    'pickup_minute',
    'passenger_count',
]
categorical_feat = [
    'PULocationID',
    'DOLocationID',
]
features = numeric_feat + categorical_feat
y_col = 'tip_fraction'

# Initialize Dask Cluster

In [None]:
n_workers = 6

os.environ["AWS_DEFAULT_REGION"] = "us-east-2"

cluster = FargateCluster(
    image="daskdev/dask:latest",
    worker_mem=30720,
    n_workers=n_workers,
    fargate_use_private_ip=False,
    scheduler_timeout="45 minutes",
    environment={
        "EXTRA_PIP_PACKAGES": "dask-ml==1.6.0 scikit-learn==0.23.2 s3fs"
    }
)
client = Client(cluster)
cluster

Open the dashboard (link above) and watch it while you execute the commands below. You'll see which tasks are running across the cluster.

In [None]:
client.scheduler_info()

I changed my mind... let's add more workers! `FargateCluster` subclasses `SpecCluster` from `dask.distributed`, which lets you programmatically scale a cluster up and down.

Run the line below, then visit the ECS console in AWS. You should see two more workers spin up!

In [None]:
cluster.scale(n_workers + 2)

# Load data and feature engineering

The code below creates a Dask DataFrame from a collection of CSV files in S3. Doesn't it look like `pandas` code?

After this cell, no data has actually been pulled. The DataFrame is just a task graph at this point, and won't be computed until we ask it for something.

In [None]:
warnings.simplefilter("ignore")

fs = s3fs.S3FileSystem()
csvs = fs.ls('s3://nyc-tlc/trip data/')
csvs = [
    f"s3://{x}" for x in csvs
    if 'yellow' in x and ('2018' in x)
]

cols = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'passenger_count', 'trip_distance', 'RatecodeID', 'store_and_fwd_flag', 'PULocationID', 'DOLocationID', 'payment_type', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'total_amount']

ddf = dd.read_csv(
    csvs,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
    usecols=cols
).sample(frac=0.01, replace=False)
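
To make "just a task graph" concrete, here is a toy sketch in plain Python. It is not Dask's implementation: the dictionary-of-tuples layout only loosely mirrors the way Dask describes graphs, and `load`, `count_rows`, and `toy_get` are hypothetical names invented for this illustration. Building the graph runs nothing; work happens only when a result is requested.

```python
calls = []  # records which functions actually ran, in order


def load(name):
    """Hypothetical stand-in for reading one CSV file."""
    calls.append(f"load:{name}")
    return [1, 2, 3] if name == "a.csv" else [4, 5]


def count_rows(*parts):
    """Count rows across all loaded parts."""
    calls.append("count_rows")
    return sum(len(p) for p in parts)


# Building the graph only records *what* to do: key -> (function, *arguments).
graph = {
    "part-0": (load, "a.csv"),
    "part-1": (load, "b.csv"),
    "n_rows": (count_rows, "part-0", "part-1"),
}
calls_before_compute = list(calls)  # still empty: nothing has executed yet


def toy_get(graph, key):
    """Recursively evaluate one key of the toy graph."""
    func, *args = graph[key]
    # Arguments that are keys of the graph are computed first.
    resolved = [toy_get(graph, a) if a in graph else a for a in args]
    return func(*resolved)


n_rows = toy_get(graph, "n_rows")
print(calls_before_compute, n_rows, calls)
```

In the real notebook, calls such as `len(ddf)` or `.compute()` play the role of `toy_get`: they are what finally triggers the reads from S3.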

Open the Dask dashboard before running the cell below. You can then watch the individual tasks that contribute to answering this question:

> how many rows are there in ddf?

In [None]:
%%time
print(f"Num rows: {len(ddf)}")

In [None]:
print(f"Size: {ddf.memory_usage(deep=True).sum().compute() / 1e6} MB")

Dask DataFrames are lazily evaluated. Right now, that DataFrame is just a collection of function calls waiting to be executed.

To materialize it (so we don't have to keep re-reading the data from S3), we can use `persist()`. This tells the scheduler to have the workers execute all the tasks needed to read in the data. `persist()` is asynchronous: it won't wait for all of those tasks to complete before returning.

Using `wait()` allows us to say "wait until the DataFrame has been completely read".

In [None]:
%%time
ddf = ddf.persist()
_ = wait(ddf)
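
The submit-then-wait pattern can be tried without a cluster. The sketch below is only an analogy built on Python's standard `concurrent.futures` module, not Dask itself: `executor.submit` returns immediately, much as `persist()` does, and `concurrent.futures.wait` blocks until the work is finished, much as `dask.distributed.wait` does. `slow_read` is a hypothetical stand-in for reading one partition from S3.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def slow_read(partition_id):
    """Hypothetical stand-in for reading one partition from remote storage."""
    time.sleep(0.2)
    return partition_id * 10


with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() hands the work off and returns right away.
    futures = [executor.submit(slow_read, i) for i in range(3)]
    done_right_after_submit = [f.done() for f in futures]

    # wait() blocks until every future has finished.
    done, not_done = wait(futures)
    results = sorted(f.result() for f in done)

print("finished right after submit:", done_right_after_submit)
print("results after wait:", results)
```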

In [None]:
%%time
len(ddf)

In [None]:
def prep_df(df: dd.DataFrame) -> dd.DataFrame:
    """
    Generate features from a raw taxi dataframe.
    """
    df = df[df.fare_amount > 0]  # avoid divide-by-zero
    df['tip_fraction'] = df.tip_amount / df.fare_amount
    
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.weekofyear
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [y_col]].astype(float).fillna(-1)
    
    return df
    
taxi_train = prep_df(ddf)
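
To see what these features look like for a single trip, here is a small plain-Python sketch that mirrors the arithmetic in `prep_df` using only the standard library. `trip_features` and the example trip are hypothetical and exist only for illustration; the notebook itself computes the same quantities column-wise on the Dask DataFrame.

```python
from datetime import datetime


def trip_features(pickup, fare_amount, tip_amount):
    """Compute the prep_df features for one trip (assumes fare_amount > 0)."""
    weekday = pickup.weekday()  # Monday == 0, Sunday == 6
    return {
        "tip_fraction": tip_amount / fare_amount,
        "pickup_weekday": weekday,
        "pickup_weekofyear": pickup.isocalendar()[1],  # ISO week number
        "pickup_hour": pickup.hour,
        # Hour of the week: 0 (Monday 00:00) through 167 (Sunday 23:00)
        "pickup_week_hour": weekday * 24 + pickup.hour,
        "pickup_minute": pickup.minute,
    }


# A made-up trip: Wednesday 2018-01-03 at 14:30, $10 fare, $2 tip.
example = trip_features(datetime(2018, 1, 3, 14, 30), fare_amount=10.0, tip_amount=2.0)
print(example)
```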

# Run grid search

We use the preprocessing and `GridSearchCV` classes from dask-ml, but still use the scikit-learn `RandomForestRegressor` model.

In [None]:
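pipeline = Pipeline(steps=[
    # Convert the location ID columns to categorical dtype
    ('categorize', Categorizer(columns=categorical_feat)),
    # One-hot encode the categorical columns
    ('onehot', DummyEncoder(columns=categorical_feat)),
    # Standardize the numeric columns. Note: with the default remainder='drop',
    # columns not listed in a transformer are dropped from the output.
    ('scale', ColumnTransformer(transformers=[('num', StandardScaler(), numeric_feat)])),
    ('reg', RandomForestRegressor())
])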

params = {
    'reg__n_estimators': [50, 100],
    'reg__max_depth': [3, 7]
}

grid_search = GridSearchCV(
    pipeline,
    params,
    cv=3,
    scoring='neg_mean_squared_error'
)
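
It can help to estimate how much work this grid search represents before launching it. The sketch below uses only the standard library to enumerate the candidate hyperparameter combinations and count the cross-validation fits. It is a back-of-the-envelope count, not how `GridSearchCV` schedules work internally, and it leaves out the final refit of the best candidate on the full data that happens when `refit` is enabled.

```python
from itertools import product

params = {
    "reg__n_estimators": [50, 100],
    "reg__max_depth": [3, 7],
}
cv = 3  # number of cross-validation folds

# Every combination of one value per hyperparameter is a candidate.
names = list(params)
candidates = [dict(zip(names, values)) for values in product(*params.values())]

# Each candidate is fit once per fold.
n_fits = len(candidates) * cv

for candidate in candidates:
    print(candidate)
print(f"{len(candidates)} candidates x {cv} folds = {n_fits} fits")
```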

Open up the Dask dashboard once you start the cell below to see the grid search in action!

In [None]:
%%time
_ = grid_search.fit(taxi_train[features], taxi_train[y_col])

In [None]:
grid_search.best_params_

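Let's check the score of the best model. Because the grid search uses `scoring='neg_mean_squared_error'`, `best_score_` is the negated MSE (closer to zero is better), averaged over the cross-validation folds. Comparing it to the summary statistics of the target gives a sense of how well the model fits.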

In [None]:
grid_search.best_score_

In [None]:
taxi_train[y_col].describe().compute()
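
One way to read these two outputs together: a model that always predicts the mean of the target has an MSE equal to the target's variance, which is approximately the square of the `std` reported by `describe()`. Since `neg_mean_squared_error` scores are negated MSEs, flipping the sign of `best_score_` gives a number that can be compared against that baseline. The sketch below illustrates the comparison; the target values and the `best_score` value are made up, not results from this notebook.

```python
import statistics


def mse(y_true, y_pred):
    """Mean squared error between two equal-length sequences."""
    return sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)


# Hypothetical tip fractions, standing in for taxi_train[y_col].
y = [0.0, 0.1, 0.2, 0.3, 0.4]

# Baseline: always predict the mean. Its MSE is the population variance.
mean_y = statistics.mean(y)
baseline_mse = mse(y, [mean_y] * len(y))

# Hypothetical grid search score (a negated MSE).
best_score = -0.015
model_mse = -best_score

# Fraction of the baseline error removed by the model.
improvement = 1 - model_mse / baseline_mse
print(f"baseline MSE: {baseline_mse:.4f}")
print(f"model MSE:    {model_mse:.4f}")
print(f"improvement over predicting the mean: {improvement:.0%}")
```

An improvement near zero would mean the model does little better than predicting the average tip fraction.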

## Save model

The fitted `GridSearchCV` object includes a `best_estimator_`: a serializable pipeline fit with the set of hyperparameters that had the best performance.

In [None]:
import cloudpickle

with open("tip-predictor.pkl", "wb") as f:
    cloudpickle.dump(grid_search.best_estimator_, f)
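
To reuse the model later, the file has to be read back in. The sketch below shows the save-and-load round trip with the standard `pickle` module and a temporary directory. The dictionary is a hypothetical stand-in for `grid_search.best_estimator_`, so the example runs without a fitted model. For the real pipeline, the loading environment would also need compatible versions of scikit-learn and dask-ml installed, since unpickling has to import those classes.

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for grid_search.best_estimator_
stand_in_model = {"reg__n_estimators": 100, "reg__max_depth": 7}

# Write to a temporary directory so the example leaves no files behind.
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "tip-predictor.pkl")

    with open(path, "wb") as f:
        pickle.dump(stand_in_model, f)

    with open(path, "rb") as f:
        loaded = pickle.load(f)

print(loaded)
```

With the real pipeline loaded this way, `loaded.predict(...)` would then be called on new data prepared with the same feature columns.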

## References

* https://docs.dask.org/en/latest/setup/docker.html