# План семинарского ноутбука

- Установка библиотек в Colab (пример);
- Постановка задачи;
- Юнит-тесты в PyTest;
- Type hints и MyPy; 
- Валидация потоковых данных на основе кодогенерации.

# Постановка задачи

Сегодня мы рассмотрим типичную ситуацию из мира машинного обучения на больших данных: потоковую обработку т.н. **рекордов** (record, запись). Подробнее об этой задаче мы поговорим дальше в курсе, а пока что достаточно понимать, что ваши данные обычно хранятся в табличном виде, и таблицы эти обычно очень большие. Большая часть работ над этими таблицами сводится к тому, что вы считываете данные, профильтровываете их, (возможно) делаете join/concat с другими таблицами, преобразуете результат в нужный вид и записываете в другое место. **Задача: научиться делать это эффективно.**

Очевидное наблюдение заключается в том, что вам невыгодно делать эти преобразования на одной машине, т.к. обычно преобразования различных строк одной таблицы независимы, поэтому таблицу можно разбить на небольшие кусочки и отправить на отдельные машины -- т.н. **воркеры** (workers, обработчики), -- обработать там и сложить результат в нужное место. Это т.н. **map-reduce** парадигма, о которой мы будем много говорить в течении курса. Сейчас мы не будем касаться конкретных инструментов для решения map-reduce задач (хотя бы потому, что крупные компании в духе Google и Яндекса любят писать свои -- см. BigQuery, Nile over YT/YQL). 

---

**Сосредоточимся на узкой задаче: пусть нам дана история покупок (в виде идентификаторов товаров) пользователей нашего сервиса, и мы хотим для каждого пользователя определить топ-N самых популярных товаров** (например, чтобы напоминать купить забытый любимый хлеб).

**Более формально:** На вход подан поток записей со схемой `"user_id, purchased_items"`. Нужно вернуть поток записей со схемой `"user_id, top_items"`, где `'top_items'` -- словарь с ключами `'item_id', 'times_purchased'`. 

**Для решения этой задачи** нам придётся написать т.н. *редьюсер* (reducer): он принимает на вход пару (идентификатор пользователя, заказы пользователя) и выдаёт нужный record на выходе.

Главный источник фрустрации при решении таких задач заключается в том, что их исполнение занимает часы процессорного времени, и если в коде были смысловые ошибки, то, в силу утиной типизации, исполнение скорее всего не прервётся, а завершится и выдаст бессмысленный результат. 

---

Вначале мы напишем максимально простое решение данной задачи и якобы убедимся в её корректности при помощи юнит-тестов. Потом мы запустим код на реальных данных, он отработает и выдаст некорректный результат. Мы найдём причину и покажем, как информация о типах в виде **type hints** позволяет при помощи **MyPy** -- статического анализатора кода на Python --  найти ошибки ещё до того, как код будет запущен на реальных данных. 

## Установка библиотек

In [2]:
!pip install mypy
!pip install black
!pip install pytest



## Базовая реализация

Ниже приведена сравнительно очевидная функция, которая решает поставленную задачу. Она получает на вход сгруппированный поток записей в формате "ключ, список записей с данным ключом" и генерирует поток записей с нужной схемой.

Обратите внимание, что, несмотря на простоту задачи, опытный DS-разработчик сразу увидит в этой функции множество мест, где всё может пойти не так. Посмотрите на неё внимательно и подведите в конце занятия итог: сколько проблем вы смогли заметить? Программирование на языке Python кажется простым занятием до тех пор, пока с его помощью решаются тривиальные задачи. Как только контекст становится сложнее, достоинства языка Python становятся его недостатками, и в следующих нескольких разделах мы увидим, почему.

In [3]:
%%writefile item_counting_reducer.py
# -*- coding: utf-8 -*-
import collections
import json


