# Конкурентное выполнение корутин и использование библиотеки `aiofiles`

Мы рассмотрели понятие корутины в Python, её синтаксис, а также ключевые слова `async` и `await`, которые позволяют создавать асинхронные функции. Теперь познакомимся с конкурентным выполнением нескольких корутин и с библиотекой `aiofiles`, которая обеспечивает асинхронный ввод-вывод для работы с файлами.

Из этого текста ты узнаешь:
- как работает функция `asyncio.gather` и как её использовать;
- как конкурентно выполнять несколько корутин;
- как использовать библиотеку `aiofiles` для асинхронной работы с файлами.

Время чтения: ~20 минут.



## Функция `asyncio.gather`

`asyncio.gather` — ключевая функция библиотеки `asyncio`, предназначенная для конкурентного выполнения нескольких корутин. Она позволяет запускать несколько корутин одновременно и дожидаться их завершения, при этом возвращая результаты всех выполненных задач.

<!-- #### Основные моменты: -->
<!-- - **Параллельное выполнение**: `gather` позволяет выполнять корутины параллельно, то есть они будут работать "вместе", а не последовательно. -->
<!-- - **Ожидание всех задач**: Вызов функции `gather` завершится, когда завершатся все переданные ей корутины. -->
<!-- - **Результаты выполнения**: Она возвращает список результатов всех корутин, в том порядке, в котором они были переданы. -->

### Синтаксис:
```python
asyncio.gather(*coros)
```

- `*coros`: неограниченное количество корутин или асинхронных задач, которые нужно запустить.

### Особенности `asyncio.gather`

1. **Конкурентное выполнение**

 `gather` позволяет выполнять корутины как бы параллельно (так это выглядит для пользователя). Фактически выполнение задач чередуется. Это происходит из-за использования `await` внутри корутин: пока одни задачи в режиме ожидания, другие выполняются.

2. **Ожидание всех задач**

 Вызов функции `gather` завершится, когда завершатся все переданные ей корутины.

3. **Исключения**

 Если одна из корутин вызовет исключение, это оно распространится на все `gather`, и функция завершится с ошибкой. Однако завершённые корутины до этого момента не будут отменены.

4. **Результаты выполнения**:

 `gather` возвращает список результатов всех корутин в том порядке, в котором они были переданы.




<!-- ### Особенности `asyncio.gather`: -->
<!-- 1. **Конкурентное выполнение**: Несмотря на то, что задачи могут выполняться "параллельно", фактически выполнение задач чередуется. Это достигается за счёт использования `await` внутри корутин, которые позволяют другим задачам исполняться во время ожидания. -->
<!-- 2. **Возвращает результаты**: `gather` возвращает список результатов всех переданных задач, который можно использовать для дальнейшей обработки. -->
<!-- 3. **Исключения**: Если одна из корутин вызовет исключение, это исключение будет распространено на все `gather`, и оно завершится с ошибкой. Однако завершённые корутины до этого момента не будут отменены. -->



## Конкурентное выполнение корутин с использованием функции `asyncio.gather`

Асинхронное программирование позволяет выполнять несколько операций одновременно, что особенно полезно в случае сетевых запросов или длительных вычислений.

**Пример.** Есть задача собирать данные для веб-приложения с трёх разных API. Один возвращает текущие погодные условия, другой — информацию о текущем курсе валют, а третий — последние новости. Чтобы ускорить получение данных, можно сделать запросы к этим API параллельно.



In [None]:
import asyncio

# Асинхронная функция для получения данных о погоде
async def fetch_weather():
    print('Запрос данных о погоде...')
    await asyncio.sleep(2)  # имитация запроса
    return 'Текущая погода: 18°C, ясно'

# Асинхронная функция для получения курса валют
async def fetch_currency_rate():
    print('Запрос данных о курсе валют...')
    await asyncio.sleep(3)  # имитация запроса
    return 'Курс доллара: 75.5 RUB'

