Skip to content

Commit

Permalink
Added list-unconsumed-queues action
Browse files Browse the repository at this point in the history
Add status inquiry list-unconsumed-queues action to allow operators to
determine which queues are not being consumed in each RMQ vhost. Useful for
troubleshooting message queue volume alerts.

Closes-Bug: 1767437
Change-Id: Icdd0b8c4db607701bc5e33d86e263b6a5f1bb7f5
  • Loading branch information
Drew Freiberger authored and Vultaire committed Feb 27, 2019
1 parent 065914d commit 6672c48
Show file tree
Hide file tree
Showing 7 changed files with 214 additions and 4 deletions.
8 changes: 8 additions & 0 deletions actions.yaml
Expand Up @@ -27,3 +27,11 @@ complete-cluster-series-upgrade:
cluster the upgrade is complete cluster wide.
This action should be performed on the current leader. Note the leader may
have changed during the series upgrade process.
list-unconsumed-queues:
description: |-
list queues which currently have zero consumers, results are like:
unconsumed-queue-count: "2"
unconsumed-queues:
$vhost:
"0": queue_name1 - 0
"1": $queue_name - $num_messages
44 changes: 40 additions & 4 deletions actions/actions.py
Expand Up @@ -13,9 +13,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import os
import re
from collections import OrderedDict
from subprocess import check_output, CalledProcessError
import sys

Expand Down Expand Up @@ -46,6 +47,8 @@ def _add_path(path):
pause_unit_helper,
resume_unit_helper,
assess_status,
list_vhosts,
vhost_queue_info,
)


Expand Down Expand Up @@ -112,11 +115,44 @@ def complete_cluster_series_upgrade(args):
assess_status(ConfigRenderer(CONFIG_FILES))


def list_unconsumed_queues(args):
"""List queues which are unconsumed in RabbitMQ"""
count = 0
for vhost in list_vhosts():
try:
queue_info_dict = vhost_queue_info(vhost)
except CalledProcessError as e:
# if no queues, just raises an exception
action_set({'output': e.output,
'return-code': e.returncode})
action_fail("Failed to query RabbitMQ vhost {} queues"
"".format(vhost))
return False

for queue in queue_info_dict:
if queue['consumers'] == 0:
vhostqueue = "unconsumed-queues.{}".format(count)
value = OrderedDict((
('vhost', vhost),
('name', queue['name']),
('messages', queue['messages']),
))
action_set({vhostqueue: json.dumps(value)})
count += 1

action_set({'unconsumed-queue-count': count})


# A dictionary of all the defined actions to callables (which take
# parsed arguments).
ACTIONS = {"pause": pause, "resume": resume, "cluster-status": cluster_status,
"check-queues": check_queues,
"complete-cluster-series-upgrade": complete_cluster_series_upgrade}
ACTIONS = {
"pause": pause,
"resume": resume,
"cluster-status": cluster_status,
"check-queues": check_queues,
"complete-cluster-series-upgrade": complete_cluster_series_upgrade,
"list-unconsumed-queues": list_unconsumed_queues,
}


def main(args):
Expand Down
1 change: 1 addition & 0 deletions actions/list-unconsumed-queues
28 changes: 28 additions & 0 deletions hooks/rabbit_utils.py
Expand Up @@ -191,6 +191,34 @@ def list_vhosts():
return []


def vhost_queue_info(vhost):
"""Provide a list of queue info objects for the given vhost in RabbitMQ
Each object provides name (str), messages (int), and consumers (int)
@raises CalledProcessError on failure to list_queues of the vhost
"""
cmd = [RABBITMQ_CTL, '-p', vhost, 'list_queues',
'name', 'messages', 'consumers']
output = subprocess.check_output(cmd).decode('utf-8')

queue_info = []
# NOTE(jamespage): Earlier rabbitmqctl versions append "...done"
# to the output of list_queues
if '...done' in output:
queues = output.split('\n')[1:-2]
else:
queues = output.split('\n')[1:-1]

for queue in queues:
[qname, qmsgs, qconsumers] = queue.split()
queue_info.append({
'name': qname,
'messages': int(qmsgs),
'consumers': int(qconsumers)
})

return queue_info


def vhost_exists(vhost):
return vhost in list_vhosts()

Expand Down
29 changes: 29 additions & 0 deletions tests/basic_deployment.py
Expand Up @@ -722,3 +722,32 @@ def test_912_check_queues(self):

action_id = u.run_action(self.rmq0_sentry, "check-queues")
assert u.wait_on_action(action_id), "Check queues action failed."

def test_913_list_unconsumed_queues(self):
""" rabbitmqctl list-unconsumed-queues action can be returned. """
u.log.debug('Checking list-unconsumed-queues action...')

self._test_rmq_amqp_messages_all_units([self.rmq0_sentry])
action_id = u.run_action(self.rmq0_sentry, "list-unconsumed-queues")
assert u.wait_on_action(action_id), \
"list-unconsumed-queues action failed."

result = amulet.actions.get_action_output(action_id, full_output=True)
queue_count = int(result['results']['unconsumed-queue-count'])
assert queue_count > 0, 'Did not find any unconsumed queues.'

queue_name = 'test' # publish_amqp_message_by_unit default queue name
for i in range(queue_count):
queue_data = json.loads(
result['results']['unconsumed-queues'][str(i)])
if queue_data['name'] == queue_name:
break
else:
assert False, 'Did not find expected queue in result.'

# Since we just reused _test_rmq_amqp_messages_all_units, we should
# have created the queue if it didn't already exist, but all messages
# should have already been consumed.
assert queue_data['messages'] == 0, 'Found unexpected message count.'

u.log.debug('OK')
71 changes: 71 additions & 0 deletions unit_tests/test_actions.py
Expand Up @@ -111,6 +111,77 @@ def test_check_queues_execption(self):
'-p', '/'])


class ListUnconsumedQueuesTestCase(CharmTestCase):

def setUp(self):
super(ListUnconsumedQueuesTestCase, self).setUp(
actions, ["list_vhosts", "vhost_queue_info", "action_set",
"action_fail"])

def test_list_unconsumed_queues(self):
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])

self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
calls = [
mock.call({
"unconsumed-queues.0":
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
mock.call({'unconsumed-queue-count': 1})
]
self.action_set.assert_has_calls(calls)

def test_list_multiple_vhosts_unconsumed_queues(self):
self.list_vhosts.return_value = ['/', 'other_vhost']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])

self.list_vhosts.assert_called()
calls = [
mock.call({
"unconsumed-queues.0":
'{"vhost": "/", "name": "unconsumed_queue", "messages": 1}'}),
mock.call({
"unconsumed-queues.1":
'{"vhost": "other_vhost", "name": "unconsumed_queue", '
'"messages": 1}'}),
mock.call({'unconsumed-queue-count': 2})
]
self.action_set.assert_has_calls(calls)

def test_list_unconsumed_queues_no_unconsumed(self):
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'consumed_queue', 'messages': 1, 'consumers': 1},
{'name': 'consumed_queue2', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])

self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
self.action_set.assert_called_once_with({'unconsumed-queue-count': 0})

def test_list_unconsumed_queues_exception(self):
self.vhost_queue_info.side_effect = \
actions.CalledProcessError(1, "Failure")
self.list_vhosts.return_value = ['/']
self.vhost_queue_info.return_value = [
{'name': 'unconsumed_queue', 'messages': 1, 'consumers': 0},
{'name': 'consumed_queue', 'messages': 0, 'consumers': 1}]
actions.list_unconsumed_queues([])

self.list_vhosts.assert_called()
self.vhost_queue_info.assert_called_once_with('/')
self.action_set.assert_called()
self.action_fail.assert_called_once_with(
"Failed to query RabbitMQ vhost / queues")


class MainTestCase(CharmTestCase):

def setUp(self):
Expand Down
37 changes: 37 additions & 0 deletions unit_tests/test_rabbit_utils.py
Expand Up @@ -99,6 +99,21 @@ def test_write_all(self, log, render):
{partitions,[]}]
"""

RABBITMQCTL_LIST_QUEUES = b"""Listing queues ...
a_sample_queue 0 1
cinder-scheduler.cinder 0 1
cinder-fanout-12345 250 0
myqueue 0 1
...done
"""

RABBITMQCTL_LIST_VHOSTS = b"""Listing vhosts ...
/
landscape
openstack
...done
"""


class UtilsTests(CharmTestCase):
def setUp(self):
Expand Down Expand Up @@ -193,6 +208,28 @@ def test_running_nodes(self, mock_subprocess):
['rabbit@juju-devel3-machine-14',
'rabbit@juju-devel3-machine-19'])

@mock.patch('rabbit_utils.subprocess')
def test_list_vhosts(self, mock_subprocess):
'''Ensure list_vhosts parses output into the proper list'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_LIST_VHOSTS
self.assertEqual(rabbit_utils.list_vhosts(),
['/', 'landscape', 'openstack'])

@mock.patch('rabbit_utils.subprocess')
def test_vhost_queue_info(self, mock_subprocess):
'''Ensure vhost_queue_info parses output into the proper format/info'''
mock_subprocess.check_output.return_value = \
RABBITMQCTL_LIST_QUEUES
self.assertEqual(rabbit_utils.vhost_queue_info('openstack'),
[{'name': 'a_sample_queue', 'messages': 0,
'consumers': 1},
{'name': 'cinder-scheduler.cinder', 'messages': 0,
'consumers': 1},
{'name': 'cinder-fanout-12345', 'messages': 250,
'consumers': 0},
{'name': 'myqueue', 'messages': 0, 'consumers': 1}])

@mock.patch('rabbit_utils.subprocess')
def test_nodes_solo(self, mock_subprocess):
'''Ensure cluster_status can be parsed for a single unit deployment'''
Expand Down

0 comments on commit 6672c48

Please sign in to comment.