## Parallel Python -- multithreading

In [1]:
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor
import multiprocessing as mp
import ipyparallel as ipp
import requests
import time
import os

Parallel processing involves farming jobs out to different processors (CPU cores) on your machine. Nearly all modern laptops have multiple cores that can run tasks at the same time. However, writing parallel code can be tricky, and using those cores efficiently requires some understanding of how parallel processing works.

### Process IDs
The `pid` of a process is its process ID, a number that the operating system assigns to a process when it starts. You can think of a process as an independent running program that the operating system schedules onto one of your computer's CPU cores to carry out a set task. When we run parallel code with 4 processes, the work should be farmed out to four different `pids`, which the operating system can run on different cores at the same time. Below is a very simple function that we will use to check that our code is working the way we expect. All it does is `sleep` for `i` seconds and then return the `pid` of the process it ran in.

In [2]:
queries = range(10)

In [3]:
def who(i):
    """Sleep for i seconds, then return the pid of the process that ran the call."""
    time.sleep(i)
    return os.getpid()


#### (1) Run the `who` function in a single process
This takes about 2 seconds to complete (the argument is the sleep time) and returns a number corresponding to the process it ran in, which here is the notebook's own process.

In [4]:
# run single/normal function
who(2)

7209

#### (2) Run the `who` function in multiple processes
Here we will submit 10 jobs, one for each item in `queries`, and run `who(1)` for each. In a single process this would take at least 10 seconds with a 1 second sleep time, but because we're running it on 4 processes in parallel it should take much less. As you can see in the output, only four distinct `pid`s are returned, each of them several times: each of the four worker processes ran more than one job.

In [5]:
with ProcessPoolExecutor(max_workers=4) as pool:
    
    # submit one job per query to the worker processes
    jobs = [pool.submit(who, 1) for q in queries]
    
    # collect results
    results = [i.result() for i in jobs]
results

[7223, 7224, 7225, 7226, 7226, 7223, 7224, 7225, 7226, 7223]

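### `ProcessPoolExecutor` and `multiprocessing.Pool` are very similar
These are two different syntaxes for doing multiprocessing. `ProcessPoolExecutor` comes from the `concurrent.futures` module, which was added to the standard library in Python 3.2; `multiprocessing` works in both Python 2 and Python 3 and is the older, and at the time of writing more widely used, interface. Both take about 3 seconds to run the same 10 jobs.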

In [6]:
%%timeit -n 1 -r 5
with ProcessPoolExecutor(max_workers=4) as pool:
    
    # submit queries to threads
    jobs = [pool.submit(who, 1) for q in queries]
    
    # collect results
    results = [i.result() for i in jobs]

3.05 s ± 10.5 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)
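Why about 3 seconds? A minimal back-of-the-envelope sketch, assuming every job takes exactly its sleep time and ignoring the cost of starting processes and passing data: with 4 workers the 10 jobs run in rounds of 4, 4 and 2, so the wall time is ceil(10 / 4) × 1 s = 3 s. The helper name `ideal_wall_time` is hypothetical.

```python
import math


def ideal_wall_time(n_jobs, n_workers, seconds_per_job):
    """Idealized wall time for equal-length jobs, ignoring start-up and communication overhead."""
    # jobs run in "rounds" of at most n_workers jobs at a time
    rounds = math.ceil(n_jobs / n_workers)
    return rounds * seconds_per_job


print(ideal_wall_time(10, 1, 1))  # 10 -> a single process runs the jobs back to back
print(ideal_wall_time(10, 4, 1))  # 3  -> rounds of 4, 4 and 2 jobs
```

The small gap between this estimate and the measured 3.05 s is presumably that overhead.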


In [39]:
%%timeit -n 1 -r 5
with mp.Pool(processes=4) as pool:
    
    # submit one job per query to the worker processes (positional args passed as a sequence)
    jobs = [pool.apply_async(who, [1]) for q in queries]
    
    # collect results
    results = [i.get() for i in jobs]

3.04 s ± 3.2 ms per loop (mean ± std. dev. of 5 runs, 1 loop each)


### The `Pool` object
In the examples above we use a `with` context manager to create a `Pool` object. This object manages a set of worker processes: it sends them the inputs of function calls and receives the outputs. There are several ways to send and receive jobs, but the simplest and easiest to understand is the `apply_async` method for `multiprocessing`, or `submit` for `ProcessPoolExecutor`. Note that leaving the `with` block of a `multiprocessing.Pool` calls its `terminate()` method, which is why results are always collected inside the block.

In [40]:
# apply_async returns an AsyncResult object immediately and does not block,
# so you can keep executing code while the job runs in another process.
# The result is only fetched (with .get()) when you ask for it.
with mp.Pool(processes=4) as pool:
    a = pool.apply_async(who, [1])
    b = pool.apply_async(who, [1])
    c = pool.apply_async(who, [1])
    d = pool.apply_async(who, [1])
    print([i.get() for i in (a, b, c, d)])

[7871, 7872, 7873, 7874]
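Besides `apply_async` and `submit`, both libraries also provide a `map` method that applies one function to every item of an iterable and returns the results in input order. Below is a minimal sketch of the 10-query example written with `map`, assuming it runs as a script; the `if __name__ == "__main__":` guard is needed on platforms that start workers with the `spawn` method (Windows and, by default, macOS) and is harmless in a notebook.

```python
import multiprocessing as mp
import os
import time
from concurrent.futures import ProcessPoolExecutor


def who(i):
    """Sleep for i seconds, then return the pid of the process that ran the call."""
    time.sleep(i)
    return os.getpid()


if __name__ == "__main__":
    queries = range(10)

    # multiprocessing.Pool.map blocks until all results are back,
    # and returns them as a list in the same order as the inputs
    with mp.Pool(processes=4) as pool:
        pids = pool.map(who, [1 for q in queries])
    print(pids)

    # ProcessPoolExecutor.map returns an iterator over the results,
    # also in input order; list() waits for all of them
    with ProcessPoolExecutor(max_workers=4) as pool:
        pids = list(pool.map(who, [1 for q in queries]))
    print(pids)
```

`map` is convenient when every job calls the same function; `apply_async` and `submit` instead give you a handle on each individual job, which is what the `AsyncResult` methods operate on.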


### The `AsyncResult` object
The object returned by an `apply_async()` call is an `AsyncResult` (in the output further below its class shows up as `multiprocessing.pool.ApplyResult`, of which `AsyncResult` is an alias). This object is used to ask the job that we sent to another process whether it has finished yet, and to collect its result when it is done. The four main methods of an `AsyncResult` are `wait`, `get`, `ready` and `successful`. The first, `wait`, blocks until the job is finished, meaning that no further code runs in the notebook until then. The `get` method returns the result; it also blocks if the job has not finished yet, and it re-raises the exception if the job failed. The `ready` method is useful because it tells you whether the job has finished without blocking. Finally, `successful` returns whether the job completed without raising an error; it may only be called once the job is ready (in Python 3.7 and later it raises a `ValueError` otherwise).

#### Example of the `get()` and `wait()` functions.
The `get()` method calls `wait()` internally, so you only need to call `wait()` yourself when you want to block until a job is finished without collecting its result yet, for example to keep other code (such as submitting more jobs) from running until then.

In [9]:
# start a pool
with mp.Pool(processes=4) as pool:

    # this does NOT BLOCK and so the code below is executed while 
    # the async job is running on a different processor
    asyncr = pool.apply_async(time.sleep, [2])
    print('job was submitted')
    print('waiting for result...')
    print('executing print statements in this notebook while waiting...')

    # collect result
    asyncr.get()
    print('job finished')

job was submitted
waiting for result...
executing print statements in this notebook while waiting...
job finished


In [10]:
# start a pool
with mp.Pool(processes=4) as pool:

    # apply_async itself does NOT block, so 'job was submitted' prints right away
    asyncr = pool.apply_async(time.sleep, [2])
    print('job was submitted')

    # wait() DOES BLOCK: the code below is NOT executed until the
    # job running on the other process is finished
    asyncr.wait()

    # collect result
    asyncr.get()
    print("job finished; this couldn't be printed until it was finished")

job was submitted
job finished; this couldn't be printed until it was finished
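Both `get()` and `wait()` also accept an optional `timeout` argument (in seconds), which lets you check on a job without blocking indefinitely. A minimal sketch; `slow_square` is a hypothetical stand-in for a long-running job:

```python
import multiprocessing as mp
import time


def slow_square(x, delay=1.0):
    """Sleep for `delay` seconds, then return x squared."""
    time.sleep(delay)
    return x * x


if __name__ == "__main__":
    with mp.Pool(processes=2) as pool:
        asyncr = pool.apply_async(slow_square, (3,))

        # get() with a timeout raises multiprocessing.TimeoutError if the job is not done yet
        try:
            asyncr.get(timeout=0.1)
        except mp.TimeoutError:
            print('not finished after 0.1 s; doing other work...')

        # wait() with a timeout simply returns when time runs out, so check ready() afterwards
        asyncr.wait(timeout=0.1)
        print('ready yet?', asyncr.ready())

        # with no timeout, get() waits as long as needed and returns the result
        print('result:', asyncr.get())
```

The asymmetry matters in practice: a timed-out `get()` must be caught as an exception, whereas a timed-out `wait()` gives no signal by itself.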


#### Example of the `ready()` and `successful()` functions.


In [11]:
# start a pool
with mp.Pool(processes=4) as pool:

    # submit a job that will run successfully
    async0 = pool.apply_async(who, [2])
    
    # submit a job that will fail (time.sleep('apple') raises a TypeError)
    async1 = pool.apply_async(who, ['apple'])
    
    # put the async results into a list of pending jobs
    jobs = [async0, async1]
    
    # poll the pending jobs and print the outcome of each as it finishes
    while jobs:
        # iterate over a copy so that removing finished jobs is safe
        for job in list(jobs):
            # skip over the job for now if it is not ready yet
            if job.ready():
                if job.successful():
                    print('{} was successful'.format(job))
                else:
                    print('{} had an error'.format(job))
                # remove the finished job from the pending list
                jobs.remove(job)
        
        # pause briefly before checking again
        time.sleep(0.5)

<multiprocessing.pool.ApplyResult object at 0x7f7a58dcedd8> had an error
<multiprocessing.pool.ApplyResult object at 0x7f7a58dce9e8> was successful
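The polling loop above works, but `concurrent.futures` offers an alternative that avoids polling: `as_completed()` yields each future as soon as it finishes, and `future.exception()` plays the role of `successful()`. Below is a sketch of the same two-job example; `nap` and `run_and_report` are hypothetical helpers, and `run_and_report` accepts any executor, so it works with a thread pool as well as a process pool.

```python
import time
from concurrent.futures import ProcessPoolExecutor, as_completed


def nap(seconds):
    """Sleep for `seconds`, then return it (time.sleep raises TypeError for a str)."""
    time.sleep(seconds)
    return seconds


def run_and_report(executor, args):
    """Submit nap(arg) for each arg and report each job in the order it finishes."""
    # map each Future back to the argument it was submitted with
    futures = {executor.submit(nap, a): a for a in args}
    outcomes = []
    # as_completed yields each future as soon as it is done, so no polling loop is needed
    for fut in as_completed(futures):
        arg = futures[fut]
        error = fut.exception()  # None if the job finished without raising
        if error is None:
            print('{!r} was successful'.format(arg))
        else:
            print('{!r} had an error: {!r}'.format(arg, error))
        outcomes.append((arg, error is None))
    return outcomes


if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=4) as pool:
        run_and_report(pool, [2, 'apple'])
```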


### Ipyparallel -- large-scale computing
The libraries above have limitations for large-scale computing. Multithreading shares one process's memory among its threads, so all of the threads must run on a single computer (and in CPython the global interpreter lock, or GIL, additionally prevents threads from executing Python code in parallel, so threads mostly help with I/O-bound work). Similarly, multiprocessing can split work up among the CPU cores of a single computer, but it cannot connect multiple computers or the nodes of a cluster together. To do this we need a more complex protocol. This is where `ipyparallel` comes in.
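To see that threads share one process, here is a small sketch that runs the same `who` function (redefined so the block is self-contained) on a `ThreadPoolExecutor`, which the notebook imports but does not otherwise use. Every call should return the same `pid`, that of the current process. Because `time.sleep` releases the GIL, the sleeps still overlap, so the 8 one-second jobs on 4 threads should take roughly 2 s rather than 8 s.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def who(i):
    """Sleep for i seconds, then return the pid of the process that ran the call."""
    time.sleep(i)
    return os.getpid()


start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as pool:
    pids = list(pool.map(who, [1] * 8))
elapsed = time.perf_counter() - start

# all threads live inside the current process, so only one pid comes back
print(set(pids) == {os.getpid()})  # True
# time.sleep releases the GIL, so the sleeps on different threads overlap
print('{:.1f} s'.format(elapsed))
```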

The `ipyparallel` library includes a powerful program called `ipcluster` for starting a cluster and connecting CPUs from across one or more computers or nodes, along with a `multiprocessing`-like Python API for distributing jobs across those cores. Using this tool you can connect to hundreds or thousands of cores and distribute work to them while working in an interactive Jupyter notebook.

Before running the code below, you will need to start an `ipcluster` instance in a separate terminal with the command below. If you're working on a cluster, use the terminal tab in the Jupyter dashboard to open a terminal, and make sure you activate the same conda environment (Python 2 or Python 3) as the kernel running this notebook.

In [17]:
# ipcluster start --n=4

In [18]:
# connect to a running ipcluster instance 
ipyclient = ipp.Client()

In [19]:
# import libraries on the remote engines using the px magic
# (the %px magic is set up when a Client is created, so connect first)
%px import time, os

In [20]:
# create a load-balanced view, which hands each job to an engine that is free to run it
lb = ipyclient.load_balanced_view()

# send jobs
jobs = [lb.apply(who, 1) for q in queries]

# collect results
res = [i.get() for i in jobs]
res

[7488, 7491, 7484, 7485, 7488, 7484, 7491, 7485, 7488, 7484]