Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for custom backends #40

Merged
merged 8 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ repos:
hooks:
- id: mypy
name: Static type checking
additional_dependencies: ["types-freezegun==1.1.6"]
additional_dependencies: ["types-freezegun==1.1.6", "boto3-stubs[s3]"]

- repo: https://github.com/pycqa/isort
rev: 5.8.0
Expand Down
29 changes: 29 additions & 0 deletions airflow_dbt_python/hooks/backends/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
"""Backends for storing dbt projects and profiles.

The backend interface includes methods for pulling and pushing one or many files.
Internally, backends use Airflow hooks to execute the actual pushing and pulling.

Currently, only AWS S3 and the local filesystem are supported as backends.
"""
from typing import Optional, Type

from .base import DbtBackend, StrPath
from .localfs import DbtLocalFsBackend

try:
from .s3 import DbtS3Backend
except ImportError:
# S3 backend requires optional dependency
pass


def build_backend(scheme: str, conn_id: Optional[str] = None) -> DbtBackend:
    """Build the DbtBackend that handles a given URL scheme.

    Args:
        scheme: The URL scheme identifying the backend. "s3" selects the S3
            backend and an empty string selects the local filesystem backend.
        conn_id: An optional Airflow connection id. It is passed as the only
            positional argument to the backend's constructor.

    Returns:
        An instance of the backend class matching scheme.

    Raises:
        ImportError: If scheme is "s3" but the S3 backend is not importable.
        NotImplementedError: If no backend supports scheme.
    """
    backend_cls: Type[DbtBackend]

    if scheme == "s3":
        try:
            backend_cls = DbtS3Backend
        except NameError as err:
            # The S3 backend import at the top of this module is allowed to
            # fail silently, which leaves the name undefined. Surface that as
            # an explicit, actionable error instead of a bare NameError.
            raise ImportError(
                "The S3 backend is not available: its optional dependencies "
                "could not be imported"
            ) from err
    elif scheme == "":
        backend_cls = DbtLocalFsBackend
    else:
        raise NotImplementedError(f"Backend {scheme} is not supported")

    return backend_cls(conn_id)
144 changes: 144 additions & 0 deletions airflow_dbt_python/hooks/backends/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
"""Base dbt backend interface.

Ensures methods for pulling and pushing files are defined.
"""
from __future__ import annotations

from abc import ABC, abstractmethod
from os import PathLike
from pathlib import Path
from typing import Iterable, Union
from zipfile import ZipFile

from airflow.utils.log.logging_mixin import LoggingMixin

# Anything accepted as a path by the backends: a plain string or an
# os.PathLike whose filesystem representation is a str.
StrPath = Union[str, "PathLike[str]"]


