# Parallelization Example

In [1]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import numpy as np
import sys
import os
from os.path import join as oj
import time
from joblib import Parallel, delayed
import inspect

sys.path.append('..') # add parent directory to path
from python.fit import fit_rf_loo

In [2]:
# Helper variables
DATA_PATH = oj("..", "data")
n = 30  # Do leave-one-out for the first 30 samples for illustration

### Load data

In this parallelization example, we will be working with gene expression data from women with breast cancer from The Cancer Genome Atlas (TCGA). In particular, we will be using the gene expressions to predict their breast cancer subtype (Luminal A, Luminal B, Basal, Her2, Normal-like). Let's first load in this data.

In [3]:
X = pd.read_csv(oj(DATA_PATH, "X_tcga_cleaned.csv")).values  # Convert to NumPy array
y = pd.read_csv(oj(DATA_PATH, "Y_tcga.csv")).iloc[:, 0].values  # Convert to NumPy array

In [4]:
X

array([[ 9.61452694,  7.40176299, 10.85901081, ...,  0.        ,
         6.60972707,  7.55111676],
       [ 8.72532295,  9.37559946, 10.12843209, ...,  0.        ,
         7.66576669,  7.31031678],
       [ 8.67158522,  7.75682541, 10.69036948, ...,  0.30910104,
         6.42531155,  7.9417606 ],
       ...,
       [ 9.36340306,  8.13355076, 10.27647796, ...,  0.        ,
         6.9925986 ,  6.98624376],
       [ 8.82015369,  5.66191514,  8.57902627, ...,  0.        ,
         6.64379924,  7.58504255],
       [ 9.81865415,  7.82862208,  9.9021779 , ...,  0.        ,
         7.14774132,  7.28772567]])

In [5]:
y

array(['LumA', 'LumB', 'LumA', ..., 'LumA', 'LumA', 'LumA'], dtype=object)

### Fitting leave-one-out models

For the sake of this demonstration, suppose that we want to evaluate the performance of a random forest model on this data using leave-one-out cross-validation (also known as the jackknife in statistics). To do so, there is a helper function `fit_rf_loo()` that takes in the covariate data `X`, the response variable `y`, and the index of the observation to leave out `i`. This function `fit_rf_loo()` fits a random forest model on the data with the `i`-th observation left out and returns the predicted class of the left-out `i`-th observation.

In [6]:
print(inspect.getsource(fit_rf_loo))

def fit_rf_loo(i, x, y, **kwargs):
    # Remove the ith observation for leave-one-out
    x_train = np.delete(x, i, axis=0)
    y_train = np.delete(y, i)

    # Train the RandomForestClassifier
    rf = RandomForestClassifier(**kwargs)
    rf.fit(x_train, y_train)

    # Make a prediction on the left-out observation
    preds = rf.predict(x[i:(i+1), :])[0]
    return preds



### Without Parallelization

Let's first run the leave-one-out cross-validation without parallelization to see how long it takes. We will only run this on the first `n = 30` observations for demonstration purposes.

In [20]:
start_time = time.time()
preds = np.empty(n, dtype=object)  # Vector of leave-one-out predictions
for i in range(n):
    preds[i] = fit_rf_loo(i, X, y)
end_time = time.time()
execution_time = end_time - start_time
print("Execution time without parallelization:", execution_time)

Execution time without parallelization: 145.0246922969818


### With Parallelization using joblib

Next, let's run the leave-one-out cross-validation with parallelization using the `joblib` package. But before implementing this, let's check how many cores are available on this machine using `os.cpu_count()`.

In [8]:
os.cpu_count()

64

Now, to parallelize this code, there are two steps:

**Step 1: Setting up the parallel backend.** This is done using `Parallel(n_jobs=...)`, where we specify the number of cores we would like to use.

**Step 2: Re-write the code using futures (or `delayed()`)** Here, we need to put the code that we want to run in parallel into a single function (luckily, this is already done for us) and wrap that function call using `delayed()`. This essentially creates futures that can be evaluated in parallel.

In [9]:
start_time = time.time()
preds_parallel = Parallel(n_jobs=2)(delayed(fit_rf_loo)(i, X, y) for i in range(n))
end_time = time.time()
execution_time = end_time - start_time
print("Execution time with joblib parallelization:", execution_time)

Execution time with joblib parallelization: 74.90133261680603


### Prediction Results

Let's look at the predictions from the non-parallelized and parallelized implementations


In [22]:
pd.DataFrame({"preds": preds, "preds_parallel": preds_parallel}).head()

Unnamed: 0,preds,preds_parallel
0,LumA,LumA
1,LumA,LumA
2,LumA,LumA
3,LumB,LumB
4,Her2,Her2


In [23]:
np.mean(preds == y[:n])

np.float64(0.8333333333333334)

### Additional Resources/Links

- joblib: https://joblib.readthedocs.io/en/stable/ 
- multiprocessing: https://docs.python.org/3/library/multiprocessing.html 
- Dask: https://www.dask.org/ 
    - Great for handling large datasets and scaling operations from a single machine to a cluster
- Ray: https://www.ray.io/ 
    - Useful for building distributed applications and handling complex parallel tasks across multiple machines
- A great introductory tutorial covering joblib, multiprocessing, and Dask from Thomas Langford (Yale): https://docs.ycrc.yale.edu/parallel_python/#/ 
