## Get number of active threads with threading.active_count()

## Use threading.Event() to end a thread

In [1]:
import threading
import time

def spin_and_sleep(event: threading.Event) -> None:
    """Keep the calling thread alive until ``event`` is set.

    Args:
        event: Shared flag used as the stop signal; the function returns as
            soon as the flag has been set (immediately if it already is).
    """
    # Event.wait() parks the thread until the flag is set, so the worker stays
    # alive without burning a CPU core in a tight ``while ...: pass`` loop.
    event.wait()

In [2]:
event = threading.Event()

# Keep a handle on every worker so they can be joined once the event is set.
threads = [
    threading.Thread(target=spin_and_sleep, args=(event,))
    for _ in range(5)
]
for thread in threads:
    thread.start()

# active_count() reports every live thread in the process, not only the five
# workers started above (the main thread is counted as well).
print(f'Before event.set() there are {threading.active_count()} active threads')

event.set()
# Joining is deterministic; a fixed sleep only hopes the workers have finished.
for thread in threads:
    thread.join()
print(f'After event.set() there are {threading.active_count()} active threads')

Before event.set() there are 10 active threads
After event.set() there are 5 active threads


## Use queues for producer/consumer

This example uses a dictionary, `MSG_QUEUES`, whose keys are thread IDs and whose values are dictionaries with the keys 'msgs_to_consume' and 'msg_queue'. A thread registers its interest in specific messages by adding its thread ID to `MSG_QUEUES`, together with the set of messages it wants to receive and an empty queue into which the producer places matching messages.

In [3]:
# Registry of consumers: thread ident -> {'msgs_to_consume': ..., 'msg_queue': ...}.
# Starts out empty; entries are added and removed by the consumers themselves.
MSG_QUEUES = dict()

In [4]:
import queue

class MessageConsumer:
    """Context manager that registers a thread as a consumer of selected messages.

    On entry, an entry keyed by the entering thread's ident is stored in the
    module-level ``MSG_QUEUES`` dict, holding the set of messages of interest
    and a fresh ``queue.Queue``. On exit that entry is removed again.

    Because the registry is keyed by thread ident, a second consumer entered
    from the same thread replaces the first one's entry.
    """

    def __init__(self, *args):
        """Remember which messages this consumer is interested in.

        Args:
            *args: Hashable message values to subscribe to; duplicates collapse.
        """
        self._msgs_to_consume = set(args)
        # Ident under which this consumer is registered; None while unregistered.
        self._thread_id = None

    def __enter__(self):
        """Add this thread to MSG_QUEUES dict"""
        # Remember the ident used as key so that __exit__ and get() address the
        # same entry even if they are called from a different thread.
        self._thread_id = threading.get_ident()
        MSG_QUEUES[self._thread_id] = {
            'msgs_to_consume': self._msgs_to_consume,
            'msg_queue': queue.Queue()
        }
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Remove msg consumer from queue

        Returns:
            False when an exception occurred inside the ``with`` block (the
            exception is printed and then propagates), True otherwise. The
            ``with`` statement ignores the value when nothing was raised.
        """
        MSG_QUEUES.pop(self._thread_id, None)
        self._thread_id = None
        if exc_type:
            print(exc_type)
            print(exc_value)
            print(traceback)
            return False
        return True

    def get(self, block=True, timeout=None):
        """Fetch the next message from this consumer's queue.

        Args:
            block: Passed through to ``queue.Queue.get``.
            timeout: Passed through to ``queue.Queue.get``.

        Returns:
            The next message placed in this consumer's queue.

        Raises:
            KeyError: If the consumer is not currently registered, i.e. it is
                used outside its ``with`` block.
            queue.Empty: If no message is available (see ``queue.Queue.get``).
        """
        return MSG_QUEUES[self._thread_id]['msg_queue'].get(block=block, timeout=timeout)

In [5]:
# Pool of candidate messages; the producer picks one at random on each tick.
msgs_list = [
    'hello',
    'list',
    'of',
    'words',
]

In [6]:
import time

def wait_and_print_msgs(duration=10, poll_timeout=5):
    """Consume 'hello' messages for a limited time and print each one.

    Registers the calling thread through ``MessageConsumer('hello')`` and then
    repeatedly waits on its queue until ``duration`` seconds have passed.

    Args:
        duration: Total number of seconds to keep consuming.
        poll_timeout: Longest single wait for a message, in seconds. The last
            wait is shortened so the function does not run past ``duration``.
    """
    # A monotonic clock is unaffected by system clock adjustments.
    deadline = time.monotonic() + duration

    with MessageConsumer('hello') as ms:
        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            # Never block beyond the overall deadline.
            wait = min(poll_timeout, remaining)
            try:
                new_msg = ms.get(timeout=wait)
            except queue.Empty:
                print(f'No messages arrived in {wait:.3g} seconds')
            else:
                print(f'New msg received: {new_msg}')

In [None]:
import random

# Start the consumer in the background; it registers itself in MSG_QUEUES.
t = threading.Thread(target=wait_and_print_msgs)
t.start()

timeout = time.time() + 10

while time.time() < timeout:
    time.sleep(0.5)
    new_msg = random.choice(msgs_list)
    print(f'new msg to put = {new_msg}')
    # Iterate over a snapshot: consumer threads add and remove their own
    # entries concurrently, and a dict that changes size while it is being
    # iterated raises RuntimeError. Holding the entry itself also means a
    # consumer that unregisters in the meantime cannot cause a KeyError here.
    for entry in list(MSG_QUEUES.values()):
        if new_msg in entry['msgs_to_consume']:
            print(f'found thread interested, putting {new_msg}')
            entry['msg_queue'].put(new_msg)

# Wait for the consumer so its output lands in this cell and no thread is
# left running after the cell finishes.
t.join()

new msg to put = words
new msg to put = list
new msg to put = words
new msg to put = list
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = of
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = of
new msg to put = of
new msg to put = list
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = list
new msg to put = words
new msg to put = of
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = list
new msg to put = list
new msg to put = of
new msg to put = words
new msg to put = words
new msg to put = words
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = hello
found thread interested, putting hello
New msg received: hello
new msg to put = list
new ms