# Python Parallel demo
Apr 24, 2019. Guorui Shen, guorui233@outlook.com

This notebook is a simple demonstration of parallel processing in Python, adapted with minor modifications from [1] for clarity.

[1] https://www.machinelearningplus.com/python/parallel-processing-python/

In [1]:
import multiprocessing as mp
import numpy as np
from timeit import default_timer as timer

In [2]:
# How many CPUs does this machine have?
print("Number of processors: {}".format(mp.cpu_count()))

Number of processors: 4


In [3]:
def howmany_within_range(row, minimum=4, maximum=8):
    """
    input:
        row: a list of numbers
        [minimum, maximum]: an interval
    return:
        count: the number of row's elements that lie in [minimum, maximum]
    """
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return count

# Prepare data
np.random.seed(100)
arr = np.random.randint(0, 10, size=[200000, 5])
data = arr.tolist()
print(data[:5])

[[8, 8, 3, 7, 7], [0, 4, 2, 5, 2], [2, 2, 1, 0, 8], [4, 0, 9, 6, 2], [4, 1, 5, 3, 4]]


### without parallelization

In [4]:
# Solution Without Parallelization
results = []
time_start = timer()
for row in data:
    results.append(howmany_within_range(row, minimum=4, maximum=8))
print(results[:10])

# time cost
time_end = timer()
print("Time cost without parallelization is {} seconds.".format(time_end - time_start))

[4, 2, 1, 2, 3, 2, 2, 2, 2, 2]
Time cost without parallelization is 0.0833251476288 seconds.


## Parallel computing
In parallel processing, there are two types of execution: synchronous and asynchronous.

In synchronous execution, the processes are completed in the same order in which they were started. The main program achieves this by blocking until each process has finished.

Asynchronous execution, on the other hand, does not block the main program. As a result, results may arrive in a different order from the input. Because the main program does not wait on each task, several workers can be kept busy at once. This often makes asynchronous submission faster than submitting tasks one at a time with a blocking call.
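The difference can be seen in a small, self-contained sketch. It uses a hypothetical `slow_square` task whose sleep time is chosen so that later inputs finish first.

- `Pool.apply()` returns results in submission order.
- Results collected from `Pool.apply_async()` through a callback arrive in completion order.

The `if __name__ == "__main__":` guard lets the sketch run as a script under the `spawn` start method. That method is the default on Windows and, since Python 3.8, on macOS.

```python
import multiprocessing as mp
import time


def slow_square(x):
    """Hypothetical task: sleep less for larger x, then return x squared."""
    time.sleep(0.05 * (4 - x))  # later inputs finish first
    return x * x


if __name__ == "__main__":
    with mp.Pool(4) as pool:
        # Synchronous: each apply() call blocks until its own result is ready,
        # so results come back in submission order.
        sync_results = [pool.apply(slow_square, (x,)) for x in range(4)]

        # Asynchronous: apply_async() returns immediately; the callback runs
        # in the parent process as each task completes.
        async_results = []
        for x in range(4):
            pool.apply_async(slow_square, (x,), callback=async_results.append)
        pool.close()
        pool.join()  # wait for all outstanding tasks

    print("apply():      ", sync_results)   # always in order: 0, 1, 4, 9
    print("apply_async():", async_results)  # completion order, often 9, 4, 1, 0
```

Only the order of `async_results` depends on timing. Its contents are always the same four squares.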

### synchronous parallel computing

In [5]:
# Parallelizing with Pool.apply()
results = []
time_start = timer()
pool_1 = mp.Pool(mp.cpu_count())
results = [pool_1.apply(howmany_within_range, args=(row, 4, 8)) for row in data]
pool_1.close()
print(results[:10])

# time cost
time_end = timer()
print("Time cost with Pool.apply() is {} seconds.".format(time_end - time_start))

[4, 2, 1, 2, 3, 2, 2, 2, 2, 2]
Time cost with Pool.apply() is 10.4073710442 seconds.


In [6]:
# Parallelizing with Pool.map()
time_start = timer()
pool_2 = mp.Pool(mp.cpu_count())
results = pool_2.map(howmany_within_range, data)
pool_2.close()
print(results[:10])

# time cost
time_end = timer()
print("Time cost with Pool.map() is {} seconds.".format(time_end - time_start))

[4, 2, 1, 2, 3, 2, 2, 2, 2, 2]
Time cost with Pool.map() is 0.22460103035 seconds.
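`Pool.apply()` is slow here because each call blocks until its task finishes. Only one worker is busy at a time, and each of the 200,000 rows makes its own round trip to a worker process.

`Pool.map()` instead splits the input into chunks. By default it picks a chunk size from the input length and the number of workers. It is still slower than the plain loop, most likely because counting five numbers per row costs far less than starting the pool and pickling the rows.

The sketch below assumes Python 3 and uses the standard `random` module in place of NumPy, so its data differ from the notebook's. It does two things:

- It passes an explicit `chunksize`. The value 10000 is an arbitrary choice, not a tuned one.
- It shows `Pool.starmap()`, which unpacks each argument tuple so `minimum` and `maximum` can be given explicitly.

```python
import multiprocessing as mp
import random


def howmany_within_range(row, minimum=4, maximum=8):
    """Count the elements of `row` that lie in [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


if __name__ == "__main__":
    random.seed(100)
    data = [[random.randint(0, 9) for _ in range(5)] for _ in range(200000)]

    with mp.Pool(mp.cpu_count()) as pool:
        # map(): one argument per call; larger chunks mean fewer round trips
        # between the parent and the worker processes.
        counts = pool.map(howmany_within_range, data, chunksize=10000)

        # starmap(): each tuple is unpacked into positional arguments,
        # so the interval bounds are passed explicitly.
        counts_star = pool.starmap(howmany_within_range,
                                   [(row, 4, 8) for row in data],
                                   chunksize=10000)

    print(counts[:10])
```

Both calls still return results in input order, so no re-sorting is needed.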


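### asynchronous parallel computing
The asynchronous equivalents `apply_async()`, `map_async()` and `starmap_async()` let you execute tasks in parallel asynchronously. The main program does not wait for each task, and each worker starts its next task as soon as it finishes the previous one, regardless of the order in which tasks were started. When results are collected through a callback, they are appended in completion order, so there is no guarantee that they match the input order. `map_async()` and `starmap_async()` return an `AsyncResult` whose `get()` still gives the results in input order.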
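When input order matters, `starmap_async()` avoids the index-and-sort bookkeeping. It returns an `AsyncResult` right away, and that object's `get()` blocks until all tasks are done and returns the results in input order. Here is a minimal sketch that reuses `howmany_within_range()` on the first five rows printed earlier. The pool size of 2 is arbitrary.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum=4, maximum=8):
    """Count the elements of `row` that lie in [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


if __name__ == "__main__":
    # First five rows of `data` as printed in the notebook.
    data = [[8, 8, 3, 7, 7], [0, 4, 2, 5, 2], [2, 2, 1, 0, 8],
            [4, 0, 9, 6, 2], [4, 1, 5, 3, 4]]

    with mp.Pool(2) as pool:
        # Returns immediately; the tasks run in the background.
        async_result = pool.starmap_async(
            howmany_within_range, [(row, 4, 8) for row in data])
        # Blocks until every task has finished; results are in input order.
        counts = async_result.get()

    print(counts)  # [4, 2, 1, 2, 3]
```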

In [7]:
results = []

# Step 1: Redefine the function to also accept `i`, the iteration number, so the results can be re-ordered later
def howmany_within_range2(i, row, minimum, maximum):
    """Returns (i, count), where count is how many numbers in `row` lie between `minimum` and `maximum`"""
    count = 0
    for n in row:
        if minimum <= n <= maximum:
            count = count + 1
    return (i, count)

# Step 2: Define callback function to collect the output in `results`
def collect_result(result):
    global results
    results.append(result)

# Step 3: Submit one task per row; apply_async() returns without waiting for the task
time_start = timer()
pool_3 = mp.Pool(mp.cpu_count())
for i, row in enumerate(data):
    pool_3.apply_async(howmany_within_range2, args=(i, row, 4, 8), callback=collect_result)

# Step 4: Close the pool and wait for all the tasks to complete
pool_3.close()
pool_3.join()  # blocks until every task submitted to the pool has finished

# Step 5: Sort results by index (optional; only needed if input order matters)
results.sort(key=lambda x: x[0])
results_final = [r for i, r in results]
print(results_final[:10])

# time cost
time_end = timer()
print("Time cost with Pool.apply_async() is {} seconds.".format(time_end - time_start))

[4, 2, 1, 2, 3, 2, 2, 2, 2, 2]
Time cost with Pool.apply_async() is 7.66351318359 seconds.
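An alternative to the global `results` list and callback is to keep the `AsyncResult` handle returned by each `apply_async()` call. You then collect the values in submission order with `get()`, which also re-raises any exception raised in a worker. This sketch uses the first five rows printed earlier. Like the cell above, it submits one task per row, so it still pays the per-task overhead that `map()` reduces by chunking.

```python
import multiprocessing as mp


def howmany_within_range(row, minimum=4, maximum=8):
    """Count the elements of `row` that lie in [minimum, maximum]."""
    return sum(1 for n in row if minimum <= n <= maximum)


if __name__ == "__main__":
    # First five rows of `data` as printed in the notebook.
    data = [[8, 8, 3, 7, 7], [0, 4, 2, 5, 2], [2, 2, 1, 0, 8],
            [4, 0, 9, 6, 2], [4, 1, 5, 3, 4]]

    with mp.Pool(2) as pool:
        # Submit every task first; apply_async() does not block.
        handles = [pool.apply_async(howmany_within_range, (row, 4, 8))
                   for row in data]
        # Each get() waits only for its own task; iterating over the
        # handles in submission order keeps the input order.
        results = [h.get() for h in handles]

    print(results)  # [4, 2, 1, 2, 3]
```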
