# Parallel and Distributed Machine Learning

[Dask-ML](https://dask-ml.readthedocs.io) has resources for parallel and distributed machine learning.

## Types of Scaling

There are a couple of distinct scaling problems you might face.
The scaling strategy depends on which problem you're facing.

1. CPU-Bound: Data fits in RAM, but training takes too long. Many hyperparameter combinations, a large ensemble of many models, etc.
2. Memory-bound: Data is larger than RAM, and sampling isn't an option.

![](images/ml-dimensions.png)

* For in-memory problems, just use scikit-learn (or your favorite ML library).
* For large models, use `dask_ml.joblib` and your favorite scikit-learn estimator
* For large datasets, use `dask_ml` estimators

## Scikit-Learn in 5 Minutes

Scikit-Learn has a nice, consistent API.

1. You instantiate an `Estimator` (e.g. `LinearRegression`, `RandomForestClassifier`, etc.). All of the models *hyperparameters* (user-specified parameters, not the ones learned by the estimator) are passed to the estimator when it's created.
2. You call `estimator.fit(X, y)` to train the estimator.
3. Use `estimator` to inspect attributes, make predictions, etc. 

Let's generate some random data.

In [5]:
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=10000, n_features=4, random_state=0)
X[:8]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-1.90879217, -1.1602627 , -0.27364545, -0.82766028],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [-1.17047054,  0.02212382, -2.17376797, -0.13421976],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959],
       [ 1.68616989,  1.6329131 , -1.42072654,  1.04050557],
       [-0.93912893, -1.02270838,  1.10093827, -0.63714432]])

In [2]:
y[:8]

array([0, 0, 1, 0, 0, 0, 0, 1])

We'll fit a Support Vector Classifier.

In [3]:
from sklearn.svm import SVC

Create the estimator and fit it.

In [4]:
estimator = SVC(random_state=0)
estimator.fit(X, y)

SVC(random_state=0)

Inspect the learned attributes.

In [5]:
estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [ 0.79010037,  0.68530624, -0.44740487,  0.44692959]])

Check the accuracy.

In [6]:
estimator.score(X, y)

0.905

## Hyperparameters

Most models have *hyperparameters*. They affect the fit, but are specified up front instead of learned during training.

In [7]:
estimator = SVC(C=0.00001, shrinking=False, random_state=0)
estimator.fit(X, y)
estimator.support_vectors_[:4]

array([[-0.77244139,  0.3607576 , -2.38110133,  0.08757   ],
       [ 1.14946035,  0.62254594,  0.37302939,  0.45965795],
       [-0.77694695,  0.31434299, -2.26231851,  0.06339125],
       [-1.17047054,  0.02212382, -2.17376797, -0.13421976]])

In [8]:
estimator.score(X, y)

0.5007

## Hyperparameter Optimization

There are a few ways to learn the best *hyper*parameters while training. One is `GridSearchCV`.
As the name implies, this does a brute-force search over a grid of hyperparameter combinations.

In [9]:
from sklearn.model_selection import GridSearchCV

In [10]:
%%time
estimator = SVC(gamma='auto', random_state=0, probability=True)
param_grid = {
    'C': [0.001, 10.0],
    'kernel': ['rbf', 'poly'],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] END ................................C=0.001, kernel=rbf; total time=   8.6s
[CV] END ................................C=0.001, kernel=rbf; total time=   8.9s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.7s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.7s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.3s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.1s
[CV] END ................................C=10.0, kernel=poly; total time=   4.1s
[CV] END ................................C=10.0, kernel=poly; total time=   3.7s
CPU times: user 43.7 s, sys: 1.13 s, total: 44.8 s
Wall time: 45.7 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

## Single-machine parallelism with scikit-learn

![](images/unmerged_grid_search_graph.svg)

Scikit-Learn has nice *single-machine* parallelism, via Joblib.
Any scikit-learn estimator that can operate in parallel exposes an `n_jobs` keyword.
This controls the number of CPU cores that will be used.

In [11]:
%%time
grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=2, n_jobs=-1)
grid_search.fit(X, y)

Fitting 2 folds for each of 4 candidates, totalling 8 fits
[CV] END ................................C=0.001, kernel=rbf; total time=   8.7s
[CV] END ................................C=0.001, kernel=rbf; total time=   9.1s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.8s
[CV] END ...............................C=0.001, kernel=poly; total time=   3.9s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.3s
[CV] END .................................C=10.0, kernel=rbf; total time=   2.1s
[CV] END ................................C=10.0, kernel=poly; total time=   4.1s
[CV] END ................................C=10.0, kernel=poly; total time=   3.7s
CPU times: user 44.3 s, sys: 665 ms, total: 45 s
Wall time: 46.6 s


GridSearchCV(cv=2,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             n_jobs=-1,
             param_grid={'C': [0.001, 10.0], 'kernel': ['rbf', 'poly']},
             verbose=2)

## Multi-machine parallelism with Dask

