Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 171 additions & 31 deletions src/telemetry_window_demo/cli.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from __future__ import annotations

import argparse
from pathlib import Path
from typing import Any
import argparse
from collections.abc import Mapping, Sequence
from pathlib import Path
from typing import Any

from .features import compute_window_features
from .io import (
Expand All @@ -16,12 +17,21 @@
write_table,
)
from .preprocess import normalize_events
from .rules import apply_rules
from .visualize import plot_outputs
from .windowing import build_windows


def main() -> None:
from .rules import apply_rules
from .visualize import plot_outputs
from .windowing import build_windows

# Names of per-rule sections that _validate_rules_config requires to be
# mappings when they appear under the "rules" block of a run config.
RUN_RULE_SECTION_NAMES = (
    "high_error_rate",
    "login_fail_burst",
    "high_severity_spike",
    "persistent_high_error",
    "source_spread_spike",
    "rare_event_repeat",
)


def main() -> None:
    """Command-line entry point: parse arguments and dispatch to the chosen subcommand."""
    namespace = build_parser().parse_args()
    namespace.func(namespace)
Expand Down Expand Up @@ -88,14 +98,15 @@ def build_parser() -> argparse.ArgumentParser:
return parser


def run_command(args: argparse.Namespace) -> None:
config_path = Path(args.config).resolve()
config = load_config(config_path)
time_config = config.get("time", {})
feature_config = config.get("features", {})
rules_config = config.get("rules") or {}
input_path = resolve_config_path(config_path, config["input_path"])
output_dir = resolve_config_path(config_path, config.get("output_dir", "data/processed"))
def run_command(args: argparse.Namespace) -> None:
config_path = Path(args.config).resolve()
config = load_config(config_path)
run_config = _validate_run_config(config)
time_config = run_config["time"]
feature_config = run_config["features"]
rules_config = run_config["rules"]
input_path = resolve_config_path(config_path, run_config["input_path"])
output_dir = resolve_config_path(config_path, run_config["output_dir"])

events = load_events(input_path)
normalized = normalize_events(
Expand All @@ -104,19 +115,19 @@ def run_command(args: argparse.Namespace) -> None:
error_statuses=feature_config.get("error_statuses"),
high_severity_levels=feature_config.get("severity_levels"),
)
windows = build_windows(
normalized,
timestamp_col=time_config.get("timestamp_col", "timestamp"),
window_size_seconds=int(time_config.get("window_size_seconds", 60)),
step_size_seconds=int(time_config.get("step_size_seconds", 10)),
)
windows = build_windows(
normalized,
timestamp_col=time_config.get("timestamp_col", "timestamp"),
window_size_seconds=time_config["window_size_seconds"],
step_size_seconds=time_config["step_size_seconds"],
)
features = compute_window_features(
normalized,
windows,
count_event_types=feature_config.get("count_event_types"),
)
alerts = apply_rules(features, rules_config)
cooldown_seconds = int(rules_config.get("cooldown_seconds", 0))
)
alerts = apply_rules(features, rules_config)
cooldown_seconds = rules_config["cooldown_seconds"]

feature_path = write_table(features, output_dir / "features.csv")
alert_path = write_table(alerts, output_dir / "alerts.csv")
Expand Down Expand Up @@ -222,16 +233,145 @@ def run_config_change_demo_command(args: argparse.Namespace) -> None:
print(f" - {name}: {_display_path(path)}")


def _display_path(path: Path) -> str:
def _display_path(path: Path) -> str:
cwd = Path.cwd().resolve()
resolved = path.resolve()
try:
return resolved.relative_to(cwd).as_posix()
except ValueError:
return resolved.as_posix()


