# Multiprocessing and Multithreading

## Sections

- [Multithreading](#Multithreading)
- [Multiprocessing](#Multiprocessing)
- [Pros and Cons](#Pros-and-Cons)
- [Multiprocessing Example](#Multiprocessing-Example)
- [Threading Example](#Threading-Example)
- [Sharing Data with Processes Using Value](#Sharing-Data-with-Processes-Using-Value)
- [Sharing Data with Processes Using Array](#Sharing-Data-with-Processes-Using-Array)
- [Implementing Multiprocess Locks](#Implementing-Multiprocess-Locks)


Python is slow comparing to other programming languages.

In a Python script execution:
- When you type python script.py in your shell you instruct your processor to create and to schedule a single process which is the smallest unit of processing
- The allocated process will start to execute the script line by line.
- Once the script hit the EOF, the process will be terminated and its resources will be returned to the free pool to be used by other processes.

We can overcome the Python speed problem using multiprocessing or multithreading.

For example, with multithreding, we can have inside a process multiple threads.

But the problem with assigning a lot of threads to one process without special handling is what’s called Race Condition.

CPython uses GIL(Global Interpreter Lock) to protect memory amont multiple threads.

## Multithreading

Threading in python is used to run multiple threads (tasks, function calls) at the same time inside the same process.

Thereding is not suitable for CPU intensive application

Python threads are best used in cases where the execution of a task involves some waiting, for example working with web servers, networking devices, SQL databases.

Python uses the threading module to start multiple threads.

## Multiprocessing

Multiprocessing achieves true parallelism in Python.

Low risk of data-corruption when using multiprocessing.

Each spawned process will have their own allocated memory.

Each process has it’s owned GIL so there’s no resource conflict or race condition here.

Python uses the multiprocessing module to achieve parallel programming.

Multiptocessing is used in CPU intensive applications.

## Pros and Cons

__Multiprocessing Pros__

- Separate memory space.
- Takes advantage of multiple CPUs & cores.
- Avoids GIL limitations of CPython.
- Child processes are interruptible/killable.
- A must with CPython for CPU-bound processing.

__Multiprocessing Cons__

- Inter Process Communication (IPC) a little more complicated with more overhead.
- Larger memory consumption.

__Multithreading Pros__

- Lightweight , low memory consumption.
- Shared memory, makes access to state from another context easier.
- Allows you to easily make responsive UIs.
- Great option for I/O - bound applications.

__Multithreading Cons__

- CPython, subject to the GIL.
- Threads are not interruptible/killable.
- If not following a command queue/message model (using the Queue module), then manual use of synchronization becomes a necessity.
- Code is usually harder to understand and to get right due to the potential of race conditions increases dramatically.

### Multiprocessing Example

In [8]:
# Importing the module multiptocessing

import multiprocessing as mp
import time
 
# Creating the target function. Each process will execute this function in parallel
def name_and_time(name):
    print(f"Hello {name}, current time is {time.time()}")
    print("Sleeping for 2 seconds ...")
    time.sleep(2)
    print("Walking up ... finishing function.")
    
# This will be run only if the script is run directly (not imported as a module in another script)

if __name__ == '__main__':
    process_list = list()     # list that stores the processes
 
    # Creating 5 processes
    for i in range(5):
        ## Creating each process. 1st argument is the target function
        ## 2nd argument is a tuple (target function's arguments)
        process = mp.Process(target=name_and_time, args=("Popeye",))
        process_list.append(process)    # Appending each process to the list
 
    # Iterating over the list and start each process
    for p in process_list:
        p.start()
 
    # Join the processes back to the main process OR
    # The main process will wait for forked-processes to finish
    for p in process_list:
        p.join()
 
    print("Other instructions of the main module...")
    print("More instructions of the main module...")
    print("End of Script")

Other instructions of the main module...
More instructions of the main module...
End of Script


### Threading Example

In [13]:
# Importing the module
import threading
import time
 
# Creating the target function. Each thread will execute this function in parallel
def name_and_time(name):
    print(f"Hello {name}, current time is {time.time()}")
    print("Sleeping for 5 seconds ...")
    time.sleep(5)
    print("\nWalking up ... finishing function.")

# This will be run only if the script is run directly 
if __name__ == '__main__':
    thread_list = list()     # list that stores the threads
 
    # Creating 3 threads
    for i in range(3):
        ## Creating each thread. 1st argument si the target function
        ## 2nd argument is a tuple (target function's arguments)
        thread = threading.Thread(target=name_and_time, args=("Popeye",))
        thread_list.append(thread)  # Appending each thread to the list
 
    ## Iterating over the list and start each thread
    for t in thread_list:
        t.start()
 
 
    ## Join the threads back to the main thread OR
    ## The main thread will wait for forked-threads to finish
    ## This is optional
    for t in thread_list:
        t.join()
 
    print("Other instructions of the main module...")
    print("More instructions of the main module...")
    print("End of Script")

Hello Popeye, current time is 1606880190.5148027
Sleeping for 5 seconds ...
Hello Popeye, current time is 1606880190.517795
Sleeping for 5 seconds ...
Hello Popeye, current time is 1606880190.5207868
Sleeping for 5 seconds ...

Walking up ... finishing function.
Walking up ... finishing function.

Walking up ... finishing function.

Other instructions of the main module...
More instructions of the main module...
End of Script


### Sharing Data with Processes Using Value

In [14]:
import multiprocessing as mp
 
# Target function that increments a counter (multiprocessing.Value)
def increment(counter):
    counter.value += 1
 
# Target function that increments a counter (integer)
def my_increment(my_counter):
    my_counter  += 1
  
if __name__ == '__main__':
    my_counter = 1      # type integer
    counter = mp.Value('i', 1)   # type multiprocessing.Value
 
    # Creating, starting and joining 5 processes. They increment the counter. This is of type multiprocessing.Value
    for i in range(5):
        process = mp.Process(target=increment, args=(counter,))
        process.start()
        process.join()
 
    print(f'counter of type multiprocessing.Value is {counter.value}')
 
 
    # Creating, starting and joining 5 processes. They increment my_counter. This is of type integer
    for i in range(5):
        process = mp.Process(target=my_increment, args=(my_counter,))
        process.start()
        process.join()
 
    print(f'my_counter of type integer is {my_counter}')

counter of type multiprocessing.Value is 1
my_counter of type integer is 1


### Sharing Data with Processes Using Array

In [15]:
## Sharing data between processes using multiprocessing.Array
 
import multiprocessing as mp
 
# Target function1
# The 2nd parameter is a list that will be modified inside the forked process
def squares(numbers, squares_list):
    for n in numbers:
        squares_list.append(n**2)
    print(f'square_list inside process {squares_list}')
 
# Target function
# The 2nd parameter is an Array used for sharing data between processes
def cubes(numbers, result):
    i = 0
    for num in numbers:
        result[i] = num ** 3
        i += 1
    print(f'result Array inside process/function: {result[::]}')
 
 
# This will be run only if the script is run directly (not imported as a module in another script)
# This is necessary!
if __name__ == '__main__':
 
    # Calculating the square of these numbers in a new process
    # Adding the result (the squares) in a list inside a process
    numbers = [1,2,3]
    squares_list = list()
 
    # Creating, Starting and Joining the process
    p = mp.Process(target=squares, args=(numbers, squares_list))
    p.start()
    p.join()
 
    # Printing the squares_list
    # !! It hasn't been modified inside the process. The main process and the forked process didn't work on the same data
    print(f'squares_list outsite process {squares_list}')
 
    # Creating an Array to share data beetween main process and the forked process
    result = mp.Array('i', len(numbers))
 
    # Creating, Starting and Joining the process
    p1 = mp.Process(target=cubes, args=(numbers, result))
    p1.start()
    p1.join()
 
    # Printing the Array. It has been modified inside the forked-process. The array was shared between main and forked process
    print(f'result Array outside process {result[::]}')

squares_list outsite process []
result Array outside process [0, 0, 0]


### Implementing Multiprocess Locks

In [16]:
import multiprocessing as mp
import time
 
# Target function. It increments a balance by 0.01  100 times
def deposit(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        lock.acquire()
        balance.value += 1
        lock.release()
 
# Target function. It decrements a balance by 0.01  100 times
def withdraw(balance, lock):
    for i in range(100):
        time.sleep(0.01)
        lock.acquire()
        balance.value -= 1
        lock.release()
 
if __name__ == '__main__':
    balance = mp.Value('i', 500)    ## starting balance
    print(f'Balance BEFORE running processes: {balance.value}')
 
    lock = mp.Lock()    # lock Object
 
    # Creating, starting and joining 2 processes. They increment and decrement the shared value
    p1 = mp.Process(target=deposit, args=(balance, lock))
    p2 = mp.Process(target=withdraw, args=(balance, lock))
 
    p1.start()
    p2.start()
 
    p1.join()
    p2.join()
 
    # The final value of balance
    print(f'Balance AFTER running processes: {balance.value}')

Balance BEFORE running processes: 500
Balance AFTER running processes: 500


## Concurrent Applications using Asyn IO

Async IO is a single threaded, single process design that uses cooperative multitasking.

Cooperative  is no OS intervention, each task decides when to give up.

Async IO is language agnostc, it is implemented in several laguages.

Asyncio is the Python Standar Library that provides support for implementing Asinc IO. Avoids the memry issues from the hreading design.

Paramiko that is very common for networking engineers, does not use Async IO, uses threading.

AsyncSSH is the option for Asyncronous SSH that uses Async IO.

In [1]:
import asyncio
import time

In [2]:
# Sync code
def sync_func():
    print(" one ", end="")
    time.sleep(1)
    print(" two ", end="")
    time.sleep(1)
    print(" three ", end="")

for _ in range(3):
    sync_func()

 one  two  three  one  two  three  one  two  three 

In [4]:
# Async code
async def async_func():
    print("one", end="")
    await asyncio.sleep(1)
    print("two", end="")
    await asyncio.sleep(1)
    print("three", end="")
    
async def main():
    #  task = [async_f(), async_f(), asyncio] 
    task = [async_f() for _ in range(3)]

    await asyncio.gather(*tasks)

# It seems it seams something else to run on Jupyter Notebook
# asyncio.run(main())

In [None]:
# Example for iterating with the shell of multiple devices
import asyncio

async def run(cmd):
    proc = await asyncio.create_subprocess-shell(cmd, stdout=asyncio.subprocess.PIP, stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    
    print(f"{cmd} exited with status code. {proc.returncode}")
    
    if stdout:
        print(f"STDOUT:\n{stdout.decode("UTF-8")}")
              
    if stderr:
        print(f"STDERR:\n{stderr.decode("UTF-8")}")
    
async def main(commads):
    tasks = []
    for cmd in commands:
        tasks.append(run(cmd))
    
    await asyncio.gather(*tasks)

commands = ("ifconfig", "ls", "who", "uname -a")
asyncio.run(main(commands))
              

### References for Async IO

[Async IO in Python: A Complete Walkthrough (by Real Python)](https://realpython.com/async-io-python/)

### AsyncSSH

Requires the instalation of an external library

In [9]:
import asyncio
# import asyncssh

In [None]:
# One command, one client

async def connect_and_run(host, username, password, command):
    async with async.connect(host=host, username0username, password=password, known-hosts=None) as connection:
        result = await conection.run(command)
        return result

command = "ifconfig"
result = async.run(connect_and_run("192.168.0.50", "student", "student", command))
print(f"STDOUT:\n {result.stdout}")
print(f"STDERR:\n {result.stderr}")

In [None]:
# Multiple commands, one client

async def connect_and_run(host, username, password, commands):
    async with async.connect(host=host, username0username, password=password, known-hosts=None) as connection:
    
    results = []
    
    for cmd in commands:
        result = await conection.run(comd)
        results.append(result)

commands = ("ifconfig", "who -a", "uname -a")
results = async.run(connect_and_run("192.168.0.50", "student", "student", commands))
for result in results:
    print(f"STDOUT:\n {result.stdout}")
    print(f"STDERR:\n {result.stderr}")

In [None]:
# Multiple commands, multiple client

async def connect_and_run(host, username, password, command):
    async with async.connect(host=host, username0username, password=password, known-hosts=None) as connection:
        return await connection.run(command)
        
async def run_multiple_client(hosts):
    tasks = []
    for host in hosts:
        task = run_client(host["host"], host["username"], host["password"], host["command"])
        tasks.append(task)
    
    result = await asyncio.gather(*taks, return_exceptions=True)

    i = 0
    for result in results:
        i += 1
        if isinstance(result, Exception):
            print(f"Task {i} failed. {str(result)}"")
        elif result.exit_status != 0:
            print(f"taks {i} existed with status. {result.exit_status}")
            print(result.stderr, end="")
        else:
            print(f"Task {i} succeeded:")
            print(result.stdout, end="")
        print("#" * 50)
    
hosts = [
    {"host": "192.168.1.1", "username": "student", "password": "student", "command": "ifconfif"},
    {"host": "192.168.1.2", "username": "student", "password": "student", "command": "who"},
    {"host": "192.168.1.3", "username": "student", "password": "student", "command": "uname -a"},    
]
                  
asyncio.run(run_multile-clients(hosts))