<img style="float: right" src="img/saturn.png" width="300" />

# Scaling Machine Learning in Python

## Large datasets

This notebook shows how to process large datasets with Dask and execute machine learning workflows in parallel across the cluster. Specifically, we will cover the following failure scenarios from [02-single-node.ipynb](02-single-node.ipynb):

- Load and process large dataset
- Predict over large dataset
- Train model with large dataset

## Initialize Dask cluster

See [03-hyperparameter.ipynb](03-hyperparameter.ipynb) for more details.

In [None]:
from dask_saturn import SaturnCluster
from dask.distributed import Client

cluster = SaturnCluster(
    scheduler_size='medium',
    worker_size='xlarge',
    n_workers=3,
    nthreads=4,
)
client = Client(cluster)
client.wait_for_workers(3)

client

# Load and process large dataset

## Load data

Our large dataset for this notebook will be NYC taxi data from all of 2019 (in [02-single-node.ipynb](02-single-node.ipynb) we just used one month of 2019). Rather than load the data with pandas' `pd.read_csv`, we will use Dask's [`dd.read_csv` method](https://docs.dask.org/en/latest/dataframe-create.html).

In [None]:
import s3fs
import pandas as pd
import numpy as np
import dask
import dask.dataframe as dd
from dask.distributed import wait
import warnings
warnings.simplefilter("ignore")

s3 = s3fs.S3FileSystem(anon=True)

`dd.read_csv` accepts glob syntax for loading in multiple files. This way, we don't have to write a for loop and concatenate DataFrames like we tried with pandas. 

In [None]:
files_2019 = 's3://nyc-tlc/trip data/yellow_tripdata_2019-*.csv'
s3.glob(files_2019)
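
As a rough illustration of how the wildcard selects files, the sketch below matches a pattern against a handful of made-up object keys using the standard library's `fnmatch`. The keys are hypothetical and `fnmatch` is only a stand-in: `s3.glob` performs the real listing against the bucket.

```python
from fnmatch import fnmatch

# Hypothetical object keys, for illustration only.
keys = [
    "trip data/yellow_tripdata_2018-12.csv",
    "trip data/yellow_tripdata_2019-01.csv",
    "trip data/yellow_tripdata_2019-02.csv",
    "trip data/green_tripdata_2019-01.csv",
]
pattern = "trip data/yellow_tripdata_2019-*.csv"

# Keep only the keys that match the wildcard pattern.
matched = [k for k in keys if fnmatch(k, pattern)]
print(matched)
```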

<br>

We need to pass a couple of extra arguments to `dd.read_csv`:
- `storage_options=...`: this tells Dask to use anonymous S3 access (we did this with `s3.open` for pandas)
- `assume_missing=True`: this tells Dask to assume that integer columns may contain missing values and to read them as floats. Dask sometimes needs type information up-front to be able to parallelize tasks effectively.
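
To see why up-front type information matters, consider two files where the same integer column has a missing value in only one of them. The sketch below uses plain pandas and two made-up in-memory CSV snippets (the `trip_id` column and all values are illustrative, not taken from the taxi data). pandas infers a different dtype for each chunk, which is the kind of mismatch that reading such columns as floats from the start avoids.

```python
import io
import pandas as pd

# Two hypothetical CSV chunks with the same columns.
chunk_a = "trip_id,passenger_count\n1,2\n2,1\n"
chunk_b = "trip_id,passenger_count\n3,\n4,5\n"  # one missing passenger_count

part_a = pd.read_csv(io.StringIO(chunk_a))
part_b = pd.read_csv(io.StringIO(chunk_b))

# pandas infers int64 for the complete chunk and float64 for the one with a gap.
print(part_a["passenger_count"].dtype)
print(part_b["passenger_count"].dtype)

# Reading the column as float everywhere gives every chunk the same dtype.
part_a_float = pd.read_csv(
    io.StringIO(chunk_a), dtype={"passenger_count": "float64"}
)
print(part_a_float["passenger_count"].dtype)
```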

In [None]:
%%time

taxi = dd.read_csv(
    files_2019,
    parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
    storage_options={'anon': True},
    assume_missing=True,
)

In [None]:
taxi

Notice that previewing the `taxi` object doesn't print out the contents of the DataFrame, like with pandas. This is because Dask has not yet loaded any data. It does tell us the number of partitions (i.e. little pandas DataFrames) the big Dask DataFrame has. 

### Exercise

Compute the number of rows in the `taxi` DataFrame (hint: think pandas API!).

In [None]:
<FILL IN>

In [None]:
len(taxi)

--- 

You may have tried `taxi.shape` and gotten some unfamiliar output. This is because of Dask's lazy evaluation: Dask doesn't perform any operations until asked to. `len` is a special case that triggers computation. If we want to get the row count out of `taxi.shape`, we need to run `.compute()` on the delayed object.

In [None]:
taxi.shape[0].compute()

In [None]:
taxi_bytes = taxi.memory_usage(deep=True).sum()
taxi_bytes

Notice the above cell completed immediately - a comparable pandas call would take a few seconds to compute the memory usage. You guessed it - lazy evaluation! 

### Exercise

Trigger computation on `taxi_bytes` to get the actual size of the Dask DataFrame.

In [None]:
%%time
print(f"Size (MB): {<FILL IN >}")

In [None]:
%%time
print(f"Size (MB): {taxi_bytes.compute() / 1e6}")

--- 

`.compute()` triggers the computation and blocks until the result is returned to the notebook. Be careful though: if you run `taxi.compute()`, Dask will give you the entire big DataFrame as a pandas object (this will certainly blow up the kernel!).

It is useful in many cases to trigger computation on objects even if you don't want to pull them down to the Jupyter Server. In this case we use `.persist()`, which triggers all computations performed on the DataFrame and holds the results in memory across the _cluster_. It becomes useful when we perform later machine learning operations, as we don't want Dask to be re-parsing CSV files in each iteration of model training.

In [None]:
taxi = taxi.persist()

Notice that the above cell completed immediately but the Dask Dashboard is still doing work. We can use the `wait()` function to block our notebook until the `taxi` DataFrame is fully done persisting.

In [None]:
%%time
_ = wait(taxi)

The cell below will run much faster than before! This is because the DataFrame is loaded up into memory across the cluster, and Dask does not need to download and parse the CSV files again.

In [None]:
%%time
taxi_bytes = taxi.memory_usage(deep=True).sum()
print(f"Size (MB): {taxi_bytes.compute() / 1e6}")

### Messy data - `dask.delayed`

> Advanced topic: This section is optional. You may continue to the **Exploratory Analysis** section if you want to skip this.

Data files aren't always provided in a clean tabular form that's readable with a `read_*` method from pandas or Dask. With [`dask.delayed` functions](https://docs.dask.org/en/latest/delayed.html), we can write a function that processes a single chunk of raw data and then tell Dask to collect these into a Dask DataFrame. We'll illustrate that now with the CSV files, but it's always better to use a `dd.read_*` method if your data supports it. We won't cover it more in this workshop, but `dask.delayed` is very flexible and powerful, and chances are you will use it for some of your workloads.

In [None]:
@dask.delayed
def make_data(i):
    return pd.DataFrame([(i,), ], columns=['foo'])

dfs = []
for i in range(10):
    df = make_data(i)
    dfs.append(df)

### Exercise

Define a function, `load_csv`, that will return a pandas DataFrame for a given NYC taxi file path. (Hint: a similar function was created in [02-single-node.ipynb](02-single-node.ipynb).) Then call this for the 2019 files and create a Dask DataFrame with `dd.from_delayed`.

In [None]:
%%time

@dask.delayed
def load_csv(file):
    <FILL IN>

dfs = []
for f in s3.glob(files_2019):
    df = load_csv(f)
    dfs.append(df)
taxi_delayed = dd.from_delayed(dfs)

In [None]:
%%time

@dask.delayed
def load_csv(file):
    df = pd.read_csv(
        s3.open(file, mode='rb'),
        parse_dates=['tpep_pickup_datetime', 'tpep_dropoff_datetime']
    )
    return df


dfs = []
for f in s3.glob(files_2019):
    df = load_csv(f)
    dfs.append(df)
taxi_delayed = dd.from_delayed(dfs)

--- 

Notice that this for loop looks very similar to what blew up our kernel in [02-single-node.ipynb](02-single-node.ipynb). Because of Dask's lazy evaluation, none of these functions actually pull data until we perform operations with it.
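
A minimal sketch of that behavior, assuming Dask's default local scheduler rather than the cluster: the hypothetical `load` function below records each call in a list, so we can check that building the tasks runs nothing and that work only happens once `dask.compute` is called.

```python
import dask

calls = []  # records which inputs have actually been processed


@dask.delayed
def load(i):
    # Stand-in for reading one file; it just records that it ran.
    calls.append(i)
    return i * 10


# Building the list only records tasks; nothing runs yet.
tasks = [load(i) for i in range(3)]
calls_before = len(calls)

# dask.compute executes the recorded tasks and returns their results.
results = dask.compute(*tasks)
calls_after = len(calls)

print(calls_before, calls_after, results)
```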

In [None]:
taxi_delayed

In [None]:
taxi_delayed.head()

# Exploratory analysis

We'll go back to using the `taxi` Dask DataFrame we loaded with `dd.read_csv`.

In [None]:
%%time
taxi_describe = taxi.describe().compute().T
np.round(taxi_describe, 3)

## Feature engineering

Notice that this feature engineering code is _exactly_ the same as what we did in [02-single-node.ipynb](02-single-node.ipynb). Dask's DataFrame API matches pandas' API in many places. Check out the [Dask DataFrame docs](https://docs.dask.org/en/latest/dataframe.html#dask-dataframe-copies-the-pandas-api) for more information on what is and is not supported from the pandas API.

In [None]:
# specify feature and label column names
raw_features = [
    'tpep_pickup_datetime', 
    'passenger_count', 
    'tip_amount', 
    'fare_amount',
]
features = [
    'pickup_weekday', 
    'pickup_weekofyear', 
    'pickup_hour', 
    'pickup_week_hour', 
    'pickup_minute', 
    'passenger_count',
]
label = 'tip_fraction'

In [None]:
def prep_df(taxi_df):
    '''
    Generate features from a raw taxi dataframe.
    '''
    df = taxi_df[taxi_df.fare_amount > 0][raw_features].copy()  # avoid divide-by-zero
    df[label] = df.tip_amount / df.fare_amount
     
    df['pickup_weekday'] = df.tpep_pickup_datetime.dt.weekday
    df['pickup_weekofyear'] = df.tpep_pickup_datetime.dt.weekofyear
    df['pickup_hour'] = df.tpep_pickup_datetime.dt.hour
    df['pickup_week_hour'] = (df.pickup_weekday * 24) + df.pickup_hour
    df['pickup_minute'] = df.tpep_pickup_datetime.dt.minute
    df = df[features + [label]].astype(float).fillna(-1)
    
    return df

In [None]:
taxi_feat = prep_df(taxi)
taxi_feat.head()
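
To make the derived columns concrete, here is a small worked example in plain pandas for a single made-up trip (the timestamp and amounts are illustrative only). It applies the same arithmetic as `prep_df` for `pickup_week_hour` and `tip_fraction`.

```python
import pandas as pd

# A single made-up trip: Wednesday 2 January 2019, 15:30, $10 fare, $2 tip.
trip = pd.DataFrame({
    "tpep_pickup_datetime": pd.to_datetime(["2019-01-02 15:30:00"]),
    "tip_amount": [2.0],
    "fare_amount": [10.0],
})

weekday = trip.tpep_pickup_datetime.dt.weekday      # Monday=0, so Wednesday=2
hour = trip.tpep_pickup_datetime.dt.hour            # 15
week_hour = weekday * 24 + hour                     # 2 * 24 + 15 = 63
tip_fraction = trip.tip_amount / trip.fare_amount   # 2 / 10 = 0.2

print(int(week_hour.iloc[0]), float(tip_fraction.iloc[0]))
```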

# Predict over large dataset

## Previously trained model

The [`map_partitions` method](https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.map_partitions) allows execution of arbitrary functions on the partitions of the Dask DataFrame. Remember these partitions are just pandas DataFrames, so any code that works with pandas works here! This enables us to execute a function that performs predictions with a pre-trained model.

First, let's get a handle on how to use the `map_partitions` function with a toy example.

Grab one partition from the Dask DataFrame for testing:

In [None]:
taxi_feat_part = taxi_feat.partitions[0].compute()
print(type(taxi_feat_part))
print(taxi_feat_part.shape)

In [None]:
def myfunc(df):
    return df['pickup_weekday'] * 5

In [None]:
myfunc(taxi_feat_part)

In [None]:
out = taxi_feat.map_partitions(myfunc)

In [None]:
out

In [None]:
out.head()

Dask will attempt to infer the data type of the output of the function used with `map_partitions`. To be more explicit, you should pass a `meta=` argument describing the data type of the output.

In [None]:
out = taxi_feat.map_partitions(
    myfunc,
    meta=pd.Series(dtype='float64')
)
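
Conceptually, `map_partitions` applies the function to each pandas partition and stitches the results back together. The sketch below imitates that with plain pandas on a small made-up table. It is an illustration of the idea, not Dask's actual implementation, and the manual slicing stands in for Dask's partitions.

```python
import pandas as pd


def myfunc(df):
    return df["pickup_weekday"] * 5


# A small stand-in for the feature table (values are made up).
pdf = pd.DataFrame({"pickup_weekday": [0.0, 1.0, 2.0, 3.0, 4.0, 5.0]})

# Split into three "partitions", each an ordinary pandas DataFrame.
partitions = [pdf.iloc[i:i + 2] for i in range(0, len(pdf), 2)]

# Apply the function to each partition independently, then concatenate.
out = pd.concat([myfunc(part) for part in partitions])

# Applying the function to the whole table gives an identical result.
same_as_whole = out.equals(myfunc(pdf))

# `meta` plays the role of an empty result with the expected dtype.
meta = pd.Series(dtype="float64")
print(same_as_whole, out.dtype == meta.dtype)
```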

Now let's use `map_partitions` to make predictions from a previously trained model. We'll load the model that was trained with scikit-learn and saved in [02-single-node.ipynb](02-single-node.ipynb).

In [None]:
import cloudpickle
# use a context manager so the file handle is closed after loading
with open('/tmp/model.pkl', 'rb') as f:
    model = cloudpickle.load(f)

### Exercise

Write a function that uses the `model` to make a prediction for a given input DataFrame, then execute it with `map_partitions` across the entire `taxi_feat` DataFrame. 

Assume the input DataFrame has already had features created. The output of the function should be a `pd.Series` object that has predictions for each row in the input DataFrame. Validate that your function works properly by executing it with `taxi_feat_part` as input before trying it with `map_partitions`. The output should look something like:

```
0         0.164296
1         0.166451
            ...   
717799    0.165269
717800    0.168916
Length: 717801, dtype: float64
```

In [None]:
def predict(df):
    <FILL IN>
    
preds_sklearn = predict(taxi_feat_part)
preds_sklearn.head()

In [None]:
preds_dask = taxi_feat.map_partitions(
    <FILL IN>
)
preds_dask.head()

In [None]:
def predict(df):
    preds = model.predict(df[features])
    return pd.Series(preds)

preds_sklearn = predict(taxi_feat_part)
preds_sklearn.head()

In [None]:
preds_dask = taxi_feat.map_partitions(
    predict, 
    meta=pd.Series(dtype='float64'),
)
preds_dask.head()

---

In [None]:
len(preds_sklearn)

In [None]:
len(preds_dask)

In [None]:
from dask_ml.metrics import mean_squared_error

mean_squared_error(
    taxi_feat[label].values, 
    preds_dask.values, 
    squared=False,
)
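
Passing `squared=False` asks for the root mean squared error (RMSE) rather than the mean squared error. The NumPy sketch below, on synthetic data, shows why this metric suits partitioned data: per-chunk sums of squared errors and row counts can be combined into the same value as a single pass over the full array. This is an illustration of the arithmetic, not a description of how `dask_ml` implements it.

```python
import numpy as np

rng = np.random.default_rng(0)
y_true = rng.random(1000)
y_pred = y_true + rng.normal(scale=0.1, size=1000)

# RMSE over the whole array at once.
rmse_full = np.sqrt(np.mean((y_true - y_pred) ** 2))

# The same value built from per-chunk partial sums and counts.
chunks = np.array_split(np.arange(1000), 4)
sq_err = sum(np.sum((y_true[idx] - y_pred[idx]) ** 2) for idx in chunks)
count = sum(len(idx) for idx in chunks)
rmse_chunked = np.sqrt(sq_err / count)

print(np.isclose(rmse_full, rmse_chunked))
```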

## `ParallelPostFit` wrapper

Dask ML also has a [`ParallelPostFit` meta-estimator](https://ml.dask.org/meta-estimators.html) that wraps a scikit-learn model for parallelized predictions. This is useful in scenarios where it is known up-front that a model needs to be trained on a small amount of data but predictions need to be made for a large amount of data.

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.linear_model import ElasticNet
from sklearn.preprocessing import StandardScaler

from dask_ml.wrappers import ParallelPostFit

pipeline = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', ElasticNet(normalize=False, max_iter=100, l1_ratio=0)),
])

ppf = ParallelPostFit(estimator=pipeline)
ppf_fitted = ppf.fit(taxi_feat_part[features], taxi_feat_part[label])

In [None]:
preds_dask = ppf_fitted.predict(taxi_feat[features])

mean_squared_error(
    taxi_feat[label].values,
    preds_dask, 
    squared=False,
)

# Train model with large dataset

First, we need to split our `taxi_feat` DataFrame into train/test sets.

### Exercise

Use the [`dask_ml.model_selection.train_test_split` function](https://ml.dask.org/modules/generated/dask_ml.model_selection.train_test_split.html) to split into train and test sets. (Hint: the `dask_ml` function works the same as the `sklearn` function.)

In [None]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = <FILL IN>

In [None]:
from dask_ml.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(
    taxi_feat[features], 
    taxi_feat[label], 
    test_size=0.3,
    random_state=42
)

--- 

Due to Dask's lazy evaluation, these train/test sets have not been computed yet. To ensure the rest of our ML code runs quickly, let's kick off computation on the cluster by persisting them. Note that there is a `dask.persist` function that accepts multiple objects, so we don't have to call `.persist()` on each one individually. This is helpful for objects that share upstream tasks, because Dask will avoid re-computing the shared tasks.

In [None]:
%%time
X_train, X_test, y_train, y_test = dask.persist(
    X_train, X_test, y_train, y_test,
)
_ = wait(X_train)
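
The benefit of handing several objects to Dask in one call can be seen in a toy example. The sketch below uses `dask.delayed` and `dask.compute` on the default local scheduler as a stand-in for `dask.persist` on the cluster; the `load`, `head`, and `tail` functions are hypothetical. Evaluating two objects separately re-runs their shared upstream task, while evaluating them together runs it once.

```python
import dask

loads = []  # one entry per execution of the shared upstream task


@dask.delayed
def load():
    loads.append(1)
    return [1, 2, 3, 4]


@dask.delayed
def head(rows):
    return rows[:3]


@dask.delayed
def tail(rows):
    return rows[3:]


shared = load()
train, test = head(shared), tail(shared)

# Computing each object separately re-runs the shared task every time.
train.compute()
test.compute()
separate_runs = len(loads)

# Computing both in one call merges their graphs, so the shared task runs once.
loads.clear()
train_rows, test_rows = dask.compute(train, test)
joint_runs = len(loads)

print(separate_runs, joint_runs)
```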

In [None]:
len(X_train), len(y_train)

In [None]:
len(X_test), len(y_test)

## Dask ML models

The dask-ml package has parallel implementations of machine learning algorithms that do not have parallel implementations in scikit-learn or other packages. These currently cover linear models and clustering.

In [None]:
from sklearn.pipeline import Pipeline

from dask_ml.linear_model import LinearRegression
from dask_ml.preprocessing import StandardScaler
from dask_ml.metrics import mean_squared_error
from dask_ml.model_selection import GridSearchCV

lr = Pipeline(steps=[
    ('scale', StandardScaler()),
    ('clf', LinearRegression(penalty='l2', max_iter=100)),
])

In [None]:
X_train_arr = X_train.to_dask_array(lengths=True)
y_train_arr = y_train.to_dask_array(lengths=True)
X_test_arr = X_test.to_dask_array(lengths=True)
y_test_arr = y_test.to_dask_array(lengths=True)

### Exercise

Train the `lr` model with `X_train_arr` and `y_train_arr` as input.

> Note: this will take a few minutes because we are training with a pretty large dataset. You can scale up your cluster if you want it to execute faster!

In [None]:
%%time
lr_fitted = <FILL IN>

In [None]:
%%time

lr_fitted = lr.fit(
    X_train_arr,
    y_train_arr,
)

--- 

In [None]:
lr_preds = lr_fitted.predict(X_test_arr)
mean_squared_error(y_test_arr, lr_preds, squared=False)

## XGBoost

The `dask-xgboost` package has an integration between XGBoost and Dask that parallelizes model training and prediction across a Dask cluster. 

> Note: The native XGBoost library also has an integration in the `xgboost.dask` module that will become the recommended approach in the future.

In [None]:
from dask_xgboost import XGBRegressor

xgb = XGBRegressor(
    objective="reg:squarederror",
    tree_method='approx',
    learning_rate=0.1,
    max_depth=5,
    n_estimators=100,
)

In [None]:
%%time

xgb_fitted = xgb.fit(
    X_train_arr,
    y_train_arr,
)

In [None]:
xgb_preds = xgb_fitted.predict(X_test_arr)
mean_squared_error(y_test_arr, xgb_preds, squared=False)

## Incremental learning

Dask ML can hook into scikit-learn's incremental training features with the [`Incremental` meta-estimator](https://ml.dask.org/incremental.html). Any model that implements a `partial_fit()` method can be utilized with this meta-estimator. We will not cover `Incremental` in this tutorial (`ElasticNet` does not have a `partial_fit()` method). 
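
For readers unfamiliar with `partial_fit()`, here is a minimal sketch of the underlying scikit-learn pattern on synthetic data, using `SGDRegressor` as one estimator that provides the method. The data, the chunking into four pieces, and the choice of estimator are assumptions for illustration; the `Incremental` wrapper itself is not used here.

```python
import numpy as np
from sklearn.linear_model import SGDRegressor

rng = np.random.default_rng(0)
X = rng.normal(size=(1000, 3))
y = X @ np.array([1.0, -2.0, 0.5]) + rng.normal(scale=0.01, size=1000)

model = SGDRegressor(random_state=0)

# Feed the data in four chunks; each call updates the existing coefficients
# instead of refitting from scratch.
for X_chunk, y_chunk in zip(np.array_split(X, 4), np.array_split(y, 4)):
    model.partial_fit(X_chunk, y_chunk)

print(model.coef_.shape)
```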