# Занятие №1. Параллельная обработка данных на чистом Python

На занятии мы воспроизвели на Python паттерны, которые Apache Spark использует «под капотом». Итоговая программа читает большой файл с датасетом, фильтрует нужные события, группирует их в пакеты и обрабатывает параллельно в нескольких процессах.

**Такой подход решает две проблемы:**
- файл с событиями может весить десятки гигабайт; если загрузить его целиком в список Python, программа упадёт с ошибкой `MemoryError` или начнёт использовать файл подкачки `swap`, что сильно замедлит работу;
- интепретатор Python выполняет байткод только в одном потоке одновременно, даже если в компьютере, например, восемь ядер, обычная однопоточная программа нагрузит только одно из них, а остальные семь будут простаивать.

Финальный вариант программы состоит из несколько логических блоков, по которым мы разделим наши действия.

**Всего таких блоков восемь:**
1. **Прописываем импорты и настраиваем окружение.** Прежде чем писать логику обработки, настроим окружение. Программа использует несколько модулей стандартной библиотеки:

    - **functools** — содержит декораторы, которые позволяют работать с другими функциями. Из этого модуля нам понадобится декоратор `wraps`, который сохраняет метаданные функции, которую мы будем оборачивать;
    - **json** — парсит JSON-строки в словари Python;
    - **logging** — записывает сообщения о том, как выполняется программа; логирование поможет нам понять, что делает конкретный процесс;
    - **time** — измеряет время, за которое выполняются функции;
    - **collections.abc** — содержит абстрактные типы `Iterable` и `Iterator` для аннотаций, которые мы будем использовать;
    - **concurrent.futures** — содержит высокоуровневый интерфейс, который позволяет запускать задачи параллельно в нескольких процессах или потоках;
    - **pathlib** — предоставляет пути к файлам как к объектам, а не строкам.

Константа `WORKDIR` хранит путь к директории со скриптом. От неё мы будем строить пути к данным.

Логгер мы настраиваем так, чтобы каждое сообщение содержало имя процесса. Когда программа запустит несколько процессов параллельно, то сразу увидим, какой из них пишет в лог.

In [None]:
# Фрагмент программы №1

# Импортируем модуль, чтобы использовать декоратор wraps
import functools

# Импортируем модуль, чтобы парсить JSON-строки в словари
import json

# Импортируем модуль, чтобы логгер писал, какой процесс что делает
import logging

# Импортируем модуль, чтобы замерять, сколько секунд работает функция
import time

# Импортируем типы, чтобы аннотировать генераторы
from collections.abc import Iterable, Iterator

# Импортируем классы, чтобы запускать задачи в нескольких процессах параллельно
from concurrent.futures import ProcessPoolExecutor, as_completed

# Импортируем класс, чтобы работать с путями как с объектами
from pathlib import Path

# Сохраняем путь к директории со скриптом, чтобы строить относительные пути к данным
WORKDIR = Path(__file__).parent

# Настраиваем логгер: каждое сообщение показывает имя процесса
logging.basicConfig(level=logging.INFO, format="%(processName)s %(message)s")

# Создаём логгер для этого модуля, чтобы писать сообщения из нашего кода
logger = logging.getLogger(__name__)


2. **Используем декоратор `slowlog`**
    
    **Декоратор** — это функция, которая принимает другую функцию и возвращает её модифицированную версию. Декоратор оборачивает функцию и выполняет дополнительный код до или после неё.

    Декоратор `slowlog` измеряет, сколько времени выполняется функция, и пишет результат в лог. Параметр `threshold` задаёт порог в секундах. Если функция выполняется дольше, чем этот порог, то декоратор помечает её в логе как слишком медленную.

    В программе используется **фабрика декораторов** — это функция, которая возвращает декоратор. Такая конструкция нужна, чтобы передать параметр `threshold`. 

    **Давайте разберём её структуру:**
    1. **`slowlog(threshold)`** — это внешняя функция, которая принимает параметр `threshold` и возвращает декоратор `set_timer`.
    2. **`set_timer(func)`** — это сам декоратор, который принимает функцию и возвращает обёртку `wrapper`.
    3. **`wrapper(*args, **kwargs)`** — это обёртка, которая вызывает оригинальную функцию и логирует результат.
    4. **`@functools.wraps(func)`** — это декоратор, который копирует в обёртку метаданные оригинальной функции. Например, имя и документацию. У каждой функции в Python есть атрибут `__name__`, который хранит её имя. Без декоратора `wraps` обёртка `wrapper` потеряет имя оргинальной функции, то есть атрибут `wrapper.__name__` вернёт строку "wrapper" вместо настоящего имени.
    5. **`time.perf_counter()`** — это функция, которая использует монотонный счётчик процессора и возвращает текущее время в секундах время с максимальной точностью. Она лучше подходит, чтобы измерять короткие интервалы, чем функция `time.time()`, которая зависит от системных часов.
    6. **`try/finally`** — это блок, который гарантирует, что время запишется в лог даже если функция выбросит исключение.

