fix(cron): persist scheduler state and delivery outcomes #509
Conversation
Note: Reviews paused. It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior in the review settings.
Walkthrough
Persisted per-job scheduling cursor and claim-before-run semantics added; execution vs delivery outcomes split across DB, API, UI, metrics, and store; proactive broadcast gains classified transient/permanent failures with bounded retry/backoff; sandbox backend detection moved to a new module with tests.
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/cron/scheduler.rs (1)
154-178: ⚠️ Potential issue | 🟠 Major: Don't leave the scheduler in a half-enabled state on cursor-init failures.
`register_with_anchor()` inserts the job and `set_enabled(true)` flips/inserts it before `ensure_job_next_run_at()` can fail. On that error path the method returns early, but the scheduler now holds an enabled in-memory job with no timer; the next `set_enabled(true)` is a no-op because the flag is already true. Initialize the cursor first or roll back the in-memory mutation on failure.
Also applies to: 621-633, 640-655
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 154 - 178, The code currently inserts the job and flips enabled before calling ensure_job_next_run_at/start_timer, which can leave an enabled in-memory job without a timer if ensure_job_next_run_at fails; modify register_with_anchor so it either initializes the cursor/anchor before mutating in-memory state or, if you must insert first, roll back the insertion and clear the enabled flag on any error from ensure_job_next_run_at/start_timer; specifically update the block that writes self.jobs and the subsequent calls to ensure_job_next_run_at(&config.id, anchor).await and start_timer(&config.id, anchor).await to perform cursor initialization first or to remove the job/disable it on error so the scheduler cannot remain half-enabled.
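The rollback this prompt describes can be sketched with simplified stand-ins (the real `Scheduler`, job map, and error types differ; `register` here is a hypothetical reduction of `register_with_anchor`):

```rust
use std::collections::HashMap;

// Simplified stand-ins for the scheduler's in-memory state.
struct Job {
    enabled: bool,
}

struct Scheduler {
    jobs: HashMap<String, Job>,
}

impl Scheduler {
    // Insert the job, then run cursor initialization; on failure, roll
    // back the insertion so no enabled job is left without a timer.
    fn register(
        &mut self,
        id: &str,
        init_cursor: impl Fn() -> Result<(), String>,
    ) -> Result<(), String> {
        self.jobs.insert(id.to_string(), Job { enabled: true });
        if let Err(error) = init_cursor() {
            // Roll back the half-applied mutation before surfacing the error.
            self.jobs.remove(id);
            return Err(error);
        }
        Ok(())
    }
}
```

The alternative the comment suggests, initializing the cursor before touching `self.jobs`, avoids the rollback entirely and is usually simpler when the init step needs no in-memory state.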
🧹 Nitpick comments (4)
tests/sandbox_detection_test.rs (1)
3-21: This integration coverage is largely duplicated by unit tests.
These assertions mirror the `src/sandbox/detection.rs` tests almost exactly. Consider keeping one layer (or changing this file to only validate crate-level public re-export wiring).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/sandbox_detection_test.rs` around lines 3 - 21, The tests in tests/sandbox_detection_test.rs duplicate unit tests in src/sandbox/detection.rs (they repeat detect_backend() existence and SandboxBackend variants); either remove the redundant integration tests or change this file to assert crate-level public re-exports instead (e.g., call the public API that re-exports detect_backend and SandboxBackend and verify the re-exported symbol types/variants), keeping detect_backend() usage minimal and replacing the three matches! assertions with a single check that the public SandboxBackend enum is exposed with the Bubblewrap, SandboxExec and None variants.
src/sandbox.rs (1)
995-1001: `test_sandbox_mode_serialization` does not test serialization. The test currently only pattern-matches enum variants. It will still pass if serde rename/format behavior regresses.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/sandbox.rs` around lines 995 - 1001, The test test_sandbox_mode_serialization only pattern-matches variants instead of exercising Serde/TOML behavior; update it to serialize SandboxMode::Enabled and SandboxMode::Disabled (using toml::to_string or serde_json/to_string if TOML helper not available) and assert the produced strings match the expected serialized forms (e.g., the exact renamed values required by the TOML spec for SandboxMode), and also round-trip by deserializing those strings back into SandboxMode to ensure serde rename/format hasn’t regressed.
tests/sandbox_initialization_test.rs (1)
5-11: This “initialization” test is compile-only and misses runtime validation.Current checks prove symbol visibility, but not initialization behavior. A minimal runtime smoke path would better match the test name and intent.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/sandbox_initialization_test.rs` around lines 5 - 11, The test only checks symbol visibility; add a minimal runtime smoke path in test_sandbox_creation_in_initialize that actually constructs a Sandbox/SandboxConfig (e.g., a default or test config), calls the real initialization function (initialize_agents or whichever function wires up the sandbox) and/or invokes detect_backend at runtime, and asserts the call succeeds (no panic and returns an Ok/expected value) to validate initialization behavior; update the test to exercise spacebot::sandbox::Sandbox, spacebot::sandbox::SandboxConfig, and spacebot::sandbox::detect_backend in a real execution path rather than only using std::any::type_name.
tests/tool_sandbox_integration_test.rs (1)
12-29: Prefer direct API smoke checks over `type_name().contains(...)`. These assertions are brittle and mostly redundant with successful compilation/imports. Direct usage checks are cleaner and more stable.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/tool_sandbox_integration_test.rs` around lines 12 - 29, Replace brittle type_name string checks with direct compile-time smoke checks by referencing the types themselves: for ShellTool use a direct binding like `let _: ShellTool;` or instantiate via `ShellTool::new()`/`Default::default()` if available, and for spacebot::sandbox::Sandbox use `let _: spacebot::sandbox::Sandbox;` or call `Sandbox::new()`/`Default::default()` if a constructor exists; update the tests `test_shell_tool_module_structure` and `test_sandbox_module_structure` to use these direct bindings/instantiations instead of `type_name().contains(...)`.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/content/docs/(features)/cron.mdx`:
- Around line 217-223: Update the earlier passages that currently state run-once
jobs "disable after the execution attempt" so they match the behavior described
in point 6: run-once jobs are disabled before execution starts to guarantee
at-most-once semantics and deterministic ownership via advancing next_run_at;
ensure all mentions of "run-once" behavior (including any examples or sentences
that reference disabling after execution) are changed to state they are disabled
prior to execution start and that this ties into advancing next_run_at for
ownership.
- Around line 23-25: The overview flow shows the Scheduler recording execution
status before delivering, which contradicts later text that logs delivery
outcomes; update the sequence so the Scheduler invokes
MessagingManager::broadcast_proactive(...) (delivering the OutboundResponse)
first, then records execution status and writes the delivery outcome to the log;
reference MessagingManager::broadcast_proactive(...), Scheduler, and
OutboundResponse when making this swap so the diagram and Line 221 phrasing
match.
- Around line 114-115: The paragraph about config-backed jobs is ambiguous about
restart behavior; update it to explicitly state whether the enabled value in
config.toml takes precedence on startup or whether persisted disabled state
blocks loading. Specifically, clarify that when jobs are "seeded into the
database on startup" and "upserted" from config.toml, the config-sourced enabled
flag (the `enabled` field in config.toml) overrides any previously persisted
disabled state, so a reboot will restore the job's config `enabled` setting (or,
if you intend the opposite, state that persisted disabled jobs are never
reloaded and config is only used for new jobs). Mention the terms `config.toml`,
`enabled`, and the "upserted/seeded into the database on startup" behavior to
remove the ambiguity.
In `@src/cron/scheduler.rs`:
- Around line 355-367: The code currently discards errors from
advance_job_cursor(...) in the skip paths; change both call sites (the block
using compute_following_next_run_at(&job, &context, scheduled_run_at) and the
similar block at the other location) to handle the Result instead of using let _
= ...; specifically, await the Result, match or use ?/map_err to log the error
via the scheduler's logger (include job_id, scheduled_run_at and next_run_at/job
identity in the message) and decide a recovery action (e.g., return/propagate
the error, increment a retry counter, or at minimum continue with a short
backoff) so the scheduler won't spin on a persistent store error; ensure the new
code uses the same error type handling as advance_job_cursor and avoids silent
discards.
- Around line 1360-1367: persist_cron_execution is performing a synchronous
await on context.store.log_execution while the caller still holds an
ExecutionGuard, which can block the timer loop if SQLite stalls; change this to
fire-and-forget by spawning a background task that takes ownership (clone/move)
of the needed values and calls context.store.log_execution(cron_id,
record).await inside tokio::spawn, ensuring any metrics call
(record_cron_metrics) happens before spawning or is moved into the task as
appropriate, and log errors inside the spawned task so the ExecutionGuard can be
dropped before the DB write runs.
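The fire-and-forget move described here can be illustrated with std threads standing in for `tokio::spawn` (all names below are illustrative, not the crate's API): the hot path hands the record off and returns immediately, so a stalled store write no longer blocks it.

```rust
use std::sync::mpsc;
use std::thread;

// Illustrative stand-in for the crate's CronExecutionRecord.
#[derive(Debug, Clone, PartialEq)]
struct ExecutionRecord {
    cron_id: String,
    succeeded: bool,
}

// Hot path: hand the record to the background writer and return at once.
// In the real scheduler this would be tokio::spawn(async move { store.log_execution(...).await }),
// with errors logged inside the spawned task rather than propagated.
fn persist_in_background(tx: &mpsc::Sender<ExecutionRecord>, record: ExecutionRecord) {
    let _ = tx.send(record);
}

// Background writer: drains records as they arrive (stands in for the DB).
fn spawn_writer(rx: mpsc::Receiver<ExecutionRecord>) -> thread::JoinHandle<Vec<ExecutionRecord>> {
    thread::spawn(move || rx.into_iter().collect())
}
```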
In `@src/messaging/traits.rs`:
- Around line 117-137: broadcast_failure_kind currently only inspects the
top-level error variant for a BroadcastFailureError, causing wrapped platform
errors to bypass the typed classifier; change broadcast_failure_kind to walk the
error chain (searching sources) and return the first BroadcastFailureError.kind
found (i.e., locate BroadcastFailureError anywhere in the error's cause chain)
before falling back to is_heuristically_transient_broadcast_error; this will
make broadcast_failure_kind follow the same precedence as
mark_classified_broadcast and prevent permanent adapter failures from being
misclassified in MessagingManager::broadcast_proactive.
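The chain walk this prompt asks for follows the standard `std::error::Error::source` pattern; the types below are illustrative stand-ins for the crate's `BroadcastFailureError` and a wrapping platform error:

```rust
use std::error::Error;
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq)]
enum FailureKind {
    Transient,
    Permanent,
}

// Stand-in for the typed classifier error.
#[derive(Debug)]
struct BroadcastFailureError {
    kind: FailureKind,
}

impl fmt::Display for BroadcastFailureError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "broadcast failure: {:?}", self.kind)
    }
}
impl Error for BroadcastFailureError {}

// Stand-in for a platform adapter error that wraps the typed one.
#[derive(Debug)]
struct PlatformError {
    source: BroadcastFailureError,
}

impl fmt::Display for PlatformError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "platform error")
    }
}
impl Error for PlatformError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        Some(&self.source)
    }
}

// Walk the cause chain and return the first typed classification found.
// Returning None means the caller falls back to the heuristic classifier.
fn broadcast_failure_kind(error: &(dyn Error + 'static)) -> Option<FailureKind> {
    let mut current: Option<&(dyn Error + 'static)> = Some(error);
    while let Some(e) = current {
        if let Some(typed) = e.downcast_ref::<BroadcastFailureError>() {
            return Some(typed.kind);
        }
        current = e.source();
    }
    None
}
```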
In `@src/sandbox.rs`:
- Around line 941-946: The current check only tests if
Command::new("bwrap").arg("--version").output().await returned Err, but ignores
non-zero exit statuses; update the logic that handles the Result from
Command::new("bwrap").arg("--version").output().await (the version_check
variable) to also inspect the returned Output's status
(output.status.success()), and treat any non-successful exit as "bwrap not
found" by logging via tracing::debug!("bwrap not found in PATH") and returning
InternalBackend::None; keep the existing error path but add the success check
for the Output returned by version_check.
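A sketch of the combined check using std's blocking `Command` (the real code uses `tokio::process::Command` against `bwrap --version`; the helper name is hypothetical): a probe only counts as success when the process both spawns and exits zero.

```rust
use std::process::Command;

// Returns true only if the program spawns AND exits successfully;
// a non-zero exit status is treated the same as "not found in PATH".
fn binary_is_usable(program: &str, args: &[&str]) -> bool {
    match Command::new(program).args(args).output() {
        Ok(output) => output.status.success(),
        Err(_) => false, // missing binary, permission error, etc.
    }
}
```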
In `@src/sandbox/detection.rs`:
- Around line 34-57: check_bubblewrap currently treats a failed `--proc`
preflight as absence of bubblewrap, which is incorrect; change check_bubblewrap
to separately report (1) whether `bwrap` exists and (2) whether `--proc` is
supported (e.g., return Result<(bool /*exists*/, bool /*proc_supported*/),
Box<dyn Error>> or a small struct/enum), by first checking `bwrap --version` for
existence and then running the preflight only to set the proc_supported boolean;
update any callers (the public API path that expects an optional backend and the
runtime code referencing proc_supported in src/sandbox.rs) to read the
appropriate field so presence of bubblewrap is not hidden when proc support is
false.
- Around line 19-26: detect_backend currently hides probe failures by using `if
let Ok(true) = check_bubblewrap().await` and `if let Ok(true) =
check_sandbox_exec().await`; change these to explicitly match the Result from
`check_bubblewrap()` and `check_sandbox_exec()` so you log any `Err(e)` (include
`e` and context like "bubblewrap probe failed" or "sandbox-exec probe failed")
and only proceed when the probe returns `Ok(true)` (log `Ok(false)` as
debug/info if desired). Update the branches that return
`SandboxBackend::Bubblewrap`/log "sandbox-exec" accordingly so errors are not
discarded and failures remain visible.
In `@src/telemetry/registry.rs`:
- Around line 183-185: The metric cron_delivery_total declares labels "agent_id,
task_type, result" but the recorder in scheduler (uses cron_id as the second
label) makes "task_type" misleading and high-cardinality; update the metric
label to "cron_id" (or another accurate name like "job_id") in the IntCounterVec
declaration (cron_delivery_total) and any related registry entries, and adjust
any documentation to match, or alternatively change the recorder call site that
populates cron_delivery_total to emit a true task classification instead of
cron_id; ensure the label name you choose matches the values emitted by the
recorder (cron_id in scheduler.rs) so the public metric contract is correct.
---
Outside diff comments:
In `@src/cron/scheduler.rs`:
- Around line 154-178: The code currently inserts the job and flips enabled
before calling ensure_job_next_run_at/start_timer, which can leave an enabled
in-memory job without a timer if ensure_job_next_run_at fails; modify
register_with_anchor so it either initializes the cursor/anchor before mutating
in-memory state or, if you must insert first, roll back the insertion and clear
the enabled flag on any error from ensure_job_next_run_at/start_timer;
specifically update the block that writes self.jobs and the subsequent calls to
ensure_job_next_run_at(&config.id, anchor).await and start_timer(&config.id,
anchor).await to perform cursor initialization first or to remove the
job/disable it on error so the scheduler cannot remain half-enabled.
---
Nitpick comments:
In `@src/sandbox.rs`:
- Around line 995-1001: The test test_sandbox_mode_serialization only
pattern-matches variants instead of exercising Serde/TOML behavior; update it to
serialize SandboxMode::Enabled and SandboxMode::Disabled (using toml::to_string
or serde_json/to_string if TOML helper not available) and assert the produced
strings match the expected serialized forms (e.g., the exact renamed values
required by the TOML spec for SandboxMode), and also round-trip by deserializing
those strings back into SandboxMode to ensure serde rename/format hasn’t
regressed.
In `@tests/sandbox_detection_test.rs`:
- Around line 3-21: The tests in tests/sandbox_detection_test.rs duplicate unit
tests in src/sandbox/detection.rs (they repeat detect_backend() existence and
SandboxBackend variants); either remove the redundant integration tests or
change this file to assert crate-level public re-exports instead (e.g., call the
public API that re-exports detect_backend and SandboxBackend and verify the
re-exported symbol types/variants), keeping detect_backend() usage minimal and
replacing the three matches! assertions with a single check that the public
SandboxBackend enum is exposed with the Bubblewrap, SandboxExec and None
variants.
In `@tests/sandbox_initialization_test.rs`:
- Around line 5-11: The test only checks symbol visibility; add a minimal
runtime smoke path in test_sandbox_creation_in_initialize that actually
constructs a Sandbox/SandboxConfig (e.g., a default or test config), calls the
real initialization function (initialize_agents or whichever function wires up
the sandbox) and/or invokes detect_backend at runtime, and asserts the call
succeeds (no panic and returns an Ok/expected value) to validate initialization
behavior; update the test to exercise spacebot::sandbox::Sandbox,
spacebot::sandbox::SandboxConfig, and spacebot::sandbox::detect_backend in a
real execution path rather than only using std::any::type_name.
In `@tests/tool_sandbox_integration_test.rs`:
- Around line 12-29: Replace brittle type_name string checks with direct
compile-time smoke checks by referencing the types themselves: for ShellTool use
a direct binding like `let _: ShellTool;` or instantiate via
`ShellTool::new()`/`Default::default()` if available, and for
spacebot::sandbox::Sandbox use `let _: spacebot::sandbox::Sandbox;` or call
`Sandbox::new()`/`Default::default()` if a constructor exists; update the tests
`test_shell_tool_module_structure` and `test_sandbox_module_structure` to use
these direct bindings/instantiations instead of `type_name().contains(...)`.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 1ddbe501-1852-4775-9063-98762bff9b74
📒 Files selected for processing (21)
- README.md
- docs/content/docs/(features)/cron.mdx
- docs/metrics.md
- interface/src/api/client.ts
- interface/src/api/schema.d.ts
- interface/src/routes/AgentCron.tsx
- migrations/20260329105813_cron_next_run.sql
- migrations/20260329143500_cron_execution_delivery_status.sql
- src/api/cron.rs
- src/cron/scheduler.rs
- src/cron/store.rs
- src/main.rs
- src/messaging/manager.rs
- src/messaging/traits.rs
- src/sandbox.rs
- src/sandbox/detection.rs
- src/telemetry/registry.rs
- src/tools/cron.rs
- tests/sandbox_detection_test.rs
- tests/sandbox_initialization_test.rs
- tests/tool_sandbox_integration_test.rs
src/cron/scheduler.rs
Outdated
        },
    )
    .await;
    return Err(error);
Worth double-checking the semantics here: you persist execution_succeeded: true / delivery_succeeded: Some(false) but still return Err(error). Upstream the scheduler treats any Err as an execution failure (increments consecutive_failures, can trip the circuit breaker, and logs “execution failed”), which kind of re-collapses delivery failures back into execution failures. If the goal is separation, one option is to return Ok(()) after persisting the delivery failure (or return a typed error and handle it separately in the caller).
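The typed-outcome option can be sketched like this; `CronRunOutcome` and the counter logic are illustrative names, not the crate's API. The point is that only execution failures feed the circuit breaker:

```rust
// Model execution vs delivery as one enum so the caller can persist a
// delivery failure without treating it as an execution failure.
#[derive(Debug, PartialEq)]
enum CronRunOutcome {
    Succeeded,
    ExecutionFailed(String),
    // Execution ran fine; only the broadcast/delivery step failed.
    DeliveryFailed(String),
}

// Only execution failures advance the breaker counter; success resets
// it; a delivery-only failure leaves it untouched (a design choice --
// resetting on delivery failure would also be defensible).
fn update_failure_counter(outcome: &CronRunOutcome, consecutive_failures: u32) -> u32 {
    match outcome {
        CronRunOutcome::ExecutionFailed(_) => consecutive_failures + 1,
        CronRunOutcome::Succeeded => 0,
        CronRunOutcome::DeliveryFailed(_) => consecutive_failures,
    }
}
```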
    let mut delay = Self::BROADCAST_INITIAL_RETRY_DELAY;

    for attempt in 1..=Self::MAX_BROADCAST_RETRY_ATTEMPTS {
        match adapter.broadcast(target, response.clone()).await {
response.clone() on each retry is fine for small payloads, but if proactive broadcasts include OutboundResponse::File { data: Vec<u8> } this can duplicate large buffers (and potentially re-upload the same file multiple times). Might be worth either (a) changing the file payload to be Arc<[u8]>/Bytes, or (b) special-casing file broadcasts to avoid retries / to retry without rebuilding the payload.
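For reference, the bounded retry/backoff loop being reviewed has roughly this shape (the constants and the injected `sleep` callback are placeholders so the sketch runs instantly; the real code awaits `tokio::time::sleep` and clones the response per attempt):

```rust
use std::time::Duration;

// Placeholder values, not the crate's real constants.
const MAX_ATTEMPTS: u32 = 3;
const INITIAL_DELAY: Duration = Duration::from_millis(100);

// Retry a fallible send up to MAX_ATTEMPTS times with exponential
// backoff; returns the attempt number that succeeded, or the last error.
fn broadcast_with_retry(
    mut attempt_send: impl FnMut() -> Result<(), String>,
    mut sleep: impl FnMut(Duration),
) -> Result<u32, String> {
    let mut delay = INITIAL_DELAY;
    for attempt in 1..=MAX_ATTEMPTS {
        match attempt_send() {
            Ok(()) => return Ok(attempt),
            Err(e) if attempt == MAX_ATTEMPTS => return Err(e),
            Err(_) => {
                sleep(delay);
                delay *= 2; // exponential backoff between attempts
            }
        }
    }
    unreachable!("the final attempt always returns above")
}
```

On the cloning concern: switching the file payload to `Arc<[u8]>` or `bytes::Bytes` would make the per-attempt clone a cheap refcount bump rather than a buffer copy.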
Actionable comments posted: 3
♻️ Duplicate comments (2)
src/cron/scheduler.rs (2)
1338-1350: ⚠️ Potential issue | 🟠 Major: Don't route delivery failures through the execution-failure path.
Line 1350 still returns the adapter error after persisting `execution_succeeded: true`. The caller treats any `Err` as a cron execution failure, increments `consecutive_failures`, emits a "Cron failed" memory event, and can trip the circuit breaker after repeated delivery outages. This needs a distinct non-error outcome, or separate handling in the caller, so delivery failures stay separate from execution failures. Based on learnings: "Apply circuit breaker pattern: Auto-disable recurring tasks after 3 consecutive failures. Apply to cron jobs, maintenance workers, and cortex routines."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 1338 - 1350, The code currently persists execution_succeeded: true via persist_cron_execution and CronExecutionRecord but then returns Err(error) (the delivery adapter error), which causes callers to treat delivery outages as execution failures; change the control flow to return a non-error outcome that encodes a delivery failure instead of Err. Concretely, introduce/return a distinct success-type or enum (e.g., CronExecutionOutcome or a Result variant like Ok(DeliveryFailed(String))) from the function so callers can differentiate execution vs delivery problems; ensure persist_cron_execution remains the same (execution_succeeded: true, delivery_succeeded: Some(false), delivery_error set) and update the caller logic to handle the new delivery-failure outcome (do not increment consecutive_failures or emit "Cron failed" for delivery-only failures). Replace the final return Err(error) at that site with the new non-error delivery-failure return value and adjust types/usages accordingly.
309-325: ⚠️ Potential issue | 🟠 Major: Back off when stale-cursor fast-forward fails.
If `advance_job_cursor()` errors here, Line 325 immediately `continue`s while `next_run_at` stays overdue, so the loop can hot-spin on the same store failure. The active-hours and execution-lock skip paths already sleep for 5s on this class of error; this path needs the same guard.
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 309 - 325, When advance_job_cursor(&context, &jobs, &job_id, &job, next_run_at, fast_forward_to).await returns Err in the stale-cursor branch (the block whose fast-forward is determined by stale_recovery_next_run_at), sleep for a short backoff (same as other error paths, e.g. tokio::time::sleep(Duration::from_secs(5)).await) before continuing to avoid hot-spinning on a persistent store error; keep the existing tracing::warn log and then perform the sleep prior to the continue so next_run_at isn't repeatedly retried immediately.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cron/scheduler.rs`:
- Around line 1403-1427: In record_cron_metrics, overall_result currently folds
delivery status into execution success; change overall_result to reflect only
record.execution_succeeded (e.g., "success" if record.execution_succeeded else
"failure") and remove the delivery_attempted/delivery_succeeded checks from that
variable so cron_executions_total counts execution outcome only; keep
cron_delivery_total logic (delivery_result based on
record.delivery_attempted/delivery_succeeded) as-is to report delivery
separately.
- Around line 1296-1301: In the CronResponseWaitOutcome::TimedOut branch,
replace the throwaway `let _ = channel_handle.await;` with a proper match on the
join result (matching the same pattern and logging behavior used in the
delivered branch): after calling `channel_handle.abort()`, await the join and
handle `Ok(Ok(_))`, `Ok(Err(e))` (log the channel.run() error), and
`Err(join_err)` (log the task join/cancel failure) using the same logging macro
and message style as the delivered branch so no errors are silently discarded.
In `@src/cron/store.rs`:
- Around line 91-133: load_all_executions() now selects cron_id but
row_to_cron_execution_entry() and the CronExecutionEntry struct do not store it,
so callers cannot identify which cron produced each row; update the
CronExecutionEntry struct to include a cron_id field (Option<String> to keep
per-cron queries compatible), update row_to_cron_execution_entry() to extract
"cron_id" from the SqliteRow and populate the new field, and adjust any
construction sites or callers (e.g., load_all_executions and other functions
mentioned around lines 390-401 and 508-518) to handle the new field where
needed.
---
Duplicate comments:
In `@src/cron/scheduler.rs`:
- Around line 1338-1350: The code currently persists execution_succeeded: true
via persist_cron_execution and CronExecutionRecord but then returns Err(error)
(the delivery adapter error), which causes callers to treat delivery outages as
execution failures; change the control flow to return a non-error outcome that
encodes a delivery failure instead of Err. Concretely, introduce/return a
distinct success-type or enum (e.g., CronExecutionOutcome or a Result variant
like Ok(DeliveryFailed(String))) from the function so callers can differentiate
execution vs delivery problems; ensure persist_cron_execution remains the same
(execution_succeeded: true, delivery_succeeded: Some(false), delivery_error set)
and update the caller logic to handle the new delivery-failure outcome (do not
increment consecutive_failures or emit "Cron failed" for delivery-only
failures). Replace the final return Err(error) at that site with the new
non-error delivery-failure return value and adjust types/usages accordingly.
- Around line 309-325: When advance_job_cursor(&context, &jobs, &job_id, &job,
next_run_at, fast_forward_to).await returns Err in the stale-cursor branch (the
block whose fast-forward is determined by stale_recovery_next_run_at), sleep for
a short backoff (same as other error paths, e.g.
tokio::time::sleep(Duration::from_secs(5)).await) before continuing to avoid
hot-spinning on a persistent store error; keep the existing tracing::warn log
and then perform the sleep prior to the continue so next_run_at isn't repeatedly
retried immediately.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d13258f9-14fd-48e1-9321-5f64789e5e90
📒 Files selected for processing (12)
- docs/content/docs/(features)/cron.mdx
- docs/metrics.md
- interface/src/routes/AgentCron.tsx
- src/cron/scheduler.rs
- src/cron/store.rs
- src/messaging/traits.rs
- src/sandbox.rs
- src/sandbox/detection.rs
- src/telemetry/registry.rs
- tests/sandbox_detection_test.rs
- tests/sandbox_initialization_test.rs
- tests/tool_sandbox_integration_test.rs
✅ Files skipped from review due to trivial changes (3)
- tests/sandbox_detection_test.rs
- tests/tool_sandbox_integration_test.rs
- src/messaging/traits.rs
🚧 Files skipped from review as they are similar to previous changes (5)
- tests/sandbox_initialization_test.rs
- docs/metrics.md
- src/telemetry/registry.rs
- src/sandbox/detection.rs
- docs/content/docs/(features)/cron.mdx
Actionable comments posted: 4
♻️ Duplicate comments (1)
src/cron/scheduler.rs (1)
1362-1374: ⚠️ Potential issue | 🟠 Major: Don't feed delivery failures into the circuit breaker.
Lines 1362-1374 persist `execution_succeeded: true` but still return `Err(error)`. The caller treats any `Err` as an execution failure, increments `consecutive_failures`, logs "cron job execution failed", and can auto-disable a healthy cron during adapter outages. Return a non-error outcome here, or distinguish delivery failures in the timer task. Based on learnings: "Apply circuit breaker pattern: Auto-disable recurring tasks after 3 consecutive failures. Apply to cron jobs, maintenance workers, and cortex routines."
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 1362 - 1374, The code currently calls persist_cron_execution(...) with execution_succeeded: true but then returns Err(error), which the caller interprets as an execution failure and increments the circuit-breaker; change the return to a non-error outcome that distinguishes delivery failure from execution failure: either return Ok(()) from the surrounding function after persisting the CronExecutionRecord (so the caller will not increment consecutive_failures), or introduce a distinct result type/enum (e.g. CronRunOutcome or DeliveryFailure) and return that so the timer task can treat delivery failures separately; update references to persist_cron_execution, CronExecutionRecord, execution_succeeded, delivery_succeeded, delivery_error and the caller that checks Err to handle the new non-error/enum outcome accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/cron/scheduler.rs`:
- Around line 663-675: The code sets job.enabled = true before completing
initialization (cron_job_from_config, ensure_job_next_run_at, start_timer), so
if ensure_job_next_run_at or start_timer fails the in-memory jobs map contains
an enabled job with no timer; fix by deferring mutation of job.enabled and the
insertion into self.jobs until after ensure_job_next_run_at and start_timer have
both succeeded (or, alternatively, insert a disabled job and only flip
enabled=true after both calls succeed), and mirror the same change for the other
initialization path referenced around set_enabled to ensure no enabled entry is
left without a timer on error.
- Around line 176-195: The current code inserts the new CronJob into self.jobs
before calling ensure_job_next_run_at(&config.id, anchor), which means a failed
initialization will remove the live previous job and leave running timers
without a map entry; to fix, either (A) read and save the existing entry (e.g.,
old = jobs.get(&config.id).cloned()) before inserting and on error restore it
into self.jobs, or (B) defer mutating self.jobs by performing
ensure_job_next_run_at first and only insert the new job into the map after
ensure_job_next_run_at returns Ok; use the same symbols shown
(self.jobs.write().await, config.id, ensure_job_next_run_at, CronJob) to locate
and update the logic accordingly.
- Around line 432-457: The error branch for the claim path currently continues
immediately on Err (from claim_run_once_fire or advance_job_cursor), causing
tight 1ms retries; instead, add the same 5 second backoff used in the skip paths
before continuing: when match on claimed yields Err(error) in the scheduler loop
(the block referencing job_id, scheduled_run_at, context, jobs, and claimed),
call an async sleep for five seconds (e.g.
tokio::time::sleep(Duration::from_secs(5)).await) after logging the warn and
then continue; ensure imports for Duration/tokio::time are present if needed.
- Around line 717-737: cron_job_from_config must reject a zero interval when
there is no cron expression to avoid a divide-by-zero in interval_initial_delay;
after computing delivery_target and before constructing CronJob (and after
calling normalize_cron_expr), check if cron_expr is None and
config.interval_secs == 0 and return an Err(Error::Other(...)) describing
"interval_secs must be > 0 when no cron_expr is provided"; reference
normalize_cron_expr, config.interval_secs, and cron_job_from_config to locate
where to add this validation.
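The validation this prompt describes reduces to a small guard; the string error below stands in for the crate's `Error::Other`:

```rust
// Reject interval-only schedules with a zero interval, which would
// otherwise divide by zero in interval_initial_delay.
fn validate_schedule(cron_expr: Option<&str>, interval_secs: u64) -> Result<(), String> {
    if cron_expr.is_none() && interval_secs == 0 {
        return Err("interval_secs must be > 0 when no cron_expr is provided".to_string());
    }
    Ok(())
}
```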
---
Duplicate comments:
In `@src/cron/scheduler.rs`:
- Around line 1362-1374: The code currently calls persist_cron_execution(...)
with execution_succeeded: true but then returns Err(error), which the caller
interprets as an execution failure and increments the circuit-breaker; change
the return to a non-error outcome that distinguishes delivery failure from
execution failure: either return Ok(()) from the surrounding function after
persisting the CronExecutionRecord (so the caller will not increment
consecutive_failures), or introduce a distinct result type/enum (e.g.
CronRunOutcome or DeliveryFailure) and return that so the timer task can treat
delivery failures separately; update references to persist_cron_execution,
CronExecutionRecord, execution_succeeded, delivery_succeeded, delivery_error and
the caller that checks Err to handle the new non-error/enum outcome accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6c71b5a3-ce01-402d-9e6e-c955a29ddb53
📒 Files selected for processing (5)
- docs/metrics.md
- interface/src/api/client.ts
- interface/src/api/schema.d.ts
- src/cron/scheduler.rs
- src/cron/store.rs
✅ Files skipped from review due to trivial changes (2)
- docs/metrics.md
- interface/src/api/schema.d.ts
🚧 Files skipped from review as they are similar to previous changes (2)
- interface/src/api/client.ts
- src/cron/store.rs
♻️ Duplicate comments (4)
src/cron/scheduler.rs (4)
472-479: ⚠️ Potential issue | 🟠 Major

Add backoff to the main claim failure path.

When `claim_run_once_fire()` or `advance_job_cursor()` returns `Err`, the code immediately `continue`s with the cursor still overdue. This creates a tight 1ms retry loop until the grace window is exceeded, hammering the store and logs on a persistent DB failure. Mirror the 5s backoff already added in the skip paths (lines 351, 422, 449).

🛡️ Proposed fix

```diff
 match claimed {
     Ok(true) => {}
     Ok(false) => continue,
     Err(error) => {
         tracing::warn!(cron_id = %job_id, %error, "failed to claim cron fire");
+        tokio::time::sleep(Duration::from_secs(5)).await;
         continue;
     }
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 472 - 479, The Err branch of the match on claimed (inside the cron claim loop) currently warns and continues immediately, causing a tight retry loop; update that Err(error) branch (the clause handling the result of claim_run_once_fire()/advance_job_cursor() where it logs "failed to claim cron fire") to sleep/backoff for 5 seconds (same Duration used in the existing skip paths) before continuing, using the async runtime sleep (e.g., tokio::time::sleep(Duration::from_secs(5)).await) so the cursor remains overdue but retries are rate-limited.
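As a variation on the flat 5-second sleep suggested above, a bounded exponential backoff could look like the sketch below; the base and cap constants are illustrative, not values from this codebase:

```rust
use std::time::Duration;

// Illustrative bounded backoff: doubles per consecutive failure,
// capped so a long outage never produces multi-minute stalls.
fn backoff_for(consecutive_failures: u32) -> Duration {
    let base_ms: u64 = 1_000;
    let cap_ms: u64 = 60_000;
    // Shift amount is clamped to avoid overflowing the u64.
    let ms = base_ms.saturating_mul(1u64 << consecutive_failures.min(16));
    Duration::from_millis(ms.min(cap_ms))
}
```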
198-217: ⚠️ Potential issue | 🟠 Major

Preserve the previous job when re-registration fails.

When re-registering an existing job, Line 200 replaces the live entry before `ensure_job_next_run_at()` succeeds. If initialization fails, Line 208 removes the job entirely, so the still-running old timer will see no entry on its next wake-up and stop a previously healthy schedule. Save and restore the prior `CronJob` on error.

🛡️ Proposed fix

```diff
 {
     let mut jobs = self.jobs.write().await;
+    // Preserve existing job in case rollback is needed
+    let previous_job = jobs.get(&config.id).cloned();
     jobs.insert(config.id.clone(), job);
 }

 if config.enabled
     && let Err(error) = self.ensure_job_next_run_at(&config.id, anchor).await
 {
     {
         let mut jobs = self.jobs.write().await;
-        jobs.remove(&config.id);
+        if let Some(prev) = previous_job {
+            jobs.insert(config.id.clone(), prev);
+        } else {
+            jobs.remove(&config.id);
+        }
     }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 198 - 217, The current insertion of a new CronJob into self.jobs happens before ensure_job_next_run_at succeeds, and on failure the code simply removes the entry which drops any previously-running job; modify the insertion to capture the previous entry (the Option returned by jobs.insert) and, if ensure_job_next_run_at(&config.id, anchor).await returns Err, restore the previous CronJob back into self.jobs if it existed (re-insert the saved Option value) otherwise remove the key; reference the existing variables/methods: self.jobs, jobs.insert(...), ensure_job_next_run_at(&config.id, anchor).await and config.id to locate where to save the old job and where to restore it on error.
725-746: ⚠️ Potential issue | 🔴 Critical

Validate zero-second intervals up front.

If the normalized `cron_expr` is `None`, a stored/API value of `interval_secs = 0` reaches `interval_initial_delay()` (line 887), which panics on `86400 % interval_secs`. The recurring follow-up path would also keep a non-advancing cursor. Reject zero here before the job is registered.

🐛 Proposed fix

```diff
 fn cron_job_from_config(config: &CronConfig) -> Result<CronJob> {
+    let cron_expr = normalize_cron_expr(config.cron_expr.clone())?;
+    if cron_expr.is_none() && config.interval_secs == 0 {
+        return Err(crate::error::Error::Other(anyhow::anyhow!(
+            "interval_secs must be greater than 0 when cron_expr is not set"
+        )));
+    }
+
     let delivery_target = parse_delivery_target(&config.delivery_target).ok_or_else(|| {
         crate::error::Error::Other(anyhow::anyhow!(
             "invalid delivery target '{}': expected format 'adapter:target'",
             config.delivery_target
         ))
     })?;

     Ok(CronJob {
         id: config.id.clone(),
         prompt: config.prompt.clone(),
-        cron_expr: normalize_cron_expr(config.cron_expr.clone())?,
+        cron_expr,
         interval_secs: config.interval_secs,
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 725 - 746, cron_job_from_config must reject jobs that have no cron expression and interval_secs == 0 to avoid a later panic in interval_initial_delay; after computing cron_expr via normalize_cron_expr(config.cron_expr.clone()), check if cron_expr.is_none() and config.interval_secs == 0 and return an Err (use crate::error::Error::Other(anyhow::anyhow!(...)) with a clear message about invalid zero interval when no cron_expr is provided) instead of proceeding to construct CronJob; reference cron_job_from_config, normalize_cron_expr, and interval_secs when adding this early validation.
671-685: ⚠️ Potential issue | 🟠 Major

Don't leave re-enabled jobs without a timer on initialization errors.

The cold re-enable path inserts a job with `enabled = true` (lines 675, 679) before `ensure_job_next_run_at()` succeeds (line 682). If that call fails, the error propagates but the orphaned entry remains enabled with no timer. Later `set_enabled(true)` calls will skip `start_timer()` because the entry already looks enabled. Roll back the insertion on error or defer setting `enabled = true` until both initialization calls succeed.

🛡️ Proposed fix

```diff
 let config = self.context.store.load(job_id).await?.ok_or_else(|| {
     crate::error::Error::Other(anyhow::anyhow!("cron job not found in store"))
 })?;
 let mut job = cron_job_from_config(&config)?;
-job.enabled = true;
+// Don't set enabled until initialization succeeds
 {
     let mut jobs = self.jobs.write().await;
     jobs.insert(job_id.to_string(), job);
 }
-self.ensure_job_next_run_at(job_id, None).await?;
+if let Err(error) = self.ensure_job_next_run_at(job_id, None).await {
+    let mut jobs = self.jobs.write().await;
+    jobs.remove(job_id);
+    return Err(error);
+}
+
+{
+    let mut jobs = self.jobs.write().await;
+    if let Some(j) = jobs.get_mut(job_id) {
+        j.enabled = true;
+    }
+}
+
 self.start_timer(job_id, None).await;
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 671 - 685, The inserted job is marked enabled before initialization calls, causing orphaned enabled entries if ensure_job_next_run_at() or start_timer() fail; change the flow in the cold re-enable path using cron_job_from_config, ensure_job_next_run_at, and start_timer so that you either (1) insert the job into self.jobs initially with enabled = false, call ensure_job_next_run_at(job_id, None).await and start_timer(job_id, None).await, then acquire a write lock and flip jobs.get_mut(job_id).enabled = true; or (2) defer insertion until after ensure_job_next_run_at() and start_timer() succeed, and if any of those async calls return an error make sure to remove any partially inserted entry (jobs.remove(job_id)) so no enabled-without-timer state remains.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/cron/scheduler.rs`:
- Around line 472-479: The Err branch of the match on claimed (inside the cron
claim loop) currently warns and continues immediately, causing a tight retry
loop; update that Err(error) branch (the clause handling the result of
claim_run_once_fire()/advance_job_cursor() where it logs "failed to claim cron
fire") to sleep/backoff for 5 seconds (same Duration used in the existing skip
paths) before continuing, using the async runtime sleep (e.g.,
tokio::time::sleep(Duration::from_secs(5)).await) so the cursor remains overdue
but retries are rate-limited.
- Around line 198-217: The current insertion of a new CronJob into self.jobs
happens before ensure_job_next_run_at succeeds, and on failure the code simply
removes the entry which drops any previously-running job; modify the insertion
to capture the previous entry (the Option returned by jobs.insert) and, if
ensure_job_next_run_at(&config.id, anchor).await returns Err, restore the
previous CronJob back into self.jobs if it existed (re-insert the saved Option
value) otherwise remove the key; reference the existing variables/methods:
self.jobs, jobs.insert(...), ensure_job_next_run_at(&config.id, anchor).await
and config.id to locate where to save the old job and where to restore it on
error.
- Around line 725-746: cron_job_from_config must reject jobs that have no cron
expression and interval_secs == 0 to avoid a later panic in
interval_initial_delay; after computing cron_expr via
normalize_cron_expr(config.cron_expr.clone()), check if cron_expr.is_none() and
config.interval_secs == 0 and return an Err (use
crate::error::Error::Other(anyhow::anyhow!(...)) with a clear message about
invalid zero interval when no cron_expr is provided) instead of proceeding to
construct CronJob; reference cron_job_from_config, normalize_cron_expr, and
interval_secs when adding this early validation.
- Around line 671-685: The inserted job is marked enabled before initialization
calls, causing orphaned enabled entries if ensure_job_next_run_at() or
start_timer() fail; change the flow in the cold re-enable path using
cron_job_from_config, ensure_job_next_run_at, and start_timer so that you either
(1) insert the job into self.jobs initially with enabled = false, call
ensure_job_next_run_at(job_id, None).await and start_timer(job_id, None).await,
then acquire a write lock and flip jobs.get_mut(job_id).enabled = true; or (2)
defer insertion until after ensure_job_next_run_at() and start_timer() succeed,
and if any of those async calls return an error make sure to remove any
partially inserted entry (jobs.remove(job_id)) so no enabled-without-timer state
remains.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 90b99e37-9638-429e-b1b5-e5c47abb568b
📒 Files selected for processing (1)
src/cron/scheduler.rs
♻️ Duplicate comments (2)
src/cron/scheduler.rs (2)
1400-1413: ⚠️ Potential issue | 🟠 Major

Don’t return an execution error for delivery-only failures.

At Line 1412, returning `Err(error)` after persisting `execution_succeeded: true` causes the caller to increment `consecutive_failures` and potentially trip the circuit breaker, effectively re-collapsing delivery failure into execution failure.

💡 Suggested fix

```diff
 if let Err(error) = context
     .messaging_manager
     .broadcast_proactive(
         &job.delivery_target.adapter,
         &job.delivery_target.target,
         response,
     )
     .await
 {
@@
     persist_cron_execution(
         context,
         &job.id,
         CronExecutionRecord {
             execution_succeeded: true,
             delivery_attempted: true,
             delivery_succeeded: Some(false),
             result_summary: summary.clone(),
             execution_error: None,
             delivery_error: Some(error.to_string()),
         },
     );
-    return Err(error);
+    return Ok(());
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 1400 - 1413, The persist block is correctly recording a delivery-only failure (execution_succeeded: true, delivery_succeeded: Some(false)) but then returns Err(error), which will be treated as an execution failure upstream; change the control flow in the function that calls persist_cron_execution so that after creating the CronExecutionRecord for a delivery failure you return an Ok variant (or a delivery-specific enum/result) instead of Err(error) so consecutive_failures is not incremented; modify the return at the site that references job.id to signal success-with-delivery-failure (e.g., return Ok(()) or a DeliveryFailed result) while preserving the persisted delivery_error value.
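The execution/delivery split this comment relies on can be modeled as a record where the legacy flag is derived rather than stored. The field names follow the review comment's `CronExecutionRecord`, but the trimmed-down struct and the `legacy_success` helper here are a sketch, not the crate's real API:

```rust
// Sketch of the split outcome record; `legacy_success` collapses both
// dimensions for readers that predate the schema change.
struct CronExecutionRecord {
    execution_succeeded: bool,
    delivery_attempted: bool,
    delivery_succeeded: Option<bool>,
}

impl CronExecutionRecord {
    // A run with no delivery attempt counts as successful if execution succeeded.
    fn legacy_success(&self) -> bool {
        self.execution_succeeded && self.delivery_succeeded.unwrap_or(true)
    }
}
```

This keeps old consumers of the single `success` field working while letting new readers distinguish "prompt failed" from "prompt ran but delivery failed".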
685-706: ⚠️ Potential issue | 🟠 Major

Fix enable sequencing to avoid orphaned or non-starting timers.

Line 698 starts the timer while the job is still disabled, so the timer loop can exit immediately on the enabled check (Line 318) before Line 704 flips it to `true`. Also, in the in-memory enable path, Line 716 sets `enabled = true` before Line 727; if initialization fails there, the job remains enabled without a timer.

💡 Suggested restructuring

```diff
@@
-    // Job is in the HashMap — normal path.
-    let was_enabled = {
-        let mut jobs = self.jobs.write().await;
-        if let Some(job) = jobs.get_mut(job_id) {
-            let old = job.enabled;
-            job.enabled = enabled;
-            old
-        } else {
-            // Should not happen (we checked above), but be defensive.
-            return Err(crate::error::Error::Other(anyhow::anyhow!(
-                "cron job not found"
-            )));
-        }
-    };
+    // Job is in the HashMap — normal path.
+    let was_enabled = {
+        let jobs = self.jobs.read().await;
+        jobs.get(job_id)
+            .map(|job| job.enabled)
+            .ok_or_else(|| crate::error::Error::Other(anyhow::anyhow!("cron job not found")))?
+    };
@@
-    if enabled && !was_enabled {
-        self.ensure_job_next_run_at(job_id, None).await?;
-        self.start_timer(job_id, None).await;
+    if enabled && !was_enabled {
+        self.ensure_job_next_run_at(job_id, None).await?;
+        {
+            let mut jobs = self.jobs.write().await;
+            if let Some(job) = jobs.get_mut(job_id) {
+                job.enabled = true;
+            }
+        }
+        self.start_timer(job_id, None).await;
         tracing::info!(cron_id = %job_id, "cron job enabled and timer started");
     }
@@
-    if !enabled && was_enabled {
+    if !enabled && was_enabled {
+        {
+            let mut jobs = self.jobs.write().await;
+            if let Some(job) = jobs.get_mut(job_id) {
+                job.enabled = false;
+            }
+        }
         // Abort the timer immediately rather than waiting up to one full interval.
         let handle = {
             let mut timers = self.timers.write().await;
             timers.remove(job_id)
         };
@@
-        self.start_timer(job_id, None).await;
-
-        // Atomically enable the job after initialization succeeded
         {
             let mut jobs = self.jobs.write().await;
             if let Some(j) = jobs.get_mut(job_id) {
                 j.enabled = true;
             }
         }
+        self.start_timer(job_id, None).await;
```

Also applies to: 711-729
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/cron/scheduler.rs` around lines 685 - 706, The timer is being started while the job remains disabled (start_timer called before setting j.enabled = true) and the in-memory enable path sets enabled = true too early, causing orphaned or non-starting timers; fix by making enabling and timer start atomic: after inserting the job and successfully running ensure_job_next_run_at(job_id, None).await, acquire the write lock, set j.enabled = true and then call start_timer(job_id, None).await while the job is enabled (or alternatively flip enabled then start the timer without releasing the invariant), and in the in-memory enable path ensure you only set j.enabled = true after ensure_job_next_run_at and start_timer succeed (or revert enabled on failure) so that jobs.get_mut(...).enabled is only true when a timer was started successfully.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@src/cron/scheduler.rs`:
- Around line 1400-1413: The persist block is correctly recording a
delivery-only failure (execution_succeeded: true, delivery_succeeded:
Some(false)) but then returns Err(error), which will be treated as an execution
failure upstream; change the control flow in the function that calls
persist_cron_execution so that after creating the CronExecutionRecord for a
delivery failure you return an Ok variant (or a delivery-specific enum/result)
instead of Err(error) so consecutive_failures is not incremented; modify the
return at the site that references job.id to signal
success-with-delivery-failure (e.g., return Ok(()) or a DeliveryFailed result)
while preserving the persisted delivery_error value.
- Around line 685-706: The timer is being started while the job remains disabled
(start_timer called before setting j.enabled = true) and the in-memory enable
path sets enabled = true too early, causing orphaned or non-starting timers; fix
by making enabling and timer start atomic: after inserting the job and
successfully running ensure_job_next_run_at(job_id, None).await, acquire the
write lock, set j.enabled = true and then call start_timer(job_id, None).await
while the job is enabled (or alternatively flip enabled then start the timer
without releasing the invariant), and in the in-memory enable path ensure you
only set j.enabled = true after ensure_job_next_run_at and start_timer succeed
(or revert enabled on failure) so that jobs.get_mut(...).enabled is only true
when a timer was started successfully.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: d0f5e320-fe3f-457e-80a1-fafc0c573f21
📒 Files selected for processing (1)
src/cron/scheduler.rs
…-exec

- Add detection.rs with SandboxBackend enum (Bubblewrap, SandboxExec, None)
- Implement detect_backend() with preflight checks for bubblewrap
- Export public API from sandbox.rs module root
- Rename internal definitions to InternalBackend to avoid conflicts
- Add integration tests for backend detection

The detection probes for available sandbox backends:

- Linux: bubblewrap (bwrap) with --proc preflight check
- macOS: sandbox-exec at /usr/bin/sandbox-exec
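The detection shape this commit describes can be sketched roughly as below. The probe is reduced to a path-existence check for brevity, whereas the commit says the real implementation also runs a `--proc` preflight against `bwrap`; the search directories here are assumptions:

```rust
use std::path::Path;

// Mirrors the SandboxBackend shape from the commit message.
#[derive(Debug, PartialEq)]
enum SandboxBackend {
    Bubblewrap,
    SandboxExec,
    None,
}

fn detect_backend() -> SandboxBackend {
    if cfg!(target_os = "linux") {
        // Real detection also runs a `bwrap --proc` preflight before trusting it.
        for dir in ["/usr/bin", "/usr/local/bin"] {
            if Path::new(dir).join("bwrap").exists() {
                return SandboxBackend::Bubblewrap;
            }
        }
        SandboxBackend::None
    } else if cfg!(target_os = "macos") {
        if Path::new("/usr/bin/sandbox-exec").exists() {
            SandboxBackend::SandboxExec
        } else {
            SandboxBackend::None
        }
    } else {
        SandboxBackend::None
    }
}
```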
- Add workspace() method to Sandbox for external access
- Create tool_sandbox_integration_test.rs to verify ShellTool accepts sandbox through public API

The sandbox integration into ShellTool was already implemented:

- ShellTool::new() accepts Arc<Sandbox> parameter
- ShellTool.call() uses sandbox.wrap() for command containment
- DANGEROUS_ENV_VARS blocking preserved for security
- create_worker_tool_server() and create_cortex_chat_tool_server() both pass sandbox to ShellTool
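The DANGEROUS_ENV_VARS blocking mentioned above can be sketched as a simple filter over the child process environment. The variable list below is illustrative, not the crate's actual set:

```rust
// Illustrative scrub step: drop variables that could alter loader or
// library-injection behavior inside the sandboxed child process.
const DANGEROUS_ENV_VARS: &[&str] = &["LD_PRELOAD", "LD_LIBRARY_PATH", "DYLD_INSERT_LIBRARIES"];

fn scrub_env(vars: Vec<(String, String)>) -> Vec<(String, String)> {
    vars.into_iter()
        .filter(|(key, _)| !DANGEROUS_ENV_VARS.contains(&key.as_str()))
        .collect()
}
```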
- Add SAFETY comment to ExecutionGuard::drop explaining atomic ordering
- Consolidate error emission paths with emit_cron_error() helper

Verification:

- cargo check --lib: clean (0 errors, 0 warnings)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add 5s backoff sleep on claim error to prevent tight retry loops
- Restore previous job entry on ensure_job_next_run_at failure instead of removing
- Reject jobs with interval_secs == 0 and no cron_expr to prevent later panic
- Fix cold re-enable to insert disabled, init, then atomically enable

Verification:

- cargo check --lib: clean (0 errors, 0 warnings)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Fix SAFETY comment line reference (406 -> 436) for atomic ordering
- Add timer abort note for circuit breaker (behavior verified correct)
- Create tests/cron_integration_test.rs with claim race tests:
  - claim_and_advance_is_atomic_only_one_succeeds
  - duplicate_claim_fails_after_successful_advance
  - run_once_claim_prevents_subsequent_execution
  - stale_cursor_detection_after_lost_claim

Verification:

- cargo check --lib: clean (0 errors, 0 warnings)
- cargo test --no-run: builds successfully

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Return Ok(()) after delivery failure instead of Err(error) so that consecutive_failures is not incremented (execution succeeded, delivery failed)
- Make in-memory enable/disable atomic: initialize/start timer first, then flip enabled state atomically; disable state before aborting timer

Verification:

- cargo check --lib: clean (0 errors, 0 warnings)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
force-pushed from fae2bf3 to 2c3e953 (Compare)
Why
Cron scheduling was still too timer-local.
`next_run_at` only lived in memory, delivery failures were getting collapsed into execution outcomes, and scheduled output could succeed or fail in ways that were hard to reason about across restarts, multiple processes, and proactive delivery retries.

This PR makes cron execution state durable and reviewer-visible. The scheduler now claims runs against persisted state, delivery is tracked separately from prompt execution, and the follow-up fixes on this branch close the remaining correctness gaps around re-enable behavior and breaker accounting.
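The claim-before-run idea can be illustrated with an in-memory compare-and-swap over the persisted cursor. The store type and method names below are illustrative, not this crate's real API; in the actual implementation the cursor lives in the database, but the invariant is the same: only the process that advances `next_run_at` from the value it observed gets to execute the fire.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

// Illustrative durable-cursor store: a fire is claimed by atomically
// advancing next_run_at only if it still holds the observed value.
struct CursorStore {
    next_run_at: Mutex<HashMap<String, i64>>,
}

impl CursorStore {
    fn new() -> Self {
        Self { next_run_at: Mutex::new(HashMap::new()) }
    }

    fn set(&self, job_id: &str, at: i64) {
        self.next_run_at.lock().unwrap().insert(job_id.to_string(), at);
    }

    /// Returns true only for the caller that wins the claim.
    fn claim_and_advance(&self, job_id: &str, observed: i64, next: i64) -> bool {
        let mut map = self.next_run_at.lock().unwrap();
        match map.get(job_id) {
            Some(&current) if current == observed => {
                map.insert(job_id.to_string(), next);
                true
            }
            // Another process already advanced the cursor, or the job is gone.
            _ => false,
        }
    }
}
```

A second claimant holding the same observed timestamp loses, which is exactly the duplicate-fire protection the integration tests on this branch exercise.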
What Changed
- Persist `next_run_at` for cron jobs and claim recurring runs before execution so restart recovery and multi-process scheduling use a shared durable cursor instead of timer-local state
- Keep the `success` field for compatibility

Testing
- `just preflight`
- `just gate-pr`

Notes
Closes #502