Skip to content

Commit

Permalink
Merge pull request #468 from mattbennett/fix-flakey-tests
Browse files Browse the repository at this point in the history
Fix flakey tests
  • Loading branch information
mattbennett committed Jan 17, 2018
2 parents fb7560e + 8330051 commit a0c61ad
Show file tree
Hide file tree
Showing 5 changed files with 221 additions and 125 deletions.
2 changes: 1 addition & 1 deletion nameko/standalone/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def get_message(self, correlation_id):

try:
while correlation_id not in self.replies:
self.consumer.channel.connection.client.drain_events(
self.consumer.connection.drain_events(
timeout=self.timeout
)

Expand Down
12 changes: 8 additions & 4 deletions test/cli/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from nameko.exceptions import CommandError
from nameko.runners import ServiceRunner
from nameko.standalone.rpc import ClusterRpcProxy
from nameko.testing.waiting import wait_for_call

from test.sample import Service

Expand All @@ -35,8 +36,10 @@ def test_run(rabbit_config):
0,
'test.sample:Service',
])
gt = eventlet.spawn(main, args)
eventlet.sleep(1)

# start runner and wait for it to come up
with wait_for_call(ServiceRunner, 'start'):
gt = eventlet.spawn(main, args)

# make sure service launches ok
with ClusterRpcProxy(rabbit_config) as proxy:
Expand Down Expand Up @@ -116,8 +119,9 @@ def test_main_with_logging_config(rabbit_config, tmpdir):
'test.sample',
])

gt = eventlet.spawn(main, args)
eventlet.sleep(1)
# start runner and wait for it to come up
with wait_for_call(ServiceRunner, 'start'):
gt = eventlet.spawn(main, args)

with ClusterRpcProxy(rabbit_config) as proxy:
proxy.service.ping()
Expand Down
237 changes: 161 additions & 76 deletions test/standalone/test_rpc_proxy.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import itertools
import socket

import eventlet
import pytest
from eventlet.event import Event
from kombu.connection import Connection
from kombu.message import Message
from mock import Mock, call

from nameko.containers import WorkerContext
from nameko.exceptions import RemoteError, RpcConnectionError, RpcTimeout
from nameko.extensions import DependencyProvider
from nameko.rpc import MethodProxy, Responder, get_rpc_exchange, rpc
from nameko.standalone.rpc import ClusterRpcProxy, ServiceRpcProxy
from nameko.testing.utils import get_rabbit_connections
from nameko.standalone.rpc import (
ClusterRpcProxy, ConsumeEvent, ServiceRpcProxy
)
from nameko.testing.waiting import wait_for_call
from nameko.utils.retry import retry

from test import skip_if_no_toxiproxy

Expand Down Expand Up @@ -222,85 +222,170 @@ def test_multiple_calls_to_result(container_factory, rabbit_config):
res.result()


def test_disconnect_with_pending_reply(
container_factory, rabbit_manager, rabbit_config
):
block = Event()
@skip_if_no_toxiproxy
class TestDisconnectWithPendingReply(object):

class ExampleService(object):
name = "exampleservice"
@pytest.yield_fixture
def toxic_rpc_proxy(self, rabbit_config, toxiproxy):
rabbit_config['AMQP_URI'] = toxiproxy.uri
with ClusterRpcProxy(rabbit_config) as proxy:
yield proxy

def hook(self):
pass # pragma: no cover
def test_disconnect_and_successfully_reconnect(
self, container_factory, rabbit_manager, rabbit_config,
toxic_rpc_proxy, toxiproxy
):
block = Event()

@rpc
def method(self, arg):
self.hook()
block.wait()
return arg
class Service(object):
name = "service"

container = container_factory(ExampleService, rabbit_config)
container.start()
@rpc
def method(self, arg):
block.wait()
return arg

vhost = rabbit_config['vhost']
container = container_factory(Service, rabbit_config)
container.start()

# get exampleservice's queue consumer connection while we know it's the
# only active connection
connections = get_rabbit_connections(vhost, rabbit_manager)
assert len(connections) == 1
container_connection = connections[0]

with ServiceRpcProxy('exampleservice', rabbit_config) as proxy:

# grab the proxy's connection too, the only other connection
connections = get_rabbit_connections(vhost, rabbit_manager)
assert len(connections) == 2
proxy_connection = [
conn for conn in connections if conn != container_connection
][0]

counter = itertools.count(start=1)

class ConnectionStillOpen(Exception):
pass

@retry(for_exceptions=ConnectionStillOpen, delay=0.2)
def wait_for_connection_close(name):
connections = get_rabbit_connections(vhost, rabbit_manager)
for conn in connections:
if conn['name'] == name:
raise ConnectionStillOpen(name) # pragma: no cover

def cb(args, kwargs, res, exc_info):
# trigger a disconnection on the second call.
# release running workers once the connection has been closed
count = next(counter)
if count == 2:
rabbit_manager.delete_connection(proxy_connection['name'])
wait_for_connection_close(proxy_connection['name'])
block.send(True)
return True

# attach a callback to `hook` so we can close the connection
# while there are requests in-flight
with wait_for_call(ExampleService, 'hook', callback=cb):

# make an async call that runs for some time
async_call = proxy.method.call_async("hello")

