# More advanced concepts: Parallel computation and caching

Authors: [Alexandre Gramfort](http://alexandre.gramfort.net), [Thomas Moreau](https://tommoral.github.io/about.html), and [Pedro L. C. Rodrigues](https://plcrodrigues.github.io/).

The aims of this notebook are:

  - to explain how parallel computation works within scikit-learn;
  - to show how to cache certain computations to save computation time.

For this tutorial we will rely mainly on the [joblib package](https://joblib.readthedocs.io/).

In [1]:
%matplotlib inline
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)
warnings.simplefilter(action="ignore", category=UserWarning)

In [2]:
import os
from urllib.request import urlretrieve

url = (
    "https://archive.ics.uci.edu/ml/machine-learning-databases"
    "/adult/adult.data"
)
local_filename = os.path.basename(url)
if not os.path.exists(local_filename):
    print("Downloading Adult Census datasets from UCI")
    urlretrieve(url, local_filename)

Downloading Adult Census datasets from UCI


In [3]:
names = (
    "age, workclass, fnlwgt, education, education-num, "
    "marital-status, occupation, relationship, race, sex, "
    "capital-gain, capital-loss, hours-per-week, "
    "native-country, income"
).split(", ")
data = pd.read_csv(local_filename, names=names)

y = data["income"]
X_df = data.drop("income", axis=1)

In [4]:
X_df.head()

| | age | workclass | fnlwgt | education | education-num | marital-status | occupation | relationship | race | sex | capital-gain | capital-loss | hours-per-week | native-country |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 39 | State-gov | 77516 | Bachelors | 13 | Never-married | Adm-clerical | Not-in-family | White | Male | 2174 | 0 | 40 | United-States |
| 1 | 50 | Self-emp-not-inc | 83311 | Bachelors | 13 | Married-civ-spouse | Exec-managerial | Husband | White | Male | 0 | 0 | 13 | United-States |
| 2 | 38 | Private | 215646 | HS-grad | 9 | Divorced | Handlers-cleaners | Not-in-family | White | Male | 0 | 0 | 40 | United-States |
| 3 | 53 | Private | 234721 | 11th | 7 | Married-civ-spouse | Handlers-cleaners | Husband | Black | Male | 0 | 0 | 40 | United-States |
| 4 | 28 | Private | 338409 | Bachelors | 13 | Married-civ-spouse | Prof-specialty | Wife | Black | Female | 0 | 0 | 40 | Cuba |


In [5]:
y.value_counts()

income
<=50K    24720
>50K      7841
Name: count, dtype: int64

## Let's construct a full model with a ColumnTransformer

In [6]:
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler, QuantileTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import cross_val_score

numeric_features = [c for c in X_df if X_df[c].dtype.kind in ("i", "f")]
categorical_features = [c for c in X_df if X_df[c].dtype.kind not in ("i", "f")]

pipeline = make_pipeline(
    make_column_transformer(
        (
            OneHotEncoder(handle_unknown="ignore", sparse_output=False),
            categorical_features,
        ),
        (StandardScaler(), numeric_features),
    ),
    RandomForestClassifier(max_depth=7, n_estimators=300),
)

cv_scores = cross_val_score(pipeline, X_df, y, scoring="roc_auc", cv=5)
print("CV score:", np.mean(cv_scores))

CV score: 0.905280194469601


### How to run things in parallel in scikit-learn: The `n_jobs` parameter

In [7]:
%timeit -n1 -r2 cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5)

22.8 s ± 720 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)


In [8]:
%timeit -n1 -r2 cross_val_score(pipeline, X_df, y, scoring='roc_auc', cv=5, n_jobs=-1)

12.4 s ± 3.63 s per loop (mean ± std. dev. of 2 runs, 1 loop each)


In [9]:
%%timeit -n1 -r2

pipeline[-1].set_params(n_jobs=-1)
cv_scores = cross_val_score(
    pipeline, X_df, y, scoring="roc_auc", cv=5, n_jobs=1
)

6.97 s ± 83 ms per loop (mean ± std. dev. of 2 runs, 1 loop each)
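In the timings above, `n_jobs=-1` asks joblib to use all available CPU cores, first for the cross-validation folds and then for the trees of the random forest. As a minimal sketch (not part of the original notebook), joblib's `cpu_count` and `effective_n_jobs` helpers can show how many workers a given `n_jobs` value is expected to resolve to on the current machine. The numbers depend on your hardware and on the active backend.

```python
from joblib import cpu_count, effective_n_jobs

# Number of CPUs that joblib detects on this machine.
n_cpus = cpu_count()

# Number of workers joblib would use for two common n_jobs values.
workers_sequential = effective_n_jobs(1)
workers_all = effective_n_jobs(-1)

print(f"CPUs detected: {n_cpus}")
print(f"n_jobs=1  -> {workers_sequential} worker")
print(f"n_jobs=-1 -> {workers_all} worker(s)")
```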


### How to write your own parallel code with joblib

Let's first look at a simple example:

In [10]:
from joblib import Parallel, delayed
from math import sqrt

delayed(sqrt)(2, x=1)

(<function math.sqrt(x, /)>, (2,), {'x': 1})
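The output above shows that `delayed` does not run anything: it only records the function together with its positional and keyword arguments in a tuple. Note that `math.sqrt` takes no `x` keyword, so the captured call above could not actually be executed. A minimal sketch, using a valid call instead, of how such a tuple might be unpacked and run by hand:

```python
from math import sqrt
from joblib import delayed

# delayed(sqrt)(4) captures the call without executing it.
task = delayed(sqrt)(4)
func, args, kwargs = task

# Executing the captured call by hand, roughly what a worker does later.
result = func(*args, **kwargs)

print(task)
print(result)
```

`Parallel` consumes an iterable of such tuples and dispatches them to its workers.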

In [11]:
import os


def f(x):
    print("working on", os.getpid())
    return 2 * x


print("Main process in ", os.getpid())
Parallel(n_jobs=2)(delayed(f)(i) for i in range(5))
# returns [0, 2, 4, 6, 8]

Main process in  83837
working on 85042
working on 85043
working on 85042
working on 85042
working on 85042


[0, 2, 4, 6, 8]

Let's now do a full cross-validation in parallel:

In [12]:
from sklearn.base import clone


def _fit_score(model, X, y, train_idx, test_idx):
    X_train = X.iloc[train_idx]
    X_test = X.iloc[test_idx]
    y_train = y.iloc[train_idx]
    y_test = y.iloc[test_idx]
    model = clone(model)
    model.fit(X_train, y_train)
    return model.score(X_test, y_test)

In [13]:
from sklearn.model_selection import StratifiedKFold

n_jobs = 1  # sequential here; increase it (or use -1 for all cores) to run the folds in parallel

cv = StratifiedKFold(n_splits=5)
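# Tip: Parallel also accepts return_as="generator" (joblib >= 1.3) to yield
# results as they become available instead of returning a list at the end.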
scores = Parallel(n_jobs=n_jobs)(
    delayed(_fit_score)(pipeline, X_df, y, train_idx, test_idx)
    for train_idx, test_idx in cv.split(X_df, y)
)

print(scores)

[0.8441578381698143, 0.8478194103194103, 0.8504299754299754, 0.8502764127764127, 0.8499692874692875]


### How about caching?

You usually want to avoid redoing the same computations again and again.
One classical solution to this problem is function [memoization](https://en.wikipedia.org/wiki/Memoization).

joblib offers a very simple way to do this: `Memory.cache`, which can wrap a function directly or be used as a Python decorator.
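As a minimal sketch of the decorator form, here is a toy function cached on disk. The `slow_square` function, the `executions` list, and the temporary cache directory are all hypothetical and serve only as illustration.

```python
import tempfile
from joblib import Memory

# Hypothetical temporary cache directory, so that the sketch does not write
# into the working directory.
cache_dir = tempfile.mkdtemp()
memory = Memory(cache_dir, verbose=0)

executions = []  # records each time the function body really runs


@memory.cache
def slow_square(x):
    executions.append(x)
    return x ** 2


first = slow_square(3)   # computed, then stored on disk
second = slow_square(3)  # same argument: loaded from the cache
third = slow_square(4)   # new argument: computed again

print(first, second, third)
print("body executed for:", executions)
```

The function body runs only once per distinct argument; repeated calls are answered from the on-disk cache.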

In [14]:
from joblib import Memory

location = "."
mem = Memory(location, verbose=0)
mem.clear()  # make sure no cache is left over from a previous run

_fit_score_cached = mem.cache(_fit_score)


def evaluate_model():
    scores = Parallel(n_jobs=n_jobs)(
        delayed(_fit_score_cached)(pipeline, X_df, y, train_idx, test_idx)
        for train_idx, test_idx in cv.split(X_df, y)
    )
    print(scores)


%timeit -n1 -r1 evaluate_model()

[Memory(location=./joblib)]: Flushing completely the cache


[0.8435436818670352, 0.8493550368550369, 0.8504299754299754, 0.8521191646191646, 0.851044226044226]
10.2 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


In [15]:
%timeit -n1 -r1 evaluate_model()

[0.8435436818670352, 0.8493550368550369, 0.8504299754299754, 0.8521191646191646, 0.851044226044226]
2.14 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


Some scikit-learn objects, such as `Pipeline`, have a `memory` parameter. It lets you cache the fitted transformers, for example to avoid rerunning the same preprocessing in a grid search when tuning only the classifier or regressor at the end of the pipeline.
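A minimal sketch of this use case, on synthetic data rather than the Adult Census dataset: the pipeline, the parameter grid, and the temporary cache directory below are illustrative assumptions, not part of the original notebook.

```python
import tempfile

from sklearn.datasets import make_classification
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic data, only for illustration.
X_toy, y_toy = make_classification(n_samples=200, n_features=10, random_state=0)

# Hypothetical temporary directory used as the on-disk cache.
cache_dir = tempfile.mkdtemp()

# With memory set, fitted transformers are cached on disk and reused when
# they are fitted again with the same parameters on the same data.
pipe = make_pipeline(
    StandardScaler(),
    PCA(n_components=5),
    LogisticRegression(max_iter=1000),
    memory=cache_dir,
)

# Only the final classifier is tuned, so for each CV split the scaler and
# PCA fits can be loaded from the cache instead of being recomputed.
grid = GridSearchCV(
    pipe,
    param_grid={"logisticregression__C": [0.1, 1.0, 10.0]},
    cv=3,
)
grid.fit(X_toy, y_toy)

print(grid.best_params_)
```

The benefit grows with the cost of the preprocessing steps; for cheap transformers the disk access can outweigh the savings.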

To go further, you can also look at how joblib can be used in combination with [dask-distributed](http://distributed.dask.org/en/stable/) to run computations across different machines, or with [dask-jobqueue](http://jobqueue.dask.org/en/latest/) to use a cluster with a queuing system like `slurm`.

In [16]:
from sklearn.pipeline import Pipeline

Pipeline?

Init signature: Pipeline(steps, *, transform_input=None, memory=None, verbose=False)
Docstring:
A sequence of data transformers with an optional final predictor.

`Pipeline` allows you to sequentially apply a list of transformers to
preprocess the data and, if desired, conclude the sequence with a final
:term:`predictor` for predictive modeling.

Intermediate steps of the pipeline must be transformers, that is, they
must implement `fit` and `transform` methods.
The final :term:`estimator` only needs to implement `fit`.
The transformers in the pipeline can be cached using ``memory`` argument.

The purpose of the pipeline is to assemble several steps that can be
cross-validated together while setting different parameters. For this, it
enables setting param