# Concurrency and Parallelism in Python
Until now, we have used Python sequentially: instructions were executed one after the other.

However, Python's standard library offers several modules for **concurrent** and **parallel** execution: several pieces of code execute at the same time, or seem to.

In Python 3, there are 3 main ways to execute code concurrently on your local machine:

|Concurrency Type | Switching Decision | Number of CPU cores|
| --- | --- | --- |
|Pre-emptive multitasking (threading) | The operating system decides when to switch tasks external to Python. | 1|
|Cooperative multitasking (asyncio) | The tasks decide when to give up control. | 1|
|Multiprocessing (multiprocessing) | The processes all run at the same time on different processors. | Many| 

In the scope of this course, we will not cover cooperative multitasking. Most of your needs can be covered by simple threading or multiprocessing, but that should not discourage you from learning about `asyncio`, as it is pretty neat.

## A simple threading example

In the code below, we define a simple function.

In [1]:
from time import sleep

def task(id):
    print(f"Task {id}: starting. ")
    sleep(.1) # .1 seconds
    print(f"Task {id}: finishing. ")

Let's say we want to call it 5 times. We measure how long it will take to run.

In [2]:
from time import perf_counter

start_time = perf_counter()

for i in range(5):
    task(i)

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")    

Task 0: starting. 
Task 0: finishing. 
Task 1: starting. 
Task 1: finishing. 
Task 2: starting. 
Task 2: finishing. 
Task 3: starting. 
Task 3: finishing. 
Task 4: starting. 
Task 4: finishing. 

Time spent inside the loop: 0.5237274999999997 seconds.


We wait 5 times for .1 second, so .5 seconds in total. What if we start the 5 calls to the `task()` function at once?

We use the ``threading`` module. We create a ``Thread`` object whose constructor takes the function and its arguments as parameters. Then, we ``start()`` the thread.

This executes the tasks *concurrently* on one CPU core. Picture the OS passing "the ball" to each thread one by one and waiting for them to send it back in their own time.

In [3]:
from threading import Thread

start_time = perf_counter()

threads = list()
for i in range(5):
    thread = Thread(target=task, args=(i,)) # New thread will run "task" with argument "i"
    threads.append(thread) # To keep track of all the threads

for thread in threads:
    thread.start()

for thread in threads:  # The second loop is necessary. start() everything then join() everything.
    thread.join() # Make sure all the threads are done before continuing

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")    

Task 0: starting. Task 1: starting. 

Task 2: starting. 
Task 3: starting. 
Task 4: starting. 
Task 0: finishing. Task 1: finishing. Task 3: finishing. 


Task 4: finishing. Task 2: finishing. 


Time spent inside the loop: 0.10274862500000026 seconds.


Let's unpack what we observe:

* **Evidence of concurrent execution:** 
  * The final `print()` call executes after only about .1 second.
* **Threading does not guarantee order:**
  * The threads do *not* finish in the order they were started.
* **Evidence of race conditions:** 
  * The outputs are mangled. 
  * Observe how some outputs are on the same line, showing a thread started to write while another was still writing.
  * The threads are fighting for the same resource. In this case, the standard output, your screen.

### Locks avoid race conditions

A simple way to "synchronize" our threads, so that they share resources gracefully, is to use a *lock*.

When a thread accesses a shared resource, it acquires the lock, which blocks access to that resource until the lock is released. If another thread wants to use the resource in the meantime, it must wait.

In [4]:
from threading import RLock

rlock = RLock() # Needs to be outside the function. Created once, used by every thread.

def task_locked(i):
    with rlock: # Equivalent to rlock.acquire(), then the indented code, then rlock.release()
        task(i)
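
The ``with rlock:`` form is shorthand for an explicit acquire and release. As a minimal sketch, assuming a shared counter instead of the screen, the same pattern can be written out by hand. The names `counter`, `counter_lock` and `add_many` are invented for this illustration.

```python
from threading import RLock, Thread

counter = 0             # Shared resource: every thread updates this variable
counter_lock = RLock()  # Hypothetical name; created once, shared by all threads


def add_many(n):
    """Increment the shared counter n times, one locked step at a time."""
    global counter
    for _ in range(n):
        counter_lock.acquire()      # Block until no other thread holds the lock
        try:
            counter += 1            # Read-modify-write, protected by the lock
        finally:
            counter_lock.release()  # Always release, even if an error occurs


threads = [Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter)  # 40000
```

The `try` / `finally` pair is what the `with` statement takes care of for us, which is why the shorter form is usually preferred.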

In [5]:
start_time = perf_counter()

threads = list()
for i in range(5):
    thread = Thread(target=task_locked, args=(i,))
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
    
print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")    

Task 0: starting. 
Task 0: finishing. 
Task 1: starting. 
Task 1: finishing. 
Task 2: starting. 
Task 2: finishing. 
Task 3: starting. 
Task 3: finishing. 
Task 4: starting. 
Task 4: finishing. 

Time spent inside the loop: 0.5225427499999995 seconds.


In this case, the lock is blocking the whole ``task()`` function, including the ``sleep()``, so the threads run one after the other and we are back to .5 seconds. It is better to lock at a more granular level to free up the locked resources more often.

In [6]:
def task_locked(id):
    with rlock:
        print(f"Task {id}: starting. ")
    sleep(.1) # This is the part that takes time and should be run concurrently
    with rlock:
        print(f"Task {id}: finishing. ")

In [7]:
start_time = perf_counter()

threads = list()
for i in range(5):
    thread = Thread(target=task_locked, args=(i,))
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
    
print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")    

Task 0: starting. 
Task 1: starting. 
Task 2: starting. 
Task 3: starting. 
Task 4: starting. 
Task 1: finishing. 
Task 0: finishing. 
Task 3: finishing. 
Task 2: finishing. 
Task 4: finishing. 

Time spent inside the loop: 0.10432283299999945 seconds.


### OOP equivalent

To run the code concurrently, the ``Sleeper`` class inherits from the `Thread` class.

In [8]:
class Sleeper(Thread): # Sleeper is a Thread
    def __init__(self, id): # Override Thread constructor
        super().__init__() # Call the parent constructor. Not optional because we are overriding __init__
        self.id = id

    def run(self): # Must be named "run": start() calls this method in the new thread.
        print(f"Task {self.id}: starting. ")
        sleep(.1)
        print(f"Task {self.id}: finishing. ")

Calling the ``run()`` method directly, we execute the code sequentially, as if ``Sleeper`` were not a ``Thread``.

In [9]:
start_time = perf_counter()

for i in range(5):
    Sleeper(i).run() # Not using threads

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")

Task 0: starting. 
Task 0: finishing. 
Task 1: starting. 
Task 1: finishing. 
Task 2: starting. 
Task 2: finishing. 
Task 3: starting. 
Task 3: finishing. 
Task 4: starting. 
Task 4: finishing. 

Time spent inside the loop: 0.5177888330000009 seconds.


To run the code concurrently, we have to call the ``start()`` method inherited from ``Thread`` (see [https://docs.python.org/3/library/threading.html#thread-objects](https://docs.python.org/3/library/threading.html#thread-objects)).
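
The difference between ``run()`` and ``start()`` can also be checked without timing anything. The sketch below records which thread actually executes ``run()``. The class `WhoAmI` and its attribute `ran_in` are hypothetical names used only for this illustration.

```python
from threading import Thread, get_ident


class WhoAmI(Thread):  # Hypothetical class, for illustration only
    def __init__(self):
        super().__init__()
        self.ran_in = None  # Will hold the identifier of the thread that executed run()

    def run(self):
        self.ran_in = get_ident()  # Identifier of the thread currently running this code


main_thread_id = get_ident()

direct = WhoAmI()
direct.run()  # Plain method call: executes in the calling (main) thread
print(direct.ran_in == main_thread_id)  # True

started = WhoAmI()
started.start()  # Creates a new thread, which then calls run() for us
started.join()
print(started.ran_in == main_thread_id)  # False
```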

In [10]:
start_time = perf_counter()

sleepers = list()
for i in range(5):
    sleepers.append(Sleeper(i))

for sleeper in sleepers:
    sleeper.start()

for sleeper in sleepers:
    sleeper.join()
    
print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")

Task 0: starting. Task 1: starting. 

Task 2: starting. 
Task 3: starting. 
Task 4: starting. 
Task 1: finishing. 
Task 0: finishing. 
Task 3: finishing. Task 4: finishing. 

Task 2: finishing. 

Time spent inside the loop: 0.11432316699999845 seconds.


Using Locks:

In [11]:
class SleeperLock(Sleeper): # SleeperLock is a Sleeper and thus a Thread
    rlock = RLock() # This is a class variable. It is created only once.

    def run(self): # Override the parent run() method
        with self.rlock: 
            print(f"Task {self.id}: starting. ")
        sleep(.1)
        with self.rlock: 
            print(f"Task {self.id}: finishing. ")

In [12]:
start_time = perf_counter()

sleepers = list()
for i in range(5):
    sleepers.append(SleeperLock(i))

for sleeper in sleepers:
    sleeper.start()

for sleeper in sleepers:
    sleeper.join()
    
print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")

Task 0: starting. 
Task 1: starting. 
Task 2: starting. 
Task 3: starting. 
Task 4: starting. 
Task 1: finishing. 
Task 2: finishing. 
Task 3: finishing. 
Task 4: finishing. 
Task 0: finishing. 

Time spent inside the loop: 0.10353191700000153 seconds.


## Processes vs Threads, the GIL

For performance reasons, and because of its age, CPython (the standard Python interpreter) uses a Global Interpreter Lock or GIL ([https://realpython.com/python-gil/](https://realpython.com/python-gil/)). This *lock* allows only one thread at a time to execute Python bytecode, so pre-emptive and cooperative concurrency are effectively limited to a single CPU core. If we want to leverage modern CPUs and their many cores, we need to turn to *multiprocessing* to run processes in *parallel*.

**How to decide? It depends on your problem. Is it:**

* I/O bound: use threading. The bottleneck is not your CPU, so running on more than one core is not going to improve performance.
* CPU bound: use multiprocessing. If the problem maxes out one CPU core, chances are spreading the load is the solution.

**Why not use multiprocessing all the time then?**

Because it creates a lot of overhead by *spawning* or *forking* (see below) entire copies of your main Python process, whereas threads are much more lightweight and can also share resources with each other.
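
The point about threads sharing resources can be illustrated with a small sketch: every thread writes into the same dictionary, and the main thread reads it afterwards. The names `squares`, `squares_lock` and `store_square` are invented for this example. Separate processes would each work on their own copy of the dictionary instead.

```python
from threading import RLock, Thread

squares = {}            # One dictionary, visible to every thread of this process
squares_lock = RLock()  # Protects the shared dictionary


def store_square(n):
    with squares_lock:
        squares[n] = n * n  # Written by a worker thread, read later by the main thread


threads = [Thread(target=store_square, args=(n,)) for n in range(5)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(sorted(squares.items()))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]
```

This is also a simple way to get results back from a ``Thread``, since the return value of the target function is discarded.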

First we start with the sequential baseline:

In [13]:
from math import factorial

it_nb = 30 # Change this to adjust compute time on your machine

start_time = perf_counter()

for i in range(it_nb):
    factorial(10**5) # Big number

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")


Time spent inside the loop: 4.535982041000002 seconds.


Then, just to be sure, using threading:

In [14]:
start_time = perf_counter()

threads = list()

for i in range(it_nb):
    thread = Thread(target=factorial, args=(10**5,))
    threads.append(thread)

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()
print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")


Time spent inside the loop: 4.470772624999999 seconds.


As expected, the problem could use more **power**, not faster **I/O**. Let's check multiprocessing:

In [15]:
from multiprocessing import Process

start_time = perf_counter()

processes = list()

for i in range(it_nb):
    process = Process(target=factorial, args=(10**5,))
    processes.append(process)

for process in processes:
    process.start()

for process in processes:
    process.join()

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")



Time spent inside the loop: 1.1059027090000022 seconds.


You should see a significant speedup in compute time if your computer has more than one core, though not quite as large as the number of cores would suggest. Why? Creating entire processes has an overhead. Also, we created ``it_nb`` processes. Do you have that many CPU cores?

In [16]:
from multiprocessing import cpu_count

print(f"Number of CPU cores on my machine: {cpu_count()}")

Number of CPU cores on my machine: 8


Probably not.

## Map, Workers and Pools

There are high-level tools to help you inject concurrency and parallelism into your code less painfully.

One of the most handy tools is to think about your problem in relation to the ``map`` function ([https://realpython.com/python-map-function/](https://realpython.com/python-map-function/)):

In [17]:
start_time = perf_counter()

factorial_arguments = [10**5, ] * it_nb # [10**5, 10**5, ...]

gen = map(factorial, factorial_arguments) # map returns a lazy iterator (think yield): nothing is computed yet
tuple(gen) # do the actual work

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")


Time spent inside the loop: 4.548467917 seconds.


This is the ``map`` version of the sequential code we saw above. Unsurprisingly, the execution time is similar.

Let's see how to code it with threads, even though we saw they would be useless in this CPU-bound case.

In [18]:
from concurrent.futures import ThreadPoolExecutor 

start_time = perf_counter()

with ThreadPoolExecutor() as pool: # Without arguments, ThreadPoolExecutor() uses up to min(32, CPU cores + 4) workers (Python 3.8+)
    tuple(pool.map(factorial, factorial_arguments))

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")


Time spent inside the loop: 4.487030957999998 seconds.


Pretty neat, with very limited code change. Note that ``ThreadPoolExecutor`` takes a ``max_workers`` argument so things don't get out of hand ([https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor](https://docs.python.org/3/library/concurrent.futures.html#concurrent.futures.ThreadPoolExecutor)).
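
Threads shine when the work is I/O bound. As a rough sketch, assuming a made-up `fake_download` function that only sleeps to imitate a slow network call, ``max_workers`` caps how many of those calls wait at the same time:

```python
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter, sleep


def fake_download(item):  # Hypothetical stand-in for an I/O-bound call
    sleep(0.1)            # Sleeping releases the GIL, so other threads can run
    return item * 2


items = list(range(8))

start_time = perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:  # At most 4 worker threads
    results = list(pool.map(fake_download, items))
elapsed = perf_counter() - start_time

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14]
print(f"Elapsed: {elapsed:.2f} seconds.")  # Roughly 0.2 s: 8 tasks in 2 batches of 4
```

Run sequentially, the same 8 calls would take about .8 seconds.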

Now with multiprocessing:

In [19]:
from multiprocessing import Pool

start_time = perf_counter()

with Pool() as pool: # Without arguments, Pool() will create as many workers as CPU cores (a good default).
    tuple(pool.map(factorial, factorial_arguments))

print(f"\nTime spent inside the loop: {perf_counter() - start_time} seconds.")


Time spent inside the loop: 1.0267589169999987 seconds.


Again, minimal code change to add multiprocessing to your "loop".

Note that all these ``map()`` functions have the added benefit of preserving order:

In [20]:
from multiprocessing import get_context, RLock

rlock = RLock()

def task(id):
    with rlock:
        if id < 10: # Only to keep the output small
            print(id, end=', ')
    return id

with get_context("fork").Pool() as pool: # get_context("fork") seems necessary on my M1 Mac, otherwise the code hangs. Try the plain Pool() first.
    res = tuple(pool.map(task, range(71))) # Any number. May require a few tries to show different outputs.

print()
print(res[:10]) # the first 10 results should be sufficient to show the 2 lists are different

0, 3, 6, 9, 1, 4, 7, 2, 5, 8, 
(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)


The results are ordered even though the processes executed in a different order.
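
The same behaviour can be observed with threads. The sketch below contrasts ``pool.map()``, which returns results in input order, with `as_completed`, a function from `concurrent.futures` that is not otherwise used in this course and is brought in only for contrast. The helper `slow_echo` is hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep


def slow_echo(delay):  # Hypothetical helper: waits, then returns its argument
    sleep(delay)
    return delay


delays = [0.3, 0.1, 0.2]

with ThreadPoolExecutor(max_workers=3) as pool:
    # map() yields results in the order of the inputs
    in_input_order = list(pool.map(slow_echo, delays))

    # as_completed() yields each future as soon as it finishes
    futures = [pool.submit(slow_echo, delay) for delay in delays]
    in_completion_order = [future.result() for future in as_completed(futures)]

print(in_input_order)       # [0.3, 0.1, 0.2], same order as the inputs
print(in_completion_order)  # Typically [0.1, 0.2, 0.3], fastest task first
```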

**NB** for more explanations on *forks* and *spawns*:
* [https://stackoverflow.com/questions/67999589/multiprocessing-with-pool-throws-error-on-m1-macbook](https://stackoverflow.com/questions/67999589/multiprocessing-with-pool-throws-error-on-m1-macbook)
* [https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn](https://britishgeologicalsurvey.github.io/science/python-forking-vs-spawn)

### Effect of the number of workers on performance for multiprocessing

In [21]:
from multiprocessing import Pool

for i in range(10): 
    nb_workers = 2**i # Increase the number of workers exponentially

    start_time = perf_counter()

    with Pool(processes=nb_workers) as pool: # We fix the number of workers ourselves
        tuple(pool.map(factorial, factorial_arguments))

    print(f"[{nb_workers} workers] Time spent inside the loop: {perf_counter() - start_time} seconds.")

[1 workers] Time spent inside the loop: 4.791193207999999 seconds.
[2 workers] Time spent inside the loop: 2.533712584 seconds.
[4 workers] Time spent inside the loop: 1.3501607090000007 seconds.
[8 workers] Time spent inside the loop: 1.047103208000003 seconds.
[16 workers] Time spent inside the loop: 1.0464014579999983 seconds.
[32 workers] Time spent inside the loop: 1.1825077920000027 seconds.
[64 workers] Time spent inside the loop: 1.455161000000004 seconds.
[128 workers] Time spent inside the loop: 1.9985152909999968 seconds.
[256 workers] Time spent inside the loop: 3.2653933329999987 seconds.
[512 workers] Time spent inside the loop: 6.078202250000004 seconds.


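The best times are reached around the number of CPU cores (8 on this machine); adding more workers only adds overhead. It is probably good enough to let Python determine the number of workers by creating your pools without arguments.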
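
If you do want to set the number yourself, one possible rule of thumb for CPU-bound work, suggested by the timings above, is to exceed neither the number of cores nor the number of tasks. The helper `pick_worker_count` below is a hypothetical sketch, not a library function.

```python
from os import cpu_count


def pick_worker_count(nb_tasks):  # Hypothetical helper, not part of any library
    """Return a worker count that exceeds neither the cores nor the tasks."""
    cores = cpu_count() or 1  # cpu_count() may return None if it cannot be determined
    return max(1, min(cores, nb_tasks))


print(pick_worker_count(3))     # At most 3: extra workers would sit idle
print(pick_worker_count(1000))  # At most the number of CPU cores
```

The result could then be passed along as ``Pool(processes=pick_worker_count(len(factorial_arguments)))``.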

## Additional resources

* [https://fastapi.tiangolo.com/async/#asynchronous-code](https://fastapi.tiangolo.com/async/#asynchronous-code)
* [https://realpython.com/python-concurrency/](https://realpython.com/python-concurrency/)
* [https://realpython.com/async-io-python/](https://realpython.com/async-io-python/)
* [https://realpython.com/python-gil/](https://realpython.com/python-gil/)