# Multithreading Tutorial

In CPython, the Global Interpreter Lock (**[GIL](https://realpython.com/python-gil/)**) allows only one thread to hold control of the Python interpreter, so only one thread can execute Python bytecode at a time. Even if there are multiple CPUs or cores, a multithreaded Python program runs Python code on only one of them at a time. This limitation means the **threading** module works well mainly for IO-bound tasks, where threads spend most of their time waiting rather than computing. Threads in other programming languages, such as C and Java, do not have this restriction. For CPU-bound tasks, we can use the **multiprocessing** module instead.
- https://realpython.com/intro-to-python-threading/#using-a-threadpoolexecutor
- https://docs.python.org/3/library/threading.html


In [6]:
import logging
from threading import Thread, current_thread
import time

## Get Started

In [19]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    print(f"The current thread is {current_thread().name}")
    
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)
    

20:59:43: Main    : create and start thread 0.
20:59:43: Thread 0: starting
20:59:43: Main    : create and start thread 1.
20:59:43: Thread 1: starting
20:59:43: Main    : create and start thread 2.
20:59:43: Thread 2: starting
20:59:43: Main    : before joining thread 0.


The current thread is MainThread


20:59:45: Thread 0: finishing
20:59:45: Main    : thread 0 done
20:59:45: Main    : before joining thread 1.
20:59:45: Thread 1: finishing
20:59:45: Main    : thread 1 done
20:59:45: Thread 2: finishing
20:59:45: Main    : before joining thread 2.
20:59:45: Main    : thread 2 done


In the above code, three new threads are created, so there are four threads in total including the main thread. `x.start()` starts a thread, and `thread.join()` blocks the main thread until that thread has finished. If the `join()` calls are omitted, the main thread does not wait, so its final lines can run before the worker threads are finished.

If we create the thread as a daemon thread with `x = Thread(target=thread_function, args=(1,), daemon=True)` and do not call `join()`, the last line in `thread_function` may never run: when the main thread finishes, the interpreter exits and any daemon threads that are still running are stopped without completing. Note that this applies only when running as a Python script, not in a Jupyter notebook, where the kernel process keeps running after a cell finishes.
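A minimal sketch of the daemon case, meant to be run as a script rather than in a notebook. The `finished` list is a hypothetical helper added only to record which workers reached their last line; everything else mirrors the cell above.

```python
import logging
import time
from threading import Thread

finished = []  # hypothetical helper: names of workers that ran to completion

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)
    finished.append(name)

logging.basicConfig(format="%(asctime)s: %(message)s", level=logging.INFO,
                    datefmt="%H:%M:%S")

x = Thread(target=thread_function, args=(1,), daemon=True)
x.start()
# No x.join(): the main thread reaches the end of the script right away, the
# interpreter exits, and the sleeping daemon thread is stopped, so
# "Thread 1: finishing" is never logged.
logging.info("Main    : all done, daemon=%s, still running=%s", x.daemon, x.is_alive())
```

With the default `daemon=False`, the interpreter waits for the thread at exit, so the finishing line is logged about two seconds later.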

If `thread_function` returns a value, there is no direct way to retrieve it from a `Thread` object. Instead, we can pass a mutable object (such as a list or dict) into the function to store the result, or use **ThreadPoolExecutor**.
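A minimal sketch of the mutable-container approach, using a hypothetical `square` function: each thread writes into its own slot of a pre-allocated list, so no two threads write to the same element, and the main thread reads the list only after joining every thread.

```python
from threading import Thread

def square(n, results, index):
    # Each thread writes only to results[index], its own slot.
    results[index] = n * n

numbers = [1, 2, 3, 4]
results = [None] * len(numbers)  # one slot per thread, filled in by the workers

threads = [Thread(target=square, args=(n, results, i)) for i, n in enumerate(numbers)]
for t in threads:
    t.start()
for t in threads:
    t.join()  # results is only complete after every thread has finished

print(results)  # [1, 4, 9, 16]
```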

## ThreadPoolExecutor

In [20]:
import concurrent.futures

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

21:01:43: Thread 0: starting
21:01:43: Thread 1: starting
21:01:43: Thread 2: starting
21:01:45: Thread 0: finishing
21:01:45: Thread 1: finishing
21:01:45: Thread 2: finishing


Leaving the `with` block calls the executor's `shutdown(wait=True)`, which joins each of the threads in the pool. It is strongly recommended to use ThreadPoolExecutor as a context manager whenever you can, so that you never forget to `.join()` the threads.

In [26]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)
    return name

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        x = executor.submit(thread_function, 99)
        print(x.result())
    
    print("All done")

21:15:39: Thread 99: starting
21:15:41: Thread 99: finishing


99
All done
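Return values can also be collected from `executor.map()`, whose iterator yields results in the order of the inputs and re-raises an exception from a call when that result is retrieved. In the `map` cell earlier the iterator is never consumed, so return values (and any exceptions) are discarded. `concurrent.futures.as_completed()` instead yields futures as they finish. A small sketch; `slow_square` is a hypothetical task whose sleep shrinks as `n` grows.

```python
import concurrent.futures
import time

def slow_square(n):
    # Hypothetical task: larger n sleeps less, so later inputs tend to finish first.
    time.sleep(0.1 * (3 - n))
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # map() yields results in input order, whatever order the calls finish in.
    ordered = list(executor.map(slow_square, range(3)))

    # submit() + as_completed() yields each future as soon as it finishes.
    futures = {executor.submit(slow_square, n): n for n in range(3)}
    finish_order = [futures[f] for f in concurrent.futures.as_completed(futures)]

print(ordered)       # [0, 1, 4]
print(finish_order)  # usually [2, 1, 0], but completion order is not guaranteed
```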


## Lock

Different processes have separate memory spaces, but threads in the same process share memory. If multiple threads read and write the same variable without coordination, a **race condition** can happen: the result depends on the unpredictable order in which the threads run.


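- `lock = Lock()`, create a new lock in the unlocked state
- `lock.acquire()`, block until the lock is free, then take it
- `lock.release()`, release a held lock
- `lock.locked()`, return True if the lock is currently held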
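The calls listed above can be tried directly on a single lock. This sketch also uses the optional `blocking` and `timeout` arguments of `acquire()`, which make it return `False` instead of waiting forever; the `states` list exists only to collect the results.

```python
from threading import Lock

lock = Lock()
states = [lock.locked()]                  # False: a new lock starts unlocked
states.append(lock.acquire())             # True: the lock was free, so we now hold it
states.append(lock.locked())              # True
# A plain acquire() here would block; blocking=False returns False immediately.
states.append(lock.acquire(blocking=False))
states.append(lock.acquire(timeout=0.1))  # False after waiting about 0.1 s
lock.release()
states.append(lock.locked())              # False
print(states)  # [False, True, True, False, False, False]
```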

In [27]:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name): 
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
        
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)        
        


21:19:48: Testing update. Starting value is 0.
21:19:48: Thread 0: starting update
21:19:48: Thread 1: starting update
21:19:48: Thread 0: finishing update
21:19:48: Thread 1: finishing update
21:19:48: Testing update. Ending value is 1.


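Each thread increments the value by 1, so the expected final value is 2, but we get 1 because of a race condition: both threads copy `self.value` (still 0) into `local_copy` before either writes its result back, so the second write overwrites the first. We can use a lock to solve this problem.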

In [28]:
from threading import Lock

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self.lock = Lock()

    def update(self, name): 
        self.lock.acquire()
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)
        self.lock.release()
        
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)    

21:22:33: Testing update. Starting value is 0.
21:22:33: Thread 0: starting update
21:22:33: Thread 0: finishing update
21:22:33: Thread 1: starting update
21:22:33: Thread 1: finishing update
21:22:33: Testing update. Ending value is 2.


Before updating the value, a thread acquires the lock with `lock.acquire()`. While it holds the lock, other threads calling `acquire()` have to wait until the lock is released. We can also use the lock as a context manager with the `with` statement, which releases the lock automatically even if an exception is raised, so the `update` method becomes:
``` 
    def update(self, name): 
        with self.lock:
            logging.info("Thread %s: starting update", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.info("Thread %s: finishing update", name)
```

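Note that we should not call `lock.acquire()` twice on a `Lock` from the same thread without releasing it in between. The second call blocks forever, waiting for a release that can never happen, which is a deadlock. For instance, the following code prints "before second acquire" and then hangs: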
```
import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
```

There is another object called **RLock** (re-entrant lock). It allows the thread that already holds it to call `acquire()` again without blocking. The thread must call `release()` once for each `acquire()` before the lock returns to the unlocked state and becomes available to other threads.
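Unlike the plain `Lock` in the deadlock example above, an `RLock` can be taken again by the thread that holds it. A minimal sketch with illustrative functions `outer` and `inner`, where a method holding the lock calls a helper that takes the same lock; a second thread then checks that the lock is free once both `with` blocks have exited.

```python
from threading import RLock, Thread

rlock = RLock()

def inner():
    with rlock:       # nested acquire by the same thread: allowed for an RLock
        return "inner ran"

def outer():
    with rlock:       # first acquire by this thread
        return inner()

result = outer()      # with a plain Lock, the nested acquire in inner() would block forever
print(result)         # inner ran

acquired = []

def try_acquire():
    # Runs in another thread: succeeds only if the main thread fully released the RLock.
    ok = rlock.acquire(timeout=1)
    acquired.append(ok)
    if ok:
        rlock.release()

t = Thread(target=try_acquire)
t.start()
t.join()
print(acquired)       # [True]
```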

## Semaphore

Only one thread can hold a lock at a time. When we want to allow up to a fixed number of threads to work at the same time, we can use a **Semaphore**. A semaphore manages an internal counter, which is decremented by each `.acquire()` call and incremented by each `.release()` call. The counter can never go below zero, but it can exceed the initial value set at initialization if `.release()` is called more often than `.acquire()`. There is also a `BoundedSemaphore` object, whose `.release()` raises **ValueError** if it would push the counter above its initial value.

If a thread calls `.acquire()` when the counter is zero, that thread blocks until a different thread calls `.release()` and increments the counter above zero.


- `sem = Semaphore(value=1)`, the default value is 1
- `sem.acquire()`, decrement the counter, blocking while it is zero
- `sem.release(n=1)`, increment the internal counter by `n`; the default value is `1` (the `n` parameter was added in Python 3.9)
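The counter rules above can be checked without any threads by using non-blocking `acquire(blocking=False)`, which returns `False` instead of waiting when the counter is zero. A small sketch contrasting `Semaphore` and `BoundedSemaphore`:

```python
from threading import BoundedSemaphore, Semaphore

sem = Semaphore(1)
sem.release()                          # counter: 1 -> 2, above the initial value
first = sem.acquire(blocking=False)    # True, counter: 2 -> 1
second = sem.acquire(blocking=False)   # True, counter: 1 -> 0
third = sem.acquire(blocking=False)    # False: the counter is 0 and cannot go negative
print(first, second, third)            # True True False

bsem = BoundedSemaphore(1)
try:
    bsem.release()                     # would push the counter from 1 to 2
    bounded_error = False
except ValueError:
    bounded_error = True
print(bounded_error)                   # True
```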

In [33]:
from threading import Semaphore

sem = Semaphore(3)

def func(idx):
    print(f"thread-{idx}: entering")
    sem.acquire()
    print(f"thread-{idx}: starting")
    time.sleep(idx)
    print(f"thread-{idx}: finishing")
    sem.release()
    
    
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(func, range(1, 6))

thread-1: entering
thread-1: starting
thread-2: entering
thread-2: starting
thread-3: entering
thread-3: starting
thread-4: entering
thread-5: entering
thread-1: finishing
thread-4: starting
thread-2: finishing
thread-5: starting
thread-3: finishing
thread-4: finishing
thread-5: finishing


In [41]:
from threading import Semaphore
from time import sleep

sem = Semaphore(0)

def func_acq():
    sem.acquire()
    print("executing func_acq")
    
def func_rel():
    print("executing func_rel")
    sem.release()
    


with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    for _ in range(3):
        executor.submit(func_acq)
    for _ in range(3):
        sleep(1)
        executor.submit(func_rel)


executing func_rel
executing func_acq
executing func_rel
executing func_acq
executing func_rel
executing func_acq


## Timer

In [36]:
from threading import Semaphore, Timer

sem = Semaphore(3)
thread = Timer(2, lambda t: print(f"new-thread: running after {t} seconds"), (2,))
thread.start()

def func(idx):
    print(f"thread-{idx}: entering")
    sem.acquire()
    print(f"thread-{idx}: starting")
    time.sleep(idx)
    print(f"thread-{idx}: finishing")
    sem.release()
    
    
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    executor.map(func, range(1, 6))

thread-1: entering
thread-1: starting
thread-2: entering
thread-2: starting
thread-3: entering
thread-3: starting
thread-4: enteringthread-5: entering

thread-1: finishing
thread-5: starting
new-thread: running after 2 seconds
thread-2: finishing
thread-4: starting
thread-3: finishing
thread-5: finishingthread-4: finishing
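`Timer(interval, function, args=None, kwargs=None)` is a `Thread` subclass that calls `function` once after `interval` seconds; in the cell above it prints its message while the semaphore workers are still running. A timer that is still waiting can be stopped with `cancel()`. A small sketch, where `record` and `calls` are illustrative names:

```python
from threading import Timer

calls = []

def record(label):
    calls.append(label)

fired = Timer(0.1, record, args=("fired",))
cancelled = Timer(1.0, record, args=("cancelled",))
fired.start()
cancelled.start()
cancelled.cancel()   # stops this timer while it is still waiting, so record() never runs

fired.join()         # a Timer is a Thread, so join() waits for it to finish
cancelled.join()
print(calls)         # ['fired']
```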



## Barrier

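- `barrier = Barrier(parties)`, create a barrier for `parties` threads
- `barrier.wait()`, block until `parties` threads have called `wait()`, then release them all
- `barrier.parties`, the number of threads required to pass the barrier
- `barrier.n_waiting`, the number of threads currently waiting at the barrier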

In [43]:
from threading import Barrier, Thread
import time

num = 2

# Players reach the barrier, and they can only cross it in groups of `num` at a time
b = Barrier(num)
names = ['Alice', 'Bob', 'Charlie', 'David']

start = time.time()
def player(name):
    print('%s reached the barrier at: %s \n' % (name, int(time.time() - start)))
    print(f'The number of current waiting threads is {b.n_waiting}')
    b.wait()
    print('%s crossed the barrier at: %s \n' % (name, int(time.time() - start)))


threads = []
print("Game starts now…")

for i in range(4):
    threads.append(Thread(target=player, args=(names[i],)))
    threads[-1].start()
    time.sleep(1)

for thread in threads:
    thread.join()

print("Game ends!")

Game starts now…
Alice reached the barrier at: 0 

The number of current waiting threads is 0
Bob reached the barrier at: 1 

The number of current waiting threads is 1
Bob crossed the barrier at: 1 

Alice crossed the barrier at: 1 

Charlie reached the barrier at: 2 

The number of current waiting threads is 0
David reached the barrier at: 3 

The number of current waiting threads is 1
David crossed the barrier at: 3 

Charlie crossed the barrier at: 3 

Game ends!
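`Barrier` also accepts an optional `action` callable, which one of the waiting threads runs once all `parties` have arrived and before any of them is released, and `wait()` returns a distinct integer in `range(parties)` to each thread. A small sketch of both, with illustrative names `log`, `indices`, and `worker`:

```python
from threading import Barrier, Thread

log = []
indices = []

# action runs once, in one of the waiting threads, when all 3 parties have arrived.
barrier = Barrier(3, action=lambda: log.append("all arrived"))

def worker():
    idx = barrier.wait()   # each thread gets a different value in range(3)
    indices.append(idx)

threads = [Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(log)              # ['all arrived']
print(sorted(indices))  # [0, 1, 2]
```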


## Condition

https://docs.python.org/3/library/threading.html#condition-objects

* `condition = Condition(lock=None)`\
   if no lock is given, a new `RLock` is created and used as the underlying lock
* `condition.acquire()`\
    acquire the underlying lock
* `condition.release()`\
    release the underlying lock
* `condition.wait(timeout=None)`\
    wait until notified by `condition.notify()` or `condition.notify_all()` from another thread, or until the timeout expires. It must be called while holding the lock.
    - It releases the lock while waiting for the notification
    - It re-acquires the lock once it's notified, before returning
* `condition.wait_for(predicate, timeout=None)`\
    Wait until a condition evaluates to true. `predicate` should be a callable whose result will be interpreted as a boolean value. A `timeout` may be provided giving the maximum time to wait. Ignoring `timeout`, it's roughly equivalent to calling `condition.wait()` in a `while not predicate():` loop.
* `condition.notify(n=1)`\
    wake up at most `n` threads waiting on the condition
* `condition.notify_all()`\
    wake up all threads waiting on the condition

**Note**: the notify() and notify_all() methods don’t release the lock; this means that the thread or threads awakened will not return from their wait() call immediately, but only when the thread that called notify() or notify_all() finally relinquishes ownership of the lock.
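A minimal producer/consumer sketch using the methods listed above; `items`, `producer`, and `consumer` are illustrative names, not part of the `threading` API. The consumer waits with `wait_for()` until the shared list is non-empty, and the producer appends an item and calls `notify()` while holding the lock. Whichever thread runs first, the consumer ends up with the item.

```python
from threading import Condition, Thread

items = []
cond = Condition()   # no lock passed, so a new RLock is used internally

def consumer(out):
    with cond:                                  # wait_for() must be called with the lock held
        cond.wait_for(lambda: len(items) > 0)   # releases the lock while waiting
        out.append(items.pop(0))

def producer(value):
    with cond:
        items.append(value)
        cond.notify()   # the woken consumer resumes only after this block releases the lock

received = []
c = Thread(target=consumer, args=(received,))
c.start()
p = Thread(target=producer, args=("job-1",))
p.start()
p.join()
c.join()
print(received)  # ['job-1']
```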


## Event

An event manages a flag that can be set to true with the `set()` method and reset to false with the `clear()` method. The `wait()` method blocks until the flag is true. The flag is **initially** false.

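- `event = Event()`, create an event whose flag is initially False
- `event.set()`, set the internal flag to True and wake up all waiting threads
- `event.is_set()`, return True if the internal flag is True
- `event.clear()`, reset the internal flag to False
- `event.wait(timeout=None)`, block until the flag is True or the timeout expires; returns True unless it timed out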
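A small sketch of one thread signalling another with an `Event`; `waiter` and `log` are illustrative names. The short `time.sleep` exists only so the waiter is likely to be blocked before the flag is set; the result is the same either way.

```python
import time
from threading import Event, Thread

ready = Event()
log = []

def waiter():
    log.append("waiting")
    ready.wait()               # blocks until another thread calls ready.set()
    log.append("released")

t = Thread(target=waiter)
t.start()
time.sleep(0.1)                # demo only: give the waiter a moment to block
before = ready.is_set()        # False: nobody has set the flag yet
ready.set()                    # wakes up every thread blocked in ready.wait()
t.join()
after = ready.is_set()         # True
ready.clear()                  # back to False
timed_out = ready.wait(timeout=0.1)  # False: the flag stays cleared, so wait() times out
print(before, after, log, timed_out) # False True ['waiting', 'released'] False
```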