![](images/merged_grid_search_graph.svg)

Dask can talk to scikit-learn (via joblib) so that your *cluster* is used to train a model. 

If you run this on a laptop, it will take quite some time, but the CPU usage will be satisfyingly near 100% for the duration. To run faster, you would need a disrtibuted cluster. That would mean putting something in the call to `Client` something like

```
c = Client('tcp://my.scheduler.address:8786')
```

Details on the many ways to create a cluster can be found [here](https://docs.dask.org/en/latest/setup/single-distributed.html).

Let's try it on a larger problem (more hyperparameters).

In [1]:
import joblib
import dask.distributed

# By default this sets up 1 worker per core
c = dask.distributed.Client(n_workers=3)

distributed.diskutils - INFO - Found stale lock file and directory '/home/jovyan/dask-worker-space/worker-mjqd6x24', purging


In [13]:
param_grid = {
    'C': [0.001, 0.1, 1.0, 2.5, 5, 10.0],
    # Uncomment this for larger Grid searches on a cluster
    # 'kernel': ['rbf', 'poly', 'linear'],
    # 'shrinking': [True, False],
}

grid_search = GridSearchCV(estimator, param_grid, verbose=2, cv=5, n_jobs=-1)

In [14]:
%%time
with joblib.parallel_backend("dask", scatter=[X, y]):
    grid_search.fit(X, y)

Fitting 5 folds for each of 6 candidates, totalling 30 fits
[CV] END .............................................C=10.0; total time=   6.2s
[CV] END .............................................C=10.0; total time=   5.9s
[CV] END ................................................C=5; total time=   6.4s
[CV] END ..............................................C=2.5; total time=   6.4s
[CV] END ..............................................C=1.0; total time=   6.5s
[CV] END ..............................................C=0.1; total time=   8.4s
[CV] END ............................................C=0.001; total time=  25.7s
[CV] END .............................................C=10.0; total time=   6.4s
[CV] END ................................................C=5; total time=   6.2s
[CV] END ..............................................C=2.5; total time=   6.4s
[CV] END ..............................................C=1.0; total time=   6.4s
[CV] END ........................................

In [17]:
grid_search.best_params_, grid_search.best_score_

({'C': 10.0}, 0.9119000000000002)

# Training on Large Datasets

Sometimes you'll want to train on a larger than memory dataset. `dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine's RAM.

In [2]:
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using scikit-learn.

In [3]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
    
centers[:4]

array([[ 1.00796679,  4.34582168,  2.15175661,  1.04337835, -1.82115164,
         2.81149666, -1.18757701,  7.74628882,  9.36761449, -2.20570731,
         5.71142324,  0.41084221,  1.34168817,  8.4568751 , -8.59042755,
        -8.35194302, -9.55383028,  6.68605157,  5.34481483,  7.35044606],
       [ 9.49283024,  6.1422784 , -0.97484846,  5.8604399 , -7.61126963,
         2.86555735, -7.25390288,  8.89609285,  0.33510318, -1.79181328,
        -4.66192239,  5.43323887, -0.86162507,  1.3705568 , -9.7904172 ,
         2.3613231 ,  2.20516237,  2.20604823,  8.76464833,  3.47795068],
       [-2.67206588, -1.30103177,  3.98418492, -8.88040428,  3.27735964,
         3.51616445, -5.81395151, -7.42287114, -3.73476887, -2.89520363,
         1.49435043, -1.35811028,  9.91250767, -7.86133474, -5.78975793,
        -6.54897163,  3.08083281, -5.18975209, -0.85563107, -5.06615534],
       [-6.85980599, -7.87144648,  3.33572279, -7.00394241, -5.97224874,
        -2.55638942,  6.36329802, -7.97988653,  

The small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. 

In [6]:
n_samples_per_block = 20000
n_blocks = 500

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype=X.dtype)
          for obj in delayeds]
X = da.concatenate(arrays)
X

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,3.05 MiB
Shape,"(10000000, 20)","(20000, 20)"
Count,2000 Tasks,500 Chunks
Type,float64,numpy.ndarray
"Array Chunk Bytes 1.49 GiB 3.05 MiB Shape (10000000, 20) (20000, 20) Count 2000 Tasks 500 Chunks Type float64 numpy.ndarray",20  10000000,

Unnamed: 0,Array,Chunk
Bytes,1.49 GiB,3.05 MiB
Shape,"(10000000, 20)","(20000, 20)"
Count,2000 Tasks,500 Chunks
Type,float64,numpy.ndarray


In [7]:
X = X.persist()  # Only run this on the cluster.

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.

In [8]:
from dask_ml.cluster import KMeans

In [9]:
clf = KMeans(init_max_iter=3, oversampling_factor=10)

# ATTENTION
The following line will take a lot of time on Binder. **Do it only if you want to wait several minutes.** But it won't explode your memory :)

In [None]:
%time clf.fit(X)



In [None]:
clf.labels_

In [None]:
clf.labels_[:10].compute()