feat(s3): Add parameter to control replacement of existing S3 keys
Setting replace_on_push to False will skip any S3 uploads when a key
already exists. This is useful to guarantee that no existing files are
changed unexpectedly.

I'm still not fully convinced by this solution: ideally we would support
partial replacement, i.e. replacing only the files that changed. This is
hard to do with compressed projects, as it would require downloading and
uncompressing the project first, so it has been left for future work.
tomasfarias committed Jan 22, 2022
1 parent aa57cc5 commit dd216ed
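
As a usage sketch, here is a minimal DAG using the new parameter with DbtDepsOperator (the operator exercised in the tests of this commit); the DAG arguments, bucket name and S3 paths are placeholders:

import datetime as dt

from airflow import DAG
from airflow_dbt_python.operators.dbt import DbtDepsOperator

with DAG(
    dag_id="example_dbt_deps",
    start_date=dt.datetime(2022, 1, 22),
    schedule_interval=None,
) as dag:
    # Push the project back to S3 after running dbt deps, but skip any keys
    # that already exist under the prefix instead of overwriting them.
    dbt_deps = DbtDepsOperator(
        task_id="dbt_deps",
        project_dir="s3://my-bucket/project/",
        profiles_dir="s3://my-bucket/project/",
        push_dbt_project=True,
        replace_on_push=False,
    )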
Showing 7 changed files with 907 additions and 556 deletions.
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
@@ -17,6 +17,7 @@ repos:
rev: v0.902
hooks:
- id: mypy
additional_dependencies: ["types-freezegun==1.1.6"]

- repo: https://github.com/pycqa/isort
rev: 5.8.0
66 changes: 60 additions & 6 deletions airflow_dbt_python/hooks/s3.py
@@ -1,6 +1,7 @@
"""Provides an S3 hook exclusively for fetching dbt files."""
from __future__ import annotations

import os
from pathlib import Path
from typing import Optional
from zipfile import ZipFile
@@ -132,8 +133,22 @@ def download_many_s3_keys(

self.download_one_s3_object(local_project_file, s3_object)

def push_dbt_project(self, s3_project_url: str, project_dir: str):
"""Push a dbt project to S3."""
def push_dbt_project(
self, s3_project_url: str, project_dir: str, replace: bool = False
):
"""Push a dbt project to S3.
Pushing supports zipped projects: the s3_project_url will be used to determine
if we are working with a zip file by looking at the file extension.
Args:
s3_project_url (str): URL where the file(s) should be uploaded. The bucket
name and key prefix will be extracted by calling S3Hook.parse_s3_url.
project_dir (str): A directory containing dbt project files. If
s3_project_url indicates a zip file, the contents of project_dir will be
zipped and uploaded, otherwise each file is individually uploaded.
replace (bool): Whether to replace existing files or not.
"""
bucket_name, key = self.parse_s3_url(s3_project_url)
dbt_project_files = Path(project_dir).glob("**/*")

@@ -143,8 +158,11 @@ def push_dbt_project(self, s3_project_url: str, project_dir: str):
for _file in dbt_project_files:
zf.write(_file, arcname=_file.relative_to(project_dir))

self.load_file(
zip_file_path, key=s3_project_url, bucket_name=bucket_name, replace=True
self.load_file_handle_replace_error(
zip_file_path,
key=s3_project_url,
bucket_name=bucket_name,
replace=replace,
)
zip_file_path.unlink()

@@ -155,11 +173,47 @@ def push_dbt_project(self, s3_project_url: str, project_dir: str):

s3_key = f"s3://{bucket_name}/{key}{_file.relative_to(project_dir)}"

self.load_file(
self.load_file_handle_replace_error(
filename=_file,
key=s3_key,
bucket_name=bucket_name,
replace=True,
replace=replace,
)

self.log.info("Pushed dbt project to: %s", s3_project_url)

def load_file_handle_replace_error(
self,
filename: os.PathLike,
key: str,
bucket_name: Optional[str] = None,
replace: bool = False,
encrypt: bool = False,
gzip: bool = False,
acl_policy: Optional[str] = None,
) -> bool:
"""Calls S3Hook.load_file but handles the ValueError raised when a key exists.
Also logs a warning whenever attempting to replace an existing key with
replace = False.
Returns:
True if no ValueError was raised, False otherwise.
"""
success = True

try:
self.load_file(
filename,
key,
bucket_name=bucket_name,
replace=replace,
encrypt=encrypt,
gzip=gzip,
acl_policy=acl_policy,
)
except ValueError:
success = False
self.log.warning("Failed to load %s: key already exists in S3.", key)

return success
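
For reference, a minimal sketch of the hook-level behaviour introduced above (assuming the DbtS3Hook class used in the tests lives in airflow_dbt_python/hooks/s3.py, the file changed here; bucket name and local path are placeholders):

from airflow_dbt_python.hooks.s3 import DbtS3Hook

hook = DbtS3Hook()

# With replace=False, any key that already exists under the prefix is left
# untouched: load_file_handle_replace_error catches the ValueError raised by
# S3Hook.load_file and logs a warning instead of failing the push.
hook.push_dbt_project(
    "s3://my-bucket/project/",
    "/tmp/dbt_project",
    replace=False,
)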
7 changes: 6 additions & 1 deletion airflow_dbt_python/operators/dbt.py
@@ -89,6 +89,7 @@ def __init__(
s3_conn_id: str = "aws_default",
do_xcom_push_artifacts: Optional[list[str]] = None,
push_dbt_project: bool = False,
replace_on_push: bool = True,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -134,6 +135,8 @@ def __init__(
self.s3_conn_id = s3_conn_id
self.do_xcom_push_artifacts = do_xcom_push_artifacts
self.push_dbt_project = push_dbt_project
self.replace_on_push = replace_on_push

self._s3_hook = None
self._dbt_hook = None

@@ -252,7 +255,9 @@ def dbt_directory(self) -> Iterator[str]:
and urlparse(str(store_project_dir)).scheme == "s3"
):
self.log.info("Pushing dbt project back to S3: %s", store_project_dir)
self.s3_hook.push_dbt_project(store_project_dir, tmp_dir)
self.s3_hook.push_dbt_project(
store_project_dir, tmp_dir, self.replace_on_push
)

self.profiles_dir = store_profiles_dir
self.project_dir = store_project_dir
1,174 changes: 632 additions & 542 deletions poetry.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions pyproject.toml
@@ -50,6 +50,8 @@ pytest-postgresql = "^3.1.1"
psycopg2-binary = "^2.8.6"
isort = "^5.9.2"
moto = "^2.2.2"
freezegun = "^1.1.0"
types-freezegun = "^1.1.6"

[tool.poetry.extras]
redshift = ["dbt-redshift"]
139 changes: 132 additions & 7 deletions tests/hooks/s3/test_dbt_s3_hook.py
@@ -4,6 +4,7 @@
from pathlib import Path
from zipfile import ZipFile

import freezegun
import pytest

try:
@@ -310,14 +311,11 @@ def test_push_dbt_project_to_zip_file(s3_bucket, tmpdir, test_files):
assert key is True


def test_push_dbt_project_to_files(s3_bucket, tmpdir, test_files):
"""Test pushing a dbt project to a S3 path."""
hook = DbtS3Hook()

# Ensure we are working with an empty S3 prefix.
def clean_s3_prefix(hook: DbtS3Hook, prefix: str, s3_bucket: str):
"""Ensure we are working with an empty S3 prefix."""
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/",
prefix,
)
if keys is not None and len(keys) > 0:
hook.delete_objects(
@@ -326,13 +324,140 @@ def test_push_dbt_project_to_files(s3_bucket, tmpdir, test_files):
)
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/",
prefix,
)
assert keys is None or len(keys) == 0


def test_push_dbt_project_to_files(s3_bucket, tmpdir, test_files):
"""Test pushing a dbt project to an S3 path."""
hook = DbtS3Hook()
prefix = f"s3://{s3_bucket}/project/"
clean_s3_prefix(hook, prefix, s3_bucket)

hook.push_dbt_project(f"s3://{s3_bucket}/project/", test_files[0].parent.parent)
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/",
)
assert len(keys) == 4


def test_push_dbt_project_with_no_replace(s3_bucket, tmpdir, test_files):
"""Test pushing a dbt project to an S3 path with replace = False.
We store each s3.Object's last_modified attribute before pushing a project and
compare it to the values after pushing (they should be unchanged as we are not
replacing).
"""

hook = DbtS3Hook()
prefix = f"s3://{s3_bucket}/project/"
clean_s3_prefix(hook, prefix, s3_bucket)
bucket = hook.get_bucket(s3_bucket)

last_modified_expected = {}

project_dir = test_files[0].parent.parent

with freezegun.freeze_time("2022-01-01"):

for _file in project_dir.glob("**/*"):
if _file.is_dir():
continue

with open(_file) as f:
file_content = f.read()

key = f"s3://{s3_bucket}/project/{_file.relative_to(project_dir)}"
bucket.put_object(Key=key, Body=file_content.encode())
obj = hook.get_key(
key,
s3_bucket,
)
last_modified_expected[key] = obj.last_modified

with freezegun.freeze_time("2022-02-02"):
# Try to push the same files, a month after.
# Should not be replaced since replace = False.
hook.push_dbt_project(f"s3://{s3_bucket}/project/", project_dir, replace=False)

keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/",
)
assert len(keys) == 4, keys

last_modified_result = {}

for key in keys:
obj = hook.get_key(
key,
s3_bucket,
)
last_modified_result[key] = obj.last_modified

assert last_modified_expected == last_modified_result


def test_push_dbt_project_with_partial_replace(s3_bucket, tmpdir, test_files):
"""Test pushing a dbt project to an S3 path with replace = False.
For this test we expect a single file to be pushed, while the rest are ignored
as they already exist and we are running with replace = False.
"""

hook = DbtS3Hook()
prefix = f"s3://{s3_bucket}/project/"
clean_s3_prefix(hook, prefix, s3_bucket)
bucket = hook.get_bucket(s3_bucket)

last_modified_expected = {}

project_dir = test_files[0].parent.parent

with freezegun.freeze_time("2022-01-01"):
for _file in project_dir.glob("**/*"):
if _file.is_dir():
continue

with open(_file) as f:
file_content = f.read()

key = f"s3://{s3_bucket}/project/{_file.relative_to(project_dir)}"
bucket.put_object(Key=key, Body=file_content.encode())
obj = hook.get_key(
key,
s3_bucket,
)
last_modified_expected[key] = obj.last_modified

# Delete a single key
hook.delete_objects(
s3_bucket,
[f"s3://{s3_bucket}/project/seeds/a_seed.csv"],
)

with freezegun.freeze_time("2022-02-02"):
# Attempt to push project a month after.
# Only one file should be pushed as the rest exist and we are passing replace = False.
hook.push_dbt_project(f"s3://{s3_bucket}/project/", project_dir, replace=False)
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/",
)
assert len(keys) == 4

last_modified_result = {}

for key in keys:
obj = hook.get_key(
key,
s3_bucket,
)
last_modified_result[key] = obj.last_modified

for key, value in last_modified_result.items():
if key == f"s3://{s3_bucket}/project/seeds/a_seed.csv":
assert value > last_modified_expected[key]
else:
assert value == last_modified_expected[key]
74 changes: 74 additions & 0 deletions tests/operators/test_dbt_deps.py
@@ -1,9 +1,11 @@
"""Unit test module for DbtDepsOperator."""
import datetime as dt
import glob
import os
from pathlib import Path
from unittest.mock import patch

import freezegun
import pytest

from airflow_dbt_python.hooks.dbt import DepsTaskConfig
@@ -199,3 +201,75 @@ def test_dbt_deps_doesnt_affect_non_package_files(
assert (
last_modified < os.stat(_file).st_mtime
), f"DbtDepsOperator did not change a package file: {_file}"


@no_s3_hook
def test_dbt_deps_push_to_s3_with_no_replace(
s3_bucket,
profiles_file,
dbt_project_file,
packages_file,
):
"""Test execution of DbtDepsOperator with a push to S3 at the end but with replace_on_push = False.
We expect dbt_packages to be pushed (since they don't exist yet) but the rest of the project
files should not be replaced.
"""
hook = DbtS3Hook()
bucket = hook.get_bucket(s3_bucket)

project_files = (dbt_project_file, profiles_file, packages_file)
with freezegun.freeze_time("2022-01-01"):
for _file in project_files:
with open(_file) as pf:
content = pf.read()
bucket.put_object(Key=f"project/{_file.name}", Body=content.encode())

# Ensure we are working with an empty dbt_packages dir in S3.
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/dbt_packages/",
)
if keys is not None and len(keys) > 0:
hook.delete_objects(
s3_bucket,
keys,
)
keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/dbt_packages/",
)
assert keys is None or len(keys) == 0

with freezegun.freeze_time("2022-02-02"):
op = DbtDepsOperator(
task_id="dbt_task",
project_dir=f"s3://{s3_bucket}/project/",
profiles_dir=f"s3://{s3_bucket}/project/",
push_dbt_project=True,
replace_on_push=False,
)

results = op.execute({})
assert results is None

keys = hook.list_keys(
s3_bucket,
f"s3://{s3_bucket}/project/dbt_packages/",
)
assert len(keys) > 0
# dbt_utils files may be anything, so let's just check that at least one key
# contains "dbt_utils".
assert len([k for k in keys if "dbt_utils" in k]) > 0

file_names = {f.name for f in project_files}
for key in keys:
obj = hook.get_key(
key,
s3_bucket,
)

if Path(key).name in file_names:
assert obj.last_modified == dt.datetime(2022, 1, 1, tzinfo=dt.timezone.utc)
else:
assert obj.last_modified == dt.datetime(2022, 2, 2, tzinfo=dt.timezone.utc)
