# Generators and Coroutines

## Processing Pipelines

In [1]:
def gen_lines(filename):
    """This is a 'source', so it does not take a generator as input."""
    with open(filename) as fp:
        for line in fp:
            yield line
            

In [2]:
def gstrip(gen):
    for line in gen:
        yield line.strip()

In [3]:
def gdecomment(gen):
    for line in gen:
        if line.startswith('#'):
            continue
        if not line:
            continue
        yield line

In [4]:
def gsplit(gen):
    for line in gen:
        yield line.split()

In [5]:
def remove_bom(gen):
    for line in gen:
        if line == '\ufeff':
            continue
        else:
            yield line

In [6]:
x = gen_lines('/etc/hosts')
x = gstrip(x)
x = gdecomment(x)
x = remove_bom(x)   # for wsl2...
x = gsplit(x)
for words in x:
    print(words)

['127.0.0.1', 'localhost']
['127.0.1.1', 'theodin.localdomain', 'theodin']
['192.168.1.90', 'host.docker.internal']
['192.168.1.90', 'gateway.docker.internal']
['127.0.0.1', 'kubernetes.docker.internal']
['::1', 'ip6-localhost', 'ip6-loopback']
['fe00::0', 'ip6-localnet']
['ff00::0', 'ip6-mcastprefix']
['ff02::1', 'ip6-allnodes']
['ff02::2', 'ip6-allrouters']


In [7]:
!cat /etc/hosts

# This file was automatically generated by WSL. To stop automatic generation of this file, add the following entry to /etc/wsl.conf:
# [network]
# generateHosts = false
127.0.0.1	localhost
127.0.1.1	theodin.localdomain	theodin
﻿
192.168.1.90	host.docker.internal
192.168.1.90	gateway.docker.internal
127.0.0.1	kubernetes.docker.internal

# The following lines are desirable for IPv6 capable hosts
::1     ip6-localhost ip6-loopback
fe00::0 ip6-localnet
ff00::0 ip6-mcastprefix
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters


In [10]:
x = gen_lines('/etc/hosts')
x = (line.strip() for line in x)   # strip
x = (line for line in x if not line.startswith('#'))  # decomment
x = (line for line in x if line)  # remove empty lines
x = (line for line in x if line != '\ufeff')  # remove BOM
x = (line.split() for line in x)  # split into words
for words in x:
    print(words)

['127.0.0.1', 'localhost']
['127.0.1.1', 'theodin.localdomain', 'theodin']
['192.168.1.90', 'host.docker.internal']
['192.168.1.90', 'gateway.docker.internal']
['127.0.0.1', 'kubernetes.docker.internal']
['::1', 'ip6-localhost', 'ip6-loopback']
['fe00::0', 'ip6-localnet']
['ff00::0', 'ip6-mcastprefix']
['ff02::1', 'ip6-allnodes']
['ff02::2', 'ip6-allrouters']


## CSV example

In [11]:
!head data/closing-prices.csv

,F,TSLA,GOOG,IBM,AAPL
2014-01-02,12.089,150.1,,157.6001,72.7741
2014-01-03,12.1438,149.56,,158.543,71.1756
2014-01-06,12.1986,147.0,,157.9993,71.5637
2014-01-07,12.042,149.36,,161.1508,71.0516
2014-01-08,12.1673,151.28,,159.6728,71.5019
2014-01-09,12.4022,147.53,,159.1716,70.5887
2014-01-10,12.5822,145.7199,,159.0696,70.1178
2014-01-13,12.6136,139.34,,156.4363,70.4849
2014-01-14,12.8406,161.27,,157.9314,71.8874


In [12]:
import csv
from datetime import datetime

In [14]:
import itertools

def month(tup):
    """tup is (date, price)
    
    where date is a string that looks like '2020-09-23'
    """
    date, price = tup
    dt = datetime.strptime(date, '%Y-%m-%d')
    return (dt.year, dt.month)

