Skip to content

Commit

Permalink
fix race condition in standalone rpc proxy check for pending messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mattbennett committed May 3, 2018
1 parent b3e6248 commit 58e2692
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 66 deletions.
6 changes: 2 additions & 4 deletions nameko/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,7 @@ def start(self):
def stop(self):
self.should_stop = False

def get_consumers(self, consumer_cls, channel):
""" Extend Consumer.get_consumers
"""
def on_connection_revived(self):
if self.pending:
try:
with self.connection as conn:
Expand All @@ -285,7 +283,7 @@ def get_consumers(self, consumer_cls, channel):
)
)

return super(ReplyListener, self).get_consumers(consumer_cls, channel)
return super(ReplyListener, self).on_connection_revived()

def register_for_reply(self, correlation_id=None):
if correlation_id is None:
Expand Down
38 changes: 30 additions & 8 deletions nameko/standalone/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@

class ReplyListener(Consumer):

REGISTERED = "registered"
REQUESTED = "requested"

def __init__(self, config, queue, timeout=None, **kwargs):
self.queue = queue
self.timeout = timeout
Expand All @@ -51,26 +54,30 @@ def start(self):
def stop(self):
self.should_stop = True

def get_consumers(self, consumer_cls, channel):
""" Extend Consumer.get_consumers
"""
if self.pending:
def on_connection_revived(self):

requested_not_received = [
correlation_id for correlation_id, state in self.pending.items()
if state is self.REQUESTED
]
if requested_not_received:
try:
with self.connection as conn:
self.queue.bind(conn).queue_declare(passive=True)
print("queue still exists")
except NotFound:
raise ReplyQueueExpiredWithPendingReplies(
"Lost replies for correlation ids:\n{}".format(
"\n".join(self.pending.keys())
"\n".join(requested_not_received)
)
)

return super(ReplyListener, self).get_consumers(consumer_cls, channel)
return super(ReplyListener, self).on_connection_revived()

def register_for_reply(self, correlation_id=None):
if correlation_id is None:
correlation_id = str(uuid.uuid4())
self.pending[correlation_id] = None
self.pending[correlation_id] = self.REGISTERED
return RpcReply(
lambda: self.consume_reply(correlation_id), correlation_id
)
Expand All @@ -80,11 +87,26 @@ def consume_reply(self, correlation_id):
if self.should_stop:
raise RuntimeError("Stopped and can no longer be used")

while not self.pending.get(correlation_id):
recoverable_errors = (
self.connection.connection_errors +
self.connection.channel_errors
)

# if registered: move to requested
# TODO would be better if this state lived in the RpcReply object
if self.pending.get(correlation_id) is self.REGISTERED:
self.pending[correlation_id] = self.REQUESTED

while self.pending.get(correlation_id) is self.REQUESTED:
try:
next(self.consume(timeout=self.timeout))
except socket.timeout:
# TODO socket timeout does not equate to RPC timeout;
# we can try to consume a message multiple times
raise RpcTimeout()
except recoverable_errors:
pass

return self.pending.pop(correlation_id)

def handle_message(self, body, message):
Expand Down
83 changes: 35 additions & 48 deletions test/standalone/test_rpc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ def test_proxy_remote_error(container_factory, rabbit_config):
assert exc_info.value.exc_type == "ExampleError"


@pytest.mark.skip("Known race condition")
@patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=200)
def test_reply_queue_removed_on_expiry(
rabbit_manager, rabbit_config, container_factory
Expand Down Expand Up @@ -187,33 +186,6 @@ def list_queues():
assert queues_after == queues_before


@pytest.mark.skip("Known race condition")
@patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=200)
def test_reply_queue_not_removed_while_in_use(
rabbit_manager, rabbit_config, container_factory
):
def list_queues():
vhost = rabbit_config['vhost']
return [
queue['name']
for queue in rabbit_manager.get_queues(vhost=vhost)
]

container = container_factory(FooService, rabbit_config)
container.start()

# check proxy re-use
with ServiceRpcProxy('foobar', rabbit_config) as foo:
queues_before = list_queues()
# sleep for 2x TTL
assert foo.sleep(0.4) == 0.4
queues_between = list_queues()
assert foo.spam(ham='eggs') == 'eggs'
queues_after = list_queues()

assert queues_before == queues_between == queues_after


@skip_if_no_toxiproxy
class TestDisconnectedWhileWaitingForReply(object):

Expand All @@ -236,25 +208,25 @@ def establish_connection(self):
yield

@pytest.yield_fixture(autouse=True)
def fast_expiry(self):
with patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=100):
def toxic_reply_listener(self, toxiproxy):
with patch.object(ReplyListener, 'amqp_uri', new=toxiproxy.uri):
yield

@pytest.fixture(autouse=True)
def container(
self, container_factory, rabbit_config, rabbit_manager, toxiproxy,
fast_expiry
@patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=200)
def test_reply_queue_removed_while_disconnected_with_pending_reply(
self, toxiproxy, rabbit_config, container_factory
):

def enable_after_queue_expires():
eventlet.sleep(1)
eventlet.sleep(.4)
toxiproxy.enable()

class Service(object):
name = "service"

@rpc
def sleep(self):
# allow proxy reply listener to connect before disconnecting it
eventlet.sleep(.2)
toxiproxy.disable()
eventlet.spawn_n(enable_after_queue_expires)
return "OK"
Expand All @@ -263,20 +235,35 @@ def sleep(self):
container = container_factory(Service, config)
container.start()

return container
with ClusterRpcProxy(rabbit_config) as proxy:
with pytest.raises(ReplyQueueExpiredWithPendingReplies):
proxy.service.sleep()

@pytest.yield_fixture
def toxic_rpc_proxy(self, rabbit_config, toxiproxy):
config = rabbit_config.copy()
config['AMQP_URI'] = toxiproxy.uri
with ClusterRpcProxy(config) as proxy:
yield proxy

def test_reply_queue_removed_while_disconnected_with_pending_reply(
self, toxic_rpc_proxy, toxiproxy, container
):
with pytest.raises(ReplyQueueExpiredWithPendingReplies):
toxic_rpc_proxy.service.sleep()
@patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=200)
def test_reply_queue_not_removed_while_in_use(
rabbit_manager, rabbit_config, container_factory
):
def list_queues():
vhost = rabbit_config['vhost']
return [
queue['name']
for queue in rabbit_manager.get_queues(vhost=vhost)
]

container = container_factory(FooService, rabbit_config)
container.start()

# check proxy re-use
with ServiceRpcProxy('foobar', rabbit_config) as foo:
queues_before = list_queues()
# sleep for 2x TTL
assert foo.sleep(.4) == .4
queues_between = list_queues()
assert foo.spam(ham='eggs') == 'eggs'
queues_after = list_queues()

assert queues_before == queues_between == queues_after


@patch('nameko.standalone.rpc.RPC_REPLY_QUEUE_TTL', new=200)
Expand Down
8 changes: 2 additions & 6 deletions test/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -802,14 +802,10 @@ def sleep(self):
eventlet.spawn_n(enable_after_queue_expires)
return "OK"

# very fast heartbeat
config = rabbit_config
config[HEARTBEAT_CONFIG_KEY] = 2 # seconds

container = container_factory(Service, config)
container = container_factory(Service, rabbit_config)
container.start()

delegate_container = container_factory(DelegateService, config)
delegate_container = container_factory(DelegateService, rabbit_config)
delegate_container.start()

return container
Expand Down

0 comments on commit 58e2692

Please sign in to comment.