Skip to content

Commit

Permalink
Merge bd2c8b3 into 0cc3836
Browse files Browse the repository at this point in the history
  • Loading branch information
KaitCrawford committed Mar 23, 2016
2 parents 0cc3836 + bd2c8b3 commit 9fae669
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 246 deletions.
6 changes: 1 addition & 5 deletions vumi/transports/vumi_bridge/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
from vumi.transports.vumi_bridge.vumi_bridge import (
GoConversationTransport, GoConversationClientTransport,
GoConversationServerTransport)
from vumi.transports.vumi_bridge.vumi_bridge import GoConversationTransport

__all__ = [
'GoConversationTransport',
'GoConversationClientTransport',
'GoConversationServerTransport',
]
140 changes: 11 additions & 129 deletions vumi/transports/vumi_bridge/tests/test_vumi_bridge.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
import base64
import json

from twisted.internet.defer import inlineCallbacks, returnValue, DeferredQueue
from twisted.internet.error import ConnectionLost
from twisted.internet.task import Clock
from twisted.web.server import NOT_DONE_YET
from twisted.python.failure import Failure

from vumi.message import TransportUserMessage
from vumi.tests.fake_connection import FakeHttpServer
from vumi.tests.helpers import VumiTestCase
from vumi.transports.tests.helpers import TransportHelper
from vumi.transports.vumi_bridge import (
GoConversationClientTransport, GoConversationServerTransport)
from vumi.transports.vumi_bridge import GoConversationTransport
from vumi.config import ConfigError
from vumi.utils import http_request_full

Expand All @@ -35,6 +31,7 @@ def get_transport(self, **config):
'account_key': 'account-key',
'conversation_key': 'conversation-key',
'access_token': 'access-token',
'publish_status': True,
}
defaults.update(config)
transport = yield self.tx_helper.get_transport(defaults, start=False)
Expand Down Expand Up @@ -65,135 +62,13 @@ def get_next_request(self):

class TestGoConversationTransport(TestGoConversationTransportBase):

transport_class = GoConversationClientTransport

@inlineCallbacks
def setup_transport(self, transport):
transport.clock = self.clock
# when the transport fires up it starts two new connections,
# wait for them & name them accordingly
reqs = []
reqs.append((yield self.get_next_request()))
reqs.append((yield self.get_next_request()))
if reqs[0].path.endswith('messages.json'):
self.message_req = reqs[0]
self.event_req = reqs[1]
else:
self.message_req = reqs[1]
self.event_req = reqs[0]
# put some data on the wire to have connectionMade called
self.message_req.write('')
self.event_req.write('')

@inlineCallbacks
def test_auth_headers(self):
yield self.get_transport()
[msg_auth_header] = self.message_req.requestHeaders.getRawHeaders(
'Authorization')
self.assertEqual(msg_auth_header, 'Basic %s' % (
base64.b64encode('account-key:access-token')))
[event_auth_header] = self.event_req.requestHeaders.getRawHeaders(
'Authorization')
self.assertEqual(event_auth_header, 'Basic %s' % (
base64.b64encode('account-key:access-token')))

@inlineCallbacks
def test_req_path(self):
yield self.get_transport()
base_url = "https://go.vumi.org/api/v1/go/http_api/"
self.assertEqual(
self.message_req.path,
base_url + 'conversation-key/messages.json')
self.assertEqual(
self.event_req.path,
base_url + 'conversation-key/events.json')

@inlineCallbacks
def test_receiving_messages(self):
yield self.get_transport()
msg = self.tx_helper.make_inbound("inbound")
self.message_req.write(msg.to_json().encode('utf-8') + '\n')
[received_msg] = yield self.tx_helper.wait_for_dispatched_inbound(1)
self.assertEqual(received_msg['message_id'], msg['message_id'])

@inlineCallbacks
def test_receiving_events(self):
transport = yield self.get_transport()
# prime the mapping
yield transport.map_message_id('remote', 'local')
ack = self.tx_helper.make_ack(event_id='event-id')
ack['user_message_id'] = 'remote'
self.event_req.write(ack.to_json().encode('utf-8') + '\n')
[received_ack] = yield self.tx_helper.wait_for_dispatched_events(1)
self.assertEqual(received_ack['event_id'], ack['event_id'])
self.assertEqual(received_ack['user_message_id'], 'local')
self.assertEqual(received_ack['sent_message_id'], 'remote')

@inlineCallbacks
def test_sending_messages(self):
tx = yield self.get_transport()
msg = self.tx_helper.make_outbound(
"outbound", session_event=TransportUserMessage.SESSION_CLOSE)
d = self.tx_helper.dispatch_outbound(msg)
req = yield self.get_next_request()
received_msg = json.loads(req.content.read())
self.assertEqual(received_msg, {
'content': msg['content'],
'in_reply_to': None,
'to_addr': msg['to_addr'],
'message_id': msg['message_id'],
'session_event': TransportUserMessage.SESSION_CLOSE,
'helper_metadata': {},
})

