<center>
    <img src="https://upload.wikimedia.org/wikipedia/commons/a/a8/%D0%9B%D0%9E%D0%93%D0%9E_%D0%A8%D0%90%D0%94.png" width=500px/>
    <font>Python 2024</font><br/>
    <br/>
    <br/>
    <b style="font-size: 2em">Корутины, async/await, asyncio</b><br/>
    <br/>
    <font>Ярослав Золотарёв, по материалам Вадима Мазаева🫡</font><br/>
</center>

Напоминание: IO-bound операции — операции, связанные с длительным ожиданием другого устройства, например, сетевой карты или диска

<center>
<img src="https://blog-assets.risingstack.com/2016/Apr/non_async_blocking_operations_example_in_node_hero-1459856858194.png" alt="io-operations" width=800/>
</center>

### DB operation

In [None]:
from datetime import date
import pymongo  # for async, use motor! 

client = MongoClient('localhost', 27017)

posts = client['web_db']['posts']

# blocks until DB answers
results = posts.find({'author': 'Vadim', 'date': date.today})

### Http request

In [None]:
import requests

# blocks until site returns response
response = requests.get('http://some.site')

## Asynchronous I/O

<center>
<img 
src="https://www.koyeb.com/_next/image?url=%2Fstatic%2Fimages%2Fblog%2Fsync-vs-async-schema.png&w=750&q=75" alt="sync-vs-async" width=700/>
</center>

Asynchronous I/O — неблокирующая обработка ввода/вывода, которая позволяет процессу продолжить выполнение не дожидаясь окончания передачи данных.

# Корутины (coroutines)

### с самого начала...

In [1]:
def eager_range(up_to: int) -> list[int]:
    sequence = []
    index = 0
    while index < up_to:
        sequence.append(index)
        index += 1
    return sequence

Начиная с Python 2.2 в языке появились генераторы

In [2]:
from collections.abc import Generator

In [3]:
def lazy_range(up_to: int) -> Generator[int, None, None]:
    index = 0
    while index < up_to:
        yield index
        index += 1

В Python 2.5 вводят метод `send()`

In [4]:
def jumping_range(up_to: int) -> Generator[int, int | None, None]:
    index = 0
    while index < up_to:
        jump = yield index
        if jump is None:
            jump = 1
        index += jump

In [5]:
generator = jumping_range(5)

In [6]:
print('next   :', next(generator))
print('send  2:', generator.send(2))
print('next   :', next(generator))
print('send -1:', generator.send(-1))

next   : 0
send  2: 2
next   : 3
send -1: 2


В Python 3.3 добавляется важный синтаксический сахар `yield from`

In [7]:
def bottom() -> Generator[int, None, int]:
    yield 42
    yield 84
    return 0

In [8]:
def top() -> Generator[int, None, None]:
    value = yield from bottom()
    print(value)

In [9]:
list(top())

0


[42, 84]

Наконец, в Python 3.4 вводят фреймворк `asyncio`

In [10]:
import asyncio
import types

In [11]:
@types.coroutine  # asyncio.coroutine doesn't work after 3.10
def countdown(n: int) -> Generator[asyncio.Future, None, None]:
    while n > 0:
        print(n)
        yield from asyncio.sleep(1)
        n -= 1

In [12]:
next(countdown(3))

3


<Future pending>

In [13]:
# to start nested event loop inside jupyter notebook
import nest_asyncio
nest_asyncio.apply()

In [14]:
loop = asyncio.get_event_loop()
loop.run_until_complete(countdown(3))

3
2
1


Довольно быстро становится понятно, что в языке теперь есть некоторая путаница между генераторами и корутинами

Поэтому в следующем же обновлении (Python 3.5) вводят `async/await`, заменив generator-based корутины на встроенные в язык

In [15]:
async def compute(a: int, b: int) -> int:
    print('Compute...')
    await asyncio.sleep(1)
    return a + b

In [16]:
compute(3, 5)

<coroutine object compute at 0x7f365863a400>

In [17]:
next(compute(3, 5))

  next(compute(3, 5))


TypeError: 'coroutine' object is not an iterator

In [18]:
next(compute(3, 5).__await__())  # awaitable

Compute...


<Future pending>

In [19]:
loop = asyncio.get_event_loop()
loop.run_until_complete(compute(3, 5))

Compute...


8

А затем в Python 3.6 появится возможность реализовывать асинхронные генераторы

In [20]:
async def ticker(upto: int) -> Generator[int, None, None]:
    for i in range(upto):
        print(f"iteration {i}")
        await asyncio.sleep(1)
        yield i

In [21]:
next(ticker(2).__anext__().__await__())

<Future pending>

In [22]:
loop = asyncio.get_event_loop()
loop.run_until_complete(ticker(2).__anext__())  # anext(...) from 3.10

