# Parallel programming in Python

Many applications in science, research, and other fields rely on repetitive tasks that can grow rapidly in size and complexity, and these tasks often become a bottleneck for obtaining results quickly. In traditional serial computing, one task is executed at a time: the program waits for each task to complete before moving on to the next. This wastes processing time, especially when some tasks are independent and do not need to wait for others to complete. With parallel processing, instead, tasks are divided and allocated to different processors, each of which works on its assigned tasks simultaneously, reducing the overall execution time. This is particularly beneficial in programs where tasks are independent of each other.

## Overview

When interested in high-throughput computing, we can benefit from running multiple tasks concurrently. In this notebook we will learn the basic implementations for running parallel computations in Python, focusing on two alternatives: the [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) and [dask](https://docs.dask.org/en/stable/delayed.html) libraries.

# 1. Libraries

In [None]:
import gc
import dask
import math

import multiprocessing as mp

from dask.distributed import Client, LocalCluster

# 2. Multiprocessing

We start by defining a few helper functions. The first, `driver`, applies a simple operation to a given number and returns the result. Note that you can replace its body with any computation of your choice.

In [None]:
def driver(number, num=0.0, den=0.0):
    
    numerator   = math.log(number + num + 1.0)
    denominator = math.sqrt(number + den + 1.0)
    
    result      = numerator/denominator
    
    return result

The second, `collect`, will call the garbage collector to help us clean up after running our tests.

In [None]:
def collect():
    
    gc.collect(2)
    gc.collect(1)
    gc.collect()

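A third function, `array`, generates the arguments for our runs. It returns a generator that yields either plain integers or, when `option` contains `'star'`, `(i, num, den)` tuples as required by the `starmap` methods.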

In [None]:
def array(size, num=0.0, den=0.0, option='none'):
    
    arguments = ( (i, num, den) for i in range(size) ) if 'star' in option.lower() else ( i for i in range(size) )
    
    return arguments

We are all set. Now let's have a look at the available cores, which we can query with `mp.cpu_count()`. Keep in mind that it is usually not advisable to assign all available cores to our computation, since the operating system and other programs also need them.

In [None]:
print(f'{mp.cpu_count() = }')

Once we know our resources, we can select an appropriate number of cores for running parallel computations. We assign this number to the variable `cores`; since it is defined at the top level of the notebook, it is a global variable available to all the functions in the notebook.

In [None]:
cores = 4
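Hard-coding `cores = 4` is fine for a quick test. As an alternative, the hypothetical helper `choose_cores` below derives the value from the machine while leaving some cores free, in line with the advice above. It is a sketch under the assumption that one reserved core is enough; `os.sched_getaffinity` is only available on some platforms (for example Linux), hence the fallback to `mp.cpu_count()`.

```python
import os
import multiprocessing as mp


def choose_cores(reserve=1):
    """Hypothetical helper: number of worker processes, leaving `reserve` cores free."""
    try:
        # CPUs this process is actually allowed to run on (Linux and some Unix systems)
        available = len(os.sched_getaffinity(0))
    except AttributeError:
        # Fallback: total number of logical CPUs reported by the system
        available = mp.cpu_count()
    # Always use at least one worker
    return max(1, available - reserve)


print(f'{choose_cores() = }')
```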

Now we need to define a baseline against which to compare our different implementations. This will be the `serial` run.

In [None]:
def serial(args, num=0.0, den=0.0):
    
    result = [ driver(i, num, den) for i in args]
    
    return result

## 2.1 `map`, `map_async`, `starmap` and `starmap_async`

The multiprocessing library provides a class called `Pool`. As its name suggests, it creates a pool of worker processes; by default, it starts one worker process per available CPU. Four of its methods are of interest here: `map`, `map_async`, `starmap`, and `starmap_async`. The difference between the synchronous and asynchronous methods is that the former block the calling process until all tasks are complete, while the latter do not. Instead, they immediately return an `AsyncResult` object from which the results may be retrieved later with its `get()` method.

* The `map` function applies a function to each item of an iterable. It submits all items as tasks to the pool and blocks until all tasks are complete, returning the results as a list in the input order. This method is limited to functions that take a single argument.

* The `starmap` function extends the `map` functionality to functions requiring more than a single argument: each item of the iterable is a tuple that is unpacked into the function's positional arguments.

* The `map_async` function corresponds to the asynchronous version of `map`.

* The `starmap_async` function provides an asynchronous version of `starmap`.
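The four methods can be compared side by side on built-in functions, which sidesteps the restrictions on pickling functions defined interactively. A minimal sketch, assuming two worker processes; `math.sqrt` and `pow` merely stand in for `driver`:

```python
import math
import multiprocessing as mp

if __name__ == '__main__':
    with mp.Pool(processes=2) as pool:
        # map: one argument per task; blocks until every result is ready
        roots = pool.map(math.sqrt, [1, 4, 9])
        # starmap: each tuple is unpacked into positional arguments, here pow(base, exp)
        powers = pool.starmap(pow, [(2, 3), (3, 2)])

        # The async variants return an AsyncResult immediately, without blocking
        async_roots = pool.map_async(math.sqrt, [1, 4, 9])
        async_powers = pool.starmap_async(pow, [(2, 3), (3, 2)])
        # ... the main process could do other work here ...

        # get() blocks until the results arrive (multiprocessing.TimeoutError after 10 s)
        print(roots, async_roots.get(timeout=10))
        print(powers, async_powers.get(timeout=10))
```

The `get()` calls sit inside the `with` block on purpose: leaving the block shuts the pool down, so pending asynchronous work must be collected first.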

Working with a pool consists of three basic steps:

1. Define the Pool object.
2. Run the Pool.
3. Terminate the Pool.

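Let's write the code for these steps for the `driver` we defined previously. **NOTE** To avoid conflicts between processes, it is good practice to create the Pool object inside a function such as `main()`, or behind an `if __name__ == '__main__':` guard as below. With the *spawn* start method (the default on Windows and macOS), each worker imports the main module again, and an unguarded pool would be created anew in every worker. For the same reason, on those platforms the workers may not be able to find functions defined directly in a notebook; moving them to an importable `.py` module avoids the problem.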

In [None]:
if __name__ == '__main__':
#
### Step 1. Define the pool
#
    pool = mp.Pool(cores)
#
### Step 2. Run the Pool
#
    result_parallel = pool.map(driver, range(64))
#
### Step 3. Terminate the Pool
#
    pool.close()
    pool.join()

For comparison, the `serial` run looks like the following:

In [None]:
result_serial = [ driver(i) for i in range(64) ]

Verify that both approaches yield the same result:

In [None]:
print(result_serial)
print(result_parallel)
print(result_serial == result_parallel)
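In a standalone script, the same three steps usually live inside a `main()` function that is only called behind the `if __name__ == '__main__':` guard, as recommended in the note above. A minimal sketch; the name `main`, its default arguments, and the `try`/`finally` structure are illustrative choices:

```python
import math
import multiprocessing as mp


def driver(number, num=0.0, den=0.0):
    # Same toy computation as in the notebook
    return math.log(number + num + 1.0) / math.sqrt(number + den + 1.0)


def main(cores=4, size=64):
    # Step 1. Define the pool
    pool = mp.Pool(cores)
    try:
        # Step 2. Run the pool
        result = pool.map(driver, range(size))
    finally:
        # Step 3. Terminate the pool: accept no new tasks, then wait for the workers
        pool.close()
        pool.join()
    return result


if __name__ == '__main__':
    # Under the spawn start method, workers re-import this module but skip this block
    print(main()[:4])
```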

For practical purposes and ease of implementation, we can wrap all four methods in a single function called `parallel`. Let's write it:
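A minimal sketch of such a wrapper, assuming it receives the target function, the iterable of arguments, the method name, and the number of processes (this signature is our own choice):

```python
import math
import multiprocessing as mp

METHODS = ('map', 'map_async', 'starmap', 'starmap_async')


def parallel(func, args, method='map', processes=4):
    """Hypothetical wrapper: run `func` over `args` with one of four Pool methods.

    For 'starmap' and 'starmap_async', each item of `args` must be a tuple of
    positional arguments; for 'map' and 'map_async', each item is one argument.
    """
    if method not in METHODS:
        raise ValueError(f'method must be one of {METHODS}, got {method!r}')
    with mp.Pool(processes) as pool:
        # Look up the Pool method by name, e.g. pool.starmap
        outcome = getattr(pool, method)(func, args)
        # Async methods return an AsyncResult; collect it before the pool shuts down
        if method.endswith('_async'):
            outcome = outcome.get()
    return outcome


if __name__ == '__main__':
    print(parallel(math.sqrt, range(4), method='map_async', processes=2))
    print(parallel(pow, [(2, 3), (3, 2)], method='starmap', processes=2))
```

In this notebook it could be called, for example, as `parallel(driver, array(64, 3.0, 3.0, option='star'), method='starmap', processes=cores)`. Creating a fresh pool on every call keeps the sketch simple but adds start-up cost to each run; when timing many runs, reusing one pool is cheaper.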

Run some quick tests to compare the execution time of the serial run and the different Pool methods; change `size` to see how the comparison evolves with the array size.

In [None]:
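if __name__ == '__main__':
#
### Define parameters
#
    num, den = 0.0, 0.0
    size     = 4**8
    print(f'evaluating array with {size:,} elements')
#
### Serial run
#
    print('\nserial:')
    %timeit -n 10 -r 2 serial(range(size), num, den)
#
### Parallel runs (async methods need .get() to wait for the results)
#
    with mp.Pool(cores) as pool:
        print('\nmap:')
        %timeit -n 10 -r 2 pool.map(driver, array(size))
        print('\nmap_async:')
        %timeit -n 10 -r 2 pool.map_async(driver, array(size)).get()
        print('\nstarmap:')
        %timeit -n 10 -r 2 pool.starmap(driver, array(size, num, den, option='star'))
        print('\nstarmap_async:')
        %timeit -n 10 -r 2 pool.starmap_async(driver, array(size, num, den, option='star')).get()
    collect()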

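We can also use the Pool as a context manager with the `with` statement, which takes care of shutting the pool down for us. Note that on exit it calls `terminate()` rather than `close()` and `join()`, so results from the asynchronous methods must be retrieved with `get()` inside the `with` block.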

In [None]:
with mp.Pool(cores) as pool:
    result = pool.starmap( driver, ( (i, 3.0, 3.0) for i in range(64) ) )
    
print(result)

# 3. Dask

Dask provides the means for dealing with larger-than-memory data sets, which it achieves through multicore and distributed parallel execution. Its `delayed` interface is simple to use: we wrap our functions with `dask.delayed`, and that's it! However, calling a wrapped function does not run it; it returns a lazy `Delayed` object that contains everything needed to compute the result. To get the result, we must call its `compute()` method, or pass one or more such objects to `dask.compute()`.

Let's have a look at a simple code snippet that puts this into practice and compares Dask's single-machine schedulers.

In [None]:
if __name__ == '__main__':
    
    num, den = 0.0, 0.0
    
    size     = 4**8
    
    print(f'evaluating array with {size:,} elements')
    
    # 'sync'/'single-threaded', 'threading' and 'multiprocessing' are aliases of these three
    for scheduler in ['synchronous', 'threads', 'processes']:
        print(f'\n{scheduler = }')
        
        iterator = [ dask.delayed(driver)(i, num=num, den=den) for i in range(size) ]
        
        # num_workers sets the size of the thread/process pool (ignored by 'synchronous')
        %timeit -n 10 -r 2 dask.compute(*iterator, scheduler=scheduler, num_workers=cores)
        
        collect()
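The snippet above times many delayed calls at once. A smaller sketch makes the individual steps visible: the lazy object itself, how delayed calls can be combined into one task graph, and `compute()`. `driver` is repeated so that the block runs on its own, and the synchronous scheduler is an arbitrary choice that keeps everything in the current thread:

```python
import math
import dask
from dask.delayed import Delayed


def driver(number, num=0.0, den=0.0):
    # Same toy computation as in the multiprocessing section
    return math.log(number + num + 1.0) / math.sqrt(number + den + 1.0)


# Wrapping the call does not run driver: it only records the task
lazy = dask.delayed(driver)(3.0)
print(isinstance(lazy, Delayed))

# Delayed objects can be passed to other delayed calls to build a larger graph;
# here sum() receives the list of eight lazy results
total = dask.delayed(sum)([dask.delayed(driver)(i) for i in range(8)])

# compute() executes the graph; 'synchronous' runs it in the current thread,
# which is also convenient for debugging
print(lazy.compute(scheduler='synchronous'))
print(total.compute(scheduler='synchronous'))
```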

## 3.1 Cluster and Client

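Dask also offers a `LocalCluster`, which starts a scheduler and a set of workers on the local machine and thus sets the environment for computation. We then connect a `Client` to it for our runs. Keep in mind that every delayed call becomes a separate task with its own scheduling overhead, so with a function as cheap as `driver` and a large `size` these runs can take a long time.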

In [None]:
num, den = 0.0, 0.0

size     = 4**10

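with LocalCluster(n_workers=cores) as cluster, Client(cluster) as client:

    iterator = [ dask.delayed(driver)(i, num=num, den=den) for i in range(size) ]

    # Compute and gather inside the timed statement, so that every run performs the
    # whole computation instead of only fetching results that are already finished
    %timeit -n 10 -r 2 client.gather(client.compute(iterator))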
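Besides `client.compute` on delayed objects, a `Client` can schedule plain function calls directly with `client.submit` and `client.map`, both of which return futures. A minimal sketch with a small in-process cluster; the worker counts and `processes=False` are illustrative choices (drop `processes=False` to run the workers as separate processes, as in the cell above):

```python
import math
from dask.distributed import Client, LocalCluster


def driver(number, num=0.0, den=0.0):
    # Same toy computation as in the multiprocessing section
    return math.log(number + num + 1.0) / math.sqrt(number + den + 1.0)


if __name__ == '__main__':
    # processes=False keeps the workers inside this process, which is lighter for a demo
    with LocalCluster(n_workers=2, threads_per_worker=1, processes=False) as cluster, \
         Client(cluster) as client:
        # submit: schedule a single call and get a Future back right away
        future = client.submit(driver, 3.0)
        # map: schedule one call per element, returning a list of Futures
        futures = client.map(driver, list(range(8)))
        # result() and gather() block until the values are available
        single = future.result()
        values = client.gather(futures)
        print(single, sum(values))
```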

When we want to interact with our computing environment and modify its settings repeatedly, it is more convenient to instantiate the cluster and client interactively, outside a `with` block:

In [None]:
cluster = LocalCluster(n_workers=cores)
client  = Client(cluster)

Now we can interact directly with the cluster and the client. In a notebook, displaying either object shows a summary of its configuration:

In [None]:
cluster

In [None]:
client
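Because the cluster and client stay alive between cells, their settings can also be changed on the fly. A sketch on a separate, small in-process cluster (the worker counts are illustrative), scaling it from one to two workers while the client remains connected:

```python
from dask.distributed import Client, LocalCluster

if __name__ == '__main__':
    # Start small: one in-process worker with a single thread
    cluster = LocalCluster(n_workers=1, threads_per_worker=1, processes=False)
    client = Client(cluster)

    # Change the settings on the fly: ask the cluster for two workers
    cluster.scale(2)
    client.wait_for_workers(2)

    # nthreads() maps each worker address to its number of threads
    workers = client.nthreads()
    print(len(workers), 'workers')

    # Release the resources when finished, client first
    client.close()
    cluster.close()
```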

In [None]:
num, den = 0.0, 0.0

size     = 1000

iterator = [ dask.delayed(driver)(i, num=num, den=den) for i in range(size) ]

# Compute and gather inside the timed statement so each run repeats the full computation
%timeit -n 10 -r 2 client.gather(client.compute(iterator))
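Each `dask.delayed(driver)(i, ...)` call above creates one task per element, and every task carries scheduling overhead that can outweigh the cost of a function as cheap as `driver`. A common remedy is to let each task process a chunk of elements. A minimal sketch, where `driver_chunk`, `chunked_tasks`, the chunk size of 250, and the threaded scheduler are our own illustrative choices:

```python
import math
import dask


def driver(number, num=0.0, den=0.0):
    # Same toy computation as in the multiprocessing section
    return math.log(number + num + 1.0) / math.sqrt(number + den + 1.0)


def driver_chunk(numbers, num=0.0, den=0.0):
    # Hypothetical helper: evaluate driver over a whole chunk inside one task
    return [driver(i, num, den) for i in numbers]


def chunked_tasks(size, chunk=250, num=0.0, den=0.0):
    # Hypothetical helper: one delayed task per block of `chunk` consecutive integers
    return [dask.delayed(driver_chunk)(range(start, min(start + chunk, size)), num, den)
            for start in range(0, size, chunk)]


tasks = chunked_tasks(1000, chunk=250)
# Each task returns a list; flatten them back into one list of results
chunks = dask.compute(*tasks, scheduler='threads')
flat = [value for part in chunks for value in part]
print(len(tasks), len(flat))
```

With an active client, the same list of tasks can instead be passed to `client.compute` and collected with `client.gather`.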

Once we are done with our computations, we **MUST CLOSE** the client and cluster.

In [None]:
client.close()
cluster.close()