![Название изображения](poster_event_1930847.jpg)

<p>Мне сказали, что у вас любят шутки :-)</p>

## Контекст

Вы работаете с покерной платформой, на которой существует несколько типов разметок игроков:
<ol>
    <li>Автоматические разметки:</li>
    <ul>
        <li>Запускаются ежедневно через Airflow DAG.</li>
        <li>Игроки получают запись в таблице, если попадают в разметку, и не получают, если не попадают.</li>
    </ul>
    <li>Ручные разметки:</li>
    <ul>
        <li>Формируются операторами вручную: при попадании добавляется новая строка в таблицу, если игрок больше не должен там быть - статус меняется на амнистирован</li>
        <li>Данные хранятся в файлах, обработка которых также выполняется через Airflow DAG.</li>
    </ul>
</ol>
<p>Примерный объем записей 100 тысяч по разметке в день.</p>

## Проблема
<p>Разметки создавались разными командами в разное время и имеют различные форматы.</p>

## Задача 
<p>Разработать архитектуру сводной таблицы, объединяющей разметки в единую структуру.</p>

## Цели
<ol>
    <li>Одним простым запросом понимать состояние игрока на конкретный момент времени - в каких разметках у него метка 1</li>
    <li>Хранить историю игрока: когда попал в разметку, когда перестал попадать, когда попал снова</li>
</ol>

## Подход к решению проблемы и достижению целей

<p>Исходя из 1 цели, мы можем предположить, что нужен запрос вида
<pre><code>
SELECT * FROM player_annotations pa WHERE pa.Target = 1
</code></pre>
</p>
<p>Либо, если нам нужен конкретный игрок</p>
<pre><code>
SELECT * FROM player_annotations pa WHERE pa.Target = 1 AND pa.PlayerID = 25
</code></pre>

<p>Здесь есть нюанс, что метка - по сути означает, что у игрока нет записи Detected от прошлого числа из автоматической разметки или нет записей в ручной разметке. В нашем случае проще всего будет использовать логику пустой даты. <br>
<b>Пример 1:</b> игрок получил автоматический Detected 10.02.2025 и мы записали это в конечную сводную таблицу с пустой датой окончания. В день, когда автоматическая система не присвоила ему Detected, мы ставим дату окончания. <br>
<b>Пример 2:</b> игрока записали в файл 10.02.2025 и мы спарсили это в конечную таблицу с пустой датой окончания. В день, когда появилась запись Amnisted, мы ставим дату окончания. <br>
</p>

<p>Соответственно, SQL запрос изменится:
<pre><code>
SELECT * FROM player_annotations pa WHERE pa.ValidUntil IS NULL AND pa.PlayerID = 25
</code></pre>
</p>
С помощью такой логики, мы получим всего 2 возможных строки: <br>
<ul>
    <li>для автоматической разметки</li>
    <li>для ручной разметки</li>
</ul>
А по запросу
<pre><code>
SELECT * FROM player_annotations pa WHERE pa.PlayerID = 25
</code></pre>
Мы получим всю историю игрока, тем самым мы достигнем второй цели

## Структура сводной таблицы
<p>
    <pre><code>
CREATE TABLE player_annotations (
    ID UInt64 DEFAULT rowNumber() OVER (),
    PlayerID UInt64,                
    DetectedType Enum('Automatic' = 1, 'Manual' = 2),  
    ValidFrom DateTime,              
    ValidUntil Nullable(DateTime),  
    CreatedBy String,              
    PRIMARY KEY (PlayerID, ID)  
) ENGINE = MergeTree()
ORDER BY (PlayerID, ID);


</code></pre>
</p>

<p>Зачем нужно поле <b>CreatedBy?</b><br>
Исходя из своего опыта, могу сказать, что это поле нужно для дальнейшей аналитики и раздачи кнутов и пряников. Когда нам потребуется понять кто из операторов выдал Detected или Amnisted, количество и т.д. С автоматической всё понятно.
</p>

## Логика добавления записей

![изображения](diagram.png)

<p>Как видно из графа, у нас будет корень - выгрузка из сводной таблицы записей с пустой датой и 2 ветви Auto и Manual. В Airflow мы можем пойти двумя путями:
<ul>
    <li>распараллелить эти две таски</li>
    <li>выполнять их друг за другом</li>
</ul>
Из ТЗ могу предположить, что эти две разметки не влияют друг на другом, а значит нам подойдёт любой вариант. <br>
Но я выберу вариант с параллельным выполнением, так как один из источников может быть недоступен. К примеру, у нас отвалилась БД или сетевой диск с файлами.
</p>

In [1]:
from airflow import DAG
from airflow.operators.python import PythonOperator
import pandas as pd
import clickhouse_connect
from datetime import datetime, timedelta

