-
Notifications
You must be signed in to change notification settings - Fork 0
/
check_traffic.py
executable file
·391 lines (361 loc) · 14 KB
/
check_traffic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
#!/usr/bin/env python3
"""Nagios-like plugin to check traffic per interface"""
import argparse
import hashlib
import json
import logging
import pathlib
import pickle
import re
import subprocess
import time
import nagiosplugin # type: ignore
# Module-level logger registered under the name "nagiosplugin"
# (presumably so messages follow the library's verbosity handling — confirm).
logger = logging.getLogger("nagiosplugin")
# File name of this script; it becomes part of the state file name.
CHECK_NAME = pathlib.Path(__file__).name
# Directory in which the state file of each check invocation is stored.
STATE_FILE_PATH = "/tmp"
def prettify_size(size, multiplier):
    """Format *size* with a K/M/G/T prefix and one decimal place.

    The value is divided by *multiplier* until its magnitude is smaller than
    *multiplier* or the T prefix has been reached. Prefixes carry an "i"
    (Ki, Mi, Gi, Ti) only when *multiplier* is exactly 1024.
    """
    infix = "i" if multiplier == 1024 else ""
    scaled = size
    # Values below one multiplier are printed without any prefix.
    if abs(scaled) < multiplier:
        return f"{scaled:.1f}"
    for prefix in ("K", "M", "G"):
        scaled = scaled / multiplier
        if abs(scaled) < multiplier:
            return f"{scaled:.1f}{prefix}{infix}"
    # Anything larger is expressed in T, however big the number gets.
    scaled = scaled / multiplier
    return f"{scaled:.1f}T{infix}"
def human_size(string):
    """argparse type: parse an integer with an optional K/M/G/T suffix.

    Returns a ``(value, power)`` tuple where *value* is the integer part and
    *power* is the position of the suffix in ("", "K", "M", "G", "T"), i.e.
    0 when there is no suffix and 4 for T. The suffix is case-insensitive.

    Raises:
        argparse.ArgumentTypeError: *string* does not have the expected form.
    """
    parsed = re.search(r"\A(\d+)([KMGT]?)\Z", string, flags=re.I)
    if parsed is None:
        raise argparse.ArgumentTypeError(
            f"invalid argument: {string}, must be an integer, "
            "optionally followed by K, M, G or T (case-insensitive)"
        )
    digits = parsed.group(1)
    suffix = parsed.group(2).upper()
    # The suffix position doubles as the exponent applied to the multiplier.
    return int(digits), ("", "K", "M", "G", "T").index(suffix)
def run_command(command):
    """Run *command* without a shell and return the completed process.

    Args:
        command: list of program name and arguments.

    Returns:
        The ``subprocess.CompletedProcess``; stdout and stderr are captured
        as text.

    Raises:
        nagiosplugin.CheckError: the command could not be started or exited
            with a non-zero status. The original exception is chained.
    """
    try:
        proc = subprocess.run(
            command,
            check=True,
            text=True,
            capture_output=True,
        )
    except subprocess.CalledProcessError as exc:
        raise nagiosplugin.CheckError(
            f"command {command} exited with status {exc.returncode}: {exc.stderr!r}"
        ) from exc
    except OSError as exc:
        # E.g. the binary is missing or not executable: report it the same
        # way as a failing command instead of leaking a raw OSError.
        raise nagiosplugin.CheckError(
            f"command {command} could not be executed: {exc}"
        ) from exc
    logger.debug("Output from %s: %s", " ".join(command), proc.stdout)
    return proc
