# Parallelism

There are basically four interesting scenarios we can distinguish:

  - Few long-running I/O-bound jobs
  - Few long-running CPU-bound jobs
  - Many short-running I/O-bound jobs
  - Many short-running CPU-bound jobs

In [3]:
# The BLAS implementation used by numpy is multi-threaded; for our test we limit it to one thread
# so that any parallelism measured below comes from our own code.
# NOTE(review): presumably this has to be set before numpy is first imported to take effect — confirm.
import os
os.environ['OMP_NUM_THREADS'] = '1'

In [7]:
import concurrent.futures
import time
import numpy as np

def long_io_bound():
    """Simulate a long I/O-bound job by blocking the caller for one second."""
    pause_seconds = 1
    time.sleep(pause_seconds)
        
def long_cpu_bound():
    """Simulate a long CPU-bound job.

    Draws a random 90x90 matrix and squares it 5000 times in a row. The
    product is thrown away; only the time spent multiplying matters.

    NOTE(review): repeated squaring presumably overflows to inf after a
    handful of iterations. That is harmless for the result (it is never
    used), but confirm it does not skew the timing.
    """
    side = 90
    squarings = 5000
    product = np.random.uniform(size=(side, side))
    for _ in range(squarings):
        product = product @ product
    
def short_io_bound():
    """Simulate a short I/O-bound job by blocking the caller for one millisecond."""
    pause_seconds = 0.001
    time.sleep(pause_seconds)
    
def short_cpu_bound():
    """Simulate a short CPU-bound job.

    Draws a random 90x90 matrix and squares it five times in a row. The
    product is thrown away; only the time spent multiplying matters.
    """
    side = 90
    squarings = 5
    product = np.random.uniform(size=(side, side))
    for _ in range(squarings):
        product = product @ product
        
# Batch sizes for the benchmarks below: the "few" scenarios are paired with the
# long jobs, the "many" scenarios with the short jobs.
few_jobs = 10
many_jobs = 10000

In [8]:
%%timeit 
# Cost of a single long I/O-bound job, as a reference for the batch timings.
long_io_bound()

1 s ± 271 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
%%timeit 
# Cost of a single long CPU-bound job, as a reference for the batch timings.
long_cpu_bound()

937 ms ± 5.15 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%%timeit 
# Cost of a single short I/O-bound job, as a reference for the batch timings.
short_io_bound()

1.14 ms ± 1.9 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


In [11]:
%%timeit 
# Cost of a single short CPU-bound job, as a reference for the batch timings.
short_cpu_bound()

1.17 ms ± 82.8 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


## Sequential

In [12]:
%%time
# Baseline: run the long I/O-bound jobs one after another in the calling thread.
for i in range(few_jobs):
    long_io_bound()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 10 s


In [13]:
%%time
# Baseline: run the long CPU-bound jobs one after another in the calling thread.
for i in range(few_jobs):
    long_cpu_bound()

CPU times: user 9.35 s, sys: 0 ns, total: 9.35 s
Wall time: 9.38 s


In [14]:
%%time
# Baseline: run the short I/O-bound jobs one after another in the calling thread.
for i in range(many_jobs):
    short_io_bound()

CPU times: user 196 ms, sys: 92 ms, total: 288 ms
Wall time: 11.4 s


In [15]:
%%time
# Baseline: run the short CPU-bound jobs one after another in the calling thread.
for i in range(many_jobs):
    short_cpu_bound()

CPU times: user 11.4 s, sys: 0 ns, total: 11.4 s
Wall time: 11.6 s


## Multiprocessing

In [16]:
%%time
# Leaving the `with` block shuts the pool down and waits for every submitted job,
# so the measured wall time covers the complete batch.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ProcessPoolExecutor() as executor:
    for i in range(few_jobs):
        executor.submit(long_io_bound)

CPU times: user 20 ms, sys: 8 ms, total: 28 ms
Wall time: 5.04 s


In [17]:
%%time
# Long CPU-bound jobs spread over worker processes; the `with` block only exits
# once all of them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ProcessPoolExecutor() as executor:
    for i in range(few_jobs):
        executor.submit(long_cpu_bound)

CPU times: user 4 ms, sys: 20 ms, total: 24 ms
Wall time: 6.39 s


In [18]:
%%time
# Many short I/O-bound jobs, each submitted individually to the process pool;
# the `with` block only exits once all of them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ProcessPoolExecutor() as executor:
    for i in range(many_jobs):
        executor.submit(short_io_bound)

CPU times: user 4.04 s, sys: 908 ms, total: 4.94 s
Wall time: 7.48 s


In [19]:
%%time
# Many short CPU-bound jobs, each submitted individually to the process pool;
# the `with` block only exits once all of them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ProcessPoolExecutor() as executor:
    for i in range(many_jobs):
        executor.submit(short_cpu_bound)

CPU times: user 3.2 s, sys: 628 ms, total: 3.82 s
Wall time: 11.2 s


## Threading

In [20]:
%%time
# Long I/O-bound jobs on a thread pool; leaving the `with` block waits for every
# submitted job, so the measured wall time covers the complete batch.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    for i in range(few_jobs):
        executor.submit(long_io_bound)

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 1 s


In [21]:
%%time
# Long CPU-bound jobs on a thread pool; the `with` block only exits once all of
# them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    for i in range(few_jobs):
        executor.submit(long_cpu_bound)

