Skip to content

Commit

Permalink
Decouple transport for RPC and Notification
Browse files Browse the repository at this point in the history
Add a new configuration option for setting up
an alternate notification_transport_url that
can be used for notifications. This allows
operators to separate the transport mechanisms
used for RPC and Notifications.

DocImpact

Closes-Bug: #1504622
Change-Id: Ief6f95ea906bfd95b3218a930c9db5d8a764beb9
  • Loading branch information
dims committed Nov 11, 2015
1 parent 4684374 commit 6621b90
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 20 deletions.
1 change: 1 addition & 0 deletions oslo_messaging/notify/__init__.py
Expand Up @@ -15,6 +15,7 @@

__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_transport',
'get_notification_listener',
'NotificationResult',
'NotificationFilter',
Expand Down
9 changes: 5 additions & 4 deletions oslo_messaging/notify/listener.py
Expand Up @@ -19,12 +19,13 @@
To create a notification listener, you supply a transport, list of targets and
a list of endpoints.
A transport can be obtained simply by calling the get_transport() method::
A transport can be obtained simply by calling the get_notification_transport()
method::
transport = messaging.get_transport(conf)
transport = messaging.get_notification_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration. See get_transport() for more details.
messaging configuration. See get_notification_transport() for more details.
The target supplied when creating a notification listener expresses the topic
and - optionally - the exchange to listen on. See Target for more details
Expand Down Expand Up @@ -56,7 +57,7 @@ class ErrorEndpoint(object):
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = oslo_messaging.get_transport(cfg.CONF)
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
oslo_messaging.Target(topic='notifications')
oslo_messaging.Target(topic='notifications_bis')
Expand Down
2 changes: 1 addition & 1 deletion oslo_messaging/notify/log_handler.py
Expand Up @@ -21,7 +21,7 @@ def __init__(self, *args, **kwargs):
# at runtime.
import oslo_messaging
logging.Handler.__init__(self, *args, **kwargs)
self._transport = oslo_messaging.get_transport(cfg.CONF)
self._transport = oslo_messaging.get_notification_transport(cfg.CONF)
self._notifier = oslo_messaging.Notifier(
self._transport,
publisher_id='error.publisher')
Expand Down
3 changes: 1 addition & 2 deletions oslo_messaging/notify/logger.py
Expand Up @@ -19,7 +19,6 @@
from oslo_config import cfg

from oslo_messaging.notify import notifier
from oslo_messaging import transport


class LoggingNotificationHandler(logging.Handler):
Expand Down Expand Up @@ -47,7 +46,7 @@ class LoggingNotificationHandler(logging.Handler):
def __init__(self, url, publisher_id=None, driver=None,
topic=None, serializer=None):
self.notifier = notifier.Notifier(
transport.get_transport(self.CONF, url),
notifier.get_notification_transport(self.CONF, url),
publisher_id, driver,
topic,
serializer() if serializer else None)
Expand Down
3 changes: 2 additions & 1 deletion oslo_messaging/notify/middleware.py
Expand Up @@ -59,7 +59,8 @@ def _factory(app):

def __init__(self, app, **conf):
self.notifier = notify.Notifier(
oslo_messaging.get_transport(cfg.CONF, conf.get('url')),
oslo_messaging.get_notification_transport(cfg.CONF,
conf.get('url')),
publisher_id=conf.get('publisher_id',
os.path.basename(sys.argv[0])))
self.service_name = conf.get('service_name')
Expand Down
20 changes: 18 additions & 2 deletions oslo_messaging/notify/notifier.py
Expand Up @@ -25,13 +25,18 @@
from stevedore import named

from oslo_messaging import serializer as msg_serializer
from oslo_messaging import transport as msg_transport

_notifier_opts = [
cfg.MultiStrOpt('notification_driver',
default=[],
help='The Drivers(s) to handle sending notifications. '
'Possible values are messaging, messagingv2, '
'routing, log, test, noop'),
cfg.StrOpt('notification_transport_url',
help='A URL representing the messaging driver to use for '
'notifications. If not set, we fall back to the same '
'configuration used for RPC.'),
cfg.ListOpt('notification_topics',
default=['notifications', ],
deprecated_name='topics',
Expand Down Expand Up @@ -75,6 +80,15 @@ def notify(self, ctxt, msg, priority, retry):
pass


def get_notification_transport(conf, url=None,
allowed_remote_exmods=None, aliases=None):
if url is None:
conf.register_opts(_notifier_opts)
url = conf.notification_transport_url
return msg_transport.get_transport(conf, url,
allowed_remote_exmods, aliases)


class Notifier(object):

"""Send notification messages.
Expand All @@ -94,7 +108,8 @@ class Notifier(object):
A Notifier object can be instantiated with a transport object and a
publisher ID:
notifier = messaging.Notifier(get_transport(CONF), 'compute')
notifier = messaging.Notifier(get_notification_transport(CONF),
'compute')
and notifications are sent via drivers chosen with the notification_driver
config option and on the topics chosen with the notification_topics config
Expand All @@ -103,7 +118,8 @@ class Notifier(object):
Alternatively, a Notifier object can be instantiated with a specific
driver or topic::
notifier = notifier.Notifier(RPC_TRANSPORT,
transport = notifier.get_notification_transport(CONF)
notifier = notifier.Notifier(transport,
'compute.host',
driver='messaging',
topic='notifications')
Expand Down
31 changes: 21 additions & 10 deletions oslo_messaging/tests/notify/test_listener.py
Expand Up @@ -21,6 +21,7 @@

import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import notifier as msg_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock

Expand Down Expand Up @@ -126,7 +127,8 @@ def setUp(self):
ListenerSetupMixin.setUp(self)

def test_constructor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]

Expand All @@ -141,7 +143,8 @@ def test_constructor(self):
self.assertEqual('blocking', listener.executor)

def test_no_target_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

listener = oslo_messaging.get_notification_listener(
transport,
Expand All @@ -155,7 +158,8 @@ def test_no_target_topic(self):
self.assertTrue(False)

def test_unknown_executor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

try:
oslo_messaging.get_notification_listener(transport, [], [],
Expand All @@ -167,7 +171,8 @@ def test_unknown_executor(self):
self.assertTrue(False)

def test_one_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint = mock.Mock()
endpoint.info.return_value = None
Expand All @@ -184,7 +189,8 @@ def test_one_topic(self):
{'message_id': mock.ANY, 'timestamp': mock.ANY})

def test_two_topics(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint = mock.Mock()
endpoint.info.return_value = None
Expand All @@ -210,7 +216,8 @@ def test_two_topics(self):
any_order=True)

def test_two_exchanges(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint = mock.Mock()
endpoint.info.return_value = None
Expand Down Expand Up @@ -254,7 +261,8 @@ def side_effect(target, ctxt, message, version, retry):
any_order=True)

def test_two_endpoints(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint1 = mock.Mock()
endpoint1.info.return_value = None
Expand All @@ -279,7 +287,8 @@ def test_two_endpoints(self):
'message_id': mock.ANY})

def test_requeue(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()

Expand All @@ -303,7 +312,8 @@ def side_effect_requeue(*args, **kwargs):
{'timestamp': mock.ANY, 'message_id': mock.ANY})])

def test_two_pools(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint1 = mock.Mock()
endpoint1.info.return_value = None
Expand Down Expand Up @@ -336,7 +346,8 @@ def mocked_endpoint_call(i):
mocked_endpoint_call(1)])

def test_two_pools_three_listener(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')

endpoint1 = mock.Mock()
endpoint1.info.return_value = None
Expand Down

0 comments on commit 6621b90

Please sign in to comment.