# Корутины, async/await, asyncio

# Как работает веб-сервер, обратывающий сотни тысяч запросов в секунду?

### (если процессорных ядер на машине меньше сотни)

## Concurrency

- Concurrency (конкурентность) — две или более задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени (наиболее общее понятие)

- Parallel execution (параллелизм) — исполнение нескольких задач одновременно в буквальном смысле, например, при помощи многоядерного процессора

- Multitheading (многопоточность) — один из способов реализации конкурентности путем выделения абстракции "рабочего потока" (возможна и на многоядерных, и на одноядерных процессорах)

- Asynchrony (асинхронность) — возникновение событий, которые происходят одновременно с выполнением программы, без блокировки программы для ожидания результатов

## 2 типа операций

- CPU-bound — нагружают вычислительные мощности текущего устройства

- IO-bound — связаны с длительным ожиданием другого устройства, например, сетевой карты или диска

![io-operations](https://blog-assets.risingstack.com/2016/Apr/non_async_blocking_operations_example_in_node_hero-1459856858194.png)

### File read

In [None]:
with open('large_file.txt') as f:
    # blocks until OS reads all the data
    data = f.read()

### DB operation

In [None]:
from datetime import date
import pymongo

client = MongoClient('localhost', 27017)

posts = client['web_db']['posts']

# blocks until DB answers
results = posts.find({'author': 'Vadim', 'date': date.today})

### Http request

In [5]:
import requests

# blocks until site returns response
response = requests.get('http://very.slow.site')

Возвращаясь к вопросу про веб-сервер...

## Asynchronous I/O

Асинхронные операции чтения/записи позволяют программе продолжать выполнение, не дожидаясь результата их исполнения

# Callbacks

In [5]:
from tornado.web import RequestHandler
from tornado.httpclient import AsyncHTTPClient, HTTPClient

In [37]:
http_client = HTTPClient()

%time response = http_client.fetch('http://yandex.ru')

print('\n{:.70}...'.format(response.body))

CPU times: user 9.11 ms, sys: 5.03 ms, total: 14.1 ms
Wall time: 957 ms

<!DOCTYPE html><html class="i-ua_js_no i-ua_css_standart i-ua_browser_...


In [29]:
def handle_response(response):
    print('\n{:.70}...'.format(response.body))

http_client = AsyncHTTPClient()

%time http_client.fetch('http://yandex.ru', callback=handle_response)

CPU times: user 1.37 ms, sys: 0 ns, total: 1.37 ms
Wall time: 62.3 ms

<!DOCTYPE html><html class="i-ua_js_no i-ua_css_standart i-ua_browser_...


### Callback hell

In [39]:
http_client = AsyncHTTPClient()


def parse_links(page):
    pass


def handle_response(response):
    if response.error:
        print('Error:', response.error)
        return

    links = parse_links(response.body)

urls = ['http://www.google.com', 'http://www.yandex.ru', 'http://www.python.org']
for url in urls:
    http_client.fetch(url, callback=handle_response)

# Coroutines

### from the very beginning...

In [40]:
def eager_range(up_to):
    sequence = []
    index = 0
    while index < up_to:
        sequence.append(index)
        index += 1
    return sequence

Начиная с Python 2.2 в языке появились генераторы

In [41]:
def lazy_range(up_to):
    index = 0
    while index < up_to:
        yield index
        index += 1

В Python 2.5 вводят метод `send()`

In [1]:
def jumping_range(up_to):
    index = 0
    while index < up_to:
        jump = yield index
        if jump is None:
            jump = 1
        index += jump

In [6]:
generator = jumping_range(5)

print('next:', next(generator))
print('send 2:', generator.send(2))
print('next:', next(generator))
print('send -1:', generator.send(-1))

next: 0
send 2: 2
next: 3
send -1: 2


В Python 3.3 добавляется довольно важный синтаксический сахар `yield from`

In [7]:
def bottom():
    return (yield 42)

def middle():
    return (yield from bottom())

def top():
    return (yield from middle())

In [8]:
gen = top()
value = next(gen)
print(value)
try:
    value = gen.send(value * 2)
except StopIteration as exc:
    value = exc.value
print(value)

42
84


Наконец, в Python 3.4 вводят фреймворк `asyncio`

In [4]:
import asyncio

In [14]:
@asyncio.coroutine
def countdown(label, n):
    while n > 0:
        print('{}: {}'.format(label, n))
        yield from asyncio.sleep(1)
        n -= 1

In [22]:
loop = asyncio.get_event_loop()
tasks = [
    countdown('A', 2),
    countdown('B', 3)
]
loop.run_until_complete(asyncio.wait(tasks))

B: 3
A: 2
B: 2
A: 1
B: 1


Довольно быстро становится понятно, что в языке теперь есть некоторая путаница между генераторами и корутинами

И уже в Python 3.5 вводят `async/await`, окончательно скрыв тот факт, что корутина - это всё тот же генератор

In [8]:
async def compute(a, b):
    print("Compute...")
    await asyncio.sleep(1.0)
    return a + b

А затем в Python 3.6 появится возможность реализовывать асинхронные генераторы

In [None]:
async def ticker(delay, to):
    for i in range(to):
        yield i
        await asyncio.sleep(delay)

# Event Loop

In [3]:
async def compute(a, b):
    print('Compute...')
    await asyncio.sleep(1.0)
    return a + b

async def print_sum(a, b):
    result = await compute(a, b)
    print('{} + {} = {}'.format(a, b, result))

In [4]:
loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

Compute...
1 + 2 = 3


![event-loop](http://ntoll.org/static/images/tulip_coro.png)

# Modern asyncio

### Python 3.7+

## Hello world

In [6]:
import asyncio

async def main():
    print('Hello ...')
    await asyncio.sleep(1)
    print('... World!')

# Python 3.7+
asyncio.run(main())

Hello ...
... World!


## Coroutines

In [9]:
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

say_after(5, 'test')

<coroutine object say_after at 0x7f492c0c49c8>

In [10]:
async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

started at 19:07:36
hello
world
finished at 19:07:39


In [21]:
async def main():
    task1 = asyncio.create_task(say_after(1, 'hello'))  # 3.7+
    task2 = asyncio.create_task(say_after(2, 'world'))
    
    print(f"started at {time.strftime('%X')}")

    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

started at 19:20:03
hello
world
finished at 19:20:05


Задачи (Tasks) используются, чтобы запланировать (schedule) корутины на выполнение "параллельно"

Когда корутину оборачивают в задачу с помощью `asyncio.create_task()`, корутина автоматически планируется к выполнению в скором времени (run soon)

## Concurrency

In [23]:
async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: factorial(2) = 2
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24


## Waiting & timeouts

In [24]:
async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

timeout!


In [8]:
async def factorial(number):
    f = 1
    for i in range(2, number + 1):
        await asyncio.sleep(1)
        f *= i
    return number, f

async def main():
    for fut in asyncio.as_completed([factorial(4), factorial(3),
                                     factorial(5), factorial(2)]):
        number, result = await fut
        print(f"Factorial({number}) = {result}")
        
asyncio.run(main())

Factorial(2) = 2
Factorial(3) = 6
Factorial(4) = 24
Factorial(5) = 120


## async with

Асинхронный контекстный менеджер - это контекстный менджер, который умеет приостанавливать выполнение в методах входа и выхода: `__aenter__()`, `__aexit__()`

In [None]:
lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

In [None]:
lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

## aiohttp

In [4]:
import aiohttp

async with aiohttp.ClientSession() as session:
    async with session.get('http://yandex.ru') as resp:
        text = await resp.text()
        print('{:.70}...'.format(text))


<!DOCTYPE html><html class="i-ua_js_no i-ua_css_standart i-ua_browser_...


### Simple server 

In [None]:
from aiohttp import web

async def handle(request):
    name = request.match_info.get('name', 'Anonymous')
    text = 'Hello, ' + name
    # do you logic asynchronously with async/await
    return web.Response(text=text)

app = web.Application()
app.add_routes([web.get('/', handle),
                web.get('/{name}', handle)])

web.run_app(app)

## async for

Асинхронный итерируемый (iterable) объект может вызывать асинхронный код внутри его iter-метода (`__aiter__()`), а асинхронный итератор может выполнять асинхронный код внутри его next-метода (`__anext__()`).

In [11]:
async def ticker(to):
    for i in range(to):
        yield i
        await asyncio.sleep(1)

In [12]:
async def main():
    async for i in ticker(3):
        print(i)

        
asyncio.run(main())

0
1
2


## Motor

In [2]:
from motor.motor_asyncio import AsyncIOMotorClient

client = AsyncIOMotorClient('localhost', 27017)

In [3]:
async def insert(db):
    document = {'key': 'value'}
    await db.test_collection.insert_one(document)

In [4]:
async def find(db):
    async for document in db.test_collection.find({'key': 'value'}):
        print(document)

In [5]:
await insert(client.test_database)
await find(client.test_database)

{'_id': ObjectId('5bd600a7553512569a3d836a'), 'key': 'value'}


## More complex web-server with database

In [6]:
import asyncio
from aiohttp import web
from motor.motor_asyncio import AsyncIOMotorClient

In [7]:
async def setup_db():
    client = AsyncIOMotorClient('localhost', 27017)
    db = client.test_database

    await db.pages.drop()
    
    html = '<html><body>{}</body></html>'
    await db.pages.insert_one({'_id': 'page-one',
                               'body': html.format('Hello!')})
    await db.pages.insert_one({'_id': 'page-two',
                               'body': html.format('Goodbye.')})

    return db

In [8]:
async def page_handler(request):
    # If the visitor gets "/pages/page-one", then page_name is "page-one"
    page_name = request.match_info.get('page_name')
    db = request.app['db']  # Retrieve database handle
    document = await db.pages.find_one(page_name)  # Find page by id

    if not document:
        return web.HTTPNotFound(text=f'No page named {page_name}')

    return web.Response(body=document['body'].encode(),
                        content_type='text/html')

In [None]:
async def create_app():
    app = web.Application()
    app['db'] = await setup_db()  # Add database handle
    app.router.add_get('/pages/{page_name}', page_handler)  # Add page handler
    return app

web.run_app(create_app())

# Спасибо за внимание!