In [None]:
%matplotlib inline
import random
import time
import math
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

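This notebook was tested on a Mac. Changes may be required to run this code on a Windows machine. On Windows, and on macOS with Python 3.8+, `multiprocessing` starts workers with the `spawn` method by default. That method launches fresh interpreters, which cannot import functions defined inside a notebook. The worker functions may therefore need to be moved into an importable `.py` module, or on macOS the `fork` start method selected explicitly.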

In some of these examples I hardcode the number of processes (for instance, `check_primes` below uses 4 processes). By default `multiprocessing.Pool` uses all available cores (`os.cpu_count()`), so you shouldn't need to hardcode the number of processes unless you want to manage your resources. In my experience, at least on Windows machines, it is better to set the number of processes to the number of available cores minus 1. Otherwise, the operating system starts feeling laggy and unresponsive.

## Estimate the value of pi using the Monte Carlo method

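We generate many random points $(x, y)$ in the unit square $[0, 1) \times [0, 1)$. The proportion of those points that fall inside the quarter unit circle ($x^2 + y^2 \le 1$), relative to the total number of generated points, approximates $\pi / 4$. Multiplying that proportion by 4 gives our approximation of pi.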

This is an ideal first problem because the workload can be evenly split across a number of processes. 

In [None]:
def estimate_nbr_points_in_quarter_circle(nbr_estimates):
    """Count random points of the unit square that land in the quarter unit circle.

    Draws ``int(nbr_estimates)`` points, each coordinate from
    ``random.uniform(0, 1)`` (x first, then y), and counts the points with
    x*x + y*y <= 1. The ratio count / samples approximates pi / 4.

    Returns the count as an int.
    """
    hits = 0
    for _ in range(int(nbr_estimates)):
        # Tuple evaluation is left to right, so x is drawn before y.
        px, py = random.uniform(0, 1), random.uniform(0, 1)
        if px * px + py * py <= 1.0:
            hits += 1
    return hits

This version uses a pool of processes. The timer is started only after the pool has been created, because spawning processes (as opposed to spawning threads) has a start-up overhead that we do not want to measure.

In [None]:
# Pure-Python estimator, parallelised with 1..8 worker processes.
nbr_samples_in_total = 10 ** 8

times_proc = []

for num_processes in range(1, 9):
    print('Number of processes: ' + str(num_processes))
    # Integer per-worker sample counts that add up exactly to the total (the
    # first `remainder` workers draw one extra sample). The estimate below
    # therefore divides by the number of samples actually drawn, even when
    # num_processes does not divide the total (e.g. 3, 6, 7).
    nbr_samples_per_worker, remainder = divmod(nbr_samples_in_total, num_processes)
    nbr_trials_per_process = [nbr_samples_per_worker + (worker < remainder)
                              for worker in range(num_processes)]
    print('Making {} samples per worker'.format(nbr_samples_per_worker))

    # The pool is created before the timer starts so that process start-up is
    # not measured; leaving the `with` block shuts the workers down.
    with Pool(processes=num_processes) as pool:
        t1 = time.perf_counter()
        nbr_in_unit_circles = pool.map(estimate_nbr_points_in_quarter_circle, nbr_trials_per_process)
        delta = time.perf_counter() - t1

    # We multiply by 4 because we are producing samples only on one quarter of the unit circle
    pi_estimate = sum(nbr_in_unit_circles) * 4 / nbr_samples_in_total
    print('Estimated pi ' + str(pi_estimate))
    print('Delta: ' + str(delta))
    print('-----------------')

    times_proc.append(delta)

This version is based on threads. The problem with threads is that, due to Python's GIL (Global Interpreter Lock), only one thread can execute Python bytecode at a time. As a consequence, adding more threads to this pure-Python computation does not speed it up and can actually slow it down, due to the overhead of switching between threads.

In [None]:
# Pure-Python estimator, run with 1..8 threads (expected to be GIL-bound).
nbr_samples_in_total = 10 ** 8

times_thread = []

for num_threads in range(1, 9):
    print('Number of threads: ' + str(num_threads))
    # Integer per-worker sample counts that add up exactly to the total (the
    # first `remainder` workers draw one extra sample), so the estimate below
    # divides by the number of samples actually drawn.
    nbr_samples_per_worker, remainder = divmod(nbr_samples_in_total, num_threads)
    nbr_trials_per_process = [nbr_samples_per_worker + (worker < remainder)
                              for worker in range(num_threads)]
    print('Making {} samples per worker'.format(nbr_samples_per_worker))

    # Leaving the `with` block shuts the worker threads down.
    with ThreadPool(processes=num_threads) as pool:
        t1 = time.perf_counter()
        nbr_in_unit_circles = pool.map(estimate_nbr_points_in_quarter_circle, nbr_trials_per_process)
        delta = time.perf_counter() - t1

    # We multiply by 4 because we are producing samples only on one quarter of the unit circle
    pi_estimate = sum(nbr_in_unit_circles) * 4 / nbr_samples_in_total
    print('Estimated pi ' + str(pi_estimate))
    print('Delta: ' + str(delta))
    print('-----------------')

    times_thread.append(delta)

In [None]:
fig, ax = plt.subplots()
# One point per pool size benchmarked above (1 .. len(times) workers).
ax.plot(range(1, len(times_proc) + 1), times_proc, label='Processes')
ax.plot(range(1, len(times_thread) + 1), times_thread, label='Threads')
ax.set_title('Monte Carlo estimate of pi, pure Python: run time vs. number of workers')
ax.set_xlabel('number of workers')
ax.set_ylabel('time (s)')
ax.grid(True)
ax.legend()
fig.set_figwidth(16)

## Estimate the value of pi using the Monte Carlo method (numpy version)

In this version we need to explicitly (re)seed NumPy's random numbers in each forked process. This is not needed for threads, which live in the same process. Otherwise, all the workers would generate exactly the same sequence of random numbers, because every forked child inherits an identical copy of the parent's NumPy random state. This is not a problem with the `random` module used above, because it is reseeded automatically in each forked child: since Python 3.7 the `random` module registers this itself via `os.register_at_fork`.

In [None]:
def estimate_nbr_points_in_quarter_circle(nbr_samples, batch_size=10 ** 7):
    """Vectorised count of random points that fall inside the quarter unit circle.

    This NumPy version deliberately reuses the name of the pure-Python
    estimator defined earlier, so the benchmark cells call it unchanged.

    Draws ``int(nbr_samples)`` points uniformly in [0, 1) x [0, 1) and counts
    those with x*x + y*y <= 1; count / nbr_samples approximates pi / 4.

    A fresh ``np.random.default_rng()`` is created on every call and seeded
    from OS entropy. Forked worker processes, which inherit a copy of the
    parent's global NumPy state, therefore still draw different numbers.
    Concurrent threads each get their own generator instead of reseeding and
    sharing the global one.

    Parameters
    ----------
    nbr_samples : int or float
        Number of points to draw (truncated to int, like the pure-Python version).
    batch_size : int, optional
        Maximum number of points generated at once. Bounds peak memory: 1e8
        float64 values per coordinate would otherwise need ~800 MB per array.

    Returns
    -------
    int
        Number of sampled points inside the quarter circle.

    Raises
    ------
    ValueError
        If ``nbr_samples`` is negative or ``batch_size`` is not positive.
    """
    nbr_samples = int(nbr_samples)
    if nbr_samples < 0:
        raise ValueError('nbr_samples must be non-negative')
    if batch_size < 1:
        raise ValueError('batch_size must be a positive integer')

    rng = np.random.default_rng()

    nbr_trials_in_quarter_unit_circle = 0
    for start in range(0, nbr_samples, batch_size):
        size = min(batch_size, nbr_samples - start)
        xs = rng.random(size)
        ys = rng.random(size)
        inside = (xs * xs + ys * ys) <= 1.0
        nbr_trials_in_quarter_unit_circle += int(np.count_nonzero(inside))
    return nbr_trials_in_quarter_unit_circle

In [None]:
# NumPy estimator, parallelised with 1..8 worker processes.
nbr_samples_in_total = 10 ** 8

times_proc = []

for num_processes in range(1, 9):
    print('Number of processes: ' + str(num_processes))
    # Integer per-worker sample counts that add up exactly to the total (the
    # first `remainder` workers draw one extra sample), so the estimate below
    # divides by the number of samples actually drawn.
    nbr_samples_per_worker, remainder = divmod(nbr_samples_in_total, num_processes)
    nbr_trials_per_process = [nbr_samples_per_worker + (worker < remainder)
                              for worker in range(num_processes)]
    print('Making {} samples per worker'.format(nbr_samples_per_worker))

    # The pool is created before the timer starts so that process start-up is
    # not measured; leaving the `with` block shuts the workers down.
    with Pool(processes=num_processes) as pool:
        t1 = time.perf_counter()
        nbr_in_unit_circles = pool.map(estimate_nbr_points_in_quarter_circle, nbr_trials_per_process)
        delta = time.perf_counter() - t1

    # We multiply by 4 because we are producing samples only on one quarter of the unit circle
    pi_estimate = sum(nbr_in_unit_circles) * 4 / nbr_samples_in_total
    print('Estimated pi ' + str(pi_estimate))
    print('Delta: ' + str(delta))
    print('-----------------')

    times_proc.append(delta)

In [None]:
# NumPy estimator, run with 1..8 threads.
nbr_samples_in_total = 10 ** 8

times_thread = []

for num_threads in range(1, 9):
    print('Number of threads: ' + str(num_threads))
    # Integer per-worker sample counts that add up exactly to the total (the
    # first `remainder` workers draw one extra sample), so the estimate below
    # divides by the number of samples actually drawn.
    nbr_samples_per_worker, remainder = divmod(nbr_samples_in_total, num_threads)
    nbr_trials_per_process = [nbr_samples_per_worker + (worker < remainder)
                              for worker in range(num_threads)]
    print('Making {} samples per worker'.format(nbr_samples_per_worker))

    # Leaving the `with` block shuts the worker threads down.
    with ThreadPool(processes=num_threads) as pool:
        t1 = time.perf_counter()
        nbr_in_unit_circles = pool.map(estimate_nbr_points_in_quarter_circle, nbr_trials_per_process)
        delta = time.perf_counter() - t1

    # We multiply by 4 because we are producing samples only on one quarter of the unit circle
    pi_estimate = sum(nbr_in_unit_circles) * 4 / nbr_samples_in_total
    print('Estimated pi ' + str(pi_estimate))
    print('Delta: ' + str(delta))
    print('-----------------')

    times_thread.append(delta)

In [None]:
fig, ax = plt.subplots()
# One point per pool size benchmarked above (1 .. len(times) workers).
ax.plot(range(1, len(times_proc) + 1), times_proc, label='Processes')
ax.plot(range(1, len(times_thread) + 1), times_thread, label='Threads')
ax.set_title('Monte Carlo estimate of pi, NumPy: run time vs. number of workers')
ax.set_xlabel('number of workers')
ax.set_ylabel('time (s)')
ax.grid(True)
ax.legend()
fig.set_figwidth(16)

Obviously, this vectorised version of the code is much faster than the first version on this notebook based on pure Python. 

Adding more threads decreases the overall execution time. This is because NumPy releases the GIL inside many of its array operations, so several threads can do numerical work at the same time and achieve additional speed-ups.

## Finding prime numbers

We test for prime numbers over a large number range. This is a different problem from pi estimation: the workload depends on the location in the number range, and each number's check has an unpredictable cost. However, the problem is still embarrassingly parallel (we do not need to pass state information between processes).

By default, `Pool.map` splits the items into chunks of about `len(items) / (4 * number_of_workers)` items, i.e. roughly four chunks per worker process. This may be the best approach if the computation of each item takes approximately the same amount of time.

Strategies for using `multiprocessing` efficiently on embarrassingly parallel problems:

- Split your jobs into independent units of work
- If your workers take varying amounts of time, then consider randomizing the sequence of work
- Sorting your work queue so slowest jobs go first may help
- Use the default chunksize unless you have verified reasons to change it
- Align the number of jobs with the number of physical cpus

This is the function that we use to check whether a single number is prime or not:

In [None]:
def check_prime(n):
    """Return True if the integer ``n`` is prime, using trial division.

    Only odd divisors from 3 up to sqrt(n) are tried. The cost of a check
    therefore varies a lot between numbers: it stops early on a small factor
    and runs the full loop for primes.

    Values below 2 (0, 1 and negatives) are not prime; 2 is the only even prime.
    """
    if n < 2:
        return False
    if n % 2 == 0:
        return n == 2
    # Float sqrt is accurate enough for the ~1e8 numbers benchmarked here;
    # for very large n, math.isqrt (Python 3.8+) would be the exact alternative.
    to_i = int(math.sqrt(n)) + 1
    for i in range(3, to_i, 2):
        if n % i == 0:
            return False
    return True

We can use this function to check a list of numbers (the `numbers` parameter) using a pool of workers with a given `chunksize`. The chunksize is how many numbers are sent to a worker in each task; every worker usually processes many such chunks during a run. This function returns the total time it took to perform the computation; the time used to create the pool of workers is not taken into account.

In [None]:
def check_primes(chunksize, numbers, num_processes=4):
    """Time how long a process pool takes to run `check_prime` over `numbers`.

    Parameters
    ----------
    chunksize : int
        Passed to `Pool.map`: how many numbers are sent to a worker per task.
    numbers : iterable of int
        Numbers to test. Materialised into a list before the timer starts.
    num_processes : int, optional
        Number of worker processes in the pool (default 4).

    Returns
    -------
    float
        Seconds spent in `pool.map`. Pool start-up is excluded; the
        primality results themselves are discarded.
    """
    # Pool.map turns an iterable without __len__ (e.g. a generator) into a list
    # itself, which would happen inside the timed region. Doing it here means
    # generator and list inputs are timed the same way.
    numbers = list(numbers)

    # Leaving the `with` block shuts the worker processes down.
    with Pool(processes=num_processes) as pool:
        t1 = time.perf_counter()
        pool.map(check_prime, numbers, chunksize)
        delta = time.perf_counter() - t1

    return delta

For instance:

In [None]:
def generate_numbers(start=100000000, stop=101000000):
    """Yield the integers in the half-open range [start, stop).

    The defaults give the one million numbers from 100,000,000 up to (but not
    including) 101,000,000 that the prime-checking benchmarks use.
    """
    yield from range(start, stop)

In [None]:
# Example: time one run with 100 numbers per task; the cell displays the elapsed seconds.
check_primes(chunksize=100, numbers=generate_numbers())

We are going to try some of the strategies enumerated above and compare their execution times:

In [None]:
# Baseline - just increasing the number of chunks (1..15), not considering the number of processes
numbers = list(generate_numbers())
chunksizes_b = [len(numbers) // nbr_chunks for nbr_chunks in range(1, 16)]
times_baseline = []
for chunksize in tqdm(chunksizes_b):
    # Pass the already-built list (as the shuffled/reversed runs do) rather than a
    # fresh generator, which Pool.map would have to turn into a list while timed.
    times_baseline.append(check_primes(chunksize, numbers))

In [None]:
# Using power-of-two chunksizes: every 2**i below 10000, i.e. 1, 2, 4, ..., 8192
chunksizes = [2 ** i for i in range(14)]
# A local, in-order list. Passing a list rather than a generator keeps the
# generator-to-list conversion out of Pool.map's timed region, matching the
# shuffled/reversed runs.
numbers_in_order = list(generate_numbers())
times_multiplo = []
for chunksize in tqdm(chunksizes):
    times_multiplo.append(check_primes(chunksize, numbers_in_order))

In [None]:
# Randomising the jobs' order, hoping that this balances the load across workers
# Rebuild the ordered list first so that re-running this cell shuffles the same
# input (and, with the fixed seed, yields the same permutation) every time.
numbers = list(generate_numbers())
random.seed(0)
random.shuffle(numbers)
times_random = []
for chunksize in tqdm(chunksizes):
    times_random.append(check_primes(chunksize, numbers))

In [None]:
# Reverse the jobs' order, hoping that the biggest jobs will be at the beginning
numbers = list(generate_numbers())
numbers.reverse()
times_reverse = [check_primes(chunksize, numbers) for chunksize in tqdm(chunksizes)]

In [None]:
fig, ax = plt.subplots()
ax.plot(chunksizes, times_multiplo, label='ordered')
ax.plot(chunksizes, times_random, label='shuffled')
ax.plot(chunksizes, times_reverse, label='reversed')
# The chunksizes are powers of two (1 .. 8192), so a log x axis spaces them evenly.
ax.set_xscale('log')
ax.set_title('Prime checking with a process pool: run time vs. chunksize')
ax.set_xlabel('chunksize (numbers sent to a worker per task)')
ax.set_ylabel('time (s)')
ax.grid(True)
ax.legend()
fig.set_figwidth(16)
fig.set_figheight(12)

In [None]:
# Show the power-of-two chunksizes (1 .. 8192) used for the ordered, shuffled and reversed runs
chunksizes

In [None]:
# Linear grid of candidate chunksizes: 30, 60, ..., 7980.
# NOTE(review): this rebinds `chunksizes_b`, which the baseline cell used as the
# chunksizes for `times_baseline`. Any later plot of `times_baseline` against
# `chunksizes_b` would then have mismatched lengths.
chunksizes_b = list(range(30, 8000, 30))
chunksizes_b

In [None]:
# Number of chunks Pool.map produces at the largest power-of-two chunksize tried above (8192 = 2**13)
len(numbers)/8192

In [None]:
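# TODO: the prime-checking plot puts the chunksize on the x axis; consider plotting the number of
# chunks (len(numbers) / chunksize) instead.
# Why does the time decrease as the chunksize grows? `chunksize` in Pool.map really is the number of
# items sent to a worker per task (not the number of chunks). Larger chunks mean fewer tasks to pickle
# and pass through the pool's queues, which presumably outweighs the worse load balancing until the
# chunks become very large.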