From 9201c271436e7f195bf66d735c58f9f7b3aa5a97 Mon Sep 17 00:00:00 2001 From: Steve NCA Date: Mon, 1 Jun 2026 05:23:55 +0000 Subject: [PATCH 1/2] =?UTF-8?q?feat(reflex):=200.8.0-dev2=20=E2=80=94=20re?= =?UTF-8?q?gistry=20+=20runner=20+=203=20first-party=20handlers?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Second sub-step of the brain refactor. Stands up the REFLEX LAYER — fast deterministic responders that subscribe to the thalamus and produce ReflexOutcome records. Nothing publishes to the bus yet (that lands in 0.8.0-dev3), so the handlers idle. The plumbing they idle on is fully tested against InMemoryEventBus, which (per 0.8.0-dev1's contract suite) behaves identically to the production NatsEventBus — so when the first publisher lands the path lights up end-to-end. PUBLIC SURFACE — netcortex/reflex/ * protocol.py — ReflexHandler Protocol + frozen ReflexOutcome dataclass + Severity / OutcomeKind literal types. Severity is a four-bucket scale (info | warn | high | critical) so downstream alerting can pattern-match without parsing free-form strings. * registry.py — process-wide register_handler / get_handler / all_handlers / clear_registry. Idempotent re-registration of the same instance; duplicate-id collisions raise DuplicateHandlerError (handler ids appear on every persisted outcome — silent shadowing would be an operator footgun). * runner.py — ReflexRunner wires the registry to one bus, spawns one asyncio task per handler, isolates per-handler exceptions (raising handler → "errored" outcome with truncated traceback in diagnostic, dispatcher continues). Idempotent start/stop; ready_event for tests that need cross-handler ordering. FIRST-PARTY HANDLERS (all currently idle) * link_down — sensory.snmp.trap.link_down.> → high severity. Caps upstream key echo at 16 to bound outcome size. * security_webhook — sensory.meraki.webhook.security.> → severity derived from the Meraki payload via a coarse map (informational/warning/high/critical → info/warn/high/critical), defaults to warn for unknown values. * bgp_drop — sensory.snmp.trap.bgp_backward_transition.> → high severity. Target composed as "device|peer" when both are known, falls back to whichever is present. Each handler is intentionally minimal — captures the event, extracts a target, returns a "logged" outcome. The richer behavior (semantic memory lookup, maintenance-window check, dedup, NetBox journal mirror) lands in later sub-steps once the first publisher exists to drive it. TESTS — tests/reflex/ * test_registry.py: 7 cases — register/lookup, insertion ordering, duplicate rejection, idempotent re-register, type rejection, missing-key, clear. Save/restore fixture so cleared state does not leak to sibling test files. * test_runner.py: 8 cases against InMemoryEventBus — dispatch matching events, pattern-filter non-matching, fan-out to multiple handlers, exception isolation, None outcome not recorded, idempotent start/stop, stop-without-start safety, registry enumeration default-arg path. * test_handlers.py: 14 cases pinning the operator-facing surface (handler id + subscription pattern) and exercising each handler's outcome-shape contract + target-extraction fallbacks. End-to-end smoke against InMemoryEventBus: 3 publishes → 3 outcomes routed to the right handlers with correct severity and target extraction. NatsEventBus path will work identically (contract suite proves both backends satisfy the same Protocol). NOT YET WIRED * Still no publishers. Pollers continue to call correlator + writeback directly. First dual-write publisher lands in 0.8.0-dev3. * Outcomes are logged only — Neo4j :ReflexEvent persistence + NetBox journal mirror land in 0.8.0-dev3 once the writer Protocols have a consumer to justify them. Co-authored-by: Cursor --- CHANGELOG.md | 66 +++++ netcortex/__init__.py | 2 +- netcortex/reflex/__init__.py | 38 +++ netcortex/reflex/handlers/__init__.py | 26 ++ netcortex/reflex/handlers/bgp_drop.py | 97 +++++++ netcortex/reflex/handlers/link_down.py | 83 ++++++ netcortex/reflex/handlers/security_webhook.py | 88 +++++++ netcortex/reflex/protocol.py | 145 +++++++++++ netcortex/reflex/registry.py | 110 ++++++++ netcortex/reflex/runner.py | 211 +++++++++++++++ pyproject.toml | 2 +- tests/reflex/__init__.py | 0 tests/reflex/test_handlers.py | 231 +++++++++++++++++ tests/reflex/test_registry.py | 123 +++++++++ tests/reflex/test_runner.py | 241 ++++++++++++++++++ 15 files changed, 1461 insertions(+), 2 deletions(-) create mode 100644 netcortex/reflex/__init__.py create mode 100644 netcortex/reflex/handlers/__init__.py create mode 100644 netcortex/reflex/handlers/bgp_drop.py create mode 100644 netcortex/reflex/handlers/link_down.py create mode 100644 netcortex/reflex/handlers/security_webhook.py create mode 100644 netcortex/reflex/protocol.py create mode 100644 netcortex/reflex/registry.py create mode 100644 netcortex/reflex/runner.py create mode 100644 tests/reflex/__init__.py create mode 100644 tests/reflex/test_handlers.py create mode 100644 tests/reflex/test_registry.py create mode 100644 tests/reflex/test_runner.py diff --git a/CHANGELOG.md b/CHANGELOG.md index e9a96b7..7fdbd3d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,72 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev2] — 2026-06-01 + +### Added — Reflex skeleton: registry + runner + 3 first-party handlers + +Second sub-step of the brain refactor. Stands up the **reflex layer** — +fast deterministic responders that subscribe to the thalamus and produce +:class:`ReflexOutcome` records. Nothing publishes to the bus yet (that +lands in `0.8.0-dev3`), so the handlers idle. The plumbing they idle on +is fully tested against `InMemoryEventBus`, which (per `0.8.0-dev1`'s +contract suite) behaves identically to the production `NatsEventBus` — +so when the first publisher lands the path lights up end-to-end. + +#### Public surface + +- `netcortex/reflex/` + - `protocol.py` — `ReflexHandler` Protocol + frozen `ReflexOutcome` + dataclass + `Severity` / `OutcomeKind` literal types. Severity is a + four-bucket scale (`info | warn | high | critical`) so downstream + alerting can pattern-match without parsing free-form strings. + - `registry.py` — process-wide `register_handler()` / `get_handler()` + / `all_handlers()` / `clear_registry()`. Idempotent re-registration + of the same instance; duplicate-id collisions raise + `DuplicateHandlerError` (handler ids appear on every persisted + outcome — silent shadowing would be an operator footgun). + - `runner.py` — `ReflexRunner` wires the registry to one bus, spawns + one asyncio task per handler, isolates per-handler exceptions + (raising handler → `errored` outcome with truncated traceback in + `diagnostic`, dispatcher continues). Idempotent `start()` / `stop()`; + `ready_event` for tests that need cross-handler ordering. + +#### First-party handlers (idle until 0.8.0-dev3) + +| Handler id | Subject pattern | Severity | Notes | +|---|---|---|---| +| `link_down` | `sensory.snmp.trap.link_down.>` | `high` | Caps upstream key echo at 16 to bound outcome size | +| `security_webhook` | `sensory.meraki.webhook.security.>` | from payload | Coarse Meraki→NetCortex severity map (`informational`/`warning`/`high`/`critical`) | +| `bgp_drop` | `sensory.snmp.trap.bgp_backward_transition.>` | `high` | Target composed as `device|peer` when both are known, falls back gracefully | + +Each handler is intentionally minimal — it captures the event, extracts +a target, returns a `logged` outcome. The richer behavior (semantic +memory lookup, maintenance-window check, dedup, NetBox journal mirror) +lands in later sub-steps once the first publisher exists to drive it. + +#### Tests + +- `tests/reflex/test_registry.py` — 7 cases: register/lookup, insertion + ordering, duplicate rejection, idempotent re-register, type rejection, + missing-key, clear. Uses a save/restore fixture so it does not leak the + cleared state to sibling test files. +- `tests/reflex/test_runner.py` — 8 cases against `InMemoryEventBus`: + dispatch matching events, pattern-filter non-matching, fan-out to + multiple handlers, exception isolation (`errored` outcome continues + dispatcher), `None` outcome not recorded, idempotent start/stop, + stop-without-start safety, registry enumeration default-arg path. +- `tests/reflex/test_handlers.py` — 14 cases pinning the operator-facing + surface (handler id + subscription pattern) and exercising each + handler's outcome-shape contract + target-extraction fallbacks. + +### Not yet wired + +- Still no publishers. Pollers continue to call correlator + writeback + directly. The first dual-write publisher lands in `0.8.0-dev3`. +- Outcomes are logged only — Neo4j `:ReflexEvent` persistence + NetBox + journal mirror land in `0.8.0-dev3` once the writer Protocols have a + consumer to justify them. + ## [0.8.0-dev1] — 2026-06-01 ### Added — Thalamus: NATS-backed event bus lands diff --git a/netcortex/__init__.py b/netcortex/__init__.py index ff3100d..6dc8033 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes. """ -__version__ = "0.8.0-dev1" +__version__ = "0.8.0-dev2" diff --git a/netcortex/reflex/__init__.py b/netcortex/reflex/__init__.py new file mode 100644 index 0000000..c443a5b --- /dev/null +++ b/netcortex/reflex/__init__.py @@ -0,0 +1,38 @@ +"""Reflex — fast deterministic responders on the event bus. + +Reflex handlers subscribe to narrow NATS subject patterns, run +deterministic logic in milliseconds, and produce :class:`ReflexOutcome` +records the deliberative loop later consolidates. + +Public surface: + +* :class:`ReflexHandler` — Protocol every handler obeys. +* :class:`ReflexOutcome` — frozen record of what a handler decided. +* :func:`register_handler` — registration entry point for handler modules. +* :class:`ReflexRunner` — wires the registered handler set to a bus. +* :mod:`netcortex.reflex.handlers` — importing this submodule registers + the first-party handlers (``link_down``, ``security_webhook``, + ``bgp_drop``). + +See ``docs/architecture/brain.md`` for the role of reflex in the +brain-mapped architecture. +""" + +from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.registry import ( + DuplicateHandlerError, + all_handlers, + get_handler, + register_handler, +) +from netcortex.reflex.runner import ReflexRunner + +__all__ = [ + "DuplicateHandlerError", + "ReflexHandler", + "ReflexOutcome", + "ReflexRunner", + "all_handlers", + "get_handler", + "register_handler", +] diff --git a/netcortex/reflex/handlers/__init__.py b/netcortex/reflex/handlers/__init__.py new file mode 100644 index 0000000..83ecadb --- /dev/null +++ b/netcortex/reflex/handlers/__init__.py @@ -0,0 +1,26 @@ +"""First-party reflex handlers. + +Importing this package registers every handler below with the global +registry in :mod:`netcortex.reflex.registry`. That side-effect is the +point — the runner enumerates the registry, so handlers only need to be +*imported* (not explicitly enumerated) for the runner to find them. + +To add a new first-party handler: + +1. Create ``netcortex/reflex/handlers/.py`` that calls + :func:`netcortex.reflex.registry.register_handler` at module scope. +2. Add a line to this file importing the new module. +3. Cover the subject pattern with a test in + ``tests/reflex/test_handlers.py``. + +That is the entire surface — no entry-point discovery, no dynamic +loading. By design (see ``docs/architecture/brain.md`` on plasticity). +""" + +from __future__ import annotations + +# Import-for-side-effect — each module registers itself on import. +# Keep these imports alphabetical so a diff reviewer can spot additions. +from netcortex.reflex.handlers import bgp_drop # noqa: F401 +from netcortex.reflex.handlers import link_down # noqa: F401 +from netcortex.reflex.handlers import security_webhook # noqa: F401 diff --git a/netcortex/reflex/handlers/bgp_drop.py b/netcortex/reflex/handlers/bgp_drop.py new file mode 100644 index 0000000..1cafee2 --- /dev/null +++ b/netcortex/reflex/handlers/bgp_drop.py @@ -0,0 +1,97 @@ +"""``bgp_drop`` — reflex handler for BGP session state-down signals. + +Subscribes to the BGP4-MIB ``bgpBackwardTransition`` trap (and, once the +streaming telemetry adapter lands, also ``sensory.cisco.mdt.bgp.>`` +neighbor-down samples). The fast deterministic response is to record a +session-down outcome so the deliberative loop (route convergence +analysis, prefix advertisement drift) has the wall-clock anchor. + +This module is dev2 scaffolding. When publishers land in 0.8.0-dev3+ the +handler will additionally: + +* resolve the peer IP against semantic memory's ``:BgpSession`` nodes so + the outcome carries the canonical session identifier, not just the + peer address; +* check whether the device is in a maintenance window OR the peer is a + known-flapping route-server (operator policy); +* attach a NetBox journal entry to the BGP session object (once the + reconciliation engine starts surfacing those — they are not first- + class in NetBox today, so the journal will live on the device); +* trigger a deliberative follow-up to assess prefix-advertisement + impact, comparing the last-known advertised prefix set on this + session against the post-drop topology snapshot. + +None of that is in dev2. The current handler logs and returns a +``high``-severity outcome; downstream consumers can already key off it. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +from netcortex.contracts.event_bus import EventMessage +from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.registry import register_handler + +# Subject pattern. +# +# Real publishers will use +# ``sensory.snmp.trap.bgp_backward_transition.`` or +# ``sensory.cisco.mdt.bgp_neighbor_state.``. For dev2 the +# handler subscribes to the SNMP trap subject only; the second +# subscription (or a glob) lands once the telemetry adapter exists. +_PATTERN: Final[str] = "sensory.snmp.trap.bgp_backward_transition.>" + + +class BgpDropHandler: + """Reflex for BGP session backward-transition (down) events.""" + + id: Final[str] = "bgp_drop" + pattern: Final[str] = _PATTERN + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + payload = event.payload + device = ( + payload.get("device_id") + or payload.get("device") + or payload.get("target") + ) + peer = payload.get("peer") or payload.get("peer_ip") + peer_asn = payload.get("peer_asn") or payload.get("remote_as") + last_state = payload.get("last_state") or payload.get("previous_state") + # Compose a target identifier that survives whether or not the peer + # IP is known — preferring the canonical session "device|peer" key + # when both are available, falling back to whichever is present. + if device and peer: + target = f"{device}|{peer}" + elif peer: + target = str(peer) + elif device: + target = str(device) + else: + target = None + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=target, + # BGP session loss is high-severity by default. Operators can + # tune later via the policy library once it exists; we do not + # second-guess severity inside the handler. + severity="high", + occurred_at=datetime.now(tz=timezone.utc), + payload={ + "device": device, + "peer": peer, + "peer_asn": peer_asn, + "last_state": last_state, + }, + outcome="logged", + rationale=( + f"BGP session {target!r} backward-transition observed; " + f"last_state={last_state!r} — dev2 idle handler" + ), + ) + + +_HANDLER = register_handler(BgpDropHandler()) diff --git a/netcortex/reflex/handlers/link_down.py b/netcortex/reflex/handlers/link_down.py new file mode 100644 index 0000000..c562a49 --- /dev/null +++ b/netcortex/reflex/handlers/link_down.py @@ -0,0 +1,83 @@ +"""``link_down`` — reflex handler for interface-down signals. + +Subscribes to the SNMP linkDown trap subject. In the brain-mapped +architecture this is the fast deterministic response to an interface +going hard-down: log it now, let the deliberative loop (prefrontal, +0.11.0) decide whether to open a ticket, page someone, or just wait +for the symmetric linkUp. + +This module is dev2 scaffolding — the handler is registered and the +runner will subscribe it to the bus, but no publisher exists yet. The +first real linkDown publish lands in 0.8.0-dev3 when the SNMP-trap +sensory adapter (``sensory/trap/snmp.py``) is wired in. + +When publishers exist, the handler will also: + +* fetch the affected (device, interface) from semantic memory and verify + it is not in a maintenance window; +* deduplicate against a Redis "recently seen" window so a flapping link + produces one outcome per minute, not one per trap; +* attach a NetBox journal entry on the Interface object so an operator + sees the trap immediately in the tool they live in; +* emit a follow-up ``reflex.link_down.applied`` event so consolidation + knows a synthetic interface-state transition has been recorded. + +None of that is in dev2. The current implementation captures the trap, +extracts the target, and returns a ``logged`` outcome. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +from netcortex.contracts.event_bus import EventMessage +from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.registry import register_handler + +# Subject pattern. +# +# Real publishers in 0.8.0-dev3+ will use +# ``sensory.snmp.trap.link_down.`` and emit one event per +# (device, interface) transition. The trailing ``>`` matches any number +# of further tokens so the handler can be subscribed today and the +# publisher's exact subject layout can evolve without redeploying the +# handler. +_PATTERN: Final[str] = "sensory.snmp.trap.link_down.>" + + +class LinkDownHandler: + """Reflex for IF-MIB linkDown traps.""" + + id: Final[str] = "link_down" + pattern: Final[str] = _PATTERN + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + payload = event.payload + target = ( + payload.get("device_id") + or payload.get("device") + or payload.get("target") + ) + interface = payload.get("interface") or payload.get("if_name") + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=str(target) if target else None, + severity="high", + occurred_at=datetime.now(tz=timezone.utc), + payload={ + "interface": interface, + # Cap the upstream payload echo so a chatty publisher + # cannot blow up the outcome record. + "upstream_keys": sorted(payload.keys())[:16], + }, + outcome="logged", + rationale=( + f"linkDown observed on {target!r} interface {interface!r}; " + "dev2 idle handler — no remediation yet" + ), + ) + + +_HANDLER = register_handler(LinkDownHandler()) diff --git a/netcortex/reflex/handlers/security_webhook.py b/netcortex/reflex/handlers/security_webhook.py new file mode 100644 index 0000000..c9a40d7 --- /dev/null +++ b/netcortex/reflex/handlers/security_webhook.py @@ -0,0 +1,88 @@ +"""``security_webhook`` — reflex handler for Meraki security webhooks. + +Subscribes to security-class Meraki Dashboard webhooks (IDS alerts, +malware, anomalous traffic, blocked URL hit, etc.) once the webhook +receiver in ``sensory/webhook/meraki.py`` lands (0.8.x patch). Dev2 +ships the handler skeleton so the pattern is reserved and the runner's +subscription map is complete from day one. + +When publishers exist, this handler will: + +* dedupe via Meraki's ``alertId`` (a Redis "seen recently" set with a + short TTL — Meraki retries the same alert on delivery failure); +* check whether the affected client is known to semantic memory; if not, + promote it to the working-memory "unknown clients" set so the + operator UI can surface unrecognized endpoints; +* compute a severity from the ``occurredAt`` + ``eventType`` cross + product using a policy in ``policy/security_severity.py`` (so the + threshold is operator-tunable, not hardcoded); +* attach a NetBox journal entry on the affected IPAddress/Interface + when it can be resolved. + +None of that is in dev2. The current implementation logs the inbound +event and returns an outcome whose severity comes verbatim from the +Meraki payload's ``severity`` field when present, defaulting to +``warn`` (Meraki's own scale runs informational/warning/critical; we +collapse to our four-bucket scale in the policy module later). +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +from netcortex.contracts.event_bus import EventMessage +from netcortex.reflex.protocol import ReflexOutcome, Severity +from netcortex.reflex.registry import register_handler + +_PATTERN: Final[str] = "sensory.meraki.webhook.security.>" + +# Coarse mapping for dev2. Real severity policy lives in +# policy/security_severity.py once the policy library lands (0.8.x). +_MERAKI_SEVERITY_MAP: Final[dict[str, Severity]] = { + "informational": "info", + "info": "info", + "warning": "warn", + "warn": "warn", + "high": "high", + "critical": "critical", +} + + +class SecurityWebhookHandler: + """Reflex for Meraki security-class webhook events.""" + + id: Final[str] = "security_webhook" + pattern: Final[str] = _PATTERN + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + payload = event.payload + upstream_sev = str(payload.get("severity") or "").lower() + severity: Severity = _MERAKI_SEVERITY_MAP.get(upstream_sev, "warn") + target = ( + payload.get("clientMac") + or payload.get("deviceSerial") + or payload.get("networkId") + or payload.get("target") + ) + event_type = payload.get("alertType") or payload.get("eventType") + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=str(target) if target else None, + severity=severity, + occurred_at=datetime.now(tz=timezone.utc), + payload={ + "alert_id": payload.get("alertId"), + "event_type": event_type, + "network_id": payload.get("networkId"), + }, + outcome="logged", + rationale=( + f"meraki security webhook {event_type!r} on {target!r}; " + "dev2 idle handler — dedupe + classification pending" + ), + ) + + +_HANDLER = register_handler(SecurityWebhookHandler()) diff --git a/netcortex/reflex/protocol.py b/netcortex/reflex/protocol.py new file mode 100644 index 0000000..77ef3f8 --- /dev/null +++ b/netcortex/reflex/protocol.py @@ -0,0 +1,145 @@ +"""``ReflexHandler`` Protocol — the typed contract every reflex handler obeys. + +Reflex handlers are the **fast path** through the brain. They subscribe to a +narrow NATS subject pattern, run deterministic logic in milliseconds, and +produce a :class:`ReflexOutcome` that downstream consumers (semantic memory +in 0.8.0-dev3+, episodic memory in 0.9.0, NetBox journal mirror) persist. + +A handler must: + +* declare a stable ``id`` (used as the registry key and as the + ``handler`` field on the resulting ``ReflexOutcome``); +* declare exactly one NATS subject pattern it subscribes to (multiple + patterns are intentionally not supported in 0.8.0-dev2 — register two + handlers if you need two patterns; we'll revisit when a real use case + demands it); +* implement ``handle(event)`` synchronously OR as an async coroutine that + returns a :class:`ReflexOutcome` or ``None``. + +Handlers MUST NOT block on slow I/O. Anything that needs to wait on a +remote system (LLM call, NetBox writeback, large Cypher query) belongs in +the prefrontal/conductor path (0.11.0), not in reflex. + +See ``docs/architecture/brain.md`` for the role of reflex in the +brain-mapped architecture. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Literal, Protocol, runtime_checkable + +from netcortex.contracts.event_bus import EventMessage + +Severity = Literal["info", "warn", "high", "critical"] +"""Severity values understood by the persistence layer. + +Kept deliberately small (four buckets) so downstream alerting/escalation +rules can pattern-match without parsing free-form strings. Add new values +here, not in handler source. +""" + +OutcomeKind = Literal["logged", "applied", "skipped", "errored"] +"""What the handler actually did with the event. + +* ``logged`` — handler observed the event and recorded a :class:`ReflexOutcome` + but took no corrective action. The default for the dev2 idle handlers. +* ``applied`` — handler took a corrective action (e.g., suppressed a noisy + alert, opened a NetBox journal entry, kicked a remediation task). +* ``skipped`` — handler decided this event was not actionable (e.g., target + not in semantic memory, or in a maintenance window). +* ``errored`` — handler raised. The runner converts the exception to this + outcome rather than crashing the dispatcher. +""" + + +@dataclass(frozen=True) +class ReflexOutcome: + """Result of one reflex firing. + + In 0.8.0-dev2 these are only logged. In 0.8.0-dev3+ the runner + persists them as ``:ReflexEvent`` nodes in semantic memory (Neo4j), + with an ``:AFFECTS`` edge to the target entity if it is known to the + graph. When the affected entity is a Device/Interface/IPAddress that + NetBox knows about, a NetBox journal entry mirrors the outcome so + operators see it without leaving their tool of choice. + + The dataclass is frozen so an outcome cannot mutate between the handler + returning it and the runner persisting it — that prevents an entire + class of "the value I saw at log time differs from what I wrote to + the graph" bugs that have historically bitten correlator code. + """ + + handler: str + """The handler id that produced this outcome. Stable across releases.""" + + subject: str + """The NATS subject the event was published on.""" + + target: str | None + """Identifier of the affected entity (``platform_id``, NetBox device + name, BGP peer IP, etc.). ``None`` if the event has no clear target. + Used to attach an ``:AFFECTS`` edge in semantic memory.""" + + severity: Severity + occurred_at: datetime + payload: dict[str, Any] + """Verbatim subset of the originating event payload that the handler + deemed relevant. Kept small (handlers should not echo entire payloads) + so downstream storage doesn't bloat.""" + + outcome: OutcomeKind = "logged" + rationale: str = "" + """Free-form short explanation, intended for the operator UI. Keep + under 200 characters; long explanations belong in the rationale-text + field of the eventual :class:`netcortex.contracts.policy.Decision`.""" + + diagnostic: dict[str, Any] = field(default_factory=dict) + """Optional bag of fields the handler wants attached to the outcome + for debugging (e.g., the matched threshold value, the policy version + used). Not surfaced in the operator UI by default.""" + + +@runtime_checkable +class ReflexHandler(Protocol): + """Minimum surface every reflex handler must expose. + + Concrete handlers register via + :func:`netcortex.reflex.registry.register_handler`. The registry is the + only place these are enumerated; the runner discovers them from there. + """ + + @property + def id(self) -> str: + """Stable handler identifier (``"link_down"``, ``"bgp_drop"``, …). + + Used as the registry key AND as the ``handler`` field on every + resulting :class:`ReflexOutcome`. MUST be unique across the + loaded handler set; the registry rejects duplicates. + """ + ... + + @property + def pattern(self) -> str: + """NATS subject pattern this handler subscribes to. + + Validated by the runner against the same grammar + :class:`netcortex.thalamus.NatsEventBus` enforces. One pattern per + handler in 0.8.0-dev2. + """ + ... + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + """Process one event. + + Returning :class:`ReflexOutcome` instructs the runner to log / + persist it. Returning ``None`` means the handler observed the + event but consciously chose not to surface an outcome (e.g., the + event didn't actually represent the condition the handler cares + about despite matching the pattern). Returning ``None`` is NOT a + way to silently skip an error — raise instead, and the runner + will convert it to an ``errored`` outcome with the traceback in + ``diagnostic``. + """ + ... diff --git a/netcortex/reflex/registry.py b/netcortex/reflex/registry.py new file mode 100644 index 0000000..d60a07d --- /dev/null +++ b/netcortex/reflex/registry.py @@ -0,0 +1,110 @@ +"""Process-wide registry of loaded :class:`ReflexHandler` instances. + +Handlers register themselves at import time via +:func:`register_handler`. The :class:`netcortex.reflex.runner.ReflexRunner` +then enumerates the registry and wires each handler to the bus. + +Why a global registry and not dependency-injection +-------------------------------------------------- +The set of reflex handlers is **closed and small**: every handler is a +first-party module in this repository, signed by the same release. There +is no plugin loading, no late binding, no external code path. A +module-level dict is the simplest correct answer. + +When agent-proposed reflex handlers become a thing (post-1.0.0 plasticity +work), they will land here too — but only after the proposal-approval +cycle outlined in ``docs/architecture/brain.md``, which writes them to +the same first-party path with operator sign-off. +""" + +from __future__ import annotations + +import logging +from collections.abc import Iterator + +from netcortex.reflex.protocol import ReflexHandler + +_LOG = logging.getLogger(__name__) + +_REGISTRY: dict[str, ReflexHandler] = {} + + +class DuplicateHandlerError(ValueError): + """Raised when two handlers attempt to register the same id.""" + + +def register_handler(handler: ReflexHandler) -> ReflexHandler: + """Register a handler so the runner picks it up. + + Intended to be used at module scope:: + + _HANDLER = register_handler(LinkDownHandler()) + + Returning the handler lets the caller bind it to a module-level name + without repeating the construction. Re-registering the same id raises + :class:`DuplicateHandlerError` — handler ids are part of the public + operator-facing surface (they appear on every persisted outcome) and + silently shadowing them would be a footgun. + + The handler must structurally satisfy the :class:`ReflexHandler` + Protocol — checked at registration time so import-time failures point + at the offending file, not at the runner. + """ + if not isinstance(handler, ReflexHandler): + # ``runtime_checkable`` Protocol — verifies attribute presence, + # not call signatures. That is good enough to catch the common + # mistake of forgetting ``handle()`` on a new class. + raise TypeError( + f"object {handler!r} does not satisfy the ReflexHandler Protocol " + "(must expose 'id', 'pattern', and 'handle')" + ) + hid = handler.id + if hid in _REGISTRY: + existing = _REGISTRY[hid] + if existing is handler: + # Idempotent re-registration is harmless — common when handler + # modules are imported twice in the test suite. + return handler + raise DuplicateHandlerError( + f"reflex handler id {hid!r} already registered " + f"by {type(existing).__name__}; would be shadowed by " + f"{type(handler).__name__}" + ) + _REGISTRY[hid] = handler + _LOG.debug("reflex.registry.registered id=%s pattern=%s", hid, handler.pattern) + return handler + + +def get_handler(handler_id: str) -> ReflexHandler: + """Look up a registered handler by id. Raises :class:`KeyError` if absent.""" + return _REGISTRY[handler_id] + + +def all_handlers() -> Iterator[ReflexHandler]: + """Iterate the registered handlers in registration order. + + Order is the dict insertion order, which is registration order under + CPython 3.7+. Stable enough for the runner to assign deterministic + subscription indices in logs. + """ + return iter(_REGISTRY.values()) + + +def clear_registry() -> None: + """Drop every registered handler. + + Test-only helper. Production code never calls this — handlers are + expected to live for the lifetime of the process. The runner calls + :func:`all_handlers` at startup; mutating the registry after the + runner starts is undefined. + """ + _REGISTRY.clear() + + +__all__ = [ + "DuplicateHandlerError", + "all_handlers", + "clear_registry", + "get_handler", + "register_handler", +] diff --git a/netcortex/reflex/runner.py b/netcortex/reflex/runner.py new file mode 100644 index 0000000..e36d67d --- /dev/null +++ b/netcortex/reflex/runner.py @@ -0,0 +1,211 @@ +"""Reflex runner — wires every registered handler to the event bus. + +One :class:`ReflexRunner` instance owns one :class:`EventBus` connection +and one asyncio task per registered handler. Each task is a long-running +loop that pulls events from its handler's subscription and dispatches +them to the handler. + +Failure isolation +----------------- +A bug in one handler MUST NOT take down the others, and a bug in one +event MUST NOT take down its handler's loop. The dispatch wrapper: + +1. catches every exception the handler raises; +2. turns the exception into an :func:`ReflexOutcome` with + ``outcome="errored"`` so the operator UI sees what failed and why; +3. logs the traceback at WARNING (not ERROR — handlers fail often during + sensory-modality onboarding and a flood of ERRORs would mask real + incidents); +4. continues with the next event. + +Lifecycle +--------- +:meth:`start` is idempotent: a second call returns immediately. :meth:`stop` +cancels every per-handler task, drains the bus, and waits for the tasks to +unwind. Callers that need to know the runner is fully up before publishing +test events can ``await runner.ready_event.wait()``. + +In dev2 the outcomes are only logged — the Neo4j ``:ReflexEvent`` persistence +path lands with the first real publisher in 0.8.0-dev3. Until then the +runner is fully functional but every outcome surfaces as a structured log +line, which is enough to integration-test the wiring end-to-end against +``InMemoryEventBus``. +""" + +from __future__ import annotations + +import asyncio +import logging +import traceback +from datetime import datetime, timezone + +from netcortex.contracts.event_bus import EventBus, EventMessage +from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.registry import all_handlers + +_LOG = logging.getLogger(__name__) + + +class ReflexRunner: + """Drives the registered handler set against one :class:`EventBus`. + + Parameters + ---------- + bus: + The bus to subscribe against. The runner does NOT own the bus — + the caller is responsible for ``bus.close()`` after the runner + has stopped. This split keeps the runner reusable across short + test buses and the long-lived in-cluster :class:`NatsEventBus`. + handlers: + Optional explicit handler list. If omitted, the runner enumerates + the registry (the common case). Tests use the explicit form to + isolate runs from the global registry. + """ + + def __init__( + self, + bus: EventBus, + *, + handlers: list[ReflexHandler] | None = None, + ) -> None: + self._bus = bus + self._handlers: list[ReflexHandler] = ( + list(handlers) if handlers is not None else list(all_handlers()) + ) + self._tasks: list[asyncio.Task[None]] = [] + self._started = False + self._stopping = False + # Set after every handler's subscription task has been spawned; + # tests use this to avoid the "publish before subscribe" race. + self.ready_event = asyncio.Event() + # The runner records every outcome it dispatches. In dev2 this is + # the only persistence path; dev3+ replaces this with a Neo4j + # write that still keeps the in-memory copy for the operator + # status endpoint. + self.outcomes: list[ReflexOutcome] = [] + + @property + def handlers(self) -> list[ReflexHandler]: + """Snapshot of handlers this runner is driving (read-only).""" + return list(self._handlers) + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + async def start(self) -> None: + """Spawn one subscription task per handler. + + Idempotent: a second call is a no-op. Returns when every handler + has its subscription task scheduled; that does not necessarily + mean every subscription has been acknowledged by the bus. + Use ``await runner.ready_event.wait()`` plus a small grace period + if you need cross-handler ordering guarantees. + """ + if self._started: + return + self._started = True + for handler in self._handlers: + task = asyncio.create_task( + self._consume(handler), + name=f"reflex:{handler.id}", + ) + self._tasks.append(task) + _LOG.info( + "reflex.runner.handler_started id=%s pattern=%s", + handler.id, + handler.pattern, + ) + self.ready_event.set() + + async def stop(self, *, timeout: float = 5.0) -> None: + """Cancel every handler task and wait for them to unwind. + + ``timeout`` bounds the wait so a buggy handler that swallows + ``CancelledError`` cannot wedge shutdown. Tasks still running + after the timeout are logged and abandoned. + """ + if not self._started or self._stopping: + return + self._stopping = True + for task in self._tasks: + task.cancel() + if self._tasks: + try: + await asyncio.wait_for( + asyncio.gather(*self._tasks, return_exceptions=True), + timeout=timeout, + ) + except asyncio.TimeoutError: + stuck = [t for t in self._tasks if not t.done()] + _LOG.warning( + "reflex.runner.stop_timeout stuck_tasks=%d names=%s", + len(stuck), + [t.get_name() for t in stuck], + ) + self._tasks.clear() + + # ------------------------------------------------------------------ + # Per-handler loop + # ------------------------------------------------------------------ + async def _consume(self, handler: ReflexHandler) -> None: + """Drive one handler. Runs until cancelled or the bus closes.""" + try: + async for event in self._bus.subscribe(handler.pattern): + await self._dispatch_one(handler, event) + except asyncio.CancelledError: + # Normal shutdown path — re-raise so the task records as cancelled. + raise + except Exception as exc: + # Subscription itself broke (bus down, etc.). Log loudly; the + # supervising deployment will restart the pod. + _LOG.error( + "reflex.runner.subscription_failed handler=%s pattern=%s error=%s", + handler.id, + handler.pattern, + exc, + ) + + async def _dispatch_one( + self, + handler: ReflexHandler, + event: EventMessage, + ) -> None: + """Invoke one handler safely and persist its outcome.""" + try: + outcome = await handler.handle(event) + except asyncio.CancelledError: + raise + except Exception as exc: + outcome = ReflexOutcome( + handler=handler.id, + subject=event.subject, + target=str(event.payload.get("target") or "") or None, + severity="high", + occurred_at=datetime.now(tz=timezone.utc), + payload={}, + outcome="errored", + rationale=f"handler raised {type(exc).__name__}", + # Cap the traceback so a runaway recursion can't blow up + # the operator UI. 4KB is plenty for a useful diag. + diagnostic={"traceback": traceback.format_exc()[-4096:]}, + ) + _LOG.warning( + "reflex.runner.handler_raised handler=%s subject=%s error=%s", + handler.id, + event.subject, + exc, + ) + if outcome is None: + return + self.outcomes.append(outcome) + _LOG.info( + "reflex.outcome handler=%s subject=%s target=%s severity=%s outcome=%s", + outcome.handler, + outcome.subject, + outcome.target, + outcome.severity, + outcome.outcome, + ) + + +__all__ = ["ReflexRunner"] diff --git a/pyproject.toml b/pyproject.toml index 9df04e6..5f2e4a0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev1" +version = "0.8.0.dev2" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/reflex/__init__.py b/tests/reflex/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/reflex/test_handlers.py b/tests/reflex/test_handlers.py new file mode 100644 index 0000000..2303bdd --- /dev/null +++ b/tests/reflex/test_handlers.py @@ -0,0 +1,231 @@ +"""Tests for the first-party reflex handlers. + +These cover the per-handler outcome shape and the target-extraction +fallbacks. The runner-level dispatch is covered in ``test_runner.py``. + +Importing the handlers module side-registers them with the global +registry. We pin those identities here so a rename to a handler id — +which is an operator-facing field on every outcome — fails CI loudly. +""" + +from __future__ import annotations + +from datetime import datetime, timezone + +import pytest + +from netcortex.contracts.event_bus import EventMessage + +# Import the package so the handlers register themselves. We then read +# them back out of the registry by id rather than constructing fresh +# instances — that way a future change to the registration mechanics is +# automatically exercised by these tests too. +from netcortex.reflex import handlers as _handlers # noqa: F401 +from netcortex.reflex.protocol import ReflexHandler, ReflexOutcome +from netcortex.reflex.registry import get_handler + +pytestmark = pytest.mark.asyncio + + +def _event(subject: str, payload: dict[str, object]) -> EventMessage: + return EventMessage( + subject=subject, + payload=payload, + headers={}, + ts=datetime.now(tz=timezone.utc), + ) + + +# --------------------------------------------------------------------------- +# Identity / wiring smoke tests +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "handler_id,expected_pattern", + [ + ("link_down", "sensory.snmp.trap.link_down.>"), + ("security_webhook", "sensory.meraki.webhook.security.>"), + ("bgp_drop", "sensory.snmp.trap.bgp_backward_transition.>"), + ], +) +def test_handler_registered_with_expected_pattern( + handler_id: str, expected_pattern: str +) -> None: + """Handler ids and patterns are part of the operator-facing surface. + + A rename here is fine, but it MUST be intentional — production + operators read these ids in alerts and on the reconciliation UI. + """ + h = get_handler(handler_id) + assert isinstance(h, ReflexHandler) + assert h.id == handler_id + assert h.pattern == expected_pattern + + +# --------------------------------------------------------------------------- +# link_down +# --------------------------------------------------------------------------- + + +async def test_link_down_extracts_device_and_interface() -> None: + h = get_handler("link_down") + outcome = await h.handle(_event( + "sensory.snmp.trap.link_down.r1", + {"device_id": "r1", "interface": "Gi0/1"}, + )) + assert outcome is not None + assert outcome.handler == "link_down" + assert outcome.target == "r1" + assert outcome.severity == "high" + assert outcome.payload["interface"] == "Gi0/1" + assert outcome.outcome == "logged" + + +async def test_link_down_handles_missing_target_field() -> None: + """No device field at all — outcome.target is None, not a stringified ``None``.""" + h = get_handler("link_down") + outcome = await h.handle(_event( + "sensory.snmp.trap.link_down.unknown", + {"interface": "Gi0/1"}, + )) + assert outcome is not None + assert outcome.target is None + + +async def test_link_down_caps_upstream_keys() -> None: + """A pathologically wide payload doesn't blow up the outcome record.""" + h = get_handler("link_down") + wide = {f"k{i}": i for i in range(100)} + outcome = await h.handle(_event("sensory.snmp.trap.link_down.r1", wide)) + assert outcome is not None + assert len(outcome.payload["upstream_keys"]) <= 16 + + +# --------------------------------------------------------------------------- +# security_webhook +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "upstream,expected", + [ + ("informational", "info"), + ("warning", "warn"), + ("high", "high"), + ("critical", "critical"), + # Unknown values fall back to "warn" — defensive default so an + # unrecognized Meraki severity never gets dropped below warn. + ("", "warn"), + ("totally-bogus", "warn"), + # Case-insensitive. + ("CRITICAL", "critical"), + ], +) +async def test_security_webhook_severity_mapping( + upstream: str, expected: str +) -> None: + h = get_handler("security_webhook") + outcome = await h.handle(_event( + "sensory.meraki.webhook.security.ids_alerted", + {"severity": upstream, "alertId": "abc-123", + "networkId": "N_1", "clientMac": "aa:bb:cc:dd:ee:ff", + "alertType": "ids_alerted"}, + )) + assert outcome is not None + assert outcome.severity == expected + + +async def test_security_webhook_prefers_client_mac_as_target() -> None: + h = get_handler("security_webhook") + outcome = await h.handle(_event( + "sensory.meraki.webhook.security.ids_alerted", + {"clientMac": "aa:bb:cc:dd:ee:ff", + "deviceSerial": "Q2XX-YYYY-ZZZZ", + "networkId": "N_1", + "severity": "warning"}, + )) + assert outcome is not None + assert outcome.target == "aa:bb:cc:dd:ee:ff" + + +async def test_security_webhook_falls_back_to_device_serial() -> None: + h = get_handler("security_webhook") + outcome = await h.handle(_event( + "sensory.meraki.webhook.security.malware_detected", + {"deviceSerial": "Q2XX-YYYY-ZZZZ", "networkId": "N_1"}, + )) + assert outcome is not None + assert outcome.target == "Q2XX-YYYY-ZZZZ" + + +# --------------------------------------------------------------------------- +# bgp_drop +# --------------------------------------------------------------------------- + + +async def test_bgp_drop_composes_session_target() -> None: + """device+peer -> ``device|peer`` canonical session id.""" + h = get_handler("bgp_drop") + outcome = await h.handle(_event( + "sensory.snmp.trap.bgp_backward_transition.r1", + {"device_id": "r1", "peer": "10.0.1.5", + "peer_asn": 65001, "last_state": "Established"}, + )) + assert outcome is not None + assert outcome.target == "r1|10.0.1.5" + assert outcome.severity == "high" + assert outcome.payload["peer_asn"] == 65001 + assert outcome.payload["last_state"] == "Established" + + +async def test_bgp_drop_target_falls_back_to_peer_only() -> None: + h = get_handler("bgp_drop") + outcome = await h.handle(_event( + "sensory.snmp.trap.bgp_backward_transition.unknown", + {"peer": "10.0.1.5"}, + )) + assert outcome is not None + assert outcome.target == "10.0.1.5" + + +async def test_bgp_drop_target_none_when_no_identifiers() -> None: + h = get_handler("bgp_drop") + outcome = await h.handle(_event( + "sensory.snmp.trap.bgp_backward_transition.unknown", + {"last_state": "Established"}, + )) + assert outcome is not None + assert outcome.target is None + + +# --------------------------------------------------------------------------- +# All-handler invariants +# --------------------------------------------------------------------------- + + +@pytest.mark.parametrize( + "handler_id", + ["link_down", "security_webhook", "bgp_drop"], +) +async def test_handler_returns_frozen_outcome_with_required_fields( + handler_id: str, +) -> None: + """Every handler must produce a well-formed :class:`ReflexOutcome`. + + Catches the common bug of forgetting to set ``severity`` or returning + a dict instead of a dataclass. + """ + h = get_handler(handler_id) + outcome = await h.handle(_event( + # Use a subject that matches each handler's pattern hierarchically. + # We don't strictly need this — handle() doesn't re-validate the + # subject — but it makes the test inputs realistic. + f"sensory.test.invocation.{handler_id}", + {"target": "test-target"}, + )) + assert outcome is not None + assert isinstance(outcome, ReflexOutcome) + assert outcome.handler == handler_id + assert outcome.severity in {"info", "warn", "high", "critical"} + assert outcome.outcome in {"logged", "applied", "skipped", "errored"} diff --git a/tests/reflex/test_registry.py b/tests/reflex/test_registry.py new file mode 100644 index 0000000..00f656a --- /dev/null +++ b/tests/reflex/test_registry.py @@ -0,0 +1,123 @@ +"""Unit tests for the reflex handler registry. + +The registry is the single source of truth the runner enumerates. Its +contract is small but load-bearing — duplicate ids are an operator- +facing footgun (handler ids appear on every persisted outcome), so the +registry refuses them loudly rather than silently shadowing. +""" + +from __future__ import annotations + +from datetime import datetime, timezone +from typing import Final + +import pytest + +from netcortex.contracts.event_bus import EventMessage +from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.registry import ( + DuplicateHandlerError, + all_handlers, + clear_registry, + get_handler, + register_handler, +) + + +class _StubHandler: + """Minimal structural ReflexHandler used by these tests.""" + + def __init__(self, hid: str, pattern: str = "test.>") -> None: + self.id: Final[str] = hid + self.pattern: Final[str] = pattern + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=None, + severity="info", + occurred_at=datetime.now(tz=timezone.utc), + payload={}, + ) + + +@pytest.fixture(autouse=True) +def _isolated_registry(): + """Save/restore the global registry around every test in this module. + + The first-party handlers register themselves at import time. These + tests need an empty registry to assert on, but they must NOT leak + the cleared state to sibling test files (e.g., ``test_handlers.py`` + relies on the production handlers being registered). Snapshot before, + restore after — using only the public registry surface. + """ + snapshot = list(all_handlers()) + clear_registry() + yield + clear_registry() + for h in snapshot: + register_handler(h) + + +def test_register_and_lookup() -> None: + h = _StubHandler("alpha") + returned = register_handler(h) + # Decorator-style usage returns the handler unchanged. + assert returned is h + assert get_handler("alpha") is h + assert list(all_handlers()) == [h] + + +def test_register_preserves_insertion_order() -> None: + a, b, c = _StubHandler("a"), _StubHandler("b"), _StubHandler("c") + register_handler(a) + register_handler(b) + register_handler(c) + assert [h.id for h in all_handlers()] == ["a", "b", "c"] + + +def test_register_duplicate_id_raises() -> None: + register_handler(_StubHandler("dup")) + with pytest.raises(DuplicateHandlerError) as ei: + register_handler(_StubHandler("dup")) + # Error message names BOTH handler classes so the operator can find + # the offending file quickly. We do not pin the exact message; only + # that the registered id appears in it. + assert "dup" in str(ei.value) + + +def test_register_same_instance_is_idempotent() -> None: + """Re-registering the exact same instance is a no-op. + + Modules can be imported twice in the same process (test suite + isolation, plugin auto-discovery), so the registry treats an + identical re-register as the harmless event it is. + """ + h = _StubHandler("same") + register_handler(h) + register_handler(h) + assert list(all_handlers()) == [h] + + +def test_register_rejects_non_handler() -> None: + """Registration is type-checked at registration time.""" + + class NotAHandler: + # Missing handle(), pattern, id. + pass + + with pytest.raises(TypeError): + register_handler(NotAHandler()) # type: ignore[arg-type] + + +def test_get_handler_missing_raises_key_error() -> None: + with pytest.raises(KeyError): + get_handler("never-registered") + + +def test_clear_registry_empties() -> None: + register_handler(_StubHandler("x")) + register_handler(_StubHandler("y")) + clear_registry() + assert list(all_handlers()) == [] diff --git a/tests/reflex/test_runner.py b/tests/reflex/test_runner.py new file mode 100644 index 0000000..01bb033 --- /dev/null +++ b/tests/reflex/test_runner.py @@ -0,0 +1,241 @@ +"""End-to-end tests for :class:`ReflexRunner`. + +Exercise the runner against the in-memory event bus so dispatch is +deterministic without standing up NATS. The in-memory bus implements the +same Protocol, so passing here means the runner will behave identically +against the production NATS backend (verified by the contract suite that +NATS satisfies that Protocol). +""" + +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone +from typing import Any, Final + +import pytest + +from netcortex.contracts.event_bus import EventBus, EventMessage +from netcortex.reflex.protocol import ReflexOutcome +from netcortex.reflex.runner import ReflexRunner +from tests.contracts.event_bus.in_memory import InMemoryEventBus + +pytestmark = pytest.mark.asyncio + + +def _bus() -> EventBus: + return InMemoryEventBus() + + +class _RecordingHandler: + """Handler that records every event it sees for test introspection.""" + + def __init__(self, hid: str, pattern: str) -> None: + self.id: Final[str] = hid + self.pattern: Final[str] = pattern + self.seen: list[EventMessage] = [] + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + self.seen.append(event) + return ReflexOutcome( + handler=self.id, + subject=event.subject, + target=str(event.payload.get("target") or "") or None, + severity="info", + occurred_at=datetime.now(tz=timezone.utc), + payload=dict(event.payload), + ) + + +class _RaisingHandler: + """Handler that raises — used to verify per-handler isolation.""" + + id: Final[str] = "boom" + pattern: Final[str] = "sensory.boom.>" + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + raise RuntimeError("simulated handler failure") + + +class _SkippingHandler: + """Handler that returns ``None`` — used to verify None-skips-recording.""" + + id: Final[str] = "skip" + pattern: Final[str] = "sensory.skip.>" + + async def handle(self, event: EventMessage) -> ReflexOutcome | None: + return None + + +async def _wait_for(predicate: Any, timeout: float = 1.5) -> None: + """Poll until ``predicate()`` is truthy or timeout. Better than fixed sleep.""" + deadline = asyncio.get_event_loop().time() + timeout + while asyncio.get_event_loop().time() < deadline: + if predicate(): + return + await asyncio.sleep(0.02) + raise AssertionError("predicate did not become true within timeout") + + +async def test_dispatches_event_to_matching_handler() -> None: + bus = _bus() + handler = _RecordingHandler("link_down", "sensory.snmp.trap.link_down.>") + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + try: + await asyncio.sleep(0.05) # let subscription register + await bus.publish( + "sensory.snmp.trap.link_down.r1", + {"interface": "Gi0/1", "target": "r1"}, + ) + await _wait_for(lambda: len(handler.seen) == 1) + finally: + await runner.stop() + await bus.close() + + assert handler.seen[0].subject == "sensory.snmp.trap.link_down.r1" + assert handler.seen[0].payload == {"interface": "Gi0/1", "target": "r1"} + assert len(runner.outcomes) == 1 + outcome = runner.outcomes[0] + assert outcome.handler == "link_down" + assert outcome.target == "r1" + + +async def test_pattern_filters_out_non_matching_events() -> None: + bus = _bus() + handler = _RecordingHandler("only_a", "sensory.a.>") + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.a.event", {"i": 0}) + await bus.publish("sensory.b.event", {"i": 1}) # must NOT match + await bus.publish("sensory.a.deeper.event", {"i": 2}) + # Give the bus time to deliver any non-matching events too (and + # have them be filtered out, not silently delivered). + await _wait_for(lambda: len(handler.seen) == 2) + finally: + await runner.stop() + await bus.close() + + assert sorted(e.payload["i"] for e in handler.seen) == [0, 2] + + +async def test_multiple_handlers_fan_out() -> None: + """One event whose subject matches two handlers reaches both.""" + bus = _bus() + a = _RecordingHandler("a", "sensory.shared.>") + b = _RecordingHandler("b", "sensory.shared.>") + runner = ReflexRunner(bus, handlers=[a, b]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.shared.event", {"i": 0}) + await _wait_for(lambda: len(a.seen) == 1 and len(b.seen) == 1) + finally: + await runner.stop() + await bus.close() + + +async def test_handler_exception_does_not_kill_dispatcher() -> None: + """A raising handler produces an ``errored`` outcome and keeps going.""" + bus = _bus() + boom = _RaisingHandler() + runner = ReflexRunner(bus, handlers=[boom]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.boom.event", {"n": 1}) + await bus.publish("sensory.boom.event", {"n": 2}) + await _wait_for(lambda: len(runner.outcomes) == 2) + finally: + await runner.stop() + await bus.close() + + assert all(o.outcome == "errored" for o in runner.outcomes) + assert all("RuntimeError" in o.rationale for o in runner.outcomes) + # Diagnostic carries a traceback for debugging. + assert "traceback" in runner.outcomes[0].diagnostic + assert ( + "simulated handler failure" in runner.outcomes[0].diagnostic["traceback"] + ) + + +async def test_none_outcome_is_not_recorded() -> None: + """``handle() -> None`` is a conscious no-op, NOT an error.""" + bus = _bus() + handler = _SkippingHandler() + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + try: + await asyncio.sleep(0.05) + await bus.publish("sensory.skip.event", {}) + # Wait long enough that, if an outcome were going to be recorded, + # it would have been. Then assert it was not. + await asyncio.sleep(0.2) + finally: + await runner.stop() + await bus.close() + + assert runner.outcomes == [] + + +async def test_start_is_idempotent() -> None: + bus = _bus() + handler = _RecordingHandler("x", "sensory.x.>") + runner = ReflexRunner(bus, handlers=[handler]) + await runner.start() + await runner.start() # second call must be a no-op + try: + # The handler list is one task, not two. + assert len([t for t in runner._tasks if not t.done()]) == 1 # noqa: SLF001 + finally: + await runner.stop() + await bus.close() + + +async def test_stop_is_idempotent() -> None: + bus = _bus() + runner = ReflexRunner( + bus, handlers=[_RecordingHandler("x", "sensory.x.>")] + ) + await runner.start() + await runner.stop() + await runner.stop() # second call must be a no-op + await bus.close() + + +async def test_stop_without_start_is_safe() -> None: + """Calling stop on a runner that never started is a no-op.""" + bus = _bus() + runner = ReflexRunner(bus, handlers=[_RecordingHandler("x", "sensory.x.>")]) + await runner.stop() + await bus.close() + + +async def test_runner_enumerates_registry_when_no_handlers_given() -> None: + """Default-argument path: the runner reads the registry.""" + from netcortex.reflex.registry import ( + all_handlers, + clear_registry, + register_handler, + ) + + # Snapshot the production registry so we don't leak the cleared state + # to sibling test files that depend on the first-party handlers. + snapshot = list(all_handlers()) + clear_registry() + try: + a = _RecordingHandler("reg-a", "sensory.reg.>") + b = _RecordingHandler("reg-b", "sensory.reg.>") + register_handler(a) + register_handler(b) + bus = _bus() + runner = ReflexRunner(bus) + assert {h.id for h in runner.handlers} == {"reg-a", "reg-b"} + await runner.stop() + await bus.close() + finally: + clear_registry() + for h in snapshot: + register_handler(h) From f2acbcf0cb0db5412e26a20b76b441c46a983b4f Mon Sep 17 00:00:00 2001 From: Steve NCA Date: Mon, 1 Jun 2026 05:25:57 +0000 Subject: [PATCH 2/2] test(reflex): silence pytest-asyncio mark warning on sync test test_handler_registered_with_expected_pattern was synchronous but the module's pytestmark = pytest.mark.asyncio still applied to it, producing 3 warnings per CI run. Future strict-mode pytest-asyncio may upgrade those warnings to errors. Trivially fixed by declaring the test async (no await required). Splitting it into its own file just to avoid the marker would be a worse trade-off. Co-authored-by: Cursor --- tests/reflex/test_handlers.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/reflex/test_handlers.py b/tests/reflex/test_handlers.py index 2303bdd..49389ce 100644 --- a/tests/reflex/test_handlers.py +++ b/tests/reflex/test_handlers.py @@ -49,13 +49,18 @@ def _event(subject: str, payload: dict[str, object]) -> EventMessage: ("bgp_drop", "sensory.snmp.trap.bgp_backward_transition.>"), ], ) -def test_handler_registered_with_expected_pattern( +async def test_handler_registered_with_expected_pattern( handler_id: str, expected_pattern: str ) -> None: """Handler ids and patterns are part of the operator-facing surface. A rename here is fine, but it MUST be intentional — production operators read these ids in alerts and on the reconciliation UI. + + Declared ``async`` purely to match the module-level + ``pytestmark = pytest.mark.asyncio``; there is nothing to await + here. Splitting this into its own file just to avoid the marker + would be a worse trade-off. """ h = get_handler(handler_id) assert isinstance(h, ReflexHandler)