diff --git a/vumi/transports/vumi_bridge/__init__.py b/vumi/transports/vumi_bridge/__init__.py index 79d243a96..f5f520b29 100644 --- a/vumi/transports/vumi_bridge/__init__.py +++ b/vumi/transports/vumi_bridge/__init__.py @@ -1,9 +1,5 @@ -from vumi.transports.vumi_bridge.vumi_bridge import ( - GoConversationTransport, GoConversationClientTransport, - GoConversationServerTransport) +from vumi.transports.vumi_bridge.vumi_bridge import GoConversationTransport __all__ = [ 'GoConversationTransport', - 'GoConversationClientTransport', - 'GoConversationServerTransport', ] diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index 4063d0f3e..32b8062eb 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -1,18 +1,14 @@ -import base64 import json from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue -from twisted.internet.error import ConnectionLost from twisted.internet.task import Clock from twisted.web.server import NOT_DONE_YET -from twisted.python.failure import Failure from vumi.message import TransportUserMessage from vumi.tests.fake_connection import FakeHttpServer from vumi.tests.helpers import VumiTestCase from vumi.transports.tests.helpers import TransportHelper -from vumi.transports.vumi_bridge import ( - GoConversationClientTransport, GoConversationServerTransport) +from vumi.transports.vumi_bridge import GoConversationTransport from vumi.config import ConfigError from vumi.utils import http_request_full @@ -35,6 +31,7 @@ def get_transport(self, **config): 'account_key': 'account-key', 'conversation_key': 'conversation-key', 'access_token': 'access-token', + 'publish_status': True, } defaults.update(config) transport = yield self.tx_helper.get_transport(defaults, start=False) @@ -65,135 +62,13 @@ def get_next_request(self): class TestGoConversationTransport(TestGoConversationTransportBase): - transport_class = GoConversationClientTransport - - @inlineCallbacks - def setup_transport(self, transport): - transport.clock = self.clock - # when the transport fires up it starts two new connections, - # wait for them & name them accordingly - reqs = [] - reqs.append((yield self.get_next_request())) - reqs.append((yield self.get_next_request())) - if reqs[0].path.endswith('messages.json'): - self.message_req = reqs[0] - self.event_req = reqs[1] - else: - self.message_req = reqs[1] - self.event_req = reqs[0] - # put some data on the wire to have connectionMade called - self.message_req.write('') - self.event_req.write('') - - @inlineCallbacks - def test_auth_headers(self): - yield self.get_transport() - [msg_auth_header] = self.message_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(msg_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - [event_auth_header] = self.event_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(event_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - - @inlineCallbacks - def test_req_path(self): - yield self.get_transport() - base_url = "https://go.vumi.org/api/v1/go/http_api/" - self.assertEqual( - self.message_req.path, - base_url + 'conversation-key/messages.json') - self.assertEqual( - self.event_req.path, - base_url + 'conversation-key/events.json') - - @inlineCallbacks - def test_receiving_messages(self): - yield self.get_transport() - msg = self.tx_helper.make_inbound("inbound") - self.message_req.write(msg.to_json().encode('utf-8') + '\n') - [received_msg] = yield self.tx_helper.wait_for_dispatched_inbound(1) - self.assertEqual(received_msg['message_id'], msg['message_id']) - - @inlineCallbacks - def test_receiving_events(self): - transport = yield self.get_transport() - # prime the mapping - yield transport.map_message_id('remote', 'local') - ack = self.tx_helper.make_ack(event_id='event-id') - ack['user_message_id'] = 'remote' - self.event_req.write(ack.to_json().encode('utf-8') + '\n') - [received_ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(received_ack['event_id'], ack['event_id']) - self.assertEqual(received_ack['user_message_id'], 'local') - self.assertEqual(received_ack['sent_message_id'], 'remote') - - @inlineCallbacks - def test_sending_messages(self): - tx = yield self.get_transport() - msg = self.tx_helper.make_outbound( - "outbound", session_event=TransportUserMessage.SESSION_CLOSE) - d = self.tx_helper.dispatch_outbound(msg) - req = yield self.get_next_request() - received_msg = json.loads(req.content.read()) - self.assertEqual(received_msg, { - 'content': msg['content'], - 'in_reply_to': None, - 'to_addr': msg['to_addr'], - 'message_id': msg['message_id'], - 'session_event': TransportUserMessage.SESSION_CLOSE, - 'helper_metadata': {}, - }) - - remote_id = TransportUserMessage.generate_id() - reply = msg.copy() - reply['message_id'] = remote_id - req.write(reply.to_json().encode('utf-8')) - req.finish() - yield d - - [ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(ack['user_message_id'], msg['message_id']) - self.assertEqual(ack['sent_message_id'], remote_id) - - @inlineCallbacks - def test_reconnecting(self): - transport = yield self.get_transport() - message_client = transport.message_client - message_client.connectionLost(Failure(ConnectionLost('foo'))) - - config = transport.get_static_config() - - self.assertTrue(transport.delay > config.initial_delay) - self.assertEqual(transport.retries, 1) - self.assertTrue(transport.reconnect_call) - self.clock.advance(transport.delay + 0.1) - - # write something to ensure connectionMade() is called on - # the protocol - message_req = yield self.get_next_request() - message_req.write('') - - event_req = yield self.get_next_request() - event_req.write('') - - self.assertEqual(transport.delay, config.initial_delay) - self.assertEqual(transport.retries, 0) - self.assertFalse(transport.reconnect_call) - - -class TestGoConversationServerTransport(TestGoConversationTransportBase): - - transport_class = GoConversationServerTransport + transport_class = GoConversationTransport def test_server_settings_without_configs(self): return self.assertFailure(self.get_transport(), ConfigError) def get_configured_transport(self): - return self.get_transport( - message_path='messages.json', event_path='events.json', - web_port='0') + return self.get_transport(web_path='', web_port='0') def post_msg(self, url, msg_json): data = msg_json.encode('utf-8') @@ -272,3 +147,10 @@ def test_sending_messages(self): [ack] = yield self.tx_helper.wait_for_dispatched_events(1) self.assertEqual(ack['user_message_id'], msg['message_id']) self.assertEqual(ack['sent_message_id'], remote_id) + + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + + self.assertEquals(status['status'], 'ok') + self.assertEquals(status['component'], 'inbound') + self.assertEquals(status['type'], 'good_request') + self.assertEquals(status['message'], 'Good request received') diff --git a/vumi/transports/vumi_bridge/vumi_bridge.py b/vumi/transports/vumi_bridge/vumi_bridge.py index 782c9a120..5398afb16 100644 --- a/vumi/transports/vumi_bridge/vumi_bridge.py +++ b/vumi/transports/vumi_bridge/vumi_bridge.py @@ -2,25 +2,21 @@ import base64 import json -import random from twisted.internet.defer import inlineCallbacks -from twisted.internet import reactor -from twisted.web.http_headers import Headers from twisted.web import http from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET from vumi.transports import Transport -from vumi.transports.vumi_bridge.client import StreamingClient from vumi.config import ConfigText, ConfigDict, ConfigInt, ConfigFloat from vumi.persist.txredis_manager import TxRedisManager from vumi.message import TransportUserMessage, TransportEvent -from vumi.utils import to_kwargs, http_request_full +from vumi.utils import to_kwargs, http_request_full, StatusEdgeDetector from vumi import log -class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): +class VumiBridgeTransportConfig(Transport.CONFIG_CLASS): account_key = ConfigText( 'The account key to connect with.', static=True, required=True) conversation_key = ConfigText( @@ -58,19 +54,11 @@ class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): # molar Planck constant times c, joule meter/mole default=0.11962656472, static=True) - - -class VumiBridgeServerTransportConfig(VumiBridgeClientTransportConfig): - # Most of this copied wholesale from vumi.transports.httprpc. - web_port = ConfigInt( "The port to listen for requests on, defaults to `0`.", default=0, static=True) - message_path = ConfigText( - "The path to listen for message requests on.", required=True, - static=True) - event_path = ConfigText( - "The path to listen for event requests on.", required=True, + web_path = ConfigText( + "The path to listen for inbound requests on.", required=True, static=True) health_path = ConfigText( "The path to listen for downstream health checks on" @@ -134,6 +122,7 @@ def handle_outbound_message(self, message): if resp.code != http.OK: log.warning('Unexpected status code: %s, body: %s' % ( resp.code, resp.delivered_body)) + self.add_status_bad_req() yield self.publish_nack(message['message_id'], reason='Unexpected status code: %s' % ( resp.code,)) @@ -142,6 +131,7 @@ def handle_outbound_message(self, message): remote_message = json.loads(resp.delivered_body) yield self.map_message_id( remote_message['message_id'], message['message_id']) + self.add_status_good_req() yield self.publish_ack(user_message_id=message['message_id'], sent_message_id=remote_message['message_id']) @@ -152,101 +142,23 @@ def get_auth_headers(self): config.account_key, config.access_token))], } - -class GoConversationClientTransport(GoConversationTransportBase): - """ - This transport essentially connects as a client to Vumi Go's streaming - HTTP API [1]_. - - It allows one to bridge Vumi and Vumi Go installations. - - NOTE: Since we're basically bridging two separate installations we're - leaving some of the attributes that we would normally change the - same. Specifically `transport_type`. - - .. [1] https://github.com/praekelt/vumi-go/blob/develop/docs/http_api.rst - - """ - - CONFIG_CLASS = VumiBridgeClientTransportConfig - continue_trying = True - clock = reactor - @inlineCallbacks - def setup_transport(self): - config = self.get_static_config() - self.redis = yield TxRedisManager.from_config( - config.redis_manager) - self.retries = 0 - self.delay = config.initial_delay - self.reconnect_call = None - self.client = StreamingClient(self.agent_factory) - self.connect_api_clients() - - def teardown_transport(self): - if self.reconnect_call: - self.reconnect_call.cancel() - self.reconnect_call = None - self.continue_trying = False - self.disconnect_api_clients() - - def connect_api_clients(self): - self.message_client = self.client.stream( - TransportUserMessage, self.handle_inbound_message, - log.error, self.get_url('messages.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - self.event_client = self.client.stream( - TransportEvent, self.handle_inbound_event, - log.error, self.get_url('events.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - - def reconnect_api_clients(self, reason): - self.disconnect_api_clients() - if not self.continue_trying: - log.msg('Not retrying because of explicit request') - return - - config = self.get_static_config() - self.retries += 1 - if (config.max_retries is not None - and (self.retries > config.max_retries)): - log.warning('Abandoning reconnecting after %s attempts.' % ( - self.retries)) - return - - self.delay = min(self.delay * config.factor, - config.max_reconnect_delay) - if config.jitter: - self.delay = random.normalvariate(self.delay, - self.delay * config.jitter) - log.msg('Will retry in %s seconds' % (self.delay,)) - self.reconnect_call = self.clock.callLater(self.delay, - self.connect_api_clients) - - def reset_reconnect_delay(self): - config = self.get_static_config() - self.delay = config.initial_delay - self.retries = 0 - self.reconnect_call = None - self.continue_trying = True - - def disconnect_api_clients(self): - self.message_client.disconnect() - self.event_client.disconnect() - + def update_status(self, **kw): + '''Publishes a status if it is not a repeat of the previously + published status.''' + if self.status_detect.check_status(**kw): + yield self.publish_status(**kw) + # TODO: Notify Junebug -class GoConversationTransport(GoConversationClientTransport): + def add_status_bad_req(self): + return self.update_status( + status='down', component='inbound', type='bad_request', + message='Bad request received') - def setup_transport(self, *args, **kwargs): - log.warning( - 'GoConversationTransport is deprecated, please use ' - '`GoConversationClientTransport` instead.') - return super(GoConversationTransport, self).setup_transport( - *args, **kwargs) + def add_status_good_req(self): + return self.update_status( + status='ok', component='inbound', type='good_request', + message='Good request received') class GoConversationHealthResource(Resource): @@ -283,10 +195,10 @@ def render_POST(self, request): return self.render_(request) -class GoConversationServerTransport(GoConversationTransportBase): +class GoConversationTransport(GoConversationTransportBase): # Most of this copied wholesale from vumi.transports.httprpc. - CONFIG_CLASS = VumiBridgeServerTransportConfig + CONFIG_CLASS = VumiBridgeTransportConfig @inlineCallbacks def setup_transport(self): @@ -296,11 +208,12 @@ def setup_transport(self): self.web_resource = yield self.start_web_resources([ (GoConversationResource(self.handle_raw_inbound_message), - config.message_path), + "%s/messages.json" % (config.web_path)), (GoConversationResource(self.handle_raw_inbound_event), - config.event_path), + "%s/events.json" % (config.web_path)), (GoConversationHealthResource(self), config.health_path), ], config.web_port) + self.status_detect = StatusEdgeDetector() def teardown_transport(self): return self.web_resource.loseConnection()