In [None]:
talk_title = 'From Coroutines to Concurrency'
full_name  = 'Vishal Vivek Prasad'

In [None]:
# Discuss preliminaries

In [None]:
# Discuss the GIL

On the Global Interpreter Lock [Python 3.8.0 Alpha](https://github.com/python/cpython/blob/e42b705188271da108de42b55d9344642170aa2b/Python/ceval_gil.h)


In [None]:
# Discuss Generators

In [1]:
def my_range(n):
    """Lazily produce the numbers 0, 1, 2, ... that are strictly below n.

    Calling the function only creates a generator object. The body runs
    one step at a time, pausing at `yield` until the consumer asks for the
    next value with `next()` (or a `for` loop does so on its behalf).
    """
    current = 0
    while current < n:
        yield current  # hand the value to the caller and suspend here
        current += 1
mr = my_range(10)
mr

<generator object my_range at 0x10f2eebf8>

In [12]:
next(mr)

StopIteration: 

In [13]:
for i in my_range(10):
    print(i)
l = list(my_range(10))
l

0
1
2
3
4
5
6
7
8
9


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [None]:
'''
# Indispensable for handling database records

import psycopg2
conn = psycopg2.connect("dbname=test user=postgres")
cur = conn.cursor()
cur.execute("SELECT * FROM test;")
'''

In [None]:
# Generator pipelining

In [14]:
def squared_range(n):
    """Yield the square of every value produced by my_range(n).

    This is a pull-style pipeline stage: nothing is computed until the
    consumer requests a value, and then exactly one upstream value is
    pulled from my_range and squared.
    """
    yield from (value ** 2 for value in my_range(n))
sr = squared_range(10)

In [25]:
next(sr)

StopIteration: 

[PEP-342](https://www.python.org/dev/peps/pep-0342/) enhances generators to not just produce values, but also to receive them

In [26]:
def squared():
    """Coroutine that prints the square of every value sent into it.

    The generator never yields a useful value; `yield` is used purely as
    the point where a value passed to `.send()` is received. It must be
    advanced to that first `yield` with `next()` before `.send()` is used.
    """
    while True:
        received = yield  # suspend until the caller sends a value
        print(received ** 2)
sqrd = squared()
next(sqrd)

In [27]:
sqrd.send(3)

9


In [28]:
sqrd.send(4)

16


In [42]:
def my_range_pipe(n, next_coroutine):
    """Push the numbers 0 .. n-1 into next_coroutine, one per resumption.

    The generator suspends *before* each send, so the first `next()` only
    primes it and emits nothing. Every later `next()` sends one value
    downstream. The resumption that sends the final value also ends the
    loop, so that same call raises StopIteration.
    """
    value = 0
    while value < n:
        yield  # give control back to whoever is driving this task
        next_coroutine.send(value)
        value += 1

def squared_pipe(next_coroutine):
    """Pipeline stage: square each value received and send it downstream.

    Runs forever; prime it with `next()` before the first `.send()`.
    """
    while True:
        received = yield  # wait for the upstream stage to send a value
        square = received ** 2
        next_coroutine.send(square)
    
def print_sink():
    """Terminal pipeline stage: print every value that is sent in.

    Runs forever; prime it with `next()` before the first `.send()`.
    """
    while True:
        item = yield  # wait for the upstream stage to send a value
        print(item)
    
# Wire the pipeline back to front: each stage takes its downstream consumer
# as an argument, so the consumer has to exist first.
sink = print_sink()
# next() advances a freshly created generator to its first `yield`.
# send() can only deliver a value to a generator already paused at a `yield`.
next(sink)
# NOTE: this rebinds `squared`, which an earlier cell defined as a function.
squared = squared_pipe(sink)
next(squared)
mrp = my_range_pipe(10, squared)
# my_range_pipe yields once before it sends anything, so this first next()
# prints nothing; every later next(mrp) pushes one value through the pipe.
next(mrp)

In [43]:
next(mrp)

0


In [None]:
# Coroutines and generalized control flow

> Subroutines are special cases of more general program components, called coroutines. In contrast to the unsymmetric relationship between a main routine and a subroutine, there is a complete symmetry between coroutines, which call on each other.

Knuth, *The Art of Computer Programming*, Volume 1, Section 1.4.2


In [None]:
# Coroutines as tasks/pseudothreads

In [None]:
# Cooperative Multitasking

In [44]:
sink = print_sink()
squared = squared_pipe(sink)
mrp1 = my_range_pipe(1, squared)
mrp2 = my_range_pipe(3, squared)
mrp3 = my_range_pipe(5, squared)
next(sink)
next(squared)
next(mrp1)
next(mrp2)
next(mrp3)

tasks = [mrp1, mrp2, mrp3]

def round_robin():
    """Run every generator in the global `tasks` list to completion.

    Tasks take turns: the task at the front of the list is advanced by one
    step and, unless it has finished, goes to the back of the list. A
    finished task is dropped and 'Finished' is printed.
    """
    while tasks:
        current = tasks.pop(0)
        try:
            next(current)
        except StopIteration:
            print('Finished')
        else:
            # Still has work to do: queue it for another turn.
            tasks.append(current)

round_robin()

0
Finished
0
0
1
1
4
Finished
4
9
16
Finished


In [45]:
class Scheduler:
    """Minimal round-robin scheduler for generator-based tasks."""

    def __init__(self):
        # FIFO run queue of generators that still have work to do.
        self.tasks = []

    def add(self, task):
        """Put `task` (a generator) at the back of the run queue."""
        self.tasks.append(task)

    def join(self):
        """Run all queued tasks until each one is exhausted.

        Each turn advances the task at the front of the queue by one step.
        A task that raises StopIteration is dropped and 'Finished' is
        printed; any other task goes to the back of the queue.
        """
        queue = self.tasks
        while queue:
            current = queue.pop(0)
            try:
                next(current)
            except StopIteration:
                print('Finished')
            else:
                queue.append(current)

sink = print_sink()
squared = squared_pipe(sink)
mrp1 = my_range_pipe(1, squared)
mrp2 = my_range_pipe(3, squared)
mrp3 = my_range_pipe(5, squared)
next(sink)
next(squared)
next(mrp1)
next(mrp2)
next(mrp3)

s = Scheduler() 
s.add(mrp1)
s.add(mrp2)
s.add(mrp3)
s.join()

0
Finished
0
0
1
1
4
Finished
4
9
16
Finished


In [46]:
class Pool:
    """A bounded group of generator tasks that are run together.

    `pool_size` is the maximum number of tasks the pool will hold.
    """

    def __init__(self, pool_size):
        self.tasks = []             # tasks currently held by the pool
        self.pool_size = pool_size  # fixed capacity; never changed by add()

    def add(self, task):
        """Add `task` to the pool if there is room.

        Returns True when the task was accepted and False when the pool
        was already full and the task was ignored.

        The capacity must stay constant: growing `pool_size` on every
        accepted task would keep the limit permanently ahead of
        `len(self.tasks)`, so nothing would ever be rejected.
        """
        if len(self.tasks) >= self.pool_size:
            return False
        self.tasks.append(task)
        return True

    def join(self):
        """Run the pool's tasks round-robin until all of them finish.

        Prints 'Finished' each time a task is exhausted.
        """
        tasks = self.tasks
        while tasks:
            task = tasks.pop(0)
            try:
                next(task)
            except StopIteration:
                print('Finished')
            else:
                tasks.append(task)

class Scheduler:
    """Round-robin scheduler whose queue can hold plain tasks or Pools."""

    def __init__(self):
        # A task_unit can be a task, a Group, a Pool, etc
        self.task_units = []

    def add(self, task_unit):
        """Put `task_unit` at the back of the queue."""
        self.task_units.append(task_unit)

    def join(self):
        """Process queued units until the queue is empty.

        A Pool is run to completion through its own join() and is not
        re-queued. Any other unit is treated as a generator and advanced
        by one step per turn: it goes to the back of the queue unless it is
        exhausted, in which case 'Finished' is printed.
        """
        pending = self.task_units
        while pending:
            current = pending.pop(0)
            if isinstance(current, Pool):
                current.join()
                continue
            try:
                next(current)
            except StopIteration:
                print('Finished')
            else:
                pending.append(current)
                    
sink = print_sink()
squared = squared_pipe(sink)
mrp1 = my_range_pipe(3, squared)
mrp2 = my_range_pipe(3, squared)
mrp3 = my_range_pipe(7, squared)
next(sink)
next(squared)
next(mrp1)
next(mrp2)
next(mrp3)

s = Scheduler() 
p = Pool(2)
p.add(mrp1)
p.add(mrp2)
s.add(p)
s.add(mrp3)
s.join()

0
0
1
1
4
Finished
4
Finished
0
1
4
9
16
25
36
Finished


Green pseudothreads, or [Greenlets](https://github.com/python-greenlet/greenlet/blob/master/greenlet.c)

In [None]:
# Cooperative Multitasking to Concurrency

In [49]:
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1111))

data = ('TASK_ONE ' * 1024 ** 2).encode()
len_data_sent = sock.send(data)
print(len_data_sent == len(data))

True


In [50]:
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1112))

