Multithreading & Multiprocessing in Python
==========================================

# General Understanding

The art of doing multiple tasks at the same time. 'Same time' is a slightly confusing term. If there are multiple processors, we can actually run multiple tasks simultaneously. Otherwise, we can at least make use of the time a task spends blocked on I/O.

## Multi-Threading
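Multithreading in Python lets us take advantage of the time an operation spends blocked on I/O, for example by making several web service, database, or network calls at the same time. In standard CPython builds, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, so threads help mainly with I/O-bound work.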

## Multi-Processing
To take advantage of the multiple processors present on the machine, we need to use multiprocessing.


## Example without multithreading or multiprocessing

In the example below, the code runs without multithreading.
It takes about 4 seconds because the two function calls take 2 seconds each and run one after the other.

In [None]:
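import time

def calculate_raise_to(x, y):
    # Simulate a slow, blocking operation (for example an I/O call)
    time.sleep(2)
    print([i ** y for i in x])

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7]  # named to avoid shadowing the built-in input()
    start = time.time()
    # The two calls run one after the other
    calculate_raise_to(numbers, 2)
    calculate_raise_to(numbers, 3)
    print('time taken', round(time.time() - start, 2), 'seconds')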

## Example with multithreading

In the example below, the code runs with multithreading.
It takes just over 2 seconds because the two function calls run in separate threads and their 2-second waits overlap.

In [None]:
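import time
import threading

def calculate_raise_to(x, y):
    # Simulate a slow, blocking operation (for example an I/O call)
    time.sleep(2)
    print([i ** y for i in x])

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7]  # named to avoid shadowing the built-in input()
    start = time.time()
    t1 = threading.Thread(target=calculate_raise_to, args=(numbers, 2))
    t2 = threading.Thread(target=calculate_raise_to, args=(numbers, 3))
    # Start both threads so that their waits overlap
    t1.start()
    t2.start()
    # Wait for both threads to finish before measuring the time
    t1.join()
    t2.join()
    print('time taken', round(time.time() - start, 2), 'seconds')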

## Global variables with multithreading

Multithreading happens within the same process, so global variables are shared across threads.

In the example below, the global variable is filled with the results.

In [None]:
import threading

global_results = []

def calculate_raise_to(x, y):
    global global_results
    # Append to the shared global list (the original referenced an undefined name)
    global_results.append([i ** y for i in x])
    print('results is', global_results)

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7]
    t1 = threading.Thread(target=calculate_raise_to, args=(numbers, 2))
    t1.start()
    t1.join()
    # The main thread sees the list that the worker thread filled
    print('results is', global_results)

## Example with multiprocessing

In the example below, the code runs with multiprocessing. It takes just over 5 seconds because the two function calls, which sleep for 5 seconds each, run in separate processes at the same time.

If we take a snapshot of the running processes before and after our processes are started, we can see that a couple of processes are added, and then removed once they finish.

In [None]:
import time
import multiprocessing as mp
import psutil

def print_python_processes(msg):
    # Print every running process whose description mentions 'python'
    print(msg)
    for proc in psutil.process_iter():
        if 'python' in str(proc):
            print(str(proc))

def calculate_raise_to(x, y):
    time.sleep(5)
    print([i ** y for i in x])


if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7]
    start = time.time()
    p1 = mp.Process(target=calculate_raise_to, args=(numbers, 2), name='Process_test_1_123')
    p2 = mp.Process(target=calculate_raise_to, args=(numbers, 3), name='Process_test_2_abc')
    print_python_processes('processes before we start our process')
    p1.start()
    p2.start()
    print_python_processes('processes after we start our process')
    p1.join()
    p2.join()
    print_python_processes('processes after we end our process')
    print('time taken', round(time.time() - start, 2), 'seconds')
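
The snapshot above relies on the third-party `psutil` package. As a minimal sketch using only the standard library, each process can report its own name and PID instead. The helper names `describe_current_process` and `report` and the process names `worker_0` and `worker_1` are made up for this illustration.

```python
import multiprocessing as mp
import os


def describe_current_process():
    """Return the name and PID of the process this function runs in."""
    return f"{mp.current_process().name} has pid {os.getpid()}"


def report():
    # Runs in the child process, so it prints the child's own name and PID
    print(describe_current_process())


if __name__ == '__main__':
    # The parent process reports itself first
    print(describe_current_process())
    workers = [mp.Process(target=report, name=f'worker_{n}') for n in range(2)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
```

Each line printed should show a different PID, which is a simple way to confirm that the functions really run in separate processes.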

## Global variables with multiprocessing

Multiprocessing runs on separate processes, so global variables live in separate address spaces and are not shared.

In the example below, the global variable in the parent process is ***not*** filled with the results.

In [None]:
import multiprocessing

results = []

def calculate_raise_to(x, y):
    global results
    results.append([i ** y for i in x])
    print('results is',results)
    
if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5, 6, 7]
    p1 = multiprocessing.Process(target=calculate_raise_to, args=(numbers, 2))
    p1.start()
    p1.join()
    # The child changed its own copy of results; the parent's list is still empty
    print('results is', results)

## Share memory between processes # 1

As described above, we need something special to share information across processes.

There are many ways to do this; two of them are the shared `Array` and `Value` objects.

The example below demonstrates both.

In [None]:
import multiprocessing
import time

def calculate_raise_to(x, y, results6, value6):
    print('result before sleep is', results6[:])
    print('value before sleep is', value6.value)
    time.sleep(10)
    print('result after sleep is', results6[:])
    print('value after sleep is', value6.value)
    for index,value in enumerate(x):
        results6[index] = results6[index] + (value ** y)
    print('results before we return',results6[:]) 
    value6.value = 10
    print('value before we return', value6.value)


if __name__ == '__main__':
    input7 = [1,2,3,4,5]
    value7 = multiprocessing.Value('i')
    value7.value = 1
    result7 = multiprocessing.Array('i', len(input7))
    t1 = multiprocessing.Process(target = calculate_raise_to, args=(input7,2,result7,value7))
    t1.start()
    time.sleep(5)
    result7[0] = 10
    value7.value = 2
    t1.join()
    print('results after we return',result7[:]) 
    print('value after we return', value7.value)


## Share memory between processes # 2

A Queue is a very good way of passing data between processes.

Process 1 --> Queue --> Process 2

Below is a sample producer-consumer problem.

In [None]:
import multiprocessing


def produce(queuep):
    for i in range(100):
        print('in produce', i)
        queuep.put(i)


def consume(queuec):
    for i in range(100):
        print('in consume', queuec.get())

if __name__ == '__main__':
    queue = multiprocessing.Queue(10)
    consume_process = multiprocessing.Process(name='consumer', target=consume, args=(queue,))
    produce_process = multiprocessing.Process(name='producer', target=produce, args=(queue,))
    consume_process.start()
    produce_process.start()
    consume_process.join()
    produce_process.join()
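
In the example above, the consumer has to know in advance that exactly 100 items will arrive. A common alternative is for the producer to send a marker value when it is done. The following is a minimal sketch of that idea, assuming `None` is never a real item; the `SENTINEL` name, the `count` parameter, and the second result queue are introduced here for illustration only.

```python
import multiprocessing

SENTINEL = None  # hypothetical marker meaning "no more items"; assumes None is never real data


def produce(queuep, count):
    for i in range(count):
        queuep.put(i)
    # Tell the consumer that nothing more will arrive
    queuep.put(SENTINEL)


def consume(queuec, resultq):
    received = []
    while True:
        item = queuec.get()
        if item is SENTINEL:
            break
        received.append(item)
    # Send everything that was consumed back to the parent
    resultq.put(received)


if __name__ == '__main__':
    queue = multiprocessing.Queue(10)
    results = multiprocessing.Queue()
    consume_process = multiprocessing.Process(name='consumer', target=consume, args=(queue, results))
    produce_process = multiprocessing.Process(name='producer', target=produce, args=(queue, 5))
    consume_process.start()
    produce_process.start()
    # Read the result before joining, so the consumer is not left waiting to flush its queue
    print('consumed', results.get())
    consume_process.join()
    produce_process.join()
```

With this design the consumer loop does not depend on the number of items, so the producer can change how much it sends without the consumer being updated.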


## Lock (synchronized memory) between processes

It is sometimes necessary to ensure that no two processes access the shared memory at the same time. If they do, they may read a stale value and lose writes.

A lock ensures that the code using shared state (both reading and writing) runs exclusively and never works with a stale value.

The example below demonstrates this.

In [None]:
import multiprocessing
import time


def deposit(dollard):
    for k in range(100):
        time.sleep(0.01)
        dollard.value = dollard.value + 1


def withdraw(dollarw):
    for j in range(100):
        time.sleep(0.01)
        dollarw.value = dollarw.value - 1


def deposit_lock(dollard, lockd):
    for k in range(100):
        time.sleep(0.01)
        lockd.acquire()
        dollard.value = dollard.value + 1
        lockd.release()


def withdraw_lock(dollarw, lockw):
    for j in range(100):
        time.sleep(0.01)
        lockw.acquire()
        dollarw.value = dollarw.value - 1
        lockw.release()


def test_without_lock():
    error_count = 0
    for i in range(10):
        dollars = multiprocessing.Value('i', 200)
        consume_process = multiprocessing.Process(name='deposit', target=deposit, args=(dollars,))
        produce_process = multiprocessing.Process(name='withdraw', target=withdraw, args=(dollars,))
        consume_process.start()
        produce_process.start()
        consume_process.join()
        produce_process.join()
        if dollars.value != 200:
            error_count = error_count + 1
    print('Error Count without lock', error_count)


def test_with_lock():
    error_count = 0
    for i in range(10):
        lock = multiprocessing.Lock()
        dollars = multiprocessing.Value('i', 200)
        consume_process = multiprocessing.Process(name='deposit', target=deposit_lock, args=(dollars, lock,))
        produce_process = multiprocessing.Process(name='withdraw', target=withdraw_lock, args=(dollars, lock,))
        consume_process.start()
        produce_process.start()
        consume_process.join()
        produce_process.join()
        if dollars.value != 200:
            error_count = error_count + 1
    print('Error Count with lock', error_count)


if __name__ == '__main__':
    test_without_lock()
    test_with_lock()
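
The locked variants above call `acquire()` and `release()` by hand. As a hedged alternative sketch, a lock can also be used in a `with` block, which releases it even if the body raises an exception. This sketch additionally uses `get_lock()`, which returns the lock that `multiprocessing.Value` creates by default, so no separate lock has to be passed around. The `times` parameter is an assumption added for this illustration.

```python
import multiprocessing


def deposit(dollars, times):
    for _ in range(times):
        # Hold the Value's own lock for the whole read-modify-write step
        with dollars.get_lock():
            dollars.value += 1


def withdraw(dollars, times):
    for _ in range(times):
        with dollars.get_lock():
            dollars.value -= 1


if __name__ == '__main__':
    dollars = multiprocessing.Value('i', 200)
    deposit_process = multiprocessing.Process(name='deposit', target=deposit, args=(dollars, 100))
    withdraw_process = multiprocessing.Process(name='withdraw', target=withdraw, args=(dollars, 100))
    deposit_process.start()
    withdraw_process.start()
    deposit_process.join()
    withdraw_process.join()
    # Equal numbers of locked deposits and withdrawals leave the balance unchanged
    print('final balance', dollars.value)
```

Note that the built-in lock only protects a single read or a single write on its own; an update such as `value += 1` is a read followed by a write, which is why the lock has to be held around the whole statement.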


## Use Pool for map-reduce paradigm

Often we have a task that must be applied to a very large list or set, where the task can be executed on the individual elements simultaneously and independently.

For such problems we can use a Pool from multiprocessing, which divides the work across the available processors and then assembles the results.
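
A minimal sketch of this idea, assuming a toy `square` function and a small input list (both made up for illustration): `Pool.map` applies the function to every element in worker processes, and the parent then reduces the returned list with `sum`.

```python
import multiprocessing


def square(n):
    # The per-element task; it must be defined at module level so workers can find it
    return n * n


if __name__ == '__main__':
    numbers = list(range(10))
    # With no argument, Pool() starts one worker per CPU reported by os.cpu_count()
    with multiprocessing.Pool() as pool:
        # Map step: the elements are distributed across the worker processes
        squares = pool.map(square, numbers)
    # Reduce step: combine the partial results in the parent process
    total = sum(squares)
    print('squares', squares)
    print('sum of squares', total)
```

`pool.map` returns the results in the same order as the input. When run from an interactive session rather than a script, the worker function may need to live in an importable module for the pool to find it.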
