Skip to content

Commit

Permalink
feat(cli): Regex based IP detection
Browse files Browse the repository at this point in the history
This commit implements regex-based IP detection. It is intended for
log files where column-based detection doesn't work.

See RFC (DigitaleGesellschaft#44) for more information.

Closes DigitaleGesellschaft#44
  • Loading branch information
open-dynaMIX committed Dec 26, 2021
1 parent b516ea9 commit 3257d20
Show file tree
Hide file tree
Showing 3 changed files with 175 additions and 5 deletions.
8 changes: 7 additions & 1 deletion README.md
Expand Up @@ -34,6 +34,7 @@ Using shell redirects, it's also possible to rewrite existing log files.
- Masks IP addresses in log files
- Configurable amount of masked bits
- The column containing the IP address can freely be chosen
- Alternatively use a regex to point anonip to the location(s) of the IP(s). See [this RFC](https://github.com/DigitaleGesellschaft/Anonip/issues/44) for more information.
- Works for both access.log and error.log files

## Officially supported python versions
Expand All @@ -57,7 +58,7 @@ For python versions <3.3:
```
usage: anonip.py [-h] [-4 INTEGER] [-6 INTEGER] [-i INTEGER] [-o FILE]
[--input FILE] [-c INTEGER [INTEGER ...]] [-l STRING]
[-r STRING] [-p] [-d] [-v]
[--regex STRING [STRING ...]] [-r STRING] [-p] [-d] [-v]
Anonip is a tool to anonymize IP-addresses in log files.
Expand All @@ -77,13 +78,18 @@ optional arguments:
default: 1)
-l STRING, --delimiter STRING
log delimiter (default: " ")
--regex STRING [STRING ...]
regex for detecting IP addresses (use optionally instead of -c)
-r STRING, --replace STRING
replacement string in case address parsing fails
(Example: 0.0.0.0)
-p, --skip-private do not mask addresses in private ranges. See IANA
Special-Purpose Address Registry.
-d, --debug print debug messages
-v, --version show program's version number and exit
Example-usage in apache-config:
CustomLog "| /path/to/anonip.py [OPTIONS] --output /path/to/log" combined
```

## Usage
Expand Down
83 changes: 79 additions & 4 deletions anonip.py
Expand Up @@ -41,6 +41,7 @@

import argparse
import logging
import re
import sys
from io import open

Expand Down Expand Up @@ -78,6 +79,7 @@ def __init__(
increment=0,
delimiter=" ",
replace=None,
regex=None,
skip_private=False,
):
"""
Expand All @@ -98,6 +100,7 @@ def __init__(
self.increment = increment
self.delimiter = delimiter
self.replace = replace
self.regex = regex
self.skip_private = skip_private

@property
Expand Down Expand Up @@ -154,6 +157,7 @@ def run(self, input_file=None):
logger.debug("Got line: %r", line)

yield self.process_line(line)

line = input_file.readline()

def process_ip(self, ip):
Expand All @@ -176,9 +180,36 @@ def process_ip(self, ip):
)
return trunc_ip

def process_line(self, line):
def process_line_regex(self, line):
    """
    This function processes a single line based on the provided regex.

    Only the text captured by the regex groups is rewritten. Every other
    part of the line is passed through untouched, even if it happens to
    contain the same characters as one of the captured groups.

    When captured groups overlap (e.g. nested groups), the one starting
    first is rewritten and the overlapping ones are skipped.

    It returns the anonymized log line as string.

    :param line: str
    :return: str
    """
    match = re.match(self.regex, line)
    if not match:
        logger.debug("Regex did not match!")
        return line

    # A set drops duplicate spans. Groups that did not take part in the
    # match (None) or matched the empty string have nothing to anonymize.
    spans = {
        match.span(index)
        for index in range(1, match.re.groups + 1)
        if match.group(index)
    }

    pieces = []
    position = 0
    for start, end in sorted(spans):
        if start < position:
            # Overlaps a span that was already rewritten.
            continue

        captured = line[start:end]
        # NOTE(review): assumes extract_ip returns the address text as it
        # appears inside `captured` plus the parsed IP (falsy if parsing
        # failed) - confirm against extract_ip.
        ip_str, ip = self.extract_ip(captured)
        if ip:
            replacement = captured.replace(ip_str, str(self.process_ip(ip)))
        elif self.replace:
            replacement = self.replace
        else:
            replacement = captured

        pieces.append(line[position:start])
        pieces.append(replacement)
        position = end

    pieces.append(line[position:])
    return "".join(pieces)

def process_line_column(self, line):
"""
This function processes a single line based on the provided columns.
It returns the anonymized log line as string.
Expand Down Expand Up @@ -206,6 +237,18 @@ def process_line(self, line):

return self.delimiter.join(loglist)

def process_line(self, line):
    """
    Anonymize a single log line.

    The line is handed to the regex based handler when a regex is
    configured, otherwise to the column based handler.

    :param line: str
    :return: str
    """
    if self.regex:
        handler = self.process_line_regex
    else:
        handler = self.process_line_column
    return handler(line)

@staticmethod
def extract_ip(column):
"""
Expand Down Expand Up @@ -297,6 +340,18 @@ def _validate_integer_ht_0(value):
return value


def regex_arg_type(value):
    """
    Argparse type callback that checks that value is a compilable regex.

    :param value: str
    :return: str (the unchanged value)
    :raises argparse.ArgumentTypeError: if value cannot be compiled
    """
    try:
        re.compile(value)
    except re.error as err:
        if hasattr(err, "msg"):  # pragma: no cover
            # not available on py27
            reason = "must be a valid regex. Error: {}".format(err.msg)
        else:
            reason = "must be a valid regex."
        raise argparse.ArgumentTypeError(reason)
    else:
        return value


def parse_arguments(args):
"""
Parse all given arguments.
Expand Down Expand Up @@ -350,15 +405,20 @@ def parse_arguments(args):
type=lambda x: _validate_integer_ht_0(x),
help="assume IP address is in column n (1-based indexed; default: 1)",
)
parser.set_defaults(column=[1])
parser.add_argument(
"-l",
"--delimiter",
metavar="STRING",
type=str,
help='log delimiter (default: " ")',
)
parser.set_defaults(delimiter=" ")
parser.add_argument(
"--regex",
metavar="STRING",
nargs="+",
help="regex for detecting IP addresses (use optionally instead of -c)",
type=regex_arg_type,
)
parser.add_argument(
"-r",
"--replace",
Expand All @@ -380,6 +440,20 @@ def parse_arguments(args):

args = parser.parse_args(args)

if args.regex and (args.columns is not None or args.delimiter is not None):
raise parser.error(
'Ambiguous arguments: When using "--regex", "-c" and "-l" can\'t be used.'
)
if not args.regex and args.columns is None:
args.columns = [1]
if not args.regex and args.delimiter is None:
args.delimiter = " "
if args.regex:
try:
args.regex = re.compile(r"|".join(args.regex))
except re.error: # pragma: no cover
raise argparse.ArgumentTypeError("Failed to compile concatenated regex!")

return args


Expand All @@ -402,6 +476,7 @@ def main():
args.increment,
args.delimiter,
args.replace,
args.regex,
args.skip_private,
)

Expand Down
89 changes: 89 additions & 0 deletions tests.py
Expand Up @@ -10,6 +10,7 @@

import argparse
import logging
import re
import sys
from io import StringIO

Expand Down Expand Up @@ -128,6 +129,52 @@ def test_column(line, columns, expected):
assert a.process_line(line) == expected


@pytest.mark.parametrize(
    "line,regex,expected,replace",
    [
        # Alternation where only the first branch takes part in the match.
        (
            '3.3.3.3 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            re.compile(r"(?:^([^,]+) - - |.* - somefixedstring: ([^,]+) - .* - ([^,]+))"),
            '3.3.0.0 - - [20/May/2015:21:05:01 +0000] "GET / HTTP/1.1" 200 13358 "-" "useragent"',
            None,
        ),
        # Single capture group in the middle of the line.
        (
            "blabla/ 3.3.3.3 /blublu",
            re.compile(r"^blabla/ ([^,]+) /blublu"),
            "blabla/ 3.3.0.0 /blublu",
            None,
        ),
        # Several capture groups, each holding a different address.
        (
            "1.1.1.1 - somefixedstring: 2.2.2.2 - some random stuff - 3.3.3.3",
            re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"),
            "1.1.0.0 - somefixedstring: 2.2.0.0 - some random stuff - 3.3.0.0",
            None,
        ),
        # Regex does not match: the line is passed through unchanged.
        (
            "some line that doesn't match the provided regex",
            re.compile(r"^([^,]+) - somefixedstring: ([^,]+) - .* - ([^,]+)"),
            "some line that doesn't match the provided regex",
            None,
        ),
        # Match without a parsable IP and without a replacement string.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ ([^,]+) /blublu"),
            "match but no ip/ notanip /blublu",
            None,
        ),
        # Match without a parsable IP, replacement string configured.
        (
            "match but no ip/ notanip /blublu",
            re.compile(r"^match but no ip/ ([^,]+) /blublu"),
            "match but no ip/ yeah /blublu",
            "yeah",
        ),
    ],
)
def test_regex(line, regex, expected, replace):
    """Check regex based processing of a single line."""
    anonymizer = anonip.Anonip(replace=replace, regex=regex)
    result = anonymizer.process_line(line)
    assert result == expected