def find_items_most_frequently_puchased_by_user(
    user_id, records, n_most_frequent
):
    """
    По истории заказов определить топ самых частых товаров пользователя.
    Вернуть как идентификаторы этих товаров, так и соответствующие количества. 

    :param user_id: Идентификатор пользователя

    :param records: JSON-записи, схема которых содержит 'purchased_items'
    -- список заказов, каждый из которых представляет собой список товаров.

    :param n_most_frequent: Сколько самых популярных товаров отобрать.

    :return: n_most_frequent JSON-записей со схемой ['user_id', 
    'top_items'], где top_items -- список словарей, каждый из которых
    имеет схему ['item_id', 'times_purchased'] 
    """

    # Python-словарь, который в качестве значения хранит 
    # количество вхождений ключа в данные.
    item_counter = collections.Counter()
    for record in records:
        # Записи передают в json-формате (обычно сжатом),
        # т.к. он самый простой и универсальный. Следовательно,
        # записи вначале нужно распаковать обратно в Python-словарь.
        unpacked_record = json.loads(record)
        for item_id in unpacked_record['purchased_items']:
            item_counter[item_id] += 1

    # Метод most_common позволяет получить самые частотные товары
    top_items = item_counter.most_common(n_most_frequent)

    # Теперь нужно сформировать запись и запаковать её обратно в JSON.
    # Т.к. обработка данных потоковая, нужно возвращать записи по мере
    # готовности. Это позволяет обрабатывать группы с разными user_id
    # независимо друг от друга (как и должно быть по смыслу). Такие
    # функции в Python принято называть генераторами. 
    yield json.dumps(
        dict(
            user_id=user_id,
            top_items=[
                dict(item_id=item_id, times_purchased=times_purchased) 
                for item_id, times_purchased in top_items
            ]
        )
    )

Overwriting item_counting_reducer.py


In [3]:
def my_range(n):
    counter = 0
    while counter < n:
        yield counter
        counter += 1

## PyTest

Отлично, мы написали какую-то функцию. Как убедиться, что она работает?

Нужно написать т.н. unit-тесты: небольшие проверочные утверждения, которые оценивают корректность отдельных частей программы (но не их взаимодействия друг с другом, это уже интеграционное тестирование).

Для решения этой задачи мы воспользуемся библиотекой PyTest (см. слайды занятия).



In [4]:
%%writefile test_item_counting_reducer.py
# -*- coding: utf-8 -*-

import collections
import itertools
import json
import pytest
import random
import string

from item_counting_reducer import find_items_most_frequently_puchased_by_user


def test_on_empty_input():
    """
    История покупок пользователя пуста. Функция должна вернуть один record 
    с пустым списком в поле 'top_items'. 

    Комментарий: важно писать тесты, которые проверяют крайние случаи (в духе
    пустого входа). Их обычно мало, и в них обычно ясно, что должно произойти
    (и что не должно происходить, что ещё более важно).
    """
    user_id = 'some_user'
    records = []

    reduced_stream = list(
        find_items_most_frequently_puchased_by_user(
            user_id, records, n_most_frequent=5
        )
    )

    assert len(reduced_stream) == 1
    
    unpacked_response = json.loads(reduced_stream[0])

    assert len(unpacked_response['top_items']) == 0
    

def test_all_identical_items():
    """
    История пользователя состоит из одного заказа, в котором было 
    только три одинаковых товара. Нужно найти самый частотный товар.

    Функция должна вернуть один record, в котором будет один товар в 
    'top_items', с правильным идентификатором ('some_id') 
    и правильным количеством (3).

    Комментарий: полезно писать тесты с простейшими примерами входа, т.к. они
    позволяют найти простейшие баги, не зависящие от данных. 
    """
    user_id = 'some_user'
    item_id = 'some_id'

    records = [
        json.dumps(
            dict(purchased_items=[item_id, item_id, item_id])
        ),
    ]

    reduced_stream = list(
        find_items_most_frequently_puchased_by_user(
            user_id, records, n_most_frequent=1
        )
    )

    assert len(reduced_stream) == 1
    
    unpacked_response = json.loads(reduced_stream[0])

    assert len(unpacked_response['top_items']) == 1
    
    assert (
        unpacked_response['top_items'][0]['item_id'] == item_id
        and unpacked_response['top_items'][0]['times_purchased'] == 3
    )