def get_prices_for(it, ticker):
    """it is an iterator of dictionaries "d"
    
    where d[''] is the date on which trades occurred
    and d['TSLA'] contains the closing price for that date
    """
    for d in it:
        date_str = d['']
        closing_price = float(d[ticker])
        yield (date_str, closing_price)

In [15]:
lines = gen_lines('./data/closing-prices.csv')
dicts = csv.DictReader(lines)

In [16]:
tesla_prices = get_prices_for(dicts, 'TSLA')

In [17]:
for month, sub_iter in itertools.groupby(tesla_prices, month):
    prices = [price for (dt, price) in sub_iter]
    o = prices[0]
    h = max(prices)
    l = min(prices)
    c = prices[-1]
    print(month, (o, h, l, c))

(2014, 1) (150.1, 182.84, 139.34, 181.41)
(2014, 2) (177.11, 253.0, 174.42, 244.81)
(2014, 3) (250.56, 254.84, 207.32, 208.45)
(2014, 4) (216.97, 230.29, 193.91, 207.89)
(2014, 5) (207.73, 216.61, 178.59, 207.77)
(2014, 6) (204.7, 240.06, 202.3, 240.06)
(2014, 7) (239.72, 239.72, 215.4, 223.3)
(2014, 8) (233.27, 269.7, 233.27, 269.7)
(2014, 9) (284.12, 286.04, 242.68, 242.68)
(2014, 10) (240.24, 260.62, 221.67, 241.7)
(2014, 11) (242.59, 258.68, 230.97, 244.52)
(2014, 12) (231.64, 231.64, 197.81, 222.41)
(2015, 1) (219.31, 219.31, 191.87, 203.6)
(2015, 2) (210.94, 220.99, 202.88, 203.34)
(2015, 3) (197.325, 202.435, 185.0, 188.77)
(2015, 4) (187.59, 232.45, 187.59, 226.05)
(2015, 5) (226.03, 251.45, 226.03, 250.8)
(2015, 6) (249.45, 268.79, 245.92, 268.26)
(2015, 7) (269.15, 282.26, 253.01, 266.15)
(2015, 8) (259.99, 270.13, 218.87, 249.06)
(2015, 9) (238.63, 264.2, 238.63, 248.4)
(2015, 10) (239.88, 247.57, 206.93, 206.93)
(2015, 11) (213.79, 232.36, 207.19, 230.26)
(2015, 12) (237.19

## Coroutines

In [18]:
def simple_coro(name):
    print('Entering coro', name)
    while True:
        value = yield
        print('coro {} got {}'.format(name, value))

In [19]:
sc0 = simple_coro('sc0')
sc1 = simple_coro('sc1')

In [20]:
next(sc0)
next(sc1)

Entering coro sc0
Entering coro sc1


In [21]:
sc0.send('foo')

coro sc0 got foo


In [22]:
sc1.send('bar')

coro sc1 got bar


In [23]:
sc1.send('bat')

coro sc1 got bat


# Event streams & discrete event simulation

In [24]:
def every_xs(x):
    for i in range(5):
        name = yield x   # "wait for 'x' ticks"
        print('{}: every_xs({}, {})'.format(name, x, i))


In [25]:
every_1s = every_xs(1)
every_2s = every_xs(2)
every_3s = every_xs(3)

In [26]:
import heapq

def ev_loop(coros):
    now = 0
    events = []
    
    # Initialize coroutines
    for name, coro in coros.items():
        offset = next(coro)
        heapq.heappush(events, (offset + now, name, coro))
        
    while events:
        new_now, name, coro = heapq.heappop(events)
        if new_now != now:
            print('=== {} ==='.format(new_now))
            now = new_now
        try:
            offset = coro.send(name)
            sched = offset + now
            # print '{}: scheduled next event for {}'.format(name, offset+now)
            heapq.heappush(events, (sched, name, coro))
        except StopIteration:
            print(name, 'is done!')
            
    print('No more events!')
        

In [27]:
ev_loop({'1s': every_1s, '2s': every_2s, '3s': every_3s})

=== 1 ===
1s: every_xs(1, 0)
=== 2 ===
1s: every_xs(1, 1)
2s: every_xs(2, 0)
=== 3 ===
1s: every_xs(1, 2)
3s: every_xs(3, 0)
=== 4 ===
1s: every_xs(1, 3)
2s: every_xs(2, 1)
=== 5 ===
1s: every_xs(1, 4)
1s is done!
=== 6 ===
2s: every_xs(2, 2)
3s: every_xs(3, 1)
=== 8 ===
2s: every_xs(2, 3)
=== 9 ===
3s: every_xs(3, 2)
=== 10 ===
2s: every_xs(2, 4)
2s is done!
=== 12 ===
3s: every_xs(3, 3)
=== 15 ===
3s: every_xs(3, 4)
3s is done!
No more events!


In [28]:
def every_xs(sim, x):
    while True:
        print('Yielding from every_xs({})'.format(x))
        yield sim.delay(x)


In [29]:
class Event:
    
    def __init__(self, when, coro, value=None):
        self.when = when
        self.coro = coro
        self.value = value
        
    def __lt__(self, other):
        return self.when < other.when

class Simulator:

    def __init__(self):
        self.now = 0
        self.events = []
        self.current = None

    def run(self, coro):
        try:
            self.current = coro
            ev = next(coro)
            heapq.heappush(self.events, ev)
        except StopIteration:
            pass
        
    def simulate(self, max_time=10):
        while self.events and self.now <= max_time:
            ev = heapq.heappop(self.events)
            if ev.when != self.now:
                print(f'=== {ev.when} ===')
            self.now, self.current = ev.when, ev.coro
            try:
                ev = self.current.send(ev.value)
                heapq.heappush(self.events, ev)
            except StopIteration:
                pass
        print('Simulation terminated at', self.now)

    def delay(self, seconds):
        return Event(self.now + seconds, self.current, 'delay expired!')
        

In [30]:
sim = Simulator()
sim.run(every_xs(sim, 1))
sim.run(every_xs(sim, 2))
sim.run(every_xs(sim, 3))

Yielding from every_xs(1)
Yielding from every_xs(2)
Yielding from every_xs(3)


In [31]:
sim.simulate()

=== 1 ===
Yielding from every_xs(1)
=== 2 ===
Yielding from every_xs(2)
Yielding from every_xs(1)
=== 3 ===
Yielding from every_xs(3)
Yielding from every_xs(1)
=== 4 ===
Yielding from every_xs(2)
Yielding from every_xs(1)
=== 5 ===
Yielding from every_xs(1)
=== 6 ===
Yielding from every_xs(2)
Yielding from every_xs(3)
Yielding from every_xs(1)
=== 7 ===
Yielding from every_xs(1)
=== 8 ===
Yielding from every_xs(2)
Yielding from every_xs(1)
=== 9 ===
Yielding from every_xs(3)
Yielding from every_xs(1)
=== 10 ===
Yielding from every_xs(2)
Yielding from every_xs(1)
=== 11 ===
Yielding from every_xs(1)
Simulation terminated at 11


### Event streams use case: Asynchronous I/O

Asynchronous I/O can use event streams and coroutines to provide 'thread-like' syntax.

1. A top-level event loop is created to handle events
1. Each coroutine runs until it yields an object to 'wait' on -- call it an 'awaitable'. (This could be a socket that we are receiving data from.) The 'yield' goes to the event loop.
1. The event loop generally has a `select` or `poll` statement that checks for available data on all its awaitable objects.
1. When the 'awaitable' has data, that data is "sent" into the coroutine, which picks up *as though it had blocked waiting on data*


```python
# your code does this
data = yield from socket.async_recv(100)

# socket.async_recv does this
value_to_be_result_of_yield_from = yield wait_for_socket_to_have_data

# the event loop eventually does this
coro.send(data_from_socket)

# socket.async_recv eventually does this:
raise StopIteration(data_from_socket)
# alternatively
return data_from_socket
```

Open [Advanced Generators Lab](./advanced-generators-lab.ipynb)