def test_replace():
a = anonip.Anonip(replace="replacement")
assert a.process_line("bla something") == "replacement something"
Expand Down Expand Up @@ -178,6 +225,39 @@ def test_cli_generic_args(args, attribute, expected):
assert getattr(anonip.parse_arguments(args), attribute) == expected


@pytest.mark.parametrize(
    "args,success",
    [
        ([], True),
        (["--regex", "test"], True),
        (["-c", "4"], True),
        (["--regex", "test", "-c", "3"], False),
        (["--regex", "test", "-l", ";"], False),
        (["--regex", "test", "-l", ";", "-c", "4"], False),
    ],
)
def test_cli_args_ambiguity(args, success):
    """Combining --regex with -c and/or -l must exit with status 2."""
    if not success:
        with pytest.raises(SystemExit) as excinfo:
            anonip.parse_arguments(args)
        assert excinfo.value.code == 2
        return

    # Unambiguous combinations just have to parse without exiting.
    anonip.parse_arguments(args)


@pytest.mark.parametrize(
    "args,expected",
    [
        (["--regex", "test"], "test"),
        (["--regex", "foo", "bar", "baz"], "foo|bar|baz"),
    ],
)
def test_regex_concat(args, expected):
    """Multiple --regex values end up as one compiled alternation."""
    parsed = anonip.parse_arguments(args)
    wanted = re.compile(expected)
    assert parsed.regex == wanted


@pytest.mark.parametrize(
"value,valid,bits",
[
Expand Down Expand Up @@ -207,6 +287,15 @@ def test_cli_validate_integer_ht_0(value, valid):
anonip._validate_integer_ht_0(value)


@pytest.mark.parametrize("value,valid", [("valid (.*)", True), ("\\9", False)])
def test_regex_arg_type(value, valid):
    """Valid patterns are returned as is, invalid ones are rejected."""
    if not valid:
        with pytest.raises(argparse.ArgumentTypeError):
            anonip.regex_arg_type(value)
        return

    assert anonip.regex_arg_type(value) == value


@pytest.mark.parametrize("to_file", [False, True])
@pytest.mark.parametrize("debug,log_level", [(False, 30), (True, 10)])
def test_main(
Expand Down

0 comments on commit 3257d20

Please sign in to comment.