# Agenda:
1. Parallel Programming in Python
2. Multi-threading (shared memory parallelism)
3. Multiprocessing (Distributed-memory parallelism)
    * Process Class
    * Pool Class
        * Synchronous calls
            * pool.Apply()
            * pool.Map()
            * pool.Starmap()
        * Asynchronous calls
            * pool.Apply_async()
            * pool.Map_async()
            * pool.Starmap_async()
            * pool.imap()
            * pool.imap_unordered()
    * Process-to-Process communication
        * Pipes

# __Parallel Programming in Python__

* It is a type of computation in which many calculations or the execution of processes are carried out simultaneously. 
* Large problems can be divided into smaller ones, which can then be solved at the same time

![Parallel execution of task](images/parallelism.png)

Similarly, in python code is executed sequentially


**The standard Python library has two main modules for parallel computing:**
* Threading
* Multiprocessing


**Before we go to the threading module let us understand what a thread and process is:**
* *Thread* : is a thread of execution in a program. Aka, lightweight process.
* *Process* : is an instance of a computer program that is being executed.
- Thread share the memory and state of the parent, process share nothing.
- Process use inter-process communication to communicate, thread do not

# __Multi-threading (shared memory parallelism)__ 

Multithreading is a programming concept and technique that allows a single process to have multiple threads of execution running concurrently within the same program. 
In simpler terms, it enables a program to perform multiple tasks simultaneously, dividing its workload into smaller units called threads

* Good for I/O bound tasks
* Subject to the Global Interpreter Lock (GIL)
  
__In python multi-threading is facilitated by the *threading* module__

let's take a look at a simple execution of the threading module.



In [4]:
# without multi-threading
import time

complete = False
count = 0
inp_list = list(range(21))

def worker(inp_list, count):
    while not complete:
        for i in inp_list:
            time.sleep(1)
            count = i
            print("count = ", count)

worker(inp_list, count)

input("hit enter to exit loop")
complete = True

count =  0
count =  1
count =  2
count =  3
count =  4
count =  5
count =  6


KeyboardInterrupt: 

In [5]:
# to run both the input statement and worker function simultaneously 

import time
import threading # importing threading module

complete = False
count = 0

def worker(count):
    while not complete:
        time.sleep(1)
        print("count = ", count)
        count = count + 1 

t1 = threading.Thread(target=worker, args=(count,)) # creating a thread called t1 using .Thread()
t1.start() # starting the thead

input("hit enter to exit loop\n")
complete = True

count =  0
count =  1
count =  2


count =  3


For certain algorithms, multi-threading can be more efficient than multi-processing.
* Use Multithreading for tasks that relies on the input from another part of the system, or from the user
* Use Multiprocessing for CPU intensive tasks (if you have multiple cores available)

# __Multiprocessing (Distributed-memory parallelism)__
It is the ability of a system to support multiple more than one processor at the same time. Each process gets its own portion of your computer’s memory, ensuring that no process can interfere with the execution of another. (though Multiprocessing in python allows sharing of data between processes running locally)

### Quick revision of processes ###
Processes are-
* Independent
* Don't share memory
* Run of different Processors, if available


### Overhead ###
* started a new process requires overhead (The code gets replicated every time a new process is spun)
* Hence, Use multiprocessing for job that more than a few seconds to run

# 'Process' class:  
*  individual process that can run independently from the main process
* The simplest way to spawn a second is to instantiate a Process object with a target function and call start() to let it begin working.

In [6]:
def standup(updates, no_of_members):
    """
    simulates a stand-up meeting by iterating over the range of `no_of_members` 
    and printing `updates` by introducing a 1-second delay between each update

    """
    for i in range(no_of_members):
        print(updates, i)
        time.sleep(1)

def work(task, no_of_tasks):
    """
    simulates a work task by iterating over the range of `no_of_tasks` 
    and printing `task` by introducing a 1.5-second delay between execution of each task
    """
    for i in range(no_of_tasks):
        print(task, i)
        time.sleep(1.5)

def coffee():
    """
    simulates a coffee break of 3 seconds
    """
    time.sleep(10)
    print("coffee break is done")

