From 10625eed87b4c7f980bd5cd7cacbc4caa2dec197 Mon Sep 17 00:00:00 2001 From: John Eckersberg Date: Fri, 20 Nov 2015 17:25:58 -0500 Subject: [PATCH] Kombu: make reply and fanout queues expire instead of auto-delete Right now fanout and reply queues are unconditionally created with auto-delete flag which causes a number of problems listed in bug 1495568. Replacing auto-delete with queue expiration with some sane timeout should fix all these issues at once. Another problem being fixed is that auto-delete flag does not causes the queue to be deleted if it never had consumers. An orphaned fanout queue might appear that way and it will grow indefinitely until somebody manually removes it. See bug 1515278 for details. A new rabbit_transient_queues_ttl config parameter is introduced which configures the TTL for reply and fanout queues. It is a positive integer representing timeout in seconds. By default it is set to 10 minutes. That should be enough for application to reconnect or for server to send reply to client which already died. At the same time, it seems that not so many messages could be accumulated in fanout queues during that time. DocImpact With this change RabbitMQ driver defines reply and fanout queues differently comparing with the previous release: now they are defined with queue TTL (https://www.rabbitmq.com/ttl.html#queue-ttl) instead of auto-delete flag. That helps avoid a number of issues, see commit description for details. A new rabbit_transient_queues_ttl parameter is defined which controls the TTL value. It is set to 10 minutes by default. The change does not affect upgrade in any way. Closes-bug: #1495568 Closes-bug: #1515278 Co-Authored-by: Dmitry Mescheryakov Change-Id: I83a8d09dc0cdae24c12d7043ec810529a9ce57ab --- oslo_messaging/_drivers/impl_rabbit.py | 60 ++++++++++++++++++++------ 1 file changed, 47 insertions(+), 13 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 00484d695..bafeeb977 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -139,6 +139,13 @@ help='Use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the ' 'RabbitMQ database.'), + cfg.IntOpt('rabbit_transient_queues_ttl', + min=1, + default=600, + help='Positive integer representing duration in seconds for ' + 'queue TTL (x-expires). Queues which are unused for the ' + 'duration of the TTL are automatically deleted. The ' + 'parameter affects only reply and fanout queues.'), cfg.IntOpt('heartbeat_timeout_threshold', default=60, help="Number of seconds after which the Rabbit broker is " @@ -160,7 +167,7 @@ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we declare a mirrored queue @@ -170,8 +177,25 @@ def _get_queue_arguments(rabbit_ha_queues): Setting x-ha-policy to all means that the queue will be mirrored to all nodes in the cluster. + + If the rabbit_queue_ttl option is > 0, then the queue is + declared with the "Queue TTL" value as described here: + + https://www.rabbitmq.com/ttl.html + + Setting a queue TTL causes the queue to be automatically deleted + if it is unused for the TTL duration. This is a helpful safeguard + to prevent queues with zero consumers from growing without bound. """ - return {'x-ha-policy': 'all'} if rabbit_ha_queues else {} + args = {} + + if rabbit_ha_queues: + args['x-ha-policy'] = 'all' + + if rabbit_queue_ttl > 0: + args['x-expires'] = rabbit_queue_ttl * 1000 + + return args class RabbitMessage(dict): @@ -194,26 +218,29 @@ class Consumer(object): """Consumer class.""" def __init__(self, exchange_name, queue_name, routing_key, type, durable, - auto_delete, callback, nowait=True, rabbit_ha_queues=None): + exchange_auto_delete, queue_auto_delete, callback, + nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ self.queue_name = queue_name self.exchange_name = exchange_name self.routing_key = routing_key - self.auto_delete = auto_delete + self.exchange_auto_delete = exchange_auto_delete + self.queue_auto_delete = queue_auto_delete self.durable = durable self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues) + self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, + rabbit_queue_ttl) self.queue = None self.exchange = kombu.entity.Exchange( name=exchange_name, type=type, durable=self.durable, - auto_delete=self.auto_delete) + auto_delete=self.exchange_auto_delete) def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -222,7 +249,7 @@ def declare(self, conn): channel=conn.channel, exchange=self.exchange, durable=self.durable, - auto_delete=self.auto_delete, + auto_delete=self.queue_auto_delete, routing_key=self.routing_key, queue_arguments=self.queue_arguments) @@ -371,6 +398,8 @@ def __init__(self, conf, url, purpose): self.rabbit_userid = driver_conf.rabbit_userid self.rabbit_password = driver_conf.rabbit_password self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_transient_queues_ttl = \ + driver_conf.rabbit_transient_queues_ttl self.heartbeat_timeout_threshold = \ driver_conf.heartbeat_timeout_threshold self.heartbeat_rate = driver_conf.heartbeat_rate @@ -917,9 +946,11 @@ def declare_direct_consumer(self, topic, callback): routing_key=topic, type='direct', durable=False, - auto_delete=True, + exchange_auto_delete=True, + queue_auto_delete=False, callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl) self.declare_consumer(consumer) @@ -931,7 +962,8 @@ def declare_topic_consumer(self, exchange_name, topic, callback=None, routing_key=topic, type='topic', durable=self.amqp_durable_queues, - auto_delete=self.amqp_auto_delete, + exchange_auto_delete=self.amqp_auto_delete, + queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues) @@ -949,9 +981,11 @@ def declare_fanout_consumer(self, topic, callback): routing_key=topic, type='fanout', durable=False, - auto_delete=True, + exchange_auto_delete=True, + queue_auto_delete=False, callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl) self.declare_consumer(consumer) @@ -1030,7 +1064,7 @@ def _publish_and_creates_default_queue(self, exchange, msg, auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) + queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: '