Skip to content

Commit

Permalink
Fix RPC responses to allow None response correctly.
Browse files Browse the repository at this point in the history
Fixes bug 897155

Also adds a new fake rpc implementation that tests use by default.
This speeds up the test run by ~10% on my system.  We can decide to
ditch fake_rabbit at some point later..

Change-Id: I8877fad3d41ae055c15b1adff99e535c34e9ce92
  • Loading branch information
comstud committed Nov 29, 2011
1 parent e2a5955 commit 84693b4
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 75 deletions.
86 changes: 46 additions & 40 deletions nova/rpc/impl_carrot.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,13 @@ def process_data(self, message_data, message):
# we just log the message and send an error string
# back to the caller
LOG.warn(_('no method for message: %s') % message_data)
if msg_id:
msg_reply(msg_id,
_('No method for message: %s') % message_data)
ctxt.reply(msg_id,
_('No method for message: %s') % message_data)
return
self.pool.spawn_n(self._process_data, msg_id, ctxt, method, args)
self.pool.spawn_n(self._process_data, ctxt, method, args)

@exception.wrap_exception()
def _process_data(self, msg_id, ctxt, method, args):
def _process_data(self, ctxt, method, args):
"""Thread that magically looks for a method on the proxy
object and calls it.
"""
Expand All @@ -283,23 +282,18 @@ def _process_data(self, msg_id, ctxt, method, args):
# NOTE(vish): magic is fun!
try:
rval = node_func(context=ctxt, **node_args)
if msg_id:
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
msg_reply(msg_id, x, None)
else:
msg_reply(msg_id, rval, None)

# This final None tells multicall that it is done.
msg_reply(msg_id, None, None)
elif isinstance(rval, types.GeneratorType):
# NOTE(vish): this iterates through the generator
list(rval)
# Check if the result was a generator
if isinstance(rval, types.GeneratorType):
for x in rval:
ctxt.reply(x, None)
else:
ctxt.reply(rval, None)

# This final None tells multicall that it is done.
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
if msg_id:
msg_reply(msg_id, None, sys.exc_info())
ctxt.reply(None, sys.exc_info())
return


