# Асинхронное программирование в Python

https://medium.com/analytics-vidhya/asyncio-threading-and-multiprocessing-in-python-4f5ff6ca75e8

## Определения

__Поток__ - наименьшая единица обработки, которую может назначить ядро операционной системы. Несколько потоков могут исполняться внутри одного процесса.

![thread.svg](attachment:thread.svg)

__Процесс__ — экземпляр программы во время выполнения, независимый объект, которому выделены системные ресурсы.

Практическая разница между ними заключается в том, что процессы не делят между собой память (т.е. у каждого процесса свой контекст исполнения, и они не могут обращаться к переменным друг друга), а разные потоки могут взаимодействовать с одними и теми же ресурсами.

## GIL - Global Interpreter Lock

В многопоточном приложении сразу несколько потоков могут изменять счетчик ссылок на объект, а это может привести к его непредвиденному удалению. Блокировка счетчика ссылок отдельных объектов - плохой вариант, поскольку это может привести к взаимной блокировке двух объектов. Поэтому в питоне используется GIL - глобальная блокировка самого интерпретатора. Таким образом, многопоточные приложения всё равно исполняются последовательно.

# Общие концепты

# Многопоточность

Несмотря на последовательное исполнение всех потоков в интерпретаторе, многопоточность несет в себе смысл. Иногда в коде появляются долгие операции, которые приостанавливают собой всё остальное выполнение программы. Их нужно выносить в отдельный поток, чтобы остальная часть программы могла исполняться в прерываниях того потока.

Напишем простую однопоточную программу:

In [None]:
import logging
import time

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")


def delay(delay, message):
    logging.info(f"{message} received")
    time.sleep(delay)
    logging.info(f"Printing {message}")
    return message

def main():
    logging.info("Main started")
    delay(2, "TWO SECONDS DELAY")
    delay(3, "THREE SECONDS DELAY")
    logging.info("Main Ended")

main()

Запишем ту же логику с использованием модуля threading, который отвечает за работу с потоками. Получим 2 секунды выигрыша, поскольку прерывание исполнения и передача управления другому потоку происходит во время "сна" потока.

In [None]:
import threading

def main():
    logging.info("Main started")
    threads = [
        threading.Thread(target=delay, args=(3, "THREE SECONDS DELAY")),
        threading.Thread(target=delay, args=(2, "TWO SECONDS DELAY")),
    ]
    for thread in threads:
        thread.start()
    logging.info("А тут мы видим, что главный поток тоже продолжает исполняться")
    for thread in threads:
        thread.join() # waits for thread to complete its task
    logging.info("Main Ended")
    
main()

### Футуры

Создание потоков - достаточно затратная операция, поэтому под каждый вызов функции создавать свой поток - долго. Футуры позволяют создать пул потоков, в котором отработавшие потоки будут переиспользоваться в новых заданиях. В них хранится текущий результат выполнения какой-либо задачи.

In [None]:
import concurrent.futures as cf

def main():
    with cf.ThreadPoolExecutor(max_workers=2) as executor:
        future_to_mapping = {
            executor.submit(delay, 3, "THREE SECONDS DELAY"): "3 secs",
            executor.submit(delay, 2, "TWO SECONDS DELAY"): "2 secs",
        }
        for future in cf.as_completed(future_to_mapping):
            logging.info(f"{future.result()} Done")
            
main()

# Asyncio

Библиотека, которая появилась в Python 3.4 и позволяет создавать асинхронные приложения с минимальными усилиями. В asyncio используются три основные сущности:

1. __Корутины__ - специальные функции, от которых ожидается передача управления в цикл событий, из которого они были запущены.
2. __Цикл событий__ - корутина, которая управляет вызовом остальных корутин.
3. __Awaitable объект__ - объект, который умеет "ожидать" результат.

Корутины можно задавать в стиле генератора:

In [None]:
def grep(pattern):
    print("Searching for", pattern)
    while True:
        line = (yield)
        if pattern in line:
            print(line)
            
            
search = grep('coroutine')
next(search)
search.send("I love you")
search.send("Don't you love me?")
search.send("I love coroutines instead!")
search.close()

Но лучше для этого использовать ключевые слова `async def`, которые появились в Python 3.5.

In [None]:
import asyncio
import time


async def delay_message(delay, message):
    logging.info(f"{message} received")
    await asyncio.sleep(delay) # time.sleep is blocking call. Hence, it cannot be awaited and we have to use asyncio.sleep
    logging.info(f"Printing {message}")
    
async def main():
    logging.info("Main started")
    logging.info(f'Текущие зарегистрированные задания: {len(asyncio.all_tasks())}')
    logging.info("Создадим задания")
    task_1 = asyncio.create_task(delay_message(2, "TWO SECONDS AWAIT") )
    task_2 = asyncio.create_task(delay_message(3, "THREE SECONDS AWAIT"))
    logging.info(f'Текущие зарегистрированные задания: {len(asyncio.all_tasks())}')
    await task_1
    await task_2
    logging.info("Main Ended")


