Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add s3collection_marker_each() helper for s3 consumers #3

Merged
merged 2 commits into from Sep 26, 2016
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Prev

s3consumer.py - tell boto3 to use a delimiter

This maintains the existing behavior, and discourages the use of
top-level prefixes, which wouldn't work anyway because it would
break timestamp ordering.

test_s3consumer.py - add test for muskrat entry w/ extra levels
  • Loading branch information
ender672 committed Sep 21, 2016
commit bad422f7f73158bd4fe69fa27905b750c004d0d6
@@ -77,7 +77,7 @@ def filter_collection(self, collection):
marker = self.get()
if marker:
collection = collection.filter(Marker=marker)
return collection
return collection.filter(Delimiter='/')

def persist_progress(self, collection):
"""Iterates through a collection, maintaining a persistent cursor."""
@@ -182,6 +182,31 @@ def test_s3collection_marker_each(self):
counter += 1
self.assertEqual(2, counter)

def test_prefix_match_extra_levels(self):
collection = self.bucket.objects.filter(Prefix=self.prefix)

with TempCursorFile() as path:
cursor = S3Cursor.at_path(path)

# add a message to the queue
message1 = str(uuid.uuid4())
self._add_message(message1)

# add a message with extra levels
message2 = str(uuid.uuid4())
ts = datetime.today().strftime(self.time_format)
key = self.prefix + 'FOO/BAR/' + ts
self.bucket.put_object(Key=key, Body=message2)

# iterate over queue, validate message & marker
counter = 0
for obj in cursor.each(collection):
counter += 1
last_key = obj.key
self.assertEqual(message1, obj.get()['Body'].read())
self.assertEqual(1, counter)
with open(path, 'r') as f:
self.assertEqual(last_key, f.read())

if '__main__' == __name__:
unittest.main()
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.