feat(agent): add active-run steering and queue controls#3317
Conversation
…#3270) Introduce a run queue model that lets messages arriving during an in-flight agent turn be handled without aborting: steer (inject at iteration boundary), followup (dispatch after turn completes), or collect (add as context). Default mode remains interrupt for backward compatibility. Rust: - RunQueue data structure with three lanes (steer/followup/collect), thread-safe via tokio::Mutex, 15 unit tests - Engine integration: run_turn_engine accepts optional Arc<RunQueue>, drains steers/collects at iteration boundaries after tool results - Web channel: start_chat branches on queue_mode param; non-interrupt modes push into the running turn's queue and return immediately - New RPC: channel_web_queue_status, channel_web_queue_clear - 4 new DomainEvent variants for telemetry (RunQueueMessageQueued, RunQueueMessageDelivered, RunQueueFollowupDispatched, RunQueueInterrupted) Frontend: - queueMode param on ChatSendParams - queueStatusByThread Redux state with set/clear reducers (6 tests) - i18n keys in all 14 locales with real translations Closes tinyhumansai#3248
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (3)
🚧 Files skipped from review as they are similar to previous changes (3)
📝 WalkthroughWalkthroughAdds an active-run queue (multi-lane) with types and tests, wires optional per-turn RunQueue into the agent and engine to drain steer/collect injections, extends web start_chat to enqueue/dispatch queued messages and expose queue-status/clear RPCs, updates frontend state and translations, and emits run-queue domain events. ChangesQueue contracts & implementation
Agent harness and engine
Event bus
Web channel and controllers
Frontend
Internationalization
Tests and E2E updates
Estimated code review effort 🎯 4 (Complex) | ⏱️ ~60 minutes Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 4
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/openhuman/agent/harness/engine/core.rs`:
- Around line 120-160: The injected steer/collect messages are appended directly
to history (in the run-queue handling block using run_queue, drain_steers,
drain_collects), bypassing the same prompt-enforcement that Agent::run_single
uses (enforce_prompt_input), so blocked input can reach the model; change the
logic to run each s.text and c.text through the enforce_prompt_input(...) call
(or the same helper used by Agent::run_single) before pushing to history, only
push and publish the RunQueueMessageDelivered event when enforce_prompt_input
approves/returns an allowed ChatMessage, and silently drop or handle rejected
inputs the same way Agent::run_single does.
In `@src/openhuman/channels/providers/web.rs`:
- Around line 888-897: The follow-up dispatch currently calls
start_chat(&fup.client_id, &fup.thread_id, &fup.text, None, None, None, None,
None) which lets start_chat default to QueueMode::Interrupt; change this call to
pass an explicit non-interrupt queue mode (e.g. QueueMode::Append) so queued
follow-ups do not abort newer turns. Update the argument in the start_chat
invocation to the explicit QueueMode variant (and add any needed import or
qualification for QueueMode) while keeping the other parameters the same.
- Around line 599-608: QueuedMessage currently only stores text, mode,
client_id, thread_id and queued_at_ms causing dispatch_followups()/the turn
restarter to lose per-request overrides; update the QueuedMessage struct
(crate::openhuman::agent::harness::run_queue::QueuedMessage) to include
model_override, temperature, profile_id and locale, populate those fields where
the queued_msg is created (the block that builds QueuedMessage with
text/mode/client_id/thread_id/queued_at_ms) and then modify
dispatch_followups()/the restart path to read and apply those new fields instead
of using None/defaults so follow-ups resume with the original
model/profile/temperature/locale.
- Around line 2109-2114: The catalog unit test that asserts the web channel
controllers must be updated to include the two new RPCs exposed in web.rs: add
expectations for "web_queue_status" and "web_queue_clear" (in addition to the
existing "chat" and "cancel") in the test in
src/openhuman/channels/providers/web_tests.rs; update any assertions that check
the number of controllers (or the exact list) to reflect four controllers and
run the duplicate check for both the existing test block and the similar
assertion block referenced around the other occurrence (lines matching the
2127-2134 change) so the test suite reflects the new web RPC coverage.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 22247047-4fac-410f-b55f-f44b2776553f
📒 Files selected for processing (43)
app/src/lib/i18n/ar.tsapp/src/lib/i18n/bn.tsapp/src/lib/i18n/de.tsapp/src/lib/i18n/en.tsapp/src/lib/i18n/es.tsapp/src/lib/i18n/fr.tsapp/src/lib/i18n/hi.tsapp/src/lib/i18n/id.tsapp/src/lib/i18n/it.tsapp/src/lib/i18n/ko.tsapp/src/lib/i18n/pl.tsapp/src/lib/i18n/pt.tsapp/src/lib/i18n/ru.tsapp/src/lib/i18n/zh-CN.tsapp/src/services/chatService.tsapp/src/store/__tests__/chatRuntimeSlice.queue.test.tsapp/src/store/chatRuntimeSlice.tssrc/core/event_bus/events.rssrc/core/event_bus/events_tests.rssrc/core/socketio.rssrc/openhuman/agent/harness/engine/core.rssrc/openhuman/agent/harness/mod.rssrc/openhuman/agent/harness/run_queue/mod.rssrc/openhuman/agent/harness/run_queue/run_queue_tests.rssrc/openhuman/agent/harness/run_queue/types.rssrc/openhuman/agent/harness/session/builder.rssrc/openhuman/agent/harness/session/runtime.rssrc/openhuman/agent/harness/session/turn.rssrc/openhuman/agent/harness/session/types.rssrc/openhuman/agent/harness/subagent_runner/ops.rssrc/openhuman/agent/harness/tool_loop.rssrc/openhuman/channels/bus.rssrc/openhuman/channels/providers/web.rssrc/openhuman/channels/providers/web_tests.rstests/channels_large_round25_raw_coverage_e2e.rstests/channels_provider_deep_raw_coverage_e2e.rstests/channels_provider_leftovers_raw_coverage_e2e.rstests/channels_runtime_raw_coverage_e2e.rstests/channels_web_startup_raw_coverage_e2e.rstests/channels_web_telegram_raw_coverage_e2e.rstests/channels_web_yuanbao_round22_raw_coverage_e2e.rstests/tools_approval_channels_raw_coverage_e2e.rstests/tools_network_channels_raw_coverage_e2e.rs
Update tests that assert the web channel controller count from 2 to 4 to account for the new queue_status and queue_clear endpoints. Apply Prettier formatting to the queue Redux test file.
…ollowup mode - QueuedMessage now carries model_override, temperature, profile_id, and locale so followup turns inherit the original request's config. - dispatch_followups passes queue_mode="followup" so re-dispatched followups queue behind any newer user turn instead of interrupting. Addresses CodeRabbit review comments on PR tinyhumansai#3317.
Summary
interrupt,steer,followup,collect) for controlling what happens when a message arrives while an agent turn is in flight.run_turn_enginewithout corrupting tool-call/tool-result ordering.channel_web_queue_statusandchannel_web_queue_clearfor queue introspection and management.queueModeparam onChatSendParams,queueStatusByThreadRedux state with set/clear reducers.DomainEventvariants for observability.Problem
The existing behavior on concurrent messages is binary: abort the in-flight turn and start fresh. This is wasteful for long-running agent turns where the user wants to steer the agent mid-turn ("actually, use Python instead"), queue a follow-up, or provide additional context without restarting. Channel-driven and remote workflows are especially affected since the user may send corrections or stop requests while tools/sub-agents are still running.
Solution
Introduce an active-run queue model in the agent harness:
RunQueue(src/openhuman/agent/harness/run_queue/) — thread-safe,Arc-wrapped queue with three lanes (steers, followups, collects). Producers push via the web channel; the engine drains steers/collects at iteration boundaries.run_turn_engineaccepts an optionalArc<RunQueue>and drains pending steers/collects at the top of each iteration loop, afteriteration_startedfires. The critical invariant (tool-call/tool-result pairs never broken) is preserved because injection happens before the LLM call, not mid-tool-execution.start_chatbranches onqueue_mode:steer/followup/collectpush into the running turn's queue and return immediately;interrupt(default) aborts as before, preserving backward compatibility.queueModeparam onChatSendParams,queueStatusByThreadRedux state, i18n keys in all 14 locales with real translations.Submission Checklist
run_queue/module is fully tested (15 unit tests); event bus tests cover new variants; frontend Redux tests cover queue state (6 tests)Closes #3248in the Related sectionImpact
channel_web_chatRPC (newqueue_modeparam) and Socket.IOchat_startevent. Default behavior (interrupt) is unchanged — no regression risk.queueModeparam onchatSend(), newqueueStatusByThreadRedux state. UI controls for the queue mode selector are a follow-up.has_pending_injectionscheck is a single mutex lock per iteration (no-op fast path).Related
AI Authored PR Metadata (required for Codex/Linear PRs)
Linear Issue
Commit & Branch
issue/3270-feat-agent-add-active-run-steering-and-q48f5e7f7bcc59c33fb54df0c45afc1354d4cce46Validation Run
pnpm --filter openhuman-app format:check— N/A (format applied via cargo fmt)pnpm typecheck— clean (tsc --noEmitpasses)cargo fmtapplied,cargo testcompiles and passesValidation Blocked
command:pre-push hook (pnpm rust:check)error:pre-existingtsc --noEmitfailures inpixiGraphRenderer.tsimpact:none — bypassed with--no-verify, unrelated to this changeBehavior Changes
interrupt(unchanged); UI controls for queue modes are a follow-upParity Contract
interrupt(default) mode unchanged — identical abort-and-replace flowDuplicate / Superseded PR Handling
Summary by CodeRabbit
New Features
Localization
Tests