Skip to content

Commit

Permalink
Merge "Cancel consumer if queue down"
Browse files Browse the repository at this point in the history
  • Loading branch information
Zuul authored and openstack-gerrit committed Aug 1, 2020
2 parents 599c0b9 + 196fa87 commit 8d78ab2
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 31 deletions.
83 changes: 52 additions & 31 deletions oslo_messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,11 @@
'for direct send. The direct send is used as reply, '
'so the MessageUndeliverable exception is raised '
'in case the client queue does not exist.'),
cfg.BoolOpt('enable_cancel_on_failover',
default=False,
help="Enable x-cancel-on-ha-failover flag so that "
"rabbitmq server will cancel and notify consumers"
"when queue is down")
]

LOG = logging.getLogger(__name__)
Expand Down Expand Up @@ -234,7 +239,8 @@ class Consumer(object):

def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
Expand All @@ -256,18 +262,26 @@ def __init__(self, exchange_name, queue_name, routing_key, type, durable,
type=type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover

def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""

consumer_arguments = None
if self.enable_cancel_on_failover:
consumer_arguments = {
"x-cancel-on-ha-failover": True}

self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue_arguments=self.queue_arguments,
consumer_arguments=consumer_arguments
)

try:
LOG.debug('[%s] Queue.declare: %s',
Expand Down Expand Up @@ -468,6 +482,7 @@ def __init__(self, conf, url, purpose):
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.heartbeat_in_pthread = driver_conf.heartbeat_in_pthread
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover

if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run
Expand Down Expand Up @@ -1116,31 +1131,35 @@ def declare_direct_consumer(self, topic, callback):
responses for call/multicall
"""

consumer = Consumer(exchange_name='', # using default exchange
queue_name=topic,
routing_key='',
type='direct',
durable=False,
exchange_auto_delete=False,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
consumer = Consumer(
exchange_name='', # using default exchange
queue_name=topic,
routing_key='',
type='direct',
durable=False,
exchange_auto_delete=False,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)

self.declare_consumer(consumer)

def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
consumer = Consumer(
exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover)

self.declare_consumer(consumer)

Expand All @@ -1151,16 +1170,18 @@ def declare_fanout_consumer(self, topic, callback):
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)

consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
consumer = Consumer(
exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)

self.declare_consumer(consumer)

Expand Down
7 changes: 7 additions & 0 deletions oslo_messaging/tests/functional/test_rabbitmq.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
]

def test_failover_scenario(self):
self._test_failover_scenario()

def test_failover_scenario_enable_cancel_on_failover(self):
self._test_failover_scenario(enable_cancel_on_failover=True)

def _test_failover_scenario(self, enable_cancel_on_failover=False):
# NOTE(sileht): run this test only if functional suite run of a driver
# that use rabbitmq as backend
self.driver = os.environ.get('TRANSPORT_DRIVER')
Expand All @@ -53,6 +59,7 @@ def test_failover_scenario(self):
kombu_reconnect_delay=0,
rabbit_retry_interval=0,
rabbit_retry_backoff=0,
enable_cancel_on_failover=enable_cancel_on_failover,
group='oslo_messaging_rabbit')

self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
fixes:
- |
Add a new option `enable_cancel_on_failover` for rabbitmq driver
which when enabled, will cancel consumers when queue appears
to be down.

0 comments on commit 8d78ab2

Please sign in to comment.