## Lecture: Python Concurrency
### March 31, 2020  

Part of lecture is based on [https://nyu-cds.github.io/python-concurrency/](https://nyu-cds.github.io/python-concurrency/)

---

**Main goal: to explore techniques for improving performance by introducing different forms of concurrency.**


---

For this you will need to:

- create an account in [imgur.com](https://imgur.com/)
- register your application [here](https://api.imgur.com/oauth2/addclient)
  - Authorization Type: __OAuth 2 authorization with a callback URL__
  - Authorization Callback URL: __https://www.getpostman.com/oauth2/callback__
  - email:
  - Description:
  

---
The functions below fetch a list of images and download them from the __imgur__ repository: 
[https://imgur.com/](https://imgur.com/)

- We will start with a version that downloads images sequentially, or one at a time

- Then improve the performance by introducing multiprocessing and threading

---
We will split the functionality into three separate functions:
- get_links
- download_link
- setup_download_dir

In [16]:
import json
import os
from pathlib import Path
from urllib.request import urlopen, Request
from time import time

types = {'image/jpeg', 'image/png', 'image/gif'}

def get_links(client_id):
    '''used to obtain a list of available images'''
    headers = {'Authorization': 'Client-ID {}'.format(client_id)}
    req = Request('https://api.imgur.com/3/gallery/random/random/', headers=headers, method='GET')
    
    with urlopen(req) as response:
        data = json.loads(response.read().decode('utf-8'))
    return [item['link'] for item in data['data'] if 'type' in item and item['type'] in types]

def download_link(directory, link):
    '''downloads the image given by the URL _link_ into _directory_'''
    download_path = directory / os.path.basename(link)
    with urlopen(link) as image, download_path.open('wb') as f:
        f.write(image.read())

def setup_download_dir():
    '''creates a download destination directory if it doesn’t already exist'''
    download_dir = Path('images')
    if not download_dir.exists():
        download_dir.mkdir()
    return download_dir

In [28]:
# 'replace with your client ID'
CLIENT_ID = '49142d088194030'

ts = time()
download_dir = setup_download_dir()

links = [l for l in get_links(CLIENT_ID)]
print(links)

for i, link in enumerate(links):
    print("%2d %s" % (i, link))
    download_link(download_dir, link)

print('Took {}s'.format(time() - ts))



['https://i.imgur.com/xBtu4J1.gif', 'https://i.imgur.com/xBkxENb.gif', 'http://i.imgur.com/xBVWwgEh.gif', 'https://i.imgur.com/xBNzJtj.gif', 'http://i.imgur.com/xB1BOxCh.gif', 'https://i.imgur.com/xBGuEtH.jpg', 'https://i.imgur.com/xBeJF4y.jpg', 'https://i.imgur.com/xBKgJLH.png', 'https://i.imgur.com/xBSPB.jpg', 'https://i.imgur.com/xBwQkxb.jpg', 'https://i.imgur.com/xBDxGoH.jpg', 'https://i.imgur.com/xBe1Mj9.gif', 'https://i.imgur.com/xBsxskT.jpg', 'https://i.imgur.com/xBNkoqC.png', 'http://i.imgur.com/xBdUFUhh.gif', 'https://i.imgur.com/xBu75eF.png', 'https://i.imgur.com/xBracyt.jpg', 'https://i.imgur.com/xBp23.png', 'https://i.imgur.com/xBMHHHQ.jpg', 'https://i.imgur.com/xBWjSnM.png', 'https://i.imgur.com/xBGtJKl.jpg', 'https://i.imgur.com/xBerQPk.jpg', 'http://i.imgur.com/xB36aTMh.gif', 'https://i.imgur.com/xBwPM.jpg']
 0 https://i.imgur.com/xBtu4J1.gif
 1 https://i.imgur.com/xBkxENb.gif
 2 http://i.imgur.com/xBVWwgEh.gif
 3 https://i.imgur.com/xBNzJtj.gif
 4 http://i.imgur.com/xB1

---

- To improve the performance of the image downloader we can run **multiple copies** of the program at the same time. 


- However, we would need to know what images are available so that we could ensure that one process didn’t download an image that had already been downloaded by a different process.  


- Fortunately the multiprocessing module is available for this purpose.

---

### Pool

- To use multiple processes we need a multiprocessing **Pool**. 


- The Pool class provides a map method that applies a function to every element of a supplied iterable, distributing the calls across the pool's worker processes. 


- The iterable is divided into a number of chunks, so that each process gets roughly the same number of elements. 


- We will pass the list of URLs to the pool, which starts 8 new processes and uses them to download the images in parallel.
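
As a minimal sketch of how `Pool.map` and `functools.partial` fit together, the toy example below applies a hypothetical `scale` function (not part of the downloader) to a small range of numbers. The `chunksize` argument is optional and is passed here only to make the chunking explicit.

```python
from functools import partial
from multiprocessing import Pool


def scale(factor, x):
    """Toy stand-in for download_link: multiply x by a fixed factor."""
    return factor * x


# Fix the first argument, so the pool only has to supply x.
times_ten = partial(scale, 10)

if __name__ == "__main__":
    # 4 worker processes; the 8 inputs are handed out in chunks of 2.
    with Pool(4) as pool:
        results = pool.map(times_ten, range(8), chunksize=2)
    # map returns the results in input order, whichever worker ran them.
    print(results)  # [0, 10, 20, 30, 40, 50, 60, 70]
```

The `if __name__ == "__main__":` guard is not needed inside a notebook on Linux, but it keeps the sketch safe when run as a script on platforms that start worker processes by re-importing the module.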

In [18]:
from multiprocessing import cpu_count
print("number of CPU cores:", cpu_count())

number of CPU cores: 4


In [19]:
from functools import partial
from multiprocessing.pool import Pool

def multi_processes_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]

    # functools.partial makes a new version of a function 
    # with one or more fixed arguments
    download = partial(download_link, download_dir)
   
    with Pool(8) as p:
        p.map(download, links)
        
    print('Took {}s'.format(time() - ts))

multi_processes_download()

Took 2.773810863494873s


---

Although easy to implement, this form of parallelism has some drawbacks:
- each process contains **a copy of the entire memory**
- it does not handle processes that depend on each other

These issues can be tackled with shared memory and message-passing mechanisms, which we will cover in later lessons.

---
## Using Threads

Threading is a well-known approach to attaining concurrency: 
- typically threads are lighter weight than processes
- **lower memory requirements**, as **they share the same memory space**

We will rely on the __Thread class__, which provides a run method that should be overridden with a method that does the actual work of the thread.
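
A minimal sketch of this pattern, using a hypothetical `SquareWorker` class that is not part of the downloader: the work goes into `run`, and calling `start` executes `run` in a new thread.

```python
from threading import Thread


class SquareWorker(Thread):
    """Hypothetical worker that squares one number when the thread runs."""

    def __init__(self, number):
        super().__init__()
        self.number = number
        self.result = None

    def run(self):
        # start() arranges for run() to be called in the new thread.
        self.result = self.number ** 2


workers = [SquareWorker(n) for n in range(5)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # wait for each thread to finish before reading its result

print([w.result for w in workers])  # [0, 1, 4, 9, 16]
```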

### Thread Safety

- Variables in the program are shared by all the threads, so they cannot safely be accessed the way you would normally access a variable. One thread may change the variable while another thread is reading it, or worse, two threads may try to update the variable at the same time. 


- This is known as a **race condition**. It is one of the leading sources of errors in threaded programs and needs to be addressed properly.



- One way to deal with thread safety is to use the __Queue class__
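
One way to read this advice, sketched below under the assumption that each thread can compute its result independently: instead of updating a shared variable, every thread puts its partial result on a `Queue`, and only the main thread combines them. The function `partial_sum` and the two slices are illustrative.

```python
from queue import Queue
from threading import Thread


def partial_sum(numbers, results):
    """Sum one slice locally, then hand the result over through the queue."""
    results.put(sum(numbers))


results = Queue()  # Queue does its own locking, so put/get are thread-safe
slices = [range(0, 50), range(50, 100)]

threads = [Thread(target=partial_sum, args=(s, results)) for s in slices]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Only the main thread combines the partial results.
total = 0
while not results.empty():
    total += results.get()

print(total)  # 4950
```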

In [20]:
# Understanding Queue 
from queue import Queue

def do_work(q):
    while not q.empty():
        item = q.get()
        print(str(item)) 
        q.task_done()  # this is important when combining Queue with Threads

q = Queue() # FIFO queue

for i in range(20):
    q.put(i)

do_work(q)

0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19


---
A simpler example before going back to the image downloader code

In [21]:
from time import sleep

In [22]:
# in this example each thread prints an element of the queue

from queue import Queue
from threading import Thread
import logging  

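# no level is set, so the default WARNING level applies and the
# logging.debug calls below produce no output
logging.basicConfig(format='(%(threadName)-9s) %(message)s',)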

def do_work(q):
    while True:
        item = q.get()
        logging.debug("e" + str(item) + ' ')
        print(str(item) + ' ')
        q.task_done()
        sleep(2)
           
    
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=do_work, args=(q,), name='thread_' + str(i))
    worker.daemon = True   # daemon threads are stopped when the program quits
    worker.start()         # start the thread

# now we have started 10 threads:

for i in range(50):
    q.put(i)

q.join() # wait until every queued item has been marked done

0 1 2 3 

4 

5 
6 
7 
8 
9 

10 11 12 13 14 


15 

16 
17 18 19 



20 21 

22 23 24 25 26 27 





28 29 

30 31 
32 33 34 



35 36 37 38 
39 



40 41 42 43 

44 
45 
46 


47 48 

49 


In [23]:
from queue import Queue
from threading import Thread

class DownloadWorker(Thread):
    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
    
    def run(self):
        while True:
            # Get the work from the queue and expand the tuple
            (directory, link) = self.queue.get()
            # call the function download_link (previously defined)
            download_link(directory, link)
            self.queue.task_done()

            
def threaded_download():
    ts = time()
    download_dir = setup_download_dir()
    links = [l for l in get_links(CLIENT_ID)]
    
    
    # Create a queue to communicate with the worker threads
    queue = Queue()
    
    # Create 8 worker threads
    for _ in range(8):
        worker = DownloadWorker(queue)
        # Setting daemon to True will let the main thread exit 
        # even though the workers are blocking
        worker.daemon = True
        worker.start()

    
    # Put the tasks into the queue as a tuple
    for link in links:
        print('Queueing: {}'.format(link))
        queue.put((download_dir, link))
    
    # Causes the main thread to wait for the queue to finish processing all the tasks
    queue.join()
    
    print('Took {}'.format(time() - ts))

threaded_download()

Queueing: https://i.imgur.com/JNZDHzh.png
Queueing: https://i.imgur.com/JNY1o.jpg
Queueing: https://i.imgur.com/JNQBYYf.gif
Queueing: http://i.imgur.com/JNvNYZ0h.gif
Queueing: https://i.imgur.com/JNAta.jpg
Queueing: https://i.imgur.com/JN9gfnS.jpg
Queueing: https://i.imgur.com/JNVjl.jpg
Queueing: https://i.imgur.com/JN7QO9j.jpg
Queueing: https://i.imgur.com/JNXsyZj.png
Queueing: https://i.imgur.com/JNIrcCZ.gif
Queueing: http://i.imgur.com/JN4iUhwh.gif
Queueing: https://i.imgur.com/JN5qJhO.gif
Queueing: https://i.imgur.com/JN7KYfy.jpg
Queueing: https://i.imgur.com/JN6sYGV.gif
Queueing: https://i.imgur.com/JNlTxyE.jpg
Queueing: https://i.imgur.com/JNq3x.jpg
Queueing: http://i.imgur.com/JNoeRUYh.gif
Queueing: https://i.imgur.com/JNh5gMX.jpg
Queueing: https://i.imgur.com/JNDEkJg.jpg
Queueing: https://i.imgur.com/JNgAtzT.jpg
Queueing: http://i.imgur.com/JN9xP7Xh.gif
Queueing: https://i.imgur.com/JNpFVMb.jpg
Queueing: https://i.imgur.com/JNOqO.jpg
Queueing: http://i.imgur.com/JNQI5GOh.gif
Qu

---

#### Not really parallel!

- Python has a **Global Interpreter Lock (GIL)**, which allows only **one thread to execute Python code at a time** within a process. Therefore, **this code is concurrent but not parallel**. 

- It is still faster because the image downloader is an input/output (I/O) bound task. 
The majority of the time is spent waiting for the network, and a thread that is waiting on I/O releases the GIL. This is why threading can provide a large speed increase. 

- **The processor can switch between the threads** whenever one of them is **ready** to do some work.



- If the program were performing a CPU-bound task, using the threading module in Python, or in any other interpreted language with a GIL, could actually result in reduced performance. 

- For CPU bound tasks and truly parallel execution in Python, the multiprocessing module is a better option.
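
The I/O-bound case can be imitated without a network. In the sketch below, `fake_download` is a hypothetical stand-in that only sleeps. Sleeping, like waiting on a socket, releases the GIL, so the four waits overlap when run in threads.

```python
from threading import Thread
from time import sleep, perf_counter


def fake_download(seconds=0.2):
    """Stand-in for a network request: sleeping releases the GIL."""
    sleep(seconds)


# Sequential: the four waits add up.
start = perf_counter()
for _ in range(4):
    fake_download()
sequential = perf_counter() - start

# Threaded: the four waits overlap.
start = perf_counter()
threads = [Thread(target=fake_download) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
threaded = perf_counter() - start

print("sequential: {:.2f}s, threaded: {:.2f}s".format(sequential, threaded))
```

Replacing the `sleep` with a pure-Python computation would be expected to remove most of the gain, since the threads would then compete for the GIL.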

---

### Sum of array elements in parallel

In [24]:

n = int(1e6)

In [25]:
# Sequential version
from time import time

ts = time()
s = 0
for i in range(n):
    s = s + i
print(s, '-->', time()-ts, 's')   # time() measures seconds

499999500000 --> 0.27997374534606934 s


In [29]:
# Multiprocessing version
from time import time
from multiprocessing.pool import Pool


def sum_multi_processes(chunk):
    '''returns the sum of the elements of one chunk'''
    y = 0
    for i in chunk:
        y = y + i
    return y

chunks = [list(range(i, i + 100)) for i in range(0, n, 100)]

ts = time()
with Pool(10) as p:
    results = p.map(sum_multi_processes, chunks)

print(sum(results), '-->', time()-ts, 's')   # time() measures seconds

499999500000 --> 0.21544790267944336 s


In [33]:
# Thread version
from queue import Queue
from threading import Thread
from threading import Lock

x = 0
def sum_chunk(q):
    global x
    while True:
        chunk = q.get()
        for i in chunk:
            # NOTE: Lock() builds a brand-new lock on every iteration, so the
            # threads never share a lock and the update below is NOT actually
            # synchronized; this is why the total printed below is wrong
            with Lock():
                x = x + i
        q.task_done()

n = int(1e6)
chunks = [list(range(i, i + 100)) for i in range(0, n, 100)]

ts = time()
q = Queue()
num_threads = 10

for i in range(num_threads):
    worker = Thread(target=sum_chunk, args=(q, ))
    worker.daemon = True   # daemon threads are stopped when the program quits
    worker.start()         # start the thread

for chunk in chunks:
    q.put(chunk)

q.join()
print(x, '-->', time() - ts, 's')   # time() measures seconds

491981097441
0
494988753057 --> 0.9030847549438477 s
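
The threaded total above differs from the 499999500000 obtained by the other two versions. In that cell, `with Lock():` creates a new lock on every pass through the loop, so the threads never contend for the same lock and updates to `x` can be lost. A possible corrected variant is sketched below, assuming the intent was to serialize updates to the shared total. The names `total` and `total_lock` are illustrative. It also sums each chunk locally and takes the lock only once per chunk, which reduces contention.

```python
from queue import Queue
from threading import Thread, Lock

n = int(1e6)
chunks = [range(i, i + 100) for i in range(0, n, 100)]

total = 0
total_lock = Lock()  # created once and shared by all worker threads


def sum_chunk(q):
    global total
    while True:
        chunk = q.get()
        chunk_sum = sum(chunk)  # local work needs no lock
        with total_lock:        # only the shared update is protected
            total += chunk_sum
        q.task_done()


q = Queue()
for _ in range(10):
    worker = Thread(target=sum_chunk, args=(q,))
    worker.daemon = True  # let the program exit while workers block on get()
    worker.start()

for chunk in chunks:
    q.put(chunk)

q.join()  # returns once task_done() has been called for every chunk
print(total)  # 499999500000
```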


### Example: Pi Simulation

In [22]:
import numpy as np

def monte_carlo_pi(n):
    s = 0
    for i in range(n):
        x = np.random.uniform(0, 1)
        y = np.random.uniform(0, 1)
        if (x**2 + y**2) < 1:
            s += 1
    return 4*s/n

In [23]:
%%time
result = [monte_carlo_pi(int(1e6)) for _ in range(10)]

CPU times: user 13.3 s, sys: 0 ns, total: 13.3 s
Wall time: 13.3 s


In [24]:
np.array(result)

array([3.140836, 3.139644, 3.141024, 3.142308, 3.142552, 3.14224 ,
       3.141252, 3.141348, 3.142184, 3.14512 ])

In [25]:
from multiprocessing.pool import Pool

In [26]:
%%time
with Pool(8) as pool:
    result = pool.map(monte_carlo_pi, [int(1e6) for _ in range(10)])

CPU times: user 42.9 ms, sys: 24.8 ms, total: 67.7 ms
Wall time: 5.27 s


In [27]:
np.array(result)

array([3.14078 , 3.14078 , 3.14078 , 3.14078 , 3.14078 , 3.14078 ,
       3.14078 , 3.14078 , 3.141616, 3.141616])
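
The repeated values in the last array are consistent with worker processes that were forked with the same global NumPy random state, so that several tasks draw identical random streams. One possible way around this is to give each task its own seed. The sketch below uses the standard library's `random.Random` so that it runs without NumPy. The function `monte_carlo_pi_seeded` and the seed values are illustrative assumptions, not part of the original lecture.

```python
import random
from multiprocessing import Pool


def monte_carlo_pi_seeded(args):
    """Estimate pi from n random points, using a generator private to this task."""
    seed, n = args
    rng = random.Random(seed)  # independent generator, seeded per task
    inside = 0
    for _ in range(n):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / n


if __name__ == "__main__":
    # One distinct (hypothetical) seed per task, so no two tasks share a stream.
    tasks = [(seed, 100_000) for seed in range(10)]
    with Pool(4) as pool:
        estimates = pool.map(monte_carlo_pi_seeded, tasks)
    print(estimates)
```

With NumPy, the analogous step would be to create one generator per task with `np.random.default_rng(seed)` instead of calling the global `np.random.uniform`.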