From 6bcf1c0f3a4bac7d05bc9f6c69354cbb39978956 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tom=C3=A1s=20Far=C3=ADas=20Santana?= Date: Tue, 8 Feb 2022 16:42:41 +0000 Subject: [PATCH] fix: Support Python 3.7 shutil --- airflow_dbt_python/hooks/backends/localfs.py | 39 ++++++++++++++-- airflow_dbt_python/operators/dbt.py | 25 ++++++---- pyproject.toml | 2 +- .../dbt/backends/test_dbt_localfs_backend.py | 46 +++++++++++++++++++ 4 files changed, 98 insertions(+), 14 deletions(-) diff --git a/airflow_dbt_python/hooks/backends/localfs.py b/airflow_dbt_python/hooks/backends/localfs.py index 249e20f..3dbd4e2 100644 --- a/airflow_dbt_python/hooks/backends/localfs.py +++ b/airflow_dbt_python/hooks/backends/localfs.py @@ -5,6 +5,7 @@ from __future__ import annotations import shutil +import sys from functools import partial from pathlib import Path from zipfile import ZipFile @@ -52,7 +53,10 @@ def pull_many(self, source: StrPath, destination: StrPath) -> Path: zip_destination.unlink() else: - shutil.copytree(source, destination, dirs_exist_ok=True) + if sys.version_info.major == 3 and sys.version_info.minor < 8: + py37_copytree(source, destination) + else: + shutil.copytree(source, destination, dirs_exist_ok=True) # type: ignore return Path(destination) @@ -107,6 +111,33 @@ def push_many( copy_function = partial(self.push_one, replace=replace) - shutil.copytree( - source, destination, copy_function=copy_function, dirs_exist_ok=True - ) + if sys.version_info.major == 3 and sys.version_info.minor < 8: + py37_copytree(source, destination, replace) + else: + shutil.copytree( # type: ignore + source, destination, copy_function=copy_function, dirs_exist_ok=True + ) + + +def py37_copytree(source: StrPath, destination: StrPath, replace: bool = True): + """A (probably) poor attempt at replicating shutil.copytree for Python 3.7. + + shutil.copytree is available in Python 3.7, however it doesn't have the + dirs_exist_ok parameter, and we really need that. If the destination path doesn't + exist, we can use shutil.copytree, however if it does then we need to copy files + one by one and make any subdirectories ourselves. + """ + if Path(destination).exists(): + for path in Path(source).glob("**/*"): + if path.is_dir(): + continue + + target_path = Path(destination) / path.relative_to(source) + if target_path.exists() and not replace: + # shutil.copy replaces by default + continue + + target_path.parent.mkdir(exist_ok=True, parents=True) + shutil.copy(path, target_path) + else: + shutil.copytree(source, destination) diff --git a/airflow_dbt_python/operators/dbt.py b/airflow_dbt_python/operators/dbt.py index 8659571..ad40487 100644 --- a/airflow_dbt_python/operators/dbt.py +++ b/airflow_dbt_python/operators/dbt.py @@ -175,22 +175,29 @@ def execute(self, context: dict): except Exception as e: self.log.exception("There was an error executing dbt", exc_info=e) success, results = False, {} + raise AirflowException( + f"An error has occurred while executing dbt: {config.dbt_task}" + ) from e - if self.do_xcom_push is True: - # Some dbt operations use dataclasses for its results, - # found in dbt.contracts.results. Each DbtBaseOperator - # subclass should implement prepare_results to return a - # serializable object + finally: res = self.serializable_result(results) - if context.get("ti", None) is not None: + + if ( + self.do_xcom_push is True + and context.get("ti", None) is not None + ): + # Some dbt operations use dataclasses for its results, + # found in dbt.contracts.results. Each DbtBaseOperator + # subclass should implement prepare_results to return a + # serializable object self.xcom_push_artifacts(context, dbt_dir) + self.xcom_push(context, key=XCOM_RETURN_KEY, value=res) if success is not True: - if self.do_xcom_push is True and context.get("ti", None) is not None: - self.xcom_push(context, key=XCOM_RETURN_KEY, value=res) raise AirflowException( - f"An error has occurred executing dbt {config.dbt_task}" + f"Dbt has failed to execute the following task: {config.dbt_task}" ) + return res @property diff --git a/pyproject.toml b/pyproject.toml index 720d355..c00a247 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -101,6 +101,6 @@ filterwarnings = [ [tool.mypy] ignore_missing_imports = true strict_optional = true -warn_unused_ignores = true +warn_unused_ignores = false warn_redundant_casts = true warn_unused_configs = true diff --git a/tests/hooks/dbt/backends/test_dbt_localfs_backend.py b/tests/hooks/dbt/backends/test_dbt_localfs_backend.py index 592f047..4e0bfa5 100644 --- a/tests/hooks/dbt/backends/test_dbt_localfs_backend.py +++ b/tests/hooks/dbt/backends/test_dbt_localfs_backend.py @@ -9,6 +9,7 @@ import pytest from airflow_dbt_python.hooks.backends import DbtLocalFsBackend +from airflow_dbt_python.hooks.backends.localfs import py37_copytree def test_pull_dbt_profiles(tmpdir, profiles_file): @@ -187,3 +188,48 @@ def test_push_dbt_project_to_zip_file(tmpdir, test_files): backend.push_dbt_project(test_files[0].parent.parent, zip_path) assert zip_path.exists() + + +def test_py37_copytree(test_files, tmpdir): + """The Python 3.7 workaround should produce the same results as copytree.""" + py37_dir = tmpdir / "py37_copytree_target" + assert not py37_dir.exists() + copytree_dir = tmpdir / "copytree_target" + assert not copytree_dir.exists() + + shutil.copytree(test_files[0].parent.parent, copytree_dir) + py37_copytree(test_files[0].parent.parent, py37_dir) + + for path in Path(copytree_dir).glob("**/*"): + if path.is_dir(): + continue + + py37_path = py37_dir / path.relative_to(copytree_dir) + assert py37_path.exists() + + +def test_py37_copytree_no_replace(test_files, tmpdir): + """The Python 3.7 workaround should produce the same results as copytree.""" + source = test_files[0].parent.parent + py37_copytree(source, source, replace=False) + + all_paths = [p for p in source.glob("**/*") if not p.is_dir()] + assert len(all_paths) == 4 + + +def test_py37_copytree_if_exists(test_files, tmpdir): + """The Python 3.7 workaround should produce the same results as copytree.""" + py37_dir = tmpdir / "py37_copytree_target" + py37_dir.mkdir() + + assert py37_dir.exists() + + source = test_files[0].parent.parent + py37_copytree(source, py37_dir) + + for path in source.glob("**/*"): + if path.is_dir(): + continue + + py37_path = py37_dir / path.relative_to(source) + assert py37_path.exists()