data = ('task_two ' * 1024 ** 2).encode()
len_data_sent = sock.send(data)
print(len_data_sent == len(data))

True


In [None]:
# Non-blocking sockets

In [52]:
import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1111))
sock.setblocking(0) # Now does not wait for I/O to complete

data = ('TASK_ONE ' * 1024 ** 2).encode()
len_data_sent = sock.send(data)
print(len_data_sent == len(data))


False


In [53]:
import socket
import select

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 1111))
sock.setblocking(0)

data = ('TASK_ONE ' * 1024 ** 2).encode()

while data:
    select.select([], [sock], []) # Blocks until sock is free to write to
    len_data_sent = sock.send(data)
    data = data[len_data_sent:]

print(len(data))

0


In [None]:
# Concurrent write scheduler

In [54]:
from time import sleep 

class Scheduler:
    """Cooperative scheduler for generator tasks that write to sockets.

    A task signals "I cannot write right now" by yielding the socket (or
    file descriptor) it is waiting on. The scheduler parks the task until
    select() reports that socket as writable, then puts it back on the run
    queue.
    """

    def __init__(self):
        self.tasks = []          # FIFO queue of runnable tasks
        self.write_waiting = {}  # Maps sockets to the tasks blocked on them

    def add(self, task):
        """Put `task` (a generator) at the back of the run queue."""
        self.tasks.append(task)

    def join(self):
        """Run until no task is runnable and no task is waiting on I/O.

        Prints 'Finished' each time a task is exhausted.
        """
        while self.tasks or self.write_waiting:
            if self.tasks:
                task = self.tasks.pop(0)
                try:
                    # Coroutines that do I/O yield the socket they are waiting on
                    fd = next(task)
                except StopIteration:
                    print('Finished')
                else:
                    self.write_waiting[fd] = task

            if self.write_waiting:
                # While something is runnable, only poll (timeout 0) so the
                # runnable task is not delayed. When every task is parked,
                # block in select() (timeout None) instead of spinning.
                timeout = 0 if self.tasks else None
                _, writable, _ = select.select([],
                                               list(self.write_waiting),
                                               [],
                                               timeout)
                # Move each task whose socket is writable back to the run queue.
                for freed_fd in writable:
                    self.tasks.append(self.write_waiting.pop(freed_fd))


