Skip to content

Commit

Permalink
Move the monitor_queues script to this repository.
Browse files Browse the repository at this point in the history
  • Loading branch information
Logan Hanks committed Jan 18, 2012
1 parent 7a99b2d commit e911a23
Show file tree
Hide file tree
Showing 2 changed files with 263 additions and 0 deletions.
134 changes: 134 additions & 0 deletions monitor_queues.py
@@ -0,0 +1,134 @@
#!/usr/bin/env python

import logging
import socket
import subprocess
import sys
import time
import urllib

# Alert noisiness is suppressed by these two factors. First, a queue must be in
# an alerting state continuously for at least ALERT_GRACE_PERIOD seconds.
# Second, no alert for a single queue will be repeated within ALERT_RATE_LIMIT
# seconds.
ALERT_GRACE_PERIOD = 5
ALERT_RATE_LIMIT = 15

HAROLD_BASE = 'http://127.0.0.1:8888/harold'
HAROLD_SECRET = 'secret'
HAROLD_HEARTBEAT_INTERVAL = 60 # seconds
HAROLD_HEARTBEAT_TIMEOUT_FACTOR = 3

GRAPHITE_HOST = '10.114.195.155'
GRAPHITE_PORT = 2003

POLL_INTERVAL = 1.0

QUEUE_LIMITS = dict(
scraper_q=4000,
newcomments_q=200,
log_q=10000,
usage_q=8000,
commentstree_q=1000,
corrections_q=500,
spam_q=500,
vote_comment_q=10000,
vote_link_q=10000,
indextank_changes=50000,
solrsearch_changes=100000,
ratelimit_q=sys.maxint,
)

# dict of queue name to timestamp of beginning of current overrun status
overruns = {}

# dict of queue name to time of last alert
recent_alerts = {}

# timestamp of the last time a heartbeat was sent
last_heartbeat = 0

def harold_url(command):
return '%s/%s/%s' % (HAROLD_BASE, command, HAROLD_SECRET)

def harold_send_message(command, **data):
return urllib.urlopen(harold_url(command), urllib.urlencode(data))

def send_heartbeat():
global last_heartbeat
last_heartbeat = time.time()
interval = HAROLD_HEARTBEAT_INTERVAL * HAROLD_HEARTBEAT_TIMEOUT_FACTOR
harold_send_message('heartbeat', tag='monitor_queues', interval=interval)

def get_queue_lengths():
pipe = subprocess.Popen(['/usr/sbin/rabbitmqctl', 'list_queues'],
stdout=subprocess.PIPE)
output = pipe.communicate()[0]
lines = output.split('\n')[1:-2] # throw away first line and last lines
return dict((name, int(value)) for name, value in
(line.split() for line in lines))

def send_graphite_message(msg):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((GRAPHITE_HOST, GRAPHITE_PORT))
sock.send(msg + '\n')
sock.close()

def send_queue_stats(queue_lengths):
stat_msgs = []
now = time.time()
for name, length in queue_lengths.iteritems():
stat_msgs.append('stats.queue.%s.length %d %d' % (name, length, now))
if not stat_msgs:
return
send_graphite_message('\n'.join(stat_msgs))

def send_queue_alert(queue_name, queue_length, alert_threshold):
alert = dict(
tag=queue_name,
message='%s is too long (%d/%d)' % (
queue_name, queue_length, alert_threshold)
)
logging.warn('ALERT on %(tag)s: %(message)s' % alert)
harold_send_message('alert', **alert)

def update_queue_status(queue_name, queue_length, alert_threshold):
if queue_length <= alert_threshold:
if queue_name in overruns:
del overruns[queue_name]
return False
else:
now = time.time()
overruns.setdefault(queue_name, now)
if (now - overruns[queue_name] >= ALERT_GRACE_PERIOD
and recent_alerts.get(queue_name, 0) + ALERT_RATE_LIMIT <= now):
send_queue_alert(queue_name, queue_length, alert_threshold)
recent_alerts[queue_name] = now
return True
else:
logging.warn('suppressing continued alert on %s', queue_name)
return False

def check_queues():
queue_lengths = get_queue_lengths()
for name, length in queue_lengths.iteritems():
update_queue_status(name, length, QUEUE_LIMITS.get(name, sys.maxint))
send_queue_stats(queue_lengths)
if time.time() - last_heartbeat >= HAROLD_HEARTBEAT_INTERVAL:
send_heartbeat()

def main():
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s %(levelname)s %(message)s',
)
while True:
logging.info('checking on queues')
try:
check_queues()
except:
logging.exception('exception raised in check_queues')
time.sleep(POLL_INTERVAL)

if __name__ == '__main__':
main()
129 changes: 129 additions & 0 deletions monitor_queues_test.py
@@ -0,0 +1,129 @@
#!/usr/bin/env python

import subprocess
import time
import unittest

import monitor_queues
import testing

class MockPipe:
def __init__(self, pipe_output):
self.pipe_output = pipe_output

