#### meetup python toulouse

# asyncio
## Pourquoi c'est nécessaire et comment ça marche ?

Retours sur l'histoire et le besoin de la concurrence en developpement, puis sur l'implementation en python de la librairie de reference d'IO asynchrone en python

# Who am I?

obayemi (Martial Elegbede)

Lead architect / full stack dev @Sensinov

Full time mainly django dev since 2017

Almost exclusively startups

# Why is asyncio

### What is multitasking

### How is asyncio


# An history of multitasking

# Multitasking #1: Processes

PROS:

- Whole other program
- Simple
- can use more than one cpu core

CONS:

- kernel land
- slow to start
- no shared memory without IPC
- lots of overhead (program memory)

# Multitasking #2: Threads

PROS:

- Same program, shared memory
- faster to boot (not a full process)

CONS:

- kernel land
- still slow to start

# Multitasking #3: coroutines (AKA green threads, cooperative, or non preemptive)

PROS:

- same program, shared memory
- no boot penalty
- no syscall
- cooperation between the scheduler and the tasks

CONS:

- single threaded

# Parrallelism

- multiple process
- multiple threads

# Concurrency

- coroutines

# Example: Nginx and the C10K problem

# In python: gunicorn worker types

- sync
  
- gthreads

- eventlet
- gevent
- tornado

- (also asyncio now but lets not talk about it _yet_)

All are both network/IO library and executor

Complex implementation or rely on heavy monkey patching

In [1]:
import time
import asyncio
import socket
from itertools import islice
from datetime import datetime, timedelta
import select
from functools import wraps
import warnings
warnings.filterwarnings("ignore")

now = datetime.now
class Timer:
    def __init__(self, prefix=None):
        self.start = datetime.now()
        self.prefix = prefix
    
    def prefixfmt(slef):
        if p := slef.prefix:
            return f'[{p}]'
        return ''
        
    def log(self, message):
        delta = datetime.now() - self.start
        print(f'{self.prefixfmt()}[{delta}] {message}')

# Asyncio: aiming for a better solution

# Blocking IO vs non blocking IO

In [2]:
def blocking_io():
    t = Timer()
    t.log('starting function')
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('127.0.0.1', 8080))
    t.log('connected')
    s.sendall(f'Hello, world'.encode())
    t.log('sent payload')
    data = s.recv(1024)
    t.log('recieved response')
    s.close()
    return data.decode()

print(f'response: "{blocking_io()}"')

[0:00:00.000003] starting function
[0:00:00.000188] connected
[0:00:00.000218] sent payload
[0:00:02.001454] recieved response
response: "Hello, world"


In [3]:
def non_blocking_io():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('127.0.0.1', 8080))

    s.setblocking(False)
    
    s.sendall(f'Hello, world'.encode()) # ???
    data = s.recv(1024) # ???
    s.close()
    return data.decode()

# print(f'response: "{non_blocking_io()}"')

# Extracting socket setup for readability

In [4]:
def setup_non_blocking_socket():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.connect(('127.0.0.1', 8080))
    s.setblocking(False)
    s.sendall('Hello, world'.encode())
    return s

In [5]:
def non_blocking_io():
    t = Timer()
    s = setup_non_blocking_socket()
    
    while True:
        r,_,_ = select.select([s.fileno()], [], [], 0)
        t.log('socket not ready to read')
        if len(r) > 0: break
        time.sleep(0.5)
        
    data = s.recv(1024)
    s.close()
    return data.decode()

print(f'response: "{non_blocking_io()}"')

[0:00:00.000192] socket not ready to read
[0:00:00.500378] socket not ready to read
[0:00:01.000719] socket not ready to read
[0:00:01.501463] socket not ready to read
[0:00:02.001629] socket not ready to read
response: "Hello, world"


# Generators are coroutines

In [6]:
def coroutine():
    a = 1
    while True:
        a += 1
        print('inside coroutine', a)
        yield a

gen = coroutine()
print(next(gen))
print(next(gen))
print(next(gen))
print(next(gen))
print(next(gen))

inside coroutine 2
2
inside coroutine 3
3
inside coroutine 4
4
inside coroutine 5
5
inside coroutine 6
6