# asyncio.run(main()) # creates an event loop
    
await main()

Еще удобней использовать `asyncio.gather` для создания заданий:

In [None]:
async def main():
    logging.info("Main started")
    logging.info("Создаем несколько заданий с помощью asyncio.gather")
    await asyncio.gather(*[delay_message(i, f"WAITING {i} SECONDS") for i in range(6, 0, -1)])
    logging.info("Main Ended")
    

# asyncio.run(main()) # creates an event loop

await main()

## Блокировка ресурсов

Во время исполнения задание asyncio забирает себе все доступные ресурсы для процесса, в котором оно находится. Поэтому если сделать блокирующий вызов time.sleep, приостановится весь процесс, включая "параллельные" задачи:

In [None]:
import asyncio
import time


async def delay_message(delay, message):
    logging.info(f"{message} received")
    if '6' not in message:
        await asyncio.sleep(delay) # неблокирующий вызов - контекст передается в другой фрейм
    else:
        time.sleep(delay) # блокирующий вызов
    logging.info(f"Printing {message}")


async def main():
    logging.info("Main started")
    logging.info("Creating multiple tasks with asyncio.gather")
    await asyncio.gather(*[delay_message(i, f"WAITING {i} SECONDS") for i in range(10, 0, -1)])
    logging.info("Main Ended")


# asyncio.run(main())
await main()

## Блокировка потока (threading)

Объект `db` ниже - общий ресурс для двух потоков. В следующем примере мы пытаемся изменить значение `value` дважды, каждый раз по формуле `value ** 2 + 1`. Но еще до первого изменения контекст передается во второй вызов функции, в котором value всё еще равно изначальному нулю. Поэтому после обоих изменений значение изменится как после одного раза.

In [None]:
import concurrent.futures as cf
import time


class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # thread gets switched
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")


db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update), executor.submit(db.update)]
logging.info(f"Final value is {db.value}")

Для того, чтобы избежать таких ситуаций, на время изменения нужно заблокировать общий ресурс value на чтение и запись.

In [None]:
LOCK = threading.Lock()


class DbUpdate:
    def __init__(self):
        self.value = 0

    def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        time.sleep(2) # переключение потоков
        with LOCK:
            logging.info("Reading Value From Db")
            tmp = self.value**2 + 1
            logging.info("Updating Value")
            self.value = tmp
            logging.info("Update Finished")
        
db = DbUpdate()
with cf.ThreadPoolExecutor(max_workers=5) as executor:
    updates = [executor.submit(db.update) for _ in range(2)]
logging.info(f"Final value is {db.value}")

## Совместные ресурсы в asyncio

В asyncio о блокировках думать не нужно. Во время исполнения await'a контекст исполнения не прерывается. Поэтому такой код будет сразу работать так, как запланировано:

In [None]:
class DbUpdate:
    def __init__(self):
        self.value = 0

    async def update(self):
        logging.info("Update Started")
        logging.info("Sleeping")
        await asyncio.sleep(1)
        logging.info("Reading Value From Db")
        tmp = self.value**2 + 1
        logging.info("Updating Value")
        self.value = tmp
        logging.info("Update Finished")
        
async def main():
    db = DbUpdate()
    await asyncio.gather(*[db.update() for _ in range(2)])
    logging.info(f"Final value is {db.value}")
    
# asyncio.run(main())
await main()

### Deadlock

Тем не менее корутины asyncio могут оказаться в ситуации dead lock - когда одна корутина ожидает исполнения второй, а вторая - первой. Таких случаев нужно избегать в коде, поскольку это приводит к RecursionError.

In [None]:
async def foo():
    await boo()
    
async def boo():
    await foo()
    
async def main():
    await asyncio.gather(*[foo(), boo()])
    
# asyncio.run(main())
await main()

# Multiprocessing

Использование нескольких процессов разумно применять тогда, когда обрабатываются тяжелые для процессора операции. В следующем примере представлена сортировка слиянием очень больших списков. Рассмотрим преджде синхронный вариант. 

In [None]:
import concurrent.futures as cf
import logging
import math
import numpy as np
import time
import threading

logger_format = '%(asctime)s:%(threadName)s:%(message)s'
logging.basicConfig(format=logger_format, level=logging.INFO, datefmt="%H:%M:%S")

r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

logging.info("Starting Sorting")
for r_list in r_lists:
    _ = merge_sort(r_list)
logging.info("Sorting Completed")

И асинхронная версия. По умолчанию количество процессов равняется количеству ядер процессора на компьютере, но оптимальное количество процессов нужно подбирать эмпирически.

