Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement add_callback_threadsafe in all connection adapters #956

Merged
merged 39 commits into from Feb 23, 2018
Merged
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
3107a07
Implement thread-safe IOLoop.add_callback() in select_connection adapter
vitaly-krugl Feb 14, 2018
b6cd85d
Merge remote-tracking branch 'upstream/master' into add-callback
vitaly-krugl Feb 15, 2018
ae42d74
Fleshed out `close()` methods in select_connection's IOLoop and friends.
vitaly-krugl Feb 15, 2018
ef5bc22
Flesh out BlockingConnection.add_callback_threadsafe();
vitaly-krugl Feb 15, 2018
fff06b7
Fix blocking connection unit tests broken by prior commit.
vitaly-krugl Feb 15, 2018
8d96683
Implemented BlockingConnection acceptance tests for add_callback_thre…
vitaly-krugl Feb 15, 2018
12b1d5d
Implemented BlockingConnection acceptance test TestBasicConsumeWithAc…
vitaly-krugl Feb 15, 2018
38486a2
Instrumented add_callback_threadsafe and TestBasicConsumeWithAckFromA…
vitaly-krugl Feb 15, 2018
6070e7d
Add missing timer.start() call to request message ACK
vitaly-krugl Feb 15, 2018
cb46ea8
Implemented BlockingConnection acceptance test TestConsumeGeneratorWi…
vitaly-krugl Feb 15, 2018
1e8f748
Use BlockingChannel.cancel() instead of basic_cancel to cancel the ge…
vitaly-krugl Feb 15, 2018
cfe4a35
Fixed call to BlockingChannel.cancel() from TestConsumeGeneratorWithA…
vitaly-krugl Feb 15, 2018
152d792
Implemented async adapter acceptance tests for add_callback_threadsafe.
vitaly-krugl Feb 16, 2018
35cdf73
Release ioloop resources after class AsyncTestCase returns from ioloo…
vitaly-krugl Feb 16, 2018
d51b6c3
Merge branch 'fix-multiple-timers-same-deadline' into add-callback
vitaly-krugl Feb 16, 2018
61a6fc6
Fixed broken comparison of string with message body on python3 in the…
vitaly-krugl Feb 16, 2018
c88ea7c
Close the connection's ioloop before setting self.connection to None …
vitaly-krugl Feb 16, 2018
eef5331
Close async adapter's ioloop from finally after start exits in AsyncT…
vitaly-krugl Feb 16, 2018
0ea5d5e
Add close method in asyncio_connection's IOLoopAdapter
vitaly-krugl Feb 16, 2018
856b4af
Provide async acceptance tests with a private IOLoop for each adapter…
vitaly-krugl Feb 16, 2018
e0d862a
Compare message bodies to byte literals for compatibility with python3
vitaly-krugl Feb 16, 2018
3af0004
Close ioloops in tests to prevent resource leaks
vitaly-krugl Feb 16, 2018
813be4f
Merge branch 'master' into add-callback
vitaly-krugl Feb 16, 2018
b781769
Close select_connection pollers in a couple more tests to eliminate c…
vitaly-krugl Feb 16, 2018
a4fa188
Implemented unit tests for select_connection's _Timer.close(), IOLoop…
vitaly-krugl Feb 16, 2018
33fad74
Remove comment promising thread-safety from select_connection's IOLoo…
vitaly-krugl Feb 16, 2018
02bac9c
Emit warning from select_connection's IOLoop.stop() method when called
vitaly-krugl Feb 16, 2018
2c2b2e6
add unittest cleanup callbacks to cancel threading.Timer instances in…
vitaly-krugl Feb 16, 2018
3ca1365
Fixed check in test_tornado_connection_call_parent.
vitaly-krugl Feb 17, 2018
a255302
Added mention of `add_callback_threadsafe()` in the FAQ.
vitaly-krugl Feb 17, 2018
f6d5438
Revert "Fixed check in test_tornado_connection_call_parent." accident…
vitaly-krugl Feb 17, 2018
6156df7
Merge branch 'master' into add-callback
lukebakken Feb 21, 2018
13aa231
In select_connection.py, raise AssertionError if loop is being
vitaly-krugl Feb 21, 2018
125a84e
Merge branch 'master' into add-callback
vitaly-krugl Feb 21, 2018
a497069
Merge branch 'add-callback' of github.com:vitaly-krugl/pika into add-…
vitaly-krugl Feb 22, 2018
30c4148
Fixed infinite recursion in new tests that override AsyncTestCase._in…
vitaly-krugl Feb 22, 2018
34d94d4
In `Connection.close`, don't call `self._close_channels()` if there are
vitaly-krugl Feb 22, 2018
f719d22
In TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, don't attempt
vitaly-krugl Feb 22, 2018
5d06249
Facilitate safe cleanup after ioloop stops running in async adapter T…
vitaly-krugl Feb 22, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/faq.rst
Expand Up @@ -3,7 +3,7 @@ Frequently Asked Questions

- Is Pika thread safe?

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method `add_callback_threadsafe` from another thread to schedule a callback within an active pika connection.

- How do I report a bug with Pika?

Expand Down
25 changes: 25 additions & 0 deletions pika/adapters/asyncio_connection.py
Expand Up @@ -20,6 +20,9 @@ def __init__(self, loop):
self.readers = set()
self.writers = set()

