# In this notebook we will demonstrate the use of the joblib package to parallelize a machine learning workflow for text data

## Amazon Fine Food Reviews data has been downloaded from [Kaggle](https://www.kaggle.com/datasets/snap/amazon-fine-food-reviews)

### The data consists of ~570,000 records (568,454 reviews)

In [4]:
import math
from collections import Counter
import re
import itertools

import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.metrics import roc_auc_score
import matplotlib.pyplot as plt
from joblib import Parallel, delayed

In [5]:
# pd.set_option('display.max_rows', 500)
# pd.set_option('display.max_columns', 500)
# pd.set_option('display.width', 1000)
pd.set_option('max_colwidth', 800)

In [6]:
%%time
df = pd.read_csv("/home/ciel/Documents/data/Reviews.csv")

CPU times: user 3.13 s, sys: 354 ms, total: 3.48 s
Wall time: 3.74 s


In [7]:
df.shape

(568454, 10)

In [8]:
df.columns

Index(['Id', 'ProductId', 'UserId', 'ProfileName', 'HelpfulnessNumerator',
       'HelpfulnessDenominator', 'Score', 'Time', 'Summary', 'Text'],
      dtype='object')

In [9]:
df.head()

Unnamed: 0,Id,ProductId,UserId,ProfileName,HelpfulnessNumerator,HelpfulnessDenominator,Score,Time,Summary,Text
0,1,B001E4KFG0,A3SGXH7AUHU8GW,delmartian,1,1,5,1303862400,Good Quality Dog Food,I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than most.
1,2,B00813GRG4,A1D87F6ZCVE5NK,dll pa,0,0,1,1346976000,Not as Advertised,"Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intended to represent the product as ""Jumbo""."
2,3,B000LQOCH0,ABXLMWJIXXAIN,"Natalia Corres ""Natalia Corres""",1,1,4,1219017600,"""Delight"" says it all","This is a confection that has been around a few centuries. It is a light, pillowy citrus gelatin with nuts - in this case Filberts. And it is cut into tiny squares and then liberally coated with powdered sugar. And it is a tiny mouthful of heaven. Not too chewy, and very flavorful. I highly recommend this yummy treat. If you are familiar with the story of C.S. Lewis' ""The Lion, The Witch, and The Wardrobe"" - this is the treat that seduces Edmund into selling out his Brother and Sisters to the Witch."
3,4,B000UA0QIQ,A395BORC6FGVXV,Karl,3,3,2,1307923200,Cough Medicine,If you are looking for the secret ingredient in Robitussin I believe I have found it. I got this in addition to the Root Beer Extract I ordered (which was good) and made some cherry soda. The flavor is very medicinal.
4,5,B006K2ZZ7K,A1UQRSCLF8GW1T,"Michael D. Bigham ""M. Wassir""",0,0,5,1350777600,Great taffy,"Great taffy at a great price. There was a wide assortment of yummy taffy. Delivery was very quick. If your a taffy lover, this is a deal."


In [10]:
df.Text

0    I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.
1    Product arrived labeled as Jumbo Salted Peanuts...the peanuts were actually small sized unsalted. Not sure if this was an error or if the vendor intend

In [13]:
df.Score.value_counts().sort_index()

1     52268
2     29769
3     42640
4     80655
5    363122
Name: Score, dtype: int64

### Text lengths (characters), by quantile

In [11]:
text_lens = df.Text.apply(len).quantile([i * 0.1 for i in range(1, 10)] + [0.99, 1.0])
text_lens

0.10      127.0
0.20      161.0
0.30      199.0
0.40      246.0
0.50      302.0
0.60      371.0
0.70      465.0
0.80      606.0
0.90      877.0
0.99     2166.0
1.00    21409.0
Name: Text, dtype: float64

# ML Workflow Examples: identifying opportunities for parallelization

# Expensive pre-processing

#### Remove sentences about the shipping experience from the review text, to focus solely on the customer's review of the product itself

In [52]:
test_text = "Liked it. The package arrived fast! The cookies were flaky and tender. "

In [53]:
result = \
re.split("([.!?]+)", test_text)

In [54]:
result

['Liked it',
 '.',
 ' The package arrived fast',
 '!',
 ' The cookies were flaky and tender',
 '.',
 ' ']

In [35]:
result = ["".join(result[i:i+2]) for i in range(0, len(result), 2)]

In [43]:
result

['Liked it.',
 ' The package arrived fast!',
 ' The cookies were flaky and tender.',
 ' ']

In [48]:
# Raw strings keep "\s" as a regex escape (avoids invalid-escape warnings)
shipping_pattern = (
    r"[^!?.]*((shipping|delivery)|"
    r"((arrived|came|delivered)[a-z\s]*(slow|fast|quick)))[^!?.]*[!?.]*"
)

In [49]:
"".join([re.sub(shipping_pattern, "", s) for s in result])

'Liked it. The cookies were flaky and tender. '
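To see which sentences the pattern actually flags, it can help to test each sentence on its own. The sketch below reuses the notebook's splitting regex and `shipping_pattern` (as raw strings); the helper names `split_sentences` and `flag_sentences` are hypothetical. Note that the pattern is case-sensitive: the sentence "Delivery was very quick." from the fifth review shown earlier is not flagged unless `re.IGNORECASE` is passed, which is an optional tweak, not what the notebook ran.

```python
import re

# Same sentence splitter and removal pattern as in the cells above.
SPLIT_PATTERN = r"([.!?]+)"
shipping_pattern = (
    r"[^!?.]*((shipping|delivery)|"
    r"((arrived|came|delivered)[a-z\s]*(slow|fast|quick)))[^!?.]*[!?.]*"
)

def split_sentences(text):
    """Split text into sentences, keeping each sentence's end punctuation."""
    parts = re.split(SPLIT_PATTERN, text)
    return ["".join(parts[i:i + 2]) for i in range(0, len(parts), 2)]

def flag_sentences(text, pattern, flags=0):
    """Return (sentence, flagged) pairs showing what the pattern would remove."""
    return [(s, re.search(pattern, s, flags) is not None)
            for s in split_sentences(text)]

review = "Great taffy at a great price. Delivery was very quick."
print(flag_sentences(review, shipping_pattern))
print(flag_sentences(review, shipping_pattern, flags=re.IGNORECASE))
```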

In [55]:
def regex_clean(text, removal_pattern):
    """
    Regex pattern flags sentences for removal from text.
    
    # Arguments
        text: string
        removal_pattern: regex pattern string
        
    # Returns
        string of cleaned text
    """
    # First split text into sentences
    sentences = re.split("([.!?]+)", text)
    sentences = ["".join(sentences[i:i+2]) 
                 for i in range(0, len(sentences), 2)]
    # Filter each sentence
    cleaned = "".join([re.sub(removal_pattern, "", s) 
                       for s in sentences])
    return cleaned

In [56]:
regex_clean(test_text, shipping_pattern)

'Liked it. The cookies were flaky and tender. '

## Without joblib

#### Time the job to see how it scales

In [66]:
%%time
cleaned_text = df.Text.apply(regex_clean, removal_pattern=shipping_pattern)

CPU times: user 13min 20s, sys: 1.41 s, total: 13min 22s
Wall time: 13min 27s


In [None]:
# 1000 records: Wall time: 1.24 s
# 10000 records: Wall time: 12.9 s
# 100000 records: Wall time: 2min 18s
# 568454 records: Wall time: 13min 27s
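Dividing each logged wall time by its record count gives the average cost per record. The numbers below come straight from the comments above; the helper is plain arithmetic. The per-record cost stays roughly constant (about 1.2 to 1.4 ms), so the job scales about linearly with the number of records, and since each record is processed independently, it is a natural candidate for splitting across cores.

```python
# Wall times logged above for df.Text.apply(regex_clean, ...), in seconds.
logged_wall_times = {
    1_000: 1.24,
    10_000: 12.9,
    100_000: 2 * 60 + 18,      # 2min 18s
    568_454: 13 * 60 + 27,     # 13min 27s
}

def ms_per_record(n_records, seconds):
    """Average wall-clock cost per record, in milliseconds."""
    return 1000 * seconds / n_records

for n, secs in logged_wall_times.items():
    print(f"{n:>7,} records: {ms_per_record(n, secs):.2f} ms/record")
```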

In [67]:
%%time
cleaned_text = [regex_clean(s, removal_pattern=shipping_pattern) 
                for s in df.Text.values]
# Wall time: 13min 2s

CPU times: user 12min 57s, sys: 1.2 s, total: 12min 58s
Wall time: 13min 2s


# An "embarrassingly parallel" for loop
## If I'm just looping over each record in the dataset, why not split the work among my computer's cores?

## Designing the joblib job specification: number of jobs vs. chunk size per job

#### Number of jobs: my machine has 4 cores. Chunk size: how much data should each core get at a time? We'll test how memory-intensive the task becomes as we parallelize.
#### Testing will help us decide how many workers to run at once, and what size of chunk to give each one

## Create a function for joblib that takes an iterable input (each task receives a chunk of many records, and a worker may process several chunks)

#### joblib's `Parallel` returns a list of results, one per task (not per worker), in the same order as the inputs
#### If we feed workers chunks of more than one record, the function must accept an iterable input. Feeding each record as a separate task (iterating over a DataFrame/Series) may create too many tasks and incur too much dispatch overhead, although joblib's `batch_size='auto'` mitigates this by grouping tasks into batches
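A minimal sketch of the chunk-then-flatten pattern, using the same ceiling-division slicing as the cells below. `make_chunks` and `flatten` are hypothetical helper names. One subtlety of `ceil(len / num_chunks)`: when the split is uneven, the last chunk is short, and trailing chunks can even be empty (54 items in 16 chunks gives a chunk size of 4, so only 14 chunks contain data). The sketch drops empty chunks so that no worker receives an empty task.

```python
import math
from itertools import chain

def make_chunks(seq, num_chunks):
    """Split a sliceable sequence into contiguous chunks of size
    ceil(len(seq) / num_chunks), dropping empty trailing chunks.

    Works on lists and other sequences; for pandas objects the notebook
    slices with .iloc instead.
    """
    chunk_size = math.ceil(len(seq) / num_chunks)
    chunks = [seq[i * chunk_size:(i + 1) * chunk_size] for i in range(num_chunks)]
    return [c for c in chunks if len(c) > 0]

def flatten(chunked_results):
    """Concatenate per-chunk result lists back into one list, preserving order."""
    return list(chain.from_iterable(chunked_results))

items = list(range(54))
chunks = make_chunks(items, 16)
print(len(chunks), [len(c) for c in chunks])
```

With `num_chunks=14` the same 54 items give a chunk size of 4 and 14 tasks, which matches the "Chunk size: 4" and 14 tasks logged in the chunked grid-search run later on.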

In [68]:
def regex_clean_parts(texts, removal_pattern):
    """
    Wrapper for regex_clean() that takes as input 
    iterable of texts.
    
    # Arguments
        texts: iterable of strings
        removal_pattern: regex pattern string
        
    # Returns
        iterable of strings of cleaned text
    """
    return texts.apply(regex_clean, removal_pattern=removal_pattern)

In [176]:
%%time
num_jobs=4
num_chunks=4
chunk_size = math.ceil(df.shape[0] / num_chunks)
print("Chunk size:", chunk_size)

cleaned_text = \
    Parallel(n_jobs=num_jobs, 
             # backend='loky', 
             prefer='processes', 
             batch_size='auto', 
             verbose=10)(delayed(regex_clean_parts)(t, removal_pattern=shipping_pattern) 
                         for t in [df.Text.iloc[i*chunk_size:(i+1)*chunk_size] 
                                   for i in range(num_chunks)])
# jobs=chunks=4, Wall time: 6min 2s. 
# jobs=4, chunks=16, Wall time: 5min 58s
# jobs=4, chunks=32, Wall time: 5min 46s
# jobs=chunks=8, Wall time: 6min 4s
# jobs=2, chunks=16, Wall time: 7min 18s

Chunk size: 142114


[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   1 tasks      | elapsed:  5.5min
[Parallel(n_jobs=4)]: Done   2 out of   4 | elapsed:  5.5min remaining:  5.5min


CPU times: user 752 ms, sys: 1.52 s, total: 2.27 s
Wall time: 5min 38s


[Parallel(n_jobs=4)]: Done   4 out of   4 | elapsed:  5.6min remaining:    0.0s
[Parallel(n_jobs=4)]: Done   4 out of   4 | elapsed:  5.6min finished
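A quick way to read these timings is speedup (serial time divided by parallel time) and parallel efficiency (speedup divided by the number of workers). The sketch below plugs in the 13min 27s serial `apply` and the 5min 38s four-worker run above.

```python
def speedup_and_efficiency(serial_s, parallel_s, n_workers):
    """Return (speedup, efficiency) of a parallel run versus a serial baseline."""
    speedup = serial_s / parallel_s
    return speedup, speedup / n_workers

serial_s = 13 * 60 + 27    # df.Text.apply(regex_clean, ...)
parallel_s = 5 * 60 + 38   # Parallel(n_jobs=4) over 4 chunks
observed_speedup, observed_eff = speedup_and_efficiency(serial_s, parallel_s, n_workers=4)
print(f"speedup {observed_speedup:.2f}x, efficiency {observed_eff:.0%}")
```

That is roughly a 2.4x speedup, about 60% of the ideal 4x. The notebook does not break down where the rest goes; starting worker processes and serializing each chunk of text to and from the workers are plausible contributors, as is whether the four cores share resources such as hyperthreads. Note also that the CPU times printed by `%%time` only count the notebook's own process, which is why they are tiny here while the workers do the actual work.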


In [177]:
len(cleaned_text)

4

In [178]:
len(cleaned_text[0])

142114

In [179]:
cleaned_text[0][0]

'I have bought several of the Vitality canned dog food products and have found them all to be of good quality. The product looks more like a stew than a processed meat and it smells better. My Labrador is finicky and she appreciates this product better than  most.'

#### Finally aggregate results

In [184]:
%%time
cleaned_text = [t for l in cleaned_text for t in l]

CPU times: user 115 ms, sys: 3.3 ms, total: 118 ms
Wall time: 119 ms


In [185]:
len(cleaned_text)
# 568454

568454
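Since `regex_clean_parts` returns one pandas Series per chunk, an alternative to the list comprehension (a sketch, assuming you want the result aligned with `df`'s index) is `pd.concat`, which keeps each chunk's original index labels. The toy frame `toy_df` and the upper-casing below are stand-ins for `df` and the real per-chunk results.

```python
import pandas as pd

# A small stand-in for df, with a non-default index to show alignment.
toy_df = pd.DataFrame({"Text": ["a.", "b.", "c.", "d.", "e."]},
                      index=[10, 11, 12, 13, 14])

# Stand-ins for the per-chunk Series that Parallel returns (here just upper-cased).
chunk_results = [toy_df.Text.iloc[i:i + 2].str.upper()
                 for i in range(0, len(toy_df), 2)]

# pd.concat keeps each chunk's index labels, so the combined Series lines up
# with the frame row for row and can be assigned back as a new column.
toy_df["CleanText"] = pd.concat(chunk_results)
print(toy_df)
```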

#### Compared with the one-task-per-record version

In [118]:
%%time
num_jobs=4

cleaned_text = \
    Parallel(n_jobs=num_jobs, 
             # backend='loky', 
             prefer='processes', 
             batch_size='auto', 
             verbose=10)(delayed(regex_clean)(t, removal_pattern=shipping_pattern) 
                         for t in df.Text.values)
# Wall time: 6min 4s

[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   5 tasks      | elapsed:    0.7s
[Parallel(n_jobs=4)]: Done  10 tasks      | elapsed:    0.7s
[Parallel(n_jobs=4)]: Batch computation too fast (0.1881s.) Setting batch_size=2.
[Parallel(n_jobs=4)]: Done  17 tasks      | elapsed:    0.7s
[Parallel(n_jobs=4)]: Batch computation too fast (0.0220s.) Setting batch_size=4.
[Parallel(n_jobs=4)]: Done  28 tasks      | elapsed:    0.8s
[Parallel(n_jobs=4)]: Batch computation too fast (0.0446s.) Setting batch_size=8.
[Parallel(n_jobs=4)]: Done  56 tasks      | elapsed:    0.8s
[Parallel(n_jobs=4)]: Batch computation too fast (0.0415s.) Setting batch_size=16.
[Parallel(n_jobs=4)]: Done 116 tasks      | elapsed:    0.9s
[Parallel(n_jobs=4)]: Batch computation too fast (0.0403s.) Setting batch_size=32.
[Parallel(n_jobs=4)]: Done 292 tasks      | elapsed:    1.0s
[Parallel(n_jobs=4)]: Batch computation too fast (0.0973s.) Setting batch_size=64.
[Pa

CPU times: user 8.03 s, sys: 944 ms, total: 8.98 s
Wall time: 6min 4s


[Parallel(n_jobs=4)]: Done 568454 out of 568454 | elapsed:  6.1min finished
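The one-task-per-record run was only about 30 seconds slower than the chunked one (6min 4s vs 5min 38s) because, with `batch_size='auto'`, joblib grouped records into progressively larger batches (the log shows it doubling the batch size each time a batch completed too quickly), which amortizes the per-task dispatch overhead. If you already know a sensible batch size, you can also pass it explicitly. A minimal sketch with a cheap stand-in function (`word_count` is hypothetical, not part of the notebook):

```python
from joblib import Parallel, delayed

def word_count(text):
    """Cheap per-record task, standing in for regex_clean."""
    return len(text.split())

texts = [("word " * (i % 7)).strip() for i in range(10_000)]

# One task per record, but dispatched to the workers in fixed batches of
# 500 records instead of letting batch_size='auto' find a size at run time.
counts = Parallel(n_jobs=2, batch_size=500)(
    delayed(word_count)(t) for t in texts
)

# Results come back in input order, one per task.
print(counts[:8])
```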


# Training anti-example - GBM

#### A single GBM is inherently difficult to parallelize across boosting iterations: each new tree is fit to the residuals of the ensemble built so far, so how the model evolves and which splits are chosen at each step depend on every previous step
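To make the sequential dependency concrete, here is a toy version of boosting. It is a simplified sketch (squared-error regression with one-split "stumps"), not sklearn's `GradientBoostingClassifier`, which uses the log-loss gradient, but the stage-by-stage structure is the same: each stage is fit to the residuals left by all earlier stages, so the loop over stages cannot be farmed out to separate workers. Parallelism has to come from inside a stage, or from fitting independent models, as in the hyper-parameter search below.

```python
def fit_stump(xs, residuals):
    """Fit a one-split regression stump minimizing squared error.

    Returns (threshold, left_value, right_value); x <= threshold goes left.
    """
    best = None
    for t in sorted(set(xs))[:-1]:  # the largest x would leave the right side empty
        left = [r for x, r in zip(xs, residuals) if x <= t]
        right = [r for x, r in zip(xs, residuals) if x > t]
        lv, rv = sum(left) / len(left), sum(right) / len(right)
        sse = sum((r - lv) ** 2 for r in left) + sum((r - rv) ** 2 for r in right)
        if best is None or sse < best[0]:
            best = (sse, t, lv, rv)
    if best is None:  # all xs identical: no split possible, predict the mean
        mean = sum(residuals) / len(residuals)
        return xs[0], mean, mean
    return best[1:]


def boost(xs, ys, n_stages, learning_rate):
    """Toy gradient boosting for squared error; returns predictions after each stage."""
    preds = [sum(ys) / len(ys)] * len(ys)  # stage 0: predict the mean
    history = [preds]
    for _ in range(n_stages):
        # Residuals depend on the predictions of ALL earlier stages, so this
        # stage cannot start until the previous one has finished.
        residuals = [y - p for y, p in zip(ys, preds)]
        t, lv, rv = fit_stump(xs, residuals)
        preds = [p + learning_rate * (lv if x <= t else rv) for x, p in zip(xs, preds)]
        history.append(preds)
    return history


history = boost(xs=[1, 2, 3, 4], ys=[1.0, 1.0, 3.0, 3.0], n_stages=5, learning_rate=0.5)
for stage, preds in enumerate(history):
    print(stage, preds)
```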

# Training example: hyper-parameter tuning

## Classify review scores into high/low

### Extract target: let Score=5 be "1" and Score < 5 be "0"

In [123]:
y = (df.Score == 5).astype(int)

In [124]:
y.shape

(568454,)

In [126]:
y.value_counts()

1    363122
0    205332
Name: Score, dtype: int64

In [127]:
%%time
X_train, X_val, y_train, y_val = \
train_test_split(df.Text, y, test_size=0.33, random_state=42)

CPU times: user 124 ms, sys: 6.59 ms, total: 130 ms
Wall time: 139 ms


In [128]:
X_train.shape

(380864,)

In [129]:
X_val.shape

(187590,)

In [130]:
y_train.shape

(380864,)

In [131]:
y_val.shape

(187590,)

In [135]:
X_train.iloc[0]

"I buy this exact same product at a large oriental market here in Florida...same size package for $1.49. I think they are fantastic for making sushi summer rolls. Make sure you are rolling them tight enough and don't soak them as long as the package says...as some of the product you add will add more moisture...I actually spritz mine instead of soaking and you can always spritz a little water on them later if need be. These performed for me just as they should."

In [136]:
y_train[0]  # looks up index label 0 (original row 0); y_train.iloc[0] is the label of X_train.iloc[0]

1

### Generate some features, using code from https://developers.google.com/machine-learning/guides/text-classification/step-3

#### Top twenty n-grams (unigrams and bigrams) encoded as features, according to f_classif score

In [137]:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import f_classif

# Vectorization parameters
# Range (inclusive) of n-gram sizes for tokenizing text.
NGRAM_RANGE = (1, 2)

# Limit on the number of features. We use the top 20 features.
TOP_K = 20

# Whether text should be split into word or character n-grams.
# One of 'word', 'char'.
TOKEN_MODE = 'word'

# Minimum document/corpus frequency below which a token will be discarded.
MIN_DOCUMENT_FREQUENCY = 200

def ngram_vectorize(train_texts, train_labels, val_texts):
    """Vectorizes texts as n-gram vectors.

    1 text = 1 tf-idf vector the length of vocabulary of unigrams + bigrams.

    # Arguments
        train_texts: list, training text strings.
        train_labels: np.ndarray, training labels.
        val_texts: list, validation text strings.

    # Returns
        x_train, x_val: vectorized training and validation texts
    """
    # Create keyword arguments to pass to the 'tf-idf' vectorizer.
    kwargs = {
            'ngram_range': NGRAM_RANGE,  # Use 1-grams + 2-grams.
            'dtype': 'float32',  # tf-idf weights are floats; scikit-learn warns about non-float dtypes here
            'strip_accents': 'unicode',
            'decode_error': 'replace',
            'analyzer': TOKEN_MODE,  # Split text into word tokens.
            'min_df': MIN_DOCUMENT_FREQUENCY,
    }
    vectorizer = TfidfVectorizer(**kwargs)

    # Learn vocabulary from training texts and vectorize training texts.
    x_train = vectorizer.fit_transform(train_texts)

    # Vectorize validation texts.
    x_val = vectorizer.transform(val_texts)

    # Select top 'k' of the vectorized features.
    selector = SelectKBest(f_classif, k=min(TOP_K, x_train.shape[1]))
    selector.fit(x_train, train_labels)
    x_train = selector.transform(x_train).astype('float32')
    x_val = selector.transform(x_val).astype('float32')
    return x_train, x_val

In [138]:
%%time
X_train, X_val = ngram_vectorize(X_train, y_train, X_val)



CPU times: user 1min 12s, sys: 10.5 s, total: 1min 23s
Wall time: 1min 23s


In [139]:
X_train.shape

(380864, 20)

In [146]:
X_val.shape

(187590, 20)

## Hyper-parameter Tuning

In [82]:
hp_grid = {'learning_rate': [0.01, 0.001],
           'n_estimators': [100, 500, 1000],
           'subsample': [1.0, 0.8, 0.5],
           'max_depth': [2, 3, 4]
          }

### Generate iterable of hyper-parameter combinations

In [103]:
hp_combos = []
for k in hp_grid.keys():
    hp_combos.append([(k, v) for v in hp_grid[k]])

In [108]:
hp_combos = list(itertools.product(*hp_combos))
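An equivalent, slightly more compact way to build the grid (a sketch; `hp_dicts` is a new name) is to take the product of the value lists directly and zip each combination back up with the parameter names. It yields the same 54 combinations in the same order as `hp_combos` (the last parameter, `max_depth`, varies fastest), but as dicts that can be passed straight to `GradientBoostingClassifier(**params)`. scikit-learn's `ParameterGrid` offers similar functionality, though its iteration order may differ.

```python
import itertools

# Same grid as above.
hp_grid = {'learning_rate': [0.01, 0.001],
           'n_estimators': [100, 500, 1000],
           'subsample': [1.0, 0.8, 0.5],
           'max_depth': [2, 3, 4]}

# Cartesian product of the value lists, in the dict's key order;
# the last key (max_depth) varies fastest.
hp_dicts = [dict(zip(hp_grid.keys(), values))
            for values in itertools.product(*hp_grid.values())]

print(len(hp_dicts))
print(hp_dicts[0])
print(hp_dicts[1])
```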

## Fit a single model

In [153]:
dict(hp_combos[0])

{'learning_rate': 0.01, 'n_estimators': 100, 'subsample': 1.0, 'max_depth': 2}

In [150]:
my_gbm = GradientBoostingClassifier(**dict(hp_combos[0]), 
                                    min_samples_split=20, 
                                    min_samples_leaf=20)

In [156]:
%%time
my_gbm = my_gbm.fit(X_train, y_train)
# Wall time: 1min 4s

CPU times: user 1min 3s, sys: 122 ms, total: 1min 4s
Wall time: 1min 4s


### Evaluate model on validation set

In [158]:
%%time
pred_val = my_gbm.predict(X_val)

CPU times: user 192 ms, sys: 22 µs, total: 192 ms
Wall time: 191 ms


In [159]:
%%time
val_auc = roc_auc_score(y_val, pred_val)

CPU times: user 56 ms, sys: 10 µs, total: 56 ms
Wall time: 54 ms


In [160]:
val_auc

0.5003981908781411
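An AUC this close to 0.5 is worth a second look. `roc_auc_score` expects continuous scores, but `predict` returns hard 0/1 labels. With hard labels the ROC curve has a single operating point, and the AUC reduces to the average of the true-positive and true-negative rates (balanced accuracy), so a value near 0.5 suggests the model is barely separating the classes, for example by predicting nearly every review as the majority class (63.9% of reviews have Score = 5, per the value counts above). Passing `my_gbm.predict_proba(X_val)[:, 1]` instead would score the model's ranking; the same applies to `fit_and_eval` below. The stdlib sketch here (a pairwise AUC on made-up toy data) illustrates the effect:

```python
def pairwise_auc(y_true, scores):
    """ROC AUC as the probability that a random positive outscores a random
    negative (ties count as half). O(n_pos * n_neg), fine for toy data."""
    pos = [s for y, s in zip(y_true, scores) if y == 1]
    neg = [s for y, s in zip(y_true, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in pos for n in neg)
    return wins / (len(pos) * len(neg))


y_toy = [0, 0, 0, 1, 1, 1, 1, 1]
proba_toy = [0.55, 0.60, 0.70, 0.62, 0.68, 0.75, 0.80, 0.90]  # made-up probabilities
labels_toy = [int(p >= 0.5) for p in proba_toy]               # what predict() would return

print(pairwise_auc(y_toy, labels_toy))  # every prediction is 1, so no ranking information
print(pairwise_auc(y_toy, proba_toy))
```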

## Use joblib to fit and evaluate many models, one model per worker

In [161]:
def fit_and_eval(hp_combo, X_train, y_train, X_val, y_val):
    """
    Fit and evaluate a sklearn GBM classifier.
    
    # Arguments
        hp_combo: Iterable of (hyper-parameter 
            string, value) tuples.
        X_train: training data
        y_train: training labels
        X_val: holdout data
        y_val: holdout labels
        
    # Returns
        Tuple of (hp_combo as passed in, i.e. (hyper-parameter, value)
        tuples; float for validation AUC score).
    """
    my_gbm = GradientBoostingClassifier(**dict(hp_combo), 
                                        min_samples_split=20, 
                                        min_samples_leaf=20)
    my_gbm = my_gbm.fit(X_train, y_train)
    pred_val = my_gbm.predict(X_val)
    val_auc = roc_auc_score(y_val, pred_val)
    return (hp_combo, val_auc)

In [169]:
len(hp_combos)

54

### 54 combos: at about 1 minute each, that's roughly an hour if we fit them one at a time

#### Without joblib, looping through

In [174]:
%%time
hp_grid_results0 = []
for hp_combo in hp_combos:
    hp_grid_results_i = fit_and_eval(hp_combo, X_train, y_train, X_val, y_val)
    hp_grid_results0.append(hp_grid_results_i)
# Wall time: 59min 17s

CPU times: user 58min 44s, sys: 3.7 s, total: 58min 48s
Wall time: 59min 17s


In [195]:
len(hp_grid_results0)

54

### With joblib...

In [219]:
def fit_and_eval_parts(hp_list, X_train, y_train, X_val, y_val):
    """
    Fit and evaluate a series of sklearn GBM classifier.
    
    # Arguments
        hp_list: Iterable of iterable of 
            (hyper-parameter, value) tuples.
        X_train: training data
        y_train: training labels
        X_val: holdout data
        y_val: holdout labels
        
    # Returns
        List of (hyper-parameter combination, float for
        validation AUC score) tuples, one per combination.
    """
    hp_grid_results = []
    for hp_combo in hp_list:
        hp_grid_results_i = fit_and_eval(hp_combo, X_train, y_train, X_val, y_val)
        hp_grid_results.append(hp_grid_results_i)
    return hp_grid_results

In [167]:
%%time
num_jobs = 4

hp_grid_results = \
    Parallel(n_jobs=num_jobs, 
             # backend='loky', 
             prefer='processes', 
             batch_size='auto', 
             verbose=10)(delayed(fit_and_eval)(hp_combo, 
                                               X_train=X_train, 
                                               y_train=y_train, 
                                               X_val=X_val, 
                                               y_val=y_val) 
                         for hp_combo in hp_combos)
# 4 jobs, 54 tasks - Wall time: 26min 19s

[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   5 tasks      | elapsed:  3.9min
[Parallel(n_jobs=4)]: Done  10 tasks      | elapsed:  5.9min
[Parallel(n_jobs=4)]: Done  17 tasks      | elapsed:  9.9min
[Parallel(n_jobs=4)]: Done  24 tasks      | elapsed: 12.1min
[Parallel(n_jobs=4)]: Done  33 tasks      | elapsed: 17.7min
[Parallel(n_jobs=4)]: Done  42 tasks      | elapsed: 21.5min


CPU times: user 281 ms, sys: 514 ms, total: 794 ms
Wall time: 26min 19s


[Parallel(n_jobs=4)]: Done  54 out of  54 | elapsed: 26.3min finished


In [170]:
len(hp_grid_results)

54

In [173]:
hp_grid_results[1]

((('learning_rate', 0.01),
  ('n_estimators', 100),
  ('subsample', 1.0),
  ('max_depth', 3)),
 0.5003981908781411)

In [220]:
%%time
num_jobs = 4
num_chunks = 4  # the log below is from a run with num_chunks = 14 (chunk size 4)
chunk_size = math.ceil(len(hp_combos) / num_chunks)

print("Chunk size:", chunk_size)
hp_grid_results = \
    Parallel(n_jobs=num_jobs,
             # backend='loky',
             prefer='processes',
             batch_size='auto',
             verbose=10)(delayed(fit_and_eval_parts)(hp,
                                                     X_train=X_train,
                                                     y_train=y_train,
                                                     X_val=X_val,
                                                     y_val=y_val)
                         for hp in [hp_combos[i*chunk_size:(i+1)*chunk_size]
                                    for i in range(num_chunks)])
# 4 jobs, 4 tasks - Wall time: 25min 3s
# 4 jobs, 14 tasks - Wall time: 26min 18s

Chunk size: 4


[Parallel(n_jobs=4)]: Using backend LokyBackend with 4 concurrent workers.
[Parallel(n_jobs=4)]: Done   5 tasks      | elapsed: 14.6min
[Parallel(n_jobs=4)]: Done   9 out of  14 | elapsed: 21.9min remaining: 12.2min
[Parallel(n_jobs=4)]: Done  11 out of  14 | elapsed: 22.0min remaining:  6.0min


CPU times: user 134 ms, sys: 282 ms, total: 416 ms
Wall time: 26min 18s


[Parallel(n_jobs=4)]: Done  14 out of  14 | elapsed: 26.3min finished
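How much chunking matters depends on how uneven the tasks are. The sketch below uses made-up task costs, hypothetically proportional to `n_estimators` in grid order (the notebook did not measure per-model times, and its logged runs with 4, 14 and 54 tasks all took about 25 to 26 minutes). It compares the finishing time (makespan) of four workers when each combo is its own task and a worker picks up the next task as soon as it is free, versus when the grid is pre-split into four contiguous chunks. `makespan_dynamic` and `makespan_static` are hypothetical helpers, and the greedy scheduler is only an approximation of how a worker pool hands out tasks.

```python
import heapq
import math

def makespan_dynamic(durations, n_workers):
    """Greedy schedule: each task goes to whichever worker becomes free first."""
    finish_times = [0.0] * n_workers
    heapq.heapify(finish_times)
    for d in durations:
        earliest = heapq.heappop(finish_times)
        heapq.heappush(finish_times, earliest + d)
    return max(finish_times)

def makespan_static(durations, n_chunks, n_workers):
    """Pre-split tasks into contiguous ceil-sized chunks, then schedule the chunks."""
    size = math.ceil(len(durations) / n_chunks)
    chunk_costs = [sum(durations[i * size:(i + 1) * size]) for i in range(n_chunks)]
    return makespan_dynamic(chunk_costs, n_workers)

# Hypothetical cost units: 1 per 100 trees, in the grid's order
# (learning_rate -> n_estimators -> subsample -> max_depth).
costs = [n / 100 for _lr in range(2) for n in (100, 500, 1000) for _ in range(9)]

print("ideal:", sum(costs) / 4)
print("one task per combo:", makespan_dynamic(costs, 4))
print("4 contiguous chunks:", makespan_static(costs, 4, 4))
```

With these made-up costs, one task per combo finishes at 77 units against an ideal of 72, while four contiguous chunks take 111, because one chunk bundles all nine of the most expensive models from the first learning rate with several mid-sized ones.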


In [216]:
len(hp_grid_results)

4

#### Aggregate results

In [217]:
%%time
hp_grid_results = [r for l in hp_grid_results for r in l]

CPU times: user 26 µs, sys: 1 µs, total: 27 µs
Wall time: 36.5 µs


In [218]:
len(hp_grid_results)

54

# A word on scoring: use built-in functions, where available, to score in batch
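As an illustration (a sketch with a hypothetical linear-logistic model and random data, not the notebook's GBM): a single vectorized call over the whole matrix gives the same scores as scoring row by row, without the per-call overhead. Scoring is therefore usually better left to the library's batch methods, such as calling `predict_proba` once on the full validation matrix, than split into per-row joblib tasks.

```python
import numpy as np

rng = np.random.default_rng(0)
X_demo = rng.normal(size=(1_000, 20))   # stand-in feature matrix (1,000 rows x 20 features)
w = rng.normal(size=20)                 # hypothetical model weights
b = -0.1                                # hypothetical intercept

def score_row(x):
    """Score one row: logistic function of a linear combination."""
    return 1.0 / (1.0 + np.exp(-(x @ w + b)))

# Row by row: one Python-level call per record.
scores_loop = np.array([score_row(x) for x in X_demo])

# Batch: one vectorized call over the whole matrix.
scores_batch = 1.0 / (1.0 + np.exp(-(X_demo @ w + b)))

print(np.allclose(scores_loop, scores_batch))
```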