Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add s3collection_marker_each() helper for s3 consumers #3

Merged
merged 2 commits into from Sep 26, 2016
Merged
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file
Failed to load files.

Always

Just for now

Next

s3consumer.py - use boto3, add boto3 collection helpers

This extends S3Cursor with boto3 collection helpers.

s3cursor.filter_collection(coll) - returns a new collection with marker
s3cursor.persist_progress(coll) - updates marker after each iteration
s3cursor.each(coll) - uses the above helpers to iterate collections

Here is a sample vanilla consumer:

  for obj in S3Cursor('MyName').each(bucket_objects):
      my_handler(obj)

The motivation behind this helper is to leverage boto3 collections,
which allow chaining.

This makes it possible to write consumers that control the s3 connection
details. It is also an experiment in creating an alternate API for
consumers.

Also, boto3 already lazy-loads s3 connections, so there is no longer a
need to lazy-load connections & buckets.

I confirmed that this works with the following script:

  #!/usr/bin/env python

  import boto3

  s3 = boto3.resource('s3')
  bucket = s3.Bucket('internal_analytics_test')
  collection = bucket.objects.filter(Prefix='MUSKRAT')

Running the above with my network cable unplugged works fine -- boto3
doesn't make any connections until you actually list bucket contents or
fetch an object.

more details on boto3 buckets here:

https://boto3.readthedocs.io/en/latest/guide/migrations3.html#accessing-a-bucket
  • Loading branch information
ender672 committed Sep 19, 2016
commit 698660e98c5cc5c8092bc2689ebd000d178169f0
@@ -10,7 +10,7 @@
import os
import time

import boto
import boto3
from muskrat.util import config_loader

class S3Cursor(object):
@@ -40,6 +40,11 @@ def __init__(self, name, type, **kwargs ):
else:
raise NotImplementedError('File cursor types currently the only types supported')

@classmethod
def at_path(cls, path):
    """Create a file-backed cursor persisted at the given filesystem *path*."""
    # os.path.split yields (dirname, basename) in one call.
    location, name = os.path.split(path)
    return cls(name, 'file', location=location)

def _update_file_cursor( self, key ):
#instead of opening and re-opening we could just seek and truncate
@@ -67,14 +72,29 @@ def update( self, key ):
def get(self):
    """Return the currently persisted cursor value."""
    getter = self._get_func
    return getter()

def filter_collection(self, collection):
    """Low-level helper: narrow an s3 object collection using the saved marker.

    Returns the collection unchanged when no marker has been persisted yet.
    """
    marker = self.get()
    return collection.filter(Marker=marker) if marker else collection

def persist_progress(self, collection):
    """Yield each object in *collection*, persisting the cursor as we go.

    The marker is written only after the consumer resumes iteration, i.e.
    after it has finished handling the yielded object.
    """
    for item in collection:
        yield item
        self.update(item.key)

def each(self, collection):
    """Iterate *collection* starting at the saved marker, persisting progress."""
    filtered = self.filter_collection(collection)
    return self.persist_progress(filtered)


class S3Consumer(object):

def __init__(self, routing_key, func, name=None, config='config.py'):

self.config = config_loader( config )
self._s3conn = None
self._bucket = None
self.routing_key = routing_key.upper()
self.callback = func

@@ -92,15 +112,15 @@ def __init__(self, routing_key, func, name=None, config='config.py'):

@property
def s3conn(self):
    """A boto3 S3 resource built from the configured credentials.

    boto3 resources are lazy: no network traffic happens until a request
    is actually issued, so no caching/lazy-loading is needed here.
    """
    credentials = {
        'aws_access_key_id': self.config.s3_key,
        'aws_secret_access_key': self.config.s3_secret,
    }
    return boto3.resource('s3', **credentials)

@property
def bucket(self):
    """The configured S3 bucket, resolved through the lazy s3conn resource."""
    bucket_name = self.config.s3_bucket
    return self.s3conn.Bucket(bucket_name)

def _gen_name(self, func):
""" Generates a cursor name so that the cursor can be re-attached to """
@@ -111,11 +131,8 @@ def _gen_routing_key( self, routing_key ):

def _get_msg_iterator(self):
#If marker is not matched to a key then the returned list is none.
msg_iterator = self.bucket.list(
prefix=self._gen_routing_key( self.routing_key ) + '/',
delimiter= '/',
marker=self._cursor.get()
)
prefix = self._gen_routing_key(self.routing_key) + '/'
msg_iterator = self.bucket.objects.filter(Prefix=prefix)

return msg_iterator

@@ -125,12 +142,8 @@ def consume(self):
#Update: actually... this doesn't seem to be a problem...
msg_iterator = self._get_msg_iterator()

for msg in msg_iterator:
#Sub 'directories' are prefix objects, so ignore them
if isinstance( msg, boto.s3.key.Key ):
self.callback( msg.get_contents_as_string() )
self._cursor.update( msg.name )

for obj in self._cursor.each(msg_iterator):
self.callback(obj.get()['Body'].read())

This comment has been minimized.

@sirsgriffin

sirsgriffin Sep 21, 2016
Contributor

