## Multiprocessing Methods

In [1]:
import multiprocessing as mp
# Report how many logical CPUs multiprocessing can see on this machine.
n_processors = mp.cpu_count()
print("Number of processors: ", n_processors)

Number of processors:  2


I. Pool Class

1.   Synchronous execution

      Pool.map() and Pool.starmap()
      Pool.apply()
2.   Asynchronous execution

      Pool.map_async() and Pool.starmap_async()
      Pool.apply_async()

II. Process Class

In [2]:
import numpy as np
from time import time

# Prepare data
# Draw from an explicitly seeded generator so the data is the same on every
# run. (Calling `np.random.RandomState(100)` on its own only builds a
# generator object and throws it away; it does not seed `np.random`.)
# NOTE: outputs recorded from earlier, unseeded runs will not match.
rng = np.random.RandomState(100)
arr = rng.randint(0, 10, size=[200000, 5])
data = arr.tolist()
data[:5]

[[4, 5, 9, 7, 2],
 [3, 8, 1, 7, 7],
 [1, 8, 6, 8, 0],
 [7, 7, 7, 2, 1],
 [6, 1, 3, 3, 1]]

In [3]:
# Solution Without Parallelization

def howmany_within_range(row, minimum, maximum):
    """Count the values in `row` that fall inside the closed interval [`minimum`, `maximum`]."""
    return sum(1 for value in row if minimum <= value <= maximum)

# Serial baseline: count, row by row, the values inside the closed range [4, 8].
results = [howmany_within_range(row, minimum=4, maximum=8) for row in data]

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


### Synchronous Method

1. apply()

In [4]:
# Parallelizing using Pool.apply()

import multiprocessing as mp

# Step 1: Init multiprocessing.Pool() as a context manager so the worker
# processes are always released, even if one of the calls raises.
with mp.Pool(mp.cpu_count()) as pool:
    # Step 2: `pool.apply` the `howmany_within_range()`
    # `apply` blocks until each call has returned, so every result is already
    # collected by the time the `with` block exits.
    results = [pool.apply(howmany_within_range, args=(row, 4, 8)) for row in data]

# Step 3: No explicit close is needed; leaving the `with` block shuts the pool down.

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


2. map()

In [5]:
# Parallelizing using Pool.map()
import multiprocessing as mp

# Redefine, with only 1 mandatory argument.
def howmany_within_range_rowonly(row, minimum=4, maximum=8):
    """Count the values in `row` inside the closed interval [`minimum`, `maximum`].

    Only `row` is mandatory, so the function can be handed to `Pool.map()`,
    which passes a single argument per call.
    """
    in_range = [n for n in row if minimum <= n <= maximum]
    return len(in_range)

# Use the pool as a context manager so its workers are released even if
# `map` raises. `map` blocks until every row has been processed, so the
# results are complete before the pool is shut down.
with mp.Pool(mp.cpu_count()) as pool:
    # `data` is already a list of rows, so it is passed directly.
    results = pool.map(howmany_within_range_rowonly, data)

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


3. starmap()

In [6]:
# Parallelizing with Pool.starmap()
import multiprocessing as mp

# Use the pool as a context manager so its workers are released even if
# `starmap` raises. `starmap` unpacks each `(row, 4, 8)` tuple into the
# function's arguments and blocks until all results are available.
with mp.Pool(mp.cpu_count()) as pool:
    results = pool.starmap(howmany_within_range, [(row, 4, 8) for row in data])

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


### Asynchronous

1. apply_async()

In [8]:
# Parallel processing with Pool.apply_async()

import multiprocessing as mp
results = []

# Step 1: Redefine, to accept `i`, the iteration number
def howmany_within_range2(i, row, minimum, maximum):
    """Return `(i, count)`, where `count` is how many values in `row` lie in [`minimum`, `maximum`].

    `i` is passed through untouched so results that complete out of order
    can be put back in input order afterwards.
    """
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)


# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    """Append one finished `(i, count)` result to the module-level `results` list."""
    # No `global` statement needed: `append` mutates the list, it does not rebind the name.
    results.append(result)


# Step 3: Create the pool only AFTER the worker function is defined, then
# use a loop to parallelize. On platforms where workers are forked they get
# a snapshot of the notebook's namespace at pool creation time; a pool made
# before the `def` cannot find `howmany_within_range2`, and because no
# error_callback is given every task would fail silently.
pool = mp.Pool(mp.cpu_count())
for i, row in enumerate(data):
    pool.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 4: Close Pool and let all the processes complete
pool.close()
pool.join()  # postpones the execution of next line of code until all processes in the queue are done.

