In [1]:
%run talktools.py

<img src="https://www.evernote.com/l/AUV1r1xvhBdPF6lX-2SJLkO-vkkmCXEDrMwB/image.png">
http://www.slideshare.net/ManojitNandi/parallel-programming-in-python-speeding-up-your-analysis

Remember, you can create (fork) many processes, which are copies of the original parent process (memory, data, state) and act independently of each other. To share data between them you have to explicitly do that within each process. The Pythonic way we do multiprocessing (creation of new processes, communication between processes) is with `multiprocessing`.

*"effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine. It runs on both Unix and Windows."*

- https://docs.python.org/3/library/multiprocessing.html

The analog to `threading.Thread` is `multiprocessing.Process`. You should be able to do a drop-in replacement. Instead of `current_thread()` you'd use `os.getpid()`.

In [2]:
%load_ext snakeviz

In [4]:
%%snakeviz

import logging
import random
import time
import os

root = logging.getLogger()
root.handlers=[]
logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-9s) %(message)s',)

import multiprocessing

def worker(num):
    """thread worker function"""
    
    sleep_time = random.randint(1,5)
    logging.debug('worker: {0} sleeping for {1} s, name: {2}'
                   .format(num,sleep_time,os.getpid()))
    time.sleep(sleep_time)
    logging.debug('done')
    return

procs = []
for i in range(2):
    p = multiprocessing.Process(target=worker, args=(i,))
    procs.append(p)
    p.start()

(MainThread) worker: 0 sleeping for 3 s, name: 9777
(MainThread) worker: 1 sleeping for 4 s, name: 9778


 
*** Profile stats marshalled to file '/var/folders/xn/ns06jr5x5w5fkhtq1148h6f00000gp/T/tmp5jm8qhj6'. 


(MainThread) done
(MainThread) done


If your machine has multiple cores, these two processes may get run on those two separate cores, independently.

You may need to share info between processes. You can do this, just like with Threads with `Queues`. You can also use the (UNIX-like) Pipe to have  two processes communicate with each other:

In [5]:
# https://docs.python.org/3/library/multiprocessing.html#exchanging-objects-between-processes
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']


Using pools of workers (in separate processes) with multiprocessing Pool.

https://docs.python.org/3.5/library/multiprocessing.html#using-a-pool-of-workers

In [6]:
from multiprocessing import Pool 
pool = Pool(processes=4)     # start 4 worker processes 

In [7]:
pool

<multiprocessing.pool.Pool at 0x104827940>

In [8]:
import time
def g(x): 
    time.sleep(0.2)
    return x*x

In [10]:
%time list(map(g,range(10)))

CPU times: user 3.67 ms, sys: 3.04 ms, total: 6.72 ms
Wall time: 2.02 s


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [11]:
%time pool.map(g,range(10))

CPU times: user 2.32 ms, sys: 1.75 ms, total: 4.07 ms
Wall time: 605 ms


