Skip to content

Commit

Permalink
Add rpc dependency provider tests
Browse files Browse the repository at this point in the history
  • Loading branch information
daviskirk committed Aug 17, 2019
1 parent 3eababb commit b4c779e
Show file tree
Hide file tree
Showing 3 changed files with 241 additions and 9 deletions.
7 changes: 2 additions & 5 deletions nameko/standalone/rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,15 +124,12 @@ def consume_reply(self, correlation_id, timeout=None):

timer = TimeoutTimer(timeout)
while not self.pending.get(correlation_id):
if timer.time_left is not None and timer.time_left <= 0:
self.pending.pop(correlation_id, None)
raise RpcTimeout('Timed out after {} seconds'.format(timeout))
try:
next(self.consumer.consume(timeout=timer.time_left))
except socket.timeout as e:
except socket.timeout:
if timer.time_left is not None and timer.time_left <= 0:
raise RpcTimeout('Timed out after {} seconds'.format(timeout))
raise
raise # pragma: no cover # should never be reached
return self.pending.pop(correlation_id)

def handle_message(self, body, message):
Expand Down
19 changes: 17 additions & 2 deletions test/standalone/test_rpc_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,7 @@ def producer(self, get_producer):
producer.channel.returned_messages.get_nowait.side_effect = queue.Empty
return producer

@pytest.mark.parametrize('use_kwargs', [False, True])
@pytest.mark.usefixtures("memory_rabbit_config")
@pytest.mark.parametrize("parameter", [
# delivery options
Expand All @@ -601,19 +602,33 @@ def producer(self, get_producer):
'user_id', 'bogus_param'
])
def test_regular_parameters(
self, parameter, mock_container, producer
self, parameter, mock_container, producer, use_kwargs
):
""" Verify that most parameters can be specified at ServiceRpc
instantiation time.
"""
value = Mock()

rpc_client = ClusterRpcClient(**{parameter: value})
if use_kwargs:
kwargs = {parameter: value}
else:
kwargs = {'publisher_options': {parameter: value}}

rpc_client = ClusterRpcClient(**kwargs)

with rpc_client as client:
client.service.method.call_async()
assert producer.publish.call_args[1][parameter] == value

@patch('nameko.standalone.rpc._logger.warning')
def test_ignore_expiration(self, mock_warning):
""" Verify that most parameters can be specified at ServiceRpc
instantiation time.
"""
ClusterRpcClient(publisher_options={'expiration': 1})
assert mock_warning.call_count == 1
assert mock_warning.call_args[0][0].startswith('expiration set using') == 1

@pytest.mark.usefixtures("memory_rabbit_config")
@pytest.mark.usefixtures('predictable_call_ids')
def test_headers(self, mock_container, producer):
Expand Down
224 changes: 222 additions & 2 deletions test/test_rpc.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
)
from nameko.extensions import DependencyProvider
from nameko.rpc import (
Client, ClusterRpc, ReplyListener, Responder, Rpc, RpcConsumer, ServiceRpc,
rpc
Client, ClusterRpc, ReplyListener, Responder, Rpc, RpcConsumer, RpcTimeout,
ServiceRpc, rpc
)
from nameko.standalone.rpc import ServiceRpcClient
from nameko.testing.services import dummy, entrypoint_hook, entrypoint_waiter
Expand Down Expand Up @@ -288,6 +288,29 @@ def test_cannot_invoke_unspecified_client(self):
with pytest.raises(ValueError):
client.service()

@pytest.mark.parametrize('arg', Client.updatable_options)
def test_with_options(self, arg):
publish, register_for_reply, context_data = Mock(), Mock(), Mock()

client = Client(publish, register_for_reply, context_data)
modified_client = client.with_options(**{arg: 'changed'})
assert client != modified_client

for k, v in modified_client.__dict__.items():
original_value = client.__dict__[k]
if k == arg:
assert v != original_value
assert v == 'changed'
else:
assert v == original_value

def test_with_options_invalid_key(self):
publish, register_for_reply, context_data = Mock(), Mock(), Mock()

client = Client(publish, register_for_reply, context_data)
with pytest.raises(ValueError):
client.with_options(wrong=3)


# =============================================================================
# INTEGRATION TESTS
Expand Down Expand Up @@ -813,6 +836,149 @@ def echo(self, arg):
missing("foo")


@pytest.mark.usefixtures("rabbit_config")
@pytest.mark.parametrize("override", [None, 'service', 'method'])
@pytest.mark.parametrize("timeout, async_delay, call_duration, error", [
pytest.param(0.1, None, 1, True,
id="long duration with short timeout"),
pytest.param(0.2, None, 0.1, False,
id="short duration with long timeout"),
pytest.param(0.15, 0.2, 0.1, False,
id="async timeout but returns immediately"),
pytest.param(0.2, 0.1, 1, True,
id="async long duration with short timeout"),
])
def test_rpc_dependency_timeout(
container_factory,
override,
timeout, async_delay, call_duration, error
):

service_rpc_kwargs, override_kwargs = {}, {}
kwargs = {'timeout': timeout}
if override:
override_kwargs = kwargs
else:
service_rpc_kwargs = kwargs

class Service(object):
name = "service"
delegate = ServiceRpc("delegate", **service_rpc_kwargs)

