# Parallelize scikit-learn Models & Functions

In [1]:
import pandas as pd
import numpy as np
import random

from random import randint
from multiprocessing import Pool
from multiprocessing import cpu_count
from functools import partial

from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.metrics import classification_report
from sklearn.model_selection import cross_val_score

import warnings
warnings.filterwarnings("ignore")

## Generate Data

In [2]:
def generate_data(n_books = 3000, n_genres = 10, n_authors = 450, n_publishers = 50, n_readers = 30000, dataset_size = 100000, seed = None):
    '''
    This function will generate a synthetic book data set made of random
    integers. Every column is drawn independently with inclusive bounds, and
    rows that are exact duplicates of an earlier row are dropped before
    returning. The dataset will have the following columns : 
        - book_id (Integer) : Identifier of the book, between 1 and n_books
        - author_id (Integer) : Identifier of the author, between 1 and n_authors
        - book_genre (Integer) : Genre code, between 1 and n_genres. Each row
                                 carries exactly 1 genre
        - reader_id (Integer) : Identifier of the reader, between 1 and n_readers
        - num_pages (Integer) : Random value between 75 and 700
        - book_rating (Integer) : A value between 1 and 10
        - publisher_id (Integer) : Identifier of the publisher, between 1 and
                                   n_publishers
        - publish_year (Integer) : The year of book publishing, between 2000 and 2021
        - book_price (Integer) : The sale price of the book, between 1 and 200
        - text_lang (Integer) : Language code of the book, between 1 and 7
        
    params:
        n_books (Integer) : The number of books you want the dataset to have
        n_genres (Integer) : Number of genres to be chosen from
        n_authors (Integer) : Number of authors to be generated
        n_publishers (Integer) : Number of publishers for the dataset
        n_readers (Integer) : Number of readers for the dataset
        dataset_size (Integer) : The number of rows to be generated (before
                                 duplicate rows are dropped)
        seed (Integer, optional) : When given, a private generator seeded with
                                   this value is used, so the same arguments
                                   always produce the same rows. When None
                                   (default) the module-level `random` state is
                                   used, as before.
        
    returns:
        DataFrame with at most dataset_size rows. The index keeps the original
        row positions, so it can have gaps where duplicates were removed.
        
    example:
        data = generate_data()
        data = generate_data(dataset_size = 1000, seed = 42)
    '''
    # A private Random instance keeps seeded runs from disturbing (or being
    # disturbed by) the global random state; unseeded runs keep the old behavior.
    rng = random.Random(seed) if seed is not None else random

    def draw(low, high):
        # One column of dataset_size integers in the inclusive range [low, high].
        return [rng.randint(low, high) for _ in range(dataset_size)]

    d = pd.DataFrame(
        {
            'book_id' : draw(1, n_books),
            'author_id' : draw(1, n_authors),
            'book_genre' : draw(1, n_genres),
            'reader_id' : draw(1, n_readers),
            'num_pages' : draw(75, 700),
            'book_rating' : draw(1, 10),
            'publisher_id' : draw(1, n_publishers),
            'publish_year' : draw(2000, 2021),
            'book_price' : draw(1, 200),
            'text_lang' : draw(1, 7)
        }
    ).drop_duplicates()
    return d
  
# Build the synthetic dataset once; the cells below read their features and targets from `d`.
d = generate_data(dataset_size = 100000)
# Optional: uncomment to persist the generated rows to disk.
# d.to_csv('data.csv', index = False)

In [3]:
# Preview the first rows of the generated dataset.
d.head()

Unnamed: 0,book_id,author_id,book_genre,reader_id,num_pages,book_rating,publisher_id,publish_year,book_price,text_lang
0,2397,356,2,24324,365,8,25,2008,56,3
1,1797,209,7,21066,679,8,17,2009,77,4
2,1284,381,7,20818,298,10,21,2007,180,7
3,2376,416,2,28382,511,7,32,2020,164,4
4,2927,272,2,28206,255,3,44,2010,135,1


## Train Model Normally

In [4]:
# Columns used as inputs for the genre model, and the column it predicts.
genre_features = ['num_pages', 'book_rating', 'book_price', 'text_lang']
genre_target = 'book_genre'

In [5]:
# Feature matrix and label vector for the genre model.
x = d[genre_features].values
y = d[genre_target].values

# Hold out 30% of the rows for evaluation. A fixed random_state keeps the
# split identical across re-runs so timings and scores are comparable.
X_train, X_test, y_train, y_test = train_test_split(
    x, 
    y, 
    test_size = 0.3,
    random_state = 42
)

In [6]:
%%time
print("Training Model")
# Instantiate the model with its default parameters. No n_jobs argument is
# passed here; this is the baseline the parallel section is compared against.
gen_mdl = GradientBoostingClassifier()

