In [None]:
import warnings
warnings.filterwarnings('ignore')

<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_horizontal.svg"
     width="60%"
     alt="Dask logo" />

# Parallel and Distributed Machine Learning
So far we have seen how Dask makes data analysis scalable through parallelization with Dask DataFrames and Dask Arrays. Let's now see how [Dask-ML](https://ml.dask.org/) allows us to do machine learning in a parallel and distributed manner. Note that machine learning is really just a special case of data analysis (one that automates analytical model building), so the 💪 Dask gains 💪 we've seen will apply here as well!

> If you'd like a refresher on the difference between parallel and distributed computing, [here's a good discussion on StackExchange](https://cs.stackexchange.com/questions/1580/distributed-vs-parallel-computing). You can also check out [The Beginner's Guide to Distributed Computing](https://towardsdatascience.com/the-beginners-guide-to-distributed-computing-6d6833796318).

### What we'll cover:
1. Types of scaling problems in ML
2. Scale Scikit-Learn with Joblib+Dask (compute-bound)
3. Scale Scikit-Learn with Dask-ML (memory-bound)
4. Scale XGBoost with Dask

## Types of scaling problems in machine learning

There are two main types of scaling challenges you can run into in your machine learning workflow: scaling the **size of your data** and scaling the **size of your model**. That is:

1. **Memory-bound problems**: Data is larger than RAM, and sampling isn't an option.
2. **CPU-bound problems**: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
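
As a rough way to tell which of the two problems you are facing, the sketch below estimates the size of a dense array and compares it with available RAM. The helper names, the 16 GiB machine, and the safety factor of 3 are all illustrative assumptions, not values from Dask or scikit-learn.

```python
def array_nbytes(n_rows: int, n_cols: int, itemsize: int = 8) -> int:
    """Bytes needed for a dense n_rows x n_cols array (8 bytes per float64)."""
    return n_rows * n_cols * itemsize


def looks_memory_bound(n_rows: int, n_cols: int, ram_bytes: int,
                       safety_factor: float = 3.0) -> bool:
    """Hypothetical rule of thumb: leave room for intermediate copies."""
    return array_nbytes(n_rows, n_cols) * safety_factor > ram_bytes


ram_bytes = 16 * 1024**3  # assumed: a machine with 16 GiB of RAM

small = looks_memory_bound(10_000, 4, ram_bytes)
large = looks_memory_bound(1_000_000_000, 20, ram_bytes)

print(f"10,000 x 4 array: {array_nbytes(10_000, 4):,} bytes, memory-bound: {small}")
print(f"1e9 x 20 array: {array_nbytes(1_000_000_000, 20):,} bytes, memory-bound: {large}")
```

A dataset that comes out as not memory-bound can still be CPU-bound if you need to fit many models on it.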

Here's a handy diagram for visualizing these problems:

In [None]:
from IPython.display import Image
Image(url="images/dask-zones.png", width=400)

In the bottom-left quadrant, your datasets are not too large (they fit comfortably in RAM) and your model is not too large either. When these conditions are met, you are much better off using something like scikit-learn, XGBoost, and similar libraries. You don't need to leverage multiple machines in a distributed manner with a library like Dask-ML. However, if you are in any of the other quadrants, distributed machine learning is the way to go.

Summarizing: 

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use `dask` and `joblib` together with your favorite scikit-learn estimator.
* For large datasets, use `dask_ml` estimators or XGBoost's built-in Dask interface (`xgboost.dask`).

## Scikit-Learn Refresher

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/scikit_learn_logo_small.svg" 
     width="30%"
     alt="sklearn logo" />

In this section, we'll quickly run through a typical Scikit-Learn workflow:

* Load some data (in this case, we'll generate it)
* Import the Scikit-Learn module for our chosen ML algorithm
* Create an estimator for that algorithm and fit it with our data
* Inspect the learned attributes
* Check the accuracy of our model


### Generate some random data

In [None]:
from sklearn.datasets import make_classification

# Generate data
X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

In [None]:
# Let's take a look at X
X[:8]

In [None]:
# Let's take a look at y
y[:8]

### Fitting an SVC

For this example, we will fit a [Support Vector Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html).

In [None]:
from sklearn.svm import SVC

estimator = SVC(random_state=0)
estimator.fit(X, y)

We can inspect the learned attributes by taking a look at the `support_vectors_`:

In [None]:
estimator.support_vectors_[:4]

And we can check the accuracy on the training data:

In [None]:
estimator.score(X, y)
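
Scoring on the same data the model was trained on tends to give an optimistic estimate. A minimal sketch of a held-out check follows; the 25% test split and the variable names are assumptions chosen for illustration, not part of the original workflow.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

# Same synthetic data as in the cells above
X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

# Hold out 25% of the rows for evaluation (assumed split size)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25, random_state=0)

held_out_estimator = SVC(random_state=0).fit(X_tr, y_tr)

train_acc = held_out_estimator.score(X_tr, y_tr)  # accuracy on seen data
test_acc = held_out_estimator.score(X_te, y_te)   # accuracy on unseen data
print(f"train accuracy: {train_acc:.3f}, test accuracy: {test_acc:.3f}")
```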

### Hyperparameter Optimization

There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations. Scikit-learn provides tools to automatically find the best parameter combinations via cross-validation (which is the "CV" in `GridSearchCV`).
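
To get a feel for how quickly a grid search grows, the sketch below expands a small two-parameter grid by hand and counts the model fits. It uses only the standard library; `example_grid` and `cv_folds` are illustrative names. With the default `refit=True`, `GridSearchCV` performs one additional fit on the full data using the best combination.

```python
from itertools import product

example_grid = {
    "C": [0.001, 10.0],
    "kernel": ["rbf", "poly"],
}
cv_folds = 2

# Every combination of one value per hyperparameter
names = list(example_grid)
combinations = [
    dict(zip(names, values)) for values in product(*example_grid.values())
]

# Each combination is fitted once per cross-validation fold
n_fits = len(combinations) * cv_folds

for combo in combinations:
    print(combo)
print(f"{len(combinations)} combinations x {cv_folds} folds = {n_fits} fits")
```

Adding a third hyperparameter with three values would triple the number of fits, which is why grid search becomes compute-bound so quickly.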

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

# Brute-force search over a grid of hyperparameter combinations
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

In [None]:
grid_search.best_params_, grid_search.best_score_

## Compute Bound: Single-machine parallelism with Joblib
With Joblib, we can say that Scikit-Learn has *single-machine* parallelism.

**Any Scikit-Learn estimator that can operate in parallel exposes an `n_jobs` keyword**, which controls how many tasks run in parallel. Specifying `n_jobs=-1` uses all available CPU cores.
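
The sketch below shows the same `n_jobs` idea using Joblib directly, outside of Scikit-Learn. The `square` function is a stand-in for an expensive task such as a single model fit; it is an illustrative assumption, not something Scikit-Learn runs internally.

```python
from joblib import Parallel, delayed, effective_n_jobs


def square(x):
    """Stand-in for an expensive, independent task."""
    return x * x


# How many workers n_jobs=-1 resolves to on this machine
n_workers = effective_n_jobs(-1)
print(f"n_jobs=-1 resolves to {n_workers} worker(s) here")

# Run the tasks on two workers; results come back in input order
results = Parallel(n_jobs=2)(delayed(square)(i) for i in range(8))
print(results)
```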

In [None]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Notice that the computation above is faster than before.

## Compute Bound: Multi-machine parallelism with Dask

In this section we'll see how Dask (plus Joblib and Scikit-Learn) gives us multi-machine parallelism. Dask can talk to Scikit-Learn (via Joblib), so the training "jobs" of our grid search are scheduled across the machines in our *Dask cluster*.



In [None]:
from dask.distributed import Client

# Create a local Dask cluster with 8 worker processes
client = Client(n_workers=8)
client

**Note:** Click on Cluster Info to see more details about the cluster, including its configuration and other specs.

We can expand our problem by specifying more hyperparameters before training, and see how using `dask` as the backend can help us.

In [None]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    'kernel': ['rbf', 'poly', 'linear'],
    'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)

### Dask parallel backend

We can fit our estimator with multi-machine parallelism by quickly *switching to a Dask parallel backend* when using joblib. 

In [None]:
import joblib

In [None]:
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

**What just happened?**

Dask-ML developers worked with the Scikit-Learn and Joblib developers to implement a Dask parallel backend. So internally, scikit-learn now talks to Joblib, and Joblib talks to Dask, and Dask is what handles scheduling all of those tasks on multiple machines.

The best parameters and best score:

In [None]:
grid_search.best_params_, grid_search.best_score_

## But that was cheating...sort of

The `Client(n_workers=8)` cluster we used above runs entirely on our local machine, so we haven't actually trained across multiple machines yet. To do that, we can launch a cluster in the cloud with Coiled and run the same Joblib code against it:

In [None]:
import coiled 
cluster = coiled.Cluster(
    name="intro-to-dask",
    n_workers=10,
    worker_memory="16GiB",
    package_sync=True,
)

In [None]:
from dask.distributed import Client

# Connect a client to the Coiled cluster
client = Client(cluster)

In [None]:
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

## Memory Bound: Parallel Machine Learning with Dask-ML

We have seen how to work with larger models, but sometimes you'll want to train on a larger-than-memory dataset. `dask-ml` implements estimators that work well on Dask `Arrays` and `DataFrames`, which may be larger than your machine's RAM.

In [None]:
from dask_ml.datasets import make_regression
from dask_ml.linear_model import LinearRegression
from dask_ml.model_selection import train_test_split

In [None]:
# create synthetic regression data
X, y = make_regression(n_samples=10_000, chunks=100)

In [None]:
X
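
The `chunks=100` argument splits the data into blocks of 100 rows, each of which is small enough to process independently. The arithmetic below sketches what that means for this dataset; it assumes `make_regression` falls back to 100 features when `n_features` is not given (as the scikit-learn function of the same name does) and that values are stored as 8-byte floats.

```python
import math

n_samples = 10_000
n_features = 100       # assumed default, not set explicitly above
rows_per_chunk = 100   # from chunks=100
itemsize = 8           # bytes per float64 value

n_chunks = math.ceil(n_samples / rows_per_chunk)
chunk_bytes = rows_per_chunk * n_features * itemsize
total_bytes = n_samples * n_features * itemsize

print(f"{n_chunks} chunks of {chunk_bytes:,} bytes each")
print(f"{total_bytes:,} bytes in total")
```

Only a few chunks need to be in memory at any one time, which is what lets the same code scale to arrays that don't fit in RAM.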

In [None]:
# create train/test splits
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=21, test_size=0.3, convert_mixed_types=True)

In [None]:
# instantiate model
lr = LinearRegression()

### Exercise:
Can you fit this parallel Dask-ML `LinearRegression()` model on the training data?

In [None]:
# %load solutions/ml-ex-1.py
lr.fit(X_train, y_train)

### Exercise:
Can you make predictions with this `LinearRegression()` model?

In [None]:
# %load solutions/ml-ex-2.py
y_pred = lr.predict(X_test)

In [None]:
# Evaluate on the held-out test split
lr.score(X_test, y_test)
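
Assuming that `score` here reports the coefficient of determination $R^2$, as it does for scikit-learn regressors, the value can be read as the fraction of variance in the target that the model explains. The plain-Python sketch below shows the calculation; `r_squared` is an illustrative helper, not a Dask-ML function.

```python
def r_squared(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean = sum(y_true) / len(y_true)
    ss_res = sum((t - p) ** 2 for t, p in zip(y_true, y_pred))
    ss_tot = sum((t - mean) ** 2 for t in y_true)
    return 1 - ss_res / ss_tot


perfect = r_squared([1.0, 2.0, 3.0], [1.0, 2.0, 3.0])   # predictions match exactly
baseline = r_squared([1.0, 2.0, 3.0], [2.0, 2.0, 2.0])  # always predict the mean
print(perfect, baseline)
```

A score of 1 means perfect predictions, and a score of 0 means the model does no better than always predicting the mean of the target.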

## Training XGBoost in Parallel

Dask-ML implements some of the most popular machine learning algorithms for parallel processing, but not all of them.

For XGBoost, the maintainers of Dask and XGBoost took a different approach: they built a Dask Backend for XGBoost so you can run XGBoost in parallel with Dask straight from your normal XGBoost library.

Running an XGBoost model with the distributed Dask backend requires minimal changes to your regular XGBoost code:

```python
import xgboost as xgb

# Create the XGBoost DMatrix for our training and testing splits
dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)
dtest = xgb.dask.DaskDMatrix(client, X_test, y_test)

# Set model parameters (XGBoost defaults, except min_child_weight, which defaults to 1)
params = {
    "max_depth": 6,
    "gamma": 0,
    "eta": 0.3,
    "min_child_weight": 30,
    "objective": "reg:squarederror",
    "grow_policy": "depthwise"
}

# train the model
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4,
    evals=[(dtrain, 'train')]
)

# make predictions
y_pred = xgb.dask.predict(client, output, dtest)
```

See [this step-by-step tutorial](https://coiled.io/blog/dask-xgboost-python-example/) if you'd like to learn more.

## Extra resources:

- [Dask-ML documentation](https://ml.dask.org/)
- [Getting started with Coiled](https://docs.coiled.io/user_guide/getting_started.html)