# Coroutines

We find two main senses for the verb “to yield” in dictionaries: to produce or to give way. Both senses apply in Python when we use the yield keyword in a generator.

- A line such as yield item produces a value that is received by the caller of next(…)
-  it also gives way, suspending the execution of the generator so that the caller may proceed until it’s ready to consume another value by invoking next() again.
-  A coroutine is syntactically like a generator: just a function with the yield keyword in its body
   -  However, in a coroutine, yield usually appears on the right side of an expression
   -  it may or may not produce a value
   -  if there is no expression after the yield keyword, the generator yields None.
   -  The coroutine may receive data from the caller, which uses .send(datum) instead of next(…) to feed the coroutine. Usually, the caller pushes values into the coroutine
   -  It is even possible that no data goes in or out through the yield keyword.
   -  Regardless of the flow of data, yield is a control flow device that can be used to implement cooperative multitasking: each coroutine yields control to a central scheduler so that other coroutines can be activated.

## Basic Behavior of a Generator Used as a Coroutine


In [1]:
def simple_coroutine():
    print('-> coroutine started')
    x = yield
    print('-> coroutine received:', x)
    
    
my_coro = simple_coroutine()
my_coro

<generator object simple_coroutine at 0x000001968A08EAC0>

In [2]:
next(my_coro)

-> coroutine started


In [3]:
my_coro.send(42)

-> coroutine received: 42


StopIteration: 

1. A coroutine is defined as a generator function with yield in its body
2. yield is used in an expression; when the coroutine is designed just to receive data from the client it yields None -- this is implicit because there is no expression to the right of the yield
3. As usual  with generators, you call the function to get a generator object back
4. The first call is next(...) because the generator hasn't started so it's not waiting in a yield and we can't send it any data initially
5. This call makes the yield in the coroutine body evaluate to 42; now the coroutine resume and runs until the next yield or termination
6. In this case, control flows off the end of the coroutine body, which prompts the generator machinery to raise StopIteration, as usual.

A coroutine can be in one of `four states`. You can determine the current state using the `inspect.getgeneratorstate(…)` function, which returns one of these strings:

- 'GEN_CREATED'
  - Waiting to start execution.
- 'GEN_RUNNING'
  - Currently being executed by the interpreter.
- 'GEN_SUSPENDED'
  - Currently suspended at a yield expression
- 'GEN_CLOSED'
  - Execution has completed.



In [4]:
my_coro = simple_coroutine()
my_coro.send(1729)

TypeError: can't send non-None value to a just-started generator

you can only make a call like my_coro.send(42) if the coroutine is currently suspended

In [1]:
def simple_coro2(a):
    print('-> Started: a =', a)
    b = yield a
    print('-> Received: b =', b)
    c = yield a + b
    print('-> Received: c =', c)

In [2]:
my_coro2 = simple_coro2(14)

In [3]:
from inspect import getgeneratorstate

In [4]:
getgeneratorstate(my_coro2)

'GEN_CREATED'

In [5]:
next(my_coro2)

-> Started: a = 14


14

In [6]:
getgeneratorstate(my_coro2)

'GEN_SUSPENDED'

In [7]:
my_coro2.send(28) 

-> Received: b = 28


42

In [8]:
my_coro2.send(99)

-> Received: c = 99


StopIteration: 

In [9]:
getgeneratorstate(my_coro2)

'GEN_CLOSED'

1. inspect.getgeneratorstate reports GEN_CREATED (i.e., the coroutine has not started).
2. Advance coroutine to first yield, printing -> Started: a = 14 message then yielding value of a and suspending to wait for value to be assigned to b.
3. getgeneratorstate reports GEN_SUSPENDED (i.e., the coroutine is paused at a yield expression).
4. Send number 28 to suspended coroutine;  the yield expression evaluates to 28 and that number is bound to b. The value of a + b is yielded (42), and the coroutine is suspended waiting for the value to be assigned to c.
5. Send number 99 to suspended coroutine; the yield expression evaluates to 99then the coroutine terminates, causing the generator object to raise StopIteration.
6. getgeneratorstate reports GEN_CLOSED (i.e., the coroutine execution has completed)


In a line like `b = yield a`, the value of b will only be set when the coroutine is activated later by the client code.

## Example: Coroutine to Compute a Running Average


In [20]:
def averager():
    total = 0.0
    count = 0 
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average =  total/count
        

