diff --git a/README.md b/README.md index 431eed5..07554f8 100644 --- a/README.md +++ b/README.md @@ -6,9 +6,20 @@ Uses [vertica-python](https://github.com/vertica/vertica-python) to connect to V ## Changes +### 1.0.1 + +- Fixed the Incremental method implementation (was buggy/incomplete) + - Removed the `unique_id` as it wasn't implemented + - Fixed when no fields were added - full table merge +- Added testing for Incremental materialization + - Testing for dbt Incremental full table + - Testing for dbt Incremental specified merged columns +- Added more logging to the connector to help understand why tests were failing +- Using the official [Vertica CE 11.0.x docker image](https://hub.docker.com/r/vertica/vertica-ce) now for tests + ### 1.0.0 -- Add support for DBT version 1. +- Add support for DBT version 1.0.0 ### 0.21.1 @@ -65,7 +76,16 @@ Also, I would be excited to hear about anyone who is able to benefit from using Run a local Vertica instance like: - docker run -p 5433:5433 jbfavre/vertica:9.2.0-7_centos-7 + docker run -p 5433:5433 \ + -p 5444:5444 \ + -e VERTICA_DB_NAME=docker \ + -e VMART_ETL_SCRIPT="" \ + -e VMART_ETL_SQL="" \ + vertica/vertica-ce + +Access the local Vertica instance like: + + docker exec -it /opt/vertica/bin/vsql You need the pytest dbt adapter: diff --git a/dbt/adapters/vertica/connections.py b/dbt/adapters/vertica/connections.py index e624f19..58f8740 100644 --- a/dbt/adapters/vertica/connections.py +++ b/dbt/adapters/vertica/connections.py @@ -122,11 +122,14 @@ def open(cls, connection): def get_response(cls, cursor): code = cursor.description rows = cursor.rowcount + message = cursor._message + arraysize = cursor.arraysize + operation = cursor.operation return AdapterResponse( - _message="{} {}".format(code, rows), + _message="Operation: {}, Message: {}, Code: {}, Rows: {}, Arraysize: {}".format(operation, message, str(code), rows, arraysize), rows_affected=rows, - code=code + code=str(code) ) def cancel(self, connection): diff --git a/dbt/include/vertica/macros/materializations/incremental.sql b/dbt/include/vertica/macros/materializations/incremental.sql index fb2b55a..e0caa8a 100644 --- a/dbt/include/vertica/macros/materializations/incremental.sql +++ b/dbt/include/vertica/macros/materializations/incremental.sql @@ -14,43 +14,47 @@ {% do return(strategy) %} {% endmacro %} -{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} +{% macro vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} {% if strategy == 'merge' %} - {% do return(vertica__get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% do return(vertica__get_merge_sql(target_relation, tmp_relation, dest_columns)) %} {% elif strategy == 'delete+insert' %} - {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, unique_key, dest_columns)) %} + {% do return(get_delete_insert_merge_sql(target_relation, tmp_relation, dest_columns)) %} {% else %} {% do exceptions.raise_compiler_error('invalid strategy: ' ~ strategy) %} {% endif %} {% endmacro %} -{% macro vertica__get_merge_sql(target_relation, tmp_relation, unique_key, dest_columns) %} +{% macro vertica__get_merge_sql(target_relation, tmp_relation, dest_columns) %} {%- set dest_columns_csv = get_quoted_csv(dest_columns | map(attribute="name")) -%} {%- set merge_columns = config.get("merge_columns", default=None)%} merge into {{ target_relation }} as DBT_INTERNAL_DEST using {{ tmp_relation }} as DBT_INTERNAL_SOURCE - {% if unique_key %} - on DBT_INTERNAL_DEST.{{ unique_key }} = DBT_INTERNAL_SOURCE.{{ unique_key }} - {% elif merge_columns %} + {#-- Test 1, find the provided merge columns #} + {% if merge_columns %} on {% for column in merge_columns %} DBT_INTERNAL_DEST.{{ adapter.quote(column) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column) }} - {%- if not loop.last %} AND {%- endif %} + {%- if not loop.last %} AND {% endif %} {%- endfor %} + + {#-- Test 2, use all columns in the destination table #} {% else %} - on FALSE + on + {% for column in dest_columns -%} + DBT_INTERNAL_DEST.{{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} + {%- if not loop.last %} AND {% endif %} + {%- endfor %} + {% endif %} - {% if unique_key %} when matched then update set - {% for column in dest_columns -%} - {{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} - {%- if not loop.last %}, {%- endif %} - {%- endfor %} - {% endif %} + {% for column in dest_columns -%} + {{ adapter.quote(column.name) }} = DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} + {%- if not loop.last %}, {% endif %} + {%- endfor %} when not matched then insert ({{ dest_columns_csv }}) @@ -58,7 +62,7 @@ ( {% for column in dest_columns -%} DBT_INTERNAL_SOURCE.{{ adapter.quote(column.name) }} - {%- if not loop.last %}, {%- endif %} + {%- if not loop.last %}, {% endif %} {%- endfor %} ) @@ -66,7 +70,6 @@ {% materialization incremental, adapter='vertica' %} - {% set unique_key = config.get('unique_key') %} {% set full_refresh_mode = flags.FULL_REFRESH %} {% set target_relation = this %} @@ -101,7 +104,7 @@ from_relation=tmp_relation, to_relation=target_relation) %} {% set dest_columns = adapter.get_columns_in_relation(target_relation) %} - {% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, unique_key, dest_columns) %} + {% set build_sql = vertica__get_incremental_sql(strategy, tmp_relation, target_relation, dest_columns) %} {% endif %} {% call statement("main") %} diff --git a/setup.py b/setup.py index 7988ee3..907d52b 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ import pathlib package_name = "dbt-vertica" -package_version = "1.0.0" +package_version = "1.0.1" description = """The vertica adapter plugin for dbt (data build tool)""" HERE = pathlib.Path(__file__).parent diff --git a/tests/integration.dbtspec b/tests/integration.dbtspec index 8119577..932b730 100644 --- a/tests/integration.dbtspec +++ b/tests/integration.dbtspec @@ -7,13 +7,122 @@ target: schema: "dbt_test_{{ var('_dbt_random_suffix') }}" port: 5433 threads: 1 + +projects: + - overrides: incremental + paths: + models/incremental.sql: + materialized: incremental + # Wrap in a CTE due to added seed name varchar being size 9 and seed base name varchar being size 8 + body: | + with incremental_data as + ( + select id, name::varchar(9) as name, some_date from {{ source('raw', 'seed') }} + {% if is_incremental() %} + where id > (select max(id) from {{ this }}) + {% endif %} + ) + select * from incremental_data + - name: test_vertica_dbt_incremental_merge_columns + paths: + seeds/base.csv: files.seeds.base + seeds/added.csv: files.seeds.added + models/schema.yml: files.schemas.base + models/incremental.sql: + materialized: incremental + # Specifying which column to use, in this case, we will use the id + # Wrap in a CTE due to added seed name varchar being size 9 and seed base name varchar being size 8 + body: | + {{ + config( + materialized = 'incremental', + incremental_strategy = 'merge', + merge_columns = [ 'id', 'name' ] + ) + }} + with incremental_data as + ( + select id, name::varchar(9) as name, some_date from {{ source('raw', 'seed') }} + {% if is_incremental() %} + where id > (select max(id) from {{ this }}) + {% endif %} + ) + select * from incremental_data + facts: + seed: + length: 2 + names: + - base + - added + run: + length: 1 + names: + - incremental + catalog: + nodes: + length: 3 + sources: + length: 1 + persisted_relations: + - base + - added + - incremental + base: + rowcount: 10 + added: + rowcount: 20 + sequences: test_dbt_empty: empty test_dbt_base: base test_dbt_ephemeral: ephemeral - # test_dbt_incremental: incremental + test_dbt_incremental: incremental # test_dbt_snapshot_strategy_timestamp: snapshot_strategy_timestamp # test_dbt_snapshot_strategy_check_cols: snapshot_strategy_check_cols test_dbt_data_test: data_test test_dbt_schema_test: schema_test - test_dbt_ephemeral_data_tests: data_test_ephemeral_models \ No newline at end of file + test_dbt_ephemeral_data_tests: data_test_ephemeral_models + + # Additional Vertica tests below + # Test the incremental merge based upon specific columns + test_vertica_dbt_incremental_merge_columns: + project: test_vertica_dbt_incremental_merge_columns + sequence: + - type: dbt + cmd: seed + - type: run_results + length: fact.seed.length + - type: dbt + cmd: run + vars: + seed_name: base + - type: relation_rows + name: base + length: fact.base.rowcount + - type: run_results + length: fact.run.length + - type: relations_equal + relations: + - base + - incremental + - type: dbt + cmd: run + vars: + seed_name: added + - type: relation_rows + name: added + length: fact.added.rowcount + - type: run_results + length: fact.run.length + - type: relations_equal + relations: + - added + - incremental + - type: dbt + cmd: docs generate + - type: catalog + exists: True + nodes: + length: fact.catalog.nodes.length + sources: + length: fact.catalog.sources.length \ No newline at end of file