Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/flake.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,4 @@ jobs:
pip install flake8
- name: Analysing the code with flake8
run: |
flake8 $(git ls-files '*.py')
flake8 $(git ls-files '*.py') --max-line-length 100
36 changes: 36 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# CI workflow: installs the project with Poetry and runs the pytest suite
# on pushes to main and on every pull request.
name: Run Tests

on:
  push:
    branches:
      - main
  pull_request:
    branches:
      # '**' matches every target branch, including names that contain '/'
      # (a single '*' stops at '/', so e.g. 'release/1.0' would be skipped).
      - '**'

jobs:
  test:
    runs-on: ubuntu-latest

    strategy:
      matrix:
        # Versions are quoted so YAML keeps them as strings; an unquoted
        # 3.10 would be parsed as the float 3.1.
        python-version: ['3.11', '3.8']

    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v4
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install poetry
          poetry install

      - name: Run tests
        run: |
          poetry run pytest tests/
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ coverage.xml
.venv
env/
venv/
/site
/site
.idea/
4 changes: 2 additions & 2 deletions maskerlogger/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
"""
Init file for the maskerlogger package.
"""
from maskerlogger.masker_formatter import MaskerFormatter # noqa
__version__ = '0.3.0'
from maskerlogger.masker_formatter import MaskerFormatter, MaskerFormatterJson # noqa
__version__ = '0.4.0-beta.1'
9 changes: 6 additions & 3 deletions maskerlogger/ahocorasick_regex_match.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import tomllib
import tomli as toml
import re
from typing import List
import ahocorasick
Expand All @@ -21,18 +21,21 @@ def _initialize_automaton(self) -> ahocorasick.Automaton:
keyword_automaton.make_automaton()
return keyword_automaton

def _load_config(self, config_path: str) -> dict:
@staticmethod
def _load_config(config_path: str) -> dict:
with open(config_path, 'rb') as f:
return tomllib.load(f)
return toml.load(f)

def _extract_keywords_and_patterns(self, config) -> dict:
keyword_to_patterns = {}
for rule in config['rules']:
for keyword in rule.get('keywords', []):
if keyword not in keyword_to_patterns:
keyword_to_patterns[keyword] = []

keyword_to_patterns[keyword].append(self._get_compiled_regex(
rule['regex']))

return keyword_to_patterns

def _get_compiled_regex(self, regex: str) -> str:
Expand Down
96 changes: 79 additions & 17 deletions maskerlogger/masker_formatter.py
Original file line number Diff line number Diff line change
@@ -1,44 +1,106 @@
import logging
import os
import re
from abc import ABC
from typing import List

from pythonjsonlogger import jsonlogger

from maskerlogger.ahocorasick_regex_match import RegexMatcher
import os

DEFAULT_SECRETS_CONFIG_PATH = os.path.join(
os.path.dirname(__file__), "config/gitleaks.toml")
os.path.dirname(__file__), "config/gitleaks.toml"
)
_APPLY_MASK = 'apply_mask'
SKIP_MASK = {_APPLY_MASK: False}


class MaskerFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, style='%', validate=True,
defaults=None,
regex_config_path=DEFAULT_SECRETS_CONFIG_PATH,
redact=100):
super().__init__(fmt, datefmt, style, validate=validate,
defaults=defaults)
self.redact = self._validate_redact(redact)
class AbstractMaskedLogger(ABC):
def __init__(
    self,
    regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
    redact=100
):
    """Initializes the AbstractMaskedLogger.

    Args:
        regex_config_path (str): Path to the configuration file for regex patterns.
        redact (int): Percentage of the sensitive data to redact (0-100).

    Raises:
        ValueError: If ``redact`` is outside the 0-100 range.
    """
    # Validate before loading the rule file so a bad percentage fails fast;
    # storing the raw value would let out-of-range input through to the masking math.
    self.redact = self._validate_redact(redact)
    self.regex_matcher = RegexMatcher(regex_config_path)

def _validate_redact(self, redact: int) -> int:
@staticmethod
def _validate_redact(redact: int) -> int:
if not (0 <= int(redact) <= 100):
raise ValueError("Redact value must be between 0 and 100")
return int(redact)

def format(self, record: logging.LogRecord) -> str:
if getattr(record, _APPLY_MASK, True):
self._mask_sensitive_data(record)
return super().format(record)
return int(redact)

def _mask_secret(self, msg: str, matches: List[re.Match]) -> str:
"""Masks the sensitive data in the log message."""
for match in matches:
match_groups = match.groups() if match.groups() else [match.group()] # noqa
for group in match_groups:
redact_length = int((len(group) / 100) * self.redact)
msg = msg.replace(
group[:redact_length], "*" * redact_length, 1)
return msg

return msg