CPU times: user 10.1 s, sys: 8 ms, total: 10.1 s
Wall time: 5.25 s


In [22]:
%%time
# Many short I/O-bound jobs, each submitted individually to the thread pool;
# the `with` block only exits once all of them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    for i in range(many_jobs):
        executor.submit(short_io_bound)

CPU times: user 704 ms, sys: 140 ms, total: 844 ms
Wall time: 1.45 s


In [23]:
%%time
# Many short CPU-bound jobs, each submitted individually to the thread pool;
# the `with` block only exits once all of them have finished.
# NOTE(review): the futures returned by submit() are discarded, so an exception
# raised inside a job would go unnoticed.
with concurrent.futures.ThreadPoolExecutor() as executor:
    for i in range(many_jobs):
        executor.submit(short_cpu_bound)

CPU times: user 12.7 s, sys: 184 ms, total: 12.9 s
Wall time: 7.61 s


## Async (Cooperative Multitasking)

In [30]:
import asyncio

async def cpu_bound(matrix):
    """Asynchronous matrix multiplication: return ``matrix @ matrix``.

    The body contains no ``await``, so the whole multiplication runs in one
    go once the coroutine is resumed.
    """
    squared = matrix @ matrix
    return squared
    
async def long_io_bound():
    """Coroutine simulating a long I/O-bound job: suspends for one second.

    Note: this reuses the name of the synchronous ``long_io_bound`` defined
    earlier in the notebook, so running this cell replaces that function.
    """
    delay_seconds = 1
    await asyncio.sleep(delay_seconds)
    
async def long_cpu_bound():
    """Coroutine simulating a long CPU-bound job.

    Draws a random 90x90 matrix and squares it 5000 times, awaiting
    ``cpu_bound`` for every multiplication. The product is thrown away.

    Note: this reuses the name of the synchronous ``long_cpu_bound`` defined
    earlier in the notebook, so running this cell replaces that function.
    """
    product = np.random.uniform(size=(90, 90))
    remaining = 5000
    while remaining:
        product = await cpu_bound(product)
        remaining -= 1

async def short_io_bound():
    """Coroutine simulating a short I/O-bound job: suspends for one millisecond.

    Note: this reuses the name of the synchronous ``short_io_bound`` defined
    earlier in the notebook, so running this cell replaces that function.
    """
    delay_seconds = 0.001
    await asyncio.sleep(delay_seconds)
    
async def short_cpu_bound():
    """Coroutine simulating a short CPU-bound job.

    Draws a random 90x90 matrix and squares it five times, awaiting
    ``cpu_bound`` for every multiplication. The product is thrown away.

    Note: this reuses the name of the synchronous ``short_cpu_bound`` defined
    earlier in the notebook, so running this cell replaces that function.
    """
    product = np.random.uniform(size=(90, 90))
    remaining = 5
    while remaining:
        product = await cpu_bound(product)
        remaining -= 1

In [31]:
# Event loop that the benchmark cells below drive via loop.run_until_complete().
loop = asyncio.get_event_loop()

In [32]:
%%time
# `async` is a reserved keyword since Python 3.7, so the old `asyncio.async(...)`
# spelling is a SyntaxError there; asyncio.ensure_future() is its replacement.
# All coroutines are scheduled on `loop` first, then awaited together so the
# event loop can interleave them.
jobs = [asyncio.ensure_future(long_io_bound(), loop=loop) for _ in range(few_jobs)]
loop.run_until_complete(asyncio.gather(*jobs))

CPU times: user 24 ms, sys: 4 ms, total: 28 ms
Wall time: 1.03 s


In [33]:
%%time
# `async` is a reserved keyword since Python 3.7, so the old `asyncio.async(...)`
# spelling is a SyntaxError there; asyncio.ensure_future() is its replacement.
# All coroutines are scheduled on `loop` first, then awaited together.
jobs = [asyncio.ensure_future(long_cpu_bound(), loop=loop) for _ in range(few_jobs)]
loop.run_until_complete(asyncio.gather(*jobs))

CPU times: user 9.72 s, sys: 16 ms, total: 9.74 s
Wall time: 10.8 s


In [34]:
%%time
# `async` is a reserved keyword since Python 3.7, so the old `asyncio.async(...)`
# spelling is a SyntaxError there; asyncio.ensure_future() is its replacement.
# All coroutines are scheduled on `loop` first, then awaited together so the
# event loop can interleave them.
jobs = [asyncio.ensure_future(short_io_bound(), loop=loop) for _ in range(many_jobs)]
loop.run_until_complete(asyncio.gather(*jobs))

CPU times: user 1.11 s, sys: 36 ms, total: 1.14 s
Wall time: 1.48 s


In [35]:
%%time
# `async` is a reserved keyword since Python 3.7, so the old `asyncio.async(...)`
# spelling is a SyntaxError there; asyncio.ensure_future() is its replacement.
# All coroutines are scheduled on `loop` first, then awaited together.
jobs = [asyncio.ensure_future(short_cpu_bound(), loop=loop) for _ in range(many_jobs)]
loop.run_until_complete(asyncio.gather(*jobs))

CPU times: user 11.9 s, sys: 8 ms, total: 11.9 s
Wall time: 12.6 s
