Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor and add config for incremental model #32

Closed
wants to merge 21 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,19 @@ By default, `dbt-vertica` will request `ConnectionLoadBalance=true` (which is ge
There are three options for SSL: `ssl`, `ssl_env_cafile`, and `ssl_uri`.
See their use in the code [here](https://github.com/mpcarter/dbt-vertica/blob/d15f925049dabd2833b4d88304edd216e3f654ed/dbt/adapters/vertica/connections.py#L72-L87).

## Sample Incremental Model Configuration

```sql
{{
config(
materialized = 'incremental',
unique_key = ['your-first-id', 'your-second-id'],
incremental_strategy = 'merge',
merge_update_columns = ['column-to-update']
)
}}
```

## Reach out!

First off, I would not have been able to make this adapater if the smart folks at dbt labs didn't make it so easy. That said, it seems every database has its own little quirks. I ran into several different issues when adapting the macros to Vertica. If you find something not working right, please open an issue (assuming it has to do with the adapter and not dbt itself).
Expand Down
6 changes: 4 additions & 2 deletions dbt/adapters/vertica/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class verticaCredentials(Credentials):
withMaterialization: bool = False
ssl_env_cafile: Optional[str] = None
ssl_uri: Optional[str] = None
connection_load_balance: bool = True

@property
def type(self):
Expand All @@ -45,7 +46,7 @@ def unique_field(self):

def _connection_keys(self):
# return an iterator of keys to pretty-print in 'dbt debug'
return ('host','port','database','username','schema')
return ('host','port','database','username','schema', 'connection_load_balance')


class verticaConnectionManager(SQLConnectionManager):
Expand All @@ -67,7 +68,7 @@ def open(cls, connection):
'password': credentials.password,
'database': credentials.database,
'connection_timeout': credentials.timeout,
'connection_load_balance': True,
'connection_load_balance': credentials.connection_load_balance,
'session_label': f'dbt_{credentials.username}',
}
# if credentials.ssl.lower() in {'true', 'yes', 'please'}:
Expand Down Expand Up @@ -120,6 +121,7 @@ def open(cls, connection):

@classmethod
def get_response(cls, cursor):

code = cursor.description
rows = cursor.rowcount
message = cursor._message
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,71 @@

{% set invalid_strategy_msg -%}
Invalid incremental strategy provided: {{ strategy }}
Expected one of: 'merge', 'delete+insert'
Expected one of: 'merge', 'delete+insert', 'insert+overwrite'
{%- endset %}
{% if strategy not in ['merge', 'delete+insert'] %}
{% if strategy not in ['merge', 'delete+insert', 'insert+overwrite'] %}
{% do exceptions.raise_compiler_error(invalid_strategy_msg) %}
{% endif %}

{% do return(strategy) %}
{% endmacro %}

{% macro get_qouted_csv_with_prefix(prefix, column_names) %}
{% set quoted = [] %}

{% for col in column_names -%}
{%- do quoted.append(prefix + "." + adapter.quote(col)) -%}
{%- endfor %}

{%- set dest_cols_csv = quoted | join(', ') -%}

{{ return(dest_cols_csv) }}
{% endmacro %}

{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %}
{% if strategy == 'merge' %}
{% do return(vertica__get_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'delete+insert' %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% do return(vertica__get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'insert+overwrite' %}
{% do return(vertica__get_insert_overwrite_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}
{% endmacro %}

{% macro vertica__get_table_in_relation(relation) -%}
{% call statement('get_columns_in_relation', fetch_result=True) %}
select
column_name
, data_type
, character_maximum_length
, numeric_precision
, numeric_scale
from (
select
column_name
, data_type
, character_maximum_length
, numeric_precision
, numeric_scale
, ordinal_position
from v_catalog.columns
where table_schema = '{{ relation.schema }}'
and table_name = '{{ relation.identifier }}'
union all
select
column_name
, data_type
, character_maximum_length
, numeric_precision
, numeric_scale
, ordinal_position
from v_catalog.view_columns
where table_schema = '{{ relation.schema }}'
and table_name = '{{ relation.identifier }}'
) t
order by ordinal_position
{% endcall %}
{{ return(load_result('get_columns_in_relation').table) }}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
{% do to_drop.append(backup_relation) %}
{% else %}
{% set tmp_relation = make_temp_relation(target_relation) %}
{% do run_query(vertica__create_table_as(True, tmp_relation, sql)) %}
{% do adapter.expand_target_column_types(
from_relation=tmp_relation,
to_relation=target_relation) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
{% macro vertica__get_merge_sql(target_relation, tmp_relation, dest_columns) %}
{%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_columns = config.get("merge_columns", default=None)%}
{%- set complex_type = config.get('include_complex_type', default = False) -%}
{%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set unique_key = config.get("unique_key", default = dest_columns | map(attribute="name")) -%}
{%- set merge_update_columns = config.get('merge_update_columns', default = dest_columns | map(attribute="name")) -%}

{%- if complex_type %}
{{vertica__create_table_from_relation(True, tmp_relation, target_relation, dest_columns, sql)}}
{% else %}
{{vertica__create_table_as(True, tmp_relation, sql)}}
{% endif %}

merge into {{ target_relation }} as DBT_INTERNAL_DEST
using {{ tmp_relation }} as DBT_INTERNAL_SOURCE

{#-- Test 1, find the provided merge columns #}
{% if merge_columns %}
on
{% for column in merge_columns %}
DBT_INTERNAL_DEST.{{ adapter.quote(column) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column) }}
{%- if not loop.last %} AND {% endif %}
{%- endfor %}
{#-- Test 2, use all columns in the destination table #}
{% else %}
on
{% for column in dest_columns -%}
DBT_INTERNAL_DEST.{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %} AND {% endif %}
{%- endfor %}
on HASH( {{ get_qouted_csv_with_prefix("DBT_INTERNAL_DEST", unique_key) }} ) = HASH({{ get_qouted_csv_with_prefix("DBT_INTERNAL_SOURCE", unique_key) }})

{%- if merge_update_columns is string %}
{%- set merge_update_columns = merge_update_columns.split(',') %}
{% endif %}

when matched then update set
{% for column in dest_columns -%}
{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %}, {% endif %}
{%- endfor %}
{%- if merge_update_columns | length > 0 %}
when matched then update set
{% for column_name in merge_update_columns -%}
{{ adapter.quote(column_name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column_name) }}
{%- if not loop.last %}, {% endif %}
{%- endfor %}
{%- endif %}

when not matched then insert
({{ dest_columns_csv }})
Expand All @@ -39,10 +39,67 @@
{%- endmacro %}


{# No need to implement get_delete_insert_merge_sql(). Syntax supported by default. #}
{% macro vertica__get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns) -%}
{%- set complex_type = config.get('include_complex_type') -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set unique_key = config.get('unique_key', default = dest_columns | map(attribute="name")) -%}
{%- set unique_key_columns_csv = get_quoted_csv(unique_key) -%}

{%- if complex_type %}
{{ vertica__create_table_from_relation(True, tmp_relation, target_relation, dest_columns, sql) }}
{% else %}
{{ vertica__create_table_as(True, tmp_relation, sql) }}
{% endif %}


{% if unique_key is sequence and unique_key is not string %}
delete from {{ target_relation }}
where (
HASH( {{ unique_key_columns_csv }} ) in (
select HASH({{ unique_key_columns_csv }})
from {{ tmp_relation }} )
);
{% else %}
delete from {{ target_relation }}
where (
{{ unique_key }}) in (
select {{ unique_key }}
from {{ tmp_relation }}
);

{% endif %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
)

{%- endmacro %}

{% macro vertica__get_insert_overwrite_merge_sql(target_relation, tmp_relation, dest_columns) -%}
{%- set complex_type = config.get('include_complex_type') -%}
{%- set partitions = config.get('partitions') -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{%- if complex_type %}
{{ vertica__create_table_from_relation(True, tmp_relation, target_relation, dest_columns, sql) }}
{% else %}
{{ vertica__create_table_as(True, tmp_relation, sql) }}
{% endif %}


{% for partition in partitions %}
SELECT DROP_PARTITIONS('{{ target_relation.schema }}.{{ target_relation.table }}', '{{ partition }}', '{{ partition }}');
SELECT PURGE_PARTITION('{{ target_relation.schema }}.{{ target_relation.table }}', '{{ partition }}');
{% endfor %}

insert into {{ target_relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }}
from {{ tmp_relation }}
)
{% endmacro %}



{% macro vertica__get_insert_overwrite_merge_sql() -%}
{{ exceptions.raise_not_implemented(
'get_insert_overwrite_merge_sql macro not implemented for adapter '+adapter.type()) }}
{%- endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,29 @@
as (
{{ sql }}
);
{% endmacro %}
{% endmacro %}

{% macro vertica__create_table_from_relation(temporary, relation, target, dest_columns, sql) -%}
{%- set sql_header = config.get('sql_header', none) -%}
{%- set table_schema = vertica__get_table_in_relation(target) -%}
{%- set dest_cols_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}

{{ sql_header if sql_header is not none }}

create {% if temporary: -%}local temporary{%- endif %} table {{ relation }}
(
{% for row in table_schema %}
{{ row.column_name }} {{ row.data_type }}
{%- if not loop.last %}, {% endif %}
{% endfor %}
) on commit preserve rows;

insert into {{ relation }} ({{ dest_cols_csv }})
(
select {{ dest_cols_csv }} from (
{{ sql }}
) as DBT_MASKED_TARGET
);

{% endmacro %}

2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pathlib

package_name = "dbt-vertica"
package_version = "1.0.3.1"
package_version = "1.0.3.1-rc7"
description = """The vertica adapter plugin for dbt (data build tool)"""

HERE = pathlib.Path(__file__).parent
Expand Down