In [7]:
# This code sequentially runs all the fuctions and does not make use of multiprocessing
import time

updates = "The update is being given by attendee no. "
task = "Task being completed is at priority "

start_time = time.time()

standup(updates, 4)
work(task, 3)
coffee()

print("It's time to go home")
print("time required = ", time.time() - start_time )

The update is being given by attendee no.  0
The update is being given by attendee no.  1
The update is being given by attendee no.  2
The update is being given by attendee no.  3
Task being completed is at priority  0
Task being completed is at priority  1
Task being completed is at priority  2
coffee break is done
It's time to go home
time required =  18.527772903442383


* Args should be passed using a list or tuple to Process.

eg: ```p = Process(target=print, args=[1])``` or
```p = Process(target=print, args=(1,))```
* Target should be passed as an object and not as function. Do not include parentheses

eg: ```p = Process(target=print)``` and not ```p = Process(target=print())```
  

In [31]:
# Speeding up the code using multiprocessing
import time
import multiprocessing
from with_multiprocessing import standup, work, coffee

if __name__ == "__main__":
    updates = "The update is being given by attendee no. "
    task = "Task being completed is at priority "

    start_time = time.time()

    # spawning up individual 
    t1 = multiprocessing.Process(target=standup,args=(updates, 4))
    t2 = multiprocessing.Process(target=work,args=(task, 3))
    t3 = multiprocessing.Process(target=coffee)

    # starting the processes
    t1.start()
    t2.start()
    t3.start()

    # wait until process 1 is finished
    t1.join()
    # wait until process 2 is finished
    t2.join()
    # wait until process 3 is finished
    t3.join()

    print("It's time to go home")
    print("time required = ", time.time() - start_time )

Task being completed is at priority  0
The update is being given by attendee no.  0
The update is being given by attendee no.  1
Task being completed is at priority  1
The update is being given by attendee no.  2
Task being completed is at priority  2
The update is being given by attendee no.  3
coffee break is done
It's time to go home
time required =  10.23836898803711


One difference between the __threading__ and __multiprocessing__ examples is the extra protection for ```__main__``` used in the multiprocessing examples. Due to the way the new processes are started, the child process needs to be able to import the script containing the target function. Wrapping the main part of the application in a check for ```__main__``` ensures that it is not run recursively in each child as the module is imported. Another approach is to import the target function from a separate script

# 'Pool' Class
The __Pool__ class represents a pool of worker processes. 
```multiprocessing.pool``` provides a simple and convenient interface for parallel processing. It allows you to create a pool of worker processes, which can execute functions concurrently to achieve parallelism and speed up the execution of tasks. 

__pool is responsible for a fixed number of processes.__

# Multiprocessing and Map Reduce
The multiprocessing.Pool provides an excellent mechanism for the parallelisation of map/reduce style calculations. 
* Simple Map/Reduce: In a MapReduce-based system, input data is broken down into chunks for processing by different worker instances. Each chunk of input data is _mapped_ to an intermediate state using a simple transformation. The intermediate data is then collected together and partitioned based on a key value so that all of the related values are together. Finally, the partitioned data is reduced to a result set.

NOTE:  the multiprocessing.pool module provides a reduce function, but it works a bit differently from the "reduce" step in MapReduce. The reduce function in multiprocessing.pool is used to perform a reduction operation on the elements of an iterable using a specified function, but it does not handle distributed data grouping as in the MapReduce paradigm.

Let's look at asimple program to understand this:

In [9]:
# This Code will execute sequentially making use of only 1 core
import time

def calc_square(i):
   """
    returns the square of the the input value
   """
   return i ** 2
    
list_of_nums = [1, 2, 3, 4, 5]
result_list = []

t1 = time.time()
for i in list_of_nums:
   result_list.append(calc_square(i))

print("The list of squares = ", result_list)
print("time required =", time.time() - t1)

The list of squares =  [1, 4, 9, 16, 25]
time required = 0.0007059574127197266


![image info](images/without_map_reduce.png)

Using the Pool class we can parallelize our work and map the inputs to the different cores