def test_on_random_items():
    """
    Сгенерировать 10 пользователей со случайным количеством товаров в истории
    покупок у каждого. Проверить, что возвращаемое значение функции корректно. 

    Тесты со случайными входами писать трудно (по сути, вам нужно научиться
    формировать корректные данные, а это нетривиальная задача, особенно в 
    research-задачах), но полезно, т.к. можно перебрать огромное количество
    входов -- сильно больше, чем можно подставить вручную. Если ошибка есть,
    и она неочевидна, то она обычно находится именно на случайных тестах. 
    
    Разумеется, вы не будете писать такие тесты на каждую небольшую функцию 
    в своём коде. Тем не менее, полезно понимать, как это выглядит и как 
    этим пользоваться, чтобы гарантировать корректность работы критического
    функционала вашего проекта.
    """

    # нужно для воспроизводимости результатов: если зафиксировать seed,
    # то из раза в раз будут генерироваться одни и те же данные (это так
    # называемая псевдослучайность даёт о себе знать).
    random.seed(42)  

    # фиксируем, сколько пользователей мы хотим
    n_users = 10
    # просто занумеруем их
    user_ids = [str(i) for i in range(n_users)]
    # и, на всякий случай, перемешаем номера (т.к. функция у нас без скрытого
    # состояния, результат её вычисления не должен зависеть от уже увиденных
    # идентификаторов)
    random.shuffle(user_ids)

    # сгенерируем 20 случайных идентификаторов товаров
    item_pool_size = 20
    # каждый идентификатор -- случайная строка длины 10
    item_id_length = 10
    item_pool = [
        ''.join(
            # берём случайные 10 латинских букв в верхнем регистре или цифр
            random.choices(
                string.ascii_uppercase + string.digits, 
                k=item_id_length
            )
        )
        for _ in range(item_pool_size)
    ]
    
    def random_split(lst):
        """
        Сгенерировать случайное разбиение списка (равновероятно).
        """
        out = [[]]
        for item in lst:
            # добавляем следующий элемент в текущий элемент разбиения
            out[-1].append(item)
            # если выпало True, то создаём новый элемент разбиения
            # и на следующем шаге будем добавлять элементы уже в него
            if random.choice((True, False)):
                out.append([])
        # возвращаем непустые блоки разбиения
        return [l for l in out if len(l)]

    grouped_stream = []  # входные данные
    true_answers = dict()  # ответы на этих данных

    for user_id in user_ids:
        # сколько товаров есть в истории покупок пользователя в сумме
        history_size = random.randint(0, 100)
        # генерируем сами товары (выбор с повторениями)
        historical_items = random.choices(
            item_pool, k=history_size
        )
        # сколько самых частотных товаров брать
        n_most_frequent = random.randint(1, history_size)
        # считаем честный ответ через counter на списке исторических товаров,
        # а не на заказах (разбиении этого списка)
        true_answers[user_id] = [
            dict(item_id=item_id, times_purchased=times_purchased)
            for item_id, times_purchased in 
            collections.Counter(historical_items).most_common(
                n_most_frequent
            )
        ]
        # распределяем товары по заказам случайным образом и пакуем это в JSON
        grouped_stream.append(
            (
                user_id,
                [
                    json.dumps(dict(purchased_items=purchased_items))
                    for purchased_items in random_split(historical_items)
                ]
            )
        )

    # itertools.chain позволяет из списка генераторов получить поток записей
    reduced_stream = itertools.chain(
        *[
            find_items_most_frequently_puchased_by_user(
                user_id, 
                records, 
                n_most_frequent=len(true_answers[user_id])
            )
            for user_id, records in grouped_stream
        ]
    )

    # бежим по потоку
    processed_user_ids = set()
    for record in reduced_stream:
        # распаковываем запись
        unpacked_response = json.loads(record)
        # проверяем, что ответ совпадает с истинным
        assert (
            true_answers[unpacked_response['user_id']] 
            == unpacked_response['top_items']
        )
        processed_user_ids.add(unpacked_response['user_id'])

    # проверяем, что никого не забыли
    assert len(processed_user_ids) == n_users


Overwriting test_item_counting_reducer.py