# Асинхронная функция для получения новостей
async def fetch_news():
    print('Запрос последних новостей...')
    await asyncio.sleep(4)  # имитация запроса
    return 'Последние новости: Новая экономическая реформа...'

# Основная асинхронная функция
async def main():
    weather_task = fetch_weather()
    currency_task = fetch_currency_rate()
    news_task = fetch_news()

    # Ожидаем завершения всех задач
    responses = await asyncio.gather(weather_task, currency_task, news_task)

    for response in responses:
        print(response)

# Запускаем асинхронную программу
await main()

Запрос данных о погоде...
Запрос данных о курсе валют...
Запрос последних новостей...
Текущая погода: 18°C, ясно
Курс доллара: 75.5 RUB
Последние новости: Новая экономическая реформа...


### Что происходит в этом коде
1. **Асинхронные функции:**
    - `fetch_weather()`, `fetch_currency_rate()` и `fetch_news()` симулируют асинхронные запросы к разным API. Каждая функция «засыпает» на определённое время (`await asyncio.sleep()`), имитируя задержку в получении данных с серверов.

2. **Функция `main()`:**
    - В функции `main()` параллельно запускается три задачи — по одной на каждый запрос (погода, курс валют, новости). Использование `asyncio.gather()` позволяет запустить все задачи одновременно и дождаться их завершения.

3. **Результаты:**
    - Программа не блокируется на время каждого запроса, а отправляет их одновременно. После того как все три задачи завершены, результаты выводятся на экран.

### Почему асинхронный подход выигрывает

Если делать запросы последовательно (сначала погода, затем валюты, потом новости), программа бы занимала в сумме **9 секунд** (2 + 3 + 4). Благодаря асинхронному выполнению с использованием `asyncio.gather()` программа выполнится всего за **4 секунды** (время самого длительного запроса).

Асинхронность в таких задачах позволяет не терять время на ожидание, что особенно важно для приложений, которые обращаются к нескольким удалённым источникам данных одновременно.

### Пример с обработкой ошибок

**Пример.** Нужно обработать несколько распространённых исключений, связанных с сетевыми запросами: `asyncio.TimeoutError` для таймаута и условное исключение для сброса соединения `ConnectionResetError`. Все остальные ошибки нужно перехватить с помощью базового `Exception`, чтобы не упустить неожиданные случаи.


In [None]:
import asyncio

# Имитация асинхронной функции для получения данных о погоде
async def fetch_weather():
    try:
        print('Запрос данных о погоде...')
        await asyncio.sleep(2)  # имитация запроса
        return 'Текущая погода: 18°C, ясно'
    except asyncio.TimeoutError:
        print('Ошибка: Превышено время ожидания при запросе данных о погоде')
    except ConnectionResetError:
        print('Ошибка: Сброс соединения при запросе данных о погоде')

# Имитация асинхронной функции для получения курса валют
async def fetch_currency_rate():
    try:
        print('Запрос данных о курсе валют...')
        await asyncio.sleep(3)  # имитация запроса
        raise asyncio.TimeoutError  # имитируем таймаут
    except asyncio.TimeoutError:
        print('Ошибка: Превышено время ожидания при запросе курса валют')
    except ConnectionResetError:
        print('Ошибка: Сброс соединения при запросе курса валют')

# Имитация асинхронной функции для получения новостей
async def fetch_news():
    try:
        print('Запрос последних новостей...')
        await asyncio.sleep(4)  # имитация запроса
        raise ConnectionResetError  # имитируем сброс соединения
        return 'Последние новости: Новая экономическая реформа...'
    except asyncio.TimeoutError:
        print('Ошибка: Превышено время ожидания при запросе новостей')
    except ConnectionResetError:
        print('Ошибка: Сброс соединения при запросе новостей')

# Основная асинхронная функция
async def main():
    weather_task = fetch_weather()
    currency_task = fetch_currency_rate()
    news_task = fetch_news()

    try:
        # Ожидаем завершения всех задач
        responses = await asyncio.gather(weather_task, currency_task, news_task)

        for response in responses:
            if response:
                print(response)
    except Exception as e:
        print(f'Произошла ошибка при выполнении задач: {e}')

