Skip to content

Commit

Permalink
Kombu: make reply and fanout queues expire instead of auto-delete
Browse files Browse the repository at this point in the history
Right now fanout and reply queues are unconditionally created with
auto-delete flag which causes a number of problems listed in bug
1495568. Replacing auto-delete with queue expiration with some sane
timeout should fix all these issues at once.

Another problem being fixed is that auto-delete flag does not causes
the queue to be deleted if it never had consumers. An orphaned fanout
queue might appear that way and it will grow indefinitely until
somebody manually removes it. See bug 1515278 for details.

A new rabbit_transient_queues_ttl config parameter is introduced which
configures the TTL for reply and fanout queues. It is a positive
integer representing timeout in seconds. By default it is set to 10
minutes. That should be enough for application to reconnect or
for server to send reply to client which already died. At the same
time, it seems that not so many messages could be accumulated in
fanout queues during that time.

DocImpact
With this change RabbitMQ driver defines reply and fanout queues
differently comparing with the previous release: now they are defined
with queue TTL (https://www.rabbitmq.com/ttl.html#queue-ttl) instead
of auto-delete flag. That helps avoid a number of issues, see commit
description for details. A new rabbit_transient_queues_ttl parameter
is defined which controls the TTL value. It is set to 10 minutes by
default. The change does not affect upgrade in any way.

Closes-bug: #1495568
Closes-bug: #1515278

Co-Authored-by: Dmitry Mescheryakov <dmescheryakov@mirantis.com>
Change-Id: I83a8d09dc0cdae24c12d7043ec810529a9ce57ab
  • Loading branch information
jeckersb and dmitrymex committed Jan 13, 2016
1 parent 03b5103 commit 10625ee
Showing 1 changed file with 47 additions and 13 deletions.
60 changes: 47 additions & 13 deletions oslo_messaging/_drivers/impl_rabbit.py
Expand Up @@ -139,6 +139,13 @@
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=600,
help='Positive integer representing duration in seconds for '
'queue TTL (x-expires). Queues which are unused for the '
'duration of the TTL are automatically deleted. The '
'parameter affects only reply and fanout queues.'),
cfg.IntOpt('heartbeat_timeout_threshold',
default=60,
help="Number of seconds after which the Rabbit broker is "
Expand All @@ -160,7 +167,7 @@
LOG = logging.getLogger(__name__)


def _get_queue_arguments(rabbit_ha_queues):
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue
Expand All @@ -170,8 +177,25 @@ def _get_queue_arguments(rabbit_ha_queues):
Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster.
If the rabbit_queue_ttl option is > 0, then the queue is
declared with the "Queue TTL" value as described here:
https://www.rabbitmq.com/ttl.html
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
"""
return {'x-ha-policy': 'all'} if rabbit_ha_queues else {}
args = {}

if rabbit_ha_queues:
args['x-ha-policy'] = 'all'

if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000

return args


class RabbitMessage(dict):
Expand All @@ -194,26 +218,29 @@ class Consumer(object):
"""Consumer class."""

def __init__(self, exchange_name, queue_name, routing_key, type, durable,
auto_delete, callback, nowait=True, rabbit_ha_queues=None):
exchange_auto_delete, queue_auto_delete, callback,
nowait=True, rabbit_ha_queues=None, rabbit_queue_ttl=0):
"""Init the Publisher class with the exchange_name, routing_key,
type, durable auto_delete
"""
self.queue_name = queue_name
self.exchange_name = exchange_name
self.routing_key = routing_key
self.auto_delete = auto_delete
self.exchange_auto_delete = exchange_auto_delete
self.queue_auto_delete = queue_auto_delete
self.durable = durable
self.callback = callback
self.type = type
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues)
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)

self.queue = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
type=type,
durable=self.durable,
auto_delete=self.auto_delete)
auto_delete=self.exchange_auto_delete)

def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
Expand All @@ -222,7 +249,7 @@ def declare(self, conn):
channel=conn.channel,
exchange=self.exchange,
durable=self.durable,
auto_delete=self.auto_delete,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)

Expand Down Expand Up @@ -371,6 +398,8 @@ def __init__(self, conf, url, purpose):
self.rabbit_userid = driver_conf.rabbit_userid
self.rabbit_password = driver_conf.rabbit_password
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.heartbeat_timeout_threshold = \
driver_conf.heartbeat_timeout_threshold
self.heartbeat_rate = driver_conf.heartbeat_rate
Expand Down Expand Up @@ -917,9 +946,11 @@ def declare_direct_consumer(self, topic, callback):
routing_key=topic,
type='direct',
durable=False,
auto_delete=True,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

self.declare_consumer(consumer)

Expand All @@ -931,7 +962,8 @@ def declare_topic_consumer(self, exchange_name, topic, callback=None,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
auto_delete=self.amqp_auto_delete,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)

Expand All @@ -949,9 +981,11 @@ def declare_fanout_consumer(self, topic, callback):
routing_key=topic,
type='fanout',
durable=False,
auto_delete=True,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)

self.declare_consumer(consumer)

Expand Down Expand Up @@ -1030,7 +1064,7 @@ def _publish_and_creates_default_queue(self, exchange, msg,
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues))
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
Expand Down

0 comments on commit 10625ee

Please sign in to comment.