In [10]:
from math import sqrt
import random
from timeit import timeit

Parallel processing is, in general, the act of running several tasks of your code simultaneously. Nowadays, most computers have multiple processing cores, and not using them feels like a waste of resources. Sometimes, more expensive servers (that can be rented in the cloud, for example) can have hundreds or even thousands of processors available at the same time.

Depending on the circumstances, the act of turning your sequential code into a parallel version can be quite straightforward, and will involve simply breaking your operations down into well-defined *chunks*. However, it is not always that simple. If we are not careful, parallel processing can lead to a lot of unexpected behavior, especially if multiple parallel tasks are accessing and modifying the same objects. It is almost impossible to predict the exact order in which parallel tasks will be executed, because of many external factors.

**In this lecture we will treat only the easy cases, that is, when the tasks running in parallel do not affect each other.** Other, more complicated cases are out of scope for now and require much more study and work.

### Embarrassingly Parallel

When an originally sequential problem can be turned into a parallel version with little or no extra effort, it is known as an *embarrassingly parallel* problem. This is the case when the tasks that are going to be executed simultaneously have little or no dependency on each other.

Let's revisit our example of the Euclidean distance between two vectors $x$ and $y$, this time in pure Python:

In [6]:
def eucl_dist(x, y):
    #e = []
    #for i in range(len(x)):
    #    e.append((x[i] - y[i])**2)
    #return sqrt(sum(e))
    
    # Alternative, shorter implementation:
    return sqrt(sum([(x_ - y_)**2 for x_, y_ in zip(x, y)]))

As we discussed before, we commonly need the full pairwise distance matrix between all vectors (or rows) of a dataset $X$. The implementation below is in pure Python:

In [8]:
def empty_square_matrix(n):    
    D = []
    for i in range(n):
        row = []
        for j in range(n):
            row.append(0)
        D.append(row)
    return D    

def eucl_dist_matrix(X):
    n = len(X)
    # Initialize D as an n x n list of lists filled with zeros, with n = len(X)
    D = empty_square_matrix(n)
    # Fill D with the actual pairwise distances
    for i in range(n):
        for j in range(n):
            D[i][j] = eucl_dist(X[i], X[j])
    return D

The implementation is inherently sequential and quadratic, processing each possible (unordered) pair of rows from the dataset $X$ *twice*. The result is a list of lists $D$, such that $D[10][15]$, for example, contains the distance between rows $10$ and $15$ from the original dataset $X$.
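
Because the Euclidean distance is symmetric and the diagonal of $D$ is zero, roughly half of this work could be skipped. The sketch below is one possible variant (the name `eucl_dist_matrix_sym` is hypothetical and not part of the lecture code) that visits each unordered pair once and mirrors the result.

```python
from math import sqrt


def eucl_dist(x, y):
    # Same pure-Python distance as defined earlier
    return sqrt(sum((x_ - y_) ** 2 for x_, y_ in zip(x, y)))


def eucl_dist_matrix_sym(X):
    # Hypothetical variant: visit each unordered pair (i, j), i < j, only once
    n = len(X)
    D = [[0.0] * n for _ in range(n)]
    for i in range(n):
        for j in range(i + 1, n):
            d = eucl_dist(X[i], X[j])
            D[i][j] = d  # upper triangle
            D[j][i] = d  # mirrored entry in the lower triangle
    return D


X_small = [[0.0, 0.0], [3.0, 4.0], [6.0, 8.0]]
D_small = eucl_dist_matrix_sym(X_small)
print(D_small[0][1], D_small[1][0], D_small[0][2])  # 5.0 5.0 10.0
```

The rest of the lecture keeps the simpler full double loop, since the goal is to study parallelization rather than to minimize the number of distance evaluations.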

In [11]:
import numpy as np
from scipy.spatial.distance import squareform, pdist
from sklearn.datasets import load_iris


X_iris = load_iris().data
print("Shape of X_iris:", X_iris.shape)

# So far, our implementation is still in pure Python
D_iris = eucl_dist_matrix(X_iris)
print(f"Shape of D_iris: ({len(D_iris)}, {len(D_iris[0])})")

# 'pdist' is a scipy function to calculate pairwise distances.
# We use it here to check if our implementation is correct.
D_pdist = squareform(pdist(X_iris))
assert np.allclose(D_iris, D_pdist)

D_iris[10][15]

Shape of X_iris: (150, 4)
Shape of D_iris: (150, 150)


0.7874007874011812

As it is right now, this pure-Python implementation is very slow. For the `iris` dataset it is not so bad, but as soon as we get to even a thousand points, it becomes quite a problem.

In [12]:
# Note: A 'lambda' is equivalent to an anonymous, inline function
print("Iris:", timeit(lambda: eucl_dist_matrix(X_iris), number=1))

def random_dataset(m, n):
    X_rnd = []
    for i in range(m):
        row = []
        X_rnd.append(row)
        for j in range(n):
            row.append(random.random())
    return X_rnd
    # Alternative, shorter implementation:
    #return [[random.random() for _ in range(n)] for _ in range(m)]

X_rnd = random_dataset(1000, 100)

print("1000 random 100D points:", timeit(lambda: eucl_dist_matrix(X_rnd), number=1))

Iris: 0.12969236400022055
1000 random 100D points: 14.68535031600004


Now, obviously, one solution would be to reimplement this using NumPy (as we did in Part 1). We will get to that again soon, but first let's try something different. This problem can be seen as an *embarrassingly parallel* problem: each cell, row, or column of the matrix $D$ can be computed independently, in any order, and the results will still be the same. 

See the example below for a demonstration of this. It is almost identical to the previous function, but this time the rows and columns ($i$ and $j$) are computed in a random sequence. The result is identical to the previous one.

In [15]:
def eucl_dist_matrix_shuffled(X):
    n = len(X)
    # Initialize D as empty list of lists
    D = empty_square_matrix(n)
    # Fill D with the actual pairwise distances
    # This time, however, the rows and columns are filled in random order
    for i in random.sample(range(n), k=n):
        for j in random.sample(range(n), k=n):
            D[i][j] = eucl_dist(X[i], X[j])
    return D

In [16]:
D_iris_shf = eucl_dist_matrix_shuffled(X_iris)

assert np.allclose(D_iris, D_iris_shf)

### Threads vs. Processes

There are, in general, two different ways to run tasks in parallel (in most programming languages): using multiple **threads** or multiple **processes**. 

**Threads** (https://docs.python.org/3/library/threading.html) are separate execution contexts within a single program. Each thread has its own call stack and local variables, but all threads in a process share the same memory, so an object created in one thread can be read and modified by the others. Even if you have never explicitly manipulated threads, you have always used at least one thread in every program you ever wrote: the main (and, most of the time, only) execution thread where your program is running. You can, however, start new threads to run specific parts of your code (like the chunks we created before). In general, they will run in parallel (but not always, as we'll see below). Threads are very lightweight, which means you can start new threads very efficiently, because they share a lot of the resources of the process that is running them.

**Processes** (https://docs.python.org/3/library/multiprocessing.html) are different programs running on your computer, completely isolated from each other. For example, if you have a browser window and a terminal open, these are two different processes, each with its own memory and resources. Usually, whenever you run a Python script, you are running a single process: the Python interpreter that executes your code. You can, however, start new "child" processes to run specific parts of your code; in practice, you are basically starting a completely new interpreter. This is not as lightweight as threads, because each child process has its own memory: any data it needs must be copied from the parent process to the child (for example, by serializing it with `pickle` and sending it through a pipe), and the results must be sent back again after the child is done. 

In both cases, however, the workflow is quite similar: (1) break your problem into chunks, (2) start a new thread/process to execute each chunk, and (3) gather all the intermediate results and aggregate them to reach the final result. However, there are advantages and disadvantages for the two approaches, and in Python specifically, threads don't always work the way you'd expect them to.
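
The three steps above can be sketched on a toy problem. This is only an illustration under assumptions: the helper names `sum_of_squares` and `chunked_sum_of_squares` are hypothetical, and the `Thread` API it uses is introduced in the next section.

```python
from threading import Thread


def sum_of_squares(values, results, k):
    # Step 2: each thread processes one chunk and stores its partial result
    results[k] = sum(v * v for v in values)


def chunked_sum_of_squares(values, n_chunks=4):
    # Step 1: break the problem into chunks of (at most) chunk_size items
    chunk_size = max(1, -(-len(values) // n_chunks))  # ceiling division
    chunks = [values[i:i + chunk_size] for i in range(0, len(values), chunk_size)]
    results = [0] * len(chunks)
    threads = []
    for k, chunk in enumerate(chunks):
        t = Thread(target=sum_of_squares, args=(chunk, results, k))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    # Step 3: gather the intermediate results and aggregate them
    return sum(results)


total = chunked_sum_of_squares(list(range(10)))
print(total)  # 285
```

Each thread writes to its own slot of `results`, so the tasks never touch the same element and the final sum does not depend on the order in which the threads finish.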

### Python threads and the GIL (Global Interpreter Lock)

This is how you run a single thread to execute a single task.

In [19]:
import random
import time
from threading import Thread


# Warning: print is not thread-safe!
def a_simple_function(x):
    print("Running in a thread!")
    # Some (useful) time-consuming task that would be executed by the thread
    sum(2 ** x for x in range(random.randint(10000, 20000)))
    print("x =", x)
    
t = Thread(target=a_simple_function, args=(123,))
t.start()

t.join()

print("The end.")

Running in a thread!
x = 123
The end.


Note that, if you don't use `join` to wait for the thread to end, you may get unexpected results, such as "The end." being printed before "x = 123".

A single thread is usually not very interesting, though. In the example below, we start several different threads at the same time. The random size of the operation in each thread will simulate the unexpected external factors that may affect the running time of each thread. You should never expect the threads to be executed in a specific sequence (unless you're using locks or other more advanced mechanisms, which we won't discuss here).

In [24]:
threads = []

for i in range(5):
    t = Thread(target=a_simple_function, args=(i,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

print("The end.")

Running in a thread!Running in a thread!
Running in a thread!

Running in a thread!Running in a thread!

x = 0
x =x = 4
 1
x = 2
x = 3
The end.


Again, we have to `join` the threads if we want to make sure they are finished before we move on with the rest of the code. Nevertheless, you will probably still see some strange results with the `print` calls, since the threads all run concurrently, their output can interleave, and there is a small random factor in each of them.

We're now ready to implement our multithreaded version of the Euclidean distance matrix.

In [25]:
# Task to be run in parallel by each thread
def task(D, X, i, j):
    D[i][j] = eucl_dist(X[i], X[j])

def eucl_dist_matrix_thr(X):
    n = len(X)
    # Initialize D as an n x n list of lists filled with zeros, with n = len(X)
    D = empty_square_matrix(n)
    # Fill D with the actual pairwise distances
    threads = []
    for i in range(n):
        for j in range(n):
            # Previous implementation:
            #D[i][j] = eucl_dist(X[i], X[j])
            t = Thread(target=task, args=(D, X, i, j))
            t.start()
            threads.append(t)
    for t in threads:
        t.join()
    return D

Let's now check the correctness and performance of our new implementation:

In [26]:
X_rnd = random_dataset(300, 100)

D_rnd = eucl_dist_matrix(X_rnd)
D_rnd_thr = eucl_dist_matrix_thr(X_rnd)

assert np.allclose(D_rnd, D_rnd_thr)

print("Original:", timeit(lambda: eucl_dist_matrix(X_rnd), number=1))
print("Threaded:", timeit(lambda: eucl_dist_matrix_thr(X_rnd), number=1))

Original: 1.4346309740003562
Threaded: 7.8913885570000275


**Ouch.** Not only did we not get any improvement, we actually made it significantly slower. There are two main factors that caused this. First, starting threads is not "free". There is an overhead when you start a new thread, which is usually not a big deal (especially when compared with the overhead of starting a new process). However, in this case (with a dataset of 300 points), we are starting $300 \times 300 = 90000$ threads, one per cell of $D$! That is clearly excessive. We fix this first factor in the implementation below and come back to the second one afterwards.

In [27]:
# Task to be run in parallel by each thread
def task_2(D, X, i):
    for j in range(len(X)):
        D[i][j] = eucl_dist(X[i], X[j])

def eucl_dist_matrix_thr_2(X):
    n = len(X)
    # Initialize D as an n x n list of lists filled with zeros, with n = len(X)
    D = empty_square_matrix(n)
    # Fill D with the actual pairwise distances
    threads = []
    for i in range(n):
        t = Thread(target=task_2, args=(D, X, i))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return D

In this new version, we start "only" $n$ threads, one per row of $X$. Each thread then handles the generation of one entire row of $D$. Let's check the correctness and performance below.

In [28]:
X_rnd = random_dataset(300, 100)

D_rnd = eucl_dist_matrix(X_rnd)
D_rnd_thr_2 = eucl_dist_matrix_thr_2(X_rnd)

assert np.allclose(D_rnd, D_rnd_thr_2)

print(timeit(lambda: eucl_dist_matrix(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_thr_2(X_rnd), number=1))

1.349947126999723
1.3732232420006767


Ok, this is not wonderful, but at least we got rid of the massive slowdown that we had before. **The lesson to learn here is:** threads are not free, so there must be a balance between the number of threads you start and the complexity of the task that each thread is executing. Too many lightweight threads may generate a significant overhead.
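
One way to keep the number of threads under control, regardless of the size of $X$, is a fixed-size thread pool. The sketch below uses `ThreadPoolExecutor` from the standard library, which is not the approach taken in this lecture; the names `dist_row` and `eucl_dist_matrix_pool` are hypothetical. It only bounds the thread-creation overhead and does not, by itself, make the pure-Python computation any faster.

```python
from concurrent.futures import ThreadPoolExecutor
from math import sqrt


def eucl_dist(x, y):
    # Same pure-Python distance as defined earlier
    return sqrt(sum((x_ - y_) ** 2 for x_, y_ in zip(x, y)))


def dist_row(X, i):
    # One task = one full row of the distance matrix
    return [eucl_dist(X[i], X[j]) for j in range(len(X))]


def eucl_dist_matrix_pool(X, n_workers=4):
    # Hypothetical helper: at most n_workers threads exist at any time,
    # no matter how many rows X has
    with ThreadPoolExecutor(max_workers=n_workers) as pool:
        # map() yields the rows in submission order
        return list(pool.map(lambda i: dist_row(X, i), range(len(X))))


X_small = [[0.0, 0.0], [3.0, 4.0], [6.0, 8.0]]
D_pool = eucl_dist_matrix_pool(X_small)
print(D_pool[0])  # [0.0, 5.0, 10.0]
```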

However, we still did not get any improvement. In fact, we got essentially the same performance, plus a very small overhead for the threads. Why is that? Because of the infamous **Global Interpreter Lock, or GIL**. We're not getting into details here about the GIL and why it exists (trust me, you wouldn't enjoy it). Suffice it to say, **the GIL only allows one thread to execute Python code at any given time** in the Python interpreter. That is, even though the threads are independent, only one is executed at a time. Yes, that means **the GIL does not allow threads to run pure-Python code in parallel**.

### Quick detour: Processes

The `Process` API is almost identical to the `Thread` API, but with a few important caveats.

In [1]:
from multiprocessing import Process

# Task to be run in parallel by each process
def task_p(D, X, k, chunk_size):
    for i in range(k, min(k+chunk_size, len(X))):
        for j in range(len(X)):
            D[i][j] = eucl_dist(X[i], X[j])

def eucl_dist_matrix_p(X):
    n = len(X)
    # Initialize D as an n x n list of lists filled with zeros
    D = empty_square_matrix(n)
    # Fill D with the actual pairwise distances
    processes = []
    # Guard against chunk_size == 0 when n < 8
    chunk_size = max(1, n // 8)
    for i in range(0, n, chunk_size):
        p = Process(target=task_p, args=(D, X, i, chunk_size))
        p.start()
        processes.append(p)
    for p in processes:
        p.join()
    return D

In [2]:
X_rnd = random_dataset(700, 100)

#D_rnd = eucl_dist_matrix(X_rnd)
#D_rnd_p = eucl_dist_matrix_p(X_rnd)

#assert np.allclose(D_rnd, D_rnd_p)

print(timeit(lambda: eucl_dist_matrix(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_p(X_rnd), number=1))

NameError: name 'random_dataset' is not defined

Even with `random_dataset` defined, the code above will not produce the correct result, because of how the data is passed between processes. Each child process works on its own copy of $D$, and the changes made to that copy are never sent back to the parent. For that you need some special *shared* objects. We will not get into that today.
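
The effect can be pictured without starting any process. Arguments given to a child process reach it either as a serialized copy (with the `spawn` start method) or inside a copied memory space (with `fork`). The sketch below is a simplification that imitates this with a `pickle` round trip; it is not actual multiprocessing code, and `fill_first_row` is a hypothetical stand-in for the child's work.

```python
import pickle


def fill_first_row(D):
    # Stand-in for the work a child process would do on its argument
    for j in range(len(D[0])):
        D[0][j] = 1.0


D_parent = [[0.0, 0.0], [0.0, 0.0]]

# Simulate handing D to a child: the child gets a copy, not the original
D_child = pickle.loads(pickle.dumps(D_parent))
fill_first_row(D_child)

print(D_child)   # [[1.0, 1.0], [0.0, 0.0]]  (the copy was modified)
print(D_parent)  # [[0.0, 0.0], [0.0, 0.0]]  (the parent's matrix is untouched)
```

This is why `eucl_dist_matrix_p` returns a matrix that is still full of zeros: every child fills in its own private copy.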

### Back to threads

So, why did we do all this? Is multithreading useless in Python? **No.** See the example below.

In [29]:
# Task to be run in parallel by each thread
def task_3(D, X, i):
    D[i] = np.sqrt(np.sum((X[i] - X)**2, axis=-1))

def eucl_dist_matrix_thr_3(X):
    X = np.array(X)
    n = len(X)
    # Initialize D as an n x n NumPy array of zeros
    D = np.zeros((n, n))
    # Fill D with the actual pairwise distances
    threads = []
    for i in range(n):
        t = Thread(target=task_3, args=(D, X, i))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return D

In [31]:
X_rnd = random_dataset(500, 100)

D_rnd = np.array(eucl_dist_matrix(X_rnd))
D_rnd_thr_3 = eucl_dist_matrix_thr_3(X_rnd)
assert np.allclose(D_rnd, D_rnd_thr_3)

print(timeit(lambda: eucl_dist_matrix(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_thr_3(X_rnd), number=1))

3.7616889959999753
0.07967096500033222


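However, we can do better. Most of the speedup above most likely comes from replacing the inner Python loops with vectorized NumPy operations; many of these operations also release the GIL while they run, which is what allows threads to actually overlap. The threads here are still too lightweight, though. If we add more workload to each thread, and reduce the number of threads, we get a better result.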

In [32]:
from scipy.spatial.distance import cdist

# Task to be run in parallel by each thread
def task_4(D, X, i, chunk_size):
    # Each thread fills a block of chunk_size consecutive rows of D
    D[i:i+chunk_size] = cdist(X[i:i+chunk_size], X)

def eucl_dist_matrix_thr_4(X, n_jobs=8):
    X = np.array(X)
    n = len(X)
    # Initialize D as an n x n NumPy array of zeros
    D = np.zeros((n, n))
    # Guard against chunk_size == 0 when n < n_jobs
    chunk_size = max(1, n // n_jobs)
    # Fill D with the actual pairwise distances
    threads = []
    for i in range(0, n, chunk_size):
        t = Thread(target=task_4, args=(D, X, i, chunk_size))
        t.start()
        threads.append(t)
    for t in threads:
        t.join()
    return D

In [37]:
X_rnd = random_dataset(10000, 100)

#D_rnd_thr_3 = eucl_dist_matrix_thr_3(X_rnd)
#D_rnd_thr_4 = eucl_dist_matrix_thr_4(X_rnd)
#assert np.allclose(D_rnd_thr_3, D_rnd_thr_4)

#print(timeit(lambda: eucl_dist_matrix_thr_3(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_thr_4(X_rnd), number=1))

15.657238543999483


### More code for comparison

Below you can find a version without threads and a version using `joblib`.

In [67]:
# Same task as before, this time called sequentially (no threads)
def task_4(D, X, i, chunk_size):
    D[i:i+chunk_size] = cdist(X[i:i+chunk_size], X)

def eucl_dist_matrix_4(X, n_jobs=8):
    X = np.array(X)
    n = len(X)
    # Initialize D as an n x n NumPy array of zeros
    D = np.zeros((n, n))
    # Guard against chunk_size == 0 when n < n_jobs
    chunk_size = max(1, n // n_jobs)
    # Fill D with the actual pairwise distances    
    for i in range(0, n, chunk_size):
        task_4(D, X, i, chunk_size)
    return D

In [68]:
X_rnd = random_dataset(15000, 100)

#D_rnd_thr_4 = eucl_dist_matrix_thr_4(X_rnd)
#D_rnd_4 = eucl_dist_matrix_4(X_rnd)
#assert np.allclose(D_rnd_thr_4, D_rnd_4)

print(timeit(lambda: eucl_dist_matrix_thr_4(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_4(X_rnd), number=1))

4.413308563999863
25.02847582200002


In [113]:
from scipy.spatial.distance import cdist
from joblib import Parallel, delayed

# Task to be run in parallel by each thread
def task_jl(X, i, chunk_size):
    # Return the block of rows instead of writing into a shared matrix
    return cdist(X[i:i+chunk_size], X)

def eucl_dist_matrix_jl(X, n_jobs=8):
    X = np.array(X)
    n = len(X)
    # Guard against chunk_size == 0 when n < n_jobs
    chunk_size = max(1, n // n_jobs)
    # Compute the row blocks in parallel; joblib returns them in order
    p = Parallel(n_jobs=n_jobs, prefer='threads')
    blocks = p(delayed(task_jl)(X, i, chunk_size) for i in range(0, n, chunk_size))
    # Stack the blocks into the full (n, n) distance matrix
    return np.vstack(blocks)

In [114]:
X_rnd = random_dataset(6000, 100)

#D_rnd_thr_4 = eucl_dist_matrix_thr_4(X_rnd)
#D_rnd_4 = eucl_dist_matrix_4(X_rnd)
#assert np.allclose(D_rnd_thr_4, D_rnd_4)

print(timeit(lambda: eucl_dist_matrix_thr_4(X_rnd), number=1))
print(timeit(lambda: eucl_dist_matrix_jl(X_rnd), number=1))



0.5781095169995751
0.6651502690001507


### Summary

**Process**, advantages:
* Parallelizes any Python code

**Process**, disadvantages:
* Slow to start
* Hard to communicate with

**Threads**, advantages:
* Lightweight, cheap to start (but not free)
* Easy to communicate with

**Threads**, disadvantages:
* Only parallelizes code that releases the GIL
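
The last point can be illustrated with a task that spends its time waiting rather than computing. The sketch below assumes that `time.sleep` stands in for blocking I/O (it releases the GIL while waiting); the helper names are hypothetical, and the exact timings will vary from machine to machine.

```python
import time
from threading import Thread


def wait_a_bit(seconds):
    # time.sleep releases the GIL, like most blocking I/O calls
    time.sleep(seconds)


def run_sequential(n_tasks, seconds):
    start = time.perf_counter()
    for _ in range(n_tasks):
        wait_a_bit(seconds)
    return time.perf_counter() - start


def run_threaded(n_tasks, seconds):
    start = time.perf_counter()
    threads = [Thread(target=wait_a_bit, args=(seconds,)) for _ in range(n_tasks)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start


t_seq = run_sequential(5, 0.1)
t_thr = run_threaded(5, 0.1)
print(f"Sequential: {t_seq:.2f}s, threaded: {t_thr:.2f}s")
```

The sequential version has to wait for each task in turn, while the threaded version waits for all of them at once, so it should finish in roughly the time of a single task.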