In [7]:
def really_non_blocking_io():
    s = setup_non_blocking_socket()
    
    while True:
        r,_,_ = select.select([s.fileno()], [], [], 0)
        if len(r) > 0: break
        yield
        
    data = s.recv(1024)
    s.close()
    return data.decode()

In [8]:
gen = really_non_blocking_io()
try:
    while True:
        next(gen)
        print('waiting...')
        time.sleep(.5)
except StopIteration as response:
    print(f'response: "{response.value}"')

waiting...
waiting...
waiting...
waiting...
waiting...
response: "Hello, world"


# Async executor

In [9]:
def run_until_complete(task):
    try:
        while True:
            next(task)
            time.sleep(.5)
    except StopIteration as response:
        return response.value

In [10]:
# cheeky redefinition for slightly more features
def run_until_complete(task, wait=.05, debug=None):
    try:
        while True:
            next(task)
            if debug:
                if type(debug) == bool:
                    print('executor waiting...')
                else:
                    debug()
            time.sleep(wait)
    except StopIteration as response:
        return response.value

In [11]:
t = Timer()
task = really_non_blocking_io()
t.log('running task')
ret = run_until_complete(task)
t.log(f'{ret=}')

[0:00:00.000048] running task
[0:00:02.009647] ret='Hello, world'


# async IO

In [12]:
class NonBlockingSocket:
    def __init__(self, address=('127.0.0.1', 8080)):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.socket.connect(address)
        self.socket.setblocking(False)

    def sendall(self, value):
        while len(select.select([], [self.socket.fileno()], [], 0)[1]) == 0:
            yield
        return self.socket.sendall(value)
        
    def recv(self, amount):
        while len(select.select([self.socket.fileno()], [], [], 0)[0]) == 0:
            yield
        return self.socket.recv(amount)

    def close(self):
        return self.socket.close()

In [13]:
def really_non_blocking_io():
    s = NonBlockingSocket()
    s.sendall("hello world")
    data = s.recv(1024)
    s.close()
    return data.decode()
    
# ret = really_non_blocking_io()
# print(f'response: "{ret}"')

In [14]:
def not_really_non_blocking_io():
    t = Timer()
    s = NonBlockingSocket()
    t.log('send')
    run_until_complete(s.sendall('Hello world'.encode()))
    t.log('recv')
    data = run_until_complete(s.recv(1024))
    t.log('close')
    s.close()
    return data.decode()

print(not_really_non_blocking_io())

[0:00:00.000098] send
[0:00:00.000211] recv
[0:00:02.016236] close
Hello world


# Generators all the way down

In [15]:
def really_really_non_blocking_io():
    t = Timer()
    s = NonBlockingSocket()

    gen = s.sendall('Hello world'.encode())
    try:
        while True:
            t.log('trying to write')
            yield next(gen)
    except StopIteration as response:
        pass
    
    gen = s.recv(1024)
    try:
        while True:
            t.log('trying to read')
            yield next(gen)
    except StopIteration as response:
        s.close()
        return response.value.decode()

print(run_until_complete(really_really_non_blocking_io(), wait=.5))

[0:00:00.000480] trying to write
[0:00:00.000592] trying to read
[0:00:00.500742] trying to read
[0:00:01.000961] trying to read
[0:00:01.501191] trying to read
[0:00:02.001779] trying to read
Hello world


# Yield from

In [16]:
def really_really_non_blocking_io():
    s = NonBlockingSocket()
    yield from s.sendall('Hello world'.encode())
    data = yield from s.recv(1024)
    s.close()
    return data.decode()

print(run_until_complete(really_really_non_blocking_io(), wait=.5, debug=True))

executor waiting...
executor waiting...
executor waiting...
executor waiting...
executor waiting...
Hello world


# Awaitable

In [17]:
# how to write :
async def func():
    await really_non_blocking_io(1)
# await func()

In [18]:
def make_awaitable(f):
    @wraps(f)
    def wrapper(*args, **kwds):
        class Awaitable:
            def __init__(self, gen):
                self._gen = gen
            def __await__(self):
                ret = yield from self._gen
                return ret
            __iter__ = __await__
            def __next__(self):
                next(self._gen)
        return Awaitable(f(*args, **kwds))
    return wrapper

