Skip to content

Commit

Permalink
fix: Support Python 3.7 shutil
Browse files Browse the repository at this point in the history
  • Loading branch information
tomasfarias committed Feb 18, 2022
1 parent 7f0232a commit 6bcf1c0
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 14 deletions.
39 changes: 35 additions & 4 deletions airflow_dbt_python/hooks/backends/localfs.py
Expand Up @@ -5,6 +5,7 @@
from __future__ import annotations

import shutil
import sys
from functools import partial
from pathlib import Path
from zipfile import ZipFile
Expand Down Expand Up @@ -52,7 +53,10 @@ def pull_many(self, source: StrPath, destination: StrPath) -> Path:

zip_destination.unlink()
else:
shutil.copytree(source, destination, dirs_exist_ok=True)
if sys.version_info.major == 3 and sys.version_info.minor < 8:
py37_copytree(source, destination)
else:
shutil.copytree(source, destination, dirs_exist_ok=True) # type: ignore

return Path(destination)

Expand Down Expand Up @@ -107,6 +111,33 @@ def push_many(

copy_function = partial(self.push_one, replace=replace)

shutil.copytree(
source, destination, copy_function=copy_function, dirs_exist_ok=True
)
if sys.version_info.major == 3 and sys.version_info.minor < 8:
py37_copytree(source, destination, replace)
else:
shutil.copytree( # type: ignore
source, destination, copy_function=copy_function, dirs_exist_ok=True
)


def py37_copytree(source: StrPath, destination: StrPath, replace: bool = True):
"""A (probably) poor attempt at replicating shutil.copytree for Python 3.7.
shutil.copytree is available in Python 3.7, however it doesn't have the
dirs_exist_ok parameter, and we really need that. If the destination path doesn't
exist, we can use shutil.copytree, however if it does then we need to copy files
one by one and make any subdirectories ourselves.
"""
if Path(destination).exists():
for path in Path(source).glob("**/*"):
if path.is_dir():
continue

target_path = Path(destination) / path.relative_to(source)
if target_path.exists() and not replace:
# shutil.copy replaces by default
continue

target_path.parent.mkdir(exist_ok=True, parents=True)
shutil.copy(path, target_path)
else:
shutil.copytree(source, destination)
25 changes: 16 additions & 9 deletions airflow_dbt_python/operators/dbt.py
Expand Up @@ -175,22 +175,29 @@ def execute(self, context: dict):
except Exception as e:
self.log.exception("There was an error executing dbt", exc_info=e)
success, results = False, {}
raise AirflowException(
f"An error has occurred while executing dbt: {config.dbt_task}"
) from e

if self.do_xcom_push is True:
# Some dbt operations use dataclasses for its results,
# found in dbt.contracts.results. Each DbtBaseOperator
# subclass should implement prepare_results to return a
# serializable object
finally:
res = self.serializable_result(results)
if context.get("ti", None) is not None:

if (
self.do_xcom_push is True
and context.get("ti", None) is not None
):
# Some dbt operations use dataclasses for its results,
# found in dbt.contracts.results. Each DbtBaseOperator
# subclass should implement prepare_results to return a
# serializable object
self.xcom_push_artifacts(context, dbt_dir)
self.xcom_push(context, key=XCOM_RETURN_KEY, value=res)

if success is not True:
if self.do_xcom_push is True and context.get("ti", None) is not None:
self.xcom_push(context, key=XCOM_RETURN_KEY, value=res)
raise AirflowException(
f"An error has occurred executing dbt {config.dbt_task}"
f"Dbt has failed to execute the following task: {config.dbt_task}"
)

return res

@property
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -101,6 +101,6 @@ filterwarnings = [
[tool.mypy]
ignore_missing_imports = true
strict_optional = true
warn_unused_ignores = true
warn_unused_ignores = false
warn_redundant_casts = true
warn_unused_configs = true
46 changes: 46 additions & 0 deletions tests/hooks/dbt/backends/test_dbt_localfs_backend.py
Expand Up @@ -9,6 +9,7 @@
import pytest

from airflow_dbt_python.hooks.backends import DbtLocalFsBackend
from airflow_dbt_python.hooks.backends.localfs import py37_copytree


def test_pull_dbt_profiles(tmpdir, profiles_file):
Expand Down Expand Up @@ -187,3 +188,48 @@ def test_push_dbt_project_to_zip_file(tmpdir, test_files):
backend.push_dbt_project(test_files[0].parent.parent, zip_path)

assert zip_path.exists()


def test_py37_copytree(test_files, tmpdir):
"""The Python 3.7 workaround should produce the same results as copytree."""
py37_dir = tmpdir / "py37_copytree_target"
assert not py37_dir.exists()
copytree_dir = tmpdir / "copytree_target"
assert not copytree_dir.exists()

shutil.copytree(test_files[0].parent.parent, copytree_dir)
py37_copytree(test_files[0].parent.parent, py37_dir)

for path in Path(copytree_dir).glob("**/*"):
if path.is_dir():
continue

py37_path = py37_dir / path.relative_to(copytree_dir)
assert py37_path.exists()


def test_py37_copytree_no_replace(test_files, tmpdir):
"""The Python 3.7 workaround should produce the same results as copytree."""
source = test_files[0].parent.parent
py37_copytree(source, source, replace=False)

all_paths = [p for p in source.glob("**/*") if not p.is_dir()]
assert len(all_paths) == 4


def test_py37_copytree_if_exists(test_files, tmpdir):
"""The Python 3.7 workaround should produce the same results as copytree."""
py37_dir = tmpdir / "py37_copytree_target"
py37_dir.mkdir()

assert py37_dir.exists()

source = test_files[0].parent.parent
py37_copytree(source, py37_dir)

for path in source.glob("**/*"):
if path.is_dir():
continue

py37_path = py37_dir / path.relative_to(source)
assert py37_path.exists()

0 comments on commit 6bcf1c0

Please sign in to comment.