New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement add_callback_threadsafe in all connection adapters #956

Merged
merged 39 commits into from Feb 23, 2018

Conversation

Projects
None yet
3 participants
@vitaly-krugl
Member

vitaly-krugl commented Feb 14, 2018

(NOTE that the failed coverage test results from pika's encrypted credentials configuration issue)

This pull request implements a new method add_callback_threadsafe on all supported connection adapters in pika. This method enables the application to delegate calls from other threads into the context of the adapter's IOLoop thread (similarly to Tornado's add_callback, Twisted's callFromThread, and asyncio's call_soon_threadsafe).

Lack of this feature has been a pain point for users of BlockingConnection and SelectConnection for as long as I can remember, and users have had to rely on clumsy and inefficient workarounds.

Here is a sampling of several relevant issues: #963, #927, #892, #930, #752, #418, #753, #636, #564.

For example, this feature facilitates the app to delegate long-lasting processing of incoming messages to a background thread and then use add_callback_threadsafe to efficiently delegate ACKing of the message in the context of the connection's thread, thus avoiding lost heartbeat/connection issues from blocking too long and also hacky polling techniques that applications have been using to work around the limitation in the past.

ADDITIONAL INFO FOR CHANGELOG: It is customary among frameworks - such as tornado, twisted, and asyncio - to designate a single threadsafe method that accepts a callback to be executed in the IOLoop's thread, and no others are thread-safe for ease of support/maintenance and for API orthogonality. Following this rationale, I removed the thread-safe designation inside the docstring of the ioloop stop() method in select_connection.IOLoop.

Fixes #892

@vitaly-krugl vitaly-krugl changed the title from DO NOT MERGE Implement thread-safe IOLoop.add_callback to DO NOT MERGE Implement thread-safe add_callback Feb 14, 2018

@vitaly-krugl vitaly-krugl changed the title from DO NOT MERGE Implement thread-safe add_callback to DO NOT MERGE Implement add_callback_threadsafe Feb 14, 2018

vitaly-krugl added some commits Feb 15, 2018

Flesh out BlockingConnection.add_callback_threadsafe();
Close ioloop when cleaning up an instance of BlockingConnection.
Fix/suppress several pylint findings.
Implemented BlockingConnection acceptance tests for add_callback_thre…
…adsafe

being called from same and from another thread
@vitaly-krugl

This comment has been minimized.

Member

vitaly-krugl commented Feb 15, 2018

Anyone have any idea why the Coverage travis-ci job is always failing in my PR?

cc @michaelklishin, @lukebakken

@shinji-s

This comment has been minimized.

Contributor

shinji-s commented Feb 15, 2018

The attempt to populate coverage/ directory from s3 storage is failing because credential is not installed/found. Sorry I don't know how to fix though.

@lukebakken

This comment has been minimized.

Contributor

lukebakken commented Feb 15, 2018

@shinji-s pointed it out -

The command "mkdir coverage" exited with 0.
0.49s$ aws s3 cp --recursive s3://com-gavinroy-travis/pika/$TRAVIS_BUILD_NUMBER/ coverage
fatal error: Unable to locate credentials

@gmr - would you have time to address this? Thanks!!!

@shinji-s

This comment has been minimized.

Contributor

shinji-s commented Feb 15, 2018

Regarding the coverage test failure, my guess below may be well off the mark but I'd try.

The credential is given here in .travis.yml

- secure: {actual-encrypted-string-is-omitted}
- secure: {actual-encrypted-string-is-omitted}

However, https://docs.travis-ci.com/user/encryption-keys/ has this to say:

Please note that encrypted environment variables are not available for pull requests from forks.

That is why the credential is not available in the above build?

@vitaly-krugl

This comment has been minimized.

Member

vitaly-krugl commented Feb 15, 2018

@shinji-s, thank you for looking into this. Your assessment makes a ton of sense. If that's the case, then the selected mechanism is inappropriate for the project's needs, since most 3rd-party PRs would naturally come from forks and reviewers as well as contributors need practical means to assess impact of those PRs on code coverage.

vitaly-krugl added some commits Feb 15, 2018

Instrumented add_callback_threadsafe and TestBasicConsumeWithAckFromA…
…notherThread with debug logging to help debug test failure.
Fixed broken comparison of string with message body on python3 in the…
… new "ack from other thread" BlockingConnection tests.
Provide async acceptance tests with a private IOLoop for each adapter…
… so that

