<center>
<img src="https://upload.wikimedia.org/wikipedia/commons/a/a8/%D0%9B%D0%9E%D0%93%D0%9E_%D0%A8%D0%90%D0%94.png" width=400/>
    <b style="font-size: 30px">Python. Корутины, async/await, asyncio</b>
    <br/>
    <br/>
<b style="font-size: 20px">Вадим Мазаев</b>
</center>

Напоминание: IO-bound операции — операции, связанные с длительным ожиданием другого устройства, например, сетевой карты или диска

<center>
<img src="https://blog-assets.risingstack.com/2016/Apr/non_async_blocking_operations_example_in_node_hero-1459856858194.png" alt="io-operations" width=800/>
</center>

### DB operation

In [None]:
from datetime import date
import pymongo

client = MongoClient('localhost', 27017)

posts = client['web_db']['posts']

# blocks until DB answers
results = posts.find({'author': 'Vadim', 'date': date.today})

### Http request

In [5]:
import requests

# blocks until site returns response
response = requests.get('http://very.slow.site')

## Asynchronous I/O

<center>
<img src="https://camo.githubusercontent.com/a05fd290b0ad342a6721ca3fc66d7ed65c004fa4/68747470733a2f2f63646e2d696d616765732d312e6d656469756d2e636f6d2f6d61782f313630302f312a36306975674742484d46375050536e2d6664517248512e706e67" alt="sync-vs-async" width=700/>
</center>

Asynchronous I/O — неблокирующяя обработка ввода/вывода, которая позволяет процессу продолжить выполнение не дожидаясь окончания передачи данных.

# Coroutines

### с самого начала...

In [2]:
def eager_range(up_to: int) -> list[int]:
    sequence = []
    index = 0
    while index < up_to:
        sequence.append(index)
        index += 1
    return sequence

Начиная с Python 2.2 в языке появились генераторы

In [1]:
from collections.abc import Generator

In [3]:
def lazy_range(up_to: int) -> Generator[int, None, None]:
    index = 0
    while index < up_to:
        yield index
        index += 1

В Python 2.5 вводят метод `send()`

In [4]:
import typing as tp

In [5]:
def jumping_range(up_to: int) -> Generator[int, tp.Optional[int], None]:
    index = 0
    while index < up_to:
        jump = yield index
        if jump is None:
            jump = 1
        index += jump

In [5]:
generator = jumping_range(5)

In [6]:
print('next   :', next(generator))
print('send  2:', generator.send(2))
print('next   :', next(generator))
print('send -1:', generator.send(-1))

next   : 0
send  2: 2
next   : 3
send -1: 2


В Python 3.3 добавляется важный синтаксический сахар `yield from`

In [26]:
def bottom() -> Generator[int, None, int]:
    yield 42
    yield 128
    return 24

In [27]:
def top() -> Generator[int, None, None]:
    value = yield from bottom()
    yield value

In [29]:
list(top())

[42, 128, 24]

Наконец, в Python 3.4 вводят фреймворк `asyncio`

In [41]:
import asyncio
from collections.abc import Generator

In [42]:
@asyncio.coroutine
def countdown(n: int) -> Generator[asyncio.Future, None, None]:
    while n > 0:
        print(n)
        yield from asyncio.sleep(1)
        n -= 1

  def countdown(n: int) -> Generator[asyncio.Future, None, None]:


In [38]:
next(countdown(3))

3


<Future pending>

In [35]:
# to start nested event loop inside jupyter notebook
import nest_asyncio
nest_asyncio.apply()

In [40]:
loop = asyncio.get_event_loop()
loop.run_until_complete(countdown(3))

3
2
1


Довольно быстро становится понятно, что в языке теперь есть некоторая путаница между генераторами и корутинами

И уже в Python 3.5 вводят `async/await`, заменив generator-based корутины на встроенные в язык

In [44]:
async def compute(a: int, b: int) -> int:
    print('Compute...')
    await asyncio.sleep(1.0)
    return a + b

In [46]:
compute(3, 5)

<coroutine object compute at 0x7f847d232d40>

In [47]:
loop = asyncio.get_event_loop()
loop.run_until_complete(compute(3, 5))

Compute...


8

А затем в Python 3.6 появится возможность реализовывать асинхронные генераторы

In [45]:
async def ticker(delay: int, to: int) -> None:
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

# Event Loop

In [48]:
async def compute(a: int, b: int) -> int:
    print('Compute...')
    await asyncio.sleep(1.0)
    return a + b

In [49]:
async def print_sum(a: int, b: int) -> None:
    result = await compute(a, b)
    print(f'{a} + {b} = {result}')

In [50]:
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))

Compute...
1 + 2 = 3


<center>
<img src="http://ntoll.org/static/images/tulip_coro.png" alt="event-loop" width=1200/>
</center>

# Современный asyncio

## python 3.9+

## Hello world

In [58]:
async def main() -> None:
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

In [59]:
asyncio.run(main())

Hello ...
... World!


## Шедулинг корутин

In [61]:
async def say_after(delay: int, what: str) -> None:
    print(f'wait for {delay}')
    await asyncio.sleep(delay)
    print(what)

In [60]:
import time

In [62]:
print(f"started at {time.strftime('%X')}")

await say_after(1, 'hello')
await say_after(2, 'world')

print(f"finished at {time.strftime('%X')}")

started at 19:42:50
hello
world
finished at 19:42:53


In [63]:
task1 = asyncio.create_task(say_after(2, 'hello'))
task2 = asyncio.create_task(say_after(1, 'world'))

print(f"started at {time.strftime('%X')}")

await task1
await task2

print(f"finished at {time.strftime('%X')}")

started at 19:45:36
wait for 2
wait for 1
world
hello
finished at 19:45:38


Задачи (Tasks) используются, чтобы запланировать (schedule) корутины на выполнение "параллельно"

Когда корутину оборачивают в задачу с помощью `asyncio.create_task()`, корутина автоматически планируется к выполнению в скором времени (run soon)

## asyncio.gather

In [64]:
async def factorial(name: str, number: int) -> None:
    result = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        if i > 3:
            raise Exception('exception text')
        result *= i
    print(f"Task {name}: factorial({number}) = {result}")
    return result

In [65]:
a, b, c = await asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
    return_exceptions=True,
)
print(a, b, c)

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
2 6 exception text


## Ожидание & таймауты

In [66]:
async def eternity() -> None:
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

In [67]:
# Wait for at most 1 second
try:
    await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
    print('timeout!')

timeout!


## Отмена (cancellation) корутин и защита от отмены

In [68]:
async def another_eternity() -> None:
    try:
        await asyncio.sleep(3600)
        print(f'{time.strftime("%X")} yay!')
    except asyncio.CancelledError:
        print(f'{time.strftime("%X")} cancelled')
        await asyncio.sleep(5)

In [69]:
print(f'{time.strftime("%X")} started')
try:
    # Send CancelledError to coro after 1 sec and wait for coro completion
    await asyncio.wait_for(another_eternity(), timeout=1.0)
except asyncio.TimeoutError:
    print(f'{time.strftime("%X")} timeout!')
print(f'{time.strftime("%X")} finished')

19:55:25 started
19:55:26 cancelled
19:55:31 timeout!
19:55:31 finished


In [70]:
async def important_task() -> None:
    print(f'{time.strftime("%X")} important task started')
    await asyncio.sleep(5)
    print(f'{time.strftime("%X")} important task finished')

In [71]:
print(f'{time.strftime("%X")} started')
try:
    await asyncio.wait_for(asyncio.shield(important_task()), timeout=1.0)
except asyncio.TimeoutError:
    print(f'{time.strftime("%X")} timeout!')
    await asyncio.sleep(1)
print(f'{time.strftime("%X")} finished')

20:00:15 started
20:00:15 important task started
20:00:16 timeout!
20:00:17 finished
20:00:20 important task finished


## as_completed

In [72]:
async def factorial(number: int) -> tuple[int, int]:
    result = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        result *= i
    return number, result

In [73]:
for i, future in enumerate(asyncio.as_completed([factorial(4), factorial(3),
                                                 factorial(5), factorial(2)])):
    number, result = await future
    print(f"Factorial({number}) = {result}")

Factorial(2) = 2
Factorial(3) = 6
Factorial(4) = 24
Factorial(5) = 120


## async with

Асинхронный контекстный менеджер - это контекстный менеджер, который умеет приостанавливать выполнение в методах входа и выхода: `__aenter__()`, `__aexit__()`

In [None]:
lock = asyncio.Lock()

async with lock:
    # access shared state

### aiohttp

In [74]:
import aiohttp

In [92]:
async with aiohttp.ClientSession() as session:
    async with session.get('http://yandex.ru') as resp:
        text = await resp.text()
        print(text[:70], '...')

<!DOCTYPE html><html   class="i-ua_js_no i-ua_css_standart i-ua_browse ...


In [95]:
import requests

In [96]:
with requests.Session() as session:
    with session.get('http://yandex.ru') as resp:
        text = resp.text
        print(text[:70], '...')

<!DOCTYPE html><html   class="i-ua_js_no i-ua_css_standart i-ua_browse ...


## async for

Асинхронный итерируемый (iterable) объект может вызывать асинхронный код внутри его iter-метода (`__aiter__()`), а асинхронный итератор может выполнять асинхронный код внутри его next-метода (`__anext__()`).

In [76]:
async def ticker(to: int) -> tp.Iterable[int]:
    for i in range(to):
        yield i
        await asyncio.sleep(1)

In [77]:
async for i in ticker(3):
    print(i, end=' ')

0 1 2 

### Motor

In [97]:
from motor.motor_asyncio import AsyncIOMotorClient

In [98]:
client = AsyncIOMotorClient('localhost', 27017)

In [104]:
await client.test_database.test_collection.insert_one({'key': 'value'})

<pymongo.results.InsertOneResult at 0x7f847d034540>

In [105]:
async for document in client.test_database.test_collection.find({'key': 'value'}):
        print(document)

{'_id': ObjectId('619c15d7abc574f6aa38fba4'), 'key': 'value'}


## Запуск синхронного кода

In [106]:
def blocking_io() -> None:
    print(f"{time.strftime('%X')} start blocking IO")
    time.sleep(5)
    print(f"{time.strftime('%X')} finished blocking IO")

In [107]:
non_blocking_io = asyncio.to_thread(blocking_io)

In [108]:
print(f"{time.strftime('%X')} start gather")
_ = await asyncio.gather(non_blocking_io, asyncio.sleep(5))
print(f"{time.strftime('%X')} finished gather")

20:16:12 start gather
20:16:12 start blocking IO
20:16:17 finished blocking IO
20:16:17 finished gather


## Debugging asyncio

`$ PYTHONASYNCIODEBUG=1 python asyncio_program_to_debug.py`

https://docs.python.org/3/library/asyncio-dev.html

# Спасибо за внимание!