Skip to content

Commit

Permalink
fix: Correct casting of run-operation args to str
Browse files Browse the repository at this point in the history
When running a macro without arguments using DbtRunOperationOperator
the args parameter was set to None. However, dbt always expects a
string. So, now we always cast to string and have set the default to
"{}", that matches dbt's default.
  • Loading branch information
tomasfarias committed May 29, 2022
1 parent c85a189 commit 37da708
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 25 deletions.
2 changes: 1 addition & 1 deletion airflow_dbt_python/__version__.py
Expand Up @@ -2,4 +2,4 @@
__author__ = "Tomás Farías Santana"
__copyright__ = "Copyright 2021 Tomás Farías Santana"
__title__ = "airflow-dbt-python"
__version__ = "0.14.4"
__version__ = "0.14.5"
21 changes: 10 additions & 11 deletions airflow_dbt_python/hooks/dbt.py
Expand Up @@ -85,18 +85,18 @@ def __eq__(self, other):
return Enum.__eq__(self, other)


def parse_vars(vars: Optional[Union[str, dict[str, Any]]]) -> dict[str, Any]:
"""Parse CLI vars as dbt would.
def parse_yaml_args(args: Optional[Union[str, dict[str, Any]]]) -> dict[str, Any]:
"""Parse YAML arguments as dbt would.
This means:
- When vars is a string, we treat it as a YAML dict str.
- When args is a string, we treat it as a YAML dict str.
- If it's already a dictionary, we just return it.
- Otherwise (it's None), we return an empty dictionary.
"""
if isinstance(vars, str):
return yaml_helper.load_yaml_text(vars)
elif isinstance(vars, dict):
return vars
if isinstance(args, str):
return yaml_helper.load_yaml_text(args)
elif isinstance(args, dict):
return args
else:
return {}

Expand Down Expand Up @@ -169,7 +169,7 @@ def __post_init__(self):
Raises:
ValueError: When setting two mutually exclusive parameters.
"""
self.parsed_vars = parse_vars(self.vars)
self.parsed_vars = parse_yaml_args(self.vars)
self.vars = yaml.dump(self.parsed_vars)

mutually_exclusive_attrs = (
Expand Down Expand Up @@ -555,16 +555,15 @@ class RunTaskConfig(TableMutabilityConfig):
class RunOperationTaskConfig(BaseConfig):
"""Dbt run-operation task arguments."""

args: Optional[str] = None
args: str = "{}"
cls: BaseTask = dataclasses.field(default=RunOperationTask, init=False)
macro: Optional[str] = None
which: str = dataclasses.field(default="run-operation", init=False)

def __post_init__(self):
"""Support dictionary args by casting them to str after setting."""
super().__post_init__()
if isinstance(self.args, dict):
self.args = str(self.args)
self.args = str(self.args)


@dataclass
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "airflow-dbt-python"
version = "0.14.4"
version = "0.14.5"
description = "A dbt operator and hook for Airflow"
authors = ["Tomás Farías Santana <tomas@tomasfarias.dev>"]
license = "MIT"
Expand Down
21 changes: 19 additions & 2 deletions tests/conftest.py
Expand Up @@ -488,13 +488,30 @@ def snapshot_files(dbt_project_dir):


@pytest.fixture
def macro_file(dbt_project_dir):
def macro_name(dbt_project_dir):
"""Create a dbt macro file."""
d = dbt_project_dir / "macros"
d.mkdir(exist_ok=True)
m = d / "my_macro.sql"
m.write_text(MACRO)
return m
return "my_macro"


NON_ARG_MACRO = """
{% macro one() %}
1
{% endmacro %}
"""


@pytest.fixture
def non_arg_macro_name(dbt_project_dir):
"""Create a dbt macro file."""
d = dbt_project_dir / "macros"
d.mkdir(exist_ok=True)
m = d / "my_non_arg_macro.sql"
m.write_text(NON_ARG_MACRO)
return "one"


@pytest.fixture
Expand Down
6 changes: 3 additions & 3 deletions tests/hooks/dbt/test_dbt_hook_configs.py
Expand Up @@ -26,7 +26,7 @@
RunTaskConfig,
SeedTaskConfig,
TestTaskConfig,
parse_vars,
parse_yaml_args,
)


Expand Down Expand Up @@ -214,8 +214,8 @@ def test_build_task_minimal_config_singular(hook, profiles_file, dbt_project_fil
),
],
)
def test_parse_vars(vars, expected):
result = parse_vars(vars)
def test_parse_yaml_args(vars, expected):
result = parse_yaml_args(vars)
assert result == expected


