<hr/>
# Using a pool of workers
<hr/>

In many situations we have $J$ jobs to do and $P$ processors available.  If job $j$ takes time $T_j$, it would be helpful to have an automated procedure for launching jobs on processes in such a way that each process does approximately the same total work $W_p$, where

\begin{equation*}
\mbox{total work per processor} \qquad W_p \approx \frac{1}{P}\sum_{j=1}^J T_j \qquad p = 1,2,...,P
\end{equation*}

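The `multiprocessing` module's `Pool` class gives a practical, approximate solution to this problem: each idle worker process picks up the next pending job, so the work evens out across processes without our having to know the $T_j$ in advance.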

If we have several jobs which we expect to take unequal lengths of time, we can use a *pool* of workers.  The advantage of this approach is that the worker processes are launched automatically, and the results are collected and returned automatically.  The steps in launching a pool are as follows:

1.  Decide on how many *tasks* should be launched (`njobs`)

2.  Decide on how many *processes* should be launched (`nprocs`)

3.  Define the function that each worker process should call on a single task (`worker`).

4.  Launch the pool of workers.  There are two conceptual ways to launch jobs:

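    * Use `apply` or `apply_async`.  Each call launches a single job on one process from the pool, passing the arguments in the `args` tuple to the function.  Multiple jobs are launched with multiple calls.  Because `apply` blocks until its job has finished, only `apply_async` actually runs jobs concurrently.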
    
    * Use `map` or `map_async`.  A single call launches one job per element of an *iterable* object (list, array, zip object, and so on), applying the same function to each element.
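Outside a notebook, the four steps fit in a short stand-alone script.  The sketch below is a minimal example, not the code used later in this notebook; the names `square`, `run`, `NJOBS` and `NPROCS` are hypothetical.  It uses `Pool` as a context manager, so the workers are shut down when the block exits, and it creates the pool only under `if __name__ == "__main__":`, which is required with the `spawn` start method (the default on Windows and macOS) because each worker re-imports the script.

```python
# A minimal stand-alone script for the four steps.  The names square,
# NJOBS, NPROCS and run are hypothetical, chosen for this sketch.
from multiprocessing import Pool

NJOBS = 10    # Step 1: how many tasks
NPROCS = 4    # Step 2: how many worker processes

def square(x):
    """Step 3: the function each worker calls, once per task."""
    return x * x

def run(njobs=NJOBS, nprocs=NPROCS):
    data = list(range(njobs))
    # Step 4: launch the pool.  Leaving the with-block terminates the workers.
    with Pool(processes=nprocs) as pool:
        results = pool.map(square, data)   # blocks; results keep the order of data
    return results

if __name__ == "__main__":
    # With the "spawn" start method each worker re-imports this file,
    # so the pool must only be created under this guard.
    print(run())   # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```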
    
Here are four code sketches.  In each case the function to be applied is `f`, and `data` is a list of `njobs` tuples.  The pool is created as
<pre><code>
pool = multiprocessing.Pool(processes=nprocs)
</code></pre>

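The `chunksize` keyword (illustrated in Example 5) controls how many tasks are sent to a worker process at once.  For `map` and `map_async`, the default `chunksize=None` lets the pool choose a value from the length of `data` and the number of processes; for the 19 jobs on 8 processes used below, this works out to 1.  For the asynchronous calls, we can also specify a `callback`, a function that is called in the main process with the results once they are available.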
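How does the pool pick a chunk size when `chunksize=None`?  The sketch below mirrors the heuristic in CPython's `Pool.map`/`map_async` at the time of writing: aim for roughly four chunks per worker, rounding up.  This is an implementation detail, not documented API, and may change between Python versions; `default_chunksize` is a hypothetical helper name.

```python
def default_chunksize(n_tasks, n_procs):
    """Chunk size CPython's Pool.map/map_async choose when chunksize=None.
    Mirrors an implementation detail that may change between versions."""
    chunksize, extra = divmod(n_tasks, n_procs * 4)   # about 4 chunks per worker
    if extra:
        chunksize += 1                                # round up
    return chunksize

print(default_chunksize(19, 8))     # 1
print(default_chunksize(1000, 8))   # 32
```

So for the small examples in this notebook the default behaves like `chunksize=1`, while long iterables are sent in larger chunks to cut down on communication overhead.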

For all examples, the results from all processes are stored in `results`. 

<hr/>

### Blocking code using `Pool.map`

<pre><code>
# Blocks until all results are ready
results = pool.map(func=f, iterable=data)
</code></pre>

<hr/>

### Non-blocking code using `Pool.map_async`
A `callback` function can be supplied; for `map_async` it is called only once, with the complete list of results, after all jobs have finished.

<pre><code>
# Non-blocking: returns immediately.  cb(results) runs once, when all jobs are done.
async_results = pool.map_async(func=f, iterable=data, 
    callback=cb)

# Blocks until results are ready
results = async_results.get()    
</code></pre>
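The "called once, with the whole list" behaviour of the `map_async` callback is easy to check.  This sketch uses `multiprocessing.pool.ThreadPool`, which has the same `map_async`/`callback` interface as `Pool` but runs its workers as threads, so it runs anywhere without pickling; swapping in `Pool` is an assumption left to the reader.  The names `double`, `cb` and `calls` are hypothetical.

```python
from multiprocessing.pool import ThreadPool

calls = []          # records every invocation of the callback

def cb(result_list):
    # For map_async, the callback receives the complete list of results, once.
    calls.append(result_list)

def double(x):
    return 2 * x

with ThreadPool(processes=4) as pool:
    async_result = pool.map_async(double, range(6), callback=cb)
    results = async_result.get()   # blocks until all results are ready

print(results)      # [0, 2, 4, 6, 8, 10]
print(len(calls))   # 1
```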

<hr/>

### Blocking code using `Pool.apply`
Each call blocks until its job has finished, so the jobs run one after another, which is probably not what we want for parallel computing!
<pre><code>
results = []
for d in data:
    # Blocks until this job is done
    r = pool.apply(func=f, args=(d,))
    results.append(r)
</code></pre>
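The cost of blocking is easy to measure.  In this sketch (using `multiprocessing.pool.ThreadPool`, which shares `apply`/`apply_async` with `Pool`; threads overlap here because `time.sleep` releases the interpreter lock), four 0.2 s jobs take about the sum of their times with an `apply` loop, but only about the longest job with `apply_async`.  The names `nap` and `data` are hypothetical.

```python
import time
from multiprocessing.pool import ThreadPool

def nap(t):
    """Stand-in for a job: sleep t seconds, then return t."""
    time.sleep(t)
    return t

data = [0.2] * 4

with ThreadPool(processes=4) as pool:
    start = time.perf_counter()
    blocking = [pool.apply(nap, args=(t,)) for t in data]         # one job at a time
    t_apply = time.perf_counter() - start

    start = time.perf_counter()
    pending = [pool.apply_async(nap, args=(t,)) for t in data]    # all jobs submitted at once
    nonblocking = [r.get() for r in pending]                      # then wait for each result
    t_async = time.perf_counter() - start

print("apply loop:       {:.2f} s".format(t_apply))   # roughly 4 x 0.2 = 0.8 s
print("apply_async loop: {:.2f} s".format(t_async))   # roughly 0.2 s
```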

<hr/>

### Non-blocking code using `Pool.apply_async`
We can use a `callback` to process the results as they become available: `cb` is called once per job, with that job's return value, as soon as the job finishes.
<pre><code>
async_results = []
for d in data:
    # Non-blocking calls
    r = pool.apply_async(func=f,args=(d,),callback=cb)
    async_results.append(r)
    
# Blocks until results are ready 
results = [r.get() for r in async_results] 
</code></pre>
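One consequence worth seeing: with `apply_async`, the callback fires in *completion* order, while the list built from `r.get()` stays in *submission* order.  The sketch below assumes `multiprocessing.pool.ThreadPool` as a stand-in for `Pool` (same `apply_async` interface); `job`, `cb` and `completed` are hypothetical names, and the exact completion order depends on timing.

```python
import time
from multiprocessing.pool import ThreadPool

completed = []   # filled by the callback, in completion order

def cb(result):
    completed.append(result[0])   # record the job number

def job(jobnum, t):
    time.sleep(t)
    return (jobnum, t)

sleep_times = [0.3, 0.0, 0.15]   # job 0 is slowest, job 1 fastest

with ThreadPool(processes=3) as pool:
    async_results = [pool.apply_async(job, args=(j, t), callback=cb)
                     for j, t in enumerate(sleep_times)]
    results = [r.get() for r in async_results]

print([r[0] for r in results])  # [0, 1, 2]  (submission order)
print(completed)                # typically [1, 2, 0]  (completion order)
```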

<hr/>
## Results explained

In the code that follows, we provide several examples illustrating each of the above modes for calling a Pool of workers. 

The following are typical results obtained using the `Pool.map_async` call.
<pre><code>
Launching 19 jobs on 8 cores
In process 4916 ( 0) is sleeping   4.8323 seconds
In process 4918 ( 2) is sleeping   0.0375 seconds
In process 4921 ( 5) is sleeping   2.9111 seconds
In process 4917 ( 1) is sleeping   2.2037 seconds
In process 4922 ( 6) is sleeping   3.3578 seconds
In process 4920 ( 4) is sleeping   4.6963 seconds
In process 4919 ( 3) is sleeping   4.5549 seconds
In process 4923 ( 7) is sleeping   0.4197 seconds
In process 4918 ( 8) is sleeping   3.8324 seconds
In process 4923 ( 9) is sleeping   1.1840 seconds
In process 4923 (10) is sleeping   0.1541 seconds
In process 4923 (11) is sleeping   3.9439 seconds
In process 4917 (12) is sleeping   1.7304 seconds
In process 4921 (13) is sleeping   3.1164 seconds
In process 4922 (14) is sleeping   3.0791 seconds
In process 4918 (15) is sleeping   0.7428 seconds
In process 4917 (16) is sleeping   0.9155 seconds
In process 4919 (17) is sleeping   0.5721 seconds
In process 4918 (18) is sleeping   0.0731 seconds
<br/>
Total time spent in each process
Process  1 (4916)    4.8323(s)    1 job(s) (0,)
Process  2 (4917)    4.8496(s)    3 job(s) (1, 12, 16)
Process  3 (4918)    4.6857(s)    4 job(s) (2, 8, 15, 18)
Process  4 (4919)    5.1269(s)    2 job(s) (3, 17)
Process  5 (4920)    4.6963(s)    1 job(s) (4,)
Process  6 (4921)    6.0275(s)    2 job(s) (5, 13)
Process  7 (4922)    6.4369(s)    2 job(s) (6, 14)
Process  8 (4923)    5.7017(s)    4 job(s) (7, 9, 10, 11)

      Total work done (s)      42.3570
      Wall clock time (s)       6.4890
</code></pre>
The first set of results (19 lines) is printed by the worker as each job starts.  Each line gives the process PID (assigned by the OS), the job number (0-18), and the time that job will sleep.

The second set of results (8 lines) summarizes the returned results by process: the process number (1-8) and PID, the total time spent in that process, the number of jobs run by that process, and the job numbers it ran.

**Observations.** The first set of results (first 19 lines) is printed as jobs start and illustrates the *asynchronous* nature of the call: the lines do not appear in the order in which the jobs were submitted.  For example, jobs 2 and 5 print before job 1.  On the other hand, if we sort this list on job number, we see that the first 8 jobs were handed out to our 8 processes in order.  Here are the first several jobs taken from the top of a list sorted on job number.

<pre><code>
In process 4916 ( 0) is sleeping   4.8323 seconds
In process 4917 ( 1) is sleeping   2.2037 seconds
In process 4918 ( 2) is sleeping   0.0375 seconds
In process 4919 ( 3) is sleeping   4.5549 seconds
In process 4920 ( 4) is sleeping   4.6963 seconds
In process 4921 ( 5) is sleeping   2.9111 seconds
In process 4922 ( 6) is sleeping   3.3578 seconds
In process 4923 ( 7) is sleeping   0.4197 seconds
In process 4918 ( 8) is sleeping   3.8324 seconds
In process 4923 ( 9) is sleeping   1.1840 seconds
....
</code></pre>
The first 8 jobs (jobnums 0-7) are launched in order on processes 4916-4923, but the 9th job (jobnum=8) is launched on process 4918, the first process to become available (its job 2 slept only 0.0375 s, while the other seven processes were still busy).  Similarly, the 10th job (jobnum=9) is launched on the next available process, 4923, whose job 7 slept only 0.4197 s.

Post-processing the results (second set, 8 lines), we see that to achieve approximately the same amount of time per process (i.e. *load balancing*), jobs are distributed unequally.  With roughly 5 s of work per process (the ideal is $W_p \approx 42.357/8 \approx 5.29$ s), processes 1 and 5 ran only 1 job each, whereas processes 3 and 8 ran 4 jobs each.
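The assignment in the table can be explained by a simple model.  The sketch below assumes the pool behaves like greedy list scheduling: jobs are taken in submission order, each goes to whichever worker becomes free first, and launching jobs and returning results cost nothing.  `simulate_pool` is a hypothetical helper, not part of `multiprocessing`.  Fed the 19 sleep times printed above, it reproduces the job-to-process assignment in the table (its worker numbers match the table's because the PIDs here increase in the order in which processes received their first job), and its makespan of 6.4369 s is close to the measured 6.4890 s wall-clock time; the difference is overhead the model ignores.

```python
import heapq

def simulate_pool(times, nprocs):
    """Greedy list scheduling: jobs are taken in submission order, and each one
    goes to the worker that becomes free first (ties go to the lower index).
    Assumes zero overhead for launching jobs and returning results."""
    heap = [(0.0, w) for w in range(nprocs)]    # (time worker is free, worker index)
    jobs = [[] for _ in range(nprocs)]          # job numbers run by each worker
    for j, t in enumerate(times):
        free_at, w = heapq.heappop(heap)        # first worker to become free
        jobs[w].append(j)
        heapq.heappush(heap, (free_at + t, w))
    finish = [0.0] * nprocs                     # time each worker finishes
    for free_at, w in heap:
        finish[w] = free_at
    return jobs, finish

# Sleep times of jobs 0-18, copied from the output above
times = [4.8323, 2.2037, 0.0375, 4.5549, 4.6963, 2.9111, 3.3578, 0.4197,
         3.8324, 1.1840, 0.1541, 3.9439, 1.7304, 3.1164, 3.0791, 0.7428,
         0.9155, 0.5721, 0.0731]
P = 8
jobs, finish = simulate_pool(times, P)
for w, (js, f) in enumerate(zip(jobs, finish), start=1):
    print("Process {:2d} {:8.4f}(s) {}".format(w, f, tuple(js)))
print("Ideal W_p  = {:.4f} s".format(sum(times) / P))   # 5.2946
print("Makespan   = {:.4f} s".format(max(finish)))      # 6.4369
```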

<hr/>
## Python modules used for all examples

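The following cell creates and saves the module `pool_tools.py`, used by all examples.  The magic command `%%file` writes the contents of the cell to the indicated file.  Keeping `worker` in an importable module also lets the pool pickle it and send it to the worker processes, which is needed with the `spawn` start method (the default on Windows and macOS), where functions defined directly in a notebook cannot be found by the workers.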

In [None]:
%%file pool_tools.py
# Shared helpers for the examples.  `from pool_tools import *` also
# makes Pool, time, os and random available in the notebook.
from multiprocessing import Pool
import time, os, random

def worker(z):
    """Sleep for t seconds; return (jobnum, t, PID of the worker process)."""
    jobnum, t = z    # Unpack the (jobnum, sleep time) tuple.
    pid = os.getpid()
    # flush=True so the line appears as soon as the job starts
    print("In process {} ({:2d}) is sleeping {:8.4f} seconds".format(pid, jobnum, t),
          flush=True)
    time.sleep(t)
    return (jobnum, t, pid)

def print_pool_results(res, np):
    """Summarize (jobnum, t, pid) results by worker process.  `np` is unused."""
    # How much time was spent in each process?
    pids = sorted(set(z[2] for z in res))    # Unique set of PIDs
    print("")
    print("Total time spent in each process")
    total_time = 0
    for i, p in enumerate(pids):
        proc_count = sum(1 for z in res if z[2] == p)
        proc_time  = sum(z[1] for z in res if z[2] == p)
        proc_jobs  = tuple(z[0] for z in res if z[2] == p)
        print("Process {:2d} ({})  {:8.4f}(s) {:4d} job(s) {}"
              .format(i+1, p, proc_time, proc_count, proc_jobs))
        total_time += proc_time
    print("")
    print("{:>25s} {:12.4f}".format("Total work done (s)", total_time))

<hr/>
## Example 1 : pool.map(func=f,iterable=data)

In [None]:
from pool_tools import *

def test1(data,np):
    with Pool(processes=np) as pool:       # workers are terminated when the block exits
        # This call blocks until all results are available
        results = pool.map(func=worker,iterable=data)
    print_pool_results(results,np)
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [5*random.random() for i in range(njobs)]
data = list(zip(range(njobs),sleep_times))    # List of tuples (jobnum,t)
    
tr = %timeit -n 1 -r 1 -o -q pass; test1(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))

<hr/>
## Example 2 : pool.map_async(func=f,iterable=data)

In [None]:
from pool_tools import *

def test2(data,np):
    with Pool(processes=np) as pool:       # workers are terminated when the block exits
        # This is non-blocking
        async_results = pool.map_async(func=worker,iterable=data)

        # This call blocks until all results are available
        results = async_results.get()

    print_pool_results(results,np)
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [5*random.random() for i in range(njobs)]
data = list(zip(range(njobs),sleep_times))    # List of tuples (jobnum,t)
    
tr = %timeit -n 1 -r 1 -o -q pass; test2(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))

<hr/>
## Example 3 : pool.apply(func=f,args=(d,))

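Using this mode, each call to `apply` blocks until its job has finished, so jobs are launched in order and run one at a time.  Jobs are not run in parallel, and the wall-clock time should be close to the total work done.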

In [None]:
from pool_tools import *

def test3(data,np):
    with Pool(processes=np) as pool:       # workers are terminated when the block exits
        results = []
        for d in data:
            # This call is blocking;  jobs run one at a time
            r = pool.apply(worker,args=(d,))
            results.append(r)

    print_pool_results(results,np)
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [5*random.random() for i in range(njobs)]
pnum = range(njobs)
data = list(zip(pnum,sleep_times))    # List of tuples (jobnum,t)
    
tr = %timeit -n 1 -r 1 -o -q pass; test3(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))

<hr/>
## Example 4 : pool.apply_async(func=f,args=(d,))

In [None]:
from pool_tools import *

def test4(data,np):
    with Pool(processes=np) as pool:       # workers are terminated when the block exits
        # These calls are non-blocking
        async_results = []
        for d in data:
            r = pool.apply_async(worker,args = (d,))
            async_results.append(r)
        pool.close()    # no more jobs will be submitted
        pool.join()     # block here until all jobs are done ...
        results = [r.get() for r in async_results]  # ... otherwise r.get() would block
    print_pool_results(results,np)
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [5*random.random() for i in range(njobs)]
pnum = range(njobs)
data = list(zip(pnum,sleep_times))    # List of tuples (jobnum,t)
    
tr = %timeit -n 1 -r 1 -o -q pass; test4(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))

<hr/>
## Example 5 : Controlling how tasks are distributed

We have some control over how tasks are handed off to processes through the `chunksize` keyword.  With `chunksize=4` in a call to `map_async`, for example, the pool groups consecutive tasks into chunks of four (tasks 0-3, 4-7, and so on) and sends each chunk, as a unit, to a single worker process: the first four tasks go to one process, the next four to another, and so on.

There are two main drawbacks to this approach:

* This can lead to very bad load balancing.
* Some processors in the pool may not get used at all.

In the following example, we create 19 tasks for 8 processes with a chunksize of 4.  The tasks form 5 chunks (four chunks of 4 tasks and one of 3), so at most 5 processes receive any work, and at least 3 remain idle.
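The chunk count is simple arithmetic.  This sketch groups job numbers into consecutive chunks the way the pool does; `make_chunks` is a hypothetical helper, and it only models the grouping, not which worker picks up which chunk.

```python
def make_chunks(njobs, chunksize):
    """Split job numbers 0..njobs-1 into consecutive chunks of at most chunksize."""
    return [list(range(i, min(i + chunksize, njobs)))
            for i in range(0, njobs, chunksize)]

chunks = make_chunks(19, 4)
nprocs = 8
print(len(chunks), "chunks:", chunks)
print("processes left idle:", max(nprocs - len(chunks), 0))  # 3
```

Each chunk runs as a unit on one process, so the slowest chunk, not the slowest task, sets the wall-clock time.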

In [None]:
from pool_tools import  *

def test5(data,np):
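    with Pool(processes=np) as pool:       # workers are terminated when the block exits
        # Non-blocking; tasks are sent to the workers in chunks of 4
        async_results = pool.map_async(func=worker,iterable=data,chunksize=4)
        # This call blocks until all results are available
        results = async_results.get()
    print_pool_results(results,np)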
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [5*random.random() for i in range(njobs)]
data = list(zip(range(njobs),sleep_times))    # List of tuples (jobnum,t)
    
tr = %timeit -n 1 -r 1 -o -q pass; test5(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))

<hr/>
## Example 6 : Using a callback to process results

In this example, we use a callback to process results as they become available.  Notice that the first results arrive before all jobs have even started running.  This example also illustrates that the main program can do other work while it waits for the jobs to complete.

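The callback receives the result returned by the worker and prints both the total job time completed so far (`rt`, kept in `running_total`) and the wall-clock time elapsed since `t0` (`wc`).  Since jobs run concurrently on several processes, we expect `rt` $>$ `wc` once several jobs have finished.  The callback runs in the main process (in a result-handling thread of the pool), which is why it can update the global `running_total`; it should return quickly, since other results wait while it runs.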

**Note:**  For this example, we have increased the time spent in each job to a random value in $[0,30]$ seconds (rather than $[0,5]$ as in the previous examples).
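Before running the 30-second version, the `rt` versus `wc` bookkeeping can be tried at a much smaller scale.  This sketch assumes `multiprocessing.pool.ThreadPool` as a quick stand-in for `Pool` (same `apply_async`/`callback` interface) and uses eight 0.2 s jobs on 4 workers; `job` and `cb` are hypothetical names.  The total job time comes to 1.6 s, while the wall-clock time is only about 0.4 s.

```python
import time
from multiprocessing.pool import ThreadPool

running_total = 0.0   # total job time reported so far (rt)
t0 = time.time()

def cb(res):
    # Runs in the main process, in the pool's result-handling thread.
    global running_total
    jobnum, t = res
    running_total += t
    wc = time.time() - t0
    print("job {:2d} done in {:.2f} (s) (wc/rt {:.2f}/{:.2f})".format(jobnum, t, wc, running_total))

def job(jobnum, t):
    time.sleep(t)
    return (jobnum, t)

with ThreadPool(processes=4) as pool:
    for j in range(8):
        pool.apply_async(job, args=(j, 0.2), callback=cb)
    pool.close()
    pool.join()    # waits for all jobs and their callbacks

wc = time.time() - t0
print("rt = {:.2f} s, wc = {:.2f} s".format(running_total, wc))
```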

In [None]:
from pool_tools import *

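# Process results right away.  The callback runs in the main process.
running_total = 0
def cb(res):
    global running_total           # t0 is only read, so it needs no global
    jobnum, t, pid = res
    wc = time.time() - t0          # wall-clock time since t0
    running_total += t             # rt: total job time so far
    print("Process {}, job {:2d} is done in {:8.4f} (s) (wc/rt {:8.2f}/{:.2f})".
          format(pid, jobnum, t, wc, running_total))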

def test6(data,np):
    with Pool(processes=np) as pool:       # start np worker processes
        # These calls are non-blocking
        async_results = []
        for d in data:
            r = pool.apply_async(func=worker, args=(d,), callback=cb)
            async_results.append(r)
        pool.close()    # no more jobs will be submitted
        print("---> Do some useful work while we are waiting for background jobs.")
        time.sleep(25)
        print("---> Done with our other work !!!")
        pool.join()     # wait for all jobs (and their callbacks) to finish
        results = [r.get() for r in async_results]  # ready now, so this does not block
    print_pool_results(results,np)
    
np = 8
njobs = 19
            
print("Launching {} jobs on {} cores".format(njobs,np))
    
random.seed(1234)

sleep_times = [30*random.random() for i in range(njobs)]
pnum = range(njobs)
data = list(zip(pnum,sleep_times))    # List of tuples (jobnum,t)
    
t0 = time.time()
tr = %timeit -n 1 -r 1 -o -q pass; test6(data,np)
print("{:>25s} {:12.4f}".format("Wall clock time (s)",tr.best))