# Python threads 101
- ## Sébastien Tixeuil
- ## LINCS Python Workshop 24 March 2021

# Sources
- <https://realpython.com/intro-to-python-threading/>
- <https://realpython.com/python-concurrency/>

# Basic Threading

In [None]:
import threading
import time

def thread_function(name):
    """Print a start message, pause two seconds, then print a finish message.

    Used as the target of a worker thread; ``name`` only labels the output.
    """
    label = f"Thread {name}"
    print(f"{label} starting")
    time.sleep(2)  # keep the thread alive long enough to observe it
    print(f"{label} finishing")

# Launch a single worker thread and wait for it; the main-thread messages
# bracket the worker's lifetime.
print("Main: Before starting thread")
x = threading.Thread(args=(1,), target=thread_function)
x.start()
print("Main: Thread has started")
x.join()  # block until thread_function has returned
print("Main: Thread is finished")

# From `print` to `logging.info`
- `print` output is often mashed up (interleaved) when used in several threads
- the `logging` module is designed to be thread-safe, so records from different threads do not get mixed up

In [None]:
import logging

def thread_function(name):
    """Log a start record, sleep two seconds, then log a finish record.

    Intended as a thread target; ``name`` only labels the log records.
    """
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

# Named `log_format` (not `format`) so the built-in format() is not shadowed.
log_format = "%(asctime)s: %(message)s"
logging.basicConfig(format=log_format, level=logging.INFO, datefmt="%H:%M:%S")

logging.info("Main    : before creating thread")
x = threading.Thread(target=thread_function, args=(1,))
logging.info("Main    : before running thread")
x.start()
logging.info("Main    : wait for the thread to finish")
x.join()
logging.info("Main    : all done")

# What if I don't join?

In [None]:
# Start a worker but never join it: "all done" is logged while the worker
# (thread_function as defined in the previous cell, which sleeps before
# finishing) is still running, so its "finishing" record comes afterwards.
logging.info("Main    : before creating thread")
worker = threading.Thread(target=thread_function, args=(1,))
logging.info("Main    : before running thread")
worker.start()
# Intentionally disabled for this demonstration:
# logging.info("Main    : wait for the thread to finish")
# worker.join()
logging.info("Main    : all done")

# Many threads are better than one
- creating several threads is no harder than creating one
- in order to `join`, one must keep track of created threads

In [None]:
# Start four workers, keeping every Thread object so that all of them can
# be joined before "all done" is logged.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function, args=(index,))
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

# Sequential counting
- can we improve the performance by using several threads?

In [None]:
def thread_function_count_to(name, count_to):
    """Busy-count from 0 up to ``count_to`` as pure CPU work.

    Logs a start and a finish record labelled with ``name``. The running
    total is discarded; the loop exists only to consume CPU time.
    """
    logging.info("Thread %s: starting", name)
    total = 0
    for _ in range(count_to):
        total += 1
    logging.info("Thread %s: finishing", name)
    
# Baseline: perform all 160 million increments on the main thread.
logging.info("Main    : before counting")
thread_function_count_to(1, 160_000_000)
logging.info("Main    : all done")

# Threaded counting
- split the counting between several threads, in the hope of using multiple CPU cores

In [None]:
def thread_function_count_to(name, count_to):
    """Consume CPU by incrementing a local counter ``count_to`` times.

    ``name`` only labels the start/finish log records; nothing is returned.
    """
    logging.info("Thread %s: starting", name)
    counter = 0
    for _ in range(count_to):
        counter += 1
    logging.info("Thread %s: finishing", name)
    
# Split the same 160 million increments evenly across several threads.
# Every thread is recorded in `threads` so that all of them (not just the
# last one) are joined before "all done" is logged.
n_threads = 4
per_thread = 160_000_000 // n_threads  # same total as the sequential cell

logging.info("Main    : before creating threads")
threads = list()
for index in range(n_threads):
    x = threading.Thread(target=thread_function_count_to, args=(index, per_thread))
    logging.info("Main    : before running thread")
    x.start()
    threads.append(x)  # must be inside the loop, otherwise only the last thread is joined
for thread in threads:
    logging.info("Main    : wait for the thread to finish")
    thread.join()
logging.info("Main    : all done")

# The Infamous GIL
- <https://realpython.com/python-gil/>

# Sequential, no session

In [None]:
import requests

def download_site(url):
    """Fetch ``url`` with a one-off GET request and log the body size."""
    with requests.get(url) as response:
        body_size = len(response.content)
        logging.info(f"Read {body_size} from {url}")

def download_all_sites(sites):
    """Download every URL in ``sites`` one after another, without a Session."""
    for site_url in sites:
        download_site(site_url)

# 100 URLs: the two pages below, repeated 50 times.
sites = 50 * [
    "https://realpython.com/intro-to-python-threading/",
    "https://realpython.com/python-concurrency/",
]
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
logging.info(f"Downloaded {len(sites)} in {duration} seconds")

# Sequential, sessions
- a `requests.Session` reuses the underlying TCP connection for repeated requests to the same host, which speeds up HTTP traffic

In [None]:
import requests

def download_site(url, session):
    """GET ``url`` through the given ``session`` and log the body size."""
    with session.get(url) as response:
        body_size = len(response.content)
        logging.info(f"Read {body_size} from {url}")

def download_all_sites(sites):
    """Fetch every URL in ``sites`` sequentially over one shared Session."""
    with requests.Session() as session:
        for site_url in sites:
            download_site(site_url, session)

# 100 URLs: the two pages below, repeated 50 times.
sites = 50 * [
    "https://realpython.com/intro-to-python-threading/",
    "https://realpython.com/python-concurrency/",
]
start_time = time.time()
download_all_sites(sites)
duration = time.time() - start_time
logging.info(f"Downloaded {len(sites)} in {duration} seconds")

# Multithread, no session

In [None]:
import requests

def download_site(url):
    """Fetch ``url`` with a one-off GET request and log the body size."""
    with requests.get(url) as response:
        body_size = len(response.content)
        logging.info(f"Read {body_size} from {url}")

def download_all_sites(sites):
    """Download every URL in ``sites`` one after another, without a Session."""
    for site_url in sites:
        download_site(site_url)

# 100 URLs: the two pages below, repeated 50 times.
sites = 50 * [
    "https://realpython.com/intro-to-python-threading/",
    "https://realpython.com/python-concurrency/",
]

# Five threads each download a consecutive slice of 20 URLs.
chunk_size = 20
threads = []
start_time = time.time()
for index in range(5):
    first = index * chunk_size
    worker = threading.Thread(
        target=download_all_sites,
        args=(sites[first:first + chunk_size],),
        daemon=True,
    )
    worker.start()
    threads.append(worker)
logging.info("Main    : wait for the thread to finish")
for worker in threads:
    worker.join()
duration = time.time() - start_time
logging.info(f"Downloaded {len(sites)} in {duration} seconds")

# Multithread, session
- one can combine threads and sessions

In [None]:
import requests

def download_site(url, session):
    """GET ``url`` through the given ``session`` and log the body size."""
    with session.get(url) as response:
        body_size = len(response.content)
        logging.info(f"Read {body_size} from {url}")

def download_all_sites(sites):
    """Fetch every URL in ``sites`` over a Session created for this call.

    Each thread that runs this function therefore gets its own Session.
    """
    with requests.Session() as session:
        for site_url in sites:
            download_site(site_url, session)

# 100 URLs: the two pages below, repeated 50 times.
sites = 50 * [
    "https://realpython.com/intro-to-python-threading/",
    "https://realpython.com/python-concurrency/",
]

# Five threads each download a consecutive slice of 20 URLs.
chunk_size = 20
threads = []
start_time = time.time()
for index in range(5):
    first = index * chunk_size
    worker = threading.Thread(
        target=download_all_sites,
        args=(sites[first:first + chunk_size],),
        daemon=True,
    )
    worker.start()
    threads.append(worker)
logging.info("Main    : wait for the thread to finish")
for worker in threads:
    worker.join()
duration = time.time() - start_time
logging.info(f"Downloaded {len(sites)} in {duration} seconds")

# Shared variables
- global variables can be shared between threads

In [None]:
value = 0  # module-level counter shared by every thread

def thread_function_2(name):
    """Add one to the shared module-level ``value``, logging start and end."""
    global value
    logging.info("Thread %s: starting", name)
    value = value + 1
    logging.info("Thread %s: finishing", name)
    
# Four threads each add one to the shared `value`.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function_2, args=(index,), daemon=True)
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

logging.info(f"Value = {value}")

# More Shared variables

In [None]:
value = 0  # module-level counter shared by every thread

def thread_function_3(name):
    """Increment the shared ``value`` one million times without any locking."""
    global value
    logging.info("Thread %s: starting", name)
    for _ in range(1_000_000):
        value += 1
    logging.info("Thread %s: finishing", name)
    
# Four threads x 1,000,000 unlocked increments.  Because `value += 1` is not
# atomic, the final total may come out below 4,000,000.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function_3, args=(index,), daemon=True)
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

logging.info(f"Value = {value}")

# Many operations are not atomic

In [None]:
def inc(x):
    # Deliberately trivial: the body exists only so that dis.dis() below can
    # show that `x += 1` compiles to several separate bytecode instructions
    # (load, add, store) rather than a single atomic step.
    # Note: the result is never returned, so calling inc() has no effect.
    x += 1

import dis
# Print the bytecode of inc() to show the read-modify-write sequence.
dis.dis(inc)

# Race condition
- race conditions occur when several threads modify the same variable/object, but the modification is not atomic 

In [None]:
value = 0  # module-level counter shared by every thread

def thread_function_4(name):
    """Increment ``value`` with an explicit read / pause / write-back sequence.

    The 0.1 s sleep between reading the shared value and writing it back
    widens the window in which another thread can read the same stale value,
    which makes lost updates easy to observe.
    """
    global value
    logging.info("Thread %s: starting", name)
    snapshot = value + 1
    time.sleep(0.1)
    value = snapshot
    logging.info("Thread %s: finishing", name)
    
# Four threads run the unprotected read/sleep/write-back increment.  Threads
# that read `value` before another one writes it back overwrite each other's
# update, so the printed value can be well below 4.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function_4, args=(index,), daemon=True)
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

print(f"Value = {value}")

# Basic locks
- a `Lock` makes it possible to make a sequence of operations atomic with respect to other threads using the same lock
- first `acquire` the lock to make sure we are the only one modifying the variable
- modify the variable
- last, `release` lock to enable other threads to modify the variable as well

In [None]:
value = 0
lock = threading.Lock()

def thread_function_4_lock(name):
    """Do the read / pause / write-back increment of ``value`` while holding ``lock``.

    The lock is acquired explicitly and released in a ``finally`` clause. If
    anything between acquire and release raises, the lock is still freed,
    instead of staying locked and blocking every other thread forever.
    """
    global value  # `lock` is only read, so it needs no global declaration
    logging.info("Thread %s: starting", name)
    lock.acquire()
    try:
        logging.info("Thread %s: acquired lock", name)
        local_value = value
        local_value += 1
        time.sleep(0.1)
        value = local_value
    finally:
        lock.release()
    logging.info("Thread %s: released lock", name)
    logging.info("Thread %s: finishing", name)
    
# Four threads run the locked increment.  The read/sleep/write sequence
# happens while holding `lock`, so the threads take turns and no update is
# lost.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function_4_lock, args=(index,), daemon=True)
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

print(f"Value = {value}")

# Basic locks are compatible with context managers
- gives exception-safe code with (almost) no headache: the lock is released even if the block raises
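- not suitable for every construct, e.g. when a lock is acquired in one function and released in another (as in the producer/consumer example below)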

In [None]:
value = 0
lock = threading.Lock()

def thread_function_4_lock(name):
    """Do the read / pause / write-back increment of ``value`` under ``lock``.

    The ``with`` statement releases the lock when the block exits, including
    when it exits through an exception.
    """
    global value
    logging.info("Thread %s: starting", name)
    with lock:
        logging.info("Thread %s: acquired lock", name)
        current = value
        time.sleep(0.1)
        value = current + 1
    logging.info("Thread %s: released lock", name)
    logging.info("Thread %s: finishing", name)
    
# Same four-thread experiment, this time using the context-manager version
# of the locked increment.
logging.info("Main    : before creating threads")
threads = []
for index in range(4):
    worker = threading.Thread(target=thread_function_4_lock, args=(index,), daemon=True)
    logging.info("Main    : before running thread")
    worker.start()
    threads.append(worker)
for worker in threads:
    logging.info("Main    : wait for the thread to finish")
    worker.join()
logging.info("Main    : all done")

print(f"Value = {value}")

# Producer / Consumer
- important multi-thread construct
- one or more **producers** generate values/objects/requests
- one or more **consumers** take those values/objects/requests and execute code accordingly

In [None]:
import random

message = 0
producer_lock = threading.Lock()
consumer_lock = threading.Lock()

# A single-slot mailbox (`message`) handed over with two locks:
#   producer_lock held -> a message has been written and not yet consumed
#   consumer_lock held -> nothing new to consume, the consumer must wait
# The lock is acquired in one function and released in another, which is
# why `with` cannot be used here.

def pipeline_init():
    """Reset the mailbox and start in the 'nothing to consume' state."""
    global message
    message = 0
    consumer_lock.acquire()

def consume_message():
    """Block until a message is available, take it, then let the producer continue."""
    consumer_lock.acquire()
    taken = message
    producer_lock.release()
    return taken

def produce_message( m ):
    """Block until the previous message was consumed, then publish ``m``."""
    global message
    producer_lock.acquire()
    message = m
    consumer_lock.release()

def consumer_task():
    """Consume and log messages until the sentinel 0 arrives."""
    while True:
        r = consume_message()
        if r == 0:
            break
        logging.info(f"Consumed: {r}")

def producer_task():
    """Produce random integers in [0, 10] until a 0 (the stop sentinel) is sent."""
    while True:
        m = random.randint(0, 10)
        produce_message(m)
        if m == 0:
            break
        
# Run one consumer and one producer concurrently.  The producer's final 0
# stops the consumer, so both joins return.
logging.info("Main    : before creating threads")
pipeline_init()

consumer = threading.Thread(target=consumer_task)
producer = threading.Thread(target=producer_task)

logging.info("Main    : before running thread")
consumer.start()
producer.start()

logging.info("Main    : wait for the thread to finish")
producer.join()
consumer.join()
logging.info("Main    : all done")


# Producer / Consumer with a queue
- `queue.Queue` is thread-safe
- simply use `get` and `put`; the queue handles the locking internally (`put` blocks while the queue is full, `get` blocks while it is empty)

In [None]:
import random
import queue

# Named `work_queue` so that `queue` keeps referring to the queue module
# (rebinding it to the instance would hide queue.Empty, queue.Queue, ...).
work_queue = queue.Queue(maxsize=10)

def consumer_task():
    """Take items from ``work_queue`` and log them until the sentinel 0 arrives."""
    r = work_queue.get()
    while r != 0:
        logging.info(f"Consumed: {r}")
        r = work_queue.get()

def producer_task():
    """Put random integers in [0, 10] on ``work_queue``, ending with the sentinel 0.

    ``put`` blocks while the queue already holds ``maxsize`` (10) items, so a
    fast producer is throttled to the consumer's pace.
    """
    m = random.randint(0, 10)
    work_queue.put(m)
    while m != 0:
        m = random.randint(0, 10)
        work_queue.put(m)
        
# One consumer and one producer exchange values through the bounded queue.
logging.info("Main    : before creating threads")

consumer = threading.Thread(target=consumer_task, daemon=True)
producer = threading.Thread(target=producer_task, daemon=True)

logging.info("Main    : before running thread")
consumer.start()
producer.start()

logging.info("Main    : wait for the thread to finish")
producer.join()
consumer.join()
logging.info("Main    : all done")


# Producer / **slow** consumer and a queue
- the consumer may slow down if the related code is time consuming

In [None]:
import random
import queue

# Named `work_queue` so that `queue` keeps referring to the queue module.
work_queue = queue.Queue(maxsize=10)

def consumer_task():
    """Log items from ``work_queue`` with a 0.5 s pause each, until 0 arrives."""
    r = work_queue.get()
    while r != 0:
        logging.info(f"Consumed: {r}")
        time.sleep(0.5)  # simulate slow processing of each item
        r = work_queue.get()

def producer_task():
    """Put random integers in [0, 10] on ``work_queue``, ending with the sentinel 0.

    Logs how many non-zero values were produced (the final 0 is not counted).
    Once the queue holds 10 items, ``put`` blocks until the slow consumer
    catches up.
    """
    produced = 0
    m = random.randint(0, 10)
    work_queue.put(m)
    while m != 0:
        produced += 1
        m = random.randint(0, 10)
        work_queue.put(m)
    logging.info(f"Produced {produced} values")
        
# One slow consumer and one producer; the producer blocks whenever the
# bounded queue is full.
logging.info("Main    : before creating threads")

consumer = threading.Thread(target=consumer_task, daemon=True)
producer = threading.Thread(target=producer_task, daemon=True)

logging.info("Main    : before running thread")
consumer.start()
producer.start()

logging.info("Main    : wait for the thread to finish")
producer.join()
consumer.join()
logging.info("Main    : all done")


# Producer with many **slow** consumers and a queue

In [None]:
import random
import queue

# Named `work_queue` so that `queue` keeps referring to the queue module.
work_queue = queue.Queue(maxsize=10)

def consumer_task():
    """Log items from ``work_queue`` with a 0.5 s pause each, until 0 arrives.

    Only the consumer that receives the single 0 sentinel returns; any other
    consumer keeps waiting in ``get()``.
    """
    r = work_queue.get()
    while r != 0:
        logging.info(f"Consumed: {r}")
        time.sleep(0.5)  # simulate slow processing of each item
        r = work_queue.get()

def producer_task():
    """Put random integers in [0, 10] on ``work_queue``, ending with the sentinel 0.

    Logs how many non-zero values were produced (the final 0 is not counted).
    """
    produced = 0
    m = random.randint(0, 10)
    work_queue.put(m)
    while m != 0:
        produced += 1
        m = random.randint(0, 10)
        work_queue.put(m)
    logging.info(f"Produced {produced} values")
        
# Five slow consumers share one producer.  The producer sends a single 0,
# so only one consumer ever sees the stop sentinel.  The other four stay
# blocked in get(), and the join loop below never completes.
logging.info("Main    : before creating threads")
threads = []
for _ in range(5):
    consumer = threading.Thread(target=consumer_task, daemon=True)
    consumer.start()
    threads.append(consumer)
producer = threading.Thread(target=producer_task, daemon=True)
producer.start()
threads.append(producer)

logging.info("Main    : wait for the threads to finish")
for thread in threads:
    thread.join()
logging.info("Main    : all done")


- Only one consumer thread receives the single `0` sentinel and exits; the others stay blocked on `get`
- As a result, the main thread is stuck on `join`

# Producer with many **slow** consumers and a queue with `timeout`

In [None]:
import random
import queue
from queue import Empty as Empty

# Named `work_queue` so that `queue` keeps referring to the queue module,
# which lets us catch `queue.Empty` directly.
work_queue = queue.Queue(maxsize=10)

def consumer_task():
    """Log items from ``work_queue`` until 0 arrives or nothing comes for 5 s.

    Each ``get`` waits at most 5 seconds. If it times out, ``queue.Empty``
    is raised, and the consumer logs that and exits instead of blocking
    forever.
    """
    try:
        r = work_queue.get(timeout=5)
        while r != 0:
            logging.info(f"Consumed: {r}")
            time.sleep(0.5)  # simulate slow processing of each item
            r = work_queue.get(timeout=5)
    except queue.Empty:
        logging.info("Queue was empty for a while, exiting thread")

def producer_task():
    """Put random integers in [0, 10] on ``work_queue``, ending with the sentinel 0.

    Logs how many non-zero values were produced (the final 0 is not counted).
    """
    produced = 0
    m = random.randint(0, 10)
    work_queue.put(m)
    while m != 0:
        produced += 1
        m = random.randint(0, 10)
        work_queue.put(m)
    logging.info(f"Produced {produced} values")
        
# Five slow consumers with timeouts share one producer.  Consumers that
# never receive the 0 sentinel give up after 5 s without new items, so
# every thread eventually exits and the join loop completes.
logging.info("Main    : before creating threads")
threads = []
for _ in range(5):
    consumer = threading.Thread(target=consumer_task, daemon=True)
    consumer.start()
    threads.append(consumer)
producer = threading.Thread(target=producer_task, daemon=True)
producer.start()
threads.append(producer)

logging.info("Main    : wait for the threads to finish")
for thread in threads:
    thread.join()
logging.info("Main    : all done")