In [None]:
# Фрагмент программы №2

# Определяем фабрику декораторов, которая принимает порог в секундах
def slowlog(threshold: float = 2.5):
    
    # Определяем сам декоратор, который принимает функцию
    def set_timer(func):
        
        # Копируем имя и документацию оригинальной функции в обёртку
        @functools.wraps(func)
        
        # Определяем обёртку, которая принимает любые аргументы оригинальной функции
        def wrapper(*args, **kwargs):
            
            # Запоминаем время до вызова функции
            start = time.perf_counter()
            
            # Блок try/finally гарантирует, что время запишется в лог, даже если функция выбросит исключение
            try:
                # Вызываем оригинальную функцию и сохраняем результат
                result = func(*args, **kwargs)
            finally:
                # Вычисляем, сколько секунд выполнялась функция
                delta = time.perf_counter() - start
                
                # Проверяем, превысило ли время порог
                if delta > threshold:
                    # Пишем в лог имя функции, время и пометку, что она слишком медленная
                    logger.info("Finished %s in %.3f seconds (too slow)", func.__name__, delta)
                else:
                    # Пишем в лог имя функции и время без пометки
                    logger.info("Finished %s in %.3f seconds", func.__name__, delta)
            
            # Возвращаем результат оригинальной функции
            return result
        
        # Возвращаем обёртку вместо оригинальной функции
        return wrapper
    
    # Возвращаем декоратор
    return set_timer
    

3. **Используем генератор `read_events`.**

    **Генератор** — это функция, которая использует ключевое слово `yield` вместо `return`. Генератор не выполняет всю работу сразу, он возвращает объект-итератор, который выдаёт по одному значению при каждом обращении.

    Функция `read_events` читает файл в формате **JSONL, или JSON Lines,** — это формат, в котором каждая строка файла является отдельным JSON-объектом. Такой формат удобно использовать для потоковой обработки, так как нам не нужно парсить весь файл целиком, чтобы получить первую запись.

    Генератор открывает файл и проходит по нему построчно. Каждую строку он парсит в словарь и отдаёт через `yield`. Важно отметить, что пока внешний код не запросит следующее значение, генератор стоит на паузе и не читает следующую строку. Такой подход называется **ленивым вычислением**. Он позволяет сэкономить оперативную память, так как в каждый момент памяти находится только одна строка файла, а не весь файл целиком.

    Аннотация `Iterator[dict]` означает, что функция возвращает итератор, который выдаёт словари.

In [None]:
# Фрагмент программы №3

# Определяем генератор, который принимает путь к файлу и возвращает итератор словарей
def read_events(path: Path) -> Iterator[dict]:
    
    # Открываем файл на чтение в кодировке UTF-8
    with open(path, "r", encoding="utf-8") as f:
        
        # Проходим по файлу построчно
        for line in f:
            
            # Парсим строку из JSON в словарь и отдаём через ключевое слово yield
            yield json.loads(line)
            

4. **Используем генератор `filter_events`.**

    Функция `filter_events` принимает итератор событий и возвращает новый итератор, который пропускает только события с нужным типом. Этот генератор тоже ленивый, так как он не создаёт список отфильтрованных событий в памяти. Вместо этого он берёт события из входного итератора по одному, проверяет условие и отдаёт подходящие дальше.

    Когда мы соединяем `read_events` и `filter_events` в цепочку, то получаем **пайплайн** — это последовательность преобразований, где каждое звено обрабатывает данные и передаёт следующие. При этом ни одно звено не хранит данные целиком.

