# Parallel processing in Python

Python offers a large number of libraries and packages for parallel processing. See https://wiki.python.org/moin/ParallelProcessing for an overview.

In this session we present `multiprocessing`, one of the most important frameworks to implement parallel applications in Python.



# Python multiprocessing module

The `multiprocessing` module (https://docs.python.org/3/library/multiprocessing.html) is a powerful and versatile library included in Python's standard distribution, designed to facilitate concurrent execution by spawning multiple processes. It provides a high-level interface for creating and managing processes, as well as a wide array of low-level tools to build complex systems for inter-process communication and synchronization.

Unlike threads, which operate within the same memory space and are subject to the Global Interpreter Lock (GIL), `multiprocessing` spawns separate processes, each with its own memory space. This allows true parallelism on multi-core processors. Ideally, there is one process per physical core, so that all processes really run at the same time.

The module includes tools for:

- Creating and managing processes.
- Sharing data between processes using shared memory or managed objects.
- Establishing communication channels like Pipes and Queues.
- Synchronizing operations with primitives such as Locks, Events, Semaphores, and Conditions.
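
As a minimal sketch of two of the tools listed above (shared memory and a synchronization primitive), the following snippet lets four processes increment one shared integer under a `Lock`. The names `add_many` and `ctx` are illustrative, and the snippet assumes a POSIX system where the `fork` start method is available, so that it runs as a plain cell or script.

```python
import multiprocessing as mp

# Assumption: POSIX "fork" start method, so the snippet runs as a plain cell/script
ctx = mp.get_context("fork")

def add_many(counter, lock, n):
    """Increment the shared counter n times, one locked update at a time."""
    for _ in range(n):
        with lock:              # only one process may update the counter at a time
            counter.value += 1

counter = ctx.Value("i", 0, lock=False)   # shared C int, deliberately without its own lock
lock = ctx.Lock()                         # explicit synchronization primitive

workers = [ctx.Process(target=add_many, args=(counter, lock, 1000)) for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()

print(f"final counter value: {counter.value}")
```

Without the `with lock:` line, the read-modify-write in `counter.value += 1` could interleave between processes and some increments could be lost.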

The library provides both low-level primitives for advanced users and high-level abstractions for easier use. The `Process` class is the main low-level interface for creating processes. Higher-level constructs like `Pool` allow easy management of groups of worker processes, without the caveats of handling low-level interprocess communication and synchronization tasks by hand.



---
Part of the code in this notebook is adapted from the following sources, which also contain further information:

- Python 201, Michael Driscoll

- https://www.machinelearningplus.com/python/parallel-processing-python/

- https://github.com/mmckerns/tuthpc

- https://github.com/csc-training/hpc-python. Repository of course: PYTHON IN HIGH PERFORMANCE COMPUTING PARTNERSHIP FOR ADVANCED COMPUTING IN EUROPE (PRACE)


---






### Guess the number of processors in your computer

The number of worker processes should be chosen according to the number of physical cores available in the computer.

In [None]:
# check the number of cores
import multiprocessing as mp

# multiprocessing cpu_count just provides the logical cores
print("Number of logical cores: ", mp.cpu_count())

In [None]:
# use psutil for more detailed information
import psutil

# Number of logical cores
logical_cores_psutil = psutil.cpu_count(logical=True)

# Number of physical cores
physical_cores_psutil = psutil.cpu_count(logical=False)

# Print the results
print(f"Logical Cores (psutil): {logical_cores_psutil}")
print(f"Physical Cores (psutil): {physical_cores_psutil}")
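
On shared machines the process may be allowed to use fewer cores than the hardware has, for example when an affinity mask is set with `taskset` or by a batch scheduler. A small sketch of a helper that takes this into account follows. The function name `usable_cpus` is hypothetical, and `os.sched_getaffinity` is only available on Linux and some other Unix systems, hence the fallback.

```python
import os

def usable_cpus():
    """Number of CPUs this process may run on (hypothetical helper)."""
    try:
        # Linux and some other Unix systems: honours the CPU affinity mask
        return len(os.sched_getaffinity(0))
    except AttributeError:
        # Platforms without sched_getaffinity: fall back to the logical core count
        return os.cpu_count() or 1

print(f"CPUs usable by this process: {usable_cpus()}")
```

The returned value is a reasonable upper bound for the `processes` argument of a pool of workers.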

## The Process Class (low level process parallelism)

The `Process` class lets you create a series of processes, each of which calls a given function.

You have to create and `start()` the process. Then, just call the `join()` method on each process, which tells Python to wait for the process to terminate. If you need to stop a process, you can call its `terminate()` method.

In [None]:
from multiprocessing import Process
import os

def doubler(number):
  """
  A doubling function that can be used by a process
  """
  result = number * 2
  proc = os.getpid()
  print(f'{number} doubled to {result} by process id: {proc}')

numbers = [5, 10, 15, 20, 25]

procs_list = []
# each process could receive a different target function and/or data args
for number in numbers:
  p = Process(target=doubler, args=[number]) # args requires an iterable
  procs_list.append(p)
  p.start()

# wait for the workers to end
for p in procs_list:
  p.join()
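
The `terminate()` method mentioned above is not used in the previous cell. A minimal sketch of stopping a long-running process could look as follows. The function `sleeper` and the variable `ctx` are illustrative, and the snippet assumes a POSIX system where the `fork` start method is available.

```python
import time
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def sleeper():
    """A task that would run for a long time if left alone."""
    time.sleep(60)

p = ctx.Process(target=sleeper)
p.start()
time.sleep(0.5)      # give the child a moment to start
p.terminate()        # ask the operating system to stop the child
p.join()             # still join, so that the process is cleaned up

print(f"alive after terminate + join: {p.is_alive()}")
print(f"exit code: {p.exitcode}")   # negative on POSIX: the child was ended by a signal
```

A terminated process gets no chance to clean up after itself, so `terminate()` is best kept for processes that do not hold locks or half-written files.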


However, with this approach, getting the output values of each process requires instantiating a `multiprocessing.Queue` or a `multiprocessing.Manager`. This makes it a complex approach, mainly used for spawning unrelated processes that work on their own (typically not the case in scientific computing).

In [None]:
from multiprocessing import Process

def tarea():
    return 42  # This value can NOT be retrieved directly

p = Process(target=tarea)
p.start()
p.join()

The following code demonstrates how the child process can send a value to the parent process using a Queue object:

In [None]:
from multiprocessing import Process, Queue

def tarea(q):
    q.put(42)  # Send value to the queue

q = Queue()
p = Process(target=tarea, args=(q,))
p.start()
p.join()

resultado = q.get() 
print(f'Received value = {resultado}') 
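
The same idea extends to several processes. The sketch below reuses the doubling task from earlier and tags every message with its input, because messages arrive in the queue in no particular order. The variable names are illustrative, and the snippet assumes a POSIX system where the `fork` start method is available.

```python
import os
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def doubler(number, q):
    """Double the number and send (input, output, pid) back through the queue."""
    q.put((number, number * 2, os.getpid()))

numbers = [5, 10, 15, 20, 25]
q = ctx.Queue()
procs = [ctx.Process(target=doubler, args=(n, q)) for n in numbers]
for p in procs:
    p.start()

# read one message per process before joining; arrival order may vary
messages = [q.get() for _ in procs]
for p in procs:
    p.join()

# rebuild the results in input order using the tagged inputs
doubled = {number: result for number, result, _pid in messages}
results = [doubled[n] for n in numbers]
print(f"results in input order: {results}")
```

Reading from the queue before calling `join()` is the safer order: a child that has put a large object in the queue may not exit until that data has been consumed.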


**IMPORTANT NOTE**: When you create multiple processes using `multiprocessing.Process`, each process runs independently and has its own memory space. This means they cannot easily share results with each other or with the main process. Using processes to parallelize tasks is recommended when the tasks are independent and do not require sharing any information.
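
A short sketch makes the separate memory spaces visible: the child modifies a global variable, but only its own copy changes. The names `counter` and `increment` are illustrative, and the snippet assumes a POSIX system where the `fork` start method is available.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

counter = 0  # module-level variable living in the parent's memory

def increment():
    """Modify the global counter; this only changes the child's own copy."""
    global counter
    counter += 100
    print(f"child sees counter = {counter}")

p = ctx.Process(target=increment)
p.start()
p.join()

print(f"parent still sees counter = {counter}")
```

The parent prints 0: the update made by the child is lost when the child exits, which is why a `Queue`, a pipe or shared memory is needed to get data back.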

## The Pool Class

**The `Pool` class represents a pool of worker processes**. It provides methods to offload tasks to those workers.

**It is easier to work with, and higher level, than the `Process` class.**

The master process submits tasks to the workers, the workers perform the tasks, and finally the master retrieves the results from the workers.

The most used methods in the ``Pool`` class are the following (although there are many more):

1. Synchronous (blocking) execution: the main program is blocked until all the submitted tasks have finished, and the results are returned in the order of the inputs.
  - ``Pool.map()`` and ``Pool.starmap()``
  - ``Pool.apply()``

2. Asynchronous (non-blocking) execution: the call returns immediately, so the main program can keep working while the workers compute. Tasks may finish in any order, but the work as a whole is usually done sooner because the main program is not idle.
  - ``Pool.map_async()`` and ``Pool.starmap_async()``
  - ``Pool.apply_async()``

The `map` method is only applicable to a function that accepts a single argument. For routines that accept multiple arguments, the `starmap` method must be used instead. Both versions take an iterable and chunk it into tasks, where every task has the same (mapped) target function.
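
The chunking can also be controlled by hand through the optional `chunksize` argument of `map`. The sketch below shows it next to a `starmap` call. The functions `square` and `power` are illustrative, and the snippet assumes a POSIX system where the `fork` start method is available.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def square(n):
    return n * n

def power(base, exponent):
    return base ** exponent

with ctx.Pool(processes=2) as pool:
    # chunksize=25: the 100 inputs are sent to the workers as 4 tasks of 25 items each
    squares = pool.map(square, range(100), chunksize=25)
    # starmap unpacks each tuple into the positional arguments of the function
    powers = pool.starmap(power, [(2, 3), (3, 2), (10, 0)])

print(squares[:5])
print(powers)
```

Larger chunks mean fewer messages between the master and the workers, at the price of a coarser load balance.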

With regard to `apply` and `apply_async`, both take an `args` argument holding the parameters to be passed to the function to be parallelized, so several parameters can be passed (unlike `map`, and similarly to `starmap`). However, `apply` makes just a single call to that function. What does this mean? To really parallelize the function **you have to manually iterate the call to `apply_async` to make use of the pool of workers** (a loop over the blocking `apply` would run the tasks one after another). This has the advantage that in each call **you can specify not just a new chunk of data, but also a different task (function) to be executed in that worker**. That is, you can pass a task list and a data list with the arguments for each task.
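
The idea of a task list combined with a data list can be sketched as follows. The three functions and the `tasks` list are hypothetical examples, and the snippet assumes a POSIX system where the `fork` start method is available.

```python
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def double(x):
    return 2 * x

def add(a, b):
    return a + b

def shout(text):
    return text.upper()

# hypothetical task list: (function, arguments) pairs
tasks = [(double, (21,)), (add, (3, 4)), (shout, ("done",))]

with ctx.Pool(processes=3) as pool:
    # submit everything first ...
    pending = [pool.apply_async(func, args) for func, args in tasks]
    # ... then collect; each get() blocks until that particular task has finished
    results = [r.get() for r in pending]

print(results)
```

Submitting all the tasks before calling `get()` is what allows them to run at the same time.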

For more info on `apply_async` see:

https://stackoverflow.com/questions/53035293/purpose-of-multiprocessing-pool-apply-and-multiprocessing-pool-apply-async

https://stackoverflow.com/questions/52985131/how-to-write-a-multithreaded-function-for-processing-different-tasks-concurrentl/52992065#52992065

https://docs.python.org/3.8/library/multiprocessing.html#multiprocessing.pool.Pool.apply_async

In compute-intensive tasks, typically the problem is to apply the same function to a huge bunch of data, hence, we will give more attention here to `map` and derived methods.

Other methods in the multiprocessing library permit the creation of pipes, queues and other approaches, but **the pool of (parallel) workers is by far the most typical and straightforward approach to parallelize between the cores of a single computer**.



In [None]:
# Summary table of Pool class methods
# ----------------------------------------------------------------------------------------------
#                           |           SINGLE FUNCTION               |  MULTIPLE FUNCTIONS    |
# ----------------------------------------------------------------------------------------------
#                           |  Single argument |  Multiple arguments  |  Multiple arguments    |
# ----------------------------------------------------------------------------------------------
# sync process (blocking)   | Pool.map         | Pool.starmap         |  Pool.apply            |
#
# async proc (non-blocking) | Pool.map_async   | Pool.starmap_async   |  Pool.apply_async      |
# ----------------------------------------------------------------------------------------------

### Using Pool.map

All multiprocessing `map` functions for a pool of workers behave similarly to the standard Python `map` function: they take a function and an iterable as input, and call the function on each item of the iterable:

```
def square(n):
    return n * n

num_list = [1,2,3,4]
result = map(square, num_list)
print('Mapped result is: ', list(result))

Output:
>> Mapped result is:  [1, 4, 9, 16]
```



In [None]:
import os
import multiprocessing as mp

def doubler(number):
  """
  A doubling function that can be used by a process
  """
  result = number * 2
  proc = os.getpid()
  print(f'{number} doubled to {result} by process id: {proc}')
  return result

numbers = [5, 10, 15, 20, 25]

# instantiate a pool of 3 processes
pool = mp.Pool(processes=3)
result = pool.map(doubler, numbers)
pool.close()

print(f'input data: {numbers}')
print(f'results are in corresponding order: {result}')

In this example, the `pool.map` function works as follows: 

1. **Step 1: Creating a Pool with 3 processes**
2. **Step 2: Assigning Tasks**
    - The Pool splits the list [5, 10, 15, 20, 25] into tasks and assigns them to the 3 processes.
    - Initially, the 3 processes start working on the first 3 numbers:
        * *Process 1*: 5
        * *Process 2*: 10
        * *Process 3*: 15
3. **Step 3: Parallel Execution**
    - The 3 processes execute the doubler function in parallel.
    - When a process finishes, the Pool assigns it the next available number:
        * If *Process 3* finishes first, it is assigned the number 20.
        * If *Process 1* finishes next, it is assigned the number 25.

4. **Step 4: Collecting Results**
    - Even though the processes may finish in different orders, `pool.map` ensures that the results are returned in the correct order:
        * The first result corresponds to 5.
        * The second result corresponds to 10.
        * The third result corresponds to 15.
        * The fourth result corresponds to 20.
        * The fifth result corresponds to 25.

### Using Pool.map_async

`pool.map_async` is the asynchronous version of `pool.map`. This means:

- **It does not block the main program**:
    * Unlike `pool.map`, which waits for all processes to finish before continuing, `pool.map_async` immediately returns an `AsyncResult` object and allows the main program to continue running.
- **Retrieving results**:
    * To get the results, you must call the `.get()` method on the `AsyncResult` object. This method blocks the main program until all processes have finished and the results are available.
- **Order of results**:
    * Although `pool.map_async` is asynchronous, it guarantees that the results are returned in the same order as the input list. This is similar to `pool.map`.


In [None]:
import os
import multiprocessing as mp
import time

def doubler(number):
  """
  A doubling function that can be used by a process
  """
  result = number * 2
  proc = os.getpid()
  print(f'{number} doubled to {result} by process id: {proc}')
  return result

numbers = [5, 10, 15, 20, 25]

# instantiate a pool of 3 processes
pool = mp.Pool(processes=3)
result = pool.map_async(doubler, numbers)
pool.close()  # Don't accept more tasks

####### Here we could do some stuff while the processes run in parallel...

pool.join()   # Wait for all processes to end
results = result.get()  # retrieve the actual output data from the result object

print(f'input data: {numbers}')
print(f'results are in corresponding order: {results}')
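
The placeholder comment in the previous cell can be filled in by polling the `AsyncResult` object with `ready()` and `wait()`. The following is only a sketch: `slow_double` and the `polls` counter are illustrative, and it assumes a POSIX system where the `fork` start method is available.

```python
import time
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def slow_double(number):
    time.sleep(0.2)          # stand-in for a longer computation
    return number * 2

with ctx.Pool(processes=3) as pool:
    async_result = pool.map_async(slow_double, [5, 10, 15, 20, 25])

    polls = 0
    while not async_result.ready():       # non-blocking check
        polls += 1                        # the main process is free to do other work here
        async_result.wait(timeout=0.05)   # sleep briefly instead of busy-waiting

    results = async_result.get()          # returns at once: the work is already done

print(f"main process polled {polls} times while waiting")
print(f"results: {results}")
```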

### Using Pool.starmap

With `Pool.starmap`, instead of a single parameter, multiple parameters are passed as tuples to the function that is being run in parallel.

Hence passing an iterable like `[(1,2), (3,4), ...]` results in `[func(1,2), func(3,4), ...]`.

In [None]:
import os
import multiprocessing as mp

def doubler_adder(a, b):
  result = a * 2 + b * 2
  proc = os.getpid()
  print(f'{a}  and {b} doubled and added to {result} by process id: {proc}')
  return result

# numbers = [5, 10, 15, 20, 25]
numbers_in_tuples = [(x,x+1) for x in range(0,10)] # [(0,1), (1,2), ....]

# instantiate a pool of 3 processes
pool = mp.Pool(processes=3)
result = pool.starmap(doubler_adder, numbers_in_tuples)
# with a single argument, starmap could also be used:
# result = pool.starmap(doubler, [(5,), (10,), (15,), (20,), (25,)])
pool.close()

print(f'input data: {numbers_in_tuples}')
print(f'results are in corresponding order: {result}')

Note: You can obtain the same effect using `Pool.map` (instead of `Pool.starmap`) if you take the additional effort of packing the arguments of the target function into a single argument (data object), as in the next example, where the `doubler_adder` function has been modified to take just one parameter, which is in fact a list of the two original parameters:

In [None]:
import os
import multiprocessing as mp

# function modified to take just a single parameter:
def doubler_adder(a):
  result = a[0] * 2 + a[1] * 2
  proc = os.getpid()
  print(f'{a[0]}  and {a[1]} doubled and added to {result} by process id: {proc}')
  return result

# numbers = [5, 10, 15, 20, 25]
numbers_in_list = [[x,x+1] for x in range(0,10)] # [[0,1], [1,2], ....]
# instantiate a pool of 3 processes
pool = mp.Pool(processes=3)
result = pool.map(doubler_adder, numbers_in_list)
pool.close()

print(f'input data: {numbers_in_list}')
print(f'results are in corresponding order: {result}')

## Comparing execution times

The following scripts compare the **blocking multiprocess** approach with the **non-blocking multiprocess** and the **single-process** solutions by measuring their execution times. Run them on a (virtual) machine with 2 logical cores.


### 1. Naive approach:

There is no performance gain from parallel execution for I/O-bound tasks, simple tasks, or few tasks.

In [None]:
import multiprocessing as mp
import time
import numpy as np

def f(x, y):
    return (x+y)**(2)

# generate 2 arrays of 1 million random integers between 1 and 9 (the upper bound, 10, is exclusive)
x = np.random.randint(1,10,1000000)
y = np.random.randint(1,10,1000000)
print(f'first values in x: {x[0:10]}')
print(f'first values in y: {y[0:10]}')

xy_tuple = [(int(x[i]),int(y[i])) for i in range(0,len(x))]
print(f'first values in xy_tuple: {xy_tuple[0:10]}')

In [None]:
# generate a 2-process pool.starmap
pool = mp.Pool(2)
# Blocking multiprocess execution
t0 = time.time()
result1 = pool.starmap(f, xy_tuple)
t1 = time.time()
pool.close()
# print results
print(f'first values in result1: {result1[0:10]}')
print(f'time for pool.starmap: {t1-t0}')

In [None]:
# generate a 2-process pool.starmap_async
pool = mp.Pool(2)
# Non-blocking multiprocess execution "in the background"
t0 = time.time()
result2_ = pool.starmap_async(f, xy_tuple)
pool.close()  # Don't accept more tasks

####### Here we could do some stuff while the processes run in parallel...

pool.join()   # Wait for all processes to end
result2 = result2_.get()  # retrieve the actual output data from the result object
t1 = time.time()
# print results
print(f'first values in result2: {result2[0:10]}')
print(f'time for pool.starmap_async: {t1-t0}')

In [None]:
# Compare with the single-process solution: send data sequentially
result3 = np.zeros(len(x), dtype=int)
#result3 = np.zeros(len(x))
t0 = time.time()
for i in range(len(x)):
  result3[i] = f(x[i],y[i])
t1 = time.time()
print(f'first values in result3: {result3[0:10]}')
print(f'time for single-process: {t1-t0}')

In [None]:
# Compare with the single-process solution: using vectorized operators
t0 = time.time()
result3 = f(x,y)
t1 = time.time()
print(f'first values in result3: {result3[0:10]}')
print(f'time for single-process (vectorized): {t1-t0}')

Compare and analyze the times you obtained when executing the different versions of the programs. 

- At first sight, one would expect the execution time of the parallel versions to be half of that required by the single-process sequential version of the program. Have you observed such a reduction in the time measurements? If not, what is the reason?

- By comparing the execution times of the synchronous and asynchronous parallel versions, what conclusions can you draw?

- Which version of the program is faster? What are the reasons that make this version the most optimized?


> **IMPORTANT NOTE**
>- A parallel version of a program can be slower than a sequential one. This can happen when sending data to each process causes a large overhead.
>- When the tasks are relatively simple, such as (x+y)<sup>2</sup>, the time spent sending data and gathering results is much greater than the time spent on the actual computation.
>- To take advantage of parallel processing, we should keep the process pool busy with relatively complex tasks and minimize the communication overhead. And, of course, we should use a larger number of physical processors.
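
A rough way to see this overhead is to compare the time of the computation itself with the time of one serialization round trip of the inputs. This is only an approximation of what a pool does (a real pool also pickles the results and moves the bytes through pipes). The sketch uses fewer pairs than the cells above, and the variable names are illustrative.

```python
import pickle
import time
import numpy as np

def f(x, y):
    return (x + y) ** 2

# fewer pairs than in the cells above, to keep the demonstration quick
n = 200_000
x = np.random.randint(1, 10, n)
y = np.random.randint(1, 10, n)
pairs = [(int(a), int(b)) for a, b in zip(x, y)]

# cost of the computation itself, done in one process
t0 = time.perf_counter()
computed = [f(a, b) for a, b in pairs]
compute_time = time.perf_counter() - t0

# cost of one serialization round trip, as a stand-in for inter-process transfer
t0 = time.perf_counter()
payload = pickle.dumps(pairs)
restored = pickle.loads(payload)
transfer_time = time.perf_counter() - t0

print(f"compute: {compute_time:.3f} s, pickle round trip: {transfer_time:.3f} s")
print(f"pickled size: {len(payload) / 1e6:.1f} MB")
```

When the two times are of the same order, distributing the work cannot pay off, whatever the number of cores.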


### 2. Data-chunked version.

As a demonstration, check the next code, slightly modified from the previous example:

**NOTE**:

We will also make use of `with` blocks, which rely on **context managers**. A context manager is a construct that allows you to allocate and release resources automatically when entering and exiting a block of code. The `with` statement ensures that setup and cleanup operations are handled correctly, even if exceptions occur within the block. In this case, it removes the need to clean up the pool by hand: on leaving the block the pool is shut down automatically. Note that the `Pool` context manager calls `pool.terminate()` on exit, so the results must be collected inside the block, as is done below with `pool.map` and `async_result.get()`.

The use of context managers is generally recommended in Python programming, but it is particularly important in parallel programming.
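
To illustrate the setup and cleanup behaviour independently of `multiprocessing`, here is a minimal sketch of a user-defined context manager that times a block of code. The helper `timer` and the `timings` list are hypothetical names, not part of any library.

```python
import time
from contextlib import contextmanager

@contextmanager
def timer(label, log):
    """Hypothetical helper: record the elapsed time of the enclosed block."""
    start = time.perf_counter()          # setup, runs on entering the block
    try:
        yield
    finally:
        # cleanup, runs on leaving the block even if an exception was raised
        log.append((label, time.perf_counter() - start))

timings = []
with timer("sum", timings):
    total = sum(range(1_000_000))

try:
    with timer("failing block", timings):
        raise ValueError("something went wrong")
except ValueError:
    pass

for label, seconds in timings:
    print(f"{label}: {seconds:.4f} s")
```

Both blocks appear in `timings`, including the one that raised an exception, which is the guarantee the `with` statement provides.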

In [None]:
# This example defines a compute-intensive function and sends data in chunks
# NOTE: Recommended to test this example in a (virtual) machine with 4 logical cores.
import time
import numpy as np
import multiprocessing as mp

# Generate 40 million random floats
N = 40_000_000
x = np.random.rand(N).astype(np.float64)

# Create 4 chunks of 10 million elements each:
chunk_size = 10_000_000
chunks = [x[i:i + chunk_size] for i in range(0, N, chunk_size)]
print(f"Number of chunks: {len(chunks)}")
print(f"Size of each chunk: {chunk_size}")

# Define a CPU-intensive function
def heavy_function(array):
    for _ in range(3):
        array = (np.sin(array)+np.cos(array))**(array*array)
    return np.sum(array)  # Just return the sum of the processed chunk

# Multiprocessing - blocking version (Pool.map)
def parallel_map(chunks_list, num_procs=4):
    with mp.Pool(processes=num_procs) as pool:
        results = pool.map(heavy_function, chunks_list)
    return results

# Multiprocessing - async version (Pool.map_async)
def parallel_map_async(chunks_list, num_procs=4):
    with mp.Pool(processes=num_procs) as pool:
        async_result = pool.map_async(heavy_function, chunks_list)
        results = async_result.get()  # Wait for processes to finish
    return results

# Single-process (serial) execution
def serial_execution(chunks_list):
    results = []
    for chunk in chunks_list:
        results.append(heavy_function(chunk))
    return results

# time all three approaches:
# execute and time Parallel blocking code (Pool.map)
start_time = time.time()
results_map = parallel_map(chunks)
end_time = time.time()
print(f"[Pool.map]     Elapsed time: {end_time - start_time:.2f} seconds")

# execute and time Parallel async code (Pool.map_async)
start_time = time.time()
results_map_async = parallel_map_async(chunks)
end_time = time.time()
print(f"[map_async]    Elapsed time: {end_time - start_time:.2f} seconds")

#execute and time Serial code
start_time = time.time()
results_serial = serial_execution(chunks)
end_time = time.time()
print(f"[Single-process] Elapsed time: {end_time - start_time:.2f} seconds")


### 3. Another example: prime decomposition

This example uses a more complex task: the prime factor decomposition of large numbers.

Here we also use `tqdm` to provide a progress bar, which supports both single-process and multi-process execution.

In [None]:
# This example uses a function that computes prime factors of a number
# NOTE: On a machine with just 1 physical core there won't be performance
# gains, use a machine with more cores to perceive the gain in this example

import time
import numpy as np
from multiprocessing import Pool
from tqdm import tqdm

# Define a function to decompose a number into its prime factors
def prime_factors(n):
    factors = []
    divisor = 2
    while n > 1:
        while n % divisor == 0:
            factors.append(divisor)
            n //= divisor
        divisor += 1
        if divisor * divisor > n and n > 1:
            factors.append(n)
            break
    return factors

# Create data to process (large numbers for factorization)
# create a list of 100 random integers between 1e10 and 1e12
data = np.random.randint(10**10, 10**12, size=100, dtype=np.int64).tolist()

# Single-process execution
def single_process_execution(data):
    results = []
    for number in tqdm(data, desc="Single-process execution"):
        results.append(prime_factors(number))
    return results

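# Multi-process execution with imap (yields the results in input order as they become available)
def multi_process_execution(data):
    with Pool(2) as pool:  # Use 2 processes
        # imap (unlike map) returns a lazy iterator, so tqdm can update the bar while tasks finish
        results = list(tqdm(pool.imap(prime_factors, data), total=len(data), desc="Multi-process execution"))
    return results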

# Measure time for single process execution
start_time = time.time()
single_results = single_process_execution(data)
single_duration = time.time() - start_time
print(f"Single-process execution time: {single_duration:.2f} seconds")

# Measure time for multi-process execution
start_time = time.time()
multi_results = multi_process_execution(data)
multi_duration = time.time() - start_time
print(f"Multi-process execution time (2 processes): {multi_duration:.2f} seconds")

# Verify that the results are identical
assert single_results == multi_results, "Results do not match!"
print("Results are identical.")


## Running several python scripts in parallel

In [None]:
%%file script1.py
import os
print(f'hello from script 1, executed by process {os.getpid()}.')
f= open("file1.txt","w+")
f.write("hello from script 1")
f.close()


In [None]:
%%file script2.py
import os

print(f'hello from script 2, executed by process {os.getpid()}.')
f= open("file2.txt","w+")
f.write("hello from script 2")
f.close()

In [None]:
%%file script3.py
import os

print(f'hello from script 3, executed by process {os.getpid()}.')
f= open("file3.txt","w+")
f.write("hello from script 3")
f.close()

Now, run 3 processes so that each process executes one of the Python scripts in parallel with the others:

In [None]:
import sys
import multiprocessing as mp
import subprocess

script_list = ['script1.py', 'script2.py', 'script3.py']

def run_python(script):
  # run the script with the same interpreter that runs this notebook
  result = subprocess.run([sys.executable, script], capture_output=True, text=True)
  return result.stdout

pool = mp.Pool(processes=3)
results = pool.map(run_python, script_list)
pool.close()

print(f'results = {results}')

## Using Pool.apply and Pool.apply_async

`Pool.map` and `Pool.apply` block the main program until the submitted work has finished, which is useful when the results must be obtained in a particular order.

In contrast, the async variants submit the tasks and let the main program continue, so the results can be retrieved as soon as they are ready. One more difference is that we need to call the `get()` method on the object returned by `apply_async()` in order to obtain the return value of the finished task.

Tasks submitted with `Pool.apply_async` are not guaranteed to finish in the order in which they were submitted. Each call returns its own result object, so calling `get()` on those objects in submission order still yields the results in that order.

Notice also that you could call a number of different functions with Pool.apply_async (not all calls need to use the same function). In contrast, Pool.map applies the same function to many arguments.
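
The difference between submission order and completion order can be observed with the optional `callback` argument of `apply_async`, which is called in the main process each time a task finishes. The function `wait_and_return` and the delays are illustrative, and the sketch assumes a POSIX system where the `fork` start method is available.

```python
import time
import multiprocessing as mp

ctx = mp.get_context("fork")  # assumption: POSIX "fork" start method

def wait_and_return(delay):
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]
completion_order = []     # filled by the callback, in the parent process

with ctx.Pool(processes=3) as pool:
    pending = [
        pool.apply_async(wait_and_return, (d,), callback=completion_order.append)
        for d in delays
    ]
    # one get() per call, in the order of the calls
    submission_order = [r.get() for r in pending]

print(f"submission order: {submission_order}")
print(f"completion order: {completion_order}")
```

The first list always matches `delays`, while the second one depends on which task finishes first.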






In [None]:
import os
from multiprocessing import Pool

def doubler(number):
  """
  A doubling function that can be used by a process
  """
  result = number * 2
  proc = os.getpid()
  print(f'{number} doubled to {result} by process id: {proc}')
  return result

numbers = [5, 10, 15, 20, 25]

pool = Pool(processes=3)
# note the for-loop!!! each call to apply_async submits just a single task
async_results = [pool.apply_async(doubler, (number,)) for number in numbers]
pool.close()  # Don't accept more tasks

# calling get() only after all tasks are submitted lets them run in parallel
results = [r.get(timeout=10) for r in async_results]
pool.join()   # Wait for all processes to end

print(results)
