Skip to content

Commit

Permalink
Revert "oslo.messaging context must be a dict"
Browse files Browse the repository at this point in the history
This reverts commit 5df4cb5.

The commit in question appears to have broken the notification agent
as an unintended side-effect. Reverting pending further investigation
as the notification agent is the more crucial component.

Related-to: #1317290
Change-Id: If9b1d638b60b7029c67f45e2e9feda62b9fd042a
  • Loading branch information
Eoghan Glynn committed May 7, 2014
1 parent 0a21107 commit 3626903
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 48 deletions.
2 changes: 1 addition & 1 deletion ceilometer/api/controllers/v2.py
Expand Up @@ -595,7 +595,7 @@ def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
notifier = messaging.get_notifier(publisher_id="ceilometer.api")
notifier.info(context.RequestContext(), notification, payload)
notifier.info(None, notification, payload)


class OldSample(_Base):
Expand Down
44 changes: 7 additions & 37 deletions ceilometer/messaging.py
Expand Up @@ -18,8 +18,6 @@
from oslo.config import cfg
import oslo.messaging

from ceilometer.openstack.common import context

TRANSPORT = None
NOTIFIER = None

Expand All @@ -30,27 +28,6 @@
}


class RequestContextSerializer(oslo.messaging.Serializer):
def __init__(self, base):
self._base = base

def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)

def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)

def serialize_context(self, ctxt):
return ctxt.to_dict()

def deserialize_context(self, ctxt):
return context.RequestContext(ctxt)


def setup(url=None):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
Expand All @@ -59,8 +36,7 @@ def setup(url=None):
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
if not NOTIFIER:
serializer = RequestContextSerializer(None)
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
NOTIFIER = oslo.messaging.Notifier(TRANSPORT)


def cleanup():
Expand All @@ -76,19 +52,15 @@ def get_rpc_server(topic, endpoint):
"""Return a configured oslo.messaging rpc server."""
global TRANSPORT
target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
serializer = RequestContextSerializer(None)
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)
executor='eventlet')


def get_rpc_client(**kwargs):
"""Return a configured oslo.messaging RPCClient."""
global TRANSPORT
target = oslo.messaging.Target(**kwargs)
serializer = RequestContextSerializer(None)
return oslo.messaging.RPCClient(TRANSPORT, target,
serializer=serializer)
return oslo.messaging.RPCClient(TRANSPORT, target)


def get_notification_listener(targets, endpoints, url=None):
Expand All @@ -99,10 +71,8 @@ def get_notification_listener(targets, endpoints, url=None):
_ALIASES)
else:
transport = TRANSPORT
serializer = RequestContextSerializer(None)
return oslo.messaging.get_notification_listener(
transport, targets, endpoints, executor='eventlet',
serializer=serializer)
transport, targets, endpoints, executor='eventlet')


def get_notifier(publisher_id):
Expand All @@ -113,9 +83,9 @@ def get_notifier(publisher_id):

def convert_to_old_notification_format(priority, ctxt, publisher_id,
event_type, payload, metadata):
# FIXME(sileht): temporary convert notification to old format
# to focus on oslo.messaging migration before refactoring the code to
# use the new oslo.messaging facilities
#FIXME(sileht): temporary convert notification to old format
#to focus on oslo.messaging migration before refactoring the code to
#use the new oslo.messaging facilities
notification = {'priority': priority,
'payload': payload,
'event_type': event_type,
Expand Down
6 changes: 3 additions & 3 deletions ceilometer/publisher/rpc.py
Expand Up @@ -142,7 +142,7 @@ def publish_samples(self, context, samples):
self.flush()

def flush(self):
# NOTE(sileht):
#note(sileht):
# IO of the rpc stuff in handled by eventlet,
# this is why the self.local_queue, is emptied before processing the
# queue and the remaining messages in the queue are added to
Expand All @@ -164,7 +164,7 @@ def _check_queue_length(self):
"dropping %d oldest samples") % count)

def _process_queue(self, queue, policy):
# NOTE(sileht):
#note(sileht):
# the behavior of rpc.cast call depends of rabbit_max_retries
# if rabbit_max_retries <= 0:
# it returns only if the msg has been sent on the amqp queue
Expand All @@ -178,7 +178,7 @@ def _process_queue(self, queue, policy):
context, topic, meters = queue[0]
try:
self.rpc_client.prepare(topic=topic).cast(
context, self.target, data=meters)
context.to_dict(), self.target, data=meters)
except oslo.messaging._drivers.common.RPCException:
samples = sum([len(m) for __, __, m in queue])
if policy == 'queue':
Expand Down
7 changes: 2 additions & 5 deletions ceilometer/tests/alarm/test_rpc.py
Expand Up @@ -23,16 +23,14 @@

from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models


class TestRPCAlarmNotifier(test.BaseTestCase):
def fake_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
def fake_cast(self, context, method, **args):
self.notified.append((method, args))

def setUp(self):
Expand Down Expand Up @@ -132,8 +130,7 @@ def test_notify_no_actions(self):


class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
def fake_fanout_cast(self, ctxt, method, **args):
self.assertIsInstance(ctxt, context.RequestContext)
def fake_fanout_cast(self, context, method, **args):
self.notified.append((method, args))

def fake_prepare(self, fanout):
Expand Down
2 changes: 0 additions & 2 deletions ceilometer/tests/api/v2/test_post_samples_scenarios.py
Expand Up @@ -23,7 +23,6 @@

import mock

from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import timeutils
from ceilometer.tests.api.v2 import FunctionalTest
Expand All @@ -33,7 +32,6 @@
class TestPostSamples(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
def fake_cast(self, ctxt, target, data):
self.assertIsInstance(ctxt, context.RequestContext)
for m in data:
del m['message_signature']
self.published.append(data)
Expand Down

0 comments on commit 3626903

Please sign in to comment.