# Important class of threading
event
Semaphore
local
condition/lock

# Signaling Between Threads
Although the point of using multiple threads is to spin separate operations off to run concurrently, there are times when it is important to be able to synchronize the operations in two or more threads. A simple way to communicate between threads is using Event objects. An Event manages an internal flag that callers can either set() or clear(). Other threads can wait() for the flag to be set(), effectively blocking progress until allowed to continue.

In [1]:
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )
                    
def wait_for_event(e):
    """Wait for the event to be set before doing anything"""
    logging.debug('wait_for_event starting')
    event_is_set = e.wait()
    logging.debug('event set: %s', event_is_set)

def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.isSet():
        logging.debug('wait_for_event_timeout starting')
        event_is_set = e.wait(t)
        logging.debug('event set: %s', event_is_set)
        if event_is_set:
            logging.debug('processing event')
        else:
            logging.debug('doing other work')


e = threading.Event()
t1 = threading.Thread(name='block', 
                      target=wait_for_event,
                      args=(e,))
t1.start()

t2 = threading.Thread(name='non-block', 
                      target=wait_for_event_timeout, 
                      args=(e, 2))
t2.start()

logging.debug('Waiting before calling Event.set()')
time.sleep(3)
e.set()
logging.debug('Event is set')

(block     ) wait_for_event starting
(non-block ) wait_for_event_timeout starting
(MainThread) Waiting before calling Event.set()
(non-block ) event set: False
(non-block ) doing other work
(non-block ) wait_for_event_timeout starting
(MainThread) Event is set
(non-block ) event set: True
(block     ) event set: True
(non-block ) processing event


## Semaphore-Limiting Concurrent Access to Resources -- Semaphore
Sometimes it is useful to allow more than one worker access to a resource at a time, while still limiting the overall number. For example, a connection pool might support a fixed number of simultaneous connections, or a network application might support a fixed number of concurrent downloads. A Semaphore is one way to manage those connections.

In [2]:
import logging
import random
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

class ActivePool(object):
    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()
    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('Running: %s', self.active)
    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('Running: %s', self.active)

def worker(s, pool):
    logging.debug('Waiting to join the pool')
    with s:
        name = threading.currentThread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(target=worker, name=str(i), args=(s, pool))
    t.start()

(0         ) Waiting to join the pool
(1         ) Waiting to join the pool
(2         ) Waiting to join the pool
(3         ) Waiting to join the pool
(0         ) Running: ['0']
(1         ) Running: ['0', '1']
(0         ) Running: ['1']
(2         ) Running: ['1', '2']
(1         ) Running: ['2']
(3         ) Running: ['2', '3']
(2         ) Running: ['3']
(3         ) Running: []


## Thread-specific Data--Local
While some resources need to be locked so multiple threads can use them, others need to be protected so that they are hidden from view in threads that do not “own” them. The local() function creates an object capable of hiding values from view in separate threads.

In [1]:
import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )

def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

(MainThread) No value yet
(MainThread) value=1000
(Thread-4  ) No value yet
(Thread-5  ) No value yet
(Thread-4  ) value=57
(Thread-5  ) value=11


Notice that local_data.value is not present for any thread until it is set in that thread.
To initialize the settings so all threads start with the same value, use a subclass and set the attributes in __init__().

In [2]:
import random
import threading
import logging

logging.basicConfig(level=logging.DEBUG,
                    format='(%(threadName)-10s) %(message)s',
                    )


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('No value yet')
    else:
        logging.debug('value=%s', val)

def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)

class MyLocal(threading.local):
    def __init__(self, value):
        logging.debug('Initializing %r', self)
        self.value = value

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

(MainThread) Initializing <__main__.MyLocal object at 0x7f470c45aa08>
(MainThread) value=1000
(Thread-6  ) Initializing <__main__.MyLocal object at 0x7f470c45aa08>
(Thread-7  ) Initializing <__main__.MyLocal object at 0x7f470c45aa08>
(Thread-6  ) value=1000
(Thread-7  ) value=1000
(Thread-6  ) value=35
(Thread-7  ) value=88


## Synchronizing Threads--Condition¶
In addition to using Events, another way of synchronizing threads is through using a Condition object. Because the Condition uses a Lock, it can be tied to a shared resource. This allows threads to wait for the resource to be updated. In this example, the consumer() threads wait() for the Condition to be set before continuing. The producer() thread is responsible for setting the condition and notifying the other threads that they can continue.

In [3]:
import logging
import threading
import time

logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s (%(threadName)-2s) %(message)s',
                    )

def consumer(cond):
    """wait for the condition and use the resource"""
    logging.debug('Starting consumer thread')
    t = threading.currentThread()
    with cond:
        cond.wait()
        logging.debug('Resource is available to consumer')

def producer(cond):
    """set up the resource to be used by the consumer"""
    logging.debug('Starting producer thread')
    with cond:
        logging.debug('Making resource available')
        cond.notifyAll()

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer, args=(condition,))
c2 = threading.Thread(name='c2', target=consumer, args=(condition,))
p = threading.Thread(name='p', target=producer, args=(condition,))

c1.start()
time.sleep(2)
c2.start()
time.sleep(2)
p.start()

(c1        ) Starting consumer thread
(c2        ) Starting consumer thread
(p         ) Starting producer thread
(p         ) Making resource available
(c1        ) Resource is available to consumer
(c2        ) Resource is available to consumer