[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

In [12]:
# print same numbers in arbitrary order
for i in pool.imap_unordered(g, range(10)):
    print(i,sep=" ",end=" ")

0 1 4 9 49 16 36 25 64 81 

Python is packaging (pickling) up your functions and sending them to different processes.

In [13]:
pool.map(lambda x: x**3, range(10))

PicklingError: Can't pickle <function <lambda> at 0x1048f19d8>: attribute lookup <lambda> on __main__ failed

In [21]:
# run only one process "g(10)" asynchronously 
result = pool.apply_async(g, [10])

# prints "100" unless you timeout
print(result.get(timeout=0.2)) 

100


In [15]:
result

<multiprocessing.pool.ApplyResult at 0x10490b0b8>

In [22]:
del pool

In [25]:
from multiprocessing import Pool 
import time

def f(x): 
    return x*x

for i in [1,2,3,4,8,16,32]:
    print(i,"*"*5,flush=True)
    pool = Pool(processes=i)               # start 4 worker processes 
    start = time.time()
    pool.map(f, range(100000))
    print("{0:0.4f} sec".format(time.time() - start))
    pool.terminate()
    del pool

1 *****
0.0707 sec
2 *****
0.0451 sec
3 *****
0.0369 sec
4 *****
0.0365 sec
8 *****
0.0289 sec
16 *****
0.0330 sec
32 *****
0.0420 sec


In [26]:
!ulimit -a

core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
file size               (blocks, -f) unlimited
max locked memory       (kbytes, -l) unlimited
max memory size         (kbytes, -m) unlimited
open files                      (-n) 256
pipe size            (512 bytes, -p) 1
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 709
virtual memory          (kbytes, -v) unlimited


# `concurrent.futures` - Launching parallel tasks

Built-in, create different pools for executing **maps** (single loop over data). Local resources.

`The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.`

https://docs.python.org/3/library/concurrent.futures.html

In [27]:
from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor()  # can also use a threadpool

In [28]:
%%time 

from time import sleep

results = []
for i in range(8):
    sleep(1)
    results.append(i + 1)

CPU times: user 9.71 ms, sys: 6.42 ms, total: 16.1 ms
Wall time: 8.03 s


In [29]:
%%time 
from time import sleep

from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor() 

def slowfunc(x):
    sleep(1)
    return(x+1)

results = list(e.map(slowfunc,range(8)))

CPU times: user 11 ms, sys: 16.2 ms, total: 27.2 ms
Wall time: 2.02 s


In [30]:
e.shutdown()

Figured out I have 4 cores and ran it in 4 separate processes.

## Breakout

Convert the sequential code to parallel using `concurrent.futures`

In [32]:
%%time

import requests
from bs4 import BeautifulSoup

url = "https://en.wikipedia.org/wiki/Special:Random"

lens = []
def run(i):
    a = requests.get(url)
    resp = a.text
    print("title=",BeautifulSoup(resp, 'html.parser')
          .title.string.split("- Wikipedia")[0],"len=",len(resp))
    lens.append(len(resp))

print(lens)

[]
CPU times: user 66 µs, sys: 36 µs, total: 102 µs
Wall time: 108 µs


### Executor.submit

`submit` starts an execution in a separate thread or process and immediately returns a `Future` object that points back to the result. Until the function completes, the future is pending. We get the result of a task with `.result()`, which blocks until the computation is complete.

In [33]:
%%time 
from time import sleep

from concurrent.futures import ProcessPoolExecutor
e = ProcessPoolExecutor() 

def slowfunc(x,y,delay=1):
    sleep(delay)
    return(x+y)

future = e.submit(slowfunc,1, 2)

CPU times: user 4.78 ms, sys: 12.9 ms, total: 17.7 ms
Wall time: 15.9 ms


In [34]:
future.result()

3

In [35]:
%%time 
futures = [e.submit(slowfunc,1,2, delay=1) for _ in range(10)]
results = [f.result() for f in futures]

CPU times: user 7.29 ms, sys: 3.95 ms, total: 11.2 ms
Wall time: 3.01 s


## Joblib

http://pythonhosted.org/joblib/

Running Python functions as pipeline jobs. The *vision is to provide tools to easily achieve better performance and reproducibility when working with long running jobs.* Specifically meant to work well with large data (ie. numpy arrays).

  - **Avoid computing twice the same thing**: code is rerun over an over, for instance when prototyping computational-heavy jobs (as in scientific development), but hand-crafted solution to alleviate this issue is error-prone and often leads to unreproducible results
  - **Persist to disk transparently**: persisting in an efficient way arbitrary objects containing large data is hard. Using joblib’s caching mechanism avoids hand-written persistence and implicitly links the file on disk to the execution context of the original Python object. As a result, joblib’s persistence is good for resuming an application status or computational job, eg after a crash.

Joblib strives to address these problems while leaving your code and your flow control as unmodified as possible (no framework, no new paradigms).

In [36]:
!conda install joblib -y

Fetching package metadata .........
Solving package specifications: ..........

# All requested packages already installed.
# packages in environment at /Users/Kamilobu/anaconda/envs/ay250:
#
joblib                    0.9.4                    py35_0  


In [37]:
from math import sqrt
[sqrt(i ** 2) for i in range(10)]

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

### Parallel Helpers

Joblib provides a simple helper class to write parallel for loops using multiprocessing. The core idea is to write the code to be executed as a generator expression, and convert it to parallel computing.

In [38]:
from math import sqrt
from joblib import Parallel, delayed

By default Parallel uses the Python multiprocessing module to fork separate Python worker processes to execute tasks concurrently on separate CPUs. This is a reasonable default for generic Python programs but it induces some overhead as the input and output data need to be serialized in a queue for communication with the worker processes. 

In [56]:
Parallel(n_jobs=2,backend="threading") \
  (delayed(sqrt)(i **2) for i in range(10))

[0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0]

In [58]:
import time
start = time.time()
Parallel(n_jobs=10,verbose=5) \
  (delayed(time.sleep)(1) for _ in range(100))
print(time.time()-start)

[Parallel(n_jobs=10)]: Done  52 tasks      | elapsed:    6.0s


10.166458129882812


[Parallel(n_jobs=10)]: Done 100 out of 100 | elapsed:   10.0s finished


### On demand recomputing: the `Memory` class

Caching long running results so it can be reused. Let's try to cache to disk:

In [59]:
from joblib import Memory
memory = Memory(cachedir="/tmp/", verbose=5)

In [60]:
@memory.cache
def f(x):
    print('Running f(%s)' % x)
    return x

In [61]:
print(f(1))

[Memory]    0.8s, 0.0min: Loading f...
___________________________________________________f cache loaded - 0.0s, 0.0min
1


In [62]:
print(f(1))

[Memory]    3.1s, 0.1min: Loading f...
___________________________________________________f cache loaded - 0.0s, 0.0min
1


In [64]:
print(f(20))

________________________________________________________________________________
[Memory] Calling __main__--Users-Kamilobu-Desktop-ASTRO-astro-python-CLASSES-class04-08_Parallelism-__ipython-input__.f...
f(20)
Running f(20)
________________________________________________________________f - 0.0s, 0.0min
20


In [65]:
!ls -lat /tmp/joblib/__main__--Users-jbloom-Classes-python-seminar-DataFiles_and_Notebooks-08_Parallelism-__ipython-input__/f

ls: /tmp/joblib/__main__--Users-jbloom-Classes-python-seminar-DataFiles_and_Notebooks-08_Parallelism-__ipython-input__/f: No such file or directory


In [47]:
memory = Memory(cachedir="/tmp/",verbose=1,mmap_mode="r+")

In [48]:
@memory.cache
def g(x,blah=True):
    print('Running g(%s)' % x)
    return x

In [49]:
print(g(1))

________________________________________________________________________________
[Memory] Calling __main__--Users-Kamilobu-Desktop-ASTRO-astro-python-CLASSES-class04-08_Parallelism-__ipython-input__.g...
g(1)
Running g(1)
________________________________________________________________g - 0.0s, 0.0min
1


In [50]:
print(g(1))

1


In [51]:
print(g(1,blah=False))

________________________________________________________________________________
[Memory] Calling __main__--Users-Kamilobu-Desktop-ASTRO-astro-python-CLASSES-class04-08_Parallelism-__ipython-input__.g...
g(1, blah=False)
Running g(1)
________________________________________________________________g - 0.0s, 0.0min
1


Ignoring variables:

In [52]:
@memory.cache(ignore=['blah'])
def h(x,blah=True):
    print('Running h(%s)' % x)
    return x

In [53]:
print(h(1))

________________________________________________________________________________
[Memory] Calling __main__--Users-Kamilobu-Desktop-ASTRO-astro-python-CLASSES-class04-08_Parallelism-__ipython-input__.h...
h(1)
Running h(1)
________________________________________________________________h - 0.0s, 0.0min
1


In [54]:
print(h(1,blah=False))

1


Note: joblib also gives (for persistence) `joblib.dump()` and `joblib.load()` provide a replacement for pickle to work efficiently on Python objects containing large data, in particular large numpy arrays.