1. This infinite loop means this coroutine will keep on accepting values and producing results as long as the caller sends them. 
2. This coroutine will only terminate when the caller calls `.close()` on it, or when it’s garbage collected because there are no more references to it.

In [21]:
coro_avg = averager()
next(coro_avg)
coro_avg.send(10)

10.0

In [12]:
coro_avg.send(30)

20.0

In [13]:
coro_avg.send(5)

15.0

## Decorators for Coroutine Priming

1. You can’t do much with a coroutine without priming it: we must always remember to call next(my_coro) before my_coro.send(x). 
2. To make coroutine usage more convenient, a priming decorator is sometimes used.

In [22]:
from functools import wraps

def coroutine(func):
    """Decorator: primes `func` by advancing to first `yield`"""
    @wraps(func)
    def primer(*args,**kwargs):
        gen = func(*args,**kwargs)
        next(gen)
        return gen
    return primer

In [23]:
@coroutine
def averager():
    total = 0.0
    count = 0 
    average = None
    while True:
        term = yield average
        total += term
        count += 1
        average =  total/count

In [24]:
dec_coro_avg = averager()
dec_coro_avg.send(10)

10.0

## Coroutine Termination and Exception Handling

An unhandled exception within a coroutine propagates to the caller of the next or send that triggered it.

In [25]:
dec_coro_avg.send(100)

55.0

In [26]:
dec_coro_avg.send('100')

TypeError: unsupported operand type(s) for +=: 'float' and 'str'

- `generator.throw(exc_type[, exc_value[, traceback]])`
  - Causes the yield expression where the generator was paused to raise the exception given.
  - If the exception is handled by the generator, flow advances to the next yield, and the value yielded becomes the value of the generator.throw call
  - If the exception is not handled by the generator, it propagates to the context of the caller.
- `generator.close()`
  - Causes the yield expression where the generator was paused to raise a Generator Exit exception.
  -  No error is reported to the caller if the generator does not handle that exception or raises StopIteration
  -  When receiving a GeneratorExit, the generator must not yield a value, otherwise a Run timeError is raised.
  - If any other exception is raised by the generator, it propagates to the caller.

In [27]:
class DemoException(Exception):
    """An exception type for the demonstration"""
    
def demo_exc_handling():
    print(' -> coroutine started')
    while True:
        try:
            x = yield
        except DemoException:
            print('*** DemoException handled. Continuing...')
        else:
            print('-> coroutine received: {!r}'.format(x))


Activating and closing demo_exc_handling without an exception

In [28]:
exc_coro = demo_exc_handling()
next(exc_coro)

 -> coroutine started


In [29]:
exc_coro.send(11)

-> coroutine received: 11


In [30]:
exc_coro.close()

In [31]:
getgeneratorstate(exc_coro)

'GEN_CLOSED'

If the DemoException is thrown into the coroutine, it’s handled and the demo_exc_handling coroutine continues

In [32]:
exc_coro = demo_exc_handling()
next(exc_coro)

 -> coroutine started


In [33]:
exc_coro.send(11)

-> coroutine received: 11


In [34]:
exc_coro.throw(DemoException)

*** DemoException handled. Continuing...


In [35]:
getgeneratorstate(exc_coro)

'GEN_SUSPENDED'

Coroutine terminates if it can’t handle an exception thrown into it

In [36]:
exc_coro = demo_exc_handling()
next(exc_coro)
exc_coro.send(11)
exc_coro.throw(ZeroDivisionError)

 -> coroutine started
-> coroutine received: 11


ZeroDivisionError: 

In [37]:
getgeneratorstate(exc_coro)

'GEN_CLOSED'

If it’s necessary that some cleanup code is run no matter how the coroutine ends, you need to wrap the relevant part of the coroutine body in a try/finally block

In [38]:
class DemoException(Exception):
    """An exception type for the demonstration."""
    
    
def demo_finally():
    print(' -> coroutine started')
    try:
        while True:
            try:
                x = yield
            except DemoException:
                print('*** DemoException handled. Continuing...')
            else:
                print('-> coroutine received: {!r}'.format(x))
    finally:
        print('-> coroutine ending')


## Returning a Value from a Coroutine

- some coroutines do not yield anything interesting, but are designed to return a value at the end, often the result of some accumulation

In [39]:
from collections import namedtuple

Result = namedtuple('Result', 'count average')

def averager():
 total = 0.0
 count = 0
 average = None
 while True:
    term = yield
    if term is None:
        break 
    total += term
    count += 1
    average = total/count
 return Result(count, average) 

