From a1bec7afd02d6307fb59df5a7c2d894e9f341c49 Mon Sep 17 00:00:00 2001 From: Fabio Ambauen <1833932+open-dynaMIX@users.noreply.github.com> Date: Mon, 19 Oct 2020 16:05:33 +0200 Subject: [PATCH] feat(cli): Regex based IP detection This commit implements regex-based IP detection. It is intended for log files where column-based detection doesn't work. See RFC (#44) for more information. Closes #44 --- README.md | 8 ++++- anonip.py | 77 ++++++++++++++++++++++++++++++++++++++--- tests.py | 101 ++++++++++++++++++++++++++++++++++++++++++++++++++---- 3 files changed, 174 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 81dfabf..08d85ea 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files. - Masks IP addresses in log files - Configurable amount of masked bits - The column containing the IP address can freely be chosen + - Alternatively use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information. - Works for both access.log- and error.log files ## Officially supported python versions @@ -56,7 +57,7 @@ For python versions <3.3: ``` usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE] [--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING] - [-r STRING] [-p] [-d] [-v] + [--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v] Anonip is a tool to anonymize IP-addresses in log files. @@ -76,6 +77,8 @@ optional arguments: default: 1) -l STRING, --delimiter STRING log delimiter (default: " ") + --regex STRING [STRING ...] + regex for detecting IP addresses (use instead of -c) -r STRING, --replace STRING replacement string in case address parsing fails (Example: 0.0.0.0) @@ -83,6 +86,9 @@ optional arguments: Special-Purpose Address Registry. 
-d, --debug print debug messages -v, --version show program's version number and exit + +Example-usage in apache-config: +CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined ``` ## Usage diff --git a/anonip.py b/anonip.py index 1f93677..c232a95 100755 --- a/anonip.py +++ b/anonip.py @@ -41,6 +41,7 @@ import argparse import logging +import re import sys from io import open @@ -79,6 +80,7 @@ def __init__( increment=0, delimiter=" ", replace=None, + regex=None, skip_private=False, ): """ @@ -99,8 +101,13 @@ def __init__( self.increment = increment self.delimiter = delimiter self.replace = replace + self.regex = regex self.skip_private = skip_private + self.process_method = self.process_line_column + if self.regex: + self.process_method = self.process_line_regex + @property def columns(self): return self._columns @@ -154,7 +161,8 @@ def run(self, input_file=None): logger.debug("Got line: %r", line) - yield self.process_line(line) + yield self.process_method(line) + line = input_file.readline() def process_ip(self, ip): @@ -177,9 +185,36 @@ def process_ip(self, ip): ) return trunc_ip - def process_line(self, line): + def process_line_regex(self, line): + """ + This function processes a single line based on the provided regex. + + It returns the anonymized log line as string. + + :param line: str + :return: str + """ + match = re.match(self.regex, line) + if not match: + logger.debug("Regex did not match!") + return line + groups = match.groups() + + for m in set(groups): + if not m: + continue + ip_str, ip = self.extract_ip(m) + if ip: + trunc_ip = self.process_ip(ip) + line = line.replace(ip_str, str(trunc_ip)) + elif self.replace: + line = line.replace(m, self.replace) + + return line + + def process_line_column(self, line): """ - This function processes a single line. + This function processes a single line based on the provided columns. It returns the anonymized log line as string. 
@@ -298,6 +333,18 @@ def _validate_integer_ht_0(value): return value +def regex_arg_type(value): + try: + re.compile(value) + except re.error as e: + msg = "must be a valid regex." + if hasattr(e, "msg"): # pragma: no cover + # not available on py27 + msg = "must be a valid regex. Error: {}".format(e.msg) + raise argparse.ArgumentTypeError(msg) + return value + + def parse_arguments(args): """ Parse all given arguments. @@ -351,7 +398,6 @@ def parse_arguments(args): type=lambda x: _validate_integer_ht_0(x), help="assume IP address is in column n (1-based indexed; default: 1)", ) - parser.set_defaults(column=[1]) parser.add_argument( "-l", "--delimiter", @@ -359,7 +405,13 @@ def parse_arguments(args): type=str, help='log delimiter (default: " ")', ) - parser.set_defaults(delimiter=" ") + parser.add_argument( + "--regex", + metavar="STRING", + nargs="+", + help="regex for detecting IP addresses (use optionally instead of -c)", + type=regex_arg_type, + ) parser.add_argument( "-r", "--replace", @@ -381,6 +433,20 @@ def parse_arguments(args): args = parser.parse_args(args) + if args.regex and (args.columns is not None or args.delimiter is not None): + raise parser.error( + 'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.' 
+ ) + if not args.regex and args.columns is None: + args.columns = [1] + if not args.regex and args.delimiter is None: + args.delimiter = " " + if args.regex: + try: + args.regex = re.compile(r"|".join(args.regex)) + except re.error: # pragma: no cover + raise argparse.ArgumentTypeError("Failed to compile concatenated regex!") + return args @@ -402,6 +468,7 @@ def main(): args.increment, args.delimiter, args.replace, + args.regex, args.skip_private, ) diff --git a/tests.py b/tests.py index 7762f0b..c477a36 100755 --- a/tests.py +++ b/tests.py @@ -10,6 +10,7 @@ import argparse import logging +import re import sys from io import StringIO @@ -81,7 +82,7 @@ ) def test_process_line(ip, v4mask, v6mask, expected): a = anonip.Anonip(ipv4mask=v4mask, ipv6mask=v6mask) - assert a.process_line(ip) == expected + assert a.process_line_column(ip) == expected @pytest.mark.parametrize( @@ -93,7 +94,7 @@ def test_process_line(ip, v4mask, v6mask, expected): ) def test_increment(ip, increment, expected): a = anonip.Anonip(increment=increment) - assert a.process_line(ip) == expected + assert a.process_line_column(ip) == expected @pytest.mark.parametrize( @@ -128,25 +129,71 @@ def test_increment(ip, increment, expected): ) def test_column(line, columns, expected): a = anonip.Anonip(columns=columns) - assert a.process_line(line) == expected + assert a.process_line_column(line) == expected + + +@pytest.mark.parametrize( + "line,regex,expected,replace", + [ + ( + '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"', + re.compile(r"(?:^(.*) - - |.* - somefixedstring: (.*) - .* - (.*))"), + '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"', + None, + ), + ( + "blabla/ 3.3.3.3 /blublu", + re.compile(r"^blabla/ (.*) /blublu"), + "blabla/ 3.3.0.0 /blublu", + None, + ), + ( + "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3", + re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"), + "1.1.0.0 - 
somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0", + None, + ), + ( + "some line that doesn't match the provided regex", + re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"), + "some line that doesn't match the provided regex", + None, + ), + ( + "match but no ip/ notanip /blublu", + re.compile(r"^match but no ip/ (.*) /blublu"), + "match but no ip/ notanip /blublu", + None, + ), + ( + "match but no ip/ notanip /blublu", + re.compile(r"^match but no ip/ (.*) /blublu"), + "match but no ip/ yeah /blublu", + "yeah", + ), + ], +) +def test_regex(line, regex, expected, replace): + a = anonip.Anonip(regex=regex, replace=replace) + assert a.process_line_regex(line) == expected def test_replace(): a = anonip.Anonip(replace="replacement") - assert a.process_line("bla something") == "replacement something" + assert a.process_line_column("bla something") == "replacement something" def test_delimiter(): a = anonip.Anonip(delimiter=";") assert ( - a.process_line("192.168.100.200;some;string;with;öéäü") + a.process_line_column("192.168.100.200;some;string;with;öéäü") == "192.168.96.0;some;string;with;öéäü" ) def test_private(): a = anonip.Anonip(skip_private=True) - assert a.process_line("192.168.100.200") == "192.168.100.200" + assert a.process_line_column("192.168.100.200") == "192.168.100.200" def test_run(monkeypatch): @@ -181,6 +228,39 @@ def test_cli_generic_args(args, attribute, expected): assert getattr(anonip.parse_arguments(args), attribute) == expected +@pytest.mark.parametrize( + "args,success", + [ + ([], True), + (["--regex", "test"], True), + (["-c", "4"], True), + (["--regex", "test", "-c", "3"], False), + (["--regex", "test", "-l", ";"], False), + (["--regex", "test", "-l", ";", "-c", "4"], False), + ], +) +def test_cli_args_ambiguity(args, success): + if success: + anonip.parse_arguments(args) + return + + with pytest.raises(SystemExit) as e: + anonip.parse_arguments(args) + assert e.value.code == 2 + + +@pytest.mark.parametrize( + "args,expected", 
+ [ + (["--regex", "test"], "test"), + (["--regex", "foo", "bar", "baz"], "foo|bar|baz"), + ], +) +def test_regex_concat(args, expected): + args = anonip.parse_arguments(args) + assert args.regex == re.compile(expected) + + @pytest.mark.parametrize( "value,valid,bits", [ @@ -210,6 +290,15 @@ def test_cli_validate_integer_ht_0(value, valid): anonip._validate_integer_ht_0(value) +@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)]) +def test_regex_arg_type(value, valid): + if valid: + assert anonip.regex_arg_type(value) == value + else: + with pytest.raises(argparse.ArgumentTypeError): + anonip.regex_arg_type(value) + + @pytest.mark.parametrize("to_file", [False, True]) @pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)]) def test_main(