# Асинхронное программирование на Python. Библиотека [asyncio](https://docs.python.org/3/library/asyncio.html) #

Слыша слова "асинхронное программирование" в основном представляется, что имея одну большую задачу мы хотим разбить ее на несколько небольших кусочков и выполнить их независимо друг от друга, а после склеить результаты и получить полное решение.

Однако как же это правильно реализовать на самом деле?

Задачи, которые перед нами возникают принято делить на 2 типа:

1. **CPU-bound**

    Вычислительные задачи, которые в основном зависят от ресурсов процессора. Примерами таких задач могут служить математичекие вычисления, майнинг криптовалют или взлом шифров по ключу с помощью подбора простых множителей.

* **I/O-bound**

    Это тип задач, которые которые в основном зависят от скорости ввода/вывода информации (input/output)

В зависимости от типа задачи используют разные подходы к её делению на подзадачи.


## CPU-bound задачи

Рассмотрим функцию, которая считает долго возводит число в квадрат :

In [2]:
import time

def slow_square(x):
    time.sleep(1)
    print(x)
    return x**2

Посчитаем с помощью этой функции квадраты натуральных чисел до пяти, а так же посчитаем время работы программы:

In [2]:
start_time = time.time()

for num in range(5):
    print(slow_square(num))
    
time.time() - start_time    

0
0
1
1
2
4
3
9
4
16


5.008170127868652