Expand Down Expand Up @@ -447,7 +441,7 @@ def __init__(self, connection=None, msg_id=None):
super(DirectPublisher, self).__init__(connection=connection)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -463,12 +457,17 @@ def msg_reply(msg_id, reply=None, failure=None):
with ConnectionPool.item() as conn:
publisher = DirectPublisher(connection=conn, msg_id=msg_id)
try:
publisher.send({'result': reply, 'failure': failure})
msg = {'result': reply, 'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)
except TypeError:
publisher.send(
{'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure})
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
publisher.send(msg)

publisher.close()

Expand Down Expand Up @@ -508,8 +507,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
msg_reply(self.msg_id, *args, **kwargs)
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


def multicall(context, topic, msg):
Expand Down Expand Up @@ -537,8 +539,11 @@ def __init__(self, consumer):
self._consumer = consumer
self._results = queue.Queue()
self._closed = False
self._got_ending = False

def close(self):
if self._closed:
return
self._closed = True
self._consumer.close()
ConnectionPool.put(self._consumer.connection)
Expand All @@ -548,30 +553,31 @@ def __call__(self, data, message):
message.ack()
if data['failure']:
self._results.put(RemoteError(*data['failure']))
elif data.get('ending', False):
self._got_ending = True
else:
self._results.put(data['result'])

def __iter__(self):
return self.wait()

def wait(self):
while True:
rv = None
while rv is None and not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
while not self._closed:
try:
rv = self._consumer.fetch(enable_callbacks=True)
except Exception:
self.close()
raise
if rv is None:
time.sleep(0.01)

continue
if self._got_ending:
self.close()
raise StopIteration
result = self._results.get()
if isinstance(result, Exception):
self.close()
raise result
if result == None:
self.close()
raise StopIteration
yield result


Expand Down
146 changes: 146 additions & 0 deletions nova/rpc/impl_fake.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fake RPC implementation which calls proxy methods directly with no
queues. Casts will block, but this is very useful for tests.
"""

import sys
import traceback
import types

from nova import context
from nova.rpc import common as rpc_common

CONSUMERS = {}


class RpcContext(context.RequestContext):
def __init__(self, *args, **kwargs):
super(RpcContext, self).__init__(*args, **kwargs)
self._response = []
self._done = False

def reply(self, reply=None, failure=None, ending=False):
if ending:
self._done = True
if not self._done:
self._response.append((reply, failure))


class Consumer(object):
def __init__(self, topic, proxy):
self.topic = topic
self.proxy = proxy

def call(self, context, method, args):
node_func = getattr(self.proxy, method)
node_args = dict((str(k), v) for k, v in args.iteritems())

ctxt = RpcContext.from_dict(context.to_dict())
try:
rval = node_func(context=ctxt, **node_args)
# Caller might have called ctxt.reply() manually
for (reply, failure) in ctxt._response:
if failure:
raise failure[0], failure[1], failure[2]
yield reply
# if ending not 'sent'...we might have more data to
# return from the function itself
if not ctxt._done:
if isinstance(rval, types.GeneratorType):
for val in rval:
yield val
else:
yield rval
except Exception:
exc_info = sys.exc_info()
raise rpc_common.RemoteError(exc_info[0].__name__,
str(exc_info[1]),
traceback.format_exception(*exc_info))


class Connection(object):
"""Connection object."""

def __init__(self):
self.consumers = []

def create_consumer(self, topic, proxy, fanout=False):
consumer = Consumer(topic, proxy)
self.consumers.append(consumer)
if topic not in CONSUMERS:
CONSUMERS[topic] = []
CONSUMERS[topic].append(consumer)

def close(self):
for consumer in self.consumers:
CONSUMERS[consumer.topic].remove(consumer)
self.consumers = []

def consume_in_thread(self):
pass


def create_connection(new=True):
"""Create a connection"""
return Connection()


def multicall(context, topic, msg):
"""Make a call that returns multiple times."""

method = msg.get('method')
if not method:
return
args = msg.get('args', {})

try:
consumer = CONSUMERS[topic][0]
except (KeyError, IndexError):
return iter([None])
else:
return consumer.call(context, method, args)


def call(context, topic, msg):
"""Sends a message on a topic and wait for a response."""
rv = multicall(context, topic, msg)
# NOTE(vish): return the last result from the multicall
rv = list(rv)
if not rv:
return
return rv[-1]


def cast(context, topic, msg):
try:
call(context, topic, msg)
except rpc_common.RemoteError:
pass


def fanout_cast(context, topic, msg):
"""Cast to all consumers of a topic"""
method = msg.get('method')
if not method:
return
args = msg.get('args', {})

for consumer in CONSUMERS.get(topic, []):
try:
consumer.call(context, method, args)
except rpc_common.RemoteError:
pass
23 changes: 16 additions & 7 deletions nova/rpc/impl_kombu.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ def _process_data(self, ctxt, method, args):
else:
ctxt.reply(rval, None)
# This final None tells multicall that it is done.
ctxt.reply(None, None)
ctxt.reply(ending=True)
except Exception as e:
LOG.exception('Exception during message handling')
ctxt.reply(None, sys.exc_info())
Expand Down Expand Up @@ -668,9 +668,11 @@ def __init__(self, *args, **kwargs):
self.msg_id = msg_id
super(RpcContext, self).__init__(*args, **kwargs)

def reply(self, *args, **kwargs):
def reply(self, reply=None, failure=None, ending=False):
if self.msg_id:
msg_reply(self.msg_id, *args, **kwargs)
msg_reply(self.msg_id, reply, failure, ending)
if ending:
self.msg_id = None


class MulticallWaiter(object):
Expand All @@ -679,8 +681,11 @@ def __init__(self, connection):
self._iterator = connection.iterconsume()
self._result = None
self._done = False
self._got_ending = False

def done(self):
if self._done:
return
self._done = True
self._iterator.close()
self._iterator = None
Expand All @@ -690,6 +695,8 @@ def __call__(self, data):
"""The consume() callback will call this. Store the result."""
if data['failure']:
self._result = RemoteError(*data['failure'])
elif data.get('ending', False):
self._got_ending = True
else:
self._result = data['result']

Expand All @@ -699,13 +706,13 @@ def __iter__(self):
raise StopIteration
while True:
self._iterator.next()
if self._got_ending:
self.done()
raise StopIteration
result = self._result
if isinstance(result, Exception):
self.done()
raise result
if result == None:
self.done()
raise StopIteration
yield result


Expand Down Expand Up @@ -759,7 +766,7 @@ def fanout_cast(context, topic, msg):
conn.fanout_send(topic, msg)


def msg_reply(msg_id, reply=None, failure=None):
def msg_reply(msg_id, reply=None, failure=None, ending=False):
"""Sends a reply or an error on the channel signified by msg_id.
Failure should be a sys.exc_info() tuple.
Expand All @@ -779,4 +786,6 @@ def msg_reply(msg_id, reply=None, failure=None):
msg = {'result': dict((k, repr(v))
for k, v in reply.__dict__.iteritems()),
'failure': failure}
if ending:
msg['ending'] = True
conn.direct_send(msg_id, msg)
1 change: 0 additions & 1 deletion nova/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import nova.image.fake
import shutil
import stubout
from eventlet import greenthread

from nova import fakerabbit
from nova import flags
Expand Down
1 change: 0 additions & 1 deletion nova/tests/api/ec2/test_cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -1537,7 +1537,6 @@ def test_stop_start_with_volume(self):
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1['id'])

greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2['id'])
self.assertTrue(vol['deleted'])
Expand Down

0 comments on commit 84693b4

Please sign in to comment.