# Запускаем асинхронную программу
await main()

Запрос данных о погоде...
Запрос данных о курсе валют...
Запрос последних новостей...
Ошибка: Превышено время ожидания при запросе курса валют
Ошибка: Сброс соединения при запросе новостей
Текущая погода: 18°C, ясно



1. **Обработка конкретных исключений**:
   - **`asyncio.TimeoutError`**: если запрос превышает лимит времени ожидания, выбрасывается таймаут. Мы обрабатываем эту ошибку отдельно, чтобы знать, что проблема именно в медленном соединении или сервере.
   - **`ConnectionResetError`**: это условное исключение в ситуации, когда соединение с сервером было неожиданно прервано во время передачи данных.
   - **`Exception`**: любая другая непредвиденная ошибка будет перехвачена и обработана с сообщением об ошибке.

2. **Имитация ошибок**:
   - В функции `fetch_currency_rate()` вызываем исключение `asyncio.TimeoutError` для демонстрации обработки таймаута.
   - В функции `fetch_news()` вызываем исключение `ConnectionResetError` для демонстрации обработки сброса соединения.

3. **Использование `asyncio.gather`**:
   - Функция `gather` запускает все корутины параллельно. Мы обрабатываем их ошибки в каждой отдельной корутине, чтобы одна ошибка не прервала выполнение других задач.

4. **Выводы программы**:
   - Программа выведет результаты только тех задач, которые завершились успешно, и сообщит об ошибках в других.


### Когда использовать `asyncio.gather`
- Нужно параллельно выполнить несколько задач, которые могут длительное время находиться в ожидании (например, сетевые запросы или ввод-вывод).
- Необходимо собрать результаты всех задач после их завершения.
- Важен порядок возвращаемых результатов, как в переданном списке корутин.

### Когда не стоит использовать `gather`
- Если задачи должны выполняться **последовательно**. В этом случае лучше использовать простой цикл `for`, который последовательно ожидает завершения каждой задачи.

## Библиотека `aiofiles`

Библиотека `aiofiles` предоставляет асинхронный интерфейс для работы с файлами, аналогичный стандартному модулю `open`. Она повышает производительность приложений и помогает избежать блокировки основного потока во время операций с файлами. В отличие от традиционных блокирующих операций, `aiofiles` позволяет продолжать выполнение других задач, таких как сетевые запросы, в то время как файл читается или записывается. Это особенно полезно в высоконагруженных системах, где важно управлять многозадачностью и оптимизировать использование ресурсов.

Библиотека поддерживает стандартные методы чтения и записи файлов, такие как `read()`, `readline()`, `write()`, `writelines()`, в асинхронном режиме с использованием ключевого слова `await`.


#### Установка `aiofiles`

```bash
pip install aiofiles
```



In [None]:
!pip install aiofiles

### Сравнение с `Pandas`

Хотя в `aiofiles` есть мощные инструменты для работы с файлами, на практике многие задачи, связанные с обработкой данных, решаются с помощью более специализированных библиотек, таких как `Pandas`. `Pandas` умеет эффективно читать и записывать данные в формате CSV, Excel, HDF5 и других, при этом обрабатывает большие наборы данных с минимальными усилиями:

```python
import pandas as pd

# Чтение данных
df = pd.read_csv('business_data.csv')

# Запись данных
df.to_csv('business_data_output.csv', index=False)
```

Этот код выполняет те же задачи, что и вышеописанные примеры с `aiofiles`, но делает это более эффективно и с меньшим количеством строк кода.

### Зачем тогда изучать `aiofiles`

Основная цель изучения — понять принципы асинхронного взаимодействия с файловой системой. Реальные задачи с чтением и записью файлов в асинхронных приложениях чаще всего решаются с помощью таких библиотек, как `aiohttp`, для работы с веб-запросами. Но для начала стоит научиться работать с асинхронными операциями в более простых сценариях, чтобы глубже понять механизм работы асинхронных функций.

