Skip to content

Commit

Permalink
refactor(log): Set dbt's logger handlers to Airflow's
Browse files Browse the repository at this point in the history
Starting with dbt v1, two loggers are created by default by dbt:
default_file and default_stdout. We currently do not need both and
would like Airflow to handle all logging, so we remove all handlers
from default_file and replace default_stdout's handlers with Airflow's.
  • Loading branch information
tomasfarias committed Feb 1, 2022
1 parent d188437 commit 474d8f8
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 11 deletions.
21 changes: 10 additions & 11 deletions airflow_dbt_python/operators/dbt.py
Expand Up @@ -3,6 +3,7 @@

import datetime as dt
import json
import logging
import os
from contextlib import contextmanager
from dataclasses import asdict, is_dataclass
Expand All @@ -12,7 +13,6 @@
from urllib.parse import urlparse

from dbt.contracts.results import RunExecutionResult, agate
from dbt.logger import log_manager

from airflow import AirflowException
from airflow.models.baseoperator import BaseOperator
Expand Down Expand Up @@ -302,18 +302,17 @@ def dbt_hook(self):
def override_dbt_logging(self, dbt_directory: Optional[str] = None):
    """Re-wire dbt's loggers so all dbt output goes through Airflow's handlers.

    Starting with dbt v1, dbt initializes two loggers: default_file and
    default_stdout. Airflow already captures task output, so a separate dbt
    file log is unnecessary: default_file's handlers are cleared and
    default_stdout's handlers are replaced with this operator's (Airflow's)
    handlers.

    Args:
        dbt_directory: Unused. Retained for backward compatibility with the
            previous implementation, which used it to set dbt's log path.

    Yields:
        None. Loggers are re-wired on entry; handlers are not restored on
        exit, as they are re-assigned on every invocation.
    """
    # Fixed annotation: a None default requires Optional[str] (PEP 484).
    file_logger = logging.getLogger("default_file")
    file_logger.handlers = []

    stdout_logger = logging.getLogger("default_stdout")
    # Share the exact handler list so dbt output lands in the task log.
    stdout_logger.handlers = self.log.handlers

    yield

def serializable_result(
self, result: Optional[RunExecutionResult]
Expand Down
91 changes: 91 additions & 0 deletions tests/operators/test_dbt_seed.py
@@ -1,6 +1,8 @@
"""Unit test module for DbtSeedOperator."""
import json
import logging
from pathlib import Path
from typing import Iterator
from unittest.mock import patch

import pytest
Expand Down Expand Up @@ -89,6 +91,95 @@ def test_dbt_seed_models(profiles_file, dbt_project_file, seed_files):
assert isinstance(json.dumps(execution_results), str)


@pytest.fixture
def log_path() -> Iterator[Path]:
    """Yield a path for a test log file, removing the file on teardown.

    The file is deleted only if it exists, so a test that fails before
    writing any log output does not raise FileNotFoundError during
    fixture teardown.
    """
    p = Path.cwd() / "test.log"
    yield p
    if p.exists():
        p.unlink()


def test_dbt_seed_models_logging(profiles_file, dbt_project_file, seed_files, log_path):
    """Test the dbt seed operator logs to a test file without duplicates."""
    op = DbtSeedOperator(
        task_id="dbt_task",
        project_dir=dbt_project_file.parent,
        profiles_dir=profiles_file.parent,
        select=[str(s.stem) for s in seed_files],
        do_xcom_push=True,
        debug=False,
    )
    op.log.addHandler(logging.FileHandler(log_path))

    default_file_logger = logging.getLogger("default_file")
    default_stdout_logger = logging.getLogger("default_stdout")

    execution_results = op.execute({})
    # A None result would mean the seed run produced nothing to log.
    assert execution_results is not None

    # default_file must be silenced; default_stdout must reuse Airflow's handlers.
    assert len(default_file_logger.handlers) == 0
    assert len(default_stdout_logger.handlers) == 1
    assert default_stdout_logger.handlers[0] == op.log.handlers[0]

    with open(log_path) as log_file:
        lines = log_file.readlines()
    # Fixed: `>= 0` was vacuously true; the log file must actually have output.
    assert len(lines) > 0

    # Check for duplicate lines: each expected message appears exactly once.
    line_1 = (
        "1 of 2 START seed file public.seed_1.........................................."
        ".. [RUN]"
    )
    assert sum(line_1 in line for line in lines) == 1

    line_2 = "Finished running 2 seeds"
    assert sum(line_2 in line for line in lines) == 1

    # Thread tags would indicate we are (incorrectly) running with the debug flag.
    thread_tag = "[info ] [Thread-1 ]"
    assert not any(thread_tag in line for line in lines)

    main_thread_tag = "[info ] [MainThread]"
    assert not any(main_thread_tag in line for line in lines)


def test_dbt_seed_models_debug_logging(
    profiles_file, dbt_project_file, seed_files, log_path
):
    """Test the dbt seed operator debug logs to a test file without duplicates."""
    op = DbtSeedOperator(
        task_id="dbt_task",
        project_dir=dbt_project_file.parent,
        profiles_dir=profiles_file.parent,
        select=[str(s.stem) for s in seed_files],
        do_xcom_push=True,
        debug=True,
    )
    op.log.addHandler(logging.FileHandler(log_path))

    default_file_logger = logging.getLogger("default_file")
    default_stdout_logger = logging.getLogger("default_stdout")

    execution_results = op.execute({})
    # A None result would mean the seed run produced nothing to log.
    assert execution_results is not None

    # default_file must be silenced; default_stdout must reuse Airflow's handlers.
    assert len(default_file_logger.handlers) == 0
    assert len(default_stdout_logger.handlers) == 1
    assert default_stdout_logger.handlers[0] == op.log.handlers[0]

    with open(log_path) as log_file:
        lines = log_file.readlines()
    # Fixed: `>= 0` was vacuously true; the log file must actually have output.
    assert len(lines) > 0

    # Check for duplicate lines: debug-tagged messages appear exactly once.
    line_1 = (
        "[info ] [Thread-1 ]: 1 of 2 START seed file public.seed_1...................."
        "........................ [RUN]"
    )
    assert sum(line_1 in line for line in lines) == 1

    line_2 = "[info ] [MainThread]: Finished running 2 seeds"
    assert sum(line_2 in line for line in lines) == 1


def test_dbt_seed_models_full_refresh(profiles_file, dbt_project_file, seed_files):
op = DbtSeedOperator(
task_id="dbt_task",
Expand Down

0 comments on commit 474d8f8

Please sign in to comment.