Skip to content

Commit

Permalink
Expose correct version_id in S3 event message
Browse files Browse the repository at this point in the history
  • Loading branch information
cburgmer authored and whummer committed Apr 13, 2019
1 parent 28ead93 commit 251aefa
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 6 deletions.
12 changes: 7 additions & 5 deletions localstack/services/s3/s3_listener.py
Expand Up @@ -78,7 +78,7 @@ def prefix_with_slash(s):
return s if s[0] == '/' else '/%s' % s


def get_event_message(event_name, bucket_name, file_name='testfile.txt', file_size=1024):
def get_event_message(event_name, bucket_name, file_name='testfile.txt', version_id=None, file_size=1024):
# Based on: http://docs.aws.amazon.com/AmazonS3/latest/dev/notification-content-structure.html
return {
'Records': [{
Expand Down Expand Up @@ -111,7 +111,7 @@ def get_event_message(event_name, bucket_name, file_name='testfile.txt', file_si
'key': file_name,
'size': file_size,
'eTag': 'd41d8cd98f00b204e9800998ecf8427e',
'versionId': '096fKKXTRTtl3on89fVO.nfljtsv6qko',
'versionId': version_id,
'sequencer': '0055AED6DCD90281E5'
}
}
Expand All @@ -126,7 +126,7 @@ def queue_url_for_arn(queue_arn):
QueueOwnerAWSAccountId=parts[4])['QueueUrl']


def send_notifications(method, bucket_name, object_path):
def send_notifications(method, bucket_name, object_path, version_id):
for bucket, b_cfg in iteritems(S3_NOTIFICATIONS):
if bucket == bucket_name:
action = {'PUT': 'ObjectCreated', 'POST': 'ObjectCreated', 'DELETE': 'ObjectRemoved'}[method]
Expand All @@ -143,7 +143,8 @@ def send_notifications(method, bucket_name, object_path):
# send notification
message = get_event_message(
event_name=event_name, bucket_name=bucket_name,
file_name=urlparse.urlparse(object_path[1:]).path
file_name=urlparse.urlparse(object_path[1:]).path,
version_id=version_id
)
message = json.dumps(message)
if b_cfg.get('Queue'):
Expand Down Expand Up @@ -547,8 +548,9 @@ def return_response(self, method, path, data, headers, response):
else:
parts = parsed.path[1:].split('/', 1)
object_path = parts[1] if parts[1][0] == '/' else '/%s' % parts[1]
version_id = response.headers.get('x-amz-version-id', None)

send_notifications(method, bucket_name, object_path)
send_notifications(method, bucket_name, object_path, version_id)

# publish event for creation/deletion of buckets:
if method in ('PUT', 'DELETE') and ('/' not in path[1:] or len(path[1:].split('/')[1]) <= 0):
Expand Down
12 changes: 11 additions & 1 deletion tests/integration/test_s3.py
Expand Up @@ -57,13 +57,15 @@ def test_s3_put_object_notification(self):

# create test bucket
self.s3_client.create_bucket(Bucket=TEST_BUCKET_WITH_NOTIFICATION)
self.s3_client.put_bucket_versioning(Bucket=TEST_BUCKET_WITH_NOTIFICATION,
VersioningConfiguration={'Status': 'Enabled'})
self.s3_client.put_bucket_notification_configuration(Bucket=TEST_BUCKET_WITH_NOTIFICATION,
NotificationConfiguration={'QueueConfigurations': [
{'QueueArn': queue_attributes['Attributes']['QueueArn'],
'Events': ['s3:ObjectCreated:*']}]})

# put an object where the bucket_name is in the path
self.s3_client.put_object(Bucket=TEST_BUCKET_WITH_NOTIFICATION, Key=key_by_path, Body='something')
obj = self.s3_client.put_object(Bucket=TEST_BUCKET_WITH_NOTIFICATION, Key=key_by_path, Body='something')

# put an object where the bucket_name is in the host
# it doesn't care about the authorization header as long as it's present
Expand All @@ -79,7 +81,15 @@ def test_s3_put_object_notification(self):
# the ApproximateNumberOfMessages attribute is a string
self.assertEqual(message_count, '2')

response = self.sqs_client.receive_message(QueueUrl=queue_url)
messages = [json.loads(to_str(m['Body'])) for m in response['Messages']]
record = messages[0]['Records'][0]
self.assertIsNotNone(record['s3']['object']['versionId'])
self.assertEquals(record['s3']['object']['versionId'], obj['VersionId'])

# clean up
self.s3_client.put_bucket_versioning(Bucket=TEST_BUCKET_WITH_NOTIFICATION,
VersioningConfiguration={'Status': 'Disabled'})
self.sqs_client.delete_queue(QueueUrl=queue_url)
self.s3_client.delete_objects(Bucket=TEST_BUCKET_WITH_NOTIFICATION,
Delete={'Objects': [{'Key': key_by_path}, {'Key': key_by_host}]})
Expand Down

0 comments on commit 251aefa

Please sign in to comment.