Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BlockingConnection tune for heartbeat=0 properly #966

Merged
merged 20 commits into from
Feb 27, 2018

Conversation

darcoli
Copy link
Contributor

@darcoli darcoli commented Feb 20, 2018

Fix for #965

@lukebakken
Copy link
Member

Please see the comment I added for #965

@lukebakken lukebakken closed this Feb 20, 2018
@darcoli
Copy link
Contributor Author

darcoli commented Feb 20, 2018

maybe this can be reconsidered given the discussion in #965

@lukebakken lukebakken reopened this Feb 20, 2018
@lukebakken
Copy link
Member

lukebakken commented Feb 20, 2018

Certainly, please add the verbiage that @vitaly-krugl requested and a test, if possible.

There is also this comment to address.

@lukebakken lukebakken added this to the 1.0.0 milestone Feb 20, 2018
@lukebakken lukebakken self-assigned this Feb 20, 2018
@vitaly-krugl
Copy link
Member

vitaly-krugl commented Feb 20, 2018

This is how it used to work, which I argue is preferential from user perspective based on extensive usage of several years: 781c548.

As a user, I needed to set a heartbeat value that made sense for my application, which was higher than the value proposed in server's tune method. But I still wanted a heartbeat to keep the connection alive and detect lost connection, just not as frequently as the server proposed. Since I don't always have control over the server's proposed value (not atypical), the client library needs to enable the app to get what it needs, especially when this is consistent with the AMQP 0.9.1 specification. This became especially critical after RabbitMQ reduced its default proposed heartbeat from 300 to 60 or so.

As it stands now, there is no way for an application to specify a heartbeat timeout greater than that proposed by the server.

A more flexible approach would be to allow the app to pass a callable that accepts the heartbeat value proposed by server and returns the heartbeat value that the app wants to use. The result should be honored as long as it's non-negative, otherwise raising ValueError. This callable would be called during negotiation once the server's proposed value is known. One way to instrument this in python is to allow either an integer or a callable to be passed for the heartbeat arg to Parameters/ConnectionParameters. URLParameters is a different story, perhaps it's constructor could be extended to accept an additional callable arg (e.g., heartbeat_tune_callback) for negotiating heartbeat that would be mutually-exclusive with the heartbeat value provided in the URL.

Per AMQP 0.9.1 tune documentation:

tune(short channel-max, long frame-max, short heartbeat)
Propose connection tuning parameters.
This method proposes a set of connection configuration values to the client. The client can accept and/or adjust these.

The application is the ultimate client, and the AMQP library should not interfere with the app's ability to use the AMQP protocol.

@vitaly-krugl
Copy link
Member

vitaly-krugl commented Feb 20, 2018

Also, this docstring needs to be updated:

pika/pika/connection.py

Lines 368 to 374 in 3d3b95d

@heartbeat.setter
def heartbeat(self, value):
"""
:param value: desired connection heartbeat timeout for negotiation or
None to accept broker's value. 0 turns heartbeat off.
"""

And this one:

pika/pika/connection.py

Lines 615 to 617 in 3d3b95d

:param int heartbeat: Heartbeat timeout. Max between this value
and server's proposal will be used as the heartbeat timeout. Use 0
to deactivate heartbeats and None to accept server's proposal.

Also this one:

pika/pika/connection.py

Lines 743 to 745 in 3d3b95d

- heartbeat:
Specify the number of seconds between heartbeat frames to ensure that
the link between RabbitMQ and your application is up

Copy link
Member

@lukebakken lukebakken left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello! Please see @vitaly-krugl's comments

@darcoli
Copy link
Contributor Author

darcoli commented Feb 22, 2018

Hi @lukebakken. Yes i am implementing the suggested changes. Quick question, I'd like to reuse the CallbackManager instance in Connection to implement the suggested callback for heartbeat tuning. However, I am missing if there is a way to get the return value of the user's callback after invoking self.callbacks.process(..., ON_CONNECTION_TUNE,...). In CallbackManager I do not see the return value of the callbacks being saved. Is this the correct way or should this be done differently?

@vitaly-krugl
Copy link
Member

vitaly-krugl commented Feb 22, 2018 via email

@vitaly-krugl
Copy link
Member

vitaly-krugl commented Feb 22, 2018 via email

@darcoli
Copy link
Contributor Author

darcoli commented Feb 22, 2018

I thought it might help to keep all callbacks handled consistently instead of reimplementing something that might already exist. I quickly skimmed through the code of CallbackManager and saw that it does some checking as well. But of course it may not apply here so I will take your advise and implement as suggested ;)

Copy link
Member

