Skip to content

Commit

Permalink
Respect file extension setting for S3 destinations in Firehose provider
Browse files Browse the repository at this point in the history
The Firehose delivery stream implementation ignored the `FileExtension`
setting when the delivery target was an S3 bucket. This fix ensures that
the configured file extension is appended to the S3 object key when data
is written. The `_get_s3_object_path` method now accepts a
`file_extension` parameter to support this.
  • Loading branch information
ozgenbaris1 authored and pinzon committed May 22, 2024
1 parent d50eec0 commit dd35352
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 2 deletions.
9 changes: 7 additions & 2 deletions localstack/services/firehose/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,6 +812,7 @@ def _put_records_to_s3_bucket(
):
bucket = s3_bucket_name(s3_destination_description["BucketARN"])
prefix = s3_destination_description.get("Prefix", "")
file_extension = s3_destination_description.get("FileExtension", "")

if role_arn := s3_destination_description.get("RoleARN"):
factory = connect_to.with_assumed_role(
Expand All @@ -824,15 +825,15 @@ def _put_records_to_s3_bucket(
)
batched_data = b"".join([base64.b64decode(r.get("Data") or r.get("data")) for r in records])

obj_path = self._get_s3_object_path(stream_name, prefix)
obj_path = self._get_s3_object_path(stream_name, prefix, file_extension)
try:
LOG.debug("Publishing to S3 destination: %s. Data: %s", bucket, batched_data)
s3.put_object(Bucket=bucket, Key=obj_path, Body=batched_data)
except Exception as e:
LOG.exception(f"Unable to put records {records} to s3 bucket.")
raise e

def _get_s3_object_path(self, stream_name, prefix):
def _get_s3_object_path(self, stream_name, prefix, file_extension):
# See https://aws.amazon.com/kinesis/data-firehose/faqs/#Data_delivery
# Path prefix pattern: myApp/YYYY/MM/DD/HH/
# Object name pattern: DeliveryStreamName-DeliveryStreamVersion-YYYY-MM-DD-HH-MM-SS-RandomString
Expand All @@ -841,6 +842,10 @@ def _get_s3_object_path(self, stream_name, prefix):
pattern = "{pre}%Y/%m/%d/%H/{name}-%Y-%m-%d-%H-%M-%S-{rand}"
path = pattern.format(pre=prefix, name=stream_name, rand=str(uuid.uuid4()))
path = timestamp(format=path)

if file_extension:
path += file_extension

return path

def _put_to_redshift(
Expand Down
20 changes: 20 additions & 0 deletions tests/aws/services/firehose/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,26 @@ def _get_data():
return _read_s3_data


@pytest.fixture
def list_s3_objects(aws_client):
    """Fixture factory: poll an S3 bucket until it has objects, then return their keys."""
    s3_client = aws_client.s3

    def _list_s3_objects(bucket_name: str, timeout: int = 10):
        # Inner poller: raise while the bucket is still empty so `retry`
        # keeps attempting until an object appears or attempts run out.
        def _fetch_keys() -> list[str]:
            listing = s3_client.list_objects_v2(Bucket=bucket_name)
            contents = listing.get("Contents")
            if contents is None:
                raise Exception("No objects in bucket yet")
            return [entry.get("Key") for entry in contents]

        # one attempt per second, up to `timeout` attempts
        return retry(_fetch_keys, sleep=1, retries=timeout)

    return _list_s3_objects


def get_firehose_iam_documents(
bucket_arns: Union[List[str], str], stream_arns: Union[List[str], str]
) -> tuple[dict, dict]:
Expand Down
41 changes: 41 additions & 0 deletions tests/aws/services/firehose/test_firehose.py
Original file line number Diff line number Diff line change
Expand Up @@ -532,3 +532,44 @@ def test_kinesis_firehose_kinesis_as_source_multiple_delivery_streams(
]
)
snapshot.match("kinesis-event-stream-multiple-delivery-streams", s3_data)

@markers.aws.unknown
def test_kinesis_firehose_s3_as_destination_with_file_extension(
self,
s3_bucket,
aws_client,
account_id,
firehose_create_delivery_stream,
list_s3_objects,
):
# Verify that a DirectPut delivery stream appends the configured
# FileExtension to the key of the S3 object it writes.
bucket_arn = arns.s3_bucket_arn(s3_bucket)
# NOTE(review): the role is referenced by ARN only and not created in this
# test — presumably LocalStack does not validate it; confirm before running
# this against real AWS.
role_arn = f"arn:aws:iam::{account_id}:role/Firehose-Role"
delivery_stream_name = f"test-delivery-stream-{short_uid()}"
file_extension = ".txt"

# create a DirectPut delivery stream targeting the S3 bucket, with the
# file extension under test configured on the extended S3 destination
firehose_create_delivery_stream(
DeliveryStreamName=delivery_stream_name,
DeliveryStreamType="DirectPut",
ExtendedS3DestinationConfiguration={
"BucketARN": bucket_arn,
"RoleARN": role_arn,
"FileExtension": file_extension,
"ErrorOutputPrefix": "errors",
},
)

# prepare sample message (Firehose record data is base64-encoded)
data = base64.b64encode(TEST_MESSAGE.encode())
record = {"Data": data}

# put message to delivery stream
aws_client.firehose.put_record(
DeliveryStreamName=delivery_stream_name,
Record=record,
)

# poll until the delivery stream has flushed at least one object to S3
s3_objects = list_s3_objects(s3_bucket, timeout=300)

s3_object = s3_objects[0]

# the delivered object's key must end with the configured extension
assert s3_object.endswith(file_extension)

0 comments on commit dd35352

Please sign in to comment.