awaitable_non_blocking_io = make_awaitable(really_really_non_blocking_io)
await awaitable_non_blocking_io()

'Hello world'

# Actual event loop

```python
loop.create_task(awaitable_non_blocking_io()) #  ??
loop.create_task(awaitable_non_blocking_io()) #  ??
loop.run() #  ??
```

In [19]:
@make_awaitable
def asleep(s):
    target = now() + timedelta(seconds=s)
    while now() < target:
        yield
    return

In [20]:
class Task:
    '''
    Lets not talk about this too much for now
    '''
    def __init__(self, coro):
        self._coro = coro
        
    def _step(self):
        try:
            return self._coro.send(None)
        except StopIteration as result:
            raise result

In [21]:
class BaseTask:
    def __init__(self, coro, name=None):
        self._coro = coro
        self._name = name

    def __repr__(self):
        return f'<{self._name}>' if self._name is not None else super().__repr__()
        
    def _step(self):
        try:
            return self._coro.send(None)
        except StopIteration as result:
            raise result

class Task(BaseTask):
    pass

In [22]:
class BaseLoop:
    tasks = []

    def create_task(self, coro):
        task = Task(coro)
        self._schedule_task(task)
        return task

    def _schedule_task(self, task):
        self.tasks.append(task)

    def run_until_complete(self):
        while len(self.tasks) != 0:
            print('loop queue', self.tasks)
            task = self.tasks.pop(0)
            self.advance(task)
            time.sleep(0.5)

In [23]:
# sneaky rewrite to add the immediate arg and not use a global tasks list

class BaseLoop:
    def __init__(self):
        self.tasks = []

    def create_task(self, coro, name=None):
        task = Task(coro, name)
        self._schedule_task(task)
        return task

    def _schedule_task(self, task, immediately=False):
        if immediately:
            self.tasks.insert(0, task)
        else:
            self.tasks.append(task)

    def run_until_complete(self):
        while len(self.tasks) != 0:
            print('loop queue', self.tasks)
            task = self.tasks.pop(0)
            self.advance(task)
            time.sleep(0.5)

In [24]:
class Loop(BaseLoop):
    def advance(self, task):
        try:
            task._step()
            self.tasks.append(task)
        except StopIteration as result:
            print(f'task {task!r} result: "{result.value}"')

In [35]:
async def awaitable_io(n):
    await asleep(2)
    return f'Hello world {n}'

async def wait_and_print(n):
    result = await awaitable_io(n)
    print(f' -> received "{result}"')
    return result

In [36]:
l = Loop()
l.create_task(wait_and_print(1), 'task1')
l.create_task(wait_and_print(2), 'task2')
l.run_until_complete()

loop queue [<task1>, <task2>]
loop queue [<task2>, <task1>]
loop queue [<task1>, <task2>]
loop queue [<task2>, <task1>]
loop queue [<task1>, <task2>]
 -> received "Hello world 1"
set_result <task1>: Hello world 1
task <task1> result: "Hello world 1"
loop queue [<task2>]
 -> received "Hello world 2"
set_result <task2>: Hello world 2
task <task2> result: "Hello world 2"


# Tasks

In [37]:
async def run_subtask():
    await asleep(1)

l.create_task(run_subtask(), 'run_subtask')
l.run_until_complete()

loop queue [<run_subtask>]
loop queue [<run_subtask>]
loop queue [<run_subtask>]
set_result <run_subtask>: 
task <run_subtask> result: "None"


In [28]:
async def background_task():
    await asleep(2)
    print('finished background task')

async def parent_task():
    task = l.create_task(background_task(), 'background_task')
    await asleep(1)
    # await task ?
    # print('finished waiting for background tasks')

l.create_task(parent_task(), 'parent_task')
l.run_until_complete()

loop queue [<parent_task>]
loop queue [<background_task>, <parent_task>]
loop queue [<parent_task>, <background_task>]
task <parent_task> result: "None"
loop queue [<background_task>]
loop queue [<background_task>]
loop queue [<background_task>]
finished background task
task <background_task> result: "None"


# Futures

In [29]:
from asyncio.futures import _PENDING, _CANCELLED, _FINISHED

