From 3107a0796484d44e3cb59cc1c616acc2fae84a3b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Tue, 13 Feb 2018 23:53:54 -0800 Subject: [PATCH 01/33] Implement thread-safe IOLoop.add_callback() in select_connection adapter --- pika/adapters/select_connection.py | 154 +++++++++++++++++++---------- 1 file changed, 103 insertions(+), 51 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index f6e6f3e3a..eda89c444 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -3,6 +3,7 @@ """ import abc +import collections import errno import functools import heapq @@ -11,8 +12,6 @@ import time import threading -from collections import defaultdict - import pika.compat from pika.adapters.base_connection import BaseConnection @@ -282,7 +281,14 @@ class IOLoop(object): def __init__(self): self._timer = _Timer() - self._poller = self._get_poller(self._timer.get_remaining_interval, + # Callbacks requested via `add_callback` + self._callbacks = collections.deque() + + # Identity of this IOLoop's thread + self._thread_id = None + + + self._poller = self._get_poller(self._get_remaining_interval, self.process_timeouts) @staticmethod @@ -346,13 +352,55 @@ def remove_timeout(self, timeout_id): """ self._timer.remove_timeout(timeout_id) + def add_callback_threadsafe(self, callback): + """Calls the given function on the next iteration of the IOLoop in the + context of the IOLoop's thread. + + NOTE: This is the only thread-safe method in IOLoop. All other + manipulations of IOLoop must be performed from the IOLoop's thread. + + For example, a thread may request a call to the `stop` method of an + ioloop that is running in a different thread via + `ioloop.add_callback_threadsafe(ioloop.stop)` + + :param method callback: The callback method + + """ + if not callable(callback): + raise TypeError( + 'callback must be a callable, but got %r' % (callback,)) + + # NOTE: `deque.append` is atomic + self._callbacks.append(callback) + if threading.current_thread().ident != self._thread_id: + # Wake up the IOLoop running in another thread + self._poller.wake_threadsafe() + def process_timeouts(self): - """[Extension] Process pending timeouts, invoking callbacks for those - whose time has come. Internal use only. + """[Extension] Process pending callbacks and timeouts, invoking those + whose time has come. Internal use only. """ + # Avoid I/O starvation by postponing new callbacks to the next iteration + for _ in pika.compat.xrange(len(self._callbacks)): + self._callbacks.popleft()() + self._timer.process_timeouts() + def _get_remaining_interval(self): + """Get the remaining interval to the next callback or timeout + expiration. + + :returns: non-negative number of seconds until next callback or timer + expiration; None if there are no callbacks and timers + :rtype: float + + """ + if self._callbacks: + return 0 + + return self._timer.get_remaining_interval() + def add_handler(self, fileno, handler, events): """[API] Add a new fileno to the set to be monitored @@ -385,6 +433,7 @@ def start(self): exit. See `IOLoop.stop`. """ + self._thread_id = threading.current_thread().ident self._poller.start() def stop(self): @@ -392,6 +441,9 @@ def stop(self): stop before this method returns. This is the only method that may be called from another thread. + TODO This shouldn't have been made thread-safe. Instead, `add_callback` + TODO should be used to safely call `stop` from the IOLoop's own thread. + TODO Update documentation/implementation """ self._poller.stop() @@ -399,6 +451,7 @@ def activate_poller(self): """[Extension] Activate the poller """ + self._thread_id = threading.current_thread().ident self._poller.activate_poller() def deactivate_poller(self): @@ -442,6 +495,10 @@ def __init__(self, get_wait_seconds, process_timeouts): self._get_wait_seconds = get_wait_seconds self._process_timeouts = process_timeouts + # We guard access to the waking file descriptors to avoid races from + # closing them while another thread is calling our `wake()` method. + self._waking_mutex = threading.Lock() + # fd-to-handler function mappings self._fd_handlers = dict() @@ -455,14 +512,42 @@ def __init__(self, get_wait_seconds, process_timeouts): self._stopping = False - # Mutex for controlling critical sections where ioloop-interrupt sockets - # are created, used, and destroyed. Needed in case `stop()` is called - # from a thread. - self._mutex = threading.Lock() + # Create ioloop-interrupt socket pair and register read handler. + self._r_interrupt, self._w_interrupt = self._get_interrupt_pair() + self.add_handler(self._r_interrupt.fileno(), self._read_interrupt, READ) + + def close(self): + """TODO figure out where to call it + """ + # Unregister and close ioloop-interrupt socket pair + with self._waking_mutex: + self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101 + self._r_interrupt.close() + self._r_interrupt = None + self._w_interrupt.close() + self._w_interrupt = None + + def wake_threadsafe(self): + """Wake up the poller as soon as possible. As the name indicates, this + method is thread-safe. + """ + with self._waking_mutex: + if self._w_interrupt is None: + return + + try: + # Send byte to interrupt the poll loop, use send() instead of + # os.write for Windows compatibility + self._w_interrupt.send(b'X') + except pika.compat.SOCKET_ERROR as err: + if err.errno != errno.EWOULDBLOCK: + raise + except Exception as err: + # There's nothing sensible to do here, we'll exit the interrupt + # loop after POLL_TIMEOUT secs in worst case anyway. + LOGGER.warning("Failed to send interrupt to poller: %s", err) + raise - # ioloop-interrupt socket pair; initialized in start() - self._r_interrupt = None - self._w_interrupt = None def _get_max_wait(self): """Get the interval to the next timeout event, or a default interval @@ -557,7 +642,7 @@ def activate_poller(self): """ # Activate the underlying poller and register current events self._init_poller() - fd_to_events = defaultdict(int) + fd_to_events = collections.defaultdict(int) for event, file_descriptors in self._fd_events.items(): for fileno in file_descriptors: fd_to_events[fileno] |= event @@ -584,17 +669,6 @@ def start(self): # Activate the underlying poller and register current events self.activate_poller() - # Create ioloop-interrupt socket pair and register read handler. - # NOTE: we defer their creation because some users (e.g., - # BlockingConnection adapter) don't use the event loop and these - # sockets would get reported as leaks - with self._mutex: - assert self._r_interrupt is None - self._r_interrupt, self._w_interrupt = self._get_interrupt_pair( - ) - self.add_handler(self._r_interrupt.fileno(), - self._read_interrupt, READ) - else: LOGGER.debug('Reentering IOLoop at nesting level=%s', self._start_nesting_levels) @@ -609,14 +683,7 @@ def start(self): self._start_nesting_levels -= 1 if self._start_nesting_levels == 0: - LOGGER.debug('Cleaning up IOLoop') - # Unregister and close ioloop-interrupt socket pair - with self._mutex: - self.remove_handler(self._r_interrupt.fileno()) - self._r_interrupt.close() - self._r_interrupt = None - self._w_interrupt.close() - self._w_interrupt = None + LOGGER.debug('Deactivating poller') # Deactivate the underlying poller self.deactivate_poller() @@ -633,22 +700,7 @@ def stop(self): LOGGER.debug('Stopping IOLoop') self._stopping = True - with self._mutex: - if self._w_interrupt is None: - return - - try: - # Send byte to interrupt the poll loop, use send() instead of - # os.write for Windows compatibility - self._w_interrupt.send(b'X') - except pika.compat.SOCKET_ERROR as err: - if err.errno != errno.EWOULDBLOCK: - raise - except Exception as err: - # There's nothing sensible to do here, we'll exit the interrupt - # loop after POLL_TIMEOUT secs in worst case anyway. - LOGGER.warning("Failed to send ioloop interrupt: %s", err) - raise + self.wake_threadsafe() @abc.abstractmethod def poll(self): @@ -796,7 +848,7 @@ def poll(self): # Build an event bit mask for each fileno we've received an event for - fd_event_map = defaultdict(int) + fd_event_map = collections.defaultdict(int) for fd_set, evt in zip((read, write, error), (READ, WRITE, ERROR)): for fileno in fd_set: fd_event_map[fileno] |= evt @@ -893,7 +945,7 @@ def poll(self): else: raise - fd_event_map = defaultdict(int) + fd_event_map = collections.defaultdict(int) for event in kevents: fd_event_map[event.ident] |= self._map_event(event) @@ -1012,7 +1064,7 @@ def poll(self): else: raise - fd_event_map = defaultdict(int) + fd_event_map = collections.defaultdict(int) for fileno, event in events: fd_event_map[fileno] |= event From ae42d748759d8afa81786b3f5cf2a5e700a1d3a1 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 14 Feb 2018 21:24:06 -0800 Subject: [PATCH 02/33] Fleshed out `close()` methods in select_connection's IOLoop and friends. --- pika/adapters/select_connection.py | 38 +++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index eda89c444..ce53f21aa 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -162,6 +162,12 @@ def __init__(self): # collection of canceled timeouts self._num_cancellations = 0 + def close(self): + """Release resources. Don't use the `_Timer` instance after closing + it + """ + self._timeout_heap = None + def call_later(self, delay, callback): """Schedule a one-shot timeout given delay seconds. @@ -287,10 +293,21 @@ def __init__(self): # Identity of this IOLoop's thread self._thread_id = None - self._poller = self._get_poller(self._get_remaining_interval, self.process_timeouts) + def close(self): + """Release IOLoop's resources. + + `IOLoop.close` is intended to be called by the application or test code + only after `IOLoop.start()` returns. After calling `close()`, no other + interaction with the closed instance of `IOLoop` should be performed. + + """ + self._poller.close() + self._timer.close() + self._callbacks = None + @staticmethod def _get_poller(get_wait_seconds, process_timeouts): """Determine the best poller to use for this environment and instantiate @@ -517,9 +534,19 @@ def __init__(self, get_wait_seconds, process_timeouts): self.add_handler(self._r_interrupt.fileno(), self._read_interrupt, READ) def close(self): - """TODO figure out where to call it + """Release poller's resources. + + TODO Need to call this from Poller tests to avoid socket resource + TODO leak messages + + `close()` is intended to be called after the poller's `start()` method + returns. After calling `close()`, no other interaction with the closed + poller instance should be performed. + """ - # Unregister and close ioloop-interrupt socket pair + # Unregister and close ioloop-interrupt socket pair; mutual exclusion is + # necessary to avoid race condition with `wake_threadsafe` running in + # another thread's context with self._waking_mutex: self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101 self._r_interrupt.close() @@ -527,9 +554,14 @@ def close(self): self._w_interrupt.close() self._w_interrupt = None + self._fd_handlers = None + self._fd_events = None + self._processing_fd_event_map = None + def wake_threadsafe(self): """Wake up the poller as soon as possible. As the name indicates, this method is thread-safe. + """ with self._waking_mutex: if self._w_interrupt is None: From ef5bc224ea3b11d07e3659822f01422f0dbf61c1 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 14 Feb 2018 22:12:15 -0800 Subject: [PATCH 03/33] Flesh out BlockingConnection.add_callback_threadsafe(); Close ioloop when cleaning up an instance of BlockingConnection. Fix/suppress several pylint findings. --- pika/adapters/blocking_connection.py | 53 ++++++++++++++++++++++++---- pika/adapters/select_connection.py | 17 ++++++--- 2 files changed, 59 insertions(+), 11 deletions(-) diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py index 8649d2eaf..a1ea17611 100644 --- a/pika/adapters/blocking_connection.py +++ b/pika/adapters/blocking_connection.py @@ -10,6 +10,9 @@ classes. """ +# Suppress too-many-lines +# pylint: disable=C0302 + # Disable "access to protected member warnings: this wrapper implementation is # a friend of those instances # pylint: disable=W0212 @@ -158,7 +161,7 @@ def elements(self): with `append_element` """ assert self._ready, '_CallbackResult was not set' - assert isinstance(self._values, list) and len(self._values) > 0, ( + assert isinstance(self._values, list) and self._values, ( '_CallbackResult value is incompatible with append_element: %r' % (self._values,)) @@ -381,7 +384,7 @@ def __repr__(self): def _cleanup(self): """Clean up members that might inhibit garbage collection""" - self._impl.ioloop.deactivate_poller() + self._impl.ioloop.close() self._ready_events.clear() self._opened_result.reset() self._open_error_result.reset() @@ -528,6 +531,18 @@ def _on_timer_ready(self, evt): """ self._ready_events.append(evt) + def _on_threadsafe_callback(self, user_callback): + """Handle callback that was registered via `add_callback_threadsafe`. + + :param user_callback: callback passed to `add_callback_threadsafe` by + the application. + + """ + # Turn it into a 0-delay timeout to take advantage of our existing logic + # that deals with reentrancy + self.add_timeout(0, user_callback) + + def _on_connection_blocked(self, user_callback, method_frame): """Handle Connection.Blocked notification from RabbitMQ broker @@ -623,9 +638,8 @@ def add_timeout(self, deadline, callback): """ if not callable(callback): - raise ValueError( - 'callback parameter must be callable, but got %r' - % (callback,)) + raise TypeError( + 'callback must be a callable, but got %r' % (callback,)) evt = _TimerEvt(callback=callback) timer_id = self._impl.add_timeout( @@ -635,6 +649,33 @@ def add_timeout(self, deadline, callback): return timer_id + def add_callback_threadsafe(self, callback): + """Calls the given function on the next iteration of the connection's + IOLoop in the context of the IOLoop's thread. + + NOTE: This is the only thread-safe method in `BlockingConnection`. All + other manipulations of `BlockingConnection` must be performed from the + connection's thread. + + For example, a thread may request a call to the + `BlockingChannel.basic_ack` method of a `BlockingConnection` that is + running in a different thread via + + ``` + connection.add_callback_threadsafe( + functools.partial(channel.basic_ack, delivery_tag=...)) + ``` + + :param method callback: The callback method + + """ + if not callable(callback): + raise TypeError( + 'callback must be a callable, but got %r' % (callback,)) + + self._impl.ioloop.add_callback_threadsafe( + functools.partial(self._on_threadsafe_callback, callback)) + def remove_timeout(self, timeout_id): """Remove a timer if it's still in the timeout stack @@ -878,7 +919,7 @@ class _ConsumerCancellationEvt(_ChannelPendingEvt): `Basic.Cancel` """ - __slots__ = 'method_frame' + __slots__ = ('method_frame',) def __init__(self, method_frame): """ diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index ce53f21aa..bc24ed1c1 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -299,6 +299,9 @@ def __init__(self): def close(self): """Release IOLoop's resources. + TODO Need to call this from IOLoop and SelectConnection tests as well as + TODO from BlockingConnection to avoid socket resource leak messages + `IOLoop.close` is intended to be called by the application or test code only after `IOLoop.start()` returns. After calling `close()`, no other interaction with the closed instance of `IOLoop` should be performed. @@ -554,6 +557,8 @@ def close(self): self._w_interrupt.close() self._w_interrupt = None + self.deactivate_poller() + self._fd_handlers = None self._fd_events = None self._processing_fd_event_map = None @@ -991,8 +996,9 @@ def _init_poller(self): def _uninit_poller(self): """Notify the implementation to release the poller resource""" - self._kqueue.close() - self._kqueue = None + if self._kqueue is not None: + self._kqueue.close() + self._kqueue = None def _register_fd(self, fileno, events): """The base class invokes this method to notify the implementation to @@ -1110,10 +1116,11 @@ def _init_poller(self): def _uninit_poller(self): """Notify the implementation to release the poller resource""" - if hasattr(self._poll, "close"): - self._poll.close() + if self._poll is not None: + if hasattr(self._poll, "close"): + self._poll.close() - self._poll = None + self._poll = None def _register_fd(self, fileno, events): """The base class invokes this method to notify the implementation to From fff06b7e380443b2e1c400cb7bfdfc3047eddf48 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 14 Feb 2018 22:28:44 -0800 Subject: [PATCH 04/33] Fix blocking connection unit tests broken by prior commit. --- pika/adapters/select_connection.py | 3 +-- tests/unit/blocking_connection_tests.py | 9 +++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index bc24ed1c1..ad302a121 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -944,9 +944,8 @@ class KQueuePoller(_PollerBase): def __init__(self, get_wait_seconds, process_timeouts): """Create an instance of the KQueuePoller """ - super(KQueuePoller, self).__init__(get_wait_seconds, process_timeouts) - self._kqueue = None + super(KQueuePoller, self).__init__(get_wait_seconds, process_timeouts) @staticmethod def _map_event(kevent): diff --git a/tests/unit/blocking_connection_tests.py b/tests/unit/blocking_connection_tests.py index 118aa2d79..ab046a07d 100644 --- a/tests/unit/blocking_connection_tests.py +++ b/tests/unit/blocking_connection_tests.py @@ -126,8 +126,7 @@ def test_flush_output_user_initiated_close(self, connection._flush_output(lambda: False, lambda: True) self.assertEqual(connection._impl.ioloop.activate_poller.call_count, 1) - self.assertEqual(connection._impl.ioloop.deactivate_poller.call_count, - 1) + self.assertEqual(connection._impl.ioloop.close.call_count, 1) @patch.object( blocking_connection, @@ -152,8 +151,7 @@ def test_flush_output_server_initiated_error_close( self.assertSequenceEqual(cm.exception.args, (404, 'not found')) self.assertEqual(connection._impl.ioloop.activate_poller.call_count, 1) - self.assertEqual(connection._impl.ioloop.deactivate_poller.call_count, - 1) + self.assertEqual(connection._impl.ioloop.close.call_count, 1) @patch.object( blocking_connection, @@ -178,8 +176,7 @@ def test_flush_output_server_initiated_no_error_close( self.assertSequenceEqual(cm.exception.args, (200, 'ok')) self.assertEqual(connection._impl.ioloop.activate_poller.call_count, 1) - self.assertEqual(connection._impl.ioloop.deactivate_poller.call_count, - 1) + self.assertEqual(connection._impl.ioloop.close.call_count, 1) @patch.object( blocking_connection, From 8d966835fff4cb90b2c5e14740081754c76b1cd9 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 14 Feb 2018 23:51:11 -0800 Subject: [PATCH 05/33] Implemented BlockingConnection acceptance tests for add_callback_threadsafe being called from same and from another thread --- tests/acceptance/blocking_adapter_test.py | 43 +++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index 287b486e9..e7853754e 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1,7 +1,9 @@ """blocking adapter test""" from datetime import datetime +import functools import logging import socket +import threading import time import unittest import uuid @@ -449,6 +451,47 @@ def test(self): 'Blocked connection timeout expired')) +class TestAddCallbackThreadsafeFromSameThread(BlockingTestCaseBase): + + def test(self): + """BlockingConnection.add_callback_threadsafe from same thread""" + connection = self._connect() + + # Test timer completion + start_time = time.time() + rx_callback = [] + connection.add_callback_threadsafe( + lambda: rx_callback.append(time.time())) + while not rx_callback: + connection.process_data_events(time_limit=None) + + self.assertEqual(len(rx_callback), 1) + elapsed = time.time() - start_time + self.assertLess(elapsed, 0.25) + + +class TestAddCallbackThreadsafeFromAnotherThread(BlockingTestCaseBase): + + def test(self): + """BlockingConnection.add_callback_threadsafe from another thread""" + connection = self._connect() + + # Test timer completion + start_time = time.time() + rx_callback = [] + timer = threading.Timer( + 0, + functools.partial(connection.add_callback_threadsafe, + lambda: rx_callback.append(time.time()))) + timer.start() + while not rx_callback: + connection.process_data_events(time_limit=None) + + self.assertEqual(len(rx_callback), 1) + elapsed = time.time() - start_time + self.assertLess(elapsed, 0.25) + + class TestAddTimeoutRemoveTimeout(BlockingTestCaseBase): def test(self): From 12b1d5db22c36cd14bfd48e9382cd3508ee9aa4b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 11:46:03 -0800 Subject: [PATCH 06/33] Implemented BlockingConnection acceptance test TestBasicConsumeWithAckFromAnotherThread --- tests/acceptance/blocking_adapter_test.py | 89 ++++++++++++++++++++++- 1 file changed, 86 insertions(+), 3 deletions(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index e7853754e..d20597500 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1538,7 +1538,7 @@ def test(self): # pylint: disable=R0914 mandatory=True) self.assertEqual(res, True) - # Flush channel to force Basic.Return + # Flush connection to force Basic.Return connection.channel().close() # Deposit a routable message in the queue @@ -1697,7 +1697,7 @@ def test(self): # pylint: disable=R0914,R0915 queue=q_name, expected_count=0) - # Attempt to cosume again with a short timeout + # Attempt to consume again with a short timeout connection.process_data_events(time_limit=0.005) self.assertEqual(len(rx_messages), 2) @@ -1712,6 +1712,89 @@ def test(self): # pylint: disable=R0914,R0915 self.assertEqual(frame.method.consumer_tag, consumer_tag) +class TestBasicConsumeWithAckFromAnotherThread(BlockingTestCaseBase): + + def test(self): # pylint: disable=R0914,R0915 + """BlockingChannel.basic_consume with ack from another thread and \ + requesting basic_ack via add_callback_threadsafe + """ + # This test simulates processing of a message on another thread and + # then requesting an ACK to be dispatched on the connection's thread + # via BlockingConnection.add_callback_threadsafe + + connection = self._connect() + + ch = connection.channel() + + q_name = 'TestBasicConsumeWithAckFromAnotherThread_q' + uuid.uuid1().hex + exg_name = ('TestBasicConsumeWithAckFromAnotherThread_exg' + + uuid.uuid1().hex) + routing_key = 'TestBasicConsumeWithAckFromAnotherThread' + + # Place channel in publisher-acknowledgments mode so that publishing + # with mandatory=True will be synchronous (for convenience) + res = ch.confirm_delivery() + self.assertIsNone(res) + + # Declare a new exchange + ch.exchange_declare(exg_name, exchange_type='direct') + self.addCleanup(connection.channel().exchange_delete, exg_name) + + # Declare a new queue + ch.queue_declare(q_name, auto_delete=True) + self.addCleanup(self._connect().channel().queue_delete, q_name) + + # Bind the queue to the exchange using routing key + ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) + + # Publish 2 messages with mandatory=True for synchronous processing + ch.publish(exg_name, routing_key, body='msg1', mandatory=True) + ch.publish(exg_name, routing_key, body='last-msg', mandatory=True) + + # Configure QoS for one message so that the 2nd message will be + # delivered only after the 1st one is ACKed + ch.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=False) + + # Create a consumer + consumer_tag_container = [] # so can access consumer tag from callback + + def ackAndEnqueueMessageFromAnotherThread(rx_ch, + rx_method, + rx_properties, # pylint: disable=W0613 + rx_body): + if rx_body == 'last-msg': + # Stop the `start_consuming` call + rx_ch.basic_cancel(consumer_tag_container[0]) + + # Request ACK dispatch via add_callback_threadsafe from other thread + threading.Timer( + 0, + lambda: connection.add_callback_threadsafe( + lambda: (ch.basic_ack(delivery_tag=rx_method.delivery_tag, + multiple=False), + rx_messages.append(rx_body)))) + + rx_messages = [] + consumer_tag = ch.basic_consume( + q_name, + ackAndEnqueueMessageFromAnotherThread, + auto_ack=False, + exclusive=False, + arguments=None) + consumer_tag_container.append(consumer_tag) + + # Wait for both messages + ch.start_consuming() + + # Wait for callbacks to finish processing + while len(rx_messages) < 2: + time.sleep(0.001) + + self.assertEqual(len(rx_messages), 2) + self.assertEqual(rx_messages[0], 'msg1') + self.assertEqual(rx_messages[1], 'last-msg') + + class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase): def test(self): # pylint: disable=R0914 @@ -1982,7 +2065,7 @@ def test(self): # pylint: disable=R0914,R0915 queue=q_name, expected_count=0) - # Attempt to cosume again with a short timeout + # Attempt to consume again with a short timeout connection.process_data_events(time_limit=0.005) self.assertEqual(len(rx_messages), 2) From 38486a2f7a873581b3be9b366e68ccee8eedc546 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 12:40:45 -0800 Subject: [PATCH 07/33] Instrumented add_callback_threadsafe and TestBasicConsumeWithAckFromAnotherThread with debug logging to help debug test failure. --- pika/adapters/select_connection.py | 2 + tests/acceptance/blocking_adapter_test.py | 51 +++++++++++++++-------- 2 files changed, 35 insertions(+), 18 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index ad302a121..55df389a5 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -396,6 +396,8 @@ def add_callback_threadsafe(self, callback): # Wake up the IOLoop running in another thread self._poller.wake_threadsafe() + LOGGER.debug('add_callback_threadsafe: added callback=%r', callback) + def process_timeouts(self): """[Extension] Process pending callbacks and timeouts, invoking those whose time has come. Internal use only. diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index d20597500..329f6dc0b 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1756,23 +1756,37 @@ def test(self): # pylint: disable=R0914,R0915 ch.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=False) # Create a consumer - consumer_tag_container = [] # so can access consumer tag from callback - def ackAndEnqueueMessageFromAnotherThread(rx_ch, rx_method, rx_properties, # pylint: disable=W0613 rx_body): - if rx_body == 'last-msg': - # Stop the `start_consuming` call - rx_ch.basic_cancel(consumer_tag_container[0]) - - # Request ACK dispatch via add_callback_threadsafe from other thread - threading.Timer( - 0, - lambda: connection.add_callback_threadsafe( - lambda: (ch.basic_ack(delivery_tag=rx_method.delivery_tag, - multiple=False), - rx_messages.append(rx_body)))) + LOGGER.debug( + '%s: Got message body=%r; delivery-tag=%r', + datetime.now(), rx_body, rx_method.delivery_tag) + + # Request ACK dispatch via add_callback_threadsafe from other + # thread; if last message, cancel consumer so that start_consuming + # can return + + def processOnConnectionThread(): + LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r', + datetime.now(), + rx_body, + rx_method.delivery_tag) + ch.basic_ack(delivery_tag=rx_method.delivery_tag, + multiple=False) + rx_messages.append(rx_body) + + if rx_body == 'last-msg': + LOGGER.debug('%s: Canceling consumer consumer-tag=%r', + datetime.now(), + rx_method.consumer_tag) + rx_ch.basic_cancel(rx_method.consumer_tag) + + # Spawn a thread to initiate ACKing + threading.Timer(0, + lambda: connection.add_callback_threadsafe( + processOnConnectionThread)) rx_messages = [] consumer_tag = ch.basic_consume( @@ -1781,14 +1795,15 @@ def ackAndEnqueueMessageFromAnotherThread(rx_ch, auto_ack=False, exclusive=False, arguments=None) - consumer_tag_container.append(consumer_tag) # Wait for both messages + LOGGER.debug('%s: calling start_consuming(); consumer tag=%r', + datetime.now(), + consumer_tag) ch.start_consuming() - - # Wait for callbacks to finish processing - while len(rx_messages) < 2: - time.sleep(0.001) + LOGGER.debug('%s: Returned from start_consuming(); consumer tag=%r', + datetime.now(), + consumer_tag) self.assertEqual(len(rx_messages), 2) self.assertEqual(rx_messages[0], 'msg1') From 6070e7d70b27ef10d888c20d674c8c034babe731 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 12:50:18 -0800 Subject: [PATCH 08/33] Add missing timer.start() call to request message ACK --- tests/acceptance/blocking_adapter_test.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index 329f6dc0b..2a8767072 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1784,9 +1784,10 @@ def processOnConnectionThread(): rx_ch.basic_cancel(rx_method.consumer_tag) # Spawn a thread to initiate ACKing - threading.Timer(0, - lambda: connection.add_callback_threadsafe( - processOnConnectionThread)) + timer = threading.Timer(0, + lambda: connection.add_callback_threadsafe( + processOnConnectionThread)) + timer.start() rx_messages = [] consumer_tag = ch.basic_consume( From cb46ea85b1f44f76f55a58f68f2c11bec5e534e6 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 13:20:46 -0800 Subject: [PATCH 09/33] Implemented BlockingConnection acceptance test TestConsumeGeneratorWithAckFromAnotherThread --- tests/acceptance/blocking_adapter_test.py | 98 +++++++++++++++++++++-- 1 file changed, 92 insertions(+), 6 deletions(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index 2a8767072..dac9853a9 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1756,10 +1756,11 @@ def test(self): # pylint: disable=R0914,R0915 ch.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=False) # Create a consumer - def ackAndEnqueueMessageFromAnotherThread(rx_ch, - rx_method, - rx_properties, # pylint: disable=W0613 - rx_body): + rx_messages = [] + def ackAndEnqueueMessageViaAnotherThread(rx_ch, + rx_method, + rx_properties, # pylint: disable=W0613 + rx_body): LOGGER.debug( '%s: Got message body=%r; delivery-tag=%r', datetime.now(), rx_body, rx_method.delivery_tag) @@ -1789,10 +1790,9 @@ def processOnConnectionThread(): processOnConnectionThread)) timer.start() - rx_messages = [] consumer_tag = ch.basic_consume( q_name, - ackAndEnqueueMessageFromAnotherThread, + ackAndEnqueueMessageViaAnotherThread, auto_ack=False, exclusive=False, arguments=None) @@ -1811,6 +1811,92 @@ def processOnConnectionThread(): self.assertEqual(rx_messages[1], 'last-msg') +class TestConsumeGeneratorWithAckFromAnotherThread(BlockingTestCaseBase): + + def test(self): # pylint: disable=R0914,R0915 + """BlockingChannel.consume and requesting basic_ack from another \ + thread via add_callback_threadsafe + """ + connection = self._connect() + + ch = connection.channel() + + q_name = ('TestConsumeGeneratorWithAckFromAnotherThread_q' + + uuid.uuid1().hex) + exg_name = ('TestConsumeGeneratorWithAckFromAnotherThread_exg' + + uuid.uuid1().hex) + routing_key = 'TestConsumeGeneratorWithAckFromAnotherThread' + + # Place channel in publisher-acknowledgments mode so that publishing + # with mandatory=True will be synchronous (for convenience) + res = ch.confirm_delivery() + self.assertIsNone(res) + + # Declare a new exchange + ch.exchange_declare(exg_name, exchange_type='direct') + self.addCleanup(connection.channel().exchange_delete, exg_name) + + # Declare a new queue + ch.queue_declare(q_name, auto_delete=True) + self.addCleanup(self._connect().channel().queue_delete, q_name) + + # Bind the queue to the exchange using routing key + ch.queue_bind(q_name, exchange=exg_name, routing_key=routing_key) + + # Publish 2 messages with mandatory=True for synchronous processing + ch.publish(exg_name, routing_key, body='msg1', mandatory=True) + ch.publish(exg_name, routing_key, body='last-msg', mandatory=True) + + # Configure QoS for one message so that the 2nd message will be + # delivered only after the 1st one is ACKed + ch.basic_qos(prefetch_size=0, prefetch_count=1, all_channels=False) + + # Create a consumer + rx_messages = [] + def ackAndEnqueueMessageViaAnotherThread(rx_ch, + rx_method, + rx_properties, # pylint: disable=W0613 + rx_body): + LOGGER.debug( + '%s: Got message body=%r; delivery-tag=%r', + datetime.now(), rx_body, rx_method.delivery_tag) + + # Request ACK dispatch via add_callback_threadsafe from other + # thread; if last message, cancel consumer so that consumer + # generator completes + + def processOnConnectionThread(): + LOGGER.debug('%s: ACKing message body=%r; delivery-tag=%r', + datetime.now(), + rx_body, + rx_method.delivery_tag) + ch.basic_ack(delivery_tag=rx_method.delivery_tag, + multiple=False) + rx_messages.append(rx_body) + + if rx_body == 'last-msg': + LOGGER.debug('%s: Canceling consumer consumer-tag=%r', + datetime.now(), + rx_method.consumer_tag) + rx_ch.basic_cancel(rx_method.consumer_tag) + + # Spawn a thread to initiate ACKing + timer = threading.Timer(0, + lambda: connection.add_callback_threadsafe( + processOnConnectionThread)) + timer.start() + + for method, properties, body in ch.consume(q_name, auto_ack=False): + ackAndEnqueueMessageViaAnotherThread(rx_ch=ch, + rx_method=method, + rx_properties=properties, + rx_body=body) + + self.assertEqual(len(rx_messages), 2) + self.assertEqual(rx_messages[0], 'msg1') + self.assertEqual(rx_messages[1], 'last-msg') + + class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase): def test(self): # pylint: disable=R0914 From 1e8f748c1411c487900451cc397fa46214bdee7b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 13:58:01 -0800 Subject: [PATCH 10/33] Use BlockingChannel.cancel() instead of basic_cancel to cancel the generator-based consumer. --- pika/adapters/blocking_connection.py | 3 ++- tests/acceptance/blocking_adapter_test.py | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py index a1ea17611..a639f886e 100644 --- a/pika/adapters/blocking_connection.py +++ b/pika/adapters/blocking_connection.py @@ -1844,7 +1844,8 @@ def consume(self, queue, auto_ack=False, """Blocking consumption of a queue instead of via a callback. This method is a generator that yields each message as a tuple of method, properties, and body. The active generator iterator terminates when the - consumer is cancelled by client or broker. + consumer is cancelled by client via `BlockingChannel.cancel()` or by + broker. Example: diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index dac9853a9..a3142f38d 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1878,7 +1878,9 @@ def processOnConnectionThread(): LOGGER.debug('%s: Canceling consumer consumer-tag=%r', datetime.now(), rx_method.consumer_tag) - rx_ch.basic_cancel(rx_method.consumer_tag) + # NOTE Need to use cancel() for the consumer generator + # instead of basic_cancel() + rx_ch.cancel(rx_method.consumer_tag) # Spawn a thread to initiate ACKing timer = threading.Timer(0, From cfe4a35fcfc044bfb5b9a3981496986bd8705182 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 14:03:44 -0800 Subject: [PATCH 11/33] Fixed call to BlockingChannel.cancel() from TestConsumeGeneratorWithAckFromAnotherThread --- tests/acceptance/blocking_adapter_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index a3142f38d..536e7b8fb 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1880,7 +1880,7 @@ def processOnConnectionThread(): rx_method.consumer_tag) # NOTE Need to use cancel() for the consumer generator # instead of basic_cancel() - rx_ch.cancel(rx_method.consumer_tag) + rx_ch.cancel() # Spawn a thread to initiate ACKing timer = threading.Timer(0, From 152d792c9c325f1a5eca0c29600cbe2226e85245 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 16:27:22 -0800 Subject: [PATCH 12/33] Implemented async adapter acceptance tests for add_callback_threadsafe. --- pika/adapters/asyncio_connection.py | 22 +++++++++++ pika/adapters/base_connection.py | 26 +++++++++++++ pika/adapters/blocking_connection.py | 15 +++---- pika/adapters/select_connection.py | 4 +- pika/adapters/tornado_connection.py | 26 +++++++++++++ pika/adapters/twisted_connection.py | 22 +++++++++++ tests/acceptance/async_adapter_tests.py | 52 +++++++++++++++++++++++++ 7 files changed, 156 insertions(+), 11 deletions(-) diff --git a/pika/adapters/asyncio_connection.py b/pika/adapters/asyncio_connection.py index dff7656f6..72d695334 100644 --- a/pika/adapters/asyncio_connection.py +++ b/pika/adapters/asyncio_connection.py @@ -41,6 +41,28 @@ def remove_timeout(handle): """ return handle.cancel() + def add_callback_threadsafe(self, callback): + """Requests a call to the given function as soon as possible in the + context of this IOLoop's thread. + + NOTE: This is the only thread-safe method offered by the IOLoop adapter. + All other manipulations of the IOLoop adapter and its parent connection + must be performed from the connection's thread. + + For example, a thread may request a call to the + `channel.basic_ack` method of a connection that is running in a + different thread via + + ``` + connection.add_callback_threadsafe( + functools.partial(channel.basic_ack, delivery_tag=...)) + ``` + + :param method callback: The callback method; must be callable. + + """ + self.loop.call_soon_threadsafe(callback) + def add_handler(self, fd, cb, event_state): """ Registers the given handler to receive the given events for ``fd``. diff --git a/pika/adapters/base_connection.py b/pika/adapters/base_connection.py index cecd3b730..4428150f9 100644 --- a/pika/adapters/base_connection.py +++ b/pika/adapters/base_connection.py @@ -135,6 +135,32 @@ def remove_timeout(self, timeout_id): """ self.ioloop.remove_timeout(timeout_id) + def add_callback_threadsafe(self, callback): + """Requests a call to the given function as soon as possible in the + context of this connection's IOLoop thread. + + NOTE: This is the only thread-safe method offered by the connection. All + other manipulations of the connection must be performed from the + connection's thread. + + For example, a thread may request a call to the + `channel.basic_ack` method of a connection that is running in a + different thread via + + ``` + connection.add_callback_threadsafe( + functools.partial(channel.basic_ack, delivery_tag=...)) + ``` + + :param method callback: The callback method; must be callable. + + """ + if not callable(callback): + raise TypeError( + 'callback must be a callable, but got %r' % (callback,)) + + self.ioloop.add_callback_threadsafe(callback) + def _adapter_connect(self): """Connect to the RabbitMQ broker, returning True if connected. diff --git a/pika/adapters/blocking_connection.py b/pika/adapters/blocking_connection.py index a639f886e..07dcb52da 100644 --- a/pika/adapters/blocking_connection.py +++ b/pika/adapters/blocking_connection.py @@ -650,8 +650,8 @@ def add_timeout(self, deadline, callback): return timer_id def add_callback_threadsafe(self, callback): - """Calls the given function on the next iteration of the connection's - IOLoop in the context of the IOLoop's thread. + """Requests a call to the given function as soon as possible in the + context of this connection's thread. NOTE: This is the only thread-safe method in `BlockingConnection`. All other manipulations of `BlockingConnection` must be performed from the @@ -666,14 +666,10 @@ def add_callback_threadsafe(self, callback): functools.partial(channel.basic_ack, delivery_tag=...)) ``` - :param method callback: The callback method + :param method callback: The callback method; must be callable """ - if not callable(callback): - raise TypeError( - 'callback must be a callable, but got %r' % (callback,)) - - self._impl.ioloop.add_callback_threadsafe( + self._impl.add_callback_threadsafe( functools.partial(self._on_threadsafe_callback, callback)) def remove_timeout(self, timeout_id): @@ -2440,7 +2436,8 @@ def queue_declare(self, queue, passive=False, durable=False, :param queue: The queue name :type queue: str or unicode; if empty string, the broker will create a unique queue name; - :param bool passive: Only check to see if the queue exists + :param bool passive: Only check to see if the queue exists and raise + `ChannelClosed` if it doesn't; :param bool durable: Survive reboots of the broker :param bool exclusive: Only allow access by the current connection :param bool auto_delete: Delete after consumer cancels or disconnects diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index 55df389a5..c94185b31 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -373,8 +373,8 @@ def remove_timeout(self, timeout_id): self._timer.remove_timeout(timeout_id) def add_callback_threadsafe(self, callback): - """Calls the given function on the next iteration of the IOLoop in the - context of the IOLoop's thread. + """Requests a call to the given function as soon as possible in the + context of this IOLoop's thread. NOTE: This is the only thread-safe method in IOLoop. All other manipulations of IOLoop must be performed from the IOLoop's thread. diff --git a/pika/adapters/tornado_connection.py b/pika/adapters/tornado_connection.py index 19607a31c..ad0ac8e20 100644 --- a/pika/adapters/tornado_connection.py +++ b/pika/adapters/tornado_connection.py @@ -94,3 +94,29 @@ def remove_timeout(self, timeout_id): """ return self.ioloop.remove_timeout(timeout_id) + + def add_callback_threadsafe(self, callback): + """Requests a call to the given function as soon as possible in the + context of this connection's IOLoop thread. + + NOTE: This is the only thread-safe method offered by the connection. All + other manipulations of the connection must be performed from the + connection's thread. + + For example, a thread may request a call to the + `channel.basic_ack` method of a connection that is running in a + different thread via + + ``` + connection.add_callback_threadsafe( + functools.partial(channel.basic_ack, delivery_tag=...)) + ``` + + :param method callback: The callback method; must be callable. + + """ + if not callable(callback): + raise TypeError( + 'callback must be a callable, but got %r' % (callback,)) + + self.ioloop.add_callback(callback) diff --git a/pika/adapters/twisted_connection.py b/pika/adapters/twisted_connection.py index 3ce458baf..6a1f8c595 100644 --- a/pika/adapters/twisted_connection.py +++ b/pika/adapters/twisted_connection.py @@ -225,6 +225,28 @@ def remove_timeout(self, call): """ call.cancel() + def add_callback_threadsafe(self, callback): + """Requests a call to the given function as soon as possible in the + context of this IOLoop's thread. + + NOTE: This is the only thread-safe method offered by the IOLoop adapter. + All other manipulations of the IOLoop adapter and its parent connection + must be performed from the connection's thread. + + For example, a thread may request a call to the + `channel.basic_ack` method of a connection that is running in a + different thread via + + ``` + connection.add_callback_threadsafe( + functools.partial(channel.basic_ack, delivery_tag=...)) + ``` + + :param method callback: The callback method; must be callable. + + """ + self.reactor.callFromThread(callback) + def stop(self): # Guard against stopping the reactor multiple times if not self.started: diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index 39fc169c6..53a45ae44 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -10,6 +10,7 @@ # Suppress pylint warning about unused argument # pylint: disable=W0613 +import threading import time import uuid @@ -36,6 +37,7 @@ def begin(self, channel): channel.confirm_delivery(ack_nack_callback=self.ack_nack_callback, callback=self.on_complete) + @staticmethod def ack_nack_callback(frame): pass @@ -463,6 +465,56 @@ def on_closed(self, connection, reply_code, reply_text): reply_text) +class TestAddCallbackThreadsafeFromIOLoopThread(AsyncTestCase, AsyncAdapters): + DESCRIPTION = "Test add_callback_threadsafe request from same thread." + + def start(self, *args, **kwargs): + self.loop_thread_ident = threading.current_thread().ident + self.my_start_time = None + self.got_callback = False + super(TestAddCallbackThreadsafeFromIOLoopThread, self).start(*args, **kwargs) + self.assertTrue(self.got_callback) + + def begin(self, channel): + self.my_start_time = time.time() + # Request a callback from our current (ioloop's) thread + channel.connection.add_callback_threadsafe(self.on_requested_callback) + + def on_requested_callback(self): + self.assertEqual(threading.current_thread().ident, + self.loop_thread_ident) + self.assertLess(time.time() - self.my_start_time, 0.25) + self.got_callback = True + self.stop() + + +class TestAddCallbackThreadsafeFromAnotherThread(AsyncTestCase, AsyncAdapters): + DESCRIPTION = "Test add_callback_threadsafe request from another thread." + + def start(self, *args, **kwargs): + self.loop_thread_ident = threading.current_thread().ident + self.my_start_time = None + self.got_callback = False + super(TestAddCallbackThreadsafeFromAnotherThread, self).start(*args, **kwargs) + self.assertTrue(self.got_callback) + + def begin(self, channel): + self.my_start_time = time.time() + # Request a callback from ioloop while executing in another thread + timer = threading.Timer( + 0, + lambda: channel.connection.add_callback_threadsafe( + self.on_requested_callback)) + timer.start() + + def on_requested_callback(self): + self.assertEqual(threading.current_thread().ident, + self.loop_thread_ident) + self.assertLess(time.time() - self.my_start_time, 0.25) + self.got_callback = True + self.stop() + + class TestViabilityOfMultipleTimeoutsWithSameDeadlineAndCallback(AsyncTestCase, AsyncAdapters): # pylint: disable=C0103 DESCRIPTION = "Test viability of multiple timeouts with same deadline and callback" From 35cdf73a689139c66ca193cf534d9b5e4341fc62 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 16:31:06 -0800 Subject: [PATCH 13/33] Release ioloop resources after class AsyncTestCase returns from ioloop.start. --- tests/acceptance/async_test_base.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index ac017f954..e0652cb73 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -63,6 +63,9 @@ def start(self, adapter=None): self.connection.ioloop.start() self.assertFalse(self._timed_out) + # Release ioloop resources + self.connection.ioloop.close() + def stop(self): """close the connection and stop the ioloop""" self.logger.info("Stopping test") From 61a6fc6496854bf374adb009b8478859418dd1e5 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 16:55:07 -0800 Subject: [PATCH 14/33] Fixed broken comparison of string with message body on python3 in the new "ack from other thread" BlockingConnection tests. --- tests/acceptance/blocking_adapter_test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index 536e7b8fb..29d8ad493 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1778,7 +1778,8 @@ def processOnConnectionThread(): multiple=False) rx_messages.append(rx_body) - if rx_body == 'last-msg': + # NOTE on python3, `b'last-msg' != 'last-msg'` + if rx_body == b'last-msg': LOGGER.debug('%s: Canceling consumer consumer-tag=%r', datetime.now(), rx_method.consumer_tag) @@ -1874,7 +1875,8 @@ def processOnConnectionThread(): multiple=False) rx_messages.append(rx_body) - if rx_body == 'last-msg': + # NOTE on python3, `b'last-msg' != 'last-msg'` + if rx_body == b'last-msg': LOGGER.debug('%s: Canceling consumer consumer-tag=%r', datetime.now(), rx_method.consumer_tag) From c88ea7cb9d1ef4fac39c4478b3675b2313d413bc Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 17:01:23 -0800 Subject: [PATCH 15/33] Close the connection's ioloop before setting self.connection to None in AsyncTestCase. --- tests/acceptance/async_test_base.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index e0652cb73..2d797740c 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -63,9 +63,6 @@ def start(self, adapter=None): self.connection.ioloop.start() self.assertFalse(self._timed_out) - # Release ioloop resources - self.connection.ioloop.close() - def stop(self): """close the connection and stop the ioloop""" self.logger.info("Stopping test") @@ -79,9 +76,10 @@ def _stop(self): self.logger.info("Removing timeout") self.connection.remove_timeout(self.timeout) self.timeout = None - if hasattr(self, 'connection') and self.connection: + if hasattr(self, 'connection') and self.connection is not None: self.logger.info("Stopping ioloop") self.connection.ioloop.stop() + self.connection.ioloop.close() self.connection = None def on_closed(self, connection, reply_code, reply_text): From eef5331c76500f12faed56427857ab9bd914da6b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 17:11:10 -0800 Subject: [PATCH 16/33] Close async adapter's ioloop from finally after start exits in AsyncTestCase. --- tests/acceptance/async_test_base.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index 2d797740c..c297192f5 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -58,10 +58,15 @@ def start(self, adapter=None): self.connection = self.adapter(self.parameters, self.on_open, self.on_open_error, self.on_closed) - self.timeout = self.connection.add_timeout(self.TIMEOUT, - self.on_timeout) - self.connection.ioloop.start() - self.assertFalse(self._timed_out) + try: + self.timeout = self.connection.add_timeout(self.TIMEOUT, + self.on_timeout) + self.connection.ioloop.start() + self.assertFalse(self._timed_out) + finally: + self.connection.ioloop.close() + self.connection = None + def stop(self): """close the connection and stop the ioloop""" @@ -79,8 +84,6 @@ def _stop(self): if hasattr(self, 'connection') and self.connection is not None: self.logger.info("Stopping ioloop") self.connection.ioloop.stop() - self.connection.ioloop.close() - self.connection = None def on_closed(self, connection, reply_code, reply_text): """called when the connection has finished closing""" From 0ea5d5ea00b1bdd9ccc21de18b237479fdd2dd83 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 17:31:15 -0800 Subject: [PATCH 17/33] Add close method in asyncio_connection's IOLoopAdapter --- pika/adapters/asyncio_connection.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pika/adapters/asyncio_connection.py b/pika/adapters/asyncio_connection.py index 72d695334..0f5350982 100644 --- a/pika/adapters/asyncio_connection.py +++ b/pika/adapters/asyncio_connection.py @@ -20,6 +20,9 @@ def __init__(self, loop): self.readers = set() self.writers = set() + def close(self): + self.loop.close() + def add_timeout(self, deadline, callback): """Add the callback to the EventLoop timer to fire after deadline seconds. Returns a Handle to the timeout. From 856b4af2985e186be9c5d19b9237b727013ee2bf Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 18:37:18 -0800 Subject: [PATCH 18/33] Provide async acceptance tests with a private IOLoop for each adapter so that these IOLoops may be closed to release resources without risking closing of singletons which would break subsequent tests (since some adapters, such as torndado and asyncio make use of the singletons) --- tests/acceptance/async_test_base.py | 35 ++++++++++++++++++----------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index c297192f5..4ad7c8887 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -52,12 +52,13 @@ def begin(self, channel): # pylint: disable=R0201,W0613 """Extend to start the actual tests on the channel""" self.fail("AsyncTestCase.begin_test not extended") - def start(self, adapter=None): + def start(self, adapter, ioloop_factory): self.logger.info('start at %s', datetime.utcnow()) self.adapter = adapter or self.ADAPTER self.connection = self.adapter(self.parameters, self.on_open, - self.on_open_error, self.on_closed) + self.on_open_error, self.on_closed, + custom_ioloop=ioloop_factory()) try: self.timeout = self.connection.add_timeout(self.TIMEOUT, self.on_timeout) @@ -112,12 +113,12 @@ def on_timeout(self): class BoundQueueTestCase(AsyncTestCase): - def start(self, adapter=None): + def start(self, adapter, ioloop_factory): # PY3 compat encoding self.exchange = 'e-' + self.__class__.__name__ + ':' + uuid.uuid1().hex self.queue = 'q-' + self.__class__.__name__ + ':' + uuid.uuid1().hex self.routing_key = self.__class__.__name__ - super(BoundQueueTestCase, self).start(adapter) + super(BoundQueueTestCase, self).start(adapter, ioloop_factory) def begin(self, channel): self.channel.exchange_declare(self.exchange, @@ -154,20 +155,19 @@ def on_ready(self, frame): class AsyncAdapters(object): - def start(self, adapter_class): + def start(self, adapter_class, ioloop_factory): raise NotImplementedError def select_default_test(self): """SelectConnection:DefaultPoller""" - with mock.patch.multiple(select_connection, SELECT_TYPE=None): - self.start(adapters.SelectConnection) + self.start(adapters.SelectConnection, select_connection.IOLoop) def select_select_test(self): """SelectConnection:select""" with mock.patch.multiple(select_connection, SELECT_TYPE='select'): - self.start(adapters.SelectConnection) + self.start(adapters.SelectConnection, select_connection.IOLoop) @unittest.skipIf( not hasattr(select, 'poll') or @@ -176,27 +176,36 @@ def select_poll_test(self): """SelectConnection:poll""" with mock.patch.multiple(select_connection, SELECT_TYPE='poll'): - self.start(adapters.SelectConnection) + self.start(adapters.SelectConnection, select_connection.IOLoop) @unittest.skipIf(not hasattr(select, 'epoll'), "epoll not supported") def select_epoll_test(self): """SelectConnection:epoll""" with mock.patch.multiple(select_connection, SELECT_TYPE='epoll'): - self.start(adapters.SelectConnection) + self.start(adapters.SelectConnection, select_connection.IOLoop) @unittest.skipIf(not hasattr(select, 'kqueue'), "kqueue not supported") def select_kqueue_test(self): """SelectConnection:kqueue""" with mock.patch.multiple(select_connection, SELECT_TYPE='kqueue'): - self.start(adapters.SelectConnection) + self.start(adapters.SelectConnection, select_connection.IOLoop) def tornado_test(self): """TornadoConnection""" - self.start(adapters.TornadoConnection) + ioloop_factory = None + if adapters.TornadoConnection is not None: + import tornado.ioloop + ioloop_factory = tornado.ioloop.IOLoop + self.start(adapters.TornadoConnection, ioloop_factory) @unittest.skipIf(sys.version_info < (3, 4), "Asyncio available for Python 3.4+") def asyncio_test(self): """AsyncioConnection""" - self.start(adapters.AsyncioConnection) + ioloop_factory = None + if adapters.AsyncioConnection is not None: + import asyncio + ioloop_factory = asyncio.new_event_loop + + self.start(adapters.AsyncioConnection, ioloop_factory) From e0d862a34866524b9b08b9eeb32048d5e020ac0a Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 19:01:54 -0800 Subject: [PATCH 19/33] Compare message bodies to byte literals for compatibility with python3 --- tests/acceptance/blocking_adapter_test.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index 29d8ad493..d539f94a3 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -1808,8 +1808,8 @@ def processOnConnectionThread(): consumer_tag) self.assertEqual(len(rx_messages), 2) - self.assertEqual(rx_messages[0], 'msg1') - self.assertEqual(rx_messages[1], 'last-msg') + self.assertEqual(rx_messages[0], b'msg1') + self.assertEqual(rx_messages[1], b'last-msg') class TestConsumeGeneratorWithAckFromAnotherThread(BlockingTestCaseBase): @@ -1897,8 +1897,8 @@ def processOnConnectionThread(): rx_body=body) self.assertEqual(len(rx_messages), 2) - self.assertEqual(rx_messages[0], 'msg1') - self.assertEqual(rx_messages[1], 'last-msg') + self.assertEqual(rx_messages[0], b'msg1') + self.assertEqual(rx_messages[1], b'last-msg') class TestTwoBasicConsumersOnSameChannel(BlockingTestCaseBase): From 3af0004197645287b049bb3ef99f7955705de1a3 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 20:07:37 -0800 Subject: [PATCH 20/33] Close ioloops in tests to prevent resource leaks --- tests/acceptance/async_test_base.py | 12 ++++++++++++ tests/unit/connection_timeout_tests.py | 14 ++++++++++++-- tests/unit/select_connection_ioloop_tests.py | 1 + 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index 4ad7c8887..a0a28bc26 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -156,6 +156,18 @@ def on_ready(self, frame): class AsyncAdapters(object): def start(self, adapter_class, ioloop_factory): + """ + + :param adapter_class: pika connection adapter class to test. + :param ioloop_factory: to be called without args to instantiate a + non-shared ioloop to be passed as the `custom_ioloop` arg to the + `adapter_class` constructor. This is needed because some of the + adapters default to using a singleton ioloop, which results in + tests errors after prior tests close the ioloop to release resources, + in order to eliminate ResourceWarning warnings concerning unclosed + sockets from our adapters. + :return: + """ raise NotImplementedError def select_default_test(self): diff --git a/tests/unit/connection_timeout_tests.py b/tests/unit/connection_timeout_tests.py index b44161bfb..3118faba0 100644 --- a/tests/unit/connection_timeout_tests.py +++ b/tests/unit/connection_timeout_tests.py @@ -49,8 +49,13 @@ def test_asyncio_connection_timeout(self): connect=mock.Mock(side_effect=mock_timeout)) ) as create_sock_mock: params = pika.ConnectionParameters(socket_timeout=2.0) - conn = asyncio_connection.AsyncioConnection(params) + ioloop = asyncio_connection.asyncio.new_event_loop() + self.addCleanup(ioloop.close) + conn = asyncio_connection.AsyncioConnection( + params, + custom_ioloop=ioloop) conn._on_connect_timer() + create_sock_mock.return_value.settimeout.assert_called_with(2.0) self.assertIn('timeout', str(err_ctx.exception)) @@ -99,6 +104,7 @@ def test_select_connection_timeout(self): side_effect=mock_timeout))) as create_sock_mock: params = pika.ConnectionParameters(socket_timeout=2.0) conn = select_connection.SelectConnection(params) + self.addCleanup(conn.ioloop.close) conn._on_connect_timer() create_sock_mock.return_value.settimeout.assert_called_with(2.0) self.assertIn('timeout', str(err_ctx.exception)) @@ -113,7 +119,11 @@ def test_tornado_connection_timeout(self): connect=mock.Mock( side_effect=mock_timeout))) as create_sock_mock: params = pika.ConnectionParameters(socket_timeout=2.0) - conn = tornado_connection.TornadoConnection(params) + ioloop = tornado_connection.ioloop.IOLoop() + self.addCleanup(ioloop.close) + conn = tornado_connection.TornadoConnection( + params, + custom_ioloop=ioloop) conn._on_connect_timer() create_sock_mock.return_value.settimeout.assert_called_with(2.0) self.assertIn('timeout', str(err_ctx.exception)) diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index 62bf3f10c..272e98c74 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -36,6 +36,7 @@ def setUp(self): self.ioloop = select_connection.IOLoop() self.addCleanup(setattr, self, 'ioloop', None) + self.addCleanup(self.ioloop.close) activate_poller_patch = mock.patch.object( self.ioloop._poller, From b781769e2f6cca25d1ff0b3773326ae7ecf62086 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 20:59:47 -0800 Subject: [PATCH 21/33] Close select_connection pollers in a couple more tests to eliminate corresponding RresourceWarning messages when running Python3 tests --- tests/unit/select_connection_ioloop_tests.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index 272e98c74..e41d51bf2 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -439,6 +439,7 @@ def test_eintr( timer = select_connection._Timer() self.poller = self.ioloop._get_poller(timer.get_remaining_interval, timer.process_timeouts) + self.addCleanup(self.poller.close) sockpair = self.poller._get_interrupt_pair() self.addCleanup(sockpair[0].close) @@ -494,6 +495,7 @@ def start_test(self): poller = select_connection.SelectPoller( get_wait_seconds=timer.get_remaining_interval, process_timeouts=timer.process_timeouts) + self.addCleanup(poller.close) timer_call_container = [] timer.call_later(0.00001, lambda: timer_call_container.append(1)) From a4fa188504dd6bff320bf6a2883c425b5100a1d4 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 22:47:09 -0800 Subject: [PATCH 22/33] Implemented unit tests for select_connection's _Timer.close(), IOLoop.close(), poller close methods. --- pika/adapters/select_connection.py | 24 +++++--- tests/unit/select_connection_ioloop_tests.py | 60 ++++++++++++++++++++ tests/unit/select_connection_timer_tests.py | 14 +++++ 3 files changed, 89 insertions(+), 9 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index c94185b31..737303f1d 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -166,7 +166,11 @@ def close(self): """Release resources. Don't use the `_Timer` instance after closing it """ - self._timeout_heap = None + # Eliminate potential reference cycles to aid garbage-collection + if self._timeout_heap is not None: + for timeout in self._timeout_heap: + timeout.callback = None + self._timeout_heap = None def call_later(self, delay, callback): """Schedule a one-shot timeout given delay seconds. @@ -307,9 +311,10 @@ def close(self): interaction with the closed instance of `IOLoop` should be performed. """ - self._poller.close() - self._timer.close() - self._callbacks = None + if self._callbacks is not None: + self._poller.close() + self._timer.close() + self._callbacks = None @staticmethod def _get_poller(get_wait_seconds, process_timeouts): @@ -553,11 +558,12 @@ def close(self): # necessary to avoid race condition with `wake_threadsafe` running in # another thread's context with self._waking_mutex: - self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101 - self._r_interrupt.close() - self._r_interrupt = None - self._w_interrupt.close() - self._w_interrupt = None + if self._w_interrupt is not None: + self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101 + self._r_interrupt.close() + self._r_interrupt = None + self._w_interrupt.close() + self._w_interrupt = None self.deactivate_poller() diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index e41d51bf2..4595de22d 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -74,6 +74,23 @@ def on_timeout(self): self.fail('Test timed out') +class IOLoopCloseTestSelect(IOLoopBaseTest): + """ Test ioloop being stopped by another Thread. """ + SELECT_POLLER = 'select' + + def start_test(self): + with mock.patch.multiple(self.ioloop, + _timer=mock.DEFAULT, + _poller=mock.DEFAULT, + _callbacks=mock.DEFAULT) as mocks: + self.ioloop.close() + mocks['_timer'].close.assert_called_once() + mocks['_poller'].close.assert_called_once() + self.assertIsNone(self.ioloop._callbacks) + + self.ioloop.stop() + + class IOLoopThreadStopTestSelect(IOLoopBaseTest): """ Test ioloop being stopped by another Thread. """ SELECT_POLLER = 'select' @@ -517,3 +534,46 @@ def start_test(self): break self.assertEqual(timer_call_container, [1]) + + +class PollerTestCaseSelect(unittest.TestCase): + SELECT_POLLER = 'select' + + def setUp(self): + select_type_patch = mock.patch.multiple( + select_connection, SELECT_TYPE=self.SELECT_POLLER) + select_type_patch.start() + self.addCleanup(select_type_patch.stop) + + timer = select_connection._Timer() + self.addCleanup(timer.close) + self.poller = select_connection.IOLoop._get_poller( + timer.get_remaining_interval, + timer.process_timeouts) + self.addCleanup(self.poller.close) + + def test_poller_close(self): + self.poller.close() + self.assertIsNone(self.poller._r_interrupt) + self.assertIsNone(self.poller._w_interrupt) + self.assertIsNone(self.poller._fd_handlers) + self.assertIsNone(self.poller._fd_events) + self.assertIsNone(self.poller._processing_fd_event_map) + + +@unittest.skipIf(not POLL_SUPPORTED, 'poll not supported') +class PollerTestCasePoll(PollerTestCaseSelect): + """Same as PollerTestCaseSelect but uses poll syscall""" + SELECT_POLLER = 'poll' + + +@unittest.skipIf(not EPOLL_SUPPORTED, 'epoll not supported') +class PollerTestCaseEPoll(PollerTestCaseSelect): + """Same as PollerTestCaseSelect but uses epoll syscall""" + SELECT_POLLER = 'epoll' + + +@unittest.skipIf(not KQUEUE_SUPPORTED, 'kqueue not supported') +class PollerTestCaseKqueue(PollerTestCaseSelect): + """Same as PollerTestCaseSelect but uses kqueue syscall""" + SELECT_POLLER = 'kqueue' \ No newline at end of file diff --git a/tests/unit/select_connection_timer_tests.py b/tests/unit/select_connection_timer_tests.py index 1017e7ec3..72e3db596 100644 --- a/tests/unit/select_connection_timer_tests.py +++ b/tests/unit/select_connection_timer_tests.py @@ -91,6 +91,20 @@ def test_le_operator(self): class TimerClassTests(unittest.TestCase): """Test select_connection._Timer class""" + def test_close_empty(self): + timer = select_connection._Timer() + timer.close() + self.assertIsNone(timer._timeout_heap) + + def test_close_non_empty(self): + timer = select_connection._Timer() + t1 = timer.call_later(10, lambda: 10) + t2 = timer.call_later(20, lambda: 20) + timer.close() + self.assertIsNone(timer._timeout_heap) + self.assertIsNone(t1.callback) + self.assertIsNone(t2.callback) + def test_no_timeouts_remaining_interval_is_none(self): timer = select_connection._Timer() self.assertIsNone(timer.get_remaining_interval()) From 33fad748fae1b64354f29b2c1b678e575d2e712b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Thu, 15 Feb 2018 23:01:45 -0800 Subject: [PATCH 23/33] Remove comment promising thread-safety from select_connection's IOLoop.stop and _PollerBase.stop. Technically, IOLoop.add_callback_threadsafe should be the only thread-safe method that can be used to delegate other non-thread-safe methods similarly to other asynchronous frameworks (twisted, tornado, etc.) --- pika/adapters/select_connection.py | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index 737303f1d..d29eeeafe 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -303,9 +303,6 @@ def __init__(self): def close(self): """Release IOLoop's resources. - TODO Need to call this from IOLoop and SelectConnection tests as well as - TODO from BlockingConnection to avoid socket resource leak messages - `IOLoop.close` is intended to be called by the application or test code only after `IOLoop.start()` returns. After calling `close()`, no other interaction with the closed instance of `IOLoop` should be performed. @@ -405,7 +402,7 @@ def add_callback_threadsafe(self, callback): def process_timeouts(self): """[Extension] Process pending callbacks and timeouts, invoking those - whose time has come. Internal use only. + whose time has come. Internal use only. """ # Avoid I/O starvation by postponing new callbacks to the next iteration @@ -465,12 +462,8 @@ def start(self): def stop(self): """[API] Request exit from the ioloop. The loop is NOT guaranteed to - stop before this method returns. This is the only method that may be - called from another thread. + stop before this method returns. - TODO This shouldn't have been made thread-safe. Instead, `add_callback` - TODO should be used to safely call `stop` from the IOLoop's own thread. - TODO Update documentation/implementation """ self._poller.stop() @@ -546,9 +539,6 @@ def __init__(self, get_wait_seconds, process_timeouts): def close(self): """Release poller's resources. - TODO Need to call this from Poller tests to avoid socket resource - TODO leak messages - `close()` is intended to be called after the poller's `start()` method returns. After calling `close()`, no other interaction with the closed poller instance should be performed. @@ -738,8 +728,7 @@ def start(self): def stop(self): """Request exit from the ioloop. The loop is NOT guaranteed to stop - before this method returns. This is the only method that may be called - from another thread. + before this method returns. """ LOGGER.debug('Stopping IOLoop') From 02bac9c8d58a2f89e7fd86c498f2f436c8587e1b Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Fri, 16 Feb 2018 13:22:36 -0800 Subject: [PATCH 24/33] Emit warning from select_connection's IOLoop.stop() method when called from a different thread than IOLoop is running in. Add an example of calling it via `add_callback_threadsafe()`. Use `add_callback_threadsafe()` to invoke ioloop.stop() from another thread in unit/select_connection_ioloop_tests.py --- pika/adapters/select_connection.py | 10 ++++++++++ tests/unit/select_connection_ioloop_tests.py | 5 ++++- 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index d29eeeafe..57c06d5d3 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -464,7 +464,17 @@ def stop(self): """[API] Request exit from the ioloop. The loop is NOT guaranteed to stop before this method returns. + To invoke `stop()` safely from a thread other than this IOLoop's thread, + call it via `add_callback_threadsafe`; e.g., + + `ioloop.add_callback_threadsafe(ioloop.stop)` + """ + if (self._thread_id is not None and + threading.current_thread().ident != self._thread_id): + LOGGER.warning('Use add_callback_threadsafe to request ' + 'ioloop.stop() from another thread') + self._poller.stop() def activate_poller(self): diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index 4595de22d..50eeb32cf 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -19,6 +19,7 @@ from pika import compat from pika.adapters import select_connection + EPOLL_SUPPORTED = hasattr(select, 'epoll') POLL_SUPPORTED = hasattr(select, 'poll') and hasattr(select.poll(), 'modify') KQUEUE_SUPPORTED = hasattr(select, 'kqueue') @@ -97,7 +98,9 @@ class IOLoopThreadStopTestSelect(IOLoopBaseTest): def start_test(self): """Starts a thread that stops ioloop after a while and start polling""" - timer = threading.Timer(0.1, self.ioloop.stop) + timer = threading.Timer( + 0.1, + lambda: self.ioloop.add_callback_threadsafe(self.ioloop.stop)) self.addCleanup(timer.cancel) timer.start() self.start() From 2c2b2e6efd45ac26ccc00bf0bb2874efe4cabbce Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Fri, 16 Feb 2018 13:36:21 -0800 Subject: [PATCH 25/33] add unittest cleanup callbacks to cancel threading.Timer instances in newly-added tests. --- tests/acceptance/async_adapter_tests.py | 1 + tests/acceptance/blocking_adapter_test.py | 3 +++ tests/unit/select_connection_ioloop_tests.py | 5 +++++ 3 files changed, 9 insertions(+) diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index 826f7a82d..db30b4f99 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -535,6 +535,7 @@ def begin(self, channel): 0, lambda: channel.connection.add_callback_threadsafe( self.on_requested_callback)) + self.addCleanup(timer.cancel) timer.start() def on_requested_callback(self): diff --git a/tests/acceptance/blocking_adapter_test.py b/tests/acceptance/blocking_adapter_test.py index d539f94a3..d9db8047a 100644 --- a/tests/acceptance/blocking_adapter_test.py +++ b/tests/acceptance/blocking_adapter_test.py @@ -483,6 +483,7 @@ def test(self): 0, functools.partial(connection.add_callback_threadsafe, lambda: rx_callback.append(time.time()))) + self.addCleanup(timer.cancel) timer.start() while not rx_callback: connection.process_data_events(time_limit=None) @@ -1789,6 +1790,7 @@ def processOnConnectionThread(): timer = threading.Timer(0, lambda: connection.add_callback_threadsafe( processOnConnectionThread)) + self.addCleanup(timer.cancel) timer.start() consumer_tag = ch.basic_consume( @@ -1888,6 +1890,7 @@ def processOnConnectionThread(): timer = threading.Timer(0, lambda: connection.add_callback_threadsafe( processOnConnectionThread)) + self.addCleanup(timer.cancel) timer.start() for method, properties, body in ch.consume(q_name, auto_ack=False): diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index 50eeb32cf..bfe555d78 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -19,6 +19,11 @@ from pika import compat from pika.adapters import select_connection +# protected-access +# pylint: disable=W0212 +# missing-docstring +# pylint: disable=C0111 + EPOLL_SUPPORTED = hasattr(select, 'epoll') POLL_SUPPORTED = hasattr(select, 'poll') and hasattr(select.poll(), 'modify') From 3ca1365eafa63712e6d0dca7f5d6aa134f05d580 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Sat, 17 Feb 2018 02:43:41 -0800 Subject: [PATCH 26/33] Fixed check in test_tornado_connection_call_parent. --- tests/unit/tornado_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/tornado_tests.py b/tests/unit/tornado_tests.py index de78c6062..7c363469f 100644 --- a/tests/unit/tornado_tests.py +++ b/tests/unit/tornado_tests.py @@ -15,4 +15,4 @@ def test_tornado_connection_call_parent(self, mock_init): obj = tornado_connection.TornadoConnection() mock_init.assert_called_once_with( None, None, None, None, - tornado_connection.ioloop.IOLoop.instance(), False) + tornado_connection.ioloop.IOLoop.instance()) From a255302a13a5123b185a7605e38efd0f765a4c41 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Sat, 17 Feb 2018 02:47:21 -0800 Subject: [PATCH 27/33] Added mention of `add_callback_threadsafe()` in the FAQ. --- docs/faq.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/faq.rst b/docs/faq.rst index 132b4955d..f70ef5528 100644 --- a/docs/faq.rst +++ b/docs/faq.rst @@ -3,7 +3,7 @@ Frequently Asked Questions - Is Pika thread safe? - Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads. + Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads, with one exception: you may call the connection method `add_callback_threadsafe` from another thread to schedule a callback within an active pika connection. - How do I report a bug with Pika? From f6d54387a620020e3766ca9f2528b5b4af41e9c1 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Sat, 17 Feb 2018 02:52:10 -0800 Subject: [PATCH 28/33] Revert "Fixed check in test_tornado_connection_call_parent." accidentally committed to wrong branch This reverts commit 3ca1365eafa63712e6d0dca7f5d6aa134f05d580. --- tests/unit/tornado_tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/unit/tornado_tests.py b/tests/unit/tornado_tests.py index 7c363469f..de78c6062 100644 --- a/tests/unit/tornado_tests.py +++ b/tests/unit/tornado_tests.py @@ -15,4 +15,4 @@ def test_tornado_connection_call_parent(self, mock_init): obj = tornado_connection.TornadoConnection() mock_init.assert_called_once_with( None, None, None, None, - tornado_connection.ioloop.IOLoop.instance()) + tornado_connection.ioloop.IOLoop.instance(), False) From 13aa231f28ccaa3371042c6c8710daed47d4f228 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 21 Feb 2018 15:41:55 -0800 Subject: [PATCH 29/33] In select_connection.py, raise AssertionError if loop is being closed before the call(s) to start() unwind. Implemented select_connection IOLoop tests: IOLoopCloseAfterStartReturnsTestSelect IOLoopCloseBeforeStartReturnsTestSelect IOLoopThreadStopTestSelect Implemented additaionl tests in acceptance/async_adapter_tests.py: TestAddCallbackThreadsafeRequestBeforeIOLoopStarts TestIOLoopStopBeforeIOLoopStarts --- pika/adapters/asyncio_connection.py | 8 +++ pika/adapters/select_connection.py | 15 +++-- tests/acceptance/async_adapter_tests.py | 62 ++++++++++++++++++++ tests/acceptance/async_test_base.py | 18 ++++-- tests/unit/select_connection_ioloop_tests.py | 54 +++++++++++++++-- 5 files changed, 141 insertions(+), 16 deletions(-) diff --git a/pika/adapters/asyncio_connection.py b/pika/adapters/asyncio_connection.py index 0f5350982..32f46485d 100644 --- a/pika/adapters/asyncio_connection.py +++ b/pika/adapters/asyncio_connection.py @@ -21,6 +21,14 @@ def __init__(self, loop): self.writers = set() def close(self): + """Release ioloop's resources. + + This method is intended to be called by the application or test code + only after the ioloop's outermost `start()` call returns. After calling + `close()`, no other interaction with the closed instance of ioloop + should be performed. + + """ self.loop.close() def add_timeout(self, deadline, callback): diff --git a/pika/adapters/select_connection.py b/pika/adapters/select_connection.py index 57c06d5d3..bcc16ded3 100644 --- a/pika/adapters/select_connection.py +++ b/pika/adapters/select_connection.py @@ -555,8 +555,11 @@ def close(self): """ # Unregister and close ioloop-interrupt socket pair; mutual exclusion is - # necessary to avoid race condition with `wake_threadsafe` running in + # necessary to avoid race condition with `wake_threadsafe` executing in # another thread's context + assert self._start_nesting_levels == 0, \ + 'Cannot call close() before start() unwinds.' + with self._waking_mutex: if self._w_interrupt is not None: self.remove_handler(self._r_interrupt.fileno()) # pylint: disable=E1101 @@ -709,7 +712,6 @@ def start(self): if self._start_nesting_levels == 1: LOGGER.debug('Entering IOLoop') - self._stopping = False # Activate the underlying poller and register current events self.activate_poller() @@ -728,10 +730,13 @@ def start(self): self._start_nesting_levels -= 1 if self._start_nesting_levels == 0: - LOGGER.debug('Deactivating poller') + try: + LOGGER.debug('Deactivating poller') - # Deactivate the underlying poller - self.deactivate_poller() + # Deactivate the underlying poller + self.deactivate_poller() + finally: + self._stopping = False else: LOGGER.debug('Leaving IOLoop with %s nesting levels remaining', self._start_nesting_levels) diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index db30b4f99..988eafe36 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -495,6 +495,41 @@ def on_closed(self, connection, reply_code, reply_text): reply_text) +class TestAddCallbackThreadsafeRequestBeforeIOLoopStarts(AsyncTestCase, AsyncAdapters): + DESCRIPTION = "Test add_callback_threadsafe request before ioloop starts." + + def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 + """We intercept this method from AsyncTestCase in order to call + add_callback_threadsafe before AsyncTestCase starts the ioloop. + """ + connection = super( + TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self).start( + *args, **kwargs) + + self.my_start_time = time.time() + # Request a callback from our current (ioloop's) thread + connection.add_callback_threadsafe(self.on_requested_callback) + + return connection + + def start(self, *args, **kwargs): # pylint: disable=W0221 + self.loop_thread_ident = threading.current_thread().ident + self.my_start_time = None + self.got_callback = False + super(TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self).start(*args, **kwargs) + self.assertTrue(self.got_callback) + + def begin(self, channel): + pass + + def on_requested_callback(self): + self.assertEqual(threading.current_thread().ident, + self.loop_thread_ident) + self.assertLess(time.time() - self.my_start_time, 0.25) + self.got_callback = True + self.stop() + + class TestAddCallbackThreadsafeFromIOLoopThread(AsyncTestCase, AsyncAdapters): DESCRIPTION = "Test add_callback_threadsafe request from same thread." @@ -546,6 +581,33 @@ def on_requested_callback(self): self.stop() +class TestIOLoopStopBeforeIOLoopStarts(AsyncTestCase, AsyncAdapters): + DESCRIPTION = "Test ioloop.stop() before ioloop starts causes ioloop to exit quickly." + + def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 + """We intercept this method from AsyncTestCase in order to call + ioloop.stop() before AsyncTestCase starts the ioloop. + """ + connection = super( + TestIOLoopStopBeforeIOLoopStarts, self).start( + *args, **kwargs) + + # Request ioloop to stop before it starts + self.my_start_time = time.time() + connection.ioloop.stop() + + return connection + + def start(self, *args, **kwargs): # pylint: disable=W0221 + self.loop_thread_ident = threading.current_thread().ident + self.my_start_time = None + super(TestIOLoopStopBeforeIOLoopStarts, self).start(*args, **kwargs) + self.assertLess(time.time() - self.my_start_time, 0.25) + + def begin(self, channel): + pass + + class TestViabilityOfMultipleTimeoutsWithSameDeadlineAndCallback(AsyncTestCase, AsyncAdapters): # pylint: disable=C0103 DESCRIPTION = "Test viability of multiple timeouts with same deadline and callback" diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index a0a28bc26..b1690ad0e 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -52,13 +52,20 @@ def begin(self, channel): # pylint: disable=R0201,W0613 """Extend to start the actual tests on the channel""" self.fail("AsyncTestCase.begin_test not extended") + def _instantiate_connection(self, adapter_class, ioloop_factory): + """Some tests need to subclass this in order to bootstrap their test + logic before we start the ioloop + """ + return adapter_class(self.parameters, self.on_open, + self.on_open_error, self.on_closed, + custom_ioloop=ioloop_factory()) + def start(self, adapter, ioloop_factory): self.logger.info('start at %s', datetime.utcnow()) self.adapter = adapter or self.ADAPTER - self.connection = self.adapter(self.parameters, self.on_open, - self.on_open_error, self.on_closed, - custom_ioloop=ioloop_factory()) + self.connection = self._instantiate_connection(self.adapter, + ioloop_factory) try: self.timeout = self.connection.add_timeout(self.TIMEOUT, self.on_timeout) @@ -68,14 +75,13 @@ def start(self, adapter, ioloop_factory): self.connection.ioloop.close() self.connection = None - def stop(self): """close the connection and stop the ioloop""" self.logger.info("Stopping test") if self.timeout is not None: self.connection.remove_timeout(self.timeout) self.timeout = None - self.connection.close() + self.connection.close() # NOTE: on_closed() will stop the ioloop def _stop(self): if hasattr(self, 'timeout') and self.timeout is not None: @@ -104,7 +110,7 @@ def on_open_error(self, connection, error): def on_timeout(self): """called when stuck waiting for connection to close""" self.logger.error('%s timed out; on_timeout called at %s', - self, datetime.utcnow()) + self, datetime.utcnow()) self.timeout = None # the dispatcher should have removed it self._timed_out = True # initiate cleanup diff --git a/tests/unit/select_connection_ioloop_tests.py b/tests/unit/select_connection_ioloop_tests.py index bfe555d78..3f40222c0 100644 --- a/tests/unit/select_connection_ioloop_tests.py +++ b/tests/unit/select_connection_ioloop_tests.py @@ -80,8 +80,8 @@ def on_timeout(self): self.fail('Test timed out') -class IOLoopCloseTestSelect(IOLoopBaseTest): - """ Test ioloop being stopped by another Thread. """ +class IOLoopCloseClosesSubordinateObjectsTestSelect(IOLoopBaseTest): + """ Test ioloop being closed """ SELECT_POLLER = 'select' def start_test(self): @@ -94,7 +94,37 @@ def start_test(self): mocks['_poller'].close.assert_called_once() self.assertIsNone(self.ioloop._callbacks) - self.ioloop.stop() + +class IOLoopCloseAfterStartReturnsTestSelect(IOLoopBaseTest): + """ Test IOLoop.close() after normal return from start(). """ + SELECT_POLLER = 'select' + + def start_test(self): + self.ioloop.stop() # so start will terminate quickly + self.start() + self.ioloop.close() + self.assertIsNone(self.ioloop._callbacks) + + +class IOLoopCloseBeforeStartReturnsTestSelect(IOLoopBaseTest): + """ Test calling IOLoop.close() before return from start() raises exception. """ + SELECT_POLLER = 'select' + + def start_test(self): + callback_completed = [] + + def call_close_from_callback(): + with self.assertRaises(AssertionError) as cm: + self.ioloop.close() + + self.assertEqual(cm.exception.args[0], + 'Cannot call close() before start() unwinds.') + self.ioloop.stop() + callback_completed.append(1) + + self.ioloop.add_callback_threadsafe(call_close_from_callback) + self.start() + self.assertEqual(callback_completed, [1]) class IOLoopThreadStopTestSelect(IOLoopBaseTest): @@ -108,7 +138,21 @@ def start_test(self): lambda: self.ioloop.add_callback_threadsafe(self.ioloop.stop)) self.addCleanup(timer.cancel) timer.start() - self.start() + self.start() # NOTE: Normal return from `start()` constitutes success + + +class IOLoopThreadStopTestSelect(IOLoopBaseTest): + """ Test ioloop being stopped by another Thread. """ + SELECT_POLLER = 'select' + + def start_test(self): + """Starts a thread that stops ioloop after a while and start polling""" + timer = threading.Timer( + 0.1, + lambda: self.ioloop.add_callback_threadsafe(self.ioloop.stop)) + self.addCleanup(timer.cancel) + timer.start() + self.start() # NOTE: Normal return from `start()` constitutes success @unittest.skipIf(not POLL_SUPPORTED, 'poll not supported') @@ -584,4 +628,4 @@ class PollerTestCaseEPoll(PollerTestCaseSelect): @unittest.skipIf(not KQUEUE_SUPPORTED, 'kqueue not supported') class PollerTestCaseKqueue(PollerTestCaseSelect): """Same as PollerTestCaseSelect but uses kqueue syscall""" - SELECT_POLLER = 'kqueue' \ No newline at end of file + SELECT_POLLER = 'kqueue' From 30c4148fdba02316784205c8a512bd6da5b03e98 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 21 Feb 2018 16:33:15 -0800 Subject: [PATCH 30/33] Fixed infinite recursion in new tests that override AsyncTestCase._instantiate_connection() --- tests/acceptance/async_adapter_tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index 988eafe36..8118b569f 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -503,7 +503,7 @@ def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 add_callback_threadsafe before AsyncTestCase starts the ioloop. """ connection = super( - TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self).start( + TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self)._instantiate_connection( *args, **kwargs) self.my_start_time = time.time() @@ -589,7 +589,7 @@ def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 ioloop.stop() before AsyncTestCase starts the ioloop. """ connection = super( - TestIOLoopStopBeforeIOLoopStarts, self).start( + TestIOLoopStopBeforeIOLoopStarts, self)._instantiate_connection( *args, **kwargs) # Request ioloop to stop before it starts From 34d94d4078321ca090f0f8b542b3fbf303e148e6 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 21 Feb 2018 16:55:40 -0800 Subject: [PATCH 31/33] In `Connection.close`, don't call `self._close_channels()` if there are not channels or the connection is still opening, in which case there should be no channels. --- pika/connection.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pika/connection.py b/pika/connection.py index 60d40caf4..03bfb5e38 100644 --- a/pika/connection.py +++ b/pika/connection.py @@ -1244,8 +1244,11 @@ def close(self, reply_code=200, reply_text='Normal shutdown'): LOGGER.warning('Suppressing close request on %s', self) return + # NOTE The connection is either in opening or open state + # Initiate graceful closing of channels that are OPEN or OPENING - self._close_channels(reply_code, reply_text) + if self._channels: + self._close_channels(reply_code, reply_text) # Set our connection state self._set_connection_state(self.CONNECTION_CLOSING) From f719d229a1f37dfc4337be10ce591078900c6300 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 21 Feb 2018 17:24:14 -0800 Subject: [PATCH 32/33] In TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, don't attempt closing the async connection before it is established to avoid triggering logic in the close code path that isn't ready to deal with it. --- tests/acceptance/async_adapter_tests.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index 8118b569f..c2b68aa02 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -520,14 +520,13 @@ def start(self, *args, **kwargs): # pylint: disable=W0221 self.assertTrue(self.got_callback) def begin(self, channel): - pass + self.stop() def on_requested_callback(self): self.assertEqual(threading.current_thread().ident, self.loop_thread_ident) self.assertLess(time.time() - self.my_start_time, 0.25) self.got_callback = True - self.stop() class TestAddCallbackThreadsafeFromIOLoopThread(AsyncTestCase, AsyncAdapters): From 5d062496abb8deddd08a813fa66634e86d2c2538 Mon Sep 17 00:00:00 2001 From: Vitaly Kruglikov Date: Wed, 21 Feb 2018 18:58:25 -0800 Subject: [PATCH 33/33] Facilitate safe cleanup after ioloop stops running in async adapter TestIOLoopStopBeforeIOLoopStarts acceptance test. --- tests/acceptance/async_adapter_tests.py | 24 +++++++-------- tests/acceptance/async_test_base.py | 40 +++++++++++++++---------- 2 files changed, 35 insertions(+), 29 deletions(-) diff --git a/tests/acceptance/async_adapter_tests.py b/tests/acceptance/async_adapter_tests.py index c2b68aa02..d392d0483 100644 --- a/tests/acceptance/async_adapter_tests.py +++ b/tests/acceptance/async_adapter_tests.py @@ -498,19 +498,18 @@ def on_closed(self, connection, reply_code, reply_text): class TestAddCallbackThreadsafeRequestBeforeIOLoopStarts(AsyncTestCase, AsyncAdapters): DESCRIPTION = "Test add_callback_threadsafe request before ioloop starts." - def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 + def _run_ioloop(self, *args, **kwargs): # pylint: disable=W0221 """We intercept this method from AsyncTestCase in order to call add_callback_threadsafe before AsyncTestCase starts the ioloop. - """ - connection = super( - TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self)._instantiate_connection( - *args, **kwargs) + """ self.my_start_time = time.time() # Request a callback from our current (ioloop's) thread - connection.add_callback_threadsafe(self.on_requested_callback) + self.connection.add_callback_threadsafe(self.on_requested_callback) - return connection + return super( + TestAddCallbackThreadsafeRequestBeforeIOLoopStarts, self)._run_ioloop( + *args, **kwargs) def start(self, *args, **kwargs): # pylint: disable=W0221 self.loop_thread_ident = threading.current_thread().ident @@ -583,19 +582,16 @@ def on_requested_callback(self): class TestIOLoopStopBeforeIOLoopStarts(AsyncTestCase, AsyncAdapters): DESCRIPTION = "Test ioloop.stop() before ioloop starts causes ioloop to exit quickly." - def _instantiate_connection(self, *args, **kwargs): # pylint: disable=W0221 + def _run_ioloop(self, *args, **kwargs): # pylint: disable=W0221 """We intercept this method from AsyncTestCase in order to call ioloop.stop() before AsyncTestCase starts the ioloop. """ - connection = super( - TestIOLoopStopBeforeIOLoopStarts, self)._instantiate_connection( - *args, **kwargs) - # Request ioloop to stop before it starts self.my_start_time = time.time() - connection.ioloop.stop() + self.stop_ioloop_only() - return connection + return super( + TestIOLoopStopBeforeIOLoopStarts, self)._run_ioloop(*args, **kwargs) def start(self, *args, **kwargs): # pylint: disable=W0221 self.loop_thread_ident = threading.current_thread().ident diff --git a/tests/acceptance/async_test_base.py b/tests/acceptance/async_test_base.py index b1690ad0e..db01e48e0 100644 --- a/tests/acceptance/async_test_base.py +++ b/tests/acceptance/async_test_base.py @@ -52,42 +52,52 @@ def begin(self, channel): # pylint: disable=R0201,W0613 """Extend to start the actual tests on the channel""" self.fail("AsyncTestCase.begin_test not extended") - def _instantiate_connection(self, adapter_class, ioloop_factory): - """Some tests need to subclass this in order to bootstrap their test - logic before we start the ioloop - """ - return adapter_class(self.parameters, self.on_open, - self.on_open_error, self.on_closed, - custom_ioloop=ioloop_factory()) - def start(self, adapter, ioloop_factory): self.logger.info('start at %s', datetime.utcnow()) self.adapter = adapter or self.ADAPTER - self.connection = self._instantiate_connection(self.adapter, - ioloop_factory) + self.connection = self.adapter(self.parameters, + self.on_open, + self.on_open_error, + self.on_closed, + custom_ioloop=ioloop_factory()) try: self.timeout = self.connection.add_timeout(self.TIMEOUT, self.on_timeout) - self.connection.ioloop.start() + self._run_ioloop() self.assertFalse(self._timed_out) finally: self.connection.ioloop.close() self.connection = None + def stop_ioloop_only(self): + """Request stopping of the connection's ioloop to end the test without + closing the connection + """ + self._safe_remove_test_timeout() + self.connection.ioloop.stop() + def stop(self): """close the connection and stop the ioloop""" self.logger.info("Stopping test") - if self.timeout is not None: - self.connection.remove_timeout(self.timeout) - self.timeout = None + self._safe_remove_test_timeout() self.connection.close() # NOTE: on_closed() will stop the ioloop - def _stop(self): + def _run_ioloop(self): + """Some tests need to subclass this in order to bootstrap their test + logic after we instantiate the connection and assign it to + `self.connection`, but before we run the ioloop + """ + self.connection.ioloop.start() + + def _safe_remove_test_timeout(self): if hasattr(self, 'timeout') and self.timeout is not None: self.logger.info("Removing timeout") self.connection.remove_timeout(self.timeout) self.timeout = None + + def _stop(self): + self._safe_remove_test_timeout() if hasattr(self, 'connection') and self.connection is not None: self.logger.info("Stopping ioloop") self.connection.ioloop.stop()