Получилось слишком долго :( 

Можем сделать быстрее, ведь вычисления не зависят от результатов предыдущих - создадим 5 разных процессов, в каждом из которых запустим выполнение функции.

Для этого воспользуемся классом `Process` из модуля `multiprocessing`:

In [3]:
from multiprocessing import Process

start_time = time.time()
process_lst = []
for i in range(5):
    process_lst.append(Process(target=slow_square, args=(i,)))
    process_lst[i].start()
        
for i in range(5):    
    process_lst[i].join()
    
print(time.time() - start_time)

1
0
2
3
4
1.0951440334320068


Видно, что мы победили проблему - мы сократили время подсчета до времени исполнения одной функции. 

Можем сделать то же самое, но удобнее с помощью класса `Pool`, который делает ту же работу, только удобнее:

In [4]:
from multiprocessing import Pool

pool = Pool(processes=5)
result = pool.map(slow_square, [1, 2, 3, 4, 5])

print(result)

13254

[1, 4, 9, 16, 25]





Process ForkPoolWorker-8:
Process ForkPoolWorker-6:
Process ForkPoolWorker-7:
Process ForkPoolWorker-10:
Process ForkPoolWorker-9:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/home/zetman13/miniconda3/lib/python3.9/multiprocessing/process.py", line 108, in run
    self._target(*se

У работы с процессами есть свои минусы: 

* Это затратно
* Необходима синхронизация 

Можно попробовать подход решать задачи с помощью потоков, однако в Python есть одно существенное неудобство при работе с ними - это GIL. Однако, в питоне все же есть библиотека *threading* для работы с потоками.

То, что мы делали в примерах до этого, можно понимать как разделение множества задач между работниками, при чем если один работник получил 2 задачи он будет выполнять их последовательно.

## IO-bound задачи

Сейчас же перейдем к немного другой концепции - будем распределять множество задач одному работнику, причем пока одна из задач не требует непосредственного его участия он начинает делать следующую.

Именно эта концепция и лежит в основе асинхронного решения I/O-bound задач. Множество задач внутри которых есть какое-либо ожидание, например ответа от сервера, запускаются по очереди, причем пока одна задача находится в режиме ожидания другая начинает работать. Стоит отметить, что все происходит в одном потоке. 

**Как же будет устроена работа асинхронного кода?** 

У нас будут функции, которые умеют засыпать и просыпаться в нужное время, а так же обработчик (*event loop*). Мы собираем функции в обработчик. Когда одна из функций засыпает управление получает обработчик, после чего отдает управление очередной функции.

Рассмотрим простейший пример:

In [3]:
queue = []

def counter():
    cnt = 0
    while True:
        print(cnt)
        cnt += 1
        yield    
        
        
def printer():
    cnt = 1
    while True:
        if cnt%3 == 0:
            print("Go!")
        cnt += 1
        yield


g1 = counter()
queue.append(g1)

g2 = printer()
queue.append(g2)

i = 0
while True:
    g = queue.pop(0)
    time.sleep(0.5)
    next(g)
    queue.append(g)
    i += 1
    if i == 10: break

0
1
2
Go!
3
4


В Python много библиотек для асинхронного программирования, наиболее популярными являются Tornado, Asyncio и Gevent. Давайте посмотрим, как работает Asyncio.

## Asyncio ##

In [5]:
import asyncio

Объект корутины (асинхронная функция) объявляется с помощью `async def` и исполняется с помощью `await`: 

In [6]:
async def rocket():
    print(1)
    await asyncio.sleep(1)
    print(2)
    await asyncio.sleep(1)
    print(3)
    await asyncio.sleep(1)
    print("Go!")
    
await rocket()

1
2
3
Go!


Заметим, что просто написав rocket() у нас ничего не выполнится. 

In [8]:
rocket()

<coroutine object rocket at 0x7f97fc21b040>

Разберем следующий пример:

In [8]:
async def say_after(delay, msg):
    await asyncio.sleep(delay)
    print(msg)

async def main():
    started = time.time()

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"Time: {time.time() - started}")
    
await main()

hello
world
Time: 3.007875919342041


Хм, что-то не то, ждем все три секунды, где же профит?

Но для начала несколько слов про `Task()`.

`Task()` - это объект, который запускает корутину. Объект `Task` используется для запуска корутин в циклах событий при помощи оператора `await`. 

Циклы событий используют **совместное планирование**. Другими словами, цикл событий запускает одну задачу за раз. Пока объект задачи Task ожидает готовности, цикл событий запускает другие задачи, обратные вызовы или выполняет операции ввода-вывода.

Теперь создадим задачи и посмотрим, исправило ли эту проблему :

In [14]:
async def main():
    task1 = asyncio.create_task(  # в этот момент задача была запланирована и вскоре начнет выполняться
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    started = time.time()

    await task1  # а здесь мы лишь дожидаемся, пока задача завершится
    await task2  # если закомментить строку выше, обе задачи все равно выполняться, поскольку task1 отработает раньше task2
    
    print(f"Time: {time.time() - started}")
    
await main()

hello
world
Time: 2.0049421787261963


Ура !! Помогло !!

Рассмотрим некоторые методы класса `Task()`:

In [25]:
task = asyncio.create_task(
    say_after(1, 'hello')
)

await task

hello


Метод `Task.cancelled()` возвращает `True`, если задача `Task` отменена.

Задача отменяется, когда отмена была запрошена с помощью метода `Task.cancel()` и обернутая сопрограмма распространила переданное в нее исключение `asyncio.CancelledError`.

In [14]:
task.cancelled()

False

Метод `Task.done()` возвращает `True`, если задача `Task` выполнена.

Задача считается выполненной, когда обернутая сопрограмма либо отработала и вернула результат, либо вызвала исключение, либо была отменена.

In [16]:
task.done()

True

In [15]:
long_task = asyncio.create_task(
    say_after(10, 'hello')
)

In [21]:
long_task.done()

True

Метод `Task.set_name()` устанавливает имя для задачи `Task`. Аргументом значения может быть любой объект, который затем преобразуется в строку.

In [None]:
task.set_name(123)  # начиная с python 3.8

Метод `Task.get_name()` возвращает имя name задачи Task. Если имя задачи Task не было явно установлено, то по умолчанию оно генерируется во время создания:

In [None]:
task.get_name()  # начиная с python 3.8

Так же важно знать про объекты `Future`. Класс `asyncio.Future()` представляет собой конечный результат асинхронной операции. Класс `Task()` является подклассом класса `Future()`. 

### Функция gather ###

Функция `gather()` одновременно запускает объекты, переданные в функцию. Рассмотрим это на примере - мы хотим подключаться к пользователю и отправлять ему тестовое сообщение:

In [28]:
id_list = [1, 2, 3, 4]

async def connect_to_user(usr_id):
    print(f'Подключаюсь к {usr_id}')
    await asyncio.sleep(1)
    print(f'Отправляю команду test пользователю {usr_id}')
    await asyncio.sleep(1)


async def send_msg(id_list):
    coroutines = map(connect_to_user, id_list)
    result = await asyncio.gather(*coroutines, return_exceptions=False )
    return result

res = await send_msg(id_list)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4
Отправляю команду test пользователю 1
Отправляю команду test пользователю 2
Отправляю команду test пользователю 3
Отправляю команду test пользователю 4


А что произойдет, если попытка подключения окажется неудачной и выбросится исключение? Как мы будем его обрабатывать? 

Для этого есть аргумент `return_exceptions`:
- `return_exceptions=False` (по умолчанию) - первое появившееся исключение, немедленно распространяется на ту задачу, в которой оно возникло в момент ожидания `asyncio.gather()`, при этом другие объекты в последовательности не будут отменены и продолжат выполнение.
- `return_exceptions=True` - исключения обрабатываются так же, как успешные результаты и передаются в совокупный список результатов.

In [24]:
async def connect_to_user(usr_id):
    print(f'Подключаюсь к {usr_id}')
    if usr_id == 4:
        raise OSError(f'Не могу подключиться к {usr_id}')
    await asyncio.sleep(1)
    print(f'Отправляю команду test пользователю {usr_id}')
    await asyncio.sleep(1)

    
async def send_msg(id_list, return_ex = False):
    coroutines = map(connect_to_user, id_list)
    result = await asyncio.gather(*coroutines,return_exceptions=return_ex)
    return result

await send_msg(id_list, False)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4


OSError: Не могу подключиться к 4

Отправляю команду test пользователю 1
Отправляю команду test пользователю 2
Отправляю команду test пользователю 3


In [25]:
await send_msg(id_list, True)

Подключаюсь к 1
Подключаюсь к 2
Подключаюсь к 3
Подключаюсь к 4
Отправляю команду test пользователю 1
Отправляю команду test пользователю 2
Отправляю команду test пользователю 3


[None, None, None, OSError('Не могу подключиться к 4')]

### Функция shield ###

Функция `shield` защищает задачи от отмены методом `Task.cancel()`:

In [26]:
async def start_task():
    print('Старт задачи...')
    await asyncio.sleep(2)
    print('А что со мной хотели сделать?')

async def cancel(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('Отмена!')

async def main():
    real_task = asyncio.create_task(start_task())
    shield = asyncio.shield(real_task)
    asyncio.create_task(cancel(shield))
    await real_task


await main()

Старт задачи...
Отмена!
А что со мной хотели сделать?


### Функция wait_for ###

Следит за тем, чтобы корутина не выполнялась больше заданного времени. Если истекает таймаут, то отменяет задачу и бросает `TimeoutError`.

Если необходимо избежать отмены задачи, то лучше обренуть в `shield()`.

In [28]:
async def fast_func():
    await asyncio.sleep(360)
    print('Да не долго я работаю')

async def main():
    try:
        await asyncio.wait_for(fast_func(), timeout=5.0)
    except asyncio.TimeoutError:
        print('Какая-то не очень быстрая ...')

await main()

Какая-то не очень быстрая ...


### Функция wait ###

Выполняет корутины до таймаута или до параметра функции `return_when`. 

Возваращает 2 множества корутин: выполненных и нет.

In [29]:
async def foo():
    await asyncio.sleep(1)
    print('hello ...')
    
async def bar():
    await asyncio.sleep(2)
    print('... world!')
    
print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, return_when=asyncio.FIRST_COMPLETED)

print(done)
print(pending)

hello ...
{<Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:4> result=None>}
{<Task pending coro=<async-def-wrapper.<locals>.bar() running at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:9> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7feb22f6a990>()]>>}


In [30]:
print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, timeout=1.5)

