feat(daemon): smart correlation state restoration during replay#61
Merged
Conversation
Replace the blanket "skip state on any replay" heuristic with a position-aware decision. The daemon now tracks the NATS JetStream high-water mark (stream sequence + published timestamp) in SQLite alongside the correlation snapshot, and on restart compares the replay start point against the stored position: - Forward replay (start > stored): restore state, preserving cross-boundary correlation windows. - Backward replay (start <= stored): skip state to avoid double-counting. - --replay-from-latest: always skip (starting fresh). - --keep-state / --clear-state: explicit operator overrides. Also wires --timestamp-fallback (wallclock|skip) to the CLI so forensic replay over logs without standard timestamp fields can opt into "detections only, no correlation updates." Replaces the old boolean `clear_correlation_state` config field with the richer `StateRestoreMode` enum (Auto / ForceClear / ForceKeep). The SQLite schema is auto-migrated on open to add `source_sequence` and `source_timestamp` columns.
15 tests covering: - SqliteStateStore: round-trip with/without SourcePosition, position updates, schema migration from old format, empty database. - decide_state_restore: ForceClear skips, ForceKeep restores, Auto with Resume/Latest/forward sequence/backward sequence/equal sequence/forward time/backward time/no stored position.
Cover --clear-state, --keep-state, --timestamp-fallback, schema migration, and NATS sequence-aware forward/backward replay logic.
On macOS, FSEvents fires multiple events on tempfile creation, filling the bounded reload channel before the test calls POST /api/v1/reload. Retry with 500ms backoff so the debounce loop drains the channel first. Also handle non-2xx responses in http_post instead of panicking.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
--keep-state/--clear-stateas explicit operator overrides for cases where the automatic decision is not what you want.--timestamp-fallback wallclock|skipto the CLI so forensic replay over logs without standard timestamp fields can opt into "detections only, no correlation updates."Behavior matrix
Resume(default)--replay-from-sequence Nwhere N > stored--replay-from-sequence Nwhere N <= stored--replay-from-time Twhere T > stored--replay-from-time Twhere T <= stored--replay-from-latest--clear-state--keep-state(new)Implementation
AckToken::nats_stream_position()extracts(stream_sequence, published_unix_timestamp)from JetStream message metadata before acking.SourcePositionstruct stored assource_sequence/source_timestampINTEGER columns in SQLite (auto-migrated from old schema viaPRAGMA table_infocheck).StateRestoreModeenum (Auto/ForceClear/ForceKeep) replaces the oldclear_correlation_state: boolonDaemonConfig.decide_state_restore()encapsulates the full decision tree, testable in isolation.AtomicU64/AtomicI64in the ack task, read by the periodic and shutdown state savers.--timestamp-fallbackwired throughbuild_correlation_configto setTimestampFallback::Skipon theCorrelationConfig.Test plan
SqliteStateStore(round-trip with/without position, position updates, old schema migration, empty DB)decide_state_restore(ForceClear, ForceKeep, Resume, Latest, forward/backward/equal sequence, forward/backward time, no stored position)cargo clippy --workspace --all-targets --all-features -- -D warningscleancargo clippy --workspace --all-targets -- -D warningsclean (without NATS feature)--state-db, shut down, restart with--replay-from-sequenceabove stored position, verify state restored