diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 00484d695..bafeeb977 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -139,6 +139,13 @@ help='Use HA queues in RabbitMQ (x-ha-policy: all). ' 'If you change this option, you must wipe the ' 'RabbitMQ database.'), + cfg.IntOpt('rabbit_transient_queues_ttl', + min=1, + default=600, + help='Positive integer representing duration in seconds for ' + 'queue TTL (x-expires). Queues which are unused for the ' + 'duration of the TTL are automatically deleted. The ' + 'parameter affects only reply and fanout queues.'), cfg.IntOpt('heartbeat_timeout_threshold', default=60, help="Number of seconds after which the Rabbit broker is " @@ -160,7 +167,7 @@ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we declare a mirrored queue @@ -170,8 +177,25 @@ def _get_queue_arguments(rabbit_ha_queues): Setting x-ha-policy to all means that the queue will be mirrored to all nodes in the cluster. + + If the rabbit_queue_ttl option is > 0, then the queue is + declared with the "Queue TTL" value as described here: + + https://www.rabbitmq.com/ttl.html + + Setting a queue TTL causes the queue to be automatically deleted + if it is unused for the TTL duration. This is a helpful safeguard + to prevent queues with zero consumers from growing without bound. """ - return {'x-ha-policy': 'all'} if rabbit_ha_queues else {} + args = {} + + if rabbit_ha_queues: + args['x-ha-policy'] = 'all' + + if rabbit_queue_ttl > 0: + args['x-expires'] = rabbit_queue_ttl * 1000 + + return args class RabbitMessage(dict): @@ -194,26 +218,29 @@ class Consumer(object): """Consumer class.""" def __init__(self, exchange_name, queue_name, routing_key, type, durable, - auto_delete, callback, nowait=True, rabbit_ha_queues=None): + exchange_auto_delete, queue_auto_delete, callback, + nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0): """Init the Publisher class with the exchange_name, routing_key, type, durable auto_delete """ self.queue_name = queue_name self.exchange_name = exchange_name self.routing_key = routing_key - self.auto_delete = auto_delete + self.exchange_auto_delete = exchange_auto_delete + self.queue_auto_delete = queue_auto_delete self.durable = durable self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues) + self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, + rabbit_queue_ttl) self.queue = None self.exchange = kombu.entity.Exchange( name=exchange_name, type=type, durable=self.durable, - auto_delete=self.auto_delete) + auto_delete=self.exchange_auto_delete) def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" @@ -222,7 +249,7 @@ def declare(self, conn): channel=conn.channel, exchange=self.exchange, durable=self.durable, - auto_delete=self.auto_delete, + auto_delete=self.queue_auto_delete, routing_key=self.routing_key, queue_arguments=self.queue_arguments) @@ -371,6 +398,8 @@ def __init__(self, conf, url, purpose): self.rabbit_userid = driver_conf.rabbit_userid self.rabbit_password = driver_conf.rabbit_password self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_transient_queues_ttl = \ + driver_conf.rabbit_transient_queues_ttl self.heartbeat_timeout_threshold = \ driver_conf.heartbeat_timeout_threshold self.heartbeat_rate = driver_conf.heartbeat_rate @@ -917,9 +946,11 @@ def declare_direct_consumer(self, topic, callback): routing_key=topic, type='direct', durable=False, - auto_delete=True, + exchange_auto_delete=True, + queue_auto_delete=False, callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl) self.declare_consumer(consumer) @@ -931,7 +962,8 @@ def declare_topic_consumer(self, exchange_name, topic, callback=None, routing_key=topic, type='topic', durable=self.amqp_durable_queues, - auto_delete=self.amqp_auto_delete, + exchange_auto_delete=self.amqp_auto_delete, + queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues) @@ -949,9 +981,11 @@ def declare_fanout_consumer(self, topic, callback): routing_key=topic, type='fanout', durable=False, - auto_delete=True, + exchange_auto_delete=True, + queue_auto_delete=False, callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl) self.declare_consumer(consumer) @@ -1030,7 +1064,7 @@ def _publish_and_creates_default_queue(self, exchange, msg, auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues)) + queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: '