# Multiprocessing and Apply

If you have to use `.apply()`, there is a way to combine it with multiprocessing, and this *might* speed up your results.

I am very much a beginner when it comes to multiprocessing in Python and have only used it a few times myself. Below is an example of how I have used it to speed up data processing. Note that this was run on a machine with 96 cores.

There is much more to multiprocessing than what you see in this example. To learn more, see the Python documentation for the `multiprocessing` module (https://docs.python.org/3.8/library/multiprocessing.html) and the `concurrent.futures` module (https://docs.python.org/3/library/concurrent.futures.html).

```python
import numpy as np
import pandas as pd
import time
from multiprocessing import Pool
from concurrent.futures import ProcessPoolExecutor
```

# Basic Multiprocessing Example

In this example we have fake time series data for 100,000 different groups; think of them as 100,000 different temperature sensors. Each group has 5 data points, and each data point has a "score" value, but many of these values are null. For each of the 100,000 time series, we need to fill each null value with the most recent non-null value before it (a forward fill), starting from the first point in the series and moving forward. If a series starts with nulls, those stay null because there is no earlier value to copy. The order of each time series is given by the "order" column.
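As a tiny illustration of the fill we want, here is a hypothetical five-point series (not part of the notebook's data). pandas' `Series.ffill()` copies the last non-null value forward and leaves a leading null untouched:

```python
import numpy as np
import pandas as pd

# One hypothetical sensor with five readings, already in time order
s = pd.Series([np.nan, 0.5, np.nan, np.nan, 0.2])

# Forward fill: each null takes the most recent non-null value before it;
# the first value stays null because nothing precedes it
filled = s.ffill()
print(filled.tolist())  # [nan, 0.5, 0.5, 0.5, 0.2]
```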

We create this fake data in the cell below.

## Create Some Fake Data For This Example

```python
# Set the random seed so that the example is reproducible
np.random.seed(19)

N = 100000  # number of groups
K = 5  # number of points per group

group_ids = np.tile(range(0, N), K)
order = np.concatenate([np.ones(N) + x for x in range(K)])

# Start with every score null, then assign random scores to randomly chosen rows
scores = np.ones_like(group_ids) * np.nan
scores[np.random.randint(0, len(scores), int(N*K*0.5))] = np.random.rand(int(N*K*0.5))

# Combine the group_id, order, and score columns into a dataframe
# (.T makes every column float, so group_id is cast back to int below)
my_data = pd.DataFrame([group_ids, order, scores]).T
my_data.columns = ['group_id', 'order', 'score']
# Shuffle the rows, then sort so each group's points are in time order
my_data = my_data.sample(frac=1).reset_index(drop=True)
my_data['group_id'] = my_data['group_id'].astype(int)
my_data.sort_values(by=["group_id", "order"], inplace=True)
```
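Because `np.random.randint` draws row positions with replacement, some positions are drawn more than once, so fewer than half of the rows actually receive a score. The quick check below is an illustrative sketch: it uses a separate random generator, so its draws differ from the notebook's, but it shows that the expected share of null scores is roughly exp(-0.5), about 0.61.

```python
import math

import numpy as np

N, K = 100000, 5
n_rows = N * K
n_draws = int(n_rows * 0.5)  # same number of draws as the cell above

# Chance that a given row is never picked in n_draws draws with replacement
expected_null = (1 - 1 / n_rows) ** n_draws
print(round(expected_null, 3), round(math.exp(-0.5), 3))  # 0.607 0.607

# Simulate the same kind of draw and measure the share of rows never picked
rng = np.random.default_rng(0)
picked = rng.integers(0, n_rows, n_draws)
observed_null = 1 - np.unique(picked).size / n_rows
print(round(observed_null, 3))
```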

### Take a look at the data 

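```python
my_data.head(10)
```

|        | group_id | order | score    |
|--------|----------|-------|----------|
| 73653  | 0        | 1.0   | 0.16711  |
| 51522  | 0        | 2.0   | 0.667212 |
| 124104 | 0        | 3.0   | NaN      |
| 43731  | 0        | 4.0   | 0.416135 |
| 287132 | 0        | 5.0   | 0.225123 |
| 374386 | 1        | 1.0   | NaN      |
| 451609 | 1        | 2.0   | 0.88313  |
| 95948  | 1        | 3.0   | 0.317241 |
| 342395 | 1        | 4.0   | 0.972857 |
| 463988 | 1        | 5.0   | 0.419329 |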


### Group the data by the group id

```python
data_groups = my_data.groupby('group_id')
```

### Look at one of the groups as an example

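```python
data_groups.get_group(1000)
```

|        | group_id | order | score    |
|--------|----------|-------|----------|
| 306240 | 1000     | 1.0   | NaN      |
| 17489  | 1000     | 2.0   | NaN      |
| 454714 | 1000     | 3.0   | NaN      |
| 147282 | 1000     | 4.0   | 0.622336 |
| 497298 | 1000     | 5.0   | NaN      |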


## A Solution Using Apply

We use the function defined below to forward-fill the null values within each group.

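```python
def propogate_score(group):
    '''Fill null values in the score column using the forward-fill method.

    Assumes the rows of `group` are already sorted by the order column
    (my_data was sorted by group_id and order when it was created).
    '''
    # .ffill() replaces the deprecated .fillna(method='ffill')
    scores_filled = group['score'].ffill()
    return scores_filled
```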

```python
time_1 = time.time()
results = data_groups.apply(propogate_score)
time_2 = time.time()

print(f'Elapsed Time: {time_2 - time_1}')
```

```
Elapsed Time: 29.58462405204773
```
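If you do not actually need `.apply()`, it is worth checking whether pandas has a built-in group-wise method first. For forward filling, `GroupBy.ffill()` fills within each group without calling a Python function per group, and is typically much faster than `apply` when there are many small groups. The sketch below uses a small hypothetical DataFrame with the same columns as `my_data` and checks that both approaches agree; on the notebook's data the equivalent call would be `data_groups['score'].ffill()`.

```python
import numpy as np
import pandas as pd

# Hypothetical miniature version of my_data: two groups of three points
df = pd.DataFrame({
    "group_id": [0, 0, 0, 1, 1, 1],
    "order":    [1, 2, 3, 1, 2, 3],
    "score":    [0.1, np.nan, 0.3, np.nan, 0.7, np.nan],
}).sort_values(["group_id", "order"])

# Built-in group-wise forward fill; values never cross group boundaries
vectorized = df.groupby("group_id")["score"].ffill()

# apply-based version for comparison (one Python call per group)
applied = df.groupby("group_id", group_keys=False)["score"].apply(lambda s: s.ffill())

print(vectorized.tolist())  # [0.1, 0.1, 0.3, nan, 0.7, 0.7]
print(np.array_equal(vectorized.to_numpy(), applied.to_numpy(), equal_nan=True))
```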


##  A Solution Using Multiprocessing

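```python
def propogate_score_mp(x):
    '''Forward-fill the score column for one group.

    x is a (group_id, group) tuple, which is what iterating over a
    pandas GroupBy object yields.
    '''
    g_id = x[0]  # not used here, but available to tag the result
    group = x[1]
    scores_filled = group['score'].ffill()
    return scores_filled
```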
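The function takes a single argument `x` because iterating over a GroupBy object yields `(key, sub-DataFrame)` tuples, and `map_async` passes each of those tuples to the function as-is (each one is pickled and sent to a worker process). A small illustration with a hypothetical two-group DataFrame:

```python
import pandas as pd

df = pd.DataFrame({"group_id": [0, 0, 1], "score": [0.1, None, 0.4]})

for item in df.groupby("group_id"):
    # Each item is a (key, sub-DataFrame) tuple, the same shape
    # that propogate_score_mp receives as x
    key, group = item
    print(type(item).__name__, key, len(group))
# tuple 0 2
# tuple 1 1

items = list(df.groupby("group_id"))
```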

### Notes on the cell below:

`Pool()` creates a pool of worker processes, `p`, that we can send work to; with no arguments it starts one worker per available CPU. The `.map_async()` method applies the function to each group in `data_groups` by dividing the groups into chunks and sending those chunks to the processes in the pool. The call is asynchronous, meaning that our main process (the notebook) keeps running while the pool works. This is what lets us run the `while` loop that prints the number of tasks left every second; once the results are ready (all tasks are complete), we exit the loop. Note that `_number_left` is a private, undocumented attribute of the result object, so it may change between Python versions.


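#### Notice that adjusting the `chunksize` can significantly impact performance.
The `chunksize` is the number of items from the iterable (here, groups) that are bundled into a single task and sent to a worker process. Larger chunks mean less communication overhead per item, but chunks that are too large can leave some workers idle while others are still busy.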
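A quick way to see what `chunksize` does is to count tasks: each task carries up to `chunksize` items, so the number of tasks is the number of items divided by `chunksize`, rounded up. With 100,000 groups and `chunksize=5` that gives 20,000 tasks, which matches the first line of the countdown printed below. When `chunksize` is omitted, CPython's `Pool.map`/`map_async` picks one itself, aiming for roughly four chunks per worker. The `default_chunksize` helper here is a hypothetical function that mirrors that heuristic; it is an implementation detail rather than documented behavior, so treat it as an approximation.

```python
import math


def n_tasks(n_items, chunksize):
    # Each task carries up to `chunksize` items; the last one may be partial
    return math.ceil(n_items / chunksize)


def default_chunksize(n_items, n_workers):
    # Mirrors CPython's heuristic in Pool._map_async (an implementation
    # detail, not a documented guarantee): about 4 chunks per worker
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1
    return chunksize


print(n_tasks(100_000, 5))                                # 20000
print(default_chunksize(100_000, 96))                     # 261
print(n_tasks(100_000, default_chunksize(100_000, 96)))   # 384
```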

```python
time_1 = time.time()

p = Pool()
results = p.map_async(propogate_score_mp, data_groups, chunksize=5)
p.close()  # Close the pool now that no more work will be submitted

# Poll once per second until every chunk has been processed
while not results.ready():
    remaining = results._number_left
    print("Waiting for", remaining, "tasks to complete...")
    time.sleep(1)

p.join()  # Block the main process until all worker processes have exited

time_2 = time.time()

# get() returns a list of filled Series, one per group, in group order
results_df = pd.concat(results.get())

print(f'Elapsed Time: {time_2 - time_1}')
```


```
Waiting for 20000 tasks to complete...
Waiting for 19374 tasks to complete...
Waiting for 18683 tasks to complete...
Waiting for 18281 tasks to complete...
Waiting for 17478 tasks to complete...
Waiting for 16706 tasks to complete...
Waiting for 15863 tasks to complete...
Waiting for 14977 tasks to complete...
Waiting for 14222 tasks to complete...
Waiting for 13361 tasks to complete...
Waiting for 12596 tasks to complete...
Waiting for 11833 tasks to complete...
Waiting for 11068 tasks to complete...
Waiting for 10151 tasks to complete...
Waiting for 9346 tasks to complete...
Waiting for 8651 tasks to complete...
Waiting for 7839 tasks to complete...
Waiting for 7185 tasks to complete...
Waiting for 6514 tasks to complete...
Waiting for 5743 tasks to complete...
Waiting for 5128 tasks to complete...
Waiting for 4332 tasks to complete...
Waiting for 3546 tasks to complete...
Waiting for 2717 tasks to complete...
Waiting for 2179 tasks to complete...
Waiting for 1417 tasks to complete..
```

## Questions or Comments About This Notebook?
Feel free to contact me via my LinkedIn: https://www.linkedin.com/in/william-j-henry

You can also email me at will@henryanalytics.com