In [1]:
import threading

In [2]:
threading.active_count()

8

In [3]:
threading.current_thread()

<_MainThread(MainThread, started 137017489358848)>

In [4]:
threading.enumerate()

[<_MainThread(MainThread, started 137017489358848)>,
 <Thread(IOPub, started daemon 137017420793408)>,
 <Heartbeat(Heartbeat, started daemon 137017412400704)>,
 <Thread(Thread-2 (_watch_pipe_fd), started daemon 137017387222592)>,
 <Thread(Thread-3 (_watch_pipe_fd), started daemon 137017040041536)>,
 <ControlThread(Control, started daemon 137017031648832)>,
 <HistorySavingThread(IPythonHistorySavingThread, started 137017021158976)>,
 <ParentPollerUnix(Thread-1, started daemon 137017012766272)>]

In [5]:
threading.TIMEOUT_MAX

9223372036.0

## Threading use case

`Major impact` - Tasks that spend much of their time waiting for external events (I/O-bound work) are generally good candidates for threading.

`Minor impact` - Tasks that require heavy CPU computation and spend little time waiting for external events might not run any faster with threads.

If the use case is a CPU-bound operation, consider the `multiprocessing` module instead.

### Single Thread

In [3]:
import logging
import threading
import time

def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    
    x = threading.Thread(target=thread_function, args=(1,))
    
    # # Creating a Daemon thread
    # x = threading.Thread(target=thread_function, args=(1,), daemon=True)
    
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join() # comment/uncomment this to check the difference
    logging.info("Main    : all done")

19:35:46: Main    : before creating thread
19:35:46: Main    : before running thread
19:35:46: Thread 1: starting
19:35:46: Main    : wait for the thread to finish
19:35:46: Main    : all done
19:35:48: Thread 1: finishing


**Daemon**

In computer science, a daemon is a process that runs in the background. In threading, the meaning is a bit more specific.

A daemon thread runs in the background and does not need to be shut down explicitly, because it is terminated when the program exits.

**Non Daemon**

The program that starts non-daemon threads waits for them to complete before it terminates.

*NOTE*

In the above example, with `x.join()` commented out, you will notice a pause after `all done` is logged (about two seconds in the output above) before `Thread 1: finishing` appears. This is because the program is waiting for the non-daemon thread to complete.

Internally, `threading._shutdown()` goes through all the alive threads and calls `.join()` on those that don't have the `daemon` flag set. This is the same call we can add manually (the commented-out `x.join()` in the code above).
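
The daemon flag and the effect of `.join()` can be sketched in a few lines. This is a minimal illustration, not part of the original example: the `worker` function and the `results` list are hypothetical names introduced here.

```python
import threading
import time

def worker(results):
    # Hypothetical stand-in for real work: sleep briefly, then record completion.
    time.sleep(0.2)
    results.append("done")

results = []

# Non-daemon thread (the default when created from the main thread):
# the interpreter waits for it at exit.
regular = threading.Thread(target=worker, args=(results,))
# Daemon thread: the interpreter does not wait for it at exit.
background = threading.Thread(target=worker, args=(results,), daemon=True)

print(regular.daemon, background.daemon)

regular.start()
background.start()

# join() blocks the caller until the thread finishes, daemon or not.
regular.join()
background.join()
print(results)
```

Without the two `.join()` calls, the final `print` would most likely show an empty list, because the main thread would reach it while both workers are still sleeping.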

### Multiple Threads

In [18]:
"""
    The order in which threads run is decided by the OS and is hard to predict. Therefore, running the
    code below multiple times will result in different finishing orders.

    Fortunately, Python provides several primitives that can help coordinate threads (later section)
"""

threads = list()
for index in range(3):
    logging.info("Main    : create and start thread %d.", index)
    x = threading.Thread(target=thread_function, args=(index,))
    threads.append(x)
    x.start()

for index, thread in enumerate(threads):
    logging.info("Main    : before joining thread %d.", index)
    thread.join()
    logging.info("Main    : thread %d done", index)

16:44:19: Main    : create and start thread 0.
16:44:19: Thread 0: starting
16:44:19: Main    : create and start thread 1.
16:44:19: Thread 1: starting
16:44:19: Main    : create and start thread 2.
16:44:19: Thread 2: starting
16:44:19: Main    : before joining thread 0.
16:44:21: Thread 0: finishing
16:44:21: Thread 1: finishing
16:44:21: Main    : thread 0 done
16:44:21: Main    : before joining thread 1.
16:44:21: Thread 2: finishing
16:44:21: Main    : thread 1 done
16:44:21: Main    : before joining thread 2.
16:44:21: Main    : thread 2 done


### ThreadPoolExecutor

An easier way to start up a group of threads.

The easiest way to create one is with the `with` context manager.

In [10]:
import concurrent.futures

# [rest of code]

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))

"""
    At the end of the `with` block, `ThreadPoolExecutor` calls .join() on each of the
    threads in the pool.

    It's highly recommended to use `ThreadPoolExecutor` as a context manager, so that there is no risk
    of problems caused by forgetting the `.join()` call
"""

16:40:05: Thread 0: starting
16:40:05: Thread 1: starting
16:40:05: Thread 2: starting
16:40:07: Thread 0: finishing
16:40:07: Thread 1: finishing
16:40:07: Thread 2: finishing



**Note**: Using a ThreadPoolExecutor can cause some confusing errors.

For example, if you call a function that takes no parameters, but you pass it parameters in .map(), the thread will throw an exception.

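Unfortunately, ThreadPoolExecutor does not report that exception on its own: it is re-raised only when the results are retrieved (for example, by iterating over the return value of `.map()`). If the results are never read, as in the case above, the program terminates with no output. This can be quite confusing to debug at first.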
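
One way to surface such an exception is to ask for the result explicitly. The sketch below assumes a hypothetical zero-argument function `no_args`; it shows that the `TypeError` raised inside the worker is re-raised in the caller by `Future.result()` and by iterating over the return value of `.map()`.

```python
import concurrent.futures

def no_args():
    # Hypothetical function that takes no parameters.
    return "ok"

errors = []
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    # submit() returns a Future; the exception raised in the worker is stored on it.
    future = executor.submit(no_args, 1)
    try:
        future.result()  # re-raises the worker's exception in the caller
    except TypeError as exc:
        errors.append(exc)

    # map() behaves similarly: the exception appears only when results are consumed.
    try:
        list(executor.map(no_args, range(2)))
    except TypeError as exc:
        errors.append(exc)

for exc in errors:
    print(type(exc).__name__, exc)
```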

### Race Conditions

- Race conditions can occur when two or more threads access a shared piece of data or resource.
- Often they show up only rarely, and they can produce confusing results. As you can imagine, this makes them quite difficult to debug.

In [19]:
# Creating a race condition
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info("Thread %s: starting update", name)
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info("Thread %s: finishing update", name)

In [20]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

16:49:18: Testing update. Starting value is 0.
16:49:18: Thread 0: starting update
16:49:18: Thread 1: starting update
16:49:18: Thread 0: finishing update
16:49:18: Thread 1: finishing update
16:49:18: Testing update. Ending value is 1.


The ending value should be 2, since each of the two threads adds 1. Even though Python threads run one at a time, the result is 1.

Each thread copies `self.value` into `local_copy` when it starts. Thread 0 copies the value, increments its copy, and then sleeps.

While thread 0 sleeps, thread 1 gets the opportunity to run: it copies and increments as well. Notice that `self.value` has not been updated yet, so thread 1 also copies 0.

Each thread then writes 1 back to `self.value`, because neither is aware of the other's update.

### Basic Sync Using `Lock`

A `Lock` is one of the ways to avoid race conditions.

A Lock is an object that acts like a hall pass. Only one thread at a time can have the Lock. Any other thread that wants the Lock must wait until the owner of the Lock gives it up.

In some other languages this same idea is called a `mutex`.

`.acquire()` and `.release()` are the two basic methods used for this.

**NOTE** - If a thread never releases the lock, the program will be stuck.

Fortunately, Python’s Lock will also operate as a `context manager`, so you can use it in a with statement, and it gets released automatically when the with block exits for any reason.

In [9]:
import concurrent.futures

class FakeDatabase:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def locked_update(self, name):
        logging.info("Thread %s: starting update", name)
        logging.debug("Thread %s about to lock", name)

        # Using as a context manager
        with self._lock: # using the .acquire() internally
            logging.debug("Thread %s has lock", name)
            local_copy = self.value
            local_copy += 1
            time.sleep(0.1)
            self.value = local_copy
            logging.debug("Thread %s about to release lock", name)
            
        # released using the .release() internally
        logging.debug("Thread %s after release", name)
        logging.info("Thread %s: finishing update", name)

# NOTE: time.sleep() lets other threads run, but this thread still holds the lock while it sleeps,
# so the other thread blocks at `with self._lock` until the lock is released

In [11]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    database = FakeDatabase()
    logging.info("Testing update. Starting value is %d.", database.value)
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for index in range(2):
            executor.submit(database.locked_update, index)
    logging.info("Testing update. Ending value is %d.", database.value)

19:40:29: Testing update. Starting value is 0.
19:40:29: Thread 0: starting update
19:40:29: Thread 1: starting update
19:40:29: Thread 0: finishing update
19:40:29: Thread 1: finishing update
19:40:29: Testing update. Ending value is 2.


```python
import threading

l = threading.Lock()
print("before first acquire")
l.acquire()
print("before second acquire")
l.acquire()
print("acquired lock twice")
```

The above will hang on the second `acquire()`, because the lock is never released (a `deadlock`).

No thread is created explicitly here, so what holds the lock? Every program runs on the `main thread`, whether or not it creates other threads. The main thread acquires the lock and then requests it again before releasing it, which results in a deadlock.
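
The same situation can be observed without freezing the program. A small sketch, using the `blocking` and `timeout` parameters of `Lock.acquire()` so that the second and third attempts give up instead of waiting forever (the variable names are illustrative):

```python
import threading

lock = threading.Lock()

first = lock.acquire()                 # succeeds immediately
second = lock.acquire(blocking=False)  # lock already held: returns False at once
third = lock.acquire(timeout=0.1)      # waits up to 0.1 s, then gives up

print(first, second, third)  # True False False
lock.release()
```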

Deadlocks usually happen from one of two subtle things:
- An implementation bug where a Lock is not released properly
- A design issue where a utility function needs to be called by functions that might or might not already have the Lock

The 1st issue can be greatly reduced by using the Lock as a `context manager`.

The 2nd issue can be tackled using `RLock`. It allows a thread to .acquire() an RLock multiple times before it calls .release(). That thread is still required to call .release() the same number of times it called .acquire(), but it should be doing that anyway.
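
A minimal sketch of that second case, assuming a hypothetical `Counter` class in which a utility method (`increment`) is called both directly and from another method that already holds the lock:

```python
import threading

class Counter:
    """Hypothetical class whose helper may be called with or without the lock held."""

    def __init__(self):
        self.value = 0
        self._lock = threading.RLock()

    def increment(self):
        # Utility method: safe to call on its own.
        with self._lock:
            self.value += 1

    def increment_twice(self):
        # Holds the lock, then calls a method that acquires it again.
        # With a plain Lock this would deadlock; RLock allows re-entry by the owner.
        with self._lock:
            self.increment()
            self.increment()

counter = Counter()
counter.increment_twice()
print(counter.value)  # 2
```

Because both methods use the `with` form, every `.acquire()` is matched by exactly one `.release()`.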

`Lock` & `RLock` are the two basic tools used in threaded programming to prevent race conditions

Blocking I/O operations release the GIL while they wait, so other threads can keep the CPU busy. Examples include:
- open() and close() file operations
- read(), write(), readline(), readlines(), and writelines()
- socket.recv(), socket.send(), socket.recvfrom(), socket.sendto()
- Database operations using libraries like sqlite3, psycopg2, MySQLdb, etc.
- Blocking `get()` and `put()` calls on `queue.Queue` (in-memory containers from `collections`, such as deque and Counter, do not wait on anything, so this does not apply to them)
- Some operations in the os module that wait on a child process, such as os.system()
- subprocess module functions that wait on a child process, like subprocess.run()

Note that `asyncio` is a separate concurrency model: it interleaves coroutines on a single thread rather than relying on threads releasing the GIL.
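
The effect of releasing the GIL during a wait can be seen with a rough timing sketch. It assumes `time.sleep()` as a stand-in for a blocking I/O call, and the function name `wait_for_io` is hypothetical.

```python
import threading
import time

def wait_for_io():
    # time.sleep() stands in for a blocking call; the GIL is released while it waits.
    time.sleep(0.5)

threads = [threading.Thread(target=wait_for_io) for _ in range(4)]

start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# The four waits overlap, so the total should be close to 0.5 s rather than 2 s.
print(f"elapsed: {elapsed:.2f}s")
```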

### Producer Consumer Threading

This is a standard computer science problem used to look at threading or process synchronization issues.

#### Producer Consumer using Lock

In [1]:
import threading
import logging
import time  # used by time.sleep() in the Queue example further down
import concurrent.futures

In [5]:
class Pipeline:
    """
    Class to allow a single element pipeline between producer and consumer.
    """
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.debug("%s:about to acquire getlock", name)
        self.consumer_lock.acquire()
        logging.debug("%s:have getlock", name)
        message = self.message
        logging.debug("%s:about to release setlock", name)
        self.producer_lock.release()
        logging.debug("%s:setlock released", name)
        return message

    def set_message(self, message, name):
        logging.debug("%s:about to acquire setlock", name)
        self.producer_lock.acquire()
        logging.debug("%s:have setlock", name)
        self.message = message
        logging.debug("%s:about to release getlock", name)
        self.consumer_lock.release()
        logging.debug("%s:getlock released", name)

In [6]:
import random

SENTINEL = object()

def producer(pipeline):
    """Pretend we're getting a message from the network."""
    for index in range(10):
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    # Send a sentinel message to tell consumer we're done
    pipeline.set_message(SENTINEL, "Producer")

def consumer(pipeline):
    """Pretend we're saving a number in the database."""
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info("Consumer storing message: %s", message)

In [8]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)

07:37:20: Producer got message: 27
07:37:20: Producer got message: 83
07:37:20: Consumer storing message: 27
07:37:20: Producer got message: 100
07:37:20: Consumer storing message: 83
07:37:20: Producer got message: 47
07:37:20: Consumer storing message: 100
07:37:20: Producer got message: 82
07:37:20: Consumer storing message: 47
07:37:20: Producer got message: 63
07:37:20: Consumer storing message: 82
07:37:20: Producer got message: 33
07:37:20: Consumer storing message: 63
07:37:20: Producer got message: 72
07:37:20: Consumer storing message: 33
07:37:20: Producer got message: 97
07:37:20: Consumer storing message: 72
07:37:20: Producer got message: 90
07:37:20: Consumer storing message: 97
07:37:20: Consumer storing message: 90


#### Producer Consumer using Queue

Here there is no need for explicit locking, because `queue.Queue` handles the locking internally: `Queue` is `thread-safe`.

In [2]:
import queue

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

In [3]:
def producer(pipeline, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        pipeline.set_message(message, "Producer")

    logging.info("Producer received EXIT event. Exiting")

def consumer(pipeline, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not pipeline.empty():
        message = pipeline.get_message("Consumer")
        logging.info(
            "Consumer storing message: %s  (queue size=%s)",
            message,
            pipeline.qsize(),
        )

    logging.info("Consumer received EXIT event. Exiting")

In [None]:
if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    # logging.getLogger().setLevel(logging.DEBUG)

    pipeline = Pipeline()
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.01)
        logging.info("Main: about to set event")
        event.set()
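
One caveat with the `consumer` loop above: `get()` blocks until an item is available, so the consumer can wait forever if the queue is empty and the producer exits without adding another item. A possible variant, sketched here under the hypothetical name `consumer_with_timeout`, uses the `timeout` parameter of `queue.Queue.get()`, which raises `queue.Empty` when nothing arrives in time, so the loop condition is re-checked periodically.

```python
import queue
import threading

def consumer_with_timeout(q, event, stored):
    # Keep going until the exit event is set and the queue has been drained.
    while not event.is_set() or not q.empty():
        try:
            # Wake up periodically instead of blocking forever on an empty queue.
            message = q.get(timeout=0.05)
        except queue.Empty:
            continue
        stored.append(message)

q = queue.Queue(maxsize=10)
event = threading.Event()
stored = []

for n in range(3):
    q.put(n)

t = threading.Thread(target=consumer_with_timeout, args=(q, event, stored))
t.start()
event.set()  # no producer is running here, so a plain get() could hang
t.join()
print(stored)  # [0, 1, 2]
```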

### Threading Objects

There are a few more primitives offered by the Python threading module

#### Semaphore

A Semaphore is a counter with a few special properties.
- The counting is atomic: there is a guarantee that the operating system will not swap out the thread in the middle of incrementing or decrementing the counter.
- The internal counter is incremented when you call .release() and decremented when you call .acquire().
- If a thread calls .acquire() when the counter is zero, that thread will block until a different thread calls .release() and increments the counter to one.

Semaphores are usually used to protect a resource that has a limited capacity.
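
A minimal sketch of a Semaphore limiting concurrent access. The limit of 2, the `use_resource` function, and the `stats` dictionary are assumptions made for illustration.

```python
import threading
import time

MAX_CONCURRENT = 2  # assumed capacity of the protected resource
slots = threading.Semaphore(MAX_CONCURRENT)
stats_lock = threading.Lock()
stats = {"active": 0, "peak": 0}

def use_resource():
    with slots:  # acquire(): counter decremented, blocks while it is zero
        with stats_lock:
            stats["active"] += 1
            stats["peak"] = max(stats["peak"], stats["active"])
        time.sleep(0.05)  # pretend to use the resource
        with stats_lock:
            stats["active"] -= 1
    # Leaving the with block calls release(): counter incremented.

workers = [threading.Thread(target=use_resource) for _ in range(6)]
for w in workers:
    w.start()
for w in workers:
    w.join()

# The peak never exceeds MAX_CONCURRENT, even though six threads were started.
print(stats["peak"])
```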

#### Timer

A threading.Timer is a way to schedule a function to be called after a certain amount of time has passed

```python
t = threading.Timer(30.0, my_function)
t.start()
```
The function will be called on a new thread at some point after the specified time, but be aware that there is `no promise that it will be called exactly at the time you want`.

The function call can be cancelled, if needed, by calling `.cancel()` before the timer fires.
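
A short sketch of both behaviours, using `threading.Event` objects (an addition for this illustration) to record whether the callback ran. The delays are kept small so the example finishes quickly.

```python
import threading

fired = threading.Event()

def my_function():
    # Runs on the timer's own thread once the delay has elapsed.
    fired.set()

t = threading.Timer(0.1, my_function)
t.start()
fired.wait(timeout=2)  # wait for the callback to run
print(fired.is_set())  # True

cancelled = threading.Event()
t2 = threading.Timer(5.0, cancelled.set)
t2.start()
t2.cancel()  # only effective while the timer is still waiting
t2.join()    # returns promptly because the timer was cancelled
print(cancelled.is_set())  # False
```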

#### Barrier

- A threading.Barrier can be used to keep a fixed number of threads in sync.
- When creating a Barrier, the caller must specify how many threads will be synchronizing on it.
- Each thread calls .wait() on the Barrier. They all remain blocked until the specified number of threads are waiting, and then they are all released at the same time.

Remember that threads are scheduled by the operating system so, even though all of the threads are released simultaneously, they will be scheduled to run one at a time.

One use for a Barrier is to allow a pool of threads to initialize themselves. Having the threads wait on a Barrier after they are initialized will ensure that none of the threads start running before all of the threads are finished with their initialization.
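
The initialization use case can be sketched as follows. The `worker` function, the `log` list, and the count of three threads are hypothetical choices for this illustration.

```python
import threading

N_WORKERS = 3
barrier = threading.Barrier(N_WORKERS)
log = []
log_lock = threading.Lock()

def worker(name):
    with log_lock:
        log.append(("init", name))  # per-thread initialization
    barrier.wait()                  # block until all N_WORKERS threads have arrived
    with log_lock:
        log.append(("run", name))   # starts only after every thread has initialized

threads = [threading.Thread(target=worker, args=(i,)) for i in range(N_WORKERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

phases = [phase for phase, _ in log]
print(phases)  # ['init', 'init', 'init', 'run', 'run', 'run']
```

The order of the thread names within each phase can vary between runs, but every `init` entry is logged before any `run` entry.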