Skip to content

Commit

Permalink
Merge pull request #557 from vitaly-krugl/blocking-adapter-disconnect
Browse files Browse the repository at this point in the history
Get BlockingConnection into consistent state upon loss of TCP/IP connection with broker + acceptance tests
  • Loading branch information
gmr committed May 4, 2015
2 parents 9b33033 + 294904e commit 495d67c
Show file tree
Hide file tree
Showing 4 changed files with 680 additions and 11 deletions.
9 changes: 6 additions & 3 deletions pika/adapters/base_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ def remove_timeout(self, timeout_id):
def _adapter_connect(self):
"""Connect to the RabbitMQ broker, returning True if connected
:rtype: bool
:returns: error string or exception instance on error; None on success
"""
# Get the addresses for the socket, supporting IPv4 & IPv6
Expand Down Expand Up @@ -158,7 +158,7 @@ def _check_state_on_disconnect(self):
raise exceptions.ProbableAccessDeniedError
elif self.is_open:
LOGGER.warning("Socket closed when connection was open")
elif not self.is_closed:
elif not self.is_closed and not self.is_closing:
LOGGER.warning('Unknown state on disconnect: %i',
self.connection_state)

Expand All @@ -176,7 +176,10 @@ def _cleanup_socket(self):
self.socket = None

def _create_and_connect_to_socket(self, sock_addr_tuple):
"""Create socket and connect to it, using SSL if enabled."""
"""Create socket and connect to it, using SSL if enabled.
:returns: error string on failure; None on success
"""
self.socket = socket.socket(sock_addr_tuple[0], socket.SOCK_STREAM, 0)
self.socket.setsockopt(SOL_TCP, socket.TCP_NODELAY, 1)
self.socket.settimeout(self.params.socket_timeout)
Expand Down
24 changes: 16 additions & 8 deletions pika/adapters/blocking_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,7 @@ def connect(self):
"""
self._set_connection_state(self.CONNECTION_INIT)
error = self._adapter_connect()
if error:
raise exceptions.AMQPConnectionError(error)
self._adapter_connect()

def process_data_events(self):
"""Will make sure that data events are processed. Your app can
Expand Down Expand Up @@ -292,14 +290,15 @@ def sleep(self, duration):
def _adapter_connect(self):
"""Connect to the RabbitMQ broker
:rtype: bool
:raises: pika.Exceptions.AMQPConnectionError
"""
# Remove the default behavior for connection errors
self.callbacks.remove(0, self.ON_CONNECTION_ERROR)
error = super(BlockingConnection, self)._adapter_connect()
if error:
# Restore disconnected state and raise
self._adapter_disconnect(reset_only=True)
raise exceptions.AMQPConnectionError(error)
self.socket.settimeout(self.SOCKET_CONNECT_TIMEOUT)
self._frames_written_without_read = 0
Expand All @@ -312,12 +311,21 @@ def _adapter_connect(self):
self.socket.settimeout(self.params.socket_timeout)
self._set_connection_state(self.CONNECTION_OPEN)

def _adapter_disconnect(self):
"""Called if the connection is being requested to disconnect."""
def _adapter_disconnect(self, reset_only=False):
"""Called if the connection is being requested to disconnect.
:param bool reset_only: if true, just reset the connection and don't
bother interpreting connection state
"""
self._remove_heartbeat()
self._cleanup_socket()
self._check_state_on_disconnect()
self._init_connection_state()
try:
if not reset_only:
# NOTE: this may raise an exception
self._check_state_on_disconnect()
finally:
# Complete transition to diconnected state
self._init_connection_state()

@staticmethod
def _call_timeout_method(timeout_value):
Expand Down
249 changes: 249 additions & 0 deletions tests/acceptance/blocking_adapter_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,249 @@
import logging
import socket
try:
import unittest2 as unittest
except ImportError:
import unittest

import uuid

from forward_server import ForwardServer

import pika
import pika.exceptions

LOGGER = logging.getLogger(__name__)
PARAMETERS_TEMPLATE = 'amqp://guest:guest@localhost:%(port)s/%%2f'
PARAMETERS = pika.URLParameters(PARAMETERS_TEMPLATE % {'port': 5672})
DEFAULT_TIMEOUT = 15



class BlockingTestCase(unittest.TestCase):

def _connect(self,
parameters=PARAMETERS,
connection_class=pika.BlockingConnection):
connection = connection_class(parameters)
self.addCleanup(lambda: self.connection.close
if self.connection.is_open else None)
return connection