In [56]:
import socket
import select

def send_msg(port, msg):
    """Generator task that sends `msg` to localhost:`port` without blocking.

    Connects with a blocking connect(), then switches the socket to
    non-blocking mode and sends the encoded message piece by piece.
    Whenever the socket cannot accept more data, the task yields the
    socket's file descriptor so a scheduler can resume it once the socket
    is writable again. The generator ends when everything has been sent.

    The socket is closed when the generator finishes or is closed early.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect(('localhost', port))
        sock.setblocking(False)

        # A memoryview plus an offset avoids copying the unsent remainder
        # after every partial send.
        payload = memoryview(msg.encode())
        offset = 0

        while offset < len(payload):
            try:
                offset += sock.send(payload[offset:])
            except BlockingIOError:
                # When socket is unwritable, yield socket fd to scheduler
                yield sock.fileno()

s = Scheduler()
t1 = send_msg(1111, 'TASK_ONE ' * 2 * 1024 ** 2)
t2 = send_msg(1112, 'task_two ' * 2 * 1024 ** 2)
# The tasks are not primed with next() here: Scheduler.join() calls next()
# on every task itself, and send_msg has no initial `yield` that needs to be
# reached first. Priming would raise StopIteration in this cell for any
# message small enough to be sent without blocking.
s.add(t2)
s.add(t1)
s.join()

Finished
Finished


Farewell to generator-based coroutines.

[AsyncIO](https://docs.python.org/3/library/asyncio-task.html#generator-based-coroutines)

In [None]:
# Mutual Recursion w/ AsyncIO

> Co-routines are to state machines what recursion is to stacks

Eli Bendersky ('Co-routines as an alternative to state machines')

In [None]:
# Due diligence
# Email: vishal.prasad@onepeloton.com
# GitHub: https://github.com/koreindian (files will be put here later)
# The PyGotham video will be up on YouTube.
# Check out the PyGotham 2018 videos on https://pyvideo.org