Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions maskerlogger/ahocorasick_regex_match.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,7 @@ def _get_match_regex(
) -> list[re.Match[str]]:
matches: list[re.Match[str]] = []
for regex in matched_regex:
if match := regex.search(line):
matches.append(match)
matches.extend(regex.finditer(line))
return matches

def match_regex_to_line(self, line: str) -> list[re.Match[str]] | None:
Expand Down
57 changes: 46 additions & 11 deletions maskerlogger/masker_formatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,18 +59,53 @@ def _validate_redact(redact: int | str) -> int:

def _mask_secret(self, msg: str, matches: list[re.Match]) -> str:
    """Mask the sensitive spans of ``msg`` identified by ``matches``.

    For each match, every non-empty capture group is redacted. When a match
    has no capture groups, or none of them captured any text, the whole
    match (group 0) is redacted instead. Redacting a span replaces its
    leading ``self.redact`` percent (rounded down) with asterisks.

    Masking is applied by character position (``match.start`` /
    ``match.end``) rather than by searching for the secret's text, so a
    secret that also appears elsewhere in the message, repeated matches and
    overlapping matches are all handled without shifting offsets.

    Args:
        msg: The log message the matches were produced from.
        matches: Regex match objects whose offsets index into ``msg``.

    Returns:
        ``msg`` with the selected characters replaced by ``*``. The message
        is returned unchanged when ``matches`` is empty.
    """
    if not matches:
        return msg

    # Work on a mutable copy; writing "*" twice to the same position is
    # harmless, which is what makes overlapping matches safe.
    chars = list(msg)

    def redact_span(text: str, start: int, end: int) -> None:
        """Overwrite the leading redact-percent of ``chars[start:end]``."""
        redact_length = int((len(text) / 100) * self.redact)
        # Never write past the span itself or past the end of the message.
        stop = min(start + redact_length, end, len(chars))
        if stop > start:
            chars[start:stop] = "*" * (stop - start)

    for match in matches:
        masked_group = False

        # Capture groups are numbered from 1; group 0 is the whole match.
        for group_index in range(1, len(match.groups()) + 1):
            group = match.group(group_index)
            if group:  # Skip groups that did not participate or are empty
                redact_span(group, match.start(group_index), match.end(group_index))
                masked_group = True

        # No capture groups, or all of them None/empty: mask the whole match.
        if not masked_group:
            full_match = match.group(0)
            if full_match:
                redact_span(full_match, match.start(0), match.end(0))

    return "".join(chars)