1. In order to return a value, a coroutine must terminate normally; this is why this version of averager has a condition to break out of its accumulating loop.
2. Return a namedtuple with the count and average. 
3. Sending None terminates the loop, causing the coroutine to end by returning the result. As usual, the generator object raises StopIteration. The value attribute of the exception carries the value returned.

In [40]:
coro_avg = averager()
next(coro_avg)
coro_avg.send(10) 
coro_avg.send(30)
coro_avg.send(6.5)
coro_avg.send(None) 

StopIteration: Result(count=3, average=15.5)

In [41]:
coro_avg = averager()
next(coro_avg)
coro_avg.send(10) 
coro_avg.send(30)
coro_avg.send(6.5)
try:
    coro_avg.send(None)
except StopIteration as exc:
    result = exc.value
result

Result(count=3, average=15.5)

## Using yield from

In [42]:
def gen():
    for c in 'AB':
        yield c
    for i in range(1, 3):
       yield i
       
list(gen()) 

['A', 'B', 1, 2]

In [43]:
def gen():
    yield from 'AB'
    yield from range(1, 3)
    
list(gen())

['A', 'B', 1, 2]

In [44]:
def chain(*iterables):
    for it in iterables:
        yield from it
        
s = 'ABC'
t = tuple(range(3))
list(chain(s, t))

['A', 'B', 'C', 0, 1, 2]

- `delegating generator`
  - The generator function that contains the yield from <iterable> expression
- `subgenerator`
  - The generator obtained from the <iterable> part of the yield from expression.
- `caller`
  -  client code that calls the delegating generator. 


1. While the delegating generator is suspended at yield from
2. the caller sends data directly to the subgenerator
3. which yields data back to the caller

In [49]:
from collections import namedtuple

Result = namedtuple('Result', 'count average')

# the subgenerator
def averager(): 
    total = 0.0
    count = 0
    average = None
    while True:
        term = yield 
        if term is None: 
            break
        total += term
        count += 1
        average = total/count
    return Result(count, average)


# the delegating generator
def grouper(results, key): 
    while True: 
        results[key] = yield from averager()
        

# the client code, a.k.a. the caller
def main(data): 
    results = {}
    for key, values in data.items():
        group = grouper(results, key) 
        next(group) 
    for value in values:
        group.send(value) 
    group.send(None) # important! 
    print(results) # uncomment to debug
    report(results)
    

# output report
def report(results):
    for key, result in sorted(results.items()):
        group, unit = key.split(';')
        print('{:2} {:5} averaging {:.2f}{}'.format(
        result.count, group, result.average, unit))

In [50]:
data = {
 'girls;kg':
 [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5],
 'girls;m':
 [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43],
 'boys;kg':
 [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3],
 'boys;m':
 [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46],
}

In [51]:
main(data)

{'boys;m': Result(count=9, average=1.3888888888888888)}
 9 boys  averaging 1.39m


1. Same averager coroutine as before
2. Each value sent by the client code in main will be bound to term here
3. The crucial terminating condition. Without it, a yield from calling this coroutine will block forever.
4. The returned Result will be the value of the yield from expression in grouper
5. grouper is the delegating generator.
6. Each iteration in this loop creates a new instance of averager; each is a generator object operating as a coroutine.
7. Whenever grouper is sent a value, it’s piped into the averager instance by the yield from. grouper will be suspended here as long as the averager instance is consuming values sent by the client. When an averager instance runs to the end, the value it returns is bound to results[key]. The while loop then proceeds to create another averager instance to consume more values.
8. main is the client code, or “caller” in PEP 380 parlance. This is the function that drives everything.
9. group is a generator objectresulting from calling grouper with the results dict to collect the results, and a particular key. It will operate as a coroutine
10. Prime the coroutine
11. Send each value into the grouper. That value ends up in the term = yield line of averager; grouper never has a chance to see it.
12. Sending None into grouper causes the current averager instance to terminate, and allows grouper to run again, which creates another averager for the next group of values

## The Meaning of yield from


- Any values that the subgenerator yields are passed directly to the caller of the delegating generator (i.e., the client code).
- Any values sent to the delegating generator using send() are passed directly to the subgenerator. If the sent value is None, the subgenerator’s `__next__()` method is called. If the sent value is not None, the subgenerator’s send() method is called. If the call raises StopIteration, the delegating generator is resumed. Any other exception is propagated to the delegating generator
- return expr in a generator (or subgenerator) causes StopIteration(expr) to be raised upon exit from the generator.
- The value of the yield from expression is the first argument to the StopIteration exception raised by the subgenerator when it terminates