@vitaly-krugl vitaly-krugl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see my review feedback. Thank you!

@@ -1975,6 +1984,15 @@ def _on_connection_tune(self, method_frame):
self.params.frame_max = Connection._negotiate_integer_value(self.params.frame_max,
method_frame.method.frame_max)

if callable(self.params.heartbeat):
ret_heartbeat = self.params.heartbeat(method_frame.method.heartbeat)
if not isinstance(ret_heartbeat, numbers.Integral):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To avoid duplicating validation logic that's already in Parameters.heartbeat setter (type and minimum), just raise TypeError here if user's callback returned a callable or None, otherwise just assign the returned value to self.params.heartbeat to let it do the rest of the validation.

Copy link
Contributor Author

@darcoli darcoli Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about reusing the setter logic but now I do not want the callback to return another callable (in this fix I changed the setter to allow callable) - so essentially it is not duplicating the same logic because the logic of the check changes when then TUNE connection state is reached. I thought about making the logic in heartbeat's setter in Parameters depend on the Connection state (ie. do not allow callable during TUNE state) but I thought doing so is not a clean solution (would then need to add another property inside Parameters for the connection state).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darcoli - If something ever changes, I hate fixing the same logic in multiple places. What I meant is the following:

ret_heartbeat = self.params.heartbeat(method_frame.method.heartbeat)
if ret_heartbeat is None or callable(ret_heartbeat):
    # Enforce callback-specific restrictions on callback's return value
    raise TypeError('heartbeat callback must must not return None or callable, but got %r' % (ret_heartbeat,))

# Let hearbeat setter deal with the rest of the validation, so as not to duplicate the additional validation logic
self.params.heartbeat = ret_heartbeat

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hehe ok ... still not completely untangled but for sure its better than my version :D

@@ -370,13 +370,15 @@ def heartbeat(self, value):
"""
:param value: desired connection heartbeat timeout for negotiation or
None to accept broker's value. 0 turns heartbeat off.
Can be a callable that accepts server value and returns a new value
to be negotiated.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this docstring, we should make clear what the new API is. Namely that an integer value specified by app or returned from callback will be used to override the server's value. None specified by app will be used to adopt the server's value.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you intend not to allow the callback to return None? (having the same meaning as if None were assigned to hearbeat directly by user code)?

Copy link
Contributor Author

@darcoli darcoli Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes not allowing the callback return None was actually on purpose. The rationale behind it is that if the client passes a callback heartbeat, then an intention to control the heartbeat is shown and the client then needs to specify it. Of course, if the client wants to accept the server's proposal, it should just return with the same value that was passed as an argument to the callback. But, of course, I can change it to allow None to be returned if you think it makes more sense to do so ;)

and server's proposal will be used as the heartbeat timeout. Use 0
to deactivate heartbeats and None to accept server's proposal.
:param int|callable heartbeat: Heartbeat timeout. If set and is
an integer value, it will be used during connection tuning.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should go further and say that if it's an integer value, it will be used to override the server's value.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed pls review

an integer value, it will be used during connection tuning.
If set and is a callable, it will be invoked during connection
tuning and passed the server's proposal. Its return value will
then be negotiated. Use 0 to deactivate heartbeats and
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"then be negotiated" - instead, we should say that the callback return value must be a non-negative integer that will be used to override the server's value during negotiation.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed, pls review

If set and is a callable, it will be invoked during connection
tuning and passed the server's proposal. Its return value will
then be negotiated. Use 0 to deactivate heartbeats and
None to accept server's proposal.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This 0 and None sentence should be moved before the "If set and is a callable..." sentence, since the callable is not allowed to return None, thus making this misleading.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

changed, pls review ;)

@@ -620,6 +619,18 @@ def test_on_connection_tune(self, method, heartbeat_checker):
#verfy
self.assertEqual(60, self.connection.params.heartbeat)

# Server value is 60, client passes a heartbeat function that
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server's value is 10, not 60, right? See method_frame.method.heartbeat = 10 below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, fixed

# Server value is 60, client passes a heartbeat function that
# chooses max(servervalue,60). Pick 60
def choose_max(val):
return max(val, 60)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should also assert that pika passed 10 from server's method_frame.method.heartbeat: add self.assertEqual(val, 10) before return in choose_max()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

:param int heartbeat: Heartbeat timeout. Max between this value
and server's proposal will be used as the heartbeat timeout. Use 0
to deactivate heartbeats and None to accept server's proposal.
:param int|callable heartbeat: Heartbeat timeout. If set and is
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for passing callable heartbeat using ConnectionParameters class to the appropriate section in test/unit/connection_parameters_tests.py

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

:param int|callable heartbeat: Heartbeat timeout. If set and is
an integer value, it will be used during connection tuning.
If set and is a callable, it will be invoked during connection
tuning and passed the server's proposal. Its return value will
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand the meaning of "passed the server's proposal". I think you intended to say "its return value will be used to override the server's proposal".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant passed the server's proposal as an argument to the callback. I will change the wording ;)

@@ -370,13 +370,15 @@ def heartbeat(self, value):
"""
:param value: desired connection heartbeat timeout for negotiation or
None to accept broker's value. 0 turns heartbeat off.
Can be a callable that accepts server value and returns a new value
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a test for passing callable heartbeat using Parameters class to the appropriate section in test/unit/connection_parameters_tests.py

