Skip to content

Commit

Permalink
Migrate to oslo.messaging
Browse files Browse the repository at this point in the history
Glance currently uses a custom notifier and it has been maintaining it
for a long time. In a hope of reducing duplicated code and improving
cross-project contributions, this patch replaces the old notifier with
the one, recently developed, in oslo.messaging.

The oslo.messaging project is a port of the old oslo-rpc code to a
standalone, more stable and improved project. It brings all the benefits
that oslo-rpc would've brought as well as an easier way to integrate
with other projects.

This patch also:

    - Reduces the code shipped along with Glance since all the code
      copied from oslo-incubator related to the notifier is not needed
      anymore.
    - Improves the stability of existing, broker based, notifications.
    - Brings HA support.
    - Keeps backward compatibility by translating the old
      `notifier_strategy` into oslo.messaging drivers.

Changes to the code:

    - It is now necessary to pass the request context to the
      notification call.
    - Notifier package is no longer necessary. A notifier module was
      added instead.
    - New, notifier related, configurations were added.
    - A lot of code was removed

Since there's still not an official release, requirements.txt points to
the latest tarball created. A release for oslo.messaging is planned for
Icehouse.

docImpact
Implements bp oslo-messaging

Change-Id: I8cd84772bc5867e06b2a50ed7e15b9e86f0b94ad
  • Loading branch information
flaper87 committed Dec 4, 2013
1 parent 2f97e12 commit 90d6ef8
Show file tree
Hide file tree
Showing 20 changed files with 78 additions and 1,469 deletions.
9 changes: 8 additions & 1 deletion etc/glance-api.conf
Expand Up @@ -210,7 +210,14 @@ registry_client_protocol = http
# There are three methods of sending notifications, logging (via the
# log_file directive), rabbit (via a rabbitmq queue), qpid (via a Qpid
# message queue), or noop (no notifications sent, the default)
notifier_strategy = noop
# NOTE: THIS CONFIGURATION OPTION HAS BEEN DEPRECATED IN FAVOR OF `notification_driver`
# notifier_strategy = default

# Driver or drivers to handle sending notifications
# notification_driver = noop

# Default publisher_id for outgoing notifications.
# default_publisher_id = image.localhost

# Configuration options if sending notifications via rabbitmq (these are
# the defaults)
Expand Down
4 changes: 0 additions & 4 deletions glance/common/exception.py
Expand Up @@ -241,10 +241,6 @@ class StoreAddDisabled(GlanceException):
"store is disabled.")


class InvalidNotifierStrategy(GlanceException):
message = _("'%(strategy)s' is not an available notifier strategy.")


class MaxRedirectsExceeded(GlanceException):
message = _("Maximum redirects (%(redirects)s) was exceeded.")

Expand Down
96 changes: 50 additions & 46 deletions glance/notifier/__init__.py → glance/notifier.py
Expand Up @@ -17,16 +17,14 @@
# under the License.


import socket
import uuid
import warnings

from oslo.config import cfg
from oslo import messaging
import webob

from glance.common import exception
import glance.domain
import glance.domain.proxy
from glance.openstack.common import importutils
import glance.openstack.common.log as logging
from glance.openstack.common import timeutils

Expand All @@ -37,7 +35,10 @@
'notifications, logging (via the log_file directive), '
'rabbit (via a rabbitmq queue), qpid (via a Qpid '
'message queue), or noop (no notifications sent, the '
'default).'))
'default). (DEPRECATED)')),

cfg.StrOpt('default_publisher_id', default="image.localhost",
help='Default publisher_id for outgoing notifications'),
]

CONF = cfg.CONF
Expand All @@ -46,56 +47,54 @@
LOG = logging.getLogger(__name__)

_STRATEGY_ALIASES = {
"logging": "glance.notifier.notify_log.LoggingStrategy",
"rabbit": "glance.notifier.notify_kombu.RabbitStrategy",
"qpid": "glance.notifier.notify_qpid.QpidStrategy",
"noop": "glance.notifier.notify_noop.NoopStrategy",
"default": "glance.notifier.notify_noop.NoopStrategy",
"logging": "log",
"rabbit": "messaging",
"qpid": "messaging",
"noop": "noop",
"default": "noop",
}


