Skip to content

Commit

Permalink
Use paginated index queries in message store export HTTP API.
Browse files Browse the repository at this point in the history
  • Loading branch information
jerith committed Sep 11, 2014
1 parent 642dabe commit d3a7005
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 23 deletions.
64 changes: 41 additions & 23 deletions vumi/components/message_store_resource.py
Expand Up @@ -27,7 +27,6 @@ def chunks(l, n):
class MessageStoreProxyResource(Resource):

isLeaf = True
default_chunk_size = 10
default_concurrency = 10

def __init__(self, message_store, batch_id):
Expand All @@ -39,40 +38,59 @@ def render_GET(self, request):
resp_headers = request.responseHeaders
resp_headers.addRawHeader(
'Content-Type', 'application/json; charset=utf-8')
if 'chunk_size' in request.args:
chunk_size = int(request.args['chunk_size'][0])
else:
chunk_size = self.default_chunk_size

if 'concurrency' in request.args:
concurrency = int(request.args['concurrency'][0])
else:
concurrency = self.default_concurrency

d = self.get_keys(self.message_store, self.batch_id)
d.addCallback(lambda keys: list(chunks(keys, chunk_size)))
d.addCallback(self.fetch_chunks, concurrency, request)
d = self.get_keys_page(self.message_store, self.batch_id)
d.addCallback(self.fetch_pages, concurrency, request)
return NOT_DONE_YET

def get_keys(self, message_store, batch_id):
def get_keys_page(self, message_store, batch_id):
    """
    Look up the initial page of message keys for ``batch_id``.

    Abstract hook: this base implementation does nothing except raise
    ``NotImplementedError``, so a sub-class has to provide the lookup.
    """
    raise NotImplementedError('To be implemented by sub-class.')

def get_message(self, message_store, message_id):
    """
    Look up a single message by ``message_id``.

    Abstract hook: this base implementation does nothing except raise
    ``NotImplementedError``, so a sub-class has to provide the lookup.
    """
    raise NotImplementedError('To be implemented by sub-class.')

def fetch_pages(self, keys_page, concurrency, request):
    """
    Handle ``keys_page`` and then every page that follows it.

    The page's keys are given to :meth:`fetch_page`. When the page reports
    a successor, the successor is requested straight away (so it loads
    while the current page is still being written out) and this method is
    chained on to receive it once the current page is done. When there is
    no successor, the request is finished instead.

    Returns the deferred for the current page with those callbacks added.
    """
    page_done = self.fetch_page(keys_page, concurrency, request)

    if not keys_page.has_next_page():
        # Last page: nothing left to chain, so just close the request.
        page_done.addCallback(lambda _: request.finish())
        return page_done

    # Kick off the next index query now rather than after this page.
    upcoming_page = keys_page.next_page()
    page_done.addCallback(lambda _: upcoming_page)
    # Chaining through the deferred gives the effect of recursion without
    # growing the call stack for every page.
    page_done.addCallback(self.fetch_pages, concurrency, request)
    return page_done

@inlineCallbacks
def fetch_chunks(self, chunked_keys, concurrency, request):
    """
    Work through ``chunked_keys`` a slice of ``concurrency`` chunks at a time.

    Each slice is passed to :meth:`handle_chunks` and waited on before the
    next slice is taken. The request is finished once nothing remains.
    """
    remaining = chunked_keys
    while remaining:
        block = remaining[:concurrency]
        remaining = remaining[concurrency:]
        yield self.handle_chunks(block, request)
    request.finish()

def handle_chunks(self, chunks, request):
    """
    Pass every chunk to :meth:`handle_chunk` and gather the results.

    Returns a ``DeferredList`` wrapping one entry per chunk.
    """
    pending = []
    for chunk in chunks:
        pending.append(self.handle_chunk(chunk, request))
    return DeferredList(pending)
def fetch_page(self, keys_page, concurrency, request):
    """
    Work through a single page of keys, one chunk after another.

    The page is turned into a list and split by the ``chunks`` helper with
    ``concurrency`` as its size argument. Each chunk goes to
    :meth:`handle_chunk`, and that call's result is yielded before the
    next chunk is started.
    """
    page_keys = list(keys_page)
    for key_group in chunks(page_keys, concurrency):
        yield self.handle_chunk(key_group, request)

def handle_chunk(self, message_keys, request):
    """
    Start handling every key in the chunk and gather the results.

    Each key is passed to :meth:`handle_message` along with the request;
    the returned ``DeferredList`` wraps all of those calls.
    """
    pending = [self.handle_message(key, request) for key in message_keys]
    return DeferredList(pending)

Expand All @@ -88,17 +106,17 @@ def write_message(self, message, request):

class InboundResource(MessageStoreProxyResource):
    """Proxy resource wired to the message store's inbound lookups."""

    def get_keys(self, message_store, batch_id):
        """Delegate to ``message_store.batch_inbound_keys``."""
        inbound_keys = message_store.batch_inbound_keys(batch_id)
        return inbound_keys

    def get_keys_page(self, message_store, batch_id):
        """Delegate to ``message_store.batch_inbound_keys_page``."""
        inbound_page = message_store.batch_inbound_keys_page(batch_id)
        return inbound_page

    def get_message(self, message_store, message_id):
        """Delegate to ``message_store.get_inbound_message``."""
        inbound_message = message_store.get_inbound_message(message_id)
        return inbound_message


class OutboundResource(MessageStoreProxyResource):
    """Proxy resource wired to the message store's outbound lookups."""

    def get_keys(self, message_store, batch_id):
        """Delegate to ``message_store.batch_outbound_keys``."""
        outbound_keys = message_store.batch_outbound_keys(batch_id)
        return outbound_keys

    def get_keys_page(self, message_store, batch_id):
        """Delegate to ``message_store.batch_outbound_keys_page``."""
        outbound_page = message_store.batch_outbound_keys_page(batch_id)
        return outbound_page

    def get_message(self, message_store, message_id):
        """Delegate to ``message_store.get_outbound_message``."""
        outbound_message = message_store.get_outbound_message(message_id)
        return outbound_message
Expand Down
13 changes: 13 additions & 0 deletions vumi/components/tests/test_message_store_resource.py
Expand Up @@ -92,3 +92,16 @@ def test_get_outbound(self):
self.assertEqual(
set([msg['message_id'] for msg in messages]),
set([msg1['message_id'], msg2['message_id']]))

@inlineCallbacks
def test_get_inbound_multiple_pages(self):
    # Force the store's result limit down to 1 so that two messages cannot
    # be served from one page -- presumably DEFAULT_MAX_RESULTS is the
    # index page size; confirm against the message store.
    self.store.DEFAULT_MAX_RESULTS = 1
    batch_id = yield self.make_batch(('foo', 'bar'))
    msg1 = yield self.make_inbound(batch_id, 'føø')
    msg2 = yield self.make_inbound(batch_id, 'føø')
    resp = yield self.make_request('GET', batch_id, 'inbound.json')
    # The body is parsed as one JSON document per line; empty lines are
    # dropped before decoding.
    body_lines = [
        line for line in resp.delivered_body.split('\n') if line]
    messages = [json.loads(line) for line in body_lines]
    received_ids = set([msg['message_id'] for msg in messages])
    expected_ids = set([msg1['message_id'], msg2['message_id']])
    self.assertEqual(received_ids, expected_ids)

0 comments on commit d3a7005

Please sign in to comment.