In [8]:
import time

from multiprocessing import Pool

from utilities import countdown, COUNT

# Multithreading, Coroutines & Multiprocessing in Python 

In this notebook, we will look at some simple examples of multithreaded, multiprocess and asynchronous programs in Python. The execution time of each piece of code is our primary focus. A small utility file named `utilities.py` contains the `countdown` function used in the experiments, which decrements an integer until it reaches zero.

First let us run a synchronous version of this function on a single thread and report the time in seconds.

In [9]:
# single threaded program

# start a timer
start = time.time()

# run the task
countdown(COUNT)

# end the timer and print execution time
end = time.time()
print('Time taken in seconds -', end - start)

Time taken in seconds - 3.924863815307617
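
The start/stop timing pattern above is repeated in every experiment in this notebook. As a minimal sketch, it could be wrapped in a small context manager. The name `timer` is hypothetical (it is not part of `utilities.py`), and `time.perf_counter()` is swapped in for `time.time()` because it is a monotonic clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(label="Time taken in seconds"):
    """Time the body of a `with` block (hypothetical helper, not in utilities.py)."""
    result = {"elapsed": None}
    start = time.perf_counter()  # monotonic, high-resolution clock
    try:
        yield result
    finally:
        # record and report the elapsed time even if the body raises
        result["elapsed"] = time.perf_counter() - start
        print(f"{label} - {result['elapsed']}")


# usage: time a short sleep as a stand-in for countdown(COUNT)
with timer() as t:
    time.sleep(0.1)
```

With this helper, the cell above would reduce to `with timer(): countdown(COUNT)`.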


Next, we will split this task over two threads using the standard-library `threading` module. To do this, we create two `Thread()` instances whose target callable is our countdown function. We then split the work evenly by getting each thread to count down from half of `COUNT`.

We start each thread's activity by calling the `start()` method on the `Thread` instances. We then call `join()` on each one, which blocks the calling thread until that thread has finished. We might naively expect this to halve the execution time.

In [10]:
# multithreaded program
from threading import Thread


# Create two thread objects and use them to invoke our countdown
# function, splitting the count over each of them
t1 = Thread(target=countdown, args=(COUNT//2,))
t2 = Thread(target=countdown, args=(COUNT//2,))

# start the timer
start = time.time()

# start the threads
t1.start()
t2.start()

# block the calling thread until each thread terminates
t1.join()
t2.join()

# end the timer and print the execution time
end = time.time()
print('Time taken in seconds -', end - start)

Time taken in seconds - 3.8616461753845215


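We see that the execution time has not decreased as we expected: 3.86 s against 3.92 s is essentially unchanged, nowhere near half. From the lecture slides, we know this is due to the Global Interpreter Lock (GIL), which allows only one thread at a time to execute Python bytecode and so prevents the two threads' work from running in parallel. Creating and switching between threads also adds a small overhead, so on other runs the multithreaded version can come out slightly slower than the single-threaded one.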
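
For contrast, the GIL only serialises the execution of Python bytecode. Blocking calls such as `time.sleep()` release it while they wait, so threads that spend their time waiting do overlap. The sketch below illustrates this with a hypothetical `wait` function (not part of `utilities.py`) in place of `countdown`.

```python
import time
from threading import Thread


def wait(seconds):
    # time.sleep() releases the GIL while it blocks, unlike a pure-Python loop
    time.sleep(seconds)


# four threads that each wait for half a second
threads = [Thread(target=wait, args=(0.5,)) for _ in range(4)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# the four waits overlap, so this is close to 0.5 s rather than 2 s
print('Time taken in seconds -', elapsed)
```

This is why threads remain useful for waiting-dominated work even though they did not speed up the CPU-bound countdown.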

Next, we will try the standard-library `multiprocessing` module to split the workload over new Python processes. To do this, we create a `multiprocessing.Pool()` object and indicate how many processes we wish to use.

We then map the callable `countdown` over an iterable of the count chunks.

In [11]:
# multiprocessing program

# start the timer
start = time.time()

# split the workload over multiple processes by creating a Pool object
# with `num_processors` worker processes; the `with` block shuts the
# pool down once the work is done
num_processors = 3
with Pool(processes=num_processors) as p:
    # apply the task function to chunks of the total COUNT
    p.map(countdown, num_processors * [COUNT // num_processors])

# end the timer
end = time.time()

print('Time taken in seconds -', end - start)   
    

Time taken in seconds - 1.9298930168151855


Unlike the multithreaded example, the execution time has decreased significantly, meaning that the workload was indeed executed in parallel. The speedup is not linear (roughly 2x with three processes), likely due to the overheads of starting each process, allocating memory, and gathering the results back in the main process.
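
One detail of the chunking in the multiprocessing cell: `COUNT // num_processors` rounds down, so when `COUNT` is not divisible by the number of processes the chunks sum to slightly less than `COUNT`. This does not matter for a timing experiment, but a minimal sketch of an exact split could look like the following. The helper name `split_count` is hypothetical.

```python
def split_count(total, parts):
    """Split `total` into `parts` integers that sum exactly to `total`."""
    base, remainder = divmod(total, parts)
    # the first `remainder` chunks each get one extra unit
    return [base + 1 if i < remainder else base for i in range(parts)]


chunks = split_count(100_000_000, 3)
print(chunks, sum(chunks))
# prints: [33333334, 33333333, 33333333] 100000000
```

The list returned by `split_count(COUNT, num_processors)` could be passed to `p.map(countdown, ...)` in place of `num_processors*[COUNT//num_processors]`.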

# Asynchronous programming with Python

Asynchronous programming in Python is based on coroutines and uses the standard-library `asyncio` module.

The general architecture of an async program is a set of small, modular coroutines plus one wrapper coroutine that chains them together. In this example, we have a routine `f()` that sleeps for a random interval and prints a message when it starts and when it returns.

We then have a main function `g()` that calls `f()` a number of times.

The first example is synchronous: each call of `f()` starts, sleeps and ends before the next one can start. This is quite slow, and our program doesn't do anything useful whilst it is waiting for each call to finish sleeping.

In [12]:
# synchronous programming in Python

import random


def f(i: int):
    print(f"Routine {i} is starting ...")
    sleep_time = random.randint(1,5)
    time.sleep(sleep_time)
    print(f"Routine {i} slept for {sleep_time}s")
    return sleep_time


def g():
    r = [f(i) for i in range(5)]
    return r


# start the timer
start = time.time()

# call g()
g()

# end the timer
end = time.time()

print('Time taken in seconds -', end - start)   

Routine 0 is starting ...
Routine 0 slept for 4s
Routine 1 is starting ...
Routine 1 slept for 1s
Routine 2 is starting ...
Routine 2 slept for 1s
Routine 3 is starting ...
Routine 3 slept for 3s
Routine 4 is starting ...
Routine 4 slept for 3s
Time taken in seconds - 12.020970106124878


In the second example, we make use of asynchronous programming by declaring the functions as coroutines with `async def` and marking the points where we are willing to hand control back to the event loop with the `await` keyword:

```Python
async def g():
    # Pause execution of g() here and come back when f() has finished
    r = await f()
    return r
```

We define a coroutine `g()` below that gathers the tasks by mapping the central coroutine `f()` across an iterable and passing the results to `asyncio.gather()`. (The `nest_asyncio.apply()` line is only needed to use `asyncio.run()` inside a Jupyter notebook, which already has a running event loop; it can be dropped in a plain Python script.)

Note that anything that waits should be declared `async` and awaited. As a general design principle, we shouldn't call blocking synchronous functions such as `time.sleep()` from coroutines: a blocking call holds on to the event loop until it returns, so no other coroutine can make progress and the asynchronous design pattern is ruined.

In [13]:
# asynchronous programming in Python

import asyncio
import nest_asyncio


async def f(i: int):
    print(f"Coroutine {i} is starting ...")
    sleep_time = random.randint(1,5)
    await asyncio.sleep(sleep_time)
    print(f"Coroutine {i} slept for {sleep_time}s")
    return sleep_time


async def g():
    r = await asyncio.gather(*(f(i) for i in range(5)))
    return r


# Enable nested asyncio event loop as we are running in a Jupyter notebook
nest_asyncio.apply()

# start the timer
start = time.time()

# call g() asynchronously
asyncio.run(g())

# end the timer
end = time.time()

print('Time taken in seconds -', end - start)   

Coroutine 0 is starting ...
Coroutine 1 is starting ...
Coroutine 2 is starting ...
Coroutine 3 is starting ...
Coroutine 4 is starting ...
Coroutine 1 slept for 1s
Coroutine 3 slept for 1s
Coroutine 0 slept for 4s
Coroutine 2 slept for 4s
Coroutine 4 slept for 5s
Time taken in seconds - 5.002790927886963


We can see that the asynchronous version returned after about 5 s, the length of the longest single sleep, rather than the 15 s sum of all five sleeps. Instead of waiting idly on each sleep, every coroutine handed control back to the event loop at its `await`, which let the other coroutines run in the meantime. This is why the other calls of `f()` start before the first one ends.
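
When a blocking function cannot be rewritten as a coroutine, one option is to hand it to a worker thread so that the event loop stays free. The sketch below assumes Python 3.9 or later, where `asyncio.to_thread()` is available. The function `blocking_io` is a hypothetical stand-in for real blocking work.

```python
import asyncio
import time


def blocking_io(seconds):
    # stand-in for a blocking call that cannot be rewritten as a coroutine
    time.sleep(seconds)
    return seconds


async def main():
    start = time.perf_counter()
    # each blocking call runs in a worker thread, so the event loop is not held up
    results = await asyncio.gather(
        asyncio.to_thread(blocking_io, 0.3),
        asyncio.to_thread(blocking_io, 0.3),
        asyncio.to_thread(blocking_io, 0.3),
    )
    return results, time.perf_counter() - start


# in a Jupyter notebook, call nest_asyncio.apply() first or use `await main()`
results, elapsed = asyncio.run(main())
print(results, elapsed)
```

The three calls overlap, so the elapsed time should be close to 0.3 s rather than 0.9 s. Calling `blocking_io()` directly inside a coroutine would instead run the sleeps one after another.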

# Subprocesses

The standard-library `subprocess` module allows you to run other programs and scripts by spawning new processes from inside a Python program. It uses pipes to connect to the child process's standard input, output and error streams, and gives access to its return code.

In [14]:
# An example of the subprocess module

from subprocess import Popen, PIPE


# Create a Popen instance, passing command line arguments and piping the output streams
# In this case, we want to read the contents of the `utilities.py` file to standard output.
process = Popen(['cat','utilities.py'], stdout=PIPE, stderr=PIPE)

# communicate with the process and obtain standard output and error streams
stdout, stderr = process.communicate()

# print the standard output (a bytes object, hence the b'...' prefix below)
print(stdout)

b'COUNT = 100000000\n\ndef countdown(n):\n    while n>0:\n        n -= 1'
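
For simple run-and-wait cases, `subprocess.run()` wraps the `Popen` and `communicate()` steps in one call and can decode the output to a string. The sketch below launches a child Python interpreter via `sys.executable` rather than `cat`, purely so that it does not depend on a Unix command being installed. The printed message is illustrative.

```python
import subprocess
import sys

# run a child Python interpreter and wait for it to finish
result = subprocess.run(
    [sys.executable, "-c", "print('hello from the child process')"],
    capture_output=True,  # pipe both stdout and stderr
    text=True,            # decode the streams to str instead of bytes
    check=True,           # raise CalledProcessError on a non-zero return code
)

# the return code and the decoded standard output
print(result.returncode)
print(result.stdout)
```

Here `result.stdout` is a `str`, so it prints without the `b'...'` prefix and with real line breaks.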