class DbtBackend(ABC, LoggingMixin):
    """Abstract interface for a storage location holding dbt files.

    Concrete backends provide the four primitive operations (pull_one,
    pull_many, push_one and push_many). The dbt-specific helpers defined here
    (pull_dbt_profiles, pull_dbt_project and push_dbt_project) are built on top
    of those primitives, so new storage systems can be supported without
    touching the code that uses a backend.

    Backends can rely on an Airflow connection with a corresponding hook, but
    this is not enforced.

    Attributes:
        connection_id: An optional Airflow connection used to instantiate a hook
            for this backend. NOTE(review): this base class does not set it;
            presumably subclasses do -- confirm.
    """

    def pull_dbt_profiles(self, source_prefix: StrPath, destination: StrPath) -> Path:
        """Pull a dbt profiles.yml file from a given source_prefix.

        The source is normalised so that it always ends in "profiles.yml", and
        the destination is normalised so that it always names a file.

        Args:
            source_prefix: Path pointing to a directory containing a
                profiles.yml file, or to the profiles.yml file itself.
            destination: Path to a directory where the profiles.yml will be
                stored, or to the target .yml file.

        Returns:
            The destination Path of the profiles file.
        """
        self.log.info("Pulling dbt profiles file from: %s", source_prefix)

        prefix = str(source_prefix)
        if prefix.endswith("/"):
            source_prefix = f"{prefix}profiles.yml"
        elif not prefix.endswith("profiles.yml"):
            source_prefix = f"{prefix}/profiles.yml"

        target = Path(destination)
        # Anything that is a directory, or does not look like a .yml file, is
        # treated as the directory that should contain profiles.yml.
        if target.is_dir() or target.suffix != ".yml":
            target = target / "profiles.yml"

        self.pull_one(source_prefix, target)
        return target

    def pull_dbt_project(self, source_prefix: StrPath, destination: StrPath) -> Path:
        """Pull all dbt project files from a given source_prefix.

        Args:
            source_prefix: Path to a directory containing a dbt project.
            destination: Path to a directory where the project will be stored.

        Returns:
            The destination Path.
        """
        self.log.info("Pulling dbt project files from: %s", source_prefix)
        self.pull_many(source_prefix, destination)
        return Path(destination)

    def push_dbt_project(
        self,
        source: StrPath,
        destination: StrPath,
        replace: bool = False,
        delete_before: bool = False,
    ) -> None:
        """Push all dbt project files found under source to destination.

        Args:
            source: Path to a directory containing a dbt project.
            destination: Path or URL to a directory where the project will be
                stored.
            replace: Flag to indicate whether to replace existing files.
            delete_before: Flag to indicate whether to clear any existing files
                before pushing the dbt project.
        """
        self.log.info("Pushing dbt project files to: %s", destination)
        options = {"replace": replace, "delete_before": delete_before}
        self.push_many(source, destination, **options)

    @abstractmethod
    def pull_one(self, source: StrPath, destination: StrPath) -> Path:
        """Pull a single dbt file from source and store it in destination.

        Args:
            source: The string representation of a path or a path object
                pointing to the file to pull. This could be a URL.
            destination: The string representation of a path or a path object
                pointing to the location where the file will be stored.

        Returns:
            The directory where the file was stored.
        """
        return NotImplemented

    @abstractmethod
    def pull_many(self, source: StrPath, destination: StrPath) -> Path:
        """Pull all dbt files under source and store them under destination.

        Args:
            source: The string representation of a path or a path object
                pointing to a directory containing all dbt files to pull.
            destination: The string representation of a path or a path object
                pointing to a local directory where all files will be stored.

        Returns:
            The directory where all files were stored.
        """
        return NotImplemented

    @abstractmethod
    def push_one(self, source: StrPath, destination: StrPath, replace: bool = False):
        """Push a single dbt file from source and store it in destination.

        Args:
            source: Path to the file to push.
            destination: Path or URL where the file will be stored.
            replace: Flag to indicate whether to replace an existing file.
        """
        return NotImplemented

    @abstractmethod
    def push_many(
        self,
        source: StrPath,
        destination: StrPath,
        replace: bool = False,
        delete_before: bool = False,
    ):
        """Push all dbt files under source and store them under destination.

        Args:
            source: Path to a directory containing the files to push.
            destination: Path or URL where the files will be stored.
            replace: Flag to indicate whether to replace existing files.
            delete_before: Flag to indicate whether to clear any existing files
                before pushing.
        """
        return NotImplemented


def zip_all_paths(paths: Iterable[Path], zip_path: Path) -> None:
    """Add all paths to a new zip file created at zip_path.

    Every path is stored under a name relative to the directory that contains
    zip_path, so all paths must live under that directory.

    Args:
        paths: The files and directories to add to the archive. This may be a
            lazy iterable, such as the result of Path.glob.
        zip_path: Where the archive is written. An existing file is overwritten.

    Raises:
        ValueError: If one of the paths is not located under zip_path's parent.
    """
    archive_root = zip_path.parent

    with ZipFile(zip_path, "w") as zf:
        # The archive exists on disk from this point on. A lazy listing of the
        # archive's own directory would therefore yield the archive itself, so
        # it has to be skipped to avoid zipping a half-written file into itself.
        archive = zip_path.resolve()

        for _file in paths:
            if _file.resolve() == archive:
                continue
            zf.write(_file, arcname=_file.relative_to(archive_root))
143 changes: 143 additions & 0 deletions airflow_dbt_python/hooks/backends/localfs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
"""A local filesystem backend.

Intended to be used only when running Airflow with a LocalExecutor.
"""
from __future__ import annotations

import shutil
import sys
from functools import partial
from pathlib import Path
from zipfile import ZipFile

from .base import DbtBackend, StrPath, zip_all_paths


