# Task Synchronization



In [1]:
import asyncio
import functools

In [5]:
# broom wagon coroutine
async def broom_wagon(tasks):
    """Waits for all tasks to complete before finishing."""
    done, pending = await asyncio.wait(tasks, return_when='ALL_COMPLETED')
    print('ALL TASKS COMPLETED')
    return done, pending

## Event

Use an `Event` to **synchronize the scheduling of multiple consumers**. (ie to trigger multiple coroutines at once

It is like runners in the starting blocks waiting for the gun shot.

Put simply, an event is like the gun shot at a running race, it lets the runners off the starting blocks.

### Event API

In [None]:
# instanciation
event = asyncio.Event()

# event status
event.is_set()  # True if the internal flag is True

# event manipulation
event.set()   # wake up all coroutines waiting on this event
event.clear() # set the internal flag to False

# event consumption
event.wait()! # wait until event is set

### Example

In [2]:
# event trigger function
def trigger(event):
    print('EVENT SET')
    event.set()

# event consumers
async def consumer_a(event):
    consumer_name = 'Consumer A'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

async def consumer_b(event):
    consumer_name = 'Consumer B'
    print('{} waiting'.format(consumer_name))
    await event.wait()
    print('{} triggered'.format(consumer_name))

# broom wagon coroutine
async def broom_wagon(tasks):
    """Waits for all tasks to complete before finishing."""
    done, pending = await asyncio.wait(tasks)
    print('ALL TASKS COMPLETED')
    return done, pending

# event
event = asyncio.Event()

# tasks
tasks = [consumer_a(event), consumer_b(event)]

# event loop
event_loop = asyncio.get_event_loop()
event_loop.call_later(0.1, functools.partial(trigger, event))  # kick off event in 0.1 sec

done, pending = event_loop.run_until_complete(broom_wagon(tasks))

Consumer B waiting
Consumer A waiting
EVENT SET
Consumer B triggered
Consumer A triggered
ALL TASKS COMPLETED


## Condition

Use a `Condition` to

A condition variable allows one or more coroutines to wait until they are notified by another coroutine.

### Condition API

In [None]:
# instanciation
condition = asyncio.Condition()

# condition manipulation
condition.notify(n)  # wake up 'n' waiting coroutines (if any)
condition.notify_all() # wake up all waiting coroutines (if any)

# condition consumption
condition.wait()!  # wait until notified
condition.wait_for(predicate)!  # wait until the predicate becomes True

# other
condition.acquire()!  # acquire the underlying lock
condition.locked()    # True if the underlying lock is acquired
condition.release()   # release the underlying lock

### Example

In [3]:
# condition manipulation function
async def manipulate(condition):
    print('START MANIPULATION')

    # pause to let consumers start
    await asyncio.sleep(0.1)

    # notify 1 consumer, then notify 2 consumers
    for i in range(1, 3):
        with await condition:
            print('NOTIFY {} CONSUMERS'.format(i))
            condition.notify(n=i)
        await asyncio.sleep(0.1)

    # notify remaining consumers
    with await condition:
        print('NOTIFY REMAINING')
        condition.notify_all()

    print('END MANIPULATION')

# condition consumers
async def consumer(condition, n):
    consumer_name = 'Consumer {}'.format(n)
    with await condition:
        # condition is unlocked
        print('{} waiting'.format(consumer_name))
        await condition.wait()
        print('{} triggered'.format(consumer_name))
    
    print('{} done'.format(consumer_name))

# broom wagon coroutine
async def broom_wagon(tasks):
    """Waits for all tasks to complete before finishing."""
    done, pending = await asyncio.wait(tasks)
    print('ALL TASKS COMPLETED')
    return done, pending

# condition
condition = asyncio.Condition()

# wrap tasks in one future
consumers = [consumer(condition, i) for i in range(5)]
# main_future = asyncio.wait(consumers)

# event loop
event_loop = asyncio.get_event_loop()
event_loop.create_task(manipulate(condition))
# event_loop.call_later(0.1, functools.partial(manipulate, condition))

result = event_loop.run_until_complete(broom_wagon(consumers))


START MANIPULATION
Consumer 3 waiting
Consumer 4 waiting
Consumer 2 waiting
Consumer 1 waiting
Consumer 0 waiting
NOTIFY 1 CONSUMERS
Consumer 3 triggered
Consumer 3 done
NOTIFY 2 CONSUMERS
Consumer 4 triggered
Consumer 4 done
Consumer 2 triggered
Consumer 2 done
NOTIFY REMAINING
END MANIPULATION
Consumer 1 triggered
Consumer 1 done
Consumer 0 triggered
Consumer 0 done
ALL TASKS COMPLETED


## Queue

https://docs.python.org/dev/library/asyncio-queue.html

Use a `Queue` to **coordinate producer and consumer coroutines**.

### Queue API

In [None]:
# instanciation
queue = asyncio.Queue()

# queue info
queue.maxsize  # number of items allowed in the queue
queue.qsize()  # number of items in the queue
queue.empty()  # True if the queue is empty
queue.full()   # True if the queue has `maxsize` items

# queue manipulation
queue.put(item)!  # appends an item to the queue (if full, waits for a slot)
queue.get()!      # pops the first item of the queue (if empty, waits for an item)
queue.put_nowait(item) # like .put(item), but raises QueueFull if queue is full (no blocking)
queue.get_nowait()     # like .get(), but raises QueueEmpty if queue is empty (no blocking)

# other
queue.join()!  # blocks the queue until all items have been gotten and processed
queue.task_done()  # called by consumers to indicate when a formerly enqueued task (aka item) is complete

### Example

In [4]:
# queue
queue = asyncio.Queue()
for x in range(20):
    queue.put_nowait(x)
print('QUEUE : {}'.format(queue))

# queue consumers
async def consumer_a(queue):
    consumer_name = 'Consumer A'
    consumer_speed = 1
    while not queue.empty():
        item = await queue.get()
        print('{0} grabbed item: {1}'.format(consumer_name, item))
        await asyncio.sleep(1 / consumer_speed)

async def consumer_b(queue):
    consumer_name = 'Consumer B'
    consumer_speed = 2
    while not queue.empty():
        item = await queue.get()
        print('{0} grabbed item: {1}'.format(consumer_name, item))
        await asyncio.sleep(1 / consumer_speed)

async def consumer_c(queue):
    consumer_name = 'Consumer C'
    consumer_speed = 3
    while not queue.empty():
        item = await queue.get()
        print('{0} grabbed item: {1}'.format(consumer_name, item))
        await asyncio.sleep(1 / consumer_speed)

# broom wagon coroutine
async def broom_wagon(tasks):
    """Waits for all tasks to complete before finishing."""
    done, pending = await asyncio.wait(tasks, return_when='ALL_COMPLETED')
    print('ALL TASKS COMPLETED')
    return done, pending

# event loop
event_loop = asyncio.get_event_loop()

# tasks
tasks = [consumer_a(queue),
         consumer_b(queue),
         consumer_c(queue)]

done, pending = event_loop.run_until_complete(broom_wagon(tasks))
print('TASKS DONE : {}'.format(done))
print('TASKS PENDING : {}'.format(pending))

QUEUE : <Queue maxsize=0 _queue=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19] tasks=20>
Consumer A grabbed item: 0
Consumer B grabbed item: 1
Consumer C grabbed item: 2
Consumer C grabbed item: 3
Consumer B grabbed item: 4
Consumer C grabbed item: 5
Consumer A grabbed item: 6
Consumer B grabbed item: 7
Consumer C grabbed item: 8
Consumer C grabbed item: 9
Consumer B grabbed item: 10
Consumer C grabbed item: 11
Consumer A grabbed item: 12
Consumer B grabbed item: 13
Consumer C grabbed item: 14
Consumer C grabbed item: 15
Consumer B grabbed item: 16
Consumer C grabbed item: 17
Consumer A grabbed item: 18
Consumer B grabbed item: 19
ALL TASKS COMPLETED
TASKS DONE : {<Task finished coro=<consumer_a() done, defined at <ipython-input-4-c85ff10172a3>:8> result=None>, <Task finished coro=<consumer_c() done, defined at <ipython-input-4-c85ff10172a3>:24> result=None>, <Task finished coro=<consumer_b() done, defined at <ipython-input-4-c85ff10172a3>:16> result=None>}
TASK

