# Part I  Fundamentals 
https://www.quantstart.com/articles/Parallelising-Python-with-Threading-and-Multiprocessing   
https://www.ploggingdev.com/2017/01/multiprocessing-and-multithreading-in-python-3/    

## Terminology 
* Concurrency is when two or more tasks can start, run, and complete in overlapping time periods. It does not necessarily mean they are ever running at the same instant. For example, on a single-core machine the operating system interleaves tasks, so two of them can never run at the same instant. Even in this single-core case we can still have concurrency and multi-threading, and achieve a performance gain for some workloads (network- or IO-bound tasks).

* Parallelism is when two or more tasks are executed simultaneously.

* A thread is a sequence of instructions **within a process**. It can be thought of as a lightweight process. Threads **share the same memory space**. Therefore, locking mechanisms such as critical sections or mutexes (C++) are necessary to protect data from simultaneous access.

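* A process is an instance of a program running on a computer, and it can contain one or more threads. Each process has its own **independent memory space**, so processes share data only through explicit mechanisms (pipes, queues, shared memory, files). When different processes do access the same data, a mutex is needed in C++ to prevent simultaneous modification; in Python, the *multiprocessing* module provides equivalent primitives such as `multiprocessing.Lock`.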

* Concurrent access to the same data by multiple threads or processes is one of the key difficulties in multi-threaded and multi-process computation. Some problems, e.g. Monte Carlo simulations, usually do not need to share state and are thus particularly suitable for multi-tasking. The following examples belong to this category. For concurrent access to the same data by multiple threads, see the C++ code and notes elsewhere.
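A minimal sketch of the locking point above, in Python rather than C++: four threads increment one shared counter, and a `threading.Lock` makes each read-modify-write step atomic. The names `counter`, `counter_lock` and `increment` are illustrative only. Without the lock, the final count is not guaranteed to be correct.

```python
import threading

counter = 0
counter_lock = threading.Lock()

def increment(times):
    global counter
    for _ in range(times):
        # counter += 1 is a read-modify-write; the lock stops two threads interleaving it
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 400000
```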

## Why does a Python program sometimes run slower with multiple threads?
Because the internals of the main Python interpreter, CPython, are **not thread safe**, CPython protects them with the Global Interpreter Lock (GIL), which rules out truly parallel execution of Python code in threads. At any one time only the thread holding the GIL can execute Python bytecode or call the Python C API. A running thread gives up the GIL periodically (in Python 3 after a switch interval, 5 ms by default, see `sys.getswitchinterval()`; Python 2 switched every 100 bytecode instructions) and around (potentially) blocking I/O operations. Because of this lock, pure-Python CPU-bound code will see no gain in performance when using the *threading* library, but it will likely gain from the *multiprocessing* library. (Python 3.13 introduced an experimental free-threaded build without the GIL; the standard build still has it.)
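A small experiment makes the GIL visible: run a pure-Python countdown twice in a row, then in two threads at once, and compare. This is a sketch assuming a standard (GIL-enabled) CPython build, where the two-thread version is expected to take about as long as the sequential one, or longer; `count_down` and `N` are illustrative names, and timings will vary by machine.

```python
import sys
import threading
import time

def count_down(n):
    # Pure-Python CPU-bound loop: it holds the GIL for as long as it runs
    while n > 0:
        n -= 1

N = 2_000_000

start = time.perf_counter()
count_down(N)
count_down(N)
sequential = time.perf_counter() - start

start = time.perf_counter()
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = time.perf_counter() - start

print(f"GIL switch interval: {sys.getswitchinterval()} s")
print(f"sequential: {sequential:.2f} s, two threads: {threaded:.2f} s")
```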

## When is Python threading library useful?
Since CPython does not run Python threads on multiple cores in parallel, what is the *threading* library good for? It is useful in many non-CPU-bound problems such as network programming or data input/output (I/O), and in GUI applications to keep the UI thread responsive. Threads should not be used to speed up CPU-bound tasks: they give no speed-up, and often slightly worse performance than a single thread. Below are some typical IO-bound problems:  

* webscraping
* reading and writing to files
* sharing data between programs
* network communications
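The reason threads help with the problems listed above is that a thread waiting on I/O releases the GIL. A minimal sketch, assuming `time.sleep` as a stand-in for a network request (`fake_download` and the `example.com` URLs are hypothetical), using the standard `concurrent.futures.ThreadPoolExecutor`: eight 0.2 s "downloads" finish in roughly 0.2 s instead of 1.6 s.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(url):
    # Hypothetical stand-in for a network request: sleeping releases the GIL,
    # just as waiting on a socket does
    time.sleep(0.2)
    return url

urls = [f"https://example.com/page{i}" for i in range(8)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    pages = list(executor.map(fake_download, urls))  # results come back in input order
elapsed = time.perf_counter() - start

print(f"{len(pages)} pages in {elapsed:.2f} s")
```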

## Multi-processing in python
* In C/C++, either multi-threading or multi-processing can achieve performance gains from multi-core CPUs. In (standard, GIL-enabled) CPython, pure-Python CPU-bound code gains only from multi-processing. This works in a fundamentally different way from the *threading* library, even though the syntax of the two is extremely similar. 

* The *multiprocessing* library spawns separate operating system processes (a `Pool` starts a fixed number of worker processes and hands the parallel tasks to them). This neatly side-steps the GIL by giving each process its own Python interpreter and thus its own GIL. Each process can then be scheduled on a separate processor core, and the results are collected once all tasks have finished.

* There are some drawbacks, however. Starting extra processes takes time, and arguments and results must be serialized (pickled) and shuffled between processes, which adds to the overall run-time. However, as long as each process works mostly on its own data, it is possible to gain a significant speed-up. Of course, one must always be aware of Amdahl's Law, which says the gain is limited by the portion of the code that cannot be parallelized. 

* Typical CPU-bound tasks that are suitable for multi-processing:
    * computations
    * text formatting
    * image rescaling
    * data analysis
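A minimal sketch of the process model described above: `square_with_pid` (a hypothetical name) stands in for CPU-bound work and also reports the id of the OS process that ran it. Run as a script, the worker pids printed should differ from the parent's, unlike a thread pool. The `if __name__ == "__main__":` guard is required so that worker processes can import the module without starting pools of their own.

```python
import os
from multiprocessing import Pool

def square_with_pid(x):
    # Stand-in for CPU-bound work; also report which OS process ran it
    return x * x, os.getpid()

if __name__ == "__main__":
    with Pool(4) as pool:
        results = pool.map(square_with_pid, range(8))
    print("squares:", [sq for sq, _ in results])
    print("parent pid:", os.getpid())
    print("worker pids:", sorted({pid for _, pid in results}))
```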


## Running multi-threading and multiprocessing python code
* Multi-threading python code can be run directly within Jupyter notebook.  

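* Multi-processing Python code may not run directly within a Jupyter notebook. With the *spawn* start method (the default on Windows and macOS), each worker process starts a fresh interpreter and must import the worker function, which it cannot do when that function was defined in a notebook cell. Workarounds include putting the functions in a separate `.py` file and importing them, or simply running the code in VS Code or from the command line. The following examples use the Jupyter `%%writefile` magic and the `!` shell escape to write a script and run it from the command line. 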
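To check which start method your environment uses, and therefore whether notebook-defined worker functions can be reached by child processes, something like the following should work:

```python
import multiprocessing as mp

# Default start method: 'spawn' on Windows and macOS; on Linux 'fork'
# (Python 3.13 and earlier) or 'forkserver' (Python 3.14 and later)
print(mp.get_start_method())
```

If this prints `spawn`, expect the notebook limitation described above.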

## Vectorization
In numerical calculation, vectorization can sometimes gain much more than either multi-threading or multi-processing. Moving the inner loop out of Python bytecode into compiled code (e.g. NumPy array operations, which can also use the CPU's SIMD instructions) can make it a few hundred times faster. If we employ both vectorization and multi-tasking, the overall gain can improve further. 
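As an illustration, here is a vectorized version of the Monte Carlo estimate of Pi used later in this notebook. It is a sketch that assumes NumPy is installed; `find_pi_vectorized` is a hypothetical name. All points are generated and tested as whole arrays, so there is no Python-level loop. Memory grows with `n` (several arrays of `n` floats), so very large `n` is better split into batches.

```python
import numpy as np

def find_pi_vectorized(n, seed=None):
    """Estimate Pi from n random points in the unit square, without a Python loop."""
    rng = np.random.default_rng(seed)
    x = rng.random(n)
    y = rng.random(n)
    # Boolean array test on all points at once, then count the True entries
    inside = np.count_nonzero(x * x + y * y <= 1.0)
    return 4 * inside / n

print(find_pi_vectorized(10**6))
```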

# Examples -- Part I 

## Multithreading Example: Webscraping

Historically, the programming knowledge required to set up multithreading involved a good understanding of Python's Global Interpreter Lock (the GIL prevents multiple threads from executing Python bytecode at the same time). Also, you had to set up special classes that behave like Producers to divvy up the work, Consumers (aka "workers") to perform the work, and a Queue to hold tasks and provide communications. And that was just the beginning.

Fortunately, with the `map()` function and two standard libraries, *multiprocessing* and *multiprocessing.dummy*, setting up parallel processes and threads can become fairly straightforward.  

Here's a classic multithreading example provided by [IBM](http://www.ibm.com/developerworks/aix/library/au-threadingpython/) and adapted by [Chris Kiehl](http://chriskiehl.com/article/parallelism-in-one-line/) where you divide the task of retrieving web pages across multiple threads:




In [None]:
# Python 3 port of the original Python 2 example
# (Queue -> queue, urllib2 -> urllib.request, print statement -> print())
import time
import threading
import queue
import urllib.request


class Consumer(threading.Thread):
    """Worker thread: fetches URLs from a shared queue until it receives 'quit'."""

    def __init__(self, task_queue):
        threading.Thread.__init__(self)
        self._queue = task_queue

    def run(self):
        while True:
            content = self._queue.get()
            if isinstance(content, str) and content == 'quit':
                break  # poison pill: stop this worker
            response = urllib.request.urlopen(content)
            response.close()
        print('Thanks!')


def Producer():
    urls = [
        'http://www.python.org', 'http://www.yahoo.com',
        'http://www.scala.org', 'http://www.google.com',
        # etc..
    ]

    task_queue = queue.Queue()

    worker_threads = build_worker_pool(task_queue, 4)
    start_time = time.time()

    # Add the urls to process
    for url in urls:
        task_queue.put(url)
    # Add one poison pill per worker so that every thread exits
    for worker in worker_threads:
        task_queue.put('quit')
    for worker in worker_threads:
        worker.join()

    print('Done! Time taken: {}'.format(time.time() - start_time))


def build_worker_pool(task_queue, size):
    workers = []
    for _ in range(size):
        worker = Consumer(task_queue)
        worker.start()
        workers.append(worker)
    return workers


if __name__ == '__main__':
    Producer()

Using the thread pool provided by the *multiprocessing.dummy* module and its `map()` method, all of this becomes:

In [None]:
import urllib.request
from multiprocessing.dummy import Pool as ThreadPool
# multiprocessing.dummy replicates the multiprocessing API but is a wrapper around
# the threading module, so this pool runs threads, not processes.

pool = ThreadPool(4)  # choose a number of worker threads

urls = [
    'http://www.python.org', 'http://www.yahoo.com',
    'http://www.scala.org', 'http://www.google.com',
    # etc..
]

results = pool.map(urllib.request.urlopen, urls)
pool.close()
pool.join()


In the above code, the *multiprocessing.dummy* module provides the pool of threads, and `pool.map()` distributes the URLs among them, calling `urlopen` on each!
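To confirm that *multiprocessing.dummy* really gives threads rather than processes, a quick check (a sketch; `where_am_i` is a hypothetical helper) is to have each task report its process id and thread id. All tasks should report the parent's process id.

```python
import os
import threading
from multiprocessing.dummy import Pool as ThreadPool

def where_am_i(task_id):
    # Report which OS process and which thread handled this task
    return task_id, os.getpid(), threading.get_ident()

with ThreadPool(4) as pool:
    results = pool.map(where_am_i, range(8))

print("same process as the caller:", {pid for _, pid, _ in results} == {os.getpid()})
print("distinct threads used:", len({tid for _, _, tid in results}))
```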

## Multiprocessing Example: Monte Carlo

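### Monte Carlo Method and Estimating Pi

Draw *n* points uniformly at random in the unit square $[0,1)\times[0,1)$. The fraction of points that land inside the quarter circle $x^2 + y^2 \le 1$ approaches its area, $\pi/4$, so $$\pi \approx \frac{4 \cdot \text{points inside circle}}{\text{total points } n}$$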

To set up our multiprocessing program, we first write a function for estimating Pi that we can pass to `map()`:

In [3]:
from random import random  # perform this import outside the function

def find_pi(n):
    """
    Function to estimate the value of Pi
    """
    inside=0

    for i in range(0,n):
        x=random()
        y=random()
        if x*x + y*y <= 1:  # the point (x, y) lies inside the quarter circle
            inside+=1

    pi=4*inside/n
    return pi

Let's test `find_pi` on 5,000 points:

In [4]:
find_pi(5000)

3.1168

This ran very quickly, but the results are not very accurate!
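How inaccurate should we expect it to be? Each point lands inside the quarter circle with probability $p = \pi/4$, so the estimate $4 \cdot \text{inside}/n$ has standard error $4\sqrt{p(1-p)/n}$: it shrinks like $1/\sqrt{n}$, and 100 times more points buy only one more correct digit. A short worked calculation (`pi_standard_error` is a hypothetical helper):

```python
import math

def pi_standard_error(n):
    p = math.pi / 4  # probability that a uniform point lands inside the quarter circle
    return 4 * math.sqrt(p * (1 - p) / n)

for n in (5_000, 10**5, 10**7):
    print(f"n = {n:>10,}: standard error ~ {pi_standard_error(n):.5f}")
```

For 5,000 points this is about 0.023, so the 3.1168 above is a typical outcome rather than a bug.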

Next we'll write a script that sets up a pool of workers and lets us time the results for different pool sizes. We'll set up two parameters to represent *processes* and *total_iterations*. Inside the script, we'll break *total_iterations* down into the number of iterations passed to each process by making a list with one entry per process. For example:

    total_iterations = 1000
    processes = 5
    iterations = [total_iterations//processes]*processes
    iterations
    # Output: [200, 200, 200, 200, 200]
    
This list will be passed to our `map()` function along with `find_pi()`.

In [5]:
%%writefile test.py
from random import random
from multiprocessing import Pool
import timeit

def find_pi(n):
    """
    Function to estimate the value of Pi
    """
    inside=0

    for i in range(0,n):
        x=random()
        y=random()
        if x*x + y*y <= 1:  # the point (x, y) lies inside the quarter circle
            inside+=1

    pi=4*inside/n
    return pi

if __name__ == '__main__':
    N = 10**5  # total iterations
    P = 5      # number of processes
    
    p = Pool(P)
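    # timeit calls the lambda 10 times; each call prints the average of the P estimates,
    # and the last number printed is the total time in seconds for all 10 calls.
    print(timeit.timeit(lambda: print(f'{sum(p.map(find_pi, [N//P]*P))/P:0.7f}'), number=10))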
    
    p.close()
    p.join()
    print(f'{N} total iterations with {P} processes')

Overwriting test.py


In [6]:
! python test.py

3.1367600
3.1442800
3.1329200
3.1370800
3.1380800
3.1417600
3.1467200
3.1326000
3.1434000
3.1481200
0.2432337451411549
100000 total iterations with 5 processes


Now that we know our script works, let's increase the number of iterations, and compare two different pools. 

In [7]:
%%writefile test.py
from random import random
from multiprocessing import Pool
import timeit

def find_pi(n):
    """
    Function to estimate the value of Pi
    """
    inside=0

    for i in range(0,n):
        x=random()
        y=random()
        if x*x + y*y <= 1:  # the point (x, y) lies inside the quarter circle
            inside+=1

    pi=4*inside/n
    return pi

if __name__ == '__main__':
    N = 10**7  # total iterations
    
    P = 1      # number of processes
    p = Pool(P)
    print(timeit.timeit(lambda: print(f'{sum(p.map(find_pi, [N//P]*P))/P:0.7f}'), number=10))
    p.close()
    p.join()
    print(f'{N} total iterations with {P} processes')
    
    P = 5      # number of processes
    p = Pool(P)
    print(timeit.timeit(lambda: print(f'{sum(p.map(find_pi, [N//P]*P))/P:0.7f}'), number=10))
    p.close()
    p.join()
    print(f'{N} total iterations with {P} processes')

Overwriting test.py


In [8]:
! python test.py

3.1415920
3.1412692
3.1409872
3.1418424
3.1413748
3.1413968
3.1419348
3.1407936
3.1410084
3.1413128
38.84810802688602
10000000 total iterations with 1 processes
3.1410436
3.1414548
3.1416328
3.1414500
3.1417748
3.1412176
3.1412932
3.1414372
3.1415492
3.1418072
12.2747860630176
10000000 total iterations with 5 processes


## More is Better ...to a point.

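The gain in speed as you add more parallel processes tends to flatten out at some point. In the run above, five processes cut the total time from about 38.8 s to 12.3 s, a speed-up of roughly 3.2x rather than 5x. Part of every program (starting the pool, sending arguments to the workers and collecting the results, any serial code) cannot be parallelized; if some tasks take longer than others, the run can finish no sooner than the slowest one; and once the number of processes exceeds the number of CPU cores, extra processes simply compete for the same cores. This is best described by [Amdahl's Law](https://en.wikipedia.org/wiki/Amdahl%27s_law).

**The speed-up depends on the number of CPU cores and on the portion of the code that cannot be parallelized.**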
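Amdahl's Law in its usual form: if a fraction $p$ of the run time can be parallelized over $N$ workers, the ideal speed-up is $S(N) = 1/\big((1-p) + p/N\big)$, which can never exceed $1/(1-p)$ however many workers are added. A small calculator (`amdahl_speedup` is a hypothetical helper) shows, for example, that with $p = 0.95$ four workers give at most about 3.5x and the ceiling is 20x:

```python
def amdahl_speedup(p, n):
    """Ideal speed-up on n workers when a fraction p of the run time is parallelizable."""
    if not 0.0 <= p <= 1.0 or n < 1:
        raise ValueError("need 0 <= p <= 1 and n >= 1")
    return 1.0 / ((1.0 - p) + p / n)

for n in (1, 2, 4, 8, 16, 1024):
    print(f"{n:5d} workers: {amdahl_speedup(0.95, n):.2f}x")
```

This is an upper bound: real runs also pay process start-up and data-transfer costs.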

## Advanced Script

In the example below, we'll add a context manager to shrink these three lines

    p = Pool(P)
    ...
    p.close()
    p.join()
    
to one line:

    with Pool(P) as p:
    
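Strictly speaking, leaving the `with` block calls `p.terminate()` rather than `close()` and `join()`; that is safe here because `map()` blocks until every result has come back. We'll also accept command line arguments using the *sys* module.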
    

In [None]:
%%writefile test2.py
from random import random
from multiprocessing import Pool
import timeit
import sys

def find_pi(n):
    """
    Function to estimate the value of Pi
    """
    inside=0

    for i in range(0,n):
        x=random()
        y=random()
        if x*x + y*y <= 1:  # the point (x, y) lies inside the quarter circle
            inside+=1

    pi=4*inside/n
    return pi

if __name__ == '__main__':
    # Parse the arguments inside the main guard; the worker processes only need find_pi
    N = int(sys.argv[1])  # total iterations, passed in from the command line
    P = int(sys.argv[2])  # number of processes

    with Pool(P) as p:
        print(timeit.timeit(lambda: print(f'{sum(p.map(find_pi, [N//P]*P))/P:0.5f}'), number=10))
    print(f'{N} total iterations with {P} processes')

In [None]:
! python test2.py 10000000 500

# Examples -- Part II  

## The use of threads for filesystem IO
A queue holds the copy jobs; each job is a one-entry dictionary mapping an input file name to an output file name. Two worker threads run process_queue(), which takes jobs from the queue and passes them to copy_op(), where the copy itself is done with `shutil.copy()`.

In [None]:
import threading
from queue import Queue
import time
import shutil

print_lock = threading.Lock()

def copy_op(file_data):
    with print_lock:
        print("Starting thread : {}".format(threading.current_thread().name))

    # Local variables are already private to each thread, so threading.local() is not needed
    src, dst = next(iter(file_data.items()))

    shutil.copy(src, dst)

    with print_lock:
        print("Finished thread : {}".format(threading.current_thread().name))

def process_queue():
    while True:
        file_data = compress_queue.get()
        copy_op(file_data)
        compress_queue.task_done()

compress_queue = Queue()

output_names = [{'v1.mp4' : 'v11.mp4'},{'v2.mp4' : 'v22.mp4'}]

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for file_data in output_names:
    compress_queue.put(file_data)

compress_queue.join()

print("Execution time = {0:.5f}".format(time.time() - start))

Note: v1.mp4 and v2.mp4 were 250 MB each.  
One thread: 7 to 10 seconds  
Two threads: 4.5 to 5.5 seconds  
Copying is dominated by operating-system reads and writes, during which CPython releases the GIL, so threads can speed up filesystem IO (how much depends on the storage device).  
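The same job can be written more compactly with `concurrent.futures.ThreadPoolExecutor`, which manages the queue and the worker threads for you. This is a sketch, not the timed code above: it creates two hypothetical 1 MB placeholder files in a temporary directory (left in place for inspection) instead of the 250 MB videos.

```python
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def copy_file(job):
    src, dst = job
    shutil.copy(src, dst)  # blocking file I/O; the GIL is released during OS reads/writes
    return dst

# Hypothetical input files standing in for v1.mp4 and v2.mp4
workdir = Path(tempfile.mkdtemp())
jobs = []
for name in ("v1.bin", "v2.bin"):
    src = workdir / name
    src.write_bytes(b"\0" * 1_000_000)  # 1 MB placeholder
    jobs.append((src, workdir / f"copy_of_{name}"))

with ThreadPoolExecutor(max_workers=2) as executor:
    copied = list(executor.map(copy_file, jobs))

print([p.name for p in copied])
```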

## The use of threads for network IO
The following toy example demonstrates threads for network IO: five threads fetch the same page with the third-party *requests* library and print its title, extracted with BeautifulSoup (*bs4*).

In [None]:
import threading
from queue import Queue
import requests
import bs4
import time

print_lock = threading.Lock()

def get_url(current_url):

    with print_lock:
        print("\nStarting thread {}".format(threading.current_thread().name))
    res = requests.get(current_url)
    res.raise_for_status()

    current_page = bs4.BeautifulSoup(res.text,"html.parser")
    current_title = current_page.select('title')[0].getText()

    with print_lock:
        print("{}\n".format(threading.current_thread().name))
        print("{}\n".format(current_url))
        print("{}\n".format(current_title))
        print("\nFinished fetching : {}".format(current_url))

def process_queue():
    while True:
        current_url = url_queue.get()
        get_url(current_url)
        url_queue.task_done()

url_queue = Queue()

url_list = ["https://www.google.com"]*5

for i in range(5):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for current_url in url_list:
    url_queue.put(current_url)

url_queue.join()

print(threading.enumerate())

print("Execution time = {0:.5f}".format(time.time() - start))

Single thread: 4 seconds  
Two threads: 3 seconds  
Five threads: 2 seconds  
In network IO, most of the time is spent waiting for the server to respond, and a thread waiting on the network releases the GIL, so this is another case where threads improve performance.  

## The use of threads in CPU-bound tasks
In the following program a queue holds numbers. For each number, the task is to find the sum of all primes less than or equal to it. This is clearly a CPU-bound task. 

In [None]:
import threading
from queue import Queue
import time

list_lock = threading.Lock()

def find_rand(num):
    sum_of_primes = 0

    ix = 2

    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1

    with list_lock:  # protect the shared result list
        sum_primes_list.append(sum_of_primes)

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

def process_queue():
    while True:
        rand_num = min_nums.get()
        find_rand(rand_num)
        min_nums.task_done()

min_nums = Queue()

rand_list = [1000000, 2000000, 3000000]
sum_primes_list = list()

for i in range(2):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

start = time.time()

for rand_num in rand_list:
    min_nums.put(rand_num)

min_nums.join()

end_time = time.time()

sum_primes_list.sort()
print(sum_primes_list)

print("Execution time = {0:.5f}".format(end_time - start))

Single thread: 25.5 seconds  
Two threads: 28 seconds  
The results are clear: in CPython, don't use threads to speed up pure-Python CPU-bound tasks. Expect no gain, and often somewhat worse performance because the threads compete for the GIL. **Note that this is specific to CPython's GIL. In other languages such as C/C++, threads can also improve performance for CPU-bound tasks, and in Python so can code that releases the GIL (e.g. many NumPy operations).**  

## Multiprocessing for parallel execution of tasks
In the following example we take the same task used above and process the inputs in parallel using the multiprocessing module.

In [None]:
%%writefile test1.py

from multiprocessing import Pool
import time

def sum_prime(num):
    
    sum_of_primes = 0

    ix = 2

    while ix <= num:
        if is_prime(ix):
            sum_of_primes += ix
        ix += 1

    return sum_of_primes

def is_prime(num):
    if num <= 1:
        return False
    elif num <= 3:
        return True
    elif num%2 == 0 or num%3 == 0:
        return False
    i = 5
    while i*i <= num:
        if num%i == 0 or num%(i+2) == 0:
            return False
        i += 6
    return True

if __name__ == '__main__':
    
    start = time.time()
    with Pool(4) as p:
        print(p.map(sum_prime, [1000000, 2000000, 3000000]))  # one task per input number
    print("Time taken = {0:.5f}".format(time.time() - start))

In [None]:
!python test1.py

Using a single process: 20.29 seconds  
Using four processes: 11.59 seconds  

We see a substantial improvement (about 1.75x) from using four processes on a quad-core CPU. The speed-up is well below 4x because there are only three inputs, so at most three workers are ever busy, and the total time cannot be shorter than the time needed for the largest input (3,000,000), which carries the largest share of the work. Using more than four processes on a four-core machine will not improve performance further. 
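One way to reduce that imbalance is to split a big input into many small, independent tasks so that idle workers can keep picking up work. A sketch, assuming we only want the sum of primes up to 3,000,000 (the largest input above); `sum_primes_in_range` and `chunked_bounds` are hypothetical helpers, and the timing has not been measured here.

```python
from multiprocessing import Pool

def is_prime(num):
    # Same 6k +/- 1 trial division as in test1.py
    if num <= 1:
        return False
    if num <= 3:
        return True
    if num % 2 == 0 or num % 3 == 0:
        return False
    i = 5
    while i * i <= num:
        if num % i == 0 or num % (i + 2) == 0:
            return False
        i += 6
    return True

def sum_primes_in_range(bounds):
    """Sum of the primes p with lo <= p < hi."""
    lo, hi = bounds
    return sum(n for n in range(lo, hi) if is_prime(n))

def chunked_bounds(limit, n_chunks):
    """Split [2, limit] into about n_chunks contiguous half-open ranges."""
    step = (limit - 1) // n_chunks + 1
    return [(lo, min(lo + step, limit + 1)) for lo in range(2, limit + 1, step)]

if __name__ == "__main__":
    limit = 3_000_000
    with Pool(4) as pool:
        # 32 small tasks instead of one big one; the pool hands them out as workers free up
        partial_sums = pool.map(sum_primes_in_range, chunked_bounds(limit, 32))
    print(sum(partial_sums))
```

The later chunks contain larger numbers and cost more, but with many chunks per worker the load evens out.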

## Inter-process communication  
Inter-process communication can be achieved using queues or pipes.  

* The `Queue` in the *multiprocessing* module works similarly to the `queue.Queue` used with the *threading* module earlier, except that it passes (picklable) objects between processes. 

* Another useful communication mechanism between processes is a pipe. `multiprocessing.Pipe()` returns a pair of connection objects, and by default the pipe is duplex (two-way). Note: data in a pipe may become corrupted if two processes or threads read from or write to the same end of the pipe at the same time. The following is a basic example:

In [1]:
%%writefile test2.py
import multiprocessing as mp
import os

def info(conn):
    conn.send("Hello from {}\nppid = {}\npid={}".format(mp.current_process().name, os.getppid(), os.getpid()))
    conn.close()

if __name__ == '__main__':

    parent_conn, child_conn = mp.Pipe()
    p = mp.Process(target=info, args=(child_conn,))
    p.daemon = True
    p.start()
    print(parent_conn.recv())

Writing test2.py


In [2]:
!python test2.py

Hello from Process-1
ppid = 11044
pid=15072
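For comparison, a minimal sketch of the queue-based alternative: two worker processes read numbers from one `multiprocessing.Queue` and send their squares back on another, stopping at a `None` sentinel. `square_worker` is a hypothetical name. The results are drained before `join()`, because the documentation warns that joining a process that still has items buffered for a queue can deadlock.

```python
import multiprocessing as mp

def square_worker(in_q, out_q):
    # Read numbers until the None sentinel arrives; send back their squares
    while True:
        item = in_q.get()
        if item is None:
            break
        out_q.put(item * item)

if __name__ == "__main__":
    in_q, out_q = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=square_worker, args=(in_q, out_q)) for _ in range(2)]
    for w in workers:
        w.start()

    numbers = list(range(10))
    for n in numbers:
        in_q.put(n)
    for _ in workers:
        in_q.put(None)  # one sentinel per worker

    results = [out_q.get() for _ in numbers]  # drain the results before joining
    for w in workers:
        w.join()
    print(sorted(results))
```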