@dummy
def method(self):
if override == 'service':
meth = self.delegate.with_options(**override_kwargs).method
elif override == 'method':
meth = self.delegate.method.with_options(**override_kwargs)
else:
meth = self.delegate.method

if async_delay is not None:
method_call = meth.call_async()
eventlet.sleep(async_delay)
return method_call.result()
return meth()

class DelegateService(object):
name = "delegate"

@rpc
def method(self):
eventlet.sleep(call_duration)
return "success"

container = container_factory(Service)
container.start()

delegate_container = container_factory(DelegateService)
delegate_container.start()

with entrypoint_hook(container, 'method') as method:
with eventlet.Timeout(1):
if error:
with pytest.raises(RpcTimeout):
method()
else:
assert method() == 'success'


@pytest.mark.usefixtures("rabbit_config")
@pytest.mark.parametrize("override", [None, 'service', 'method'])
@pytest.mark.parametrize("expiration, timeout, delay, error", [
pytest.param(None, None, 0, False,
id="no expiration / no timeout / no delivery delay"),
pytest.param(None, 0.5, 1, RpcTimeout,
id="no expiration with delivery before timeout"),
pytest.param(None, 2, 1, False,
id="no expiration with delivery before timeout"),
pytest.param(2, 2, 1, False,
id="long expiration with delivery before timeout"),
pytest.param(0.1, 0.5, 0.2, RpcTimeout,
id="expired message triggering timeout"),
])
def test_rpc_dependency_expiration(
container_factory,
override,
expiration, timeout, delay, error
):
"""Test message expiration in combination with timeout option.
`expiration` and `timeout` arguments are passed to the rpc provider, the `delay`
indicates the time in which the delegate service is down and cannot accept
messages (allowing messages to expire during this time).
`override` controls at which point the options
"""
service_rpc_kwargs, override_kwargs = {}, {}
kwargs = {'timeout': timeout, 'expiration': expiration}
if override:
override_kwargs = kwargs
else:
service_rpc_kwargs = kwargs

class Service(object):
name = "service"
delegate = ServiceRpc("delegate", **service_rpc_kwargs)

@dummy
def method(self):
if override == 'service':
meth = self.delegate.with_options(**override_kwargs).method
elif override == 'method':
meth = self.delegate.method.with_options(**override_kwargs)
else:
meth = self.delegate.method
return meth()

class DelegateService(object):
name = "delegate"

@rpc
def method(self):
return "success"

container = container_factory(Service)
container.start()

delegate_container = container_factory(DelegateService)
delegate_container.start()
# eventlet.sleep(1)
delegate_container.stop()
# eventlet.sleep(1)
eventlet.spawn_after(delay, delegate_container.start)

with entrypoint_hook(container, 'method') as method:
with eventlet.Timeout(5) as timeout:
if error:
with pytest.raises(RpcTimeout):
method()
else:
result = method()
assert result == 'success'


@pytest.mark.usefixtures('rabbit_config')
@skip_if_no_toxiproxy
class TestDisconnectedWhileWaitingForReply(object): # pragma: no cover
Expand Down Expand Up @@ -1897,6 +2063,60 @@ def test_restricted_parameters(self, patch_uuid, mock_container, producer):
assert publish_params['reply_to'] == "uuid1"
assert publish_params['correlation_id'] == "uuid2"

@patch('nameko.rpc.ReplyListener.register_for_reply')
@pytest.mark.parametrize('publisher_options', [None, {'expiration': 111}])
@pytest.mark.parametrize('expiration, timeout, expected', [
pytest.param(
None, None, None, id='no config'),
pytest.param(
None, 1, None, id='timeout with no expiration'),
pytest.param(
1, None, ValueError, id='expiration without timeout raises error'),
pytest.param(
1, 2, 1, id='timeout + expiration with larger timeout'),
pytest.param(
2, 1, 2, id='timeout + expiration with larger expiration'),
])
def test_expiration_and_timeout(self, mock_container, producer,
publisher_options,
expiration, timeout, expected):
"""Verify that expiration and timeout options can be set, expiration
is passed to correctly producer and timeout is passed correctly to
reply listener.
"""
mock_container.shared_extensions = {}
mock_container.service_name = "service-name"

# use a real worker context so nameko headers are generated
service = Mock()
entrypoint = Mock(method_name="method")
worker_ctx = WorkerContext(
mock_container, service, entrypoint, data={'context': 'data'}
)

service_rpc = ServiceRpc(
"service-name",
expiration=expiration,
timeout=timeout,
publisher_options=publisher_options
).bind(mock_container, "service_rpc")

service_rpc.reply_listener.setup()
service_rpc.setup()

service_rpc = service_rpc.get_dependency(worker_ctx)

if isinstance(expected, type) and issubclass(expected, Exception):
with pytest.raises(expected):
service_rpc.method.call_async()
else:
service_rpc.method.call_async()
assert producer.publish.call_args[1]['expiration'] == expected
# Also check that timeout was passed correctly
reply_call_args = ReplyListener.register_for_reply.call_args
assert reply_call_args[1] == {'timeout': timeout}


@pytest.mark.usefixtures("rabbit_config")
@config.patch({MAX_WORKERS_CONFIG_KEY: 1})
Expand Down

0 comments on commit b4c779e

Please sign in to comment.