![image info](images/with_map_reduce.png)

### After creating the process pool by calling a Multiprocessing.Pool object we submit tasks

There are two main approaches for submitting tasks to the process pool, they are:

1. __Issue tasks synchronously__:
   Issuing tasks synchronously means that the caller will block until the issued task or tasks have completed.
   Blocking calls to the process pool include map(), and starmap(), apply()
   
2. __Issue tasks asynchronously__:
   Issuing tasks asynchronously to the process pool means that the caller will not block, allowing the caller to continue on with other work while the tasks are executing
   Non-blocking calls to the process pool include apply_async(), map_async(), and starmap_async()

   Lets look at synchronous calls first...

### __1. Synchronous Calls__
# 1.a Apply()
```apply(func[, args[, kwds]])```
* ```apply()``` is used to issues a single task to the process pool.
* It blocks the program until the function completes execution, which may be useful for certain synchronization requirements.

In [11]:
def create_and_save_file_apply(file, string):
    """
    creates and saves a single file with the given filename and content string
    """
    file_name = f"file_{file}.txt"
    time.sleep(2)        
    with open(file_name, "w") as file:
        file.write(string)
    return file_name

In [13]:
import time
from multiprocessing import Pool
from apply import create_and_save_file_apply
 
# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply(create_and_save_file_apply, args=("empid_27468", "Falguni"))
    # report value
    print(result, "created")
    # close the process pool
    pool.close()

file_empid_27468.txt created


# 1. b Map()
``` (func, iterable[, chunksize]) ```
* map() is used to apply a function to multiple input elements concurrently.
* It takes three arguments:
  1. the function to be executed
  2. an iterable (eg: like a list or tuple) containing multiple input arguments for that function.
  3. (Optional) Chunksize
* It is suitable when you want to process a batch of data and receive the results as a list in the same order as the input elements.
The function is executed on each input element in parallel, and the return values are collected and returned as a list of results in the same order as the input elements.

Let us look at a scenario where we aren't implementing pool.map

In [14]:
def create_and_save_files(string_array):
    """creates and saves files with the given input content string from string_array"""
    for i, string in enumerate(string_array):
        file_name = f"file_{i+1}.txt"
        time.sleep(2)
        with open(file_name, "w") as file:
            file.write(string)

In [18]:
# performs the file creation and writing operations sequentially, one after the other
import time

if __name__ == "__main__":
    # Array of 10 strings
    strings = [
        "Abhijit",
        "Anas",
        "Ayush",
        "Falguni",
        "Harjot",
        "Jigyasa",
        "Prachi"
        "Saurabh",
        "Sheshant",
        "Rahul"]
    start_time = time.time()
    create_and_save_files(strings)
    print("time required = ", time.time() - start_time )

time required =  18.030400037765503


 If the list of strings is large and the file operations take considerable time, this sequential approach may not be the most efficient

In [15]:
def create_and_save_file_map(string_info):
    i, string = string_info
    file_name = f"file_{i+10}.txt"
    time.sleep(2)
    with open(file_name, "w") as file:
        file.write(string)
    return file_name

In [16]:
import time
from multiprocessing import Pool, cpu_count
from pool_map import create_and_save_file_map

if __name__ == "__main__":
    strings = [
        "Abhijit",
        "Anas",
        "Ayush",
        "Falguni",
        "Harjot",
        "Jigyasa",
        "Prachi",
        "Saurabh",
        "Sheshant",
        "Rahul"
    ]
    start_time = time.time()


    # Using multiprocessing pool with worker processes = the number of core available on the os
    with Pool(processes=cpu_count()) as pool:
        for item in pool.map(create_and_save_file_map, enumerate(strings)):
            print(f"file {item} created")

    pool.close()

    print("time required =", time.time() - start_time)

file file_10.txt created
file file_11.txt created
file file_12.txt created
file file_13.txt created
file file_14.txt created
file file_15.txt created
file file_16.txt created
file file_17.txt created
file file_18.txt created
file file_19.txt created
time required = 2.5065529346466064


### the Pool.map() function only takes one iterable as an argument. This means that the target function executed in the process can only take a single argument. However, if we want multiple iterators as an input, we can use starmap()