# Fit the model on the genre training split.
gen_mdl.fit(X_train, y_train)

Training Model
CPU times: user 54.4 s, sys: 71.9 ms, total: 54.5 s
Wall time: 54.7 s


GradientBoostingClassifier(ccp_alpha=0.0, criterion='friedman_mse', init=None,
                           learning_rate=0.1, loss='deviance', max_depth=3,
                           max_features=None, max_leaf_nodes=None,
                           min_impurity_decrease=0.0, min_impurity_split=None,
                           min_samples_leaf=1, min_samples_split=2,
                           min_weight_fraction_leaf=0.0, n_estimators=100,
                           n_iter_no_change=None, presort='deprecated',
                           random_state=None, subsample=1.0, tol=0.0001,
                           validation_fraction=0.1, verbose=0,
                           warm_start=False)

In [7]:
# Predict genres for the held-out rows in X_test.
predicted_mdl = gen_mdl.predict(X_test)

# Print the per-class metrics, and keep the same metrics as a dict in `report`
# (`report` is not read again by the cells shown in this notebook).
print(classification_report(y_test, predicted_mdl))
report = classification_report(y_test, predicted_mdl, output_dict=True)

              precision    recall  f1-score   support

           1       0.10      0.08      0.09      2899
           2       0.09      0.09      0.09      2974
           3       0.11      0.08      0.09      2990
           4       0.11      0.10      0.10      3025
           5       0.10      0.13      0.11      2967
           6       0.10      0.12      0.11      3024
           7       0.10      0.09      0.09      2993
           8       0.10      0.08      0.09      3026
           9       0.09      0.10      0.10      3037
          10       0.11      0.13      0.12      3065

    accuracy                           0.10     30000
   macro avg       0.10      0.10      0.10     30000
weighted avg       0.10      0.10      0.10     30000



## Train Model in Parallel

In [8]:
# Columns used as inputs for the language model, and the column it predicts.
lang_features = ['num_pages', 'book_rating', 'book_price', 'book_genre']
lang_target = 'text_lang'

In [9]:
# Feature matrix and label vector for the language model.
# NOTE: this rebinds x, y and the train/test variables created for the genre
# model above; from here on they refer to the language split.
x = d[lang_features].values
y = d[lang_target].values

# Hold out 30% of the rows for evaluation. A fixed random_state keeps the
# split identical across re-runs so timings and scores are comparable.
X_train, X_test, y_train, y_test = train_test_split(
    x, 
    y, 
    test_size = 0.3,
    random_state = 42
)

In [10]:
%%time
print("Training Model")
# Instantiate the model with default parameters except n_jobs = 2, which is
# the parallel setting this section demonstrates.
lang_mdl = RandomForestClassifier(n_jobs = 2)

# Fit the model on the language training split.
lang_mdl.fit(X_train, y_train)

Training Model
CPU times: user 17 s, sys: 439 ms, total: 17.4 s
Wall time: 8.96 s


RandomForestClassifier(bootstrap=True, ccp_alpha=0.0, class_weight=None,
                       criterion='gini', max_depth=None, max_features='auto',
                       max_leaf_nodes=None, max_samples=None,
                       min_impurity_decrease=0.0, min_impurity_split=None,
                       min_samples_leaf=1, min_samples_split=2,
                       min_weight_fraction_leaf=0.0, n_estimators=100, n_jobs=2,
                       oob_score=False, random_state=None, verbose=0,
                       warm_start=False)

In [11]:
# Predict languages for the held-out rows in X_test.
predicted_mdl = lang_mdl.predict(X_test)

# Print the per-class metrics, and keep the same metrics as a dict in `report`
# (this overwrites the `report` produced for the genre model).
print(classification_report(y_test, predicted_mdl))
report = classification_report(y_test, predicted_mdl, output_dict=True)

              precision    recall  f1-score   support

           1       0.15      0.15      0.15      4258
           2       0.14      0.15      0.14      4277
           3       0.15      0.14      0.15      4391
           4       0.14      0.13      0.13      4275
           5       0.14      0.14      0.14      4254
           6       0.14      0.13      0.14      4255
           7       0.14      0.13      0.13      4290

    accuracy                           0.14     30000
   macro avg       0.14      0.14      0.14     30000
weighted avg       0.14      0.14      0.14     30000



## Model Evaluation

In [12]:
%%time
# Baseline: 4-fold cross-validated accuracy on the language training split,
# with the cross-validation itself run with n_jobs=1.
# NOTE(review): lang_mdl was constructed with n_jobs = 2, so this may not be a
# purely serial baseline — confirm that is intended.
n_scores = cross_val_score(
    lang_mdl,
    X_train,
    y_train,
    scoring='accuracy',
    cv=4, 
    n_jobs=1
)

