diff --git a/vumi/transports/vumi_bridge/__init__.py b/vumi/transports/vumi_bridge/__init__.py index 79d243a96..f5f520b29 100644 --- a/vumi/transports/vumi_bridge/__init__.py +++ b/vumi/transports/vumi_bridge/__init__.py @@ -1,9 +1,5 @@ -from vumi.transports.vumi_bridge.vumi_bridge import ( - GoConversationTransport, GoConversationClientTransport, - GoConversationServerTransport) +from vumi.transports.vumi_bridge.vumi_bridge import GoConversationTransport __all__ = [ 'GoConversationTransport', - 'GoConversationClientTransport', - 'GoConversationServerTransport', ] diff --git a/vumi/transports/vumi_bridge/client.py b/vumi/transports/vumi_bridge/client.py deleted file mode 100644 index 0dc1cb450..000000000 --- a/vumi/transports/vumi_bridge/client.py +++ /dev/null @@ -1,104 +0,0 @@ -# -*- test-case-name: vumi.transports.vumi_bridge.tests.test_client -*- -import json - -from twisted.internet.defer import Deferred -from twisted.internet import reactor -from twisted.web.client import Agent, ResponseDone, ResponseFailed -from twisted.web import http -from twisted.protocols import basic -from twisted.python.failure import Failure - -from vumi.message import Message -from vumi.utils import to_kwargs -from vumi import log -from vumi.errors import VumiError - - -class VumiBridgeError(VumiError): - """Raised by errors encountered by VumiBridge.""" - - -class VumiBridgeInvalidJsonError(VumiError): - """Raised when invalid JSON is received.""" - - -class VumiMessageReceiver(basic.LineReceiver): - - delimiter = '\n' - message_class = Message - - def __init__(self, message_class, callback, errback, on_connect=None, - on_disconnect=None): - self.message_class = message_class - self.callback = callback - self.errback = errback - self._response = None - self._wait_for_response = Deferred() - self._on_connect = on_connect or (lambda *a: None) - self._on_disconnect = on_disconnect or (lambda *a: None) - self.disconnecting = False - - def get_response(self): - return self._wait_for_response - - def handle_response(self, response): - self._response = response - if self._response.code == http.NO_CONTENT: - self._wait_for_response.callback(self._response) - else: - self._response.deliverBody(self) - - def lineReceived(self, line): - d = Deferred() - d.addCallback(self.callback) - d.addErrback(self.errback) - line = line.strip() - try: - data = json.loads(line) - d.callback(self.message_class( - _process_fields=True, **to_kwargs(data))) - except ValueError, e: - f = Failure(VumiBridgeInvalidJsonError(line)) - d.errback(f) - except Exception, e: - log.err() - f = Failure(e) - d.errback(f) - - def connectionMade(self): - self._on_connect() - - def connectionLost(self, reason): - # the PotentialDataLoss here is because Twisted didn't receive a - # content length header, which is normal because we're streaming. - if (reason.check(ResponseDone, ResponseFailed, http.PotentialDataLoss) - and self._response is not None - and not self._wait_for_response.called): - self._wait_for_response.callback(self._response) - if not self.disconnecting: - self._on_disconnect(reason) - - def disconnect(self): - self.disconnecting = True - if self.transport and self.transport._producer is not None: - self.transport._producer.loseConnection() - self.transport._stopProxying() - - -class StreamingClient(object): - - def __init__(self, agent_factory=None): - if agent_factory is None: - agent_factory = Agent - self.agent = agent_factory(reactor) - - def stream(self, message_class, callback, errback, url, - headers=None, on_connect=None, on_disconnect=None): - receiver = VumiMessageReceiver( - message_class, callback, errback, - on_connect=on_connect, - on_disconnect=on_disconnect) - d = self.agent.request('GET', url, headers) - d.addCallback(lambda response: receiver.handle_response(response)) - d.addErrback(log.err) - return receiver diff --git a/vumi/transports/vumi_bridge/tests/test_client.py b/vumi/transports/vumi_bridge/tests/test_client.py deleted file mode 100644 index a96802757..000000000 --- a/vumi/transports/vumi_bridge/tests/test_client.py +++ /dev/null @@ -1,63 +0,0 @@ -from twisted.internet.defer import inlineCallbacks, DeferredQueue -from twisted.web.server import NOT_DONE_YET -from twisted.web.client import Agent, ResponseDone - -from vumi.transports.vumi_bridge.client import ( - StreamingClient, VumiBridgeInvalidJsonError) -from vumi.message import Message -from vumi.tests.fake_connection import FakeHttpServer -from vumi.tests.helpers import VumiTestCase - - -class TestStreamingClient(VumiTestCase): - - def setUp(self): - self.fake_http = FakeHttpServer(self.handle_request) - self.request_queue = DeferredQueue() - self.client = StreamingClient(self.fake_http.get_agent) - self.messages_received = DeferredQueue() - self.errors_received = DeferredQueue() - self.disconnects_received = DeferredQueue() - - def reason_trapper(reason): - if reason.trap(ResponseDone): - self.disconnects_received.put(reason.getErrorMessage()) - - self.receiver = self.client.stream( - Message, - self.messages_received.put, self.errors_received.put, - "http://vumi-go-api.example.com/", on_disconnect=reason_trapper) - - def handle_request(self, request): - self.request_queue.put(request) - return NOT_DONE_YET - - def test_default_agent_factory(self): - """ - If `None` is passed as the `agent_factory`, `Agent` is used instead. - """ - self.assertNotIsInstance(self.client.agent, Agent) - self.assertIsInstance(StreamingClient(None).agent, Agent) - self.assertIsInstance(StreamingClient().agent, Agent) - - @inlineCallbacks - def test_callback_on_disconnect(self): - req = yield self.request_queue.get() - req.write( - '%s\n' % (Message(foo='bar').to_json().encode('utf-8'),)) - req.finish() - message = yield self.messages_received.get() - self.assertEqual(message['foo'], 'bar') - reason = yield self.disconnects_received.get() - # this is the error message we get when a ResponseDone is raised - # which happens when the remote server closes the connection. - self.assertEqual(reason, 'Response body fully received') - - @inlineCallbacks - def test_invalid_json(self): - req = yield self.request_queue.get() - req.write("Hello\n") - req.finish() - err = yield self.assertFailure( - self.errors_received.get(), VumiBridgeInvalidJsonError) - self.assertEqual(err.args, ("Hello",)) diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index 4063d0f3e..d689bbb88 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -1,18 +1,14 @@ -import base64 import json from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue -from twisted.internet.error import ConnectionLost from twisted.internet.task import Clock from twisted.web.server import NOT_DONE_YET -from twisted.python.failure import Failure from vumi.message import TransportUserMessage from vumi.tests.fake_connection import FakeHttpServer from vumi.tests.helpers import VumiTestCase from vumi.transports.tests.helpers import TransportHelper -from vumi.transports.vumi_bridge import ( - GoConversationClientTransport, GoConversationServerTransport) +from vumi.transports.vumi_bridge import GoConversationTransport from vumi.config import ConfigError from vumi.utils import http_request_full @@ -35,6 +31,7 @@ def get_transport(self, **config): 'account_key': 'account-key', 'conversation_key': 'conversation-key', 'access_token': 'access-token', + 'publish_status': True, } defaults.update(config) transport = yield self.tx_helper.get_transport(defaults, start=False) @@ -65,135 +62,13 @@ def get_next_request(self): class TestGoConversationTransport(TestGoConversationTransportBase): - transport_class = GoConversationClientTransport - - @inlineCallbacks - def setup_transport(self, transport): - transport.clock = self.clock - # when the transport fires up it starts two new connections, - # wait for them & name them accordingly - reqs = [] - reqs.append((yield self.get_next_request())) - reqs.append((yield self.get_next_request())) - if reqs[0].path.endswith('messages.json'): - self.message_req = reqs[0] - self.event_req = reqs[1] - else: - self.message_req = reqs[1] - self.event_req = reqs[0] - # put some data on the wire to have connectionMade called - self.message_req.write('') - self.event_req.write('') - - @inlineCallbacks - def test_auth_headers(self): - yield self.get_transport() - [msg_auth_header] = self.message_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(msg_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - [event_auth_header] = self.event_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(event_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - - @inlineCallbacks - def test_req_path(self): - yield self.get_transport() - base_url = "https://go.vumi.org/api/v1/go/http_api/" - self.assertEqual( - self.message_req.path, - base_url + 'conversation-key/messages.json') - self.assertEqual( - self.event_req.path, - base_url + 'conversation-key/events.json') - - @inlineCallbacks - def test_receiving_messages(self): - yield self.get_transport() - msg = self.tx_helper.make_inbound("inbound") - self.message_req.write(msg.to_json().encode('utf-8') + '\n') - [received_msg] = yield self.tx_helper.wait_for_dispatched_inbound(1) - self.assertEqual(received_msg['message_id'], msg['message_id']) - - @inlineCallbacks - def test_receiving_events(self): - transport = yield self.get_transport() - # prime the mapping - yield transport.map_message_id('remote', 'local') - ack = self.tx_helper.make_ack(event_id='event-id') - ack['user_message_id'] = 'remote' - self.event_req.write(ack.to_json().encode('utf-8') + '\n') - [received_ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(received_ack['event_id'], ack['event_id']) - self.assertEqual(received_ack['user_message_id'], 'local') - self.assertEqual(received_ack['sent_message_id'], 'remote') - - @inlineCallbacks - def test_sending_messages(self): - tx = yield self.get_transport() - msg = self.tx_helper.make_outbound( - "outbound", session_event=TransportUserMessage.SESSION_CLOSE) - d = self.tx_helper.dispatch_outbound(msg) - req = yield self.get_next_request() - received_msg = json.loads(req.content.read()) - self.assertEqual(received_msg, { - 'content': msg['content'], - 'in_reply_to': None, - 'to_addr': msg['to_addr'], - 'message_id': msg['message_id'], - 'session_event': TransportUserMessage.SESSION_CLOSE, - 'helper_metadata': {}, - }) - - remote_id = TransportUserMessage.generate_id() - reply = msg.copy() - reply['message_id'] = remote_id - req.write(reply.to_json().encode('utf-8')) - req.finish() - yield d - - [ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(ack['user_message_id'], msg['message_id']) - self.assertEqual(ack['sent_message_id'], remote_id) - - @inlineCallbacks - def test_reconnecting(self): - transport = yield self.get_transport() - message_client = transport.message_client - message_client.connectionLost(Failure(ConnectionLost('foo'))) - - config = transport.get_static_config() - - self.assertTrue(transport.delay > config.initial_delay) - self.assertEqual(transport.retries, 1) - self.assertTrue(transport.reconnect_call) - self.clock.advance(transport.delay + 0.1) - - # write something to ensure connectionMade() is called on - # the protocol - message_req = yield self.get_next_request() - message_req.write('') - - event_req = yield self.get_next_request() - event_req.write('') - - self.assertEqual(transport.delay, config.initial_delay) - self.assertEqual(transport.retries, 0) - self.assertFalse(transport.reconnect_call) - - -class TestGoConversationServerTransport(TestGoConversationTransportBase): - - transport_class = GoConversationServerTransport + transport_class = GoConversationTransport def test_server_settings_without_configs(self): return self.assertFailure(self.get_transport(), ConfigError) def get_configured_transport(self): - return self.get_transport( - message_path='messages.json', event_path='events.json', - web_port='0') + return self.get_transport(web_path='test', web_port='0') def post_msg(self, url, msg_json): data = msg_json.encode('utf-8') @@ -212,6 +87,12 @@ def test_receiving_messages(self): [received_msg] = yield self.tx_helper.wait_for_dispatched_inbound(1) self.assertEqual(received_msg['message_id'], msg['message_id']) + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'ok') + self.assertEquals(status['component'], 'received-from-vumi-go') + self.assertEquals(status['type'], 'good_request') + self.assertEquals(status['message'], 'Good request received') + @inlineCallbacks def test_receive_bad_message(self): transport = yield self.get_configured_transport() @@ -221,8 +102,14 @@ def test_receive_bad_message(self): [failure] = self.flushLoggedErrors() self.assertTrue('No JSON object' in str(failure)) + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'down') + self.assertEquals(status['component'], 'received-from-vumi-go') + self.assertEquals(status['type'], 'bad_request') + self.assertEquals(status['message'], 'Bad request received') + @inlineCallbacks - def test_receiving_events(self): + def test_receiving_ack_events(self): transport = yield self.get_configured_transport() url = transport.get_transport_url('events.json') # prime the mapping @@ -236,6 +123,33 @@ def test_receiving_events(self): self.assertEqual(received_ack['user_message_id'], 'local') self.assertEqual(received_ack['sent_message_id'], 'remote') + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'ok') + self.assertEquals(status['component'], 'sent-by-vumi-go') + self.assertEquals(status['type'], 'vumi_go_sent') + self.assertEquals(status['message'], 'Sent by Vumi Go') + + @inlineCallbacks + def test_receiving_nack_events(self): + transport = yield self.get_configured_transport() + url = transport.get_transport_url('events.json') + # prime the mapping + yield transport.map_message_id('remote', 'local') + nack = self.tx_helper.make_nack(event_id='event-id') + nack['user_message_id'] = 'remote' + resp = yield self.post_msg(url, nack.to_json()) + self.assertEqual(resp.code, 200) + [received_nack] = yield self.tx_helper.wait_for_dispatched_events(1) + self.assertEqual(received_nack['event_id'], nack['event_id']) + self.assertEqual(received_nack['user_message_id'], 'local') + self.assertEqual(received_nack['sent_message_id'], 'remote') + + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'down') + self.assertEquals(status['component'], 'sent-by-vumi-go') + self.assertEquals(status['type'], 'vumi_go_failed') + self.assertEquals(status['message'], 'Vumi Go failed to send') + @inlineCallbacks def test_receive_bad_event(self): transport = yield self.get_configured_transport() @@ -245,6 +159,12 @@ def test_receive_bad_event(self): [failure] = self.flushLoggedErrors() self.assertTrue('No JSON object' in str(failure)) + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'down') + self.assertEquals(status['component'], 'sent-by-vumi-go') + self.assertEquals(status['type'], 'bad_request') + self.assertEquals(status['message'], 'Bad request sent') + @inlineCallbacks def test_sending_messages(self): yield self.get_configured_transport() @@ -272,3 +192,27 @@ def test_sending_messages(self): [ack] = yield self.tx_helper.wait_for_dispatched_events(1) self.assertEqual(ack['user_message_id'], msg['message_id']) self.assertEqual(ack['sent_message_id'], remote_id) + + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'ok') + self.assertEquals(status['component'], 'submitted-to-vumi-go') + self.assertEquals(status['type'], 'good_request') + self.assertEquals(status['message'], 'Message accepted by Vumi Go') + + @inlineCallbacks + def test_sending_bad_messages(self): + yield self.get_configured_transport() + msg = self.tx_helper.make_outbound( + "outbound", session_event=TransportUserMessage.SESSION_CLOSE) + self.tx_helper.dispatch_outbound(msg) + req = yield self.get_next_request() + req.setResponseCode(400, "Bad Request") + + req.finish() + + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + self.assertEquals(status['status'], 'down') + self.assertEquals(status['component'], 'submitted-to-vumi-go') + self.assertEquals(status['type'], 'bad_request') + self.assertEquals(status['message'], + 'Message submission rejected by Vumi Go') diff --git a/vumi/transports/vumi_bridge/vumi_bridge.py b/vumi/transports/vumi_bridge/vumi_bridge.py index 782c9a120..8fcad6fa8 100644 --- a/vumi/transports/vumi_bridge/vumi_bridge.py +++ b/vumi/transports/vumi_bridge/vumi_bridge.py @@ -2,25 +2,21 @@ import base64 import json -import random from twisted.internet.defer import inlineCallbacks -from twisted.internet import reactor -from twisted.web.http_headers import Headers from twisted.web import http from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET from vumi.transports import Transport -from vumi.transports.vumi_bridge.client import StreamingClient from vumi.config import ConfigText, ConfigDict, ConfigInt, ConfigFloat from vumi.persist.txredis_manager import TxRedisManager from vumi.message import TransportUserMessage, TransportEvent -from vumi.utils import to_kwargs, http_request_full +from vumi.utils import to_kwargs, http_request_full, StatusEdgeDetector from vumi import log -class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): +class VumiBridgeTransportConfig(Transport.CONFIG_CLASS): account_key = ConfigText( 'The account key to connect with.', static=True, required=True) conversation_key = ConfigText( @@ -58,19 +54,11 @@ class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): # molar Planck constant times c, joule meter/mole default=0.11962656472, static=True) - - -class VumiBridgeServerTransportConfig(VumiBridgeClientTransportConfig): - # Most of this copied wholesale from vumi.transports.httprpc. - web_port = ConfigInt( "The port to listen for requests on, defaults to `0`.", default=0, static=True) - message_path = ConfigText( - "The path to listen for message requests on.", required=True, - static=True) - event_path = ConfigText( - "The path to listen for event requests on.", required=True, + web_path = ConfigText( + "The path to listen for inbound requests on.", required=True, static=True) health_path = ConfigText( "The path to listen for downstream health checks on" @@ -134,6 +122,10 @@ def handle_outbound_message(self, message): if resp.code != http.OK: log.warning('Unexpected status code: %s, body: %s' % ( resp.code, resp.delivered_body)) + self.update_status( + status='down', component='submitted-to-vumi-go', + type='bad_request', + message='Message submission rejected by Vumi Go') yield self.publish_nack(message['message_id'], reason='Unexpected status code: %s' % ( resp.code,)) @@ -142,6 +134,9 @@ def handle_outbound_message(self, message): remote_message = json.loads(resp.delivered_body) yield self.map_message_id( remote_message['message_id'], message['message_id']) + self.update_status( + status='ok', component='submitted-to-vumi-go', + type='good_request', message='Message accepted by Vumi Go') yield self.publish_ack(user_message_id=message['message_id'], sent_message_id=remote_message['message_id']) @@ -152,101 +147,12 @@ def get_auth_headers(self): config.account_key, config.access_token))], } - -class GoConversationClientTransport(GoConversationTransportBase): - """ - This transport essentially connects as a client to Vumi Go's streaming - HTTP API [1]_. - - It allows one to bridge Vumi and Vumi Go installations. - - NOTE: Since we're basically bridging two separate installations we're - leaving some of the attributes that we would normally change the - same. Specifically `transport_type`. - - .. [1] https://github.com/praekelt/vumi-go/blob/develop/docs/http_api.rst - - """ - - CONFIG_CLASS = VumiBridgeClientTransportConfig - continue_trying = True - clock = reactor - @inlineCallbacks - def setup_transport(self): - config = self.get_static_config() - self.redis = yield TxRedisManager.from_config( - config.redis_manager) - self.retries = 0 - self.delay = config.initial_delay - self.reconnect_call = None - self.client = StreamingClient(self.agent_factory) - self.connect_api_clients() - - def teardown_transport(self): - if self.reconnect_call: - self.reconnect_call.cancel() - self.reconnect_call = None - self.continue_trying = False - self.disconnect_api_clients() - - def connect_api_clients(self): - self.message_client = self.client.stream( - TransportUserMessage, self.handle_inbound_message, - log.error, self.get_url('messages.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - self.event_client = self.client.stream( - TransportEvent, self.handle_inbound_event, - log.error, self.get_url('events.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - - def reconnect_api_clients(self, reason): - self.disconnect_api_clients() - if not self.continue_trying: - log.msg('Not retrying because of explicit request') - return - - config = self.get_static_config() - self.retries += 1 - if (config.max_retries is not None - and (self.retries > config.max_retries)): - log.warning('Abandoning reconnecting after %s attempts.' % ( - self.retries)) - return - - self.delay = min(self.delay * config.factor, - config.max_reconnect_delay) - if config.jitter: - self.delay = random.normalvariate(self.delay, - self.delay * config.jitter) - log.msg('Will retry in %s seconds' % (self.delay,)) - self.reconnect_call = self.clock.callLater(self.delay, - self.connect_api_clients) - - def reset_reconnect_delay(self): - config = self.get_static_config() - self.delay = config.initial_delay - self.retries = 0 - self.reconnect_call = None - self.continue_trying = True - - def disconnect_api_clients(self): - self.message_client.disconnect() - self.event_client.disconnect() - - -class GoConversationTransport(GoConversationClientTransport): - - def setup_transport(self, *args, **kwargs): - log.warning( - 'GoConversationTransport is deprecated, please use ' - '`GoConversationClientTransport` instead.') - return super(GoConversationTransport, self).setup_transport( - *args, **kwargs) + def update_status(self, **kw): + '''Publishes a status if it is not a repeat of the previously + published status.''' + if self.status_detect.check_status(**kw): + yield self.publish_status(**kw) class GoConversationHealthResource(Resource): @@ -283,10 +189,10 @@ def render_POST(self, request): return self.render_(request) -class GoConversationServerTransport(GoConversationTransportBase): +class GoConversationTransport(GoConversationTransportBase): # Most of this copied wholesale from vumi.transports.httprpc. - CONFIG_CLASS = VumiBridgeServerTransportConfig + CONFIG_CLASS = VumiBridgeTransportConfig @inlineCallbacks def setup_transport(self): @@ -296,11 +202,12 @@ def setup_transport(self): self.web_resource = yield self.start_web_resources([ (GoConversationResource(self.handle_raw_inbound_message), - config.message_path), + "%s/messages.json" % (config.web_path)), (GoConversationResource(self.handle_raw_inbound_event), - config.event_path), + "%s/events.json" % (config.web_path)), (GoConversationHealthResource(self), config.health_path), ], config.web_port) + self.status_detect = StatusEdgeDetector() def teardown_transport(self): return self.web_resource.loseConnection() @@ -314,7 +221,8 @@ def get_transport_url(self, suffix=''): balancer or proxy. """ addr = self.web_resource.getHost() - return "http://%s:%s/%s" % (addr.host, addr.port, suffix.lstrip('/')) + return "http://%s:%s/%s/%s" % ( + addr.host, addr.port, self.config["web_path"], suffix.lstrip('/')) @inlineCallbacks def handle_raw_inbound_event(self, request): @@ -323,10 +231,21 @@ def handle_raw_inbound_event(self, request): msg = TransportEvent(_process_fields=True, **to_kwargs(data)) yield self.handle_inbound_event(msg) request.finish() + if msg.payload["event_type"] == "ack": + self.update_status( + status='ok', component='sent-by-vumi-go', + type='vumi_go_sent', message='Sent by Vumi Go') + else: + self.update_status( + status='down', component='sent-by-vumi-go', + type='vumi_go_failed', message='Vumi Go failed to send') except Exception as e: log.err(e) request.setResponseCode(400) request.finish() + self.update_status( + status='down', component='sent-by-vumi-go', + type='bad_request', message='Bad request sent') @inlineCallbacks def handle_raw_inbound_message(self, request): @@ -336,7 +255,13 @@ def handle_raw_inbound_message(self, request): _process_fields=True, **to_kwargs(data)) yield self.handle_inbound_message(msg) request.finish() + self.update_status( + status='ok', component='received-from-vumi-go', + type='good_request', message='Good request received') except Exception as e: log.err(e) request.setResponseCode(400) request.finish() + self.update_status( + status='down', component='received-from-vumi-go', + type='bad_request', message='Bad request received')