In [5]:
!python3 -m pytest test_item_counting_reducer.py

platform linux -- Python 3.7.10, pytest-3.6.4, py-1.10.0, pluggy-0.7.1
rootdir: /content, inifile:
plugins: typeguard-2.7.1
[1mcollecting 0 items                                                             [0m[1mcollecting 3 items                                                             [0m[1mcollected 3 items                                                              [0m

test_item_counting_reducer.py ...[36m                                        [100%][0m



Казалось бы, тесты пройдены (даже на случайных данных!), код работает, можно с чистой совестью делать pull request и использовать в продакшене. **Но как бы не так...**

## Проверка боем на реальных данных

Давайте посмотрим на то, что произойдёт, когда мы запустим код на реальных данных из сервиса. Конкретно здесь пример синтетический, но точно такая же ситуация имела место в реальности.

In [6]:
import subprocess


def download_file_from_gdrive(gdrive_file_id: str, outfile: str) -> None:
    """ Скачивает файл из Google Drive по ID """

    upload_cmd = (
        "wget --load-cookies /tmp/cookies.txt"
        " \"https://docs.google.com/uc?export=download&confirm=$("
        " wget --quiet --save-cookies /tmp/cookies.txt --keep-session-cookies"
        " --no-check-certificate 'https://docs.google.com/uc?export=download"
        f"&id={gdrive_file_id}'"
        " -O- | sed -rn 's/.*confirm=([0-9A-Za-z_]+).*/\\1\\n/p')"
        f"&id={gdrive_file_id}\" "
        f" -O {outfile} && rm -rf /tmp/cookies.txt"
    )
    subprocess.check_call(upload_cmd, shell=True)

PATH_TO_REAL_DATA = 'grouped_records_json.txt'
download_file_from_gdrive(
    gdrive_file_id='1hbWT_SdefPqpsAWV6bKMxSyYsRjBuWjf',
    outfile=PATH_TO_REAL_DATA
)

In [7]:
import json

grouped_stream = []
with open(PATH_TO_REAL_DATA, 'r') as infile:
    for record in infile:
        grouped_stream.append(json.loads(record.strip()))
print(len(grouped_stream))

1


In [8]:
import pprint

from item_counting_reducer import find_items_most_frequently_puchased_by_user

user_id, records = grouped_stream[0]
response = next(
    find_items_most_frequently_puchased_by_user(user_id, records, 3)
)
pprint.pprint(json.loads(response))

{'top_items': [{'item_id': 1, 'times_purchased': 3},
               {'item_id': '1', 'times_purchased': 1},
               {'item_id': '2', 'times_purchased': 1}],
 'user_id': 'user_1'}


**Произошла оказия!** У самого популярного товара ключ оказался вовсе не строкой, а числом! Более того, при написании всего кода выше мы неявно полагали, что идентификаторы товаров это строки. *Т.е. тесты написаны, код их проходит, но реализация всё равно неправильная!*

**Как же так вышло?** Обычно логика такова: товаров мало (тысячи, ну десятки тысяч), но их идентификаторы встречаются везде: в логах сервиса, в обучающих данных, в SQL-запросах... Почему бы просто не нумеровать товары подряд по мере расширения ассортимента? Числа занимают сильно меньше места, их проще воспринимать, чем те же SHA1-хэши, про которые мы говорили на прошлом занятии. 

Но десктопное приложение и приложение под iOS разрабатывали разные команды. Одной показалось логичнее хранить идентификаторы товаров как строки, другой -- как числа. В итоге при попытке собрать данные из обоих источников получился бессмысленный результат. Т.е. 1 и '1' обозначают один и тот же товар, но компьютер этого не понимает.

**А как этой проблемы можно было избежать?** Если вы думаете, что всегда будете знать, как устроены входные данные, вы заблуждаетесь. :) Нужно каким-то образом записать в коде наши интуитивные соображения о природе данных и бросать ошибку, если какое-то из условий нарушается.

И здесь на помощь приходят...

## Type hints и MyPy

*Введение в type hints и определение MyPy есть в лекционных слайдах. Здесь мы ими воспользуемся.*

Когда мы пишем код, мы примерно представляем себе, какие значения у каких переменных в каждый момент времени. Но ключевое слово здесь **примерно**. Почему бы не помочь себе и другим людям, которые будут поддерживать ваш код, и не записать эти предположения в явном виде?

In [9]:
%%writefile item_counting_reducer.py
# -*- coding: utf-8 -*-
import collections
import json
import typing as tp


def find_items_most_frequently_puchased_by_user(
    user_id: str, records: tp.Iterable[str], n_most_frequent: int
) -> tp.Generator[str, None, None]:
    """
    По истории заказов определить топ самых частых товаров пользователя.
    Вернуть как идентификаторы этих товаров, так и соответствующие количества. 

    :param user_id: Идентификатор пользователя

    :param records: JSON-записи, схема которых содержит 'purchased_items'
    -- список заказов, каждый из которых представляет собой список товаров.

    :param n_most_frequent: Сколько самых популярных товаров отобрать.

    :return: n_most_frequent JSON-записей со схемой ['user_id', 
    'top_items'], где top_items -- список словарей, каждый из которых
    имеет схему ['item_id', 'times_purchased'] 
    """

    # Python-словарь, который в качестве значения хранит 
    # количество вхождений ключа в данные. 
    # --------------------------------------------------
    # Предполагается, что ключ строковый.
    item_counter: tp.Counter[str] = collections.Counter()
    for record in records:
        # Записи передают в json-формате (обычно сжатом),
        # т.к. он самый простой и универсальный. Следовательно,
        # записи вначале нужно распаковать обратно в Python-словарь.
        # -----------------------------------------------------------
        # tp.Any, потому что мы не знаем полную схему record-а.
        unpacked_record: tp.Dict[str, tp.Any] = json.loads(record)
        for item_id in unpacked_record['purchased_items']:
            item_counter[item_id] += 1

    # Метод most_common позволяет получить самые частотные товары
    top_items: tp.List[tp.Tuple[str, int]] = (
        item_counter.most_common(n_most_frequent)
    )

    # Теперь нужно сформировать запись и запаковать её обратно в JSON.
    # Т.к. обработка данных потоковая, нужно возвращать записи по мере
    # готовности. Это позволяет обрабатывать группы с разными user_id
    # независимо друг от друга (как и должно быть по смыслу). Такие
    # функции в Python принято называть генераторами. 
    # ------------------------------------------------------------------------
    # tp.Union, потому что значения могут быть как строковыми, так и числовыми
    packed_top_items: tp.List[tp.Dict[str, tp.Union[str, int]]] = [
        dict(item_id=item_id, times_purchased=times_purchased) 
        for item_id, times_purchased in top_items
    ]
    yield json.dumps(dict(user_id=user_id, top_items=packed_top_items))

Overwriting item_counting_reducer.py


Отлично. **Теперь мы явно говорим, где какие типы должны быть в моменте**. Да, безусловно, код стал более "кучерявым" и менее pythonic. В маленьких скриптах и иллюстративных примерах так писать не стоит. Но в больших проектах очень рекомендуется.

Кроме того, **теперь PyCharm тоже понимает, что вы хотели сказать своим кодом**. Попробуйте открыть две версии этого файла в своей IDE и сравнить, где более точные подсказки, где проще открыть документацию и так далее.

---

Тем не менее, въедливый слушатель заметит, что **мы пока так и не решили проблему**. В самом деле: от того, что мы прописали везде type hints, код не стал работать корректно. MyPy при этом тоже ничего не смущает.

In [10]:
!mypy item_counting_reducer.py

[1m[32mSuccess: no issues found in 1 source file[m


Обратите внимание на строку, в которой происходит распаковывание record-а из JSON-строки в словарь. Согласитесь, `tp.Dict[str, tp.Any]` не особо помогает понять, а что, собственно, должно находиться в данных. MyPy здесь не поможет, потому что величины, помеченные как `tp.Any`, им игнорируются.

**Но постойте, у нас же есть схема входного потока!** Не целиком, конечно, но мы хотя бы знаем, какие поля точно *должны* быть в записях, и знаем, какого они должны быть типа.

Более того! На практике за схематизацию таблиц, которые вы считываете в поток записей, отвечает DE / DS / DA, который создал эти таблицы и поддерживает их в актуальном состоянии. Т.е. вы можете найти эти таблицы через интерфейс, посмотреть, какая там схема нужных вам полей. 

**Вооружённые этим знанием, вы можете написать классы для распаковки ваших записей сразу в нужные значения с нужными типами!**

In [11]:
%%writefile definitions.py
import dataclasses
import json
import typing as tp


@dataclasses.dataclass
class DesktopRecord:
    purchased_items: tp.List[int]

    @staticmethod
    def from_json_string(json_str: str) -> 'DesktopRecord':
        unpacked = json.loads(json_str)
        assert 'purchased_items' in unpacked
        assert isinstance(unpacked['purchased_items'], list)
        assert all(
            [
                isinstance(item_id, int) 
                for item_id in unpacked['purchased_items']
            ]
        )
        return DesktopRecord(purchased_items=unpacked['purchased_items'])


@dataclasses.dataclass
class IOSRecord:
    purchased_items: tp.List[str]
    
    @staticmethod
    def from_json_string(json_str: str) -> 'IOSRecord':
        unpacked = json.loads(json_str)
        assert 'purchased_items' in unpacked
        assert isinstance(unpacked['purchased_items'], list)
        assert all(
            [
                isinstance(item_id, str) 
                for item_id in unpacked['purchased_items']
            ]
        )
        return IOSRecord(purchased_items=unpacked['purchased_items'])


Writing definitions.py


Кроме того, теперь мы понимаем, что в реальных данных схема содержит ещё и очень полезное поле `app`, по которому поток данных разбиваются на несовместимые компоненты. Будем пользоваться им для того, чтобы правильно распаковать запись.

In [12]:
%%writefile item_counting_reducer.py
# -*- coding: utf-8 -*-
import collections
import json
import typing as tp

from definitions import DesktopRecord, IOSRecord


def find_items_most_frequently_puchased_by_user(
    user_id: str, records: tp.Iterable[str], n_most_frequent: int
) -> tp.Generator[str, None, None]:
    """
    По истории заказов определить топ самых частых товаров пользователя.
    Вернуть как идентификаторы этих товаров, так и соответствующие количества. 

    :param user_id: Идентификатор пользователя

    :param records: JSON-записи, схема которых содержит 'purchased_items'
    -- список заказов, каждый из которых представляет собой список товаров.

    :param n_most_frequent: Сколько самых популярных товаров отобрать.

    :return: n_most_frequent JSON-записей со схемой ['user_id', 
    'top_items'], где top_items -- список словарей, каждый из которых
    имеет схему ['item_id', 'times_purchased'] 
    """

    # Python-словарь, который в качестве значения хранит 
    # количество вхождений ключа в данные. 
    # --------------------------------------------------
    # Предполагается, что ключ строковый.
    item_counter: tp.Counter[str] = collections.Counter()
    for record in records:
        # Записи передают в json-формате (обычно сжатом),
        # т.к. он самый простой и универсальный. Следовательно,
        # записи вначале нужно распаковать обратно в Python-словарь.
        # -----------------------------------------------------------
        # tp.Any, потому что мы не знаем полную схему record-а.
        unpacked_record_dict: tp.Dict[str, tp.Any] = json.loads(record)
        # После распаковки наша запись может быть одного из двух этих типов
        unpacked_record: tp.Optional[tp.Union[DesktopRecord, IOSRecord]] = None 
        if unpacked_record_dict['app'] == 'Desktop':
            unpacked_record = DesktopRecord.from_json_string(record)
        elif unpacked_record_dict['app'] == 'iOS':
            unpacked_record = IOSRecord.from_json_string(record)
        # Если в поле 'app' пришло что-то, что мы не умеем парсить,
        # нужно кинуть ошибку. Лучше так, чем молча сделать новые ошибки.
        if unpacked_record is None:
            raise NotImplementedError(
                'Unknown data source.\n'
                'Expected \'app\' to be either \'Desktop\' or \'iOS\'.'
            )
        for item_id in unpacked_record.purchased_items:
            item_counter[item_id] += 1

    # Метод most_common позволяет получить самые частотные товары
    top_items: tp.List[tp.Tuple[str, int]] = (
        item_counter.most_common(n_most_frequent)
    )

    # Теперь нужно сформировать запись и запаковать её обратно в JSON.
    # Т.к. обработка данных потоковая, нужно возвращать записи по мере
    # готовности. Это позволяет обрабатывать группы с разными user_id
    # независимо друг от друга (как и должно быть по смыслу). Такие
    # функции в Python принято называть генераторами. 
    # ------------------------------------------------------------------------
    # tp.Union, потому что значения могут быть как строковыми, так и числовыми
    packed_top_items: tp.List[tp.Dict[str, tp.Union[str, int]]] = [
        dict(item_id=item_id, times_purchased=times_purchased) 
        for item_id, times_purchased in top_items
    ]
    yield json.dumps(dict(user_id=user_id, top_items=packed_top_items))

Overwriting item_counting_reducer.py


Снова запустим `mypy`:

In [13]:
!mypy item_counting_reducer.py

item_counting_reducer.py:54: [1m[31merror:[m Invalid index type [m[1m"Union[int, str]"[m for [m[1m"Counter[str]"[m; expected type [m[1m"str"[m[m
[1m[31mFound 1 error in 1 file (checked 1 source file)[m


Отлично! Ошибка обнаружена ещё до того, как мы запустили программу! 

**Но какой ценой.** 
1. *Во первых, нам пришлось написать какие-то странные классы. Не будем же мы их писать каждый раз? Учитывая, в скольки местах нам это нужно делать, нам будет просто некогда работать!*

    К счастью, в промышленной разработке есть такая вещь как **кодогенерация**. Вы просто описываете поля, которые должны быть в ваших классах, вместе с их типами, а всё остальное компьютер делает за вас: создаёт класс со всеми нужными проверками типов, с умением собираться из JSON-строки и словаря etc. Золотой инструмент, вам понравится. **В реальной жизни это делается проще, а результат выглядит сильно аккуратнее.**

    Кроме того, вы **обычно, всё же, не пишете редьюсеры руками**. Существующие решения для обработки табличных или потоковых данных -- в духе `pandas` и `Apache Spark` соответственно -- достаточно гибкие и выразительные, чтобы большинство задач можно было решать в пределах встроенных конструкций этих библиотек. **Проверка корректности преобразований типов при этом происходит автоматически.** 
    
    Но иногда бывает так, что этой гибкости **не хватает**. Разумеется, не в этом случае: приведенный пример заведомо максимально упрощён. Эту задачу можно решить сильно проще. Но, к сожалению, такая ситуация встречается достаточно часто, чтобы вы захотели научиться обрабатывать её правильно. 

2. *Во вторых, код стало сложнее читать. Разве весь смысль Zen of Python не в том, чтобы решать задачи максимально просто? Мы ведь могли просто явно приводить `item_id` к строке перед тем, как подать его в `Counter`, и проблема решена.*

    А вы уверены, что это единственная проблема? :) А что, если среди `item_id` есть пропуски? Вдруг там иногда `int`, иногда `str`, а иногда вообще `NoneType`? Тогда что? 
    
    В реальной жизни классы, в которые вы распаковываете свои record-ы, гораздо больше и сложнее. Они позволяют вам контролировать корректность данных прямо в момент обработки, чем **несказанно** упрощают отладку. Вы ведь не хотите понять, что ваш код написан неправильно после того, как он через сутки закончит обрабатывать 100 терабайт логов вашего сервиса, и потом вручную искать ошибку в этих данных? Правда ведь? 

--- 
В обычной жизни вы знаете схему входных потоков **до** того, как начнёте писать свой редьюсер. И классы для распаковки записей тоже можете написать **заранее** (часто они уже написаны за вас). Поэтому вы найдёте ошибку **до** того, как запустите код на реальных данных, а не после, как в этом примере. **Цените эту возможность.**

---
Теперь, когда `mypy` указал нам на ошибку в коде, мы можем её исправить. Этот код всё ещё не умеет работать с пропусками, но это отдельная большая тема, которую мы обсудим на отдельном занятии.



In [14]:
%%writefile item_counting_reducer.py
# -*- coding: utf-8 -*-
import collections
import json
import typing as tp

from definitions import DesktopRecord, IOSRecord


def find_items_most_frequently_puchased_by_user(
    user_id: str, records: tp.Iterable[str], n_most_frequent: int
) -> tp.Generator[str, None, None]:
    """
    По истории заказов определить топ самых частых товаров пользователя.
    Вернуть как идентификаторы этих товаров, так и соответствующие количества. 

    :param user_id: Идентификатор пользователя

    :param records: JSON-записи, схема которых содержит 'purchased_items'
    -- список заказов, каждый из которых представляет собой список товаров.

    :param n_most_frequent: Сколько самых популярных товаров отобрать.

    :return: n_most_frequent JSON-записей со схемой ['user_id', 
    'top_items'], где top_items -- список словарей, каждый из которых
    имеет схему ['item_id', 'times_purchased'] 
    """

    # Python-словарь, который в качестве значения хранит 
    # количество вхождений ключа в данные. 
    # --------------------------------------------------
    # Предполагается, что ключ строковый.
    item_counter: tp.Counter[str] = collections.Counter()
    for record in records:
        # Записи передают в json-формате (обычно сжатом),
        # т.к. он самый простой и универсальный. Следовательно,
        # записи вначале нужно распаковать обратно в Python-словарь.
        # -----------------------------------------------------------
        # tp.Any, потому что мы не знаем полную схему record-а.
        unpacked_record_dict: tp.Dict[str, tp.Any] = json.loads(record)
        # После распаковки наша запись может быть одного из двух этих типов
        unpacked_record: tp.Optional[tp.Union[DesktopRecord, IOSRecord]] = None 
        if unpacked_record_dict['app'] == 'Desktop':
            unpacked_record = DesktopRecord.from_json_string(record)
        elif unpacked_record_dict['app'] == 'iOS':
            unpacked_record = IOSRecord.from_json_string(record)
        # Если в поле 'app' пришло что-то, что мы не умеем парсить,
        # нужно кинуть ошибку. Лучше так, чем молча сделать новые ошибки.
        if unpacked_record is None:
            raise NotImplementedError(
                'Unknown data source.\n'
                'Expected \'app\' to be either \'Desktop\' or \'iOS\'.'
            )
        for item_id in unpacked_record.purchased_items:
            item_counter[str(item_id)] += 1

    # Метод most_common позволяет получить самые частотные товары
    top_items: tp.List[tp.Tuple[str, int]] = (
        item_counter.most_common(n_most_frequent)
    )

    # Теперь нужно сформировать запись и запаковать её обратно в JSON.
    # Т.к. обработка данных потоковая, нужно возвращать записи по мере
    # готовности. Это позволяет обрабатывать группы с разными user_id
    # независимо друг от друга (как и должно быть по смыслу). Такие
    # функции в Python принято называть генераторами. 
    # ------------------------------------------------------------------------
    # tp.Union, потому что значения могут быть как строковыми, так и числовыми
    packed_top_items: tp.List[tp.Dict[str, tp.Union[str, int]]] = [
        dict(item_id=item_id, times_purchased=times_purchased) 
        for item_id, times_purchased in top_items
    ]
    yield json.dumps(dict(user_id=user_id, top_items=packed_top_items))


Overwriting item_counting_reducer.py


In [15]:
!mypy item_counting_reducer.py

[1m[32mSuccess: no issues found in 1 source file[m


Внимательный читатель заметит, что мы не обновили тесты, потому они обязательно упадут при следующем запуске: в самом деле, мы не добавили в тестовые данные поле `app`, и наш код поэтому кинет ошибку. Здесь мы можем это проигнорировать, т.к. задача учёбная, но в реальной жизни, разумеется, нужно __всегда__ запускать тесты перед тем, как сделать `git push`.