# Python Thread

In [2]:
import logging
import random
import threading
import time

## Starting a Thread

In [1]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    fmt = "%(asctime)s: %(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,))
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

10:20:08: Main    : before creating thread
10:20:08: Main    : before running thread
10:20:08: Thread 1: starting
10:20:08: Main    : wait for the thread to finish
10:20:08: Main    : all done
10:20:10: Thread 1: finishing


## Daemon Thread

In [5]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    fmt = "%(asctime)s: %(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO,
                        datefmt="%H:%M:%S")

    logging.info("Main    : before creating thread")
    x = threading.Thread(target=thread_function, args=(1,), daemon=True)
    logging.info("Main    : before running thread")
    x.start()
    logging.info("Main    : wait for the thread to finish")
    # x.join()
    logging.info("Main    : all done")

10:23:01: Main    : before creating thread
10:23:01: Main    : before running thread
10:23:01: Thread 1: starting
10:23:01: Main    : wait for the thread to finish
10:23:03: Thread 1: finishing
10:23:03: Main    : all done


## Working with many threads

In [6]:
def thread_function(name):
    logging.info("Thread %s: starting", name)
    time.sleep(2)
    logging.info("Thread %s: finishing", name)

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    threads = list()
    for index in range(3):
        logging.info("Main    : create and start thread %d.", index)
        x = threading.Thread(target=thread_function, args=(index,))
        threads.append(x)
        x.start()

    for index, thread in enumerate(threads):
        logging.info("Main    : before joining thread %d.", index)
        thread.join()
        logging.info("Main    : thread %d done", index)

10:27:52: Main    : create and start thread 0.
10:27:52: Thread 0: starting
10:27:52: Main    : create and start thread 1.
10:27:52: Thread 1: starting
10:27:52: Main    : create and start thread 2.
10:27:52: Thread 2: starting
10:27:52: Main    : before joining thread 0.
10:27:54: Thread 0: finishing
10:27:54: Thread 1: finishing
10:27:54: Main    : thread 0 done
10:27:54: Main    : before joining thread 1.
10:27:54: Main    : thread 1 done
10:27:54: Main    : before joining thread 2.
10:27:54: Thread 2: finishing
10:27:54: Main    : thread 2 done


## Using a ThreadPoolExecutor

In [7]:
import concurrent.futures
def thread_function(name):
    logging.info(f"Thread {name}: starting")
    time.sleep(2)
    logging.info(f"Thread {name}: finishing")

if __name__ == "__main__":
    fmt = "%(asctime)s: %(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO,
                        datefmt="%H:%M:%S")
    logging.info("Main    : before creating thread")
    with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(thread_function, range(3))
    logging.info("Main    : all done")



10:38:36: Main    : before creating thread
10:38:36: Thread 0: starting
10:38:36: Thread 1: starting
10:38:36: Thread 2: starting
10:38:38: Thread 0: finishing
10:38:38: Thread 1: finishing
10:38:38: Thread 2: finishing
10:38:38: Main    : all done


## What's Race Conditions

In [9]:
class FakeDatabase:
    def __init__(self):
        self.value = 0

    def update(self, name):
        logging.info(f"Thread {name}: starting update")
        local_copy = self.value
        local_copy += 1
        time.sleep(0.1)
        self.value = local_copy
        logging.info(f"Thread {name}: finishing update")


In [10]:
if __name__ == "__main__":
    fmt = "%(asctime)s: %(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO,
                        datefmt="%H:%M:%S")
    database = FakeDatabase()
    logging.info(f"Testing update. Starting value is {database.value}")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for i in range(2):
            executor.submit(database.update, i)
    logging.info(f"Testing update. Ending value is {database.value}")

10:51:38: Testing update. Starting value is 0
10:51:38: Thread 0: starting update
10:51:38: Thread 1: starting update
10:51:38: Thread 0: finishing update
10:51:38: Thread 1: finishing update
10:51:38: Testing update. Ending value is 1


## Lock

In [11]:
class FakeDatabaseWithLock:
    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def update(self, name):
       logging.info(f"Thread {name}: starting update")
       logging.info(f"Thread {name}: about to lock")

       with self._lock:
           logging.info(f"Thread {name}: lock acquired")
           local_copy = self.value
           local_copy += 1
           time.sleep(0.1)
           self.value = local_copy
           logging.info(f"Thread {name}: lock released")

       logging.info(f"Thread {name}: finishing update")

if __name__ == "__main__":
    fmt = "%(asctime)s: %(message)s"
    logging.basicConfig(format=fmt, level=logging.INFO,
                        datefmt="%H:%M:%S")
    database = FakeDatabaseWithLock()
    logging.info(f"Testing update. Starting value is {database.value}")
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        for i in range(2):
            executor.submit(database.update, i)
    logging.info(f"Testing update. Ending value is {database.value}")

