Permalink
Browse files

Fix IPC direct topic routing.

Direct messages were being stripped of the host
value when performing IPC forwarding. This caused
direct topics to be round-robined to all
services running on the system consuming from
the same base topic name.

i.e. if 'scheduler.host1' and 'scheduler.host2'
were running on the SAME machine, messages to
'scheduler.host1' may have been routed to
'scheduler.host2'.

Now, mulitple processing specifying different
rpc_zmq_host parameters will consume on
separate direct topics and will not
round-robin to other processes.

Adds a zmq-specific test to ensure that messages to
directed topics are not consumed by other
consumers of direct topics sharing a bare
topic on the same host.

Fixes bug 1123715

Change-Id: I939c24397e58492fc16561666aed3ca891325e9c
  • Loading branch information...
1 parent a1cc88f commit 6930432887f3551f88d08815fd04808fd15a07cc Eric Windisch committed Feb 1, 2013
Showing with 32 additions and 14 deletions.
  1. +17 −11 openstack/common/rpc/impl_zmq.py
  2. +6 −3 tests/unit/rpc/common.py
  3. +9 −0 tests/unit/rpc/test_zmq.py
@@ -439,11 +439,13 @@ def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic = data[1]
- topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~'):
+ sock_type = zmq.PUB
+ topic = topic.split('.', 1)[0]
+ elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@@ -588,23 +590,23 @@ class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
+ self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
- # Only consume on the base topic name.
- topic = topic.split('.', 1)[0]
-
- LOG.info(_("Create Consumer for topic (%(topic)s)") %
- {'topic': topic})
-
# Subscription scenarios
if fanout:
- subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
- topic = 'fanout~' + topic
+ subscribe = ('', fanout)[type(fanout) == str]
+ topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
+ topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
+
+ if topic in self.topics:
+ LOG.info(_("Skipping topic registration. Already registered."))
+ return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@@ -615,9 +617,11 @@ def create_consumer(self, topic, proxy, fanout=False):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
+ self.topics.append(topic)
def close(self):
self.reactor.close()
+ self.topics = []
def wait(self):
self.reactor.wait()
@@ -675,7 +679,9 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
- "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
+ "ipc://%s/zmq_topic_zmq_replies.%s" %
+ (CONF.rpc_zmq_ipc_dir,
+ CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
View
@@ -140,14 +140,17 @@ def test_context_passed(self):
"args": {"value": value}})
self.assertEqual(self.context.to_dict(), result)
- def _test_cast(self, method, value, args=None, fanout=False):
+ def _test_cast(self, method, value, args=None, fanout=False,
+ topic_nested=None):
"""Test casts by pushing items through a channeled queue.
@param: method a reference to a method returning a value
@param: value the value expected from method
@param: args optional dictionary arguments to method
@param: fanout boolean for use of rpc fanout method
"""
+ topic_nested = topic_nested or self.topic_nested
+
# Not a true global, but capitalized so
# it is clear it is leaking scope into Nested()
QUEUE = eventlet.queue.Queue()
@@ -163,7 +166,7 @@ def curry(*args, **kwargs):
QUEUE.put(method(*args, **kwargs))
nested = Nested()
- conn = self._create_consumer(nested, self.topic_nested, fanout)
+ conn = self._create_consumer(nested, topic_nested, fanout)
rpc_method = (self.rpc.cast, self.rpc.fanout_cast)[fanout]
@@ -173,7 +176,7 @@ def curry(*args, **kwargs):
msg['args'].update(args)
rpc_method(FLAGS, self.context,
- self.topic_nested,
+ topic_nested,
msg)
try:
@@ -127,3 +127,12 @@ def setUp(self):
super(RpcZmqDirectTopicTestCase, self).setUp(
topic='test.127.0.0.1',
topic_nested='nested.127.0.0.1')
+
+ def test_cast_wrong_direct_topic_failure(self):
+ try:
+ self._test_cast(common.TestReceiver.echo, 42, {"value": 42},
+ fanout=False, topic_nested='nested.localhost')
+ except Exception:
+ return
+ self.expectFailure("Message should not have been consumed.",
+ self.assertTrue, True)

0 comments on commit 6930432

Please sign in to comment.