![Py4Eng](img/logo.png)

# Concurrency
## Yoav Ram

# Threading

Threads are very useful for maintaining multiple program flows running (quasi-)simultaneously. 
In Python, threads are real system threads and are managed by the operating system.

But the memory management of CPython, the standard Python implementation, is not thread-safe, so the [Global Interpreter Lock (**GIL**)](http://www.dabeaz.com/GIL) allows only one thread to execute Python bytecode at any given time. Therefore, the main benefit of threading is that one waiting job (I/O, sleep, waiting for a user event) doesn't block other jobs from running. Or as the saying goes, threads are good for doing nothing: waiting mostly.

Let's start with a simple example: a worker thread that counts from 1 to 10, waiting one second between numbers, but doesn't block the main thread that counts from 11 to 20 (also waiting). 

We use the [threading](https://docs.python.org/3.5/library/threading.html) module from the standard library.

In [1]:
import threading
import time

In [2]:
def create_counting_task(start, end):
    def task():
        for i in range(start, end):
            print(" ", i, end=" ")
            time.sleep(1)
    return task

In [3]:
main_task = create_counting_task(11, 21)
worker_task = create_counting_task(1, 11)
worker = threading.Thread(target=worker_task)
worker.start()
main_task()
worker.join()

  1   11    2  12    3  13   4   14   5   15   6   16   7   17   8   18   9   19   10   20 
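The closure factory above is one way to parameterize a thread; `threading.Thread` also accepts the arguments directly through `args`. Below is a minimal sketch of the same idea. The names `count`, `seen` and `seen_lock` are made up for this illustration, the delay is shortened, and the numbers are collected in a list instead of printed so the result is easy to inspect.

```python
import threading
import time

seen = []                      # numbers visited by both threads (hypothetical name)
seen_lock = threading.Lock()   # guards the shared list

def count(start, end, delay=0.01):
    # visit each number, then wait, so that the other thread gets to run
    for i in range(start, end):
        with seen_lock:
            seen.append(i)
        time.sleep(delay)

# pass the arguments with args= instead of building a closure
worker = threading.Thread(target=count, args=(1, 4))
worker.start()
count(11, 14)      # the main thread counts at the same time
worker.join()      # wait for the worker before reading the result
print(sorted(seen))
```

The order in which the two threads append is up to the operating system, which is why the list is sorted before printing.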

# Concurrency in I/O bounded programs

When we do any kind of I/O, the GIL is released as soon as control is given to the OS or to lower-level C code. So threads are great for concurrency in I/O bounded programs: while one thread waits for I/O, the GIL is released and other threads can go on doing their jobs. This is true as long as I/O is not very quick and there are not too many concurrent jobs; if there are many concurrent short jobs, they will start a [GIL war](http://www.dabeaz.com/GIL/), which is bad for performance.

Let's start with a synchronous program that reads books from the Gutenberg project and finds the most common word. Finding the most common word takes a while, but a lot less than reading the data from the web, so this is definitely an I/O-bounded program.

In [2]:
from collections import Counter
import urllib.parse
import urllib.request
import time

We download and parse a [set of stop words](https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt) not to be included in the analysis:

In [6]:
stop_words_url = 'https://github.com/Alir3z4/stop-words/raw/25c6a0aea665871e887f155b883e950c3743ce50/english.txt'
with urllib.request.urlopen(stop_words_url) as f:
    stop_words = frozenset(line.decode().strip() for line in f)
print(list(stop_words)[:5])

["i'll", 'this', 'ours', 'so', 'having']


We will read a bunch of books (see `names`) and parse them for the most common word (`most_common_word`).

In [7]:
names = [
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis ',
    'A Tale of Two Cities',
    'The Importance of Being Earnest',
    'Frankenstein'
]
url_template = 'https://raw.githubusercontent.com/yoavram/Py4Eng/master/data/{}.txt'
urls = {name: urllib.parse.quote(url_template.format(name), safe=":/") for name in names}

def most_common_word(item):
    name, url = item
    with urllib.request.urlopen(url) as f:        
        counter = Counter()
        for line in f:        
            if not line:
                break
            line = line.decode().lower()
            counter.update(line.split())
    for word in stop_words:
        counter[word] = 0
    word, count = counter.most_common(1)[0]
    return name, word, count
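The counting logic of `most_common_word` can be tried without a network connection. The sketch below feeds it a few made-up byte lines (`sample_lines`) and a tiny made-up stop-word set (`sample_stop_words`); both are assumptions for illustration only, not data from the notebook.

```python
from collections import Counter

# hypothetical stand-ins for the downloaded stop words and book lines
sample_stop_words = frozenset(['the', 'a', 'and', 'on'])
sample_lines = [b'The cat sat on the mat\n', b'A cat and a dog\n']

counter = Counter()
for line in sample_lines:
    # urlopen yields bytes, so each line is decoded before it is split
    counter.update(line.decode().lower().split())

# same trick as in most_common_word: zero the stop words so they never rank first
for word in sample_stop_words:
    counter[word] = 0

word, count = counter.most_common(1)[0]
print(word, count)  # cat 2
```

Note that `split()` only splits on whitespace, so punctuation stays attached to the words.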

Here is a simple [context manager](https://docs.python.org/3.5/library/contextlib.html) for measuring time (`%timeit` is less useful for concurrency):

In [8]:
from contextlib import contextmanager

@contextmanager
def tictoc():
    tic = time.time()
    yield
    toc = time.time()
    print("Elapsed time: {:.2f} seconds".format(toc - tic))
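If the timed block raises an exception, the code after `yield` in `tictoc` is skipped and nothing is printed. A possible variant, shown here only as a sketch under the hypothetical name `tictoc_safe`, wraps the `yield` in `try`/`finally` and uses `time.perf_counter`, a clock intended for measuring intervals.

```python
import time
from contextlib import contextmanager

@contextmanager
def tictoc_safe():  # hypothetical name, not part of the original notebook
    tic = time.perf_counter()  # monotonic clock meant for timing
    try:
        yield
    finally:
        # runs whether the body finished normally or raised
        toc = time.perf_counter()
        print("Elapsed time: {:.2f} seconds".format(toc - tic))

with tictoc_safe():
    time.sleep(0.1)
```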

## Sequential run

We first run sequentially (using one thread) to get a baseline:

In [9]:
with tictoc():
    results = map(most_common_word, urls.items())
    for name, word, count in results:
        print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Metamorphosis  is "gregor" (168 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Most common word in Gulliver is "upon" (201 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Elapsed time: 10.06 seconds


## Multi-threading

To run a multi-threaded version of the above, we could use `threading` and create our threads etc., but there is a lot of boilerplate. This boilerplate can be handled by a thread pool from the [concurrent.futures](https://docs.python.org/3/library/concurrent.futures.html) module:

In [10]:
import concurrent.futures
n_workers = len(urls)

In [11]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        results = executor.map(most_common_word, urls.items())
        for name, word, count in results:
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Metamorphosis  is "gregor" (168 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Most common word in Gulliver is "upon" (201 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Elapsed time: 2.78 seconds


Much better: a more than 3-fold improvement in running time (10.06 vs. 2.78 seconds).
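Two properties of `executor.map` are worth seeing in isolation: results come back in the order of the inputs, and the total time is close to the longest job rather than the sum of all jobs. The sketch below replaces the network call with a hypothetical `fake_download` that just sleeps, so the numbers are illustrative only.

```python
import concurrent.futures
import time

def fake_download(delay):
    # hypothetical stand-in for a slow network call
    time.sleep(delay)
    return delay

delays = [0.3, 0.1, 0.2]

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(len(delays)) as executor:
    # map yields results in input order, even though the 0.1 job finishes first
    in_order = list(executor.map(fake_download, delays))
elapsed = time.perf_counter() - start

print(in_order)
print("Elapsed: {:.2f}s, sum of delays: {:.2f}s".format(elapsed, sum(delays)))
```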

## Where are the futures?

The module is called `concurrent.futures` and in the documentation you can read that you are actually creating `Future` objects. These are like promises - they represent computational tasks that will be completed; therefore, they allow for an asynchronous style of programming, as we can start a task, go on to do something else, and then either check if it finished, wait for it to finish, or assign a callback to be called when it is finished.

In the above, the `Future`s were handled by the executor `map` function, which creates `Future`s and waits for them to finish working. Now we will use them directly:

In [12]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor(n_workers) as executor:
        futures = [executor.submit(most_common_word, item) for item in urls.items()]
        for future in concurrent.futures.as_completed(futures):
            name, word, count = future.result()
            print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in Metamorphosis  is "gregor" (168 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Most common word in Gulliver is "upon" (201 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Elapsed time: 3.29 seconds
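Besides `as_completed`, the three ways of interacting with a `Future` mentioned earlier (checking whether it finished, waiting for it, and attaching a callback) map to the methods `done()`, `result()` and `add_done_callback()`. Here is a minimal sketch; `slow_square`, `on_done` and `finished` are hypothetical names used only for this illustration.

```python
import concurrent.futures
import time

def slow_square(x):
    # hypothetical stand-in for a slow job
    time.sleep(0.2)
    return x * x

finished = []  # filled in by the callback

def on_done(future):
    # called with the future once it has completed
    finished.append(future.result())

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_square, 7)   # starts the task, returns at once
    future.add_done_callback(on_done)          # option 3: register a callback
    print("done right after submit?", future.done())  # option 1: poll
    value = future.result()                    # option 2: block until finished

# leaving the with-block waits for the worker, so the callback has run by now
print(value, finished)
```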


# CPU bounded program

In some cases running multiple threads actually helps even if we are in a CPU bounded scenario, because the OS may run these threads on separate cores, and **if the code that we use releases the GIL** in some way, then we can achieve "true multi-threading". Note: if the code doesn't release the GIL, we will get into a [GIL war](http://www.dabeaz.com/GIL/) and performance will suffer compared to a single-core single-thread program!

In the following example we calculate a hash of our books using the *very slow* function, `pbkdf2_hmac`. The [`hashlib` docs](https://docs.python.org/3/library/hashlib.html) specify that if the data is larger than 2047 bytes **the GIL is released** (the computation is done in C, so the GIL can be explicitly released) and therefore if we use threads we will see an improvement on multi-core machines.

In [17]:
import hashlib
import binascii
import concurrent.futures
import time

## Exercise

We start by reading the books into memory so that I/O won't be an issue. Fill in the missing code, which should use the `read_book` function on the `filenames` dict to read the books from the local filesystem into a dictionary called `books`. Keys are the book names, values are the book texts.

In [18]:
names = [
    'Gulliver',
    'Alice in Wonderland',
    'Pride and prejudice',
    'Yellow wallpaper',
    'Metamorphosis ',
    'A Tale of Two Cities',
    'The Importance of Being Earnest',
    'Frankenstein'
]
filenames = {name: '../data/{}.txt'.format(name) for name in names}

def read_book(item):
    name, filename = item    
    with open(filename) as f:
        data = f.read()        
        return name, data

# Your code here

`hash_book` takes an entire book and performs a specific [hash function](https://docs.python.org/3/library/hashlib.html#hashlib.pbkdf2_hmac) on it with a large number of iterations, which makes it deliberately slow.

In [15]:
def hash_book(item):
    name, data = item    
    # very slow function
    fingerprint = hashlib.pbkdf2_hmac('sha512', data.encode('utf8'), b'salt', 1000000)
    return name, binascii.hexlify(fingerprint).decode()

%timeit -n 3 hash_book(list(books.items())[0])

Running in a single-thread mode - open your process monitor to see that only one core is used:

In [19]:
with tictoc():
    results = map(hash_book, books.items())
    for name, fingerprint in results:
        print('Fingerprint for {} is "{}"'.format(name, fingerprint))

Fingerprint for Metamorphosis  is "834f837094aaa5dac36fbcf090c572a40c7ac5274267e311ce26db937e3153c4ae2c1fef1453a83f56d4d46f718f337889c170c7e7a5174278f0512d3451c599"
Fingerprint for The Importance of Being Earnest is "2205918c9d6ef846723b02d306320bc2560e06d53dffe74dcc64c7d0796da20438cd9b679537e56bfa2ae69c5a5a9cdc840ecb62a3ede42404676fd852118425"
Fingerprint for Yellow wallpaper is "6830e20c22d552077a4c64b2bfff289c8776e2f83d0e9793fa634121ec639f11133fb860416a386669b791647d7c53acdea8d9c086b90ba1d8ef20b92b2ff161"
Fingerprint for Alice in Wonderland is "71a49a0fa0d36338e3072c4cff1649b2db1ad788b829613015d5e557bcf8c03ff498ac1ec1d11365ca9d70be73b791b260976caf5322d624fc21aebdf99bab9d"
Fingerprint for Pride and prejudice is "216f57cef336e96218f07d50ae1f7ab34fc42d5d4144e803595139dbd59010b8861a0ce14c6991ac5ba394ea1e414eefee88a2e1d54d1fe6887276c55707fccc"
Fingerprint for Gulliver is "279397822ce37e80b9235eb01370875546d5d5e92bb6b6d1c4d74d1fcc21a397ef42edbff782b18d27a1aa9941d73c65941f96717da62f680c11d

In multi-thread mode, you'll see that all the cores are used, at least on Windows (it is OS-dependent!):

In [20]:
with tictoc():
    with concurrent.futures.ThreadPoolExecutor() as executor:
        results = executor.map(hash_book, books.items())
        for name, fingerprint in results:
            print('Fingerprint for {} is "{}"'.format(name, fingerprint))

Fingerprint for Metamorphosis  is "834f837094aaa5dac36fbcf090c572a40c7ac5274267e311ce26db937e3153c4ae2c1fef1453a83f56d4d46f718f337889c170c7e7a5174278f0512d3451c599"
Fingerprint for The Importance of Being Earnest is "2205918c9d6ef846723b02d306320bc2560e06d53dffe74dcc64c7d0796da20438cd9b679537e56bfa2ae69c5a5a9cdc840ecb62a3ede42404676fd852118425"
Fingerprint for Yellow wallpaper is "6830e20c22d552077a4c64b2bfff289c8776e2f83d0e9793fa634121ec639f11133fb860416a386669b791647d7c53acdea8d9c086b90ba1d8ef20b92b2ff161"
Fingerprint for Alice in Wonderland is "71a49a0fa0d36338e3072c4cff1649b2db1ad788b829613015d5e557bcf8c03ff498ac1ec1d11365ca9d70be73b791b260976caf5322d624fc21aebdf99bab9d"
Fingerprint for Pride and prejudice is "216f57cef336e96218f07d50ae1f7ab34fc42d5d4144e803595139dbd59010b8861a0ce14c6991ac5ba394ea1e414eefee88a2e1d54d1fe6887276c55707fccc"
Fingerprint for Gulliver is "279397822ce37e80b9235eb01370875546d5d5e92bb6b6d1c4d74d1fcc21a397ef42edbff782b18d27a1aa9941d73c65941f96717da62f680c11d

The `concurrent.futures` module has another pool executor - a `ProcessPoolExecutor` that uses processes for the jobs. It's as easy to use as the `ThreadPoolExecutor`, but in this case no further improvement can be had by replacing `ThreadPoolExecutor` with `ProcessPoolExecutor`, at least on my machine. See [hashbook.py](../scripts/hashbook.py) for a comparison.
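The sequential-versus-threads comparison in this section can also be tried without the book files. The sketch below is only an approximation of the setup above: `texts` holds made-up in-memory data, `fingerprint` is a hypothetical counterpart of `hash_book`, and the iteration count is far lower so that it finishes quickly. Whether the threaded run is faster depends on the number of cores and on the OS.

```python
import concurrent.futures
import hashlib
import time

# hypothetical in-memory "books", each well above 2047 bytes
texts = {'book {}'.format(i): ('text {} '.format(i) * 1000).encode('utf8')
         for i in range(4)}

def fingerprint(item):
    name, data = item
    # far fewer iterations than in hash_book, to keep the sketch fast
    digest = hashlib.pbkdf2_hmac('sha512', data, b'salt', 20000)
    return name, digest.hex()

start = time.perf_counter()
sequential = dict(map(fingerprint, texts.items()))
t_seq = time.perf_counter() - start

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor() as executor:
    threaded = dict(executor.map(fingerprint, texts.items()))
t_thr = time.perf_counter() - start

# both runs must produce identical fingerprints; only the timing may differ
print("same results:", sequential == threaded)
print("sequential: {:.2f}s, threads: {:.2f}s".format(t_seq, t_thr))
```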

# Coroutines

**Coroutines** are functions that can be suspended and resumed, similar to generators. When doing I/O bounded async programming, coroutines can also become `Future` objects. You can think about them as lightweight threads or green threads: they are scheduled by Python, rather than the operating system, and run in the same thread and process - so they are not useful for CPU-bounded programs, even when the code releases the GIL.

Coroutines are defined using `async def` (or, in Python 3.4, using `def` with the `@asyncio.coroutine` decorator).

Coroutines can yield control to other coroutines using the `await` statement (or `yield from` in Python < 3.5), which lets the scheduler know that this coroutine is now waiting for results from another coroutine. The scheduler then schedules the other coroutine to run. This is in contrast to threads, in which control switches are managed by the operating system (and sometimes prevented by the GIL), which requires us (sometimes) to use locks and semaphores in order to make sure that our data stays "whole", and creates risks like GIL wars.

Async programming with coroutines can reduce the running time of I/O-bounded programs, and the programmatic overhead is relatively small (as can be seen below). Moreover, coroutines are much cheaper than threads in terms of their memory footprint, as they all live in a single interpreter and thread. Therefore, we can spawn thousands of them, while we cannot afford to spawn thousands of threads.
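The hand-over of control at each `await` can be made visible with the standard library alone. This is a minimal sketch that assumes Python 3.7+ (for `asyncio.run`); `worker` and `log` are hypothetical names. Each coroutine records a step and then awaits `asyncio.sleep(0)`, which does nothing except give the scheduler a chance to run the other coroutine.

```python
import asyncio

log = []  # records which coroutine ran at each step

async def worker(name, steps):
    for i in range(steps):
        log.append((name, i))
        # give control back to the event loop so the other coroutine can run
        await asyncio.sleep(0)

async def main():
    # run both coroutines concurrently in one thread
    await asyncio.gather(worker('a', 3), worker('b', 3))

asyncio.run(main())
print(log)
# [('a', 0), ('b', 0), ('a', 1), ('b', 1), ('a', 2), ('b', 2)]
```

Inside a notebook that already runs an event loop, `asyncio.run` cannot be used; there, `await main()` in a cell is the usual alternative.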

Let's see an implementation of the above `most_common_word` program using async programming.

- we define the coroutine, which uses the *aiohttp* package to read files from the web instead of the *urllib* package
- *aiohttp* does async HTTP requests and responses (both for servers and clients)
- we instantiate the coroutines (much like we would do with generators, rather than functions)
- we create an event loop (using the standard library module *asyncio*), which will be responsible for scheduling and driving the coroutines, and handling the switching of control between them
- give the loop all the coroutines
- run the loop.
- collect the async results
- print the results

Note: to install *aiohttp*, use `pip install aiohttp` (if you are following this notebook on *Binder* and the next import doesn't work, try running this in a cell: `!/home/main/anaconda2/envs/binder/bin/python -m pip install aiohttp`).

In [21]:
import aiohttp
import asyncio

In [22]:
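async def most_common_word(name, url):
    # current aiohttp releases make requests through a ClientSession
    # (older releases offered a module-level aiohttp.get)
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            assert response.status == 200
            counter = Counter()
            # response.content is a stream that can be iterated line by line
            async for line in response.content:
                line = line.decode().lower()
                counter.update(line.split())
    # zero the stop words so they are never the most common word
    for word in stop_words:
        counter[word] = 0
    word, count = counter.most_common(1)[0]
    return name, word, count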

In [23]:
coros = [most_common_word(name, url) for name, url in urls.items()]
loop = asyncio.get_event_loop()
type(coros[0])

coroutine

In [25]:
with tictoc():
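    # wrap the coroutines in tasks: since Python 3.11 asyncio.wait rejects bare coroutines
    done, pending = loop.run_until_complete(asyncio.wait([loop.create_task(c) for c in coros]))
    for t in done: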
        name, word, count = t.result()
        print('Most common word in {} is "{}" ({} appearances)'.format(name, word, count))

Most common word in Metamorphosis  is "gregor" (168 appearances)
Most common word in Alice in Wonderland is "said" (130 appearances)
Most common word in Frankenstein is "will" (194 appearances)
Most common word in Yellow wallpaper is "project" (82 appearances)
Most common word in The Importance of Being Earnest is "jack." (224 appearances)
Most common word in A Tale of Two Cities is "mr." (602 appearances)
Most common word in Gulliver is "upon" (201 appearances)
Most common word in Pride and prejudice is "mr." (766 appearances)
Elapsed time: 2.77 seconds
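On Python 3.7 and later, the same "create coroutines, run them, collect results" pattern is often written with `asyncio.run` and `asyncio.gather`. The sketch below shows that form with a hypothetical `fake_fetch` coroutine that only sleeps instead of downloading, so it runs without *aiohttp* or a network connection. Unlike `asyncio.wait`, `gather` returns the results in the order the coroutines were passed in.

```python
import asyncio
import time

async def fake_fetch(name, delay):
    # hypothetical stand-in for an HTTP request: just waits
    await asyncio.sleep(delay)
    return name, delay

async def main():
    jobs = [fake_fetch('slow', 0.3), fake_fetch('fast', 0.1), fake_fetch('medium', 0.2)]
    # gather runs the coroutines concurrently and keeps the input order
    return await asyncio.gather(*jobs)

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start

print(results)
print("Elapsed time: {:.2f} seconds".format(elapsed))
```

The elapsed time should be close to the longest delay rather than the sum of the delays, because all the waiting happens concurrently.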


# Multiprocessing using IPython Parallel

From the [threading](https://docs.python.org/3/library/threading.html) module:
> CPython implementation detail: In CPython, due to the Global Interpreter Lock, **only one thread can execute Python code at once**... If you want your application to make better use of the **computational resources of multi-core machines**, you are advised to use `multiprocessing` or `concurrent.futures.ProcessPoolExecutor`.

The standard library module, [multiprocessing](https://docs.python.org/3/library/multiprocessing.html), provides low-level interfaces for the use of multiple processes. `concurrent.futures` provides a high-level API using pools and such.

We will use the [IPython Parallel](https://ipyparallel.readthedocs.org/) package, which you can install with `conda install ipyparallel` or `pip install ipyparallel`. We'll start with some examples, but there are more [examples](http://nbviewer.jupyter.org/github/ipython/ipyparallel/blob/master/examples/Index.ipynb) and [demos](https://ipyparallel.readthedocs.org/en/latest/demos.html). *Note*: IPython Parallel is separate from the notebook and can run without it just fine.

If you are using *Binder*, you can install with `!/home/main/anaconda2/envs/binder/bin/python -m pip install ipyparallel` and then open a new notebook and run in a cell: `!/home/main/anaconda2/envs/binder/bin/ipcluster start -n 3`.

In [1]:
import ipyparallel as ipp

IPython Parallel lets us create a cluster. The default is to start a local cluster using subprocesses - let's start an IPython Cluster with 3 nodes:
```
ipcluster start -n 3
```

But in general we can also start a cluster using an MPI environment, SGE (qsub) environment, and other environments. You can also [start one cluster controller](https://ipyparallel.readthedocs.org/en/latest/process.html#starting-the-controller-and-engines-on-different-hosts) on one machine and then cluster engines on several other machines connected to the same LAN (or using SSH tunnels).

Next, we create a cluster client, with a load-balanced view that distributes tasks across all the engines:

In [7]:
rc = ipp.Client()
v = rc.load_balanced_view()

Our first option for parallelizing operations is to use the `map_sync` method of the view instead of the regular `map` function:

In [None]:
result = v.map_sync(lambda x: 2*x, range(10))
print("Simple sync map: ", list(result))

We can also use an async map which submits tasks, gives us back IDs, and we can then wait for the tasks to finish:

In [11]:
aresult = v.map_async(lambda x: 2*x, range(10))
print("Submitted tasks, got ids: ", aresult.msg_ids)
result = aresult.get()
print("Using a mapper: ", result)

Submitted tasks, got ids:  ['2fde147d-c1c4-425a-9d27-278d30feef88', '4af5560f-eef0-4e33-81af-2cbb94322502', 'dcfd248a-b0f3-4e29-b933-60e4d94574d1', 'b1818a89-8dba-4cd3-969f-d709e16e3f8f', '71a4f9bb-95dd-4e91-8d2c-52e219ad273e', 'd4d15f3f-63e7-434c-8f0a-4403780bc887', 'a6472ab8-3168-4d2e-8109-7e74bc9794bc', 'dca78456-2dbb-491f-8364-127a2ec23529', 'b3e4e3f5-cf67-4b41-9088-775388f9decb', '0df4e086-0c02-48f7-aeb6-5f993ed28934']
Using a mapper:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


In [12]:
type(aresult), type(result)

(ipyparallel.client.asyncresult.AsyncMapResult, list)

We can also use a decorator to create a "parallel function" (with the sync or async approach, depending on whether we specify `block=True` or `block=False`):

In [15]:
@v.parallel(block=True)
def f(x): 
    return 2*x

result = f.map(range(10))
print("Using a parallel function: ", result)

Using a parallel function:  [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]


If the `map` idiom doesn't quite fit our use case, we can just use `apply`. For example, take this function (details are in the [linear algebra session](linear-algebra.ipynb)):

In [26]:
def rand_mat_eigvals(bottom, top, n):
    import numpy as np
    X = np.random.randint(bottom, top, size=(n, n))
    eigvals = np.linalg.eigvals(X)
    return eigvals.max()
%timeit -n 1 -r 1 rand_mat_eigvals(0, 9, 1000)

1 loop, best of 1: 1.36 s per loop


In [27]:
res1 = v.apply_async(rand_mat_eigvals, 0, 9, 1000)
res2 = v.apply_async(rand_mat_eigvals, 0, 9, 1000)
res3 = v.apply_async(rand_mat_eigvals, 0, 9, 1000)
res4 = v.apply_async(rand_mat_eigvals, 0, 9, 1000)
res1.get(), res2.get(), res3.get(), res4.get()

((4001.0273477265564+0j),
 (3999.6972418514506+0j),
 (4000.2252682862822+0j),
 (4000.7451400583459+0j))

Note that we needed to import NumPy **inside** the function: the function runs in the engine processes, which otherwise would not know what `np` is and would raise a `NameError`.

Let's look at the example [wordfreq.py](https://github.com/ipython/ipyparallel/blob/master/examples/daVinci%20Word%20Count/wordfreq.py) and [pwordfreq.py](https://github.com/ipython/ipyparallel/blob/master/examples/daVinci%20Word%20Count/pwordfreq.py).

# Solutions

For reading the books:

```py
with concurrent.futures.ThreadPoolExecutor() as executor:
    books = dict(executor.map(read_book, filenames.items()))   
```

# References

- [General concepts in concurrency](https://learn-gevent-socketio.readthedocs.org/en/latest/general_concepts.html)
- [Understanding the GIL](http://www.dabeaz.com/GIL/), a presentation by David Beazley, is a deep and engaging description of the old and new Python GIL.
- [Python Concurrency From the Ground Up](https://www.youtube.com/watch?v=MCs5OvhV9S4), a video presentation by David Beazley from PyCon 2015.
- The [threading](https://docs.python.org/3/library/threading.html) module
- The [multiprocessing](https://docs.python.org/3/library/multiprocessing.html) module
- [IPython Parallel](https://ipyparallel.readthedocs.org/)
- [distributed](https://distributed.readthedocs.org/en/latest/) by Continuum is a new package that facilitates data analysis on multiple machines.


## Colophon
This notebook was written by [Yoav Ram](http://www.yoavram.com) and is part of the _Python for Engineers_ course.

The notebook was written using [Python](http://python.org/) 3.4.4, [IPython](http://ipython.org/) 4.0.3 and [Jupyter](http://jupyter.org) 4.0.6.

This work is licensed under a CC BY-NC-SA 4.0 International License.

![Python logo](https://www.python.org/static/community_logos/python-logo.png)