CPU times: user 6.26 s, sys: 1.38 s, total: 7.64 s
Wall time: 34.8 s


In [13]:
%%time
# Same 4-fold evaluation as the previous cell, but with n_jobs=2 passed to
# cross_val_score. `n_scores` from the previous cell is overwritten.
n_scores = cross_val_score(
    lang_mdl,
    X_train,
    y_train,
    scoring='accuracy',
    cv=4,
    n_jobs=2
)

CPU times: user 33.1 ms, sys: 0 ns, total: 33.1 ms
Wall time: 28.1 s


## Parallelize Model Predictions - Singular Model

### Predict Normally

In [14]:
def predict(data, feature_cols, clf, pred_col):
    '''
    Score every row of a dataset with a fitted model and store the result as
    a new column on that same dataset.
    
    params:
        data (DataFrame) : The dataset which holds the features
        feature_cols (List -> String) : Column names in data that the model expects as features
        clf (Model) : Any object exposing predict(); it receives the feature values as an array
        pred_col (String) : Name of the column in data that will receive the predictions
        
    returns:
        The same DataFrame object that was passed in. It is modified in place:
        pred_col is added (or overwritten if it already exists).
    
    example:
        >> predict(
            data = df,
            feature_cols = lang_features,
            clf = lang_mdl,
            pred_col = 'lang_prediction'
        )
    '''
    feature_matrix = data[feature_cols].values
    # Assigning by column name mutates the caller's frame rather than a copy.
    data[pred_col] = clf.predict(feature_matrix)
    return data

In [15]:
%%time
# Single-process predictions over the full dataset.
# predict() writes the 'lang_prediction' column onto `d` itself, so `res` and
# `d` refer to the same frame after this cell.
res = predict(
    data = d,
    feature_cols = lang_features,
    clf = lang_mdl,
    pred_col = 'lang_prediction'
)

CPU times: user 4.47 s, sys: 35.5 ms, total: 4.5 s
Wall time: 2.62 s


### Predict in Parallel

In [16]:
def parallel_pred(fn, data, feature_cols, clf, pred_col, n_cores):
    '''
    This function will parallelize the prediction process: the rows of data
    are split into up to n_cores contiguous chunks, fn is applied to every
    chunk, and the per-chunk results are concatenated in their original order.
    
    Each chunk is an independent copy, so fn may add columns to it without
    touching the caller's DataFrame. Chunks that would be empty (more cores
    than rows) are not dispatched. When only one chunk/worker is needed the
    work runs in the current process, because a one-worker pool only adds
    process start-up and pickling overhead.
    
    params:
        fn (Function) : The function you want to parallelize. It is called as
                        fn(chunk, feature_cols=..., clf=..., pred_col=...) and
                        must return a DataFrame. For multi-process runs it must
                        be picklable (e.g. defined at module level).
        data (DataFrame) : The dataset holding the features for the model
        feature_cols (List -> String) : List of column names in data corresponding to the model features
        clf (Model) : The  model which generates the predictions
        pred_col (String) : The name of the column you want to store the predictions under in data
        n_cores (Integer) : The number of cores you want to use
    
    returns:
        A DataFrame built by concatenating what fn returned for every chunk
        
    raises:
        ValueError : If n_cores is smaller than 1 or larger than the number of
                     CPUs reported by cpu_count()
        
    example:
        parallel_pred(
            fn = predict, 
            data = d,
            feature_cols = lang_features,
            clf = lang_mdl,
            pred_col = 'parallel_lang_pred',
            n_cores = 4
        )
    '''
    if n_cores < 1:
        raise ValueError("n_cores must be a positive integer")
    if cpu_count() < n_cores:
        raise ValueError("The number of CPU's specified exceed the amount available")

    # Split row *positions* rather than the frame itself: np.array_split on a
    # DataFrame is deprecated in recent pandas, and it hands workers slices of
    # the parent frame instead of independent copies.
    row_groups = np.array_split(np.arange(len(data)), n_cores)
    chunks = [data.iloc[rows].copy() for rows in row_groups if len(rows)]
    if not chunks:
        # Empty input: let fn decide what an empty result looks like.
        chunks = [data.copy()]

    task = partial(
        fn, 
        feature_cols = feature_cols, 
        clf = clf, 
        pred_col = pred_col
    )

    n_workers = min(n_cores, len(chunks))
    if n_workers == 1:
        res = [task(chunk) for chunk in chunks]
    else:
        # The context manager tears the pool down even if a worker raises.
        with Pool(n_workers) as pool:
            res = pool.map(task, chunks)
    return pd.concat(res)

In [17]:
%%time
# Same predictions as the previous section, but with the rows of `d` split
# across 2 worker processes. `res` from the earlier cell is overwritten.
res = parallel_pred(
    fn = predict, 
    data = d,
    feature_cols = lang_features,
    clf = lang_mdl,
    pred_col = 'parallel_lang_pred',
    n_cores = 2
)

