<div style="color:red;background-color:black">
Diamond Light Source

<h1 style="color:red;background-color:antiquewhite"> Python Fundamentals: Threading</h1>  

©2000-20 Chris Seddon 
</div>

## 1
Execute the following cell to activate styling for this tutorial

In [None]:
from IPython.display import HTML
HTML(f"<style>{open('my.css').read()}</style>")

## What is a Thread?
A thread is a separate flow of execution through your code.  Different threads may be executing the same code, or entirely different code, at the same time.

Threads are used when you want to run multiple sections of code concurrently.  In most programming languages, threads can run on different CPUs to achieve true concurrency, although time slicing on a single CPU is often used to create apparent concurrency.  When multiple CPUs are used, threading can greatly speed up code.

Most Python programs are run using CPython, which is the default implementation of Python.  However, CPython's internals (for example, its memory management) are not thread safe.  To protect them, CPython uses a Global Interpreter Lock (GIL): a thread must hold the GIL to execute Python bytecode, so only one thread runs Python code at a time, effectively serialising threaded code.

In practice, threading is still useful for concurrent tasks, but your code won't necessarily run faster.  I/O-bound tasks spend a lot of time waiting for data to be ready, and a thread that is waiting for I/O releases the GIL.  For these tasks there is a real speed benefit: while the current thread waits for I/O, code in another thread can run.  However, for CPU-bound tasks, switching threads won't speed things up, because no thread is ever waiting; the threads simply take turns.
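To make the I/O-bound case concrete, here is a minimal sketch that assumes `time.sleep` is a fair stand-in for waiting on real I/O (like most blocking I/O calls, it releases the GIL while it waits).  The helper names `fake_io`, `run_sequentially` and `run_in_threads` are hypothetical, chosen just for this example:

```python
import time
from threading import Thread

def fake_io(delay):
    # time.sleep stands in for waiting on a network or disk read;
    # the GIL is released while this thread sleeps
    time.sleep(delay)

def run_sequentially(delays):
    start = time.perf_counter()
    for d in delays:
        fake_io(d)
    return time.perf_counter() - start

def run_in_threads(delays):
    start = time.perf_counter()
    threads = [Thread(target=fake_io, args=(d,)) for d in delays]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

delays = [0.2] * 5
sequential = run_sequentially(delays)
threaded = run_in_threads(delays)
print(f"sequential: {sequential:.2f}s  threaded: {threaded:.2f}s")
```

The waits overlap in the threaded version, so it should take roughly as long as the longest single wait rather than the sum of all of them.  Replacing `fake_io` with a CPU-bound loop would remove that advantage.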

CPython has always used a GIL, and the standard build still does (Python 3.13 introduced an optional, experimental free-threaded build).  PyPy is the other popular implementation of Python, but that too has a GIL.  The good news is that code written in C/C++, or using NumPy or Cython, can release the GIL while it runs, so it may well support multithreaded CPU-bound tasks.

If you want to run concurrent CPU-bound Python code, you should check out the multiprocessing module instead.

## 1 Creating Threads
Recall that threads are used to perform concurrent tasks.  Threads are ultimately created by the operating system (kernel), but as far as we are concerned we make a Python call to start a thread; the Python interpreter then contacts the kernel.

Python provides a helper class to manage threads.  Rather confusingly, this class is called "Thread".  Objects of this class are NOT threads, just helper objects!

All programs start with a single thread (often called the main thread).  When the main thread wants to create further threads, it creates objects of the helper class and calls their "start" method:<pre>thread1.start()
thread2.start()
thread3.start()</pre>

Realize that when the new threads start, they need to perform a different task (or function) from the main thread.  This task is specified as a parameter when the main thread creates the helper objects:<pre>thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))</pre>
Creating the helper objects DOES NOT create any threads - calling the start method creates and starts a thread.
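A small sketch of the difference between creating a helper object and starting a thread.  It uses the standard `is_alive()` method and `ident` attribute of `Thread`; the `results` list is just an illustrative way to see that the callback ran, and `join()` (covered in the next section) is used to wait for the thread:

```python
from threading import Thread

results = []

def myfunc(name):
    results.append(name)

helper = Thread(target=myfunc, args=("1",))
# creating the helper object does not start a thread
print(helper.is_alive())    # False
print(helper.ident)         # None: no thread has been started yet

helper.start()              # now a real thread is created and runs myfunc("1")
helper.join()               # wait for the thread to finish

print(helper.is_alive())    # False again: the thread has finished
print(results)              # ['1']
```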

After the "start" method has been called, the main thread and the other threads run concurrently.  Because the operating system may suspend threads at any time, it is not possible to predict the order in which code will execute unless we use special synchronization objects.

In this example the main thread creates 3 other threads, which all execute the "myfunc" function.  Each thread terminates when it returns from this function.  I've added some random delays to emphasize the concurrent nature of this program:

In [None]:
import random
import time
import sys

from threading import Thread

def myfunc(name):
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)

# create the helper objects; myfunc is the callback that start() will run
thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))

thread1.start()
thread2.start()
thread3.start()

print("\nEnd of main Thread") 

## 2 Joining Threads
Note that the main thread is counted as just another thread (it is not special).  However, programs are often designed so that the main thread is the last to complete.  To achieve this, the main thread can wait for the other threads to complete before proceeding (each "join" call blocks until that thread has finished):<pre>thread1.join()
thread2.join()
thread3.join()</pre>

In [None]:
import random
import time
import sys

from threading import Thread

def myfunc(name):
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)

# create the helper objects; myfunc is the callback that start() will run
thread1 = Thread(target=myfunc, args=("1",))
thread2 = Thread(target=myfunc, args=("2",))
thread3 = Thread(target=myfunc, args=("3",))

thread1.start()
thread2.start()
thread3.start()

thread1.join()
thread2.join()
thread3.join()

print("\nEnd of main Thread") 
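With more than a handful of threads it is usually simpler to keep the helper objects in a list and start and join them in loops.  A minimal sketch of that pattern; the `finished` list is a hypothetical way to record completion:

```python
import random
import time
from threading import Thread

finished = []

def myfunc(name):
    time.sleep(random.random() * 0.1)   # random delay, so threads finish in any order
    finished.append(name)

# keep the helper objects in a list instead of separate variables
threads = [Thread(target=myfunc, args=(str(n),)) for n in range(1, 4)]
for t in threads:
    t.start()
for t in threads:
    t.join()        # blocks until this particular thread has finished

# this line is only reached once every thread has finished
print(sorted(finished))   # ['1', '2', '3']
```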

## 3 Using Methods as Callbacks
As an alternative, we can make the callback function a method in a class; this ultimately depends on operator overloading.  

In the previous example the callback function was "myfunc", and after the "start" method is called, Python calls this function with the arguments we supplied:<pre>myfunc("1")</pre>

In other words, Python applies the function call operator to the function object:<pre>( )</pre>

Because of operator overloading, it is possible to use an object as a callback:<pre>t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))</pre>

and then Python will call back with:<pre>m1("1")
m2("2")
m3("3")</pre>

Now the function call operator "()" is implemented as the "\__call__" method:<pre>    def \__call__(self, name): ...</pre>

So we can use an object as a callback on the understanding that the object's "\__call__" method will be the callback function.

In [None]:
from threading import Thread
import random
import time
import sys


# create a callable class
class MyClass:
    def __init__(self):
        pass
    
    def __call__(self, name):
        for i in range(1, 50):
            sys.stdout.write(name)
            time.sleep(random.random() * 0.1)

    
m1 = MyClass()
m2 = MyClass()
m3 = MyClass()

# define a callback class - __call__() to be called via start()
t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print("\nEnd of main")
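One advantage of a callable object over a plain function is that each instance can carry its own state.  A minimal sketch, where `Recorder` and its `calls` list are hypothetical names used only for illustration:

```python
from threading import Thread

class Recorder:
    """Hypothetical callable class whose instances keep state between calls."""
    def __init__(self):
        self.calls = []

    def __call__(self, name):
        # the thread calls target(*args), i.e. r1("1"), which runs this method
        self.calls.append(name)

r1 = Recorder()
print(callable(r1))     # True, because Recorder defines __call__

t1 = Thread(target=r1, args=("1",))
t1.start()
t1.join()
print(r1.calls)         # ['1']: the state is visible to the main thread afterwards
```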

## 4 Locks
To coordinate concurrent threads we can use synchronization classes.  The most important is the "Lock" class.

A "Lock" object, or simply a lock, allows only one thread at a time to execute the code it protects; any other thread that tries to acquire it waits until it is released.  A thread acquires a lock with:<pre>lock.acquire()</pre>
and releases a lock with <pre>lock.release()</pre>

These locks are often called mutexes (mutual-exclusion locks): only one thread at a time can execute the code between the "acquire" and "release" calls.

In this example, 4 threads execute code in the "task" function, but the lock serializes execution:

In [None]:
from threading import Thread, Lock
import random
import time
import sys


# task for threads
def task(name, lock):
    lock.acquire()
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)
    lock.release()


lock = Lock()

t1 = Thread(target = task, args = ("1", lock))
t2 = Thread(target = task, args = ("2", lock))
t3 = Thread(target = task, args = ("3", lock))
t4 = Thread(target = task, args = ("4", lock))

# start 4 threads
t1.start()
t2.start()
t3.start()
t4.start()

# wait for all 4 threads to finish
t1.join()
t2.join()
t3.join()
t4.join()
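If the code between `acquire()` and `release()` raises an exception, `release()` is never reached and the lock stays held, so every other thread blocks forever.  The usual remedy is the `with` statement, which releases the lock however the block exits.  A minimal sketch; `risky` is a hypothetical function that fails while holding the lock:

```python
from threading import Lock

lock = Lock()

def risky(lock):
    # 'with lock:' calls lock.acquire() on entry and lock.release() on exit,
    # even when the block raises an exception
    with lock:
        raise RuntimeError("something went wrong while holding the lock")

try:
    risky(lock)
except RuntimeError:
    pass

print(lock.locked())   # False: the lock was released despite the exception
```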

## 5 Multiple Locks
If we modify the example slightly and create 2 locks, one shared by threads 1 and 3 and the other by threads 2 and 4, then the locks will prevent threads 1 and 3 from running their loops at the same time, and likewise threads 2 and 4.  Threads that use different locks (say 1 and 2) can still run concurrently:

In [None]:
from threading import Thread, Lock
import random
import time
import sys


# task for threads
def task(name, lock):
    lock.acquire()
    for i in range(1, 50):
        sys.stdout.write(name)
        time.sleep(random.random() * 0.1)
    lock.release()


lockA = Lock()
lockB = Lock()

t1 = Thread(target = task, args = ("1", lockA))
t2 = Thread(target = task, args = ("2", lockB))
t3 = Thread(target = task, args = ("3", lockA))
t4 = Thread(target = task, args = ("4", lockB))

# start 4 threads
t1.start()
t2.start()
t3.start()
t4.start()

# wait for all 4 threads to finish
t1.join()
t2.join()
t3.join()
t4.join()
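The independence of the two locks can be checked without any threads at all, using the standard `blocking=False` argument of `acquire()`, which returns `False` instead of waiting.  This is a single-threaded simulation (an assumption made to keep the sketch deterministic): holding `lockA` stands for thread 1 being inside its loop:

```python
from threading import Lock

lockA = Lock()
lockB = Lock()

# pretend thread 1 currently holds lockA
lockA.acquire()

# a non-blocking acquire() returns False instead of waiting
a_again = lockA.acquire(blocking=False)   # what thread 3 would face
b_free = lockB.acquire(blocking=False)    # what thread 2 would face
print(a_again, b_free)                    # False True

# tidy up: release the locks that are actually held
lockB.release()
lockA.release()
```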

## 5 Data Contention
Locks can also be used to protect data.  In the program below we create 3 threads.  The threads increment two counters in a loop.  Note that access to "count1" is unprotected, but "count2" is protected by a lock.  

As we will see below, when a thread executes the statement:<pre>count1 += 1</pre> it is possible for the interpreter to switch to another thread part way through the statement.  This can easily result in data corruption.  Before we investigate, let's run the program.  Each count gets incremented 6,000,000 times, but what are the final values of these counts?  (On recent versions of CPython the unprotected count may well come out correct on some runs; the statement is still not atomic, so it is still unsafe.)

In [None]:
from threading import Thread
from threading import Lock
import random
import time
import sys

# 3 threads increment 2 counts ...
# count1 is unprotected
# count2 is protected

class MyClass:
    def __call__(self, name):
        global lock, count1, count2
        for i in range(0, 2*1000*1000):
            count1 += 1
            lock.acquire()
            count2 += 1
            lock.release()

    
lock = Lock()
count1 = 0
count2 = 0

m1 = MyClass()
m2 = MyClass()
m3 = MyClass()

t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))

t1.start()
t2.start()
t3.start()

t1.join()
t2.join()
t3.join()

print(f"count1: {count1}")
print(f"count2: {count2}")

print("\nEnd of main")


## 6 Data Corruption
The data corruption occurs because:<pre>
    count1 += 1</pre>
is not an atomic operation.  We can see this by examining the byte code using the disassembler module:

In [None]:
import dis
dis.dis("x += 1")
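The same information can be inspected programmatically with `dis.get_instructions`.  The exact opcode names vary between Python versions (for example INPLACE_ADD before 3.11 and BINARY_OP from 3.11), so this sketch only relies on the load and the store of `x` being separate instructions:

```python
import dis

# compile the statement as a module-level snippet, just like dis.dis("x += 1")
code = compile("x += 1", "<example>", "exec")
opnames = [ins.opname for ins in dis.get_instructions(code)]
print(opnames)

# position of the load and the store of x in the instruction list
load = opnames.index("LOAD_NAME")
store = opnames.index("STORE_NAME")
# other instructions (loading 1 and doing the addition) sit between them,
# so a thread switch can happen after x is read but before it is written back
print(store - load)
```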

## 7 Atomic Instructions
What can happen is that the thread gets switched out just after the INPLACE_ADD instruction (BINARY_OP in Python 3.11 and later).

Suppose "x" is some value, say 700.  Inside the interpreter, INPLACE_ADD adds 1 to 700 and leaves the result, 701, on the thread's evaluation stack.  If the thread is then switched out, that value simply stays on its stack until the thread runs again.

Other threads now increment "x" many times.  Let's say "x" ends up with the value 3287, for the sake of argument.

Eventually, the original thread resumes exactly where it left off.  It was just about to execute the STORE_NAME instruction; when it does execute the instruction, it stores the value 701 that was waiting on its stack.  This overwrites 3287 with 701, thereby corrupting the count.  That's what happened above.

Conclusion: all non-atomic operations on shared data need to be protected by locks.
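A common way to apply this is to keep the lock next to the data it protects.  A minimal sketch, where `SafeCounter` and `work` are hypothetical names: the load, add and store of `value` all happen while the lock is held, so no increment can be lost:

```python
from threading import Thread, Lock

class SafeCounter:
    """Hypothetical helper: a counter whose increment is protected by a lock."""
    def __init__(self):
        self._lock = Lock()
        self.value = 0

    def increment(self):
        # the read, the addition and the write-back form one unit while the lock is held
        with self._lock:
            self.value += 1

def work(counter, n):
    for _ in range(n):
        counter.increment()

counter = SafeCounter()
threads = [Thread(target=work, args=(counter, 100_000)) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)   # 300000
```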

But how do we know if an operation is atomic?  Take a look at the following:

In [None]:
import dis
dis.dis("[2,5,3,6].sort()")        

## 8 Atomicity and Condition Variables
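In the above, the call to the "sort" method is executed by a single bytecode instruction, CALL_METHOD (CALL in Python 3.11 and later).  This is what makes it atomic: the Python interpreter cannot switch threads part way through a bytecode instruction, and sorting a list of integers runs entirely in C without calling back into Python code.

The statement:<pre>count1 += 1</pre>compiles to several bytecode instructions and is therefore non-atomic.

This all happens despite the GIL.  A thread must hold the Global Interpreter Lock (GIL) to execute bytecode, but when other threads are waiting, the interpreter makes the running thread release it at regular intervals (every 5 msec by default) so that another thread gets a turn.  Such a switch only happens between bytecode instructions, never during one.

So operations that consist of a single bytecode instruction are effectively atomic, and therefore thread safe.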
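The switch interval mentioned above can be read and changed with the standard `sys.getswitchinterval()` and `sys.setswitchinterval()` functions (values in seconds).  A short sketch that changes it temporarily and then restores it:

```python
import sys

default = sys.getswitchinterval()
print(default)                  # 0.005 unless something has changed it

sys.setswitchinterval(0.001)    # ask for more frequent thread switches
faster = sys.getswitchinterval()
print(faster)

sys.setswitchinterval(default)  # restore the original setting
```

A shorter interval makes races like the `count1` corruption easier to provoke, but it never makes a multi-instruction statement atomic.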

Apart from locks, there are some other synchronization primitives to consider.  Let's now look at the producer/consumer code below.  The problem here is that the producer creates data for each of the consumers, but it might take some time to do so.  It is important that the consumers don't attempt to use the data before it is available.

We can use a "condition" variable to synchronize the threads:<pre>dataAvailable = threading.Condition()</pre>
The consumers all wait on the "condition" variable:<pre>dataAvailable.wait()</pre>
until the producer is ready to provide the data.  The producer notifies all the consumers that they can proceed with (the older spelling "notifyAll" is a deprecated alias since Python 3.10):<pre>dataAvailable.notify_all()</pre>

In [None]:
import threading
from threading import Thread
import random
import time
import sys

class Producer:
    def __call__(self, dataAvailable):
        print("Producer is obtaining data")
        time.sleep(5)
        with dataAvailable:         # grab the lock
            print("Producer is notifying all consumers")
            dataAvailable.notify_all()

class Consumer:
    def __call__(self, name, dataAvailable):
        with dataAvailable:
            print(f"consumer{name} is waiting")
            dataAvailable.wait()
            print(f"consumer{name} has obtained the data")

    
dataAvailable = threading.Condition()

producer = Producer()
consumer1 = Consumer()
consumer2 = Consumer()
consumer3 = Consumer()

# pass the shared condition variable to every thread
t = Thread(target = producer, args = (dataAvailable,))
t1 = Thread(target = consumer1, args = ("1", dataAvailable))
t2 = Thread(target = consumer2, args = ("2", dataAvailable))
t3 = Thread(target = consumer3, args = ("3", dataAvailable))

t.start()
t1.start()
t2.start()
t3.start()

t.join()
t1.join()
t2.join()
t3.join()

print("\nEnd of main")
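In the program above the producer sleeps for 5 seconds, so the consumers are almost certainly waiting before `notify_all()` is called.  If a consumer only started waiting after the notification had been sent, a bare `wait()` would block forever.  The standard `Condition.wait_for()` method avoids this by checking a predicate before waiting.  A minimal sketch; the shared variable `data` and the `received` list are hypothetical:

```python
import threading
from threading import Thread

dataAvailable = threading.Condition()
data = None      # hypothetical shared data, guarded by the condition's lock
received = []

def producer():
    global data
    with dataAvailable:
        data = "some data"
        dataAvailable.notify_all()

def consumer(name):
    with dataAvailable:
        # wait_for() checks the predicate first and only waits while it is false,
        # so a consumer that arrives after notify_all() still proceeds
        dataAvailable.wait_for(lambda: data is not None)
        received.append((name, data))

threads = [Thread(target=consumer, args=(str(n),)) for n in range(1, 4)]
threads.append(Thread(target=producer))
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(received))
```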

## 9 Events
Event objects are similar to condition variables, but simpler: an event manages a single internal flag, and threads wait for that flag to be set.

The event object is created by:<pre>event = Event()</pre>
and any thread can wait on the event:<pre>event.wait()</pre>
All waiting threads are released when any thread "sets" the event:<pre>event.set()</pre>

In [None]:
from threading import Thread
from threading import Event
import random
import time
import sys


class MyClass:
    def __call__(self, name):
        global event
        print(f"{name} waiting for event")
        event.wait()
        print(f"\t{name} proceeding after event")


event = Event()

m1 = MyClass()
m2 = MyClass()
m3 = MyClass()

t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))

t1.start()
t2.start()
t3.start()

print("... main waiting for 15 seconds")
time.sleep(15)
print("... main setting event flag")
event.set()

t1.join()
t2.join()
t3.join()

print("\nEnd of main")
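Two more details of the standard `Event` API are worth knowing: `wait()` accepts a timeout and returns `False` if the flag is still clear when it expires, and `clear()` resets the flag so the event can be reused.  A short sketch:

```python
from threading import Event, Thread

event = Event()
print(event.is_set())              # False: a new event starts with its flag cleared

# wait() with a timeout returns False if the flag is still clear when it expires
timed_out = event.wait(timeout=0.1)
print(timed_out)                   # False

# another thread sets the flag; wait() then returns True
setter = Thread(target=event.set)
setter.start()
woke = event.wait(timeout=5)
setter.join()
print(woke)                        # True

event.clear()                      # reset the flag so the event can be reused
print(event.is_set())              # False
```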

## 10 Bounded Semaphores
Bounded semaphores are like a set of multiple locks.  They are "bounded" because releasing one more times than it has been acquired raises a ValueError, which helps to catch programming errors.

A bounded semaphore is created with an initial count:<pre>semaphore = BoundedSemaphore(3)</pre>

A thread acquires the semaphore, which decrements the count, with:<pre>semaphore.acquire()</pre>
However, the count can never go negative.  So after 3 threads have acquired the semaphore, the next thread will be blocked until another thread releases the semaphore, which increments the count by one:<pre>semaphore.release()</pre>
This continues until all the threads have acquired and released the semaphore.

Thus this bounded semaphore behaves as a set of 3 locks: at most 3 threads can hold it at the same time.

In [None]:
from threading import Thread
from threading import BoundedSemaphore
import random
import time
import sys


class MyClass:
    def __call__(self, name):
        global semaphore
        semaphore.acquire()
        print(name + " claimed semaphore")
        time.sleep(5)
        print("\t" + name + " released semaphore")
        semaphore.release()



semaphore = BoundedSemaphore(3)

m1 = MyClass()
m2 = MyClass()
m3 = MyClass()
m4 = MyClass()
m5 = MyClass()
m6 = MyClass()
m7 = MyClass()

t1 = Thread(target = m1, args = ("1",))
t2 = Thread(target = m2, args = ("2",))
t3 = Thread(target = m3, args = ("3",))
t4 = Thread(target = m4, args = ("4",))
t5 = Thread(target = m5, args = ("5",))
t6 = Thread(target = m6, args = ("6",))
t7 = Thread(target = m7, args = ("7",))

t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
t6.start()
t7.start()

t1.join()
t2.join()
t3.join()
t4.join()
t5.join()
t6.join()
t7.join()

print("\nEnd of main")
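The "at most 3 at once" behaviour can be measured rather than read from the printout.  A minimal sketch, assuming a short 0.05 second sleep as the work; the counters `active` and `max_active` are hypothetical and are themselves protected by an ordinary lock.  It also shows the ValueError that makes the semaphore "bounded":

```python
import time
from threading import BoundedSemaphore, Lock, Thread

semaphore = BoundedSemaphore(3)
stats_lock = Lock()    # protects the two counters below
active = 0             # threads currently holding the semaphore
max_active = 0         # highest value 'active' reached

def task():
    global active, max_active
    with semaphore:    # acquire() on entry, release() on exit
        with stats_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.05)
        with stats_lock:
            active -= 1

threads = [Thread(target=task) for _ in range(7)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(max_active)      # at most 3

# "bounded": an extra release() beyond the initial count raises ValueError
over_released_error = False
try:
    semaphore.release()
except ValueError:
    over_released_error = True
print(over_released_error)   # True
```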

## 11 Barriers
Barriers are yet another synchronization object.  

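A barrier is created with a count (the number of threads that must wait on it) and, optionally, a default timeout in seconds:<pre>b = Barrier(5, timeout=10)</pre>

In this example a server and 4 clients synchronize by waiting on this barrier in their respective threads:<pre>b.wait()</pre>
When all five threads are waiting, the barrier is satisfied: all 5 threads are released and continue, and the barrier resets itself so that it can be used again.  If the timeout expires first, the barrier is broken and "wait" raises a BrokenBarrierError.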

In [None]:
from threading import Thread, Barrier
import time


# In this example a server and 4 clients synchronize by waiting on a barrier
# in their respective threads.  When all five threads are waiting,
# the barrier releases them and all 5 threads continue.

b = Barrier(5, timeout=10)

class Server:
    def __init__(self):
        print("server initializing ...")
        self.thread = Thread(target=self)
        self.thread.start()

    def __call__(self):
        time.sleep(5)
        b.wait()
        print("server ready to accept connections")
        
    def connect(self, client):
        print(f"{client.name} has connected")
        
class Client:
    def __init__(self, name, server):
        self.name = name
        self.server = server
        print(f"{self.name} waiting to connect")
        self.thread = Thread(target=self)
        self.thread.start()
    
    def __call__(self):
        b.wait()
        self.server.connect(self)

def main():
    server = Server()
    client1 = Client("client1", server)
    client2 = Client("client2", server)
    client3 = Client("client3", server)
    client4 = Client("client4", server)
    
    server.thread.join()
    client1.thread.join()
    client2.thread.join()
    client3.thread.join()
    client4.thread.join()
    
    print("end of program")
    
main()
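`Barrier.wait()` also returns a value: a different integer in the range 0 to parties - 1 for each thread, which is handy for choosing one thread to do some one-off work after the barrier.  A short sketch; the `worker` function and `indices` list are hypothetical:

```python
from threading import Barrier, Thread

parties = 5
b = Barrier(parties, timeout=10)
indices = []

def worker():
    # wait() blocks until all parties have arrived, then returns a different
    # integer in range(parties) to each thread
    indices.append(b.wait())

threads = [Thread(target=worker) for _ in range(parties)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(indices))   # [0, 1, 2, 3, 4]
print(b.n_waiting)       # 0: the barrier has reset itself
print(b.broken)          # False
```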


## 12 Timers
I should also mention the simple Timer class, which runs a function in its own thread after a delay given in seconds:

In [None]:
from threading import Timer

def hello():
    print("hello, world")

t = Timer(15.0, hello)
t.start() # after 15 seconds, "hello, world" will be printed
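A Timer is a subclass of Thread, so it can be joined, and it can be cancelled with `cancel()` while it is still waiting.  A short sketch, with shorter delays than above so it finishes quickly; the `fired` list is a hypothetical way to record calls:

```python
from threading import Timer

fired = []

def hello():
    fired.append("hello, world")

# a timer can be cancelled while it is still waiting
t = Timer(0.5, hello)
t.start()
t.cancel()
t.join()               # Timer is a subclass of Thread, so join() works
after_cancel = list(fired)
print(after_cancel)    # []

t2 = Timer(0.1, hello)
t2.start()
t2.join()              # wait until the timer has fired
print(fired)           # ['hello, world']
```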

## 13 Benchmarking
Finally, as mentioned in the introduction to this tutorial, in multithreaded CPU-bound programs the threads effectively execute one at a time because of the GIL.  Performance then becomes an issue.

It is recommended to use the multiprocessing module to speed things up in such situations.  We don't use threads in this case; instead, code runs in separate processes, each with its own interpreter and its own GIL, so the GIL is no longer a bottleneck.

It will be interesting to compare the timings of a multithreaded program and a multiprocessing program.  Both programs calculate the value of  

$$\sum_{i=0}^{M-1} i^{0.3}, \qquad M = 50\,000\,000$$  
that is, i ranges from 0 to 49,999,999.  We can see the performance of both with varying numbers of threads and processes (don't worry too much about the code details):
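Before timing anything, it is worth checking that splitting the sum into ranges gives the same answer as one long loop.  A split based on a chunk size of `int(duration / parts)` silently drops the last few values whenever `parts` does not divide `duration`, so this sketch computes each boundary with integer division instead.  It assumes a small M of 1000 to keep it fast:

```python
import math

def intervals(duration, parts):
    # integer arithmetic: consecutive half-open ranges covering [0, duration)
    # exactly, even when parts does not divide duration evenly
    return [(i * duration // parts, (i + 1) * duration // parts) for i in range(parts)]

def calculate(lo, hi):
    # sum of i**0.3 for lo <= i < hi
    return sum(float(i) ** 0.3 for i in range(lo, hi))

M = 1000                      # small stand-in for 50,000,000
serial = calculate(0, M)      # one long loop
totals = {}
for parts in (1, 3, 7):
    pieces = intervals(M, parts)
    totals[parts] = sum(calculate(lo, hi) for lo, hi in pieces)
    print(parts, pieces[:2], math.isclose(totals[parts], serial))
```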

In [None]:
import time
from threading import Thread
from multiprocessing import Pool
from itertools import chain

''' Calculate the sum of i**0.3 where i ranges from 0 to M-1
    Use multiple threads or processes to perform the calculation
    Split the calculation into ranges using the intervals function below
    Note: with start methods other than "fork" (for example "spawn", the
    default on Windows and macOS), worker processes may not find functions
    defined in a notebook; if so, save this code as a .py file and run it.
'''

M = 50*1000*1000

def calculate(lo, hi):
    '''the calculation to perform'''
    total = 0
    for i in range(lo, hi):
        total += float(i)**0.3
    return total

def intervals(duration, parts):
    '''splits [0, duration) into several (parts) consecutive ranges with no gaps'''
    return [(i * duration // parts, (i + 1) * duration // parts) for i in range(parts)]

# calculate the sum using multiple threads
def jobUsingThreads(threadCount):
    threadList = []
    for lo, hi in intervals(M, threadCount):
        t = Thread(target = calculate, args = (lo, hi))
        t.start()
        threadList.append(t)

    for t in threadList:
        t.join()

# calculate the sum using multiple processes
def jobUsingProcesses(processCount):
    # the with statement shuts the pool's worker processes down afterwards
    with Pool(processes=processCount) as p:
        return sum(p.starmap(calculate, intervals(M, processCount)))

if __name__ == "__main__":
    # run job with varying number of processes
    for N in chain(range(1, 11), range(20, 101, 20)):
        start = time.perf_counter()
        jobUsingProcesses(N)
        finish = time.perf_counter()
        print(f"{N:3} processes:{finish-start:6.2f}")

    # run job with varying number of threads
    for N in chain(range(1, 11), range(20, 101, 20)):
        start = time.perf_counter()
        jobUsingThreads(N)
        finish = time.perf_counter()
        print(f"{N:3} threads:{finish-start:6.2f}")