# Python Multithreading


Copyright (c) 2016-2017 Duc Nguyen. Released under GPLv3.

Note:
- *This script is written for Python 3.5. Other Python versions may not work as expected.*
- *Each code block is independent of the others: to run an example, just run its code block. There is no need to run the notebook from top to bottom.*

Content:
- Create new thread
- Data sharing between threads
- Thread synchronization
    + Events
    + Lock
    + Condition
    + Barrier
    + Semaphore
- Helper functions
- Timer

## Create new thread

A new thread is created by instantiating `threading.Thread`. There are two ways to define what the thread runs:

- Define a function and pass it as the `target` parameter of the new thread, then start the thread by calling its `start()` method. If the function takes arguments, put them in a tuple and pass that tuple as `args`.
- Subclass `threading.Thread` and override its `run()` method, then instantiate the subclass and call `.start()` on the instance. Calling `.start()` executes the thread's `run()` in a new thread (calling `run()` directly would execute it in the current thread instead).

In [4]:
import threading
import time

def worker(thread_name, counter):
    """ Print the thread name every 1 second for *counter* times """
    for _idx in range(counter):
        time.sleep(1)
        print("{} - {}".format(thread_name, _idx))

class DummyThread(threading.Thread):
    def __init__(self, name, counter):
        super(DummyThread, self).__init__()
        self.name = name
        self.counter = counter
    def run(self):
        for _idx in range(self.counter):
            time.sleep(1)
            print("{} (initialized from DummyThread) - {}".format(self.name, _idx))
            
# Create thread0 by passing the function as `target` and its arguments as the `args` tuple
thread0 = threading.Thread(target=worker, args=("Thread 0", 2))
thread0.start()

# Create thread1 by instantiating DummyThread
thread1 = DummyThread("Thread 1", 5)
thread1.start()

Thread 1 (initialized from DummyThread) - 0
Thread 0 - 0
Thread 1 (initialized from DummyThread) - 1
Thread 0 - 1
Thread 1 (initialized from DummyThread) - 2
Thread 1 (initialized from DummyThread) - 3
Thread 1 (initialized from DummyThread) - 4


### Threads can also be created and started from other working threads

In the example below, a thread named "Inside thread" is created from "Thread 0". Even though "Inside thread" is created inside "Thread 0", "Thread 0" can terminate before "Inside thread" does: a thread does not wait for the threads it creates unless it explicitly calls `join()` on them. Each thread can be considered an independent flow of execution.

In [5]:
import threading
import time

def worker(thread_name, counter):
    """ Print the thread name every 1 second for *counter* times """
    for _idx in range(counter):
        time.sleep(1)
        print("{} - {}".format(thread_name, _idx))
    print("{} is exiting...".format(thread_name))

class DummyThread(threading.Thread):

    def __init__(self, name):
        super(DummyThread, self).__init__()
        self.name = name

    def run(self):
        print("{} is running".format(self.name))
        print("Creating a new thread from {}...".format(self.name))
        inside_thread = threading.Thread(target=worker, args=("Inside thread", 5))
        inside_thread.start()
        # run() does not wait for inside_thread, so Thread 0 can finish first
        print("{} is exiting...".format(self.name))

# Create thread0 by instantiating DummyThread
thread0 = DummyThread("Thread 0")
thread0.start()

Thread 0 is running
Creating a new thread from Thread 0...
Thread 0 is exiting...
Inside thread - 0
Inside thread - 1
Inside thread - 2
Inside thread - 3
Inside thread - 4
Inside thread is exiting...


## Data sharing between threads

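All threads in a process share the same memory. A worker thread's target function can read any variable that is visible from the scope in which that function is defined, following Python's normal scoping rules. As a result, to share data between threads, create that data in a scope that every involved target function can see (e.g. at module level, in the main thread).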

#### Example 1:

In [15]:
import threading

arbitrary_data = 1

def worker(thread_name):
    """ Print the thread name and the value of the global arbitrary_data """
    print("{} - arbitrary_data: {} (worker thread can access global variables)".format(thread_name, arbitrary_data))

    
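thread0 = threading.Thread(target=worker, args=("Thread 0",))
thread0.start()
# Wait until thread0 has printed; otherwise it might already see the incremented value
thread0.join()

# Increment arbitrary_data from the global scope
arbitrary_data += 1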
thread1 = threading.Thread(target=worker, args=("Thread 1",))
thread1.start()

Thread 0 - arbitrary_data: 1 (worker thread can access global variables)
Thread 1 - arbitrary_data: 2 (worker thread can access global variables)


#### Example 2:

`DummyThread0_child` runs the module-level `worker()` function, so it reads `arbitrary_data` from the global scope. The local `arbitrary_data` assigned inside `DummyThread0.run` is not visible to it.

`DummyThread1_child` runs a `worker()` function defined inside `DummyThread1.run`. Python resolves names in the scope where a function is defined, so this nested `worker()` finds the local `arbitrary_data` of `run` first.

`DummyThread2_child` also runs a nested `worker()` defined inside `DummyThread2.run`, but `run` has no local `arbitrary_data`. The lookup therefore continues outward, past `run`, and finds the global `arbitrary_data`.

In [23]:
import threading

arbitrary_data = "data from main thread"

def worker(thread_name):
    """ Print the thread name and the value of arbitrary_data """
    print("{} - Arbitrary data: {}".format(thread_name, arbitrary_data))
    
class DummyThread0(threading.Thread):
    
    def __init__(self, name):
        super(DummyThread0, self).__init__()
        self.name = name

    def run(self):
        arbitrary_data = "data from non-main thread" 
        dummythread0_child = threading.Thread(target=worker, args=("DummyThread0_child",))
        dummythread0_child.start()
    
    
class DummyThread1(threading.Thread):
    
    def __init__(self, name):
        super(DummyThread1, self).__init__()
        self.name = name

    def run(self):
        arbitrary_data = "data from non-main thread"
        
        # worker definition is the only difference between DummyThread0 and DummyThread1
        def worker(thread_name):
            """ Print the thread name and the value of arbitrary_data """
            print("{} - Arbitrary data: {}".format(thread_name, arbitrary_data))
        
        dummythread1_child = threading.Thread(target=worker, args=("DummyThread1_child",))
        dummythread1_child.start()
        
class DummyThread2(threading.Thread):
    
    def __init__(self, name):
        super(DummyThread2, self).__init__()
        self.name = name

    def run(self):        
        
        # does not have local arbitrary_data
        
        def worker(thread_name):
            """ Print the thread name and the value of arbitrary_data """
            print("{} - Arbitrary data: {}".format(thread_name, arbitrary_data))
        
        dummythread2_child = threading.Thread(target=worker, args=("DummyThread2_child",))
        dummythread2_child.start()
        

thread0 = DummyThread0("Thread 0")
thread0.start()

thread1 = DummyThread1("Thread 1")
thread1.start()

thread2 = DummyThread2("Thread 2")
thread2.start()

DummyThread0_child - Arbitrary data: data from main thread
DummyThread1_child - Arbitrary data: data from non-main thread
DummyThread2_child - Arbitrary data: data from main thread


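### Worker threads cannot rebind shared variables by plain assignment

As with any Python function, assigning to a name inside a worker function makes that name local to the function. A plain assignment (`arbitrary_data = 10`) therefore creates a new local variable and leaves the shared one unchanged, and an augmented assignment (`arbitrary_data += 1`) raises `UnboundLocalError` because it reads the local name before any value has been assigned to it.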

In [24]:
import threading

arbitrary_data = 1

def worker_increment(thread_name):
    """ Modify arbitrary data """
    try:
        arbitrary_data += 1
        print("{} - Arbitrary data: {}".format(thread_name, arbitrary_data))
    except UnboundLocalError:
        print("UnboundLocalError was raised")
        
def worker(thread_name):
    """ Set the arbitrary data """
    arbitrary_data = 10
    print("{} - Set arbitrary data to {}".format(thread_name, arbitrary_data))
        
thread0 = threading.Thread(target=worker_increment, args=("Thread increment",))
thread0.start()

thread1 = threading.Thread(target=worker, args=("Thread set",))
thread1.start()

# Temporarily block the main thread until both thread0 and thread1 terminate
thread0.join()
thread1.join()

print("arbitrary_data from main thread: {}".format(arbitrary_data))

UnboundLocalError was raised
Thread set - Set arbitrary data to 10
arbitrary_data from main thread: 1
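For comparison, Python's `global` statement lets a worker rebind a module-level variable instead of creating a local one. A minimal sketch that reuses the names from the example above; it assumes a single worker thread, so the increment cannot race with anything:

```python
import threading

arbitrary_data = 1

def worker_increment(thread_name):
    """ Rebind the module-level arbitrary_data """
    global arbitrary_data   # refer to the module-level name instead of creating a local one
    arbitrary_data += 1
    print("{} - Arbitrary data: {}".format(thread_name, arbitrary_data))

thread0 = threading.Thread(target=worker_increment, args=("Thread increment",))
thread0.start()
thread0.join()   # wait for the worker before reading the result

print("arbitrary_data from main thread: {}".format(arbitrary_data))
# Thread increment - Arbitrary data: 2
# arbitrary_data from main thread: 2
```

With only one worker this is safe. `+=` is a read-modify-write, though, so if several threads rebind the same variable concurrently, updates can be lost unless they are serialized with a lock.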


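### Shared mutable objects can be modified by worker threads

Worker threads can modify a shared mutable object (e.g. a list) in place, just like a function can. Calling a method such as `append()` mutates the object without rebinding its name, so no local variable is created and the change is visible to every thread.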

In [28]:
import threading

arbitrary_data = []

def worker_append(thread_name, value):
    """ Append `value` to `arbitrary_data` """
    arbitrary_data.append(value)
    
def worker_len():
    """ Check the length of `arbitrary_data` """
    print("(Worker thread) Length arbitrary_data: {}".format(len(arbitrary_data)))

print("Length arbitrary_data before: {}".format(len(arbitrary_data)))
thread0 = threading.Thread(target=worker_append, args=("Thread append", 1))
thread0.start()

# Temporarily block main thread until thread0 terminates
thread0.join()
print("Length arbitrary_data after: {}".format(len(arbitrary_data)))

thread1 = threading.Thread(target=worker_len)
thread1.start()

Length arbitrary_data before: 0
Length arbitrary_data after: 1
(Worker thread) Length arbitrary_data: 1
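The shared object does not have to be a global: it can also be passed to each worker through `args`. A minimal sketch (the names `square` and `results` are illustrative, not from the original) in which every worker writes only to its own slot of a preallocated list, so no two threads ever touch the same element, and the main thread reads the results after `join()`:

```python
import threading

def square(results, index):
    """ Store index**2 in this worker's own slot of the shared list """
    results[index] = index * index

results = [None] * 5   # one slot per worker thread
threads = [threading.Thread(target=square, args=(results, i)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()           # wait until every worker has written its slot

print(results)  # [0, 1, 4, 9, 16]
```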


## Thread synchronization

### Events

The idea: several threads share the same `Event` object. At its core, an `Event` is just a boolean flag (initially False) with three main methods:

- `.set()` sets the flag to True and wakes up all threads waiting on it.
- `.clear()` sets the flag back to False.
- `.wait()` returns immediately if the flag is True. If the flag is False, it blocks the calling thread until the flag becomes True (typically because another thread calls `.set()`).

In [7]:
import threading
import time

def worker(ev):
    """ Count and print """
    for _idx in range(10):
        ev.wait()
        time.sleep(1)
        print("Worker thread is running: {}".format(_idx))
    print("Worker thread is exiting...")

event = threading.Event()

thread0 = threading.Thread(target=worker, args=(event,))
thread0.start()

print("Worker thread is started")
print("Initially the event flag is false, and worker thread has event.wait() "
      "so the code in worker thread is stuck at wait(). It will not do anything"
      " until event is set to True")
time.sleep(2)

event.set()
print("Main thread: setting event to true with .set()")
time.sleep(5)

event.clear()
print("Main thread: setting event to false with .clear(). Worker thread should pause now")
time.sleep(10)

event.set()
print("Main thread: reset event to True")

Worker thread is started
Initially the event flag is false, and worker thread has event.wait() so the code in worker thread is stuck at wait(). It will not do anything until event is set to True
Main thread: setting event to true with .set()
Worker thread is running: 0
Worker thread is running: 1
Worker thread is running: 2
Worker thread is running: 3
Main thread: setting event to false with .clear(). Worker thread should pause now
Worker thread is running: 4
Main thread: reset event to True
Worker thread is running: 5
Worker thread is running: 6
Worker thread is running: 7
Worker thread is running: 8
Worker thread is running: 9
Worker thread is exiting...

`Worker thread is running: 4` still appears after `.clear()` because the worker had already passed `ev.wait()` and was inside `time.sleep(1)` when the flag was cleared. `.clear()` only affects later calls to `.wait()`.
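A common use of `Event` is as a stop signal for a background loop. A minimal sketch, assuming a hypothetical `poller` worker whose real work is replaced by appending a timestamp: the worker loops until the event is set, and `Event.wait(timeout)` doubles as an interruptible sleep, because it returns as soon as the flag is set instead of always waiting the full timeout.

```python
import threading
import time

def poller(stop, ticks):
    """ Do periodic work until `stop` is set """
    while not stop.is_set():
        ticks.append(time.monotonic())   # placeholder for real periodic work
        stop.wait(0.1)                   # sleeps up to 0.1 s, wakes immediately when stop is set

stop = threading.Event()
ticks = []
thread0 = threading.Thread(target=poller, args=(stop, ticks))
thread0.start()

time.sleep(0.5)
stop.set()        # ask the worker to finish its current iteration and exit
thread0.join()
print("Worker stopped after {} iterations, alive: {}".format(len(ticks), thread0.is_alive()))
```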


### Lock

The idea: several threads share the same `Lock` object. A `Lock` is in one of two states: locked (after `.acquire()`) or unlocked (after `.release()`). When the lock is unlocked and a thread calls `.acquire()`, the lock becomes locked and other threads cannot acquire it until that thread calls `.release()`. When the lock is already locked and a thread calls `.acquire()`, that thread blocks until the thread currently holding the lock releases it.

This mechanism is useful for preventing race conditions. For example, to stop several threads from modifying a data object at the same time, acquire a lock before modifying the object and release it when the modification is finished. This way only one thread can access the object at a time.

A lock can also be used as a context manager: in the code below, the code between `lock.acquire()` and `lock.release()` can instead be placed inside a `with lock:` block, which also releases the lock if that code raises an exception.

In [10]:
import threading
import time
import queue

tasks = queue.Queue(10)

def worker(lock):
    """ Get the task and print it """
    for _idx in range(10):
        time.sleep(1)
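        lock.acquire()
        # Caution: tasks.get() blocks on an empty queue while this thread holds the lock,
        # which would deadlock master(); it works here only because master() puts items faster
        print("    Worker: get {}".format(tasks.get()))
        lock.release()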

def master(lock):
    """ Add task to the tasks queue """
    for _idx in range(10):
        time.sleep(0.5)
        lock.acquire()
        print("Master: put {}".format(_idx))
        tasks.put(_idx)
        lock.release()
        
lock = threading.Lock()

thread0 = threading.Thread(target=master, args=(lock,))
thread0.start()

thread1 = threading.Thread(target=worker, args=(lock,))
thread1.start()

Master: put 0
    Worker: get 0
Master: put 1
    Worker: get 1
Master: put 2
    Worker: get 2
Master: put 3
    Worker: get 3
Master: put 4
    Worker: get 4
Master: put 5
    Worker: get 5
Master: put 6
    Worker: get 6
Master: put 7
    Worker: get 7
Master: put 8
    Worker: get 8
Master: put 9
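    Worker: get 9

The exact interleaving of the `put` and `get` lines depends on thread timing and can vary between runs; with a 0.5 s delay in `master()` and a 1 s delay in `worker()`, the master typically gets several items ahead of the worker.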
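The `with lock:` form is easiest to see on a real race condition. A minimal sketch with a hypothetical `Counter` class: `self.value += 1` reads, adds, and writes back in separate steps, so without the lock two threads could read the same value and one increment would be lost. The `with` block acquires the lock on entry and releases it on exit, even if the body raises.

```python
import threading

class Counter:
    """ Hypothetical shared counter guarded by a lock """
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:        # same as acquire() ... release(), but exception-safe
            self.value += 1    # this read-modify-write now runs in one thread at a time

def worker(counter, times):
    for _ in range(times):
        counter.increment()

counter = Counter()
threads = [threading.Thread(target=worker, args=(counter, 10000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)  # 40000
```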


### Condition

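A condition variable (`threading.Condition`) is like a combination of a lock and an event. Like a lock, it sets up blocks of code that only one thread can run at a time. Like an event, it allows one thread to intentionally pause until another thread allows it to continue: a thread holding the condition calls `.wait()`, which releases the underlying lock and blocks until another thread calls `.notify()` or `.notify_all()` on the same condition.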
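A minimal producer/consumer sketch, assuming hypothetical `producer`/`consumer` functions and a plain list as the shared buffer. The consumer holds the condition's lock and calls `wait_for()`, a convenience method that keeps calling `wait()` until the predicate is true; while waiting, the lock is released so the producer can append an item and call `notify()`.

```python
import threading

buffer = []                        # shared data protected by the condition's lock
condition = threading.Condition()
consumed = []

def producer(items):
    for item in items:
        with condition:            # acquire the underlying lock
            buffer.append(item)
            condition.notify()     # wake one thread waiting on this condition

def consumer(count):
    for _ in range(count):
        with condition:
            # Releases the lock while waiting; re-acquires it once the predicate is true
            condition.wait_for(lambda: len(buffer) > 0)
            consumed.append(buffer.pop(0))

thread0 = threading.Thread(target=consumer, args=(5,))
thread1 = threading.Thread(target=producer, args=(range(5),))
thread0.start()
thread1.start()
thread0.join()
thread1.join()

print(consumed)  # [0, 1, 2, 3, 4]
```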

### Barrier

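A barrier (`threading.Barrier`) makes threads that start at different times synchronize with each other. It is created for a fixed number of threads; each thread calls `.wait()` and is blocked there until that many threads have called it. After that, all blocked threads are released at once and continue as normal.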
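A minimal sketch with three hypothetical workers that reach the barrier after different delays (the delays are arbitrary). Each worker records its arrival, then calls `barrier.wait()`; after the barrier releases them, every worker sees that all three have arrived.

```python
import threading
import time

arrived = []
passed = []
record_lock = threading.Lock()     # protects the two lists above
barrier = threading.Barrier(3)     # release once 3 threads are waiting

def worker(name, delay):
    time.sleep(delay)              # threads reach the barrier at different times
    with record_lock:
        arrived.append(name)
    barrier.wait()                 # block until all 3 threads have called wait()
    with record_lock:
        # Every thread arrived before the barrier opened, so `arrived` is complete here
        passed.append((name, len(arrived)))

threads = [threading.Thread(target=worker, args=("Thread {}".format(i), 0.1 * i))
           for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(passed)
```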

### Semaphore

A semaphore (`threading.Semaphore`) limits the number of threads that can access a shared resource concurrently. Basic idea: wrap the code that uses the resource in the semaphore inside the thread function. The semaphore keeps an internal counter, initialized to the semaphore value; each `.acquire()` decrements it and each `.release()` increments it. When as many threads as the semaphore value are already using the resource, any other thread that tries to acquire the semaphore is blocked until one of them releases it.
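A minimal sketch, assuming a resource that at most two threads may use at once (the `use_resource` function and the `active`/`max_active` counters are illustrative). It uses `threading.BoundedSemaphore`, which behaves like `Semaphore` but additionally raises `ValueError` if it is released more times than it was acquired.

```python
import threading
import time

semaphore = threading.BoundedSemaphore(2)   # at most 2 threads inside at a time
state_lock = threading.Lock()               # protects the two counters below
active = 0
max_active = 0

def use_resource():
    global active, max_active
    with semaphore:                          # blocks while 2 threads already hold it
        with state_lock:
            active += 1
            max_active = max(max_active, active)
        time.sleep(0.1)                      # simulate work on the shared resource
        with state_lock:
            active -= 1

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print("Max concurrent threads: {}".format(max_active))
```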

## Helper functions

### Determine the current thread

To get the thread object that is running the current piece of code, use the helper function `threading.current_thread()`. It returns the `Thread` object of the calling thread.

In [2]:
import threading

def worker():
    """ Just print the current thread """
    print("Worker thread: {}".format(threading.current_thread()))
    
thread0 = threading.Thread(target=worker)
thread0.start()

thread0.join()
print("Main thread: {}".format(threading.current_thread()))

thread1 = threading.Thread(target=worker)
thread1.start()

Worker thread: <Thread(Thread-5, started 123145330724864)>
Main thread: <_MainThread(MainThread, started 140735226400768)>
Worker thread: <Thread(Thread-6, started 123145330724864)>
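`threading.main_thread()` returns the main thread object, so comparing it with `current_thread()` is a simple way for a function to check whether it is running in the main thread. A minimal sketch (the `where_am_i` function is hypothetical):

```python
import threading

def where_am_i(results):
    """ Record whether this function runs in the main thread """
    results.append(threading.current_thread() is threading.main_thread())

results = []
where_am_i(results)        # called directly from the main thread
thread0 = threading.Thread(target=where_am_i, args=(results,))
thread0.start()
thread0.join()

print(results)  # [True, False]
```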


### Mark a thread as daemon

By default, a thread created with `threading.Thread` from the main thread is non-daemon. The program does not exit while any non-daemon thread is still running. Daemon threads, on the other hand, are stopped abruptly when the program exits, without a chance to clean up (e.g. to close files). This makes daemon threads appropriate for background tasks that can safely be interrupted at any point. To make a thread daemon, set the `daemon` parameter/attribute to True.

In [6]:
# NOTE: run this as a standalone script, not inside a Jupyter notebook: the notebook's main thread never exits, so the daemon thread would never be stopped

import threading
import time

def worker(infinite):
    """ Run infinitely or not """
    if infinite:
        print("This code never terminates, but it is in a daemon thread, so "
              "whenever the program exits, this thread will also terminate")
        _idx = 0
        while True:
            _idx += 1
            time.sleep(1)
            print("Daemon, {}".format(_idx))
    else:
        print("This code is not in a daemon thread, whenever this code terminates"
              ", and no non-daemon thread running, the program exits")
        for _idx in range(5):
            time.sleep(1)
            
thread0 = threading.Thread(target=worker, args=(True,), daemon=True)
thread0.start()

thread1 = threading.Thread(target=worker, args=(False,))
thread1.start()

This code never terminates, but it is in a daemon thread, so whenever the program exits, this thread will also terminate
This code is not in a daemon thread, whenever this code terminates, and no non-daemon thread running, the program exits
Daemon, 1
Daemon, 2
Daemon, 3
Daemon, 4
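Two details of the `daemon` attribute are worth checking: it can only be changed before `start()` is called (otherwise `RuntimeError` is raised), and a thread created without an explicit value inherits the daemon flag of the thread that creates it. A short sketch (the `spawn_child` function is hypothetical; the child thread is created only to inspect its flag and is never started):

```python
import threading

def spawn_child(flags):
    """ Record the daemon flag a child thread inherits from this thread """
    child = threading.Thread(target=lambda: None)   # no explicit daemon value
    flags.append(child.daemon)

flags = []
thread0 = threading.Thread(target=spawn_child, args=(flags,))
thread0.daemon = True          # allowed: thread0 has not started yet
thread0.start()
thread0.join()

error = None
try:
    thread0.daemon = False     # not allowed once the thread has been started
except RuntimeError as exc:
    error = exc

print("child inherited daemon={}, error={!r}".format(flags[0], error))
```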


### Enumerate all active threads

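You can enumerate all active threads using `threading.enumerate()`. It returns a list of all threads that are currently alive, including daemon threads, the main thread, and the thread calling `threading.enumerate()`. Threads that have not started yet or have already terminated are not included.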

In [2]:
import threading
import time

class DummyThread(threading.Thread):
    def __init__(self, name):
        super(DummyThread, self).__init__()
        self.name = name        
    def run(self):
        for _idx in range(10):
            time.sleep(1)
            
thread0 = DummyThread("Thread 0")
thread0.start()

thread1 = DummyThread("Thread 1")
thread1.start()

# Enumerate all active threads
# If run from notebook, this will print some daemon threads used by Jupyter
for _thread in threading.enumerate():
    print("Name: {}, daemon: {}".format(_thread.name, _thread.daemon))

Name: MainThread, daemon: False
Name: Thread-2, daemon: True
Name: Thread 0, daemon: False
Name: Thread-1, daemon: True
Name: IPythonHistorySavingThread, daemon: True
Name: Thread-3, daemon: True
Name: Thread 1, daemon: False
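`threading.active_count()` returns the number of `Thread` objects currently alive, which equals the length of the list returned by `threading.enumerate()`. A small sketch that also shows a thread disappearing from `enumerate()` once it has finished (the thread name is arbitrary, and the exact count depends on what else is running, e.g. Jupyter's own threads):

```python
import threading

release = threading.Event()
thread0 = threading.Thread(target=release.wait, name="Waiting thread")
thread0.start()

names_running = [t.name for t in threading.enumerate()]
print("Active threads: {}".format(threading.active_count()))

release.set()      # let the worker return from wait()
thread0.join()

names_finished = [t.name for t in threading.enumerate()]
print("Waiting thread" in names_running, "Waiting thread" in names_finished)
```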


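## Timer

`threading.Timer`, a built-in subclass of `threading.Thread`, runs a function after a specified delay: `threading.Timer(interval, function)` waits `interval` seconds after `.start()` is called, then calls `function` in its own thread.

The example below calls `worker()` 5 seconds after the timer starts. Each call to `worker()` starts a new `Timer`, so `worker()` runs every 5 seconds until the main thread sets `cont` to `False`.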

In [4]:
import threading
import time

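cont = True   # the main thread sets this to False to stop rescheduling
calls = []    # one entry per call to worker()

def worker():
    """ Print a greeting, record the call, and reschedule itself while `cont` is True """
    print("Hello, World!")
    calls.append("a")
    if cont:
        thread = threading.Timer(5, worker)
        thread.start()

thread0 = threading.Timer(5, worker)
thread0.start()

time.sleep(30)
cont = False
time.sleep(6)
print("`worker()` is executed {} times".format(len(calls)))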

Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
Hello, World!
`worker()` is executed 6 times
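A `Timer` that is still waiting can be stopped with `.cancel()`, in which case its function is never called. A minimal sketch (the `fire` function and `calls` list are hypothetical):

```python
import threading

calls = []

def fire():
    calls.append("fired")

timer = threading.Timer(1.0, fire)   # would call fire() after 1 second
timer.start()
timer.cancel()                       # still waiting, so fire() never runs
timer.join()                         # the timer thread exits promptly after cancel()

print(calls)  # []
```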