remote_id = TransportUserMessage.generate_id()
reply = msg.copy()
reply['message_id'] = remote_id
req.write(reply.to_json().encode('utf-8'))
req.finish()
yield d

[ack] = yield self.tx_helper.wait_for_dispatched_events(1)
self.assertEqual(ack['user_message_id'], msg['message_id'])
self.assertEqual(ack['sent_message_id'], remote_id)

@inlineCallbacks
def test_reconnecting(self):
transport = yield self.get_transport()
message_client = transport.message_client
message_client.connectionLost(Failure(ConnectionLost('foo')))

config = transport.get_static_config()

self.assertTrue(transport.delay > config.initial_delay)
self.assertEqual(transport.retries, 1)
self.assertTrue(transport.reconnect_call)
self.clock.advance(transport.delay + 0.1)

# write something to ensure connectionMade() is called on
# the protocol
message_req = yield self.get_next_request()
message_req.write('')

event_req = yield self.get_next_request()
event_req.write('')

self.assertEqual(transport.delay, config.initial_delay)
self.assertEqual(transport.retries, 0)
self.assertFalse(transport.reconnect_call)


class TestGoConversationServerTransport(TestGoConversationTransportBase):

transport_class = GoConversationServerTransport
transport_class = GoConversationTransport

def test_server_settings_without_configs(self):
return self.assertFailure(self.get_transport(), ConfigError)

def get_configured_transport(self):
return self.get_transport(
message_path='messages.json', event_path='events.json',
web_port='0')
return self.get_transport(web_path='', web_port='0')

def post_msg(self, url, msg_json):
data = msg_json.encode('utf-8')
Expand Down Expand Up @@ -272,3 +147,10 @@ def test_sending_messages(self):
[ack] = yield self.tx_helper.wait_for_dispatched_events(1)
self.assertEqual(ack['user_message_id'], msg['message_id'])
self.assertEqual(ack['sent_message_id'], remote_id)

[status] = yield self.tx_helper.wait_for_dispatched_statuses(1)

self.assertEquals(status['status'], 'ok')
self.assertEquals(status['component'], 'inbound')
self.assertEquals(status['type'], 'good_request')
self.assertEquals(status['message'], 'Good request received')
137 changes: 25 additions & 112 deletions vumi/transports/vumi_bridge/vumi_bridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,25 +2,21 @@

import base64
import json
import random

from twisted.internet.defer import inlineCallbacks
from twisted.internet import reactor
from twisted.web.http_headers import Headers
from twisted.web import http
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET

from vumi.transports import Transport
from vumi.transports.vumi_bridge.client import StreamingClient
from vumi.config import ConfigText, ConfigDict, ConfigInt, ConfigFloat
from vumi.persist.txredis_manager import TxRedisManager
from vumi.message import TransportUserMessage, TransportEvent
from vumi.utils import to_kwargs, http_request_full
from vumi.utils import to_kwargs, http_request_full, StatusEdgeDetector
from vumi import log


class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS):
class VumiBridgeTransportConfig(Transport.CONFIG_CLASS):
account_key = ConfigText(
'The account key to connect with.', static=True, required=True)
conversation_key = ConfigText(
Expand Down Expand Up @@ -58,19 +54,11 @@ class VumiBridgeClientTransportConfig(Transport.CONFIG_CLASS):
# molar Planck constant times c, joule meter/mole
default=0.11962656472,
static=True)


class VumiBridgeServerTransportConfig(VumiBridgeClientTransportConfig):
# Most of this copied wholesale from vumi.transports.httprpc.

web_port = ConfigInt(
"The port to listen for requests on, defaults to `0`.",
default=0, static=True)
message_path = ConfigText(
"The path to listen for message requests on.", required=True,
static=True)
event_path = ConfigText(
"The path to listen for event requests on.", required=True,
web_path = ConfigText(
"The path to listen for inbound requests on.", required=True,
static=True)
health_path = ConfigText(
"The path to listen for downstream health checks on"
Expand Down Expand Up @@ -134,6 +122,7 @@ def handle_outbound_message(self, message):
if resp.code != http.OK:
log.warning('Unexpected status code: %s, body: %s' % (
resp.code, resp.delivered_body))
self.add_status_bad_req()
yield self.publish_nack(message['message_id'],
reason='Unexpected status code: %s' % (
resp.code,))
Expand All @@ -142,6 +131,7 @@ def handle_outbound_message(self, message):
remote_message = json.loads(resp.delivered_body)
yield self.map_message_id(
remote_message['message_id'], message['message_id'])
self.add_status_good_req()
yield self.publish_ack(user_message_id=message['message_id'],
sent_message_id=remote_message['message_id'])

Expand All @@ -152,101 +142,23 @@ def get_auth_headers(self):
config.account_key, config.access_token))],
}