0

# Event Loop

In [23]:
async def compute(a: int, b: int) -> int:
    print('Compute...')
    await asyncio.sleep(1)
    return a + b

In [24]:
async def print_sum(a: int, b: int) -> None:
    result = await compute(a, b)
    print(f'{a} + {b} = {result}')

In [25]:
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))

Compute...
1 + 2 = 3


<center>
<img src="http://ntoll.org/static/images/tulip_coro.png" alt="event-loop" width=1200/>
</center>

In [26]:
# Simplified code from
# https://github.com/python/cpython/blob/1d1bb95abdcafe92c771fb3dc4722351b032cc24/Lib/asyncio/tasks.py#L630
async def sleep(delay: float) -> None:
    if delay <= 0:
        await __sleep0()
        return result

    loop = asyncio.get_running_loop()
    future = loop.create_future()
    loop.call_later(
        delay,
        future.set_result,
        None,
    )
    
    await future

In [27]:
@types.coroutine
def __sleep0() -> Generator[None, None, None]:
    yield

# Современный asyncio

## python 3.9+

## Hello world

In [28]:
async def main() -> None:
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

In [29]:
asyncio.run(main())

Hello ...
... World!


## Шедулинг корутин

In [5]:
async def say_after(delay: int, what: str) -> None:
    print(f'wait for {delay}s')
    await asyncio.sleep(delay)
    print(what)

In [6]:
import time

In [11]:
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
# say_after(2, 'world')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

started at 10:18:22
wait for 1s
hello
wait for 2s
world
finished at 10:18:25


In [12]:
task1 = asyncio.create_task(say_after(2, 'hello'))
task2 = asyncio.create_task(say_after(1, 'world'))

print(f"started at {time.strftime('%X')}")

await task1
await task2  # can just ignore it

print(f"finished at {time.strftime('%X')}")

started at 10:18:25
wait for 2s
wait for 1s
world
hello
finished at 10:18:27


Задачи (tasks) используются, чтобы запланировать (schedule) корутины на выполнение "параллельно"

Когда из корутины создают задачу через `asyncio.create_task()`, она автоматически запускается на следующем такте event loop'а (run soon)

## asyncio.gather

In [19]:
async def factorial(name: str, number: int) -> None:
    result = 1
    for i in range(2, number + 1):
#         if number % 2 and i == 2:
#             raise RuntimeError('Whoops!')
        print(f'Task {name}: Compute factorial({i})...')
        await asyncio.sleep(1)
        result *= i
    print(f'Task {name}: factorial({number}) = {result}')
    return result

In [20]:
await asyncio.gather(
    factorial('A', 2),
    factorial('B', 3),
    factorial('C', 4),
#     return_exceptions=True,
)

# Q: How to parse exceptions? If there are 2 different exceptions? 
# A: ExceptionGroup since 3.11

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24


[2, 6, 24]

## asyncio.TaskGroup
#### python 3.11+

In [30]:
async def factorial(name: str, number: int) -> None:
    result = 1
    for i in range(2, number + 1):
        if number == 2:
            raise RuntimeError('Whoops 2!')
        if number == 3:
            raise ValueError('Whoops 3!')
        print(f'Task {name}: Compute factorial({i})...')
        await asyncio.sleep(1)
        result *= i
    print(f'Task {name}: factorial({number}) = {result}')
    return result

In [31]:
try:
    async with asyncio.TaskGroup() as tg:
        tg.create_task(factorial('A', 2))
        tg.create_task(factorial('B', 3))
        tg.create_task(factorial('C', 4))
        print('tasks pending to start')
except* (ValueError, RuntimeError) as excgroup:
    for exc in excgroup.exceptions:
        print(f'Caught exception: {exc} with class {exc.__class__.__name__}')

print('tasks complete')

tasks pending to start
Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
tasks complete


## Ожидание & таймауты

In [32]:
async def eternity() -> None:
    await asyncio.sleep(3600)  # 1 hour sleep
    print('yay!')

In [33]:
try:
    await asyncio.wait_for(eternity(), timeout=1)  # wait for at most 1 second
except TimeoutError:
    print('timeout!')

timeout!


## Отмена (cancellation) корутин

In [34]:
async def another_eternity() -> None:
    try:
        await asyncio.sleep(3600)
        print(f'{time.strftime("%X")} yay!')
    except asyncio.CancelledError:
        print(f"{time.strftime("%X")} Hey, I am cancelled!")
        raise
    finally:
        await asyncio.sleep(3)  # 30
        print(f'{time.strftime("%X")} coro finished')

In [35]:
print(f'{time.strftime("%X")} started')
try:
    # Send CancelledError to coro after 1 sec and wait for coro completion
    await asyncio.wait_for(another_eternity(), timeout=1)
