# Generators and Coroutines

## Processing Pipelines

In [1]:
def gen_lines(filename):
    """This is a 'source', so it does not take a generator as input."""
    with open(filename) as fp:
        for line in fp:
            yield line

In [2]:
def gstrip(gen):
    for line in gen:
        yield line.strip()

In [3]:
def gdecomment(gen):
    for line in gen:
        if line.startswith('#'):
            continue
        yield line

In [4]:
def gsplit(gen):
    for line in gen:
        yield line.split()

In [8]:
x = gen_lines('/etc/hosts')
x = gstrip(x)
x = gdecomment(x)
x = gsplit(x)
for line in x:
    print(line)

['127.0.0.1', 'localhost', 'localhost.carefol.io']
['255.255.255.255', 'broadcasthost']
['::1', 'localhost']
['fe80::1%lo0', 'localhost']
['127.0.0.1', 'eht_cf-web_1', 'eht_sso-web_1', 'eht_blob-web_1', 'eht_pcc-gw-web_1']


In [9]:
x = gen_lines('/etc/hosts')
x = (line.strip() for line in x)
x = (line for line in x if not line.startswith('#'))
x = (line for line in x if line)
x = (line.split() for line in x)
for line in x:
    print(line)

['127.0.0.1', 'localhost', 'localhost.carefol.io']
['255.255.255.255', 'broadcasthost']
['::1', 'localhost']
['fe80::1%lo0', 'localhost']
['127.0.0.1', 'eht_cf-web_1', 'eht_sso-web_1', 'eht_blob-web_1', 'eht_pcc-gw-web_1']


## Coroutines

In [10]:
def simple_coro(name):
    while True:
        value = yield
        print('coro {} got {}'.format(name, value))

In [11]:
sc0 = simple_coro('sc0')
sc1 = simple_coro('sc1')

In [12]:
next(sc0)
next(sc1)

In [13]:
sc0.send('foo')

coro sc0 got foo


In [14]:
sc1.send('bar')

coro sc1 got bar


### Coroutine use case: Forked Processing Pipelines

In [15]:
def handle_ipv4():
    value = yield
    while True:
        if value is None:
            break
        value = yield ('Got IPV4 line: {}'.format(value))
        
def handle_ipv6():
    value = yield
    while True:
        if value is None:
            break
        value = yield('Got IPV6 line: {}'.format(value))
        
ipv4_handler = handle_ipv4()
ipv6_handler = handle_ipv6()

# "start" the handlers
next(ipv4_handler)
next(ipv6_handler)
        
def splitter(gen):
    for line in gen:
        if '::' in line:
            yield ipv6_handler.send(line)
        else:
            yield ipv4_handler.send(line)
    ipv4_handler.send(None)
    ipv6_handler.send(None)

In [16]:
x = gen_lines('/etc/hosts')
x = gstrip(x)
x = gdecomment(x)
x = splitter(x)
for line in x:
    print(line)

Got IPV4 line: 127.0.0.1	localhost localhost.carefol.io
Got IPV4 line: 255.255.255.255	broadcasthost
Got IPV6 line: ::1             localhost
Got IPV6 line: fe80::1%lo0	localhost
Got IPV4 line: 127.0.0.1	eht_cf-web_1 eht_sso-web_1 eht_blob-web_1 eht_pcc-gw-web_1


RuntimeError: generator raised StopIteration

# Event streams

In [18]:
def every_xs(x):
    for i in range(5):
        name = yield x
        print('{}: every_xs({}, {})'.format(name, x, i))


In [19]:
import heapq

every_1s = every_xs(1)
every_2s = every_xs(2)
every_3s = every_xs(3)

def ev_loop(coros):
    now = 0
    events = []
    # Initialize coroutines
    for name, coro in coros.items():
        when = next(coro)
        heapq.heappush(events, (when + now, name, coro))
        
    while events:
        new_now, name, coro = heapq.heappop(events)
        if new_now != now:
            print('=== {} ==='.format(new_now))
            now = new_now
        try:
            offset = coro.send(name)
            sched = offset + now
            # print '{}: scheduled next event for {}'.format(name, offset+now)
            heapq.heappush(events, (offset + now, name, coro))
        except StopIteration:
            continue
        
        

In [20]:
ev_loop({'1s': every_1s, '2s': every_2s, '3s': every_3s})

=== 1 ===
1s: every_xs(1, 0)
=== 2 ===
1s: every_xs(1, 1)
2s: every_xs(2, 0)
=== 3 ===
1s: every_xs(1, 2)
3s: every_xs(3, 0)
=== 4 ===
1s: every_xs(1, 3)
2s: every_xs(2, 1)
=== 5 ===
1s: every_xs(1, 4)
=== 6 ===
2s: every_xs(2, 2)
3s: every_xs(3, 1)
=== 8 ===
2s: every_xs(2, 3)
=== 9 ===
3s: every_xs(3, 2)
=== 10 ===
2s: every_xs(2, 4)
=== 12 ===
3s: every_xs(3, 3)
=== 15 ===
3s: every_xs(3, 4)


In [21]:
def every_xs(sim, x):
    while True:
        print('Yielding from every_xs({})'.format(x))
        yield sim.delay(x)


In [22]:
class Event(object):
    
    def __init__(self, when, coro):
        self.when = when
        self.coro = coro
        
    def __lt__(self, other):
        return self.when < other.when

class Simulator(object):

    def __init__(self):
        self.now = 0
        self.events = []
        self.current = None

    def run(self, coro):
        try:
            self.current = coro
            ev = next(coro)
            offset, coro = ev.when, ev.coro
            heapq.heappush(self.events, Event(self.now + offset, coro))
        except StopIteration:
            pass
        
    def simulate(self, max_time=10):
        while self.events and self.now <= max_time:
            ev = heapq.heappop(self.events)
            self.now, self.current = ev.when, ev.coro
            try:
                ev = self.current.send(sim)
                offset, coro = ev.when, ev.coro
                heapq.heappush(self.events, Event(self.now + offset, coro))
            except StopIteration:
                pass
        print('Simulation terminated at', self.now)

            
    def delay(self, seconds):
        return Event(seconds, self.current)
        

In [23]:
sim = Simulator()
sim.run(every_xs(sim, 1))
sim.run(every_xs(sim, 2))
sim.run(every_xs(sim, 3))

Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(3)


In [24]:
sim.simulate()

Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(1)
Yielding from every_xs(3)
Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(1)
Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(3)
Yielding from every_xs(1)
Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(1)
Yielding from every_xs(3)
Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(1)
Yielding from every_xs(1)
Simulation terminated at 11


### Event streams use case: Asynchronous I/O

Asynchronous I/O can use event streams and coroutines to provide 'thread-like' syntax.

1. A top-level event loop is created to handle events
1. Each coroutine runs until it yields an object to 'wait' on -- call it an 'awaitable'. (This could be a socket that we are receiving data from.) The 'yield' goes to the event loop.
1. The event loop generally has a `select` or `poll` statement that checks for available data on all its awaitable objects.
1. When the 'awaitable' has data, that data is "sent" into the coroutine, which picks up *as though it had blocked waiting on data*


Open [Advanced Generators Lab](./advanced-generators-lab.ipynb)