## Lecture 9: Python Concurrency
### March 29, 2021

Partly based on [https://nyu-cds.github.io/python-concurrency/](https://nyu-cds.github.io/python-concurrency/)

**Notes**: project office hours Tuesdays at 4pm ET (send me an email beforehand: alberto.bietti@nyu.edu)


## Improving performance by using concurrency

Concurrency vs parallelism:

    Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
    
[source](https://medium.com/@itIsMadhavan/concurrency-vs-parallelism-a-brief-review-b337c8dac350)

We will illustrate some benefits of concurrency with a program downloading images from the `imgur.com` website.

For this you will need to:

- create an account on [imgur.com](https://imgur.com/)
- register your application [here](https://api.imgur.com/oauth2)
  - Authorization Type: __OAuth 2 authorization with a callback URL__
  - Authorization Callback URL: __https://www.getpostman.com/oauth2/callback__
  - email:
  - Description:
  

---
The functions below fetch a list of images and download them from the __imgur__ repository:
[https://imgur.com/](https://imgur.com/)

- We will start with a version that downloads images sequentially, or one at a time

- Then improve the performance by introducing multiprocessing and threading

---
We will split the functionality into three separate functions (see the file `download.py`):

- `get_links`
- `download_link`
- `setup_download_dir`

In [None]:
from time import time

from download import setup_download_dir, get_links, download_link

# Replace with your own client ID
CLIENT_ID = 'YOUR_ID'

ts = time()
download_dir = setup_download_dir()

links = [l for l in get_links(CLIENT_ID)]

for i, link in enumerate(links):
    print("%2d %s" % (i, link))
    download_link(download_dir, link)

print('Took {}s'.format(time() - ts))

In [None]:
ls images/

---

- To improve the performance of the image downloader we can run **multiple copies** of the program at the same time. 


- However, we would need to know what images are available so that we could ensure that one process didn’t download an image that had already been downloaded by a different process.  


- Fortunately the multiprocessing module is available for this purpose.

---

### Pool

- To use multiple processes we need a multiprocessing **Pool**. 


- The Pool class provides a `map` method that applies a function to every element of a supplied iterable, distributing the calls across the pool's worker processes.


- The iterable is divided into a number of chunks, so that each process gets roughly the same number of elements. 


- We will pass the list of URLs to the pool, which starts 8 new processes and uses them to download the images in parallel.
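To make the chunking idea concrete, here is a minimal sketch that splits a list into consecutive pieces of near-equal size. The helper `split_into_chunks` and the placeholder URLs are hypothetical; `Pool.map` chooses its own chunk size internally (or uses the `chunksize` argument if you pass one), so this only illustrates the principle.

```python
from math import ceil

def split_into_chunks(items, n_chunks):
    """Split a list into at most n_chunks consecutive pieces of near-equal size."""
    size = max(1, ceil(len(items) / n_chunks))
    return [items[i:i + size] for i in range(0, len(items), size)]

# Placeholder strings standing in for image links
urls = ["url_%d" % i for i in range(10)]
chunks = split_into_chunks(urls, 4)

for worker_id, chunk in enumerate(chunks):
    print(worker_id, chunk)
```

Each chunk could then be handed to a different worker, so that no two workers process the same element.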

In [None]:
from multiprocessing import cpu_count
print("number of CPU cores:", cpu_count())

In [None]:
from functools import partial
from multiprocessing.pool import Pool

def multi_processes_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]

    # functools.partial makes a new version of a function 
    # with one or more fixed arguments
    download = partial(download_link, download_dir)
   
    with Pool(8) as p:
        p.map(download, links)
        
    print('Took {}s'.format(time() - ts))

multi_processes_download()
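The role of `functools.partial` in the cell above can be seen in isolation with a small sketch. Here `save_to` is a hypothetical stand-in for `download_link` that only builds a path, and the built-in `map` stands in for `Pool.map` to show which calls are made.

```python
from functools import partial

def save_to(directory, name):
    # Hypothetical stand-in for download_link(download_dir, link)
    return "{}/{}".format(directory, name)

# Fix the first argument; the result is a function of `name` only
save_to_images = partial(save_to, "images")

names = ["a.jpg", "b.jpg"]
paths = list(map(save_to_images, names))
print(paths)
```

Because `map` passes a single argument per element, fixing the directory beforehand is what lets a two-argument function be used there.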

---

Although easy to implement, this form of parallelism has some drawbacks:
- each process has **its own copy of the program's memory**
- it does not handle processes that depend on each other

These issues can be tackled with shared memory and message passing mechanisms, which we will cover in later lessons.

## Using Threads

Threading is a well known approach to attaining concurrency: 
- typically threads are lighter weight than processes
- **lower memory requirements**, as **they share the same memory space**

A basic way to use threads is through `ThreadPoolExecutor` in `concurrent.futures`, which provides a similar interface to `multiprocessing.Pool`.

For more refined behavior we will rely on the `Thread` class, which provides a `run` method that should be overridden with a method that does the actual work of the thread.
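As a minimal sketch of the subclassing pattern, the hypothetical `SquareWorker` below overrides `run` to do a trivial computation. Calling `start()` runs `run` in a new thread, and `join()` waits for it to finish.

```python
from threading import Thread

class SquareWorker(Thread):
    """Hypothetical worker that squares one value and stores the result."""

    def __init__(self, value, results):
        super().__init__()
        self.value = value
        self.results = results

    def run(self):
        # Executed in the new thread once start() is called
        self.results[self.value] = self.value ** 2

results = {}
workers = [SquareWorker(v, results) for v in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # wait for each thread to finish

print(sorted(results.items()))
```

Each worker writes to its own key, so no extra synchronization is needed in this small case.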

In [None]:
# Simple example with ThreadPoolExecutor

from functools import partial
from concurrent.futures import ThreadPoolExecutor

def multithreaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]

    download = partial(download_link, download_dir)
   
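    with ThreadPoolExecutor(max_workers=8) as ex:
        # ex.map returns a lazy iterator; consuming it with list()
        # re-raises any exception that occurred in a worker thread
        list(ex.map(download, links))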
        
    print('Took {}s'.format(time() - ts))

multithreaded_download()

### Thread Safety

- Variables in the program are shared by all the threads, so they cannot safely be read and modified without coordination. One thread may change a variable while another thread is reading it, or worse, two threads may try to update the variable at the same time.


- This is known as a **race condition**. It is one of the leading sources of errors in threaded programs and needs to be addressed properly.



- One way to deal with thread safety is the `Queue` class from the `queue` module, which handles the required locking internally.
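Besides queues, a lock is another common way to protect a shared variable. The sketch below is an illustration with hypothetical names (`counter`, `add_many`): the update `counter += 1` is a read-modify-write that is not guaranteed to be atomic, so it is wrapped in a `Lock` to ensure only one thread performs it at a time.

```python
from threading import Thread, Lock

counter = 0
lock = Lock()

def add_many(n):
    global counter
    for _ in range(n):
        # Only one thread at a time may hold the lock,
        # so the read and the write cannot interleave
        with lock:
            counter += 1

threads = [Thread(target=add_many, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

Without the lock, two threads could read the same old value and one of the increments would be lost.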

In [None]:
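# Understanding Queue
from queue import Queue

def do_work(q):
    # Checking empty() is fine here because a single thread consumes the queue;
    # with several consumers the answer can be stale by the time get() runs
    while not q.empty():
        item = q.get()
        print(item)
        q.task_done()  # marks the item as processed; q.join() relies on these calls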

q = Queue() # FIFO queue

for i in range(20):
    q.put(i)

do_work(q)

A simpler example before going back to the image downloader code

In [None]:
# in this example the worker threads take elements off the queue and print them

from time import sleep
from queue import Queue
from threading import Thread
import logging  

# set up a logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logging.basicConfig(format='(%(threadName)-9s) %(message)s', level=logging.DEBUG)

def do_work(q):
    while True:
        item = q.get()
        logger.debug("e" + str(item) + ' ')
        print(str(item) + ' ')
        q.task_done()
        sleep(2)
    
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_work, args=(q,), name='thread_' + str(i))
    worker.daemon = True   # daemon threads are stopped when the program quits
    worker.start()         # start the thread

# now we have started 10 threads:

for i in range(50):
    q.put(i)

q.join() # wait until every queued item has been marked done with task_done()

In [None]:
from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
    def __init__(self, queue):
        super(DownloadWorker, self).__init__()
        self.queue = queue
    
    def run(self):
        while True:
            # Get the work from the queue and unpack the tuple
            (directory, link) = self.queue.get()
            # Call the function download_link (from download.py)
            download_link(directory, link)
            self.queue.task_done()

            
def threaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]
    
    # Create a queue to communicate with the worker threads
    queue = Queue()
    
    # Create 8 worker threads
    for _ in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit 
        # even if the workers are blocking
        worker.daemon = True
        worker.start()

    
    # Put the tasks into the queue as a tuple
    for link in links:
        print('Queueing: {}'.format(link))
        queue.put((download_dir, link))
    
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    
    print('Took {}s'.format(time() - ts))

threaded_download()

## The Global Interpreter Lock
#### Not really parallel!

- Python has a **Global Interpreter Lock (GIL)**, which allows only **one thread to execute Python code at a time** within a process. Therefore, **this code is concurrent but not parallel**. 

- It is still faster because the image downloader is an input/output (I/O) bound task. 
The majority of the time is spent waiting for the network, and a thread that is waiting does not need the GIL. This is why threading can provide a large speed increase. 

- **The processor can switch between the threads** whenever one of them is **ready** to do some work.



- If the program were performing a CPU bound task, using the threading module in Python (or in any other interpreted language with a GIL) could actually reduce performance, since the threads compete for the lock instead of running in parallel.

- For CPU bound tasks and truly parallel execution in Python, the multiprocessing module is a better option.

- Some parallelism is still possible with threads if the executed functions rely on low-level code that releases the GIL (e.g. many NumPy/SciPy functions). This includes custom Cython programs (see the `nogil` keyword [here](https://cython.readthedocs.io/en/latest/src/userguide/parallelism.html) and [here](https://cython.readthedocs.io/en/latest/src/userguide/numpy_tutorial.html))

- Other packages for parallelization: task/job queues (e.g. [python-rq](https://python-rq.org/)), [joblib](https://joblib.readthedocs.io/en/latest/parallel.html), [dask](https://dask.org/)
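The I/O bound case can be imitated without any network access. In the sketch below, the hypothetical `fake_download` uses `time.sleep` as a stand-in for waiting on the network; `sleep` releases the GIL, so several threads can wait at the same time.

```python
from concurrent.futures import ThreadPoolExecutor
from time import sleep, perf_counter

def fake_download(i):
    # Stand-in for a network request: the thread just waits
    sleep(0.2)
    return i

# Sequential: the waits add up
t0 = perf_counter()
sequential = [fake_download(i) for i in range(4)]
t_seq = perf_counter() - t0

# Threaded: the waits overlap
t0 = perf_counter()
with ThreadPoolExecutor(max_workers=4) as ex:
    threaded = list(ex.map(fake_download, range(4)))
t_thr = perf_counter() - t0

print("sequential: %.2fs, threaded: %.2fs" % (t_seq, t_thr))
```

Replacing the `sleep` with a pure Python computation would remove most of the benefit, since the threads would then have to take turns holding the GIL.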


### Example: sum of array elements in parallel

In [None]:
n = int(1e8)

In [None]:
# Sequential version
from time import time

ts = time()
s = 0
for i in range(n):
    s = s + i
print(s, '-->', time()-ts,'s')   

In [None]:
# multiprocessing version
from time import time
from multiprocessing.pool import Pool

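# The worker functions are imported from download.py so that the worker
# processes can find them by name (functions defined in a notebook cell may
# not be importable by the workers, depending on the process start method).
# The copies below, with a trailing underscore, only show what they do.
from download import sum_multi_processes_1, sum_multi_processes_2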

def sum_multi_processes_1_(chunk):
    y = 0
    for i in chunk:
        y = y + i
    return y


def sum_multi_processes_2_(start, end):
    y = 0
    for i in range(start, end):
        y = y + i
    return y

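# chunks1 materializes every chunk as a list, which must be pickled and sent to a worker;
# chunks2 only sends the (start, end) bounds
chunks1 = [list(range(i, i + 100)) for i in range(0, n, 100)]
chunks2 = [(i, i + 100) for i in range(0, n, 100)]

print(len(chunks1), 'chunks')

ts = time()
with Pool(8) as p:
    results = p.map(sum_multi_processes_1, chunks1)
    # alternative: starmap unpacks each (start, end) tuple into two arguments
    # results = p.starmap(sum_multi_processes_2, chunks2)

print(sum(results), '-->', time() - ts, 's')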

In [None]:
# Thread version
from queue import Queue
from threading import Thread
from threading import Lock

x = 0
lock = Lock()
def sum_chunk(q):
    global x
    while True:
        start, end = q.get()
        for i in range(start, end):
            with lock:  # serialize the read-modify-write on the shared total
                x = x + i
        q.task_done()

n = int(1e8)
chunks = [(i, i + 100) for i in range(0, n, 100)]

ts = time()
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=sum_chunk, args=(q, ))
    worker.daemon = True   # daemon threads are stopped when the program quits
    worker.start()         # start the thread

for chunk in chunks:
    q.put(chunk)

q.join()
print(x, '-->', time() - ts, 's')    
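The version above takes the lock once per element, which adds a lot of overhead. A possible variation, sketched here with hypothetical names (`total`, `sum_chunks`) and a much smaller `n_small` so it runs quickly, is to let each thread sum its chunk locally and take the lock only once per chunk.

```python
from queue import Queue
from threading import Thread, Lock

total = 0
total_lock = Lock()

def sum_chunks(q):
    global total
    while True:
        start, end = q.get()
        partial_sum = sum(range(start, end))  # local work, no lock needed
        with total_lock:                      # one lock acquisition per chunk
            total += partial_sum
        q.task_done()

n_small = 100000
q = Queue()

for _ in range(4):
    Thread(target=sum_chunks, args=(q,), daemon=True).start()

for start in range(0, n_small, 1000):
    q.put((start, start + 1000))

q.join()  # returns once every chunk has been added to the total
print(total)
```

This reduces lock contention, but the task is still CPU bound, so the GIL prevents the threads from summing in parallel.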

### Example: Pi Simulation

In [None]:
from download import monte_carlo_pi
import numpy as np

def monte_carlo_pi_(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(0, 1)
        y = np.random.uniform(0, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [None]:
%%time
result = [monte_carlo_pi(int(3e5)) for _ in range(10)]

In [None]:
np.array(result)

In [None]:
from multiprocessing.pool import Pool

In [None]:
%%time
with Pool(8) as pool:
    result = pool.map(monte_carlo_pi, [int(3e5) for _ in range(10)])

In [None]:
np.array(result)
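One thing to check in the output above is whether several estimates are identical: when worker processes are created by forking, they can start from the same NumPy global random state. A possible remedy, sketched here under assumptions, is to pass an explicit seed with each task. The function `monte_carlo_pi_seeded` and the seed values are hypothetical and not part of `download.py`; the sketch uses the standard library `random` module and runs sequentially to stay self-contained.

```python
import random

def monte_carlo_pi_seeded(args):
    """Estimate pi from n random points, using a generator private to this task."""
    n, seed = args
    rng = random.Random(seed)  # independent generator, not the shared global one
    s = 0
    for _ in range(n):
        x = rng.random()
        y = rng.random()
        if x * x + y * y < 1:
            s += 1
    return 4 * s / n

# One (n, seed) pair per task, with a different seed for each
tasks = [(100000, seed) for seed in range(4)]
estimates = [monte_carlo_pi_seeded(t) for t in tasks]
print(estimates)
```

Since the function takes a single tuple argument, the same task list could be passed to `pool.map` once the function lives in an importable module.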