def _mask_sensitive_data(self, record: logging.LogRecord) -> None:
"""Applies masking to the sensitive data in the log message."""
if found_matching_regex := self.regex_matcher.match_regex_to_line(record.msg): # noqa
record.msg = self._mask_secret(record.msg, found_matching_regex)


# Normal Masked Logger - Text-Based Log Formatter
class MaskerFormatter(logging.Formatter, AbstractMaskedLogger):
    """Text log formatter that masks sensitive data before formatting."""

    def __init__(
        self,
        fmt: str = None,
        regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
        redact=100,
        *,
        datefmt=None,
        style='%',
        validate=True,
        defaults=None
    ):
        """Initializes the MaskerFormatter.

        The standard ``logging.Formatter`` options are keyword-only so the
        positional order (fmt, regex_config_path, redact) stays unchanged.

        Args:
            fmt (str): Format string for the logger; None uses the logging default.
            regex_config_path (str): Path to the configuration file for regex patterns.
            redact (int): Percentage of the sensitive data to redact.
            datefmt: Date format passed to ``logging.Formatter``.
            style: Format style passed to ``logging.Formatter``.
            validate: Whether ``logging.Formatter`` validates ``fmt`` against ``style``.
            defaults: Default field values; only forwarded when given because
                ``logging.Formatter`` accepts it on Python 3.10+ only.
        """
        formatter_options = {
            'datefmt': datefmt, 'style': style, 'validate': validate}
        if defaults is not None:
            formatter_options['defaults'] = defaults
        logging.Formatter.__init__(self, fmt, **formatter_options)
        AbstractMaskedLogger.__init__(self, regex_config_path, redact)

    def format(self, record: logging.LogRecord) -> str:
        """Formats the log record as text and applies masking.

        Masking is skipped when the record carries ``apply_mask=False``
        (for example via ``extra=SKIP_MASK``).
        """
        if getattr(record, _APPLY_MASK, True):
            self._mask_sensitive_data(record)

        return super().format(record)


# JSON Masked Logger - JSON-Based Log Formatter
class MaskerFormatterJson(jsonlogger.JsonFormatter, AbstractMaskedLogger):
    """JSON log formatter that masks sensitive data before formatting."""

    def __init__(
        self,
        fmt: str = None,
        regex_config_path: str = DEFAULT_SECRETS_CONFIG_PATH,
        redact=100,
        **json_formatter_options
    ):
        """Initializes the MaskerFormatterJson.

        Args:
            fmt (str): Format string for the logger.
            regex_config_path (str): Path to the configuration file for regex patterns.
            redact (int): Percentage of the sensitive data to redact.
            **json_formatter_options: Extra keyword arguments forwarded unchanged to
                ``jsonlogger.JsonFormatter`` (its own options and the standard
                formatter options such as ``datefmt``).
        """
        jsonlogger.JsonFormatter.__init__(self, fmt, **json_formatter_options)
        AbstractMaskedLogger.__init__(self, regex_config_path, redact)

    def format(self, record: logging.LogRecord) -> str:
        """Formats the log record as JSON and applies masking.

        Masking is skipped when the record carries ``apply_mask=False``
        (for example via ``extra=SKIP_MASK``).
        """
        if getattr(record, _APPLY_MASK, True):
            self._mask_sensitive_data(record)

        return super().format(record)
9 changes: 7 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,14 +1,19 @@
[tool.poetry]
name = "maskerlogger"
version = "0.3.0"
version = "0.4.0-beta.1"
description = "mask your secrets from your logs"
authors = ["Tamar Galer <tamar@ox.security>"]
readme = "README.md"
packages = [{include = "maskerlogger"}]

[tool.poetry.dependencies]
python = "^3.11"
python = ">=3.8"
pyahocorasick = "^2.1.0"
python-json-logger = "^2.0.7"
tomli = "^2.1.1"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"

[tool.poetry.urls]
Source = "https://github.com/oxsecurity/MaskerLogger"
Expand Down
Empty file added tests/__init__.py
Empty file.
158 changes: 158 additions & 0 deletions tests/test_masked_logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import logging
import pytest
import json
from io import StringIO
from maskerlogger import MaskerFormatter, MaskerFormatterJson


@pytest.fixture
def logger_and_log_stream():
    """
    Pytest fixture to set up the logger and a StringIO stream for capturing log output.

    The handler is detached and the stream closed once the test finishes, so nothing
    stays attached to the shared 'test_logger' logger between tests.

    Yields:
        tuple: A logger instance and a StringIO object to capture the log output.
    """
    logger = logging.getLogger('test_logger')
    logger.setLevel(logging.DEBUG)
    logger.handlers.clear()
    log_stream = StringIO()

    # Create console handler writing into the in-memory stream; each test
    # installs its own formatter on it.
    console_handler = logging.StreamHandler(log_stream)
    logger.addHandler(console_handler)

    yield logger, log_stream

    # Teardown: undo the global logger mutation and release the buffer.
    logger.removeHandler(console_handler)
    console_handler.close()
    log_stream.close()