def _mask_sensitive_data(self, record: logging.LogRecord) -> None:
"""Applies masking to the sensitive data in the log message."""
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "maskerlogger"
version = "1.1.0b1"
version = "1.1.0b2"
description = "mask your secrets from your logs"
authors = [
{name = "Tamar Galer", email = "tamar@ox.security"},
Expand Down
270 changes: 270 additions & 0 deletions tests/test_masked_logger.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,273 @@ def test_redact_validation_type_conversion():
# Test invalid string values
with pytest.raises(ValueError, match="Redact value must be between 0 and 100"):
MaskerFormatter(fmt="%(message)s", redact="150")


def test_masked_logger_multiple_leaks_same_string(logger_and_log_stream, log_format):
    """
    Check that every occurrence of a leak on one log line is masked, not only the first.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    test_logger, stream = logger_and_log_stream
    test_logger.handlers[0].setFormatter(MaskerFormatter(fmt=log_format))

    # Three password fields, two of which carry the same 10+ character value
    test_logger.info(
        "First password=secretpassword and second password=anothersecret and third password=secretpassword"
    )

    rendered = stream.getvalue().strip()

    # The field names survive while none of the values do
    assert "password=" in rendered
    for leaked_value in ("secretpassword", "anothersecret"):
        assert leaked_value not in rendered

    # All three fields must still be present in the output
    password_count = rendered.count("password=")
    assert password_count == 3, f"Expected 3 password fields, found {password_count}"


def test_masked_logger_multiple_different_leaks_same_string(logger_and_log_stream, log_format):
    """
    Check that different kinds of leaks appearing on one log line are all masked.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    test_logger, stream = logger_and_log_stream
    test_logger.handlers[0].setFormatter(MaskerFormatter(fmt=log_format))

    # Two password fields and one token field, each with a 10+ character value
    test_logger.info(
        "User data: password=mysecretpassword and token=abc123tokenlong and password=anothersecret"
    )

    rendered = stream.getvalue().strip()

    # Field names remain visible
    for field_name in ("password=", "token="):
        assert field_name in rendered

    # None of the secret values may leak
    for leaked_value in ("mysecretpassword", "anothersecret", "abc123tokenlong"):
        assert leaked_value not in rendered


def test_masked_logger_overlapping_matches(logger_and_log_stream, log_format):
    """
    Check that secrets are masked when several patterns may match overlapping spans.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    test_logger, stream = logger_and_log_stream
    test_logger.handlers[0].setFormatter(MaskerFormatter(fmt=log_format))

    # A line that may be hit by more than one pattern at once
    test_logger.info("Auth data: token=secrettoken123456 and password=overlappingsecretkey")

    rendered = stream.getvalue().strip()

    # Whatever the patterns capture, the secret values themselves must be gone
    for leaked_value in ("secrettoken123456", "overlappingsecretkey"):
        assert leaked_value not in rendered

    # Patterns differ in whether they capture the "key=" prefix; for the
    # password field the prefix is expected to stay readable.
    assert "password=" in rendered


def test_masked_logger_empty_capture_groups(logger_and_log_stream, log_format):
    """
    Check that a secret is still masked when the pattern's capture groups may all be None/empty.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    test_logger, stream = logger_and_log_stream
    test_logger.handlers[0].setFormatter(MaskerFormatter(fmt=log_format))

    # An AWS-access-key-shaped value; the matching pattern may leave its
    # optional capture groups unset.
    test_logger.info("API key: AKIAIOSFODNN7EXAMPLE")

    rendered = stream.getvalue().strip()

    # The key must not appear verbatim, and some masking must have happened
    assert "AKIAIOSFODNN7EXAMPLE" not in rendered
    assert "*" in rendered


def test_masked_logger_no_capture_groups_fallback(logger_and_log_stream, log_format):
    """
    Check that a pattern without capture groups falls back to masking the whole match (group 0).

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    test_logger, stream = logger_and_log_stream
    test_logger.handlers[0].setFormatter(MaskerFormatter(fmt=log_format))

    # A JWT-shaped value; the pattern that matches it is expected to have
    # no capture groups, which exercises the group-0 fallback.
    test_logger.info(
        "JWT: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c"
    )

    rendered = stream.getvalue().strip()

    # The JWT header segment must not survive, and masking must be visible
    assert "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9" not in rendered
    assert "*" in rendered


def test_masked_logger_all_capture_groups_none(logger_and_log_stream, log_format):
    """
    Test the fallback to group 0 when all capture groups are None.

    The match is mocked so that ``groups()`` returns only ``None`` values
    while group 0 spans the secret. The span offsets are derived from the
    message itself so the mock always describes the secret exactly.

    Args:
        logger_and_log_stream (tuple): Unused; kept so the test signature is unchanged.
        log_format: Format string handed to MaskerFormatter.
    """
    import re
    from unittest.mock import Mock

    formatter = MaskerFormatter(fmt=log_format)

    secret = "sensitivedata12345"
    test_message = f"Some text {secret} more text"
    secret_start = test_message.index(secret)
    secret_end = secret_start + len(secret)

    # Mock a match object that has capture groups but they're all None
    mock_match = Mock(spec=re.Match)
    mock_match.groups.return_value = (None, None)  # Two capture groups, both None
    mock_match.group.side_effect = lambda i=0: secret if i == 0 else None
    mock_match.start.side_effect = lambda i=0: secret_start if i == 0 else -1
    mock_match.end.side_effect = lambda i=0: secret_end if i == 0 else -1

    # Test the _mask_secret method directly with our mock match
    result = formatter._mask_secret(test_message, [mock_match])

    # Should fall back to masking the entire match since all capture groups are None
    assert secret not in result
    assert "*" in result

    # Masking is positional: text outside the matched span must be untouched
    assert len(result) == len(test_message)
    assert result[:secret_start] == test_message[:secret_start]
    assert result[secret_end:] == test_message[secret_end:]


def test_masked_logger_all_capture_groups_empty(logger_and_log_stream, log_format):
    """
    Check the group-0 fallback when every capture group is an empty string.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format: Format string handed to MaskerFormatter.
    """
    import re
    from unittest.mock import Mock

    _logger, _stream = logger_and_log_stream

    masker = MaskerFormatter(fmt=log_format)

    # A fake match: two capture groups that captured nothing, while the
    # whole match (group 0) covers the secret at offsets 5..21.
    fake_match = Mock(spec=re.Match)
    fake_match.groups.return_value = ("", "")
    fake_match.group.side_effect = lambda idx=0: "anothersecret123" if idx == 0 else ""
    fake_match.start.side_effect = lambda idx=0: 5 if idx == 0 else -1
    fake_match.end.side_effect = lambda idx=0: 21 if idx == 0 else -1

    # Call _mask_secret directly with the fake match
    masked = masker._mask_secret("Data anothersecret123 end", [fake_match])

    # With nothing captured, the entire match must have been masked
    assert "anothersecret123" not in masked
    assert "*" in masked


def test_masked_logger_fallback_with_different_redact_percentages():
    """
    Test the fallback masking with different redact percentages to ensure
    the redact_length calculation works correctly in the fallback code path.

    Each case lists the redact percentage, the secret, and the exact masked
    output expected. The expectation is spelled out literally rather than
    recomputed with the same formula as the code under test, so a wrong
    formula cannot make the test pass by agreeing with itself.
    """
    import re
    from unittest.mock import Mock

    test_cases = [
        (0, "testsecret1234", "testsecret1234"),  # 0% should not mask anything
        (50, "testsecret1234", "*******ret1234"),  # 50% masks the first 7 of 14 chars
        (100, "testsecret1234", "**************"),  # 100% should mask everything
    ]

    for redact_percent, secret, expected_result in test_cases:
        formatter = MaskerFormatter(fmt="%(message)s", redact=redact_percent)

        # Mock a match with no valid capture groups; the secret is bound as
        # a default argument so each lambda keeps its own case's value.
        mock_match = Mock(spec=re.Match)
        mock_match.groups.return_value = (None,)
        mock_match.group.side_effect = lambda i=0, s=secret: s if i == 0 else None
        mock_match.start.side_effect = lambda i=0: 0 if i == 0 else -1
        mock_match.end.side_effect = lambda i=0, s=secret: len(s) if i == 0 else -1

        result = formatter._mask_secret(secret, [mock_match])

        assert result == expected_result, (
            f"Redact {redact_percent}%: expected {expected_result}, got {result}"
        )


def test_masked_logger_mixed_capture_groups_fallback():
    """
    Check that group-based masking and the group-0 fallback work together
    when one call receives a match of each kind.
    """
    import re
    from unittest.mock import Mock

    def fake_match(groups, whole, whole_span, inner, inner_span):
        """Build a mocked re.Match answering for group 0 and for any other group index."""
        stub = Mock(spec=re.Match)
        stub.groups.return_value = groups
        stub.group.side_effect = lambda i=0: whole if i == 0 else inner
        stub.start.side_effect = lambda i=0: whole_span[0] if i == 0 else inner_span[0]
        stub.end.side_effect = lambda i=0: whole_span[1] if i == 0 else inner_span[1]
        return stub

    masker = MaskerFormatter(fmt="%(message)s")

    # A match whose single capture group holds the secret at offsets 4..17
    with_group = fake_match(
        ("validgroup123",), "key=validgroup123", (0, 17), "validgroup123", (4, 17)
    )

    # A match whose capture groups are all None, so group 0 must be used
    needs_fallback = fake_match((None, None), "fallbacksecret", (20, 34), None, (-1, -1))

    masked = masker._mask_secret(
        "key=validgroup123 : fallbacksecret end", [with_group, needs_fallback]
    )

    # Neither secret may survive, and masking must be visible
    for leaked_value in ("validgroup123", "fallbacksecret"):
        assert leaked_value not in masked
    assert "*" in masked