## Using joblib to speed up your Python pipelines

### Why joblib?

There are several reasons to integrate joblib into an ML pipeline. Its [website](https://joblib.readthedocs.io/en/latest/) gives two main ones, which I'll paraphrase here:
- Caching, which avoids recomputing steps whose results are already known.
- Parallelization, which makes full use of all the CPU cores.

Beyond this, there are several other reasons why I would recommend joblib:
1. Can be easily integrated
2. No specific dependencies
3. Saves cost and time
4. Easy to learn

Joblib also provides other useful functionality, such as fast saving and loading of objects with optional compression, which is worth including in daily work.

### 1. Using Cached results

We often need to re-run our pipelines many times while testing or building a model. Some functions get called repeatedly with the same input data, and the computation is redone every time. Joblib avoids recomputing the same function call over and over, which saves a lot of time and computational cost. Take the simple example below:

```python
import time
import numpy as np

# Return the square of a number
def square_number(no):
    return no * no

# Compute the squares of all integers in [start_no, end_no),
# sleeping 1 s per item to emulate an expensive computation
def get_square_range(start_no, end_no):
    result = []
    for i in range(start_no, end_no):
        time.sleep(1)
        result.append(square_number(i))
    return result

start = time.time()
# Squares of 1 to 20
final_result = get_square_range(1, 21)
end = time.time()

# Total time to compute
print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)
```

```
The function took 20.18 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
```


As seen above, the function simply computes the square of each number in the given range, and it takes ~20 s to return the result. Now let's wrap it with joblib's `Memory` class, which stores cached results in the directory given by `location`:

```python
from joblib import Memory

# Define a location to store the cache
location = 'cache_dir'
memory = Memory(location, verbose=0)

# Same computation as above; it will be wrapped by the cache
def get_square_range_cached(start_no, end_no):
    result = []
    for i in range(start_no, end_no):
        time.sleep(1)
        result.append(square_number(i))
    return result

get_square_range_cached = memory.cache(get_square_range_cached)

start = time.time()
# Squares of 1 to 20
final_result = get_square_range_cached(1, 21)
end = time.time()

# Total time to compute
print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)
```

```
The function took 20.17 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
```


On the first call, the run time is about the same as before (~20 s), because the results are computed for the first time and then written to the cache location. Let's run it once more:

```python
start = time.time()
# Squares of 1 to 20, this time served from the cache
final_result = get_square_range_cached(1, 21)
end = time.time()

print('\nThe function took {:.2f} s to compute.'.format(end - start))
print(final_result)
```

```
The function took 0.00 s to compute.
[1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
```


And voilà! This time the call returned almost instantly (reported as 0.00 s), more than 2000x faster than the first run, because the result was loaded from the cache on disk instead of being recomputed. The speedup will not be the same for every function: each cached call still has to hash its arguments and read the stored result back from disk, so caching pays off most for expensive functions whose inputs and outputs are cheap to hash and load. Still, caching is a simple way to give your pipeline a real boost.
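The cache is keyed on the function's arguments: a call with new arguments is computed and stored, while a repeated call is served from disk. The minimal sketch below shows this with the decorator form `@memory.cache`, which is equivalent to the `memory.cache(...)` call used above. The `slow_square` function, the `calls` list, and the temporary cache directory are illustrative assumptions. `calls` only grows when the function body actually runs.

```python
import tempfile

from joblib import Memory

# Throwaway cache directory for this sketch (an assumption; any writable path works)
memory = Memory(tempfile.mkdtemp(), verbose=0)

# Records each argument for which the function body actually runs (cache misses)
calls = []

@memory.cache
def slow_square(x):
    calls.append(x)
    return x * x

print(slow_square(3))  # body runs: result computed and stored
print(slow_square(3))  # same argument: result loaded from the cache
print(slow_square(4))  # new argument: body runs again
print(calls)           # [3, 4]
```

Joblib also tracks the source code of the cached function, so editing its body invalidates the results stored for it.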

To clear the cached results, call `memory.clear()`:

```python
# Clean up the cache folder
memory.clear(warn=False)
```

Be careful before running this: it deletes every result stored in the cache directory, which could represent weeks of computation.
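If only one function's results need to go, a safer option is to clear that function alone. In the sketch below, the `cube` and `double` functions and the temporary directory are illustrative assumptions. Calling `.clear()` on the object returned by `memory.cache` empties only that function's cache. `check_call_in_cache` reports whether a given call is stored, without running the function.

```python
import tempfile

from joblib import Memory

memory = Memory(tempfile.mkdtemp(), verbose=0)

@memory.cache
def cube(x):
    return x ** 3

@memory.cache
def double(x):
    return 2 * x

# Populate both caches
cube(2)
double(2)

# Drop the cached results of `cube` only; `double` keeps its cache
cube.clear(warn=False)

print(cube.check_call_in_cache(2))    # False
print(double.check_call_in_cache(2))  # True
```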

### 2. Parallelization

As the name suggests, `joblib.Parallel` runs a function in parallel over many inputs, and the function can take multiple arguments. With more than one job, each call does not wait for the previous one to finish, so the calls can run on different CPU cores at the same time. To show how the two features combine, the example below also caches the costly function.
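Two pieces work together here. `delayed` captures a function call without executing it. `Parallel` then executes a sequence of such captured calls on `n_jobs` workers and returns the results in the same order as the inputs. A minimal sketch, with `power` as an illustrative stand-in function:

```python
from joblib import Parallel, delayed

def power(base, exponent=2):
    return base ** exponent

# delayed(...) does not run the function: it records the call as (function, args, kwargs)
func, args, kwargs = delayed(power)(3, exponent=3)
print(func is power, args, kwargs)  # True (3,) {'exponent': 3}

# Parallel runs the recorded calls on 2 workers and keeps the input order
results = Parallel(n_jobs=2)(delayed(power)(x) for x in range(6))
print(results)  # [0, 1, 4, 9, 16, 25]
```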

Consider the following randomly generated dataset:

```python
rng = np.random.RandomState(42)
data = rng.randn(int(1e4), 4)
```

```python
data.shape
```

```
(10000, 4)
```

```python
data[:10]
```

```
array([[ 0.49671415, -0.1382643 ,  0.64768854,  1.52302986],
       [-0.23415337, -0.23413696,  1.57921282,  0.76743473],
       [-0.46947439,  0.54256004, -0.46341769, -0.46572975],
       [ 0.24196227, -1.91328024, -1.72491783, -0.56228753],
       [-1.01283112,  0.31424733, -0.90802408, -1.4123037 ],
       [ 1.46564877, -0.2257763 ,  0.0675282 , -1.42474819],
       [-0.54438272,  0.11092259, -1.15099358,  0.37569802],
       [-0.60063869, -0.29169375, -0.60170661,  1.85227818],
       [-0.01349722, -1.05771093,  0.82254491, -1.22084365],
       [ 0.2088636 , -1.95967012, -1.32818605,  0.19686124]])
```

Below is a run with our normal sequential processing, where a new calculation starts only after the previous calculation is completed.

```python
def costly_compute(data, column):
    """Emulate a costly function by sleeping and returning a column."""
    time.sleep(2)
    # data[:, column] selects a column; data[column] would select a row
    return data[:, column]

def data_processing_mean(data, column):
    """Compute the mean of a column."""
    return costly_compute(data, column).mean()
```

```python
start = time.time()
results = [data_processing_mean(data, col) for col in range(data.shape[1])]
stop = time.time()
```

```python
print('\nSequential processing')
print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

```
Sequential processing
Elapsed time for the entire processing: 8.04 s
```


For parallel processing, we set `n_jobs=2`, i.e., two workers. For CPU-bound work, there is little benefit in using more jobs than there are available CPU cores.
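You don't have to hard-code the number of jobs. `cpu_count()` tells you how many CPUs joblib sees, and `n_jobs=-1` uses all of them. A minimal sketch (the `square` function is an illustrative assumption):

```python
from joblib import Parallel, cpu_count, delayed

def square(x):
    return x * x

# Number of CPUs joblib detects on this machine (logical CPUs by default)
print("CPUs available:", cpu_count())

# n_jobs=-1 asks joblib to use all available CPUs
results = Parallel(n_jobs=-1)(delayed(square)(x) for x in range(8))
print(results)  # [0, 1, 4, 9, 16, 25, 36, 49]
```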

```python
# Import the package
from joblib import Memory, Parallel, delayed

location = 'cache_dir'
memory = Memory(location, verbose=0)
costly_compute_cached = memory.cache(costly_compute)

def data_processing_mean_using_cache(data, column):
    """Compute the mean of a column."""
    return costly_compute_cached(data, column).mean()

start = time.time()
results = Parallel(n_jobs=2)(
    delayed(data_processing_mean_using_cache)(data, col)
    for col in range(data.shape[1]))
stop = time.time()

print('Elapsed time for the entire processing: {:.2f} s'
      .format(stop - start))
```

```
Elapsed time for the entire processing: 4.37 s
```


Here the parallel run took 4.37 s instead of 8.04 s, roughly a **2x** speedup, which is what we expect from two workers sharing four 2 s tasks.

- **Note**: parallelization can make cheap functions slower, because starting the workers and sending data to them has an overhead that may outweigh the time saved.
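A common way to reduce this overhead is to give each task a larger chunk of work, so that the dispatch cost is paid once per chunk rather than once per item. This is a general technique, not one specific to this article. `Parallel` already groups small tasks into batches automatically (its `batch_size` parameter defaults to `'auto'`). Splitting the data yourself also lets each task use vectorized NumPy code. A sketch, where `sum_of_squares` and the choice of 4 chunks are illustrative assumptions:

```python
import numpy as np
from joblib import Parallel, delayed

def sum_of_squares(chunk):
    # Vectorized work on a whole chunk: one task instead of one task per element
    return int(np.sum(chunk ** 2))

values = np.arange(100_000, dtype=np.int64)

# Split the input into a few large chunks (4 here, an arbitrary choice)
chunks = np.array_split(values, 4)
partial_sums = Parallel(n_jobs=2)(delayed(sum_of_squares)(c) for c in chunks)
total = sum(partial_sums)

# Same answer as the sequential computation
print(total == int(np.sum(values ** 2)))  # True
```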

### 3. Dump and Load

We often need to save datasets, models, computed results, etc. to disk and load them back later. Joblib's `dump` and `load` functions make this easy:

```python
import os
from joblib import dump, load

path = 'dumped/'
# Create the output directory if it does not exist yet
os.makedirs(path, exist_ok=True)
# Name of the file
joblib_file = 'train_features.joblib'

start = time.time()
dump(data, path + joblib_file)

# Calculating the total time
simple_joblib_duration = time.time() - start
print("Dump duration: %0.3fs" % simple_joblib_duration)
```

```
Dump duration: 0.002s
```


```python
start = time.time()

data = load(path + joblib_file)

# Calculating the total time
simple_joblib_duration = time.time() - start
print("Load duration: %0.3fs" % simple_joblib_duration)
```

```
Load duration: 0.001s
```


```python
data[:10]
```

```
array([[ 0.49671415, -0.1382643 ,  0.64768854,  1.52302986],
       [-0.23415337, -0.23413696,  1.57921282,  0.76743473],
       [-0.46947439,  0.54256004, -0.46341769, -0.46572975],
       [ 0.24196227, -1.91328024, -1.72491783, -0.56228753],
       [-1.01283112,  0.31424733, -0.90802408, -1.4123037 ],
       [ 1.46564877, -0.2257763 ,  0.0675282 , -1.42474819],
       [-0.54438272,  0.11092259, -1.15099358,  0.37569802],
       [-0.60063869, -0.29169375, -0.60170661,  1.85227818],
       [-0.01349722, -1.05771093,  0.82254491, -1.22084365],
       [ 0.2088636 , -1.95967012, -1.32818605,  0.19686124]])
```
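`dump` is not limited to NumPy arrays. It can persist most picklable Python objects, such as dictionaries or fitted models. For large uncompressed arrays, `load` also accepts `mmap_mode`, which memory-maps the array from disk instead of reading it fully into RAM. A minimal sketch, where the `bundle` dictionary, the file names, and the temporary directory are illustrative assumptions:

```python
import os
import tempfile

import numpy as np
from joblib import dump, load

rng = np.random.RandomState(42)
features = rng.randn(1000, 4)
out_dir = tempfile.mkdtemp()

# dump() persists arbitrary picklable objects; NumPy arrays inside are stored efficiently
bundle = {"features": features, "columns": ["a", "b", "c", "d"]}
bundle_path = os.path.join(out_dir, "bundle.joblib")
dump(bundle, bundle_path)
restored = load(bundle_path)
print(np.array_equal(restored["features"], features), restored["columns"])

# An uncompressed array can be memory-mapped on load instead of read fully into RAM
array_path = os.path.join(out_dir, "features.joblib")
dump(features, array_path)
mapped = load(array_path, mmap_mode="r")
print(type(mapped).__name__, mapped.shape)
```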

---

### 4. Compression methods

With larger datasets, the files on disk can become very large, and feature engineering makes them even larger as we add more columns. Storage is cheap nowadays, so this is less of an issue than it used to be. Still, for the sake of efficiency, joblib offers compression methods that are very simple to use:

**a. No compression (baseline):**

```python
start = time.time()

# File name
joblib_file = 'train_features.joblib'

# Dumping the file without compression
dump(data, path + joblib_file)

raw_dump_duration = time.time() - start

# Total time taken to dump
print("Raw dump duration: %0.3fs" % raw_dump_duration)
```

```
Raw dump duration: 0.002s
```


```python
import os

# Measuring the size of the file in MB
raw_file_size = os.stat(path + joblib_file).st_size / 1e6

# Printing the size
print("The file size is: %0.3fMB" % raw_file_size)
```

```
The file size is: 0.320MB
```


**b. Using Zlib compression:**

```python
start = time.time()

# File name
joblib_file_zlib = 'joblib_file_zlib.joblib'

# Dumping the file in the zlib compression format
dump(data, path + joblib_file_zlib, compress='zlib')

zlib_joblib_duration = time.time() - start

# Total time taken to dump
print("Zlib dump duration: %0.3fs" % zlib_joblib_duration)
```

```
Zlib dump duration: 0.012s
```


```python
# Measuring the size of the file in MB
zlib_file_size = os.stat(path + joblib_file_zlib).st_size / 1e6

# Printing the size
print("The file size is: %0.3fMB" % zlib_file_size)
```

```
The file size is: 0.309MB
```


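The saving here is small because random floating-point data has little redundancy to exploit. Joblib supports other compression methods as well, including `'gzip'`, `'bz2'`, `'lzma'`, `'xz'`, and `'lz4'` (the last one requires the separate `lz4` package).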
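Because the benefit of compression depends so much on the data, it helps to measure it directly. The sketch below compares raw and zlib-compressed dumps of random data and of highly repetitive data. `compress` accepts a method name, an integer level from 0 to 9, or a `(method, level)` tuple. The `dumped_size` helper, the array contents, and the level 3 choice are illustrative assumptions:

```python
import os
import tempfile

import numpy as np
from joblib import dump

out_dir = tempfile.mkdtemp()

def dumped_size(obj, name, **kwargs):
    """Dump obj to a file in out_dir and return the file size in bytes."""
    filename = os.path.join(out_dir, name)
    dump(obj, filename, **kwargs)
    return os.path.getsize(filename)

random_data = np.random.RandomState(42).randn(10_000, 4)  # little redundancy
repetitive_data = np.tile(np.arange(4.0), (10_000, 1))     # highly redundant

sizes = {}
for label, arr in [("random", random_data), ("repetitive", repetitive_data)]:
    raw = dumped_size(arr, f"{label}_raw.joblib")
    # (method, level) tuple: zlib at a moderate compression level
    zlib3 = dumped_size(arr, f"{label}_zlib.joblib", compress=("zlib", 3))
    sizes[label] = (raw, zlib3)
    print(f"{label}: raw={raw} bytes, zlib level 3={zlib3} bytes")
```

Higher levels usually trade dump time for smaller files, so it is worth timing a few settings on your own data.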

---