class Traffic(nagiosplugin.Resource):
    """Resource yielding RX/TX bandwidth metrics for network interfaces.

    Byte counters are read from ``ip -json -statistics link show`` and
    compared with the counters stored by the previous run in a state file;
    the difference divided by the elapsed time gives the bandwidth.
    """

    def __init__(self, args, args_hash):
        """Store the parsed arguments and the hash identifying the state file."""
        self.args = args
        self.args_hash = args_hash
        # State loaded from the previous run (stays empty on a first run).
        self.old_state = {}
        # State gathered by this run, saved once probing has finished.
        self.current_state = {"statistics": {}}

    def _get_interfaces(self):
        """Return ``(execution_time, interfaces)`` for all requested namespaces.

        Each interface dict from ``ip`` is augmented with ``netns_name``
        (None for the default namespace) and ``pretty_ifname`` (the interface
        name, prefixed with "<netns>/" outside the default namespace).
        """
        netns_info = []
        if self.args.include_netns:
            command_output = run_command(["ip", "-json", "netns", "list"]).stdout
            # If there are no namespaces, no JSON is returned
            if command_output:
                netns_info = json.loads(command_output)

        interfaces = []
        # None stands for the default namespace, which is always queried.
        for netns in [None] + netns_info:
            if netns is None:
                command = ["ip"]
                netns_name = None
            else:
                # Other namespaces are queried through non-interactive sudo.
                command = ["sudo", "-n", "ip"]
                netns_name = netns["name"]
                command += ["-netns", netns_name]
            command += ["-details", "-statistics", "-json", "link", "show"]
            interfaces_by_netns = json.loads(run_command(command).stdout)
            for interface in interfaces_by_netns:
                interface["netns_name"] = netns_name
                if netns_name is None:
                    interface["pretty_ifname"] = interface["ifname"]
                else:
                    interface["pretty_ifname"] = f"{netns_name}/{interface['ifname']}"
            interfaces.extend(interfaces_by_netns)

        execution_time = time.time()
        return execution_time, interfaces

    def _include_interface(self, interface):
        """Return True when *interface* passes the command-line filters.

        Exclusions (operstate DOWN unless --down, excluded types, excluded
        name pattern) are applied first; then every inclusion filter that
        was given must match. Without inclusion filters the interface is
        included. As a side effect ``interface["computed_type"]`` is set.
        """
        interface_name = interface["ifname"]
        interface_pretty_name = interface["pretty_ifname"]
        # Prefer the more specific linkinfo kind over the generic link type.
        interface_type = interface["link_type"]
        if "linkinfo" in interface and "info_kind" in interface["linkinfo"]:
            interface_type = interface["linkinfo"]["info_kind"]
        interface["computed_type"] = interface_type

        # Exclude interfaces which are down
        if not self.args.down and interface["operstate"] == "DOWN":
            logger.info(
                "[-] Skipping interface %s (operstate DOWN)",
                interface_pretty_name,
            )
            return False

        # Exclusions first
        if interface_type in self.args.exclude_type:
            logger.info(
                "[-] Skipping interface %s (type %s matches %s)",
                interface_pretty_name,
                interface_type,
                self.args.exclude_type,
            )
            return False
        if self.args.exclude_name and re.search(self.args.exclude_name, interface_name):
            logger.info(
                "[-] Skipping interface %s (name matches %s)",
                interface_pretty_name,
                self.args.exclude_name,
            )
            return False

        # Then inclusions, if any
        inclusion_tests = []
        if self.args.type:
            inclusion_tests.append(("type", self.args.type, interface_type in self.args.type))
        if self.args.name:
            # bool() so that the method returns a real boolean, not a Match.
            inclusion_tests.append(
                ("name", self.args.name, bool(re.search(self.args.name, interface_name)))
            )
        if inclusion_tests:
            messages = []
            tests_match = True
            for test_type, test_str, test_result in inclusion_tests:
                verb = "matches" if test_result else "does not match"
                additional_info = f" {interface_type}" if test_type == "type" else ""
                messages.append(f"{test_type}{additional_info} {verb} {test_str}")
                # We must match all conditions
                tests_match = tests_match and test_result
            logger.info(
                "%s interface %s (%s)",
                "[+] Including" if tests_match else "[-] Skipping",
                interface_pretty_name,
                ", ".join(messages),
            )
            return tests_match

        # If there are no inclusions, implicitly include the interface
        logger.info(
            "[+] Including interface %s of type %s (no inclusion filter specified)",
            interface_pretty_name,
            interface_type,
        )
        return True

    def _load_cookie(self, state_file):
        """Load the previous state into ``self.old_state``.

        Generator: yields a "Warn" metadata metric when the state file holds
        no data, and nothing otherwise.
        """
        with nagiosplugin.Cookie(str(state_file)) as cookie:
            self.old_state = cookie
        if self.old_state:
            logger.debug("Loaded old metrics from %s", state_file)
        else:
            yield nagiosplugin.Metric(
                name="Warn",
                value={"message": f"no data in state file {state_file}, first run?"},
                context="metadata",
            )

    @classmethod
    def _save_cookie(cls, state_file, state):
        """Write every key of *state* into the cookie stored at *state_file*."""
        with nagiosplugin.Cookie(str(state_file)) as cookie:
            # We can't just copy the dict to the cookie
            for key, value in state.items():
                cookie[key] = value

    def _probe_interface(self, interface):
        """Record the counters of *interface* and yield its bandwidth metrics.

        The current rx/tx byte counters are always stored in
        ``self.current_state``. Bandwidth metrics named
        ``<interface>_<direction>`` are yielded only when the previous state
        allows computing them; otherwise a "Warn" metadata metric explains
        why (or nothing is yielded when there is no previous state at all).
        """
        interface_name = interface["pretty_ifname"]
        self.current_state["statistics"][interface_name] = {}
        for direction in ("rx", "tx"):
            self.current_state["statistics"][interface_name][direction] = interface["stats64"][
                direction
            ]["bytes"]

        # Cases where we can't compute the bandwidth:
        # 1. no old data, e.g. first run
        if not self.old_state:
            return
        # 2. new interface
        if interface_name not in self.old_state["statistics"]:
            yield nagiosplugin.Metric(
                name="Warn",
                value={"message": f"no data in state file for {interface_name}, new interface?"},
                context="metadata",
            )
            return
        # 3. no time elapsed (or the wall clock went backwards): dividing
        #    would crash or produce meaningless rates
        time_delta = self.current_state["execution_time"] - self.old_state["execution_time"]
        if time_delta <= 0:
            yield nagiosplugin.Metric(
                name="Warn",
                value={
                    "message": f"non-positive time delta ({time_delta:.3f}s) since the last"
                    f" run, cannot compute bandwidth for {interface_name}"
                },
                context="metadata",
            )
            return

        if self.args.bytes:
            unit = "B"
            multiplier = 1
        else:
            # Counters are in bytes; report bits unless --bytes was given.
            unit = "b"
            multiplier = 8
        for direction, current_bytes in self.current_state["statistics"][interface_name].items():
            bandwidth = (
                multiplier
                * (current_bytes - self.old_state["statistics"][interface_name][direction])
                / time_delta
            )
            if bandwidth < 0:
                yield nagiosplugin.Metric(
                    name="Warn",
                    value={
                        "message": f"Counter for {interface_name}/{direction} is decreasing,"
                        " this could be caused by a reboot"
                    },
                    context="metadata",
                )
                # Stop here: remaining directions of this interface are skipped.
                return
            yield nagiosplugin.Metric(
                name=f"{interface_name}_{direction}", value=bandwidth, uom=unit, context=direction
            )

    def probe(self):
        """Yield all metrics of the check and save the new state.

        Raises:
            nagiosplugin.CheckError: no interface was found, or none is left
                after filtering.
        """
        state_file = pathlib.Path(STATE_FILE_PATH) / f".{CHECK_NAME}_{self.args_hash}"
        yield from self._load_cookie(state_file)

        execution_time, interfaces = self._get_interfaces()
        if not interfaces:
            raise nagiosplugin.CheckError("No interfaces found")
        filtered_interfaces = [e for e in interfaces if self._include_interface(e)]
        logger.info(
            "Included interfaces: %s", ", ".join(e["pretty_ifname"] for e in filtered_interfaces)
        )
        if not filtered_interfaces:
            raise nagiosplugin.CheckError("No matching interfaces found after applying filters")

        self.current_state["execution_time"] = execution_time
        for interface in filtered_interfaces:
            yield from self._probe_interface(interface)
        # The state is written only after every interface has been probed.
        self._save_cookie(state_file, self.current_state)