CLICKHOUSE_HOST = 'Хост'
CLICKHOUSE_DB = 'БД'


def fetch_null_valid_until(**kwargs):
    """
    Извлекает записи из таблицы player_annotations, 
    где поле ValidUntil имеет значение NULL.
    Данные загружаются в DataFrame и передаются 
    в XCom для последующего использования.
    Параметры:
        kwargs (dict): Словарь параметров Airflow, содержащий контекст выполнения DAG.
    Возвращает:
        None: Данные сохраняются в XCom с ключом 'player_data'.
    """
    client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, database=CLICKHOUSE_DB)

    #Здесь я специально выбираю конкретные поля, чтобы получить ожидаемый результат
    #так как структура таблицы может измениться (добавят новое поле)
    #защита от дурака =)
    query = """
        SELECT pa.ID, pa.PlayerID, pa.DetectedType, pa.ValidFromDateTime, 
               pa.ValidUntil, pa.CreatedBy
        FROM player_annotations pa 
        WHERE pa.ValidUntil IS NULL
    """
    
    result = client.query(query)
    df = pd.DataFrame(result.result_rows, columns=[col[0] for col in result.result_columns])

    #записываю в словарь параметров, чтобы перекидывать между функциями
    kwargs['TillDetected'].xcom_push(key='player_data', value=df.to_dict())


def auto_annotation(**kwargs):
    """
    Функция обработки автоматической разметки. 
    Записывает новых игроков с пометкой Detected и обновляет записи для старых.
    По PEP8 её нужно разбить на 2 отдельные функции, но в рамках тестового мне это делать лень =)
    Параметры:
        kwargs (dict): Словарь параметров Airflow, содержащий контекст выполнения DAG.
    Возвращает:
        None: Данные сохраняются в XCom с ключом 'player_data'.
    """
    ti = kwargs['TillDetected']
    ds = kwargs['ds']  # Здесь возможно использовать {ds}, вместо kwargs['ds']
    
    df = pd.DataFrame.from_dict(ti.xcom_pull(task_ids='fetch_null_valid_until', key='player_data'))
    
    client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, database=CLICKHOUSE_DB)
    
    # Предположим, что таблица которая автоматический размечает игроков называется logs
    query_logs = f"""
        SELECT l.PlayerID, l.DetectedAt 
        FROM logs l 
        WHERE l.DetectedAt = '{ds}'
    """
    logs_df = client.query_df(query_logs)
    
    # Проверяем, каких PlayerID нет в df, но есть в logs
    missing_players = logs_df[~logs_df['PlayerID'].isin(df['PlayerID'])]
    
    #Здесь сформируется запись вида "5 Auto 2025.02.19 None Auto"
    #Т.е. при выгрузке SELECT * FROM player_annotations pa WHERE pa.PlayerID = 5
    #AND pa.ValidUntil IS NULL
    #мы получим именно эту запись или вместе с ручной
    new_records = [
        (uuid.uuid4().int, row['PlayerID'], "Auto", ds, None, "Auto") 
        for _, row in missing_players.iterrows()
    ]
    
    if new_records:
        client.insert(
            'player_annotations',
            new_records,
            column_names=['ID', 'PlayerID', 'DetectedType', 'ValidFrom', 'ValidUntil', 'Status']
        )
    
    # Проверяем, кого из PlayerID в df нет в logs
    missing_in_logs = df[~df['PlayerID'].isin(logs_df['PlayerID'])]

    #А здесь, мы завершаем действие записи. Т.е. у нас была запись вида "5 Auto 2025.02.19 None Auto"
    #Теперь мы делаем её "5 Auto 2025.02.19 2025.02.25 Auto", что будет означать игрок пробыл в авто-разметке с 19 по 25 февраля
    if not missing_in_logs.empty:
        for player_id in missing_in_logs['PlayerID']:
            update_query = f"""
                ALTER TABLE player_annotations pa
                UPDATE pa.ValidUntil = '{ds}' 
                WHERE pa.PlayerID = '{player_id}' AND pa.ValidUntil IS NULL AND pa.DetectedType = 'Auto'
            """
            client.command(update_query)

            
