Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions .flake8
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
[flake8]
max-line-length = 88
docstring-convention = google
# Will finish these later
extend-ignore = D101,D107
ignore = D107, W503
per-file-ignores =
tests/*.py: D1,D2,D4
examples/*.py: D1,D2,D4
__version__.py: D100
__init__.py: F401
exclude =
airflow
Expand All @@ -16,6 +12,7 @@ exclude =
setup.py
build
dist
tests
.venv
.tox
.mypy_cache
Expand Down
2 changes: 2 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ repos:
hooks:
- id: flake8
additional_dependencies: [flake8-docstrings]
args: ["--config", ".flake8"]
exclude: tests/

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v0.902
Expand Down
1 change: 1 addition & 0 deletions airflow_dbt_python/__version__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""The module's version information."""
__author__ = "Tomás Farías Santana"
__copyright__ = "Copyright 2021 Tomás Farías Santana"
__title__ = "airflow-dbt-python"
Expand Down
6 changes: 6 additions & 0 deletions airflow_dbt_python/hooks/dbt_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@


class DbtS3Hook(S3Hook):
"""Subclass of S3Hook with methods to pull dbt-related files.

A dbt hook should provide a method to pull a dbt profiles file (profiles.yml) and
all the files corresponding to a project.
"""

def get_dbt_profiles(
self, s3_profiles_url: str, profiles_dir: Optional[str] = None
) -> Path:
Expand Down
96 changes: 81 additions & 15 deletions airflow_dbt_python/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,14 @@ def __init__(
self._dbt_s3_hook = None

def execute(self, context: dict):
"""Execute dbt command with prepared arguments."""
"""Execute dbt command with prepared arguments.

Execution requires setting up a directory with the dbt project files and
overriding the logging.

Args:
context: The Airflow task's context.
"""
with self.dbt_directory() as dbt_dir: # type: str
with self.override_dbt_logging(dbt_dir):
args = self.prepare_args()
Expand Down Expand Up @@ -125,6 +132,9 @@ def execute(self, context: dict):
def xcom_push_artifacts(self, context: dict, dbt_directory: str):
"""Read dbt artifacts and push them to XCom.

Artifacts are read from the target/ directory in dbt_directory. This method will
fail if the required artifact is not found.

Args:
context: The Airflow task's context.
dbt_directory: A directory containing a dbt project. Artifacts will be
Expand Down Expand Up @@ -160,7 +170,11 @@ def prepare_args(self) -> list[Optional[str]]:
return args

def args_list(self) -> list[str]:
"""Build a list of arguments to pass to dbt."""
"""Build a list of arguments to pass to dbt.

Building involves creating a list of flags for dbt to parse given the
operator's attributes and the values specified by __dbt_args__.
"""
args = []
for arg in self.__dbt_args__:
value = getattr(self, arg, None)
Expand Down Expand Up @@ -289,7 +303,12 @@ def serializable_result(


class DbtRunOperator(DbtBaseOperator):
"""Executes dbt run."""
"""Executes a dbt run command.

The run command executes SQL model files against the given target. The
documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/run.
"""

command = "run"

Expand Down Expand Up @@ -337,7 +356,12 @@ def __init__(


class DbtSeedOperator(DbtBaseOperator):
"""Executes dbt seed."""
"""Executes a dbt seed command.

The seed command loads csv files into the given target. The
documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/seed.
"""

command = "seed"

Expand Down Expand Up @@ -373,7 +397,12 @@ def __init__(


class DbtTestOperator(DbtBaseOperator):
"""Executes dbt test."""
"""Executes a dbt test command.

The test command runs data and/or schema tests. The
documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/test.
"""

command = "test"

Expand Down Expand Up @@ -424,7 +453,12 @@ def __init__(


class DbtCompileOperator(DbtBaseOperator):
"""Executes dbt compile."""
"""Executes a dbt compile command.

The compile command generates SQL files. The
documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/compile.
"""

command = "compile"

Expand Down Expand Up @@ -469,7 +503,11 @@ def __init__(


class DbtDepsOperator(DbtBaseOperator):
"""Executes dbt deps."""
"""Executes a dbt deps command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/deps.
"""

command = "deps"

Expand All @@ -478,7 +516,11 @@ def __init__(self, **kwargs) -> None:


class DbtCleanOperator(DbtBaseOperator):
"""Executes dbt clean."""
"""Executes a dbt clean command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/clean.
"""

command = "clean"

Expand All @@ -487,7 +529,11 @@ def __init__(self, **kwargs) -> None:


class DbtDebugOperator(DbtBaseOperator):
"""Execute dbt debug."""
"""Executes a dbt debug command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/debug.
"""

command = "debug"

Expand All @@ -505,7 +551,11 @@ def __init__(


class DbtSnapshotOperator(DbtBaseOperator):
"""Execute dbt snapshot."""
"""Executes a dbt snapshot command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/snapshot.
"""

command = "snapshot"

Expand Down Expand Up @@ -535,7 +585,11 @@ def __init__(


class DbtLsOperator(DbtBaseOperator):
"""Execute dbt list (or ls)."""
"""Executes a dbt list (or ls) command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/list.
"""

command = "ls"

Expand Down Expand Up @@ -572,7 +626,11 @@ def __init__(


class DbtRunOperationOperator(DbtBaseOperator):
"""Execute dbt run-operation."""
"""Executes a dbt run-operation command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/run-operation.
"""

command = "run-operation"

Expand All @@ -591,7 +649,11 @@ def __init__(


class DbtParseOperator(DbtBaseOperator):
"""Execute dbt parse."""
"""Executes a dbt parse command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/parse.
"""

command = "parse"

Expand All @@ -603,7 +665,11 @@ def __init__(


class DbtSourceOperator(DbtBaseOperator):
"""Execute dbt source."""
"""Executes a dbt source command.

The documentation for the dbt command can be found here:
https://docs.getdbt.com/reference/commands/source.
"""

command = "source"

Expand Down Expand Up @@ -640,7 +706,7 @@ def __init__(


class DbtBuildOperator(DbtBaseOperator):
"""Execute dbt build.
"""Executes a dbt build command.

The build command combines the run, test, seed, and snapshot commands into one. The
full documentation for the dbt build command can be found here:
Expand Down
1 change: 1 addition & 0 deletions examples/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Usage examples of airflow-dbt-python."""
4 changes: 1 addition & 3 deletions examples/basic_dag.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Sample basic DAG which dbt runs a project
"""
"""Sample basic DAG which runs a dbt project."""
import datetime as dt

from airflow import DAG
Expand Down
5 changes: 3 additions & 2 deletions examples/complete_dbt_workflow_dag.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""
Sample DAG showcasing a complete dbt workflow
"""Sample DAG showcasing a complete dbt workflow.

The complete workflow includes a sequence of source, seed, and several run commands.
"""
import datetime as dt

Expand Down
4 changes: 1 addition & 3 deletions examples/dbt_project_in_s3_dag.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Sample basic DAG which showcases a dbt project being pulled from S3
"""
"""Sample basic DAG which showcases a dbt project being pulled from S3."""
import datetime as dt

from airflow import DAG
Expand Down
8 changes: 2 additions & 6 deletions examples/use_dbt_artifacts_dag.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
"""
Sample DAG to showcase pulling dbt artifacts from XCOM
"""
"""Sample DAG to showcase pulling dbt artifacts from XCom."""
import datetime as dt

from airflow import DAG
Expand All @@ -10,9 +8,7 @@


def process_dbt_artifacts(**context):
"""
Report which model or models took the longest to compile and execute
"""
"""Report which model or models took the longest to compile and execute."""
run_results = context["ti"].xcom_pull(
key="run_results.json", task_ids="dbt_run_daily"
)
Expand Down
5 changes: 5 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""Conftest file that sets up common fixtures.

Common fixtures include a connection to a postgres database, a set of sample model and
seed files, dbt configuration files, and temporary directories for everything.
"""
import boto3
import pytest
from dbt.version import __version__ as DBT_VERSION
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtBaseOperator."""
from pathlib import Path
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_build.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtBuildOperator."""
import json
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_clean.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtCleanOperator."""
from unittest.mock import patch

from airflow_dbt_python.operators.dbt import DbtCleanOperator, DbtCompileOperator
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_compile.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtCompileOperator."""
from unittest.mock import patch

from dbt.contracts.results import RunStatus
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_debug.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtDebugOperator."""
from unittest.mock import patch

from dbt.version import __version__ as DBT_VERSION
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_deps.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtDepsOperator."""
from unittest.mock import patch

import pytest
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_list.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtLsOperator."""
from itertools import chain
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_parse.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtParseOperator."""
from pathlib import Path
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_run.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtRunOperator."""
import json
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_run_operation.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtRunOperationOperator."""
from unittest.mock import patch

import pytest
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_s3_hook.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtS3Hook."""
import pytest

try:
Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_seed.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtSeedOperator."""
import json
from unittest.mock import patch

Expand Down
3 changes: 2 additions & 1 deletion tests/test_dbt_snapshot.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
"""Unit test module for DbtSnapshotOperator."""
from unittest.mock import patch

import pytest
from airflow import AirflowException
from dbt.contracts.results import RunStatus

from airflow import AirflowException
from airflow_dbt_python.operators.dbt import DbtSnapshotOperator


Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtSourceOperator."""
from pathlib import Path
from unittest.mock import patch

Expand Down
1 change: 1 addition & 0 deletions tests/test_dbt_test.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Unit test module for DbtTestOperator."""
from unittest.mock import patch

import pytest
Expand Down