#!/usr/bin/env python

import pathlib as pl, dataclasses as dc
import os, sys, signal, time, unicodedata, json, logging

# https://github.com/prometheus/client_python
import prometheus_client as pc

# https://github.com/etingof/pysnmp
from pysnmp import hlapi as snmp
import pysnmp.error as snmp_err_core
import pysnmp.proto.errind as snmp_err

class LogMessage:
    def __init__(self, fmt, a, k): self.fmt, self.a, self.k = fmt, a, k
    def __str__(self): return self.fmt.format(*self.a, **self.k) if self.a or self.k else self.fmt

class LogStyleAdapter(logging.LoggerAdapter):
    def __init__(self, logger, extra=None): super().__init__(logger, extra or dict())
    def log(self, level, msg, *args, **kws):
        if not self.isEnabledFor(level): return
        log_kws = {} if 'exc_info' not in kws else dict(exc_info=kws.pop('exc_info'))
        msg, kws = self.process(msg, kws)
        self.logger._log(level, LogMessage(msg, args, kws), (), **log_kws)

get_logger = lambda name: LogStyleAdapter(logging.getLogger(f'prms-snmp.{name}'))
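# Loggers from get_logger() take {}-style (str.format) placeholders,
#  e.g. get_logger('snmp').debug('Iface query error: {}', err), as used throughout below.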

err_fmt = lambda err: '[{}] {}'.format(err.__class__.__name__, err)

def str_part(s, sep, default=None):
    'Examples: str_part("user@host", "<@", "root"), str_part("host:port", ":>")'
    c = sep.strip('<>')
    if sep.strip(c) == '<': return (default, s) if c not in s else s.split(c, 1)
    else: return (s, default) if c not in s else s.rsplit(c, 1)

str_norm = lambda v: unicodedata.normalize('NFKC', v.strip()).casefold()
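# str_part() above splits s once on the separator char, with the '<'/'>' side
#  getting the default value when that char is missing, e.g.:
#   str_part('host:161', ':>') -> ('host', '161'); str_part('host', ':>') -> ('host', None)
#   str_part('user@host', '<@', 'root') -> ('user', 'host'); str_part('host', '<@', 'root') -> ('root', 'host')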


### SNMP bits

class SNMPError(Exception): pass
class SNMPNX(SNMPError): pass
class SNMPTimeout(SNMPError): pass

@dc.dataclass
class SNMPData:
    uptime:float; query_time:float; iface_stats:dict
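# uptime is the remote device uptime in seconds (hrSystemUptime is in 1/100s ticks),
#  query_time is the wall-clock duration of the SNMP queries in seconds,
#  and iface_stats maps interface name to its (bytes-in, bytes-out) counter values.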

class SNMP:

    def __init__( self,
            host, port, user, sha_auth_pw, iface_set=None, iface_n_max=10 ):
        self.snmp_engine = snmp.SnmpEngine()
        self.snmp_auth = snmp.UsmUserData(
            user, sha_auth_pw, authProtocol=snmp.usmHMACSHAAuthProtocol )
        self.snmp_ctx = snmp.ContextData()
        self.log = get_logger('snmp')

        # Different retries to make sure reboot doesn't happen in-between queries
        ep = host, port
        self.snmp_ep_init = snmp.UdpTransportTarget(ep, retries=5, timeout=30.0)
        self.snmp_ep_value = snmp.UdpTransportTarget(ep, retries=1, timeout=15.0)

        if iface_set and len(iface_set) > iface_n_max: raise ValueError(iface_n_max)
        self.iface_set = iface_set

        # HOST-RESOURCES-MIB::hrSystemUptime
        self.oid_uptime = snmp.ObjectType(snmp.ObjectIdentity('.1.3.6.1.2.1.25.1.1.0'))
        iface_ns = list(range(1, iface_n_max+1))
        self.oid_iface_names, self.oid_iface_ingress, self.oid_iface_egress = (
            list(snmp.ObjectType(snmp.ObjectIdentity( # IF-MIB::ifXEntry (1=ifName, 6=ifHCInOctets, 10=ifHCOutOctets)
                f'.1.3.6.1.2.1.31.1.1.1.{k}.{n}' )) for n in iface_ns) for k in [1, 6, 10] )

    def _query(self, oid, conv=None, init=False):
        snmp_ep = self.snmp_ep_init if init else self.snmp_ep_value
        cmd = snmp.getCmd( self.snmp_engine,
            self.snmp_auth, snmp_ep, self.snmp_ctx, oid )
        try: err_indication, err_st, err_idx, var_list = next(cmd)
        except snmp_err_core.PySnmpError:
            raise SNMPError('SNMP carrier/conn error') # msg is too long here
        if err_indication:
            if isinstance(err_indication, snmp_err.RequestTimedOut):
                raise SNMPTimeout(f'{snmp_ep.retries=} {snmp_ep.timeout=}')
            raise SNMPError(err_fmt(err_indication))
        if err_st:
            raise SNMPError( f'{err_st.prettyPrint()}'
                f' [{var_list[int(err_idx)-1] if err_idx else "?"}]' )
        if len(var_list) != 1: raise SNMPError(f'Unexpected result structure: {var_list}')
        (k, v), = var_list
        # print(k.prettyPrint(), type(v), v.prettyPrint())
        if isinstance(v, snmp.NoSuchObject): raise SNMPNX(oid)
        return conv(v) if conv else v

    def query_iface_data(self, iface_set=None):
        if iface_set is None: iface_set = self.iface_set
        ts0 = time.monotonic()

        try: uptime = self._query(self.oid_uptime, int, init=True) / 100
        except SNMPError as err: return self.log.debug('Uptime query error: {}', err)

        iface_stats, iface_names = dict(), dict(
            (str_norm(iface), iface) for iface in iface_set or list() )
        for n, oid_iface in enumerate(self.oid_iface_names):
            if iface_set is not None and not iface_names: break # found all ifaces
            try: iface_name = self._query(oid_iface, str)
            except SNMPNX: break
            except SNMPError as err: return self.log.debug('Iface query error: {}', err)
            if iface_set is not None:
                iface_name = iface_names.pop(str_norm(iface_name), None)
                if not iface_name: continue
            try:
                bs_in = self._query(self.oid_iface_ingress[n], int)
                bs_out = self._query(self.oid_iface_egress[n], int)
            except SNMPNX: continue # iface is gone
            except SNMPError as err: return self.log.debug('Counter query error: {}', err)
            iface_stats[iface_name] = bs_in, bs_out

        query_time = time.monotonic() - ts0
        return SNMPData(uptime, query_time, iface_stats)
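# Hypothetical usage sketch (host/credentials made up for illustration):
#   data = SNMP('192.168.88.1', 161, 'monitor', 'passphrase', iface_set=['ether1']).query_iface_data()
#   if data: print(data.uptime, data.iface_stats)  # None is returned on any query error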


### Poller and prometheus collector loop

class SNMPPoller:

    def __init__(self, snmp, interval, metric_prefix, spool_file, spool_tmp):
        self.snmp, self.interval, self.prefix = snmp, interval, metric_prefix
        self.spool_file, self.spool_tmp = spool_file, spool_tmp
        self.log = get_logger('poller')

    def snmp_spool_parse(self):
        try:
            data = json.loads(self.spool_file.read_text())
            data = SNMPData(**data)
        except OSError: return
        except Exception as err:
            self.log.error( 'Unrecognized spool-file'
                ' structure, discarding [{}]: {}', self.spool_file, err_fmt(err) )
        else:
            self.log.debug('Parsed spool data [{}]: {}', self.spool_file, data)
            return data

    def snmp_spool_dump(self, data):
        if isinstance(data, SNMPData): data = dc.asdict(data)
        with self.spool_tmp.open('w') as tmp:
            tmp.write(json.dumps(data))
            tmp.flush()
            os.fdatasync(tmp)
        self.spool_tmp.rename(self.spool_file)
        self.log.debug('Updated spool file [{}]', self.spool_file)
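    # Spool file ends up as plain JSON of SNMPData fields, e.g. (values illustrative):
    #   {"uptime": 123456.0, "query_time": 2.1, "iface_stats": {"ether1": [10203040, 56070809]}}
    # The write-to-tmp + fdatasync + rename above replaces it atomically wrt crashes/restarts.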

    def run_query_loop(self):
        # Traffic counter resets are avoided here, if possible, by restoring
        #  old values from the spool-file and tracking all increments from there.

        c_query_count = pc.Counter(
            f'{self.prefix}snmp_query_count',
            'Total number of SNMP queries made', ['result'] )
        c_query_time = pc.Histogram(
            f'{self.prefix}snmp_query_time_seconds',
            'Total SNMP querying time, in seconds' )
        c_iface_traffic = pc.Counter(
            f'{self.prefix}iface_traffic_bytes',
            'Total traffic through network interface', ['iface', 'dir'] )

        data = self.snmp_spool_parse()
        if data:
            stats_uptime = data.uptime
            stats_last, stats_total = data.iface_stats, dict()
            for iface, (bs_in, bs_out) in data.iface_stats.items():
                self.log.debug('Init - iface stats [{}]: in={:,} out={:,}', iface, bs_in, bs_out)
                stats_total[iface] = [bs_in, bs_out]
                c_iface_traffic.labels(iface, 'in').inc(bs_in)
                c_iface_traffic.labels(iface, 'out').inc(bs_out)
        else:
            self.log.debug('Init - blank iface stats')
            stats_uptime, stats_last, stats_total = 0, dict(), dict()

        while True:
            self.log.debug('Running SNMP query...')
            data = self.snmp.query_iface_data()
            c_query_count.labels('total').inc()

            if not data:
                self.log.debug(' - query result: fail')
                c_query_count.labels('fail').inc()

            else:
                self.log.debug(' - query result: success')
                c_query_count.labels('success').inc()
                self.log.debug(' - query time: {:,.1f}', data.query_time)
                c_query_time.observe(data.query_time)

                if data.uptime < stats_uptime:
                    self.log.debug(
                        ' - query uptime: {:,.0f} -> {:,.0f} [reboot detected]',
                        stats_uptime, data.uptime )
                    stats_last.clear()
                else: self.log.debug(' - query uptime: {:,.0f}', data.uptime)
                stats_uptime = data.uptime

                for iface, bs_vals in data.iface_stats.items():
                    bs_in_old, bs_out_old = stats_last.get(iface, (0, 0))
                    bs_in, bs_out = stats_last[iface] = bs_vals
                    inc_in, inc_out = max(0, bs_in - bs_in_old), max(0, bs_out - bs_out_old)
                    self.log.debug(
                        ' - iface traffic [{}]: in={:,} [+{:,}] out={:,} [+{:,}]',
                        iface, bs_in, inc_in, bs_out, inc_out )
                    stats_total.setdefault(iface, [0, 0]) # iface can be missing from spool data
                    for n, (k, inc) in enumerate([('in', inc_in), ('out', inc_out)]):
                        stats_total[iface][n] += inc
                        c_iface_traffic.labels(iface, k).inc(inc)

                data.iface_stats = stats_total
                self.snmp_spool_dump(data)

            self.log.debug('Query loop delay: {:,.1f}', self.interval)
            time.sleep(self.interval)
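# Metric families exported for prometheus scraping (with optional --metric-prefix prepended):
#  <prefix>snmp_query_count{result="total"|"success"|"fail"}, <prefix>snmp_query_time_seconds
#  histogram, and <prefix>iface_traffic_bytes{iface=..., dir="in"|"out"} byte counters.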


### CLI and HTTP server

def main(args=None):
    import argparse, textwrap
    dd = lambda text: (textwrap.dedent(text).strip('\n') + '\n').replace('\t', ' ')
    fill = lambda s,w=90,ind='',ind_next=' ',**k: textwrap.fill(
        s, w, initial_indent=ind, subsequent_indent=ind if ind_next is None else ind_next, **k )

    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description='Run prometheus exporter for SNMP network interface bandwidth totals.')

    parser.add_argument('snmp_addr_port', help='SNMP host[:port] to query.')
    parser.add_argument('snmp_auth_file',
        help='File with "user:password" for SNMPv3 SHA1-HMAC authentication.')
    parser.add_argument('spool_file', nargs='?',
        help=dd('''
            JSON file to store/track remote uptime and last SNMP values in.
            Must be in a writable directory to replace via create/rename.'''))

    parser.add_argument('-i', '--iface', action='append', metavar='iface',
        help=dd('''
            Specific interface name(s) to query/export
            (default - all of them). Can be used multiple times.'''))
    parser.add_argument('-m', '--metric-prefix', metavar='prefix', default='',
        help=dd('''
            Optional prefix for exported prometheus metrics.
            For example, with "mikrotik_" prefix, interface traffic counter
            value will be "mikrotik_iface_traffic_bytes", while snmp stats
            will look like "mikrotik_snmp_query_count" and such.'''))

    parser.add_argument('-p', '--print',
        action='store_true', help='Query and print data once instead of starting the daemon.')
    parser.add_argument('-b', '--bind',
        metavar='[host:]port', default='9101',
        help='[host:]port to bind data exporter interface to.')
    parser.add_argument('-t', '--snmp-poll-interval',
        type=float, metavar='seconds', default=13*60,
        help='Interval between polling counter values via SNMP, in seconds. Default: %(default)s')
    parser.add_argument('--debug', action='store_true', help='Verbose operation mode.')

    opts = parser.parse_args(sys.argv[1:] if args is None else args)

    logging.basicConfig(level=logging.DEBUG if opts.debug else logging.WARNING)
    log = get_logger('main')

    snmp_user, snmp_pw = pl.Path(opts.snmp_auth_file).read_text().splitlines()[0].split(':', 1)
    snmp_host, snmp_port = str_part(opts.snmp_addr_port, ':>')
    iface_list = opts.iface
    if iface_list: iface_list = list(iface.lower() for iface in iface_list)
    snmp = SNMP(snmp_host, snmp_port or 161, snmp_user, snmp_pw, iface_set=iface_list)

    if opts.print:
        data = snmp.query_iface_data()
        if not data: return print('SNMP query failed')
        print( 'Interface byte counters'
            f' (uptime={data.uptime:,.0f}s query-time={data.query_time:.1f}s):' )
        for iface, (bs_in, bs_out) in data.iface_stats.items():
            print(f' {iface}: in={bs_in:,} out={bs_out:,}')
        return

    for sig in 'int term'.upper().split():
        signal.signal(getattr(signal, f'SIG{sig}'), lambda sig, frm: sys.exit(0))

    # Make sure spool paths are ok
    if not opts.spool_file:
        parser.error('Spool file argument is required in daemon mode')
    spool_tmp = pl.Path(opts.spool_file + '.tmp')
    spool_tmp.write_bytes(b'test')
    spool_tmp.unlink()
    spool_file = pl.Path(opts.spool_file)
    try: json.loads(spool_file.read_text())
    except Exception: spool_file.unlink(missing_ok=True) # can be corrupted after restart

    host, port = str_part(opts.bind, '<:')
    pc.start_http_server(int(port), host or '')

    log.debug('Starting poller loop...')
    SNMPPoller( snmp,
        opts.snmp_poll_interval, opts.metric_prefix,
        spool_file, spool_tmp ).run_query_loop()
    log.debug('Finished')

if __name__ == '__main__': sys.exit(main())
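# Example invocation (hypothetical script name, address and file names):
#   ./snmp-iface-exporter.py 192.168.88.1 snmp-auth.txt spool.json -i ether1 -b 9101
#  where snmp-auth.txt holds a single "user:password" line for SNMPv3 SHA1-HMAC auth,
#  and counters get served for prometheus to scrape on http://localhost:9101/metrics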