def manual_annotation(**kwargs):
    """
    Функция обработки ручной аннотации.
    Записывает новых игроков с пометкой Detected и обновляет записи для старых добавленных через файлы.
    Параметры:
        kwargs (dict): Словарь параметров Airflow, содержащий контекст выполнения DAG.
    Возвращает:
        None: Данные сохраняются в XCom с ключом 'player_data'.
   
    """
    ti = kwargs['TillDetected']
    ds = kwargs['ds'] 
    tasks_folder = "Tasks"
    
    all_data = []
    
    #Считываем все файлы в папке
    for filename in os.listdir(tasks_folder):
        filepath = os.path.join(tasks_folder, filename)
        if os.path.isfile(filepath) and filename.endswith(".csv"):
            df = pd.read_csv(filepath)
            
            # Защита от дурака. Оператор мог добавить новый столбец с комментариями
            # и мы можем не получить ожидаемого результата
            required_columns = {'DetectedAt', 'PlayerID', 'Status'}
            if not required_columns.issubset(df.columns):
                continue
            
            # Любая запись где есть хоть один пропуск не имеет смысла
            # С пропущенной датой - непонятно когда начинать/заканчивать
            # С пропущенным PlayerID - непонятно кому выдаём
            # С пропущенным Status - непонятно, наказываем или амнистируем
            df = df.dropna(subset=required_columns)
            
            # Добавляем имя файла в колонку 
            #(вообще тут должно быть поле, которое отвечает за оператора, но пока просто файл)
            df['FileName'] = filename
            
            all_data.append(df)
    
    if not all_data:
        return
    
    combined_df = pd.concat(all_data, ignore_index=True)

    #Мы предполагаем, что записей "задним числом" быть не может и поэтому отбираем только сегодняшние записи
    filtered_df = combined_df[combined_df['DetectedAt'] == ds]
    
    client = clickhouse_connect.get_client(host=CLICKHOUSE_HOST, database=CLICKHOUSE_DB)
    
    #Здесь сформируется запись вида "5 Manual 2025.02.19 None Petrenko.csv"
    #Т.е. при выгрузке SELECT * FROM player_annotations pa WHERE pa.PlayerID = 5
    #AND pa.ValidUntil IS NULL
    #мы получим именно эту запись или вместе с автоматической
    detected_records = [
        (row['PlayerID'], "Manual", ds, None, row['FileName'])
        for _, row in filtered_df[filtered_df['Status'] == "Detected"].iterrows()
    ]
    
    if detected_records:
        client.insert(
            'player_annotations',
            detected_records,
            column_names=['PlayerID', 'DetectedType', 'ValidFrom', 'ValidUntil', 'CreatedBy']
        )
    
    # А здесь, мы завершаем действие ручной записи
    amnisted_players = filtered_df[filtered_df['Status'] == "Amnisted"]['PlayerID'].tolist()
    
    if amnisted_players:
        for player_id in amnisted_players:
            update_query = f"""
                ALTER TABLE player_annotations pa 
                UPDATE pa.ValidUntil = '{ds}' 
                WHERE pa.PlayerID = '{player_id}' AND pa.ValidUntil IS NULL AND pa.DetectedType = 'Manual'
            """
            client.command(update_query)


# Определяем DAG
default_args = {
    'owner': 'Deeplay',
    'start_date': datetime(2025, 1, 1),
    'retries': 5, #делаем перезапуск 5 раз с интервалом 30 минут на случай перебоев с сетью
    'retry_delay': timedelta(minutes=30), 
}

dag = DAG(
    'clickhouse_annotation_pipeline',
    default_args=default_args,
    schedule_interval='0 1 * * *',  #запускаем каждый день в час ночи (+3 UTC если на сервере старая версия Airflow)
    catchup=False #Тут возможно True, в зависимости от того когда мы запустим и хотим ли мы добрать историю 
)

# Задача: выгрузка данных из ClickHouse
fetch_data = PythonOperator(
    task_id='fetch_null_valid_until',
    python_callable=fetch_null_valid_until,
    provide_context=True,
    dag=dag
)

# Задачи аннотации
auto_annotate = PythonOperator(
    task_id='auto_annotation',
    python_callable=auto_annotation,
    provide_context=True,
    dag=dag
)

manual_annotate = PythonOperator(
    task_id='manual_annotation',
    python_callable=manual_annotation,
    provide_context=True,
    dag=dag
)

# Очередность выполнения
# Мы будем ждать выполнения fetch_data, а вот auto и manual будут выполняться параллельно
# что гарантирует в случае падения одного из источников, что второй продолжит работу
fetch_data >> [auto_annotate, manual_annotate]


ModuleNotFoundError: No module named 'airflow'

<p>
    В целом тут ещё много можно добавить для поиска исключений или различной логики (к примеру, что Auto и Manual влияют друг на друга). <br>
    Но тут нужно уже знать контекст задачи. <br>
    Делал на личном компьютере, естественно у меня нет airflow и проверить скрипт не смог. А корпоративный Airflow имеет немного другой синтаксис, что было бы неудобно для вас. <br>
    Вместо Pandas можно использовать Polars, Spark или другие.
</p>