Skip to content

Commit

Permalink
Fix switch connection destination when a rabbitmq cluster node disappear
Browse files Browse the repository at this point in the history
In a clustered rabbitmq when a node disappears, we get a
ConnectionRefusedError because the socket get disconnected.

The socket access yields a OSError because the heartbeat
tries to reach an unreachable host (No route to host).

Catch these exceptions to ensure that we call ensure_connection for switching
the connection destination.

POC is available at github.com:4383/rabbitmq-oslo_messging-error-poc

Example:
    $ git clone git@github.com:4383/rabbitmq-oslo_messging-error-poc
    $ cd rabbitmq-oslo_messging-error-poc
    $ python -m virtualenv .
    $ source bin/activate
    $ pip install -r requirements.txt
    $ sudo podman run -d --hostname my-rabbit --name rabbit rabbitmq:3
    $ python poc.py $(sudo podman inspect rabbit | niet '.[0].NetworkSettings.IPAddress')

And in parallele in an another shell|tmux
    $ podman stop rabbit
    $ # observe the output of the poc.py script we now call ensure_connection

Now you can observe some output relative to the connection who is
modified and not catched before these changes.

Related to: https://bugzilla.redhat.com/show_bug.cgi?id=1665399

Closes-Bug: #1828841

Change-Id: I9dc1644cac0e39eb11bf05f57bde77dcf6d42ed3
  • Loading branch information
4383 committed May 13, 2019
1 parent 558fc5f commit 9d8b143
Showing 1 changed file with 18 additions and 0 deletions.
18 changes: 18 additions & 0 deletions oslo_messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -918,6 +918,14 @@ def _heartbeat_stop(self):
def _heartbeat_thread_job(self):
"""Thread that maintains inactive connections
"""
# NOTE(hberaud): Python2 doesn't have ConnectionRefusedError
# defined so to switch connections destination on failure
# with python2 and python3 we need to wrapp adapt connection refused
try:
ConnectRefuseError = ConnectionRefusedError
except NameError:
ConnectRefuseError = socket.error

while not self._heartbeat_exit_event.is_set():
with self._connection_lock.for_heartbeat():

Expand All @@ -934,7 +942,17 @@ def _heartbeat_thread_job(self):
self.connection.drain_events(timeout=0.001)
except socket.timeout:
pass
# NOTE(hberaud): In a clustered rabbitmq when
# a node disappears, we get a ConnectionRefusedError
# because the socket get disconnected.
# The socket access yields a OSError because the heartbeat
# tries to reach an unreachable host (No route to host).
# Catch these exceptions to ensure that we call
# ensure_connection for switching the
# connection destination.
except (socket.timeout,
ConnectRefuseError,
OSError,
kombu.exceptions.OperationalError) as exc:
LOG.info(_LI("A recoverable connection/channel error "
"occurred, trying to reconnect: %s"), exc)
Expand Down

0 comments on commit 9d8b143

Please sign in to comment.