def _build_run_summary(
return resolved.as_posix()


def _validate_run_config(config: Mapping[str, Any]) -> dict[str, Any]:
    """Validate a raw run config and return a normalized dict with defaults filled in.

    Delegates each field to the typed `_*_config_value` helpers, which raise
    ValueError with the offending dotted field name on bad input.
    """
    time_section = _optional_mapping(config.get("time", {}), "time")
    feature_section = _optional_mapping(config.get("features", {}), "features")
    rules_section = _validate_rules_config(config.get("rules"))

    validated_time = {
        "timestamp_col": _string_config_value(
            time_section.get("timestamp_col", "timestamp"),
            "time.timestamp_col",
        ),
        "window_size_seconds": _int_config_value(
            time_section.get("window_size_seconds", 60),
            "time.window_size_seconds",
            minimum=1,
        ),
        "step_size_seconds": _int_config_value(
            time_section.get("step_size_seconds", 10),
            "time.step_size_seconds",
            minimum=1,
        ),
    }

    # Each feature list is optional; None is passed through unchanged.
    validated_features = {
        name: _optional_string_sequence(feature_section.get(name), f"features.{name}")
        for name in ("count_event_types", "error_statuses", "severity_levels")
    }

    return {
        "input_path": _path_config_value(config.get("input_path"), "input_path"),
        "output_dir": _path_config_value(
            config.get("output_dir", "data/processed"),
            "output_dir",
        ),
        "time": validated_time,
        "features": validated_features,
        "rules": rules_section,
    }


def _validate_rules_config(raw_rules_config: Any) -> dict[str, Any]:
    """Normalize the optional "rules" config section into a plain dict.

    A missing section becomes {}; cooldown_seconds is coerced to a
    non-negative int (default 0); every known rule section must be a
    mapping and is shallow-copied; rare_event_repeat.event_types, when
    present, must be a list of non-empty strings.
    """
    if raw_rules_config is None:
        normalized: dict[str, Any] = {}
    else:
        normalized = dict(_optional_mapping(raw_rules_config, "rules"))

    normalized["cooldown_seconds"] = _int_config_value(
        normalized.get("cooldown_seconds", 0),
        "rules.cooldown_seconds",
        minimum=0,
    )

    # Copy each per-rule section so later mutation never leaks into the input.
    for section in RUN_RULE_SECTION_NAMES:
        if section in normalized:
            normalized[section] = dict(
                _optional_mapping(normalized[section], f"rules.{section}")
            )

    rare = normalized.get("rare_event_repeat")
    if isinstance(rare, dict) and "event_types" in rare:
        rare["event_types"] = _string_sequence(
            rare["event_types"],
            "rules.rare_event_repeat.event_types",
        )

    return normalized


def _optional_mapping(value: Any, field_name: str) -> Mapping[str, Any]:
if not isinstance(value, Mapping):
raise ValueError(f"Config field '{field_name}' must be a mapping.")
return value


def _path_config_value(value: Any, field_name: str) -> str:
if not isinstance(value, str) or not value.strip():
raise ValueError(f"Config field '{field_name}' must be a non-empty path string.")
return value.strip()


def _string_config_value(value: Any, field_name: str) -> str:
if not isinstance(value, str) or not value.strip():
raise ValueError(f"Config field '{field_name}' must be a non-empty string.")
return value.strip()


def _int_config_value(value: Any, field_name: str, *, minimum: int) -> int:
if isinstance(value, bool):
raise ValueError(f"Config field '{field_name}' must be an integer.")
if isinstance(value, int):
parsed = value
elif isinstance(value, str) and value.strip().lstrip("+-").isdigit():
parsed = int(value)
else:
raise ValueError(f"Config field '{field_name}' must be an integer.")

if parsed < minimum:
qualifier = "positive" if minimum == 1 else f"at least {minimum}"
raise ValueError(f"Config field '{field_name}' must be {qualifier}.")
return parsed


def _optional_string_sequence(value: Any, field_name: str) -> list[str] | None:
if value is None:
return None
return _string_sequence(value, field_name)


def _string_sequence(value: Any, field_name: str) -> list[str]:
if isinstance(value, str) or not isinstance(value, Sequence):
raise ValueError(
f"Config field '{field_name}' must be a list of non-empty strings."
)

normalized: list[str] = []
for item in value:
if not isinstance(item, str) or not item.strip():
raise ValueError(
f"Config field '{field_name}' must be a list of non-empty strings."
)
normalized.append(item.strip())
return normalized


def _build_run_summary(
input_path: Path,
output_dir: Path,
normalized: Any,
Expand Down
91 changes: 91 additions & 0 deletions tests/test_run_config_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
from __future__ import annotations

from argparse import Namespace
from pathlib import Path
from typing import Any

import pytest
import yaml

from telemetry_window_demo.cli import run_command
from telemetry_window_demo.io import load_config


def _base_config(tmp_path: Path) -> dict[str, Any]:
    """Load the repo's default config, rewiring input to the bundled sample
    events and output into the test's tmp_path."""
    root = Path(__file__).resolve().parents[1]
    loaded = load_config(root / "configs" / "default.yaml")
    loaded["input_path"] = str((root / "data" / "raw" / "sample_events.jsonl").resolve())
    loaded["output_dir"] = str((tmp_path / "processed").resolve())
    return loaded


def _write_config(tmp_path: Path, config: dict[str, Any]) -> Path:
    """Serialize *config* to a YAML file under tmp_path and return its path."""
    target = tmp_path / "invalid.yaml"
    serialized = yaml.safe_dump(config, sort_keys=False)
    target.write_text(serialized, encoding="utf-8")
    return target


def test_run_config_requires_input_path(tmp_path) -> None:
    """A config missing the mandatory input_path must be rejected."""
    cfg = _base_config(tmp_path)
    cfg.pop("input_path")

    with pytest.raises(ValueError, match="input_path"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_boolean_window_size(tmp_path) -> None:
    """A YAML boolean where time.window_size_seconds expects an int must raise."""
    cfg = _base_config(tmp_path)
    cfg["time"] = {**cfg["time"], "window_size_seconds": True}

    with pytest.raises(ValueError, match="time.window_size_seconds"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_non_positive_step_size(tmp_path) -> None:
    """A zero step size must fail the minimum=1 check on time.step_size_seconds."""
    cfg = _base_config(tmp_path)
    cfg["time"] = {**cfg["time"], "step_size_seconds": 0}

    with pytest.raises(ValueError, match="time.step_size_seconds"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_string_feature_list(tmp_path) -> None:
    """A bare string where features.count_event_types expects a list must raise."""
    cfg = _base_config(tmp_path)
    cfg["features"] = {**cfg["features"], "count_event_types": "login_fail"}

    with pytest.raises(ValueError, match="features.count_event_types"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_boolean_cooldown(tmp_path) -> None:
    """A YAML boolean where rules.cooldown_seconds expects an int must raise."""
    cfg = _base_config(tmp_path)
    cfg["rules"] = {**cfg["rules"], "cooldown_seconds": True}

    with pytest.raises(ValueError, match="rules.cooldown_seconds"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_non_mapping_rule_section(tmp_path) -> None:
    """A scalar where a per-rule section must be a mapping must raise."""
    cfg = _base_config(tmp_path)
    cfg["rules"] = {**cfg["rules"], "high_error_rate": True}

    with pytest.raises(ValueError, match="rules.high_error_rate"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))


def test_run_config_rejects_string_rare_event_types(tmp_path) -> None:
    """A bare string for rare_event_repeat.event_types must be rejected."""
    cfg = _base_config(tmp_path)
    cfg["rules"]["rare_event_repeat"]["event_types"] = "malware_alert"

    with pytest.raises(ValueError, match="rules.rare_event_repeat.event_types"):
        run_command(Namespace(config=str(_write_config(tmp_path, cfg))))
Loading