Skip to content

Commit

Permalink
Incremental Tests and Fixes (#13)
Browse files Browse the repository at this point in the history
* Added incremental tests (full table, and selected columns)
* Fixed a bug in incremental with unique_id not being implemented
* Support Vertica 11.0.x docker image in testing
  • Loading branch information
arosychuk committed Feb 22, 2022
1 parent c505f52 commit 331ac29
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 25 deletions.
24 changes: 22 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,20 @@ Uses [vertica-python](https://github.com/vertica/vertica-python) to connect to V

## Changes

### 1.0.1

- Fixed the Incremental method implementation (was buggy/incomplete)
- Removed the `unique_id` as it wasn't implemented
- Fixed when no fields were added - full table merge
- Added testing for Incremental materialization
- Testing for dbt Incremental full table
- Testing for dbt Incremental specified merged columns
- Added more logging to the connector to help understand why tests were failing
- Using the official [Vertica CE 11.0.x docker image](https://hub.docker.com/r/vertica/vertica-ce) now for tests

### 1.0.0

- Add support for DBT version 1.
- Add support for DBT version 1.0.0

### 0.21.1

Expand Down Expand Up @@ -65,7 +76,16 @@ Also, I would be excited to hear about anyone who is able to benefit from using

Run a local Vertica instance like:

docker run -p 5433:5433 jbfavre/vertica:9.2.0-7_centos-7
docker run -p 5433:5433 \
-p 5444:5444 \
-e VERTICA_DB_NAME=docker \
-e VMART_ETL_SCRIPT="" \
-e VMART_ETL_SQL="" \
vertica/vertica-ce

Access the local Vertica instance like:

docker exec -it <docker_image_name> /opt/vertica/bin/vsql

You need the pytest dbt adapter:

Expand Down
7 changes: 5 additions & 2 deletions dbt/adapters/vertica/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,14 @@ def open(cls, connection):
def get_response(cls, cursor):
code = cursor.description
rows = cursor.rowcount
message = cursor._message
arraysize = cursor.arraysize
operation = cursor.operation

return AdapterResponse(
_message="{} {}".format(code, rows),
_message="Operation: {}, Message: {}, Code: {}, Rows: {}, Arraysize: {}".format(operation, message, str(code), rows, arraysize),
rows_affected=rows,
code=code
code=str(code)
)

def cancel(self, connection):
Expand Down
39 changes: 21 additions & 18 deletions dbt/include/vertica/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,59 +14,62 @@
{% do return(strategy) %}
{% endmacro %}

{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %}
{% if strategy == 'merge' %}
{% do return(vertica__get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(vertica__get_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% elif strategy == 'delete+insert' %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %}
{% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %}
{% else %}
{% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %}
{% endif %}
{% endmacro %}


{% macro vertica__get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %}
{% macro vertica__get_merge_sql(target_relation, tmp_relation, dest_columns) %}
{%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%}
{%- set merge_columns = config.get("merge_columns", default=None)%}

merge into {{ target_relation }} as DBT_INTERNAL_DEST
using {{ tmp_relation }} as DBT_INTERNAL_SOURCE

{% if unique_key %}
on DBT_INTERNAL_DEST.{{ unique_key }} = DBT_INTERNAL_SOURCE.{{ unique_key }}
{% elif merge_columns %}
{#-- Test 1, find the provided merge columns #}
{% if merge_columns %}
on
{% for column in merge_columns %}
DBT_INTERNAL_DEST.{{ adapter.quote(column) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column) }}
{%- if not loop.last %} AND {%- endif %}
{%- if not loop.last %} AND {% endif %}
{%- endfor %}

{#-- Test 2, use all columns in the destination table #}
{% else %}
on FALSE
on
{% for column in dest_columns -%}
DBT_INTERNAL_DEST.{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %} AND {% endif %}
{%- endfor %}

{% endif %}

{% if unique_key %}
when matched then update set
{% for column in dest_columns -%}
{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %}, {%- endif %}
{%- endfor %}
{% endif %}
{% for column in dest_columns -%}
{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %}, {% endif %}
{%- endfor %}

when not matched then insert
({{ dest_columns_csv }})
values
(
{% for column in dest_columns -%}
DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }}
{%- if not loop.last %}, {%- endif %}
{%- if not loop.last %}, {% endif %}
{%- endfor %}
)

{%- endmacro %}

{% materialization incremental, adapter='vertica' %}

{% set unique_key = config.get('unique_key') %}
{% set full_refresh_mode = flags.FULL_REFRESH %}

{% set target_relation = this %}
Expand Down Expand Up @@ -101,7 +104,7 @@
from_relation=tmp_relation,
to_relation=target_relation) %}
{% set dest_columns = adapter.get_columns_in_relation(target_relation) %}
{% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %}
{% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %}
{% endif %}

{% call statement("main") %}
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pathlib

package_name = "dbt-vertica"
package_version = "1.0.0"
package_version = "1.0.1"
description = """The vertica adapter plugin for dbt (data build tool)"""

HERE = pathlib.Path(__file__).parent
Expand Down
113 changes: 111 additions & 2 deletions tests/integration.dbtspec
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,122 @@ target:
schema: "dbt_test_{{ var('_dbt_random_suffix') }}"
port: 5433
threads: 1

projects:
- overrides: incremental
paths:
models/incremental.sql:
materialized: incremental
# Wrap in a CTE due to added seed name varchar being size 9 and seed base name varchar being size 8
body: |
with incremental_data as
(
select id, name::varchar(9) as name, some_date from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
)
select * from incremental_data
- name: test_vertica_dbt_incremental_merge_columns
paths:
seeds/base.csv: files.seeds.base
seeds/added.csv: files.seeds.added
models/schema.yml: files.schemas.base
models/incremental.sql:
materialized: incremental
# Specifying which column to use, in this case, we will use the id
# Wrap in a CTE due to added seed name varchar being size 9 and seed base name varchar being size 8
body: |
{{
config(
materialized = 'incremental',
incremental_strategy = 'merge',
merge_columns = [ 'id', 'name' ]
)
}}
with incremental_data as
(
select id, name::varchar(9) as name, some_date from {{ source('raw', 'seed') }}
{% if is_incremental() %}
where id > (select max(id) from {{ this }})
{% endif %}
)
select * from incremental_data
facts:
seed:
length: 2
names:
- base
- added
run:
length: 1
names:
- incremental
catalog:
nodes:
length: 3
sources:
length: 1
persisted_relations:
- base
- added
- incremental
base:
rowcount: 10
added:
rowcount: 20

sequences:
test_dbt_empty: empty
test_dbt_base: base
test_dbt_ephemeral: ephemeral
# test_dbt_incremental: incremental
test_dbt_incremental: incremental
# test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp
# test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols
test_dbt_data_test: data_test
test_dbt_schema_test: schema_test
test_dbt_ephemeral_data_tests: data_test_ephemeral_models
test_dbt_ephemeral_data_tests: data_test_ephemeral_models

# Additional Vertica tests below
# Test the incremental merge based upon specific columns
test_vertica_dbt_incremental_merge_columns:
project: test_vertica_dbt_incremental_merge_columns
sequence:
- type: dbt
cmd: seed
- type: run_results
length: fact.seed.length
- type: dbt
cmd: run
vars:
seed_name: base
- type: relation_rows
name: base
length: fact.base.rowcount
- type: run_results
length: fact.run.length
- type: relations_equal
relations:
- base
- incremental
- type: dbt
cmd: run
vars:
seed_name: added
- type: relation_rows
name: added
length: fact.added.rowcount
- type: run_results
length: fact.run.length
- type: relations_equal
relations:
- added
- incremental
- type: dbt
cmd: docs generate
- type: catalog
exists: True
nodes:
length: fact.catalog.nodes.length
sources:
length: fact.catalog.sources.length

0 comments on commit 331ac29

Please sign in to comment.