# 12 - Concurrency

## Starting and Stopping Threads
The threading library can be used to execute any Python callable in its own thread. To do this, you create a Thread instance and supply the callable that you wish to execute as a target. 

In [1]:
import time

def countdown(n):
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(0.25)


In [2]:
from threading import Thread

t = Thread(target=countdown, args=(5,))
t.start()

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


In [3]:
t.is_alive()

False

The interpreter remains running until all threads terminate. For long-running threads or background tasks that run forever, you should consider making the thread daemonic. Daemon threads are destroyed when the main thread terminates.

In [4]:
t = Thread(target=countdown, args=(5,), daemon=True)
t.start()

T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


If you want to be able to terminate threads, the thread must be programmed to poll for exit at selected points.

In [6]:
class CountdownTask:
    def __init__(self):
        self._running = True
 
    def terminate(self):
        self._running = False

    def run(self, n):
        while self._running and n > 0:
            print('T-minus', n)
            n -= 1
            time.sleep(1)


In [7]:
c = CountdownTask()
t = Thread(target=c.run, args=(10,))
t.start()

T-minus 10
T-minus 9


In [8]:
c.terminate() # signal termination
t.join()      # wait for actual termination (if needed)

Polling for thread termination can be tricky to coordinate if threads perform blocking operations such as I/O. To handle this case correctly, program the thread to use timeout loops, so that it periodically returns from the blocking call and rechecks its exit flag.

In [9]:
import socket

class IOTask:
    def __init__(self):
        self._running = True

    def terminate(self):
        self._running = False

    def run(self, sock):
        # sock is a socket
        sock.settimeout(5)  # set timeout period
        while self._running:
            # perform a blocking I/O operation w/ timeout
            try:
                data = sock.recv(8192)
                break
            except socket.timeout:
                # timed out: loop back and recheck self._running
                continue
        # continued processing
        return


Due to a global interpreter lock (GIL), Python threads are restricted to an execution model that only allows one thread to execute in the interpreter at any given time. For this reason, Python threads should generally not be used for computationally intensive tasks where you are trying to achieve parallelism on multiple CPUs.

Sometimes you will see threads defined via inheritance from the Thread class. Although this works, it introduces an extra dependency between the code and the threading library. Code that is free of such a dependency remains usable in other contexts that may or may not involve threads.
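
For comparison, the inheritance style mentioned above might look like the following sketch. The class name `CountdownThread` and its `seen` attribute are illustrative assumptions, not part of the original examples; the point is only that the countdown logic now lives inside a `Thread` subclass.

```python
from threading import Thread

class CountdownThread(Thread):
    """Hypothetical example: the countdown logic is tied to Thread."""
    def __init__(self, n):
        super().__init__()
        self.n = n
        self.seen = []   # values recorded by the thread, for inspection

    def run(self):
        # run() is invoked in the new thread by start()
        while self.n > 0:
            self.seen.append(self.n)
            self.n -= 1

t = CountdownThread(3)
t.start()
t.join()
print(t.seen)   # [3, 2, 1]
```

Because `run()` is bound to the `Thread` subclass, reusing the same logic without a thread means working around the class, whereas a plain function or the `CountdownTask` class shown earlier can be called directly.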

## Determining If a Thread Has Started
To find out when a launched thread has actually started running, use the Event object from the threading library.

In [2]:
from threading import Thread, Event
import time


def countdown(n, started_evt):
    print('countdown starting')
    started_evt.set()
    while n > 0:
        print('T-minus', n)
        n -= 1
        time.sleep(1)


started_evt = Event()
print('Launching countdown')
t = Thread(target=countdown, args=(5, started_evt))
t.start()

# wait for the thread to start
started_evt.wait()
print('countdown is running')


Launching countdown
countdown starting
T-minus 5
countdown is running
T-minus 4
T-minus 3
T-minus 2
T-minus 1
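
`Event.wait()` also accepts a timeout, which can be useful when the main thread should not block forever on a thread that fails to start. The following is a minimal sketch; the `worker` function and the delay values are assumptions chosen only for illustration.

```python
from threading import Thread, Event
import time

def worker(started_evt):
    time.sleep(0.5)       # simulate slow startup work
    started_evt.set()     # signal that startup has finished

started_evt = Event()
t = Thread(target=worker, args=(started_evt,))
t.start()

# wait() returns False if the timeout elapses before the event is set
too_early = started_evt.wait(timeout=0.01)
# a generous timeout gives the worker time to call set(); wait() then returns True
in_time = started_evt.wait(timeout=5)
t.join()
print(too_early, in_time)
```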


If a thread is going to signal an event repeatedly, you’re probably better off using a [Condition](https://docs.python.org/3/library/threading.html#condition-objects) object instead. For example, this code implements a periodic timer that other threads can monitor to see whenever the timer expires.

In [6]:
import threading
import time


class PeriodicTimer:
    def __init__(self, interval):
        self._interval = interval
        self._flag = 0
        self._cv = threading.Condition()
 
    def start(self):
        t = threading.Thread(target=self.run)
        t.daemon = True
        t.start()

    def run(self):
        '''
        Run the timer and notify waiting threads after each interval
        '''
        while True:
            time.sleep(self._interval)
            with self._cv:
                self._flag ^= 1
                self._cv.notify_all()

    def wait_for_tick(self):
        '''
        Wait for the next tick of the timer
        '''
        with self._cv:
            last_flag = self._flag
            while last_flag == self._flag:
                self._cv.wait()


In [8]:
# example use of the timer
ptimer = PeriodicTimer(2)
ptimer.start()

# two threads that synchronize on the timer
def countdown(nticks):
    while nticks > 0:
        ptimer.wait_for_tick()
        print('T-minus', nticks)
        nticks -= 1

def countup(last):
    n = 0
    while n < last:
        ptimer.wait_for_tick()
        print('Counting', n)
        n += 1


In [9]:
threading.Thread(target=countdown, args=(10,)).start()
threading.Thread(target=countup, args=(5,)).start()

T-minus 10
Counting 0
T-minusCounting 1
 9
CountingT-minus 8
 2
CountingT-minus 7 3

CountingT-minus 6
 4
T-minus 5
T-minus 4
T-minus 3
T-minus 2
T-minus 1


## Communicating Between Threads
Perhaps the safest way to send data from one thread to another is to use a Queue from the queue library.

In [3]:
from queue import Queue
from threading import Thread

# a thread that produces data
def producer(out_q):
    data = 1
    while True:
        out_q.put(data)
        data += 1
        if data > 5:
            break

# a thread that consumes data
def consumer(in_q):
    while True:
        data = in_q.get()
        print(data)


# create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

1
2
3
4
5


Queue instances already have all of the required locking, so they can be safely shared by as many threads as you wish.
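
As a rough illustration of that claim, the sketch below lets four producer threads write to a single queue without any extra locking and then checks that nothing was lost. The producer signature and the item counts are assumptions made for this example.

```python
from queue import Queue
from threading import Thread

def producer(out_q, start, count):
    # each producer puts `count` consecutive integers on the shared queue
    for value in range(start, start + count):
        out_q.put(value)

q = Queue()
producers = [Thread(target=producer, args=(q, i * 100, 100)) for i in range(4)]
for t in producers:
    t.start()
for t in producers:
    t.join()

# drain the queue in the main thread once all producers have finished
received = []
while not q.empty():
    received.append(q.get())

print(len(received), sorted(received) == list(range(400)))   # 400 True
```

Draining with `empty()` is only reliable here because every producer has already been joined; while other threads are still using the queue, `empty()` is just a snapshot.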

When using queues, it can be somewhat tricky to coordinate the shutdown of the producer and consumer. A common solution to this problem is to rely on a special sentinel value, which when placed in the queue, causes consumers to terminate.

In [5]:
from queue import Queue
from threading import Thread

_sentinel = object()

# a thread that produces data
def producer(out_q):
    data = 1
    while True:
        out_q.put(data)
        data += 1
        if data > 5:
            out_q.put(_sentinel)
            break

# a thread that consumes data
def consumer(in_q):
    while True:
        data = in_q.get()
        
        if data is _sentinel:
            in_q.put(_sentinel)
            break

        print(data)


# create the shared queue and launch both threads
q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

1
2
3
4
5


A subtle feature of this example is that the consumer, upon receiving the special sentinel value, immediately places it back onto the queue. This propagates the sentinel to any other consumer threads that might be listening on the same queue, shutting them all down one after the other.
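
The propagation described above can be seen with more than one consumer. In this sketch, three consumers share one queue and a single sentinel stops all of them; the `results` list and `results_lock` are assumptions added only so the outcome can be inspected.

```python
from queue import Queue
from threading import Thread, Lock

_sentinel = object()
results = []
results_lock = Lock()

def consumer(in_q):
    while True:
        data = in_q.get()
        if data is _sentinel:
            in_q.put(_sentinel)   # pass the sentinel on to the next consumer
            break
        with results_lock:
            results.append(data)

q = Queue()
consumers = [Thread(target=consumer, args=(q,)) for _ in range(3)]
for t in consumers:
    t.start()

for item in range(1, 6):
    q.put(item)
q.put(_sentinel)   # a single sentinel shuts down all three consumers

for t in consumers:
    t.join()
print(sorted(results))   # [1, 2, 3, 4, 5]
```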

If a thread needs to know immediately when a consumer thread has processed a particular item of data, you should pair the sent data with an Event object that allows the producer to monitor its progress.

In [7]:
from queue import Queue
from threading import Thread, Event

collection = list(range(1,6))

def producer(out_q):
    while True:
        if len(collection) == 0:
            break
        data = collection.pop()
        evt = Event()
        out_q.put((data, evt))
        evt.wait()


def consumer(in_q):
    while True:
        data, evt = in_q.get()
        print(data)
        evt.set()


q = Queue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()

5
4
3
2
1


If you are concerned about shared state, it may make sense to only pass immutable data structures.

##  Locking Critical Sections
Your program uses threads and you want to lock critical sections of code to avoid race conditions.

In [8]:
import threading

class SharedCounter:
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        with self._value_lock:
            self._value += delta

    def decr(self, delta=1):
        with self._value_lock:
            self._value -= delta


Compared with explicit calls to acquire() and release(), the with statement is more elegant and less prone to error, especially in situations where a programmer might forget to call the release() method or if a program happens to raise an exception while holding a lock.
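
For contrast, here is a sketch of the same counter written with explicit locking. `ExplicitCounter` and `bump` are hypothetical names used only for this comparison; the `try`/`finally` is what the `with` statement otherwise takes care of.

```python
import threading

class ExplicitCounter:
    """Hypothetical variant of SharedCounter using explicit acquire/release."""
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._value_lock = threading.Lock()

    def incr(self, delta=1):
        self._value_lock.acquire()
        try:
            self._value += delta
        finally:
            # without try/finally, an exception above would leave the lock held
            self._value_lock.release()

counter = ExplicitCounter()

def bump(times):
    for _ in range(times):
        counter.incr()

threads = [threading.Thread(target=bump, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(counter._value)   # 4000
```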

## Locking with Deadlock Avoidance
You’re writing a multithreaded program where threads need to acquire more than one lock at a time while avoiding deadlock.

For instance, if a thread acquires the first lock, but then blocks trying to acquire the second lock, that thread can potentially block the progress of other threads and make the program freeze. We can enforce an ordering rule that only allows multiple locks to be acquired in ascending order.

See [threading.local()](https://docs.python.org/3/library/threading.html#thread-local-data).

In [2]:
import threading

_local = threading.local()
_local

<_thread._local at 0x22229904410>

In [1]:
import threading
from contextlib import contextmanager

# thread-local state to store information on locks already acquired
_local = threading.local()

@contextmanager
def acquire(*locks):
    # sort locks by object identifier
    locks = sorted(locks, key=lambda x: id(x))
 
    # make sure lock order of previously acquired locks is not violated
    acquired = getattr(_local,'acquired',[])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')
    
    # acquire all of the locks
    acquired.extend(locks)
    _local.acquired = acquired

    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        # Release locks in reverse order of acquisition
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]


To use this context manager, you simply allocate lock objects in the normal way, but use the acquire() function whenever you want to work with one or more locks.

In [4]:
import threading
x_lock = threading.Lock()
y_lock = threading.Lock()

def thread_1():
    count = 0
    while True:
        count += 1
        with acquire(x_lock, y_lock):
            print('Thread-1')
        if count >= 3:
            break

def thread_2():
    count = 0
    while True:
        count += 1
        with acquire(y_lock, x_lock):
            print('Thread-2')
        if count >= 3:
            break

t1 = threading.Thread(target=thread_1)
t1.daemon = True
t1.start()

t2 = threading.Thread(target=thread_2)
t2.daemon = True
t2.start()

Thread-1
Thread-1
Thread-1
Thread-2
Thread-2
Thread-2


The issue of deadlock is a well-known problem with programs involving threads (as well as a common subject in textbooks on operating systems). As a rule of thumb, as long as you can ensure that threads can hold only one lock at a time, your program will be deadlock free. However, once multiple locks are being acquired at the same time, all bets are off.
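
To see the ordering rule reject a risky acquisition, the sketch below repeats the `acquire()` context manager from this section so that it runs on its own, then nests two acquisitions in both orders. The names `low` and `high` are assumptions that simply label the two locks by their position in the `id()` ordering.

```python
import threading
from contextlib import contextmanager

_local = threading.local()

@contextmanager
def acquire(*locks):
    # same ordering rule as the context manager defined earlier in this section
    locks = sorted(locks, key=id)
    acquired = getattr(_local, 'acquired', [])
    if acquired and max(id(lock) for lock in acquired) >= id(locks[0]):
        raise RuntimeError('Lock Order Violation')
    acquired.extend(locks)
    _local.acquired = acquired
    try:
        for lock in locks:
            lock.acquire()
        yield
    finally:
        for lock in reversed(locks):
            lock.release()
        del acquired[-len(locks):]

# label the two locks by their position in the id() ordering
low, high = sorted([threading.Lock(), threading.Lock()], key=id)

# nesting in ascending order is allowed
with acquire(low):
    with acquire(high):
        ascending_ok = True

# nesting in descending order is rejected before the second lock is taken
try:
    with acquire(high):
        with acquire(low):
            pass
    violation = False
except RuntimeError:
    violation = True

print(ascending_ok, violation)   # True True
```

The exception is raised before the inner lock is touched, and the outer context manager still releases its lock on the way out, so neither lock remains held afterwards.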

## Storing Thread-Specific State
To store state that is specific to the currently executing thread and not visible to other threads, create a thread-local storage object using threading.local(). Attributes stored and read on this object are only visible to the executing thread and no others.

In [5]:
from socket import socket, AF_INET, SOCK_STREAM
import threading

class LazyConnection:
    def __init__(self, address, family=AF_INET, type=SOCK_STREAM):
        self.address = address
        self.family = family
        self.type = type
        self.local = threading.local()
 
    def __enter__(self):
        if hasattr(self.local, 'sock'):
            raise RuntimeError('Already connected')
        self.local.sock = socket(self.family, self.type)
        self.local.sock.connect(self.address)
        return self.local.sock
 
    def __exit__(self, exc_ty, exc_val, tb):
        self.local.sock.close()
        del self.local.sock


In [7]:
from functools import partial

def test(conn):
    with conn as s:
        s.send(b'GET /index.html HTTP/1.0\r\n')
        s.send(b'Host: www.python.org\r\n')
        s.send(b'\r\n')
        resp = b''.join(iter(partial(s.recv, 8192), b''))
    print('Got {} bytes'.format(len(resp)))


conn = LazyConnection(('www.python.org', 80))
t1 = threading.Thread(target=test, args=(conn,))
t2 = threading.Thread(target=test, args=(conn,))
t1.start()
t2.start()
t1.join()
t2.join()


Got 400 bytes
Got 400 bytes


In [8]:
t1

<Thread(Thread-14, stopped 15412)>

In [9]:
t1.is_alive()

False
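
The per-thread visibility that `LazyConnection` relies on can also be observed without any networking. This is a minimal sketch; `local_data`, `seen`, and `worker` are hypothetical names introduced for the example.

```python
import threading

local_data = threading.local()
seen = {}

def worker(name):
    # each thread sets and reads its own 'value' attribute
    local_data.value = name
    seen[name] = local_data.value

threads = [threading.Thread(target=worker, args=(n,)) for n in ('a', 'b')]
for t in threads:
    t.start()
for t in threads:
    t.join()

# the main thread never assigned 'value', so the attribute is absent here
print(sorted(seen.items()), hasattr(local_data, 'value'))
# [('a', 'a'), ('b', 'b')] False
```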

## Creating a Thread Pool
To create a pool of worker threads for serving requests or performing other work, use the ThreadPoolExecutor class from the concurrent.futures library.

In [32]:
from concurrent.futures import ThreadPoolExecutor
import threading
import random


def task():
    print("Executing our Task")
    result = 0
    i = 0
    for i in range(int(1e6)):
        result = result + i
    print("I: {}".format(result))
    print("Task Executed {}".format(threading.current_thread()))

    
executor = ThreadPoolExecutor(max_workers=3)
task1 = executor.submit(task)
task2 = executor.submit(task)

Executing our Task
Executing our Task
I: 499999500000
Task Executed <Thread(ThreadPoolExecutor-12_1, started daemon 12084)>
I: 499999500000
Task Executed <Thread(ThreadPoolExecutor-12_0, started daemon 15032)>


Or for use as a context manager:

In [33]:
from concurrent.futures import ThreadPoolExecutor

def task(n):
    print("Processing {}".format(n))


print("Starting ThreadPoolExecutor")
with ThreadPoolExecutor(max_workers=3) as executor:
    executor.submit(task, 2)
    executor.submit(task, 3)
    executor.submit(task, 4)

print("All tasks complete")

Starting ThreadPoolExecutor
Processing 2
Processing 3
Processing 4
All tasks complete
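
The examples above discard the value returned by `submit()`. When a task returns a result, it can be collected from the returned `Future`, or from `executor.map()`. The `square` function below is an assumption used only to have something to compute.

```python
from concurrent.futures import ThreadPoolExecutor

def square(n):
    return n * n

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() returns a Future; result() blocks until the value is ready
    future = executor.submit(square, 7)
    single = future.result()
    # map() yields results in the order of the inputs
    squares = list(executor.map(square, range(5)))

print(single, squares)   # 49 [0, 1, 4, 9, 16]
```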


## Performing Simple Parallel Programming
The concurrent.futures library provides a ProcessPoolExecutor class that can be used to execute computationally intensive functions in a separately running instance of the Python interpreter. 

In [35]:
from concurrent.futures import ProcessPoolExecutor
import os


def task():
    print("Executing our Task on Process {}".format(os.getpid()))

    
executor = ProcessPoolExecutor(max_workers=3)
task1 = executor.submit(task)
task2 = executor.submit(task)

In [36]:
from concurrent.futures import ProcessPoolExecutor
import os

def task():
    print("Executing our Task on Process: {}".format(os.getpid()))


with ProcessPoolExecutor(max_workers=3) as executor:
    task1 = executor.submit(task)
    task2 = executor.submit(task)


##  Dealing with the GIL (and How to Stop Worrying About It)

Although Python fully supports thread programming, parts of the C implementation of the interpreter are not thread safe enough to allow fully concurrent execution. In fact, the interpreter is protected by a so-called Global Interpreter Lock (GIL) that only allows one Python thread to execute at any given time. It is important to emphasize that the GIL tends to only affect programs that are heavily CPU bound (i.e., dominated by computation).
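
A rough way to see why programs that mostly wait are less affected: in the sketch below, `time.sleep()` stands in for a blocking I/O call, and the interpreter releases the GIL while a thread is sleeping. The function name and the delay are assumptions, and exact timings will vary by machine.

```python
import time
from threading import Thread

def wait_for_io(delay):
    # time.sleep() stands in for a blocking I/O call; the GIL is released while waiting
    time.sleep(delay)

start = time.perf_counter()
threads = [Thread(target=wait_for_io, args=(0.2,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# run back to back, the four waits would take about 0.8 seconds
print('elapsed: {:.2f}s'.format(elapsed))
```

On a typical machine the elapsed time stays close to a single 0.2 second wait, because the four threads wait concurrently rather than one after another.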

## Defining an Actor Task
The 'actor model' is one of the oldest and simplest approaches to concurrency and distributed computing. In fact, its underlying simplicity is part of its appeal. In a nutshell, an actor is a concurrently executing task that simply acts upon messages sent to it.

In [39]:
from queue import Queue
from threading import Thread, Event

# Sentinel used for shutdown
class ActorExit(Exception):
    pass

class Actor:
    def __init__(self):
        self._mailbox = Queue()
 
    def send(self, msg):
        self._mailbox.put(msg)
 
    def recv(self):
        msg = self._mailbox.get()
        if msg is ActorExit:
            raise ActorExit()
        return msg
 
    def close(self):
        self.send(ActorExit)
 
    def start(self):
        self._terminated = Event()
        t = Thread(target=self._bootstrap)
        t.daemon = True
        t.start()

    def _bootstrap(self):
        try:
            self.run()
        except ActorExit:
            pass
        finally:
            self._terminated.set()
    
    def join(self):
        self._terminated.wait()
     
    def run(self):
        while True:
            msg = self.recv()


In [40]:
# Sample ActorTask
class PrintActor(Actor):
    def run(self):
        while True:
            msg = self.recv()
            print('Got:', msg)


In [41]:
p = PrintActor()
p.start()
p.send('Hello')
p.send('World')
p.close()
p.join()

Got: Hello
Got: World


## Implementing Publish/Subscribe Messaging
To implement publish/subscribe messaging, you typically introduce a separate 'exchange' or 'gateway' object that acts as an intermediary for all messages.

In [1]:
from collections import defaultdict

class Exchange:
    def __init__(self):
        self._subscribers = set()

    def attach(self, task):
        self._subscribers.add(task)
 
    def detach(self, task):
        self._subscribers.remove(task)
 
    def send(self, msg):
        for subscriber in self._subscribers:
            subscriber.send(msg)


In [2]:
# dictionary of all created exchanges
_exchanges = defaultdict(Exchange)

# return the exchange instance associated with a given name
def get_exchange(name):
    return _exchanges[name]


In [3]:
class Task:
    def __init__(self, name):
        self.name = name

    def send(self, msg):
        print(self.name, "received", msg)

task_a = Task("task_a")
task_b = Task("task_b")

exc = get_exchange('name')
exc.attach(task_a)
exc.attach(task_b)


In [4]:
exc.send('msg1')

task_a received msg1
task_b received msg1


In [5]:
exc.send('msg2')

task_a received msg2
task_b received msg2


In [6]:
# Example of unsubscribing
exc.detach(task_a)
exc.detach(task_b)

The concept of tasks or threads sending messages to one another (often via queues) is easy to implement and quite popular. However, the benefits of using a publish/subscribe (pub/sub) model instead are often overlooked.

Here is a simple diagnostic class that would display sent messages:

In [7]:
class DisplayMessages:
    def __init__(self):
        self.count = 0
 
    def send(self, msg):
        self.count += 1
        print('msg[{}]: {!r}'.format(self.count, msg))

exc = get_exchange('name')
d = DisplayMessages()
exc.attach(d)

In [8]:
exc.send("some")

msg[1]: 'some'


In [9]:
exc.send("data")

msg[2]: 'data'


## Using Generators As an Alternative to Threads
You want to implement concurrency using generators (coroutines) as an alternative to system threads. This is sometimes known as user-level threading or green threading.

The fundamental behavior of yield is that it causes a generator to suspend its execution. By suspending execution, it is possible to write a scheduler that treats generators as a kind of 'task' and alternates their execution using a kind of cooperative task switching. Also see [deque](https://docs.python.org/2/library/collections.html#collections.deque).

In [1]:
def countdown(n):
    while n > 0:
        print('T-minus', n)
        yield
        n -= 1
    print('Blastoff!')


def countup(n):
    x = 0
    while x < n:
        print('Counting up', x)
        yield
        x += 1


In [2]:
from collections import deque

class TaskScheduler:
    def __init__(self):
        self._task_queue = deque()
 
    def new_task(self, task):
        self._task_queue.append(task)
 
    def run(self):
        while self._task_queue:
            task = self._task_queue.popleft()
            try:
                # run until the next yield statement
                next(task)
                self._task_queue.append(task)
            except StopIteration:
                pass


In [3]:
sched = TaskScheduler()
sched.new_task(countdown(10))
sched.new_task(countdown(5))
sched.new_task(countup(15))
sched.run()

T-minus 10
T-minus 5
Counting up 0
T-minus 9
T-minus 4
Counting up 1
T-minus 8
T-minus 3
Counting up 2
T-minus 7
T-minus 2
Counting up 3
T-minus 6
T-minus 1
Counting up 4
T-minus 5
Blastoff!
Counting up 5
T-minus 4
Counting up 6
T-minus 3
Counting up 7
T-minus 2
Counting up 8
T-minus 1
Counting up 9
Blastoff!
Counting up 10
Counting up 11
Counting up 12
Counting up 13
Counting up 14


In [5]:
sched._task_queue

deque([])
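
What the scheduler does on each pass can also be reproduced by hand. This sketch drives a single generator with `next()` and shows the `StopIteration` that the scheduler catches; unlike the tasks above, this `countdown` yields its current value so that the steps can be inspected.

```python
def countdown(n):
    while n > 0:
        yield n      # execution pauses here until the next call to next()
        n -= 1

task = countdown(2)
steps = [next(task), next(task)]   # each next() runs to the following yield

try:
    next(task)                     # the generator body finishes
    finished = False
except StopIteration:
    finished = True

print(steps, finished)   # [2, 1] True
```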

Generator functions are the tasks and the yield statement is how tasks signal that they want to suspend. The scheduler simply cycles over the tasks until none are left executing. For a thread-free version of actors:

In [20]:
from collections import deque

class ActorScheduler:
    def __init__(self):
        self._actors = {}  # mapping of names to actors
        self._msg_queue = deque() # message queue
 
    def new_actor(self, name, actor):
        self._msg_queue.append((actor,None))
        self._actors[name] = actor
 
    def send(self, name, msg):
        actor = self._actors.get(name)
        if actor:
            self._msg_queue.append((actor,msg))
 
    def run(self):
        while self._msg_queue:
            actor, msg = self._msg_queue.popleft()
            try:
                actor.send(msg)
            except StopIteration:
                pass


In [21]:
def printer():
    while True:
        msg = yield
        print('Got:', msg)

def counter(sched):
    while True:
        n = yield
        if n == 0:
            break

        sched.send('printer', n)
        # Send the next count to the counter task (recursive)
        sched.send('counter', n-1)
    
    
sched = ActorScheduler()

# Create the initial actors
sched.new_actor('printer', printer())
sched.new_actor('counter', counter(sched))

# Send an initial message to the counter to initiate
sched.send('counter', 7)
sched.run()


Got: 7
Got: 6
Got: 5
Got: 4
Got: 3
Got: 2
Got: 1
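
One detail of `ActorScheduler.new_actor()` is that it queues `(actor, None)` before any real message. A new generator has to be advanced to its first `yield` before it can receive a value, and sending `None` does exactly that. The sketch below shows this on a single generator; the `log` list is an assumption added so the received messages can be checked.

```python
def printer(log):
    while True:
        msg = yield          # suspend until a message is sent in
        log.append(msg)

log = []
actor = printer(log)

actor.send(None)     # prime: run to the first yield (what the queued None does)
actor.send('Hello')
actor.send('World')

# sending a real message to a generator that has not been primed fails
fresh = printer([])
try:
    fresh.send('too early')
    priming_needed = False
except TypeError:
    priming_needed = True

print(log, priming_needed)   # ['Hello', 'World'] True
```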


## Polling Multiple Thread Queues