Skip to content

Commit

Permalink
Adding support for rabbitmq quorum queues
Browse files Browse the repository at this point in the history
https://www.rabbitmq.com/quorum-queues.html

The quorum queue is a modern queue type for RabbitMQ implementing a
durable, replicated FIFO queue based on the Raft consensus algorithm. It
is available as of RabbitMQ 3.8.0.

the quorum queues can not be set by policy so this should be done when
declaring the queue.

To declare a quorum queue set the x-queue-type queue argument to quorum
(the default is classic). This argument must be provided by a client at
queue declaration time; it cannot be set or changed using a policy. This
is because policy definition or applicable policy can be changed
dynamically but queue type cannot. It must be specified at the time of
declaration.

its good for the oslo messaging to add support for that type of queue
that have multiple advantaged over mirroring.

If quorum queues are sets mirrored queues will be ignored.

Closes-Bug: #1942933
Change-Id: Id573e04c287e034e50626daf6e18a34735d45251
  • Loading branch information
4383 authored and hamalq committed Feb 5, 2022
1 parent f9de265 commit 7e8acbf
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 9 deletions.
1 change: 1 addition & 0 deletions doc/source/admin/rabbit.rst
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ Consuming Options
^^^^^^^^^^^^^^^^^

- :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue`
- :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl`

Connection Options
Expand Down
4 changes: 3 additions & 1 deletion oslo_messaging/_drivers/amqp.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
help='Use durable queues in AMQP.'),
help='Use durable queues in AMQP. If rabbit_quorum_queue '
'is enabled, queues will be durable and this value will '
'be ignored.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',
Expand Down
59 changes: 51 additions & 8 deletions oslo_messaging/_drivers/impl_rabbit.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,17 @@
'nodes, run: '
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
"""'{"ha-mode": "all"}' \""""),
cfg.BoolOpt('rabbit_quorum_queue',
default=False,
help='Use quorum queues in RabbitMQ (x-queue-type: quorum). '
'The quorum queue is a modern queue type for RabbitMQ '
'implementing a durable, replicated FIFO queue based on the '
'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will conflict with '
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
'in other words the HA queues should be disabled, quorum '
'queues durable by default so the amqp_durable_queues '
'opion is ignored when this option enabled.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
Expand Down Expand Up @@ -191,7 +202,8 @@
LOG = logging.getLogger(__name__)


def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
Expand All @@ -214,12 +226,31 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
If the rabbit_quorum_queue option is set, we try to declare a mirrored
queue as described here:
https://www.rabbitmq.com/quorum-queues.html
Setting x-queue-type to quorum means that replicated FIFO queue based on
the Raft consensus algorithm will be used. It is available as of
RabbitMQ 3.8.0. If set this option will conflict with
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues should be disabled.
"""
args = {}

if rabbit_quorum_queue and rabbit_ha_queues:
raise RuntimeError('Configuration Error: rabbit_quorum_queue '
'and rabbit_ha_queues both enabled, queue '
'type is quorum or HA (mirrored) not both')

if rabbit_ha_queues:
args['x-ha-policy'] = 'all'

if rabbit_quorum_queue:
args['x-queue-type'] = 'quorum'

if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000

Expand Down Expand Up @@ -248,7 +279,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False):
enable_cancel_on_failover=False, rabbit_quorum_queue=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
Expand All @@ -262,7 +293,8 @@ def __init__(self, exchange_name, queue_name, routing_key, type, durable,
self.type = type
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
rabbit_queue_ttl,
rabbit_quorum_queue)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
Expand Down Expand Up @@ -475,6 +507,7 @@ def __init__(self, conf, url, purpose):

self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
Expand Down Expand Up @@ -674,6 +707,12 @@ def __init__(self, conf, url, purpose):
except AttributeError:
pass

@property
def durable(self):
# Quorum queues are durable by default, durable option should
# be enabled by default with quorum queues
return self.amqp_durable_queues or self.rabbit_quorum_queue

@classmethod
def validate_ssl_version(cls, version):
key = version.lower()
Expand Down Expand Up @@ -1163,12 +1202,13 @@ def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_quorum_queue)

self.declare_consumer(consumer)

Expand Down Expand Up @@ -1280,7 +1320,10 @@ def _publish_and_creates_default_queue(self, exchange, msg,
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
queue_arguments=_get_queue_arguments(
self.rabbit_ha_queues,
0,
self.rabbit_quorum_queue))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
Expand Down Expand Up @@ -1336,7 +1379,7 @@ def topic_send(self, exchange_name, topic, msg, timeout=None, retry=None,
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
auto_delete=self.amqp_auto_delete)

self._ensure_publishing(self._publish, exchange, msg,
Expand All @@ -1358,7 +1401,7 @@ def notify_send(self, exchange_name, topic, msg, retry=None, **kwargs):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
auto_delete=self.amqp_auto_delete)

self._ensure_publishing(self._publish_and_creates_default_queue,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
---
features:
- |
Adding support for quorum queues. Quorum queues are enabled if the
``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
Setting x-queue-type to quorum means that replicated FIFO queue based on
the Raft consensus algorithm will be used. It is available as of
RabbitMQ 3.8.0. The quorum queues are durable by default
(``amqp_durable_queues``) will be ignored.
when enabled the HA queues (``rabbit_ha_queues``) aka mirrored queues
should be disabled since the queue can't be both types at the same time

0 comments on commit 7e8acbf

Please sign in to comment.