Expand Down
18 changes: 16 additions & 2 deletions tests/hooks/dbt/test_dbt_run_operation.py
@@ -1,14 +1,28 @@
"""Unit test module for running dbt run-operation with the DbtHook."""


def test_dbt_run_operation_task(hook, profiles_file, dbt_project_file, macro_file):
def test_dbt_run_operation_task(hook, profiles_file, dbt_project_file, macro_name):
"""Test a dbt run-operation task."""
factory = hook.get_config_factory("run-operation")
config = factory.create_config(
project_dir=dbt_project_file.parent,
profiles_dir=profiles_file.parent,
macro=str(macro_file.stem),
macro=macro_name,
args={"an_arg": 123},
)
success, results = hook.run_dbt_task(config)
assert success is True


def test_dbt_run_operation_task_with_no_args(
hook, profiles_file, dbt_project_file, non_arg_macro_name
):
"""Test a dbt run-operation task."""
factory = hook.get_config_factory("run-operation")
config = factory.create_config(
project_dir=dbt_project_file.parent,
profiles_dir=profiles_file.parent,
macro=non_arg_macro_name,
)
success, results = hook.run_dbt_task(config)
assert success is True
24 changes: 19 additions & 5 deletions tests/operators/test_dbt_run_operation.py
Expand Up @@ -38,7 +38,7 @@ def test_dbt_run_operation_mocked_all_args():


def test_dbt_run_operation_non_existent_macro(
profiles_file, dbt_project_file, macro_file
profiles_file, dbt_project_file, macro_name
):
"""Test exectuion of DbtRunOperationOperator with a non-existent macro."""
op = DbtRunOperationOperator(
Expand All @@ -53,33 +53,47 @@ def test_dbt_run_operation_non_existent_macro(


def test_dbt_run_operation_missing_arguments(
profiles_file, dbt_project_file, macro_file
profiles_file, dbt_project_file, macro_name
):
"""Test exectuion of DbtRunOperationOperator with missing arguments."""
op = DbtRunOperationOperator(
task_id="dbt_task",
project_dir=dbt_project_file.parent,
profiles_dir=profiles_file.parent,
macro=str(macro_file.stem),
macro=macro_name,
)

with pytest.raises(AirflowException):
op.execute({})


def test_dbt_run_operation_run_macro(profiles_file, dbt_project_file, macro_file):
def test_dbt_run_operation_run_macro(profiles_file, dbt_project_file, macro_name):
"""Test a dbt run-operation operator basic execution."""
op = DbtRunOperationOperator(
task_id="dbt_task",
project_dir=dbt_project_file.parent,
profiles_dir=profiles_file.parent,
macro=str(macro_file.stem),
macro=macro_name,
args={"an_arg": 123},
)
execution_results = op.execute({})
assert execution_results["success"] is True


def test_dbt_run_operation_run_non_arg_macro(
profiles_file, dbt_project_file, non_arg_macro_name
):
"""Test a dbt run-operation operator basic execution."""
op = DbtRunOperationOperator(
task_id="dbt_task",
project_dir=dbt_project_file.parent,
profiles_dir=profiles_file.parent,
macro=non_arg_macro_name,
)
execution_results = op.execute({})
assert execution_results["success"] is True


BROKEN_MACRO1 = """
{% macro my_broken_macro(an_arg) %}
{% set sql %}
Expand Down

0 comments on commit 37da708

Please sign in to comment.