Skip to content
Permalink
Browse files

Tuneup sqs_queue_status cron script

  • Loading branch information...
pwnbus committed Jul 3, 2019
1 parent 8a8562f commit 60487d65b3e65cfa8f2309850176604294fe6061
Showing with 25 additions and 47 deletions.
  1. +25 −47 cron/sqs_queue_status.py
@@ -37,28 +37,35 @@ def getQueueSizes():
logger.debug('starting')
logger.debug(options)
es = ElasticsearchClient(options.esservers)
sqslist = {}
sqslist['queue_stats'] = {}
qcount = len(options.taskexchange)
qcounter = qcount - 1

client = boto3.client(
'sqs',
sqs_client = boto3.client(
"sqs",
region_name=options.region,
aws_access_key_id=options.accesskey,
aws_secret_access_key=options.secretkey
)

while qcounter >= 0:
for exchange in options.taskexchange:
logger.debug('Looking for sqs queue stats in queue' + exchange)
response = client.get_queue_attributes(
QueueUrl=client.get_queue_url(QueueName=exchange)['QueueUrl'],
AttributeNames=['All']
)
sqslist['queue_stats'][qcounter] = response['Attributes']
sqslist['queue_stats'][qcounter]['name'] = exchange
qcounter -= 1
queues_stats = {
'queues': [],
'total_feeds': len(options.taskexchange),
'total_messages_ready': 0,
'username': 'mozdef'
}
for queue_name in options.taskexchange:
logger.debug('Looking for sqs queue stats in queue' + queue_name)
queue_url = sqs_client.get_queue_url(QueueName=queue_name)['QueueUrl']
queue_attributes = sqs_client.get_queue_attributes(QueueUrl=queue_url, AttributeNames=['All'])['Attributes']
queue_stats = {
'queue': queue_name,
}
if 'ApproximateNumberOfMessages' in queue_attributes:
queue_stats['messages_ready'] = int(queue_attributes['ApproximateNumberOfMessages'])
queues_stats['total_messages_ready'] += queue_stats['messages_ready']
if 'ApproximateNumberOfMessagesNotVisible' in queue_attributes:
queue_stats['messages_inflight'] = int(queue_attributes['ApproximateNumberOfMessagesNotVisible'])
if 'ApproximateNumberOfMessagesDelayed' in queue_attributes:
queue_stats['messages_delayed'] = int(queue_attributes['ApproximateNumberOfMessagesDelayed'])

queues_stats['queues'].append(queue_stats)

# setup a log entry for health/status.
sqsid = '{0}-{1}'.format(options.account, options.region)
@@ -72,35 +79,8 @@ def getQueueSizes():
category='mozdef',
source='aws-sqs',
tags=[],
details=[])
healthlog['details'] = dict(username='mozdef')
healthlog['details']['queues']= list()
healthlog['details']['total_messages_ready'] = 0
healthlog['details']['total_feeds'] = qcount
details=queues_stats)
healthlog['tags'] = ['mozdef', 'status', 'sqs']
ready = 0
qcounter = qcount - 1
for q in sqslist['queue_stats'].keys():
queuelist = sqslist['queue_stats'][qcounter]
if 'ApproximateNumberOfMessages' in queuelist:
ready1 = int(queuelist['ApproximateNumberOfMessages'])
ready = ready1 + ready
healthlog['details']['total_messages_ready'] = ready
if 'ApproximateNumberOfMessages' in queuelist:
messages = int(queuelist['ApproximateNumberOfMessages'])
if 'ApproximateNumberOfMessagesNotVisible' in queuelist:
inflight = int(queuelist['ApproximateNumberOfMessagesNotVisible'])
if 'ApproximateNumberOfMessagesDelayed' in queuelist:
delayed = int(queuelist['ApproximateNumberOfMessagesDelayed'])
if 'name' in queuelist:
name = queuelist['name']
queueinfo=dict(
queue=name,
messages_delayed=delayed,
messages_ready=messages,
messages_inflight=inflight)
healthlog['details']['queues'].append(queueinfo)
qcounter -= 1
healthlog['type'] = 'mozdefhealth'
# post to elasticsearch servers directly without going through
# message queues in case there is an availability issue
@@ -109,8 +89,6 @@ def getQueueSizes():
# for use when querying for the latest sqs status
healthlog['tags'] = ['mozdef', 'status', 'sqs-latest']
es.save_event(index=options.index, doc_id=getDocID(sqsid), body=json.dumps(healthlog))
# except Exception as e:
# logger.error("Exception %r when gathering health and status " % e)


def main():

0 comments on commit 60487d6

Please sign in to comment.
You can’t perform that action at this time.