@pytest.fixture
def log_format():
    """Provide the %-style format string shared by the formatter tests."""
    fmt = "%(asctime)s %(name)s %(levelname)s %(message)s"
    return fmt


def test_masked_logger_text(logger_and_log_stream, log_format):
    """
    Check that MaskerFormatter emits plain-text logs with the password value masked.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format (str): Format string handed to the formatter.
    """
    test_logger, captured = logger_and_log_stream

    # Install the text formatter on the fixture's only handler
    handler = test_logger.handlers[0]
    handler.setFormatter(MaskerFormatter(fmt=log_format))

    # Emit a message containing a secret
    test_logger.info("User login with password=secretpassword")

    rendered = captured.getvalue().strip()

    # The secret must be replaced by asterisks and never appear verbatim
    assert "password=*****" in rendered
    assert "secretpassword" not in rendered


def test_masked_logger_json(logger_and_log_stream, log_format):
    """
    Check that MaskerFormatterJson emits JSON logs with the password value masked.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format (str): Format string handed to the formatter.
    """
    test_logger, captured = logger_and_log_stream

    # Install the JSON formatter on the fixture's only handler
    handler = test_logger.handlers[0]
    handler.setFormatter(MaskerFormatterJson(fmt=log_format))

    # Emit a message containing a secret
    test_logger.info("User login with password=secretpassword")

    # The captured output must be a single valid JSON document
    payload = json.loads(captured.getvalue().strip())
    message = payload["message"]

    # The secret must be replaced by asterisks and never appear verbatim
    assert "password=*****" in message
    assert "secretpassword" not in message


def test_masked_logger_text_format_after_masking(logger_and_log_stream, log_format):
    """
    Test that MaskerFormatter outputs correctly formatted text logs after applying data masking.
    Ensures that sensitive data is masked and log format remains valid.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format (str): Format string handed to the formatter.
    """
    logger, log_stream = logger_and_log_stream

    # Set the MaskerFormatter formatter
    formatter = MaskerFormatter(fmt=log_format)
    logger.handlers[0].setFormatter(formatter)

    # Log a sensitive message
    logger.info("Sensitive data: password=secretpassword and other info")

    # Read the log output
    log_output = log_stream.getvalue().strip()

    # Validate that the password is masked
    assert "password=*****" in log_output
    assert "secretpassword" not in log_output

    # Validate the format itself: logger name and level sit between the
    # timestamp and the message, exactly as log_format lays them out.
    assert " test_logger INFO " in log_output


def test_masked_logger_json_format_after_masking(logger_and_log_stream, log_format):
    """
    Test that MaskerFormatterJson outputs correctly formatted JSON logs after applying data masking.
    Ensures that sensitive data is masked and log format remains valid.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format (str): Format string handed to the formatter.
    """
    logger, log_stream = logger_and_log_stream

    # Set the MaskerFormatterJson formatter
    formatter = MaskerFormatterJson(fmt=log_format)
    logger.handlers[0].setFormatter(formatter)

    # Log a sensitive message
    logger.info("Sensitive data: password=secretpassword and other info")

    # Read and parse the log output; json.loads fails if it is not valid JSON
    log_output = log_stream.getvalue().strip()
    log_json = json.loads(log_output)

    # Validate that the password is masked
    assert "password=*****" in log_json["message"]
    assert "secretpassword" not in log_json["message"]

    # Validate the format itself: the fields named in log_format must still be emitted.
    assert log_json["name"] == "test_logger"
    assert log_json["levelname"] == "INFO"
    assert "asctime" in log_json


def test_masked_logger_non_sensitive_data(logger_and_log_stream, log_format):
    """
    Test that non-sensitive log messages are logged without modification,
    ensuring they are formatted correctly in both text and JSON formats.

    Args:
        logger_and_log_stream (tuple): A tuple containing the logger and log stream.
        log_format (str): Format string handed to the formatters.
    """
    logger, log_stream = logger_and_log_stream
    non_sensitive_msg = "This is a regular log message."

    # Exercise both formatters, as the test description promises.
    for formatter_cls in (MaskerFormatter, MaskerFormatterJson):
        # Start from an empty buffer so each formatter is checked in isolation.
        log_stream.seek(0)
        log_stream.truncate(0)
        logger.handlers[0].setFormatter(formatter_cls(fmt=log_format))

        # Log a non-sensitive message
        logger.info(non_sensitive_msg)

        # Ensure the non-sensitive message is logged without any masking
        log_output = log_stream.getvalue().strip()
        assert non_sensitive_msg in log_output