# Step 5: Sort results [OPTIONAL]
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]

print(results_final[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


In [9]:
# Parallel processing with Pool.apply_async() without callback function

import multiprocessing as mp
# The pool is a context manager here so its workers are released even if a
# `.get()` below re-raises an error from a worker.
with mp.Pool(mp.cpu_count()) as pool:
    # call apply_async() without callback; each call returns immediately
    # with a handle to the pending result.
    result_objects = [pool.apply_async(howmany_within_range2, args=(i, row, 4, 8)) for i, row in enumerate(data)]

    # result_objects is a list of pool.ApplyResult objects; `.get()` blocks
    # until that task is done, so all results are in hand before the pool
    # is shut down. Index [1] keeps the count from each `(i, count)` tuple.
    results = [r.get()[1] for r in result_objects]

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


2. starmap_async()

In [12]:
# Parallelizing with Pool.starmap_async()

import multiprocessing as mp
with mp.Pool(mp.cpu_count()) as pool:
    # starmap_async() returns a single async result for the whole batch;
    # `.get()` blocks until it is ready and yields a list of `(i, count)`
    # tuples in the same order as the inputs.
    indexed_counts = pool.starmap_async(howmany_within_range2, [(i, row, 4, 8) for i, row in enumerate(data)]).get()

# Keep only the count from each `(i, count)` tuple. This uses the values
# returned by starmap_async above rather than objects left over from an
# earlier cell.
results = [count for _, count in indexed_counts]
print(results[:10])

#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


In [11]:
# Parallelizing with Pool.map_async()

import multiprocessing as mp
# With map, use `howmany_within_range_rowonly` instead
# The pool is a context manager so its workers are released even if `.get()`
# raises; `.get()` blocks until the whole batch is done.
with mp.Pool(mp.cpu_count()) as pool:
    # `data` is already a list of rows, so it is passed directly.
    results = pool.map_async(howmany_within_range_rowonly, data).get()

print(results[:10])
#> [3, 3, 3, 3, 1, 1, 2, 2, 2, 2]

[3, 3, 3, 3, 1, 1, 2, 2, 2, 2]


## Applying multiprocessing on Pandas

In [13]:
import numpy as np
import pandas as pd
import multiprocessing as mp

# Build a small 5x2 frame of random integers drawn from [3, 10).
random_values = np.random.randint(low=3, high=10, size=(5, 2))
df = pd.DataFrame(random_values)
print(df.head())

   0  1
0  9  3
1  3  3
2  7  9
3  6  9
4  3  8


In [28]:
# Cap the worker count at the number of DataFrame columns or the CPU count,
# whichever is smaller.
n_columns = df.shape[1]
num_processes = min(n_columns, mp.cpu_count())

In [36]:
# Row wise Operation
def hypotenuse(row):
    """Return the hypotenuse length for the two sides stored at `row[1]` and `row[2]`.

    `row` is indexed by position. When rows come from
    `df.itertuples(name=None)`, position 0 is expected to hold the index,
    which is why the two values are read from positions 1 and 2.

    The full-precision value is returned; any rounding for display is left
    to the caller. (Rounding the sum of squares before taking the root, as
    was done previously, collapses small float inputs to 0.)
    """
    return (row[1] ** 2 + row[2] ** 2) ** 0.5

with mp.Pool(processes=num_processes) as pool:
    # imap is lazy, so the results are consumed while the pool is still open.
    result = pool.imap(hypotenuse, df.itertuples(name=None), chunksize=10)
    output = []
    for x in result:
        output.append(round(x, 2))

print(output)
#> [9.49, 4.24, 11.4, 10.82, 8.54]

[9.49, 4.24, 11.4, 10.82, 8.54]


In [31]:
# Column wise Operation
def sum_of_squares(column):
    """Return the sum of the squared values in `column[1]`.

    `column` is a pair whose second item is the iterable of values; the
    first item is not used.
    """
    values = column[1]
    return sum(value ** 2 for value in values)

# Size the pool from `num_processes` (as the row-wise cell does) instead of
# a hard-coded worker count.
with mp.Pool(num_processes) as pool:
    # `DataFrame.items()` yields (column label, Series) pairs. It replaces
    # `iteritems()`, which was deprecated in pandas 1.5 and removed in 2.0.
    result = pool.imap(sum_of_squares, df.items(), chunksize=10)
    # imap is lazy, so materialise the results while the pool is still open.
    output = list(result)

print(output)
#> [184, 244]

[184, 244]


  x = tuple(itertools.islice(it, size))
