## Coroutines in Python

### Objectives

* Explore and compare programming models for asynchronous problems
* Asynchrony: use cases and some approaches
* What is a coroutine? How does it work? Comparing units of work
* Real-world applications of coroutines

### What is asynchrony?

According to [Wikipedia](https://en.wikipedia.org/wiki/Asynchrony_(computer_programming)):
> Asynchrony, in computer programming, refers to the occurrence of events independent of the main program flow and ways to deal with such events. These may be "outside" events such as the arrival of signals, or actions instigated by a program that take place concurrently with program execution, without the program blocking to wait for results. Asynchronous input/output is an example of the latter cause of asynchrony, and lets programs issue commands to storage or network devices that service these requests while the processor continues executing the program. Doing so provides a degree of parallelism.

The figure below shows some approaches to handling asynchronous tasks:

![Async models](./assets/images/async-overview.jpg)

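Note that CPython threads are real OS (native) threads, but the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time (blocking I/O releases the lock). Thus, Python threads do not run Python code in parallel, and I hate it.

Processes own isolated memory spaces, so they can work independently of the main process. Threads share their process's memory, but each thread has its own stack and is scheduled independently by the OS.

In contrast, an event loop runs all of its tasks in a single thread and a single shared memory space, so we must answer the question `How do we keep separate state for each independent task?`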

### When should we use an event loop, threads, or processes?

In computer science, tasks fall into two broad classes:
* CPU-bound
    > In computer science, a computer is CPU-bound (or compute-bound) when the time for it to complete a task is determined principally by the speed of the central processor: processor utilization is high, perhaps at 100% usage for many seconds or minutes. Interrupts generated by peripherals may be processed slowly, or indefinitely delayed.
    
* I/O-bound
    > I/O bound refers to a condition in which the time it takes to complete a computation is determined principally by the period spent waiting for input/output operations to be completed. This is the opposite of a task being CPU bound. This circumstance arises when the rate at which data is requested is slower than the rate it is consumed or, in other words, more time is spent requesting data than processing it.
    
**Examples**:
* Using processes (or native threads) for I/O-bound work

    Jason asks Dung, 'What are you doing today?' Dung responds that his task is waiting in testing and he has no other task, so he will go home and 😴.
    
    No, that isn't an efficient solution. Instead, Dung should work on other tasks until Ms. Quyen says, 'Hey Dung, your task failed, fix it' 😥.
    
    Awesome, this is how an event loop handles I/O-bound tasks: while one task is waiting, the worker moves on to the others.
    
* Using an event loop for CPU-bound work

    Our team has 5 members and 5 tasks, but Dung takes all of them. Because every task must be committed by the end of the day, Dung works on each task for an hour and then moves on to the next one. As a result, Dung is sick over the weekend 😷.
    
    No, we have 5 members, so why overload Dung? Jason can distribute the tasks among the other members; by the end of the day, all tasks are committed and Dung is fine.
    
    This is how multiple processes (or native threads, in runtimes without a GIL) handle CPU-bound work.
    
    
In practice, event loops are useful in event-driven systems and similar designs, and they are often the best solution for I/O-bound problems.
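To make the I/O-bound case concrete, here is a minimal sketch using Python's standard `asyncio` event loop (native `async def` coroutines, not the generator-based coroutines used later in this notebook). The function name `fake_io` and the 0.1-second delay are made up for illustration; `asyncio.sleep` stands in for waiting on a network or disk. The five waits overlap on a single thread, so the whole run takes roughly 0.1 s instead of 0.5 s.

```python
import asyncio
import time


async def fake_io(task_id, delay):
    # Hypothetical I/O-bound task: asyncio.sleep stands in for a network/disk wait.
    # While this task is suspended here, the event loop runs the other tasks.
    await asyncio.sleep(delay)
    return task_id


async def main():
    # Start five tasks concurrently on one thread and wait for all of them.
    return await asyncio.gather(*(fake_io(i, 0.1) for i in range(5)))


start = time.perf_counter()
results = asyncio.run(main())   # inside Jupyter, use `await main()` instead
elapsed = time.perf_counter() - start
print(results, f'{elapsed:.2f}s')
```

The same trick does not help CPU-bound work: a task that never awaits never gives the loop a chance to switch, which is where multiple processes come in.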

### The problem with tasks in the event-loop model

Every function call has a context: its local variables, its stack frame, and so on. The interpreter deallocates this context once the function executes `return`.

In I/O problems, a task usually consists of many small steps separated by waits for I/O. At each waiting point, the function (the task) must return control to its caller (the event loop), and later the event loop must resume the function from that same point with its context intact. An ordinary function cannot do this, because returning destroys its context.

==> Solution: coroutine
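Here is a minimal sketch of that idea with plain Python generators, assuming a toy round-robin loop (`run` and `task` are hypothetical names, not a library API). Each `yield` marks a point where the task would wait for I/O: it hands control back to the loop, and the next `next()` call resumes it right after the `yield`, with its local variables intact.

```python
from collections import deque

log = []


def task(name, steps):
    # `name` and `i` survive every pause: the generator keeps its frame alive.
    for i in range(steps):
        log.append(f'{name} step {i}')
        yield  # pretend to wait for I/O: give control back to the loop


def run(tasks):
    # Toy round-robin "event loop": resume each task until it finishes.
    ready = deque(tasks)
    while ready:
        t = ready.popleft()
        try:
            next(t)            # resume the task from its last yield
            ready.append(t)    # not finished yet: schedule it again
        except StopIteration:
            pass               # the task returned: drop it


run([task('A', 2), task('B', 3)])
print(log)
```

The two tasks interleave (`A step 0`, `B step 0`, `A step 1`, ...) even though nothing here is parallel.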

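#### What is a coroutine?

Donald Knuth says:
> Subroutines are special cases of coroutines.

In other words, a subroutine is a coroutine that never suspends: it always starts at the top and runs until it returns.

How does a coroutine work?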

![](assets/images/subroutine_coroutine.png)

#### Units of work

| | Process | Native thread | Green thread | Goroutine | Coroutine |
| :-: | :-: | :-: | :-: | :-: | :-: |
| __Memory__ | ≤ 8 MB | ≤ N×2 MB | ≥ 64 KB | ≥ 2 KB | ≥ 0 MB |
| __OS managed__ | Yes | Yes | No | No | No |
| __Pre-emptive scheduling__ | Yes | Yes | Yes | Yes (since Go 1.14) | No |
| __Private address space__ | Yes | No | No | No | No |
| __Parallel__ | Yes | Yes | No | Yes | No |




#### How do we implement a coroutine from scratch?

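```c
#include <stdio.h>

/* A coroutine built with a switch statement (Duff's device).
 * `i` and `s` are static, so they survive between calls:
 * `i` is the coroutine's state and `s` records where to resume. */
int coroutine(void) {
    static int i = 0, s = 0;
    switch (s) {
        case 0:                      /* first call: start from the top */
            for (i = 0;; ++i) {
                s = 1;               /* next call resumes at `case 1` */
                return i;            /* "yield" i to the caller */
                case 1:;             /* resume point: continue the loop */
            }
    }
    return -1;                       /* unreachable */
}

int main(void) {
    printf("%d\n", coroutine());     // 0
    printf("%d\n", coroutine());     // 1
    printf("%d\n", coroutine());     // 2
    return 0;
}

```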


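This coroutine needs storage that outlives a single call so that it can suspend and resume without losing its context. In C, static variables provide this: they live for the whole run of the program instead of being destroyed when the function returns. The drawback is that there is only one copy of that state, so only one instance of this coroutine can exist at a time. In Python, each generator object keeps its own frame (local variables plus the position where it stopped) alive between resumptions.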
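The Python side of this can be observed directly. The sketch below uses the standard `inspect` module to look at a generator's saved state; the `counter` function is only an illustrative example. Unlike the C version, every call to `counter()` creates a new generator with its own independent frame.

```python
import inspect


def counter():
    i = 0
    while True:
        yield i
        i += 1


gen = counter()
state_before = inspect.getgeneratorstate(gen)   # 'GEN_CREATED': body not started yet
next(gen)                                       # runs to the first yield (i == 0)
next(gen)                                       # resumes after the yield: i becomes 1
saved = dict(inspect.getgeneratorlocals(gen))   # local variables kept in the frame
state_after = inspect.getgeneratorstate(gen)    # 'GEN_SUSPENDED': paused at a yield

other = counter()                               # a second, independent instance
first_of_other = next(other)                    # starts from 0; `gen` is unaffected

print(state_before, saved, state_after, first_of_other)
```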

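__Summary:__ a coroutine
* is scheduled cooperatively (non-preemptively): it runs until it voluntarily gives up control
* can suspend and later resume at its suspension points (`yield` in Python)
* keeps its state between suspensions
* for I/O-bound work, uses much less memory and switching overhead than threads or processes

Coroutines avoid many bugs typical of multithreading and multiprocessing, such as race conditions on shared data, because they all run in a single thread and only switch at explicit points. I think they are the best solution for networking tasks.

In Python, we can define a (generator-based) coroutine by using the `yield` expression in a function. Calling such a function does not run its body; it returns a generator object, which we drive with `send()` and `next()`.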

In [None]:
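def coro_fn():
    val = yield 'Starting'   # first send(None) runs up to here: yields 'Starting' and suspends
    print('Consume', val)    # resumed by send('data'), so val == 'data'
    yield 'Hello World'      # produce data and suspend again

co = coro_fn()               # create a new coroutine object; its body has not run yet
print(co.send(None))         # start the coroutine (same as next(co)); prints Starting
print(co.send('data'))       # resume it with a value: prints "Consume data", then Hello World
co.close()                   # close the coroutine (raises GeneratorExit inside it)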
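The cell above consumes only a single value. A coroutine can also keep consuming values in a loop while maintaining its own state. A classic illustration, using a hypothetical `averager` function, is a running average: each `send()` delivers a number and receives the current average back.

```python
def averager():
    total, count, average = 0.0, 0, None
    while True:
        value = yield average   # report the current average, then wait for a value
        total += value
        count += 1
        average = total / count


avg = averager()
next(avg)            # prime: run to the first yield (yields None)
a1 = avg.send(10)    # 10.0
a2 = avg.send(20)    # 15.0
a3 = avg.send(30)    # 20.0
avg.close()
print(a1, a2, a3)
```

`total` and `count` live only inside the generator's frame, so several averagers can run side by side without sharing state.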

A generator is a special kind of coroutine: it only produces data and never consumes any.

In [None]:
def fib_gen():
    a, b = 0, 1           # this context is preserved between calls
    while True:
        yield a           # hand `a` to the caller (next(fib)) and suspend
        a, b = b, a + b
        
fib = fib_gen()
for _ in range(10):
    print(next(fib), end=' ')

### Applications of coroutines


#### 1. Asynchronous TCP server

In [None]:
import logging
from sys import stdout
from socket import socket, SOCK_STREAM, AF_INET, SOL_SOCKET, SO_REUSEADDR
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE


logging.basicConfig(stream=stdout, level=logging.DEBUG)


class Server:
    # Reads a request that ends with b'\n\n' and replies with b'Hello ' + request.
    # Every callback is a generator (coroutine): the selector stores it as the key's
    # `data`, and mainloop() resumes it with next() whenever its socket is ready.

    def __init__(self, host, port, buf_size=64):
        self.addr = (host, port)
        self.poll = DefaultSelector()
        self.m = {}                 # socket -> its current coroutine
        self.buf_size = buf_size

    def close_conn(self, sock):
        # clear the handler of the connection and close the connection
        self.poll.unregister(sock)
        self.m.pop(sock, None)
        sock.close()

    def handle_read(self, sock):  # make isolated environment for each connection
        buffer_size = self.buf_size

        def _can_read():
            buf = bytearray()
            while 1:
                chunk = sock.recv(buffer_size)   # readable, so recv() returns at once
                if not chunk:                    # the client closed the connection early
                    self.close_conn(sock)
                    return
                buf += chunk
                # check the whole buffer: b'\n\n' may be split across two chunks
                if buf.endswith(b'\n\n'):
                    break
                yield                            # wait for the next EVENT_READ

            self.handle_write(sock, bytes(buf[:-2]))

        handler = _can_read()
        self.m[sock] = handler
        self.poll.register(sock, EVENT_READ, handler)

    def handle_write(self, sock, data):     # make isolated environment for each connection
        buffer_size = self.buf_size

        def _can_write():
            reply = memoryview(b'Hello ' + data)
            sent = 0
            while sent < len(reply):
                # send() may accept fewer bytes than offered, so advance by its return value
                sent += sock.send(reply[sent:sent + buffer_size])
                yield                            # wait for the next EVENT_WRITE

            self.close_conn(sock)                # the whole reply has been sent

        handler = _can_write()
        self.m[sock] = handler
        self.poll.modify(sock, EVENT_WRITE, handler)

    def handle_accept(self, sock):
        while 1:
            s, addr = sock.accept()              # readable, so accept() returns at once
            s.setblocking(False)
            logging.debug(f'Accept the connection from {addr}')
            self.handle_read(s)
            yield

    def mainloop(self):
        sock = socket(AF_INET, SOCK_STREAM)
        try:
            sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)  # allow restarting on the same port
            sock.bind(self.addr)
            sock.setblocking(False)
            sock.listen(1024)

            self.m[sock] = self.handle_accept(sock)
            self.poll.register(sock, EVENT_READ, self.m[sock])

            logging.info(f'Server is running at {self.addr}')
            while 1:
                for key, _mask in self.poll.select():
                    try:
                        next(key.data)           # resume the coroutine waiting on this socket
                    except StopIteration:
                        pass                     # that coroutine has finished its job
        finally:
            # also runs on KeyboardInterrupt, which is not an Exception subclass
            self.poll.close()
            sock.close()

In [None]:
server = Server('127.0.0.1', 5000)
server.mainloop()   # blocks forever; interrupt the kernel (or press Ctrl+C) to stop it
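Because `mainloop()` never returns, the server has to be tested from another process, for example a separate terminal. The client below is a minimal sketch; `ask` and `talk` are hypothetical helper names, and it assumes the protocol of the server cell: a request ends with `b'\n\n'`, and the reply is `b'Hello '` followed by the request, after which the server closes the connection.

```python
import socket


def talk(conn, name):
    # Send one request terminated by b'\n\n', then read until the server closes.
    conn.sendall(name + b'\n\n')
    reply = bytearray()
    while True:
        chunk = conn.recv(64)
        if not chunk:          # empty read: the server closed the connection
            break
        reply += chunk
    return bytes(reply)


def ask(name, host='127.0.0.1', port=5000):
    # Open a TCP connection to the server and exchange one request/reply.
    with socket.create_connection((host, port)) as conn:
        return talk(conn, name)


# With the server running in another process:
# print(ask(b'Dung'))   # expected reply: b'Hello Dung'
```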

#### 2. A scheduler for an OS

![](./assets/images/os-scheduler.png)

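When a program executes a trap (for example, a system call), it passes control to the OS. The OS then performs the requested operation, or switches to another task and passes control to that task.

A real OS scheduler is preemptive, but the model below is non-preemptive: a task only gives up control when it yields. I use it to explain the relationship between the `yield` expression and a `trap` in an OS: yielding a `SystemCall` object plays the role of the trap instruction.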
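Before the full scheduler, here is the core idea in a few lines, as a toy model rather than how a real kernel works: a program issues a "system call" by yielding a request tuple, and a tiny "kernel" loop services it and sends the result back into the program. The names `program`, `kernel`, `'getpid'` and `'add'` are all made up for this sketch.

```python
def program():
    # Each `yield` is a "trap": hand a request to the kernel and
    # stay suspended until the kernel sends back the result.
    pid = yield ('getpid',)
    total = yield ('add', pid, 10)
    return total


def kernel(task, pid):
    # Toy kernel: service each request, then resume the task with the result.
    handlers = {
        'getpid': lambda: pid,
        'add': lambda a, b: a + b,
    }
    result = None
    try:
        while True:
            name, *args = task.send(result)   # "user mode": run until the next trap
            result = handlers[name](*args)    # "kernel mode": handle the request
    except StopIteration as done:
        return done.value                     # the program's return value


print(kernel(program(), pid=7))   # 17
```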

In [None]:
from queue import Queue


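class SystemCall:
    # A task yields an instance of a subclass to "trap" into the scheduler;
    # mainloop() fills in `task` and `sched`, then calls handle().
    __slots__ = ('sched', 'task')

    def handle(self):
        pass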


class Task:
    __slots__ = ('id', 'target', 'sendval')
    _id = 0

    def __init__(self, target):
        Task._id += 1
        self.id = Task._id
        self.target = target
        self.sendval = None

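    def run(self):
        # resume the coroutine, delivering the result of its last system call (if any)
        val, self.sendval = self.sendval, None
        return self.target.send(val)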


class Scheduler:
    __slots__ = ('taskmap', 'ready')

    def __init__(self):
        self.taskmap = {}
        self.ready = Queue()

    def new(self, target):
        task = Task(target)
        self.taskmap[task.id] = task
        self.schedule(task)
        return task.id

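    def mainloop(self):
        while self.taskmap:
            task = self.ready.get()
            try:
                result = task.run()              # run the task until its next yield
                if isinstance(result, SystemCall):
                    # the task "trapped" into the kernel: let the system call handle it
                    result.task = task
                    result.sched = self
                    result.handle()              # the handler reschedules the task itself
                    continue
            except StopIteration:
                self.exit(task)                  # the task's coroutine has finished
            else:
                self.schedule(task)              # plain `yield`: back of the ready queue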

    def schedule(self, task):
        self.ready.put(task)

    def exit(self, task):
        print('Task %d terminated' % task.id)
        del self.taskmap[task.id]


class GetTid(SystemCall):
    def handle(self):
        self.task.sendval = self.task.id
        self.sched.schedule(self.task)


class NewTask(SystemCall):
    def __init__(self, target):
        self.target = target

    def handle(self):
        tid = self.sched.new(self.target)
        self.task.sendval = tid
        self.sched.schedule(self.task)


class KillTask(SystemCall):
    def __init__(self, tid):
        self.tid = tid

    def handle(self):
        task = self.sched.taskmap.get(self.tid, None)
        if task:
            task.target.close()
            self.task.sendval = True
        else:
            self.task.sendval = False
        self.sched.schedule(self.task)

In [None]:
def foo():
    tid = yield GetTid()
    print(f'I\'m foo and I am living in {tid} process')
    for i in range(5):
        print(f"Foo {tid} is in {i} step")
        yield

def bar():
    tid = yield GetTid()
    print(f"I'm bar and I'm living in {tid} process")
    yield NewTask(foo())
    for i in range(3):
        print(f"Bar {tid} is in {i} step")
        yield
    yield KillTask(1)

sched = Scheduler()
sched.new(foo())
sched.new(bar())
sched.mainloop()

#### 3. Streaming system


We can use coroutines to build a data-processing system. The system is split into logic blocks, and each block runs in its own coroutine with its own context, as shown in the picture below.

![](assets/images/simple-data-processing.png)

By creating specific logic blocks (filter, conditional, selector, broadcast, ...), we can describe many kinds of systems.
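As a sketch of two of the blocks named above, here are a filter and a conditional router built on the same priming-decorator pattern that the pipeline cells in this section use. The block names (`filter_`, `conditional`, `collect`) and the sample log lines are hypothetical; items enter through `send()`, and each block decides where to push them.

```python
from functools import wraps


def coroutine(f):
    # Prime the generator so that it is already paused at its first `yield`.
    @wraps(f)
    def start(*args, **kwargs):
        co = f(*args, **kwargs)
        next(co)
        return co
    return start


@coroutine
def filter_(predicate, target):
    # Filter block: forward only the items that satisfy `predicate`.
    while True:
        item = yield
        if predicate(item):
            target.send(item)


@coroutine
def conditional(predicate, if_true, if_false):
    # Conditional block: route each item to one of two downstream targets.
    while True:
        item = yield
        (if_true if predicate(item) else if_false).send(item)


@coroutine
def collect(sink):
    # Sink block: append everything it receives to the list `sink`.
    while True:
        sink.append((yield))


errors, others = [], []
pipeline = filter_(
    lambda line: line.strip() != '',                     # drop empty lines
    conditional(lambda line: line.split()[-1] == '404',  # route by status code
                collect(errors), collect(others)),
)
for line in ['GET /a 200', '', 'GET /b 404', 'GET /c 200']:
    pipeline.send(line)
print(errors, others)
```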

Example: count the requests from a few IP addresses in a web server's access log.

![](assets/images/IP-statistic.png)

In [None]:
def coroutine(f):
    def decorator(*args, **kwargs):
        co = f(*args, **kwargs)
        co.send(None)   # start coroutine before use it
        return co
    return decorator
 
@coroutine
def broadcast(targets):
    try:
        while 1:
            data = yield
            for target in targets:
                target.send(data)
    except GeneratorExit:
        for target in targets:
            target.close()
            
@coroutine
def map_(ip, next_):
    try:
        while 1:
            data = yield
            if data.startswith(ip + ' '):   # match the whole IP, e.g. not '93.114.45.130'
                next_.send(ip)
    except GeneratorExit:
        next_.close()
        
@coroutine
def reduce_(on_done):
    m = {}
    try:
        while 1:
            data = yield
            if data not in m:
                m[data] = 1
            else:
                m[data] += 1
    except GeneratorExit:
        on_done(m)

In [None]:
import time

result = {}
def on_done(r):
    global result
    result = r

reducer = reduce_(on_done)
flow = broadcast([
    map_('83.149.9.216', reducer),
    map_('93.114.45.13', reducer),
    map_('207.241.237.101', reducer),
])

# this is the source data
# We have 10000 lines in this log
start = time.perf_counter()
with open('assets/files/access.log', 'r') as fp:
    for line in fp:            # stream the file line by line
        flow.send(line)
    flow.close()               # GeneratorExit cascades down; reduce_ then calls on_done
print(f'Elapsed: {time.perf_counter() - start:.3f}s')

print(result)

##### Improvement

We can also wrap threads (or processes, or even machines) in coroutines. Why not?

For simplicity, I use threads instead of machines.

OK, let's redesign the diagram above.

![](assets/images/IP-Statistic-v2.png)

In the diagram above, I move each block's logic into its own thread and use queues as communication channels between the threads.

Furthermore, the queues act as buffers when input arrives faster than it can be processed.
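A queue acts as a buffer between a fast producer and a slow consumer. The sketch below uses a bounded `queue.Queue(maxsize=3)`; the size, the 0.01 s delay and the `None` end marker are arbitrary choices for illustration (the pipeline in this section uses unbounded queues). When the buffer is full, `put()` blocks, so the producer is slowed down to the consumer's pace instead of using unlimited memory.

```python
import threading
import time
from queue import Queue

buffer = Queue(maxsize=3)   # illustrative size: at most 3 items may wait
consumed = []


def slow_consumer():
    while True:
        item = buffer.get()
        if item is None:    # end marker chosen for this sketch
            return
        time.sleep(0.01)    # the consumer is slower than the producer
        consumed.append(item)


worker = threading.Thread(target=slow_consumer)
worker.start()
for i in range(10):
    buffer.put(i)           # blocks while 3 items are already waiting
buffer.put(None)
worker.join()
print(consumed)
```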

In [96]:
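from threading import Thread, Lock
from queue import Queue


_STOP = object()   # sentinel: "no more data", tells a worker thread to finish


def coroutine(f):
    def decorator(*args, **kwargs):
        co = f(*args, **kwargs)
        co.send(None)
        return co
    return decorator


# Each block is a coroutine front end plus a worker thread:
# send() only puts the item in the block's queue and returns immediately,
# while close() enqueues _STOP and waits until the worker has drained the queue.

@coroutine
def broadcast_threaded(targets):
    queue = Queue()
    def _run_target():
        while 1:
            data = queue.get()
            if data is _STOP:
                for target in targets:
                    target.close()       # blocks until that target has finished too
                return
            for target in targets:
                target.send(data)
    worker = Thread(target=_run_target)
    worker.start()                       # start(), not run(): run() would loop forever in this thread
    try:
        while 1:
            data = yield
            queue.put(data)
    except GeneratorExit:
        queue.put(_STOP)
        worker.join()

@coroutine
def map_threaded(ip, next_, lock):
    queue = Queue()
    def _run_target():
        while 1:
            data = queue.get()
            if data is _STOP:
                return                   # next_ is shared by several maps, so its owner closes it
            if data.startswith(ip + ' '):
                with lock:               # a generator must not be resumed by two threads at once
                    next_.send(ip)
    worker = Thread(target=_run_target)
    worker.start()
    try:
        while 1:
            data = yield
            queue.put(data)
    except GeneratorExit:
        queue.put(_STOP)
        worker.join()

@coroutine
def reduce_threaded(on_done):
    m = {}
    queue = Queue()
    def _run_target():
        while 1:
            data = queue.get()
            if data is _STOP:
                on_done(m)               # called from the worker thread
                return
            m[data] = m.get(data, 0) + 1
    worker = Thread(target=_run_target)
    worker.start()
    try:
        while 1:
            data = yield
            queue.put(data)
    except GeneratorExit:
        queue.put(_STOP)
        worker.join()


result = {}
reducer = reduce_threaded(result.update)
lock = Lock()                            # shared by all map blocks that feed `reducer`
flow = broadcast_threaded([
    map_threaded('83.149.9.216', reducer, lock),
    map_threaded('93.114.45.13', reducer, lock),
    map_threaded('207.241.237.101', reducer, lock),
])

with open('assets/files/access.log', 'r') as fp:
    for line in fp:
        flow.send(line)
flow.close()       # waits for the broadcast and map threads to drain their queues
reducer.close()    # every map is done, so flush the reducer; it then calls on_done
print(result)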