class MetadataContext(nagiosplugin.Context):
    """Context for informational metrics that carry their own state.

    The metric name is looked up as an attribute of ``nagiosplugin.state``
    (e.g. "Warn") and the metric value must be a dict with a "message" key,
    which is used as the hint of the result.
    """

    def evaluate(self, metric, resource):
        """Build a result whose state is named by *metric* itself."""
        resolved_state = getattr(nagiosplugin.state, metric.name)
        message = metric.value["message"]
        return self.result_cls(state=resolved_state, hint=message, metric=metric)
# No traceback display during argument parsing
@nagiosplugin.guarded(verbose=0)
def parse_args():
    """Parse the command line and return the resulting namespace.

    Besides the declared options, the namespace gets four derived attributes:
    ``warning_rx``, ``warning_tx``, ``critical_rx`` and ``critical_tx``. Each
    is None when the matching threshold option is absent, otherwise the
    threshold value scaled by its suffix (powers of 1024 with --bytes,
    powers of 1000 otherwise).
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="enable more verbose output, can be specified multiple times",
    )

    filters = parser.add_argument_group(
        "filtering options, exclusions are applied before inclusions, "
        "increase verbosity for details"
    )
    filters.add_argument(
        "-t",
        "--type",
        action="append",
        default=[],
        help="only select interfaces of this type, can be specified multiple times",
    )
    filters.add_argument(
        "-T",
        "--exclude-type",
        action="append",
        default=[],
        help="exclude interfaces of this type, can be specified multiple times",
    )
    filters.add_argument(
        "-n",
        "--name",
        help="only select interfaces whose names match this regular expression",
    )
    filters.add_argument(
        "-N",
        "--exclude-name",
        help="exclude interfaces whose names match this regular expression",
    )
    filters.add_argument(
        "-d",
        "--down",
        action="store_true",
        default=False,
        help="include interfaces whose operstate is down",
    )
    filters.add_argument(
        "--include-netns",
        action="store_true",
        default=False,
        help=(
            "include interfaces from non-default network namespaces, "
            "the netns will be prepended to the interface name with a slash, "
            'e.g. "container/wg0"'
        ),
    )

    parser.add_argument(
        "-b",
        "--bytes",
        action="store_true",
        default=False,
        help="use bytes as output unit instead of bits, does not affect thresholds",
    )

    thresholds = parser.add_argument_group(
        "threshold options, suffixes K, M, G and T (case-insensitive) are accepted, "
        "multiples of 1000 are used for bits and 1024 for bytes"
    )
    thresholds.add_argument(
        "-w",
        "--warning",
        nargs=2,
        metavar=("RX", "TX"),
        type=human_size,
        help="warning threshold",
    )
    thresholds.add_argument(
        "-c",
        "--critical",
        nargs=2,
        metavar=("RX", "TX"),
        type=human_size,
        help="critical threshold",
    )

    args = parser.parse_args()

    # Expand the (value, power) pairs into absolute per-direction thresholds.
    base = 1024 if args.bytes else 1000
    for level in ("warning", "critical"):
        pairs = getattr(args, level)
        for position, direction in enumerate(("rx", "tx")):
            resolved = None
            if pairs:
                value, power = pairs[position]
                resolved = value * base**power
            setattr(args, f"{level}_{direction}", resolved)
    return args
class TrafficSummary(nagiosplugin.Summary):
    """Summary producing the status text for the traffic check."""

    # Names of the contexts that hold bandwidth metrics.
    _TRAFFIC_CONTEXTS = ("rx", "tx")

    @staticmethod
    def _pretty_rate(metric):
        """Return the metric value with a size prefix (base 1024 for "B")."""
        base = 1024 if metric.uom == "B" else 1000
        return prettify_size(metric.value, base)

    def _is_traffic(self, result):
        """Tell whether *result* belongs to the rx or tx context."""
        return bool(result.context) and result.context.name in self._TRAFFIC_CONTEXTS

    def ok(self, results):
        """Return an empty status line when everything is fine."""
        return ""

    def verbose(self, results):
        """Return one "<name> = <rate><unit>/s" line per bandwidth result."""
        lines = [
            f"{res.metric.name} = {self._pretty_rate(res.metric)}{res.metric.uom}/s"
            for res in results
            if self._is_traffic(res)
        ]
        return "\n".join(lines)

    def problem(self, results):
        """Describe every non-Ok result, separated by commas, worst first."""
        ranked = sorted(results, key=lambda res: res.state, reverse=True)
        parts = []
        for res in ranked:
            if res.state == nagiosplugin.state.Ok:
                continue
            if not self._is_traffic(res):
                # Metadata results only carry a hint.
                parts.append(res.hint)
                continue
            rate = self._pretty_rate(res.metric)
            parts.append(f"{res.metric.name} = {rate}{res.metric.uom}/s ({res.hint})")
        return ", ".join(parts)
@nagiosplugin.guarded
def main(args):
    """Assemble the check from the parsed *args* and run it."""
    # Unique identifier used to store check state: a hash of every argument
    # except the verbosity.
    ignored = ("verbose",)
    relevant_args = [
        (key, value) for key, value in sorted(vars(args).items()) if key not in ignored
    ]
    serialized = pickle.dumps(relevant_args)
    args_hash = hashlib.sha1(serialized).hexdigest()

    components = (
        Traffic(args, args_hash),
        MetadataContext("metadata"),
        nagiosplugin.ScalarContext("rx", args.warning_rx, args.critical_rx),
        nagiosplugin.ScalarContext("tx", args.warning_tx, args.critical_tx),
        TrafficSummary(),
    )
    nagiosplugin.Check(*components).main(args.verbose)
# Script entry point: parse the command line, then run the check.
if __name__ == "__main__":
    main(parse_args())