11:14:30: Testing update. Starting value is 0
11:14:30: Thread 0: starting update
11:14:30: Thread 1: starting update
11:14:30: Thread 0: about to lock
11:14:30: Thread 1: about to lock
11:14:30: Thread 0: lock acquired
11:14:30: Thread 0: lock released
11:14:30: Thread 0: finishing update
11:14:30: Thread 1: lock acquired
11:14:30: Thread 1: lock released
11:14:30: Thread 1: finishing update
11:14:30: Testing update. Ending value is 2


## Producer-Consumer Threading

In [12]:
import random

class Pipeline:
    def __init__(self):
        self.message = 0
        self.producer_lock = threading.Lock()
        self.consumer_lock = threading.Lock()
        self.consumer_lock.acquire()

    def get_message(self, name):
        logging.info(f"{name}: about to acquire lock")
        self.consumer_lock.acquire()
        logging.info(f"{name}: acquired lock")
        message = self.message
        logging.info(f"{name}: about to release lock")
        self.producer_lock.release()
        logging.info(f"{name}: released lock")
        return message

    def set_message(self, name, message):
        logging.info(f"{name}: about to acquire lock")
        self.producer_lock.acquire()
        logging.info(f"{name}: acquired lock")
        self.message = message
        logging.info(f"{name}: about to release lock")
        self.consumer_lock.release()
        logging.info(f"{name}: released lock")

SENTINEL = object()

def producer(pipeline):
    for _ in range(10):
        message = random.randint(1, 100)
        logging.info(f"Producer got message: {message}")
        pipeline.set_message("Producer", message)
    pipeline.set_message(SENTINEL, "Producer")

def consumer(pipeline):
    message = 0
    while message is not SENTINEL:
        message = pipeline.get_message("Consumer")
        if message is not SENTINEL:
            logging.info(f"Consumer storing message: {message}")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")
    pipeline = Pipeline()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline)
        executor.submit(consumer, pipeline)


11:39:39: Producer got message: 20
11:39:39: Consumer: about to acquire lock
11:39:39: Producer: about to acquire lock
11:39:39: Producer: acquired lock
11:39:39: Producer: about to release lock
11:39:39: Producer: released lock
11:39:39: Consumer: acquired lock
11:39:39: Producer got message: 89
11:39:39: Consumer: about to release lock
11:39:39: Producer: about to acquire lock
11:39:39: Consumer: released lock
11:39:39: Producer: acquired lock
11:39:39: Consumer storing message: 20
11:39:39: Producer: about to release lock
11:39:39: Consumer: about to acquire lock
11:39:39: Producer: released lock
11:39:39: Consumer: acquired lock
11:39:39: Producer got message: 16
11:39:39: Consumer: about to release lock
11:39:39: Consumer: released lock
11:39:39: Producer: about to acquire lock
11:39:39: Consumer storing message: 89
11:39:39: Producer: acquired lock
11:39:39: Consumer: about to acquire lock
11:39:39: Producer: about to release lock
11:39:39: Producer: released lock
11:39:39: Consu

KeyboardInterrupt: 

## Queue is thread safe

In [None]:
import concurrent.futures
import logging
import queue
import random
import threading
import time

class Pipeline(queue.Queue):
    def __init__(self):
        super().__init__(maxsize=10)

    def get_message(self, name):
        logging.debug("%s:about to get from queue", name)
        value = self.get()
        logging.debug("%s:got %d from queue", name, value)
        return value

    def set_message(self, value, name):
        logging.debug("%s:about to add %d to queue", name, value)
        self.put(value)
        logging.debug("%s:added %d to queue", name, value)

def producer(queue, event):
    """Pretend we're getting a number from the network."""
    while not event.is_set():
        message = random.randint(1, 101)
        logging.info("Producer got message: %s", message)
        queue.put(message)

    logging.info("Producer received event. Exiting")

def consumer(queue, event):
    """Pretend we're saving a number in the database."""
    while not event.is_set() or not queue.empty():
        message = queue.get()
        logging.info(
            "Consumer storing message: %s (size=%d)", message, queue.qsize()
        )

    logging.info("Consumer received event. Exiting")

if __name__ == "__main__":
    format = "%(asctime)s: %(message)s"
    logging.basicConfig(format=format, level=logging.INFO,
                        datefmt="%H:%M:%S")

    pipeline = queue.Queue(maxsize=10)
    event = threading.Event()
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(producer, pipeline, event)
        executor.submit(consumer, pipeline, event)

        time.sleep(0.1)
        logging.info("Main: about to set event")
        event.set()