What is the behavior of this if the obj/key is not a message? If this key points to something that acts as a namespace (something like a 'sub-directory' — I'm not sure what to call it in S3), then what happens?

This comment has been minimized.

@ender672

ender672 Sep 21, 2016
Author Member

I just created a test for this. Just to be sure that I have it right:

Given that you are monitoring the prefix "FOO/BAR/".
And the bucket is empty.

When you add object A with key "FOO/BAR/2016-09-21T13:53:23.594894"
And you add object B with key "FOO/BAR/BAZ/2016-09-21T13:54:37.164853"
And the muskrat consumer is invoked

Then we should process object A
And we should not process object B

Is this the right test? If so, the implementation in this PR fails -- it processes both A and B.

This comment has been minimized.

@ender672

ender672 Sep 21, 2016
Author Member

This is fixed in 698660e

This comment has been minimized.

@ender672

ender672 Sep 21, 2016
Author Member

Is satisfying this test the behavior that we want?

When a consumer is monitoring "FOO/BAR" do we want to ignore "FOO/BAR/BAZ/2016-09-21T13:54:37.164853" ?

This comment has been minimized.

@ender672

ender672 Sep 21, 2016
Author Member

One more thing to point out -- this PR doesn't use the "delimiter" parameter when listing S3 entries, so the collection shouldn't yield prefix objects.

This comment has been minimized.

@ender672

ender672 Sep 21, 2016
Author Member

The test I mentioned is here:
ender672/muskrat@cca4b32


def consumption_loop( self, interval=2 ):
"""
@@ -157,14 +170,11 @@ class S3AggregateConsumer( S3Consumer ):
def consume(self):
    """Fetch every pending message and hand them to the callback as one batch.

    The cursor is advanced to the last object's key only after the whole
    batch has been delivered, so a failing callback leaves the cursor put.
    """
    listing = self._get_msg_iterator()
    pending = list(self._cursor.filter_collection(listing))
    messages = [obj.get()['Body'].read() for obj in pending]

    if messages:
        last_key = pending[-1].key
        self.callback(messages)
        self._cursor.update(last_key)

@@ -8,17 +8,30 @@
"""
import unittest
import os
import tempfile
import uuid
from datetime import datetime

import boto
import boto3

os.environ['MUSKRAT'] = 'TEST'
from ..producer import S3Producer
from ..s3consumer import S3Consumer, Consumer
from ..s3consumer import S3Consumer, Consumer, S3Cursor
from ..util import config_loader

config_path = 'config.py'
TEST_KEY_PREFIX = 'Muskrat.Consumer'

class TempCursorFile(object):
    """Context manager yielding the path of a temporary file, removed on exit.

    Matches the file's ``class X(object):`` convention; ``__exit__``
    parameters renamed so ``type`` no longer shadows the builtin.
    """

    def __enter__(self):
        # mkstemp opens the file; close the fd at once -- only the path is needed.
        fd, self.path = tempfile.mkstemp()
        os.close(fd)
        return self.path

    def __exit__(self, exc_type, exc_value, traceback):
        os.remove(self.path)

class TestS3ConsumerBase( unittest.TestCase ):

def setUp(self):
@@ -113,7 +126,62 @@ def decorated_consumer( msg ):
self.assertIsInstance( decorated_consumer.consumer, S3Consumer, 'Decorator did not correctly attach S3Consumer' )

decorated_consumer.consumer.consume()



class TestS3CollectionEach(unittest.TestCase):
    """End-to-end checks for S3Cursor.each() against a live S3 bucket."""

    def setUp(self):
        cfg = config_loader(config_path)
        self.time_format = cfg.s3_timestamp_format
        self.prefix = 'MUSKRAT/TEST/S3COLLECTIONEACH/'

        resource = boto3.resource(
            's3',
            aws_access_key_id=cfg.s3_key,
            aws_secret_access_key=cfg.s3_secret,
        )
        self.bucket = resource.Bucket(cfg.s3_bucket)

    def tearDown(self):
        # Clean out every object written under the test prefix.
        for obj in self.bucket.objects.filter(Prefix=self.prefix):
            obj.delete()

    def _add_message(self, message):
        # Keys are timestamped, so listing order matches insertion order.
        key = self.prefix + datetime.today().strftime(self.time_format)
        self.bucket.put_object(Key=key, Body=message)

    def test_s3collection_marker_each(self):
        """An s3collection iterator which persists the marker in a file."""
        collection = self.bucket.objects.filter(Prefix=self.prefix)

        with TempCursorFile() as path:
            cursor = S3Cursor.at_path(path)

            # Publish a single message.
            message = str(uuid.uuid4())
            self._add_message(message)

            # Drain the queue; validate both the message and the saved marker.
            seen = 0
            for obj in cursor.each(collection):
                seen += 1
                last_key = obj.key
                self.assertEqual(message, obj.get()['Body'].read())
            self.assertEqual(1, seen)
            with open(path, 'r') as f:
                self.assertEqual(last_key, f.read())

            # Publish two more messages.
            followups = [str(uuid.uuid4()), str(uuid.uuid4())]
            self._add_message(followups[0])
            self._add_message(followups[1])

            # Only the new messages should appear on the second pass.
            seen = 0
            for obj in cursor.each(collection):
                self.assertEqual(followups[seen], obj.get()['Body'].read())
                seen += 1
            self.assertEqual(2, seen)


if __name__ == '__main__':
    unittest.main()
@@ -1,3 +1,4 @@
pika==0.9.8
boto==2.7.0
boto3==1.4.0
simplejson==3.0.7
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.