feat(cortex): Implement Phase 1 ("The Tick Loop")#301
vsumner wants to merge 3 commits into spacedriveapp:main
Conversation
Walkthrough: Adds a per-agent memory event bus (…)

Changes
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs
Suggested reviewers
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/api/agents.rs (1)
406-407: Use shared event-bus defaults in warmup bootstrap. This hard-codes capacities in one path; using the shared helper keeps behavior aligned with the rest of the runtime.
♻️ Suggested change
```diff
-    let (event_tx, memory_event_tx) =
-        crate::create_process_event_buses_with_capacity(16, 64);
+    let (event_tx, memory_event_tx) = crate::create_process_event_buses();
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/api/agents.rs` around lines 406 - 407, The warmup bootstrap hard-codes capacities by calling create_process_event_buses_with_capacity(16, 64); replace that call with the shared helper that uses the runtime defaults (e.g. crate::create_process_event_buses()) so the warmup path aligns with the rest of the runtime; update the local binding (let (event_tx, memory_event_tx) = ...) to use the helper instead of the _with_capacity variant.
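For illustration only, a minimal sketch of the shape such a shared helper could take. The capacity values come from the review comment; the channel types and return signature here are stand-ins (the real runtime presumably uses broadcast buses), so treat this as a sketch of the pattern, not the crate's API:

```rust
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};

// Assumed defaults, taken from the hard-coded values the comment flags.
const DEFAULT_CONTROL_BUS_CAPACITY: usize = 16;
const DEFAULT_MEMORY_BUS_CAPACITY: usize = 64;

/// Every bootstrap path calls this instead of hard-coding capacities,
/// so the warmup path cannot drift from the rest of the runtime.
fn create_process_event_buses() -> (
    (SyncSender<String>, Receiver<String>),
    (SyncSender<String>, Receiver<String>),
) {
    (
        sync_channel(DEFAULT_CONTROL_BUS_CAPACITY),
        sync_channel(DEFAULT_MEMORY_BUS_CAPACITY),
    )
}
```

The point of the pattern is that changing a capacity is a one-line edit in one place.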
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/cortex.rs`:
- Around line 985-987: Separate lag-warning state for the control and memory
streams by duplicating the existing variables: replace the single
lagged_since_last_warning: u64 and last_lag_warning: Option<Instant> with
per-stream variants (e.g., lagged_since_last_warning_control: u64,
last_lag_warning_control: Option<Instant>, lagged_since_last_warning_memory:
u64, last_lag_warning_memory: Option<Instant>), keep memory_event_stream_open
as-is (or introduce memory_event_stream_open/control_event_stream_open if
needed), and update all references in the control receiver logic to use the
*_control variables and all references in the memory receiver logic to use the
*_memory variables so counts and warning cadence are tracked independently for
each stream.
- Around line 911-925: The bulletin refresh currently advances
last_bulletin_refresh even when no fresh bulletin is produced; change the flow
so generate_bulletin (called via maybe_generate_bulletin_under_lock) returns an
explicit success indicator or the new bulletin (e.g., Option<String> or
Result<bool>), propagate that result back to spawn_bulletin_refresh_task, and
only update RuntimeConfig::last_bulletin_refresh when a new bulletin was
actually generated and stored in RuntimeConfig::memory_bulletin; update
maybe_generate_bulletin_under_lock and generate_bulletin signatures/usages
accordingly so the decision to advance the cadence is made on a real success
flag (or by comparing the old/new memory_bulletin) rather than unconditionally
after the task completes.
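The per-stream split requested in the first inline comment above can be factored as a small state struct, one instance per receiver, instead of duplicating loose variables. Names and the warning-cadence rule here are illustrative:

```rust
use std::time::{Duration, Instant};

/// Lag-warning state for one broadcast stream. The cortex loop would hold
/// one of these for the control receiver and another for the memory
/// receiver, so counts and warning cadence stay independent.
#[derive(Default)]
struct LagState {
    lagged_since_last_warning: u64,
    last_lag_warning: Option<Instant>,
}

impl LagState {
    /// Record `skipped` dropped events; returns Some(total) when a warning
    /// is due (first lag ever, or `min_gap` elapsed since the last warning).
    fn record_lag(&mut self, skipped: u64, min_gap: Duration) -> Option<u64> {
        self.lagged_since_last_warning += skipped;
        let due = self
            .last_lag_warning
            .map_or(true, |at| at.elapsed() >= min_gap);
        if due {
            let total = self.lagged_since_last_warning;
            self.lagged_since_last_warning = 0;
            self.last_lag_warning = Some(Instant::now());
            Some(total)
        } else {
            None
        }
    }
}
```

With two instances, a burst of lag on the memory stream can never suppress or distort the warning cadence of the control stream.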
---
Nitpick comments:
In `@src/api/agents.rs`:
- Around line 406-407: The warmup bootstrap hard-codes capacities by calling
create_process_event_buses_with_capacity(16, 64); replace that call with the
shared helper that uses the runtime defaults (e.g.
crate::create_process_event_buses()) so the warmup path aligns with the rest of
the runtime; update the local binding (let (event_tx, memory_event_tx) = ...) to
use the helper instead of the _with_capacity variant.
ℹ️ Review info
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
`Cargo.lock` is excluded by `!**/*.lock`
📒 Files selected for processing (17)
- docs/content/docs/(configuration)/config.mdx
- docs/content/docs/(core)/architecture.mdx
- docs/design-docs/cortex-implementation.md
- src/agent/channel.rs
- src/agent/channel_dispatch.rs
- src/agent/channel_history.rs
- src/agent/compactor.rs
- src/agent/cortex.rs
- src/agent/ingestion.rs
- src/api/agents.rs
- src/lib.rs
- src/main.rs
- src/telemetry/registry.rs
- src/tools.rs
- src/tools/memory_save.rs
- tests/bulletin.rs
- tests/context_dump.rs
Actionable comments posted: 1
🧹 Nitpick comments (1)
src/agent/cortex.rs (1)
429: Extract the signal buffer cap into a shared constant. The `100` cap appears in both initialization and eviction paths; one constant avoids drift.

♻️ Proposed cleanup

```diff
+const SIGNAL_BUFFER_CAPACITY: usize = 100;
@@
-    signal_buffer: Arc::new(RwLock::new(VecDeque::with_capacity(100))),
+    signal_buffer: Arc::new(RwLock::new(VecDeque::with_capacity(SIGNAL_BUFFER_CAPACITY))),
@@
-    if buffer.len() > 100 {
+    if buffer.len() > SIGNAL_BUFFER_CAPACITY {
         buffer.pop_front();
     }
```

Also applies to: 628-630
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/cortex.rs` at line 429, Introduce a single shared constant (e.g. SIGNAL_BUFFER_CAP) and replace the hard-coded 100 in the VecDeque::with_capacity call used to initialize signal_buffer (the Arc::new(RwLock::new(VecDeque::with_capacity(100)))) and any eviction/trim logic that currently uses 100 (see the eviction code referencing the same numeric cap around the signal handling/ejection paths). Update references in functions/methods that manipulate signal_buffer so they use SIGNAL_BUFFER_CAP for capacity checks, push/evict thresholds, and any loop bounds to ensure capacity stays consistent across initialization and eviction code.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/agent/cortex.rs`:
- Around line 932-946: The bulletin refresh loop can retry every tick because
failures don't advance last_bulletin_refresh and spawn_bulletin_refresh_task
always calls generate_profile; to fix, implement simple
rate-limiting/circuit-breaker in maybe_generate_bulletin_under_lock (or the
caller) by tracking consecutive failures in deps.runtime_config.warmup_status
(or a new counter field), increment the counter on failure and only treat the
job as "due" again after a backoff (e.g., advance last_bulletin_refresh or set a
cooldown timestamp) or after N successful attempts, and gate the
generate_profile call in spawn_bulletin_refresh_task so it runs only on success
(or when failure counter is zero/under threshold) to avoid repeated profile
generations during prolonged bulletin failures; update BulletinRefreshOutcome to
carry success/failure so caller can update the failure counter and
last_bulletin_refresh accordingly.
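A capped exponential backoff of the kind this comment describes could look like the sketch below. The function name matches the one used in the proposed fixes elsewhere in this review; the base delay, cap, and doubling schedule are assumptions for illustration:

```rust
use std::time::Duration;

/// Backoff before the next bulletin refresh attempt after consecutive
/// failures: 1 -> 60s, 2 -> 120s, 3 -> 240s, ... capped at one hour.
fn bulletin_refresh_failure_backoff(consecutive_failures: u32) -> Duration {
    const BASE_SECS: u64 = 60; // assumed base delay
    const MAX_SECS: u64 = 3600; // assumed cap

    // Clamp the exponent so the shift cannot overflow and the delay
    // cannot exceed the cap.
    let exp = consecutive_failures.saturating_sub(1).min(6);
    Duration::from_secs((BASE_SECS << exp).min(MAX_SECS))
}
```

The caller would then gate the "is the bulletin due?" check on `next_allowed_at = last_failure + bulletin_refresh_failure_backoff(failures)` rather than retrying every tick.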
---
Nitpick comments:
In `@src/agent/cortex.rs`:
- Line 429: Introduce a single shared constant (e.g. SIGNAL_BUFFER_CAP) and
replace the hard-coded 100 in the VecDeque::with_capacity call used to
initialize signal_buffer (the
Arc::new(RwLock::new(VecDeque::with_capacity(100)))) and any eviction/trim logic
that currently uses 100 (see the eviction code referencing the same numeric cap
around the signal handling/ejection paths). Update references in
functions/methods that manipulate signal_buffer so they use SIGNAL_BUFFER_CAP
for capacity checks, push/evict thresholds, and any loop bounds to ensure
capacity stays consistent across initialization and eviction code.
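The constant-plus-eviction pattern the nitpick describes, as a standalone sketch (the buffer element type and function name are illustrative; the real `signal_buffer` lives behind an `Arc<RwLock<…>>`):

```rust
use std::collections::VecDeque;

const SIGNAL_BUFFER_CAPACITY: usize = 100;

/// Push a signal and evict the oldest entry once the cap is exceeded.
/// Initialization and trimming share one constant, so the two paths
/// cannot drift apart.
fn push_signal(buffer: &mut VecDeque<String>, signal: String) {
    buffer.push_back(signal);
    if buffer.len() > SIGNAL_BUFFER_CAPACITY {
        buffer.pop_front();
    }
}
```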
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/agent/cortex.rs (2)
990-994: ⚠️ Potential issue | 🟡 Minor — Use structured field instead of literal retry-delay placeholder.
At Line 993, `{RETRY_DELAY_SECS}` is logged as literal text, not a value. Add it as a tracing field to keep logs accurate.

💡 Proposed fix

```diff
     tracing::info!(
         attempt = attempt + 1,
         max = MAX_RETRIES,
-        "retrying bulletin generation in {RETRY_DELAY_SECS}s"
+        retry_delay_secs = RETRY_DELAY_SECS,
+        "retrying bulletin generation"
     );
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/cortex.rs` around lines 990 - 994, The tracing::info! call currently logs the literal "{RETRY_DELAY_SECS}" instead of the value; update the tracing invocation (the tracing::info! near the retry loop that includes attempt and max fields) to add a structured field for the delay (e.g. retry_delay_secs = RETRY_DELAY_SECS) and remove the literal placeholder from the message so the actual RETRY_DELAY_SECS value is emitted in logs.
977-1011: ⚠️ Potential issue | 🟠 Major — Startup-failure path delays retry until full bulletin interval.
If all startup attempts fail, Line 1010 still seeds `last_bulletin_refresh` to now, so the first scheduled retry is blocked by `bulletin_interval` instead of the failure backoff path. This can leave the bulletin stale much longer than intended.

💡 Proposed fix

```diff
-    for attempt in 0..=MAX_RETRIES {
+    let mut startup_refresh_succeeded = false;
+    for attempt in 0..=MAX_RETRIES {
         let bulletin_outcome = maybe_generate_bulletin_under_lock(
             cortex.deps.runtime_config.warmup_lock.as_ref(),
             &cortex.deps.runtime_config.warmup,
             &cortex.deps.runtime_config.warmup_status,
             || generate_bulletin(&cortex.deps, logger),
         )
         .await;
         if bulletin_outcome.is_success() {
+            startup_refresh_succeeded = true;
             break;
         }
         if attempt < MAX_RETRIES {
             tracing::info!(
                 attempt = attempt + 1,
                 max = MAX_RETRIES,
                 "retrying bulletin generation in {RETRY_DELAY_SECS}s"
             );
             // ...
         }
     }
     // Generate an initial profile after startup bulletin synthesis.
     generate_profile(&cortex.deps, logger).await;
-    let mut last_bulletin_refresh = Instant::now();
+    let now = Instant::now();
+    let mut last_bulletin_refresh = now;
     // ...
-    let mut bulletin_refresh_failures: u32 = 0;
-    let mut next_bulletin_refresh_allowed_at = Instant::now();
+    let mut bulletin_refresh_failures: u32 = if startup_refresh_succeeded { 0 } else { 1 };
+    let mut next_bulletin_refresh_allowed_at = if startup_refresh_succeeded {
+        now
+    } else {
+        now + bulletin_refresh_failure_backoff(bulletin_refresh_failures)
+    };
```

Based on learnings: "The Cortex must generate a memory bulletin—a periodically refreshed, LLM-curated summary of agent knowledge (~500 words) using `memory_recall`—on a configurable interval (default 60 min) and cache it in `RuntimeConfig::memory_bulletin`."

Also applies to: 1134-1137
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/cortex.rs` around lines 977 - 1011, The startup bulletin retry loop may leave last_bulletin_refresh set to now even if all attempts failed, which postpones the next retry until the full bulletin interval; after the for-loop, detect if no attempt succeeded (i.e., bulletin_outcome never .is_success()), and instead set last_bulletin_refresh back sufficiently (e.g., Instant::now() - Duration::from_secs(RETRY_DELAY_SECS) or by subtracting the configured bulletin interval) so the scheduler will take the failure backoff path; update the logic around maybe_generate_bulletin_under_lock / generate_bulletin and the assignment to last_bulletin_refresh to implement this conditional adjustment.
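The "advance the cadence only on real success" rule from this comment can be isolated into a small helper. The types and names below are illustrative stand-ins (the actual `generate_bulletin` is async and takes the cortex deps), but the control flow is the point:

```rust
use std::time::Instant;

/// Run a bulletin generator and commit its result. Returns true only when a
/// fresh bulletin was produced; on failure the cached bulletin and the
/// refresh timestamp are left untouched, so the job stays "due".
fn refresh_bulletin(
    generate: impl FnOnce() -> Option<String>,
    memory_bulletin: &mut Option<String>,
    last_refresh: &mut Instant,
) -> bool {
    match generate() {
        Some(bulletin) => {
            *memory_bulletin = Some(bulletin);
            *last_refresh = Instant::now();
            true
        }
        // Do not advance the cadence: the next tick should retry
        // (subject to the failure backoff).
        None => false,
    }
}
```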
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@src/agent/cortex.rs`:
- Around line 990-994: The tracing::info! call currently logs the literal
"{RETRY_DELAY_SECS}" instead of the value; update the tracing invocation (the
tracing::info! near the retry loop that includes attempt and max fields) to add
a structured field for the delay (e.g. retry_delay_secs = RETRY_DELAY_SECS) and
remove the literal placeholder from the message so the actual RETRY_DELAY_SECS
value is emitted in logs.
- Around line 977-1011: The startup bulletin retry loop may leave
last_bulletin_refresh set to now even if all attempts failed, which postpones
the next retry until the full bulletin interval; after the for-loop, detect if
no attempt succeeded (i.e., bulletin_outcome never .is_success()), and instead
set last_bulletin_refresh back sufficiently (e.g., Instant::now() -
Duration::from_secs(RETRY_DELAY_SECS) or by subtracting the configured bulletin
interval) so the scheduler will take the failure backoff path; update the logic
around maybe_generate_bulletin_under_lock / generate_bulletin and the assignment
to last_bulletin_refresh to implement this conditional adjustment.
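The backdating rule from the proposed fix, isolated into a helper for clarity (illustrative only; the real code manipulates loop-local state directly):

```rust
use std::time::{Duration, Instant};

/// Seed the refresh timestamp after the startup attempt. On success the
/// next refresh is due a full interval from now; on failure the stamp is
/// backdated so the very next due-check fires immediately.
fn seed_last_refresh(startup_succeeded: bool, bulletin_interval: Duration) -> Instant {
    let now = Instant::now();
    if startup_succeeded {
        now
    } else {
        // checked_sub guards against Instant underflow early in uptime.
        now.checked_sub(bulletin_interval).unwrap_or(now)
    }
}
```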
Force-pushed from 48a9738 to 01c57e1.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/agent/cortex.rs (1)
976-1010: ⚠️ Potential issue | 🟠 Major — Startup failure path postpones the next bulletin attempt for a full interval.
At Line 1010, `last_bulletin_refresh` is set to `Instant::now()` unconditionally. If startup bulletin synthesis fails all retries, the due check at Line 1135 stays false until the full `bulletin_interval` elapses, which can delay recovery significantly. It also allows Line 1009 to generate profile data without a fresh startup bulletin.

💡 Proposed fix

```diff
-    for attempt in 0..=MAX_RETRIES {
+    let mut startup_refresh_succeeded = false;
+    for attempt in 0..=MAX_RETRIES {
         let bulletin_outcome = maybe_generate_bulletin_under_lock(
             cortex.deps.runtime_config.warmup_lock.as_ref(),
             &cortex.deps.runtime_config.warmup,
             &cortex.deps.runtime_config.warmup_status,
             || generate_bulletin(&cortex.deps, logger),
         )
         .await;
         if bulletin_outcome.is_success() {
+            startup_refresh_succeeded = true;
             break;
         }
         if attempt < MAX_RETRIES {
             tracing::info!(
                 attempt = attempt + 1,
                 max = MAX_RETRIES,
                 "retrying bulletin generation in {RETRY_DELAY_SECS}s"
             );
             logger.log(
                 "bulletin_failed",
                 &format!(
                     "Bulletin generation failed, retrying (attempt {}/{})",
                     attempt + 1,
                     MAX_RETRIES
                 ),
                 Some(serde_json::json!({ "attempt": attempt + 1, "max_retries": MAX_RETRIES })),
             );
             tokio::time::sleep(Duration::from_secs(RETRY_DELAY_SECS)).await;
         }
     }
-    // Generate an initial profile after startup bulletin synthesis.
-    generate_profile(&cortex.deps, logger).await;
-    let mut last_bulletin_refresh = Instant::now();
+    if startup_refresh_succeeded {
+        generate_profile(&cortex.deps, logger).await;
+    }
+    let now = Instant::now();
+    let cortex_config = **cortex.deps.runtime_config.cortex.load();
+    let bulletin_interval = Duration::from_secs(cortex_config.bulletin_interval_secs.max(1));
+    let mut last_bulletin_refresh = if startup_refresh_succeeded {
+        now
+    } else {
+        now.checked_sub(bulletin_interval).unwrap_or(now)
+    };
     ...
-    let mut bulletin_refresh_failures: u32 = 0;
-    let mut next_bulletin_refresh_allowed_at = Instant::now();
+    let mut bulletin_refresh_failures: u32 = if startup_refresh_succeeded { 0 } else { 1 };
+    let mut next_bulletin_refresh_allowed_at = if startup_refresh_succeeded {
+        now
+    } else {
+        now + bulletin_refresh_failure_backoff(bulletin_refresh_failures)
+    };
```

Based on learnings: "The Cortex must generate a memory bulletin—a periodically refreshed, LLM-curated summary of agent knowledge (~500 words) using `memory_recall`—on a configurable interval (default 60 min) and cache it in `RuntimeConfig::memory_bulletin`."

Also applies to: 1132-1137
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/agent/cortex.rs` around lines 976 - 1010, The startup code unconditionally sets last_bulletin_refresh and runs generate_profile even if maybe_generate_bulletin_under_lock / generate_bulletin failed; move the last_bulletin_refresh update and the generate_profile(&cortex.deps, logger).await call into the success branch so they only run when bulletin_outcome.is_success(); if all startup retries fail, set last_bulletin_refresh to Instant::now() - Duration::from_secs(cortex.deps.runtime_config.bulletin_interval) (or otherwise subtract the configured bulletin interval) so the periodic due-check will immediately permit the next bulletin attempt; use the existing symbols maybe_generate_bulletin_under_lock, generate_bulletin, generate_profile, and cortex.deps.runtime_config.bulletin_interval to locate and implement these changes.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/design-docs/cortex-implementation.md`:
- Around line 57-59: Update the Phase 1 notes to reflect the actual
`ProcessEvent` enum surface (not "all 12 ProcessEvent variants") and document
that the implementation must map every current `ProcessEvent` variant; also
update the `MemorySaved` event documentation to state it includes `memory_type`
and `importance` fields and that real `memory_type`, `importance` and content
summaries should be pulled from events rather than hardcoded—refer to
`ProcessEvent`, `MemorySaved`, `memory_type`, and `importance` when making these
text changes so readers can verify parity against the code.
---
Outside diff comments:
In `@src/agent/cortex.rs`:
- Around line 976-1010: The startup code unconditionally sets
last_bulletin_refresh and runs generate_profile even if
maybe_generate_bulletin_under_lock / generate_bulletin failed; move the
last_bulletin_refresh update and the generate_profile(&cortex.deps,
logger).await call into the success branch so they only run when
bulletin_outcome.is_success(); if all startup retries fail, set
last_bulletin_refresh to Instant::now() -
Duration::from_secs(cortex.deps.runtime_config.bulletin_interval) (or otherwise
subtract the configured bulletin interval) so the periodic due-check will
immediately permit the next bulletin attempt; use the existing symbols
maybe_generate_bulletin_under_lock, generate_bulletin, generate_profile, and
cortex.deps.runtime_config.bulletin_interval to locate and implement these
changes.
🚧 Files skipped from review as they are similar to previous changes (4)
- src/telemetry/registry.rs
- docs/content/docs/(configuration)/config.mdx
- src/agent/channel_dispatch.rs
- src/agent/ingestion.rs
```
- Map all 12 `ProcessEvent` variants, not just 3
- Pull real `memory_type`, `importance`, content summaries from events
- Enrich `MemorySaved` event variant with `memory_type` and `importance` fields so the cortex gets useful data without querying the store
```
Update the ProcessEvent variant count in Phase 1 notes.
Line 57 says "all 12 ProcessEvent variants," but the current enum surface is larger. This can mislead parity checks against implementation.
📝 Suggested doc fix
```diff
-- Map all 12 `ProcessEvent` variants, not just 3
+- Map all current `ProcessEvent` variants, not just 3
```

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
```suggestion
- Map all current `ProcessEvent` variants, not just 3
- Pull real `memory_type`, `importance`, content summaries from events
- Enrich `MemorySaved` event variant with `memory_type` and `importance` fields so the cortex gets useful data without querying the store
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@docs/design-docs/cortex-implementation.md` around lines 57 - 59, Update the
Phase 1 notes to reflect the actual `ProcessEvent` enum surface (not "all 12
ProcessEvent variants") and document that the implementation must map every
current `ProcessEvent` variant; also update the `MemorySaved` event
documentation to state it includes `memory_type` and `importance` fields and
that real `memory_type`, `importance` and content summaries should be pulled
from events rather than hardcoded—refer to `ProcessEvent`, `MemorySaved`,
`memory_type`, and `importance` when making these text changes so readers can
verify parity against the code.
Overview
This PR implements Phase 1: The Tick Loop for Cortex as defined in
`docs/design-docs/cortex-implementation.md` and prepares the runtime for later phases. Phase 1 is intentionally foundational and does not introduce destructive behavior changes:
This PR is also the required base for Phase 2 work (PR #305):
- `spawn_cortex_loop()` + runtime tick loop

What Phase 1 includes
Runtime bus and loop plumbing (done)
- `event_tx` (control/lifecycle) and `memory_event_tx` (memory telemetry).
- `spawn_cortex_loop()` that observes both buses and ticks.

Event capture for health-relevant signals
- `memory_save` tool now emits enriched `MemorySaved` memory events (`memory_type`, `importance`, `content_summary`).
- `CompactionTriggered` when thresholds are reached.

Signal model parity and observation mapping
- `Signal` enum to align with emitted/consumed `ProcessEvent` surface.
- `observe()` mapping to include worker, branch, tool, memory, compaction, task, and link events.

Channel-scoped event filtering safety
Receiver resilience and lag observability
- `Lagged` is observed as operational telemetry rather than a hard failure.

Shared receive-policy helpers
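The shared receive-policy idea can be modeled like this. `RecvOutcome` is a local stand-in for `tokio::sync::broadcast::error::RecvError` (used here so the sketch stays dependency-free); the policy itself is just a match that treats lag as telemetry and a closed sender as the loop's end:

```rust
/// Stand-in for a broadcast receive result: an item, a lag report with the
/// number of skipped events, or a closed channel.
enum RecvOutcome<T> {
    Item(T),
    Lagged(u64),
    Closed,
}

/// Shared receive policy. Returns None when the loop should exit,
/// Some(None) to count the drop and keep receiving, and Some(Some(item))
/// for a normal event.
fn apply_receive_policy<T>(outcome: RecvOutcome<T>, dropped: &mut u64) -> Option<Option<T>> {
    match outcome {
        RecvOutcome::Item(item) => Some(Some(item)),
        RecvOutcome::Lagged(skipped) => {
            *dropped += skipped; // lag is operational telemetry, not a failure
            Some(None)
        }
        RecvOutcome::Closed => None, // sender gone: end this receiver's loop
    }
}
```

Sharing one such helper across the control and memory receivers is what keeps their drop accounting and shutdown behavior consistent.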
Verification and test guidance
Commands run in this PR
- `cargo test -p spacebot channel_event_loop_continues_after_lagged_broadcast -- --nocapture`
- `cargo test -p spacebot run_cortex_loop_tick_not_starved_by_events -- --nocapture`
- `cargo test -p spacebot summarize_memory_content_truncates_to_max_chars -- --nocapture`
- `cargo test -p spacebot summarize_signal_text_truncates_long_text -- --nocapture`
- `cargo test -p spacebot event_filter_scopes_tool_events_by_channel -- --nocapture`
- `cargo test -p spacebot memory_receiver_lagged_continues_loop_and_tracks_drop_count -- --nocapture`
- `cargo check -p spacebot`
- `cargo check -p spacebot --features metrics`
- `just gate-pr`

Recommended ongoing validation
- `cargo test --lib`
- `cargo test --test bulletin`
- `cargo test --test context_dump`
- `just preflight`
- `just gate-pr`

Acceptance Criteria
- `tick_interval_secs` and `bulletin_interval_secs` are applied at runtime and observed on ticks.
- `MemorySaved` and `CompactionTriggered` are emitted to allow supervisor observability.
- `observe()` consumes the broad event set and emits normalized `Signal` entries with real values.
- `Signal` enum and mapping are phase-consistent with current event producers.

Definition of Done
- `spawn_cortex_loop()`.
- `MemorySaved` and `CompactionTriggered` are emitted where specified.
- `Signal` event mapping includes current `ProcessEvent` variants in use.
- `observe()` path is resilient to `Lagged`/closed receiver states.

Risks / Rollback
Files changed in this PR
- `src/agent/channel.rs`
- `src/agent/channel_dispatch.rs`
- `src/agent/channel_history.rs`
- `src/agent/compactor.rs`
- `src/agent/cortex.rs`
- `src/agent/ingestion.rs`
- `src/api/agents.rs`
- `src/lib.rs`
- `src/main.rs`
- `src/telemetry/registry.rs`
- `src/tools.rs`
- `src/tools/memory_save.rs`
- `tests/bulletin.rs`
- `tests/context_dump.rs`
- `docs/design-docs/cortex-implementation.md`
- `docs/content/docs/(configuration)/config.mdx`
- `docs/content/docs/(core)/architecture.mdx`
- `Cargo.lock`

Notes