diff --git a/vumi/components/message_store_resource.py b/vumi/components/message_store_resource.py index 9c8cb2ed8..5cf8ebcda 100644 --- a/vumi/components/message_store_resource.py +++ b/vumi/components/message_store_resource.py @@ -1,5 +1,7 @@ # -*- test-case-name: vumi.components.tests.test_message_store_resource -*- +from csv import writer + from twisted.application.internet import StreamServerEndpointService from twisted.internet.defer import DeferredList, inlineCallbacks from twisted.web.resource import NoResource, Resource @@ -24,20 +26,73 @@ def chunks(l, n): yield l[i:i + n] +class MessageFormatter(object): + """ Interface for writing messages to an HTTP request. """ + + def add_http_headers(self, request): + pass + + def write_row_header(self, request): + pass + + def write_row(self, request, message): + pass + + +class JsonFormatter(MessageFormatter): + """ Class for writing messages as JSON. """ + + def add_http_headers(self, request): + resp_headers = request.responseHeaders + resp_headers.addRawHeader( + 'Content-Type', 'application/json; charset=utf-8') + + def write_row(self, request, message): + request.write(message.to_json()) + request.write('\n') + + +class CsvFormatter(MessageFormatter): + """ Class for writing messages as CSV. """ + + FIELDS = ( + 'message_id', + 'to_addr', + 'from_addr', + 'in_reply_to', + 'session_event', + 'content', + 'group', + ) + + def add_http_headers(self, request): + resp_headers = request.responseHeaders + resp_headers.addRawHeader( + 'Content-Type', 'text/csv; charset=utf-8') + + def write_row_header(self, request): + writer(request).writerow(self.FIELDS) + + def write_row(self, request, message): + writer(request).writerow(list( + (message[key] or '').encode('utf-8') + for key in self.FIELDS)) + + class MessageStoreProxyResource(Resource): isLeaf = True default_concurrency = 10 - def __init__(self, message_store, batch_id): + def __init__(self, message_store, batch_id, formatter): Resource.__init__(self) self.message_store = message_store self.batch_id = batch_id + self.formatter = formatter def render_GET(self, request): - resp_headers = request.responseHeaders - resp_headers.addRawHeader( - 'Content-Type', 'application/json; charset=utf-8') + self.formatter.add_http_headers(request) + self.formatter.write_row_header(request) if 'concurrency' in request.args: concurrency = int(request.args['concurrency'][0]) @@ -100,8 +155,7 @@ def handle_message(self, message_key, request): return d def write_message(self, message, request): - request.write(message.to_json()) - request.write('\n') + self.formatter.write_row(request, message) class InboundResource(MessageStoreProxyResource): @@ -124,19 +178,24 @@ def get_message(self, message_store, message_id): class BatchResource(Resource): + RESOURCES = { + 'inbound.json': (InboundResource, JsonFormatter), + 'outbound.json': (OutboundResource, JsonFormatter), + 'inbound.csv': (InboundResource, CsvFormatter), + 'outbound.csv': (OutboundResource, CsvFormatter), + } + def __init__(self, message_store, batch_id): Resource.__init__(self) self.message_store = message_store self.batch_id = batch_id def getChild(self, path, request): - resource_class = { - 'inbound.json': InboundResource, - 'outbound.json': OutboundResource, - }.get(path) - if resource_class is None: + if path not in self.RESOURCES: return NoResource() - return resource_class(self.message_store, self.batch_id) + resource_class, message_formatter = self.RESOURCES.get(path) + return resource_class( + self.message_store, self.batch_id, message_formatter()) class MessageStoreResource(Resource): diff --git a/vumi/components/tests/test_message_store_resource.py b/vumi/components/tests/test_message_store_resource.py index 341ba81bc..6109c1b23 100644 --- a/vumi/components/tests/test_message_store_resource.py +++ b/vumi/components/tests/test_message_store_resource.py @@ -81,6 +81,22 @@ def test_get_inbound(self): set([msg['message_id'] for msg in messages]), set([msg1['message_id'], msg2['message_id']])) + @inlineCallbacks + def test_get_inbound_csv(self): + batch_id = yield self.make_batch(('foo', 'bar')) + msg1 = yield self.make_inbound(batch_id, 'føø') + msg2 = yield self.make_inbound(batch_id, 'føø') + resp = yield self.make_request('GET', batch_id, 'inbound.csv') + rows = resp.delivered_body.split('\r\n') + header, rows = rows[0], rows[1:-1] + self.assertEqual(header, ( + "message_id,to_addr,from_addr,in_reply_to,session_event,content," + "group")) + self.assertEqual(sorted(rows), sorted([ + "%s,9292,+41791234567,,,føø," % msg1['message_id'], + "%s,9292,+41791234567,,,føø," % msg2['message_id'], + ])) + @inlineCallbacks def test_get_outbound(self): batch_id = yield self.make_batch(('foo', 'bar')) @@ -93,6 +109,22 @@ def test_get_outbound(self): set([msg['message_id'] for msg in messages]), set([msg1['message_id'], msg2['message_id']])) + @inlineCallbacks + def test_get_outbound_csv(self): + batch_id = yield self.make_batch(('foo', 'bar')) + msg1 = yield self.make_outbound(batch_id, 'føø') + msg2 = yield self.make_outbound(batch_id, 'føø') + resp = yield self.make_request('GET', batch_id, 'outbound.csv') + rows = resp.delivered_body.split('\r\n') + header, rows = rows[0], rows[1:-1] + self.assertEqual(header, ( + "message_id,to_addr,from_addr,in_reply_to,session_event,content," + "group")) + self.assertEqual(sorted(rows), sorted([ + "%s,+41791234567,9292,,,føø," % msg1['message_id'], + "%s,+41791234567,9292,,,føø," % msg2['message_id'], + ])) + @inlineCallbacks def test_get_inbound_multiple_pages(self): self.store.DEFAULT_MAX_RESULTS = 1