Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

37 python models in dbt exasol #59

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions dbt/adapters/exasol/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, Optional

import agate
from dbt.adapters.base.meta import available
from dbt.adapters.sql import SQLAdapter
from dbt.exceptions import raise_compiler_error
from dbt.utils import filter_null_values
Expand Down Expand Up @@ -79,3 +80,38 @@ def quote_seed_column(self, column: str, quote_config: Optional[bool]) -> str:
if quote_columns:
return self.quote(column)
return column

@available.parse_none
def submit_python_job(self, parsed_model: dict, compiled_code: str):
    """Materialize a dbt Python model by running it as a temporary Exasol script.

    The compiled model code is wrapped in a ``python3 scalar script`` named
    ``<schema>.<alias>__dbt_sp``. The target table ``<schema>.<alias>`` is then
    created from the rows that script emits, and the script is dropped again.

    Args:
        parsed_model: The model node as a mapping. ``alias`` is required;
            ``schema`` is optional and falls back to the schema configured
            in the connection credentials.
        compiled_code: Python source inserted verbatim into the script body.
            It is expected to define ``main`` (called by the generated
            ``run(ctx)`` entry point).

    Returns:
        The first element of the tuple returned by ``self.execute`` for the
        ``create or replace table`` statement.
    """
    # parsed_model is a mapping, so the schema has to be read by key.
    # getattr() on a dict never finds "schema" and silently always used the
    # credentials schema instead of the model's own schema.
    schema = parsed_model.get("schema") or self.config.credentials.schema
    identifier = parsed_model["alias"]
    proc_name = f"{schema}.{identifier}__dbt_sp"
    target_name = f"{schema}.{identifier}"

    # NOTE: the model's ``packages`` config was previously joined into a
    # string that nothing consumed; that dead code has been removed.

    # The script text is kept flush-left on purpose: compiled_code is pasted
    # in as-is, so the surrounding Python must sit at column 0 as well.
    python_stored_procedure = f"""
create or replace python3 scalar script {proc_name}()
emits (response varchar(200)) as

def load_table(table_name):
    pass

{compiled_code}

def run(ctx):
    ctx.emit(main(ctx))

/
"""
    self.execute(python_stored_procedure, auto_begin=False, fetch=False)
    try:
        # "or replace" mirrors the SQL path (exasol__create_table_as) so that
        # re-running a model whose table already exists does not fail.
        response, _ = self.execute(
            f"create or replace table {target_name} as select {proc_name}()",
            auto_begin=False,
            fetch=False,
        )
    finally:
        # Always remove the helper script, even when the table build failed,
        # so a broken run does not leave a stray script in the schema.
        self.execute(
            f"drop script if exists {proc_name}",
            auto_begin=False,
            fetch=False,
        )
    return response
19 changes: 12 additions & 7 deletions dbt/include/exasol/macros/adapters.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

/*
/*
LIST_RELATIONS_MACRO_NAME = 'list_relations_without_caching'
GET_COLUMNS_IN_RELATION_MACRO_NAME = 'get_columns_in_relation'
LIST_SCHEMAS_MACRO_NAME = 'list_schemas'
Expand Down Expand Up @@ -45,7 +44,7 @@ ALTER_COLUMN_TYPE_MACRO_NAME = 'alter_column_type'

{% macro exasol__drop_schema(relation) -%}
{% call statement('drop_schema') -%}
drop schema if exists {{relation}} cascade
drop schema if exists {{ relation }} cascade
{% endcall %}
{% endmacro %}

Expand Down Expand Up @@ -80,9 +79,15 @@ AS
{%- endcall %}
{% endmacro %}

{% macro exasol__create_table_as(temporary, relation, sql) -%}
CREATE OR REPLACE TABLE {{ relation.schema }}.{{ relation.identifier }} AS
{{ sql }}
{#
  exasol__create_table_as: render the statement that builds `relation`.

  Arguments:
    temporary - forwarded to py_write_table for python models; not used by the SQL branch.
    relation  - target relation; its schema and identifier name the table.
    sql       - the compiled model code (a SELECT for language='sql', Python source
                for language='python').
    language  - 'sql' (default) or 'python'; anything else raises a compiler error.

  The python branch forwards the `sql` argument as compiled_code. The macro has no
  parameter named `compiled_code`, so referring to that name would depend on
  whatever happens to be in the calling context rather than on what was passed in.
#}
{% macro exasol__create_table_as(temporary, relation, sql, language='sql') -%}
  {%- if language == 'sql' -%}
    CREATE OR REPLACE TABLE {{ relation.schema }}.{{ relation.identifier }} AS
    {{ sql }}
  {%- elif language == 'python' -%}
    {{ py_write_table(compiled_code=sql, target_relation=relation, temporary=temporary) }}
  {%- else -%}
    {% do exceptions.raise_compiler_error("exasol__create_table_as macro didn't get supported language, it got %s" % language) %}
  {%- endif -%}
{% endmacro %}

{% macro exasol__current_timestamp() -%}
Expand Down Expand Up @@ -183,4 +188,4 @@ COMMENT IS '{{ model.description }}'
{% call statement('alter_column_type') %}
alter table {{ relation }} modify column {{ adapter.quote(column_name) }} {{ new_column_type }};
{% endcall %}
{% endmacro %}
{% endmacro %}
92 changes: 29 additions & 63 deletions dbt/include/exasol/macros/materializations/table.sql
Original file line number Diff line number Diff line change
@@ -1,81 +1,47 @@
{% materialization table, adapter='exasol' %}
{% materialization table, adapter='exasol', supported_languages=['sql', 'python']%}

{%- set identifier = model['alias'] -%}
{%- set tmp_identifier = model['name'] + '__dbt_tmp' -%}
{%- set backup_identifier = model['name'] + '__dbt_backup' -%}
{%- set language = model['language'] -%}

{% set grant_config = config.get('grants') %}

{%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
{%- set target_relation = api.Relation.create(identifier=identifier,
schema=schema,
database=database, type='table') -%}
{%- set intermediate_relation = api.Relation.create(identifier=tmp_identifier,
schema=schema,
database=database, type='table') -%}

/*
See ../view/view.sql for more information about this relation.
*/