## Queue

https://docs.python.org/3/library/asyncio-sync.html#asyncio.Lock

Use a `Queue` to **coordinate producer and consumer coroutines**.
Use a `Lock` to **guard access to a shared resource**.

A `Lock` is in one of two states:
- *locked*: the lock is acquired. Ac
- *unlocked*: the lock is released. Access is blocked 

### Lock API

In [None]:
# initialization
lock = asyncio.Lock()

# lock states
lock.locked()  # True if lock is locked/acquired, False if it is unlocked/released

# lock manipulation
lock.acquire()! # sets the lock state to 'locked'. waits for lock to be unlocked first 
lock.release()  # sets the lock state to 'unlocked'

### Example

In [17]:
def unlock(lock):
    print('RELEASE LOCK')
    lock.release()


async def consumer_a(lock):
    consumer_name = 'Consumer A'
    print('{} waiting'.format(consumer_name))
    # lock context management usage (implicit release)
    with await lock:
        print('{} acquired lock'.format(consumer_name))
    print('{} released lock'.format(consumer_name))


async def consumer_b(lock):
    consumer_name = 'Consumer B'
    print('{} waiting'.format(consumer_name))
    # lock direct usage (explicit release)
    await lock
    try:
        print('{} acquired lock'.format(consumer_name))
    finally:
        print('{} released lock'.format(consumer_name))
        lock.release()

# lock
lock = asyncio.Lock()

# event loop
event_loop = asyncio.get_event_loop()

# set lock state to 'locked' (before starting coroutines)
event_loop.run_until_complete(lock.acquire())

# unlock the lock after 0.1sec.
event_loop.call_later(0.1, functools.partial(unlock, lock))

# tasks
tasks = [consumer_a(lock), consumer_b(lock)]

done, pending = event_loop.run_until_complete(broom_wagon(tasks))

Consumer B waiting
Consumer A waiting
RELEASE LOCK
Consumer B acquired lock
Consumer B released lock
Consumer A acquired lock
Consumer A released lock
ALL TASKS COMPLETED