In [None]:
import math
import numpy as np


r_lists = [[np.random.randint(500000) for _ in range(30000)] for _ in range(1000)]

def merge(l_1, l_2):
    out = []
    key_1 = 0
    key_2 = 0
    for i in range(len(l_1) + len(l_2)):
        if l_1[key_1] < l_2[key_2]:
            out.append(l_1[key_1])
            key_1 += 1
            if key_1 == len(l_1):
                out = out + l_2[key_2:]
                break
        else:
            out.append(l_2[key_2])
            key_2 += 1
            if key_2 == len(l_2):
                out = out + l_1[key_1:]
                break
    return out

def merge_sort(l):
    if len(l) == 1:
        return l
    mid_point = math.floor((len(l) + 1) / 2)
    l_1, l_2 = merge_sort(l[:mid_point]), merge_sort(l[mid_point:])
    out = merge(l_1, l_2)
    del l_1, l_2
    return out

logging.info("Starting Sorting")
with cf.ProcessPoolExecutor() as executor:
    sorted_lists_futures = [executor.submit(merge_sort, r_list) for r_list in r_lists]
logging.info("Sorting Completed")

# Asyncio API

Рассмотрим использование библиотеки более подробно.

http://onreader.mdl.ru/UsingAsyncioPython3/content/Ch03.html

In [None]:
import time
import asyncio

async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

# 1. Прежде всего необходим экземпляр цикла событий. Если он уже запущен, мы можем его получить с помощью
# get_event_loop. Если нет - нужно создать новый
def get_or_create_loop():
    loop = asyncio.get_event_loop()
    if loop.is_closed():
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
    return loop

loop = get_or_create_loop()
# 2. Создаем задание. Этот код пока не исполняет корутину main, но регистрирует ее в цикле событий
loop.create_task(main())
# 3. Запускаем цикл событий на исполнение без критериев останова цикла
loop.run_forever()
# 4. Выберем все задания, относящиеся к данному циклу
pending = asyncio.Task.all_tasks(loop=loop)
# 5. Функция gather собирает все пока еще не законченные задания и оборачивает их в футуры
group = asyncio.gather(*pending, return_exceptions=True)
# 6. run_until_complete завершает все поданные задания
loop.run_until_complete(group)
# 7. останавливаем цикл событий
loop.close()

### Обработка блокирующих операций

Иногда внутри некоторого кода, который мы хотим исполнить асинхронно, попадаются блокирующие вызовы. Чтобы они не останавливали наш event loop, мы должны запускать их в отдельном executor:

In [None]:
async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(5)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(3)
    print(f"{time.ctime()} Hello from a thread!")

loop = get_or_create_loop()

loop.create_task(main())
loop.run_in_executor(None, blocking)
loop.run_forever()

pending = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

Важный нюанс предыдущего кода состоит в том, что корутина main выполняется дольше, чем функция blocking. А что будет, если сделать наоборот?

In [None]:
async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(3)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(6)
    print(f"{time.ctime()} Hello from a thread!")


loop = get_or_create_loop()

loop.create_task(main())

loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()

Эта ошибка возникает из-за того, что функция `run_in_executor` не создает экземпляр Task. Соответственно, из `asyncio.Task.all_tasks()` его не возвращает, и `run_until_complete` не ждет завершения этой функции. Есть несколько вариантов, что с этим делать.

Вариант 1 - добавить новую корутину, которую мы будем регистрировать в цикле событий, в которой будет исполняться блокирующий код в новом executor'e.

In [None]:
async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(3)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(6)
    print(f"{time.ctime()} Hello from a thread!")

async def run_blocking():
    await loop.run_in_executor(None, blocking)

loop = get_or_create_loop()
loop.create_task(main())
loop.create_task(run_blocking())
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)
loop.close()

Вариант 2 - вручную добавить нашу футуру в цикл событий

In [None]:
async def main():
    print(f'{time.ctime()} Hello!')
    await asyncio.sleep(3)
    print(f'{time.ctime()} Goodbye!')
    loop.stop()

def blocking():
    time.sleep(6)
    print(f"{time.ctime()} Hello from a thread!")

loop = get_or_create_loop()
loop.create_task(main())
future = loop.run_in_executor(None, blocking)
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
for t in tasks:
    t.cancel()
group_tasks = asyncio.gather(*tasks, return_exceptions=True)
# вот тут
group = asyncio.gather(group_tasks, future)
loop.run_until_complete(group)
loop.close()

## Корутины

По способу работы корутины похожи на генераторы. Async def функция возвращает объект-корутину, так же, как функция с yield возвращает генератор:

In [None]:
main()

### Что исполняет цикл событий у корутины