CPU times: user 894 ms, sys: 1.79 s, total: 2.68 s
Wall time: 7.85 s


## Parallelize Model Predictions - Multiple Models

In [18]:
# The two fitted models and their matching feature lists, in the same order.
# NOTE(review): neither name is referenced by the cells shown below, which use
# `model_data` instead — possibly leftover; confirm before removing.
models = [gen_mdl, lang_mdl]
features = [genre_features, lang_features]

In [19]:
# One entry per model: [fitted model, feature column names, label used to name the prediction column].
model_data = [[gen_mdl, genre_features, 'genre'], [lang_mdl, lang_features, 'lang']]

### Predict Normally

In [20]:
def multi_predcit(data, models_data, pred_col = 'prediction_{}'):
    '''
    Run several models over the same dataset, one after another, and store
    each model's predictions in its own column of that dataset.
    
    params:
        data (DataFrame) : The dataframe holding the feature data for the models
        models_data (List -> List) : One entry per model. Position 0 is the model
                                     (anything exposing predict()), position 1 the
                                     feature column names, position 2 the target name
        pred_col (String): Template for the output column names; the target
                           name is substituted via pred_col.format(target)
        
    returns:
        The same DataFrame object that was passed in, modified in place so that
        it carries one additional column per entry of models_data.
        
    example:
        multi_predcit(
            data = d,
            models_data = model_data,
            pred_col = 'prediction_{}'
        )
    '''
    for entry in models_data:
        # Entries are read by position so lists, tuples and array rows all work.
        mdl, ft_cols, target = entry[0], entry[1], entry[2]
        predictions = mdl.predict(data[ft_cols].values)
        data[pred_col.format(target)] = predictions
    return data

In [21]:
%%time
# Run both models one after another in the current process.
# multi_predcit() adds the 'prediction_genre' and 'prediction_lang' columns to `d` itself.
res = multi_predcit(
    data = d,
    models_data = model_data,
    pred_col = 'prediction_{}'
)

CPU times: user 5.61 s, sys: 27.8 ms, total: 5.64 s
Wall time: 3.48 s


### Predict in Parallel

In [24]:
def parallel_multi(fn, data, model_data, n_cores):
    '''
    This function will parallelize the input function so that the models are
    distributed over n_cores workers and predictions are generated in parallel. 
    
    The entries of model_data are split into n_cores contiguous groups (the
    first groups get one extra entry when the count does not divide evenly)
    and fn(data, group) is evaluated for every group. With n_cores = 1 the
    call runs in the current process on a copy of data, since a one-worker
    pool only adds process start-up and pickling overhead.
    
    params:
        fn (Function) : The function you want to parallelize. It is called as
                        fn(data, group). For multi-process runs it must be
                        picklable (e.g. defined at module level).
        data (DataFrame) : The dataset holding the features
        model_data (List -> List) : Nested list associated with the model, 
                                    feature columns and the name of the target
        n_cores (Integer) : The number of cores you want to parallelize with
        
    returns:
        A list with n_cores elements: the value returned by fn for each group
        of models, in order. The caller's data is not modified; every element
        is a separate result that carries only the columns of its own group.
        
    raises:
        ValueError : If n_cores is smaller than 1
        
    example: 
        parallel_multi(
            fn = multi_predcit,
            data = d,
            model_data = model_data,
            n_cores = 2
        )
    '''
    if n_cores < 1:
        raise ValueError("n_cores must be a positive integer")

    # Partition with plain slicing. model_data is a ragged nested list
    # (model, list of columns, string); converting it to a NumPy array, as
    # np.array_split does, raises on recent NumPy versions.
    base, extra = divmod(len(model_data), n_cores)
    mdl_split = []
    start = 0
    for k in range(n_cores):
        stop = start + base + (1 if k < extra else 0)
        mdl_split.append(list(model_data[start:stop]))
        start = stop

    if n_cores == 1:
        # Work on a copy so the caller's frame stays untouched, matching what
        # happens when data is shipped to a worker process.
        return [fn(data.copy(), mdl_split[0])]

    # The context manager tears the pool down even if a worker raises.
    with Pool(n_cores) as pool:
        res = pool.map(partial(fn, data), mdl_split)
    return res

In [25]:
%%time
# Spread the two models over 2 worker processes.
# Unlike the serial call above, `res` here is whatever parallel_multi returns
# from pool.map: one result per group of models rather than a single frame.
res = parallel_multi(
    fn = multi_predcit,
    data = d,
    model_data = model_data,
    n_cores = 2
)

CPU times: user 433 ms, sys: 1.19 s, total: 1.63 s
Wall time: 7.25 s


---