Skip to content

Commit

Permalink
fix: Support Python 3.7 shutil
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Feb 9, 2022
1 parent 41f5353 commit 78b897d
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 14 deletions.
39 changes: 35 additions & 4 deletions airflow_dbt_python/hooks/backends/localfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import shutil
import sys
from functools import partial
from pathlib import Path
from zipfile import ZipFile
Expand Down Expand Up @@ -52,7 +53,10 @@ def pull_many(self, source: StrPath, destination: StrPath) -> Path:

zip_destination.unlink()
else:
shutil.copytree(source, destination, dirs_exist_ok=True)
if sys.version_info.major == 3 and sys.version_info.minor < 8:
py37_copytree(source, destination)
else:
shutil.copytree(source, destination, dirs_exist_ok=True) # type: ignore

return Path(destination)

Expand Down Expand Up @@ -107,6 +111,33 @@ def push_many(

copy_function = partial(self.push_one, replace=replace)

shutil.copytree(
source, destination, copy_function=copy_function, dirs_exist_ok=True
)
if sys.version_info.major == 3 and sys.version_info.minor < 8:
py37_copytree(source, destination, replace)
else:
shutil.copytree( # type: ignore
source, destination, copy_function=copy_function, dirs_exist_ok=True
)


def py37_copytree(source, destination, replace: bool = True):
"""A (probably) poor attempt at replicating shutil.copytree for Python 3.7.
shutil.copytree is available in Python 3.7, however it doesn't have the
dirs_exist_ok parameter, and we really need that. If the destination path doesn't
exist, we can use shutil.copytree, however if it does then we need to copy files
one by one and make any subdirectories ourselves.
"""
if Path(destination).exists():
for path in Path(source).glob("**/*"):
if path.is_dir():
continue

target_path = Path(destination) / path.relative_to(source)
if target_path.exists() and not replace:
# shutil.copy replaces by default
continue

target_path.parent.mkdir(exist_ok=True, parents=True)
shutil.copy(path, target_path)
else:
shutil.copytree(source, destination)
25 changes: 16 additions & 9 deletions airflow_dbt_python/operators/dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,22 +174,29 @@ def execute(self, context: dict):
except Exception as e:
self.log.exception("There was an error executing dbt", exc_info=e)
success, results = False, {}
raise AirflowException(
f"An error has occurred while executing dbt: {config.dbt_task}"
) from e

if self.do_xcom_push is True:
# Some dbt operations use dataclasses for its results,
# found in dbt.contracts.results. Each DbtBaseOperator
# subclass should implement prepare_results to return a
# serializable object
finally:
res = self.serializable_result(results)
if context.get("ti", None) is not None:

if (
self.do_xcom_push is True
and context.get("ti", None) is not None
):
# Some dbt operations use dataclasses for its results,
# found in dbt.contracts.results. Each DbtBaseOperator
# subclass should implement prepare_results to return a
# serializable object
self.xcom_push_artifacts(context, dbt_dir)
self.xcom_push(context, key=XCOM_RETURN_KEY, value=res)

if success is not True:
if self.do_xcom_push is True and context.get("ti", None) is not None:
self.xcom_push(context, key=XCOM_RETURN_KEY, value=res)
raise AirflowException(
f"An error has occurred executing dbt {config.dbt_task}"
f"Dbt has failed to execute the following task: {config.dbt_task}"
)

return res

@property
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,6 @@ filterwarnings = [
[tool.mypy]
ignore_missing_imports = true
strict_optional = true
warn_unused_ignores = true
warn_unused_ignores = false
warn_redundant_casts = true
warn_unused_configs = true

0 comments on commit 78b897d

Please sign in to comment.