# init

In [3]:
import logging
import threading
import time
import concurrent.futures
import random


# Multithreading

## producer - pipeline - consumer

* A `threading.Thread` wraps a target function. `thread.start()` begins executing that function in the new thread, whilst `thread.join()` blocks the calling thread until the thread has finished executing.
* By default, the Python interpreter waits for all non-daemon threads to finish before exiting. To let the program exit without waiting for a thread, mark it as a daemon (`daemon=True`) instead; daemon threads are stopped abruptly at shutdown.
* The simplest way to manage concurrency with threads is to start a list of threads and then join them one by one. This is fine, but the standard library has a helper that does this for you: `concurrent.futures.ThreadPoolExecutor`. It looks like this:

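```python
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    executor.map(thread_function, range(3))
```

Leaving the `with` block waits for every submitted call to finish, just as if `executor.shutdown(wait=True)` had been called.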
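* There are some confusing aspects to how this API reports errors. An exception raised inside `thread_function` is not raised in the main thread. This includes a `TypeError` from calling it with the wrong arguments. `executor.map` only re-raises the exception when that result is read from the iterator it returns. `executor.submit` stores it on the returned `Future` until `future.result()` is called. If the results are never retrieved, the error is silently lost.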

* A thread gives up the `GIL` voluntarily during blocking I/O operations, `time.sleep`, and certain computationally intensive operations in C extensions such as numpy. There may be other cases, but these are the three I'm aware of. Otherwise, the interpreter periodically forces the running thread to release the GIL so that another thread can acquire it (the interval is `sys.getswitchinterval()`, 5 ms by default). These forced switches can happen at almost any point in the code, so these are the occasions during which race conditions are possible. For example, a read-modify-write such as `counter += 1` on shared data is not guaranteed to be atomic.
* Data is __thread-safe__ if:
> * It is local to the thread, i.e. it is not shared memory.
> * It is shared memory, but the data structure itself is thread-safe. Examples are a `queue.Queue`, or an append-only data structure where the order of the elements does not matter.
> * To avoid race conditions in code that touches non-thread-safe shared data, guard it with a lock created by `threading.Lock()`.
>   * You can acquire and release the lock with a context manager (`with lock:`) or with `lock.acquire()` followed by `lock.release()`.
>   * `acquire` takes a `blocking=True` argument and an optional `timeout`.
>   * Calling `lock.acquire(blocking=False)` returns `False` immediately if the lock is already held, instead of waiting for it. This lets you implement your own logic for avoiding deadlocks.
* That gets us to the next, more complex example:

In [2]:
import random
import concurrent.futures

# Unique end-of-stream marker: the producer sends it after its last real message,
# and the consumer stops when it receives it. Compared by identity (`is`), so no
# real message value can ever be mistaken for it.
SENTINEL = object()


class Pipeline:
    """
    One-slot hand-off between a producer and a consumer.

    Two plain ``threading.Lock`` objects act as turn tokens:

    * ``producer_lock`` is free while the slot may be (re)written.
    * ``consumer_lock`` is free while the slot holds a value to be read.

    ``consumer_lock`` starts out acquired, so a consumer blocks until the first
    ``set_message``. Each method acquires its own side's lock and releases the
    other side's, so writes and reads strictly alternate. Releasing a lock from
    a thread other than the one that acquired it is allowed for ``threading.Lock``.
    """

    def __init__(self):
        # Nothing has been written yet, so the consumer's turn token is taken.
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()
        self.producer_lock = threading.Lock()
        self.message = 0

    def get_message(self, who):
        """Wait for a written value, read it, then hand the turn back to the producer.

        :param who: label prefixed to the debug log lines.
        :returns: the value stored by the most recent ``set_message``.
        """
        log = logging.debug
        log("%s:about to acquire getlock", who)
        self.consumer_lock.acquire()
        log("%s:have getlock", who)
        received = self.message
        log("%s:about to release setlock", who)
        self.producer_lock.release()
        log("%s:setlock released", who)
        return received

    def set_message(self, message, who):
        """Wait until the slot may be written, store ``message``, then wake the consumer.

        :param message: value to hand to the consumer.
        :param who: label prefixed to the debug log lines.
        """
        log = logging.debug
        log("%s:about to acquire setlock", who)
        self.producer_lock.acquire()
        log("%s:have setlock", who)
        self.message = message
        log("%s:about to release getlock", who)
        self.consumer_lock.release()
        log("%s:getlock released", who)


def producer(pipeline, n_messages=10):
    """Pretend we're getting messages from the network.

    Sends ``n_messages`` random integers (each in ``[1, 101]``, inclusive)
    through ``pipeline`` and then sends ``SENTINEL`` so the consumer knows to
    stop.

    :param pipeline: object with a ``set_message(message, who)`` method, such as
        the ``Pipeline`` class in this notebook.
    :param n_messages: how many random messages to send before the sentinel
        (defaults to 10, the original hard-coded count).
    """
    for _ in range(n_messages):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")


def consumer(pipeline):
    """Pretend we're saving numbers in the database.

    Reads messages from ``pipeline`` one at a time and logs each one, stopping
    as soon as the ``SENTINEL`` marker arrives (the sentinel is not stored).

    :param pipeline: object with a ``get_message(who)`` method, such as the
        ``Pipeline`` class in this notebook.
    """
    while True:
        message = pipeline.get_message("Consumer")
        if message is SENTINEL:
            break
        logging.info("Consumer storing message: %s", message)


LOG_FORMAT = "%(asctime)s: %(message)s"  # named to avoid shadowing the built-in ``format``
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO,
                    datefmt="%H:%M:%S")

# Run one producer and one consumer against a shared single-slot Pipeline.
pipeline = Pipeline()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(producer, pipeline),
        executor.submit(consumer, pipeline),
    ]

# ``submit`` stores a worker's exception on its Future instead of raising it;
# calling ``result()`` re-raises it so failures are reported rather than
# silently dropped. (Leaving the ``with`` block already waited for both workers.)
for future in futures:
    future.result()


12:48:39: Producer got message: 68
12:48:39: Producer got message: 84
12:48:39: Consumer storing message: 68
12:48:39: Producer got message: 72
12:48:39: Consumer storing message: 84
12:48:39: Producer got message: 59
12:48:39: Consumer storing message: 72
12:48:39: Producer got message: 14
12:48:39: Consumer storing message: 59
12:48:39: Producer got message: 48
12:48:39: Consumer storing message: 14
12:48:39: Producer got message: 41
12:48:39: Consumer storing message: 48
12:48:39: Producer got message: 24
12:48:39: Consumer storing message: 41
12:48:39: Producer got message: 19
12:48:39: Consumer storing message: 24
12:48:39: Producer got message: 82
12:48:39: Consumer storing message: 19
12:48:39: Consumer storing message: 82


## producer - consumer by Queue

In [None]:

import queue # FIFO

def producer(queue, event):
    """Pretend we're getting numbers from the network.

    Keeps putting random integers (each in ``[1, 101]``) onto ``queue`` until
    ``event`` reports that it is set, then logs that it is exiting.

    :param queue: the shared queue to fill (note: the name shadows the ``queue``
        module inside this function).
    :param event: object with ``is_set()``, such as ``threading.Event``; checked
        before producing each message.
    """
    while True:
        if event.is_set():
            break
        value = random.randint(1, 101)
        logging.info("Producer got message: %s", value)
        queue.put(value)

    logging.info("Producer received event. Exiting")


def consumer(queue, event, poll_timeout=0.1):
    """Pretend we're saving numbers in the database.

    Takes messages off ``queue`` and logs them until ``event`` is set *and* the
    queue has been drained.

    :param queue: the shared queue (needs ``get(timeout=...)``, ``empty()`` and
        ``qsize()``, as ``queue.Queue`` provides). The name shadows the
        ``queue`` module inside this function.
    :param event: object with ``is_set()``, such as ``threading.Event``; once it
        is set, the consumer stops as soon as the queue is empty.
    :param poll_timeout: seconds to wait in each ``get`` before re-checking the
        loop condition.
    """
    # Import by module path: the ``queue`` parameter hides the module name here.
    from queue import Empty

    while not event.is_set() or not queue.empty():
        try:
            # A bare ``get()`` could block forever. The event may be set, and
            # the producer exit, between the loop check above and this call,
            # which leaves the queue empty. Time out and re-check instead.
            message = queue.get(timeout=poll_timeout)
        except Empty:
            continue
        logging.info(
            "Consumer storing message: %s (size=%s)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")


LOG_FORMAT = "%(asctime)s: %(message)s"  # named to avoid shadowing the built-in ``format``
# basicConfig does nothing if the root logger already has handlers (e.g. from an
# earlier basicConfig call); it is repeated so this cell also works on its own.
logging.basicConfig(format=LOG_FORMAT, level=logging.INFO,
                    datefmt="%H:%M:%S")

RUN_SECONDS = 0.1  # how long the producer/consumer pair runs before the event is set

pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [
        executor.submit(producer, pipeline, event),
        executor.submit(consumer, pipeline, event),
    ]

    time.sleep(RUN_SECONDS)
    logging.info("Main: about to set event")
    event.set()

# ``submit`` stores a worker's exception on its Future instead of raising it;
# calling ``result()`` re-raises it so failures are reported rather than
# silently dropped. (Leaving the ``with`` block already waited for both workers.)
for future in futures:
    future.result()


In [6]:
# After shutdown, anything still in the queue was put there by the producer but
# never taken off by the consumer.
print(f"{pipeline} {pipeline.qsize()}")

<queue.Queue object at 0x0000020CF34BC6D0> 0
