Skip to content

Commit

Permalink
Merge pull request #567 from vitaly-krugl/reset-read-poller
Browse files Browse the repository at this point in the history
Fix issue #412: reset BlockingConnection._read_poller in BlockingConnection._adapter_disconnect() to guard against accidental access to old file descriptor
  • Loading branch information
gmr committed May 19, 2015
2 parents b6f2be7 + 62810fb commit 25a8b0f
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 1 deletion.
5 changes: 4 additions & 1 deletion pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,9 @@ def __init__(self, parameters=None):
:raises: RuntimeError
"""
super(BlockingConnection, self).__init__(parameters, None, False)
super(BlockingConnection, self).__init__(parameters,
on_open_callback=None,
stop_ioloop_on_close=False)
self._socket_timeouts = 0

def add_on_close_callback(self, callback_method_unused):
Expand Down Expand Up @@ -320,6 +322,7 @@ def _adapter_disconnect(self, reset_only=False):
"""
self._remove_heartbeat()
self._cleanup_socket()
self._read_poller = None
try:
if not reset_only:
# NOTE: this may raise an exception
Expand Down
33 changes: 33 additions & 0 deletions tests/acceptance/blocking_adapter_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,39 @@ def start_test(self):
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

class TestNoAccessToFileDescriptorAfterConnectionClosed(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
"""BlockingConnection can't access file descriptor after \
ConnectionClosed
"""
with ForwardServer((PARAMETERS.host, PARAMETERS.port)) as fwd:
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}))

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)


# Once outside the context, the connection is broken

# BlockingConnection should raise ConnectionClosed
with self.assertRaises(pika.exceptions.ConnectionClosed):
channel = self.connection.channel()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

# Attempt to operate on the connection once again after ConnectionClosed
self.assertIsNone(self.connection._read_poller)
self.assertIsNone(self.connection.socket)
with self.assertRaises(AttributeError):
channel = self.connection.channel()


class TestConnectWithDownedBroker(BlockingTestCase):

Expand Down

0 comments on commit 25a8b0f

Please sign in to comment.