Skip to content
This repository has been archived by the owner on Jun 26, 2020. It is now read-only.

Commit

Permalink
Fix problem with long messages in Qpid
Browse files Browse the repository at this point in the history
Qpid has a limitation where it cannot serialize a dict containing
a string greater than 65535 characters.  This change alters the
Qpid implementation to JSON encode the dict before sending it, but
only if Qpid would fail to serialize it.  This maintains as much
backward compatibility as possible, though long messages will
still fail if they are sent to an older receiver.

Fixes bug 1175808

Change-Id: I5d104e099f523508dae2b657f7d06d96984b10f0
  • Loading branch information
Ben Nemec committed Jun 3, 2013
1 parent e3e4979 commit 7ce5441
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 0 deletions.
47 changes: 47 additions & 0 deletions openstack/common/rpc/impl_qpid.py
Expand Up @@ -31,6 +31,7 @@
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common

qpid_codec = importutils.try_import("qpid.codec010")
qpid_messaging = importutils.try_import("qpid.messaging")
qpid_exceptions = importutils.try_import("qpid.messaging.exceptions")

Expand Down Expand Up @@ -69,6 +70,8 @@

cfg.CONF.register_opts(qpid_opts)

JSON_CONTENT_TYPE = 'application/json; charset=utf8'


class ConsumerBase(object):
"""Consumer base class."""
Expand Down Expand Up @@ -123,10 +126,27 @@ def reconnect(self, session):
self.receiver = session.receiver(self.address)
self.receiver.capacity = 1

def _unpack_json_msg(self, msg):
"""Load the JSON data in msg if msg.content_type indicates that it
is necessary. Put the loaded data back into msg.content and
update msg.content_type appropriately.
A Qpid Message containing a dict will have a content_type of
'amqp/map', whereas one containing a string that needs to be converted
back from JSON will have a content_type of JSON_CONTENT_TYPE.
:param msg: a Qpid Message object
:returns: None
"""
if msg.content_type == JSON_CONTENT_TYPE:
msg.content = jsonutils.loads(msg.content)
msg.content_type = 'amqp/map'

def consume(self):
"""Fetch the message and pass it to the callback object."""
message = self.receiver.fetch()
try:
self._unpack_json_msg(message)
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
Expand Down Expand Up @@ -228,8 +248,35 @@ def reconnect(self, session):
"""Re-establish the Sender after a reconnection."""
self.sender = session.sender(self.address)

def _pack_json_msg(self, msg):
"""Qpid cannot serialize dicts containing strings longer than 65535
characters. This function dumps the message content to a JSON
string, which Qpid is able to handle.
:param msg: May be either a Qpid Message object or a bare dict.
:returns: A Qpid Message with its content field JSON encoded.
"""
try:
msg.content = jsonutils.dumps(msg.content)
except AttributeError:
# Need to have a Qpid message so we can set the content_type.
msg = qpid_messaging.Message(jsonutils.dumps(msg))
msg.content_type = JSON_CONTENT_TYPE
return msg

def send(self, msg):
"""Send a message."""
try:
# Check if Qpid can encode the message
check_msg = msg
if not hasattr(check_msg, 'content_type'):
check_msg = qpid_messaging.Message(msg)
content_type = check_msg.content_type
enc, dec = qpid_messaging.message.get_codec(content_type)
enc(check_msg.content)
except qpid_codec.CodecException:
# This means the message couldn't be serialized as a dict.
msg = self._pack_json_msg(msg)
self.sender.send(msg)


Expand Down
83 changes: 83 additions & 0 deletions tests/unit/rpc/test_qpid.py
Expand Up @@ -28,6 +28,7 @@
from oslo.config import cfg

from openstack.common import context
from openstack.common import jsonutils
from openstack.common.rpc import amqp as rpc_amqp
from openstack.common.rpc import common as rpc_common
from tests import utils
Expand Down Expand Up @@ -491,6 +492,88 @@ def test_call_with_timeout(self):
def test_multicall(self):
self._test_call(multi=True)

def _test_publisher(self, message=True):
"""Test that messages containing long strings are correctly serialized
in a way that Qpid can handle.
:param message: The publisher may be passed either a Qpid Message
object or a bare dict. This parameter controls which of those the test
will send.
"""
self.sent_msg = None

def send_stub(msg):
self.sent_msg = msg

# Qpid cannot serialize a dict containing a string > 65535 chars.
raw_msg = {'test': 'a' * 65536}
if message:
base_msg = qpid.messaging.Message(raw_msg)
else:
base_msg = raw_msg
expected_msg = qpid.messaging.Message(jsonutils.dumps(raw_msg))
expected_msg.content_type = impl_qpid.JSON_CONTENT_TYPE
mock_session = self.mox.CreateMock(self.orig_session)
mock_sender = self.mox.CreateMock(self.orig_sender)
mock_session.sender(mox.IgnoreArg()).AndReturn(mock_sender)
self.stubs.Set(mock_sender, 'send', send_stub)
self.mox.ReplayAll()

publisher = impl_qpid.Publisher(mock_session, 'test_node')
publisher.send(base_msg)

self.assertEqual(self.sent_msg.content, expected_msg.content)
self.assertEqual(self.sent_msg.content_type, expected_msg.content_type)

def test_publisher_long_message(self):
self._test_publisher(message=True)

def test_publisher_long_dict(self):
self._test_publisher(message=False)

def _test_consumer_long_message(self, json=True):
"""Verify that the Qpid implementation correctly deserializes
message content.
:param json: For compatibility, this code needs to support both
messages that are and are not JSON encoded. This param
specifies which is being tested.
"""
def fake_callback(msg):
self.received_msg = msg

# The longest string Qpid can handle itself
chars = 65535
if json:
# The first length that requires JSON encoding
chars = 65536
raw_msg = {'test': 'a' * chars}
if json:
fake_message = qpid.messaging.Message(jsonutils.dumps(raw_msg))
fake_message.content_type = impl_qpid.JSON_CONTENT_TYPE
else:
fake_message = qpid.messaging.Message(raw_msg)
mock_session = self.mox.CreateMock(self.orig_session)
mock_receiver = self.mox.CreateMock(self.orig_receiver)
mock_session.receiver(mox.IgnoreArg()).AndReturn(mock_receiver)
mock_receiver.fetch().AndReturn(fake_message)
mock_session.acknowledge(mox.IgnoreArg())
self.mox.ReplayAll()

consumer = impl_qpid.DirectConsumer(None,
mock_session,
'bogus_msg_id',
fake_callback)
consumer.consume()

self.assertEqual(self.received_msg, raw_msg)

def test_consumer_long_message(self):
self._test_consumer_long_message(json=True)

def test_consumer_long_message_no_json(self):
self._test_consumer_long_message(json=False)


#
#from nova.tests.rpc import common
Expand Down

0 comments on commit 7ce5441

Please sign in to comment.