print(done)
print(pending)

hello ...
{<Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:4> result=None>}
{<Task pending coro=<async-def-wrapper.<locals>.bar() running at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:9> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7feb22f98610>()]>>}


In [31]:
print_hello = asyncio.create_task(foo())
print_world = asyncio.create_task(bar())

done, pending = await asyncio.wait({print_hello,print_world}, return_when=asyncio.ALL_COMPLETED)

print(done)
print(pending)

hello ...
... world!
{<Task finished coro=<async-def-wrapper.<locals>.bar() done, defined at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:8> result=None>, <Task finished coro=<async-def-wrapper.<locals>.foo() done, defined at /var/folders/y4/cg33f0j16psf49d79blwc81srjjwcd/T/ipykernel_39409/2465487325.py:4> result=None>}
set()


**Замечание** - `wait` в отличии от `wait_for` не отменяет задачи.

Полезные ссылки по теме семинара:
* Документация Asyncio : https://docs.python.org/3/library/asyncio.html
* Документация Tornado : https://www.tornadoweb.org/en/stable/guide/async.html
* Документация Gevent : http://www.gevent.org/index.html
* Документация Multiproccesing : https://docs.python.org/3/library/multiprocessing.html
* Документация Threading: https://docs.python.org/3/library/threading.html
* GIL: https://wiki.python.org/moin/GlobalInterpreterLock