In [38]:
class NaiveFuture:
    _state = _PENDING
    _result = None
    
    def __await__(self):
        print(f'await future {self!r}')
        while not self.done():
            print(f'await future yield {self!r}, {self._state}')
            yield self
        return self._result

    def done(self): return self._state != _PENDING

    def set_result(self, result):
        print(f'set_result {self!r}: {result}')
        self._result = result
        self._state = _FINISHED


In [39]:
class Task(BaseTask, NaiveFuture):
    def _step(self):
        try:
            return self._coro.send(None)
        except StopIteration as result:
            self.set_result(result)
            raise result

In [41]:
l = Loop()
async def background_task():
    await asleep(2)

async def parent_task():
    task1 = l.create_task(background_task(), 'task1')
    task2 = l.create_task(background_task(), 'task2')

    await asleep(1)
    print('finished main task')
    await task1
    await task2

l.create_task(parent_task(), 'parent_task')
l.run_until_complete()

loop queue [<parent_task>]
loop queue [<task1>, <task2>, <parent_task>]
loop queue [<task2>, <parent_task>, <task1>]
loop queue [<parent_task>, <task1>, <task2>]
finished main task
await future <task1>
await future yield <task1>, PENDING
loop queue [<task1>, <task2>, <parent_task>]
loop queue [<task2>, <parent_task>, <task1>]
loop queue [<parent_task>, <task1>, <task2>]
await future yield <task1>, PENDING
loop queue [<task1>, <task2>, <parent_task>]
set_result <task1>: 
task <task1> result: "None"
loop queue [<task2>, <parent_task>]
set_result <task2>: 
task <task2> result: "None"
loop queue [<parent_task>]
await future <task2>
set_result <parent_task>: 
task <parent_task> result: "None"


# Actual futures are not coroutines

In [47]:
class Future(NaiveFuture):
    _asyncio_future_blocking = False
    _callbacks = []

    def __await__(self):
        yield self
        if not self.done(): raise Exception("Future is not ready")
        return self._result

    def set_result(self, result):
        super().set_result(result)
        for c in self._callbacks: c(self)

    def register_callback(self, cb):
        self._callbacks.append(cb)

In [48]:
class Task(BaseTask, Future):
    pass

# New event loop

In [54]:
class Loop(BaseLoop):
    def advance(self, task):
        try:
            step = task._step()
            # both schedule later execution
            if asyncio.isfuture(step):
                step.register_callback(lambda task: self._schedule_task(task))
            else:
                self._schedule_task(task)
        except StopIteration as result:
            print(f'task {task!r} result: {result.value}')

# What we've learned so far

In [56]:
l = Loop()
async def background_task():
    await asleep(2)
    print('finished background task')

async def parent_task():
    task1 = l.create_task(background_task(), 'task1')
    task2 = l.create_task(background_task(), 'task2')

    await asleep(1)
    print('finished main task')
    await task1
    await task2
    print('finished waiting for background tasks')
    
l.create_task(parent_task(), 'parent_task')
l.run_until_complete()

loop queue [<parent_task>]
loop queue [<task1>, <task2>, <parent_task>]
loop queue [<task2>, <parent_task>, <task1>]
loop queue [<parent_task>, <task1>, <task2>]
finished main task
loop queue [<task1>, <task2>]
loop queue [<task2>, <task1>]
loop queue [<task1>, <task2>]
finished background task
task <task1> result: None
loop queue [<task2>]
finished background task
task <task2> result: None


# asyncio + async / await:

- non blocking io with setblocking(False)
- generators based coroutines
- syntax to make talking to coroutines easier
- event loop to schedule coroutines
- Futures = unscheduled until callback
  - (future is not scheduled, not the underlying task)


# references:

- https://stackoverflow.com/questions/49005651/how-does-asyncio-actually-work
- https://docs.python.org/3/library/asyncio-task.html
- https://docs.python.org/3/library/socket.html
- https://bbc.github.io/cloudfit-public-docs/asyncio/asyncio-part-2
- https://medium.com/vaidikkapoor/understanding-non-blocking-i-o-with-python-part-1-ec31a2e2db9b
- https://github.com/python/cpython/tree/main/Lib/asyncio