From acaf5c4c4d4f03cd739177284782261cf36befff Mon Sep 17 00:00:00 2001 From: Kaitlyn Crawford Date: Tue, 22 Mar 2016 15:30:48 +0200 Subject: [PATCH 1/4] removed GoConversationClientTransport classes and combined client and server config classes --- vumi/transports/vumi_bridge/__init__.py | 6 +- .../vumi_bridge/tests/test_vumi_bridge.py | 9 +- vumi/transports/vumi_bridge/vumi_bridge.py | 111 +----------------- 3 files changed, 8 insertions(+), 118 deletions(-) diff --git a/vumi/transports/vumi_bridge/__init__.py b/vumi/transports/vumi_bridge/__init__.py index 79d243a96..f5f520b29 100644 --- a/vumi/transports/vumi_bridge/__init__.py +++ b/vumi/transports/vumi_bridge/__init__.py @@ -1,9 +1,5 @@ -from vumi.transports.vumi_bridge.vumi_bridge import ( - GoConversationTransport, GoConversationClientTransport, - GoConversationServerTransport) +from vumi.transports.vumi_bridge.vumi_bridge import GoConversationTransport __all__ = [ 'GoConversationTransport', - 'GoConversationClientTransport', - 'GoConversationServerTransport', ] diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index 4063d0f3e..5936a507d 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -11,8 +11,7 @@ from vumi.tests.fake_connection import FakeHttpServer from vumi.tests.helpers import VumiTestCase from vumi.transports.tests.helpers import TransportHelper -from vumi.transports.vumi_bridge import ( - GoConversationClientTransport, GoConversationServerTransport) +from vumi.transports.vumi_bridge import GoConversationTransport from vumi.config import ConfigError from vumi.utils import http_request_full @@ -65,7 +64,7 @@ def get_next_request(self): class TestGoConversationTransport(TestGoConversationTransportBase): - transport_class = GoConversationClientTransport + transport_class = GoConversationTransport @inlineCallbacks def setup_transport(self, transport): @@ -183,9 +182,9 @@ def test_reconnecting(self): self.assertFalse(transport.reconnect_call) -class TestGoConversationServerTransport(TestGoConversationTransportBase): +class TestGoConversationTransport(TestGoConversationTransportBase): - transport_class = GoConversationServerTransport + transport_class = GoConversationTransport def test_server_settings_without_configs(self): return self.assertFailure(self.get_transport(), ConfigError) diff --git a/vumi/transports/vumi_bridge/vumi_bridge.py b/vumi/transports/vumi_bridge/vumi_bridge.py index 782c9a120..a75be1f3f 100644 --- a/vumi/transports/vumi_bridge/vumi_bridge.py +++ b/vumi/transports/vumi_bridge/vumi_bridge.py @@ -2,17 +2,13 @@ import base64 import json -import random from twisted.internet.defer import inlineCallbacks -from twisted.internet import reactor -from twisted.web.http_headers import Headers from twisted.web import http from twisted.web.resource import Resource from twisted.web.server import NOT_DONE_YET from vumi.transports import Transport -from vumi.transports.vumi_bridge.client import StreamingClient from vumi.config import ConfigText, ConfigDict, ConfigInt, ConfigFloat from vumi.persist.txredis_manager import TxRedisManager from vumi.message import TransportUserMessage, TransportEvent @@ -20,7 +16,7 @@ from vumi import log -class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): +class VumiBridgeTransportConfig(Transport.CONFIG_CLASS): account_key = ConfigText( 'The account key to connect with.', static=True, required=True) conversation_key = ConfigText( @@ -58,11 +54,6 @@ class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS): # molar Planck constant times c, joule meter/mole default=0.11962656472, static=True) - - -class VumiBridgeServerTransportConfig(VumiBridgeClientTransportConfig): - # Most of this copied wholesale from vumi.transports.httprpc. - web_port = ConfigInt( "The port to listen for requests on, defaults to `0`.", default=0, static=True) @@ -153,102 +144,6 @@ def get_auth_headers(self): } -class GoConversationClientTransport(GoConversationTransportBase): - """ - This transport essentially connects as a client to Vumi Go's streaming - HTTP API [1]_. - - It allows one to bridge Vumi and Vumi Go installations. - - NOTE: Since we're basically bridging two separate installations we're - leaving some of the attributes that we would normally change the - same. Specifically `transport_type`. - - .. [1] https://github.com/praekelt/vumi-go/blob/develop/docs/http_api.rst - - """ - - CONFIG_CLASS = VumiBridgeClientTransportConfig - continue_trying = True - clock = reactor - - @inlineCallbacks - def setup_transport(self): - config = self.get_static_config() - self.redis = yield TxRedisManager.from_config( - config.redis_manager) - self.retries = 0 - self.delay = config.initial_delay - self.reconnect_call = None - self.client = StreamingClient(self.agent_factory) - self.connect_api_clients() - - def teardown_transport(self): - if self.reconnect_call: - self.reconnect_call.cancel() - self.reconnect_call = None - self.continue_trying = False - self.disconnect_api_clients() - - def connect_api_clients(self): - self.message_client = self.client.stream( - TransportUserMessage, self.handle_inbound_message, - log.error, self.get_url('messages.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - self.event_client = self.client.stream( - TransportEvent, self.handle_inbound_event, - log.error, self.get_url('events.json'), - headers=Headers(self.get_auth_headers()), - on_connect=self.reset_reconnect_delay, - on_disconnect=self.reconnect_api_clients) - - def reconnect_api_clients(self, reason): - self.disconnect_api_clients() - if not self.continue_trying: - log.msg('Not retrying because of explicit request') - return - - config = self.get_static_config() - self.retries += 1 - if (config.max_retries is not None - and (self.retries > config.max_retries)): - log.warning('Abandoning reconnecting after %s attempts.' % ( - self.retries)) - return - - self.delay = min(self.delay * config.factor, - config.max_reconnect_delay) - if config.jitter: - self.delay = random.normalvariate(self.delay, - self.delay * config.jitter) - log.msg('Will retry in %s seconds' % (self.delay,)) - self.reconnect_call = self.clock.callLater(self.delay, - self.connect_api_clients) - - def reset_reconnect_delay(self): - config = self.get_static_config() - self.delay = config.initial_delay - self.retries = 0 - self.reconnect_call = None - self.continue_trying = True - - def disconnect_api_clients(self): - self.message_client.disconnect() - self.event_client.disconnect() - - -class GoConversationTransport(GoConversationClientTransport): - - def setup_transport(self, *args, **kwargs): - log.warning( - 'GoConversationTransport is deprecated, please use ' - '`GoConversationClientTransport` instead.') - return super(GoConversationTransport, self).setup_transport( - *args, **kwargs) - - class GoConversationHealthResource(Resource): # Most of this copied wholesale from vumi.transports.httprpc. isLeaf = True @@ -283,10 +178,10 @@ def render_POST(self, request): return self.render_(request) -class GoConversationServerTransport(GoConversationTransportBase): +class GoConversationTransport(GoConversationTransportBase): # Most of this copied wholesale from vumi.transports.httprpc. - CONFIG_CLASS = VumiBridgeServerTransportConfig + CONFIG_CLASS = VumiBridgeTransportConfig @inlineCallbacks def setup_transport(self): From cfd5517e69bb6750a458c184bf2d06e3889a6c12 Mon Sep 17 00:00:00 2001 From: Kaitlyn Crawford Date: Tue, 22 Mar 2016 15:44:51 +0200 Subject: [PATCH 2/4] removed message_path and event_path from the config and added web_path --- vumi/transports/vumi_bridge/tests/test_vumi_bridge.py | 4 +--- vumi/transports/vumi_bridge/vumi_bridge.py | 11 ++++------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index 5936a507d..f16a0e6f0 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -190,9 +190,7 @@ def test_server_settings_without_configs(self): return self.assertFailure(self.get_transport(), ConfigError) def get_configured_transport(self): - return self.get_transport( - message_path='messages.json', event_path='events.json', - web_port='0') + return self.get_transport(web_path='', web_port='0') def post_msg(self, url, msg_json): data = msg_json.encode('utf-8') diff --git a/vumi/transports/vumi_bridge/vumi_bridge.py b/vumi/transports/vumi_bridge/vumi_bridge.py index a75be1f3f..803f4b227 100644 --- a/vumi/transports/vumi_bridge/vumi_bridge.py +++ b/vumi/transports/vumi_bridge/vumi_bridge.py @@ -57,11 +57,8 @@ class VumiBridgeTransportConfig(Transport.CONFIG_CLASS): web_port = ConfigInt( "The port to listen for requests on, defaults to `0`.", default=0, static=True) - message_path = ConfigText( - "The path to listen for message requests on.", required=True, - static=True) - event_path = ConfigText( - "The path to listen for event requests on.", required=True, + web_path = ConfigText( + "The path to listen for inbound requests on.", required=True, static=True) health_path = ConfigText( "The path to listen for downstream health checks on" @@ -191,9 +188,9 @@ def setup_transport(self): self.web_resource = yield self.start_web_resources([ (GoConversationResource(self.handle_raw_inbound_message), - config.message_path), + "%s/messages.json" % (config.web_path)), (GoConversationResource(self.handle_raw_inbound_event), - config.event_path), + "%s/events.json" % (config.web_path)), (GoConversationHealthResource(self), config.health_path), ], config.web_port) From 04a96202c1ddc607fd90d5269766076566d55d65 Mon Sep 17 00:00:00 2001 From: Kaitlyn Crawford Date: Wed, 23 Mar 2016 12:16:12 +0200 Subject: [PATCH 3/4] detect and publish status when trying to send messages --- .../vumi_bridge/tests/test_vumi_bridge.py | 8 +++++++ vumi/transports/vumi_bridge/vumi_bridge.py | 23 ++++++++++++++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index f16a0e6f0..0562caffc 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -34,6 +34,7 @@ def get_transport(self, **config): 'account_key': 'account-key', 'conversation_key': 'conversation-key', 'access_token': 'access-token', + 'publish_status': True, } defaults.update(config) transport = yield self.tx_helper.get_transport(defaults, start=False) @@ -269,3 +270,10 @@ def test_sending_messages(self): [ack] = yield self.tx_helper.wait_for_dispatched_events(1) self.assertEqual(ack['user_message_id'], msg['message_id']) self.assertEqual(ack['sent_message_id'], remote_id) + + [status] = yield self.tx_helper.wait_for_dispatched_statuses(1) + + self.assertEquals(status['status'], 'ok') + self.assertEquals(status['component'], 'inbound') + self.assertEquals(status['type'], 'good_request') + self.assertEquals(status['message'], 'Good request received') diff --git a/vumi/transports/vumi_bridge/vumi_bridge.py b/vumi/transports/vumi_bridge/vumi_bridge.py index 803f4b227..5398afb16 100644 --- a/vumi/transports/vumi_bridge/vumi_bridge.py +++ b/vumi/transports/vumi_bridge/vumi_bridge.py @@ -12,7 +12,7 @@ from vumi.config import ConfigText, ConfigDict, ConfigInt, ConfigFloat from vumi.persist.txredis_manager import TxRedisManager from vumi.message import TransportUserMessage, TransportEvent -from vumi.utils import to_kwargs, http_request_full +from vumi.utils import to_kwargs, http_request_full, StatusEdgeDetector from vumi import log @@ -122,6 +122,7 @@ def handle_outbound_message(self, message): if resp.code != http.OK: log.warning('Unexpected status code: %s, body: %s' % ( resp.code, resp.delivered_body)) + self.add_status_bad_req() yield self.publish_nack(message['message_id'], reason='Unexpected status code: %s' % ( resp.code,)) @@ -130,6 +131,7 @@ def handle_outbound_message(self, message): remote_message = json.loads(resp.delivered_body) yield self.map_message_id( remote_message['message_id'], message['message_id']) + self.add_status_good_req() yield self.publish_ack(user_message_id=message['message_id'], sent_message_id=remote_message['message_id']) @@ -140,6 +142,24 @@ def get_auth_headers(self): config.account_key, config.access_token))], } + @inlineCallbacks + def update_status(self, **kw): + '''Publishes a status if it is not a repeat of the previously + published status.''' + if self.status_detect.check_status(**kw): + yield self.publish_status(**kw) + # TODO: Notify Junebug + + def add_status_bad_req(self): + return self.update_status( + status='down', component='inbound', type='bad_request', + message='Bad request received') + + def add_status_good_req(self): + return self.update_status( + status='ok', component='inbound', type='good_request', + message='Good request received') + class GoConversationHealthResource(Resource): # Most of this copied wholesale from vumi.transports.httprpc. @@ -193,6 +213,7 @@ def setup_transport(self): "%s/events.json" % (config.web_path)), (GoConversationHealthResource(self), config.health_path), ], config.web_port) + self.status_detect = StatusEdgeDetector() def teardown_transport(self): return self.web_resource.loseConnection() From bd2c8b3941f311970fa054cb7441fdd28e8aeeaa Mon Sep 17 00:00:00 2001 From: Kaitlyn Crawford Date: Wed, 23 Mar 2016 12:18:27 +0200 Subject: [PATCH 4/4] removed old tests --- .../vumi_bridge/tests/test_vumi_bridge.py | 123 ------------------ 1 file changed, 123 deletions(-) diff --git a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py index 0562caffc..32b8062eb 100644 --- a/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py +++ b/vumi/transports/vumi_bridge/tests/test_vumi_bridge.py @@ -1,11 +1,8 @@ -import base64 import json from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue -from twisted.internet.error import ConnectionLost from twisted.internet.task import Clock from twisted.web.server import NOT_DONE_YET -from twisted.python.failure import Failure from vumi.message import TransportUserMessage from vumi.tests.fake_connection import FakeHttpServer @@ -63,126 +60,6 @@ def get_next_request(self): returnValue(req) -class TestGoConversationTransport(TestGoConversationTransportBase): - - transport_class = GoConversationTransport - - @inlineCallbacks - def setup_transport(self, transport): - transport.clock = self.clock - # when the transport fires up it starts two new connections, - # wait for them & name them accordingly - reqs = [] - reqs.append((yield self.get_next_request())) - reqs.append((yield self.get_next_request())) - if reqs[0].path.endswith('messages.json'): - self.message_req = reqs[0] - self.event_req = reqs[1] - else: - self.message_req = reqs[1] - self.event_req = reqs[0] - # put some data on the wire to have connectionMade called - self.message_req.write('') - self.event_req.write('') - - @inlineCallbacks - def test_auth_headers(self): - yield self.get_transport() - [msg_auth_header] = self.message_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(msg_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - [event_auth_header] = self.event_req.requestHeaders.getRawHeaders( - 'Authorization') - self.assertEqual(event_auth_header, 'Basic %s' % ( - base64.b64encode('account-key:access-token'))) - - @inlineCallbacks - def test_req_path(self): - yield self.get_transport() - base_url = "https://go.vumi.org/api/v1/go/http_api/" - self.assertEqual( - self.message_req.path, - base_url + 'conversation-key/messages.json') - self.assertEqual( - self.event_req.path, - base_url + 'conversation-key/events.json') - - @inlineCallbacks - def test_receiving_messages(self): - yield self.get_transport() - msg = self.tx_helper.make_inbound("inbound") - self.message_req.write(msg.to_json().encode('utf-8') + '\n') - [received_msg] = yield self.tx_helper.wait_for_dispatched_inbound(1) - self.assertEqual(received_msg['message_id'], msg['message_id']) - - @inlineCallbacks - def test_receiving_events(self): - transport = yield self.get_transport() - # prime the mapping - yield transport.map_message_id('remote', 'local') - ack = self.tx_helper.make_ack(event_id='event-id') - ack['user_message_id'] = 'remote' - self.event_req.write(ack.to_json().encode('utf-8') + '\n') - [received_ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(received_ack['event_id'], ack['event_id']) - self.assertEqual(received_ack['user_message_id'], 'local') - self.assertEqual(received_ack['sent_message_id'], 'remote') - - @inlineCallbacks - def test_sending_messages(self): - tx = yield self.get_transport() - msg = self.tx_helper.make_outbound( - "outbound", session_event=TransportUserMessage.SESSION_CLOSE) - d = self.tx_helper.dispatch_outbound(msg) - req = yield self.get_next_request() - received_msg = json.loads(req.content.read()) - self.assertEqual(received_msg, { - 'content': msg['content'], - 'in_reply_to': None, - 'to_addr': msg['to_addr'], - 'message_id': msg['message_id'], - 'session_event': TransportUserMessage.SESSION_CLOSE, - 'helper_metadata': {}, - }) - - remote_id = TransportUserMessage.generate_id() - reply = msg.copy() - reply['message_id'] = remote_id - req.write(reply.to_json().encode('utf-8')) - req.finish() - yield d - - [ack] = yield self.tx_helper.wait_for_dispatched_events(1) - self.assertEqual(ack['user_message_id'], msg['message_id']) - self.assertEqual(ack['sent_message_id'], remote_id) - - @inlineCallbacks - def test_reconnecting(self): - transport = yield self.get_transport() - message_client = transport.message_client - message_client.connectionLost(Failure(ConnectionLost('foo'))) - - config = transport.get_static_config() - - self.assertTrue(transport.delay > config.initial_delay) - self.assertEqual(transport.retries, 1) - self.assertTrue(transport.reconnect_call) - self.clock.advance(transport.delay + 0.1) - - # write something to ensure connectionMade() is called on - # the protocol - message_req = yield self.get_next_request() - message_req.write('') - - event_req = yield self.get_next_request() - event_req.write('') - - self.assertEqual(transport.delay, config.initial_delay) - self.assertEqual(transport.retries, 0) - self.assertFalse(transport.reconnect_call) - - class TestGoConversationTransport(TestGoConversationTransportBase): transport_class = GoConversationTransport