except asyncio.TimeoutError:
    print(f'{time.strftime("%X")} timeout!')
    
print(f'{time.strftime("%X")} block finished')

10:26:28 started
10:26:29 Hey, I am cancelled!
10:26:32 coro finished
10:26:32 timeout!
10:26:32 block finished


## Защита (shield) от отмены корутин

In [36]:
async def important_task() -> None:
    try:
        print(f'{time.strftime("%X")} important task started')
        await asyncio.sleep(5)
        print(f'{time.strftime("%X")} important task finished')
    except asyncio.CancelledError:
        print(f"{time.strftime("%X")} Hey, I am cancelled!")
        raise

In [37]:
print(f'{time.strftime("%X")} started')
try:
    await asyncio.wait_for(asyncio.shield(important_task()), timeout=1.0)
except asyncio.TimeoutError:
    print(f'{time.strftime("%X")} timeout!')
    
print(f'{time.strftime("%X")} block finished')

10:26:56 started
10:26:56 important task started
10:26:57 timeout!
10:26:57 block finished
10:27:01 important task finished


## as_completed

In [38]:
async def factorial(number: int) -> tuple[int, int]:
    result = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        result *= i
    return number, result

In [39]:
for i, future in enumerate(asyncio.as_completed([factorial(4), factorial(3),
                                                 factorial(5), factorial(2)])):
    number, result = await future
    print(f"Factorial({number}) = {result}")

Factorial(2) = 2
Factorial(3) = 6
Factorial(4) = 24
Factorial(5) = 120


## async with

Асинхронный контекстный менеджер - это контекстный менеджер,

который умеет приостанавливать выполнение в методах

входа и выхода: `__aenter__()`, `__aexit__()`

In [None]:
lock = asyncio.Lock()

async with lock:
    # access shared state
    # Coro 1
    # await asyncio.sleep(1)
    # Coro 2
    # await asyncio.sleep(2)

### aiohttp

In [66]:
import aiohttp

In [67]:
async with aiohttp.ClientSession() as session:  # closing connection requires having an active event loop
    async with session.get('http://ya.ru') as resp:
        text = await resp.text()
        print(text[:70], '...')

<!doctype html><html prefix="og: http://ogp.me/ns#" lang="ru"><meta ht ...


In [68]:
import requests

In [69]:
with requests.Session() as session:
    with session.get('http://ya.ru') as resp:
        text = resp.text
        print(text[:70], '...')

<!doctype html><html prefix="og: http://ogp.me/ns#" lang="ru"><meta ht ...


## async for

Асинхронный итерируемый (iterable) объект должен возвращать асинхронный итератор из его iter-метода (`__aiter__()`).

Асинхронный итератор должен возвращать awaitable-объект, который возвращает следующее значение итератора при вызове `await`, либо кидать `StopAsyncIteration` из его next-метода (`__anext__()`).

In [70]:
from collections.abc import Iterator

In [71]:
async def ticker(upto: int) -> Iterator[int]:
    for i in range(upto):
        await asyncio.sleep(1)
        yield i

In [72]:
async for i in ticker(3):
    print(i, end=' ')

0 1 2 

In [73]:
async_iter = aiter(ticker(2))  # 3.10+
async_iter

<async_generator object ticker at 0x7f361c517370>

In [74]:
await anext(async_iter)  # 3.10+

0

In [75]:
await anext(async_iter)

1

In [76]:
await anext(async_iter)

StopAsyncIteration: 

## Запуск синхронного кода

In [77]:
def blocking_io() -> None:
    print(f"{time.strftime('%X')} start blocking IO")
    time.sleep(5)  # blocks, but at least releases GIL
    # Non-releasing GIL code will block the event loop!
    print(f"{time.strftime('%X')} finished blocking IO")

In [78]:
non_blocking_io = asyncio.to_thread(blocking_io)  # N threads is more expensive than 1 thread with N coros
# await non_blocking_io  <-- blocks if GIL is not released inside the sync coro

In [79]:
print(f"{time.strftime('%X')} start gather")
await asyncio.gather(non_blocking_io, asyncio.sleep(5))
print(f"{time.strftime('%X')} finished gather")

00:42:35 start gather
00:42:35 start blocking IO
00:42:40 finished blocking IO
00:42:40 finished gather


## Debugging asyncio

`$ PYTHONASYNCIODEBUG=1 python asyncio_program_to_debug.py`

https://docs.python.org/3/library/asyncio-dev.html

<center>
<img 
src="https://foni.papik.pro/uploads/posts/2024-10/foni-papik-pro-155x-p-kartinki-yeralash-vse-na-prozrachnom-fone-9.png" alt="sync-vs-async" width=500/>
</center>