def close(self):
self.loop.close()

def add_timeout(self, deadline, callback):
"""Add the callback to the EventLoop timer to fire after deadline
seconds. Returns a Handle to the timeout.
Expand All @@ -41,6 +44,28 @@ def remove_timeout(handle):
"""
return handle.cancel()

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this IOLoop's thread.

NOTE: This is the only thread-safe method offered by the IOLoop adapter.
All other manipulations of the IOLoop adapter and its parent connection
must be performed from the connection's thread.

For example, a thread may request a call to the
`channel.basic_ack` method of a connection that is running in a
different thread via

```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```

:param method callback: The callback method; must be callable.

"""
self.loop.call_soon_threadsafe(callback)

def add_handler(self, fd, cb, event_state):
""" Registers the given handler to receive the given events for ``fd``.

Expand Down
26 changes: 26 additions & 0 deletions pika/adapters/base_connection.py
Expand Up @@ -127,6 +127,32 @@ def remove_timeout(self, timeout_id):
"""
self.ioloop.remove_timeout(timeout_id)

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this connection's IOLoop thread.

NOTE: This is the only thread-safe method offered by the connection. All
other manipulations of the connection must be performed from the
connection's thread.

For example, a thread may request a call to the
`channel.basic_ack` method of a connection that is running in a
different thread via

```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```

:param method callback: The callback method; must be callable.

"""
if not callable(callback):
raise TypeError(
'callback must be a callable, but got %r' % (callback,))

self.ioloop.add_callback_threadsafe(callback)

def _adapter_connect(self):
"""Connect to the RabbitMQ broker, returning True if connected.

Expand Down
55 changes: 47 additions & 8 deletions pika/adapters/blocking_connection.py
Expand Up @@ -10,6 +10,9 @@
classes.

"""
# Suppress too-many-lines
# pylint: disable=C0302

# Disable "access to protected member warnings: this wrapper implementation is
# a friend of those instances
# pylint: disable=W0212
Expand Down Expand Up @@ -158,7 +161,7 @@ def elements(self):
with `append_element`
"""
assert self._ready, '_CallbackResult was not set'
assert isinstance(self._values, list) and len(self._values) > 0, (
assert isinstance(self._values, list) and self._values, (
'_CallbackResult value is incompatible with append_element: %r'
% (self._values,))

Expand Down Expand Up @@ -380,7 +383,7 @@ def __repr__(self):

def _cleanup(self):
"""Clean up members that might inhibit garbage collection"""
self._impl.ioloop.deactivate_poller()
self._impl.ioloop.close()
self._ready_events.clear()
self._opened_result.reset()
self._open_error_result.reset()
Expand Down Expand Up @@ -527,6 +530,18 @@ def _on_timer_ready(self, evt):
"""
self._ready_events.append(evt)

def _on_threadsafe_callback(self, user_callback):
"""Handle callback that was registered via `add_callback_threadsafe`.

:param user_callback: callback passed to `add_callback_threadsafe` by
the application.

"""
# Turn it into a 0-delay timeout to take advantage of our existing logic
# that deals with reentrancy
self.add_timeout(0, user_callback)


def _on_connection_blocked(self, user_callback, method_frame):
"""Handle Connection.Blocked notification from RabbitMQ broker

Expand Down Expand Up @@ -622,9 +637,8 @@ def add_timeout(self, deadline, callback):

"""
if not callable(callback):
raise ValueError(
'callback parameter must be callable, but got %r'
% (callback,))
raise TypeError(
'callback must be a callable, but got %r' % (callback,))

evt = _TimerEvt(callback=callback)
timer_id = self._impl.add_timeout(
Expand All @@ -634,6 +648,29 @@ def add_timeout(self, deadline, callback):

return timer_id

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this connection's thread.

NOTE: This is the only thread-safe method in `BlockingConnection`. All
other manipulations of `BlockingConnection` must be performed from the
connection's thread.

For example, a thread may request a call to the
`BlockingChannel.basic_ack` method of a `BlockingConnection` that is
running in a different thread via

```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```

:param method callback: The callback method; must be callable

"""
self._impl.add_callback_threadsafe(
functools.partial(self._on_threadsafe_callback, callback))

def remove_timeout(self, timeout_id):
"""Remove a timer if it's still in the timeout stack

Expand Down Expand Up @@ -877,7 +914,7 @@ class _ConsumerCancellationEvt(_ChannelPendingEvt):
`Basic.Cancel`
"""

__slots__ = 'method_frame'
__slots__ = ('method_frame',)

def __init__(self, method_frame):
"""
Expand Down Expand Up @@ -1802,7 +1839,8 @@ def consume(self, queue, auto_ack=False,
"""Blocking consumption of a queue instead of via a callback. This
method is a generator that yields each message as a tuple of method,
properties, and body. The active generator iterator terminates when the
consumer is cancelled by client or broker.
consumer is cancelled by client via `BlockingChannel.cancel()` or by
broker.

Example:

Expand Down Expand Up @@ -2397,7 +2435,8 @@ def queue_declare(self, queue, passive=False, durable=False,
:param queue: The queue name
:type queue: str or unicode; if empty string, the broker will create a
unique queue name;
:param bool passive: Only check to see if the queue exists
:param bool passive: Only check to see if the queue exists and raise
`ChannelClosed` if it doesn't;
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
Expand Down