# 1.c Starmap()
```(func, iterable[, chunksize])```
* starmap() is used to issue multiple tasks to the process pool.
* It takes 3 arguments:
  1. The function to be executed
  2. An iterable of tuples
     eg. ```[(1,2), (3, 4)]``` results in ```[func(1,2), func(3,4)]```.
  4. (Optional)  Chunksize
 * starmap() treats each element of the iterable as a separate set of arguments to the function.
 * If your function can handle multiple arguments or if you need to pass multiple pieces of data to the function for each element, then starmap() is the better choice


In [17]:
def create_and_save_file_starmap(string_info, time_out):
    i, string = string_info
    file_name = f"file_{i+20}.txt"
    time.sleep(time_out)
    with open(file_name, "w") as file:
        file.write(string)
    return file_name

In [18]:
# in this code along with the string we also want one of the arguments to be the individual time_outs

import time
from multiprocessing import Pool, cpu_count
from pool_starmap import create_and_save_file_starmap

if __name__ == "__main__":
    strings = [
        "Abhijit",
        "Anas",
        "Ayush",
        "Falguni",
        "Harjot",
        "Jigyasa",
        "Prachi",
        "Saurabh",
        "Sheshant",
        "Rahul"
    ]
    time_out = [1, 2, 3, 1, 2, 3, 1, 2, 3, 2]
    start_time = time.time()

    # prepare arguments
    input_tuples = list(zip(enumerate(strings), time_out))

    print(input_tuples)
    
    # Using multiprocessing pool with worker processes = the number of core available on the os
    with Pool(processes=cpu_count()) as pool:
        for item in pool.starmap(create_and_save_file_starmap, input_tuples):
            print(f"file {item} created")

    pool.close()

    print("time required =", time.time() - start_time)

[((0, 'Abhijit'), 1), ((1, 'Anas'), 2), ((2, 'Ayush'), 3), ((3, 'Falguni'), 1), ((4, 'Harjot'), 2), ((5, 'Jigyasa'), 3), ((6, 'Prachi'), 1), ((7, 'Saurabh'), 2), ((8, 'Sheshant'), 3), ((9, 'Rahul'), 2)]
file file_20.txt created
file file_21.txt created
file file_22.txt created
file file_23.txt created
file file_24.txt created
file file_25.txt created
file file_26.txt created
file file_27.txt created
file file_28.txt created
file file_29.txt created
time required = 3.4832839965820312


### Chunksize
* In both map() and starmap() it is possible to split up the items in the iterable evenly to worker processes.
For example, if we had a process pool with 4 processes and an iterable with 40 items, we can split up the items into 4 chunks of 10 items, with one chunk allocated to each worker process.
``` pool.map(task, items, chunksize=10) ```

__Importantly, it is important to note that in all create_and_save_file(), the function calls are issued and executed before the iterator of results is returned__

### 2. __Asynchronous Calls__



# 2.a Apply_async()
```(func[, args[, kwds[, callback[, error_callback]]]])```

* The apply_async() function is an asynchronous method used to execute a function on a single input argument at a time, __but it does not block the program's execution__.
* Instead of waiting for the result, it __returns an AsyncResult__ object immediately, representing the result of the function call (which might not be ready yet).
* You can later use the ```get()``` method on the AsyncResult object to __retrieve the result__, which will block the program's execution until the result is available.


### .get()
We can later use the ```get()``` method on the AsyncResult object to retrieve the results, and this call can block the program's execution until all the results are available.

In [19]:
import time
from multiprocessing import Pool
from apply import create_and_save_file_apply

# protect the entry point
if __name__ == '__main__':
    # create and configure the process pool
    pool = Pool()
    # issue tasks to the process pool
    result = pool.apply_async(create_and_save_file_apply, args=("empid_27468", "Falguni"))
    # report value
    print(result.get(), "created")
    # close the process pool
    pool.close()

file_empid_27468.txt created


# 2.b Map_async()
```(func, iterable[, chunksize[, callback[, error_callback]]])```
* The map_async() function is a non-blocking method that applies the specified function to each element of the input iterable in parallel.
* Return values are collected asynchronously using an __AsyncResult object__.
* The map_async() call returns immediately, allowing the program to continue executing other tasks without waiting for the results.

map_async() is more efficient in dealing with slow-running processes and it can be useful when you have other tasks to perform while waiting for the function calls to complete. Overall program efficiency and responsiveness could be improved using this.

_NOTE_: it's essential to manage the AsyncResult objects correctly to avoid potential issues like race conditions or excessive resource consumption

In [8]:
import time
from multiprocessing import Pool, cpu_count
from pool_map_async import create_and_save_file_map

if __name__ == "__main__":
    strings = [
        "Abhijit",
        "Anas",
        "Ayush",
        "Falguni",
        "Harjot",
        "Jigyasa",
        "Prachi",
        "Saurabh",
        "Sheshant",
        "Rahul"
    ]
    start_time = time.time()

    with Pool(processes=cpu_count()) as pool:
        result_async = pool.map_async(create_and_save_file_map, enumerate(strings))
        print("Other work while waiting for functions to complete...")

        # blocking call using .get()
        results = result_async.get()

        for item in results:
            print(f"file {item} created")

    print("Time required =", time.time() - start_time)

Other work while waiting for function calls to complete...
file file_30.txt created
file file_31.txt created
file file_32.txt created
file file_33.txt created
file file_34.txt created
file file_35.txt created
file file_36.txt created
file file_37.txt created
file file_38.txt created
file file_39.txt created
Time required = 2.5167250633239746


# 2.c Starmap_async()

```(func, iterable[, chunksize[, callback[, error_callback]]])```
* ```starmap_async()``` combination of ```starmap()``` and ```map_async()``` that iterates over iterable of iterables.
* when called unpacks the iterables and returns an AsyncResult object.
  Similar to map_async(), starmap_async() it can be useful when you have other tasks to perform while waiting for the function calls to complete.

In [7]:
from pool_starmap_async import create_and_save_file_starmap
import time
from multiprocessing import Pool, cpu_count

if __name__ == "__main__":
    strings = [
        "Abhijit",
        "Anas",
        "Ayush",
        "Falguni",
        "Harjot",
        "Jigyasa",
        "Prachi",
        "Saurabh",
        "Sheshant",
        "Rahul"
    ]
    start_time = time.time()

    time_out = [1, 2, 3, 1, 2, 3, 1, 2, 3, 2]
    start_time = time.time()

    # prepare arguments
    input_tuples = list(zip(enumerate(strings), time_out))

    print(input_tuples)
    
    # Using multiprocessing pool with worker processes = the number of core available on the os
    with Pool(processes=cpu_count()) as pool:
        # Use map_async instead of map
        result_async = pool.starmap_async(create_and_save_file_starmap, input_tuples)
        print("Other work while waiting for functions to complete...")

        # blocking call using get()
        results = result_async.get()

        for item in results:
            print(f"file {item} created")

    pool.close()

    print("time required =", time.time() - start_time)

[((0, 'Abhijit'), 1), ((1, 'Anas'), 2), ((2, 'Ayush'), 3), ((3, 'Falguni'), 1), ((4, 'Harjot'), 2), ((5, 'Jigyasa'), 3), ((6, 'Prachi'), 1), ((7, 'Saurabh'), 2), ((8, 'Sheshant'), 3), ((9, 'Rahul'), 2)]
Other work while waiting for function calls to complete...
file file_40.txt created
file file_41.txt created
file file_42.txt created
file file_43.txt created
file file_44.txt created
file file_45.txt created
file file_46.txt created
file file_47.txt created
file file_48.txt created
file file_49.txt created
time required = 3.5598671436309814


## Callback
We can send tasks to the process pool, and for each task's returned value, we can specify a callback function to handle and process that value using a callback function

Example usage (not to be run):

In [None]:
def task():
    return "done"

def custom_callback(result):
    print(f'Callback got values: {result}')
 
if __name__ == '__main__':
    
    with Pool() as pool:
        _ = pool.map_async(create_and_save_file_map, enumerate(strings), callback=custom_callback)
        pool.close()

