Add support for timeouts in ServiceProxy #360
base: master
Conversation
In the past when we've talked about this, it was often required on a per-call basis. It's tricky because all parameters to rpc are considered rpc parameters.
In this PR, I've added a default timeout on the `ServiceProxy`. Looking through the discussion here, it seems like there's a strong preference towards not entering the "user space" of the base function call of a method. The two possible solutions that come to mind are:

1. Configuring the timeout via a context manager.
2. Keeping the simple call syntax as the default and adding a more explicit form for customization:

```python
n.rpc.greeting_service.hello(name='Matt')
n.rpc.greeting_service.hello.send(args=(), kwargs={'name': 'Matt'}, timeout=2)
```

Celery does the same thing with `delay` vs `apply_async`. Personally, I lean towards implementing an interface like (2), where the "default" way of calling a method is the simplest / easiest to read but uses the system default, while a slightly more involved way of calling the method allows full customization of how the call is made.
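For comparison, Celery's version of that split looks like this (standard Celery API; the `add` task and its module are hypothetical):

```python
from myapp.tasks import add  # hypothetical Celery task

add.delay(2, 2)                           # the simple, default call style
add.apply_async(args=(2, 2), expires=10)  # the explicit style, with options
```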
Three questions for you @davidszotten: …

Thanks for your thoughts / time! Also, @mattbennett would love your thoughts on this.
A few other thoughts. There are two sides to what you are calling "timeout" here: client side (aborting the wait) and server side (some kind of TTL). Do we definitely want to tie these together? (Probably yes?) We might need to think about how we handle the latter. For example (off the top of my head): your example uses the standalone proxy, but I think the rpc extension is as, if not more, important to consider.

I think I'm on the fence on configuring via a context manager vs adding another call syntax with args and kwargs params.
My thoughts: I'm fine with approach (2). I agree that a nice "default" and a slightly more involved alternative makes for a good API.

I also favour supporting defaults on the proxy, in a way that they can be changed with a subclass or at instantiation time, which is more-or-less what this PR adds. Adding this to the standalone RPC proxy, you get an API similar to the final one in #292:

```python
with ServiceRpcProxy('service', rabbit_config, timeout=30) as rpc_proxy:
    ...
```

These two approaches can obviously be combined to create a proxy with a default but also allow options to be set on a per-call basis:

```python
# service case
class Service(object):
    name = "service"

    other_rpc = RpcProxy("other", timeout=30)

    @rpc
    def method(self):
        self.other_rpc.remote_method.call(args=args, timeout=15)

# standalone case
with ServiceRpcProxy('service', rabbit_config, timeout=30) as proxy:
    proxy.remote_method(...)
    proxy.remote_method.call(..., timeout=15)
```

@jessepollak I would certainly be open to you expanding this PR to cover these cases. Stylistically, I think it would be nice if the …

On the "two sides" of a timeout: I think the client abort and the TTL should be tied together, but only in a limited way. A client using a timeout should set that same timeout as a TTL on its request. If the message expires before being consumed, we'll have avoided some unnecessary work. But I think that trying to kill workers that are already running would cause more problems than it solves. The library has no knowledge of what the worker is doing, so it's not possible to stop them safely. Similarly for propagating the TTL to RPC calls made by that worker: it's not necessarily true that child RPC calls (or events, or any other kind of side-effect) should be aborted if the client stops caring about the result.

If aborting workers was really required, it could be implemented with a custom entrypoint that extracted the incoming TTL and allowed the worker to alter its behaviour accordingly. In other words, I think this is application-level logic rather than something to be implemented by the framework.
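To make that concrete, here is a minimal sketch of the co-operative abort described above, assuming a hypothetical entrypoint has already extracted the incoming TTL into a `deadline` timestamp. All names are illustrative; none of this is nameko API:

```python
import time

class DeadlineExceeded(Exception):
    """Raised by the worker itself once the caller's TTL has elapsed."""

def process_items(items, deadline=None):
    # co-operative abort: the framework can't stop a running worker safely,
    # so the worker checks the deadline between units of work
    results = []
    for item in items:
        if deadline is not None and time.time() > deadline:
            raise DeadlineExceeded('caller stopped waiting; aborting')
        results.append(item * 2)  # stand-in for real work
    return results

# e.g. honour a 10-second TTL extracted from the incoming message
print(process_items([1, 2, 3], deadline=time.time() + 10))
```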
```
@@ -275,6 +278,9 @@ def get_reply_event(self, correlation_id):
        self._reply_events[correlation_id] = reply_event
        return reply_event

    def pop_reply_event(self, correlation_id):
        self._replay_events.pop(correlation_id, None)
```
Missing return
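For clarity, the version the reviewer is asking for would look like this (it also fixes what appears to be a `_replay_events` typo in the hunk above; presumably `_reply_events` was intended):

```python
def pop_reply_event(self, correlation_id):
    # return the popped reply event (or None) so the caller can use it
    return self._reply_events.pop(correlation_id, None)
```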
```python
    def _call(self, *args, **kwargs):
        _log.debug('invoking %s', self)

        worker_ctx = self.worker_ctx
        container = worker_ctx.container
        timeout = kwargs.pop('_timeout', None) or self.timeout
```
`timeout = kwargs.pop('_timeout', self.timeout)` would make more sense: the default is only used when the key is absent, and an explicit falsy value like `_timeout=0` isn't silently replaced by `self.timeout` as it is with the `or` version.
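A quick illustration of the edge case (plain Python, just to show the difference):

```python
default = 30

# with the `or` version, an explicit 0 is lost:
kwargs = {'_timeout': 0}
timeout = kwargs.pop('_timeout', None) or default
assert timeout == 30

# with a plain default, the explicit value survives:
kwargs = {'_timeout': 0}
timeout = kwargs.pop('_timeout', default)
assert timeout == 0
```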
I'm digging up the subject here. Is implementing a timeout still a viable option? And since it's been a while since the last commit: is there any way to detect that a service is down before calling it via RPC?
Timeouts are still viable and desired. I think both "client-side" and TTL-based timeouts would be valuable, and I like the interface that was proposed:

```python
n.rpc.service.method.call(args=(), kwargs={'foo': 'bar'}, timeout=10, expire=10)
```

There's no way to tell whether a service is available, I'm afraid, other than trying to call it.
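If the proposed interface landed, "trying to call it" with a short timeout could double as a crude availability check. This is purely hypothetical: `call()`, `RpcTimeoutError`, and the `ping` method are all assumptions drawn from this PR's design, not released nameko:

```python
class RpcTimeoutError(Exception):
    """Stub for the exception this PR introduces."""

# hypothetical probe built on the PR's proposed interface; assumes the
# target service exposes a cheap `ping` method
def service_is_up(n, service_name):
    try:
        getattr(n.rpc, service_name).ping.call(args=(), kwargs={}, timeout=1)
        return True
    except RpcTimeoutError:
        return False
```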
```python
    def get_message_headers(self, worker_context, timeout=None):
        headers = HeaderEncoder.get_message_headers(self, worker_context)
        if timeout:
            headers['x-message-ttl'] = timeout * 1000
```
I've tried this key, and the one that is mentioned in the RabbitMQ documentation and used by kombu (`expiration`), and it doesn't work. I expected that when an expired message reaches the head of the queue it wouldn't be delivered, but it is.
Ok, I've got it working. Instead of setting it in the headers (which are application-specific and not interpreted by the broker), we need to set the `expiration` property on the message when we publish it.
```python
class _TimeoutAwareMethodProxy(nameko.rpc.MethodProxy):
    """Timeout-aware RPC method proxy."""

    def __init__(self, *, timeout_secs: float, **kwargs) -> None:
        super().__init__(**kwargs)
        # set as a message property at publish time, not as a header
        self.publisher.expiration = timeout_secs
```
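This works because kombu sets the per-message `expiration` property at publish time. A standalone sketch of the same effect with plain kombu (broker URL, exchange, and routing key are placeholder values):

```python
from kombu import Connection, Exchange, Producer

# minimal per-message TTL sketch; all names here are example values
with Connection('amqp://guest:guest@localhost//') as conn:
    producer = Producer(conn.channel())
    producer.publish(
        {'msg': 'hello'},
        exchange=Exchange('nameko-rpc'),
        routing_key='service.method',
        expiration=10,  # seconds; kombu converts this to the AMQP
                        # 'expiration' property (a string of milliseconds)
    )
```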
I revived this in #652, hopefully including the feedback from this pull request.
Following up on the discussion here, I wanted to do a quick spike on adding timeouts to `ServiceProxy` (and thereby `RpcProxy`). This PR currently has the best implementation I could think of, but given my relative lack of experience with nameko core development, I'd love any and all feedback. Some things I'm currently hesitant about / have questions about:

When the `ReplyListener` receives messages, it needs to distinguish between those messages that are "unknown" and those that have timed out. To do this, I check if the `reply_event` has already raised an `RpcTimeoutError`, and if it has I throw the event away. This extra try/except seems a little heavy-handed, but I couldn't think of another way to convey that information to the `ReplyListener`. What are your thoughts on this? Should we explore refactoring `ReplyListener` to make this a little easier?

Thanks for the feedback etc!
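As a rough sketch of the distinction being described (illustrative only; these are not nameko's actual internals, and the `timed_out` flag stands in for the try/except the PR uses):

```python
class ReplyEvent(object):
    """Stand-in for the event nameko waits on (hypothetical)."""
    def __init__(self):
        self.timed_out = False
        self.result = None

    def send(self, result):
        self.result = result


class ReplyListener(object):
    def __init__(self):
        self._reply_events = {}  # correlation_id -> ReplyEvent

    def get_reply_event(self, correlation_id):
        event = ReplyEvent()
        self._reply_events[correlation_id] = event
        return event

    def handle_reply(self, correlation_id, body):
        event = self._reply_events.pop(correlation_id, None)
        if event is None:
            # truly unknown: we never sent a request with this id
            print('unknown correlation id: {}'.format(correlation_id))
        elif event.timed_out:
            # the caller already gave up waiting; discard the late reply
            print('discarding timed-out reply: {}'.format(correlation_id))
        else:
            event.send(body)


listener = ReplyListener()
event = listener.get_reply_event('abc-123')
event.timed_out = True                            # simulate a client timeout
listener.handle_reply('abc-123', {'result': 42})  # discarded
listener.handle_reply('nope', {})                 # unknown
```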