Multi-Core Parallelism
====

In [1]:
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np
import os

In [2]:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import multiprocessing as mp
from multiprocessing import Pool, Value, Array
import time

Vanilla Python
----

In [3]:
def mc_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [4]:
%%time

mc_pi(int(1e5))

CPU times: user 584 ms, sys: 4 ms, total: 588 ms
Wall time: 587 ms


3.14364

In [5]:
%%time

res = [mc_pi(int(1e5)) for i in range(10)]

CPU times: user 6.02 s, sys: 8 ms, total: 6.03 s
Wall time: 6.03 s


The `concurrent.futures` module
----

Tasks can safely run concurrently when they return the same results regardless of the order in which they are executed. A "future" is an object representing a result that will become available sometime in the future. The `concurrent.futures` module provides executors (`ThreadPoolExecutor` and `ProcessPoolExecutor`) to which functions can be submitted for later execution. Each submission returns a future. This gives us a simple model for parallel execution on a multi-core machine.

While `concurrent.futures` provides a simpler interface, it is slower and less flexible than using `multiprocessing` directly for parallel execution.

Using processes in parallel with `ProcessPoolExecutor`
----

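We get a roughly linear speedup, as expected: with 4 workers, the wall time drops from about 6.0 s to about 1.9 s. (`%%time` reports CPU time for the notebook process only, so the work done inside the worker processes does not appear in the CPU times.)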

In [6]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])

CPU times: user 32 ms, sys: 36 ms, total: 68 ms
Wall time: 1.87 s


In [7]:
np.array(list(res))

array([3.14444, 3.14444, 3.14444, 3.14444, 3.14568, 3.14568, 3.14568,
       3.14568, 3.14292, 3.14292])
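The ten estimates above are not independent: they come in groups of identical values. On platforms where workers are created with fork (historically the default on Linux), each worker starts with a copy of the parent's global NumPy random state, so the first job in every worker draws the same numbers, as does the second, and so on. A minimal sketch of one fix, assuming NumPy 1.17 or later for `np.random.default_rng`: give every task its own generator, seeded from the task number and a root seed. `mc_pi_seeded` and `ROOT_SEED` are hypothetical names, and the script uses a `__main__` guard so it also works where workers are started with spawn.

```python
from concurrent.futures import ProcessPoolExecutor

import numpy as np

ROOT_SEED = 12345  # arbitrary; change it to get a different set of streams

def mc_pi_seeded(n, task_id, root_seed=ROOT_SEED):
    """Same estimator as mc_pi, but with a private generator per task."""
    # default_rng([task_id, root_seed]) builds a SeedSequence from both
    # numbers, so every task_id gets its own reproducible stream.
    rng = np.random.default_rng([task_id, root_seed])
    s = 0
    for _ in range(n):
        x = rng.uniform(-1, 1)
        y = rng.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        # Task k runs mc_pi_seeded(int(1e5), k)
        res = pool.map(mc_pi_seeded, [int(1e5)] * 10, range(10))
    print(np.array(list(res)))
```

Because each task's stream depends only on its `task_id` and the root seed, the results are also reproducible no matter which worker happens to run which task.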

### When you have many jobs

Each submitted job is wrapped in a `Future` object, which gives fine control over the job, such as adding callbacks and canceling it, but sending each job to a worker and its result back carries overhead. When submitting many jobs, we can reduce this cost with the `chunksize` argument of `map`, which specifies the number of tasks given to a worker at a time. A detailed explanation of `chunksize` is provided [here](https://stackoverflow.com/questions/53751050/python-multiprocessing-understanding-logic-behind-chunksize).

#### Using the default `chunksize`

The total amount of computation is essentially the same whether you have 10 jobs of size 100,000 or 10,000 jobs of size 100, so we might expect both to take about the same amount of time. This is not the case because of the per-job overhead described above: with the default `chunksize` of 1, each of the 10,000 small jobs is sent to a worker separately.

In [8]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])

CPU times: user 3.59 s, sys: 1.96 s, total: 5.55 s
Wall time: 3.36 s


#### Using `chunksize` of 100

In [9]:
%%time

with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))], chunksize=100)

CPU times: user 52 ms, sys: 56 ms, total: 108 ms
Wall time: 1.55 s
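With the default `chunksize` of 1, `ProcessPoolExecutor.map` sends every task separately. CPython's `multiprocessing.Pool.map`, by contrast, picks a default that splits the work into about four chunks per worker. The sketch below borrows that heuristic for `ProcessPoolExecutor`; `pool_style_chunksize` is a hypothetical helper, and the factor of 4 comes from CPython's implementation rather than any documented guarantee. It is written as a standalone script (the `__main__` guard is needed on platforms that start workers with spawn) and repeats `mc_pi` so it runs on its own.

```python
import math
from concurrent.futures import ProcessPoolExecutor

import numpy as np

def pool_style_chunksize(n_tasks, n_workers, chunks_per_worker=4):
    """Hypothetical helper: split n_tasks into about chunks_per_worker
    chunks per worker, rounding up (the heuristic CPython's Pool.map
    uses when no chunksize is given)."""
    return max(1, math.ceil(n_tasks / (n_workers * chunks_per_worker)))

def mc_pi(n):
    # Copy of mc_pi from above so this script is self-contained.
    s = 0
    for i in range(n):
        x = np.random.uniform(-1, 1)
        y = np.random.uniform(-1, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

if __name__ == "__main__":
    tasks = [int(1e2)] * int(1e4)
    size = pool_style_chunksize(len(tasks), n_workers=4)
    print("chunksize:", size)  # chunksize: 625
    with ProcessPoolExecutor(max_workers=4) as pool:
        res = list(pool.map(mc_pi, tasks, chunksize=size))
```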


### Fine control of processes

#### Status of processes

In [10]:
def f1(x):
    return x**2

def f2(x, y):
    return x*y

In [11]:
with ProcessPoolExecutor(max_workers=4) as pool:
    a = pool.submit(f2, 1, 1)
    b = pool.submit(f2, 1, 2)
    c = pool.submit(f1, 10)

    print('a running:', a.running())
    print('a done:', a.done())

    print('b running:', b.running())
    print('b done:', b.done())

    print('c running:', c.running())
    print('c done:', c.done())

    print('a result', a.result())
    print('b result', b.result())
    print('c result', c.result())

a running: True
a done: False
b running: False
b done: False
c running: False
c done: False
a result 1
b result 2
c result 100
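`running()` and `done()` only report a snapshot, so the status lines above depend on timing. To act on results as soon as each one is ready, `concurrent.futures.as_completed` yields futures in the order they finish. The sketch below uses a `ThreadPoolExecutor` so it runs anywhere, including an interactive session; the `Future` API is the same for `ProcessPoolExecutor`. `slow_square` and `squares_in_completion_order` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def slow_square(x, delay):
    time.sleep(delay)  # stand-in for a job that takes `delay` seconds
    return x * x

def squares_in_completion_order(jobs):
    """Run (x, delay) jobs and return (x, x*x) pairs in the order they finish."""
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        # Map each Future back to its input so we know which job finished.
        futures = {pool.submit(slow_square, x, d): x for x, d in jobs}
        return [(futures[fut], fut.result()) for fut in as_completed(futures)]

print(squares_in_completion_order([(1, 0.5), (2, 0.1), (3, 0.3)]))
```

The jobs are submitted in the order 1, 2, 3 but come back ordered by their delays, shortest first.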


### Canceling jobs and adding callbacks

In [12]:
njobs = 24

res = []

with ProcessPoolExecutor(max_workers=4) as pool:

    for i in range(njobs):
        res.append(pool.submit(f2, *np.random.rand(2)))
        if i % 2 == 0:
            res[i].add_done_callback(lambda future: print("Process done!"))
    res[4].cancel()
    if res[4].cancelled():
        print("Process 4 cancelled")

    for i, x in enumerate(res):
        while x.running():
            print("Running")
            time.sleep(1)
        if not x.cancelled():
            print(x.result())

Process done!
Process done!
Process done!
Process done!
Process done!
0.31519074298915756Process done!

0.06704854566590644
0.1487331123666466
Process done!
0.31240894356208665
0.08606349503792518
0.042283915266939115
0.1251136196055298
0.3633936992188428
0.35568598614576685
0.673662927280906
0.9454890321275123
0.022222962987650002Process done!
0.008743570535942578
0.3442006639040779

0.11265572032056531
Process done!
0.024921543903694235
0.20528728247554254
Running
Process done!
Process done!
Process done!
0.11719654784926778
0.08696919374340895
0.42954575400997613
0.28141424648261726
0.05920277319195527
0.19906252971138028
0.3350965655000501
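No "Process 4 cancelled" line appears in the output above, and all 24 results are printed: `Future.cancel()` succeeds only while a job is still pending, and `res[4]` had already started (or finished) by the time `cancel()` was called. The sketch below makes both cases deterministic. It assumes a `ThreadPoolExecutor` with a single worker, so the second job is guaranteed to wait, and uses two `threading.Event`s to hold the first job in the running state; `cancel_demo` and `blocker` are hypothetical names.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def cancel_demo():
    started = threading.Event()
    release = threading.Event()

    def blocker():
        started.set()   # signal that this job is now executing
        release.wait()  # hold the only worker until told to finish
        return "blocker done"

    with ThreadPoolExecutor(max_workers=1) as pool:
        first = pool.submit(blocker)
        second = pool.submit(pow, 2, 10)    # queued behind `first`
        started.wait()                      # make sure `first` is running
        cancelled_first = first.cancel()    # False: already running
        cancelled_second = second.cancel()  # True: still pending
        release.set()
        result_first = first.result()
    return cancelled_first, cancelled_second, second.cancelled(), result_first

print(cancel_demo())  # (False, True, True, 'blocker done')
```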


### Functions with multiple arguments

In [13]:
def f(a, b):
    return a + b

#### Using a function adapter

In [14]:
def f_(args):
    return f(*args)

In [15]:
xs = np.arange(24)
chunks = np.array_split(xs, xs.shape[0]//2)

In [16]:
chunks

[array([0, 1]),
 array([2, 3]),
 array([4, 5]),
 array([6, 7]),
 array([8, 9]),
 array([10, 11]),
 array([12, 13]),
 array([14, 15]),
 array([16, 17]),
 array([18, 19]),
 array([20, 21]),
 array([22, 23])]

In [17]:
with ProcessPoolExecutor(max_workers=4) as pool:
    res = pool.map(f_, chunks)
list(res)

[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
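When the arguments are already in separate sequences, the adapter is not needed: like the built-in `map`, `Executor.map` accepts several iterables and calls the function with one item from each. A sketch written as a standalone script (the `__main__` guard is needed where workers are started with spawn); `xs` and `ys` are illustrative inputs chosen to reproduce the pairs above.

```python
from concurrent.futures import ProcessPoolExecutor

def f(a, b):
    return a + b

# Even and odd numbers below 24, paired up: (0, 1), (2, 3), ..., (22, 23)
xs = list(range(0, 24, 2))
ys = list(range(1, 24, 2))

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        # f is called as f(xs[k], ys[k]) for each k, as with the built-in map
        print(list(pool.map(f, xs, ys)))
    # [1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]
```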

Using threads in parallel with `ThreadPoolExecutor`
----

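We do not get any speedup because of the global interpreter lock (GIL), which allows only one thread to execute Python bytecode at a time. `mc_pi` is a CPU-bound pure-Python loop, so the four threads simply take turns.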

In [18]:
%%time

with ThreadPoolExecutor(max_workers=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])

CPU times: user 6.36 s, sys: 0 ns, total: 6.36 s
Wall time: 6.26 s


In [19]:
np.array(list(res))

array([3.14232, 3.13844, 3.14356, 3.13676, 3.13676, 3.13928, 3.13996,
       3.1428 , 3.14516, 3.14312])
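Threads do help when jobs spend most of their time waiting rather than computing, because the GIL is released during blocking I/O calls and during `time.sleep`. A minimal sketch in which `time.sleep` stands in for real I/O; `fake_io` and `timed_run` are hypothetical names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(n):
    # time.sleep stands in for waiting on a disk or network; like most
    # blocking I/O calls, it releases the GIL while it waits.
    time.sleep(0.5)
    return n

def timed_run(n_jobs=8, max_workers=4):
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = list(pool.map(fake_io, range(n_jobs)))
    return results, time.perf_counter() - start

results, elapsed = timed_run()
print(results, f"{elapsed:.1f} s")
```

Eight half-second waits on four threads should take about 1 s of wall time rather than the 4 s a sequential loop would need.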

## Using `multiprocessing`

`concurrent.futures.ProcessPoolExecutor` is built on top of `multiprocessing` (it uses its processes and queues) and exposes the same interface as `ThreadPoolExecutor`, unifying the threading and process interfaces. I typically just work directly with `multiprocessing` since I don't have much use for threads. Apart from finer-grained control if you need it, one nice thing about `multiprocessing` is that it works equally well for small numbers of large jobs or large numbers of small jobs out of the box, because `Pool.map` chooses a `chunksize` automatically when none is given.

In [20]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e5) for i in range(10)])

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 1.83 s


In [21]:
np.array(res)

array([3.13924, 3.13924, 3.13924, 3.13924, 3.13504, 3.13504, 3.13504,
       3.13504, 3.14184, 3.14184])

In [22]:
%%time

with mp.Pool(processes=4) as pool:
    res = pool.map(mc_pi, [int(1e2) for i in range(int(1e4))])

CPU times: user 16 ms, sys: 24 ms, total: 40 ms
Wall time: 1.63 s


In [23]:
np.array(res)

array([3.12, 3.04, 3.24, ..., 3.12, 3.36, 3.12])

### Functions with multiple arguments

`multiprocessing.Pool` has a `starmap` method that unpacks each item of the iterable into the function's arguments, removing the need to write a wrapper function.

In [24]:
def f(a, b):
    return a + b

In [25]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.starmap(f, np.array_split(xs, xs.shape[0]//2))
list(res)

[1, 5, 9, 13, 17, 21, 25, 29, 33, 37, 41, 45]

#### Partial application

When all but one of the arguments are fixed, `functools.partial` can bind them so that the function takes just one argument.

In [26]:
def f(a, b):
    return a * b

In [27]:
from functools import partial

fp = partial(f, b=2)

In [28]:
xs = np.arange(24)
with Pool(processes=4) as pool:
    res = pool.map(fp, xs)
np.array(list(res))

array([ 0,  2,  4,  6,  8, 10, 12, 14, 16, 18, 20, 22, 24, 26, 28, 30, 32,
       34, 36, 38, 40, 42, 44, 46])
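`Pool.map` sends the function to the workers by pickling it, which is one reason to reach for `functools.partial` rather than a lambda here: a partial of a module-level function pickles by reference to that function, while a lambda cannot be pickled by the standard `pickle` module. The hypothetical `is_picklable` helper below checks this directly, assuming the code is run as a script so that `f` can be found in its module.

```python
import pickle
from functools import partial

def f(a, b):
    return a * b

fp = partial(f, b=2)

def is_picklable(obj):
    """Return True if obj survives a pickle round trip, as Pool tasks must."""
    try:
        pickle.loads(pickle.dumps(obj))
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

print(is_picklable(fp))                 # True
print(is_picklable(lambda a: f(a, 2)))  # False
```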

### Blocking and non-blocking calls

In [29]:
def func(n):
    time.sleep(n)
    return n

In [30]:
with Pool(processes=4) as pool:
    res = pool.map(func, [3,3,3,3,3])
    print("Control back!")
res

Control back!


[3, 3, 3, 3, 3]

In [31]:
with Pool(processes=4) as pool:
    res = pool.map_async(func, [3,3,3,3,3])
    print("Control back!")
    print(res.ready())
    res.wait()
    print(res.ready())
    print(res.get())

Control back!
False
True
[3, 3, 3, 3, 3]
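`AsyncResult.get` also accepts a `timeout` in seconds and raises `multiprocessing.TimeoutError` if the results are not ready yet, which lets the main process keep doing other work while it polls. A sketch reusing `func` from above, written as a standalone script with a `__main__` guard:

```python
import multiprocessing as mp
import time

def func(n):
    time.sleep(n)
    return n

if __name__ == "__main__":
    with mp.Pool(processes=4) as pool:
        res = pool.map_async(func, [3, 3, 3, 3, 3])
        while True:
            try:
                # Wait at most 1 s before giving control back
                print(res.get(timeout=1))  # eventually prints [3, 3, 3, 3, 3]
                break
            except mp.TimeoutError:
                print("Not ready yet; doing other work...")
```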


#### Different jobs to different processes

In [32]:
def f1(n):
    time.sleep(1)
    return n

def f2(n):
    time.sleep(1)
    return n**2

def f3(n):
    time.sleep(1)
    return n**3

def f4(n):
    time.sleep(1)
    return n**4

In [33]:
%%time

with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply(f, [2])))
    print(res)

[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 4 ms, sys: 40 ms, total: 44 ms
Wall time: 4.14 s


In [34]:
%%time

with Pool(processes=4) as pool:
    res = []
    for i, f in enumerate([f4, f2, f3, f1]):
        res.append((i, pool.apply_async(f, [2])))
    print([(i, r.get()) for i, r in res])

[(0, 16), (1, 4), (2, 8), (3, 2)]
CPU times: user 20 ms, sys: 20 ms, total: 40 ms
Wall time: 1.14 s


### Creating individual processes

If you need more control over individual processes than `Pool` provides, for example to share information across processes, you can work with individual workers and process-safe data structures. This is included for completeness, as most data processing tasks do not require this level of control.

In [35]:
def f(i):
    time.sleep(np.random.random())
    print(os.getpid(), i)

In [36]:
for i in range(10):
    p = mp.Process(target=f, args=(i,))
    p.start()
    p.join()

11011 0
11014 1
11017 2
11020 3
11023 4
11026 5
11029 6
11032 7
11035 8
11038 9
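Because `p.join()` is called inside the loop, each process finishes before the next one starts, so the ten processes above run one after another (which is why the output is in order). To run them concurrently, start them all first and join them afterwards. This sketch is a standalone script; it swaps `np.random.random` for the standard library's `random.random`, since forked children would otherwise inherit identical NumPy random states and sleep for identical times.

```python
import multiprocessing as mp
import os
import random
import time

def f(i):
    # Standard-library random is reseeded in each forked child, so the
    # sleep times differ between processes.
    time.sleep(random.random())
    print(os.getpid(), i)

if __name__ == "__main__":
    procs = [mp.Process(target=f, args=(i,)) for i in range(10)]
    for p in procs:
        p.start()  # launch every process before waiting on any of them
    for p in procs:
        p.join()   # then wait for all of them to finish
```

The lines now appear in whatever order the random sleeps finish.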


#### Using Queues to share information between processes

In [37]:
def f1(q, i):
    time.sleep(np.random.random())
    q.put((os.getpid(), i))

In [38]:
q = mp.Queue()

res = []
for i in range(10):
    p = mp.Process(target=f1, args=(q,i,))
    p.start()
    res.append(q.get())
    p.join()

res

[(11041, 0),
 (11043, 1),
 (11045, 2),
 (11047, 3),
 (11049, 4),
 (11051, 5),
 (11053, 6),
 (11055, 7),
 (11057, 8),
 (11059, 9)]
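The loop above calls `q.get()` before `p.join()`, which matters: the `multiprocessing` programming guidelines warn that joining a process that has put items on a queue can deadlock if those items have not been consumed yet. The same loop, however, still runs the processes one at a time. A sketch that starts all ten first and drains the queue before joining, written as a standalone script and using the standard library's `random` for the same reseeding reason as before:

```python
import multiprocessing as mp
import os
import random
import time

def f1(q, i):
    time.sleep(random.random())
    q.put((os.getpid(), i))

if __name__ == "__main__":
    q = mp.Queue()
    procs = [mp.Process(target=f1, args=(q, i)) for i in range(10)]
    for p in procs:
        p.start()
    # Collect exactly one result per process *before* joining any of them
    res = [q.get() for _ in procs]
    for p in procs:
        p.join()
    # Results arrive in completion order; sort by job index for display
    print(sorted(res, key=lambda r: r[1]))
```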

#### Using Value and Array for sharing data

#### Counting number of jobs (1)

Incrementing a global counter from child processes does not work.

In [39]:
def f2(i):
    global counter
    counter = counter + 1
    print(os.getpid(), i)

#### Checking

In [40]:
counter = 0
f2(10)
print(counter)

10887 10
1


In [41]:
counter = 0

for i in range(10):
    p = mp.Process(target=f2, args=(i,))
    p.start()
    p.join()

11061 0
11064 1
11067 2
11070 3
11073 4
11076 5
11079 6
11082 7
11085 8
11088 9


#### Note that separate processes have their own memory and DO NOT share global memory

In [42]:
counter

0

#### Counting number of jobs (2)

We can use shared memory to do this, but it is slow because `multiprocessing` has to ensure that only one process at a time accesses `counter` (and here each job also starts a new process). `multiprocessing` provides `Value` and `Array` shared memory objects; arbitrary Python objects can also be shared between processes, for example through a `multiprocessing.Manager`, but less efficiently.

In [43]:
def f3(i, counter, store):
    with counter.get_lock():  # `+=` is read-then-write; lock it
        counter.value += 1
    with store.get_lock():
        store[os.getpid() % 10] += 1

In [44]:
%%time

counter = mp.Value('i', 0)
store = mp.Array('i', [0]*10)

for i in range(int(1e2)):
    p = mp.Process(target=f3, args=(i, counter, store))
    p.start()
    p.join()

print(counter.value)
print(store[:])

100
[10, 10, 10, 10, 10, 10, 10, 10, 10, 10]
CPU times: user 68 ms, sys: 240 ms, total: 308 ms
Wall time: 745 ms
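The loop above runs one process at a time, so the counts come out right regardless. Once processes overlap, `counter.value += 1` is a separate read and write, and increments can be lost unless each update holds the object's lock. A sketch with hypothetical names `increment` and `count_jobs` that starts all workers before joining any:

```python
import multiprocessing as mp

def increment(counter, times):
    for _ in range(times):
        # `counter.value += 1` reads then writes; without the lock,
        # concurrent updates from other processes can be lost.
        with counter.get_lock():
            counter.value += 1

def count_jobs(n_procs=4, times=1000):
    counter = mp.Value('i', 0)
    procs = [mp.Process(target=increment, args=(counter, times))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value

if __name__ == "__main__":
    print(count_jobs())  # 4000
```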


#### Avoiding use of shared memory

#### Counting number of jobs (3)

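We should avoid shared memory in parallel jobs as much as possible, because the synchronization it requires drastically reduces efficiency. One useful approach is the map-reduce pattern: each worker returns its partial results (map), and the main process combines them afterwards (reduce). We should also use a `Pool` to reuse a fixed set of processes rather than spawning a new process for every job.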

In [45]:
def f4(i):
    return (os.getpid(), 1, i)

In [46]:
%%time

# map step
with mp.Pool(processes=4) as pool:
    res = pool.map(f4, range(int(1e2)))

CPU times: user 4 ms, sys: 24 ms, total: 28 ms
Wall time: 126 ms


##### Reduce steps

In [47]:
res = np.array(res)

In [48]:
res[np.random.choice(len(res), 10)]

array([[11192,     1,    53],
       [11191,     1,    69],
       [11193,     1,    17],
       [11191,     1,    32],
       [11193,     1,    14],
       [11191,     1,    63],
       [11192,     1,    36],
       [11191,     1,    44],
       [11191,     1,    48],
       [11191,     1,    79]])

In [49]:
import pandas as pd

In [50]:
df = pd.DataFrame(res, columns=['pid', 'one', 'i'])

In [51]:
df.groupby('pid').sum()

       one     i
pid
11191   35  1575
11192   28  1358
11193   28  1652
11194    9   365
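The same reduce step can be done without pandas. A sketch using only the standard library; `reduce_by_pid` is a hypothetical helper, and `sample` is made-up data in the `(pid, 1, i)` shape returned by `f4`.

```python
from collections import defaultdict

def reduce_by_pid(records):
    """Sum the 'one' and 'i' fields of (pid, one, i) records per pid."""
    totals = defaultdict(lambda: [0, 0])
    for pid, one, i in records:
        totals[pid][0] += one  # number of jobs handled by this pid
        totals[pid][1] += i    # sum of job indices handled by this pid
    return {pid: tuple(t) for pid, t in sorted(totals.items())}

sample = [(101, 1, 0), (102, 1, 1), (101, 1, 2), (103, 1, 3)]
print(reduce_by_pid(sample))  # {101: (2, 2), 102: (1, 1), 103: (1, 3)}
```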
