Skip to content

Commit

Permalink
Merge pull request #1045 from lukebakken/pika-1044
Browse files Browse the repository at this point in the history
Fix issue #1044
  • Loading branch information
lukebakken committed May 15, 2018
2 parents 67ba812 + 3d6b4bb commit 5a8c8e2
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 17 deletions.
8 changes: 6 additions & 2 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,8 @@ def add_callback_threadsafe(self, callback):
context of this connection's thread.
NOTE: This is the only thread-safe method in `BlockingConnection`. All
other manipulations of `BlockingConnection` must be performed from the
connection's thread.
other manipulations of `BlockingConnection` must be performed from the
connection's thread.
NOTE: the callbacks are dispatched only in the scope of
specially-designated methods: see
Expand All @@ -740,6 +740,10 @@ def add_callback_threadsafe(self, callback):
functools.partial(channel.basic_ack, delivery_tag=...))
```
NOTE: if you know that the requester is running on the same thread as
the connection it is more efficient to use the
`BlockingConnection.add_timeout()` method with a deadline of 0.
:param method callback: The callback method; must be callable
:raises pika.exceptions.ConnectionWrongStateError: if connection is
Expand Down
18 changes: 3 additions & 15 deletions pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,9 +335,6 @@ def __init__(self):
# Callbacks requested via `add_callback`
self._callbacks = collections.deque()

# Identity of this IOLoop's thread
self._thread_id = None

self._poller = self._get_poller(self._get_remaining_interval,
self.process_timeouts)

Expand Down Expand Up @@ -439,9 +436,9 @@ def add_callback_threadsafe(self, callback):

# NOTE: `deque.append` is atomic
self._callbacks.append(callback)
if threading.current_thread().ident != self._thread_id:
# Wake up the IOLoop running in another thread
self._poller.wake_threadsafe()

# Wake up the IOLoop which may be running in another thread
self._poller.wake_threadsafe()

LOGGER.debug('add_callback_threadsafe: added callback=%r', callback)

Expand Down Expand Up @@ -508,7 +505,6 @@ def start(self):
exit. See `IOLoop.stop`.
"""
self._thread_id = threading.current_thread().ident
self._poller.start()

def stop(self):
Expand All @@ -521,18 +517,12 @@ def stop(self):
`ioloop.add_callback_threadsafe(ioloop.stop)`
"""
if (self._thread_id is not None and
threading.current_thread().ident != self._thread_id):
LOGGER.warning('Use add_callback_threadsafe to request '
'ioloop.stop() from another thread')

self._poller.stop()

def activate_poller(self):
"""[Extension] Activate the poller
"""
self._thread_id = threading.current_thread().ident
self._poller.activate_poller()

def deactivate_poller(self):
Expand Down Expand Up @@ -942,7 +932,6 @@ def poll(self):
# supplied'.
time.sleep(self._get_max_wait())
read, write, error = [], [], []

break
except _SELECT_ERRORS as error:
if _is_resumable(error):
Expand All @@ -951,7 +940,6 @@ def poll(self):
raise

# Build an event bit mask for each fileno we've received an event for

fd_event_map = collections.defaultdict(int)
for fd_set, evt in zip(
(read, write, error),
Expand Down
4 changes: 4 additions & 0 deletions pika/adapters/utils/nbio_interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ def add_callback_threadsafe(self, callback):
All other manipulations of the IOLoop adapter and objects governed
by it must be performed from the IOLoop's thread.
NOTE: if you know that the requester is running on the same thread as
the connection it is more efficient to use the
`ioloop.call_later()` method with a delay of 0.
:param method callback: The callback method; must be callable.
:return: None
"""
Expand Down

0 comments on commit 5a8c8e2

Please sign in to comment.