class DbtLocalFsBackend(DbtBackend):
    """A concrete dbt backend for a local filesystem.

    This backend is intended to be used when running Airflow with a
    LocalExecutor, and it relies on shutil from the standard library to do all
    the file manipulation. For these reasons, running multiple concurrent tasks
    with this backend may lead to race conditions if attempting to push files
    to the backend.
    """

    def __init__(self, *args, **kwargs):
        """Forward all arguments unchanged to the parent initializer."""
        super().__init__(*args, **kwargs)

    def pull_one(self, source: StrPath, destination: StrPath) -> Path:
        """Pull a file from a local path.

        Missing parent directories of destination are created.

        Args:
            source: A local path to the file to pull.
            destination: A destination path where to pull the file to.

        Returns:
            The Path of the newly created file.
        """
        destination_path = Path(destination)
        destination_path.parent.mkdir(exist_ok=True, parents=True)

        # shutil.copy returns a plain string; wrap it to honour the annotation.
        return Path(shutil.copy(source, destination_path))

    def pull_many(self, source: StrPath, destination: StrPath) -> Path:
        """Pull many files from a local path.

        A source with a .zip extension is treated as a zipped project and is
        extracted into destination. Any other source is copied as a directory
        tree.

        Args:
            source: A local path to a directory, or to a zip file, containing
                the files to pull.
            destination: A destination directory where to pull the files to.

        Returns:
            The destination directory as a Path.
        """
        destination_path = Path(destination)

        if Path(source).suffix == ".zip":
            # The archive is copied inside destination, so it must exist first.
            destination_path.mkdir(exist_ok=True, parents=True)
            zip_destination = destination_path / "dbt_project.zip"
            shutil.copy(source, zip_destination)

            try:
                with ZipFile(zip_destination, "r") as zf:
                    zf.extractall(destination_path)
            finally:
                # Never leave the copied archive behind, even on a bad zip.
                zip_destination.unlink()
        elif sys.version_info < (3, 8):
            py37_copytree(source, destination)
        else:
            shutil.copytree(source, destination, dirs_exist_ok=True)  # type: ignore

        return destination_path

    def push_one(
        self, source: StrPath, destination: StrPath, replace: bool = False
    ) -> None:
        """Push a single file to a local path.

        If the file already exists, it will be ignored if replace is False (the
        default). Missing parent directories of destination are created.

        Args:
            source: A local path to the file to push.
            destination: A destination path where to push the file to.
            replace: A bool flag to indicate whether to replace existing files.
        """
        destination_path = Path(destination)
        if not replace and destination_path.exists():
            return

        destination_path.parent.mkdir(exist_ok=True, parents=True)
        shutil.copy(source, destination)

    def push_many(
        self,
        source: StrPath,
        destination: StrPath,
        replace: bool = False,
        delete_before: bool = False,
    ) -> None:
        """Push all dbt files under the source directory to another local path.

        Pushing supports zipped projects: the destination will be used to
        determine if we are working with a zip file by looking at the file
        extension. When pushing a zip file the archive is always rewritten, so
        replace has no effect in that case.

        Args:
            source: A local directory where to fetch the files to push.
            destination: A local path where the files should be copied.
            replace: Whether to replace existing files or not.
            delete_before: Whether to delete the contents of destination before
                pushing. A destination that does not exist is not an error.
        """
        source_path = Path(source)
        destination_path = Path(destination)

        if destination_path.suffix == ".zip":
            if delete_before:
                try:
                    destination_path.unlink()
                except FileNotFoundError:
                    # Nothing to clear.
                    pass

            zip_path = source_path / ".temp.zip"
            # List the files before the temporary archive is created inside
            # source, and exclude any stale archive, so the archive never ends
            # up containing itself.
            all_files = [
                path for path in source_path.glob("**/*") if path != zip_path
            ]

            try:
                zip_all_paths(all_files, zip_path=zip_path)
                destination_path.parent.mkdir(exist_ok=True, parents=True)
                shutil.copy(zip_path, destination_path)
            finally:
                # Do not pollute the dbt project with the temporary archive.
                if zip_path.exists():
                    zip_path.unlink()
            return

        if delete_before:
            try:
                shutil.rmtree(destination)
            except FileNotFoundError:
                # Nothing to clear.
                pass

        if sys.version_info < (3, 8):
            py37_copytree(source, destination, replace)
        else:
            # copytree delegates each file copy to push_one, which is what
            # applies the replace flag.
            copy_function = partial(self.push_one, replace=replace)
            shutil.copytree(  # type: ignore
                source, destination, copy_function=copy_function, dirs_exist_ok=True
            )


def py37_copytree(source: StrPath, destination: StrPath, replace: bool = True):
    """Copy a directory tree into a destination that may already exist.

    shutil.copytree is available in Python 3.7, however it doesn't have the
    dirs_exist_ok parameter, and we really need that. If the destination path
    doesn't exist, we can use shutil.copytree, however if it does then we need
    to copy files one by one and make any subdirectories ourselves.

    Args:
        source: The directory to copy.
        destination: The directory to copy into. It is created if missing.
        replace: Whether files that already exist under destination are
            overwritten. It has no effect when destination does not exist yet.
    """
    source_root = Path(source)
    destination_root = Path(destination)

    if not destination_root.exists():
        shutil.copytree(source, destination)
        return

    for path in source_root.glob("**/*"):
        target_path = destination_root / path.relative_to(source_root)

        if path.is_dir():
            # Recreate directories, including empty ones, so the result matches
            # what shutil.copytree produces for a fresh destination.
            target_path.mkdir(exist_ok=True, parents=True)
            continue

        if target_path.exists() and not replace:
            # shutil.copy replaces by default
            continue

        target_path.parent.mkdir(exist_ok=True, parents=True)
        shutil.copy(path, target_path)