In [None]:
# Фрагмент программы №4

# Определяем генератор, который принимает итератор событий и нужный тип события
def filter_events(events: Iterator[dict], wanted_event_type: str) -> Iterator[dict]:
    
    # Проходим по событиям из входного итератора по одному
    for event in events:
        
        # Проверяем, совпадает ли тип события с нужным
        if event["event_type"] == wanted_event_type:
            
            # Отдаём событие дальше, если тип совпал
            yield event
            

5. **Группируем потоки в пакеты.**

    **Батч** — это пакет элементов фиксированного размера.

    Когда программа передаёт данные в другой процесс, она сначала сериализует их, или превращает в байты. После чего отправляет, а процесс-получатель десереализует эти данные обратно. Эти операции занимают время. Если отправлять события по одному, то расходы на передачу данные перекроют время, которые мы получили благодаря параллельной обработки. Крупные пакеты выгоднее передавать, так как мы передаём один большой вместо тысяч маленьких.

    Функция `batcher` принимает итератор и размер батча. Она складывает элементы в список. Как только список достигает нужного размера, `batcher` отдаёт его через `yield` и начинает собирать следующий. После того, как цикл завершиться, в конце файла элементов может быть меньше, чем размер батча. Для этого мы используем условие `if batch`, чтобы проверить, так ли это, и отдать остаток элементов последним неполным пакетом.

    Аннтоация `Iterable[list[dict]]` говорит нам, что функция возвращает итерируемый объект, который выдаёт списки словарей.

In [None]:
# Фрагмент программы №5

# Определяем генератор, который принимает итератор и размер батча
def batcher(iterable: Iterable[dict], batch_size: int) -> Iterable[list[dict]]:
    
    # Создаём пустой список для накопления элементов
    batch = []
    
    # Проходим по элементам входного итератора
    for item in iterable:
        
        # Добавляем элемент в текущий батч
        batch.append(item)
        
        # Проверяем, достиг ли батч нужного размера
        if len(batch) == batch_size:
            
            # Отдаём заполненный батч через yield
            yield batch
            
            # Создаём новый пустой список для следующего батча
            batch = []
    
    # Проверяем, остались ли элементы после завершения цикла
    if batch:
        
        # Отдаём оставшиеся элементы последним неполным батчем
        yield batch
        

6. **Обрабатываем батч и имитируем CPU-bound операцию.**

    Функция `process_batch` имитирует тяжёлую вычислительную операцию. Когда мы вызываем `time.sleep(3)`, мы заставляем процесс ждать три секунды.

    Декоратор `@slowlog(5)` измеряет время, за сколько выполнится программа. Порог в пять секунд означает, что если функция выполняется дольше, лог пометит, что она медленная.

    Функция `process_batch` возвращает количество обработанных событий. Функция `main` суммирует эти значения, чтобы подсчитать общее число событий.

In [None]:
# Фрагмент программы №6

# Оборачиваем функцию декоратором slowlog с порогом 5 секунд
@slowlog(5)

# Определяем функцию, которая принимает батч и возвращает количество событий в нём
def process_batch(batch: list[dict]) -> int:
    
    # Имитируем тяжёлую вычислительную операцию, которая длится 3 секунды
    time.sleep(3)
    
    # Возвращаем количество обработанных событий
    return len(batch)
    