class Notifier(object):
"""Uses a notification strategy to send out messages about events."""

def __init__(self, strategy=None):
_strategy = CONF.notifier_strategy
try:
strategy = _STRATEGY_ALIASES[_strategy]
msg = _('Converted strategy alias %s to %s')
LOG.debug(msg % (_strategy, strategy))
except KeyError:
strategy = _strategy
LOG.debug(_('No strategy alias found for %s') % strategy)

try:
strategy_class = importutils.import_class(strategy)
except ImportError:
raise exception.InvalidNotifierStrategy(strategy=strategy)
else:
self.strategy = strategy_class()

@staticmethod
def generate_message(event_type, priority, payload):
return {
"message_id": str(uuid.uuid4()),
"publisher_id": socket.gethostname(),
"event_type": event_type,
"priority": priority,
"payload": payload,
"timestamp": str(timeutils.utcnow()),
}
if CONF.notifier_strategy != 'default':
msg = _("notifier_strategy was deprecated in "
"favor of `notification_driver`")
warnings.warn(msg, DeprecationWarning)

# NOTE(flaper87): Use this to keep backwards
# compatibility. We'll try to get an oslo.messaging
# driver from the specified strategy.
_strategy = strategy or CONF.notifier_strategy
_driver = _STRATEGY_ALIASES.get(_strategy)

# NOTE(flaper87): The next 3 lines help
# with the migration to oslo.messaging.
# Without them, gate tests won't know
# what driver should be loaded.
# Once this patch lands, devstack will be
# updated and then these lines will be removed.
url = None
if _strategy in ['rabbit', 'qpid']:
url = _strategy + '://'

publisher_id = CONF.default_publisher_id
self._transport = messaging.get_transport(CONF, url)
self._notifier = messaging.Notifier(self._transport,
driver=_driver,
publisher_id=publisher_id)

def warn(self, event_type, payload):
msg = self.generate_message(event_type, "WARN", payload)
self.strategy.warn(msg)
self._notifier.warn({}, event_type, payload)

def info(self, event_type, payload):
msg = self.generate_message(event_type, "INFO", payload)
self.strategy.info(msg)
self._notifier.info({}, event_type, payload)

def error(self, event_type, payload):
msg = self.generate_message(event_type, "ERROR", payload)
self.strategy.error(msg)
self._notifier.error({}, event_type, payload)


def format_image_notification(image):
Expand Down Expand Up @@ -156,11 +155,13 @@ def __init__(self, image_repo, context, notifier):

def save(self, image):
super(ImageRepoProxy, self).save(image)
self.notifier.info('image.update', format_image_notification(image))
self.notifier.info('image.update',
format_image_notification(image))

def add(self, image):
super(ImageRepoProxy, self).add(image)
self.notifier.info('image.create', format_image_notification(image))
self.notifier.info('image.create',
format_image_notification(image))

def remove(self, image):
super(ImageRepoProxy, self).remove(image)
Expand Down Expand Up @@ -207,7 +208,8 @@ def get_data(self):
notify = self.notifier.info

try:
notify('image.send', self._format_image_send(sent))
notify('image.send',
self._format_image_send(sent))
except Exception as err:
msg = (_("An error occurred during image.send"
" notification: %(err)s") % {'err': err})
Expand Down Expand Up @@ -278,7 +280,8 @@ def __init__(self, task_repo, context, notifier):
item_proxy_kwargs=proxy_kwargs)

def add(self, task):
self.notifier.info('task.create', format_task_notification(task))
self.notifier.info('task.create',
format_task_notification(task))
return super(TaskRepoProxy, self).add(task)

def remove(self, task):
Expand Down Expand Up @@ -306,7 +309,8 @@ def __init__(self, task, context, notifier):
super(TaskProxy, self).__init__(task)

def run(self, executor):
self.notifier.info('task.run', format_task_notification(self.task))
self.notifier.info('task.run',
format_task_notification(self.task))
return super(TaskProxy, self).run(executor)

def begin_processing(self):
Expand Down

0 comments on commit 90d6ef8

Please sign in to comment.