## Пример работы с библиотекой `aiofiles`

Рассмотрим использование библиотеки `aiofiles`.



**Пример.**  Есть большой CSV-файл `sales_data_large.csv`, который содержит данные о продажах товаров в интернет-магазине.
Структура CSV:

```
order_id,product_name,quantity,price,date
1,Товар A,2,100.00,2024-10-20
2,Товар B,1,200.00,2024-10-21
3,Товар C,5,150.00,2024-10-22
4,Товар D,3,300.00,2024-10-23
```

Выполним следующие шаги:
1. Чтение данных из CSV-файла.
2. Обработка данных (например, расчёт общей суммы продаж).
3. Запись обработанных данных в новый CSV-файл.


In [None]:
import asyncio
import aiofiles
import csv

# Функция для чтения данных из CSV-файла
async def read_sales_data(file_path):
    sales_data = []
    async with aiofiles.open(file_path,'r') as f:
        # Читаем все строки файла
        content = await f.read()
        # Создаём DictReader с заголовком
        reader = csv.DictReader(content.splitlines())
        # Читаем данные в список
        for row in reader:
            sales_data.append(row)
    return sales_data

# Функция для записи обработанных данных в новый CSV-файл
async def write_sales_summary(file_path, summary):
    async with aiofiles.open(file_path, 'w') as f:
        writer = csv.writer(f)
        await f.write('total_sales  total_quantity\n')  # Заголовок
        await f.write(f'{summary["total_sales"]}  {summary["total_quantity"]}\n')

# Основная асинхронная функция
async def main():
    input_file = 'sales_data_large.csv'
    output_file = 'sales_summary.csv'

    # Чтение данных
    sales_data = await read_sales_data(input_file)
    print(f'Загружено {len(sales_data)} строк данных')

    # Обработка данных
    total_sales = 0
    total_quantity = 0
    for sale in sales_data:
        total_sales += float(sale['price']) * int(sale['quantity'])
        total_quantity += int(sale['quantity'])

    # Подготовка итогов
    summary = {
        'total_sales': total_sales,
        'total_quantity': total_quantity,
    }

    # Запись итогов в новый файл
    await write_sales_summary(output_file, summary)
    print(f'Результаты сохранены в {output_file}')

await main()


Загружено 100000 строк данных
Результаты сохранены в sales_summary.csv



- **Чтение данных:** `sales_data = await read_sales_data(input_file)` вызывает функцию `read_sales_data`, чтобы загрузить данные о продажах:
  - читаем содержимое файла целиком с помощью `await f.read()`;
  - создаём `DictReader`, передавая результат `content.splitlines()` для разделения содержимого на строки, что позволяет корректно интерпретировать заголовки.
- **Обработка данных:** в цикле `for sale in sales_data` происходит подсчёт общих продаж и количества товаров:
  - `total_sales` обновляется как произведение цены и количества каждого товара;
  - `total_quantity` увеличивается на количество товара.
- **Запись итогов:** вызывается функция `write_sales_summary`, чтобы сохранить результаты в выходной файл:
   - открываем файл для записи с помощью `aiofiles.open()`;
   - используем `csv.writer` для записи данных в формате CSV;
   - записываем заголовок и значения `total_sales` и `total_quantity` с помощью `await f.write`.


### Деление файла на чанки

При работе с большими файлами иногда возникает необходимость разделить файл на фрагменты, чтобы не загружать весь файл в память сразу. Это может быть полезно, например, при чтении по частям или стриминге данных.

> **Чанки** (англ. chunk) — это части, на которые был разделён исходный код.

Возьмём CSV-файл из примера выше и разберём чтение по чанкам (размер фрагмента 1024 байта):


In [None]:
import aiofiles
import asyncio

async def read_in_chunks(filename, chunk_size=1024):
    async with aiofiles.open(filename, 'r') as file:
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            print(chunk)

