# Dask for Machine Learning

This is a high-level overview demonstrating some of the components of Dask-ML.
Visit the main [Dask-ML](http://ml.dask.org) documentation, see the [dask tutorial](https://github.com/dask/dask-tutorial) notebook 08, or explore some of the other machine-learning examples.

In [3]:
# from dask.distributed import Client, progress
# client = Client(processes=False, threads_per_worker=4,
#                 n_workers=1, memory_limit='2GB')
# client

from tools import init_cluster

In [4]:
cluster, client = init_cluster()

In [5]:
cluster

[GatewayCluster widget output]

### Create Scikit-Learn Estimator

In [6]:
from sklearn.datasets import make_classification
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV
import pandas as pd

We'll use scikit-learn to create a pair of small random arrays, one for the features `X`, and one for the target `y`.

In [7]:
X, y = make_classification(n_samples=1000, random_state=0)
X[:5]

array([[-1.06377997,  0.67640868,  1.06935647, -0.21758002,  0.46021477,
        -0.39916689, -0.07918751,  1.20938491, -0.78531472, -0.17218611,
        -1.08535744, -0.99311895,  0.30693511,  0.06405769, -1.0542328 ,
        -0.52749607, -0.0741832 , -0.35562842,  1.05721416, -0.90259159],
       [ 0.0708476 , -1.69528125,  2.44944917, -0.5304942 , -0.93296221,
         2.86520354,  2.43572851, -1.61850016,  1.30071691,  0.34840246,
         0.54493439,  0.22532411,  0.60556322, -0.19210097, -0.06802699,
         0.9716812 , -1.79204799,  0.01708348, -0.37566904, -0.62323644],
       [ 0.94028404, -0.49214582,  0.67795602, -0.22775445,  1.40175261,
         1.23165333, -0.77746425,  0.01561602,  1.33171299,  1.08477266,
        -0.97805157, -0.05012039,  0.94838552, -0.17342825, -0.47767184,
         0.76089649,  1.00115812, -0.06946407,  1.35904607, -1.18958963],
       [-0.29951677,  0.75988955,  0.18280267, -1.55023271,  0.33821802,
         0.36324148, -2.10052547, -0.4380675 , -

We'll fit a [Support Vector Classifier](http://scikit-learn.org/stable/modules/generated/sklearn.svm.SVC.html), using [grid search](http://scikit-learn.org/stable/modules/generated/sklearn.model_selection.GridSearchCV.html) to find the best value of the $C$ hyperparameter.

In [8]:
import numpy as np

In [9]:
C = np.logspace(0.001, 10, 40)

In [10]:
C

array([1.00230524e+00, 1.80877529e+00, 3.26414341e+00, 5.89052287e+00,
       1.06301272e+01, 1.91832893e+01, 3.46184558e+01, 6.24729922e+01,
       1.12739713e+02, 2.03451801e+02, 3.67152216e+02, 6.62568476e+02,
       1.19568116e+03, 2.15774441e+03, 3.89389840e+03, 7.02698831e+03,
       1.26810100e+04, 2.28843437e+04, 4.12974349e+04, 7.45259794e+04,
       1.34490717e+05, 2.42703994e+05, 4.37987321e+05, 7.90398585e+05,
       1.42636531e+06, 2.57404055e+06, 4.64515275e+06, 8.38271335e+06,
       1.51275721e+07, 2.72994468e+07, 4.92649971e+07, 8.89043635e+07,
       1.60438167e+08, 2.89529158e+08, 5.22488725e+08, 9.42891104e+08,
       1.70155564e+09, 3.07065320e+09, 5.54134749e+09, 1.00000000e+10])
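`np.logspace(start, stop, num)` takes its endpoints as exponents of 10, so the grid above runs from $10^{0.001}$ to $10^{10}$ in equal multiplicative steps. A minimal check of that reading follows; the names `exponents` and `ratios` are introduced here for illustration only.

```python
import numpy as np

# Same call as in the cell above.
C = np.logspace(0.001, 10, 40)

# logspace is equivalent to raising 10 to linearly spaced exponents.
exponents = np.linspace(0.001, 10, 40)
print(np.allclose(C, 10.0 ** exponents))

# Consecutive values differ by a constant factor, not a constant offset.
ratios = C[1:] / C[:-1]
print(np.allclose(ratios, ratios[0]))
```

A grid this wide is a deliberate choice for a demo: it produces many candidates to distribute, though the largest values of `C` are far beyond what is typically useful for an SVM.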

In [11]:
param_grid = {"C": C,
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           cv=3,
                           n_jobs=1)

To fit that normally, we would call

```python
grid_search.fit(X, y)
```

To fit it using the cluster, we just need to use a context manager provided by joblib. For comparison, we first time the plain local fit with `n_jobs=1`.

In [12]:
%%time
grid_search.fit(X, y)

CPU times: user 40.7 s, sys: 9.32 ms, total: 40.7 s
Wall time: 40.7 s




GridSearchCV(cv=3,
             estimator=SVC(gamma='auto', probability=True, random_state=0),
             iid=True, n_jobs=1,
             param_grid={'C': array([1.00230524e+00, 1.80877529e+00, 3.26414341e+00, 5.89052287e+00,
       1.06301272e+01, 1.91832893e+01, 3.46184558e+01, 6.24729922e+01,
       1.12739713e+02, 2.03451801e+02, 3.67152216e+02, 6.62568476e+02,
       1.19568116e+03, 2.15774441e+03, 3.89389840e+03, 7.0269...
       1.34490717e+05, 2.42703994e+05, 4.37987321e+05, 7.90398585e+05,
       1.42636531e+06, 2.57404055e+06, 4.64515275e+06, 8.38271335e+06,
       1.51275721e+07, 2.72994468e+07, 4.92649971e+07, 8.89043635e+07,
       1.60438167e+08, 2.89529158e+08, 5.22488725e+08, 9.42891104e+08,
       1.70155564e+09, 3.07065320e+09, 5.54134749e+09, 1.00000000e+10]),
                         'kernel': ['rbf', 'poly', 'sigmoid'],
                         'shrinking': [True, False]})

In [13]:
param_grid = {"C": C,
              "kernel": ['rbf', 'poly', 'sigmoid'],
              "shrinking": [True, False]}

grid_search = GridSearchCV(SVC(gamma='auto', random_state=0, probability=True),
                           param_grid=param_grid,
                           return_train_score=False,
                           iid=True,
                           cv=3,
                           n_jobs=-1)

In [14]:
import joblib

In [15]:
%%time
with joblib.parallel_backend('dask'):
    grid_search.fit(X, y)

CPU times: user 1.39 s, sys: 94.6 ms, total: 1.49 s
Wall time: 3.18 s




We evaluated 240 candidate models, one for each hyper-parameter combination in `param_grid` (40 values of `C` × 3 kernels × 2 `shrinking` settings), each fit on 3 cross-validation folds and distributed across the cluster. At this point, we have a regular scikit-learn model, which can be used for prediction, scoring, etc.
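The size of the search can be checked by enumerating the grid directly. The sketch below rebuilds the same `param_grid` values from the cells above and counts the combinations with `itertools.product`; the names `n_candidates` and `n_fits` are ours, not part of scikit-learn.

```python
from itertools import product

import numpy as np

# Same grid values as in the notebook cells above.
C = np.logspace(0.001, 10, 40)
kernels = ["rbf", "poly", "sigmoid"]
shrinking = [True, False]
cv = 3  # number of cross-validation folds passed to GridSearchCV

# One candidate per combination of hyper-parameter values.
candidates = list(product(C, kernels, shrinking))
n_candidates = len(candidates)

# Each candidate is fit once per fold during the search.
n_fits = n_candidates * cv
print(n_candidates, n_fits)
```

scikit-learn's `ParameterGrid` class performs the same enumeration. With the default `refit=True`, `GridSearchCV` also performs one additional fit of the best candidate on the full data.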

In [16]:
pd.DataFrame(grid_search.cv_results_).head()

| | mean_fit_time | std_fit_time | mean_score_time | std_score_time | param_C | param_kernel | param_shrinking | params | split0_test_score | split1_test_score | split2_test_score | mean_test_score | std_test_score | rank_test_score |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 0.096794 | 0.00228 | 0.006406 | 0.000988 | 1.00231 | rbf | True | {'C': 1.0023052380778996, 'kernel': 'rbf', 'sh... | 0.952096 | 0.93994 | 0.954955 | 0.949 | 0.006507 | 5 |
| 1 | 0.081441 | 0.010529 | 0.005748 | 0.000605 | 1.00231 | rbf | False | {'C': 1.0023052380778996, 'kernel': 'rbf', 'sh... | 0.952096 | 0.93994 | 0.954955 | 0.949 | 0.006507 | 5 |
| 2 | 0.079572 | 0.01482 | 0.005306 | 0.001139 | 1.00231 | poly | True | {'C': 1.0023052380778996, 'kernel': 'poly', 's... | 0.946108 | 0.927928 | 0.957958 | 0.944 | 0.012344 | 83 |
| 3 | 0.078707 | 0.011469 | 0.005277 | 0.001665 | 1.00231 | poly | False | {'C': 1.0023052380778996, 'kernel': 'poly', 's... | 0.946108 | 0.927928 | 0.957958 | 0.944 | 0.012344 | 83 |
| 4 | 0.062117 | 0.002798 | 0.005501 | 0.000837 | 1.00231 | sigmoid | True | {'C': 1.0023052380778996, 'kernel': 'sigmoid',... | 0.952096 | 0.924925 | 0.957958 | 0.945 | 0.014385 | 81 |


In [17]:
grid_search.predict(X)[:5]

array([0, 1, 1, 1, 0])

In [18]:
grid_search.score(X, y)

0.999
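Note that `grid_search.score(X, y)` above evaluates on the same data the model was trained on, so 0.999 is likely optimistic. A hedged sketch of scoring on a held-out split instead follows. The smaller grid, the 25% test fraction, and the variable names are assumptions chosen so the example runs quickly; they are not the notebook's settings.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.svm import SVC

# Same synthetic data as in the notebook.
X, y = make_classification(n_samples=1000, random_state=0)

# Hold out a quarter of the rows; the search never sees them.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.25, random_state=0
)

# A deliberately small grid (assumption) to keep the run short.
small_grid = {"C": [0.1, 1.0, 10.0], "kernel": ["rbf", "sigmoid"]}
search = GridSearchCV(SVC(gamma="auto", random_state=0),
                      param_grid=small_grid, cv=3)
search.fit(X_train, y_train)

# Compare the score on training rows with the score on unseen rows.
train_score = search.score(X_train, y_train)
test_score = search.score(X_test, y_test)
print(search.best_params_)
print(train_score, test_score)
```

The same `fit` call can be wrapped in `joblib.parallel_backend('dask')` exactly as in the cell above; the train/test split is independent of where the fits run.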

For more on training scikit-learn models with distributed joblib, see the [dask-ml documentation](http://dask-ml.readthedocs.io/en/latest/joblib.html).

## Training on Large Datasets

Most estimators in scikit-learn are designed to work on in-memory arrays. Training with larger datasets may require different algorithms.

All of the algorithms implemented in Dask-ML work well on larger-than-memory datasets, which you might store in a [dask array](http://dask.pydata.org/en/latest/array.html) or [dataframe](http://dask.pydata.org/en/latest/dataframe.html).
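To make "larger than memory" concrete, here is a minimal sketch of how a dask array splits data into chunks and defers computation. It uses only `dask.array` (not Dask-ML); the array size, chunk size, and variable names are illustrative assumptions.

```python
import dask.array as da

# One million rows split into ten row-wise chunks of 100,000 rows each.
x = da.random.random((1_000_000, 2), chunks=(100_000, 2))
print(x.numblocks)  # number of chunks along each axis

# Building the reduction is lazy: no data has been generated yet.
col_means = x.mean(axis=0)

# compute() runs the task graph chunk by chunk and returns a NumPy array.
result = col_means.compute()
print(result.shape)
```

Because each chunk is processed separately, only a few chunks need to be in memory at once, which is what lets estimators built on dask arrays scale past a single machine's RAM.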

In [19]:
%matplotlib inline

In [20]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

ModuleNotFoundError: No module named 'dask_ml'

In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays.

In [None]:
X, y = dask_ml.datasets.make_blobs(n_samples=10000000,
                                   chunks=1000000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

We'll use the k-means implemented in Dask-ML to cluster the points. It uses the `k-means||` (read: "k-means parallel") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel.

In [None]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

We'll plot a sample of points, colored by the cluster each falls into.

In [None]:
fig, ax = plt.subplots()
ax.scatter(X[::10000, 0], X[::10000, 1], marker='.', c=km.labels_[::10000],
           cmap='viridis', alpha=0.25);
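The stride in `X[::10000]` keeps every 10,000th row, so the scatter plot draws only a small sample of the 10,000,000 points. A quick NumPy check of how many rows that selects (the variable names are illustrative):

```python
import numpy as np

n_samples = 10_000_000  # rows generated by make_blobs above
stride = 10_000         # step used in X[::10000]

# Row indices selected by the strided slice.
sampled_rows = np.arange(0, n_samples, stride)
print(len(sampled_rows))
```

Applying the same stride to `km.labels_` keeps the colors aligned with the sampled points.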

For all the estimators implemented in Dask-ML, see the [API documentation](http://dask-ml.readthedocs.io/en/latest/modules/api.html).