Skip to content

Commit

Permalink
Support on_schema_change for incremental models
Browse files Browse the repository at this point in the history
  • Loading branch information
mdesmet committed Sep 23, 2022
1 parent 2183091 commit 2f887fd
Show file tree
Hide file tree
Showing 6 changed files with 586 additions and 1 deletion.
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220922-085214.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Support `on_schema_change` on incremental models
time: 2022-09-22T08:52:14.6753+02:00
custom:
Author: mdesmet
Issue: "48"
PR: "134"
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,8 @@ select * from {{ ref('events') }}
{% endif %}
```

Use the `+on_schema_change` property to define how dbt-trino should handle column changes. See [dbt docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models#what-if-the-columns-of-my-incremental-model-change).

###### `append` (default)

The default incremental strategy is `append`. `append` only adds the new records based on the condition specified in the `is_incremental()` conditional block.
Expand Down
24 changes: 24 additions & 0 deletions dbt/include/trino/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,27 @@
{{ return('%s') }}
{%- endif -%}
{% endmacro %}


{% macro trino__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

{% for column in add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} add column {{ column.name }} {{ column.data_type }}
{%- endset -%}
{% do run_query(sql) %}
{% endfor %}

{% for column in remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} drop column {{ column.name }}
{%- endset -%}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}
7 changes: 6 additions & 1 deletion dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{%- set strategy = validate_get_incremental_strategy(config) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{{ run_hooks(pre_hooks) }}

Expand All @@ -138,7 +139,11 @@
{% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %}
{% do run_query(drop_tmp_relation_sql) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}

Expand Down
330 changes: 330 additions & 0 deletions tests/functional/adapter/materialization/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,333 @@
tests:
- not_null
"""


schema_base_yml = """\
version: 2
models:
- name: model_a
columns:
- name: id
tests:
- unique
- name: incremental_ignore
columns:
- name: id
tests:
- unique
- name: incremental_ignore_target
columns:
- name: id
tests:
- unique
- name: incremental_append_new_columns
columns:
- name: id
tests:
- unique
- name: incremental_append_new_columns_target
columns:
- name: id
tests:
- unique
- name: incremental_append_new_columns_remove_one
columns:
- name: id
tests:
- unique
- name: incremental_append_new_columns_remove_one_target
columns:
- name: id
tests:
- unique
- name: incremental_sync_all_columns
columns:
- name: id
tests:
- unique
- name: incremental_sync_all_columns_target
columns:
- name: id
tests:
- unique
"""

model_a_sql = """\
{{
config(materialized='table')
}}
with source_data as (
select 1 as id, 'aaa' as field1, 'bbb' as field2, 111 as field3, 'TTT' as field4
union all select 2 as id, 'ccc' as field1, 'ddd' as field2, 222 as field3, 'UUU' as field4
union all select 3 as id, 'eee' as field1, 'fff' as field2, 333 as field3, 'VVV' as field4
union all select 4 as id, 'ggg' as field1, 'hhh' as field2, 444 as field3, 'WWW' as field4
union all select 5 as id, 'iii' as field1, 'jjj' as field2, 555 as field3, 'XXX' as field4
union all select 6 as id, 'kkk' as field1, 'lll' as field2, 666 as field3, 'YYY' as field4
)
select id
,field1
,field2
,field3
,field4
from source_data
"""

incremental_ignore_sql = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='ignore'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id, field1, field2, field3, field4 FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
SELECT id, field1, field2 FROM source_data LIMIT 3
{% endif %}
"""

incremental_ignore_target_sql = """\
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
select id
,field1
,field2
from source_data
"""

incremental_append_new_columns = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id,
cast(field1 as varchar) as field1,
cast(field2 as varchar) as field2,
cast(field3 as varchar) as field3,
cast(field4 as varchar) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
SELECT id,
cast(field1 as varchar) as field1,
cast(field2 as varchar) as field2
FROM source_data where id <= 3
{% endif %}
"""

incremental_append_new_columns_target_sql = """\
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
select id
,cast(field1 as varchar) as field1
,cast(field2 as varchar) as field2
,cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as varchar) AS field3
,cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as varchar) AS field4
from source_data
"""

incremental_append_new_columns_remove_one_sql = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='append_new_columns'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id,
cast(field1 as varchar) as field1,
cast(field3 as varchar) as field3,
cast(field4 as varchar) as field4
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
SELECT id,
cast(field1 as varchar) as field1,
cast(field2 as varchar) as field2
FROM source_data where id <= 3
{% endif %}
"""

incremental_append_new_columns_remove_one_target_sql = """\
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
select id,
cast(field1 as varchar) as field1,
cast(CASE WHEN id > 3 THEN NULL ELSE field2 END as varchar) AS field2,
cast(CASE WHEN id <= 3 THEN NULL ELSE field3 END as varchar) AS field3,
cast(CASE WHEN id <= 3 THEN NULL ELSE field4 END as varchar) AS field4
from source_data
"""


incremental_fail_sql = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='fail'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id, field1, field2 FROM source_data
{% else %}
SELECT id, field1, field3 FROm source_data
{% endif %}
"""

incremental_sync_all_columns_sql = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='sync_all_columns'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id,
cast(field1 as varchar) as field1,
cast(field3 as varchar) as field3, -- to validate new fields
cast(field4 as varchar) AS field4 -- to validate new fields
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
select id,
cast(field1 as varchar) as field1,
cast(field2 as varchar) as field2
from source_data where id <= 3
{% endif %}
"""

incremental_sync_all_columns_target_sql = """\
{{
config(materialized='table')
}}
with source_data as (
select * from {{ ref('model_a') }}
)
select id
,cast(field1 as varchar) as field1
--,field2
,cast(case when id <= 3 then null else field3 end as varchar) as field3
,cast(case when id <= 3 then null else field4 end as varchar) as field4
from source_data
order by id
"""

select_from_a_sql = "select * from {{ ref('model_a') }} where false"

select_from_incremental_append_new_columns_sql = (
"select * from {{ ref('incremental_append_new_columns') }} where false"
)

select_from_incremental_append_new_columns_remove_one_sql = (
"select * from {{ ref('incremental_append_new_columns_remove_one') }} where false"
)

select_from_incremental_append_new_columns_remove_one_target_sql = (
"select * from {{ ref('incremental_append_new_columns_remove_one_target') }} where false"
)

select_from_incremental_append_new_columns_target_sql = (
"select * from {{ ref('incremental_append_new_columns_target') }} where false"
)

select_from_incremental_ignore_sql = "select * from {{ ref('incremental_ignore') }} where false"

select_from_incremental_ignore_target_sql = (
"select * from {{ ref('incremental_ignore_target') }} where false"
)

select_from_incremental_sync_all_columns_sql = (
"select * from {{ ref('incremental_sync_all_columns') }} where false"
)

select_from_incremental_sync_all_columns_target_sql = (
"select * from {{ ref('incremental_sync_all_columns_target') }} where false"
)
Loading

0 comments on commit 2f887fd

Please sign in to comment.