class GoConversationClientTransport(GoConversationTransportBase):
"""
This transport essentially connects as a client to Vumi Go's streaming
HTTP API [1]_.
It allows one to bridge Vumi and Vumi Go installations.
NOTE: Since we're basically bridging two separate installations we're
leaving some of the attributes that we would normally change the
same. Specifically `transport_type`.
.. [1] https://github.com/praekelt/vumi-go/blob/develop/docs/http_api.rst
"""

CONFIG_CLASS = VumiBridgeClientTransportConfig
continue_trying = True
clock = reactor

@inlineCallbacks
def setup_transport(self):
config = self.get_static_config()
self.redis = yield TxRedisManager.from_config(
config.redis_manager)
self.retries = 0
self.delay = config.initial_delay
self.reconnect_call = None
self.client = StreamingClient(self.agent_factory)
self.connect_api_clients()

def teardown_transport(self):
if self.reconnect_call:
self.reconnect_call.cancel()
self.reconnect_call = None
self.continue_trying = False
self.disconnect_api_clients()

def connect_api_clients(self):
self.message_client = self.client.stream(
TransportUserMessage, self.handle_inbound_message,
log.error, self.get_url('messages.json'),
headers=Headers(self.get_auth_headers()),
on_connect=self.reset_reconnect_delay,
on_disconnect=self.reconnect_api_clients)
self.event_client = self.client.stream(
TransportEvent, self.handle_inbound_event,
log.error, self.get_url('events.json'),
headers=Headers(self.get_auth_headers()),
on_connect=self.reset_reconnect_delay,
on_disconnect=self.reconnect_api_clients)

def reconnect_api_clients(self, reason):
self.disconnect_api_clients()
if not self.continue_trying:
log.msg('Not retrying because of explicit request')
return

config = self.get_static_config()
self.retries += 1
if (config.max_retries is not None
and (self.retries > config.max_retries)):
log.warning('Abandoning reconnecting after %s attempts.' % (
self.retries))
return

self.delay = min(self.delay * config.factor,
config.max_reconnect_delay)
if config.jitter:
self.delay = random.normalvariate(self.delay,
self.delay * config.jitter)
log.msg('Will retry in %s seconds' % (self.delay,))
self.reconnect_call = self.clock.callLater(self.delay,
self.connect_api_clients)

def reset_reconnect_delay(self):
config = self.get_static_config()
self.delay = config.initial_delay
self.retries = 0
self.reconnect_call = None
self.continue_trying = True

def disconnect_api_clients(self):
self.message_client.disconnect()
self.event_client.disconnect()

def update_status(self, **kw):
'''Publishes a status if it is not a repeat of the previously
published status.'''
if self.status_detect.check_status(**kw):
yield self.publish_status(**kw)
# TODO: Notify Junebug

class GoConversationTransport(GoConversationClientTransport):
def add_status_bad_req(self):
return self.update_status(
status='down', component='inbound', type='bad_request',
message='Bad request received')

def setup_transport(self, *args, **kwargs):
log.warning(
'GoConversationTransport is deprecated, please use '
'`GoConversationClientTransport` instead.')
return super(GoConversationTransport, self).setup_transport(
*args, **kwargs)
def add_status_good_req(self):
return self.update_status(
status='ok', component='inbound', type='good_request',
message='Good request received')


class GoConversationHealthResource(Resource):
Expand Down Expand Up @@ -283,10 +195,10 @@ def render_POST(self, request):
return self.render_(request)


class GoConversationServerTransport(GoConversationTransportBase):
class GoConversationTransport(GoConversationTransportBase):
# Most of this copied wholesale from vumi.transports.httprpc.

CONFIG_CLASS = VumiBridgeServerTransportConfig
CONFIG_CLASS = VumiBridgeTransportConfig

@inlineCallbacks
def setup_transport(self):
Expand All @@ -296,11 +208,12 @@ def setup_transport(self):

self.web_resource = yield self.start_web_resources([
(GoConversationResource(self.handle_raw_inbound_message),
config.message_path),
"%s/messages.json" % (config.web_path)),
(GoConversationResource(self.handle_raw_inbound_event),
config.event_path),
"%s/events.json" % (config.web_path)),
(GoConversationHealthResource(self), config.health_path),
], config.web_port)
self.status_detect = StatusEdgeDetector()

def teardown_transport(self):
return self.web_resource.loseConnection()
Expand Down

0 comments on commit 9fae669

Please sign in to comment.