# Concurrency in Python

Discuss preliminaries

## Threading

### The GIL
Notes on the Global Interpreter Lock, based on the CPython 3.8.0 alpha source file [Python/ceval_gil.h](https://github.com/python/cpython/blob/27e2d1f21975dfb8c0ddcb192fa0f45a51b7977e/Python/ceval_gil.h).

The GIL prevents multiple threads in the same process from executing Python bytecode simultaneously, even when they run on separate cores.

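The holding thread releases the GIL under two conditions:
1. It performs a blocking operation, such as I/O, that explicitly releases the GIL.
2. Another thread has waited for the GIL longer than the switch interval and sets `gil_drop_request`, asking the holder to release it at the next opportunity so that another thread can take over.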
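Here is a small sketch of both conditions that uses only the standard library. `sys.getswitchinterval()` reports the interval behind condition 2, which CPython sets to 5 ms by default. Overlapping `time.sleep()` calls illustrate condition 1, because sleeping releases the GIL. The function name `blocking_call` is illustrative, and the timing is approximate.

```python
import sys
import threading
import time

# Condition 2: how long a waiting thread waits before setting gil_drop_request
print(sys.getswitchinterval())  # 0.005 by default

# Condition 1: time.sleep() releases the GIL, so these four sleeps overlap
# instead of running back to back.
def blocking_call():
    time.sleep(0.2)

start = time.perf_counter()
threads = [threading.Thread(target=blocking_call) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start
print('4 x 0.2 s sleeps took %.2f s' % elapsed)  # roughly 0.2, not 0.8
```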

```python
import threading
import time

def hello_worker():
    for _ in range(10):
        print('Hello')

def world_worker():
    for _ in range(10):
        print('World')

threading.Thread(target=hello_worker).start()
threading.Thread(target=world_worker).start()
```

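The output tends to come out in chunks rather than alternating line by line. A thread that never blocks keeps the GIL until the interpreter forces a switch (every 5 ms by default), so one worker often gets through many iterations before the other one runs.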

```python
# Sleeping releases the GIL, so the two workers now interleave
def hello_worker():
    for _ in range(10):
        time.sleep(0.01)
        print('Hello')

def world_worker():
    for _ in range(10):
        time.sleep(0.01)
        print('World')

threading.Thread(target=hello_worker).start()
threading.Thread(target=world_worker).start()
```

```python
def hello_worker(n):
    print('Hello')
    time.sleep(1)
    print('Goodbye')

for i in range(4):
    threading.Thread(target=hello_worker(i)).start()

# Why does this run synchronously?
```

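This runs synchronously because `hello_worker(i)` is evaluated in the main thread. Each call runs to completion (sleeping for a second), and its return value, `None`, is what gets passed as `target`. A `Thread` whose target is `None` does nothing when started.

Use a lambda to create a closure, so that the call happens inside the new thread...

```python
for i in range(4):
    # The default argument n=i captures the current value of i for this thread
    threading.Thread(target=lambda n=i: hello_worker(n)).start()
```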
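The lambda approach has a caveat. A closure such as `lambda: hello_worker(i)` captures the variable `i`, not the value `i` had when the lambda was created. `hello_worker` ignores its argument here, so it makes no difference, but a thread that starts late could see a later value of `i`. The snippet below shows the effect without threads. A default argument, as in `lambda n=i: ...`, is evaluated once at creation time and therefore freezes the current value.

```python
# Each lambda looks up i only when called; by then the loop has finished.
late = [lambda: i for i in range(3)]
print([f() for f in late])    # [2, 2, 2]

# A default argument is evaluated when the lambda is created.
early = [lambda i=i: i for i in range(3)]
print([f() for f in early])   # [0, 1, 2]
```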

...or use the `args` parameter of `threading.Thread`, which passes the arguments to the target when it runs in the new thread:

```python
for i in range(4):
    threading.Thread(target=hello_worker, args=(i,)).start()
```

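```python
import functools

def my_timeit(f):
    # Decorator that prints how long each call to f takes
    @functools.wraps(f)
    def timed(*args, **kw):
        ts = time.perf_counter()  # monotonic clock, better suited to intervals
        result = f(*args, **kw)
        te = time.perf_counter()
        print('func:%r args:[%r, %r] took: %2.4f sec' % (f.__name__, args, kw, te - ts))
        return result

    return timed
```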

```python
@my_timeit
def bad_fib(n):
    # Deliberately exponential, CPU-bound Fibonacci
    def recurse(n):
        if n == 0 or n == 1:
            return n
        else:
            return recurse(n-1) + recurse(n-2)
    print(recurse(n))

threading.Thread(target=bad_fib, args=(35,)).start()
```

```python
# Five CPU-bound threads: compare each one's time with the single run above
for i in range(5):
    print('Started thread %r' % i)
    threading.Thread(target=bad_fib, args=(35,)).start()
```

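Because only one thread can hold the GIL at a time, the five threads make no progress in parallel. They are constantly switched in and out, and each one reports taking roughly as long as all five runs would take back to back. This is a form of scheduler thrashing. It used to be a lot worse: before the GIL was reworked in Python 3.2, CPU-bound threads on multicore machines could spend much of their time fighting over the lock.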

### Python & Kernel Threads

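Python threads are not green threads. In CPython, each `threading.Thread` is backed by one OS (kernel) thread, which the operating system schedules. The GIL only limits how many of these threads execute Python bytecode at once.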
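One quick way to check this assumes Python 3.8+, where `threading.get_native_id()` is available: each live thread reports a distinct, kernel-assigned thread ID. The helper names are illustrative. The `Barrier` keeps all workers alive at the same moment so that the OS cannot recycle an ID between them.

```python
import threading

def record_native_id(results, barrier):
    # Wait until every worker is running, so all IDs belong to live threads.
    barrier.wait()
    results.append(threading.get_native_id())

def native_ids(n=4):
    results = []
    barrier = threading.Barrier(n)
    threads = [threading.Thread(target=record_native_id, args=(results, barrier))
               for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

ids = native_ids()
print('main:', threading.get_native_id(), 'workers:', ids)
```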

### threading.Thread API

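The threading interface in Python is similar to Java's in that there are two main ways to write threads:
1. The functional (and Pythonic) way seen above: pass a callable as `target`
2. Subclassing `Thread` and overriding `run()`, e.g.:

```python
import logging
import threading

logging.basicConfig(level=logging.DEBUG, format='(%(threadName)s) %(message)s')

class MyThread(threading.Thread):

    def run(self):
        # start() invokes run() in the new thread
        logging.debug('running')

for i in range(5):
    t = MyThread()
    t.start()
```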
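A subclass that defines its own constructor must call `Thread.__init__` before doing anything else. Storing results on the instance is then a simple way to get data back once `join()` returns. `SumThread` below is a hypothetical example of that pattern, not part of the `threading` API.

```python
import threading

class SumThread(threading.Thread):
    # Hypothetical subclass: sums its numbers and keeps the result on self.
    def __init__(self, numbers):
        super().__init__()  # must run first: sets up the Thread's internal state
        self.numbers = numbers
        self.result = None

    def run(self):
        self.result = sum(self.numbers)

workers = [SumThread(range(i * 100, (i + 1) * 100)) for i in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()  # after join(), each result is safe to read
print(sum(w.result for w in workers))  # 79800
```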


### C Extensions

You can get "native" thread parallelism by writing a C extension. C code can release the GIL (with the `Py_BEGIN_ALLOW_THREADS` and `Py_END_ALLOW_THREADS` macros) while it works on data that doesn't involve Python objects, which lets several threads run on separate cores at once.

The original plan here was to write a parallelized vector math C extension. While writing this, though, I realized it required too much prior knowledge of CPython internals, C programming, and a parallel API to be a useful exercise given the time constraints. I will post a full tutorial on this topic on my GitHub later this week.

### Further materials to study

1. David Beazley - [Understanding the Python GIL](https://www.youtube.com/watch?v=Obt-vMVdM8s)
2. Philip Guo's fun [CPython internals lecture series](http://www.pgbovine.net/cpython-internals.htm)
3. Larry Hastings - [Gilectomy](https://www.youtube.com/watch?v=P3AyI_u66Bw)

### Project Ideas

1. Write an echo server using `threading.Thread`
2. An intimidating but doable project (I promise!): Referencing just [RFC 1459](https://tools.ietf.org/html/rfc1459), implement an IRC server and IRC client using `threading.Thread`.
3. Extend the dummy C extension module to include a full suite of parallelized vector math

## Locks

Let's step away from Python for a bit to review some basic synchronization concepts.

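The GIL protects the interpreter's own state (reference counts, the garbage collector, the internals of built-in objects), so a single bytecode instruction can't corrupt it. It does not make our own multi-step operations atomic, because a thread can be switched out between any two bytecodes. If we're going to write our own threaded code, we need to get into the business of locks...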
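To see why, here is a minimal sketch using the standard `dis` module. The function name `bump` is illustrative. A statement like `self.nq += 1` compiles to several instructions: a load, an add, and a store. Opcode names vary across CPython versions, but the read and the write are always separate instructions.

```python
import dis

def bump(obj):
    # Same shape as `self.nq += 1` in the example below
    obj.nq += 1

dis.dis(bump)  # opcode names vary by CPython version

ops = [ins.opname for ins in dis.get_instructions(bump)]
# The read (LOAD_ATTR) and the write (STORE_ATTR) are separate steps, and
# nothing in the language guarantees another thread won't run in between.
print(ops.index('LOAD_ATTR') < ops.index('STORE_ATTR'))  # True
```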




```python
# Basic race condition

class JobQueue:
    def __init__(self):
        self.nq = 0
        self.q = []

    def enqueue(self, job):
        self.nq += 1        # read-modify-write: not atomic
        self.q.append(job)  # a single call into C: atomic under the GIL

    def dequeue(self):
        if self.q:
            self.nq -= 1
            return self.q.pop(-1)

class PrintJob:
    def __init__(self, s):
        self.s = s

    def work(self):
        #print(self.s)
        return self.s

j = JobQueue()

def producer():
    jobs = [PrintJob(str(i)) for i in range(10)]
    for i in range(100000):
        j.enqueue(jobs[i % 10])

def consumer():
    while j.q:
        j.dequeue().work()

t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=producer)
t2.start()
t1.join()
t2.join()

t3 = threading.Thread(target=consumer)
t3.start()
t3.join()

# Expected: 0 and 0. A negative nq means some increments were lost. Whether
# that shows up depends on the CPython version and scheduling; the code is
# incorrect either way.
print(len(j.q))
print(j.nq)
```

```python
# Fixed! Every update to nq and q now happens while holding the lock.

class JobQueue:
    def __init__(self):
        self.nq = 0
        self.q = []
        self.lock = threading.Lock()

    def enqueue(self, job):
        with self.lock:
            self.nq += 1
            self.q.append(job)

    def dequeue(self):
        with self.lock:
            if self.q:
                self.nq -= 1
                return self.q.pop(-1)

class PrintJob:
    def __init__(self, s):
        self.s = s

    def work(self):
        #print(self.s)
        return self.s

j = JobQueue()

def producer():
    jobs = [PrintJob(str(i)) for i in range(10)]
    for i in range(100000):
        j.enqueue(jobs[i % 10])

def consumer():
    while j.q:
        j.dequeue().work()

t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=producer)
t2.start()
t1.join()
t2.join()

t3 = threading.Thread(target=consumer)
t3.start()
t3.join()

print(len(j.q))
print(j.nq)
```

### Deadlock

Given locks L1 and L2 and two threads T1 and T2:

T1 tries to acquire L1 then L2. 
T2 tries to acquire L2 then L1.

1. T1 acquires L1.
2. T2 acquires L2.
3. T1 tries to acquire L2. Can't. Block.
4. T2 tries to acquire L1. Can't. Block. Neither thread can ever proceed: deadlock.
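Below is a minimal sketch of this interleaving. It uses `threading.Barrier` to force steps 1 and 2 to finish before steps 3 and 4. A plain `acquire()` would hang forever, so the sketch calls `acquire(timeout=...)` instead: each thread observes that it is blocked and then backs off. The names `worker`, `holding`, `tried` and `outcome` are illustrative.

```python
import threading

L1 = threading.Lock()
L2 = threading.Lock()
holding = threading.Barrier(2)  # both threads hold their first lock
tried = threading.Barrier(2)    # both threads have tried their second lock
outcome = {}

def worker(name, first, second):
    with first:                               # steps 1 and 2
        holding.wait()
        # Steps 3 and 4: a plain second.acquire() would block forever here.
        got_it = second.acquire(timeout=0.5)
        if got_it:
            second.release()
        outcome[name] = got_it
        tried.wait()                          # keep `first` until both have tried

t1 = threading.Thread(target=worker, args=('T1', L1, L2))
t2 = threading.Thread(target=worker, args=('T2', L2, L1))
t1.start()
t2.start()
t1.join()
t2.join()
print(outcome)  # both False: each timed out waiting on the other's lock
```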

### Coffman Conditions

Deadlocks can occur in a system if and only if all four of the following conditions hold simultaneously:
1. Mutual exclusion: At least one resource must be held in a non-shareable mode. Otherwise, the processes would not be prevented from using the resource when necessary. Only one process can use the resource at any given instant of time.
2. Hold and wait or resource holding: a process is currently holding at least one resource and requesting additional resources which are being held by other processes.
3. No preemption: a resource can be released only voluntarily by the process holding it.
4. Circular wait: each process must be waiting for a resource which is being held by another process, which in turn is waiting for the first process to release the resource. In general, there is a set of waiting processes, P = {P1, P2, …, PN}, such that P1 is waiting for a resource held by P2, P2 is waiting for a resource held by P3 and so on until PN is waiting for a resource held by P1.

Breaking any one of these conditions rules out deadlock, which suggests four distinct ways to build deadlock-free systems:

1. No Mutual Exclusion: Have no non-sharable data. This can be achieved by reducing reliance on non-thread-local state, or by relying on non-mutable, read-only data (this will be discussed at length later)
2. No Hold and Wait: Prevent threads from requesting new locks if they already have one, or give up a lock if they need a new one (resource intensive)
3. Preemption: If a thread is hogging a resource, save its state and unlock the resource. Restore the state later on. 
4. No Circular Wait: One interesting technique here is to impose an ordering on all locks (L1, L2, ...) and modify code so that a thread only ever waits on a lock ordered higher than every lock it currently holds. One common ordering heuristic is the memory address of the mutex. In practice, it can be difficult to optimize performance while respecting such an ordering.
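Here is a minimal sketch of lock ordering applied to the T1/T2 scenario. `ordered_locks` is a hypothetical helper, not a standard-library API. It always acquires locks sorted by `id()`, which in CPython is the object's memory address, matching the heuristic above. T1 names L1 first and T2 names L2 first, yet both end up taking the locks in the same order, so no cycle can form.

```python
import threading
from contextlib import ExitStack, contextmanager

@contextmanager
def ordered_locks(*locks):
    # Hypothetical helper: acquire in one global order (sorted by id());
    # ExitStack releases them in reverse order on exit, even on error.
    with ExitStack() as stack:
        for lock in sorted(set(locks), key=id):
            stack.enter_context(lock)
        yield

L1 = threading.Lock()
L2 = threading.Lock()
counter = 0

def t1_work():
    global counter
    for _ in range(10000):
        with ordered_locks(L1, L2):  # T1 asks for L1 first...
            counter += 1

def t2_work():
    global counter
    for _ in range(10000):
        with ordered_locks(L2, L1):  # ...T2 asks for L2 first; same real order
            counter += 1

threads = [threading.Thread(target=t1_work), threading.Thread(target=t2_work)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter)  # 20000
```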

> And the problem with almost every published example I see on how to use locks is that they are way too simple. You've got one little resource, one lock to acquire, one to release... and because they show you a simple example it creates the illusion that locks are easy to use. But when you start to put them in larger systems outside of Operating Systems [college classes] you'll find that if you add enough locks it becomes insanely difficult to reason about your code.

Raymond Hettinger - [PyBay 2017 Keynote on Concurrency](https://www.youtube.com/watch?v=9zinZmE3Ogk)

### Further Material

1. Allen B. Downey - *The Little Book of Semaphores*

### Exercise

1. Implement a solution to the [Dining Philosophers Problem](https://en.wikipedia.org/wiki/Dining_philosophers_problem) using just Locks.

## Queues

"Do not communicate by sharing memory; instead, share memory by communicating."

Thread-safe queues are one primitive that lets us avoid writing our own mutual exclusion. Instead of locking down shared mutable state, threads/processes communicate state via queues, and only one thread ever mutates a given piece of state.

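```python
'''
Instead of having the producer and consumer workers interact directly
with your state, we have a thread-safe message queue mediating.
'''
import threading
from queue import Queue

# A plain counter with no lock: only the queue worker will ever touch it
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1

c = Counter()

# Message Queue is now the interface to the Counter
message_queue = Queue()
STOP = object()  # sentinel that tells the worker to exit

def message_queue_worker():
    while True:
        command = message_queue.get()
        if command is STOP:
            break
        if command == 'increment':
            c.increment()

def producer():
    for i in range(100000):
        message_queue.put('increment')

mqw = threading.Thread(target=message_queue_worker)
mqw.start()

t1 = threading.Thread(target=producer)
t1.start()
t2 = threading.Thread(target=producer)
t2.start()

# Once both producers are done, queue the sentinel behind their messages
t1.join()
t2.join()
message_queue.put(STOP)

# Report progress once a second until the worker has drained the queue
while mqw.is_alive():
    print('Counter: %r' % c.count)
    mqw.join(timeout=1)
print('Counter: %r' % c.count)  # 200000
```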

The concept is simple and doesn't need much more commentary, but I want to emphasize how useful this paradigm is. "Sharing memory by communicating" helps you write safe, easy-to-understand concurrent servers and clients, and it is just as useful at the distributed-system level. Queueing services like AWS SQS and RabbitMQ are essentially ways to achieve the same design goals on a larger scale.
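The worker example above still shares one piece of state, because the main thread reads `c.count` directly. The sketch below goes all the way, assuming each request carries its own reply queue. The count lives only inside the owner thread, and even reads are messages. The names `counter_owner`, `ask` and `requests_q` are illustrative and do not come from any library.

```python
import threading
from queue import Queue

requests_q = Queue()  # messages are (command, reply_queue) tuples

def counter_owner():
    # The only thread that ever touches `count`, so no lock is needed.
    count = 0
    while True:
        command, reply_q = requests_q.get()
        if command == 'increment':
            count += 1
        elif command == 'get':
            reply_q.put(count)
        elif command == 'stop':
            reply_q.put(count)
            return

def producer(n):
    for _ in range(n):
        requests_q.put(('increment', None))

def ask(command):
    # Send a command and block until the owner replies.
    reply_q = Queue()
    requests_q.put((command, reply_q))
    return reply_q.get()

owner = threading.Thread(target=counter_owner)
owner.start()

producers = [threading.Thread(target=producer, args=(1000,)) for _ in range(2)]
for t in producers:
    t.start()
for t in producers:
    t.join()

current = ask('get')  # every increment was queued before this request
print(current)        # 2000
final = ask('stop')
owner.join()
```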

## Multiprocessing

Delicious parallelism, with obvious gains for CPU-bound tasks. Python processes are, however, quite heavy. Each one is a separate interpreter with its own memory, and most data passed between processes has to be pickled. They are not the tool for, say, handling a large number of client connections concurrently.

For the working Python programmer: `multiprocessing` tends to be a bit more robust and feature-complete than `threading`, while its interface is very similar to that of `threading`.

A common way to work with processes is through a `Pool` of workers. For example:

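```python
import multiprocessing

# Note: this relies on the 'fork' start method (the Linux default through
# Python 3.13). Under 'spawn' (the default on Windows, and on macOS since 3.8),
# worker processes can't import functions defined in a notebook.

def bad_fib2(n):
    def recurse(n):
        if n == 0 or n == 1:
            return n
        else:
            return recurse(n-1) + recurse(n-2)
    print(recurse(n))

# A single worker process: the three calls run one after another
p = multiprocessing.Pool(1)

@my_timeit
def foo():
    p.map(bad_fib2, [35, 35, 35])

foo()
```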

```python
# Three worker processes: the three calls can run in parallel
p = multiprocessing.Pool(3)

@my_timeit
def foo():
    p.map(bad_fib2, [35, 35, 35])

foo()
# Check htop: three cores should be busy at once
```

For synchronization, `multiprocessing` provides `Lock`s and shared-memory objects like `Value` and `Array`. I would maintain, though, that interprocess state is best communicated via `multiprocessing.Queue` (or via `multiprocessing.Pipe`, which returns a pair of connected `Connection` objects that, by default, can both send and receive).
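Here is a minimal sketch of both styles. It assumes a POSIX system where the 'fork' start method is available, so child processes can see functions defined here without an `if __name__ == '__main__':` guard. `Value` comes with its own lock, and a read-modify-write such as `+= 1` still needs it. The `Pipe` half runs a hypothetical `square_worker` that answers requests over a connection.

```python
import multiprocessing as mp

# Assumption: POSIX system; 'fork' lets children use the functions defined here.
ctx = mp.get_context('fork')

def add_many(total, n):
    for _ in range(n):
        # `+=` on a shared Value reads, then writes; hold its built-in lock.
        with total.get_lock():
            total.value += 1

def square_worker(conn):
    # Answer each number with its square until a None sentinel arrives.
    while True:
        x = conn.recv()
        if x is None:
            break
        conn.send(x * x)
    conn.close()

# Shared memory: four processes update one C int ('i') in shared memory.
total = ctx.Value('i', 0)
procs = [ctx.Process(target=add_many, args=(total, 1000)) for _ in range(4)]
for proc in procs:
    proc.start()
for proc in procs:
    proc.join()
print(total.value)  # 4000

# Message passing: Pipe() is duplex by default, so both ends can send and recv.
parent_end, child_end = ctx.Pipe()
worker = ctx.Process(target=square_worker, args=(child_end,))
worker.start()
squares = []
for x in range(5):
    parent_end.send(x)
    squares.append(parent_end.recv())
parent_end.send(None)
worker.join()
print(squares)  # [0, 1, 4, 9, 16]
```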

```python
'''
Problem: Count the number of appearances of words in a set of documents,
leveraging a multiprocessing pipeline
'''
from bs4 import BeautifulSoup
from collections import Counter
import requests

urls = ['https://en.wikipedia.org/wiki/MapReduce',
        'https://en.wikipedia.org/wiki/Computer_cluster',
        'https://en.wikipedia.org/wiki/Operating_system',
        'https://en.wikipedia.org/wiki/Linux',
        'https://en.wikipedia.org/wiki/GNU_General_Public_License',
        'https://en.wikipedia.org/wiki/End_user',
        'https://en.wikipedia.org/wiki/Light-emitting_diode',
        'https://en.wikipedia.org/wiki/James_R._Biard'
       ]

def scrape_one(url):
    words = Counter()
    page = requests.get(url, timeout=10)
    # Name the parser explicitly; otherwise BeautifulSoup warns and picks one
    soup = BeautifulSoup(page.text, 'html.parser')
    for p in soup.find_all('p'):
        # split() with no argument splits on any whitespace, dropping empty strings
        words.update(p.text.split())
    return words

@my_timeit
def scrape_all_and_merge(urls):
    # Sequential baseline: fetch and count one page at a time
    words = Counter()
    for url in urls:
        words.update(scrape_one(url))
    return words

scrape_all_and_merge(urls)
```

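```python
from collections import Counter

# Same as above, minus the timing decorator
def scrape_all_and_merge(urls):
    words = Counter()
    for url in urls:
        words.update(scrape_one(url))
    return words

n = 4
pool = multiprocessing.Pool(n)

def split_chunk(l, n):
    # Split l into n contiguous lists whose sizes differ by at most one,
    # so no items are dropped when len(l) isn't divisible by n
    return [l[i * len(l) // n:(i + 1) * len(l) // n] for i in range(n)]

split_urls = split_chunk(urls, n)

@my_timeit
def foo():
    # Map: each worker scrapes and counts its own chunk of URLs.
    # Reduce: merge the per-chunk Counters in the parent process.
    words = Counter()
    counter_list = pool.map(scrape_all_and_merge, split_urls)
    for c in counter_list:
        words.update(c)
    print(words.most_common(20))  # the 20 most frequent words

foo()
```

This generalizes into a recursive structure of processing and reduction: split the input, map a worker over each chunk, then merge the partial results, possibly over several rounds.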
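Here is one minimal sketch of that recursive structure. It assumes a POSIX system where the 'fork' start method is available, and it uses small in-memory strings in place of scraped pages. A map step turns each document into a `Counter`. A hypothetical `tree_reduce` then merges partial results pairwise, in parallel, round after round, until a single result remains.

```python
import multiprocessing as mp
from collections import Counter

def count_words(text):
    # Map step: one document -> one Counter
    return Counter(text.split())

def merge_pair(pair):
    # Reduce step: combine two partial results
    left, right = pair
    return left + right

def tree_reduce(pool, items):
    # Merge neighbours in parallel, round after round, until one result is left.
    while len(items) > 1:
        pairs = [(items[i], items[i + 1]) for i in range(0, len(items) - 1, 2)]
        merged = pool.map(merge_pair, pairs)
        if len(items) % 2:
            merged.append(items[-1])  # the odd one out waits for the next round
        items = merged
    return items[0] if items else Counter()

ctx = mp.get_context('fork')  # assumption: POSIX system
docs = ['the cat sat', 'the dog sat', 'a cat ran', 'the end', 'cat']
with ctx.Pool(2) as pool:
    partials = pool.map(count_words, docs)
    total = tree_reduce(pool, partials)
print(total['the'], total['cat'], total['sat'])  # 3 3 2
```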