Permalink
Find file Copy path
Fetching contributors…
Cannot retrieve contributors at this time
executable file 227 lines (203 sloc) 8.46 KB
#!/usr/bin/env python3
"""
Monitor a mailserver checking sending and receiving. This makes it
neccessary to use two independent mailservers which exchange emails in
both directions. This script logs in to both of the hosts with SMTP and
sends mails to the other host. On the next run, it will login into both
hosts via IMAP, checks whether the mails arrived and delete them, while
sending out the next pair of mails.
This will of course cause a CRITICAL on first run! There is currently no
mechanism implemented to prevent this.
(c) 2014 Raphael Michel <michel@rami.io>
Published under MIT license
"""
import argparse
import socket
import time
import email
from smtplib import SMTP
from imaplib import IMAP4
import logging
import nagiosplugin
from nagiosplugin.state import Ok, Warn, Critical
_log = logging.getLogger('nagiosplugin')
class BooleanContext(nagiosplugin.Context):
"""This context only cares about boolean values.
You can specify using the ``critical``-parameter whether
a False result should cause a warning or a critical error.
"""
def __init__(self, name, critical=True,
fmt_metric='{name} is {value}',
result_cls=nagiosplugin.result.Result):
self.critical = critical
super().__init__(name, fmt_metric, result_cls)
def evaluate(self, metric, resource):
if not metric.value and self.critical:
return self.result_cls(Critical, "NOT OK", metric)
elif not metric.value and not self.critical:
return self.result_cls(Warn, "NOT OK", metric)
else:
return self.result_cls(Ok, "OK", metric)
class RecvSummary(nagiosplugin.Summary):
"""Provide a brief summary of the check results."""
def verbose(self, results):
super().verbose(results)
if results['recv1'].metric.value and results['recv2'].metric.value and results['send1'].metric.value and results['send2'].metric.value:
return "Both mails found, both new mails sent"
if not results['recv1'].metric.value:
return "Mail on host 1 not found"
if not results['recv2'].metric.value:
return "Mail on host 2 not found"
if not results['send1'].metric.value:
return "Sending failed on host 1"
if not results['send2'].metric.value:
return "Sending failed on host 1"
class Twowaymail(nagiosplugin.Resource):
"""
This is the actual plugin sending out mail probes and checking
for their arrival
"""
def __init__(self, conn1, conn2):
"""
The two parameters are expected to be tuples with the connection
information for the two mailservers in the format
(smtphost, imaphost, mailaddress, username, password, usetls)
"""
self.conn1 = conn1
self.conn2 = conn2
def send(self, conn, to):
"""
This sends a probe email to ``to`` using the mailserver
connection specified in the tuple ``conn``
"""
host = conn[0].split(":")[0]
try:
port = conn[0].split(":")[1]
except IndexError:
port = 587
smtp = SMTP(host, port)
if conn[5]:
smtp.starttls()
smtp.ehlo()
smtp.login(conn[3], conn[4])
smtp.sendmail(conn[2], to, """From: {fromaddr}
To: {toaddr}
Subject: Monitoring probe from {monitorhost}
X-TWM-Monitoring-Host: {monitorhost}
X-TWM-Sender-Host: {fromhost}
X-TWM-Unixtime: {unixtime}
KTHXBYE
""".format(
fromaddr=conn[2],
toaddr=to,
monitorhost=socket.gethostname(),
fromhost=conn[0],
unixtime=time.time()
)
)
smtp.quit()
def checkrecv(self, conn, senderhost):
"""
This logs in to the imap server specified by the tuple ``conn``
and returns true, if it finds a probe mail from ``senderhost`` which
is less than half an hour old. If found, it deletes the mail.
"""
host = conn[1].split(":")[0]
try:
port = conn[1].split(":")[1]
except IndexError:
port = 143
imap = IMAP4(host, port)
if conn[5]:
imap.starttls()
imap.login(conn[3], conn[4])
imap.select()
typ, data = imap.search(None, 'ALL')
found = False
for num in data[0].split():
typ, data = imap.fetch(num, '(RFC822)')
msg = email.message_from_string(data[0][1].decode("utf-8"))
if not 'x-twm-sender-host' in msg or msg['x-twm-sender-host'] != senderhost:
continue
if not 'x-twm-unixtime' in msg or time.time() - float(msg['x-twm-unixtime'].strip()) > 1800:
imap.store(num, '+FLAGS', '\\Deleted')
continue
found = True
imap.store(num, '+FLAGS', '\\Deleted')
imap.expunge()
imap.shutdown()
return found
def probe(self):
"""
Checks if mails are present and sends out the new mails.
"""
recv1 = recv2 = send1 = send2 = False
# Check for last mails
try:
recv1 = self.checkrecv(self.conn1, self.conn2[0])
except Exception as e:
_log.error('Error fetching mail from host 1')
_log.error(str(e))
try:
recv2 = self.checkrecv(self.conn2, self.conn1[0])
except Exception as e:
_log.error('Error fetching mail from host 2')
_log.error(str(e))
# Send out mail probes
try:
send1 = self.send(self.conn1, self.conn2[2])
except Exception as e:
_log.error('Error sending mail on host 1')
_log.error(str(e))
try:
send2 = self.send(self.conn2, self.conn1[2])
except Exception as e:
_log.error('Error sending mail on host 1')
_log.error(str(e))
return [
nagiosplugin.Metric('recv1', recv1, context='recv1'),
nagiosplugin.Metric('recv2', recv2, context='recv2'),
nagiosplugin.Metric('send1', recv1, context='send1'),
nagiosplugin.Metric('send2', recv2, context='send2'),
]
def main():
argp = argparse.ArgumentParser(description=__doc__)
argp.add_argument('-s1', '--smtp1', metavar='HOST', required=True,
help='SMTP Hostname for first host. You can specify a port using hostname:port syntax, default ist 587')
argp.add_argument('-S1', '--nossl1', action='store_true', default=False,
help='Do not use STARTLS for first host')
argp.add_argument('-i1', '--imap1', metavar='HOST', required=True,
help='IMAP Hostname for first host. You can specify a port using hostname:port syntax, default ist 143')
argp.add_argument('-u1', '--user1', metavar='USER', required=True,
help='Username on first host')
argp.add_argument('-p1', '--pass1', metavar='PASS', required=True,
help='Password on first host')
argp.add_argument('-a1', '--addr1', metavar='MAIL', required=True,
help='Mail address on first host')
argp.add_argument('-s2', '--smtp2', metavar='HOST', required=True,
help='SMTP Hostname on second host. You can specify a port using hostname:port syntax, default ist 587')
argp.add_argument('-S2', '--nossl2', action='store_true', default=False,
help='Do not use STARTTLS for first host')
argp.add_argument('-i2', '--imap2', metavar='HOST', required=True,
help='IMAP Hostname on second host. You can specify a port using hostname:port syntax, default ist 143')
argp.add_argument('-u2', '--user2', metavar='USER', required=True,
help='Username on second host')
argp.add_argument('-p2', '--pass2', metavar='PASS', required=True,
help='Password on second host')
argp.add_argument('-a2', '--addr2', metavar='MAIL', required=True,
help='Mail address on second host')
args = argp.parse_args()
check = nagiosplugin.Check(
Twowaymail(
(args.smtp1, args.imap1, args.addr1, args.user1, args.pass1, not args.nossl1),
(args.smtp2, args.imap2, args.addr2, args.user2, args.pass2, not args.nossl2)
),
BooleanContext('recv1'),
BooleanContext('recv2'),
BooleanContext('send1'),
BooleanContext('send2'),
RecvSummary()
)
check.main()
if __name__ == '__main__':
main()