# make another call that will trigger the disconnection;
# expect the blocking proxy to raise when the service reconnects
with pytest.raises(RpcConnectionError):
proxy.method("hello")
# make an async call that will block,
# wait for the worker to have spawned
with wait_for_call(container, 'spawn_worker'):
res = toxic_rpc_proxy.service.method.call_async('msg1')

# disconnect toxiproxy
toxiproxy.disable()

# also expect the running call to raise, since the reply may have
# been sent while the queue was gone (deleted on disconnect, and
# not added until re-connect)
# reconnect toxiproxy just before the consumer attempts to reconnect;
# (consumer.cancel is the only hook we have)
def reconnect(args, kwargs, res, exc_info):
block.send(True)
toxiproxy.enable()
return True

with wait_for_call(
toxic_rpc_proxy._reply_listener.queue_consumer.consumer, 'cancel',
callback=reconnect
):
# rpc proxy should return an error for the request in flight.
with pytest.raises(RpcConnectionError):
async_call.result()
res.result()

# proxy should work again after reconnection
assert toxic_rpc_proxy.service.method("msg3") == "msg3"

def test_disconnect_and_fail_to_reconnect(
self, container_factory, rabbit_manager, rabbit_config,
toxic_rpc_proxy, toxiproxy
):
block = Event()

class Service(object):
name = "service"

@rpc
def method(self, arg):
block.wait()
return arg

container = container_factory(Service, rabbit_config)
container.start()

# proxy should work again afterwards
assert proxy.method("hello") == "hello"
# make an async call that will block,
# wait for the worker to have spawned
with wait_for_call(container, 'spawn_worker'):
res = toxic_rpc_proxy.service.method.call_async('msg1')

try:
# disconnect
toxiproxy.disable()

# rpc proxy should return an error for the request in flight.
# it will also attempt to reconnect and throw on failure
# because toxiproxy is still disconnected
with pytest.raises(socket.error):
with pytest.raises(RpcConnectionError):
res.result()

finally:
# reconnect toxiproxy
block.send(True)
toxiproxy.enable()

# proxy will not work afterwards because the queueconsumer connection
# was not recovered on the second attempt
with pytest.raises(RuntimeError):
toxic_rpc_proxy.service.method("msg3")


class TestConsumeEvent(object):

@pytest.fixture
def queue_consumer(self):
queue_consumer = Mock()
queue_consumer.stopped = False
queue_consumer.connection.connected = True
return queue_consumer

def test_wait(self, queue_consumer):
correlation_id = 1
event = ConsumeEvent(queue_consumer, correlation_id)

result = "result"

def get_message(correlation_id):
event.send(result)
queue_consumer.get_message.side_effect = get_message

assert event.wait() == result
assert queue_consumer.get_message.call_args == call(correlation_id)

def test_wait_disconnected_while_waiting(self, queue_consumer):
correlation_id = 1
event = ConsumeEvent(queue_consumer, correlation_id)

exc = RpcConnectionError()

def get_message(correlation_id):
event.send_exception(exc)
queue_consumer.get_message.side_effect = get_message

with pytest.raises(RpcConnectionError):
event.wait()
assert queue_consumer.get_message.call_args == call(correlation_id)

def test_wait_already_disconnected(self, queue_consumer):
correlation_id = 1
event = ConsumeEvent(queue_consumer, correlation_id)

exc = RpcConnectionError()

event.send_exception(exc)
with pytest.raises(RpcConnectionError):
event.wait()
assert not queue_consumer.get_message.called

def test_wait_queue_consumer_stopped(self, queue_consumer):
correlation_id = 1
event = ConsumeEvent(queue_consumer, correlation_id)

queue_consumer.stopped = True

with pytest.raises(RuntimeError) as raised:
event.wait()
assert "stopped" in str(raised.value)
assert not queue_consumer.get_message.called

def test_wait_queue_consumer_disconnected(self, queue_consumer):
correlation_id = 1
event = ConsumeEvent(queue_consumer, correlation_id)

queue_consumer.connection.connected = False

with pytest.raises(RuntimeError) as raised:
event.wait()
assert "disconnected" in str(raised.value)
assert not queue_consumer.get_message.called


def test_timeout_not_needed(container_factory, rabbit_manager, rabbit_config):
Expand All @@ -315,7 +400,7 @@ def test_timeout(container_factory, rabbit_manager, rabbit_config):
container = container_factory(FooService, rabbit_config)
container.start()

with ServiceRpcProxy('foobar', rabbit_config, timeout=.1) as proxy:
with ServiceRpcProxy('foobar', rabbit_config, timeout=.5) as proxy:
with pytest.raises(RpcTimeout):
proxy.sleep(seconds=1)

Expand All @@ -341,7 +426,7 @@ def test_async_timeout(
container = container_factory(FooService, rabbit_config)
container.start()

with ServiceRpcProxy('foobar', rabbit_config, timeout=.1) as proxy:
with ServiceRpcProxy('foobar', rabbit_config, timeout=.5) as proxy:
result = proxy.sleep.call_async(seconds=1)
with pytest.raises(RpcTimeout):
result.result()
Expand Down

0 comments on commit a0c61ad

Please sign in to comment.