# 3.a imap()

```(func, iterable[, chunksize])```
* Lazy map()
* We can issue tasks to the process pool one-by-one via the imap() function.
* Returns an iterator that yields the results as they become available.

* __Difference between imap() and map()__: map() returns a list containing all results, while imap() returns an iterator that yields results as they are computed
* __Difference between imap() and map_async()__: imap() and map_async() both are non-blocking calls. However, 
while imap() returns an iterator that yields results lazily, map_async() returns an AsyncResult object immediately, and you can retrieve the results later using get(). map_async() still collects all results in memory, but it doesn't block the program while waiting for results, allowing you to perform other tasks.
* __imap() is useful over map() when you want to process large datasets efficiently and avoid high memory usage__



# 3.b imap_unordered
```(func, iterable[, chunksize])```
* The __same as imap()__ except that the ordering of the results.
* The iterable will yield return values as tasks are completed, in the order that tasks were completed, not the order they were issued
* Hence, order of the results could be arbitrary

__Use imap_unordered() when you have a large dataset and the processing order is not critical.__

# Progess Bar
We can add a progress bar using ```tqdm``` to monitor the execution of processes within a pool in real-time

In [None]:
import time
import multiprocessing
from tqdm import tqdm
from multiprocessing import Pool
from progress_bar import slow_square

def slow_square(number):
    # Simulate a computationally inefficient task by adding a delay
    time.sleep(1)
    return number ** 2

if __name__ == "__main__":
    numbers = range(100)
    results = []

    progress_bar = tqdm(total=len(numbers), desc='Processing')

    with Pool() as pool:
        for item in pool.imap(slow_square, numbers):
            results.append(item)

            # Update the progress bar
            progress_bar.update(1)

    # Close the progress bar
    progress_bar.close()

    print("Results with multiprocessing:", results)

### In multiprocessing, any newly created process will do following:
* run independently
* have their own memory space.
the map() function in the multiprocessing.pool module does not directly support process-to-process communication during mapping.
However, The multiprocessing module in Python provides mechanisms for inter-process communication (IPC) to enable communication between different processes

Two of the most popular methods are:
1. Pipes (for a connection between two processes)
2. Queues (which allows multiple producers and consumers)

# 1. Pipe
```multiprocessing.Pipe([duplex])```

* Enables inter-process communication (IPC) between two processes.
* Allows two processes to communicate by creating a unidirectional connection.
* One end of the pipe is used for sending data, and the other end is used for receiving data.
* Exchange data sequentially.
* One process acts as the sender, writing data into the pipe, while the other process acts as the receiver, reading the data from the pipe.

In [29]:
def send_message(conn):
    message = "Hello, team!"

    conn.send(message)  # Sending message through the pipe
    print(f"Sent: {message}")

    conn.close()

def receive_message(conn):
    message = conn.recv()  # Receiving message from the pipe
    print(f"Received: {message}")

In [30]:
import multiprocessing
from pipe import send_message, receive_message

if __name__ == "__main__":
    # Create a pipe
    parent_conn, child_conn = multiprocessing.Pipe()

    # Create two processes: one for sending messages and one for receiving messages
    sender_process = multiprocessing.Process(target=send_message, args=(parent_conn,))
    receiver_process = multiprocessing.Process(target=receive_message, args=(child_conn,))

    sender_process.start()
    receiver_process.start()

    sender_process.join()
    receiver_process.join()

    print("done.")

Sent: Hello, team!
Received: Hello, team!
done.


* The ```Connection.send()``` function can be used to send objects from one process to another.
* The ```Connection.recv()``` function can be used to receive objects in one process sent by another.
* When we call multiprocessing.Pipe() it returns a tupple eg (conn1, conn2). conn1 can only send data and conn2 can only be used to recieve data.

  However, A multiprocessing.Pipe can be used to both send and receive data between two processes.
  This is called a duplex and can be achieved by setting the __“duplex”__ argument to True when creating a pipe
  example: ``` parent_conn, child_conn = multiprocessing.Pipe(duplex=True)```