From a8c8d66f2ade852e7f959acaa95a3ebf83fea3eb Mon Sep 17 00:00:00 2001 From: Weinan Si Date: Sun, 17 Mar 2019 07:07:33 +0100 Subject: [PATCH 1/2] settingup email alerts --- workflowmonit/alertingDefs.py | 93 +++++++++++++++++++++++++++++++++++ workflowmonit/sendToMonit.py | 6 +++ 2 files changed, 99 insertions(+) create mode 100644 workflowmonit/alertingDefs.py diff --git a/workflowmonit/alertingDefs.py b/workflowmonit/alertingDefs.py new file mode 100644 index 0000000..e4323d5 --- /dev/null +++ b/workflowmonit/alertingDefs.py @@ -0,0 +1,93 @@ +#!/usr/bin/env python +from __future__ import print_function +import time +import json +import smtplib +from email.mime.text import MIMEText + + +def onFailureRate(doc, thres=0.5): + """ + check a workflow (represented by `doc`), if its failureRate is larger than + 0.5 AND running time is > 2days, then ALERT. + + :param doc dict: information describe a workflow + :param thres float: threshold + :returns: (judge result, short msg if failed) + """ + + res = (False, '') + if doc['status'] != 'running-open': return res + if doc['failureRate'] < thres: return res + runningOpen = [ + tr for tr in doc['transitions'] if tr['Status'] == 'running-open' + ] + if not runningOpen: return res + + runningOpenTime = runningOpen[0]['UpdateTime'] + if time.time() - runningOpenTime < 2 * 24 * 60 * 60: return res + + failMsg = 'FailureRate ({}) larger than threshold({}), while running time over 2 days (started at {})'.format( + doc['failureRate'], thres, time.ctime(runningOpenTime)) + + return (True, failMsg) + + +AlertDefs = [ + onFailureRate, +] + + +def alertWithEmail(docs, recipients): + """ + handling docs with alert emails. + + + :param docs list: list of documents + :param recipients list: list of recipients email addresses + """ + + sender = 'toolsandint-workflowmonitalert@cern.ch' + + for doc in docs: + alertResults = [ad(doc) for ad in AlertDefs] + positiveRes = filter(lambda d: d[0], alertResults) + if positiveRes: + shortAlertMsgs = [x[1] for x in positiveRes] + _contentMsg = '\n\n'.join([ + '*** THIS IS A GENERATED MESSAGE, PLEASE DO NOT REPLY ***', + 'Workflow: {}'.format(doc['name']), + 'Short Summary:\n{}'.format('\n'.join([ + '- {}'.format(s) for s in shortAlertMsgs + ])), + '-'* 79, + 'Full document:\n{}'.format( + json.dumps( + doc, sort_keys=True, indent=4, separators=(',', ': '))) + ]) + + contentMsg = MIMEText(_contentMsg) + contentMsg['Subject'] = '[workflowmonit] Alert on * {} *'.format( + doc['name']) + contentMsg['From'] = sender + contentMsg['To'] = ', '.join(recipients) + s = smtplib.SMTP('localhost') + s.sendmail(sender, recipients, contentMsg.as_string()) + s.quit() + + +def main(): + + import os + testdoc = os.path.join( + os.path.dirname(os.path.abspath(__file__)), + 'Logs/toSendDoc_190317-033802.json') + docs = json.load(open(testdoc)) + print([(d['name'], d['failureRate']) for d in docs]) + + alertWithEmail(docs, ['weinan.si@cern.ch', ]) + + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/workflowmonit/sendToMonit.py b/workflowmonit/sendToMonit.py index 56504db..d5d5617 100644 --- a/workflowmonit/sendToMonit.py +++ b/workflowmonit/sendToMonit.py @@ -13,6 +13,7 @@ import yaml from workflowmonit.stompAMQ import stompAMQ import workflowmonit.workflowCollector as wc +import workflowmonit.alertingDefs as ad CRED_FILE_PATH = os.path.join(os.path.dirname( os.path.abspath(__file__)), 'credential.yml') @@ -278,6 +279,11 @@ def main(): cred = wc.get_yamlconfig(CRED_FILE_PATH) docs = buildDoc(CONFIG_FILE_PATH) + # handling alerts + recipients = wc.get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', []) + ad.alertWithEmail(docs, recipients) + + # backup documents if not os.path.isdir(LOGDIR): os.makedirs(LOGDIR) From d3d84471c0b5972b16154ef5a6da7e8a7bb84ae6 Mon Sep 17 00:00:00 2001 From: Weinan Si Date: Thu, 16 May 2019 06:04:04 +0200 Subject: [PATCH 2/2] sending emails when catching error --- workflowmonit/alertingDefs.py | 21 ++++++++++++++- workflowmonit/sendToMonit.py | 50 +++++++++++++++++++---------------- 2 files changed, 47 insertions(+), 24 deletions(-) diff --git a/workflowmonit/alertingDefs.py b/workflowmonit/alertingDefs.py index e4323d5..bf05a1a 100644 --- a/workflowmonit/alertingDefs.py +++ b/workflowmonit/alertingDefs.py @@ -76,6 +76,25 @@ def alertWithEmail(docs, recipients): s.quit() +def errorEmailShooter(msg, recipients): + """ + forward the error message to recipients by emails + + :param msg str: error mesages + :param recipients list: list of recipients email address + """ + + sender = 'toolsandint-workflowmonitalert@cern.ch' + + contentMsg = MIMEText(msg) + contentMsg['Subject'] = 'Exception caught for workflowmonit' + contentMsg['From'] = sender + contentMsg['To'] = ', '.join(recipients) + s = smtplib.SMTP('localhost') + s.sendmail(sender, recipients, contentMsg.as_string()) + s.quit() + + def main(): import os @@ -90,4 +109,4 @@ def main(): if __name__ == "__main__": - main() \ No newline at end of file + main() diff --git a/workflowmonit/sendToMonit.py b/workflowmonit/sendToMonit.py index d5d5617..cf8df76 100644 --- a/workflowmonit/sendToMonit.py +++ b/workflowmonit/sendToMonit.py @@ -269,36 +269,40 @@ def sendDoc(cred, docs): def main(): - with open(LOGGING_CONFIG, 'r') as f: - config = yaml.safe_load(f.read()) - logging.config.dictConfig(config) + recipients = wc.get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', []) - global logger - logger = logging.getLogger('workflowmonitLogger') + try: + with open(LOGGING_CONFIG, 'r') as f: + config = yaml.safe_load(f.read()) + logging.config.dictConfig(config) - cred = wc.get_yamlconfig(CRED_FILE_PATH) - docs = buildDoc(CONFIG_FILE_PATH) + global logger + logger = logging.getLogger('workflowmonitLogger') - # handling alerts - recipients = wc.get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', []) - ad.alertWithEmail(docs, recipients) + cred = wc.get_yamlconfig(CRED_FILE_PATH) + docs = buildDoc(CONFIG_FILE_PATH) - # backup documents - if not os.path.isdir(LOGDIR): - os.makedirs(LOGDIR) + # handling alerts + ad.alertWithEmail(docs, recipients) - doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format( - time.strftime('%y%m%d-%H%M%S'))) - wc.save_json(docs, doc_bkp) - logger.info('Document saved at: {}.json'.format(doc_bkp)) + # backup documents + if not os.path.isdir(LOGDIR): + os.makedirs(LOGDIR) - failures = sendDoc(cred=cred, docs=docs) + doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format( + time.strftime('%y%m%d-%H%M%S'))) + wc.save_json(docs, doc_bkp) + logger.info('Document saved at: {}.json'.format(doc_bkp)) - failedDocs_bkp = os.path.join( - LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S'))) - if len(failures): - wc.save_json(failures, failedDocs_bkp) - logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp)) + failures = sendDoc(cred=cred, docs=docs) + + failedDocs_bkp = os.path.join( + LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S'))) + if len(failures): + wc.save_json(failures, failedDocs_bkp) + logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp)) + except Exception as e: + ad.errorEmailShooter(str(e), recipients) if __name__ == "__main__":