Skip to content

Commit

Permalink
Merge pull request #1033 from vitaly-krugl/blocking-conn-add-callback…
Browse files Browse the repository at this point in the history
…-mutex

Fix race condition between BlockingConnection.add_callback_threasafe() and BlockingConnection._cleanup()
  • Loading branch information
michaelklishin committed May 3, 2018
2 parents 7b72772 + 7cc038a commit 62de151
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 11 deletions.
32 changes: 25 additions & 7 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import contextlib
import functools
import logging
import threading
import time

import pika.channel
Expand Down Expand Up @@ -338,6 +339,12 @@ def __init__(self, parameters=None, _impl_class=None):
:raises RuntimeError:
"""
# Used for mutual exclusion to avoid race condition between
# BlockingConnection._cleanup() and another thread calling
# BlockingConnection.add_callback_threadsafe() against a closed
# ioloop.
self._cleanup_mutex = threading.Lock()

# Used by the _acquire_event_dispatch decorator; when already greater
# than 0, event dispatch is already acquired higher up the call stack
self._event_dispatch_suspend_depth = 0
Expand Down Expand Up @@ -372,11 +379,14 @@ def __exit__(self, exc_type, value, traceback):
self.close()

def _cleanup(self):
"""Clean up members that might inhibit garbage collection"""
if self._impl is not None:
self._impl.ioloop.close()
self._ready_events.clear()
self._closed_result.reset()
"""Clean up members that might inhibit garbage collection
"""
with self._cleanup_mutex:
if self._impl is not None:
self._impl.ioloop.close()
self._ready_events.clear()
self._closed_result.reset()

@contextlib.contextmanager
def _acquire_event_dispatch(self):
Expand Down Expand Up @@ -715,9 +725,17 @@ def add_callback_threadsafe(self, callback):
:param method callback: The callback method; must be callable
:raises pika.exceptions.ConnectionWrongStateError: if connection is
closed
"""
self._impl.add_callback_threadsafe(
functools.partial(self._on_threadsafe_callback, callback))
with self._cleanup_mutex:
if self.is_closed:
raise exceptions.ConnectionWrongStateError(
'BlockingConnection.add_callback_threadsafe() called on '
'closed or closing connection.')

self._impl.add_callback_threadsafe(
functools.partial(self._on_threadsafe_callback, callback))

def remove_timeout(self, timeout_id):
"""Remove a timer if it's still in the timeout stack
Expand Down
5 changes: 4 additions & 1 deletion pika/adapters/select_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,10 @@ def close(self):
if self._callbacks is not None:
self._poller.close()
self._timer.close()
self._callbacks = None
# Set _callbacks to empty list rather than None so that race from
# another thread calling add_callback_threadsafe() won't result in
# AttributeError
self._callbacks = []

@staticmethod
def _get_poller(get_wait_seconds, process_timeouts):
Expand Down
12 changes: 12 additions & 0 deletions tests/acceptance/blocking_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,18 @@ def test(self):
self.assertLess(elapsed, 0.25)


class TestAddCallbackThreadsafeOnClosedConnectionRaisesWrongState(
BlockingTestCaseBase):

def test(self):
"""BlockingConnection.add_callback_threadsafe on closed connection raises ConnectionWrongStateError"""
connection = self._connect()
connection.close()

with self.assertRaises(pika.exceptions.ConnectionWrongStateError):
connection.add_callback_threadsafe(lambda: None)


class TestAddTimeoutRemoveTimeout(BlockingTestCaseBase):

def test(self):
Expand Down
23 changes: 20 additions & 3 deletions tests/unit/select_connection_ioloop_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
# invalid-name
# pylint: disable=C0103

# attribute-defined-outside-init
# pylint: disable=W0201


LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -151,18 +154,18 @@ def start_test(self):
self.ioloop.close()
mocks['_timer'].close.assert_called_once_with()
mocks['_poller'].close.assert_called_once_with()
self.assertIsNone(self.ioloop._callbacks)
self.assertEqual(self.ioloop._callbacks, [])


class IOLoopCloseAfterStartReturnsTestSelect(IOLoopBaseTest):
class IOLoopCloseAfterStartReturnsTest(IOLoopBaseTest):
""" Test IOLoop.close() after normal return from start(). """
SELECT_POLLER = 'select'

def start_test(self):
self.ioloop.stop() # so start will terminate quickly
self.start()
self.ioloop.close()
self.assertIsNone(self.ioloop._callbacks)
self.assertEqual(self.ioloop._callbacks, [])


class IOLoopCloseBeforeStartReturnsTestSelect(IOLoopBaseTest):
Expand Down Expand Up @@ -218,6 +221,20 @@ class IOLoopThreadStopTestKqueue(IOLoopThreadStopTestSelect):
SELECT_POLLER = 'kqueue'


class IOLoopAddCallbackAfterCloseDoesNotRaiseTestSelect(IOLoopBaseTest):
""" Test ioloop add_callback_threadsafe() after ioloop close doesn't raise exception. """
SELECT_POLLER = 'select'

def start_test(self):
# Simulate closing after start returns
self.ioloop.stop() # so that start() returns ASAP
self.start() # NOTE: Normal return from `start()` constitutes success
self.ioloop.close()

# Expect: add_callback_threadsafe() won't raise after ioloop.close()
self.ioloop.add_callback_threadsafe(lambda: None)


# TODO FUTURE - fix this flaky test
@unittest.skipIf(platform.python_implementation() == 'PyPy', 'test is flaky on PyPy')
class IOLoopTimerTestSelect(IOLoopBaseTest):
Expand Down

0 comments on commit 62de151

Please sign in to comment.