Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 112 additions & 0 deletions workflowmonit/alertingDefs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
#!/usr/bin/env python
from __future__ import print_function
import time
import json
import smtplib
from email.mime.text import MIMEText


def onFailureRate(doc, thres=0.5):
"""
check a workflow (represented by `doc`), if its failureRate is larger than
0.5 AND running time is > 2days, then ALERT.

:param doc dict: information describe a workflow
:param thres float: threshold
:returns: (judge result, short msg if failed)
"""

res = (False, '')
if doc['status'] != 'running-open': return res
if doc['failureRate'] < thres: return res
runningOpen = [
tr for tr in doc['transitions'] if tr['Status'] == 'running-open'
]
if not runningOpen: return res

runningOpenTime = runningOpen[0]['UpdateTime']
if time.time() - runningOpenTime < 2 * 24 * 60 * 60: return res

failMsg = 'FailureRate ({}) larger than threshold({}), while running time over 2 days (started at {})'.format(
doc['failureRate'], thres, time.ctime(runningOpenTime))

return (True, failMsg)


AlertDefs = [
onFailureRate,
]


def alertWithEmail(docs, recipients):
"""
handling docs with alert emails.


:param docs list: list of documents
:param recipients list: list of recipients email addresses
"""

sender = 'toolsandint-workflowmonitalert@cern.ch'

for doc in docs:
alertResults = [ad(doc) for ad in AlertDefs]
positiveRes = filter(lambda d: d[0], alertResults)
if positiveRes:
shortAlertMsgs = [x[1] for x in positiveRes]
_contentMsg = '\n\n'.join([
'*** THIS IS A GENERATED MESSAGE, PLEASE DO NOT REPLY ***',
'Workflow: {}'.format(doc['name']),
'Short Summary:\n{}'.format('\n'.join([
'- {}'.format(s) for s in shortAlertMsgs
])),
'-'* 79,
'Full document:\n{}'.format(
json.dumps(
doc, sort_keys=True, indent=4, separators=(',', ': ')))
])

contentMsg = MIMEText(_contentMsg)
contentMsg['Subject'] = '[workflowmonit] Alert on * {} *'.format(
doc['name'])
contentMsg['From'] = sender
contentMsg['To'] = ', '.join(recipients)
s = smtplib.SMTP('localhost')
s.sendmail(sender, recipients, contentMsg.as_string())
s.quit()


def errorEmailShooter(msg, recipients):
"""
forward the error message to recipients by emails

:param msg str: error mesages
:param recipients list: list of recipients email address
"""

sender = 'toolsandint-workflowmonitalert@cern.ch'

contentMsg = MIMEText(msg)
contentMsg['Subject'] = 'Exception caught for workflowmonit'
contentMsg['From'] = sender
contentMsg['To'] = ', '.join(recipients)
s = smtplib.SMTP('localhost')
s.sendmail(sender, recipients, contentMsg.as_string())
s.quit()


def main():

import os
testdoc = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'Logs/toSendDoc_190317-033802.json')
docs = json.load(open(testdoc))
print([(d['name'], d['failureRate']) for d in docs])

alertWithEmail(docs, ['weinan.si@cern.ch', ])



if __name__ == "__main__":
main()
48 changes: 29 additions & 19 deletions workflowmonit/sendToMonit.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import yaml
from workflowmonit.stompAMQ import stompAMQ
import workflowmonit.workflowCollector as wc
import workflowmonit.alertingDefs as ad

CRED_FILE_PATH = os.path.join(os.path.dirname(
os.path.abspath(__file__)), 'credential.yml')
Expand Down Expand Up @@ -268,31 +269,40 @@ def sendDoc(cred, docs):

def main():

with open(LOGGING_CONFIG, 'r') as f:
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)
recipients = wc.get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', [])

global logger
logger = logging.getLogger('workflowmonitLogger')
try:
with open(LOGGING_CONFIG, 'r') as f:
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)

global logger
logger = logging.getLogger('workflowmonitLogger')

cred = wc.get_yamlconfig(CRED_FILE_PATH)
docs = buildDoc(CONFIG_FILE_PATH)
cred = wc.get_yamlconfig(CRED_FILE_PATH)
docs = buildDoc(CONFIG_FILE_PATH)

if not os.path.isdir(LOGDIR):
os.makedirs(LOGDIR)
# handling alerts
ad.alertWithEmail(docs, recipients)

doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format(
time.strftime('%y%m%d-%H%M%S')))
wc.save_json(docs, doc_bkp)
logger.info('Document saved at: {}.json'.format(doc_bkp))
# backup documents
if not os.path.isdir(LOGDIR):
os.makedirs(LOGDIR)

failures = sendDoc(cred=cred, docs=docs)
doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format(
time.strftime('%y%m%d-%H%M%S')))
wc.save_json(docs, doc_bkp)
logger.info('Document saved at: {}.json'.format(doc_bkp))

failedDocs_bkp = os.path.join(
LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S')))
if len(failures):
wc.save_json(failures, failedDocs_bkp)
logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp))
failures = sendDoc(cred=cred, docs=docs)

failedDocs_bkp = os.path.join(
LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S')))
if len(failures):
wc.save_json(failures, failedDocs_bkp)
logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp))
except Exception as e:
ad.errorEmailShooter(str(e), recipients)


if __name__ == "__main__":
Expand Down