@vitaly-krugl
Copy link
Member

@darcoli: I agree with your philosophy regarding:

Yes not allowing the callback return None was actually on purpose. The rationale behind it is that if the client passes a callback heartbeat, then an intention to control the heartbeat is shown and the client then needs to specify it. Of course, if the client wants to accept the server's proposal, it should just return with the same value that was passed as an argument to the callback. But, of course, I can change it to allow None to be returned if you think it makes more sense to do so ;)

lukebakken
lukebakken previously approved these changes Feb 23, 2018
@@ -1975,6 +1984,15 @@ def _on_connection_tune(self, method_frame):
self.params.frame_max = Connection._negotiate_integer_value(self.params.frame_max,
method_frame.method.frame_max)

if callable(self.params.heartbeat):
ret_heartbeat = self.params.heartbeat(method_frame.method.heartbeat)
if not isinstance(ret_heartbeat, numbers.Integral):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darcoli - If something ever changes, I hate fixing the same logic in multiple places. What I meant is the following:

ret_heartbeat = self.params.heartbeat(method_frame.method.heartbeat)
if ret_heartbeat is None or callable(ret_heartbeat):
    # Enforce callback-specific restrictions on callback's return value
    raise TypeError('heartbeat callback must must not return None or callable, but got %r' % (ret_heartbeat,))

# Let hearbeat setter deal with the rest of the validation, so as not to duplicate the additional validation logic
self.params.heartbeat = ret_heartbeat

A callable can be given instead of an integer value. In this case
the callable will be invoked during connection tuning phase and is
given broker's value as an argument. The callable should return
the integer value to be proposed as connection heartbeat timeout.
Copy link
Member

@vitaly-krugl vitaly-krugl Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darcoli If we leave it as "desired connection heartbeat timeout for negotiation", it's not clear to the user that an integer value will actually override the server's value, since "negotiation" could mean anything.

How about the following here and in ConnectionParameters.__init__() docstring? I am going to borrow some of your new text from heartbeat documentation in theConnectionParameters constructor

:param int|None|callable value: Controls AMQP heartbeat timeout negotiation
    during connection tuning. An integer value always overrides the value
    proposed by broker. Use 0 to deactivate heartbeats and None to always accept the
    broker's proposal. If a callable is given, it will be given the heartbeat timeout int proposed
    by broker as its only argument and must return a non-negative integer that will be used to
    override the broker's proposal.

Copy link
Member

@vitaly-krugl vitaly-krugl Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darcoli, while making the above documentation recommendation, I realized that we also need to pass the reference to the connection instance in this callback as the first arg. So, it would be called like this from Connection: callback(connection, broker_value)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes good point. Will change and will update the relevant documentation as well

return 1
params.heartbeat = heartbeat_callback
self.assertTrue(callable(params.heartbeat))

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use self.assertIs(params.heartbeat, heartbeat_callback)

Copy link
Member

@vitaly-krugl vitaly-krugl Feb 23, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also need a ConnectionParameters test for passing callable heartbeat arg via ConnectionParameters constructor arg under ConnectionParametersTests

Copy link
Member

@vitaly-krugl vitaly-krugl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@darcoli, I submitted pull request darcoli#1 into your fork that resolves pylint findings related to your changes and also merges latest commits from pika master

Fix minor pylint messages related to your pull request and merge pika master
Copy link
Member

@vitaly-krugl vitaly-krugl left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @darcoli

@lukebakken lukebakken merged commit fe60150 into pika:master Feb 27, 2018
lukebakken added a commit that referenced this pull request Apr 12, 2018
BlockingConnection tune for heartbeat=0 properly

(cherry picked from commit fe60150)
lukebakken added a commit that referenced this pull request Apr 13, 2018
BlockingConnection tune for heartbeat=0 properly

(cherry picked from commit fe60150)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants