
Add client and server timeouts to rpc #652

Open · daviskirk wants to merge 6 commits into base: v3.0.0-rc from daviskirk:rpc-timeouts

Conversation

daviskirk (Contributor) commented Jul 26, 2019

This is an implementation of the suggested feature mentioned here: https://www.notion.so/Full-support-for-RPC-timeouts-592cd7903cc84a049be11abfe778d5ce

There are still some additional tests missing, but the basic concept is to add two arguments to all RPC clients (standalone and dependency provider); see the sketch after this list:

  • timeout: raises a timeout error when the total round-trip time is exceeded (raised on the caller's machine)
  • delivery_timeout: uses RabbitMQ's "expiration" to remove undelivered messages from the server
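
A minimal usage sketch of the two proposed arguments. The import path, class name (`ClusterRpcClient`) and signature here are assumptions based on this PR's description, not a confirmed API:

from nameko.standalone.rpc import ClusterRpcClient  # name assumed

config = {"AMQP_URI": "amqp://guest:guest@localhost"}

with ClusterRpcClient(config, timeout=5, delivery_timeout=2) as client:
    # Raises on the caller's machine if the full round trip exceeds 5s;
    # RabbitMQ drops the request after 2s if no worker has consumed it.
    result = client.some_service.some_method("arg")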

I built in logic to try to prevent race conditions (the message is removed from the server while the client keeps waiting forever for an answer).

As always, there are a few open questions I'd like to discuss with the maintainers (@mattbennett?):

The current implementation no longer uses the socket timeout on the standalone RPC client (as suggested in the roadmap); instead it relies on the timeout being raised by the RPC client itself. Previously (as far as I can tell) the standalone client did not rely on eventlet at all, but now the RPC client uses eventlet to time out after waiting for a reply. The question is whether this is OK, or whether the old behaviour was a conscious decision to allow non-monkeypatched clients to still use the standalone RPC client.

mattbennett (Member) left a comment

This looks great, thanks @daviskirk!

> Previously (as far as I can tell) the standalone client did not rely on eventlet at all, but now the RPC client uses eventlet to time out after waiting for a reply. The question is whether this is OK, or whether the old behaviour was a conscious decision to allow non-monkeypatched clients to still use the standalone RPC client.

It was a conscious choice, but not a very well executed one. The idea is to be able to use the nameko.standalone package in any library, regardless of its concurrency model. But it's strange that you have to install all of Nameko, including eventlet, in order to use it. Really it should be a separate package.

For now though people rely on it working without a monkey patch. Can you implement a native-thread based timeout mechanism? Then it'd work in vanilla Python and Eventlet can co-opt it if the monkey patch is applied.
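
For illustration, one way to build such a mechanism; this is a sketch only, with made-up names (`ReplyListener`, `handle_reply`), not this PR's implementation. The standard library's queue.Queue.get(timeout=...) blocks on a native primitive in vanilla Python and becomes cooperative once eventlet's monkey patch is applied, so a single code path can serve both:

import queue

class ReplyListener:
    """Illustrative reply-correlation map with a portable timeout."""

    def __init__(self):
        self.replies = {}

    def register(self, correlation_id):
        reply_queue = queue.Queue(maxsize=1)
        self.replies[correlation_id] = reply_queue

        def get_reply(timeout=None):
            # Blocks a native thread, or yields under eventlet's patch.
            try:
                return reply_queue.get(timeout=timeout)
            except queue.Empty:
                raise TimeoutError(
                    "no reply for %s within %ss" % (correlation_id, timeout)
                )
            finally:
                self.replies.pop(correlation_id, None)

        return get_reply

    def handle_reply(self, correlation_id, body):
        reply_queue = self.replies.get(correlation_id)
        if reply_queue is not None:
            reply_queue.put(body)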

nameko/rpc.py (outdated review thread, resolved)
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch from b49f7e9 to 649ebb1 Aug 11, 2019
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch 2 times, most recently from 1c4a5ff to b4c779e Aug 15, 2019
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch 2 times, most recently from 94ec851 to 50cea76 Aug 18, 2019
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch from 50cea76 to 9035c66 Aug 18, 2019
- event timeouts are only available starting eventlet 0.22.1
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch from 9035c66 to 6bc838f Aug 18, 2019
@daviskirk daviskirk force-pushed the daviskirk:rpc-timeouts branch from 8905edb to 9777ccb Aug 18, 2019
daviskirk (Contributor, Author) commented Aug 18, 2019

> For now though people rely on it working without a monkey patch. Can you implement a native-thread based timeout mechanism?

I used the builtin "timeout" argument in the consumer and added a test, test_timeout_with_multiple_calls, for the scenario mentioned in https://www.notion.so/Full-support-for-RPC-timeouts-592cd7903cc84a049be11abfe778d5ce (regarding the standalone RPC proxy implementation).
If this is not enough, I can try to implement a threading-based approach. I tried that first, but there's no easy way to kill native threads in Python, which caused problems when implementing a threading-based timeout (I'm admittedly the opposite of proficient with native threads in Python, though).
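
For reference, a minimal sketch of that "builtin timeout" approach: kombu's drain_events(timeout=...) raises socket.timeout with no eventlet dependency. The surrounding names (wait_for_reply, amqp_uri, reply_queue) are placeholders, not this PR's code:

import socket

import kombu

def wait_for_reply(amqp_uri, reply_queue, timeout):
    replies = []

    def on_message(body, message):
        replies.append(body)
        message.ack()

    with kombu.Connection(amqp_uri) as connection:
        with kombu.Consumer(
            connection, queues=[reply_queue], callbacks=[on_message]
        ):
            try:
                # raises socket.timeout if no message arrives in time
                connection.drain_events(timeout=timeout)
            except socket.timeout:
                raise TimeoutError("no reply within %ss" % timeout)

    return replies[0]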

@daviskirk daviskirk requested a review from mattbennett Sep 9, 2019
@daviskirk daviskirk changed the title WIP: Add client and server timeouts to rpc Add client and server timeouts to rpc Sep 9, 2019
mattbennett (Member) left a comment

@daviskirk

First, so sorry it's taken me such a long time to review this. This is a really great contribution -- thank you.

I have a few small comments and then a couple of larger ones on the structure of the tests.

""" Register an RPC call with the given `correlation_id` for a reply.
Returns a function that can be used to retrieve the reply, blocking
until it has been received.
until it has been received. If a timeout is given.

mattbennett (Member) commented Oct 25, 2019

This docstring is incomplete.

publisher_options = dict(**publisher_options)

if publisher_options_kwargs:
    _log.warning(

mattbennett (Member) commented Oct 25, 2019

Would be better to raise this as a DeprecationWarning, e.g.

nameko/nameko/messaging.py

Lines 120 to 126 in 88d7e52

warnings.warn(
    "The signature of `Publisher` has changed. The `queue` kwarg "
    "is now deprecated. You can use the `declare` kwarg "
    "to provide a list of Kombu queues to be declared. "
    "See CHANGES, version 2.7.0 for more details. This warning "
    "will be removed in version 2.9.0.",
    DeprecationWarning

You can then also filter it out of the test logs with the filterwarnings decorator, e.g.

@pytest.mark.filterwarnings("ignore:The signature of `Publisher`:DeprecationWarning")
def test_publish_to_queue(
    patch_maybe_declare, mock_producer, mock_channel, mock_container
):
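
Applied to the check quoted above, that might look like the following sketch (the message text is illustrative, and `publisher_options_kwargs` is taken from the diff):

import warnings

def warn_deprecated_publisher_kwargs(publisher_options_kwargs):
    # Sketch only: surface the problem as a DeprecationWarning rather
    # than a log line, so callers can filter or escalate it.
    if publisher_options_kwargs:
        warnings.warn(
            "Passing publisher options as extra keyword arguments is "
            "deprecated; pass them via `publisher_options` instead.",
            DeprecationWarning
        )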

@@ -561,12 +621,31 @@ def call_async(self, *args, **kwargs):
rpc_call = self._call(*args, **kwargs)
return rpc_call

def with_options(self, **options):
"""Create a new client with modified options."""

mattbennett (Member) commented Oct 25, 2019

What's your view of making this a context manager? So you can use it like:

class Service:
    service_rpc = RpcClient("service")

    @entrypoint
    def meth(self):
        with self.service_rpc.with_options(timeout=5) as client:
            client.method()

daviskirk (Author, Contributor) commented Oct 25, 2019

In fact I started building it as one. I'll try and explain how we came to the conclusion that a direct version might be better:

  • A context manager implies a "context" that is wrapped in a block. In this case the client is truly a new object, independent from the old one, and there is no entering or closing functionality needed, so there is no real reason for a context manager.
  • For many options we more or less want to use them for a single call (especially timeouts and expiry, which might differ for every call). With a normal method, there's nothing keeping us from making a single-line method call, which makes for uncluttered code, especially in method-heavy code where various timeouts are applied very generously (in our case almost every call has different options). When these methods are used in a couple of ways, it's really easy to end up with deeply nested code without much benefit; compare the context-manager version:
with self.service_rpc.with_options(timeout=5) as client1:
    client1.method('this', 'that')

    with self.service_rpc.with_options(timeout=7, expiration=2) as client2:
        client2.method2('something')

        with client2.with_options(timeout=4) as client3:
            client3.method2('something else')

    client1.method('first again')

versus the direct version:

f = self.service_rpc.method.with_options(timeout=5)
client2 = self.service_rpc.with_options(timeout=7, expiration=2)

f('this', 'that')
client2.method2('something')
client2.with_options(timeout=4).method2('something else')
f('first again')

Most of the code I tried to test this with got to be nested 2, 3, or 4 levels deep fairly quickly. Using an exit stack (https://docs.python.org/3/library/contextlib.html#contextlib.ExitStack) works, but doesn't make the code look nicer, and as there is no real context to close there's no real benefit as far as I can tell.

_logger.warning(
    'expiration set using publisher_options '
    'will be ignored.'
    'use the explicit "expiration" argument instead.')

mattbennett (Member) commented Oct 25, 2019

You're missing a space between these two sentences.
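
For reference, the corrected call would read as below; without the trailing space, Python's implicit string concatenation glues the sentences into "ignored.use":

_logger.warning(
    'expiration set using publisher_options '
    'will be ignored. '
    'Use the explicit "expiration" argument instead.')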

    self.timeout is None
):
    # TODO: Figure out how to check this if "expiration" is set on the
    # publisher itself, which we don't have access to here.

mattbennett (Member) commented Oct 25, 2019

This is a bit unfortunate. Is the below an example of a user ending up with a hung client?

class Service:
    service_rpc = RpcClient("service" publisher_options={"timeout": 5, "expiration": 5}))

    @entrypoint
    def meth(self):
        new_client = self.service_rpc.with_options(timeout=None)  # cannot inspect publisher expiration
        new_client.method()  # will hang if message expires

daviskirk (Author, Contributor) commented Oct 25, 2019

Yes, this was the kind of thing I was concerned about.
However, in the meantime I think this case might be covered:

The publisher option "expiration" is also passed to the client, so in this case while the client could not inspect the publisher, the client itself would actually have the expiration option set to the same value as the publisher and would trigger this ValueError.

While this isn't 100% "safe", since someone could later change the code without realizing it can cause problems, I think it's probably good enough if it works. I will make extra sure these cases are tested (which would also guard against changes) and change the comment appropriately, if that sounds OK?
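
A sketch of the guard being described, with an illustrative container class standing in for the client (not the PR's exact code):

class RpcCallOptions:
    def __init__(self, timeout=None, expiration=None):
        if expiration is not None and timeout is None:
            # Fail fast: an expired request would otherwise leave the
            # caller waiting forever for a reply that can never come.
            raise ValueError(
                "`expiration` is set without a `timeout`; the caller "
                "could wait forever for a reply"
            )
        self.timeout = timeout
        self.expiration = expiration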

(0.1, False, 0.3, 1), # fires at 0.1, 0.4
(0.1, True, 0.3, 1), # fires at 0, 0.3
(0.04, False, 0.06, 2), # fires at 0.04, 0.1, 0.16
(0.04, True, 0.06, 3), # fires at 0, 0.06, 0.12

mattbennett (Member) commented Oct 25, 2019

Why were these changes required?

daviskirk (Author, Contributor) commented Oct 25, 2019

These were failing once in a while on the CI server.
Locally they always pass, but I made them as short as possible and the delay on the CI server seems to be a tiny bit longer.
I only noticed because I was pushing to this branch a lot.

test/test_rpc.py (review thread, resolved)
elif override == 'method':
    meth = self.delegate.method.with_options(**override_kwargs)
else:
    meth = self.delegate.method

mattbennett (Member) commented Oct 25, 2019

This is quite complicated. At the risk of duplicating some code, can you split it up so it's more understandable?

The parameterisation around "timeout, async_delay, call_duration, error" is fine, but perhaps you can use fixture parameterisation to build the Service class outside of the test method.
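
One possible shape for that fixture, sketched with assumed names (`RpcClient` follows the reviewer's example above; the import paths are assumptions, not verified against this branch):

import pytest

from nameko.rpc import RpcClient, rpc

@pytest.fixture(params=["constructor", "client", "method"])
def service_cls(request):
    # Build the service class outside the test, one variant per param,
    # so the test body no longer branches on the override style.
    override = request.param
    timeout_kwargs = {"timeout": 1} if override == "constructor" else {}

    class Service:
        name = "caller"
        delegate = RpcClient("delegate", **timeout_kwargs)

        @rpc
        def method(self):
            if override == "client":
                return self.delegate.with_options(timeout=1).method()
            if override == "method":
                return self.delegate.method.with_options(timeout=1)()
            return self.delegate.method()

    return Service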

mattbennett (Member) commented Oct 25, 2019

What the service / method / None override is really asserting is that it doesn't matter how you specify the timeout value. You perhaps don't need to test this end-to-end.

Additionally, it'd be more powerful to test that you can override with a different value and that is the one that's respected. Indeed, a method-supplied value should override a service-supplied value which should override a constructor-supplied value.

    meth = self.delegate.method.with_options(**override_kwargs)
else:
    meth = self.delegate.method
return meth()

mattbennett (Member) commented Oct 25, 2019

Same comment as above about the override complexity.


delegate_container = container_factory(DelegateService)
delegate_container.start()
# eventlet.sleep(1)

mattbennett (Member) commented Oct 25, 2019

Guess these are leftover?

I guess you have to start the delegate_container in order to create the queues, and then stop it again to allow the messages to expire? Might be worth adding a little comment along those lines.
