# Concurrency and Multithreading

We will examine the benefits, and potential pitfalls, of concurrency and multithreading in Python.

This includes:
 *  Threaded multitasking with `threading` on a single compute core
 *  Asynchronous computing with `asyncio`
 *  Multiprocessing (parallelism through multiple processors) using `multiprocessing`


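The appropriate type of concurrency depends on whether the application is CPU-bound or I/O-bound: threads and `asyncio` suit tasks that spend most of their time waiting, while `multiprocessing` suits tasks that spend most of their time computing.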

### Threading
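To keep the interpreter's internal state consistent, the default implementation of Python (known as "CPython") runs with the dreaded <b>G</b>lobal <b>I</b>nterpreter <b>L</b>ock (GIL), which allows only one thread to execute Python bytecode at a time. The GIL does not give you control over the order in which threads run, nor does it make threaded code deterministic: the interpreter and the operating system still decide when to switch threads. It does mean that threads cannot execute Python code in parallel on multiple cores.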

For many operations, you may not care about the order of execution, as long as the tasks get done. Introducing a threading framework may clarify the logic behind a section of code. However, because of the GIL, threading rarely improves the performance of CPU-bound code; the gains come when threads spend most of their time waiting, since a thread blocked on I/O releases the GIL.

 *  Good candidates for multithreading: tasks that often wait for external triggers
 *  Poor candidates for multithreading: CPU-intensive tasks
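
As a rough illustration of the first bullet, the sketch below simulates an I/O-bound task with `time.sleep()` (which releases the GIL while waiting) and runs four such tasks on separate threads. The function name `fake_download` and the 0.2-second delay are made up for the example; the point is only that the waits overlap, so the total time is close to one delay rather than four.

```python
import threading
import time

DELAY = 0.2  # seconds each simulated I/O call waits (arbitrary value)

def fake_download(item, results):
    """Hypothetical stand-in for an I/O-bound call such as a network request."""
    time.sleep(DELAY)  # sleeping releases the GIL, like real blocking I/O
    results[item] = f"payload-{item}"

results = {}
threads = [threading.Thread(target=fake_download, args=(i, results)) for i in range(4)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()  # wait for every thread to finish
elapsed = time.perf_counter() - start

print(f"{len(results)} items in {elapsed:.2f}s (serial would take about {4 * DELAY:.1f}s)")
```

Replacing `time.sleep()` with a pure-Python computation would remove most of the benefit, because the threads would then compete for the GIL.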

The Python standard library comes with the `threading` module.

Let's create a secondary thread to identify itself and calculate a mean. We'll keep track of the 'main' thread too:

In [4]:
import logging
import threading
import time

import numpy as np

# Without this, logging.info() prints nothing: the default level is WARNING.
logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO, datefmt="%H:%M:%S")

def print_mean(n):
    logging.info("(Thread) We've already begun. Where are you?")
    print("(Thread) Trying to calculate the mean now...\n")
    time.sleep(2)  # simulate a slow operation
    print(np.mean(np.random.randn(n)))
    print(f"(Thread) I am thread number {threading.get_ident()}")


print(f"(Main) I am thread number {threading.get_ident()}")

logging.info("(Main) Before secondary thread creation")

my_thread = threading.Thread(target=print_mean, args=[10])

logging.info("(Main) Before running secondary thread")

my_thread.start()

logging.info("(Main) Waiting for this thing to finish(?)")

logging.info("(Main) Done. What about you?")

print(f"(Main) I am thread number {threading.get_ident()}")

The main thread never calls `my_thread.join()`, so it does not actually wait: it prints its identifier a second time while the secondary thread is still sleeping, and the mean and the secondary thread's identifier appear about two seconds later.



### Daemon threads
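Daemon threads are background threads that do not keep the program alive: when the main thread exits, any daemon threads still running are stopped abruptly. The analogy is to an operating system daemon (_e.g._ in \*nix systems). In the example below, the explicit `join()` makes the main thread wait for the daemon thread, so the thread still runs to completion.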

In [47]:
logging.info("(Main) Before thread creation")

my_thread = threading.Thread(target=print_mean, args=[10], daemon=True)

logging.info("(Main) Before running thread")

my_thread.start()

logging.info("(Main) Waiting for it to finish(?)")

my_thread.join()

logging.info("(Main) I'm done. What about you?")



07:39:55: (Main) Before thread creation
07:39:55: (Main) Before running thread
07:39:55: (Thread) We've already begun. Where are you?
07:39:55: (Main) Waiting for it to finish(?)


(Thread) Trying to calculate the mean now...



07:39:57: (Main) I'm done. What about you?


0.22205654722377893
(Thread) I am thread number 139851874989824
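
To make the contrast concrete, here is a minimal sketch of a daemon thread that would never finish on its own. The `heartbeat` function and the timings are invented for illustration. Because the thread is a daemon, the interpreter does not wait for it at exit; `join(timeout=...)` only waits a bounded time and then returns.

```python
import threading
import time

def heartbeat(ticks):
    """Hypothetical background task: loops forever, recording one tick per interval."""
    while True:
        ticks.append(time.monotonic())
        time.sleep(0.05)

ticks = []
worker = threading.Thread(target=heartbeat, args=(ticks,), daemon=True)
worker.start()

worker.join(timeout=0.2)  # wait at most 0.2 s; the loop is still running afterwards

print("daemon:", worker.daemon)
print("still alive after join(timeout):", worker.is_alive())
# The script can end here: daemon threads are abandoned when the main thread exits.
```

With `daemon=False`, the same script would never terminate, since the interpreter waits for all non-daemon threads before exiting.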


### Race conditions

In [6]:
import random

punchline = ["I", "don't", "want", "any", "race", "conditions", "like", "last", "time"]
random.shuffle(punchline)  # scramble the word order, as a race condition might
A = " ".join(punchline).capitalize()

print("Two threads walk into a bar...\n\n")
print(f"The barkeeper looks up and yells:\nHey! {A}!")

Two threads walk into a bar...


The barkeeper looks up and yells:
Hey! Race like time want don't conditions any last i!
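
Jokes aside, a race condition arises when several threads read, modify and write the same data without coordination, so the result depends on how their steps interleave. The sketch below is an illustration rather than part of the original notebook: the `Account` class and the counts are made up. It guards a shared counter with `threading.Lock`, so each read-modify-write happens as a unit.

```python
import threading

class Account:
    """Hypothetical shared object updated by several threads."""

    def __init__(self):
        self.balance = 0
        self.lock = threading.Lock()

    def deposit(self, times):
        for _ in range(times):
            # Without the lock, two threads could both read the same old value
            # and one of the increments would be lost.
            with self.lock:
                self.balance += 1

account = Account()
threads = [threading.Thread(target=account.deposit, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(account.balance)  # 40000: every increment is preserved
```

The lock serializes only the increment itself; the threads still interleave freely everywhere else.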


### Asynchronous computing and `asyncio`


In [2]:
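import asyncio

async def main():
    print('hello')
    await asyncio.sleep(1)  # hand control back to the event loop for one second
    print('world')

# In a script, start the event loop with asyncio.run(main()).
# Jupyter already runs an event loop, so calling asyncio.run() there raises
# "RuntimeError: asyncio.run() cannot be called from a running event loop".
# In a notebook cell, await the coroutine directly instead:
await main()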
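
The benefit of `asyncio` shows up when several waits overlap. As a hedged sketch (the coroutine `fetch` and its delays are invented, standing in for real network calls), `asyncio.gather()` runs three coroutines concurrently on one thread, so the total time is close to the longest delay rather than the sum. It is written as a script, so it uses `asyncio.run()`; in a notebook, `await run_all()` would take the place of that call.

```python
import asyncio
import time

async def fetch(name, delay):
    """Hypothetical I/O-bound coroutine; asyncio.sleep stands in for a network wait."""
    await asyncio.sleep(delay)  # yields control to the event loop while waiting
    return f"{name} done"

async def run_all():
    # gather() schedules the coroutines concurrently and returns results in call order
    return await asyncio.gather(
        fetch("a", 0.3),
        fetch("b", 0.2),
        fetch("c", 0.1),
    )

start = time.perf_counter()
results = asyncio.run(run_all())
elapsed = time.perf_counter() - start

print(results)  # ['a done', 'b done', 'c done']
print(f"took {elapsed:.2f}s")  # close to the longest delay, not the 0.6 s sum
```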

### Multiprocessing

Although Moore's Law still holds for transistor counts, raw processor clock speed plateaued around the mid-2000s, at about the time multicore processors were introduced:

![The power, speed and logical core count of processors over the last four decades. Note the introduction of multicore processors in 2003](./images/40-years-processor-trend.png)

Hence it is possible to run applications that achieve high throughput via parallelism on the same machine (local concurrency). Of course the same logic is scalable across multiple compute nodes (remote concurrency).

The `multiprocessing` package offers both local and remote concurrency via subprocesses rather than threads. Because each process has its own interpreter and its own GIL, this sidesteps the GIL and the limitations described above: Python can make use of multicore processors, regardless of operating system, and achieve true parallelism.

---

Depending on the platform, `multiprocessing` supports three ways to start a process. These start methods are `spawn`, `fork` and `forkserver`. The last two are available only on \*nix systems, since they rely on `os.fork()`.

The start method can be selected with `set_start_method()`, which should be called at most once per program:

In [3]:
import multiprocessing as mp

def say_hello(my_queue):
    my_queue.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    my_queue = mp.Queue()
    p = mp.Process(target=say_hello, args=(my_queue,))
    p.start()
    print(my_queue.get())
    p.join()

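Run from a notebook, this cell can hang until interrupted: with the `spawn` start method the child process has to import `say_hello` from a module, which it cannot do for a function defined interactively, so `my_queue.get()` waits forever. Saved to a `.py` file and run as a script, it prints `hello`.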

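Alternatively, call `mp.get_context('spawn')` in place of `mp.set_start_method('spawn')`. It returns a context object with the same API as the `multiprocessing` module, so the queue and the process are then created with `ctx.Queue()` and `ctx.Process(...)`.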
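
A minimal sketch of the context-object variant follows, mirroring the `set_start_method()` example above. A context object can be created wherever it is needed and does not change the program-wide default. The helper name `greeting` is added here only so the message can be checked without starting a process; like the original, this is meant to be saved to a file and run as a script.

```python
import multiprocessing as mp

def greeting():
    """Hypothetical helper returning the message the child will send."""
    return 'hello'

def say_hello(my_queue):
    my_queue.put(greeting())

if __name__ == '__main__':
    # get_context() returns an object with the same API as the multiprocessing
    # module, bound to the chosen start method.
    ctx = mp.get_context('spawn')
    my_queue = ctx.Queue()
    p = ctx.Process(target=say_hello, args=(my_queue,))
    p.start()
    print(my_queue.get())  # hello
    p.join()
```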

The `Pool` object offers a convenient means of parallelizing the execution of a function across multiple input values, distributing the input data across processes (data parallelism). The following basic example of data parallelism with `Pool` also demonstrates the common practice of defining such functions at module level, so that child processes can successfully import them:

In [4]:
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))


In `multiprocessing`, processes are spawned by creating a `Process` object and then calling its `start()` method. `Process` follows the API of `threading.Thread`. A trivial example of a multiprocess program is:

In [2]:
from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

hello bob


To show the individual process IDs involved, here is an expanded example:

In [3]:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

main line
module name: __main__
parent process: 31240
process id: 9612
function f
module name: __main__
parent process: 9612
process id: 14798
hello bob


`multiprocessing` supports two types of communication channel between processes: `Queue` and `Pipe`. The former allows multiple producers and consumers; the latter provides a connection between exactly two endpoints.

In [4]:
from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())
    p.join()

[42, None, 'hello']


The `Pipe()` function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

In [7]:
from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

[42, None, 'hello']


For synchronization, `multiprocessing` provides equivalents of the primitives in `threading`. For instance, a lock can ensure that only one process prints to standard output at a time.
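
The sketch below follows the pattern given in the standard `multiprocessing` documentation: each child acquires a shared `Lock` before printing, so lines from different processes cannot interleave. The helper `format_line` is a made-up name, split out only so the message format can be checked on its own.

```python
from multiprocessing import Process, Lock

def format_line(i):
    """Hypothetical helper: build the line a worker prints."""
    return f'hello world {i}'

def f(lock, i):
    # The with-statement acquires the lock and releases it even if print() raises.
    with lock:
        print(format_line(i))

if __name__ == '__main__':
    lock = Lock()
    workers = [Process(target=f, args=(lock, num)) for num in range(10)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
```

The lines may still appear in any order: the lock guarantees exclusive access to standard output, not a particular scheduling order.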

It is best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

**Shared memory**

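Data can be stored in a shared memory map using `Value` or `Array`. The type codes `'d'` and `'i'` used below are those of the `array` module: `'d'` is a double-precision float and `'i'` is a signed integer.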

In [13]:
from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
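
The other approach described in the `multiprocessing` documentation is a server process: `Manager()` starts a process that holds ordinary Python objects, and other processes manipulate them through proxies. This is more flexible than shared memory, since it supports types such as `list` and `dict`, but it is slower. The following is a sketch along the lines of the documentation's example; the function name `update` is arbitrary.

```python
from multiprocessing import Process, Manager

def update(d, l):
    """Modify a dict and a list in place; works on plain objects and on manager proxies."""
    d['answer'] = 42
    d['pi'] = 3.1415927
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))

        p = Process(target=update, args=(d, l))
        p.start()
        p.join()

        print(d)  # {'answer': 42, 'pi': 3.1415927}
        print(l)  # [4, 3, 2, 1, 0]
```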


## Conclusion

We examined a number of concurrency concepts in Python: threaded multitasking with `threading`, asynchronous computing with `asyncio`, and exploiting multiple processors using `multiprocessing`.

We also looked at performance in light of the Global Interpreter Lock (GIL).