Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ With the bundled sample data, the default run currently produces:

- `41` normalized events
- `24` windows
- `12` alerts after applying a `60` second per-rule cooldown
- `12` alerts after applying a `60` second cooldown

The default config suppresses repeated alerts with the same `rule_name` until `60` seconds have elapsed since that rule's last emitted alert. Different rules can still alert on the same window.
The default config suppresses repeated alerts by cooldown key. The key is `rule_name` plus an entity scope when the rule input includes `entity`, `source`, `target`, or `host`; otherwise it falls back to `rule_name` alone. Different cooldown keys can still alert on the same window.

## Outputs

Expand Down
47 changes: 41 additions & 6 deletions src/telemetry_window_demo/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"window_end",
"message",
)
COOLDOWN_SCOPE_COLUMNS = ("entity", "source", "target", "host")


def apply_rules(
Expand Down Expand Up @@ -46,7 +47,7 @@ def apply_rules(
if not alerts:
return pd.DataFrame(columns=ALERT_COLUMNS)

alerts_frame = pd.DataFrame(alerts, columns=ALERT_COLUMNS)
alerts_frame = pd.DataFrame(alerts)
alerts_frame = alerts_frame.sort_values(["alert_time", "rule_name"]).reset_index(drop=True)
return _apply_alert_cooldown(alerts_frame, cooldown_seconds)

Expand All @@ -56,6 +57,7 @@ def _row_alert(
rule_name: str,
severity: str,
message: str,
cooldown_scope: str | None = None,
) -> dict[str, object]:
return {
"alert_time": row["window_end"],
Expand All @@ -64,33 +66,66 @@ def _row_alert(
"window_start": row["window_start"],
"window_end": row["window_end"],
"message": message,
"cooldown_scope": _resolve_cooldown_scope(row, cooldown_scope),
}


def _resolve_cooldown_scope(
row: pd.Series,
explicit_scope: str | None = None,
) -> str | None:
if explicit_scope is not None:
value = explicit_scope.strip()
if value:
return value

for column in COOLDOWN_SCOPE_COLUMNS:
if column not in row.index:
continue

value = row[column]
if pd.isna(value):
continue

value_text = str(value).strip()
if value_text:
return f"{column}={value_text}"

return None


def _apply_alert_cooldown(
alerts: pd.DataFrame,
cooldown_seconds: int,
) -> pd.DataFrame:
if alerts.empty or cooldown_seconds <= 0:
return alerts.reset_index(drop=True)
return alerts.loc[:, ALERT_COLUMNS].reset_index(drop=True)

last_kept_at: dict[str, pd.Timestamp] = {}
last_kept_at: dict[tuple[str, str | None], pd.Timestamp] = {}
kept_rows: list[int] = []

for index, row in alerts.iterrows():
rule_name = str(row["rule_name"])
alert_time = pd.Timestamp(row["alert_time"])
last_alert_time = last_kept_at.get(rule_name)
scope_value = row.get("cooldown_scope")
if pd.isna(scope_value):
scope = None
else:
scope_text = str(scope_value).strip()
scope = scope_text or None

cooldown_key = (rule_name, scope)
last_alert_time = last_kept_at.get(cooldown_key)

if last_alert_time is None:
kept_rows.append(index)
last_kept_at[rule_name] = alert_time
last_kept_at[cooldown_key] = alert_time
continue

elapsed = (alert_time - last_alert_time).total_seconds()
if elapsed >= cooldown_seconds:
kept_rows.append(index)
last_kept_at[rule_name] = alert_time
last_kept_at[cooldown_key] = alert_time

return alerts.loc[kept_rows, ALERT_COLUMNS].reset_index(drop=True)

Expand Down
65 changes: 65 additions & 0 deletions tests/test_rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,71 @@ def test_apply_rules_suppresses_repeated_same_rule_within_cooldown() -> None:
]


def test_apply_rules_scopes_same_rule_cooldown_by_source_when_present() -> None:
features = pd.DataFrame(
[
{
"window_start": pd.Timestamp("2026-03-10T10:00:00Z"),
"window_end": pd.Timestamp("2026-03-10T10:01:00Z"),
"source": "host_a",
"event_count": 10,
"error_count": 4,
"error_rate": 0.40,
"unique_sources": 4,
"unique_targets": 2,
"high_severity_count": 0,
"login_fail_count": 0,
"malware_alert_count": 0,
},
{
"window_start": pd.Timestamp("2026-03-10T10:00:10Z"),
"window_end": pd.Timestamp("2026-03-10T10:01:10Z"),
"source": "host_b",
"event_count": 11,
"error_count": 5,
"error_rate": 0.45,
"unique_sources": 5,
"unique_targets": 2,
"high_severity_count": 0,
"login_fail_count": 0,
"malware_alert_count": 0,
},
{
"window_start": pd.Timestamp("2026-03-10T10:00:20Z"),
"window_end": pd.Timestamp("2026-03-10T10:01:20Z"),
"source": "host_a",
"event_count": 12,
"error_count": 6,
"error_rate": 0.50,
"unique_sources": 6,
"unique_targets": 2,
"high_severity_count": 0,
"login_fail_count": 0,
"malware_alert_count": 0,
},
]
)

alerts = apply_rules(
features,
{
"cooldown_seconds": 60,
"high_error_rate": {"threshold": 0.30, "severity": "medium"},
"persistent_high_error": {
"threshold": 1.0,
"consecutive_windows": 10,
"severity": "medium",
},
},
)

assert list(alerts["rule_name"]) == ["high_error_rate", "high_error_rate"]
assert list(alerts["alert_time"]) == [
pd.Timestamp("2026-03-10T10:01:00Z"),
pd.Timestamp("2026-03-10T10:01:10Z"),
]


def test_apply_rules_keeps_different_rules_during_same_cooldown_window() -> None:
features = pd.DataFrame(
[
Expand Down
Loading