-
Notifications
You must be signed in to change notification settings - Fork 21
Description
Is there an existing issue for this?
- I have searched the existing issues
Current Behavior
Snapshots using the new_record method for hard deletes will insert redundant entries on each execution for records already flagged with is_deleted as True. See dbt-labs/dbt-adapters#654
Additionally, when a previously deleted row comes back in the source data, the snapshot will not update the row that marked it as deleted and will not insert a new row. See dbt-labs/dbt-adapters#654 (comment)
Expected Behavior
One entry should be inserted upon the first time the record in the source is deleted. Subsequent snapshot runs should not insert new entries for records already marked with is_deleted True.
The expected result is exactly two entries: the initial one with is_deleted as False and a single one with is_deleted as True, no matter how many times the dbt snapshot command is run against the same source data.
When a previously deleted row reappears in the source, the last snapshot record flagged dbt_is_deleted='True' should get its dbt_valid_to updated to the current timestamp, and a new row should be inserted to indicate that the record has been restored and is no longer deleted.
Steps To Reproduce
See dbt-labs/dbt-adapters#654 for further info. I encountered the same issues as they show there.
Relevant log output using --debug flag enabled
Environment
- OS: debian 12 (bookworm)
- Python: 3.12.8
- dbt: 1.9.4
- oracle adapter: 1.9.1
What Oracle database version are you using dbt with?
19c
Additional Context
I fixed this by overriding the oracle__snapshot_staging_table macro.
Diff from current macro: https://www.diffchecker.com/jLLUZl5S/
My solution below:
{% macro oracle__snapshot_staging_table(strategy, source_sql, target_relation) -%}
{#
    Renders the SELECT that feeds the snapshot staging table.

    Arguments:
      strategy        - snapshot strategy object; this macro reads its
                        hard_deletes, unique_key, updated_at, scd_id and
                        row_changed attributes.
      source_sql      - SQL text of the query being snapshotted; it is embedded
                        as the snapshot_query CTE.
      target_relation - the existing snapshot table.

    Output: a UNION ALL of rows tagged through dbt_change_type:
      'insert' - new or changed source rows, and (hard_deletes = 'new_record')
                 deletion records flagged with dbt_is_deleted = 'True'
      'update' - current snapshot rows whose dbt_valid_to must be closed
      'delete' - current snapshot rows whose key is missing from the source
                 (only when hard_deletes is 'invalidate' or 'new_record')
#}
    {% set columns = config.get('snapshot_table_column_names') or get_snapshot_table_column_names() %}
    {% if strategy.hard_deletes == 'new_record' %}
        {# Deletion records get their own scd id: a hash of the superseded row's scd id and the snapshot time. #}
        {% set new_scd_id = snapshot_hash_arguments([columns.dbt_scd_id, snapshot_get_time()]) %}
    {% endif %}

    with snapshot_query as (

        {{ source_sql }}

    ),

    {# Current snapshot rows only: dbt_valid_to is null or equals the configured dbt_valid_to_current. #}
    snapshotted_data as (

        select {{ target_relation }}.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from {{ target_relation }}
        where
            {% if config.get('dbt_valid_to_current') %}
                {% set valid_to_column = columns.dbt_valid_to | trim %}
                {% set valid_to_current = config.get('dbt_valid_to_current') | trim %}
                ( {{ equals(valid_to_column, valid_to_current) }} or {{ valid_to_column }} is null )
            {% else %}
                {{ columns.dbt_valid_to }} is null
            {% endif %}

    ),

    {# Source rows shaped like a brand-new snapshot row (includes a freshly computed scd id). #}
    insertions_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ oracle__get_dbt_valid_to_current(strategy, columns) }},
            {{ strategy.scd_id }} as {{ columns.dbt_scd_id }}
        from snapshot_query

    ),

    {# Source rows shaped for closing an existing snapshot row: dbt_valid_to is the strategy's updated_at. #}
    updates_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }},
            {{ strategy.updated_at }} as {{ columns.dbt_updated_at }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_from }},
            {{ strategy.updated_at }} as {{ columns.dbt_valid_to }}
        from snapshot_query

    ),

    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}

    {# Source rows plus key columns; only used to find snapshot keys that no longer exist in the source. #}
    deletes_source_data as (

        select
            snapshot_query.*,
            {{ unique_key_fields(strategy.unique_key) }}
        from snapshot_query

    ),
    {% endif %}

    {#
        Rows to insert: the key has no current snapshot row, or the row changed,
        or (new_record) the current snapshot row is a deletion record, i.e. the
        key has come back in the source.
    #}
    insertions as (

        select
            'insert' as dbt_change_type,
            source_data.*
          {%- if strategy.hard_deletes == 'new_record' -%}
            ,'False' as {{ columns.dbt_is_deleted }}
          {%- endif %}

        from insertions_source_data source_data
        left outer join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "snapshotted_data") }}
            or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and ({{ strategy.row_changed }})
          {%- if strategy.hard_deletes == 'new_record' %}
                or ({{ unique_key_is_not_null(strategy.unique_key, "snapshotted_data") }} and snapshotted_data.{{ columns.dbt_is_deleted }} = 'True')
          {%- endif %}
            )

    ),

    {#
        Current snapshot rows to close. Besides changed rows, in new_record mode
        this also picks up a current deletion record whose key is present in the
        source again, so the restored key does not keep an open deletion record.
    #}
    updates as (

        select
            'update' as dbt_change_type,
            source_data.*,
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from updates_source_data source_data
        join snapshotted_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where (
            {{ strategy.row_changed }}
        )
          {%- if strategy.hard_deletes == 'new_record' %}
            or snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
          {%- endif %}

    )
    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' -%}
    ,

    {# Current snapshot rows whose key is gone from the source; dbt_valid_to is set to the snapshot time. #}
    deletes as (

        select
            'delete' as dbt_change_type,
            source_data.*,
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_to }},
            snapshotted_data.{{ columns.dbt_scd_id }}
          {%- if strategy.hard_deletes == 'new_record' -%}
            , snapshotted_data.{{ columns.dbt_is_deleted }}
          {%- endif %}

        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
          {%- if strategy.hard_deletes == 'new_record' %}
            {#
                The explicit "is null" branch matters: with a NULL flag the
                expression not (flag = 'True' and ...) evaluates to NULL, which
                the where clause treats as false, so the delete would be
                silently skipped. NOTE(review): NULL flags presumably occur on
                rows written before the dbt_is_deleted column existed - confirm.
            #}
            and (
                snapshotted_data.{{ columns.dbt_is_deleted }} is null
                or not (
                    --avoid updating the record's valid_to if the latest entry is marked as deleted
                    snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                    and
                    {% if config.get('dbt_valid_to_current') %}
                        {% set valid_to_column = 'snapshotted_data.' ~ (columns.dbt_valid_to | trim) %}
                        {% set valid_to_current = config.get('dbt_valid_to_current') | trim %}
                        ( {{ equals(valid_to_column, valid_to_current) }} or {{ valid_to_column }} is null )
                    {% else %}
                        snapshotted_data.{{ columns.dbt_valid_to }} is null
                    {% endif %}
                )
            )
          {%- endif %}

    )
    {%- endif %}

    {%- if strategy.hard_deletes == 'new_record' %}
        {% set source_sql_cols = get_column_schema_from_query(source_sql) %}
    ,

    {#
        One deletion record per key that disappeared from the source: the last
        known column values are copied from the snapshot row, flagged
        dbt_is_deleted = 'True', and given a new scd id.
        NOTE(review): every source column is selected from snapshotted_data, so
        this assumes the snapshot table already has all source columns - confirm.
    #}
    deletion_records as (

        select
            'insert' as dbt_change_type,
            {%- for col in source_sql_cols -%}
            snapshotted_data.{{ adapter.quote(col.column) }},
            {% endfor -%}
            {%- if strategy.unique_key | is_list -%}
                {%- for key in strategy.unique_key -%}
            snapshotted_data.{{ key }} as dbt_unique_key_{{ loop.index }},
                {% endfor -%}
            {%- else -%}
            snapshotted_data.dbt_unique_key as dbt_unique_key,
            {% endif -%}
            {{ snapshot_get_time() }} as {{ columns.dbt_valid_from }},
            {{ snapshot_get_time() }} as {{ columns.dbt_updated_at }},
            snapshotted_data.{{ columns.dbt_valid_to }} as {{ columns.dbt_valid_to }},
            {{ new_scd_id }} as {{ columns.dbt_scd_id }},
            'True' as {{ columns.dbt_is_deleted }}
        from snapshotted_data
        left join deletes_source_data source_data
            on {{ unique_key_join_on(strategy.unique_key, "snapshotted_data", "source_data") }}
        where {{ unique_key_is_null(strategy.unique_key, "source_data") }}
            {# Same NULL-safe guard as in the deletes CTE: a NULL flag must not suppress the deletion record. #}
            and (
                snapshotted_data.{{ columns.dbt_is_deleted }} is null
                or not (
                    --avoid inserting a new record if the latest one is marked as deleted
                    snapshotted_data.{{ columns.dbt_is_deleted }} = 'True'
                    and
                    {% if config.get('dbt_valid_to_current') %}
                        {% set valid_to_column = 'snapshotted_data.' ~ (columns.dbt_valid_to | trim) %}
                        {% set valid_to_current = config.get('dbt_valid_to_current') | trim %}
                        ( {{ equals(valid_to_column, valid_to_current) }} or {{ valid_to_column }} is null )
                    {% else %}
                        snapshotted_data.{{ columns.dbt_valid_to }} is null
                    {% endif %}
                )
            )

    )
    {%- endif %}

    select * from insertions
    union all
    select * from updates
    {%- if strategy.hard_deletes == 'invalidate' or strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletes
    {%- endif %}
    {%- if strategy.hard_deletes == 'new_record' %}
    union all
    select * from deletion_records
    {%- endif %}

{%- endmacro %}
Based on my testing I think this is the fix needed.