these IOLoops may be closed to release resources without risking closing of singletons
which would break subsequent tests (since some adapters, such as torndado and asyncio
make use of the singletons)
@lukebakken

This comment has been minimized.

Contributor

lukebakken commented Feb 21, 2018

Reviewing now

@@ -164,6 +163,16 @@ def __init__(self):
# collection of canceled timeouts
self._num_cancellations = 0
def close(self):

This comment has been minimized.

@lukebakken

lukebakken Feb 21, 2018

Contributor

Should all of the connection adapters implement close? It looks like the Twisted connection IOLoopReactorAdapter does not, and the Tornado docs state that stop must be called before close which isn't enforced in our code. There's also a good chance I'm mis-understanding how these adapters work

This comment has been minimized.

@vitaly-krugl

vitaly-krugl Feb 21, 2018

Member

@lukebakken - good points. I will follow up in a couple of days. Thanks!

This comment has been minimized.

@vitaly-krugl

vitaly-krugl Feb 22, 2018

Member

@lukebakken:

  1. I didn't find the equivalent to ioloop.close() in twisted reactor. And the Twisted connection adapter is not integrated into the async acceptance test framework. In fact, the Twisted adapter has very little, if any, testing in pika (I would be surprised if we didn't have an open issue for it already, but anything is possible).
  2. I added an assertion in select_connection's ioloop close() code path to detect if it's being called before the call(s) to start() fully unwind. On the Tornado adapter side of things, I added a docstring to advise users not to call it until start unwinds - however, enforcement, if any, in the tornado case should be up to tornado's implementation as our use is not different from anyone else's use of tornado.

This comment has been minimized.

@vitaly-krugl

vitaly-krugl Feb 22, 2018

Member

@lukebakken, I added some more tests. The pull request is ready for resuming code review. Thanks!

This comment has been minimized.

@vitaly-krugl

vitaly-krugl Feb 22, 2018

Member

I also discovered an existing bug in the process of implementing new tests that I filed as issue #970.

This comment has been minimized.

@lukebakken

lukebakken Feb 22, 2018

Contributor

Great! I'll get on it today.

vitaly-krugl added some commits Feb 21, 2018

In select_connection.py, raise AssertionError if loop is being
closed before the call(s) to start() unwind.

Implemented select_connection IOLoop tests:
IOLoopCloseAfterStartReturnsTestSelect
IOLoopCloseBeforeStartReturnsTestSelect
IOLoopThreadStopTestSelect

Implemented additaionl tests in acceptance/async_adapter_tests.py:
TestAddCallbackThreadsafeRequestBeforeIOLoopStarts
TestIOLoopStopBeforeIOLoopStarts
Merge branch 'add-callback' of github.com:vitaly-krugl/pika into add-…
…callback

Trying to debug Updates were rejected because the remote contains work that you do not have locally.
In `Connection.close`, don't call `self._close_channels()` if there are
not channels or the connection is still opening, in which case there
should be no channels.
In TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, don't attempt
closing the async connection before it is established to avoid
triggering logic in the close code path that isn't ready to deal with it.
Facilitate safe cleanup after ioloop stops running in async adapter T…
…estIOLoopStopBeforeIOLoopStarts acceptance test.
@vitaly-krugl

This comment has been minimized.

Member

vitaly-krugl commented Feb 22, 2018

@lukebakken - this pull request contains a large number of commits - 39 presently; would it be better if I squashed them into a single commit?

@vitaly-krugl vitaly-krugl merged commit 1573ca1 into pika:master Feb 23, 2018

1 of 2 checks passed

continuous-integration/travis-ci/pr The Travis CI build failed
Details
continuous-integration/appveyor/pr AppVeyor build succeeded
Details
@vitaly-krugl

This comment has been minimized.

Member

vitaly-krugl commented Feb 23, 2018

Thanks @lukebakken !

@lukebakken

This comment has been minimized.

Contributor

lukebakken commented Feb 23, 2018

Great work, this will be very useful for Pika users

lukebakken added a commit that referenced this pull request Apr 16, 2018

Merge pull request #956 from vitaly-krugl/add-callback
Implement add_callback_threadsafe in all connection adapters

(cherry picked from commit 1573ca1)

lukebakken added a commit that referenced this pull request Apr 18, 2018

Merge pull request #1020 from pika/0.12-pika-956
Merge #956 into 0.12 release branch
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment