# Concurrency and Parallelism

**Concurrency** is when a computer does many different things *seemingly* at the same time.

**Parallelism** is *actually* doing many different things at the same time.

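The key difference between parallelism and concurrency is *speedup*. If two distinct paths of execution make forward progress in parallel, the total work takes about half as long. Concurrent programs, in contrast, may run many paths of execution seemingly in parallel without speeding up the total work.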

Python makes it easy to write concurrent programs. But it can be very difficult to make concurrent Python code truly run in parallel.

- [Item 36: Use subprocess to Manage Child Processes](#Item-36:-Use-subprocess-to-Manage-Child-Processes)
- [Item 37: Use Threads for Blocking I/O, Avoid for Parallelism](#Item-37:-Use-Threads-for-Blocking-I/O,-Avoid-for-Parallelism)
- [Item 38: Use Lock to Prevent Data Races in Threads](#Item-38:-Use-Lock-to-Prevent-Data-Races-in-Threads)
- [Item 39: Use Queue to Coordinate Work Between Threads](#Item-39:-Use-Queue-to-Coordinate-Work-Between-Threads)
- [Item 40: Consider Coroutines to Run Many Functions Concurrently](#Item-40:-Consider-Coroutines-to-Run-Many-Functions-Concurrently)

## Item 36: Use *subprocess* to Manage Child Processes

In modern Python, the best and simplest choice for managing child processes is the built-in *subprocess* module.
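
Sometimes you only need to run a command to completion and collect its output. For that case, `subprocess.run` (Python 3.5+, with `capture_output` and `text` added in 3.7) wraps `Popen` and `communicate` in a single call. The sketch below is minimal and assumes Python 3.7 or later. It launches a child Python interpreter via `sys.executable` instead of `echo`, so it does not depend on a Unix utility being installed.

```python
import subprocess
import sys

# Run a child Python process to completion and capture its stdout as text
result = subprocess.run(
    [sys.executable, '-c', 'print("Hello from the child")'],
    capture_output=True,  # Same as stdout=subprocess.PIPE, stderr=subprocess.PIPE
    text=True,            # Decode the captured bytes to str
    check=True)           # Raise CalledProcessError on a nonzero exit status
print(result.returncode, result.stdout, end='')
```

`Popen`, used below, remains the more flexible tool when you need to interact with a child while it is still running.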

In [None]:
import subprocess

proc = subprocess.Popen(
        ['echo', 'Hello from the child'],
        stdout=subprocess.PIPE)
out, err = proc.communicate()
print(out.decode('utf-8'))

from time import sleep

print('starting')
proc = subprocess.Popen(['sleep', '1'])
print('started')  # Printed right away: the child runs independently
while proc.poll() is None:
    # The parent is free to do other work while the child runs
    sleep(0.1)

print('Exit status', proc.poll())

In [None]:
from time import time

def run_sleep(period):
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

start = time()
procs = []
for _ in range(10):
    proc = run_sleep(0.1)
    procs.append(proc)
    
for proc in procs:
    proc.communicate()
end = time()
print('Finished in %.3f seconds' % (end - start))

In [None]:
import os

def run_openssl(data):
    env = os.environ.copy()
    env['password'] = b'\xe24U\n\xd0Ql3S\x11'
    proc = subprocess.Popen(
        ['openssl', 'enc', '-des3', '-pass', 'env:password'],
        env=env,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE)
    proc.stdin.write(data)
    proc.stdin.flush()
    return proc

procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    procs.append(proc)
    
for proc in procs:
    out, err = proc.communicate()
    print(out[-10:])
    
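def run_md5(input_stdin):
    # 'md5' is the macOS command; on Linux, use ['md5sum'] instead
    proc = subprocess.Popen(
        ['md5'],
        stdin=input_stdin,
        stdout=subprocess.PIPE)
    return proc

input_procs = []
hash_procs = []
for _ in range(3):
    data = os.urandom(10)
    proc = run_openssl(data)
    input_procs.append(proc)
    hash_proc = run_md5(proc.stdout)
    hash_procs.append(hash_proc)
    # Close the parent's copy of the pipe so that communicate() below
    # doesn't steal the encrypted bytes meant for the md5 child
    proc.stdout.close()
    proc.stdout = None

for proc in input_procs:
    proc.communicate()
for proc in hash_procs:
    out, err = proc.communicate()
    print(out.strip())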
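
The openssl/md5 pipeline above depends on external tools. You can sketch the same chaining technique portably with two child Python interpreters, an assumption made here only for illustration. The producer's `stdout` becomes the consumer's `stdin`, and the parent closes its own copy of that pipe so that the consumer is its only reader.

```python
import subprocess
import sys

# Producer: writes one line to its stdout
producer = subprocess.Popen(
    [sys.executable, '-c', 'print("hello pipeline")'],
    stdout=subprocess.PIPE)

# Consumer: reads all of stdin and writes it back upper-cased
consumer = subprocess.Popen(
    [sys.executable, '-c',
     'import sys; sys.stdout.write(sys.stdin.read().upper())'],
    stdin=producer.stdout,
    stdout=subprocess.PIPE)

# Drop the parent's handle so the consumer is the only reader and
# sees end-of-file once the producer exits
producer.stdout.close()

out, _ = consumer.communicate()
producer.wait()
print(out.strip())
```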

In [None]:
def run_sleep(period):
    proc = subprocess.Popen(['sleep', str(period)])
    return proc

proc = run_sleep(10)
try:
    proc.communicate(timeout=0.1)
except subprocess.TimeoutExpired:
    proc.terminate()
    proc.wait()
    
print('Exit status', proc.poll())

### Things to Remember

- Use the *subprocess* module to run child processes and manage their input and output streams.
- Child processes run in parallel with the Python interpreter, enabling you to maximize your CPU usage.
- Use the *timeout* parameter with *communicate* to avoid deadlocks and hanging child processes.
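
`subprocess.run` accepts the same `timeout` parameter. When the timeout expires, `run` kills the child, waits for it to exit, and then re-raises `TimeoutExpired`. The minimal sketch below uses a child interpreter that sleeps for 10 seconds as a stand-in for a hung process.

```python
import subprocess
import sys

timed_out = False
try:
    # The child would sleep for 10 seconds; give it half a second
    subprocess.run(
        [sys.executable, '-c', 'import time; time.sleep(10)'],
        timeout=0.5)
except subprocess.TimeoutExpired as exc:
    # By this point run() has already killed and reaped the child
    timed_out = True
    print('Gave up after', exc.timeout, 'seconds')
```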

## Item 37: Use Threads for Blocking I/O, Avoid for Parallelism

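CPython, the standard implementation of Python, protects the coherence of its interpreter state with a mechanism called the *global interpreter lock* (GIL). The GIL is a mutual-exclusion lock that allows only one thread at a time to execute Python bytecode.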

The GIL has an important negative side effect. Although Python supports multiple threads of execution, the GIL causes only one of them to make forward progress at a time. This means that when you reach for threads to do parallel computation and speed up your Python programs, you will be sorely disappointed.
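
In CPython, a *switch interval* controls how often the running thread is asked to release the GIL. The interval defaults to 0.005 seconds, and the `sys` module exposes it. The small sketch below reads the interval and temporarily changes it. Tuning it only changes how often threads take turns; it never lets two threads execute Python bytecode at the same time.

```python
import sys

# Seconds a thread may run before CPython asks it to release the GIL
original = sys.getswitchinterval()
print('Switch interval:', original)

# A longer interval means fewer forced thread switches, not parallelism
sys.setswitchinterval(0.01)
changed = sys.getswitchinterval()

sys.setswitchinterval(original)  # Restore the previous setting
```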

In [None]:
from time import time

def factorize(number):
    for i in range(1, number + 1):
        if number % i == 0:
            yield i
            
    
numbers = [2139079, 218502, 589213, 789123]
start = time()
for number in numbers:
    list(factorize(number))
end = time()
print('Took %.3f seconds' % (end - start))

from threading import Thread

class FactorizeThread(Thread):
    def __init__(self, number):
        super().__init__()
        self.number = number
        
    def run(self):
        self.factors = list(factorize(self.number))
        
start = time()
threads = []
for number in numbers:
    thread = FactorizeThread(number)
    thread.start()
    threads.append(thread)
    
for thread in threads:
    thread.join()
end = time()
# Takes about as long as (or longer than) the serial version because of the GIL
print('Took %.3f seconds' % (end - start))

There are ways to get CPython to use multiple cores, but they don't work with the standard **Thread** class and can require substantial effort.

Why does Python support threads at all?

1. Multiple threads make it easy for your program to seem like it's doing multiple things at the same time.
2. Threads let Python deal with blocking I/O, which happens when Python makes certain types of system calls, such as reading and writing files, interacting with networks, or communicating with devices.

In [None]:
import select

def slow_systemcall():
    select.select([], [], [], 0.1)
    
start = time()
for _ in range(5):
    slow_systemcall()
end = time()
print('Took %.3f seconds' % (end - start))

start = time()
threads = []
for _ in range(5):
    thread = Thread(target=slow_systemcall)
    thread.start()
    threads.append(thread)

for thread in threads:
    thread.join()
end = time()
print('Took %.3f seconds' % (end - start))

The GIL prevents Python code from running in parallel, but it has no negative effect on system calls. This works because Python threads release the GIL just before they make system calls and reacquire the GIL as soon as the system calls are done.

### Things to Remember

- Python threads can't run bytecode in parallel on multiple CPU cores because of the global interpreter lock (GIL).
- Python threads are still useful despite the GIL because they provide an easy way to do multiple things at seemingly the same time.
- Use Python threads to make multiple system calls in parallel. This allows you to do blocking I/O at the same time as computation.
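
The last point can be sketched as follows. The blocking calls start in background threads, the main thread does CPU work while they wait, and only then are the threads joined. In this sketch, `time.sleep` stands in for a blocking system call (it also releases the GIL while waiting), and `blocking_io` and `compute` are hypothetical placeholders for real I/O and real work.

```python
import time
from threading import Thread

def blocking_io():
    # Hypothetical stand-in for a blocking system call; releases the GIL
    time.sleep(0.2)

def compute(limit):
    # Hypothetical CPU-bound work done by the main thread meanwhile
    return sum(i * i for i in range(limit))

start = time.perf_counter()
threads = [Thread(target=blocking_io) for _ in range(5)]
for thread in threads:
    thread.start()

total = compute(100_000)  # Runs while the five threads are blocked

for thread in threads:
    thread.join()
elapsed = time.perf_counter() - start
print('Took %.3f seconds' % elapsed)  # Well under the 1.0 s a serial run needs
```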

## Item 38: Use *Lock* to Prevent Data Races in Threads


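The global interpreter lock (GIL) will not protect you from data races. The GIL lets only one thread run at a time, but a thread can be suspended between any two bytecode instructions. A statement such as `self.count += offset` (read the attribute, add, write it back) can therefore interleave with the same statement in another thread and lose updates.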
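
One way to see why `self.count += offset` is not atomic is to disassemble it with the `dis` module. The single statement compiles into separate instructions that load the attribute, add to it, and store it back, and CPython may switch threads between any two of them. The exact opcode names vary between Python versions.

```python
import dis

class Counter(object):
    def __init__(self):
        self.count = 0

    def increment(self, offset):
        self.count += offset

# List the bytecode instructions behind the single += statement
opnames = [ins.opname for ins in dis.get_instructions(Counter.increment)]
print(opnames)
```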

In [None]:
from time import time

class Counter(object):
    def __init__(self):
        self.count = 0
        
    def increment(self, offset):
        self.count += offset
        
def worker(sensor_index, how_many, counter):
    for _ in range(how_many):
        counter.increment(1)
        
def run_threads(func, how_many, counter):
    threads = []
    for i in range(5):
        args = (i, how_many, counter)
        thread = Thread(target=func, args=args)
        threads.append(thread)
        thread.start()
    for thread in threads:
        thread.join()
        
how_many = 10**5
counter = Counter()
start = time()
run_threads(worker, how_many, counter)
end = time()
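# Lost updates are nondeterministic: depending on the CPython version and
# timing, some runs may still print the expected total
print('Counter should be %d, found %d, time: %.3f' %
      (5 * how_many, counter.count, end - start))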

from threading import Lock

class LockingCounter(object):
    def __init__(self):
        self.lock = Lock()
        self.count = 0
        
    def increment(self, offset):
        with self.lock:
            self.count += offset
            
counter = LockingCounter()
start = time()
run_threads(worker, how_many, counter)
end = time()
print('Locking Counter should be %d, found %d, time: %.3f' %
      (5 * how_many, counter.count, end - start))

### Things to Remember

- Even though Python has a global interpreter lock, you're still responsible for protecting against data races between the threads in your programs.
- Your programs will corrupt their data structures if you allow multiple threads to modify the same objects without locks.
- The Lock class in the threading built-in module is Python's standard mutual exclusion lock implementation.
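
The `with self.lock:` statement in `LockingCounter` is shorthand for acquiring the lock and then releasing it in a `finally` clause, so the lock is released even if the protected code raises. Below is a minimal sketch of the explicit form. It uses a module-level counter for brevity, and `increment_many` is a hypothetical helper.

```python
from threading import Lock, Thread

lock = Lock()
count = 0

def increment_many(how_many):
    # Hypothetical worker that increments a shared counter under the lock
    global count
    for _ in range(how_many):
        lock.acquire()
        try:
            count += 1  # Only one thread at a time executes this line
        finally:
            lock.release()  # Always release, even if an exception occurs

threads = [Thread(target=increment_many, args=(10_000,)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
print(count)  # 40000
```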

## Item 39: Use Queue to Coordinate Work Between Threads

In [None]:
from collections import deque
from time import sleep

class MyQueue(object):
    def __init__(self):
        self.items = deque()
        self.lock = Lock()
        
    def put(self, item):
        with self.lock:
            self.items.append(item)
            
    def get(self):
        with self.lock:
            return self.items.popleft()
        
        
class Worker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.polled_count = 0
        self.work_done = 0
        
    def run(self):
        while True:
            self.polled_count += 1
            try:
                item = self.in_queue.get()
            except IndexError:
                sleep(0.01)  # No work to do
            else:
                result = self.func(item)
                self.out_queue.put(result)
                self.work_done += 1

def download(item):
    sleep(0.001)
    
def resize(item):
    sleep(0.00001)
    
def upload(item):
    sleep(0.001)
                
download_queue = MyQueue()
resize_queue = MyQueue()
upload_queue = MyQueue()
done_queue = MyQueue()
threads = [
    Worker(download, download_queue, resize_queue),
    Worker(resize, resize_queue, upload_queue),
    Worker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
    
for _ in range(1000):
    download_queue.put(object())
    
while len(done_queue.items) < 1000:
    sleep(0.1)
    
processed = len(done_queue.items)
polled = sum(t.polled_count for t in threads)
print('Processed', processed, 'items after polling',
      polled, 'times')

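There are three problems with the above implementation:

1. Busy waiting: the main thread repeatedly sleeps and checks *done_queue*, and each idle *Worker* keeps polling its empty input queue (see *polled_count*), wasting CPU time.
2. In *Worker*, the *run* method will execute forever in its busy loop. There's no way to signal to a worker thread that it's time to exit.
3. If one stage falls behind the stage that feeds it, the queue between them grows without bound, and the program can eventually run out of memory.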

### Queue to the Rescue

In [None]:
from threading import Thread
from queue import Queue


queue = Queue()

def consumer():
    print('Consumer waiting')
    queue.get()
    print('Consumer done')
    
thread = Thread(target=consumer)
thread.start()

print('Producer putting')
queue.put(object())
thread.join()
print('Producer done')

In [None]:
from queue import Queue
from time import sleep

queue = Queue(1)

def consumer():
    sleep(0.1)
    queue.get()
    print('Consumer got 1')
    queue.get()
    print('Consumer got 2')
    
thread = Thread(target=consumer)
thread.start()

queue.put(object())
print('Producer put 1')
queue.put(object())
print('Producer put 2')
thread.join()
print('Producer done')
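
In the cell above, the producer's second `put` blocks until the consumer frees the single slot in `Queue(1)`. When blocking is not wanted, `put_nowait` (equivalent to `put(item, block=False)`) raises `queue.Full` instead, which a producer could use to detect a slow downstream stage. A minimal sketch:

```python
from queue import Queue, Full

buffer = Queue(maxsize=1)
buffer.put('first')  # Fills the only slot

try:
    buffer.put_nowait('second')  # No room and no waiting: raises Full
    accepted = True
except Full:
    accepted = False

print('Second item accepted?', accepted)  # False
```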

In [None]:
from queue import Queue
from threading import Thread

in_queue = Queue()

def consumer():
    print('Consumer waiting')
    work = in_queue.get()
    print('Consumer working 1')
    work = in_queue.get()
    print('Consumer working 2')
    print('Consumer done')
    in_queue.task_done()
    in_queue.task_done()
    
Thread(target=consumer).start()
in_queue.put(object())
print('Producer put 1')
in_queue.put(object())
print('Producer put 2')
print('Producer waiting')
in_queue.join()
print('Producer done')
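
`join` returns only after `task_done` has been called once for every item that was `put`. Calling `task_done` more times than there were items is an error, and `Queue` raises `ValueError`. The small sketch below shows this bookkeeping, run in a single thread for clarity.

```python
from queue import Queue

q = Queue()
for item in ('a', 'b'):
    q.put(item)

while not q.empty():
    q.get()
    q.task_done()  # One call per retrieved item

q.join()  # Returns immediately: every put has a matching task_done

error = None
try:
    q.task_done()  # One call too many
except ValueError as exc:
    error = str(exc)
print(error)
```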

In [None]:
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    SENTINEL = object()
    
    def close(self):
        self.put(self.SENTINEL)
        
    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # Exit
                yield item
            finally:
                self.task_done()
                

class StoppableWorker(Thread):
    def __init__(self, func, in_queue, out_queue):
        super().__init__()
        self.func = func
        self.in_queue = in_queue
        self.out_queue = out_queue
        
    def run(self):
        for item in self.in_queue:
            result = self.func(item)
            self.out_queue.put(result)
            
def download(item):
    sleep(0.001)
    
def resize(item):
    sleep(0.00001)
    
def upload(item):
    sleep(0.001)

download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()

threads = [
    StoppableWorker(download, download_queue, resize_queue),
    StoppableWorker(resize, resize_queue, upload_queue),
    StoppableWorker(upload, upload_queue, done_queue),
]

for thread in threads:
    thread.start()
for _ in range(10):
    download_queue.put(object())
download_queue.close()
download_queue.join()
resize_queue.close()
resize_queue.join()
upload_queue.close()
upload_queue.join()
print(done_queue.qsize(), 'items finished')
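
Each worker above exits after consuming one `SENTINEL`, so a stage served by several workers needs one `close()` call per worker. The sketch below shows a single stage with two workers. It repeats the `ClosableQueue` class from above so the block runs on its own, and the `square` and `run_worker` functions and the worker count are hypothetical choices made for illustration.

```python
from queue import Queue
from threading import Thread

class ClosableQueue(Queue):
    SENTINEL = object()

    def close(self):
        self.put(self.SENTINEL)

    def __iter__(self):
        while True:
            item = self.get()
            try:
                if item is self.SENTINEL:
                    return  # This consumer is done
                yield item
            finally:
                self.task_done()

def square(x):
    # Hypothetical per-item work
    return x * x

def run_worker(in_queue, out_queue):
    # Hypothetical worker loop; stops at the first sentinel it sees
    for item in in_queue:
        out_queue.put(square(item))

in_queue = ClosableQueue()
out_queue = ClosableQueue()
workers = [Thread(target=run_worker, args=(in_queue, out_queue))
           for _ in range(2)]
for worker in workers:
    worker.start()

for i in range(10):
    in_queue.put(i)
for _ in workers:
    in_queue.close()  # One sentinel per worker
in_queue.join()
for worker in workers:
    worker.join()

results = sorted(out_queue.get() for _ in range(10))
print(results)
```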

### Things to Remember

- Pipelines are a great way to organize sequences of work that run concurrently using multiple Python threads.
- Be aware of the many problems in building concurrent pipelines: busy waiting, stopping workers, and memory explosion.
- The **Queue** class has all of the facilities you need to build robust pipelines: blocking operations, buffer sizes, and joining.

## Item 40: Consider Coroutines to Run Many Functions Concurrently

Three big problems with threads:

1. They require special tools to coordinate with each other safely (Lock and Queue).
2. Threads require a lot of memory, about 8MB per executing thread.
3. Threads are costly to start. If you want to constantly be creating new concurrent functions and finishing them, the overhead of using threads becomes large and slows everything down.

Python can work around all these issues with coroutines.

In [None]:
def my_coroutine():
    while True:
        received = yield
        print('Received:', received)
        
it = my_coroutine()
next(it)
it.send('First')
it.send('Second')

In [None]:
def minimize():
    current = yield
    while True:
        value = yield current
        current = min(value, current)
        
it = minimize()
next(it)
print(it.send(10))
print(it.send(9))
print(it.send(-1))
print(it.send(50))
it.close()

Coroutines pause at each **yield** expression in the generator function and resume after each call to *send* from the outside. This is the core mechanism behind coroutines.
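
A coroutine must first be advanced to its first `yield` with `next()` (or `send(None)`) before it can receive values. Sending a non-None value to a just-started generator raises `TypeError`. A common convenience is a priming decorator. In the sketch below, `primed` and `running_total` are hypothetical helpers, not part of the standard library.

```python
import functools

def primed(func):
    # Hypothetical helper: create the generator and advance it to its first yield
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        gen = func(*args, **kwargs)
        next(gen)
        return gen
    return wrapper

def running_total():
    # Hypothetical coroutine that yields the sum of all values sent so far
    total = 0
    while True:
        value = yield total
        total += value

# Without priming, send() on a fresh generator fails
send_error = None
try:
    running_total().send(5)
except TypeError as exc:
    send_error = exc
    print('TypeError:', exc)

totals = primed(running_total)()
first = totals.send(5)    # 5
second = totals.send(10)  # 15
print(first, second)
```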

### The Game of Life

In [None]:
from collections import namedtuple

ALIVE = '*'
EMPTY = '-'

Query = namedtuple('Query', ('y', 'x'))

def count_neighbors(y, x):
    n_ = yield Query(y + 1, x + 0)
    ne = yield Query(y + 1, x + 1)
    e_ = yield Query(y + 0, x + 1)
    se = yield Query(y - 1, x + 1)
    s_ = yield Query(y - 1, x + 0)
    sw = yield Query(y - 1, x - 1)
    w_ = yield Query(y + 0, x - 1)
    nw = yield Query(y + 1, x - 1)
    neighbor_states = [n_, ne, e_, se, s_, sw, w_, nw]
    count = 0
    for state in neighbor_states:
        if state == ALIVE:
            count += 1
    return count


Transition = namedtuple('Transition', ('y', 'x', 'state'))

def game_logic(state, neighbors):
    if state == ALIVE:
        if neighbors < 2 or neighbors > 3:
            return EMPTY
    elif neighbors == 3:
        return ALIVE
    return state

def step_cell(y, x):
    state = yield Query(y, x)
    neighbors = yield from count_neighbors(y, x)
    next_state = game_logic(state, neighbors)
    yield Transition(y, x, next_state)
    
TICK = object()

def simulate(height, width):
    while True:
        for y in range(height):
            for x in range(width):
                yield from step_cell(y, x)
        yield TICK
        
class Grid(object):
    def __init__(self, height, width):
        self.height = height
        self.width = width
        self.rows = []
        for _ in range(self.height):
            self.rows.append([EMPTY] * self.width)
            
    def __str__(self):
        return '\n'.join(''.join(self.rows[r]) for r in range(self.height))
            
    def query(self, y, x):
        return self.rows[y % self.height][x % self.width]
    
    def assign(self, y, x, state):
        self.rows[y % self.height][x % self.width] = state
        
def live_a_generation(grid, sim):
    progeny = Grid(grid.height, grid.width)
    item = next(sim)
    while item is not TICK:
        if isinstance(item, Query):
            state = grid.query(item.y, item.x)
            item = sim.send(state)
        else:  # Must be a Transition
            progeny.assign(item.y, item.x, item.state)
            item = next(sim)
    return progeny


# Seed a 3x3 grid with three live cells
grid = Grid(3, 3)
grid.assign(1, 0, ALIVE)
grid.assign(2, 0, ALIVE)
grid.assign(0, 2, ALIVE)

print(grid, '\n')

# Advance the simulation by one generation
sim = simulate(grid.height, grid.width)
grid = live_a_generation(grid, sim)

print(grid)

In [None]:
it = count_neighbors(10, 5)
q1 = next(it)
print('First yield:   ', q1)
q2 = it.send(ALIVE)
print('Second yield:  ', q2)
q3 = it.send(ALIVE)
print('Third yield:   ', q3)
q4 = it.send(ALIVE)
print('Fourth yield:  ', q4)
q5 = it.send(ALIVE)
print('Fifth yield:   ', q5)
q6 = it.send(ALIVE)
print('Sixth yield:   ', q6)
q7 = it.send(ALIVE)
print('Seventh yield: ', q7)
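q8 = it.send(EMPTY)
print('Eighth yield:  ', q8)
try:
    count = it.send(EMPTY)  # State for q8; the generator then returns the count
except StopIteration as e:
    print('Count: ', e.value)  # Value from the return statement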

In [None]:
it = step_cell(10, 5)
q0 = next(it)
print('Me:        ', q0)
q1 = it.send(ALIVE)
print('Q1:        ', q1)
q2 = it.send(ALIVE)
print('Q2:        ', q2)
q3 = it.send(ALIVE)
print('Q3:        ', q3)
q4 = it.send(ALIVE)
print('Q4:        ', q4)
q5 = it.send(ALIVE)
print('Q5:        ', q5)
q6 = it.send(ALIVE)
print('Q6:        ', q6)
q7 = it.send(ALIVE)
print('Q7:        ', q7)
q8 = it.send(ALIVE)
print('Q8:        ', q8)
t1 = it.send(EMPTY)
print('Outcome: ', t1)

### Things to Remember

- Coroutines provide an efficient way to run tens of thousands of functions seemingly at the same time.
- Within a generator, the value of the yield expression will be whatever value was passed to the generator's send method from the exterior code.
- Coroutines give you a powerful tool for separating the code logic of your program from its interaction with the surrounding environment.
- Python 2 doesn't support yield from or returning values from generators.
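
The last two points meet in `step_cell`, where `neighbors = yield from count_neighbors(y, x)` forwards every query and every *send* value to the sub-generator and then evaluates to the sub-generator's `return` value. The sketch below strips that delegation down to its essentials (Python 3.3+). The `inner` and `outer` names are hypothetical.

```python
def inner():
    # Hypothetical sub-generator: ask the caller for a value, then return a result
    x = yield 'need x'
    return x * 2

def outer():
    # Hypothetical delegating generator: values sent by the caller pass straight
    # through to inner(), whose return value becomes the yield from result
    doubled = yield from inner()
    yield ('result', doubled)

it = outer()
request = next(it)     # 'need x', yielded by inner() through outer()
outcome = it.send(21)  # ('result', 42)
print(request, outcome)
```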