Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support on_schema_change for incremental models #136

Merged
merged 2 commits into from
Sep 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changes/unreleased/Features-20220922-085214.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
kind: Features
body: Support `on_schema_change` on incremental models
time: 2022-09-22T08:52:14.6753+02:00
custom:
Author: mdesmet
Issue: "48"
PR: "134"
14 changes: 2 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,18 +122,6 @@ on your Trino instance.

See also: https://trino.io/docs/current/security/authentication-types.html

#### Required configuration

dbt fundamentally works by dropping and creating tables and views in databases.
As such, the following Trino configs must be set for dbt to work properly on Trino:

```properties
hive.metastore-cache-ttl=0s
hive.metastore-refresh-interval = 5s
hive.allow-drop-table=true
hive.allow-rename-table=true
```

#### Session properties per model

In some specific cases, there may be needed tuning through the Trino session properties only
Expand Down Expand Up @@ -231,6 +219,8 @@ select * from {{ ref('events') }}
{% endif %}
```

Use the `+on_schema_change` property to define how dbt-trino should handle column changes. See [dbt docs](https://docs.getdbt.com/docs/building-a-dbt-project/building-models/configuring-incremental-models#what-if-the-columns-of-my-incremental-model-change).

###### `append` (default)

The default incremental strategy is `append`. `append` only adds the new records based on the condition specified in the `is_incremental()` conditional block.
Expand Down
24 changes: 24 additions & 0 deletions dbt/include/trino/macros/adapters.sql
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,27 @@
{{ return('%s') }}
{%- endif -%}
{% endmacro %}


{% macro trino__alter_relation_add_remove_columns(relation, add_columns, remove_columns) %}
{% if add_columns is none %}
{% set add_columns = [] %}
{% endif %}
{% if remove_columns is none %}
{% set remove_columns = [] %}
{% endif %}

{% for column in add_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} add column {{ column.name }} {{ column.data_type }}
{%- endset -%}
{% do run_query(sql) %}
{% endfor %}

{% for column in remove_columns %}
{% set sql -%}
alter {{ relation.type }} {{ relation }} drop column {{ column.name }}
{%- endset -%}
{% do run_query(sql) %}
{% endfor %}
{% endmacro %}
7 changes: 6 additions & 1 deletion dbt/include/trino/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{%- set strategy = validate_get_incremental_strategy(config) -%}
{%- set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') -%}

{{ run_hooks(pre_hooks) }}

Expand All @@ -138,7 +139,11 @@
{% set drop_tmp_relation_sql = "drop table if exists " ~ tmp_relation %}
{% do run_query(drop_tmp_relation_sql) %}
{% do run_query(create_table_as(True, tmp_relation, sql)) %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{#-- Process schema changes. Returns dict of changes if successful. Use source columns for upserting/merging --#}
{% set dest_columns = process_schema_changes(on_schema_change, tmp_relation, existing_relation) %}
{% if not dest_columns %}
{% set dest_columns = adapter.get_columns_in_relation(existing_relation) %}
{% endif %}
{% set build_sql = get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% endif %}

Expand Down
Loading