The other two features of yield from have to do with exceptions and termination:
- Exceptions other than GeneratorExit thrown into the delegating generator are passed to the throw() method of the subgenerator. If the call raises StopIteration, the delegating generator is resumed. Any other exception is propagated to the delegating generator.
- If a GeneratorExit exception is thrown into the delegating generator, or the close() method of the delegating generator is called, then the close() method of the subgenerator is called if it has one. If this call results in an exception, it is propagated to the delegating generator. Otherwise, GeneratorExit is raised in the delegating generator

## Use Case: Coroutines for Discrete Event Simulation

Coroutines are a natural way of expressing many algorithms, such as simulations, games, asynchronous I/O, and other forms of event-driven programming or co-operative multitasking.

### About Discrete Event Simulations

A discrete event simulation (DES) is a type of simulation where a system is modeled as a sequence of events. s. In a DES, the simulation “clock” does not advance by fixed increments, but advances directly to the simulated time of the next modeled event.

In [4]:
import collections

Event = collections.namedtuple('Event', 'time proc action')

def taxi_process(ident, trips, start_time=0): 
    """Yield to simulator issuing event at each state change"""
    time = yield Event(start_time, ident, 'leave garage') 
    for i in range(trips): 
        time = yield Event(time, ident, 'pick up passenger') 
        time = yield Event(time, ident, 'drop off passenger') 
    yield Event(time, ident, 'going home')

In [2]:
taxi = taxi_process(ident=13, trips=2, start_time=0)
next(taxi)

Event(time=0, proc=13, action='leave garage')

In [3]:
taxi.send(_.time + 7)

Event(time=7, proc=13, action='pick up passenger')

In [4]:
taxi.send(_.time + 23)

Event(time=30, proc=13, action='drop off passenger')

In [5]:
taxi.send(_.time + 5)

Event(time=35, proc=13, action='pick up passenger')

In [6]:
taxi.send(_.time + 48)

Event(time=83, proc=13, action='drop off passenger')

In [7]:
taxi.send(_.time + 1)

Event(time=84, proc=13, action='going home')

In [8]:
taxi.send(_.time + 10)

StopIteration: 

1. Create a generator object to represent a taxi with ident=13 that will make two trips and start working at t=0
2. prim the coroutine, it yields the initial event
3. can now send current time, the _ variable is bound to the last result
4. After two complete trips, the loop ends and the 'going home' event is yielded.
5. The next attempt to send to the coroutine causesit to fall through the end. When it returns, the interpreter raises StopIteration.

In [2]:
import queue

class Simulator:
    
    def __init__(self, procs_map):
        self.events = queue.PriorityQueue()
        self.procs = dict(procs_map)
        
    def run(self, end_time): 
        """Schedule and display events until time is up"""
        # schedule the first event for each cab
        for _, proc in sorted(self.procs.items()): 
            first_event = next(proc) 
            # main loop of the simulation
            sim_time = 0 
            while sim_time < end_time: 
                if self.events.empty(): 
                    print('*** end of events ***')
                    break
            current_event = self.events.get() 
            sim_time, proc_id, previous_action = current_event 
            print('taxi:', proc_id, proc_id * ' ', current_event) 
            active_proc = self.procs[proc_id] 
            next_time = sim_time + compute_duration(previous_action) 
            try:
                 next_event = active_proc.send(next_time) 
            except StopIteration:
                del self.procs[proc_id] 
            else:
                self.events.put(next_event) 
        else: 
            msg = '*** end of simulation time: {} events pending ***'
            print(msg.format(self.events.qsize()))

In [1]:
taxis = {0: taxi_process(ident=0, trips=2, start_time=0),
            1: taxi_process(ident=1, trips=4, start_time=5),
            2: taxi_process(ident=2, trips=6, start_time=10)}
 
sim = Simulator(taxis)
sim.run(10)

# Chapter Summary

- common use for a coroutine: as an accumulator processing items
- decorator can be applied to prime a coroutine, making it more convenient to use in some cases.
- coverage of yield from
  - delegating generator (defined by the use of yield from in its body)
  - the subgenerator activated by yield from
  - client code tha actually drives the whole setup by sending values to the subgenerator through the pass-through channel established by yield from in the delegating generator