# Parallelising TF-IDF text vectorisation into CSR sparse matrices with joblib

In [1]:
# Import libraries
import pandas as pd
import numpy as np
import scipy.sparse as sparse_matrix # for sparse matrix operations
from sklearn.feature_extraction.text import TfidfVectorizer # the word vectoriser

import time # for reporting time elapsed

import random # for generating random text
import string # for string operations

from joblib import Parallel, delayed # the parallelisation library
from multiprocessing import cpu_count # for finding the number of threads
try:
    # Number of logical CPUs reported by the OS; this can exceed the CPUs this
    # process is actually allowed to use (e.g. CPU affinity or container limits).
    num_cores = cpu_count()
except NotImplementedError:
    # multiprocessing.cpu_count raises when the count cannot be determined;
    # fall back to a single worker instead of failing the whole notebook.
    num_cores = 1

In [2]:
# Print elapsed time
def elapsed_time():
    print("Finished in %0.2fs." % (time.time() - t0),'\n')

In [5]:
print("Generating dataset...")
t0 = time.time()

docs = pd.read_csv(r'.\sample_data.csv').astype(str)
for i in range(1,len(docs.columns)): # join all columns together
    docs.iloc[:,0] = docs.iloc[:,0] + ' ' + docs.iloc[:,i]
docs = docs['Col0']

for i in range(4): # increase the size of the text to near 1 million
    docs = pd.concat([docs, docs], axis=0, sort=False, copy=False)

elapsed_time()
display(docs.head(),docs.shape[0])

Generating dataset...
Finished in 0.62s. 



0    Eldon Base for stackable storage shelf, platin...
1    1.7 Cubic Foot Compact "Cube" Office Refrigera...
2    Cardinal Slant-D® Ring Binder, Heavy Gauge Vin...
3    R380 Clay Rozendal 483 1198.97 195.99 3.99 Nun...
4    Holmes HEPA Air Purifier Carlos Soltero 515 30...
Name: Col0, dtype: object

952112

In [6]:
print("Extracting tf-idf features...")

t0 = time.time()
tfidf_vectorizer = TfidfVectorizer(analyzer='word') # initialise word counter
tfidf_vectorizer.fit(docs) # word counts

elapsed_time()

Extracting tf-idf features...
Finished in 18.11s. 



In [7]:
print("Un-parallelised transformation of given column...")

t0 = time.time()
tfidf = tfidf_vectorizer.transform(docs)

elapsed_time()

Un-parallelised transformation of given column...
Finished in 18.00s. 



In [8]:
# The transformation using fitted vectoriser
def tfidf_transform(split_array):
    tfidf_matrix = tfidf_vectorizer.transform(split_array)
#     time.sleep(0.1)
    return tfidf_matrix

# Parellelisation
def parallelise_vectorisation(vectorise_col, tfidf_transform=tfidf_transform):
    split_array = np.array_split(vectorise_col, num_cores) # split column row-wise by number of cores
    results = Parallel(n_jobs=num_cores, verbose=0, backend="threading")(map(delayed(tfidf_transform), split_array)) # threading expecting given column broken in parts
    tfidf_matrix = sparse_matrix.csr_matrix((0,results[0].shape[1]), dtype=np.uint8) # instantiate an empty csr matrix with 0 rows and the same number of columns as any of the split results
    for i in range(len(results)): 
        tfidf_matrix = sparse_matrix.vstack([tfidf_matrix,results[i]], format='csr') # join the results together using sparse format, csr works best
    return tfidf_matrix # return the joined matrix

In [9]:
print("Parallelised execution using joblib...")

t0 = time.time()

tfidf_parallel = parallelise_vectorisation(docs)

elapsed_time()

Parallelised execution using joblib...
Finished in 17.38s. 



In [10]:
# Quick check using summation
tfidf.sum() == tfidf_parallel.sum()

True

+ Sources:
    + Parallelisation using sklearn and tfidfvectorizer: https://stackoverflow.com/questions/28396957/sklearn-tfidf-vectorizer-to-run-as-parallel-jobs   
    + Parallelise using dataframes in python: http://blog.adeel.io/2016/11/06/parallelize-pandas-map-or-apply/