From 8612b4a0352305cd9355e4824b8df838824bd7b5 Mon Sep 17 00:00:00 2001 From: Abhishek Singh Date: Thu, 1 Jun 2023 14:13:54 -0700 Subject: [PATCH 1/4] Changes for Python models --- .github/workflows/oracle-xe-adapter-tests.yml | 2 +- dbt/adapters/oracle/connections.py | 40 +++++++- dbt/adapters/oracle/impl.py | 61 ++++++++++++ dbt/include/oracle/macros/adapters.sql | 50 +++++----- .../incremental/incremental.sql | 25 +++-- .../materializations/python_model/python.sql | 97 +++++++++++++++++++ .../macros/materializations/table/table.sql | 42 ++++++-- .../macros/materializations/view/view.sql | 9 +- dbt_adbs_test_project/models/sales_cost.sql | 11 +++ .../models/sales_cost_incremental.py | 13 +++ dbt_adbs_test_project/models/test_py_ref.py | 6 ++ .../models/test_py_source.py | 7 ++ dbt_adbs_test_project/profiles.yml | 3 +- requirements.txt | 2 +- requirements_dev.txt | 2 +- setup.cfg | 4 +- setup.py | 4 +- tox.ini | 2 +- 18 files changed, 313 insertions(+), 67 deletions(-) create mode 100644 dbt/include/oracle/macros/materializations/python_model/python.sql create mode 100644 dbt_adbs_test_project/models/sales_cost.sql create mode 100644 dbt_adbs_test_project/models/sales_cost_incremental.py create mode 100644 dbt_adbs_test_project/models/test_py_ref.py create mode 100644 dbt_adbs_test_project/models/test_py_source.py diff --git a/.github/workflows/oracle-xe-adapter-tests.yml b/.github/workflows/oracle-xe-adapter-tests.yml index 5790805..0f9c03b 100644 --- a/.github/workflows/oracle-xe-adapter-tests.yml +++ b/.github/workflows/oracle-xe-adapter-tests.yml @@ -48,7 +48,7 @@ jobs: - name: Install dbt-oracle with core dependencies run: | python -m pip install --upgrade pip - pip install pytest dbt-tests-adapter==1.5.0 + pip install pytest dbt-tests-adapter==1.5.1 pip install -r requirements.txt pip install -e . 
diff --git a/dbt/adapters/oracle/connections.py b/dbt/adapters/oracle/connections.py index 8921473..1c10c78 100644 --- a/dbt/adapters/oracle/connections.py +++ b/dbt/adapters/oracle/connections.py @@ -25,7 +25,11 @@ from dbt.adapters.base import Credentials from dbt.adapters.sql import SQLConnectionManager from dbt.contracts.connection import AdapterResponse +from dbt.events.functions import fire_event +from dbt.events.types import ConnectionUsed, SQLQuery, SQLCommit, SQLQueryStatus from dbt.events import AdapterLogger +from dbt.events.contextvars import get_node_info +from dbt.utils import cast_to_str from dbt.version import __version__ as dbt_version from dbt.adapters.oracle.connection_helper import oracledb, SQLNET_ORA_CONFIG @@ -105,6 +109,9 @@ class OracleAdapterCredentials(Credentials): retry_count: Optional[int] = 1 retry_delay: Optional[int] = 3 + # Fetch an auth token to run Python UDF + oml_auth_token_uri: Optional[str] = None + _ALIASES = { 'dbname': 'database', @@ -129,7 +136,7 @@ def _connection_keys(self) -> Tuple[str]: 'service', 'connection_string', 'shardingkey', 'supershardingkey', 'cclass', 'purity', 'retry_count', - 'retry_delay' + 'retry_delay', 'oml_auth_token_uri' ) @classmethod @@ -293,8 +300,13 @@ def add_query( if auto_begin and connection.transaction_open is False: self.begin() - logger.debug('Using {} connection "{}".' 
- .format(self.TYPE, connection.name)) + fire_event( + ConnectionUsed( + conn_type=self.TYPE, + conn_name=cast_to_str(connection.name), + node_info=get_node_info(), + ) + ) with self.exception_handler(sql): if abridge_sql_log: @@ -302,11 +314,22 @@ def add_query( else: log_sql = sql - logger.debug(f'On {connection.name}: f{log_sql}') + fire_event( + SQLQuery( + conn_name=cast_to_str(connection.name), sql=log_sql, node_info=get_node_info() + ) + ) + pre = time.time() cursor = connection.handle.cursor() cursor.execute(sql, bindings) - logger.debug(f"SQL status: {self.get_status(cursor)} in {(time.time() - pre)} seconds") + fire_event( + SQLQueryStatus( + status=str(self.get_response(cursor)), + elapsed=round((time.time() - pre)), + node_info=get_node_info(), + ) + ) return connection, cursor def add_begin_query(self): @@ -317,3 +340,10 @@ def add_begin_query(self): @classmethod def data_type_code_to_name(cls, type_code) -> str: return DATATYPES[type_code.name] + + def commit(self): + connection = self.get_thread_connection() + fire_event(SQLCommit(conn_name=connection.name, node_info=get_node_info())) + self.add_commit_query() + connection.transaction_open = False + return connection diff --git a/dbt/adapters/oracle/impl.py b/dbt/adapters/oracle/impl.py index ebab47b..1572c50 100644 --- a/dbt/adapters/oracle/impl.py +++ b/dbt/adapters/oracle/impl.py @@ -14,6 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. 
""" +import datetime from typing import ( Optional, List, Set ) @@ -24,6 +25,7 @@ Dict) import agate +import requests import dbt.exceptions from dbt.adapters.base.relation import BaseRelation, InformationSchema @@ -345,3 +347,62 @@ def render_raw_columns_constraints(cls, raw_columns: Dict[str, Dict[str, Any]]) rendered_column_constraints.append(" ".join(rendered_column_constraint)) return rendered_column_constraints + + def get_oml_auth_token(self) -> str: + if self.config.credentials.oml_auth_token_uri is None: + raise dbt.exceptions.DbtRuntimeError("oml_auth_token_uri should be set to run dbt-py models") + data = { + "grant_type": "password", + "username": self.config.credentials.user, + "password": self.config.credentials.password + } + try: + r = requests.post(url=self.config.credentials.oml_auth_token_uri, + json=data) + r.raise_for_status() + except requests.exceptions.RequestException: + raise dbt.exceptions.DbtRuntimeError("Error getting OML OAuth2.0 token") + else: + return r.json()["accessToken"] + + def submit_python_job(self, parsed_model: dict, compiled_code: str): + """Submit user defined Python function + + The function pyqEval when used in Oracle Autonomous Database, + calls a user-defined Python function. 
+ + pyqEval(PAR_LST, OUT_FMT, SRC_NAME, SRC_OWNER, ENV_NAME) + + - PAR_LST -> Parameter List + - OUT_FMT -> JSON clob of the columns + - ENV_NAME -> Name of conda environment + + + """ + identifier = parsed_model["alias"] + oml_oauth_access_token = self.get_oml_auth_token() + py_q_script_name = f"{identifier}_dbt_py_script" + py_q_eval_output_fmt = '{"result":"number"}' + py_q_eval_result_table = f"o$pt_dbt_pyqeval_{identifier}_tmp_{datetime.datetime.utcnow().strftime('%H%M%S')}" + + conda_env_name = parsed_model["config"].get("conda_env_name") + if conda_env_name: + logger.info("Custom python environment is %s", conda_env_name) + py_q_eval_sql = f"""CREATE GLOBAL TEMPORARY TABLE {py_q_eval_result_table} + AS SELECT * FROM TABLE(pyqEval(NULL, ''{py_q_eval_output_fmt}'',''{py_q_script_name}'', NULL, ''{conda_env_name}''))""" + else: + py_q_eval_sql = f"""CREATE GLOBAL TEMPORARY TABLE {py_q_eval_result_table} + AS SELECT * FROM TABLE(pyqEval(NULL, ''{py_q_eval_output_fmt}'',''{py_q_script_name}'', NULL))""" + + py_exec_main_sql = f""" + BEGIN + sys.pyqSetAuthToken('{oml_oauth_access_token}'); + sys.pyqScriptCreate('{py_q_script_name}', '{compiled_code.strip()}', FALSE, TRUE); + EXECUTE IMMEDIATE '{py_q_eval_sql}'; + EXECUTE IMMEDIATE 'DROP TABLE {py_q_eval_result_table}'; + sys.pyqScriptDrop('{py_q_script_name}'); + END; + """ + response, _ = self.execute(sql=py_exec_main_sql) + logger.info(response) + return response diff --git a/dbt/include/oracle/macros/adapters.sql b/dbt/include/oracle/macros/adapters.sql index 924c854..4653eaf 100644 --- a/dbt/include/oracle/macros/adapters.sql +++ b/dbt/include/oracle/macros/adapters.sql @@ -136,29 +136,33 @@ {%- endmacro %} -{% macro oracle__create_table_as(temporary, relation, sql) -%} - {%- set sql_header = config.get('sql_header', none) -%} - {%- set parallel = config.get('parallel', none) -%} - {%- set compression_clause = config.get('table_compression_clause', none) -%} - {%- set contract_config = config.get('contract') 
-%} - - {{ sql_header if sql_header is not none }} - - create {% if temporary -%} - global temporary - {%- endif %} table {{ relation.include(schema=(not temporary)) }} - {%- if contract_config.enforced -%} - {{ get_assert_columns_equivalent(sql) }} - {{ get_table_columns_and_constraints() }} - {%- set sql = get_select_subquery(sql) %} - {% endif %} - {% if temporary -%} on commit preserve rows {%- endif %} - {% if not temporary -%} - {% if parallel %} parallel {{ parallel }}{% endif %} - {% if compression_clause %} {{ compression_clause }} {% endif %} - {%- endif %} - as - {{ sql }} +{% macro oracle__create_table_as(temporary, relation, sql, language='sql') -%} + {%- if language == 'sql' -%} + {%- set sql_header = config.get('sql_header', none) -%} + {%- set parallel = config.get('parallel', none) -%} + {%- set compression_clause = config.get('table_compression_clause', none) -%} + {%- set contract_config = config.get('contract') -%} + {{ sql_header if sql_header is not none }} + create {% if temporary -%} + global temporary + {%- endif %} table {{ relation.include(schema=(not temporary)) }} + {%- if contract_config.enforced -%} + {{ get_assert_columns_equivalent(sql) }} + {{ get_table_columns_and_constraints() }} + {%- set sql = get_select_subquery(sql) %} + {% endif %} + {% if temporary -%} on commit preserve rows {%- endif %} + {% if not temporary -%} + {% if parallel %} parallel {{ parallel }}{% endif %} + {% if compression_clause %} {{ compression_clause }} {% endif %} + {%- endif %} + as + {{ sql }} +{%- elif language == 'python' -%} + {{ py_write_table(compiled_code=compiled_code, target_relation=relation, temporary=temporary) }} + {%- else -%} + {% do exceptions.raise_compiler_error("oracle__create_table_as macro didn't get supported language, it got %s" % language) %} + {%- endif -%} {%- endmacro %} {% macro oracle__create_view_as(relation, sql) -%} diff --git a/dbt/include/oracle/macros/materializations/incremental/incremental.sql 
b/dbt/include/oracle/macros/materializations/incremental/incremental.sql index 6aa4e8e..1681437 100644 --- a/dbt/include/oracle/macros/materializations/incremental/incremental.sql +++ b/dbt/include/oracle/macros/materializations/incremental/incremental.sql @@ -14,25 +14,22 @@ See the License for the specific language governing permissions and limitations under the License. #} -{% materialization incremental, adapter='oracle' %} +{% materialization incremental, adapter='oracle', supported_languages=['sql', 'python'] %} {% set unique_key = config.get('unique_key') %} {% set full_refresh_mode = flags.FULL_REFRESH %} - + {%- set language = model['language'] -%} {% set target_relation = this.incorporate(type='table') %} {% set existing_relation = load_relation(this) %} {% set tmp_relation = make_temp_relation(this) %} {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set grant_config = config.get('grants') %} - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} + {{ run_hooks(pre_hooks) }} {% set to_drop = [] %} {% if existing_relation is none %} - {% set build_sql = create_table_as(False, target_relation, sql) %} + {% set build_sql = create_table_as(False, target_relation, sql, language) %} {% elif existing_relation.is_view or full_refresh_mode %} {#-- Make sure the backup doesn't exist so we don't encounter issues with the rename below #} {% set backup_identifier = existing_relation.identifier ~ "__dbt_backup" %} @@ -43,12 +40,16 @@ {% else %} {% do adapter.rename_relation(existing_relation, backup_relation) %} {% endif %} - {% set build_sql = create_table_as(False, target_relation, sql) %} + {% set build_sql = create_table_as(False, target_relation, sql, language) %} {% do to_drop.append(backup_relation) %} {% else %} {% set tmp_relation = make_temp_relation(target_relation) %} {% do to_drop.append(tmp_relation) %} 
- {% do run_query(create_table_as(True, tmp_relation, sql)) %} + {% call statement("make_tmp_relation", language=language) %} + {{create_table_as(True, tmp_relation, sql, language)}} + {% endcall %} + {#-- After this language should be SQL --#} + {% set language = 'sql' %} {% do adapter.expand_target_column_types( from_relation=tmp_relation, to_relation=target_relation) %} @@ -66,14 +67,12 @@ {% endif %} - {% call statement("main") %} + {% call statement("main", language=language) %} {{ build_sql }} {% endcall %} {% do persist_docs(target_relation, model) %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - -- `COMMIT` happens here {% do adapter.commit() %} @@ -82,7 +81,7 @@ {% do adapter.drop_relation(rel) %} {% endfor %} - {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ run_hooks(post_hooks) }} {% set should_revoke = should_revoke(existing_relation.is_table, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/oracle/macros/materializations/python_model/python.sql b/dbt/include/oracle/macros/materializations/python_model/python.sql new file mode 100644 index 0000000..45d0a37 --- /dev/null +++ b/dbt/include/oracle/macros/materializations/python_model/python.sql @@ -0,0 +1,97 @@ +{% macro build_ref_function(model) %} + {%- set ref_dict = {} -%} + {%- for _ref in model.refs -%} + {% set _ref_args = [_ref.get('package'), _ref['name']] if _ref.get('package') else [_ref['name'],] %} + {%- set resolved = ref(*_ref_args, v=_ref.get('version')) -%} + {%- if _ref.get('version') -%} + {% do _ref_args.extend(["v" ~ _ref['version']]) %} + {%- endif -%} + {%- do ref_dict.update({_ref_args | join('.'): resolve_model_name(resolved)}) -%} + {%- endfor -%} + + def ref(*args, **kwargs): + refs = {{ ref_dict | tojson }} + key = ".".join(args) + version = kwargs.get("v") or kwargs.get("version") + if version: + key += f".v{version}" + schema, table = refs[key].split(".") + # Use 
oml.sync(schema=schema, table=table) + dbt_load_df_function = kwargs.get("dbt_load_df_function") + return dbt_load_df_function(schema=schema.upper(), table=table.upper()) + +{% endmacro %} + +{% macro build_source_function(model) %} + + {%- set source_dict = {} -%} + {%- for _source in model.sources -%} + {%- set resolved = source(*_source) -%} + {%- do source_dict.update({_source | join("."): resolve_model_name(resolved)}) -%} + {%- endfor -%} + + def source(*args, dbt_load_df_function): + sources = {{ source_dict | tojson }} + key = ".".join(args) + schema, table = sources[key].split(".") + # Use oml.sync(schema=schema, table=table) + return dbt_load_df_function(schema=schema.upper(), table=table.upper()) + +{% endmacro %} + +{% macro build_config_dict(model) %} + {%- set config_dict = {} -%} + {% set config_dbt_used = zip(model.config.config_keys_used, model.config.config_keys_defaults) | list %} + {%- for key, default in config_dbt_used -%} + {# weird type testing with enum, would be much easier to write this logic in Python! 
#} + {%- if key == 'language' -%} + {%- set value = 'python' -%} + {%- endif -%} + {%- set value = model.config.get(key, default) -%} + {%- do config_dict.update({key: value}) -%} + {%- endfor -%} + config_dict = {{ config_dict }} +{% endmacro %} + +{% macro py_script_postfix(model) %} +def main(): + import oml + import pandas as pd + {{ build_ref_function(model ) }} + {{ build_source_function(model ) }} + {{ build_config_dict(model) }} + + class config: + def __init__(self, *args, **kwargs): + pass + + @staticmethod + def get(key, default=None): + return config_dict.get(key, default) + + class this: + """dbt.this() or dbt.this.identifier""" + database = "{{ this.database }}" + schema = "{{ this.schema }}" + identifier = "{{ this.identifier }}" + def __repr__(self): + return "{{ this }}" + + + class dbtObj: + def __init__(self, load_df_function) -> None: + self.source = lambda *args: source(*args, dbt_load_df_function=load_df_function) + self.ref = lambda *args: ref(*args, dbt_load_df_function=load_df_function) + self.config = config + self.this = this() + self.is_incremental = {{ is_incremental() }} + + {{ model.raw_code | indent(width=4, first=False, blank=True)}} + + +{{py_script_comment()}} +{% endmacro %} + +{#-- entry point to add instructions for running compiled_code --#} +{%macro py_script_comment()%} +{%endmacro%} diff --git a/dbt/include/oracle/macros/materializations/table/table.sql b/dbt/include/oracle/macros/materializations/table/table.sql index ddf8283..161a7c3 100644 --- a/dbt/include/oracle/macros/materializations/table/table.sql +++ b/dbt/include/oracle/macros/materializations/table/table.sql @@ -14,9 +14,10 @@ See the License for the specific language governing permissions and limitations under the License. 
#} -{% materialization table, adapter='oracle' %} +{% materialization table, adapter='oracle', supported_languages=['sql', 'python'] %} {% set identifier = model['alias'] %} {% set grant_config = config.get('grants') %} + {% set language = model['language'] %} {% set tmp_identifier = model['alias'] + '__dbt_tmp' %} {% set backup_identifier = model['alias'] + '__dbt_backup' %} {% set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) %} @@ -56,14 +57,11 @@ {{ drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ run_hooks(pre_hooks, inside_transaction=False) }} - - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} + {{ run_hooks(pre_hooks) }} -- build model - {% call statement('main') %} - {{ create_table_as(False, intermediate_relation, sql) }} + {% call statement('main', language=language) %} + {{ create_table_as(False, intermediate_relation, sql, language) }} {%- endcall %} -- cleanup @@ -79,8 +77,6 @@ {% do create_indexes(target_relation) %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - {% do persist_docs(target_relation, model) %} -- `COMMIT` happens here @@ -89,10 +85,36 @@ -- finally, drop the existing/backup relation after the commit {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ run_hooks(post_hooks) }} {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} {{ return({'relations': [target_relation]}) }} {% endmaterialization %} + +{% macro py_write_table(compiled_code, target_relation, temporary=False) %} +{{ compiled_code.replace(model.raw_code, "", 1) }} + def materialize(df, table, session): + if isinstance(df, pd.core.frame.DataFrame): + oml.create(df, table=table) + elif isinstance(df, oml.core.frame.DataFrame): + 
df.materialize(table=table) + + dbt = dbtObj(load_df_function=oml.sync) + final_df = model(dbt, session=oml) + + {{ log("Python model materialization is " ~ model.config.materialized, info=True) }} + {% if model.config.materialized.lower() == 'table' %} + table_name = f"{dbt.this.identifier}__dbt_tmp" + {% else %} + # incremental materialization + {% if temporary %} + table_name = "{{target_relation.identifier}}" + {% else %} + table_name = dbt.this.identifier + {% endif %} + {% endif %} + materialize(final_df, table=table_name.upper(), session=oml) + return pd.DataFrame.from_dict({"result": [1]}) +{% endmacro %} diff --git a/dbt/include/oracle/macros/materializations/view/view.sql b/dbt/include/oracle/macros/materializations/view/view.sql index 1045783..4477f8a 100644 --- a/dbt/include/oracle/macros/materializations/view/view.sql +++ b/dbt/include/oracle/macros/materializations/view/view.sql @@ -45,13 +45,10 @@ schema=schema, database=database) -%} - {{ run_hooks(pre_hooks, inside_transaction=False) }} + {{ run_hooks(pre_hooks) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} - -- `BEGIN` happens here: - {{ run_hooks(pre_hooks, inside_transaction=True) }} - -- if old_relation was a table {% if old_relation is not none and old_relation.type == 'table' %} {{ adapter.rename_relation(old_relation, backup_relation) }} @@ -64,13 +61,11 @@ {% do persist_docs(target_relation, model) %} - {{ run_hooks(post_hooks, inside_transaction=True) }} - {{ adapter.commit() }} {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks, inside_transaction=False) }} + {{ run_hooks(post_hooks) }} {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} diff --git a/dbt_adbs_test_project/models/sales_cost.sql b/dbt_adbs_test_project/models/sales_cost.sql new file mode 100644 index 0000000..6723111 --- /dev/null +++ b/dbt_adbs_test_project/models/sales_cost.sql 
@@ -0,0 +1,11 @@ +{{ config(materialized='table')}} +with sales_cost_cte as( + select prod_id, + cast(time_id as TIMESTAMP) as cost_timestamp, + promo_id, + channel_id, + unit_cost, + unit_price + from {{ source('sh_database', 'costs') }} +) +select * from sales_cost_cte \ No newline at end of file diff --git a/dbt_adbs_test_project/models/sales_cost_incremental.py b/dbt_adbs_test_project/models/sales_cost_incremental.py new file mode 100644 index 0000000..c574a70 --- /dev/null +++ b/dbt_adbs_test_project/models/sales_cost_incremental.py @@ -0,0 +1,13 @@ +def model(dbt, session): + # Must be either table or incremental + dbt.config(materialized="incremental", incremental_strategy="merge") + # oml.DataFrame representing a datasource + sales_cost_df = dbt.ref("sales_cost") + + if dbt.is_incremental: + cr = session.cursor() + result = cr.execute(f"select max(cost_timestamp) from {dbt.this.identifier}") + max_timestamp = result.fetchone()[0] + sales_cost_df = sales_cost_df[sales_cost_df["COST_TIMESTAMP"] > max_timestamp] + + return sales_cost_df diff --git a/dbt_adbs_test_project/models/test_py_ref.py b/dbt_adbs_test_project/models/test_py_ref.py new file mode 100644 index 0000000..3085f6e --- /dev/null +++ b/dbt_adbs_test_project/models/test_py_ref.py @@ -0,0 +1,6 @@ +def model(dbt, session): + # Must be either table or incremental (view is not currently supported) + dbt.config(materialized="table") + # oml.core.DataFrame representing a datasource + s_df = dbt.ref("sales_cost") + return s_df diff --git a/dbt_adbs_test_project/models/test_py_source.py b/dbt_adbs_test_project/models/test_py_source.py new file mode 100644 index 0000000..e650c4b --- /dev/null +++ b/dbt_adbs_test_project/models/test_py_source.py @@ -0,0 +1,7 @@ +def model(dbt, session): + # Must be either table or incremental (view is not currently supported) + dbt.config(materialized="table") + # dbt.config(conda_env_name="dbt_py_env") + # oml.core.DataFrame representing a datasource + s_df = 
dbt.source("sh_database", "channels") + return s_df diff --git a/dbt_adbs_test_project/profiles.yml b/dbt_adbs_test_project/profiles.yml index d5582e2..8e544b8 100644 --- a/dbt_adbs_test_project/profiles.yml +++ b/dbt_adbs_test_project/profiles.yml @@ -11,6 +11,7 @@ dbt_test: service: "{{ env_var('DBT_ORACLE_SERVICE') }}" #database: "{{ env_var('DBT_ORACLE_DATABASE') }}" schema: "{{ env_var('DBT_ORACLE_SCHEMA') }}" + oml_auth_token_uri: "{{ env_var('DBT_ORACLE_OML_AUTH_TOKEN_API')}}" retry_count: 1 retry_delay: 5 shardingkey: @@ -19,7 +20,7 @@ dbt_test: - sskey cclass: CONNECTIVITY_CLASS purity: self - threads: 4 + threads: 1 test: type: oracle user: "{{ env_var('DBT_ORACLE_USER') }}" diff --git a/requirements.txt b/requirements.txt index bf8d997..2230318 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,4 @@ -dbt-core==1.5.0 +dbt-core==1.5.1 cx_Oracle==8.3.0 oracledb==1.3.1 diff --git a/requirements_dev.txt b/requirements_dev.txt index b45a51c..2c0d18c 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -6,4 +6,4 @@ tox coverage twine pytest -dbt-tests-adapter==1.5.0 +dbt-tests-adapter==1.5.1 diff --git a/setup.cfg b/setup.cfg index d520483..8764d57 100644 --- a/setup.cfg +++ b/setup.cfg @@ -33,12 +33,12 @@ zip_safe = False packages = find: include_package_data = True install_requires = - dbt-core==1.5.0 + dbt-core==1.5.1 cx_Oracle==8.3.0 oracledb==1.3.1 test_suite=tests test_requires = - dbt-tests-adapter==1.5.0 + dbt-tests-adapter==1.5.1 pytest scripts = bin/create-pem-from-p12 diff --git a/setup.py b/setup.py index 858e787..4cad1af 100644 --- a/setup.py +++ b/setup.py @@ -32,13 +32,13 @@ requirements = [ - "dbt-core==1.5.0", + "dbt-core==1.5.1", "cx_Oracle==8.3.0", "oracledb==1.3.1" ] test_requirements = [ - "dbt-tests-adapter==1.5.0", + "dbt-tests-adapter==1.5.1", "pytest" ] diff --git a/tox.ini b/tox.ini index cfeda42..30543f5 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,7 @@ passenv = deps = -rrequirements.txt - 
dbt-tests-adapter==1.5.0 + dbt-tests-adapter==1.5.1 pytest commands = pytest From dd854314b8d54b7bccd2ae61835aee36c495c431 Mon Sep 17 00:00:00 2001 From: Abhishek Singh Date: Mon, 12 Jun 2023 21:23:54 -0700 Subject: [PATCH 2/4] Added named parameters to pyqeval --- .gitignore | 1 + dbt/adapters/oracle/impl.py | 11 +++++++++-- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index a5d0763..7ce8abf 100644 --- a/.gitignore +++ b/.gitignore @@ -146,3 +146,4 @@ doc/build.gitbak .venv1.3/ .venv1.4/ .venv1.5/ +dbt_adbs_py_test_project \ No newline at end of file diff --git a/dbt/adapters/oracle/impl.py b/dbt/adapters/oracle/impl.py index 1572c50..495fee6 100644 --- a/dbt/adapters/oracle/impl.py +++ b/dbt/adapters/oracle/impl.py @@ -389,10 +389,17 @@ def submit_python_job(self, parsed_model: dict, compiled_code: str): if conda_env_name: logger.info("Custom python environment is %s", conda_env_name) py_q_eval_sql = f"""CREATE GLOBAL TEMPORARY TABLE {py_q_eval_result_table} - AS SELECT * FROM TABLE(pyqEval(NULL, ''{py_q_eval_output_fmt}'',''{py_q_script_name}'', NULL, ''{conda_env_name}''))""" + AS SELECT * FROM TABLE(pyqEval(par_lst => NULL, + out_fmt => ''{py_q_eval_output_fmt}'', + scr_name => ''{py_q_script_name}'', + scr_owner => NULL, + env_name => ''{conda_env_name}''))""" else: py_q_eval_sql = f"""CREATE GLOBAL TEMPORARY TABLE {py_q_eval_result_table} - AS SELECT * FROM TABLE(pyqEval(NULL, ''{py_q_eval_output_fmt}'',''{py_q_script_name}'', NULL))""" + AS SELECT * FROM TABLE(pyqEval(par_lst => NULL, + out_fmt => ''{py_q_eval_output_fmt}'', + scr_name => ''{py_q_script_name}'', + scr_owner => NULL))""" py_exec_main_sql = f""" BEGIN From b6d555dc48f6a9acc44948dcafb658fcb9e06ef8 Mon Sep 17 00:00:00 2001 From: Abhishek Singh Date: Mon, 12 Jun 2023 21:34:10 -0700 Subject: [PATCH 3/4] Reverted run_hooks to original calls --- .gitignore | 2 +- .../macros/materializations/incremental/incremental.sql | 9 +++++++-- 
.../oracle/macros/materializations/table/table.sql | 9 +++++++-- dbt/include/oracle/macros/materializations/view/view.sql | 9 +++++++-- 4 files changed, 22 insertions(+), 7 deletions(-) diff --git a/.gitignore b/.gitignore index 7ce8abf..be7452a 100644 --- a/.gitignore +++ b/.gitignore @@ -146,4 +146,4 @@ doc/build.gitbak .venv1.3/ .venv1.4/ .venv1.5/ -dbt_adbs_py_test_project \ No newline at end of file +dbt_adbs_py_test_project diff --git a/dbt/include/oracle/macros/materializations/incremental/incremental.sql b/dbt/include/oracle/macros/materializations/incremental/incremental.sql index 1681437..93dc8ac 100644 --- a/dbt/include/oracle/macros/materializations/incremental/incremental.sql +++ b/dbt/include/oracle/macros/materializations/incremental/incremental.sql @@ -25,7 +25,10 @@ {% set on_schema_change = incremental_validate_on_schema_change(config.get('on_schema_change'), default='ignore') %} {% set grant_config = config.get('grants') %} - {{ run_hooks(pre_hooks) }} + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} {% set to_drop = [] %} {% if existing_relation is none %} @@ -73,6 +76,8 @@ {% do persist_docs(target_relation, model) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + -- `COMMIT` happens here {% do adapter.commit() %} @@ -81,7 +86,7 @@ {% do adapter.drop_relation(rel) %} {% endfor %} - {{ run_hooks(post_hooks) }} + {{ run_hooks(post_hooks, inside_transaction=False) }} {% set should_revoke = should_revoke(existing_relation.is_table, full_refresh_mode) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/oracle/macros/materializations/table/table.sql b/dbt/include/oracle/macros/materializations/table/table.sql index 161a7c3..29b23c6 100644 --- a/dbt/include/oracle/macros/materializations/table/table.sql +++ b/dbt/include/oracle/macros/materializations/table/table.sql @@ -57,7 +57,10 @@ {{ 
drop_relation_if_exists(preexisting_intermediate_relation) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} - {{ run_hooks(pre_hooks) }} + {{ run_hooks(pre_hooks, inside_transaction=False) }} + + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} -- build model {% call statement('main', language=language) %} @@ -77,6 +80,8 @@ {% do create_indexes(target_relation) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + {% do persist_docs(target_relation, model) %} -- `COMMIT` happens here @@ -85,7 +90,7 @@ -- finally, drop the existing/backup relation after the commit {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks) }} + {{ run_hooks(post_hooks, inside_transaction=False) }} {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/oracle/macros/materializations/view/view.sql b/dbt/include/oracle/macros/materializations/view/view.sql index 4477f8a..1045783 100644 --- a/dbt/include/oracle/macros/materializations/view/view.sql +++ b/dbt/include/oracle/macros/materializations/view/view.sql @@ -45,10 +45,13 @@ schema=schema, database=database) -%} - {{ run_hooks(pre_hooks) }} + {{ run_hooks(pre_hooks, inside_transaction=False) }} {{ drop_relation_if_exists(preexisting_backup_relation) }} + -- `BEGIN` happens here: + {{ run_hooks(pre_hooks, inside_transaction=True) }} + -- if old_relation was a table {% if old_relation is not none and old_relation.type == 'table' %} {{ adapter.rename_relation(old_relation, backup_relation) }} @@ -61,11 +64,13 @@ {% do persist_docs(target_relation, model) %} + {{ run_hooks(post_hooks, inside_transaction=True) }} + {{ adapter.commit() }} {{ drop_relation_if_exists(backup_relation) }} - {{ run_hooks(post_hooks) }} + {{ run_hooks(post_hooks, inside_transaction=False) }} {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} {% 
do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} From d87c20f852013ffd6a65a94eb62df9fbfb84ce51 Mon Sep 17 00:00:00 2001 From: Abhishek Singh Date: Mon, 12 Jun 2023 21:37:11 -0700 Subject: [PATCH 4/4] Upgraded version to 1.5.1 --- Makefile | 2 +- dbt/adapters/oracle/__version__.py | 2 +- setup.cfg | 2 +- setup.py | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index f5d6747..9764a9e 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ # Configuration variables -VERSION=1.5.0 +VERSION=1.5.1 PROJ_DIR?=$(shell pwd) VENV_DIR?=${PROJ_DIR}/.bldenv BUILD_DIR=${PROJ_DIR}/build diff --git a/dbt/adapters/oracle/__version__.py b/dbt/adapters/oracle/__version__.py index 7497b56..54eae38 100644 --- a/dbt/adapters/oracle/__version__.py +++ b/dbt/adapters/oracle/__version__.py @@ -14,4 +14,4 @@ See the License for the specific language governing permissions and limitations under the License. """ -version = "1.5.0" +version = "1.5.1" diff --git a/setup.cfg b/setup.cfg index 8764d57..647e275 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,6 +1,6 @@ [metadata] name = dbt-oracle -version = 1.5.0 +version = 1.5.1 description = dbt (data build tool) adapter for the Oracle database long_description = file: README.md long_description_content_type = text/markdown diff --git a/setup.py b/setup.py index 4cad1af..5473d31 100644 --- a/setup.py +++ b/setup.py @@ -52,7 +52,7 @@ url = 'https://github.com/oracle/dbt-oracle' -VERSION = '1.5.0' +VERSION = '1.5.1' setup( author="Oracle", python_requires='>=3.7.2',