<img src="https://raw.githubusercontent.com/dask/dask/main/docs/source/images/dask_horizontal.svg"
     width="60%"
     alt="Dask logo" />

# Parallel and Distributed Machine Learning

The material in this notebook is based on the open-source content from [Dask's tutorial repository](https://github.com/dask/dask-tutorial) and the [Machine learning notebook](https://github.com/coiled/data-science-at-scale/blob/master/3-machine-learning.ipynb) from Coiled's Data Science at Scale tutorial.

So far we have seen how Dask makes data analysis scalable with parallelization via Dask DataFrames. Let's now see how [Dask-ML](https://ml.dask.org/) allows us to do machine learning in a parallel and distributed manner. Note, machine learning is really just a special case of data analysis (one that automates analytical model building), so the 💪 Dask gains 💪 we've seen will apply here as well!

> If you'd like a refresher on the difference between parallel and distributed computing, [here's a good discussion on StackExchange](https://cs.stackexchange.com/questions/1580/distributed-vs-parallel-computing). You can also check out my [Beginner's Guide to Distributed Computing](https://towardsdatascience.com/the-beginners-guide-to-distributed-computing-6d6833796318).

What we'll cover:
1. Scale Scikit-Learn with Joblib+Dask (compute-bound)
2. Scale Scikit-Learn with Dask-ML (memory-bound)
3. Scale XGBoost with Dask


## Types of scaling problems in machine learning

There are two main types of scaling challenges you can run into in your machine learning workflow: scaling the **size of your model** and scaling the **size of your data**. That is:

1. **CPU-bound problems**: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
2. **Memory-bound problems**: Data is larger than RAM, and sampling isn't an option.

Here's a handy diagram for visualizing these problems:

In [36]:
from IPython.display import Image
Image(url="images/dask-zones.png", width=500)

In the bottom-left quadrant, your datasets are not too large (they fit comfortably in RAM) and your model is not too large either. When these conditions are met, you are much better off using something like scikit-learn, XGBoost, and similar libraries. You don't need to leverage multiple machines in a distributed manner with a library like Dask-ML. However, if you are in any of the other quadrants, distributed machine learning is the way to go.

Summarizing: 

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use Joblib's Dask backend (`joblib.parallel_backend("dask")`) with your favorite scikit-learn estimator.
* For large datasets, use `dask_ml` estimators.
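A back-of-the-envelope estimate can help decide which case applies. The sketch below assumes a dense `float64` matrix (8 bytes per value); the helper `dense_matrix_gib` and the example sizes are hypothetical and chosen only for illustration.

```python
def dense_matrix_gib(n_samples: int, n_features: int, bytes_per_value: int = 8) -> float:
    """Approximate in-memory size, in GiB, of a dense numeric matrix."""
    return n_samples * n_features * bytes_per_value / 2**30


# Hypothetical workloads (illustrative numbers, not the tutorial's data)
small = dense_matrix_gib(10_000, 4)            # fits comfortably in RAM
large = dense_matrix_gib(2_000_000_000, 100)   # far beyond a single laptop

print(f"small: {small:.6f} GiB")
print(f"large: {large:,.0f} GiB")
```

If the estimate is well below your available RAM, the in-memory route is usually the simpler choice.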

## Scikit-Learn in five minutes

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/scikit_learn_logo_small.svg" 
     width="30%"
     alt="sklearn logo" />

In this section, we'll quickly run through a typical Scikit-Learn workflow:

* Load some data (in this case, we'll generate it)
* Import the Scikit-Learn module for our chosen ML algorithm
* Create an estimator for that algorithm and fit it with our data
* Inspect the learned attributes
* Check the accuracy of our model

Scikit-Learn has a nice, consistent API:

* You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the model's *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.
* You call `estimator.fit(X, y)` to train the estimator.
* Use `estimator` to inspect attributes, make predictions, etc. 

Here `X` is an array of *feature variables* (what you're using to predict) and `y` is an array of *target variables* (what we're trying to predict).

### Generate some random data

In [None]:
from sklearn.datasets import make_classification

# Generate data
X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

**Refreshing some ML concepts**

- `X` is the samples matrix (or design matrix). The size of `X` is typically (`n_samples`, `n_features`), which means that samples are represented as rows and features are represented as columns.
- A "feature" (also called an "attribute") is a measurable property of the phenomenon we're trying to analyze. A feature for a dataset of employees might be their hire date, for example.
- `y` holds the target values, which are real numbers for regression tasks, or integers for classification (or any other discrete set of values). For unsupervised learning tasks, `y` does not need to be specified. `y` is usually a 1-D array where the `i`th entry corresponds to the target of the `i`th sample (row) of `X`.

In [None]:
# Let's take a look at X
X[:8]

In [None]:
# Let's take a look at y
y[:8]

### Fitting an SVC

For this example, we will fit a [Support Vector Classifier](https://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html).

In [None]:
from sklearn.svm import SVC

estimator = SVC(random_state=0)
estimator.fit(X, y)

We can inspect the learned attributes by taking a look at the `support_vectors_`:

In [None]:
estimator.support_vectors_[:4]

And we check the accuracy:

In [None]:
estimator.score(X, y)
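The score above is computed on the same data the model was trained on, so it tends to be optimistic. A minimal sketch of a held-out evaluation follows; the 25% test split is an arbitrary choice made for this illustration.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)

# Hold out 25% of the samples (an arbitrary choice) for evaluation
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0
)

estimator = SVC(random_state=0)
estimator.fit(X_train, y_train)

train_acc = estimator.score(X_train, y_train)  # accuracy on data the model has seen
test_acc = estimator.score(X_test, y_test)     # accuracy on unseen data
print(train_acc, test_acc)
```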

### Hyperparameter Optimization

There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations. Scikit-learn provides tools to automatically find the best parameter combinations via cross-validation (which is the "CV" in `GridSearchCV`).

In [None]:
from sklearn.model_selection import GridSearchCV

In [None]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

# Brute-force search over a grid of hyperparameter combinations
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

In [None]:
grid_search.best_params_, grid_search.best_score_
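To get a feel for how quickly a grid grows, the sketch below enumerates the candidates of the grid above using only the standard library. Each candidate is fitted once per fold, and with the default `refit=True` there is one extra fit on the full data at the end.

```python
from itertools import product

param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}
cv = 2  # number of cross-validation folds, as in the grid search above

# Every combination of hyperparameter values is one candidate
candidates = [
    dict(zip(param_grid, values)) for values in product(*param_grid.values())
]
n_fits = len(candidates) * cv  # cross-validation fits only

for candidate in candidates:
    print(candidate)
print(f"{len(candidates)} candidates x {cv} folds = {n_fits} fits")
```

Adding one more hyperparameter with three values would triple the number of fits, which is why grid search becomes compute-bound so quickly.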

## Compute Bound: Single-machine parallelism with Joblib

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/joblib_logo.svg" 
     alt="Joblib logo" 
     width="15%"/>

In this section we'll see how [Joblib](https://joblib.readthedocs.io/en/latest/) ("*a set of tools to provide lightweight pipelining in Python*") gives us parallelism on our laptop. Here's what our grid search graph would look like if we set up six training "jobs" in parallel:

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/unmerged_grid_search_graph.svg" 
     alt="grid search graph" 
     width="75%"/>

With Joblib, we can say that Scikit-Learn has *single-machine* parallelism.

**Any Scikit-Learn estimator that can operate in parallel exposes an `n_jobs` keyword**, which sets how many tasks to run in parallel. Specifying `n_jobs=-1` means using all available cores.

In [None]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Notice that the computation above is faster than before. If you are running this on Binder, you might not see a speed-up: Binder instances tend to have only one core, so there is nothing to parallelize over.
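To check how many cores your machine exposes, and to see Joblib's parallelism outside of Scikit-Learn, here is a small sketch. Squaring numbers is a toy workload picked for illustration; real speed-ups only show up when each task does substantial work.

```python
import os

from joblib import Parallel, delayed

n_cores = os.cpu_count()  # logical cores visible to Python (may be None)
print(f"Logical cores: {n_cores}")

# n_jobs=-1 asks joblib to use all available cores;
# results come back in the order the tasks were submitted
squares = Parallel(n_jobs=-1)(delayed(pow)(i, 2) for i in range(5))
print(squares)
```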

## Compute Bound: Multi-machine parallelism with Dask

In this section we'll see how Dask (plus Joblib and Scikit-Learn) gives us multi-machine parallelism. Here's what our grid search graph would look like if we allowed Dask to schedule our training "jobs" over multiple machines in our cluster:

<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/merged_grid_search_graph.svg" 
     alt="merged grid search graph" 
     width="100%"/>
     
Dask can talk to Scikit-Learn (via Joblib) so that our *Dask cluster* is used to train a model. 



In [None]:
from dask.distributed import Client

# create local Dask cluster with 8 workers (cores)
client = Client(n_workers=8)
client

**Note:** Click on Cluster Info to see more details about the cluster, including its configuration and other specs.

We can expand our problem by specifying more hyperparameters before training, and see how using `dask` as the backend can help us.

In [None]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    'kernel': ['rbf', 'poly', 'linear'],
    'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)

### Dask parallel backend

We can fit our estimator with multi-machine parallelism by quickly *switching to a Dask parallel backend* when using joblib. 

In [None]:
import joblib

In [None]:
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

**What just happened?**

Dask-ML developers worked with the Scikit-Learn and Joblib developers to implement a Dask parallel backend. So internally, scikit-learn now talks to Joblib, and Joblib talks to Dask, and Dask is what handles scheduling all of those tasks on multiple machines.
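The backend switch is a general Joblib mechanism and not specific to Scikit-Learn. The sketch below uses the same context manager with Joblib's built-in `"threading"` backend so that it runs without a cluster. Passing `"dask"` instead, with an active Dask `Client`, would route the same tasks to the Dask scheduler. The toy workload (`abs`) is only for illustration.

```python
import joblib
from joblib import Parallel, delayed

# Same pattern as the "dask" call above, but with joblib's built-in
# "threading" backend so the sketch runs without a Dask cluster.
with joblib.parallel_backend("threading", n_jobs=2):
    results = Parallel()(delayed(abs)(i) for i in [-2, -1, 0, 1])

print(results)
```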

The best parameters and best score:

In [None]:
grid_search.best_params_, grid_search.best_score_

## Memory Bound: Multi-machine parallelism with Dask-ML

We have seen how to work with larger models, but sometimes you'll want to train on a larger-than-memory dataset. Dask-ML implements estimators that work well on Dask `Array`s and `DataFrame`s that may be larger than your machine's RAM.

In [None]:
from dask_ml.datasets import make_regression
from dask_ml.linear_model import LinearRegression

In [None]:
X, y = make_regression(n_samples=10_000, chunks=100)

In [None]:
X

In [None]:
lr = LinearRegression()

In [None]:
%%time
lr.fit(X, y)

In [None]:
lr.predict(X)

In [None]:
lr.score(X, y)

This works!

But it does take a while...can we speed this up even more?

In [None]:
client.close()

## Multi-machine parallelism in the cloud with Coiled

<br>
<img src="https://raw.githubusercontent.com/coiled/data-science-at-scale/master/images/Coiled-Logo_Horizontal_RGB_Black.png"
     alt="Coiled logo" 
     width="25%"/>
<br>

In this section we'll see how Coiled allows us to solve machine learning problems with multi-machine parallelism in the cloud.

Coiled, [among other things](https://coiled.io/product/), provides hosted and scalable Dask clusters. **The biggest barriers to entry for doing machine learning at scale are "Do you have access to a cluster?" and "Do you know how to manage it?"** Coiled solves both of those problems. 

We'll connect to the Coiled cluster we launched earlier, then connect our Dask client to that cluster.

If you are running on your local machine and not on Binder, and you want to give Coiled a try, you can sign up [here](https://cloud.coiled.io/login?redirect_uri=/) and reach out to Support to get set up with some free credits. If you installed the environment by following the steps in the repository's [README](https://github.com/coiled/dask-mini-tutorial/blob/main/README.md), you will have `coiled` installed. You will just need to log in by following the steps on the [setup page](https://docs.coiled.io/user_guide/getting_started.html), and you will be ready to go.

To learn more about how to set up an environment, you can visit the Coiled documentation on [Creating software environments](https://docs.coiled.io/user_guide/software_environment_creation.html). But for now you can use the environment we set up for this tutorial.

In [None]:
import coiled

In [None]:
# Connect to running cluster by referencing only its name (GET NAME FROM COILED DASHBOARD)
cluster = coiled.Cluster(
    name="dask-tutorial", 
)

Our cluster currently has 20 workers.

Machine Learning is a compute-heavy process and we're working against a tight deadline. Let's scale up to 100 to speed this up.

In [None]:
cluster.scale(100)

In [None]:
from dask.distributed import Client
client = Client(cluster)
client

## Same Linear Regression Model as before

We can use Dask-ML estimators on the cloud to work with larger datasets.

In [None]:
X, y = make_regression(n_samples=2_000_000, chunks=1000) 

Notice that `chunks=1000` sets the chunk size to 1,000 rows, so the 2,000,000 samples are split into 2,000 chunks, a number that distributes evenly over our 100 workers.
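To see how the `chunks` argument translates into blocks, here is a small sketch with a plain Dask array. It assumes `chunks=1000` means 1,000 rows per block and that `X` has 100 columns (the `make_regression` default for `n_features`); `X_demo` is a hypothetical stand-in for `X`.

```python
import dask.array as da

# Lazy array with the same shape as X: nothing is allocated until computed.
# 100 columns is an assumption (the make_regression default for n_features).
X_demo = da.zeros((2_000_000, 100), chunks=(1000, 100))

n_row_blocks, n_col_blocks = X_demo.numblocks
print(X_demo.chunksize)            # shape of a single block
print(n_row_blocks, n_col_blocks)  # number of blocks along each axis
print(n_row_blocks / 100)          # row blocks per worker on 100 workers
```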

In [None]:
X

In [None]:
lr = LinearRegression()

In [None]:
%%time
lr.fit(X, y)

In [None]:
lr.predict(X)

In [None]:
lr.score(X, y)

In [None]:
client.close()

## Training XGBoost in Parallel

Dask-ML implements some of the most popular machine learning algorithms for parallel processing, but not all of them.

For XGBoost, the maintainers of Dask and XGBoost took a different approach: they built a Dask Backend for XGBoost so you can run XGBoost in parallel with Dask straight from your normal XGBoost library.

Running an XGBoost model with the distributed Dask backend only requires two changes to your regular XGBoost code:

1. substitute `dtrain = xgb.DMatrix(X_train, y_train)` with `dtrain = xgb.dask.DaskDMatrix(client, X_train, y_train)`
2. substitute `xgb.train(params, dtrain, ...)` with `xgb.dask.train(client, params, dtrain, ...)`
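Put together, the two substitutions could look like the sketch below. The helper name `train_xgb_on_dask`, the parameter values, and the number of boosting rounds are illustrative assumptions. `client` is expected to be a connected Dask `Client`, and `X_train` and `y_train` are Dask arrays or DataFrames. Note that `DaskDMatrix` takes the client as its first argument.

```python
def train_xgb_on_dask(client, X_train, y_train, num_boost_round=10):
    """Train an XGBoost regressor on Dask collections (illustrative sketch)."""
    # Imported here so the helper can be defined even where xgboost is absent;
    # this is the same module the text refers to as xgb.dask.
    from xgboost import dask as dxgb

    # Change 1: DaskDMatrix instead of DMatrix (the client comes first)
    dtrain = dxgb.DaskDMatrix(client, X_train, y_train)

    # Example parameters, chosen only for illustration
    params = {"objective": "reg:squarederror", "tree_method": "hist"}

    # Change 2: xgb.dask.train instead of xgb.train (the client comes first)
    output = dxgb.train(client, params, dtrain, num_boost_round=num_boost_round)

    # The distributed train call returns a dict with the model and eval history
    return output["booster"], output["history"]
```

With a running cluster, a call would look like `booster, history = train_xgb_on_dask(client, X, y)`.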

[Here's a step-by-step tutorial](https://coiled.io/blog/dask-xgboost-python-example/) that trains XGBoost on 100GB of synthetic data in 4 minutes.



## Extra resources:

- [Dask-ML documentation](https://ml.dask.org/)
- [Getting started with Coiled](https://docs.coiled.io/user_guide/getting_started.html)