From 61903d4c4827506557841aadaf016278c85deec6 Mon Sep 17 00:00:00 2001 From: orange13 Date: Thu, 6 Nov 2025 15:48:18 +0300 Subject: [PATCH] BACKPORT-CONFLICT --- .../analyst/practical-guides/scd/index.md | 36 ++ .../practical-guides/scd/scd2-merge.md | 324 ++++++++++++++++++ .../analyst/practical-guides/scd/toc_p.yaml | 9 + 3 files changed, 369 insertions(+) create mode 100644 ydb/docs/ru/core/analyst/practical-guides/scd/index.md create mode 100644 ydb/docs/ru/core/analyst/practical-guides/scd/scd2-merge.md create mode 100644 ydb/docs/ru/core/analyst/practical-guides/scd/toc_p.yaml diff --git a/ydb/docs/ru/core/analyst/practical-guides/scd/index.md b/ydb/docs/ru/core/analyst/practical-guides/scd/index.md new file mode 100644 index 000000000000..f564b58de6da --- /dev/null +++ b/ydb/docs/ru/core/analyst/practical-guides/scd/index.md @@ -0,0 +1,36 @@ +# Работа с медленно меняющимися измерениями + +В текущем разделе собраны практические руководства по реализации [медленно меняющихся измерений](https://ru.wikipedia.org/wiki/Медленно_меняющееся_измерение) (Slowly Changing Dimensions, SCD) — популярного подхода к управлению историческими данными в аналитических хранилищах. + +В разделе описываются варианты SCD1 и SCD2: + +* [SCD Type 1](#scd1): Cтарые значения атрибутов заменяются новыми, сохраняя только актуальное состояние данных. +* [SCD Type 2](#scd2): Cохраняется полная история изменений путем добавления новых записей для каждой новой версии атрибута. + +## Особенности SCD1 {#scd1} + +[Медленно меняющиеся измерения (тип 1) или SCD1 (Type 1)](https://ru.wikipedia.org/wiki/Медленно_меняющееся_измерение#Тип_1) — это подход, при котором при изменении атрибута измерения старое значение заменяется новым. Хранится только текущее состояние данных. Этот подход используется, когда: + +- историческая информация не требуется; +- важно иметь только актуальные данные; +- необходимо минимизировать размер хранилища данных; +- требуется простая структура данных для аналитики. + +## Особенности SCD2 и подход append-only {#scd2} + +[Медленно меняющиеся измерения (тип 2) или SCD2 (Type 2)](https://ru.wikipedia.org/wiki/Медленно_меняющееся_измерение#Тип_2) — это подход, при котором при изменении атрибута измерения создаётся новая запись, а старая помечается как неактуальная. Таким образом, сохраняется история изменений. Этот подход используется, когда: + +- требуется отслеживать историю изменений данных; +- необходимо выполнять анализ данных с учётом временных периодов; +- важно сохранять аудиторский след изменений; +- требуется возможность восстановления состояния данных на определённый момент времени. + +## Доступные руководства + +В разделе рассматриваются различные технические способы реализации этих механизмов: + +* С использованием связки [Change Data Capture (CDC)](../../../concepts/cdc.md) и [Transfer](../../../concepts/transfer.md) для автоматической потоковой репликации изменений из таблиц-источников. + * [{#T}](scd1-transfer.md) + * [{#T}](scd2-transfer.md) +* С помощью периодических YQL-запросов, которые обрабатывают пакеты изменений из промежуточной таблицы и подмерживают их в основную SCD-таблицу. + * [{#T}](scd2-merge.md) \ No newline at end of file diff --git a/ydb/docs/ru/core/analyst/practical-guides/scd/scd2-merge.md b/ydb/docs/ru/core/analyst/practical-guides/scd/scd2-merge.md new file mode 100644 index 000000000000..b175676d4c99 --- /dev/null +++ b/ydb/docs/ru/core/analyst/practical-guides/scd/scd2-merge.md @@ -0,0 +1,324 @@ +# Использование процесса подмерживания изменения для реализации SCD2 в {{ ydb-full-name }} + +В этой статье описывается реализация паттерна [Slowly Changing Dimensions Type 2 (SCD2)](./index.md#scd2) в {{ ydb-full-name }} с использованием процесса подмерживания изменений. + +## Используемые инструменты + +Для поставки данных в SCD2 таблицу в данной статье будет использоваться следующая комбинация из доступной в {{ ydb-short-name }} функциональности: + +1. Таблица-источник `dimension_scd_changes`, содержащая информацию об атрибутах, их значениях и моментах изменений данных. +1. Таблица-приёмник `dimension_scd2_final` для хранения результирующих данных. +1. Периодически внешнее приложение должно вызывать запрос, который будет подмерживать изменения данных, накопившиеся в таблице `dimension_scd_changes`, в таблицу `dimension_scd2_final`. +1. Для поставки данных из строковых таблиц для хранения их в формате SCD2 удобно использовать встроенный в {{ydb-short-name}} механизм [трансфера](../../../concepts/transfer.md). + +{% note info %} + +Таблицы `dimension_scd_changes`, `dimension_scd2_final` приведены для иллюстрации. Для реальных запросов вам нужно скорректировать структуру таблиц и их атрибутов. + +{% endnote %} + +## Создание таблицы для приёма всех изменений `dimension_scd_changes` + +```sql +CREATE TABLE dimension_scd_changes ( + id Utf8 NOT NULL, -- Бизнес-ключ + attribute1 Utf8, -- Атрибут данных + attribute2 Utf8, -- Атрибут данных + change_time Timestamp NOT NULL, -- Момент изменения данных + operation Utf8, -- Тип изменений данных + PRIMARY KEY (change_time, id) +) +PARTITION BY HASH(change_time, id) +WITH ( + STORE=COLUMN +) +``` + +Описание полей таблицы: + +- `id` — бизнес-ключ записи; +- `attribute1`, `attribute2` — атрибуты измерения; +- `change_time` — момент времени изменения данных; +- `operation` — тип изменения данных: `CREATE`, `UPDATE`, `DELETE`. + +Первичный ключ создается как `PRIMARY KEY (change_time, id)`, так как по одному и тому же бизнес-ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. Подробнее про выбор первичного ключа и ключа партиционирования, можно прочесть в документации - [выбор первичного ключа](../../../dev/primary-key/column-oriented.md##vybor-pervichnogo-klyucha), [выбор ключа партиционирования](../../../dev/primary-key/column-oriented.md##vybor-klyucha-particionirovaniya) + +## Создание финальной SCD2 таблицы `dimension_scd2_final` + +```sql +CREATE TABLE dimension_scd2_final ( + id Utf8 NOT NULL, -- Бизнес-ключ данных + attribute1 Utf8, -- Атрибут данных + attribute2 Utf8, -- Атрибут данных + valid_from Timestamp NOT NULL, -- Момент времени, с которого данные актуальны + valid_to Timestamp, -- Момент времени, до которого данные актуальны. + -- Если данные актуальны прямо сейчас, то в valid_to находится NULL + is_current Uint8, -- Признак, что данные актуальны прямо сейчас. + is_deleted Uint8, -- Признак, что данные были удалены. Если данные были удалены, то is_current = FALSE + PRIMARY KEY (valid_from, id) +) +PARTITION BY HASH(valid_from, id) +WITH( + STORE=COLUMN +) +``` + +Описание полей таблицы: + +- `id` — бизнес-ключ записи; +- `attribute1`, `attribute2` — атрибуты измерения; +- `valid_from` — момент времени, с которого запись становится актуальной; +- `valid_to` — момент времени, до которого запись была актуальной, или `NULL` для текущих записей; +- `is_current` — флаг, указывающий, является ли запись текущей (1 - текущая запись) или (0 - запись историческая); +- `is_deleted` — флаг, указывающий, была ли запись удалена (1 - запись была удалена) или (0 - запись не была удалена). + +Первичный ключ создается как `PRIMARY KEY (valid_from, id)`, так как по одному и тому же ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. + +## Загрузка данных в таблицу изменений + +Для загрузки данных в таблицу изменений можно использовать любой способ загрузки данных и автоматическую поставку изменений с помощью механизма [трансфер](../../../concepts/transfer.md). + +Пример запроса для явной загрузки изменений: + +```sql +UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) +VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE'); + +UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) +VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE'); + +UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) +VALUES ('CUSTOMER_1001', 'John Doe', 'San Francisco', Unwrap(CAST('2025-08-22T19:00:00Z' as Timestamp)), 'UPDATE'); + +UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) +VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T21:00:00Z' as Timestamp)), 'DELETE'); +``` + + +## Запрос для размещения изменений в формате SCD2 + +Чтобы преобразовать данные из таблицы изменений в формат SCD2 и загрузить их в финальную таблицу, используется специальный запрос. Этот запрос нужно запускать регулярно — с такой периодичностью, с какой вы хотите обновлять данные в финальной таблице. Для автоматического запуска можно воспользоваться [интеграцию](../../../integrations/orchestration/airflow.md) {{ ydb-short-name }} с Apache Airflow™: + +```sql +-- Шаг 1: Читаем все новые события из таблицы `dimension_scd_changes`. +-- Это именованное выражение ($changes) является исходным набором данных для всей последующей обработки в рамках этого запуска. +$changes = ( + SELECT + id, + attribute1, + attribute2, + change_time, + String::AsciiToUpper(operation) AS op + FROM dimension_scd_changes +); + +-- Шаг 2: Фильтруем события, оставляя только те, которых еще нет в целевой таблице. +-- Цель этого шага - обеспечить идемпотентность на уровне чтения, чтобы не обрабатывать +-- уже загруженные данные в случае сбоя и перезапуска скрипта. +$unprocessed_data = ( + SELECT + chg.id AS id, + chg.attribute1 AS attribute1, + chg.attribute2 AS attribute2, + chg.change_time AS change_time, + chg.op AS op + FROM $changes AS chg + LEFT JOIN dimension_scd2_final AS scd + ON chg.id = scd.id AND chg.change_time = scd.valid_from -- Ищем записи по каждой сущности (id) и времени изменения + WHERE scd.id IS NULL -- для исключения строк, которые уже были перенесены в таблицу dimension_scd2_final ранее +); + +-- Шаг 3: Находим в целевой таблице активные записи (`is_current=1`), для которых пришли обновления. +-- Формируем для них "закрывающие" версии, устанавливая `valid_to` равным времени +-- самого первого изменения из новой пачки ($unprocessed_data). +$close_open_intervals = ( + SELECT + target.id AS id, + target.attribute1 as attribute1, + target.attribute2 as attribute2, + target.valid_from as valid_from, + 0ut AS is_current, -- Закрываемая запись больше не является текущей + unprocessed_data.change_time AS valid_to, + target.is_deleted as is_deleted + FROM dimension_scd2_final AS target + INNER JOIN ( + SELECT + id, + MIN(change_time) AS change_time + FROM $unprocessed_data + GROUP BY id + ) AS unprocessed_data + ON target.id = unprocessed_data.id + WHERE target.is_current = 1ut +); + +-- Шаг 4: Преобразуем поток необработанных событий в версионные записи (строки для вставки). +-- Здесь вычисляются все необходимые атрибуты для новых версий: `valid_to`, `is_current`, `is_deleted`. +$updated_data = ( + SELECT + t.id AS id, + t.attribute1 AS attribute1, + t.attribute2 AS attribute2, + t.is_deleted AS is_deleted, + -- Логика флага `is_current`: он устанавливается в 1 только для последней + -- записи в цепочке (`next_change_time IS NULL`), и только если это не + -- операция удаления (`is_deleted == 0`). + IF(t.next_change_time IS NOT NULL OR t.is_deleted == 1ut, 0ut, 1ut) AS is_current, + t.change_time AS valid_from, + t.next_change_time AS valid_to + FROM ( + -- Подзапрос вычисляет для каждой строки флаг удаления (`is_deleted`) + -- и временную метку следующего события (`next_change_time`) с помощью оконной функции LEAD. + SELECT + unprocessed_data.id AS id, + unprocessed_data.attribute1 AS attribute1, + unprocessed_data.attribute2 AS attribute2, + unprocessed_data.op AS op, + unprocessed_data.change_time AS change_time, + IF(unprocessed_data.op = "DELETE", 1ut, 0ut) AS is_deleted, + LEAD(unprocessed_data.change_time) OVER (PARTITION BY id ORDER BY unprocessed_data.change_time) AS next_change_time + FROM $unprocessed_data AS unprocessed_data + ) AS t +); + +-- Шаг 5: Атомарно применяем все рассчитанные изменения к целевой таблице. +-- UPSERT обновит существующие записи (из $close_open_intervals) и вставит новые (из $updated_data). +UPSERT INTO dimension_scd2_final (id, attribute1, attribute2, is_current, is_deleted, valid_from, valid_to) +SELECT + id, + attribute1, + attribute2, + is_current, + is_deleted, + valid_from, + valid_to +FROM $close_open_intervals +UNION ALL +SELECT + id, + attribute1, + attribute2, + is_current, + is_deleted, + valid_from, + valid_to +FROM $updated_data; + +-- Шаг 6: Очищает стейджинг-таблицу от обработанных записей. +DELETE FROM dimension_scd_changes ON +SELECT id, change_time FROM $changes; +``` + +## Демонстрация работы + +В примере ниже рассматривается сущность **Customer**: + +- бизнес-ключ — поле `id`, +- атрибуты — `attribute1` (полное имя) и `attribute2` (город). + +В момент времени `2025-08-22 17:00` создаются два клиента (John в Los Angeles - с id `CUSTOMER_1001` и Judy в New York с id `CUSTOMER_1002`), в момент времени `2025-08-22 19:00` клиент `CUSTOMER_1001` меняет город на San Francisco `UPDATE`, а в момент `2025-08-22 21:00` клиент `CUSTOMER_1002` удаляется `DELETE`. + +| id | attribute1 | attribute2 | change\_time | operation | +| -------------- | ---------- | ------------- | ---------------- | --------- | +| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | CREATE | +| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | CREATE | +| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | UPDATE | +| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | DELETE | + +Процесс SCD2 преобразует такие события в интервальные версии записей с полями `valid_from` и `valid_to`. Например, у `CUSTOMER_1001` получится две последовательные версии: сначала с городом LA, затем с городом SF (актуальная запись, у которой `valid_to = NULL`). У `CUSTOMER_1002` будет одна устаревшая версия и последняя запись с флагами `is_deleted=1` и `is_current=0`, которая показывает, что пользователь удалён. + +Ниже показаны исходные события и соответствующие им версии в финальной таблице. + +```mermaid + gantt + title История изменения данных + dateFormat YYYY-MM-DD HH:mm + axisFormat %H:%M + todayMarker off + + section CUSTOMER_1001 — John Doe + Los Angeles :done, t0, 2025-08-22 17:00, 2025-08-22 19:00 + San Francisco (is_current) :active, t1, 2025-08-22 19:00, 2025-08-23 00:00 + + section CUSTOMER_1002 — Judy Doe + New York :done, t0b, 2025-08-22 17:00, 2025-08-22 21:00 + +``` + +| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | +| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | +| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 | +| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | +| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 | 0 | 0 | +| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | NULL | 0 | 1 | + + +## Получение данных из SCD2-таблицы + +### Получение актуальных данных + +```sql +SELECT + id, + attribute1, + attribute2, + valid_from, + valid_to +FROM dimension_scd2_final +WHERE is_current = 1ut; +``` + +Результат: + +| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | +| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | +| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | + + +### Получение данных на определённый момент времени + +```sql +$as_of = Timestamp("2025-08-22T19:11:30.000000Z"); + +SELECT + id, + attribute1, + attribute2, + valid_from, + valid_to +FROM dimension_scd2_final +WHERE valid_from <= $as_of + AND (valid_to IS NULL OR valid_to > $as_of) -- Получаем записи, которые действовали в $as_of момент времени + AND is_deleted = 0ut -- Только записи, которые не удалены +``` + +Результат: + +| id | attribute1 | attribute2 | valid\_from | valid\_to | +| -------------- | ---------- | ------------- | ---------------- | ---------------- | +| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | +| CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 | + + +### Получение истории изменений для конкретной записи + +```sql +SELECT + id, + attribute1, + attribute2, + valid_from, + valid_to, + is_current, + is_deleted +FROM dimension_scd2_final +WHERE id = 'CUSTOMER_1001' +ORDER BY valid_from; +``` + +Результат: + +| id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | +| -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | +| CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 | +| CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | diff --git a/ydb/docs/ru/core/analyst/practical-guides/scd/toc_p.yaml b/ydb/docs/ru/core/analyst/practical-guides/scd/toc_p.yaml new file mode 100644 index 000000000000..3eacfd53e9b9 --- /dev/null +++ b/ydb/docs/ru/core/analyst/practical-guides/scd/toc_p.yaml @@ -0,0 +1,9 @@ +items: + - name: Обзор + href: index.md + - name: SCD1 с использованием TRANSFER + href: scd1-transfer.md + - name: SCD2 с использованием TRANSFER + href: scd2-transfer.md + - name: SCD2 с использованием подмерживания изменений + href: scd2-merge.md \ No newline at end of file