Skip to content

Commit

Permalink
Merge pull request #956 from vitaly-krugl/add-callback
Browse files Browse the repository at this point in the history
Implement add_callback_threadsafe in all connection adapters

(cherry picked from commit 1573ca1)
  • Loading branch information
vitaly-krugl authored and lukebakken committed Apr 16, 2018
1 parent 4f11520 commit 42f3d5b
Show file tree
Hide file tree
Showing 15 changed files with 883 additions and 109 deletions.
2 changes: 1 addition & 1 deletion docs/faq.rst
Expand Up @@ -3,7 +3,7 @@ Frequently Asked Questions

- Is Pika thread safe?

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.
Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method `add_callback_threadsafe` from another thread to schedule a callback within an active pika connection.

- How do I report a bug with Pika?

Expand Down
33 changes: 33 additions & 0 deletions pika/adapters/asyncio_connection.py
Expand Up @@ -20,6 +20,17 @@ def __init__(self, loop):
self.readers = set()
self.writers = set()

def close(self):
"""Release ioloop's resources.
This method is intended to be called by the application or test code
only after the ioloop's outermost `start()` call returns. After calling
`close()`, no other interaction with the closed instance of ioloop
should be performed.
"""
self.loop.close()

def add_timeout(self, deadline, callback_method):
"""Add the callback_method to the EventLoop timer to fire after deadline
seconds. Returns a Handle to the timeout.
Expand All @@ -41,6 +52,28 @@ def remove_timeout(handle):
"""
return handle.cancel()

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this IOLoop's thread.
NOTE: This is the only thread-safe method offered by the IOLoop adapter.
All other manipulations of the IOLoop adapter and its parent connection
must be performed from the connection's thread.
For example, a thread may request a call to the
`channel.basic_ack` method of a connection that is running in a
different thread via
```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```
:param method callback: The callback method; must be callable.
"""
self.loop.call_soon_threadsafe(callback)

def add_handler(self, fd, cb, event_state):
""" Registers the given handler to receive the given events for ``fd``.
Expand Down
26 changes: 26 additions & 0 deletions pika/adapters/base_connection.py
Expand Up @@ -135,6 +135,32 @@ def remove_timeout(self, timeout_id):
"""
self.ioloop.remove_timeout(timeout_id)

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this connection's IOLoop thread.
NOTE: This is the only thread-safe method offered by the connection. All
other manipulations of the connection must be performed from the
connection's thread.
For example, a thread may request a call to the
`channel.basic_ack` method of a connection that is running in a
different thread via
```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```
:param method callback: The callback method; must be callable.
"""
if not callable(callback):
raise TypeError(
'callback must be a callable, but got %r' % (callback,))

self.ioloop.add_callback_threadsafe(callback)

def _adapter_connect(self):
"""Connect to the RabbitMQ broker, returning True if connected.
Expand Down
50 changes: 45 additions & 5 deletions pika/adapters/blocking_connection.py
Expand Up @@ -10,6 +10,9 @@
classes.
"""
# Suppress too-many-lines
# pylint: disable=C0302

# Disable "access to protected member warnings: this wrapper implementation is
# a friend of those instances
# pylint: disable=W0212
Expand Down Expand Up @@ -155,7 +158,7 @@ def elements(self):
with `append_element`
"""
assert self._ready, '_CallbackResult was not set'
assert isinstance(self._values, list) and len(self._values) > 0, (
assert isinstance(self._values, list) and self._values, (
'_CallbackResult value is incompatible with append_element: %r'
% (self._values,))

Expand Down Expand Up @@ -378,7 +381,7 @@ def __repr__(self):

def _cleanup(self):
"""Clean up members that might inhibit garbage collection"""
self._impl.ioloop.deactivate_poller()
self._impl.ioloop.close()
self._ready_events.clear()
self._opened_result.reset()
self._open_error_result.reset()
Expand Down Expand Up @@ -525,6 +528,18 @@ def _on_timer_ready(self, evt):
"""
self._ready_events.append(evt)

def _on_threadsafe_callback(self, user_callback):
"""Handle callback that was registered via `add_callback_threadsafe`.
:param user_callback: callback passed to `add_callback_threadsafe` by
the application.
"""
# Turn it into a 0-delay timeout to take advantage of our existing logic
# that deals with reentrancy
self.add_timeout(0, user_callback)


def _on_connection_blocked(self, user_callback, method_frame):
"""Handle Connection.Blocked notification from RabbitMQ broker
Expand Down Expand Up @@ -632,6 +647,29 @@ def add_timeout(self, deadline, callback_method):

return timer_id

def add_callback_threadsafe(self, callback):
"""Requests a call to the given function as soon as possible in the
context of this connection's thread.
NOTE: This is the only thread-safe method in `BlockingConnection`. All
other manipulations of `BlockingConnection` must be performed from the
connection's thread.
For example, a thread may request a call to the
`BlockingChannel.basic_ack` method of a `BlockingConnection` that is
running in a different thread via
```
connection.add_callback_threadsafe(
functools.partial(channel.basic_ack, delivery_tag=...))
```
:param method callback: The callback method; must be callable
"""
self._impl.add_callback_threadsafe(
functools.partial(self._on_threadsafe_callback, callback))

def remove_timeout(self, timeout_id):
"""Remove a timer if it's still in the timeout stack
Expand Down Expand Up @@ -874,7 +912,7 @@ class _ConsumerCancellationEvt(_ChannelPendingEvt):
`Basic.Cancel`
"""

__slots__ = 'method_frame'
__slots__ = ('method_frame',)

def __init__(self, method_frame):
"""
Expand Down Expand Up @@ -1798,7 +1836,8 @@ def consume(self, queue, no_ack=False,
"""Blocking consumption of a queue instead of via a callback. This
method is a generator that yields each message as a tuple of method,
properties, and body. The active generator iterator terminates when the
consumer is cancelled by client or broker.
consumer is cancelled by client via `BlockingChannel.cancel()` or by
broker.
Example:
Expand Down Expand Up @@ -2398,7 +2437,8 @@ def queue_declare(self, queue='', passive=False, durable=False,
:param queue: The queue name
:type queue: str or unicode; if empty string, the broker will create a
unique queue name;
:param bool passive: Only check to see if the queue exists
:param bool passive: Only check to see if the queue exists and raise
`ChannelClosed` if it doesn't;
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
Expand Down

0 comments on commit 42f3d5b

Please sign in to comment.