def communicate(self):
return (self.pipe_output, '')

class MonitorQueuesTest(unittest.TestCase):
@testing.stub(subprocess, 'Popen')
def test_get_queue_lengths(self):
output = 'Listing queues ...\nA\t1\nB\t2\n...done.\n'
subprocess.Popen = lambda cmd, stdout: MockPipe(output)
self.assertEquals(dict(A=1, B=2), monitor_queues.get_queue_lengths())

@testing.stub(time, 'time')
@testing.stub(monitor_queues, 'send_graphite_message')
def test_send_queue_stats(self):
sent_messages = set()
time.time = lambda: 1000
monitor_queues.send_graphite_message = (
lambda msg: sent_messages.update(msg.split('\n')))
monitor_queues.send_queue_stats(dict(a=1, b=2))
expected_messages = set([
'stats.queue.a.length 1 1000',
'stats.queue.b.length 2 1000',
])
self.assertEquals(expected_messages, sent_messages)

@testing.stub(monitor_queues, 'harold_send_message')
def test_send_queue_alert(self):
msgs = []
monitor_queues.harold_send_message = (
lambda command, **data: msgs.append((command, data)))
monitor_queues.send_queue_alert('A', 2, 1)
self.assertEquals(
[('alert', dict(tag='A', message='A is too long (2/1)'))], msgs)

@testing.stub(time, 'time')
@testing.stub(monitor_queues, 'send_queue_alert')
def test_update_queue_status(self):
now = 1000
alerts = []
time.time = lambda: now
monitor_queues.send_queue_alert = (
lambda q, l, t: alerts.append((q, l, t)))

# Non-alerting conditions.
self.assertFalse(monitor_queues.update_queue_status('A', 1, 2))
self.assertFalse(monitor_queues.update_queue_status('A', 1, 1))

# Initial overrun condition for A should not fire alert.
self.assertFalse(monitor_queues.update_queue_status('A', 9, 1))
now += monitor_queues.ALERT_GRACE_PERIOD - 1
self.assertFalse(monitor_queues.update_queue_status('A', 9, 1))

# If overrun condition outlives the grace period, alert should raise.
now += 1
self.assertTrue(monitor_queues.update_queue_status('A', 2, 1))
self.assertEquals(('A', 2, 1), alerts[-1])

# Spammy alert should be suppressed but eventually refire.
now += monitor_queues.ALERT_RATE_LIMIT - 1
self.assertFalse(monitor_queues.update_queue_status('A', 9, 1))
now += 1
self.assertTrue(monitor_queues.update_queue_status('A', 3, 1))
self.assertEquals(('A', 3, 1), alerts[-1])

# Non-overrun condition should reset grace period.
now += monitor_queues.ALERT_RATE_LIMIT
self.assertFalse(monitor_queues.update_queue_status('A', 1, 1))
self.assertFalse(monitor_queues.update_queue_status('A', 9, 1))
now += monitor_queues.ALERT_GRACE_PERIOD - 1
self.assertFalse(monitor_queues.update_queue_status('A', 9, 1))
now += 1
self.assertTrue(monitor_queues.update_queue_status('A', 4, 1))
self.assertEquals(('A', 4, 1), alerts[-1])

@testing.stub(time, 'time')
@testing.stub(monitor_queues, 'get_queue_lengths')
@testing.stub(monitor_queues, 'update_queue_status')
@testing.stub(monitor_queues, 'send_queue_stats')
@testing.stub(monitor_queues, 'QUEUE_LIMITS')
@testing.stub(monitor_queues, 'harold_send_message')
def test_check_queues(self):
now = 1000
expected_queue_lengths = dict(A=1, B=2, C=3)
queue_lengths = {}
queue_statuses = {}

def stub_update_queue_status(n, l, t):
queue_statuses[n] = (l, t)

time.time = lambda: now
monitor_queues.get_queue_lengths = lambda: expected_queue_lengths
monitor_queues.update_queue_status = stub_update_queue_status
monitor_queues.send_queue_stats = lambda ql: queue_lengths.update(ql)
monitor_queues.QUEUE_LIMITS = dict(B=1)
# swallow up the outgoing heartbeat message
monitor_queues.harold_send_message = lambda command, **data: None

# First run should emit heartbeat.
monitor_queues.check_queues()
self.assertEquals(expected_queue_lengths, queue_lengths)
self.assertEquals((2, 1), queue_statuses['B'])
self.assertEquals(now, monitor_queues.last_heartbeat)

# Second run within heartbeat interval, no heartbeat emitted.
last_heartbeat = now
now += monitor_queues.HAROLD_HEARTBEAT_INTERVAL - 1
monitor_queues.check_queues()
self.assertEquals(last_heartbeat, monitor_queues.last_heartbeat)

# Third run when next heartbeat should be sent.
now += 1
monitor_queues.check_queues()
self.assertEquals(now, monitor_queues.last_heartbeat)


if __name__ == '__main__':
unittest.main()

0 comments on commit e911a23

Please sign in to comment.