# More advanced concepts: Parallel computation and caching

```
Authors: Alexandre Gramfort
         Thomas Moreau
```

The aims of this notebook are:

  - to explain how parallel computation works within scikit-learn
  - to show how to cache certain computations to save computation time.

This tutorial relies mainly on the [joblib package](https://joblib.readthedocs.io/).

In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings
warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)

In [2]:
import os
from urllib.request import urlretrieve

url = ("https://archive.ics.uci.edu/ml/machine-learning-databases"
       "/adult/adult.data")
local_filename = os.path.basename(url)
if not os.path.exists(local_filename):
    print("Downloading Adult Census datasets from UCI")
    urlretrieve(url, local_filename)

Downloading Adult Census datasets from UCI


In [3]:
names = ("age, workclass, fnlwgt, education, education-num, "
         "marital-status, occupation, relationship, race, sex, "
         "capital-gain, capital-loss, hours-per-week, "
         "native-country, income").split(', ')    
data = pd.read_csv(local_filename, names=names)

y = data['income']
X_df = data.drop('income', axis=1)

In [4]:
X_df.head()

| | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States |
| 3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States |
| 4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba |


In [5]:
y.value_counts()

 <=50K    24720
 >50K      7841
Name: income, dtype: int64

## Let's construct a full model with a ColumnTransformer

In [7]:
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

numeric_features = [c for c in X_df
                    if X_df[c].dtype.kind in ('i', 'f')]
categorical_features = [c for c in X_df
                        if X_df[c].dtype.kind not in ('i', 'f')]

pipeline = make_pipeline(
    make_column_transformer(
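        # `sparse_output` requires scikit-learn >= 1.2; older releases use
        # `sparse=False` instead, which explains the TypeError recorded below.
        (OneHotEncoder(handle_unknown='ignore', sparse_output=False),
         categorical_features),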
        (StandardScaler(), numeric_features),
    ),
    RandomForestClassifier(max_depth=7, n_estimators=300)
)

cv_scores = cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5)
print("CV score:", np.mean(cv_scores))

TypeError: __init__() got an unexpected keyword argument 'sparse_output'

### How to run things in parallel in scikit-learn: The `n_jobs` parameter

In [None]:
%timeit -n1 -r2 cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5)

In [None]:
%timeit -n1 -r2 cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5, n_jobs=-1)

In [None]:
%%timeit -n1 -r2

pipeline[-1].set_params(n_jobs=-1)
cv_scores = cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5, n_jobs=1)
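
The three timings above compare a sequential run, parallelism over the CV folds (`n_jobs=-1` passed to `cross_val_score`), and parallelism over the trees of the forest (`n_jobs=-1` set on the classifier). As a small sketch, joblib can report how many workers a given `n_jobs` value resolves to on the current machine. The variable names below are only illustrative, and the printed numbers depend on your hardware.

```python
from joblib import cpu_count, effective_n_jobs

# Number of CPUs joblib detects on this machine.
n_cores = cpu_count()

# n_jobs=1 means no parallelism; n_jobs=-1 asks for all available CPUs.
sequential_workers = effective_n_jobs(1)
all_workers = effective_n_jobs(-1)

print("CPUs detected:", n_cores)
print("n_jobs=1 resolves to", sequential_workers, "worker(s)")
print("n_jobs=-1 resolves to", all_workers, "worker(s)")
```

Requesting all cores at both levels at once (folds and trees) may start more workers than there are CPUs, which is one reason the cells above parallelize one level at a time.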

### How to write your own parallel code with joblib

Let's first look at a simple example:

In [None]:
from joblib import Parallel, delayed
from math import sqrt

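# `delayed` does not call sqrt: it only captures the function and its
# arguments as a (function, args, kwargs) tuple for Parallel to run later.
# This is why the invalid keyword x=1 raises no error here.
delayed(sqrt)(2, x=1)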

In [None]:
import os

def f(x):
    print("working on", os.getpid())
    return 2 * x

print("Main process in", os.getpid())
Parallel(n_jobs=2)(
    delayed(f)(i) for i in range(5)
)
# [0, 2, 4, 6, 8]

Let's now do a full cross-validation in parallel:

In [None]:
from sklearn.base import clone

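def _fit_score(model, X, y, train_idx, test_idx):
    """Fit a fresh clone of `model` on one train split and score it on the test split."""
    X_train = X.iloc[train_idx]
    X_test = X.iloc[test_idx]
    y_train = y.iloc[train_idx]
    y_test = y.iloc[test_idx]
    # Clone so that each fold starts from an unfitted copy of the model.
    model = clone(model)
    model.fit(X_train, y_train)
    # For a classifier, `score` returns the accuracy (not the ROC AUC used above).
    return model.score(X_test, y_test)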

In [None]:
from sklearn.model_selection import StratifiedKFold

n_jobs = 1  # sequential run; set to -1 to use all available cores

cv = StratifiedKFold(n_splits=5)

scores = Parallel(n_jobs=n_jobs)(delayed(_fit_score)(
    pipeline, X_df, y, train_idx, test_idx
) for train_idx, test_idx in cv.split(X_df, y))

print(scores)

### How about caching?

You usually want to avoid redoing the same computations again and again.
One classical solution is function [memoization](https://en.wikipedia.org/wiki/Memoization): the result of a call is stored and reused whenever the function is called again with the same arguments.
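
To make the idea concrete, here is a minimal sketch of memoization using the standard library's `functools.lru_cache`. It is an in-memory cache, unlike joblib's disk-based one, and the function `slow_square` and the list `calls` are hypothetical names used only for illustration.

```python
from functools import lru_cache

calls = []  # records every time the function body actually runs


@lru_cache(maxsize=None)
def slow_square(x):
    # Stand-in for an expensive computation (hypothetical example).
    calls.append(x)
    return x * x


first = slow_square(4)   # computed: the body runs
second = slow_square(4)  # same argument: served from the cache

print(first, second, "body executed", len(calls), "time(s)")
```

The second call returns the stored result without running the function body again, which is the behavior the cached cross-validation below relies on.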

joblib offers a simple way to memoize a function with `Memory.cache`, which can be called directly (as below) or used as a Python decorator.

In [None]:
from joblib import Memory

location = '.'
mem = Memory(location, verbose=0)
mem.clear()  # make sure no cache is left over from a previous run

_fit_score_cached = mem.cache(_fit_score)

def evaluate_model():
    scores = Parallel(n_jobs=n_jobs)(delayed(_fit_score_cached)(
        pipeline, X_df, y, train_idx, test_idx
    ) for train_idx, test_idx in cv.split(X_df, y))
    print(scores)

%timeit -n1 -r1 evaluate_model()

In [None]:
%timeit -n1 -r1 evaluate_model()

Some scikit-learn objects, such as `Pipeline`, have a `memory` parameter. It caches the fitted transformers, which avoids, for example, rerunning the same preprocessing during a grid search that only tunes the classifier or regressor at the end of the pipeline.
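
As a hedged sketch of this use of `memory`, the example below tunes only the final classifier of a small pipeline while caching the fitted transformer in a temporary directory. The synthetic dataset, the choice of `PCA` and `LogisticRegression`, and the grid values are assumptions made for illustration, not part of the tutorial's model.

```python
import shutil
import tempfile

from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline

# Small synthetic dataset, used only for illustration.
X_demo, y_demo = make_classification(n_samples=200, n_features=20,
                                     random_state=0)

cache_dir = tempfile.mkdtemp()  # temporary folder holding the cache
try:
    # `memory` makes the pipeline cache its fitted transformers on disk.
    pipe = make_pipeline(PCA(n_components=5), LogisticRegression(),
                         memory=cache_dir)
    # Only the classifier is tuned, so for a given training split the
    # fitted PCA can be reused across the different values of C.
    grid = GridSearchCV(pipe, {"logisticregression__C": [0.1, 1.0, 10.0]},
                        cv=3)
    grid.fit(X_demo, y_demo)
    best_C = grid.best_params_["logisticregression__C"]
    best_score = grid.best_score_
finally:
    shutil.rmtree(cache_dir)  # remove the cache once done

print("best C:", best_C, "mean CV accuracy:", best_score)
```

The gain is only noticeable when the cached transformers are expensive to fit; for cheap preprocessing, the cost of hashing the data and writing to disk can outweigh the savings.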

To go further, you can also look at how joblib can be combined with [dask-distributed](http://distributed.dask.org/en/stable/) to run computations across different machines, or with [dask-jobqueue](http://jobqueue.dask.org/en/latest/) to use a cluster with a queuing system like `slurm`.

In [None]:
from sklearn.pipeline import Pipeline
Pipeline?