Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Only unpack replies on caller.

Opening the messages is unnecessary
in ZmqProxy, given that the msg_id
is already in the (zmq-specific)
message envelope. Additionally,
by not opening the messages, we
save a bit of complexity.

This avoids having to deserialize and
reserialize messages in ZmqProxy,
a necessary improvement for trusted-messaging
and should also have a markable performance
benefit.

Also cleans up msg_id handling a bit.

Change-Id: I31f365b9e1040503206b1d8320a0266a8121c821
  • Loading branch information...
commit b51a7241db53d87d780849563b99b5eee41761ba 1 parent 4552f10
Eric Windisch authored
Showing with 18 additions and 22 deletions.
  1. +18 −22 openstack/common/rpc/impl_zmq.py
View
40 openstack/common/rpc/impl_zmq.py
@@ -218,6 +218,8 @@ def __init__(self, addr, socket_type=None, bind=False):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
+ msg_id = msg_id or 0
+
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
@@ -294,13 +296,13 @@ def reply(self, ctx, proxy,
ctx.replies)
LOG.debug(_("Sending reply"))
- cast(CONF, ctx, topic, {
+ _multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
- 'msg_id': msg_id,
+ 'msg_id': msg_id, # Include for Folsom compat.
'response': response
}
- })
+ }, _msg_id=msg_id)
class ConsumerBase(object):
@@ -435,16 +437,8 @@ def consume(self, sock):
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
- # Handle zmq_replies magic
- if topic.startswith('fanout~'):
- sock_type = zmq.PUB
- elif topic.startswith('zmq_replies'):
+ if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
sock_type = zmq.PUB
- inside = rpc_common.deserialize_msg(_deserialize(in_msg))
- msg_id = inside[-1]['args']['msg_id']
- response = inside[-1]['args']['response']
- LOG.debug(_("->response->%s"), response)
- data = map(bytes, (msg_id, _serialize(response)))
else:
sock_type = zmq.PUSH
@@ -599,8 +593,8 @@ def consume_in_thread(self):
self.reactor.consume_in_thread()
-def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+def _cast(addr, context, topic, msg, timeout=None, serialize=True,
+ force_envelope=False, _msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@@ -609,7 +603,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
- conn.cast(msg_id, topic, payload, serialize, force_envelope)
+ conn.cast(_msg_id, topic, payload, serialize, force_envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@@ -617,7 +611,7 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None, serialize=True,
conn.close()
-def _call(addr, context, msg_id, topic, msg, timeout=None,
+def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@@ -653,7 +647,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
)
LOG.debug(_("Sending cast"))
- _cast(addr, context, msg_id, topic, payload,
+ _cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
LOG.debug(_("Cast sent; Waiting reply"))
@@ -661,10 +655,12 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
- responses = _deserialize(msg[-1])
+ responses = _deserialize(msg[-1])[-1]['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
+ except (IndexError, KeyError):
+ raise RPCException(_("RPC Message Invalid."))
finally:
if 'msg_waiter' in vars():
msg_waiter.close()
@@ -681,7 +677,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None,
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
- force_envelope=False):
+ force_envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@@ -707,10 +703,10 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
- _topic, _topic, msg, timeout, serialize,
- force_envelope)
+ _topic, msg, timeout, serialize,
+ force_envelope, _msg_id)
return
- return method(_addr, context, _topic, _topic, msg, timeout,
+ return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
Please sign in to comment.
Something went wrong with that request. Please try again.