Scale Scikit-Learn for Small Data Problems
==========================================

Dask can be used to scale scikit-learn to a cluster of machines for a CPU-bound problem.
We will be using a local cluster with 4 workers, each with 1 thread.

In [None]:
from dask.distributed import Client, progress

In [None]:
import os
# The jupyter notebook is launched from your $HOME directory.
# Change the working directory to the workshop directory
# which was created in your username directory under /scratch/vp91
os.chdir(os.path.expandvars("/scratch/vp91/$USER/"))

In [None]:
client = Client(n_workers=4, threads_per_worker=1, memory_limit='2GB')
client

## Distributed Training


Scikit-learn uses [joblib](http://joblib.readthedocs.io/) for single-machine parallelism. This lets you train most estimators (anything that accepts an `n_jobs` parameter) using all the cores of your laptop or workstation.

Alternatively, Scikit-Learn can use Dask for parallelism.  This lets you train those estimators using all the cores of your *cluster* without significantly changing your code.  This is most useful for training large models on medium-sized datasets. 

### Create Scikit-Learn Pipeline

In [None]:
from pprint import pprint
from time import time
import logging

from sklearn.datasets import fetch_20newsgroups
from sklearn.feature_extraction.text import HashingVectorizer
from sklearn.feature_extraction.text import TfidfTransformer
from sklearn.linear_model import SGDClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline

In [None]:
# Scale Up: set categories=None to use all the categories
categories = [
    'alt.atheism',
    'talk.religion.misc',
]

print("Loading 20 newsgroups dataset for categories:")
print(categories)

data = fetch_20newsgroups(subset='train', categories=categories)
print("%d documents" % len(data.filenames))
print("%d categories" % len(data.target_names))
print()

We'll define a small pipeline that combines text feature extraction with a simple classifier.

In [None]:
pipeline = Pipeline([
    ('vect', HashingVectorizer()),
    ('tfidf', TfidfTransformer()),
    ('clf', SGDClassifier(max_iter=1000)),
])

In [None]:
pipeline

### Define Grid for Parameter Search
Grid search over some parameters.
GridSearchCV is a technique for finding the optimal parameter values from a given set of parameters in a grid. 

In [None]:
parameters = {
    'tfidf__use_idf': (True, False),
    'tfidf__norm': ('l1', 'l2'),
    'clf__alpha': (0.00001, 0.000001),
    # 'clf__penalty': ('l2', 'elasticnet'),
    # 'clf__n_iter': (10, 50, 80),
}

In [None]:
grid_search = GridSearchCV(pipeline, parameters, n_jobs=-1, verbose=1, cv=3, refit=False)

To fit this normally, we would write


```python
grid_search.fit(data.data, data.target)
```

That would use the default joblib backend (multiple processes) for parallelism.
To use the Dask distributed backend, which will use a cluster of machines to train the model, perform the fit in a `parallel_backend` context.

In [None]:
import joblib

with joblib.parallel_backend('dask'):
    grid_search.fit(data.data, data.target)

In [None]:
grid_search.best_score_

In [None]:
grid_search.best_params_

In [None]:
import pandas as pd
cv_results = pd.DataFrame(grid_search.cv_results_)
cv_results.head()

Score and Predict Large Datasets
================================

Sometimes you'll train on a smaller dataset that fits in memory, but need to predict or score for a much larger (possibly larger than memory) dataset.
Perhaps your [learning curve](http://scikit-learn.org/stable/modules/learning_curve.html) has leveled off, or you only have labels for a subset of the data.

In this situation, you can use [ParallelPostFit](http://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html) to parallelize and distribute the scoring or prediction steps.

In [None]:
from dask.distributed import Client, progress

client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

In [None]:
import numpy as np
import dask.array as da
from sklearn.datasets import make_classification

We'll generate a small random dataset with scikit-learn.

In [None]:
X_train, y_train = make_classification(
    n_features=2, n_redundant=0, n_informative=2,
    random_state=1, n_clusters_per_class=1, n_samples=1000)
X_train[:5]

And we'll clone that dataset many times with `dask.array`. `X_large` and `y_large` represent our larger than memory dataset.

In [None]:
# Scale up: increase N, the number of times we replicate the data.
N = 100
X_large = da.concatenate([da.from_array(X_train, chunks=X_train.shape)
                          for _ in range(N)])
y_large = da.concatenate([da.from_array(y_train, chunks=y_train.shape)
                          for _ in range(N)])
X_large

Since our training dataset fits in memory, we can use a scikit-learn estimator as the actual estimator fit during traning.
But we know that we'll want to predict for a large dataset, so we'll wrap the scikit-learn estimator with `ParallelPostFit`.

In [None]:
from sklearn.linear_model import LogisticRegressionCV
from dask_ml.wrappers import ParallelPostFit

In [None]:
clf = ParallelPostFit(LogisticRegressionCV(cv=3), scoring="r2")

See the note in the `dask-ml`'s documentation about when and why a `scoring` parameter is needed: https://ml.dask.org/modules/generated/dask_ml.wrappers.ParallelPostFit.html#dask_ml.wrappers.ParallelPostFit.

Now we'll call `clf.fit`. Dask-ML does nothing here, so this step can only use datasets that fit in memory.

In [None]:
clf.fit(X_train, y_train)

Now that training is done, we'll turn to predicting for the full (larger than memory) dataset.

In [None]:
y_pred = clf.predict(X_large)
y_pred

`y_pred` is a Dask array.
Workers can write the predicted values to a shared file system, without ever having to collect the data on a single machine.

Or we can check the models score on the entire large dataset.
The computation will be done in parallel, and no single machine will have to hold all the data.

In [None]:
clf.score(X_large, y_large)

Incrementally Train Large Datasets
==================================

In [None]:
from dask.distributed import Client
client = Client(n_workers=4, threads_per_worker=1)
client

## Create Data

We create a synthetic dataset that is large enough to be interesting, but small enough to run quickly.  

Our dataset has 1,000,000 examples and 100 features.

In [None]:
import dask
import dask.array as da
from dask_ml.datasets import make_classification


n, d = 100000, 100

X, y = make_classification(n_samples=n, n_features=d,
                           chunks=n // 10, flip_y=0.2)
X

## Split data for training and testing

We split our dataset into training and testing data to aid evaluation by making sure we have a fair test:

In [None]:
from dask_ml.model_selection import train_test_split
X_train, X_test, y_train, y_test = train_test_split(X, y)
X_train

## Persist data in memory

This dataset is small enough to fit in distributed memory, so we call `dask.persist` to ask Dask to execute the computations above and keep the results in memory.

In [None]:
X_train, X_test, y_train, y_test = dask.persist(X_train, X_test, y_train, y_test)

If you are working in a situation where your dataset does not fit in memory then you should skip this step.  Everything will still work, but will be slower and use less memory.

Calling `dask.persist` will preserve our data in memory, so no computation will be needed as we pass over our data many times.  For example if our data came from CSV files and was not persisted, then the CSV files would have to be re-read on each pass.  This is desirable if the data does not fit in RAM, but not slows down our computation otherwise.

## Precompute classes

We pre-compute the classes from our training data, which is required for this classification example:

In [None]:
classes = da.unique(y_train).compute()
classes

## Create Scikit-Learn model

We make the underlying Scikit-Learn estimator, an `SGDClassifier`:

In [None]:
from sklearn.linear_model import SGDClassifier

est = SGDClassifier(loss='squared_error', penalty='l2', tol=1e-3)

Here we use `SGDClassifier`, but any estimator that implements the `partial_fit` method will work.  A list of Scikit-Learn models that implement this API is available [here](https://scikit-learn.org/stable/computing/scaling_strategies.html#incremental-learning).


## Wrap with Dask-ML's Incremental meta-estimator

We now wrap our `SGDClassifer` with the [`dask_ml.wrappers.Incremental`](http://ml.dask.org/modules/generated/dask_ml.wrappers.Incremental.html#dask_ml.wrappers.Incremental) meta-estimator.

In [None]:
from dask_ml.wrappers import Incremental

inc = Incremental(est, scoring='accuracy')

`Incremental` only does data management while leaving the actual algorithm to the underlying Scikit-Learn estimator.

Note: We set the scoring parameter above in the Dask estimator to tell it to handle scoring.  This works better when using Dask arrays for test data.

## Model training

`Incremental` implements a `fit` method, which will perform one loop over the dataset, calling `partial_fit` over each chunk in the Dask array.

You may want to watch the dashboard during this fit process to see the sequential fitting of many batches.

In [None]:
inc.fit(X_train, y_train, classes=classes)

In [None]:
inc.score(X_test, y_test)

Train Models on Large Datasets
==============================

Most estimators in scikit-learn are designed to work with NumPy arrays or scipy sparse matricies.
These data structures must fit in the RAM on a single machine.

Estimators implemented in Dask-ML work well with Dask Arrays and DataFrames. This can be much larger than a single machine's RAM. They can be distributed in memory on a cluster of machines.

In [None]:
from dask.distributed import Client

# Scale up: connect to your own cluster with more resources
# see http://dask.pydata.org/en/latest/setup.html
client = Client(processes=False, threads_per_worker=4,
                n_workers=1, memory_limit='2GB')
client

In [None]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In this example, we'll use `dask_ml.datasets.make_blobs` to generate some random *dask* arrays.

In [None]:
# Scale up: increase n_samples or n_features
X, y = dask_ml.datasets.make_blobs(n_samples=1000000,
                                   chunks=100000,
                                   random_state=0,
                                   centers=3)
X = X.persist()
X

We'll use the k-means implemented in Dask-ML to cluster the points. It uses the `k-means||` (read: "k-means parallel") initialization algorithm, which scales better than `k-means++`. All of the computation, both during and after initialization, can be done in parallel.

In [None]:
km = dask_ml.cluster.KMeans(n_clusters=3, init_max_iter=2, oversampling_factor=10)
km.fit(X)

We'll plot a sample of points, colored by the cluster each falls into.

In [None]:
fig, ax = plt.subplots()
ax.scatter(X[::1000, 0], X[::1000, 1], marker='.', c=km.labels_[::1000],
           cmap='viridis', alpha=0.25);