Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 11 additions & 16 deletions airflow_dbt_python/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import contextmanager
from dataclasses import asdict, is_dataclass
from pathlib import Path
from tempfile import NamedTemporaryFile, TemporaryDirectory
from tempfile import TemporaryDirectory
from typing import Any, Iterator, Optional, Union
from urllib.parse import urlparse

Expand Down Expand Up @@ -241,23 +241,18 @@ def dbt_s3_hook(self):
def override_dbt_logging(self, dbt_directory: str = None):
"""Override dbt's logger.

dbt logger writes to STDOUT and I haven't found a way
to bubble up to the Airflow command logger. As a workaround,
I set the output stream to a temporary file that is later
read and logged using the command's logger.
We override the output stream of the dbt logger to use Airflow's StreamLogWriter
so that we can ensure dbt logs properly to the Airflow task's log output.
"""
with NamedTemporaryFile(dir=dbt_directory, mode="w+") as f:
with log_manager.applicationbound():
from airflow.utils.log.logging_mixin import StreamLogWriter

log_manager.reset_handlers()
log_manager.set_path(dbt_directory)
log_manager.set_output_stream(f)

yield

with open(f.name) as read_file:
for line in read_file:
self.log.info(line.rstrip())
with log_manager.applicationbound():
log_manager.reset_handlers()
log_manager.set_path(dbt_directory)
log_manager.set_output_stream(
StreamLogWriter(self.log, self.log.getEffectiveLevel())
)
yield

def run_dbt_command(self, args: list[Optional[str]]) -> tuple[RunResult, bool]:
"""Run a dbt command as implemented by a subclass."""
Expand Down