Skip to content

Commit

Permalink
feat(cli): Regex based IP detection
Browse files Browse the repository at this point in the history
This commit implements regex-based IP detection. It is intended for use
with log files where column-based detection doesn't work.

See RFC (DigitaleGesellschaft#44) for more information.

Closes DigitaleGesellschaft#44
  • Loading branch information
open-dynaMIX committed Nov 4, 2020
1 parent 10270b3 commit a1bec7a
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 12 deletions.
8 changes: 7 additions & 1 deletion README.md
Expand Up @@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files.
- Masks IP addresses in log files
- Configurable amount of masked bits
- The column containing the IP address can freely be chosen
- Alternatively, use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information.
- Works for both access.log- and error.log files

## Officially supported python versions
Expand All @@ -56,7 +57,7 @@ For python versions <3.3:
```
usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE]
[--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING]
[-r STRING] [-p] [-d] [-v]
[--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v]
Anonip is a tool to anonymize IP-addresses in log files.
Expand All @@ -76,13 +77,18 @@ optional arguments:
default: 1)
-l STRING, --delimiter STRING
log delimiter (default: " ")
--regex STRING [STRING ...]
regex for detecting IP addresses (use instead of -c)
-r STRING, --replace STRING
replacement string in case address parsing fails
(Example: 0.0.0.0)
-p, --skip-private do not mask addresses in private ranges. See IANA
Special-Purpose Address Registry.
-d, --debug print debug messages
-v, --version show program's version number and exit
Example-usage in apache-config:
CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined
```

## Usage
Expand Down
77 changes: 72 additions & 5 deletions anonip.py
Expand Up @@ -41,6 +41,7 @@

import argparse
import logging
import re
import sys
from io import open

Expand Down Expand Up @@ -79,6 +80,7 @@ def __init__(
increment=0,
delimiter=" ",
replace=None,
regex=None,
skip_private=False,
):
"""
Expand All @@ -99,8 +101,13 @@ def __init__(
self.increment = increment
self.delimiter = delimiter
self.replace = replace
self.regex = regex
self.skip_private = skip_private

self.process_method = self.process_line_column
if self.regex:
self.process_method = self.process_line_regex

@property
def columns(self):
return self._columns
Expand Down Expand Up @@ -154,7 +161,8 @@ def run(self, input_file=None):

logger.debug("Got line: %r", line)

yield self.process_line(line)
yield self.process_method(line)

line = input_file.readline()

def process_ip(self, ip):
Expand All @@ -177,9 +185,36 @@ def process_ip(self, ip):
)
return trunc_ip

def process_line(self, line):
def process_line_regex(self, line):
    """
    This function processes a single line based on the provided regex.

    The regex is matched against the start of the line. Every non-empty
    capture group of the match is handed to ``self.extract_ip``. If that
    yields an IP, all occurrences of the extracted IP string in the line
    are replaced by the result of ``self.process_ip``. Otherwise, if
    ``self.replace`` is set, all occurrences of the captured text are
    replaced by ``self.replace``.

    It returns the anonymized log line as string. Lines the regex does
    not match are returned unchanged.

    :param line: str
    :return: str
    """
    match = re.match(self.regex, line)
    if not match:
        logger.debug("Regex did not match!")
        return line

    # Drop groups that did not participate in the match (None) or that
    # captured an empty string, and process each distinct value only once.
    candidates = {group for group in match.groups() if group}

    # Replacement is substring based, so the order matters when one captured
    # value is contained in another (e.g. "1.1.1.1" and "1.1.1.10").
    # Iterating the bare set would make the result depend on string hash
    # randomisation; going longest-first (ties broken lexicographically)
    # keeps the output deterministic and handles the longer value before
    # its substring can mangle it.
    for candidate in sorted(candidates, key=lambda g: (-len(g), g)):
        ip_str, ip = self.extract_ip(candidate)
        if ip:
            trunc_ip = self.process_ip(ip)
            line = line.replace(ip_str, str(trunc_ip))
        elif self.replace:
            line = line.replace(candidate, self.replace)

    return line

def process_line_column(self, line):
"""
This function processes a single line.
This function processes a single line based on the provided columns.
It returns the anonymized log line as string.
Expand Down Expand Up @@ -298,6 +333,18 @@ def _validate_integer_ht_0(value):
return value


def regex_arg_type(value):
    """
    Argparse ``type`` callback that validates a regular expression.

    The value is compiled only to check that it is a valid pattern; the
    original string is handed back unchanged.

    :param value: str
    :return: str
    :raises argparse.ArgumentTypeError: if the value cannot be compiled
    """
    try:
        re.compile(value)
    except re.error as err:
        if not hasattr(err, "msg"):
            # py27: re.error carries no `msg` attribute, so no detail
            # can be appended to the message.
            raise argparse.ArgumentTypeError("must be a valid regex.")
        raise argparse.ArgumentTypeError(
            "must be a valid regex. Error: {}".format(err.msg)
        )
    else:
        return value


def parse_arguments(args):
"""
Parse all given arguments.
Expand Down Expand Up @@ -351,15 +398,20 @@ def parse_arguments(args):
type=lambda x: _validate_integer_ht_0(x),
help="assume IP address is in column n (1-based indexed; default: 1)",
)
parser.set_defaults(column=[1])
parser.add_argument(
"-l",
"--delimiter",
metavar="STRING",
type=str,
help='log delimiter (default: " ")',
)
parser.set_defaults(delimiter=" ")
parser.add_argument(
"--regex",
metavar="STRING",
nargs="+",
help="regex for detecting IP addresses (use optionally instead of -c)",
type=regex_arg_type,
)
parser.add_argument(
"-r",
"--replace",
Expand All @@ -381,6 +433,20 @@ def parse_arguments(args):

args = parser.parse_args(args)

if args.regex and (args.columns is not None or args.delimiter is not None):
raise parser.error(
'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.'
)
if not args.regex and args.columns is None:
args.columns = [1]
if not args.regex and args.delimiter is None:
args.delimiter = " "
if args.regex:
try:
args.regex = re.compile(r"|".join(args.regex))
except re.error: # pragma: no cover
raise argparse.ArgumentTypeError("Failed to compile concatenated regex!")

return args


Expand All @@ -402,6 +468,7 @@ def main():
args.increment,
args.delimiter,
args.replace,
args.regex,
args.skip_private,
)

Expand Down
101 changes: 95 additions & 6 deletions tests.py
Expand Up @@ -10,6 +10,7 @@

import argparse
import logging
import re
import sys
from io import StringIO

Expand Down Expand Up @@ -81,7 +82,7 @@
)
def test_process_line(ip, v4mask, v6mask, expected):
a = anonip.Anonip(ipv4mask=v4mask, ipv6mask=v6mask)
assert a.process_line(ip) == expected
assert a.process_line_column(ip) == expected


@pytest.mark.parametrize(
Expand All @@ -93,7 +94,7 @@ def test_process_line(ip, v4mask, v6mask, expected):
)
def test_increment(ip, increment, expected):
a = anonip.Anonip(increment=increment)
assert a.process_line(ip) == expected
assert a.process_line_column(ip) == expected


@pytest.mark.parametrize(
Expand Down Expand Up @@ -128,25 +129,71 @@ def test_increment(ip, increment, expected):
)
def test_column(line, columns, expected):
a = anonip.Anonip(columns=columns)
assert a.process_line(line) == expected
assert a.process_line_column(line) == expected


@pytest.mark.parametrize(
    "line,regex,expected,replace",
    [
        # Alternation: only the first branch matches, the other groups are None.
        (
            '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            re.compile(r"(?:^(.*) - - |.* - somefixedstring: (.*) - .* - (.*))"),
            '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            None,
        ),
        # Single group surrounded by fixed text.
        (
            "blabla/ 3.3.3.3 /blublu",
            re.compile(r"^blabla/ (.*) /blublu"),
            "blabla/ 3.3.0.0 /blublu",
            None,
        ),
        # Several groups, each capturing a different IP.
        (
            "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3",
            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
            "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0",
            None,
        ),
        # Regex does not match: the line passes through untouched.
        (
            "some line that doesn't match the provided regex",
            re.compile(r"^(.*) - somefixedstring: (.*) - .* - (.*)"),
            "some line that doesn't match the provided regex",
            None,
        ),
        # Match, but the group is no IP and no replacement is configured.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ (.*) /blublu"),
            "match but no ip/ notanip /blublu",
            None,
        ),
        # Match, the group is no IP, and the replacement string is applied.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ (.*) /blublu"),
            "match but no ip/ yeah /blublu",
            "yeah",
        ),
    ],
)
def test_regex(line, regex, expected, replace):
    """Regex based processing rewrites the captured groups as expected."""
    anonymizer = anonip.Anonip(regex=regex, replace=replace)
    processed = anonymizer.process_line_regex(line)
    assert processed == expected


def test_replace():
a = anonip.Anonip(replace="replacement")
assert a.process_line("bla something") == "replacement something"
assert a.process_line_column("bla something") == "replacement something"


def test_delimiter():
a = anonip.Anonip(delimiter=";")
assert (
a.process_line("192.168.100.200;some;string;with;öéäü")
a.process_line_column("192.168.100.200;some;string;with;öéäü")
== "192.168.96.0;some;string;with;öéäü"
)


def test_private():
a = anonip.Anonip(skip_private=True)
assert a.process_line("192.168.100.200") == "192.168.100.200"
assert a.process_line_column("192.168.100.200") == "192.168.100.200"


def test_run(monkeypatch):
Expand Down Expand Up @@ -181,6 +228,39 @@ def test_cli_generic_args(args, attribute, expected):
assert getattr(anonip.parse_arguments(args), attribute) == expected


@pytest.mark.parametrize(
    "args,success",
    [
        ([], True),
        (["--regex", "test"], True),
        (["-c", "4"], True),
        (["--regex", "test", "-c", "3"], False),
        (["--regex", "test", "-l", ";"], False),
        (["--regex", "test", "-l", ";", "-c", "4"], False),
    ],
)
def test_cli_args_ambiguity(args, success):
    """Combining --regex with -c and/or -l exits with status code 2."""
    if not success:
        with pytest.raises(SystemExit) as exc_info:
            anonip.parse_arguments(args)
        assert exc_info.value.code == 2
        return

    # Unambiguous argument sets only need to parse without exiting.
    anonip.parse_arguments(args)


@pytest.mark.parametrize(
    "args,expected",
    [
        (["--regex", "test"], "test"),
        (["--regex", "foo", "bar", "baz"], "foo|bar|baz"),
    ],
)
def test_regex_concat(args, expected):
    """Multiple --regex values end up as one compiled, "|"-joined pattern."""
    parsed = anonip.parse_arguments(args)
    expected_pattern = re.compile(expected)
    assert parsed.regex == expected_pattern


@pytest.mark.parametrize(
"value,valid,bits",
[
Expand Down Expand Up @@ -210,6 +290,15 @@ def test_cli_validate_integer_ht_0(value, valid):
anonip._validate_integer_ht_0(value)


@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)])
def test_regex_arg_type(value, valid):
    """Valid patterns are returned as-is; invalid ones raise ArgumentTypeError."""
    if not valid:
        with pytest.raises(argparse.ArgumentTypeError):
            anonip.regex_arg_type(value)
        return

    assert anonip.regex_arg_type(value) == value


@pytest.mark.parametrize("to_file", [False, True])
@pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)])
def test_main(
Expand Down

0 comments on commit a1bec7a

Please sign in to comment.