-
Notifications
You must be signed in to change notification settings - Fork 734
initial scd2 #27430
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
initial scd2 #27430
Changes from all commits
1f519db
eed7038
ee5b21d
72ec449
5b1dbc7
d3883a6
0a5b9de
196a5d3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,324 @@ | ||
| # Использование процесса подмерживания изменения для реализации SCD2 в {{ ydb-full-name }} | ||
|
|
||
| В этой статье описывается реализация паттерна [Slowly Changing Dimensions Type 2 (SCD2)](./index.md#scd2) в {{ ydb-full-name }} с использованием процесса подмерживания изменений. | ||
|
|
||
| ## Используемые инструменты | ||
|
|
||
| Для поставки данных в SCD2 таблицу в данной статье будет использоваться следующая комбинация из доступной в {{ ydb-short-name }} функциональности: | ||
|
|
||
| 1. Таблица-источник `dimension_scd_changes`, содержащая информацию об атрибутах, их значениях и моментах изменений данных. | ||
| 1. Таблица-приёмник `dimension_scd2_final` для хранения результирующих данных. | ||
| 1. Периодически внешнее приложение должно вызывать запрос, который будет подмерживать изменения данных, накопившиеся в таблице `dimension_scd_changes`, в таблицу `dimension_scd2_final`. | ||
| 1. Для поставки данных из строковых таблиц для хранения их в формате SCD2 удобно использовать встроенный в {{ydb-short-name}} механизм [трансфера](../../../concepts/transfer.md). | ||
|
|
||
| {% note info %} | ||
|
|
||
| Таблицы `dimension_scd_changes`, `dimension_scd2_final` приведены для иллюстрации. Для реальных запросов вам нужно скорректировать структуру таблиц и их атрибутов. | ||
|
|
||
| {% endnote %} | ||
|
|
||
| ## Создание таблицы для приёма всех изменений `dimension_scd_changes` | ||
|
|
||
| ```sql | ||
| CREATE TABLE dimension_scd_changes ( | ||
| id Utf8 NOT NULL, -- Бизнес-ключ | ||
| attribute1 Utf8, -- Атрибут данных | ||
| attribute2 Utf8, -- Атрибут данных | ||
| change_time Timestamp NOT NULL, -- Момент изменения данных | ||
| operation Utf8, -- Тип изменений данных | ||
| PRIMARY KEY (change_time, id) | ||
| ) | ||
| PARTITION BY HASH(change_time, id) | ||
| WITH ( | ||
| STORE=COLUMN | ||
| ) | ||
| ``` | ||
|
|
||
| Описание полей таблицы: | ||
|
|
||
| - `id` — бизнес-ключ записи; | ||
| - `attribute1`, `attribute2` — атрибуты измерения; | ||
| - `change_time` — момент времени изменения данных; | ||
| - `operation` — тип изменения данных: `CREATE`, `UPDATE`, `DELETE`. | ||
|
|
||
| Первичный ключ создается как `PRIMARY KEY (change_time, id)`, так как по одному и тому же бизнес-ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. Подробнее про выбор первичного ключа и ключа партиционирования, можно прочесть в документации - [выбор первичного ключа](../../../dev/primary-key/column-oriented.md##vybor-pervichnogo-klyucha), [выбор ключа партиционирования](../../../dev/primary-key/column-oriented.md##vybor-klyucha-particionirovaniya) | ||
|
|
||
| ## Создание финальной SCD2 таблицы `dimension_scd2_final` | ||
|
|
||
| ```sql | ||
| CREATE TABLE dimension_scd2_final ( | ||
| id Utf8 NOT NULL, -- Бизнес-ключ данных | ||
| attribute1 Utf8, -- Атрибут данных | ||
| attribute2 Utf8, -- Атрибут данных | ||
| valid_from Timestamp NOT NULL, -- Момент времени, с которого данные актуальны | ||
| valid_to Timestamp, -- Момент времени, до которого данные актуальны. | ||
| -- Если данные актуальны прямо сейчас, то в valid_to находится NULL | ||
| is_current Uint8, -- Признак, что данные актуальны прямо сейчас. | ||
| is_deleted Uint8, -- Признак, что данные были удалены. Если данные были удалены, то is_current = FALSE | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| PRIMARY KEY (valid_from, id) | ||
| ) | ||
| PARTITION BY HASH(valid_from, id) | ||
| WITH( | ||
| STORE=COLUMN | ||
| ) | ||
| ``` | ||
|
|
||
| Описание полей таблицы: | ||
|
|
||
| - `id` — бизнес-ключ записи; | ||
| - `attribute1`, `attribute2` — атрибуты измерения; | ||
| - `valid_from` — момент времени, с которого запись становится актуальной; | ||
| - `valid_to` — момент времени, до которого запись была актуальной, или `NULL` для текущих записей; | ||
| - `is_current` — флаг, указывающий, является ли запись текущей (1 - текущая запись) или (0 - запись историческая); | ||
| - `is_deleted` — флаг, указывающий, была ли запись удалена (1 - запись была удалена) или (0 - запись не была удалена). | ||
|
|
||
| Первичный ключ создается как `PRIMARY KEY (valid_from, id)`, так как по одному и тому же ключу данных может происходить множество изменений и все эти изменения по одному ключу важно сохранять. | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| ## Загрузка данных в таблицу изменений | ||
|
|
||
| Для загрузки данных в таблицу изменений можно использовать любой способ загрузки данных и автоматическую поставку изменений с помощью механизма [трансфер](../../../concepts/transfer.md). | ||
|
|
||
| Пример запроса для явной загрузки изменений: | ||
|
|
||
| ```sql | ||
| UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) | ||
| VALUES ('CUSTOMER_1001', 'John Doe', 'Los Angeles', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE'); | ||
|
|
||
| UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) | ||
| VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T17:00:00Z' as Timestamp)), 'CREATE'); | ||
|
|
||
| UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) | ||
| VALUES ('CUSTOMER_1001', 'John Doe', 'San Francisco', Unwrap(CAST('2025-08-22T19:00:00Z' as Timestamp)), 'UPDATE'); | ||
|
|
||
| UPSERT INTO dimension_scd_changes (id, attribute1, attribute2, change_time, operation) | ||
| VALUES ('CUSTOMER_1002', 'John Doe', 'New York', Unwrap(CAST('2025-08-22T21:00:00Z' as Timestamp)), 'DELETE'); | ||
| ``` | ||
|
|
||
|
|
||
| ## Запрос для размещения изменений в формате SCD2 | ||
|
|
||
| Чтобы преобразовать данные из таблицы изменений в формат SCD2 и загрузить их в финальную таблицу, используется специальный запрос. Этот запрос нужно запускать регулярно — с такой периодичностью, с какой вы хотите обновлять данные в финальной таблице. Для автоматического запуска можно воспользоваться [интеграцию](../../../integrations/orchestration/airflow.md) {{ ydb-short-name }} с Apache Airflow™: | ||
|
|
||
| ```sql | ||
| -- Шаг 1: Читаем все новые события из таблицы `dimension_scd_changes`. | ||
| -- Это именованное выражение ($changes) является исходным набором данных для всей последующей обработки в рамках этого запуска. | ||
| $changes = ( | ||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| change_time, | ||
| String::AsciiToUpper(operation) AS op | ||
| FROM dimension_scd_changes | ||
| ); | ||
|
|
||
| -- Шаг 2: Фильтруем события, оставляя только те, которых еще нет в целевой таблице. | ||
| -- Цель этого шага - обеспечить идемпотентность на уровне чтения, чтобы не обрабатывать | ||
| -- уже загруженные данные в случае сбоя и перезапуска скрипта. | ||
| $unprocessed_data = ( | ||
| SELECT | ||
| chg.id AS id, | ||
| chg.attribute1 AS attribute1, | ||
| chg.attribute2 AS attribute2, | ||
|
Comment on lines
+121
to
+122
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. если в новом изменении меняется только часть атрибутов, то выходит мы сотрём NULL'ом остальные? кажется тут конструкция из
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Кажется это задача источника поставлять данные в консистентном порядке |
||
| chg.change_time AS change_time, | ||
| chg.op AS op | ||
| FROM $changes AS chg | ||
| LEFT JOIN dimension_scd2_final AS scd | ||
| ON chg.id = scd.id AND chg.change_time = scd.valid_from -- Ищем записи по каждой сущности (id) и времени изменения | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. погоди, если change_time — это время нового только что приехавшего изменения, то откуда в уже подмёрдженном dimension_scd2_final по этому таймстемпу что-то найдётся в valid_from?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Как раз это ровно для того, чтобы не выфильтровывать события которых нет в таблице по паре id/change_time |
||
| WHERE scd.id IS NULL -- для исключения строк, которые уже были перенесены в таблицу dimension_scd2_final ранее | ||
| ); | ||
|
|
||
| -- Шаг 3: Находим в целевой таблице активные записи (`is_current=1`), для которых пришли обновления. | ||
| -- Формируем для них "закрывающие" версии, устанавливая `valid_to` равным времени | ||
| -- самого первого изменения из новой пачки ($unprocessed_data). | ||
| $close_open_intervals = ( | ||
| SELECT | ||
| target.id AS id, | ||
| target.attribute1 as attribute1, | ||
| target.attribute2 as attribute2, | ||
| target.valid_from as valid_from, | ||
| 0ut AS is_current, -- Закрываемая запись больше не является текущей | ||
| unprocessed_data.change_time AS valid_to, | ||
| target.is_deleted as is_deleted | ||
| FROM dimension_scd2_final AS target | ||
| INNER JOIN ( | ||
| SELECT | ||
| id, | ||
| MIN(change_time) AS change_time | ||
| FROM $unprocessed_data | ||
| GROUP BY id | ||
| ) AS unprocessed_data | ||
| ON target.id = unprocessed_data.id | ||
| WHERE target.is_current = 1ut | ||
| ); | ||
|
|
||
| -- Шаг 4: Преобразуем поток необработанных событий в версионные записи (строки для вставки). | ||
| -- Здесь вычисляются все необходимые атрибуты для новых версий: `valid_to`, `is_current`, `is_deleted`. | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| $updated_data = ( | ||
| SELECT | ||
| t.id AS id, | ||
| t.attribute1 AS attribute1, | ||
| t.attribute2 AS attribute2, | ||
| t.is_deleted AS is_deleted, | ||
| -- Логика флага `is_current`: он устанавливается в 1 только для последней | ||
| -- записи в цепочке (`next_change_time IS NULL`), и только если это не | ||
| -- операция удаления (`is_deleted == 0`). | ||
| IF(t.next_change_time IS NOT NULL OR t.is_deleted == 1ut, 0ut, 1ut) AS is_current, | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| t.change_time AS valid_from, | ||
| t.next_change_time AS valid_to | ||
| FROM ( | ||
| -- Подзапрос вычисляет для каждой строки флаг удаления (`is_deleted`) | ||
| -- и временную метку следующего события (`next_change_time`) с помощью оконной функции LEAD. | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| SELECT | ||
| unprocessed_data.id AS id, | ||
| unprocessed_data.attribute1 AS attribute1, | ||
| unprocessed_data.attribute2 AS attribute2, | ||
| unprocessed_data.op AS op, | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| unprocessed_data.change_time AS change_time, | ||
| IF(unprocessed_data.op = "DELETE", 1ut, 0ut) AS is_deleted, | ||
| LEAD(unprocessed_data.change_time) OVER (PARTITION BY id ORDER BY unprocessed_data.change_time) AS next_change_time | ||
| FROM $unprocessed_data AS unprocessed_data | ||
| ) AS t | ||
| ); | ||
|
|
||
| -- Шаг 5: Атомарно применяем все рассчитанные изменения к целевой таблице. | ||
| -- UPSERT обновит существующие записи (из $close_open_intervals) и вставит новые (из $updated_data). | ||
| UPSERT INTO dimension_scd2_final (id, attribute1, attribute2, is_current, is_deleted, valid_from, valid_to) | ||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| is_current, | ||
| is_deleted, | ||
| valid_from, | ||
| valid_to | ||
| FROM $close_open_intervals | ||
| UNION ALL | ||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| is_current, | ||
| is_deleted, | ||
| valid_from, | ||
| valid_to | ||
| FROM $updated_data; | ||
|
|
||
| -- Шаг 6: Очищает стейджинг-таблицу от обработанных записей. | ||
| DELETE FROM dimension_scd_changes ON | ||
| SELECT id, change_time FROM $changes; | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| ``` | ||
|
|
||
| ## Демонстрация работы | ||
|
|
||
| В примере ниже рассматривается сущность **Customer**: | ||
|
|
||
| - бизнес-ключ — поле `id`, | ||
| - атрибуты — `attribute1` (полное имя) и `attribute2` (город). | ||
|
|
||
| В момент времени `2025-08-22 17:00` создаются два клиента (John в Los Angeles - с id `CUSTOMER_1001` и Judy в New York с id `CUSTOMER_1002`), в момент времени `2025-08-22 19:00` клиент `CUSTOMER_1001` меняет город на San Francisco `UPDATE`, а в момент `2025-08-22 21:00` клиент `CUSTOMER_1002` удаляется `DELETE`. | ||
|
|
||
| | id | attribute1 | attribute2 | change\_time | operation | | ||
| | -------------- | ---------- | ------------- | ---------------- | --------- | | ||
| | CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | CREATE | | ||
| | CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | CREATE | | ||
| | CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | UPDATE | | ||
| | CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | DELETE | | ||
|
|
||
| Процесс SCD2 преобразует такие события в интервальные версии записей с полями `valid_from` и `valid_to`. Например, у `CUSTOMER_1001` получится две последовательные версии: сначала с городом LA, затем с городом SF (актуальная запись, у которой `valid_to = NULL`). У `CUSTOMER_1002` будет одна устаревшая версия и последняя запись с флагами `is_deleted=1` и `is_current=0`, которая показывает, что пользователь удалён. | ||
|
|
||
| Ниже показаны исходные события и соответствующие им версии в финальной таблице. | ||
|
|
||
| ```mermaid | ||
| gantt | ||
| title История изменения данных | ||
| dateFormat YYYY-MM-DD HH:mm | ||
| axisFormat %H:%M | ||
| todayMarker off | ||
|
|
||
| section CUSTOMER_1001 — John Doe | ||
| Los Angeles :done, t0, 2025-08-22 17:00, 2025-08-22 19:00 | ||
| San Francisco (is_current) :active, t1, 2025-08-22 19:00, 2025-08-23 00:00 | ||
|
|
||
| section CUSTOMER_1002 — Judy Doe | ||
| New York :done, t0b, 2025-08-22 17:00, 2025-08-22 21:00 | ||
|
|
||
| ``` | ||
|
|
||
| | id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | | ||
| | -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | | ||
| | CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 | | ||
| | CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | | ||
| | CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 | 0 | 0 | | ||
| | CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 21:00 | NULL | 0 | 1 | | ||
|
|
||
|
|
||
| ## Получение данных из SCD2-таблицы | ||
|
|
||
| ### Получение актуальных данных | ||
|
|
||
| ```sql | ||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| valid_from, | ||
| valid_to | ||
| FROM dimension_scd2_final | ||
| WHERE is_current = 1ut; | ||
| ``` | ||
|
|
||
| Результат: | ||
|
|
||
| | id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | | ||
| | -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | | ||
| | CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | | ||
|
|
||
|
|
||
| ### Получение данных на определённый момент времени | ||
|
|
||
| ```sql | ||
| $as_of = Timestamp("2025-08-22T19:11:30.000000Z"); | ||
|
|
||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| valid_from, | ||
| valid_to | ||
| FROM dimension_scd2_final | ||
| WHERE valid_from <= $as_of | ||
| AND (valid_to IS NULL OR valid_to > $as_of) -- Получаем записи, которые действовали в $as_of момент времени | ||
| AND is_deleted = 0ut -- Только записи, которые не удалены | ||
| ``` | ||
|
|
||
| Результат: | ||
|
|
||
| | id | attribute1 | attribute2 | valid\_from | valid\_to | | ||
| | -------------- | ---------- | ------------- | ---------------- | ---------------- | | ||
| | CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | | ||
| | CUSTOMER\_1002 | Judy Doe | New York | 2025-08-22 17:00 | 2025-08-22 21:00 | | ||
|
|
||
|
|
||
| ### Получение истории изменений для конкретной записи | ||
|
|
||
| ```sql | ||
| SELECT | ||
| id, | ||
| attribute1, | ||
| attribute2, | ||
| valid_from, | ||
| valid_to, | ||
| is_current, | ||
| is_deleted | ||
orange13 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| FROM dimension_scd2_final | ||
| WHERE id = 'CUSTOMER_1001' | ||
| ORDER BY valid_from; | ||
| ``` | ||
|
|
||
| Результат: | ||
|
|
||
| | id | attribute1 | attribute2 | valid\_from | valid\_to | is\_current | is\_deleted | | ||
| | -------------- | ---------- | ------------- | ---------------- | ---------------- | ----------- | ----------- | | ||
| | CUSTOMER\_1001 | John Doe | Los Angeles | 2025-08-22 17:00 | 2025-08-22 19:00 | 0 | 0 | | ||
| | CUSTOMER\_1001 | John Doe | San Francisco | 2025-08-22 19:00 | NULL | 1 | 0 | | ||
Uh oh!
There was an error while loading. Please reload this page.