-- drop the backup relation if it exists, then make a new one that uses the old relation's type
{%- set backup_relation = adapter.get_relation(database=database, schema=schema, identifier=backup_identifier) -%}

{% if backup_relation is not none -%}
{{ adapter.drop_relation(backup_relation) }}
{%- endif %}
{%- set backup_relation_type = 'table' if old_relation is none else old_relation.type -%}
{%- set backup_relation = api.Relation.create(identifier=backup_identifier,
schema=schema,
database=database,
type=backup_relation_type) -%}

{%- set exists_as_table = (old_relation is not none and old_relation.is_table) -%}
{%- set exists_as_view = (old_relation is not none and old_relation.is_view) -%}

-- grab current tables grants config for comparision later on
{%- set grant_config = config.get('grants') -%}

-- drop the temp relations if they exists for some reason
{{ adapter.drop_relation(intermediate_relation) }}
{{ run_hooks(pre_hooks) }}

{{ run_hooks(pre_hooks, inside_transaction=False) }}

-- `BEGIN` happens here:
{{ run_hooks(pre_hooks, inside_transaction=True) }}
{#-- Drop the relation if it was a view to "convert" it in a table. This may lead to
-- downtime, but it should be a relatively infrequent occurrence #}
{% if old_relation is not none and not old_relation.is_table %}
{{ log("Dropping relation " ~ old_relation ~ " because it is of type " ~ old_relation.type) }}
{{ drop_relation_if_exists(old_relation) }}
{% endif %}

-- build model
{% call statement('main') -%}
{{ create_table_as(False, intermediate_relation, sql) }}
{% call statement('main', language=language) -%}
{{ create_table_as(False, target_relation, compiled_code, language) }}
{%- endcall %}

-- cleanup
{% if old_relation is not none %}
{% if old_relation.type == 'view' %}
{#-- This is the primary difference between Snowflake and Redshift. Renaming this view
-- would cause an error if the view has become invalid due to upstream schema changes #}
{{ log("Dropping relation " ~ old_relation ~ " because it is a view and this model is a table.") }}
{{ drop_relation_if_exists(old_relation) }}
{% else %}
{{ adapter.rename_relation(target_relation, backup_relation) }}
{% endif %}
{% endif %}

{{ drop_relation_if_exists(target_relation) }}
{{ adapter.rename_relation(intermediate_relation, target_relation) }}
{{ run_hooks(post_hooks) }}

{{ run_hooks(post_hooks, inside_transaction=True) }}
{% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=full_refresh_mode) %}
{% do apply_grants(target_relation, grant_config, should_revoke) %}
{% do persist_docs(target_relation, model) %}

-- `COMMIT` happens here
{{ adapter.commit() }}
{{ return({'relations': [target_relation]}) }}

-- finally, drop the existing/backup relation after the commit
{{ drop_relation_if_exists(backup_relation) }}
{{ drop_relation_if_exists(intermediate_relation) }}
{% endmaterialization %}

{{ run_hooks(post_hooks, inside_transaction=False) }}
{#
  py_write_table: render the Python source for a python model.

  Emits `compiled_code` verbatim, followed by a Python `main(session)` function
  that instantiates `dbtObj` and returns `model(dbt, session)`.
  `target_relation` and `temporary` are accepted; `temporary` is not referenced
  in the body.

  NOTE(review): `dbtObj()` is called with no arguments. Presumably the dbtObj class
  that dbt generates into compiled_code expects a table-loading callable - confirm,
  and pass one if so.
  NOTE(review): the `persist_docs` line inside this body looks like a leftover from
  the diff rendering rather than part of the macro - verify against the real file.
#}
{% macro py_write_table(compiled_code, target_relation, temporary=False) %}
{{ compiled_code }}

{{ persist_docs(target_relation, model) }}
def main(session):
dbt = dbtObj()
return model(dbt, session)
{% endmacro %}

{{ return({'relations': [target_relation]}) }}
{% endmaterialization %}
{#
  py_script_comment: renders one Python comment line, "# df = model(dbt, session)".
  Takes no arguments. Nothing in the visible templates calls it.
  NOTE(review): presumably a hook for text appended to python model scripts - confirm.
#}
{% macro py_script_comment()%}
# df = model(dbt, session)
{%endmacro%}
Loading