Skip to content

Commit

Permalink
fix(s3): Split bucket name and key before uploading
Browse files Browse the repository at this point in the history
S3 files were uploaded to incorrect keys when running Airflow 2. This
was caused by differences between Airflow 1.10 and Airflow 2: the
latter assumes that when both bucket_name and key are passed, the key
is to be taken as-is, whereas the former seemed to work even when
the key contained the full URL.

Now, we do the parsing and splitting ourselves. We would simply set
bucket_name to None; however, with Airflow 2.0 the upload function
checks for the presence of the bucket_name argument — not whether it
is set to None — to decide whether to parse the URL (I think this may be a bug).
  • Loading branch information
tomasfarias committed Feb 23, 2022
1 parent 6c20997 commit ba8c611
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 7 deletions.
10 changes: 4 additions & 6 deletions airflow_dbt_python/hooks/backends/s3.py
Expand Up @@ -92,12 +92,9 @@ def push_one(
name and key prefix will be extracted by calling S3Hook.parse_s3_url.
replace (bool): Whether to replace existing files or not.
"""
bucket_name, key = self.hook.parse_s3_url(str(destination))

self.load_file_handle_replace_error(
Path(source),
key=key,
bucket_name=bucket_name,
key=str(destination),
replace=replace,
)

Expand Down Expand Up @@ -134,7 +131,6 @@ def push_many(
self.load_file_handle_replace_error(
Path(zip_path),
key=str(destination),
bucket_name=bucket_name,
replace=replace,
)

Expand All @@ -148,7 +144,6 @@ def push_many(
self.load_file_handle_replace_error(
_file,
key=s3_key,
bucket_name=bucket_name,
replace=replace,
)

Expand Down Expand Up @@ -234,6 +229,9 @@ def load_file_handle_replace_error(
"""
success = True

if bucket_name is None:
bucket_name, key = self.hook.parse_s3_url(key)

self.log.info("Loading file %s to S3: %s", file_path, key)
try:
self.hook.load_file(
Expand Down
5 changes: 4 additions & 1 deletion tests/hooks/dbt/backends/test_dbt_s3_backend.py
Expand Up @@ -280,10 +280,13 @@ def test_push_dbt_project_to_zip_file(s3_bucket, s3_hook, tmpdir, test_files):
keys = s3_hook.list_keys(s3_bucket, f"s3://{s3_bucket}/project/")

key = s3_hook.check_for_key(
zip_s3_key,
"project/project.zip",
s3_bucket,
)
keys = s3_hook.list_keys(bucket_name=s3_bucket)

assert key is True
assert "project/project.zip" in keys


def test_push_dbt_project_to_files(s3_bucket, s3_hook, tmpdir, test_files):
Expand Down

0 comments on commit ba8c611

Please sign in to comment.