add s3collection_marker_each() helper for s3 consumers #3
Conversation
97f1b63 to 32ae8ce
return s3collection_marker_each_cur(collection, cursor)

def s3collection_marker_json(collection, path):
sirsgriffin
Sep 15, 2016
Contributor
Probably good to be able to set a property via the S3Consumer class + decorator that allows us to elect to feed the consumer a string (default ... because everything is treating this as a string already) or JSON. Would be nice to write new consumers using JSON.
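A hypothetical sketch of what that property could look like. The decorator name, the `payload_format` parameter, and the `deliver` hook are all assumptions for illustration, not Muskrat's actual API:

```python
import json

def consumer(routing_key, payload_format="string"):
    """Hypothetical decorator: lets a consumer elect to receive the raw
    S3 body as a string (the current default) or as parsed JSON."""
    def wrap(fn):
        def deliver(raw_bytes):
            # Assumption: bodies are UTF-8 encoded text
            text = raw_bytes.decode("utf-8")
            payload = json.loads(text) if payload_format == "json" else text
            return fn(payload)
        fn.deliver = deliver  # the consumer framework would call this hook
        return fn
    return wrap

@consumer("analytics.events", payload_format="json")
def handle(event):
    return event["type"]
```

A consumer written this way gets a dict instead of a raw string, while existing string-based consumers keep the default behavior.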
collection = collection.filter(Marker=marker)

for obj in collection:
    yield obj
sirsgriffin
Sep 15, 2016
Contributor
As long as we're working on Muskrat it would be nice to feed optional metadata into the consumer. Namely, the cursor is not available to the consumer except by inspecting the consumer property that is added to the function. Even then it only corresponds to the last cursor that was written. Would definitely be nice to have this in a few cases. The main one is, of course, logging.
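One hedged sketch of how optional metadata could be fed to the consumer. The `each_with_meta` helper and the in-memory cursor below are illustrative assumptions, not existing Muskrat code:

```python
class S3Cursor:
    # Minimal in-memory stand-in: the real cursor presumably persists
    # the marker somewhere durable.
    def __init__(self, name):
        self.name = name
        self.marker = None

    def update(self, key):
        self.marker = key

    def each_with_meta(self, collection):
        # Hypothetical helper: yield each object together with the cursor
        # position that was current *before* processing it, e.g. for logging.
        for obj in collection:
            meta = {"cursor": self.marker, "consumer": self.name}
            yield obj, meta
            self.update(obj.key)
```

The consumer then sees the cursor position per message instead of having to inspect a property that only reflects the last write.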
ender672
Sep 15, 2016
Author
Member
Thanks, you got me thinking about how to make cursor behavior extensible.
I'm experimenting with directly using S3Cursor in the consumer.
I like the S3Cursor class, and now I'm thinking this functionality that I had as standalone methods, e.g. s3collection_marker_each() should really be instance methods of S3Cursor:
class S3Cursor():
    def filter_collection(self, collection):
        marker = self.get()
        if marker:
            collection = collection.filter(Marker=marker)
        return collection

    def each(self, collection):
        for obj in self.filter_collection(collection):
            yield obj
            self.update(obj.key)
This would highlight S3Cursor as the place for consumers to customize cursor behavior.
Here is a sample vanilla consumer:
for obj in S3Cursor('MyName').each(bucket_objects):
    my_handler(obj)
Here is a consumer that wants to log cursor errors:
cursor = S3Cursor('MyName')
for obj in cursor.filter_collection(bucket_objects):
    my_handler(obj)
    try:
        cursor.update(obj.key)
    except IOError as e:
        log_cursor_error(e)
This extends S3Cursor with boto3 collection helpers.
s3cursor.filter_collection(coll) - returns a new collection with marker
s3cursor.persist_progress(coll) - updates marker after each iteration
s3cursor.each(coll) - uses the above helpers to iterate collections
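Since persist_progress() isn't shown in the snippet above, here is one plausible sketch of how the three helpers could fit together. The in-memory marker storage is a stand-in for whatever the real cursor persists to:

```python
class S3Cursor:
    def __init__(self, name, marker=None):
        self.name = name
        self._marker = marker  # in-memory stand-in for the persisted marker

    def get(self):
        return self._marker

    def update(self, key):
        self._marker = key

    def filter_collection(self, collection):
        # Return a new collection starting after the stored marker,
        # using boto3-style chainable filtering.
        marker = self.get()
        if marker:
            collection = collection.filter(Marker=marker)
        return collection

    def persist_progress(self, collection):
        # Update the marker after each yielded object.
        for obj in collection:
            yield obj
            self.update(obj.key)

    def each(self, collection):
        # Compose the two helpers, matching the description above.
        return self.persist_progress(self.filter_collection(collection))
```

With this composition, each() is just "skip past the marker, then record progress as you go", and a consumer that needs custom error handling can call the two pieces separately.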
Here is a sample vanilla consumer:
for obj in S3Cursor('MyName').each(bucket_objects):
    my_handler(obj)
The motivation behind this helper is to leverage boto3 collections,
which allow chaining.
This makes it possible to write consumers that control the s3 connection
details. It is also an experiment in creating an alternate API for
consumers.
Also, boto3 already lazy-loads s3 connections, so there is no longer a
need to lazy-load connections & buckets.
I confirmed that this works with the following script:
#!/usr/bin/env python
import boto3
s3 = boto3.resource('s3')
bucket = s3.Bucket('internal_analytics_test')
collection = bucket.objects.filter(Prefix='MUSKRAT')
Running the above with my network cable unplugged works fine -- boto3
doesn't make any connections until you actually list bucket contents or
fetch an object.
More details on boto3 buckets here:
https://boto3.readthedocs.io/en/latest/guide/migrations3.html#accessing-a-bucket
540956e to 698660e
Hi Scott, can you take a look at the latest version? I reduced the scope of this PR to the following two additions to the Muskrat API: S3Cursor.at_path() and s3cursor.each(collection)
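at_path() isn't shown anywhere in this thread, so the following is only one plausible reading of it, sketched as a file-backed cursor. The real Muskrat signature and storage may well differ:

```python
import os

class S3Cursor:
    # Sketch only: assumes at_path() builds a cursor whose marker is
    # stored in a local file at the given path.
    def __init__(self, name, path=None):
        self.name = name
        self._path = path

    @classmethod
    def at_path(cls, path):
        # Hypothetical alternate constructor named after the API above.
        return cls(name=os.path.basename(path), path=path)

    def get(self):
        # No marker yet if the file doesn't exist or is empty.
        if self._path and os.path.exists(self._path):
            with open(self._path) as f:
                return f.read().strip() or None
        return None

    def update(self, key):
        with open(self._path, "w") as f:
            f.write(key)
```

Under this reading, at_path() just decouples cursor storage location from the consumer's name, which fits the PR's theme of letting consumers control their own plumbing.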
self._cursor.update( msg.name )

for obj in self._cursor.each(msg_iterator):
    self.callback(obj.get()['Body'].read())
sirsgriffin
Sep 21, 2016
Contributor
What is the behavior of this if the obj/key is not a message? If this key points to something that is considered a namespace (something like a 'sub directory'; not sure what to call it in S3), then what happens?
ender672
Sep 21, 2016
Author
Member
I just created a test for this. Just to be sure that I have it right:
Given that you are monitoring the prefix "FOO/BAR/".
And the bucket is empty.
When you add object A with key "FOO/BAR/2016-09-21T13:53:23.594894"
And you add object B with key "FOO/BAR/BAZ/2016-09-21T13:54:37.164853"
And the muskrat consumer is invoked
Then we should process object A
And we should not process object B
Is this the right test? If so, the implementation in this PR fails -- it processes both A and B.
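One way to implement the behavior this test expects is to accept only keys with no further '/' beyond the monitored prefix. The is_direct_child helper is hypothetical, not part of the PR:

```python
def is_direct_child(key, prefix):
    """Hypothetical filter: True only for keys directly under `prefix`,
    i.e. with no additional '/' (no 'sub directory' levels) beyond it."""
    if not key.startswith(prefix):
        return False
    return "/" not in key[len(prefix):]
```

Applied to the scenario above, object A passes and object B is skipped, which would make the failing case pass.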
self._cursor.update( msg.name )

for obj in self._cursor.each(msg_iterator):
    self.callback(obj.get()['Body'].read())
ender672
Sep 21, 2016
Author
Member
Is satisfying this test the behavior that we want?
When a consumer is monitoring "FOO/BAR" do we want to ignore "FOO/BAR/BAZ/2016-09-21T13:54:37.164853" ?
ender672
Sep 21, 2016
Author
Member
One more thing to point out -- this PR doesn't use the "delimiter" parameter when listing S3 entries, so the collection shouldn't yield prefix objects.
This maintains the existing behavior, and discourages the use of top-level prefixes, which wouldn't work anyway because it would break timestamp ordering.

test_s3consumer.py - add test for muskrat entry w/ extra levels
The motivation behind this helper is to leverage boto3 collections,
which allow chaining.
This makes it possible to write consumers that control the s3 connection
details. It is also an experiment in creating an alternate API for
consumers.
Usage example would be:
s3 = boto3.resource('s3')
collection = s3.Bucket(BUCKET).objects.filter(Prefix=PREFIX)
for obj in s3collection_marker_each(collection, MARKER_PATH):
    print(obj.get()['Body'].read())
The above is essentially the same as:
def process(obj):
    print(obj)

c = S3Consumer(ROUTING_KEY, process, NAME)
c.consume()
Which is equivalent to:
@consumer(ROUTING_KEY)
def process(obj):
    print(obj)

process.consumer.consume()