Skip to content

Commit

Permalink
Fix sync_all_columns schema change strategy for incremental models
Browse files Browse the repository at this point in the history
  • Loading branch information
posulliv committed Jul 13, 2023
1 parent b9e3c74 commit 5385ee0
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .changes/unreleased/Fixes-20230713-102021.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Fixes
body: Fix sync_all_columns schema change strategy for incremental models
time: 2023-07-13T10:20:21.52287-04:00
custom:
Author: posulliv
Issue: "326"
PR: "324"
17 changes: 17 additions & 0 deletions dbt/include/trino/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -290,3 +290,20 @@
{{ return({'relations': [target_relation]}) }}
{% endmacro %}
{% macro trino__alter_column_type(relation, column_name, new_column_type) %}
{#
Change the type of `column_name` on `relation` to `new_column_type`:
1. Add a new column (temporary name `<column_name>__dbt_alter`, new type)
2. Copy the existing values into it, CAST to the new type
3. Drop the existing column (note: no CASCADE clause is issued)
4. Rename the new column to the existing column's name
All four statements are submitted through a single `statement` call,
separated by `;`.
#}
{%- set tmp_column = column_name + "__dbt_alter" -%}
{% call statement('alter_column_type') %}
alter table {{ relation }} add column {{ adapter.quote(tmp_column) }} {{ new_column_type }};
update {{ relation }} set {{ adapter.quote(tmp_column) }} = CAST({{ adapter.quote(column_name) }} AS {{ new_column_type }});
alter table {{ relation }} drop column {{ adapter.quote(column_name) }};
alter table {{ relation }} rename column {{ adapter.quote(tmp_column) }} to {{ adapter.quote(column_name) }}
{% endcall %}
{% endmacro %}
64 changes: 64 additions & 0 deletions tests/functional/adapter/materialization/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,18 @@
- name: id
tests:
- unique
- name: incremental_sync_all_columns_diff_data_types
columns:
- name: id
tests:
- unique
- name: incremental_sync_all_columns_diff_data_types_target
columns:
- name: id
tests:
- unique
"""

model_a_sql = """\
Expand Down Expand Up @@ -335,6 +347,50 @@
order by id
"""

# Incremental model (unique_key='id', on_schema_change='sync_all_columns') whose
# "field1" column changes data type between runs: the initial build selects the
# raw id for rows with id <= 3, while incremental runs select
# cast(id as varchar) for ids not yet present in the existing relation.
incremental_sync_all_columns_diff_data_types_sql = """\
{{
config(
materialized='incremental',
unique_key='id',
on_schema_change='sync_all_columns'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
{% if is_incremental() %}
SELECT id,
cast(id as varchar) "field1" -- to validate data type changes
FROM source_data WHERE id NOT IN (SELECT id from {{ this }} )
{% else %}
select id,
id "field1"
from source_data where id <= 3
order by id
{% endif %}
"""

# Table-materialized model selecting every row of model_a with "field1" as
# cast(id as varchar); the on-schema-change test uses it as the comparison
# target for incremental_sync_all_columns_diff_data_types.
incremental_sync_all_columns_diff_data_types_target_sql = """\
{{
config(
materialized='table'
)
}}
WITH source_data AS (SELECT * FROM {{ ref('model_a') }} )
select id,
cast(id as varchar) "field1"
from source_data
order by id
"""

# Query against model_a that returns no rows (the filter is the constant false).
select_from_a_sql = "select * from {{ ref('model_a') }} where false"

select_from_incremental_append_new_columns_sql = (
Expand Down Expand Up @@ -366,3 +422,11 @@
# Zero-row queries (`where false`) against the sync_all_columns target model and
# the two diff-data-types models.
# NOTE(review): presumably registered as singular dbt tests that only need the
# referenced relation to be queryable - confirm against the test class.
select_from_incremental_sync_all_columns_target_sql = (
"select * from {{ ref('incremental_sync_all_columns_target') }} where false"
)

select_from_incremental_sync_all_columns_diff_data_types_sql = (
"select * from {{ ref('incremental_sync_all_columns_diff_data_types') }} where false"
)

select_from_incremental_sync_all_columns_diff_data_types_target_sql = (
"select * from {{ ref('incremental_sync_all_columns_diff_data_types_target') }} where false"
)
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
incremental_fail_sql,
incremental_ignore_sql,
incremental_ignore_target_sql,
incremental_sync_all_columns_diff_data_types_sql,
incremental_sync_all_columns_diff_data_types_target_sql,
incremental_sync_all_columns_sql,
incremental_sync_all_columns_target_sql,
model_a_sql,
Expand All @@ -20,6 +22,8 @@
select_from_incremental_append_new_columns_target_sql,
select_from_incremental_ignore_sql,
select_from_incremental_ignore_target_sql,
select_from_incremental_sync_all_columns_diff_data_types_sql,
select_from_incremental_sync_all_columns_diff_data_types_target_sql,
select_from_incremental_sync_all_columns_sql,
select_from_incremental_sync_all_columns_target_sql,
)
Expand All @@ -45,6 +49,8 @@ def models(self):
"incremental_fail.sql": incremental_fail_sql,
"incremental_sync_all_columns.sql": incremental_sync_all_columns_sql,
"incremental_sync_all_columns_target.sql": incremental_sync_all_columns_target_sql,
"incremental_sync_all_columns_diff_data_types.sql": incremental_sync_all_columns_diff_data_types_sql,
"incremental_sync_all_columns_diff_data_types_target.sql": incremental_sync_all_columns_diff_data_types_target_sql,
"schema.yml": schema_base_yml,
}

Expand All @@ -60,6 +66,8 @@ def tests(self):
"select_from_incremental_ignore_target.sql": select_from_incremental_ignore_target_sql,
"select_from_incremental_sync_all_columns.sql": select_from_incremental_sync_all_columns_sql,
"select_from_incremental_sync_all_columns_target.sql": select_from_incremental_sync_all_columns_target_sql,
"select_from_incremental_sync_all_columns_diff_data_types.sql": select_from_incremental_sync_all_columns_diff_data_types_sql,
"select_from_incremental_sync_all_columns_diff_data_types_target.sql": select_from_incremental_sync_all_columns_diff_data_types_target_sql,
}

def list_tests_and_assert(self, include, exclude, expected_tests):
Expand Down Expand Up @@ -170,6 +178,24 @@ def run_incremental_sync_all_columns(self, project):
project, select, exclude, expected, compare_source, compare_target
)

def run_incremental_sync_all_columns_data_type_change(self, project):
    """Run the sync_all_columns scenario in which a column changes data type.

    Selects ``model_a`` together with the diff-data-types incremental model
    and its target model, checks the expected test listing through
    ``list_tests_and_assert``, then hands the same selection to
    ``run_tests_and_assert`` with the incremental model as the comparison
    source and the target model as the comparison target.
    """
    source_model = "incremental_sync_all_columns_diff_data_types"
    target_model = "incremental_sync_all_columns_diff_data_types_target"
    selection = "model_a incremental_sync_all_columns_diff_data_types incremental_sync_all_columns_diff_data_types_target"
    excluded = None  # nothing is excluded from the selection
    expected_tests = [
        "select_from_a",
        "select_from_incremental_sync_all_columns_diff_data_types",
        "select_from_incremental_sync_all_columns_diff_data_types_target",
        "unique_model_a_id",
        "unique_incremental_sync_all_columns_diff_data_types_id",
        "unique_incremental_sync_all_columns_diff_data_types_target_id",
    ]

    self.list_tests_and_assert(selection, excluded, expected_tests)
    self.run_tests_and_assert(
        project, selection, excluded, expected_tests, source_model, target_model
    )

def run_incremental_fail_on_schema_change(self, _):
select = "model_a incremental_fail"
run_dbt(["run", "--models", select, "--full-refresh"])
Expand All @@ -186,6 +212,9 @@ def test_run_incremental_append_new_columns(self, project):
def test_run_incremental_sync_all_columns(self, project):
    """Pytest entry point that delegates to ``run_incremental_sync_all_columns``."""
    self.run_incremental_sync_all_columns(project)

def test_run_incremental_sync_all_columns_data_type_change(self, project):
    """Pytest entry point that delegates to
    ``run_incremental_sync_all_columns_data_type_change``."""
    self.run_incremental_sync_all_columns_data_type_change(project)

def test_run_incremental_fail_on_schema_change(self, project):
    """Pytest entry point that delegates to ``run_incremental_fail_on_schema_change``."""
    self.run_incremental_fail_on_schema_change(project)

Expand Down Expand Up @@ -215,3 +244,9 @@ def project_config_update(self):
@pytest.mark.xfail(reason="This connector does not support dropping columns")
def test_run_incremental_sync_all_columns(self, project):
    """Run the inherited sync_all_columns test, marked as an expected failure.

    The xfail reason records that this connector cannot drop columns.
    """
    parent = super(TestDeltaOnSchemaChange, self)
    parent.test_run_incremental_sync_all_columns(project)

@pytest.mark.xfail(reason="This connector does not support dropping columns")
def test_run_incremental_sync_all_columns_data_type_change(self, project):
    """Run the inherited data-type-change test, marked as an expected failure.

    The xfail reason records that this connector cannot drop columns.
    """
    parent = super(TestDeltaOnSchemaChange, self)
    parent.test_run_incremental_sync_all_columns_data_type_change(project)

0 comments on commit 5385ee0

Please sign in to comment.