# Lec 29-31: Iterators, Generators, and Coroutines, or Stream Paradigm

Jianwen Zhu <jzhu@eecg.toronto.edu>
v2.0, 2024-09

## Agenda

* Python Iterators
* Python Generators
* Python Coroutines
* Writing Coroutines in C
* Stream Processing

Disclaimer: Much material of this lecture is based on code samples from https://github.com/cl0ne/dabeaz-coroutines,
originally by David Beazley.

## Python Iterators 

* As you know, Python has a "for" statement
* You use it to loop over a collection of items

In [1]:
for x in [1, 2, 3, 4, 5]:
    print( x )

1
2
3
4
5


* We have seen that you can iterate over any sequence (not just lists)
* If you loop over a dictionary you get keys

In [2]:
prices = { 'GOOG' : 490.10, 'AAPL' : 145.23, 'YHOO' : 21.71 }

for key in prices:
    print( key ) 

GOOG
AAPL
YHOO


* If you loop over a string, you get characters

In [3]:
s = "Hello, World" 
for c in s: 
    print( c )

H
e
l
l
o
,
 
W
o
r
l
d


* If you loop over a file you get lines
```
for line in open("real.txt"):
    print( line )
```

* Many functions consume an "iterable" object, for example, reductions:
             sum(s), min(s), max(s)
* Constructors: convert iterables into concrete data structures
             list(s), tuple(s), set(s), dict(s)
* in operator
```
   item in s
```
* Many others in the library
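
As a quick illustration of the consumers listed above, the sketch below feeds different iterables to a reduction, two constructors, and the in operator. The variable names are made up for this example.

```python
# Any iterable works as input, not just a list.
squares = (n * n for n in range(1, 6))   # lazily produces 1, 4, 9, 16, 25

total = sum(squares)             # a reduction consumes the iterable
as_tuple = tuple(range(3))       # a constructor builds a concrete structure
as_set = set("hello")            # a string is an iterable of characters
found = 4 in [1, 4, 5]           # the in operator also iterates

print(total, as_tuple, sorted(as_set), found)
```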

### Iteration Protocol

The reason why you can iterate over different objects is that there is a specific protocol between the builtin API and the iterable object class.


In [4]:
items = [1, 4, 5]
it = iter(items) 
next(it)

1

In [5]:
next(it)

4

In [6]:
next(it)

5

In [7]:
next(it)

StopIteration: 

* An inside look at the for statement

```
for x in obj:
    # statements

```

* Underneath the covers:
```
_iter = obj.__iter__()
while 1:
    try:
        x = _iter.__next__()
    except StopIteration:
        break
    # statements
```

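* Any object that supports:
```
   - __iter__() # iter(obj) is equivalent to obj.__iter__()
   - __next__() # next(it) is equivalent to it.__next__()
```
  is said to be an "iterator." An object that supports only the first method, returning an iterator, is said to be "iterable."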
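
The desugared loop above can be written as an ordinary function. This is a minimal sketch; manual_for is a hypothetical helper name, not part of Python.

```python
def manual_for(obj, body):
    """Mimic `for x in obj: body(x)` using the iteration protocol directly."""
    it = iter(obj)               # calls obj.__iter__()
    while True:
        try:
            x = next(it)         # calls it.__next__()
        except StopIteration:    # raised once the iterator is exhausted
            break
        body(x)

seen = []
manual_for([1, 4, 5], seen.append)   # works on a list
manual_for("ab", seen.append)        # and on a string
print(seen)
```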

In [None]:
next(it)

## Writing Your Own Iterable

You just need to support the iteration protocol.

In [None]:
class countdown(object):
    def __init__(self,start):
        self.count = start
    def __iter__(self):
        return self
    def __next__(self):
        if self.count <= 0:
            raise StopIteration
        r = self.count
        self.count -= 1
        return r

Now you can use countdown just like list, string and dictionary!

In [None]:
for x in countdown(10):
    print( x )
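
One consequence worth seeing: because countdown returns itself from __iter__(), it can only be traversed once. The sketch below contrasts it with a reusable wrapper; CountdownIterable is a hypothetical name introduced only for this illustration.

```python
class countdown:
    """Same iterator as above: it is its own iterator, so it is one-shot."""
    def __init__(self, start):
        self.count = start
    def __iter__(self):
        return self
    def __next__(self):
        if self.count <= 0:
            raise StopIteration
        r = self.count
        self.count -= 1
        return r

class CountdownIterable:
    """Reusable variant: every __iter__() call hands out a fresh iterator."""
    def __init__(self, start):
        self.start = start
    def __iter__(self):
        return countdown(self.start)

c = countdown(3)
first_pass = list(c)
second_pass = list(c)      # the iterator is already exhausted

r = CountdownIterable(3)
again_1 = list(r)
again_2 = list(r)          # a new iterator is created each time
```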

## Python Generators

Now let's look at something that behaves like an iterable but is actually much more expressive. 

A generator is a function that produces a sequence of results instead of a single value.

In [None]:
def countdown(n):
    while n > 0:
        yield n
        n -= 1
    # falling off the end of the function raises StopIteration automatically

In [None]:
for i in countdown(5): 
    print( i )

Instead of returning a value, you generate a series of values (using the yield statement). Typically, you hook it up to a for-loop.

* The behavior is quite different from that of a normal function
* Calling a generator function creates a generator object. However, it does not start running the function.

In [None]:
x = countdown(5)
x

In [None]:
next(x)

Curiously, yield produces a value and simultaneously suspends the function. The function resumes from where it stopped the next time you call next()!

In [None]:
next(x)

In [None]:
next(x)

In [None]:
next(x)
next(x)
next(x)

When the generator returns, iteration stops!
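
The life cycle described above can be observed with inspect.getgeneratorstate from the standard library. This is a small sketch; the variable names are illustrative.

```python
import inspect

def countdown(n):
    while n > 0:
        yield n
        n -= 1

g = countdown(2)
state_created = inspect.getgeneratorstate(g)    # body has not started running
first = next(g)                                 # runs up to the first yield
state_suspended = inspect.getgeneratorstate(g)
second = next(g)

try:
    next(g)                   # the loop ends and the function returns
    finished = False
except StopIteration:
    finished = True
state_closed = inspect.getgeneratorstate(g)
```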

### Practical Example

* A Python version of Unix 'tail -f'

In [None]:
import time

def follow(thefile):
#    thefile.seek(0,2)      # Go to the end of the file
#    while True:
    for i in range(10) :    # modified to replace "while True" only to run in Jupyter finite number of times
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    # Sleep briefly
            continue
        yield line

* Example use : Watch a web-server log file

In [None]:
logfile  = open("access-log")
for line in follow(logfile):
    print( line )
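
If you do not have an access-log file at hand, the following self-contained sketch exercises the same idea on a temporary file. The polls and delay parameters and the file content are assumptions added for this example.

```python
import os
import tempfile
import time

def follow(thefile, polls=5, delay=0.01):
    """Bounded variant of follow(): gives up after `polls` read attempts."""
    for _ in range(polls):
        line = thefile.readline()
        if not line:
            time.sleep(delay)    # nothing new yet, wait briefly
            continue
        yield line

# Build a small stand-in for "access-log" (the content is made up).
with tempfile.NamedTemporaryFile("w", suffix=".log", delete=False) as tmp:
    tmp.write("GET /index.html\nGET /python.html\n")
    path = tmp.name

with open(path) as logfile:
    lines = [line.rstrip("\n") for line in follow(logfile)]
os.remove(path)
print(lines)
```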

### Generator as Stream pipelines

* Each generator processes and generates a stream of data
* Stream processing pipeline: similar to shell pipes in Unix
* Idea: You can stack a series of generator functions together into a pipe and pull items through it with a for-loop

In [None]:
def grep(pattern,lines):
    for line in lines:
        if pattern in line:
            yield line

In [None]:
# Set up a processing pipe : tail -f | grep python
logfile  = open("access-log")
loglines = follow(logfile)
pylines  = grep("python",loglines)

# Pull results out of the processing pipeline
for line in pylines:
    print( line )

It is instructive to compare the execution of the above pipeline with ordinary function calls.

1. Here we have a chain of function calls as well, but they only *construct* the pipeline; they do not execute it.
2. Actual execution of the pipeline happens during the for loop.
3. The pipeline stages execute one piece at a time, each producing one item of data (via yield).
4. The intermediate data is *VERY SMALL*. This is in contrast to the case where each stage runs to completion, accumulates lots of data, and then passes it to the next stage.

This style of programming is called stream processing. It is very memory efficient!

You might as well just write it as the following and enjoy the elegance of expression.

In [None]:
for line in grep( "python", follow( open("access-log") ) ) :
    print( line )

## Python Coroutines

You can also use yield as an expression that evaluates to a value, for example, on the right side of an assignment.

In [None]:
def grep(pattern):
    print( "Looking for %s" % pattern )
    while True:
        line = yield
        if pattern in line:
            print( line )

If you use yield more generally, you get a coroutine. Coroutines do more than just generate values: they can also consume values sent to them.

In [None]:
g = grep("python")
next(g)

The first call to next() "primes" the coroutine.

In [None]:
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python coroutine rock!")

Sent values are returned by (yield)!

* Execution is the same as for a generator
* When you call a coroutine, nothing happens
* They only run in response to next() and send() methods
* All coroutines must be "primed" by first calling next() (or send(None))
* This advances execution to the location of the first yield expression.
* At this point, it's ready to receive a value.
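
The priming rule can be observed directly. In this sketch (collector and received are illustrative names), sending a value before priming fails, and the same coroutine works once next() has been called.

```python
import inspect

received = []

def collector():
    while True:
        item = yield             # suspend here until a value is sent
        received.append(item)

g = collector()
state_before = inspect.getgeneratorstate(g)    # body has not started yet

try:
    g.send("too early")          # no yield is waiting for a value yet
    early_send_failed = False
except TypeError:
    early_send_failed = True

next(g)                          # prime: run to the first yield
state_primed = inspect.getgeneratorstate(g)
g.send("a")
g.send("b")
```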

* It is easy to forget the priming step, so let's write a decorator for it.

In [None]:
def coroutine(func):
    def start(*args,**kwargs):
        cr = func(*args,**kwargs)
        next(cr)
        return cr
    return start

In [None]:
@coroutine
def grep(pattern):
    print( "Looking for %s" % pattern )
    while True:
        line = yield
        if pattern in line:
            print( line )
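
A possible refinement, offered as a sketch rather than as part of the decorator above: functools.wraps keeps the name and docstring of the decorated function. The matches list is an assumption added so the result can be inspected.

```python
from functools import wraps

def coroutine(func):
    @wraps(func)                   # keep func.__name__ and func.__doc__
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                   # prime: advance to the first yield
        return cr
    return start

matches = []

@coroutine
def grep(pattern):
    """Collect every sent line that contains pattern."""
    while True:
        line = yield
        if pattern in line:
            matches.append(line)

g = grep("python")                 # already primed, no next() needed
g.send("A series of tubes")
g.send("python generators rock!")
g.close()
```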

* A coroutine might run indefinitely
* Use .close() to shut it down

In [None]:
g = grep("python")
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python generators rock!")
g.close()

* close() raises the exception GeneratorExit inside the coroutine, and you can catch it.
  Let's try again: 

In [None]:
@coroutine
def grep(pattern):
    print( "Looking for %s" % pattern )
    try:
        while True:
            line = yield
            if pattern in line:
                print( line )
    except GeneratorExit:
        print( "Going away.  Goodbye" )

In [None]:
g = grep("python")
g.send("Yeah, but no, but yeah, but no")
g.send("A series of tubes")
g.send("python generators rock!")
g.close()

* You can inject an exception into a coroutine

In [None]:
g = grep("python")
g.send("python generators rock!")
g.throw(RuntimeError("You're hosed"))    # pass an exception instance; the (type, value) form is deprecated

How to handle the exception is left to you as an exercise.
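
One possible approach, if you want to compare notes afterwards: wrap the yield in try/except so the coroutine survives the injected exception. This is only a sketch; the log list and the recovery policy are assumptions.

```python
log = []

def grep(pattern):
    while True:
        try:
            line = yield
        except RuntimeError as exc:       # exception injected by throw()
            log.append(f"recovered from: {exc}")
            continue                      # go back and wait for the next line
        if pattern in line:
            log.append(line)

g = grep("python")
next(g)                                   # prime
g.send("python generators rock!")
g.throw(RuntimeError("You're hosed"))     # handled inside, g stays alive
g.send("more python")
g.close()
```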

## Coroutine in C

The execution of generators and coroutines looks magical. They look like functions, but they behave differently. Most shockingly, they can return in the middle and are re-entrant from where they stopped last time!

Have you wondered how they actually work? We are going to illustrate how this works in its raw form, that is, in our favorite language, C. 

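### Motivational Example

Consider two pieces of code. The first decompresses run-length-encoded input and produces characters one at a time through emit(). The second is a parser that consumes characters one at a time through getchar(). Each is written as its own main loop, so how do we connect the output of the first to the input of the second?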

```
----------------------------------------------
    Example
----------------------------------------------
    /* Decompression code */
    while (1) {
        c = getchar();
        if (c == EOF)
            break;
        if (c == 0xFF) {
            len = getchar();
            c = getchar();
            while (len--)
                emit(c);
        } else
            emit(c);
    }
    emit(EOF);
    
    /* Parser code */
    while (1) {
        c = getchar();
        if (c == EOF)
            break;
        if (isalpha(c)) {
            do {
                add_to_token(c);
                c = getchar();
            } while (isalpha(c));
            got_token(WORD);
        }
        add_to_token(c);
        got_token(PUNCT);
    }
```

### Rewriting

The conventional answer is to rewrite one of the ends of the
communication channel so that it's a function that can be
called. Here's an example of what that might mean for each of the
example fragments.

```
int decompressor(void) {
    static int repchar;
    static int replen;
    if (replen > 0) {
        replen--;
        return repchar;
    }
    int c = getchar();   /* c was undeclared in the original fragment */
    if (c == EOF)
        return EOF;
    if (c == 0xFF) {
        replen = getchar();
        repchar = getchar();
        replen--;
        return repchar;
    } else
        return c;
}
void parser(int c) {
    static enum {
        START, IN_WORD
    } state;
    switch (state) {
    case IN_WORD:
        if (isalpha(c)) {
            add_to_token(c);
            return;
        }
        got_token(WORD);
        state = START;
        /* fall through */

    case START:
        add_to_token(c);
        if (isalpha(c))
            state = IN_WORD;
        else
            got_token(PUNCT);
        break;
    }
}

```


Of course you don't have to rewrite both of them; just one will do. If
you rewrite the decompressor in the form shown, so that it returns one
character every time it's called, then the original parser code can
replace calls to getchar() with calls to decompressor(), and the
program will be happy. Conversely, if you rewrite the parser in the
form shown, so that it is called once for every input character, then
the original decompression code can call parser() instead of emit()
with no problems. You would only want to rewrite both functions as
callees if you were a glutton for punishment.

### Knuth's Coroutines

In The Art of Computer Programming, Donald Knuth presents a solution
to this sort of problem. His answer is to throw away the stack concept
completely. Stop thinking of one process as the caller and the other
as the callee, and start thinking of them as cooperating equals.

In practical terms: replace the traditional "call" primitive with a
slightly different one. The new "call" will save the return address
somewhere other than on the stack, and will then jump to a location
specified in another saved return address. So each time the decompressor
emits another character, it saves its program counter and jumps to the
last known location within the parser - and each time the parser needs
another character, it saves its own program counter and jumps to the
location saved by the decompressor. Control shuttles back and forth
between the two routines exactly as often as necessary.

This is very nice in theory, but in practice you can only do it in
assembly language, because no commonly used high level language
supports the coroutine call primitive. Languages like C depend utterly
on their stack-based structure, so whenever control passes from any
function to any other, one must be the caller and the other must be
the callee. So if you want to write portable code, this technique is
at least as impractical as running the two routines as separate
processes connected by a Unix pipe.



### First Attempt

```
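/* First attempt: what we would like to write */
int function(void) {
    int i;
    for (i = 0; i < 10; i++)
        return i;   /* won't work, but wouldn't it be nice if it did! */
}

/* What we can actually write: remember the state and jump back to it */
int function(void) {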
    static int i, state = 0;
    switch (state) {
        case 0: goto LABEL0;
        case 1: goto LABEL1;
    }
    LABEL0: /* start of function */
    for (i = 0; i < 10; i++) {
        state = 1; /* so we will come back to LABEL1 */
        return i;
        LABEL1:; /* resume control straight after the return */
    }
}

```

### Second Attempt using Duff's Device


Does this piece of code even compile? Can you recognize what it actually does? 

```
    switch (count % 8) {
        case 0:        do {  *to = *from++;
        case 7:              *to = *from++;
        case 6:              *to = *from++;
        case 5:              *to = *from++;
        case 4:              *to = *from++;
        case 3:              *to = *from++;
        case 2:              *to = *from++;
        case 1:              *to = *from++;
                       } while ((count -= 8) > 0);
    }

```

Strangely, the switch statement is *interleaved* with the do-while statement!
But this is legal C and it works. To keep your brain sane, it is best to understand
each case statement simply as a label that the switch statement
jumps to. (The code fragment above is a hand-unrolled copy loop.)

This trick is called Duff's Device, after its inventor, Tom Duff. Let's see how we can use it to implement coroutines.


```
/* 2nd attempt with Duff's Device */

int function(void) {
    static int i, state = 0;
    switch (state) {
    case 0: /* start of function */
        for (i = 0; i < 10; i++) {
            state = 1; /* so we will come back to "case 1" */
            return i;
    case 1:; /* resume control straight after the return */
        }
    }
}
```

Note that we turned the variable i into "static int i". Why?
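
One way to approach this question is to write the same resumable function by hand in Python, where nothing survives a call unless it is stored outside the call frame. In the sketch below (the class name Function and the None end marker are assumptions), object attributes play the role of the static variables.

```python
class Function:
    """Hand-written resumable version of: for i in range(10): yield i"""

    def __init__(self):
        self.state = 0    # plays the role of `static int state`
        self.i = 0        # plays the role of `static int i`

    def __call__(self):
        if self.state == 1:
            self.i += 1           # resume: finish the loop step we left
        self.state = 1            # the next call resumes instead of restarting
        if self.i < 10:
            return self.i
        return None               # exhausted (assumption: signal with None)

f = Function()
values = [f() for _ in range(10)]
after_end = f()
print(values, after_end)
```

If i were an ordinary local variable, its value would be lost at every return and the loop could not continue where it left off.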

### Third Attempt using Generalizing Macros

```
#define crBegin static int state=0; switch(state) { case 0:
#define crYield(i,x) do { state=i; return x; case i:; } while (0)
#define crFinish }
int function(void) {
    static int i;
    crBegin;
    for (i = 0; i < 10; i++)
        crYield(1, i);
    crFinish;
}

```

### Fourth Attempt with Better Macros

```
#define crBegin static int state=0; switch(state) { case 0:
#define crYield(x) do { state=__LINE__; return x; \
                         case __LINE__:; } while (0)
#define crFinish }

int function(void) {
    static int i;
    crBegin;
    for (i = 0; i < 10; i++)
        crYield(i);
    crFinish;
}
```


### Back to the Motivating Example

```
int decompressor(void) {
    static int c, len;
    crBegin;
    while (1) {
        c = getchar();
        if (c == EOF)
            break;
        if (c == 0xFF) {
            len = getchar();
            c = getchar();
            while (len--)
                crYield(c);
        } else
            crYield(c);
    }
    crYield(EOF);
    crFinish;
}

void parser(int c) {
    crBegin;
    while (1) {
        /* first char already in c */
        if (c == EOF)
            break;
        if (isalpha(c)) {
            do {
                add_to_token(c);
                crYield();
            } while (isalpha(c));
            got_token(WORD);
        }
        add_to_token(c);
        got_token(PUNCT);
        crYield();
    }
    crFinish;
}
```
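
For comparison, here is a rough Python counterpart of the two routines, with the decompressor as a generator and the parser as a coroutine. It is a sketch rather than a translation of the macros: the EOF sentinel, the tokens list, and the driver loop are assumptions made for this example.

```python
EOF = -1    # sentinel standing in for C's EOF

def decompressor(data):
    """Generator: expand runs encoded as 0xFF, length, value."""
    it = iter(data)
    for c in it:
        if c == 0xFF:
            length = next(it)
            c = next(it)
            for _ in range(length):
                yield c
        else:
            yield c
    yield EOF

def parser(tokens):
    """Coroutine: receive one character code per send() and record tokens."""
    c = yield
    while c != EOF:
        if chr(c).isalpha():
            word = []
            while c != EOF and chr(c).isalpha():
                word.append(chr(c))
                c = yield                # wait for the next character
            tokens.append(("WORD", "".join(word)))
            if c == EOF:
                break
        tokens.append(("PUNCT", chr(c)))
        c = yield

tokens = []
p = parser(tokens)
next(p)                                  # prime the parser
encoded = [ord("h"), ord("i"), 0xFF, 3, ord("!"), ord(" "), ord("y"), ord("o")]
for c in decompressor(encoded):
    try:
        p.send(c)
    except StopIteration:                # the parser returned after seeing EOF
        break
print(tokens)
```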

## Stream Processing

### Stream Processing Using Coroutines
We have seen how to build a pipeline using generators. We can do the same with coroutines.


```
send()              send()               send()
------> |coroutine|-------> |coroutine| -------> |coroutine| ------> 
```

We just chain them together and *push* data through the pipe with send() operations.

* Pipeline Sources
    - The pipeline needs an initial source (a producer)

```
def source(target):
    while not done:
        item = produce_an_item()
        ...
        target.send(item)
        ...
    target.close()
```
    - It is typically not a coroutine

* Pipeline Sinks
    - The pipeline must have an end-point (sink)


```
send()              send()               
------> |coroutine|-------> |sink| 
```

    - Collects all data sent to it and processes it

```
@coroutine
def sink():
    try:
        while True:
            item = yield   # Receive an item
            ...
    except GeneratorExit:    # Handle .close()
        # Done
        ...
```
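
The two templates above can be filled in as follows. This is a minimal sketch: the collected and status lists are assumptions added so the effect of send() and close() can be inspected.

```python
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                 # prime
        return cr
    return start

def source(items, target):
    """Plain function (not a coroutine) that pushes items into the pipe."""
    for item in items:
        target.send(item)
    target.close()               # signal the end of the stream

@coroutine
def sink(collected, status):
    try:
        while True:
            item = yield         # receive an item
            collected.append(item)
    except GeneratorExit:        # raised at the yield by .close()
        status.append("closed")

collected, status = [], []
source(["a", "b", "c"], sink(collected, status))
print(collected, status)
```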

* Back to Previous Example


In [None]:
## A source that mimics Unix 'tail -f'
import time
def follow(thefile, target):
#    thefile.seek(0,2)      # Go to the end of the file
#    while True:
    for i in range(10):    # bounded stand-in for "while True"
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    # Sleep briefly
            continue
        target.send(line)

In [None]:
def coroutine(func):
#    @wraps(func)
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)
        return cr

    return start
 
## A sink that just prints the lines
@coroutine
def printer():
    while True:
        line = yield
        print( line )

* Hooking it Together

Critical point : follow() is driving the entire computation by reading lines and pushing them into the printer() coroutine.

In [None]:
f = open("access-log")
follow(f, printer())

* Adding Pipeline Filters
    - Intermediate stages both receive and send

```
send()              send()               
------> |coroutine|-------> 
```

Typically perform some kind of data transformation, filtering, routing, etc.

```
@coroutine
def filter(target):
    while True:
        item = (yield)    # Receive an item
        # Transform/filter item
        ...
        # Send it along to the next stage
        target.send(item)

Let's see the complete example below.

In [None]:
import time
from functools import wraps

   
def follow(thefile, target):
#    thefile.seek(0,2)      # Go to the end of the file
#    while True:
    for i in range(10):    # bounded stand-in for "while True"
        line = thefile.readline()
        if not line:
            time.sleep(0.1)    # Sleep briefly
            continue
        target.send(line)

# A filter.
@coroutine
def grep(pattern, target):
    while True:
        line = yield  # Receive a line
        if pattern in line:
            target.send(line)  # Send to next stage


# A sink.  A coroutine that receives data
@coroutine
def printer():
    while True:
        line = yield
        print(line, end=" ")

In [None]:
f = open("access-log")
follow(f, grep("python", printer()))
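
The same pipeline shape can be tried without a log file. The sketch below pushes an in-memory list through a filter and a transform into a collecting sink; source, upper, and collect are hypothetical stages introduced for this example.

```python
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                 # prime
        return cr
    return start

def source(lines, target):
    for line in lines:
        target.send(line)

@coroutine
def grep(pattern, target):       # filter stage
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def upper(target):               # transform stage
    while True:
        line = yield
        target.send(line.upper())

@coroutine
def collect(out):                # sink stage
    while True:
        line = yield
        out.append(line)

out = []
source(
    ["python rocks", "a series of tubes", "more python"],
    grep("python", upper(collect(out))),
)
print(out)
```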

### Generator-based Pipeline vs Coroutine-based Pipeline

* Generator is pull-based: pull data through the pipe with "for" by the sink

```
input seq -----> |generator|-------> |generator| -------> |generator| ------> for x in s : 
```

* Coroutine is push-based: push data through the pipe with send() by the source

```
         send()               send()
|source|-------> |coroutine| -------> |coroutine sink| 
```

* You could build an entire dataflow processing network!
  - With branch
  - With broadcast
  - In a graph structure
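
As a sketch of the broadcast case, the stage below forwards every item to several targets, so one source feeds three branches. The broadcast and collect stages are assumptions written for this illustration.

```python
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                 # prime
        return cr
    return start

@coroutine
def broadcast(targets):
    """Forward every received item to all targets."""
    while True:
        item = yield
        for target in targets:
            target.send(item)

@coroutine
def grep(pattern, target):
    while True:
        line = yield
        if pattern in line:
            target.send(line)

@coroutine
def collect(out):
    while True:
        item = yield
        out.append(item)

py_lines, tube_lines, all_lines = [], [], []
pipe = broadcast([
    grep("python", collect(py_lines)),
    grep("tubes", collect(tube_lines)),
    collect(all_lines),
])
for line in ["python rocks", "a series of tubes", "python in tubes"]:
    pipe.send(line)
```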

### XML Event Processing

* Problem

- Where is my bus?
- Chicago Transit Authority (CTA) equips most of its buses with real-time GPS tracking
- You can get current data on every bus on the street as a big XML document

* Some XML

```
<?xml version="1.0"?>
  <buses>
    <bus>
        <id>7574</id>
        <route>147</route>
        <color>#3300ff</color>
        <revenue>true</revenue>
        <direction>North Bound</direction>
        <latitude>41.925682067871094</latitude>
        <longitude>-87.63092803955078</longitude>
        <pattern>2499</pattern>
        <patternDirection>North Bound</patternDirection>
        <run>P675</run>
        <finalStop><![CDATA[Paulina & Howard Terminal]]></finalStop>
        <operator>42493</operator>
    </bus>
    <bus>
       ...
    </bus>
  </buses>
```

* XML Parsing

    - There are many possible ways to parse XML
    - An old-school approach: SAX (Simple API for XML)
    - SAX is an event driven interface

```
                 events
 | XML Parser | ---------> | Handler |
```

```
class Handler:
   def startElement(self, name, attrs):
       ...
   def endElement(self, name):
       ...
   def characters(self, text):
       ...
```

* Minimal SAX Example

In [None]:
# basicsax.py
#
# A very simple example illustrating the SAX XML parsing interface

import xml.sax


class MyHandler(xml.sax.ContentHandler):
    def startElement(self, name, attrs):
        print("startElement", name)

    def endElement(self, name):
        print("endElement", name)

    def characters(self, text):
        print("characters", repr(text)[:40])


xml.sax.parse("allroutes.xml", MyHandler())
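
If allroutes.xml is not available, the same interface can be tried on an inline document with xml.sax.parseString. The sketch below records events instead of printing them; the XML snippet and the RecordingHandler class are made up for this example.

```python
import xml.sax

XML = b"""<?xml version="1.0"?>
<buses>
  <bus><id>7574</id><route>147</route></bus>
</buses>"""

class RecordingHandler(xml.sax.ContentHandler):
    def __init__(self):
        super().__init__()
        self.events = []

    def startElement(self, name, attrs):
        self.events.append(("start", name))

    def endElement(self, name):
        self.events.append(("end", name))

    def characters(self, text):
        if text.strip():                 # skip whitespace between elements
            self.events.append(("text", text))

handler = RecordingHandler()
xml.sax.parseString(XML, handler)
tags = [e for e in handler.events if e[0] != "text"]
text = "".join(v for kind, v in handler.events if kind == "text")
```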

* Some Issues
    - SAX is often used because it can be used to incrementally process huge XML files without a large memory footprint
    - However, the event-driven nature of SAX parsing makes it rather awkward and low-level to deal with
* From SAX to Coroutines
    - You can dispatch SAX events into coroutines
    - Consider this SAX handler: it does nothing but send events to a target

In [None]:
import xml.sax

class EventHandler(xml.sax.ContentHandler):
    def __init__(self, target):
        self.target = target

    def startElement(self, name, attrs):
        self.target.send(("start", (name, dict(attrs.items()))))  # copy attributes via the public API

    def characters(self, text):
        self.target.send(("text", text))

    def endElement(self, name):
        self.target.send(("end", name))


In [None]:
@coroutine
def printer():
    while True:
        event = yield
        print(event)

xml.sax.parse("allroutes.xml", EventHandler(printer()))

* An Event Stream
    - The big picture

```
                 events                  send()
 | SAX Parser | ---------> | Handler |  --------->  (event,value)

                                                 'start'      ('direction',{})
                                                 'end'        'direction'
                                                 'text'       'North Bound'

```
* Event Processing

  - To do anything interesting, you have to process the event stream
  - Example: Convert bus elements into dictionaries (XML sucks, dictionaries rock)
  - From:

```
<bus>
    <id>7574</id>
    <route>147</route>
    <revenue>true</revenue>
    <direction>North Bound</direction>
    ...
</bus>
```

  - To:

```
{
  'id' : '7574',
  'route' : '147',
  'revenue' : 'true',
  'direction' : 'North Bound',
  ...
}
```

In [None]:
@coroutine
def buses_to_dicts(target):
    while True:
        event, value = yield
        # Look for the start of a <bus> element
        if event == "start" and value[0] == "bus":
            busdict = {}
            fragments = []
            # Capture text of inner elements in a dict
            while True:
                event, value = yield
                if event == "start":
                    fragments = []
                elif event == "text":
                    fragments.append(value)
                elif event == "end":
                    if value != "bus":
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break
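
The coroutine can be exercised without a parser by sending it a hand-written event stream. In this sketch the events list and the collect sink are assumptions; note that the text of one element may arrive in several pieces, which is why fragments are joined.

```python
def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                 # prime
        return cr
    return start

@coroutine
def buses_to_dicts(target):
    while True:
        event, value = yield
        if event == "start" and value[0] == "bus":
            busdict = {}
            fragments = []
            while True:
                event, value = yield
                if event == "start":
                    fragments = []
                elif event == "text":
                    fragments.append(value)
                elif event == "end":
                    if value != "bus":
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break

@coroutine
def collect(out):
    while True:
        item = yield
        out.append(item)

events = [
    ("start", ("buses", {})),
    ("start", ("bus", {})),
    ("start", ("id", {})), ("text", "75"), ("text", "74"), ("end", "id"),
    ("start", ("route", {})), ("text", "147"), ("end", "route"),
    ("end", "bus"),
    ("end", "buses"),
]

buses = []
stage = buses_to_dicts(collect(buses))
for ev in events:
    stage.send(ev)
print(buses)
```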


* State Machines
    - The previous code works by implementing a simple state machine

```
             ('start',('bus',*))
       <->   -------------------->   <->
---> |  A  |                       |  B  | 
             <--------------------
                 ('end','bus')
```
    - State A: Looking for a bus
    - State B: Collecting bus attributes
    - Comment : Coroutines are perfect for this


* Buses to Dictionaries
  - State A

```
    while True:
        event, value = yield
        # Look for the start of a <bus> element
        if event == "start" and value[0] == "bus":
 
```
  - State B
```
            while True:
                event, value = yield
                if event == "start":
                    fragments = []
                elif event == "text":
                    fragments.append(value)
                elif event == "end":
                    if value != "bus":
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break

```

* Filtering Elements

  - Let's filter on dictionary fields
  - Examples:

```
filter_on_field("route","22",target)
filter_on_field("direction","North Bound",target)
```

In [None]:
@coroutine
def filter_on_field(fieldname, value, target):
    while True:
        d = yield
        if d.get(fieldname) == value:
            target.send(d)

* Processing Elements
  - Where's my bus?

  - This receives dictionaries and prints a table

```
     22,1485,"North Bound",41.880481123924255,-87.62948191165924
     22,1629,"North Bound",42.01851969751819,-87.6730209876751
```

In [None]:
@coroutine
def bus_locations():
    while True:
        bus = yield
        print('%(route)s,%(id)s,"%(direction)s",' "%(latitude)s,%(longitude)s" % bus)


* Hooking it Together
    - Find all locations of the North Bound #22 bus (the slowest moving object in the universe)
    - This final step involves a bit of plumbing, but each of the parts is relatively simple

In [None]:
xml.sax.parse(
        "allroutes.xml",
        EventHandler(
            buses_to_dicts(
                filter_on_field(
                    "route",
                    "22",
                    filter_on_field("direction", "North Bound", bus_locations()),
                )
            )
        ),
)
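
To try the whole pipeline without allroutes.xml, the sketch below runs it on a small inline document and collects the matching buses in a list instead of printing them. The XML data, the collect sink, and the use of xml.sax.parseString are assumptions made for this example.

```python
import xml.sax

def coroutine(func):
    def start(*args, **kwargs):
        cr = func(*args, **kwargs)
        next(cr)                 # prime
        return cr
    return start

class EventHandler(xml.sax.ContentHandler):
    def __init__(self, target):
        super().__init__()
        self.target = target
    def startElement(self, name, attrs):
        self.target.send(("start", (name, dict(attrs.items()))))
    def characters(self, text):
        self.target.send(("text", text))
    def endElement(self, name):
        self.target.send(("end", name))

@coroutine
def buses_to_dicts(target):
    while True:
        event, value = yield
        if event == "start" and value[0] == "bus":
            busdict, fragments = {}, []
            while True:
                event, value = yield
                if event == "start":
                    fragments = []
                elif event == "text":
                    fragments.append(value)
                elif event == "end":
                    if value != "bus":
                        busdict[value] = "".join(fragments)
                    else:
                        target.send(busdict)
                        break

@coroutine
def filter_on_field(fieldname, value, target):
    while True:
        d = yield
        if d.get(fieldname) == value:
            target.send(d)

@coroutine
def collect(out):
    while True:
        item = yield
        out.append(item)

XML = b"""<?xml version="1.0"?>
<buses>
  <bus><id>1485</id><route>22</route><direction>North Bound</direction></bus>
  <bus><id>7574</id><route>147</route><direction>North Bound</direction></bus>
  <bus><id>1629</id><route>22</route><direction>South Bound</direction></bus>
</buses>"""

found = []
xml.sax.parseString(
    XML,
    EventHandler(
        buses_to_dicts(
            filter_on_field(
                "route", "22",
                filter_on_field("direction", "North Bound", collect(found)),
            )
        )
    ),
)
print(found)
```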

## Recap

* The iteration protocol allows user-defined *iterables* with sequence-like behavior
* Generators allow development of "dynamic" iterables with reentrant execution
* Coroutines (where yield appears on the RHS of an assignment) enable reentrant execution while receiving a value at each resumption
* Both can be used for stream (event) processing
  - push-based: coroutine
  - pull-based: generator
  - Write *state machines* WITHOUT explicit state management (simplified and intuitive programming!)
  - Small footprint for intermediate data: everything in flight fits in memory!