7. **Используем оркестрацию, чтобы связать все компоненты вместе.**

    Функция `main` собирает все компоненты в единый пайплайн и запускает параллельную обработку батчей.

    1. **Строим пайплайн, который состоит из цепочки генераторов:**
        1. `read_events(filename)` — возвращает итератор, который читает файл построчно.
        2. `filter_events(events, 'purchase')` — оборачивает предыдущий итератор и фильтрует события.
        3. `batcher(filtered_events, batch_size=50_000)` — оборачивает фильтр и группирует события в батчи по 50 000 штук. На этом этапе программа ещё не прочитала ни одну строку файла.

            Генераторы начнут работать только тогда, когда кто-то запросит у них данные, потому что они ленивые.
            
    2. **Инициализируем параллельное выполнение через `ProcessPoolExecutor`.**

        **`ProcessPoolExecutor`** — это класс из модуля `concurrent.futures`. Он создаёт пул процессов, то есть набор заранее запущенных процессов, которые ждут задачи. Когда мы отправляем задачу в пул, один из свободных процессов выполняет её. По умолчанию количество процессов в пуле равно числу ядер процессора.

        Мы используем процессы, а не потоки, потому что в Python есть **GIL, или Global Interpreter Lock, то есть глобальная блокировка интерпретатора.** GIL не позволяет нескольким потокам выполнять байт-код одновременно. Потоки помогают только для I/O-bound-задачами, которые связаны, например, с сетью, где поток большую часть времени просто ждёт. Для CPU-bound задач, в которых много вычислений, нужны отдельные процессы, каждый из которых имеет свой интерпретатор и GIL.

        Менеджер контекста `with ProcessPoolExecutor() as executor` автоматически завершит все процессы при выходе из блока.

    3. **Отправляем задачи и собираем результат**.

        Метод `executor.submit(process_batch, batch)` отправляет задачу в пул и сразу возвращает объект `Future`, который указывает на то, что результат обязательно будет. Эта задача выполянется в фоне, а основной код продолжает работать.

        Генератор списка `[executor.submit(process_batch, batch) for batch in batches]` итерируется по генератору батчей. В этот момент генераторы начинают читать файл, фильтровать и группировать данные. Каждый готовый батч сразу уходит в пул на обработку.

        Функция `as_completed(futures)` возвращает итератор, который выдаёт объекты по мере того, как они завершаются. Это значит, что какой процесс первый закончит работать, то первым и отдаст результат. То есть не нужно ждать, пока завершатся все задачи, чтобы начать обрабатывать результаты.

        Метод `future.result()` блокирует выполнение кода, пока задача не завершится, и возвращает результат. Если задача выбросила исключение, `result()` его пробросит.

In [None]:
# Фрагмент программы №7

# Оборачиваем функцию декоратором slowlog с порогом 20 секунд
@slowlog(20)

# Определяем главную функцию, которая связывает все компоненты
def main():
    
    # Строим путь к файлу с событиями относительно директории скрипта
    filename = WORKDIR / "data" / "events.jsonl"
    
    # Создаём генератор, который читает файл построчно
    events = read_events(filename)
    
    # Оборачиваем генератор фильтром, который пропускает только события с типом "purchase"
    filtered_events = filter_events(events, "purchase")
    
    # Оборачиваем фильтр генератором, который группирует события в батчи по 50 000 штук
    batches = batcher(filtered_events, batch_size=50_000)
    
    # Инициализируем счётчик обработанных событий
    total = 0
    
    # Создаём пул процессов, который автоматически завершится при выходе из блока
    with ProcessPoolExecutor() as executor:
        
        # Отправляем каждый батч в пул на обработку и собираем объекты Future в список
        futures = [executor.submit(process_batch, batch) for batch in batches]
        
        # Получаем результаты по мере завершения задач
        for future in as_completed(futures):
            
            # Прибавляем количество обработанных событий к общему счётчику
            total += future.result()
    
    # Пишем в лог общее количество обработанных событий
    logger.info("Total events: %d", total)
    

8. **Защищаемся от повторного импорта.**

    Условие `if __name__ == "__main__"` проверяет, что файл запущен напрямую, а не импортирован как модуль.

    Когда мы работаем с `ProcessPoolExecutor` это особенно важно. Когда пул создаёт новый процесс, он импортирует модуль заново. Если бы мы не использовали `if __name__ == "__main__"`, то каждый дочерний процесс пытался бы создать свой пул процессов, который создал бы ещё процессы, и так до бесконечности. 

In [None]:
# Фрагмент программы №8

# Проверяем, запущен ли файл напрямую, а не импортирован как модуль
if __name__ == "__main__":
    
    # Вызываем главную функцию
    main()
    