In [1]:
pwd

'/Users/sinayoks/dev/notebooks/little-book-of-semaphores'

In [2]:

#!mv /Users/sinayoks/Desktop/Screen\ Shot\ 2020-06-27\ at\ 14.49.39.png sync_app.png

# The Little Book of Semaphores

Working through my own implementations of concurrency and synchronization problems from the [Little Book of Semaphores](http://greenteapress.com/semaphores/LittleBookOfSemaphores.pdf) by A. B. Downey.

**Asyncio solutions in notebook**

We will implement examples using asyncio. 

**Reference solutions using Sync GUI app**

Solutions are also available from https://github.com/AllenDowney/LittleBookOfSemaphores/tree/master/code/sync_code and can be run using the Sync program provided in that repository:
```
git clone git@github.com:AllenDowney/LittleBookOfSemaphores.git
cd LittleBookOfSemaphores/code
python Sync.py sync_code/signal.py
```
![signalling problem using Downey's Sync app](sync_app.png)
```

```

In [3]:
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from IPython.core.debugger import set_trace


In [4]:
async def run_test(test_coroutine, attempts):
    """Await ``test_coroutine(attempt)`` once per attempt, one after the other.

    A single passing run proves little for concurrent code, so the test is
    repeated with ``attempt`` going from 0 to ``attempts - 1``.

    Args:
        test_coroutine: Coroutine function taking the attempt index.
        attempts: Number of times to run the test.
    """
    for attempt in range(attempts):
        await test_coroutine(attempt)

## Basic patterns

### Signaling
**Signaling problem**
Two threads/coroutines have to coordinate so that their actions happen in a particular order.

In [5]:
async def test_signaling(attempt):
    """Check the signaling approach serialises the threads so that actionA 
    takes place before actionB. 
    
    Push a value from coroutines A then one from coroutine B into a shared queue, 
    and make sure they've been pushed in that order. 
    """
    # Not using a Lock here because we want to signal/release before waiting/acquiring 
    # so we need a semaphore. 
    first_action_done = asyncio.Semaphore(0)
    queue = asyncio.Queue()
    
    async def push(value):
        await queue.put(value)

    async def coroutineA():
        """Do first action then signal to B that we're done"""
        asyncio.Task.current_task().name = "coroutineA"
        await push('A')
        first_action_done.release()

    async def coroutineB():
        asyncio.Task.current_task().name = "coroutineB"
        async with first_action_done:
            res = await push('B')


    await asyncio.gather(coroutineA(), coroutineB())
    res = [await queue.get() for _ in range(queue.qsize())]
    assert res == ['A', 'B'], f'Test failed for attempt {attempt}: got {res}'

In [6]:
await run_test(test_signaling, 100)

### Rendez Vous
Two threads must await each other before doing some action. 

In [7]:
async def test_rendez_vous(attempt):
    """Make sure action doesn't happen before a rendez vous. 
    
    In this test, the action is reading the key value pairs from a shared dictionary. 
    Each of the two coroutines adds a key-value pair to the dictionary before the rendez vous. 
    So if both coroutines have waited for the other one successfully, they should both 
    return the two same (key,value) pairs. 
    """
    b_has_arrived = asyncio.Semaphore(0)
    a_has_arrived = asyncio.Semaphore(0)
    shared_dict = {}
    lock = asyncio.Lock()  # a lock to protect updating the dictionary
    
    async def read_items_from_dict():
        """Return tuple of (key, value) pairs giving the items in the dictionary."""
        async with lock:
            res = tuple(sorted(shared_dict.items()))
        return res 

    async def coroutineA(key, value):
        """Store key value pair in shared dictionary, wait for rendez vous with B, 
        then read all key value pairs from dictionary. 
        """
        a_has_arrived.release()
        # put in a value in the queue
        async with lock:
              shared_dict[key] = value
        await b_has_arrived.acquire()
        res = await read_items_from_dict()
        return res

    async def coroutineB(key, value):
        """Store key value pair in shared dictionary, wait for rendez vous with A, 
        then read all key value pairs from dictionary. 
        """
        b_has_arrived.release()
        async with lock:
              shared_dict[key] = value
        await a_has_arrived.acquire()
        res = await read_items_from_dict()
        return res

    itemsA, itemsB = await asyncio.gather(
        coroutineA('A', 1), 
        coroutineB('B', 2)
    )
    expected_items = ('A', 1), ('B', 2)
    results = {k:v for k,v in locals().items() if k in ['attempt', 'itemsA', 'itemsB', 'expected_items']}
    assert itemsA == itemsB == expected_items, results

In [8]:
await run_test(test_rendez_vous, 10)

### Mutex
Protects access to critical regions. Mutual exclusion: two threads can't both hold the mutex at the same time. 

How to test:
- Use a lot of threads and make them race hard: see `test_mutex_threadpool`
- Better demonstrated without asyncio as the event loop tends to make some of these issues go away because operations between awaits are atomic (there's only one thread). 
  - we can demonstrate with asyncio by forcing a context switch: see `test_mutex_aio`
  - we can also run the threadpool test using asyncio: see `test_mutex_threadpool_aio`
 

#### Thread pool test

In [9]:
import random
import time
from concurrent.futures import ThreadPoolExecutor
import threading

def test_mutex_threadpool(max_workers=100, iterations=100_000):
    """Check we can protect a variable using a mutex.

    Better demonstrated with threads rather than coroutines because with
    asyncio's event loop operations between "awaits" are atomic.

    Submits ``max_workers`` jobs to a thread pool of the same size; each job
    increments a shared counter ``iterations`` times while holding the lock.

    Args:
        max_workers: Size of the thread pool and number of jobs submitted.
        iterations: Number of increments performed by each job.

    Raises:
        AssertionError: If the final counter is not ``iterations * max_workers``.
    """
    counter = 0
    lock = threading.Lock()

    def increment_shared_counter(_job_index):
        """Add ``iterations`` to the counter under the lock; return the value seen."""
        nonlocal counter
        with lock:
            for _ in range(iterations):
                counter += 1
            # Snapshot while still holding the lock so the diagnostic value is
            # not affected by other workers.
            return counter

    # The context manager shuts the pool down and joins its threads; without
    # it every call would leave `max_workers` idle threads behind.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        intermediate_results = list(
            pool.map(increment_shared_counter, range(max_workers))
        )
    assert counter == iterations * max_workers, {
        'counter': counter,
        'intermediate_results': intermediate_results,
    }
    
test_mutex_threadpool() 

#### Thread pool + asyncio test

In [10]:
async def test_mutex_threadpool_aio(max_workers=100, iterations=100_000):
    """Check we can protect a variable using a mutex.

    Same experiment as the plain thread pool test, but the jobs are submitted
    through asyncio via ``loop.run_in_executor`` and awaited with ``gather``.

    Args:
        max_workers: Size of the thread pool and number of jobs submitted.
        iterations: Number of increments performed by each job.

    Raises:
        AssertionError: If the final counter is not ``iterations * max_workers``.
    """
    counter = 0
    lock = threading.Lock()  # a real thread lock: the jobs run in worker threads

    def increment_shared_counter(_job_index):
        """Add ``iterations`` to the counter under the lock; return the value seen."""
        nonlocal counter
        with lock:
            for _ in range(iterations):
                counter = counter + 1
            # Snapshot while still holding the lock so the diagnostic value is
            # not affected by other workers.
            return counter

    loop = asyncio.get_running_loop()
    # The context manager shuts the pool down and joins its threads; without
    # it every call would leave `max_workers` idle threads behind. All jobs
    # have completed by the time we leave the block, so the exit is quick.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [
            loop.run_in_executor(pool, increment_shared_counter, job_index)
            for job_index in range(max_workers)
        ]
        intermediate_results = await asyncio.gather(*futures)
    assert counter == iterations * max_workers, {
        'counter': counter,
        'intermediate_results': intermediate_results,
    }

In [11]:
await test_mutex_threadpool_aio()

#### Minimal asyncio test

In [12]:
async def test_mutex_aio():
    """Check that an asyncio.Lock protects a read-modify-write on a counter.

    Two coroutines each read the counter, yield to the event loop, then write
    back the value they read plus one. The lock is held across the yield, so
    the second coroutine cannot read a stale value.

    Raises:
        AssertionError: If the counter is not 2 once both coroutines finish.
    """
    lock = asyncio.Lock()
    counter = 0

    async def increment_shared_counter():
        """Increment the counter non-atomically while holding the lock."""
        nonlocal counter
        async with lock:
            # Deliberately split the increment into read / yield / write to
            # simulate an unsafe concurrent update: remove the lock and both
            # coroutines read 0, so the test fails.
            seen = counter
            await asyncio.sleep(0.001)  # forces a context switch
            counter = seen + 1
        return counter

    intermediate_results = await asyncio.gather(
        increment_shared_counter(),
        increment_shared_counter(),
    )
    assert counter == 2, {
        'intermediate_results': intermediate_results,
        'counter': counter,
    }

In [13]:
await test_mutex_aio()

### Multiplex
A kind of bounded mutex where up to a certain number of threads are allowed to own a lock. 

Think of a bouncer who allows up to a certain number of people into a nightclub (or a supermarket during the Covid-19 outbreak), and turns people away once the maximum capacity has been reached. Then whenever someone leaves the room, another person is allowed to get in.

In [14]:
async def test_multiplex(attempt):
    """Check the max number of active threads in the critical region is as expected.
    
    Use a counter and a variable to keep track of the max number of threads, and 
    verify it's <= the size of the multiplex. 
    """
    multiplex = asyncio.Semaphore(5)
    # we're using asyncio so no real need for locks 
    # to protect these variables: operations on them will be atomic 
    # in between awaits. 
    counter = 0
    max_counter = 0
    
    async def multiplexed(name):
        nonlocal counter
        nonlocal max_counter
        async with multiplex:
            counter += 1    # enterning critical zone 
            if counter >= max_counter:
                max_counter = counter
            await asyncio.sleep(0.1)  # block to allow context switch
            counter -= 1  # exiting critical zone 
        return max_counter
    tasks = [multiplexed(name) for name in range(7)]
    intermediate_results = await asyncio.gather(*tasks)
    assert max_counter == 5, {
        k:v for k,v in locals().items() if k in 
        ['max_counter', 'intermediate_results', 'attempt']
    }

In [15]:
await run_test(test_multiplex, 10)

In [16]:
loop = asyncio.get_event_loop()

### Barrier
Generalised rendez vous: all coroutines have to wait at the barrier until all of them have arrived. 



In [17]:
async def test_barrier(attempt):
    """All coroutines must wait at the barrier until they're all waiting behind it. 
    Then the barrier opens. 
    
    Testing strategy: all coroutines add their own key,value pair to a shared dictionary. 
    Once a coroutine passes the barrier, it retrieves all key value pairs from the dictionary. 
    If the barrier's been effective, all coroutines should read the same content made of one key value pair per coroutine. 
    """
    barrier = asyncio.Semaphore(0)  
    barrier_size = 5         # open barrier if nb threads behind barrier reaches this count
    lock = asyncio.Lock()    # to protect access to dictionary
    shared_dict = {}
    arrived_count = 0
    
    async def read_items_from_dict():
        """Return tuple of (key, value) pairs giving the items in the dictionary."""
        async with lock:
            res = tuple(sorted(shared_dict.items()))
        return res
    
    async def push_wait_then_read(key, value):
        nonlocal arrived_count
        async with lock:
            shared_dict[key] = value 
        arrived_count += 1
        if arrived_count == barrier_size:
            barrier.release()
        else:
            await barrier.acquire()
            barrier.release()
        items = await read_items_from_dict()
        return items 
    
    expected_items = tuple((key, value) for key, value in zip(list('ABCDE'), range(5)))
    tasks = [push_wait_then_read(key, value) for (key, value) in expected_items]
    received_items_per_coroutine = await asyncio.gather(*tasks)
    assert all(
        (received_items == expected_items) 
        for received_items 
        in received_items_per_coroutine
    ), {
        k:v for k,v in locals().items() if k in 
        ['received_items_per_coroutine', 'attempt']
    }
        

In [18]:
await run_test(test_barrier, 10)

### Reusable Barrier
Generalised rendez vous: all coroutines have to wait at the barrier until all of them have arrived. The barrier is used more than once (as it would be inside a loop), so it must reset itself after each use.



In [19]:
async def test_reusable_barrier(attempt):
    """All coroutines must wait at the barrier until they're all waiting behind it. 
    Then the barrier opens. 
    
    Testing strategy: all coroutines add their own key,value pair to a shared dictionary. 
    Once a coroutine passes the barrier, it retrieves all key value pairs from the dictionary. 
    If the barrier's been effective, all coroutines should read the same content made of one key value pair per coroutine. 
    """
    turnstile = asyncio.Semaphore(0)  
    barrier_size = 5         # open barrier if nb threads behind barrier reaches this count
    lock = asyncio.Lock()    # to protect access to dictionary
    shared_dict = {}
    arrived_count = 0
    released_count = 0
    
    async def read_items_from_dict():
        """Return tuple of (key, value) pairs giving the items in the dictionary."""
        async with lock:
            res = tuple(sorted(shared_dict.items()))
        return res
    
    async def push_wait_then_read(key, value):
        nonlocal arrived_count
        nonlocal released_count 
        async with lock:
            shared_dict[key] = value 
        arrived_count += 1
        if arrived_count == barrier_size:
            turnstile.release()
            released_count += 1
        else:
            await turnstile.acquire()
            released_count += 1 
            if released_count < barrier_size:
                turnstile.release()
            else:
                # we're done: last thread exiting the turnstile,
                # so let's reset the counters so we can reuse 
                # the barrier again 
                arrived_count = 0
                released_count = 0
        items = await read_items_from_dict()
        return items 
    
    expected_items = tuple((key, value) for key, value in zip(list('ABCDE'), range(5)))
    # first time 
    tasks1 = [push_wait_then_read(key, value) for (key, value) in expected_items]
    received_items_per_coroutine1 = await asyncio.gather(*tasks1)
    assert all(
        (received_items == expected_items) 
        for received_items 
        in received_items_per_coroutine1
    ), {
        k:v for k,v in locals().items() if k in 
        ['received_items_per_coroutine1', 'attempt']
    }
    # second time 
    shared_dict = {}  # clear the dict
    tasks2 = [push_wait_then_read(key, value) for (key, value) in expected_items]
    received_items_per_coroutine2 = await asyncio.gather(*tasks2)
    
    assert all(
        (received_items == expected_items) 
        for received_items 
        in received_items_per_coroutine2
    ), {
        k:v for k,v in locals().items() if k in 
        ['received_items_per_coroutine2', 'attempt']
    }
        

In [20]:
await run_test(test_reusable_barrier, 10)