## Lecture 9: Python Concurrency
### March 29, 2021

Partly based on [https://nyu-cds.github.io/python-concurrency/](https://nyu-cds.github.io/python-concurrency/)

**Notes**: project office hours Tuesdays at 4pm ET (send me an email beforehand: alberto.bietti@nyu.edu)


## Improving performance by using concurrency

Concurrency vs parallelism:

    Concurrency is about dealing with lots of things at once. Parallelism is about doing lots of things at once.
    
[source](https://medium.com/@itIsMadhavan/concurrency-vs-parallelism-a-brief-review-b337c8dac350)

We will illustrate some benefits of concurrency with a program downloading images from the `imgur.com` website.

For this you will need to:

- create an account in [imgur.com](https://imgur.com/)
- register your application [here](https://api.imgur.com/oauth2)
  - Authorization Type: __OAuth 2 authorization with a callback URL__
  - Authorization Callback URL: __https://www.getpostman.com/oauth2/callback__
  - email:
  - Description:
  

---
The functions below fetch a list of images from the __imgur__ repository ([https://imgur.com/](https://imgur.com/)) and download them.

- We will start with a version that downloads images sequentially, or one at a time

- Then improve the performance by introducing multiprocessing and threading

---
We split the functionality into three separate functions, defined in the file `download.py`:
- `get_links`
- `download_link`
- `setup_download_dir`
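The contents of `download.py` are not reproduced in these notes. Below is a hypothetical sketch of what two of these functions might look like, written with the standard library only. The log output further down suggests the original uses an HTTP library such as `requests`, and the `images` default is inferred from the `ls images/` cell; neither detail is confirmed by the source.

```python
import os
from pathlib import Path
from urllib.request import urlopen


def setup_download_dir(path="images"):
    """Create the download directory if needed and return it (hypothetical sketch)."""
    download_dir = Path(path)
    download_dir.mkdir(parents=True, exist_ok=True)
    return download_dir


def download_link(directory, link):
    """Save the file at `link` into `directory` (hypothetical sketch)."""
    # keep the last part of the URL as the file name,
    # e.g. "https://i.imgur.com/9ALnD.jpg" -> "9ALnD.jpg"
    download_path = Path(directory) / os.path.basename(link)
    with urlopen(link) as response, open(download_path, "wb") as f:
        f.write(response.read())
    return download_path
```

`get_links` is left out because it depends on the details of the imgur API.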

In [1]:
from time import time

from download import setup_download_dir, get_links, download_link

# replace with your own imgur client ID
CLIENT_ID = '50822086a2341d8'

ts = time()
download_dir = setup_download_dir()

links = list(get_links(CLIENT_ID))  # materialize the links so we can enumerate them

for i, link in enumerate(links):
    print("%2d %s" % (i, link))
    download_link(download_dir, link)

print('Took {}s'.format(time() - ts))

 0 https://i.imgur.com/9ALnD.jpg
 1 https://i.imgur.com/9ArWNox.jpg
 2 https://i.imgur.com/9AlFOqV.jpg
 3 https://i.imgur.com/9ACFJYB.jpg
 4 https://i.imgur.com/9AHUrVG.jpg
 5 https://i.imgur.com/9AHOzTK.jpg
 6 https://i.imgur.com/9Amoq.jpg
 7 https://i.imgur.com/9AsBCoj.jpg
 8 https://i.imgur.com/9AXDF0P.jpg
 9 https://i.imgur.com/9A4fKBJ.gif
10 https://i.imgur.com/9AvLR.jpg
11 https://i.imgur.com/9ALmwVk.gif
12 https://i.imgur.com/9ADihLY.jpg
13 https://i.imgur.com/9ACTVtT.jpg
14 https://i.imgur.com/9AtMB0s.gif
15 http://i.imgur.com/9A5BJsBh.gif
16 https://i.imgur.com/9ACdB.png
17 https://i.imgur.com/9AK3G93.jpg
18 https://i.imgur.com/9AoiU3y.gif
19 https://i.imgur.com/9AqNcuG.png
20 https://i.imgur.com/9ACyOCD.jpg
21 https://i.imgur.com/9AY3pq9.jpg
22 https://i.imgur.com/9AIVv.jpg
Took 194.42975997924805s


In [2]:
ls images/

9A4fKBJ.gif   9Amoq.jpg     IvZNuLg.jpg   J4g6ug2h.gif  azULFoo.jpg
9A5BJsBh.gif  9AoiU3y.gif   IvazJRp.png   J4hg9CC.jpg   azWE4hD.jpg
9ACFJYB.jpg   9AqNcuG.png   IvbUjcEh.gif  J4hoGbf.jpg   azZR9.jpg
9ACTVtT.jpg   9ArWNox.jpg   Ivdz1rth.gif  J4qReSY.jpg   azeC62o.jpg
9ACdB.png     9AsBCoj.jpg   IvokFq8.png   J4rbZt5.jpg   azes8fQ.jpg
9ACyOCD.jpg   9AtMB0s.gif   IvpfxS4.png   J4tqSLC.png   azetv.jpg
9ADihLY.jpg   9AvLR.jpg     IvvChbv.gif   J4w7bVoh.gif  azt63.gif
9AHOzTK.jpg   Iv3OUnqh.gif  J44Onuq.jpg   az1UN1c.jpg   azt9ishh.gif
9AHUrVG.jpg   Iv4qYBg.gif   J45OP.jpg     az7Sl.jpg     azuiejoh.gif
9AIVv.jpg     Iv7Tjqrh.gif  J47GJ.gif     azEyCEN.jpg   azweD1j.gif
9AK3G93.jpg   Iv8DGxTh.gif  J49NzF7.jpg   azJ920Sh.gif  azzbqMX.jpg
9ALmwVk.gif   Iv8T0NE.png   J4CRz.jpg     azKTWcU.png   azzeIPN.gif
9ALnD.jpg     IvO9PnT.gif   J4E9X.png     azNqWBp.jpg
9AXDF0P.jpg   IvQxaaq.gif   J4ReuDr.jpg   azNuM.jpg
9AY3pq9.jpg   IvTgFJu.png   J4YKCCY.jpg   azODl.jpg
9AlFOqV.jpg   I

---

- To improve the performance of the image downloader we can run **multiple copies** of the program at the same time. 


- However, the copies would need to coordinate so that no process downloads an image that a different process has already downloaded.


- Fortunately, the `multiprocessing` module provides the tools to split the work between processes.

---

### Pool

- To use multiple processes we need a multiprocessing **Pool**. 


- The Pool class provides a `map` method that calls a function on every element of a supplied iterable, distributing the calls across the worker processes.


- The iterable is split into chunks that are handed out to the worker processes, so the work is spread roughly evenly between them.


- We pass the list of URLs to a pool of 8 worker processes, and each process downloads its share of the images in parallel.

In [3]:
from multiprocessing import cpu_count
print("number of CPU cores:", cpu_count())

number of CPU cores: 8


In [4]:
from functools import partial
from multiprocessing.pool import Pool

def multi_processes_download():
    ts = time()
    download_dir = setup_download_dir()
    links = get_links(CLIENT_ID)

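    # functools.partial makes a new version of a function
    # with one or more arguments fixed in advance
    download = partial(download_link, download_dir)
    # behaves like:
    #     download = lambda link: download_link(download_dir, link)
    # but, unlike a lambda, a partial object can be pickled and sent to the worker processes

    # the `with` block below is roughly equivalent to:
    #     p = Pool(8)
    #     p.map(download, links)
    #     p.terminate()  # leaving the `with` block calls terminate(), not close()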

    with Pool(8) as p:
        p.map(download, links)
        
    print('Took {}s'.format(time() - ts))

multi_processes_download()

Took 101.27113175392151s
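Calling the lambda gives the same result as calling the `partial` object. The difference matters for `Pool`, though: the function passed to `map` is pickled and sent to the worker processes, and the standard `pickle` module cannot serialize lambdas. Here is a small check that uses `operator.mul` as a stand-in for `download_link`:

```python
import pickle
from functools import partial
from operator import mul

# a partial object pickles as "function + fixed arguments"
double = partial(mul, 2)
restored = pickle.loads(pickle.dumps(double))
print(restored(21))  # 42

# a lambda has no importable name, so pickle refuses it
try:
    pickle.dumps(lambda value: mul(2, value))
    lambda_picklable = True
except (pickle.PicklingError, AttributeError, TypeError):
    lambda_picklable = False
print(lambda_picklable)  # False
```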


---

Although it is easy to implement, process-based parallelism has some drawbacks:
- each process has **its own copy of the program's memory**
- it does not handle tasks that depend on each other

Those issues can be tackled with shared memory and message passing mechanisms, which we will cover in later lessons.

## Using Threads

Threading is a well-known approach to attaining concurrency:
- threads are typically lighter weight than processes
- **lower memory requirements**, as **they share the same memory space**

A basic way to use threads is through `ThreadPoolExecutor` in `concurrent.futures`, which provides an interface similar to `multiprocessing.Pool`.

For finer control we will rely on the `Thread` class from the `threading` module, whose `run` method can be overridden with the code that does the actual work of the thread.
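The sketch below shows the subclassing pattern in its smallest form. The `SquareWorker` class is a made-up example, not part of `download.py`. Calling `start()` runs `run()` in a new thread, and `join()` waits for it to finish:

```python
from threading import Thread


class SquareWorker(Thread):
    """Made-up example: squares one number in a separate thread."""

    def __init__(self, value):
        super().__init__()
        self.value = value
        self.result = None

    def run(self):
        # executed in the new thread once start() is called
        self.result = self.value ** 2


workers = [SquareWorker(v) for v in range(5)]
for w in workers:
    w.start()  # calls run() in a new thread
for w in workers:
    w.join()   # wait until run() has returned
print([w.result for w in workers])  # [0, 1, 4, 9, 16]
```

Passing a function with `Thread(target=..., args=...)`, as in the examples below, achieves the same without a subclass.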

In [5]:
## Simple example with ThreadPoolExecutor

from functools import partial
from concurrent.futures import ThreadPoolExecutor

In [6]:
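def multithreaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = get_links(CLIENT_ID)

    # `download` was local to multi_processes_download, so define it again here
    download = partial(download_link, download_dir)

    with ThreadPoolExecutor(max_workers=8) as ex:
        # consume the results so that exceptions raised in the threads are re-raised here
        list(ex.map(download, links))
        # ex.submit returns a Future: a handle to a result that may not exist yet
        #     future = ex.submit(download, link)
        #     future.result()  # blocks until the task is done; the task itself
        #                      # starts as soon as a worker thread is free

    print('Took {}s'.format(time() - ts))

multithreaded_download()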

Took 79.67692017555237s
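The name *future* refers to a result that will become available at some later point. The sketch below uses a made-up `slow_square` function, where the `sleep` stands in for a network request. `submit` returns a `Future` right away, and `as_completed` yields the futures in the order they finish rather than the order they were submitted:

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_square(x):
    time.sleep(0.1)  # stands in for waiting on the network
    return x * x


results = {}
with ThreadPoolExecutor(max_workers=4) as ex:
    # submit() schedules the call and returns a Future immediately
    futures = {ex.submit(slow_square, x): x for x in range(8)}
    # as_completed() yields each Future as soon as it has finished
    for future in as_completed(futures):
        results[futures[future]] = future.result()

print(sorted(results.items()))
# [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16), (5, 25), (6, 36), (7, 49)]
```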


### Thread Safety

- Variables in the program are shared by all threads, so accessing them without any synchronization is unsafe: one thread may change a variable while another thread is reading it, or, worse, two threads may try to update it at the same time.


- This is known as a **race condition**; it is one of the leading sources of errors in threaded programs and needs to be addressed properly.


- One way to achieve thread safety is to pass work between threads through the thread-safe __Queue__ class from the `queue` module.
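Besides queues, the basic tool for protecting shared state is `threading.Lock`, which the summation example further down also uses. In this sketch, four threads increment a shared counter. Because `counter += 1` reads, adds and writes in separate steps, the lock ensures that no update is lost. Whether removing the lock actually produces a wrong total depends on the Python version and on timing, so the race may not show up on every run:

```python
from threading import Lock, Thread

counter = 0
counter_lock = Lock()


def increment(times):
    global counter
    for _ in range(times):
        # read-modify-write on shared state: hold the lock for the whole step
        with counter_lock:
            counter += 1


threads = [Thread(target=increment, args=(100_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 400000
```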

In [7]:
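# Understanding Queue
from queue import Queue

def do_work(q):
    # checking q.empty() is fine with a single consumer, but racy with several threads
    while not q.empty():
        item = q.get()
        print(str(item))
        q.task_done()  # marks the item as processed; q.join() relies on this when using threads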

q = Queue() # FIFO queue

for i in range(20):
    q.put(i)

do_work(q)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


A simpler threaded example before going back to the image downloader code:

In [8]:
# in this example each thread prints an element of the queue

from time import sleep
from queue import Queue
from threading import Thread
import logging  

# set up a logger
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logging.basicConfig(format='(%(threadName)-9s) %(message)s', level=logging.DEBUG)

def do_work(q):
    while True:
        item = q.get()
        logger.debug("e" + str(item) + ' ')
        print(str(item) + ' ')
        q.task_done()
        sleep(2)
    
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_work, args=(q,), name='thread_' + str(i))
    worker.daemon = True  # daemon threads are stopped when the main program exits
    worker.start()        # start the thread

# now we have started 10 threads:

for i in range(50):
    q.put(i)

q.join()  # block until every queued item has been marked done with task_done()

(thread_1 ) e0 
(thread_2 ) e1 
(thread_3 ) e2 
(thread_4 ) e3 
(thread_5 ) e4 
(thread_6 ) e5 
(thread_7 ) e6 
(thread_8 ) e7 
(thread_9 ) e8 
(thread_0 ) e9 


0 
1 
2 
3 
4 
5 
6 
7 
8 
9 


(thread_1 ) e10 
(thread_3 ) e11 
(thread_2 ) e12 
(thread_4 ) e13 
(thread_5 ) e14 
(thread_6 ) e15 
(thread_7 ) e16 
(thread_8 ) e17 
(thread_9 ) e18 
(thread_0 ) e19 


10 
11 
12 
13 
14 
15 
16 
17 
18 
19 


(thread_1 ) e20 
(thread_3 ) e21 
(thread_2 ) e22 
(thread_4 ) e23 
(thread_5 ) e24 
(thread_6 ) e25 
(thread_7 ) e26 
(thread_9 ) e27 
(thread_8 ) e28 
(thread_0 ) e29 


20 
21 
22 
23 
24 
25 
26 
27 
28 
29 


(thread_1 ) e30 
(thread_3 ) e31 
(thread_2 ) e32 
(thread_5 ) e33 
(thread_4 ) e34 
(thread_6 ) e35 
(thread_7 ) e36 
(thread_9 ) e37 
(thread_8 ) e38 
(thread_0 ) e39 


30 
31 
32 
33 
34 
35 
36 
37 
38 
39 


(thread_1 ) e40 
(thread_3 ) e41 
(thread_5 ) e42 
(thread_2 ) e43 
(thread_7 ) e44 
(thread_4 ) e45 
(thread_6 ) e46 
(thread_9 ) e47 
(thread_0 ) e48 
(thread_8 ) e49 


40 
41 
42 
43 
44 
45 
46 
47 
48 
49 
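The example above relies on daemon threads, which are simply killed when the program exits. Below is an alternative sketch for the case where the workers should shut down cleanly. It puts one sentinel value per worker in the queue (`STOP` is a name chosen here) and calls `task_done()` in a `finally` block, so that an item whose processing fails still counts as processed and `Queue.join()` cannot wait forever:

```python
from queue import Queue
from threading import Thread

STOP = object()  # sentinel: tells a worker to leave its loop


def worker(tasks, results):
    while True:
        item = tasks.get()
        try:
            if item is STOP:
                return
            results.put(item * 10)  # the "work"
        finally:
            # runs even on `return` or an exception, so tasks.join() cannot hang
            tasks.task_done()


tasks, results = Queue(), Queue()
threads = [Thread(target=worker, args=(tasks, results)) for _ in range(4)]
for t in threads:
    t.start()

for i in range(20):
    tasks.put(i)
for _ in threads:
    tasks.put(STOP)  # one sentinel per worker

tasks.join()   # every item, sentinels included, has been processed
for t in threads:
    t.join()   # each worker returned after receiving its sentinel

collected = sorted(results.get() for _ in range(results.qsize()))
print(collected[:5])  # [0, 10, 20, 30, 40]
```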


In [9]:
from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue

    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            directory, link = self.queue.get()
            try:
                # call the function download_link (from download.py)
                download_link(directory, link)
            finally:
                # always mark the task done, otherwise queue.join() would hang on an error
                self.queue.task_done()

            
def threaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]
    
    # Create a queue to communicate with the worker threads
    queue = Queue()
    
    # Create 8 worker threads
    for _ in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit 
        # even if the workers are blocking
        worker.daemon = True
        worker.start()

    
    # Put the tasks into the queue as a tuple
    for link in links:
        print('Queueing: {}'.format(link))
        queue.put((download_dir, link))
    
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    
    print('Took {}s'.format(time() - ts))

threaded_download()

(MainThread) Starting new HTTPS connection (1): api.imgur.com:443
(MainThread) https://api.imgur.com:443 "GET /3/gallery/random/random/ HTTP/1.1" 200 21248
(Thread-11) Starting new HTTP connection (1): i.imgur.com:80
(Thread-13) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-9 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-12) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-14) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-10) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-8 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-7 ) Starting new HTTPS connection (1): i.imgur.com:443


Queueing: https://i.imgur.com/kKPnun1.jpg
Queueing: https://i.imgur.com/kKgxWqu.png
Queueing: https://i.imgur.com/kKQ9hGK.jpg
Queueing: https://i.imgur.com/kKzCA5N.gif
Queueing: https://i.imgur.com/kKfpW.jpg
Queueing: http://i.imgur.com/kKRZCONh.gif
Queueing: https://i.imgur.com/kKVld.jpg
Queueing: https://i.imgur.com/kK3ja.jpg
Queueing: https://i.imgur.com/kK4E1kK.jpg
Queueing: https://i.imgur.com/kK1s9TQ.gif
Queueing: https://i.imgur.com/kKeiYLQ.gif
Queueing: https://i.imgur.com/kK0RP0b.jpg
Queueing: https://i.imgur.com/kK2n3xm.jpg
Queueing: https://i.imgur.com/kKoDntm.jpg
Queueing: https://i.imgur.com/kKXDGro.png
Queueing: https://i.imgur.com/kKKGl.jpg
Queueing: https://i.imgur.com/kKxxhLn.gif
Queueing: https://i.imgur.com/kKuVG.png


(Thread-11) http://i.imgur.com:80 "GET /kKRZCONh.gif HTTP/1.1" 301 0
(Thread-11) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-9 ) https://i.imgur.com:443 "GET /kKPnun1.jpg HTTP/1.1" 200 48995
(Thread-7 ) https://i.imgur.com:443 "GET /kKVld.jpg HTTP/1.1" 200 198560
(Thread-10) https://i.imgur.com:443 "GET /kKQ9hGK.jpg HTTP/1.1" 200 48550
(Thread-8 ) https://i.imgur.com:443 "GET /kKfpW.jpg HTTP/1.1" 200 90577
(Thread-14) https://i.imgur.com:443 "GET /kK3ja.jpg HTTP/1.1" 200 126619
(Thread-13) https://i.imgur.com:443 "GET /kKgxWqu.png HTTP/1.1" 200 183926
(Thread-12) https://i.imgur.com:443 "GET /kKzCA5N.gif HTTP/1.1" 200 12953308
(Thread-10) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-9 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-8 ) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-14) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-13) Starting new HTTPS connection (1): i.imgur.com:443
(Thread-7 ) Starting new HTTPS conn

Took 55.426631927490234s


## The Global Interpreter Lock
#### Not really parallel!

- Python (more precisely CPython, the standard interpreter) has a **Global Interpreter Lock (GIL)**, which allows only **one thread to execute Python bytecode at a time** within a process. Therefore, **this code is concurrent but not parallel**.

- It is still faster because the image downloader is an input/output (I/O) bound task: the majority of the time is spent waiting for the network, and a thread that is waiting on I/O releases the GIL. This is why threading can provide a large speed increase.

- **The interpreter can switch between the threads** whenever one of them is **ready** to do some work.



- If the program were performing a CPU-bound task, using the threading module in Python (or in any other interpreted language with a GIL) could actually result in reduced performance.

- For CPU-bound tasks and truly parallel execution in Python, the multiprocessing module is a better option.

- Some parallelism is still possible with threads if the executed functions rely on low-level code that releases the GIL (e.g. many Numpy/Scipy functions). This includes custom Cython programs (see the `nogil` keyword [here](https://cython.readthedocs.io/en/latest/src/userguide/parallelism.html) and [here](https://cython.readthedocs.io/en/latest/src/userguide/numpy_tutorial.html)).

- Other packages for parallelization: task/job queues (e.g. [python-rq](https://python-rq.org/)), [joblib](https://joblib.readthedocs.io/en/latest/parallel.html), [dask](https://dask.org/)
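One way to see the I/O-bound case in isolation is to replace the download with `time.sleep`, which releases the GIL just as waiting on a socket does. In this sketch, `fake_download` is a stand-in and not part of `download.py`. Eight 0.25 s waits take about 2 s when run one after the other, but only about 0.25 s on eight threads. On a standard CPython build, replacing the sleep with a pure-Python loop would be expected to remove most of that gain:

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(_):
    time.sleep(0.25)  # blocking wait that releases the GIL, like network I/O


start = time.perf_counter()
for i in range(8):
    fake_download(i)
sequential = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as ex:
    list(ex.map(fake_download, range(8)))
threaded = time.perf_counter() - start

print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```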


### Example: sum of array elements in parallel

In [10]:
n = int(1e8)

In [11]:
# Sequential version
from time import time

ts = time()
s = 0
for i in range(n):
    s = s + i
print(s, '-->', time()-ts,'s')   

4999999950000000 --> 9.110804080963135 s


In [12]:
# multiprocessing version
from time import time
from multiprocessing.pool import Pool

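from download import sum_multi_processes_1, sum_multi_processes_2

# Local versions for reading only (they are not used below): the pool uses the functions
# imported from download.py, because worker processes must be able to import the function
# they run, which may fail for functions defined inside the notebook
def sum_multi_processes_1_(chunk):
    y = 0
    for i in chunk:
        y = y + i
    return y


def sum_multi_processes_2_(start, end):
    y = 0
    for i in range(start, end):
        y = y + i
    return y

# chunks1 ships each chunk as a full list of 100 numbers (more data to pickle),
# chunks2 ships only the (start, end) bounds of each chunk
chunks1 = [list(range(i, i + 100)) for i in range(0, n, 100)]
chunks2 = [(i, i + 100) for i in range(0, n, 100)]

print(len(chunks1), 'chunks')

ts = time()
with Pool(8) as p:
    results = p.map(sum_multi_processes_1, chunks1)
    # results = p.starmap(sum_multi_processes_2, chunks2)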

print(sum(results), '-->', time()-ts,'s')   

1000000 chunks
4999999950000000 --> 7.163640737533569 s
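With 1,000,000 chunks, a good part of the time likely goes into sending tasks (and, for `chunks1`, whole lists) to the workers. One alternative is to create a single `(start, end)` range per worker. The sketch below builds such ranges with `chunk_bounds`, a helper name introduced here. It then checks that the pieces add up to the closed form `n * (n - 1) // 2`, which is the `4999999950000000` printed above:

```python
def chunk_bounds(n, n_chunks):
    """Split range(n) into n_chunks contiguous (start, end) ranges of near-equal size."""
    size, extra = divmod(n, n_chunks)
    bounds, start = [], 0
    for k in range(n_chunks):
        end = start + size + (1 if k < extra else 0)  # first `extra` ranges get one more
        bounds.append((start, end))
        start = end
    return bounds


def range_sum(start, end):
    """start + (start + 1) + ... + (end - 1), via the arithmetic-series formula."""
    return (start + end - 1) * (end - start) // 2


n = 10**8
bounds = chunk_bounds(n, 8)
print(bounds[:2])  # [(0, 12500000), (12500000, 25000000)]

total = sum(range_sum(start, end) for start, end in bounds)
print(total, total == n * (n - 1) // 2)  # 4999999950000000 True
```

The bounds could then be passed as `p.starmap(sum_multi_processes_2, chunk_bounds(n, 8))`. We have not timed this variant.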


In [13]:
# Thread version
from time import time
from queue import Queue
from threading import Thread, Lock

x = 0
lock = Lock()

def sum_chunk(q):
    global x
    while True:
        start, end = q.get()
        for i in range(start, end):
            with lock:  # serialize updates to the shared x (acquired once per addition)
                x = x + i
        q.task_done()

n = int(1e8)
chunks = [(i, i + 100) for i in range(0, n, 100)]

ts = time()
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=sum_chunk, args=(q, ))
    worker.daemon = True  # daemon threads are stopped when the main program exits
    worker.start()        # start the thread

for chunk in chunks:
    q.put(chunk)

q.join()
print(x, '-->', time() - ts, 's')

4999999950000000 --> 42.91472411155701 s
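In the cell above, the lock is acquired once per addition, that is 10^8 times. This probably accounts for much of the 42.9 s, with the GIL doing the rest. A cheaper pattern is for each thread to sum its chunk into a local variable and take the lock only once per chunk. This reduces the locking cost, but because of the GIL it should not be expected to beat the sequential loop on pure-Python arithmetic. The sketch below uses a smaller `n` to keep it quick:

```python
from queue import Queue
from threading import Lock, Thread

total = 0
total_lock = Lock()


def sum_chunk_local(q):
    global total
    while True:
        start, end = q.get()
        try:
            chunk_sum = 0
            for i in range(start, end):  # local variable: no lock needed
                chunk_sum += i
            with total_lock:  # one lock acquisition per chunk
                total += chunk_sum
        finally:
            q.task_done()


n = 10**6  # smaller than above to keep the example quick
q = Queue()
for _ in range(10):
    Thread(target=sum_chunk_local, args=(q,), daemon=True).start()

for start in range(0, n, 1000):
    q.put((start, start + 1000))
q.join()
print(total)  # 499999500000
```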


### Example: Pi Simulation

In [1]:
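from download import monte_carlo_pi
import numpy as np

# monte_carlo_pi is imported from download.py so that the worker processes below can import it;
# monte_carlo_pi_ is a local version for reading only (it is not used)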
def monte_carlo_pi_(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(0, 1)
        y = np.random.uniform(0, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [2]:
%%time
result = [monte_carlo_pi(int(3e5)) for _ in range(10)]

CPU times: user 13.1 s, sys: 18.4 ms, total: 13.2 s
Wall time: 13.2 s


In [3]:
np.array(result)

array([3.13942667, 3.14046667, 3.14348   , 3.14836   , 3.14108   ,
       3.14306667, 3.14144   , 3.139     , 3.13890667, 3.13962667])

In [4]:
from multiprocessing.pool import Pool

In [5]:
%%time
with Pool(8) as pool:
    result = pool.map(monte_carlo_pi, [int(3e5) for _ in range(10)])

CPU times: user 28.9 ms, sys: 57.8 ms, total: 86.6 ms
Wall time: 4.85 s


In [6]:
np.array(result)

array([3.14409333, 3.13914667, 3.14488   , 3.13768   , 3.14605333,
       3.13893333, 3.14205333, 3.14314667, 3.15153333, 3.14897333])
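A caveat when combining `Pool` with random numbers: with the `fork` start method (long the default on Linux), each worker process starts with a copy of the parent's NumPy global random state. As a result, different workers can produce identical "random" samples. Giving every task its own explicitly seeded generator avoids this and also makes the results reproducible. The sketch below uses the standard library's `random.Random`; `np.random.default_rng(seed)` plays the same role in NumPy. `monte_carlo_pi_seeded` is a hypothetical variant of `monte_carlo_pi`:

```python
import random


def monte_carlo_pi_seeded(n, seed):
    """Hypothetical variant of monte_carlo_pi with its own seeded generator."""
    rng = random.Random(seed)  # private generator: no state shared with other tasks
    inside = 0
    for _ in range(n):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / n


tasks = [(int(3e4), seed) for seed in range(10)]  # one distinct seed per task
estimates = [monte_carlo_pi_seeded(n, seed) for n, seed in tasks]
print(estimates)
```

With a pool, the `(n, seed)` pairs would be passed as `pool.starmap(monte_carlo_pi_seeded, tasks)`, with the function defined in an importable module such as `download.py`.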