def _on_test_timeout(self):
"""Called when test times out"""
self.fail('Test timed out')


class TestSuddenBrokerDisconnectBeforeChannel(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
"""BlockingConnection resets properly on TCP/IP drop during channel()
"""
with ForwardServer((PARAMETERS.host, PARAMETERS.port)) as fwd:
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}))

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)


# Once outside the context, the connection is broken

# BlockingConnection should raise ConnectionClosed
with self.assertRaises(pika.exceptions.ConnectionClosed):
channel = self.connection.channel()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)


class TestConnectWithDownedBroker(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
""" BlockingConnection to downed broker results in AMQPConnectionError
"""
# Reserve a port for use in connect
sock = socket.socket()
self.addCleanup(sock.close)

sock.bind(("localhost", 0))


with self.assertRaises(pika.exceptions.AMQPConnectionError):
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": sock.getsockname()[1]})
)

# Should never get here
self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)


class TestReconnectWithDownedBroker(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
""" BlockingConnection reconnect with downed broker
"""
with ForwardServer((PARAMETERS.host, PARAMETERS.port)) as fwd:
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}))

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)

# Once outside the context, the connection is broken

# BlockingConnection should raise AMQPConnectionError
with self.assertRaises(pika.exceptions.ConnectionClosed):
channel = self.connection.channel()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)

# Now attempt to reconnect
with self.assertRaises(pika.exceptions.AMQPConnectionError):
self.connection.connect()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)


class TestDisconnectDuringConnectionStart(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
""" BlockingConnection TCP/IP connection loss in CONNECTION_START
"""
fwd = ForwardServer((PARAMETERS.host, PARAMETERS.port))
fwd.start()
self.addCleanup(lambda: fwd.stop() if fwd.running else None)

# Establish and close connection
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}))

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)

self.connection.close()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

# Set up hook to disconnect on Connection.Start frame from broker
self.connection.callbacks.add(
0, pika.spec.Connection.Start,
lambda *args, **kwards: fwd.stop())

# Now, attempt to reconnect
with self.assertRaises(pika.exceptions.ProbableAuthenticationError):
self.connection.connect()

# Verify that connection state reflects a closed connection
self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)


class TestDisconnectDuringConnectionTune(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
""" BlockingConnection TCP/IP connection loss in CONNECTION_TUNE
"""
fwd = ForwardServer((PARAMETERS.host, PARAMETERS.port))
fwd.start()
self.addCleanup(lambda: fwd.stop() if fwd.running else None)

# Establish and close connection
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}))

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)

self.connection.close()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

# Set up hook to disconnect on Connection.Tune frame from broker
self.connection.callbacks.add(
0, pika.spec.Connection.Tune,
lambda *args, **kwards: fwd.stop())

# Now, attempt to reconnect
with self.assertRaises(pika.exceptions.ProbableAccessDeniedError):
self.connection.connect()

# Verify that connection state reflects a closed connection
self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)


class TestDisconnectDuringConnectionProtocol(BlockingTestCase):

TIMEOUT = DEFAULT_TIMEOUT

def start_test(self):
""" BlockingConnection TCP/IP connection loss in CONNECTION_PROTOCOL
"""
fwd = ForwardServer((PARAMETERS.host, PARAMETERS.port))
fwd.start()
self.addCleanup(lambda: fwd.stop() if fwd.running else None)

class MyConnection(pika.BlockingConnection):
DROP_REQUESTED = False
def _on_connected(self, *args, **kwargs):
"""Base method override for forcing TCP/IP connection loss"""
if self.DROP_REQUESTED:
# Drop
fwd.stop()
# Proceede to CONNECTION_PROTOCOL state
return super(MyConnection, self)._on_connected(*args, **kwargs)

# Establish and close connection
self.connection = self._connect(
pika.URLParameters(
PARAMETERS_TEMPLATE % {"port": fwd.server_address[1]}),
connection_class=MyConnection)

self.timeout = self.connection.add_timeout(self.TIMEOUT,
self._on_test_timeout)

self.connection.close()

self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

# Request TCP/IP connection loss during subsequent reconnect
MyConnection.DROP_REQUESTED = True

# Now, attempt to reconnect
with self.assertRaises(pika.exceptions.IncompatibleProtocolError):
self.connection.connect()

# Verify that connection state reflects a closed connection
self.assertTrue(self.connection.is_closed)
self.assertFalse(self.connection.is_open)
self.assertIsNone(self.connection.socket)

0 comments on commit 495d67c

Please sign in to comment.