Выполнение корутины также похоже на исполнение генератора: в конце вызывается StopIteration. А начало исполнения запускается методом `send` с аргументом `None`. Именно это и вызывает у корутины цикл событий, когда мы его запускаем либо ставим корутину на ожидание результата оператором await.

In [None]:
async def f():
    return "abc"

coroutine = f()
try:
    coroutine.send(None)
except StopIteration as e:
    print(e.value)

### Await

Новый поток запускается из цикла событий. Если нам нужно последовательно выполнять корутины в этом потоке, мы можем использовать ключевое слово await. Оно принимает единственный объект - корутину либо любой другой объект, реализующий интерфейс `__await__() -> Iterator`.

In [None]:
async def f():
    print("f")
    await asyncio.sleep(1.0)
    print("f end")
    loop.stop()
    return 123

async def main():
    print("main")
    result = await f()
    print("main end")
    return result


loop = get_or_create_loop()
loop.create_task(main())
loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=False)
loop.run_until_complete(group)


### Прекращение корутины

Внутрь корутины можно пробросить исключение, которое будет вызвано внутри корутины во время ее await. В asyncio предусмотрен специальный тип исключения, который используется для отмены исполнения корутины.

In [None]:
async def f():
    try:
        while True:
            await asyncio.sleep(0)
    except asyncio.CancelledError:
        print('I was cancelled!')
    else:
        return 111

coro = f()
coro.send(None)
coro.send(None)
coro.throw(asyncio.CancelledError)

## Асинхронные менеджеры контекста

используются тогда, когда `__enter__` и/или `__exit__` должны использовать await в своем коде. Чтобы класс стал классом асинхронного менеджера контекста, он должен реализовать следующий интерфейс. Кстати, по аналогии с `@contextlib.contextmanager` для упрощенного создания асинхронных менеджеров контекста существует декоратор `@contextlib.asynccontextmanager`. Единственное отличие в применении - им оборачиваются корутины.

In [None]:
class Connection:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        
    async def __aenter__(self):
        self.conn = await get_conn(self.host, self.port)
        return conn
    
    async def __aexit__(self, exc_type, exc, tb):
        await self.conn.close()

async with Connection('localhost', 9001) as conn:
    <do stuff with conn>

## Асинхронный итератор и генератор (async for)

Мы можем написать итератор или генератор, итерироваться по которому можно асинхронно. Для этого реализуется следующий интерфейс:
`__aiter__` и `__anext__`.

In [None]:
async def mygen(u: int = 10):
    i = 0
    while i < u:
        yield i
        i += 1
        await asyncio.sleep(2)

        
async def main():
    async for i in mygen(15):
        print(i)
        loop.stop()

loop = get_or_create_loop()

loop.create_task(main())

loop.run_forever()
tasks = asyncio.Task.all_tasks(loop=loop)
group = asyncio.gather(*tasks, return_exceptions=True)
loop.run_until_complete(group)
loop.close()

Но асинхронность здесь заключается не в изменении порядка следования элементов, по которым мы итерируемся, а в том, что параллельно с итерациями может исполняться другой асинхронный код.

In [None]:
async def mygen(u: int = 10):
    i = 0
    while i < u:
        yield i
        i += 1
        await asyncio.sleep(1)
    loop.stop()

        
async def main():
    async for i in mygen(15):
        print(i)
        
        
async def sth_else():
    for j in 'abcdefghjklmnop':
        print(j)
        await asyncio.sleep(0.5)

loop = get_or_create_loop()

t1 = loop.create_task(main())
t2 = loop.create_task(sth_else())

loop.run_forever()
group = asyncio.gather(t1, t2, return_exceptions=True)
loop.run_until_complete(group)
loop.stop()
loop.close()

Async for можно также исползовать в генераторах коллекций

In [None]:
async def doubler(n):
    for i in range(n):
        yield i, i * 2
        await asyncio.sleep(0.1)
        
        
async def main():
    result = [x async for x in doubler(5)]
    print(result)
    
    result = {x: y async for x, y in doubler(3)}
    print(result)
    
    result = {x async for x in doubler(3)}
    print(result)
    
get_or_create_loop().run_until_complete(main())

# Asyncio и http-запросы

Для работы с http асинхронно есть библиотека aiohttp.

In [None]:
!pip install aiohttp 

In [None]:
import aiohttp
import asyncio

async def main():
    print("before 1st request")

    async with aiohttp.request('GET', "http://python.org") as response:
        print("Python.org:", response.status)
        
    print("before 2nd request")
        
    async with aiohttp.request('GET', "http://microsoft.com") as response:
        print("microsoft.com:", response.status)

loop = get_or_create_loop()
loop.run_until_complete(main())

# await main()

## Задание

Написать скрипт, который асинхронно скачивает заданные в списке файлы.