await read_in_chunks('sales_data_large.csv')

[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
95199,Товар M,4,119.5,2035-08-11 14:00:00
95200,Товар N,5,121.0,2035-08-11 15:00:00
95201,Товар O,1,100.0,2035-08-11 16:00:00
95202,Товар P,2,101.5,2035-08-11 17:00:00
95203,Товар Q,3,103.0,2035-08-11 18:
00:00
95204,Товар R,4,104.5,2035-08-11 19:00:00
95205,Товар S,5,106.0,2035-08-11 20:00:00
95206,Товар T,1,105.0,2035-08-11 21:00:00
95207,Товар U,2,106.5,2035-08-11 22:00:00
95208,Товар V,3,108.0,2035-08-11 23:00:00
95209,Товар W,4,109.5,2035-08-12 00:00:00
95210,Товар X,5,111.0,2035-08-12 01:00:00
95211,Товар Y,1,110.0,2035-08-12 02:00:00
95212,Товар Z,2,111.5,2035-08-12 03:00:00
95213,Товар A,3,113.0,2035-08-12 04:00:00
95214,Товар B,4,114.5,2035-08-12 05:00:00
95215,Товар C,5,116.0,2035-08-12 06:00:00
95216,Товар D,1,115.0,2035-08-12 07:00:00
95217,Товар E,2,116.5,2035-08-12 08:00:00
95218,Товар F,3,118.0,2035-08-12 09:00:00
95219,Товар G,4,119.5,2035-08-12 10:00:00
95220,Товар H,5,121.0,2035-08-12 11

#### Основные шаги:

1. Открытие файла асинхронно для чтения (`async with`), что позволяет не блокировать основной поток программы.
2. В цикле файл читается блоками по 1024 байта (`await file.read(chunk_size)`).
3. Каждый прочитанный блок выводится на экран. Когда файл полностью прочитан, цикл завершает работу.

### Обработка неполных строк

В коде выше может возникнуть проблема. Если строки в файле превысят размер чанка (1024 байта), то при чтении файла любая из строк может быть разбита на несколько частей, то есть разделена между двумя последовательнями чанками.  

Часть строки, которая попала в два чанка, может быть потеряна или неправильно обработана. Это важно учитывать при работе с текстовыми файлами.

Чтобы избежать таких ситуаций, нужно тщательно продумывать размер чанка или добавлять логику обработки строк.

Сравни новый скрипт, который позволит не разбивать строки, с предыдущим:


In [None]:
import aiofiles
import asyncio

async def read_in_chunks(filename, chunk_size=1024):
    async with aiofiles.open(filename, 'r') as file:
        partial_line = ''
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            partial_line += chunk
            lines = partial_line.split('\n')
            for line in lines[:-1]:
                print(line)  # Обработка полной строки
            partial_line = lines[-1]  # Неполная строка сохраняется для следующего чанка

await read_in_chunks('sales_data_large.csv')

[1;30;43mВыходные данные были обрезаны до нескольких последних строк (5000).[0m
95001,Товар W,1,100.0,2035-08-03 08:00:00
95002,Товар X,2,101.5,2035-08-03 09:00:00
95003,Товар Y,3,103.0,2035-08-03 10:00:00
95004,Товар Z,4,104.5,2035-08-03 11:00:00
95005,Товар A,5,106.0,2035-08-03 12:00:00
95006,Товар B,1,105.0,2035-08-03 13:00:00
95007,Товар C,2,106.5,2035-08-03 14:00:00
95008,Товар D,3,108.0,2035-08-03 15:00:00
95009,Товар E,4,109.5,2035-08-03 16:00:00
95010,Товар F,5,111.0,2035-08-03 17:00:00
95011,Товар G,1,110.0,2035-08-03 18:00:00
95012,Товар H,2,111.5,2035-08-03 19:00:00
95013,Товар I,3,113.0,2035-08-03 20:00:00
95014,Товар J,4,114.5,2035-08-03 21:00:00
95015,Товар K,5,116.0,2035-08-03 22:00:00
95016,Товар L,1,115.0,2035-08-03 23:00:00
95017,Товар M,2,116.5,2035-08-04 00:00:00
95018,Товар N,3,118.0,2035-08-04 01:00:00
95019,Товар O,4,119.5,2035-08-04 02:00:00
95020,Товар P,5,121.0,2035-08-04 03:00:00
95021,Товар Q,1,100.0,2035-08-04 04:00:00
95022,Товар R,2,101.5,2035-08-04 05:

В новом коде используется переменная `partial_line`, которая сохраняет часть строки, если она была разбита между чанками. Это делается для того, чтобы предотвратить потерю данных при разрыве строки посередине.

 ```python
   partial_line += chunk
   lines = partial_line.split('\n')
   for line in lines[:-1]:
       print(line)  # Обработка полной строки
   partial_line = lines[-1]  # Неполная строка сохраняется для следующего чанка
   ```
   
   <!-- В предыдущем коде эта проблема не учитывалась: строки выводились напрямую, и если строка была разбита между чанками, она выводилась частично, что могло привести к ошибкам при дальнейшей обработке. -->
  
   - Сначала весь текущий чанк добавляется к предыдущему неполному фрагменту строки (`partial_line += chunk`).
   - Затем происходит разделение полученных данных на строки по символу новой строки (`\n`).
   - Полные строки обрабатываются сразу (например, выводятся с помощью `print`), а последняя строка, которая может быть неполной, сохраняется в `partial_line`, чтобы продолжить обработку в следующем чанке.

<!-- 2. **Отсутствие потери данных при разрыве строки**: -->
   <!-- - Когда файл читается чанками, строки могут быть разделены между двумя последовательными чанками. В старом коде эта ситуация не обрабатывалась, и часть строки, которая попала в два чанка, могла быть потеряна или неправильно обработана. -->
   <!-- - В новом коде эта проблема решена за счёт сохранения неполной строки для дальнейшего добавления к следующему чанку. -->

### Результат

- **Предыдущий код:** читает и выводит данные чанками, но не учитывает, что строки могут быть разделены между чанками. Это может привести к ошибкам.
- **Новый код:** добавляет логику для корректной обработки разорванных строк, сохраняя часть строки, если она оказалась разделённой, и добавляя её к следующему чанку.

>**Важно.** Когда ты работаешь с большими объёмами данных или часто обращаешься к одним и тем же файлам, использование техники кэширования может значительно улучшить производительность. Это позволяет временно хранить части данных в памяти для быстрого доступа. В контексте чтения и записи файлов это можно реализовать через буферизацию, когда данные собираются в блоки (чанки), а затем записываются или читаются партиями.


## Заключение

<!-- Асинхронное программирование в Python с использованием библиотеки asyncio, функции `asyncio.gather` и библиотеки `aiofiles` предоставляет мощные инструменты для написания эффективного и отзывчивого кода. Понимание того, как использовать корутины для параллельного выполнения задач, а также асинхронного ввода-вывода, является ключевым аспектом современного программирования на Python. -->

<!-- С помощью этих концепций и инструментов ты сможешь создавать приложения, которые могут эффективно управлять сетевыми запросами, файловыми операциями и многими другими асинхронными задачами, значительно повышая производительность программ. -->

- Функция `asyncio.gather` позволяет запускать несколько корутин одновременно и дожидаться их завершения, при этом возвращает результаты всех выполненных задач.
- Библиотека `aiofiles` позволяет продолжать выполнение других задач, таких как сетевые запросы, в то время как файл читается или записывается.
- При работе с большими файлами иногда разделяют файл на фрагменты, или чанки, чтобы не загружать весь файл в память сразу.
- Когда файл читается чанками, строки могут быть разделены между двумя последовательными чанками. Чтобы избежать таких ситуаций, нужно тщательно продумывать размер чанка или добавлять логику обработки строк.
<!-- - Переменная `partial_line` сохраняет часть строки, если она была разбита между чанками. -->