# Example Use Cases

## Use a customised adapter to provide jinja with further Python methods

When you want to add more methods to an existing adapter and make them available to jinja, set the `dbt_custom_adapter` variable and dbt will load and use your adapter class.

Step-1: Extend the existing adapter and add new methods to it.

```python
import tempfile
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter

from opendbt.utils import Utils


class DuckDBAdapterV2Custom(DuckDBAdapter):

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        model_unique_id = parsed_model.get('unique_id')
        __py_code = f"""
{compiled_code}

# NOTE this is local python execution so session is None
model(dbt=dbtObj(None), session=None)
"""
        with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp:
            fp.write(__py_code.encode('utf-8'))
            fp.close()
        print(f"Created temp py file {fp.name}")
        Utils.runcommand(command=['python', fp.name])
```

Step-2: Edit the `dbt_project.yml` file and set the `dbt_custom_adapter` variable to the class name of your custom adapter.

```yaml
vars:
  dbt_custom_adapter: opendbt.examples.DuckDBAdapterV2Custom
```

Step-3: Run dbt; it now loads and uses the provided adapter class.

```python
from opendbt import OpenDbtProject

dp = OpenDbtProject(project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir")
dp.run(command="run")
```

## Execute Python Model Locally

Using a customized adapter and a custom materialization, we can extend dbt to run local Python code. This is useful for scenarios where data is imported from an external API.

Step-1: Extend the existing adapter by adding a new adapter method that runs the given Python code. Here we extend `DuckDBAdapter` with a new method, `submit_local_python_job`, which executes the compiled Python code as a subprocess.

```python
import tempfile
from typing import Dict

from dbt.adapters.base import available
from dbt.adapters.duckdb import DuckDBAdapter

from opendbt.utils import Utils


class DuckDBAdapterV2Custom(DuckDBAdapter):

    @available
    def submit_local_python_job(self, parsed_model: Dict, compiled_code: str):
        model_unique_id = parsed_model.get('unique_id')
        __py_code = f"""
{compiled_code}

# NOTE this is local python execution so session is None
model(dbt=dbtObj(None), session=None)
"""
        with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp:
            fp.write(__py_code.encode('utf-8'))
            fp.close()
        print(f"Created temp py file {fp.name}")
        Utils.runcommand(command=['python', fp.name])
```
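Stripped of the dbt objects, the adapter method above boils down to a write-temp-file-then-run-subprocess pattern. The sketch below shows that pattern in isolation; `run_python_code` is an illustrative name, not part of opendbt, and it uses `subprocess.run` where opendbt uses its own command helper.

```python
import subprocess
import sys
import tempfile


def run_python_code(py_code: str, name: str = "model") -> int:
    # Write the code to a temp .py file, then execute it with the current
    # interpreter as a subprocess, and return its exit code.
    with tempfile.NamedTemporaryFile(suffix=f"__{name}.py", delete=False, mode="w") as fp:
        fp.write(py_code)
    result = subprocess.run([sys.executable, fp.name], capture_output=True, text=True)
    print(result.stdout, end="")
    return result.returncode
```

A non-zero exit code from the subprocess surfaces here as the return value, which is how a failing model script can be detected.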

Step-2: Create a materialization named `executepython`. In this materialization (from the jinja) we call the new adapter method above to run the compiled Python code.

```jinja
{% materialization executepython, supported_languages=['python'] %}

  {%- set identifier = model['alias'] -%}
  {%- set language = model['language'] -%}

  {% set grant_config = config.get('grants') %}

  {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%}
  {%- set target_relation = api.Relation.create(identifier=identifier,
                                                schema=schema,
                                                database=database, type='table') -%}
  {{ run_hooks(pre_hooks) }}

  {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %}
    {%- set res = adapter.submit_local_python_job(model, compiled_code) -%}
  {% endcall %}

  {{ run_hooks(post_hooks) }}

  {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %}
  {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}

  {% do persist_docs(target_relation, model) %}

  {{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
```

Step-3: Create a sample Python model using the `executepython` materialization. It is executed locally by dbt.

```python
import os
import platform

from dbt import version


def print_info():
    _str = f"name:{os.name}, system:{platform.system()} release:{platform.release()}"
    _str += f"\npython version:{platform.python_version()}, dbt:{version.__version__}"
    print(_str)


def model(dbt, session):
    dbt.config(materialized="executepython")
    print("==================================================")
    print("========IM LOCALLY EXECUTED PYTHON MODEL==========")
    print("==================================================")
    print_info()
    print("==================================================")
    print("===============MAKE DBT GREAT AGAIN===============")
    print("==================================================")
    return None
```
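The sample above only prints diagnostics. For the external-API scenario this use case was motivated by, a model might look like the following sketch; `fetch_rows` and the inline JSON payload are purely illustrative (a real model would make an HTTP call, e.g. with `urllib.request`, and write the rows to its target).

```python
import json


def fetch_rows(payload: str) -> list:
    # Stand-in for the HTTP call: parse the API response body into rows.
    return json.loads(payload)


def model(dbt, session):
    dbt.config(materialized="executepython")
    rows = fetch_rows('[{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]')
    print(f"ingested {len(rows)} rows")
    return None
```

Because the `executepython` materialization runs the model as a plain local subprocess, any Python library available on the host can be used for the ingestion step.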

## Enable Model-Level Orchestration Using Airflow

Step-1: Create a dag to run the dbt project.

```python
from pathlib import Path

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.utils.dates import days_ago

from opendbt.airflow import OpenDbtAirflowProject

# default_args as defined for your environment
default_args = {}

with DAG(
        dag_id='dbt_workflow',
        default_args=default_args,
        description='DAG To run dbt',
        schedule_interval=None,
        start_date=days_ago(3),
        catchup=False,
        max_active_runs=1
) as dag:
    start = EmptyOperator(task_id="start")
    end = EmptyOperator(task_id="end")

    DBTTEST_DIR = Path("/opt/dbttest")

    p = OpenDbtAirflowProject(project_dir=DBTTEST_DIR, profiles_dir=DBTTEST_DIR, target='dev')
    p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

*(screenshot: airflow-dbt-flow.png)*

### Create a dag using a subset of dbt models

```python
from opendbt.airflow import OpenDbtAirflowProject

# create dbt build tasks for models with given tag
p = OpenDbtAirflowProject(resource_type='model', project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                          target='dev', tag="MY_TAG")
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

### Create a dag to run tests

```python
from opendbt.airflow import OpenDbtAirflowProject

# create dbt test tasks with given model tag
p = OpenDbtAirflowProject(resource_type='test', project_dir="/dbt/project_dir", profiles_dir="/dbt/profiles_dir",
                          target='dev', tag="MY_TAG")
p.load_dbt_tasks(dag=dag, start_node=start, end_node=end)
```

## Create a page on the Airflow server to serve dbt docs

While it's very practical to use Airflow for dbt executions, it can also be used to serve the dbt docs. Here is how:

Step-1: Create a python file under the airflow `/{airflow}/plugins` directory with the following code. Adjust the given path to the folder where the dbt docs are published.

```python
from pathlib import Path

from opendbt.airflow import dbtdocs

# create public page on airflow server to serve DBT docs
airflow_dbtdocs_page = dbtdocs.init_plugins_dbtdocs_page(Path("/opt/dbttest/target"))
```

Step-2: Restart Airflow and check that a new link, DBT Docs, is created.

*(screenshot: airflow-dbt-docs-link.png)*

Step-3: Open the link and browse the dbt docs.

*(screenshot: airflow-dbt-docs-page.png)*