diff --git a/codex-rs/app-server-protocol/schema/json/ClientRequest.json b/codex-rs/app-server-protocol/schema/json/ClientRequest.json index a790874de0e..f90ed006e6d 100644 --- a/codex-rs/app-server-protocol/schema/json/ClientRequest.json +++ b/codex-rs/app-server-protocol/schema/json/ClientRequest.json @@ -4217,4 +4217,4 @@ } ], "title": "ClientRequest" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index c26cea19a40..d3597f7c86b 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -17071,4 +17071,4 @@ }, "title": "CodexAppServerProtocol", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index bbf05bfed0d..6662dbc3cf9 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -15326,4 +15326,4 @@ }, "title": "CodexAppServerProtocolV2", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json b/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json index 6e2e8baebff..ed93913cb39 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json +++ b/codex-rs/app-server-protocol/schema/json/v2/WindowsSandboxSetupStartParams.json @@ -33,4 +33,4 @@ ], "title": "WindowsSandboxSetupStartParams", "type": "object" -} +} \ No newline at end of file diff --git a/codex-rs/app-server-protocol/src/protocol/common.rs b/codex-rs/app-server-protocol/src/protocol/common.rs index 54c0425256c..cff3116f7f5 100644 --- a/codex-rs/app-server-protocol/src/protocol/common.rs +++ b/codex-rs/app-server-protocol/src/protocol/common.rs @@ -207,6 +207,23 @@ client_request_definitions! { params: v2::ThreadUnsubscribeParams, response: v2::ThreadUnsubscribeResponse, }, + #[experimental("thread/increment_elicitation")] + /// Increment the thread-local out-of-band elicitation counter. + /// + /// This is used by external helpers to pause timeout accounting while a user + /// approval or other elicitation is pending outside the app-server request flow. + ThreadIncrementElicitation => "thread/increment_elicitation" { + params: v2::ThreadIncrementElicitationParams, + response: v2::ThreadIncrementElicitationResponse, + }, + #[experimental("thread/decrement_elicitation")] + /// Decrement the thread-local out-of-band elicitation counter. + /// + /// When the count reaches zero, timeout accounting resumes for the thread. + ThreadDecrementElicitation => "thread/decrement_elicitation" { + params: v2::ThreadDecrementElicitationParams, + response: v2::ThreadDecrementElicitationResponse, + }, ThreadSetName => "thread/name/set" { params: v2::ThreadSetNameParams, response: v2::ThreadSetNameResponse, diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 938e6dc26ff..bd92d19537b 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -2252,6 +2252,46 @@ pub enum ThreadUnsubscribeStatus { Unsubscribed, } +/// Parameters for `thread/increment_elicitation`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadIncrementElicitationParams { + /// Thread whose out-of-band elicitation counter should be incremented. + pub thread_id: String, +} + +/// Response for `thread/increment_elicitation`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadIncrementElicitationResponse { + /// Current out-of-band elicitation count after the increment. + pub count: u64, + /// Whether timeout accounting is paused after applying the increment. + pub paused: bool, +} + +/// Parameters for `thread/decrement_elicitation`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadDecrementElicitationParams { + /// Thread whose out-of-band elicitation counter should be decremented. + pub thread_id: String, +} + +/// Response for `thread/decrement_elicitation`. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] +#[serde(rename_all = "camelCase")] +#[ts(export_to = "v2/")] +pub struct ThreadDecrementElicitationResponse { + /// Current out-of-band elicitation count after the decrement. + pub count: u64, + /// Whether timeout accounting remains paused after applying the decrement. + pub paused: bool, +} + #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema, TS)] #[serde(rename_all = "camelCase")] #[ts(export_to = "v2/")] diff --git a/codex-rs/app-server-test-client/scripts/live_elicitation_hold.sh b/codex-rs/app-server-test-client/scripts/live_elicitation_hold.sh new file mode 100644 index 00000000000..838f28120ff --- /dev/null +++ b/codex-rs/app-server-test-client/scripts/live_elicitation_hold.sh @@ -0,0 +1,46 @@ +#!/bin/sh +set -eu + +require_env() { + eval "value=\${$1-}" + if [ -z "$value" ]; then + echo "missing required env var: $1" >&2 + exit 1 + fi +} + +require_env APP_SERVER_URL +require_env APP_SERVER_TEST_CLIENT_BIN + +thread_id="${CODEX_THREAD_ID:-${THREAD_ID-}}" +if [ -z "$thread_id" ]; then + echo "missing required env var: CODEX_THREAD_ID" >&2 + exit 1 +fi + +hold_seconds="${ELICITATION_HOLD_SECONDS:-15}" +incremented=0 + +cleanup() { + if [ "$incremented" -eq 1 ]; then + "$APP_SERVER_TEST_CLIENT_BIN" --url "$APP_SERVER_URL" \ + thread-decrement-elicitation "$thread_id" >/dev/null 2>&1 || true + fi +} + +trap cleanup EXIT INT TERM HUP + +echo "[elicitation-hold] increment thread=$thread_id" +"$APP_SERVER_TEST_CLIENT_BIN" --url "$APP_SERVER_URL" \ + thread-increment-elicitation "$thread_id" +incremented=1 + +echo "[elicitation-hold] sleeping ${hold_seconds}s" +sleep "$hold_seconds" + +echo "[elicitation-hold] decrement thread=$thread_id" +"$APP_SERVER_TEST_CLIENT_BIN" --url "$APP_SERVER_URL" \ + thread-decrement-elicitation "$thread_id" +incremented=0 + +echo "[elicitation-hold] done" diff --git a/codex-rs/app-server-test-client/src/lib.rs b/codex-rs/app-server-test-client/src/lib.rs index 51931d6a80f..beb2bc66d88 100644 --- a/codex-rs/app-server-test-client/src/lib.rs +++ b/codex-rs/app-server-test-client/src/lib.rs @@ -5,6 +5,7 @@ use std::fs::OpenOptions; use std::io::BufRead; use std::io::BufReader; use std::io::Write; +use std::net::TcpListener; use std::net::TcpStream; use std::path::Path; use std::path::PathBuf; @@ -15,6 +16,7 @@ use std::process::Command; use std::process::Stdio; use std::thread; use std::time::Duration; +use std::time::Instant; use std::time::SystemTime; use anyhow::Context; @@ -51,6 +53,10 @@ use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SandboxPolicy; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequest; +use codex_app_server_protocol::ThreadDecrementElicitationParams; +use codex_app_server_protocol::ThreadDecrementElicitationResponse; +use codex_app_server_protocol::ThreadIncrementElicitationParams; +use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; @@ -65,6 +71,7 @@ use codex_app_server_protocol::UserInput as V2UserInput; use codex_core::config::Config; use codex_otel::current_span_w3c_trace_context; use codex_otel::otel_provider::OtelProvider; +use codex_protocol::openai_models::ReasoningEffort; use codex_protocol::protocol::W3cTraceContext; use codex_utils_cli::CliConfigOverrides; use serde::Serialize; @@ -99,7 +106,6 @@ const NOTIFICATIONS_TO_OPT_OUT: &[&str] = &[ "command/exec/outputDelta", "item/agentMessage/delta", "item/plan/delta", - "item/commandExecution/outputDelta", "item/fileChange/outputDelta", "item/reasoning/summaryTextDelta", "item/reasoning/textDelta", @@ -246,6 +252,36 @@ enum CliCommand { #[arg(long, default_value_t = 20)] limit: u32, }, + /// Increment the out-of-band elicitation pause counter for a thread. + #[command(name = "thread-increment-elicitation")] + ThreadIncrementElicitation { + /// Existing thread id to update. + thread_id: String, + }, + /// Decrement the out-of-band elicitation pause counter for a thread. + #[command(name = "thread-decrement-elicitation")] + ThreadDecrementElicitation { + /// Existing thread id to update. + thread_id: String, + }, + /// Run the live websocket harness that proves elicitation pause prevents a + /// 10s unified exec timeout from killing a 15s helper script. + #[command(name = "live-elicitation-timeout-pause")] + LiveElicitationTimeoutPause { + /// Model passed to `thread/start`. + #[arg(long, env = "CODEX_E2E_MODEL", default_value = "gpt-5")] + model: String, + /// Existing workspace path used as the turn cwd. + #[arg(long, value_name = "path", default_value = ".")] + workspace: PathBuf, + /// Helper script to run from the model; defaults to the repo-local + /// live elicitation hold script. + #[arg(long, value_name = "path")] + script: Option, + /// Seconds the helper script should sleep while the timeout is paused. + #[arg(long, default_value_t = 15)] + hold_seconds: u64, + }, } pub async fn run() -> Result<()> { @@ -370,6 +406,33 @@ pub async fn run() -> Result<()> { let endpoint = resolve_endpoint(codex_bin, url)?; thread_list(&endpoint, &config_overrides, limit).await } + CliCommand::ThreadIncrementElicitation { thread_id } => { + ensure_dynamic_tools_unused(&dynamic_tools, "thread-increment-elicitation")?; + let url = resolve_shared_websocket_url(codex_bin, url, "thread-increment-elicitation")?; + thread_increment_elicitation(&url, thread_id) + } + CliCommand::ThreadDecrementElicitation { thread_id } => { + ensure_dynamic_tools_unused(&dynamic_tools, "thread-decrement-elicitation")?; + let url = resolve_shared_websocket_url(codex_bin, url, "thread-decrement-elicitation")?; + thread_decrement_elicitation(&url, thread_id) + } + CliCommand::LiveElicitationTimeoutPause { + model, + workspace, + script, + hold_seconds, + } => { + ensure_dynamic_tools_unused(&dynamic_tools, "live-elicitation-timeout-pause")?; + live_elicitation_timeout_pause( + codex_bin, + url, + &config_overrides, + model, + workspace, + script, + hold_seconds, + ) + } } } @@ -378,6 +441,11 @@ enum Endpoint { ConnectWs(String), } +struct BackgroundAppServer { + process: Child, + url: String, +} + fn resolve_endpoint(codex_bin: Option, url: Option) -> Result { if codex_bin.is_some() && url.is_some() { bail!("--codex-bin and --url are mutually exclusive"); @@ -391,6 +459,66 @@ fn resolve_endpoint(codex_bin: Option, url: Option) -> Result, + url: Option, + command: &str, +) -> Result { + if codex_bin.is_some() { + bail!( + "{command} requires --url or an already-running websocket app-server; --codex-bin would spawn a private stdio app-server instead" + ); + } + + Ok(url.unwrap_or_else(|| "ws://127.0.0.1:4222".to_string())) +} + +impl BackgroundAppServer { + fn spawn(codex_bin: &Path, config_overrides: &[String]) -> Result { + let listener = TcpListener::bind("127.0.0.1:0") + .context("failed to reserve a local port for websocket app-server")?; + let addr = listener.local_addr()?; + drop(listener); + + let url = format!("ws://{addr}"); + let mut cmd = Command::new(codex_bin); + if let Some(codex_bin_parent) = codex_bin.parent() { + let mut path = OsString::from(codex_bin_parent.as_os_str()); + if let Some(existing_path) = std::env::var_os("PATH") { + path.push(":"); + path.push(existing_path); + } + cmd.env("PATH", path); + } + for override_kv in config_overrides { + cmd.arg("--config").arg(override_kv); + } + let process = cmd + .arg("app-server") + .arg("--listen") + .arg(&url) + .stdin(Stdio::null()) + .stdout(Stdio::null()) + .stderr(Stdio::inherit()) + .spawn() + .with_context(|| format!("failed to start `{}` app-server", codex_bin.display()))?; + + Ok(Self { process, url }) + } +} + +impl Drop for BackgroundAppServer { + fn drop(&mut self) { + if let Ok(Some(status)) = self.process.try_wait() { + println!("[background app-server exited: {status}]"); + return; + } + + let _ = self.process.kill(); + let _ = self.process.wait(); + } +} + fn serve(codex_bin: &Path, config_overrides: &[String], listen: &str, kill: bool) -> Result<()> { let runtime_dir = PathBuf::from("/tmp/codex-app-server-test-client"); fs::create_dir_all(&runtime_dir) @@ -1020,6 +1148,190 @@ async fn with_client( result } +fn thread_increment_elicitation(url: &str, thread_id: String) -> Result<()> { + let endpoint = Endpoint::ConnectWs(url.to_string()); + let mut client = CodexClient::connect(&endpoint, &[])?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let response = + client.thread_increment_elicitation(ThreadIncrementElicitationParams { thread_id })?; + println!("< thread/increment_elicitation response: {response:?}"); + + Ok(()) +} + +fn thread_decrement_elicitation(url: &str, thread_id: String) -> Result<()> { + let endpoint = Endpoint::ConnectWs(url.to_string()); + let mut client = CodexClient::connect(&endpoint, &[])?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let response = + client.thread_decrement_elicitation(ThreadDecrementElicitationParams { thread_id })?; + println!("< thread/decrement_elicitation response: {response:?}"); + + Ok(()) +} + +fn live_elicitation_timeout_pause( + codex_bin: Option, + url: Option, + config_overrides: &[String], + model: String, + workspace: PathBuf, + script: Option, + hold_seconds: u64, +) -> Result<()> { + if cfg!(windows) { + bail!("live-elicitation-timeout-pause currently requires a POSIX shell"); + } + if hold_seconds <= 10 { + bail!("--hold-seconds must be greater than 10 to exceed the unified exec timeout"); + } + + let mut _background_server = None; + let websocket_url = match (codex_bin, url) { + (Some(_), Some(_)) => bail!("--codex-bin and --url are mutually exclusive"), + (Some(codex_bin), None) => { + let server = BackgroundAppServer::spawn(&codex_bin, config_overrides)?; + let websocket_url = server.url.clone(); + _background_server = Some(server); + websocket_url + } + (None, Some(url)) => url, + (None, None) => "ws://127.0.0.1:4222".to_string(), + }; + + let script_path = script.unwrap_or_else(|| { + PathBuf::from(env!("CARGO_MANIFEST_DIR")) + .join("scripts") + .join("live_elicitation_hold.sh") + }); + if !script_path.is_file() { + bail!("helper script not found: {}", script_path.display()); + } + + let workspace = workspace + .canonicalize() + .with_context(|| format!("failed to resolve workspace `{}`", workspace.display()))?; + let app_server_test_client_bin = std::env::current_exe() + .context("failed to resolve codex-app-server-test-client binary path")?; + let endpoint = Endpoint::ConnectWs(websocket_url.clone()); + let mut client = CodexClient::connect(&endpoint, &[])?; + + let initialize = client.initialize()?; + println!("< initialize response: {initialize:?}"); + + let thread_response = client.thread_start(ThreadStartParams { + model: Some(model), + ..Default::default() + })?; + println!("< thread/start response: {thread_response:?}"); + + let thread_id = thread_response.thread.id; + let command = format!( + "APP_SERVER_URL={} APP_SERVER_TEST_CLIENT_BIN={} ELICITATION_HOLD_SECONDS={} sh {}", + shell_quote(&websocket_url), + shell_quote(&app_server_test_client_bin.display().to_string()), + hold_seconds, + shell_quote(&script_path.display().to_string()), + ); + let prompt = format!( + "Use the `exec_command` tool exactly once. Set its `cmd` field to the exact shell command below. Do not rewrite it, do not split it, do not call any other tool, do not set `yield_time_ms`, and wait for the command to finish before replying.\n\n{command}\n\nAfter the command finishes, reply with exactly `DONE`." + ); + + let started_at = Instant::now(); + let turn_response = client.turn_start(TurnStartParams { + thread_id: thread_id.clone(), + input: vec![V2UserInput::Text { + text: prompt, + text_elements: Vec::new(), + }], + approval_policy: Some(AskForApproval::Never), + sandbox_policy: Some(SandboxPolicy::DangerFullAccess), + effort: Some(ReasoningEffort::High), + cwd: Some(workspace), + ..Default::default() + })?; + println!("< turn/start response: {turn_response:?}"); + + let stream_result = client.stream_turn(&thread_id, &turn_response.turn.id); + let elapsed = started_at.elapsed(); + + let validation_result = (|| -> Result<()> { + stream_result?; + + let helper_output = client + .command_execution_outputs + .iter() + .find(|output| output.contains("[elicitation-hold]")) + .cloned() + .ok_or_else(|| anyhow::anyhow!("expected helper script markers in command output"))?; + let minimum_elapsed = Duration::from_secs(hold_seconds.saturating_sub(1)); + + if client.last_turn_status != Some(TurnStatus::Completed) { + bail!( + "expected completed turn, got {:?} (last error: {:?})", + client.last_turn_status, + client.last_turn_error_message + ); + } + if !client + .command_execution_statuses + .contains(&CommandExecutionStatus::Completed) + { + bail!( + "expected a completed command execution, got {:?}", + client.command_execution_statuses + ); + } + if !client.helper_done_seen || !helper_output.contains("[elicitation-hold] done") { + bail!( + "expected helper script completion marker in command output, got: {helper_output:?}" + ); + } + if !client.unexpected_items_before_helper_done.is_empty() { + bail!( + "turn started new items before helper completion: {:?}", + client.unexpected_items_before_helper_done + ); + } + if client.turn_completed_before_helper_done { + bail!("turn completed before helper script finished"); + } + if elapsed < minimum_elapsed { + bail!( + "turn completed too quickly to prove timeout pause worked: elapsed={elapsed:?}, expected at least {minimum_elapsed:?}" + ); + } + + Ok(()) + })(); + + match client.thread_decrement_elicitation(ThreadDecrementElicitationParams { + thread_id: thread_id.clone(), + }) { + Ok(response) => { + println!("[cleanup] thread/decrement_elicitation response after harness: {response:?}"); + } + Err(err) => { + eprintln!("[cleanup] thread/decrement_elicitation ignored: {err:#}"); + } + } + + validation_result?; + + println!( + "[live elicitation timeout pause summary] thread_id={thread_id}, turn_id={}, elapsed={elapsed:?}, command_statuses={:?}", + turn_response.turn.id, client.command_execution_statuses + ); + + Ok(()) +} + fn ensure_dynamic_tools_unused( dynamic_tools: &Option>, command: &str, @@ -1073,7 +1385,14 @@ struct CodexClient { command_approval_count: usize, command_approval_item_ids: Vec, command_execution_statuses: Vec, + command_execution_outputs: Vec, + command_output_stream: String, + command_item_started: bool, + helper_done_seen: bool, + turn_completed_before_helper_done: bool, + unexpected_items_before_helper_done: Vec, last_turn_status: Option, + last_turn_error_message: Option, } #[derive(Debug, Clone, Copy)] @@ -1082,6 +1401,18 @@ enum CommandApprovalBehavior { AbortOn(usize), } +fn item_started_before_helper_done_is_unexpected( + item: &ThreadItem, + command_item_started: bool, + helper_done_seen: bool, +) -> bool { + if !command_item_started || helper_done_seen { + return false; + } + + !matches!(item, ThreadItem::UserMessage { .. }) +} + impl CodexClient { fn connect(endpoint: &Endpoint, config_overrides: &[String]) -> Result { match endpoint { @@ -1132,17 +1463,35 @@ impl CodexClient { command_approval_count: 0, command_approval_item_ids: Vec::new(), command_execution_statuses: Vec::new(), + command_execution_outputs: Vec::new(), + command_output_stream: String::new(), + command_item_started: false, + helper_done_seen: false, + turn_completed_before_helper_done: false, + unexpected_items_before_helper_done: Vec::new(), last_turn_status: None, + last_turn_error_message: None, }) } fn connect_websocket(url: &str) -> Result { let parsed = Url::parse(url).with_context(|| format!("invalid websocket URL `{url}`"))?; - let (socket, _response) = connect(parsed.as_str()).with_context(|| { - format!( - "failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`" - ) - })?; + let deadline = Instant::now() + Duration::from_secs(10); + let (socket, _response) = loop { + match connect(parsed.as_str()) { + Ok(result) => break result, + Err(err) => { + if Instant::now() >= deadline { + return Err(err).with_context(|| { + format!( + "failed to connect to websocket app-server at `{url}`; if no server is running, start one with `codex-app-server-test-client serve --listen {url}`" + ) + }); + } + thread::sleep(Duration::from_millis(50)); + } + } + }; Ok(Self { transport: ClientTransport::WebSocket { url: url.to_string(), @@ -1153,10 +1502,27 @@ impl CodexClient { command_approval_count: 0, command_approval_item_ids: Vec::new(), command_execution_statuses: Vec::new(), + command_execution_outputs: Vec::new(), + command_output_stream: String::new(), + command_item_started: false, + helper_done_seen: false, + turn_completed_before_helper_done: false, + unexpected_items_before_helper_done: Vec::new(), last_turn_status: None, + last_turn_error_message: None, }) } + fn note_helper_output(&mut self, output: &str) { + self.command_output_stream.push_str(output); + if self + .command_output_stream + .contains("[elicitation-hold] done") + { + self.helper_done_seen = true; + } + } + fn initialize(&mut self) -> Result { self.initialize_with_experimental_api(true) } @@ -1268,6 +1634,32 @@ impl CodexClient { self.send_request(request, request_id, "thread/list") } + fn thread_increment_elicitation( + &mut self, + params: ThreadIncrementElicitationParams, + ) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ThreadIncrementElicitation { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "thread/increment_elicitation") + } + + fn thread_decrement_elicitation( + &mut self, + params: ThreadDecrementElicitationParams, + ) -> Result { + let request_id = self.request_id(); + let request = ClientRequest::ThreadDecrementElicitation { + request_id: request_id.clone(), + params, + }; + + self.send_request(request, request_id, "thread/decrement_elicitation") + } + fn wait_for_account_login_completion( &mut self, expected_login_id: &str, @@ -1320,6 +1712,7 @@ impl CodexClient { std::io::stdout().flush().ok(); } ServerNotification::CommandExecutionOutputDelta(delta) => { + self.note_helper_output(&delta.delta); print!("{}", delta.delta); std::io::stdout().flush().ok(); } @@ -1328,17 +1721,48 @@ impl CodexClient { std::io::stdout().flush().ok(); } ServerNotification::ItemStarted(payload) => { + if matches!(payload.item, ThreadItem::CommandExecution { .. }) { + if self.command_item_started && !self.helper_done_seen { + self.unexpected_items_before_helper_done + .push(payload.item.clone()); + } + self.command_item_started = true; + } else if item_started_before_helper_done_is_unexpected( + &payload.item, + self.command_item_started, + self.helper_done_seen, + ) { + self.unexpected_items_before_helper_done + .push(payload.item.clone()); + } println!("\n< item started: {:?}", payload.item); } ServerNotification::ItemCompleted(payload) => { - if let ThreadItem::CommandExecution { status, .. } = payload.item.clone() { + if let ThreadItem::CommandExecution { + status, + aggregated_output, + .. + } = payload.item.clone() + { self.command_execution_statuses.push(status); + if let Some(aggregated_output) = aggregated_output { + self.note_helper_output(&aggregated_output); + self.command_execution_outputs.push(aggregated_output); + } } println!("< item completed: {:?}", payload.item); } ServerNotification::TurnCompleted(payload) => { if payload.turn.id == turn_id { self.last_turn_status = Some(payload.turn.status.clone()); + if self.command_item_started && !self.helper_done_seen { + self.turn_completed_before_helper_done = true; + } + self.last_turn_error_message = payload + .turn + .error + .as_ref() + .map(|error| error.message.clone()); println!("\n< turn/completed notification: {:?}", payload.turn.status); if payload.turn.status == TurnStatus::Failed && let Some(error) = payload.turn.error diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index b2d5c0a75b7..5cc34664f47 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -116,8 +116,12 @@ use codex_app_server_protocol::ThreadBackgroundTerminalsCleanResponse; use codex_app_server_protocol::ThreadClosedNotification; use codex_app_server_protocol::ThreadCompactStartParams; use codex_app_server_protocol::ThreadCompactStartResponse; +use codex_app_server_protocol::ThreadDecrementElicitationParams; +use codex_app_server_protocol::ThreadDecrementElicitationResponse; use codex_app_server_protocol::ThreadForkParams; use codex_app_server_protocol::ThreadForkResponse; +use codex_app_server_protocol::ThreadIncrementElicitationParams; +use codex_app_server_protocol::ThreadIncrementElicitationResponse; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadListParams; use codex_app_server_protocol::ThreadListResponse; @@ -644,6 +648,14 @@ impl CodexMessageProcessor { self.thread_archive(to_connection_request_id(request_id), params) .await; } + ClientRequest::ThreadIncrementElicitation { request_id, params } => { + self.thread_increment_elicitation(to_connection_request_id(request_id), params) + .await; + } + ClientRequest::ThreadDecrementElicitation { request_id, params } => { + self.thread_decrement_elicitation(to_connection_request_id(request_id), params) + .await; + } ClientRequest::ThreadSetName { request_id, params } => { self.thread_set_name(to_connection_request_id(request_id), params) .await; @@ -2087,6 +2099,79 @@ impl CodexMessageProcessor { } } + async fn thread_increment_elicitation( + &self, + request_id: ConnectionRequestId, + params: ThreadIncrementElicitationParams, + ) { + let (_, thread) = match self.load_thread(¶ms.thread_id).await { + Ok(value) => value, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match thread.increment_out_of_band_elicitation_count().await { + Ok(count) => { + self.outgoing + .send_response( + request_id, + ThreadIncrementElicitationResponse { + count, + paused: count > 0, + }, + ) + .await; + } + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to increment out-of-band elicitation counter: {err}"), + ) + .await; + } + } + } + + async fn thread_decrement_elicitation( + &self, + request_id: ConnectionRequestId, + params: ThreadDecrementElicitationParams, + ) { + let (_, thread) = match self.load_thread(¶ms.thread_id).await { + Ok(value) => value, + Err(error) => { + self.outgoing.send_error(request_id, error).await; + return; + } + }; + + match thread.decrement_out_of_band_elicitation_count().await { + Ok(count) => { + self.outgoing + .send_response( + request_id, + ThreadDecrementElicitationResponse { + count, + paused: count > 0, + }, + ) + .await; + } + Err(CodexErr::InvalidRequest(message)) => { + self.send_invalid_request_error(request_id, message).await; + } + Err(err) => { + self.send_internal_error( + request_id, + format!("failed to decrement out-of-band elicitation counter: {err}"), + ) + .await; + } + } + } + async fn thread_set_name(&self, request_id: ConnectionRequestId, params: ThreadSetNameParams) { let ThreadSetNameParams { thread_id, name } = params; let thread_id = match ThreadId::from_string(&thread_id) { diff --git a/codex-rs/app-server/tests/suite/v2/thread_resume.rs b/codex-rs/app-server/tests/suite/v2/thread_resume.rs index e5ed8f57cda..5d1bbdab83a 100644 --- a/codex-rs/app-server/tests/suite/v2/thread_resume.rs +++ b/codex-rs/app-server/tests/suite/v2/thread_resume.rs @@ -835,12 +835,14 @@ async fn thread_resume_rejoins_running_thread_even_with_override_mismatch() -> R let ThreadResumeResponse { thread, model, .. } = to_response::(resume_resp)?; assert_eq!(model, "gpt-5.1-codex-max"); - assert_eq!( - thread.status, - ThreadStatus::Active { - active_flags: vec![], - } - ); + // The running-thread resume response is queued onto the thread listener task. + // If the in-flight turn completes before that queued command runs, the response + // can legitimately observe the thread as idle. + match &thread.status { + ThreadStatus::Active { active_flags } => assert!(active_flags.is_empty()), + ThreadStatus::Idle => {} + status => panic!("unexpected thread status after running resume: {status:?}"), + } timeout( DEFAULT_READ_TIMEOUT, diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 66985375e8e..758a6d5d43e 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -640,6 +640,7 @@ pub(crate) struct Session { pub(crate) conversation_id: ThreadId, tx_event: Sender, agent_status: watch::Sender, + out_of_band_elicitation_paused: watch::Sender, state: Mutex, /// The set of enabled features should be invariant for the lifetime of the /// session. @@ -1085,6 +1086,14 @@ impl Session { state.session_configuration.codex_home().clone() } + pub(crate) fn subscribe_out_of_band_elicitation_pause_state(&self) -> watch::Receiver { + self.out_of_band_elicitation_paused.subscribe() + } + + pub(crate) fn set_out_of_band_elicitation_pause_state(&self, paused: bool) { + self.out_of_band_elicitation_paused.send_replace(paused); + } + fn start_file_watcher_listener(self: &Arc) { let mut rx = self.services.file_watcher.subscribe(); let weak_sess = Arc::downgrade(self); @@ -1586,11 +1595,14 @@ impl Session { config.js_repl_node_path.clone(), config.js_repl_node_module_dirs.clone(), )); + let (out_of_band_elicitation_paused, _out_of_band_elicitation_paused_rx) = + watch::channel(false); let sess = Arc::new(Session { conversation_id, tx_event: tx_event.clone(), agent_status, + out_of_band_elicitation_paused, state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 6e7115619cc..55ffaf9e2f1 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2128,6 +2128,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { conversation_id, tx_event, agent_status: agent_status_tx, + out_of_band_elicitation_paused: watch::channel(false).0, state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), @@ -2535,6 +2536,7 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( conversation_id, tx_event, agent_status: agent_status_tx, + out_of_band_elicitation_paused: watch::channel(false).0, state: Mutex::new(state), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index c4fc1e0875e..b33a66c3edb 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -2,6 +2,7 @@ use crate::agent::AgentStatus; use crate::codex::Codex; use crate::codex::SteerInputError; use crate::config::ConstraintResult; +use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::features::Feature; use crate::file_watcher::WatchRegistration; @@ -20,6 +21,7 @@ use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::TokenUsage; use codex_protocol::user_input::UserInput; use std::path::PathBuf; +use tokio::sync::Mutex; use tokio::sync::watch; use crate::state_db::StateDbHandle; @@ -41,6 +43,7 @@ pub struct ThreadConfigSnapshot { pub struct CodexThread { pub(crate) codex: Codex, rollout_path: Option, + out_of_band_elicitation_count: Mutex, _watch_registration: WatchRegistration, } @@ -55,6 +58,7 @@ impl CodexThread { Self { codex, rollout_path, + out_of_band_elicitation_count: Mutex::new(0), _watch_registration: watch_registration, } } @@ -143,4 +147,39 @@ impl CodexThread { pub fn enabled(&self, feature: Feature) -> bool { self.codex.enabled(feature) } + + pub async fn increment_out_of_band_elicitation_count(&self) -> CodexResult { + let mut guard = self.out_of_band_elicitation_count.lock().await; + let was_zero = *guard == 0; + *guard = guard.checked_add(1).ok_or_else(|| { + CodexErr::Fatal("out-of-band elicitation count overflowed".to_string()) + })?; + + if was_zero { + self.codex + .session + .set_out_of_band_elicitation_pause_state(true); + } + + Ok(*guard) + } + + pub async fn decrement_out_of_band_elicitation_count(&self) -> CodexResult { + let mut guard = self.out_of_band_elicitation_count.lock().await; + if *guard == 0 { + return Err(CodexErr::InvalidRequest( + "out-of-band elicitation count is already zero".to_string(), + )); + } + + *guard -= 1; + let now_zero = *guard == 0; + if now_zero { + self.codex + .session + .set_out_of_band_elicitation_pause_state(false); + } + + Ok(*guard) + } } diff --git a/codex-rs/core/src/mcp_connection_manager.rs b/codex-rs/core/src/mcp_connection_manager.rs index e339d872210..ac75c7fb04d 100644 --- a/codex-rs/core/src/mcp_connection_manager.rs +++ b/codex-rs/core/src/mcp_connection_manager.rs @@ -1377,7 +1377,6 @@ async fn start_server_task( .as_ref() .and_then(|exp| exp.get(MCP_SANDBOX_STATE_CAPABILITY)) .is_some(); - let managed = ManagedClient { client: Arc::clone(&client), tools, diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 2e46d6a9432..36fc1b60065 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -434,6 +434,39 @@ mod tests { Ok(()) } + #[tokio::test(flavor = "multi_thread", worker_threads = 2)] + async fn unified_exec_pause_blocks_yield_timeout() -> anyhow::Result<()> { + skip_if_sandbox!(Ok(())); + + let (session, turn) = test_session_and_turn().await; + session.set_out_of_band_elicitation_pause_state(true); + + let paused_session = Arc::clone(&session); + tokio::spawn(async move { + tokio::time::sleep(Duration::from_secs(2)).await; + paused_session.set_out_of_band_elicitation_pause_state(false); + }); + + let started = tokio::time::Instant::now(); + let response = + exec_command(&session, &turn, "sleep 1 && echo unified-exec-done", 250).await?; + + assert!( + started.elapsed() >= Duration::from_secs(2), + "pause should block the unified exec yield timeout" + ); + assert!( + response.output.contains("unified-exec-done"), + "exec_command should wait for output after the pause lifts" + ); + assert!( + response.process_id.is_none(), + "completed command should not leave a background process" + ); + + Ok(()) + } + #[tokio::test] #[ignore] // Ignored while we have a better way to test this. async fn requests_with_large_timeout_are_capped() -> anyhow::Result<()> { diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index 770379afa42..81e1dcf5b87 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -8,6 +8,7 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use tokio::sync::Notify; use tokio::sync::mpsc; +use tokio::sync::watch; use tokio::time::Duration; use tokio::time::Instant; use tokio_util::sync::CancellationToken; @@ -98,6 +99,7 @@ struct PreparedProcessHandles { output_closed: Arc, output_closed_notify: Arc, cancellation_token: CancellationToken, + pause_state: Option>, command: Vec, process_id: String, tty: bool, @@ -215,6 +217,11 @@ impl UnifiedExecProcessManager { &output_closed, &output_closed_notify, &cancellation_token, + Some( + context + .session + .subscribe_out_of_band_elicitation_pause_state(), + ), deadline, ) .await; @@ -308,6 +315,7 @@ impl UnifiedExecProcessManager { output_closed, output_closed_notify, cancellation_token, + pause_state, command: session_command, process_id, tty, @@ -343,6 +351,7 @@ impl UnifiedExecProcessManager { &output_closed, &output_closed_notify, &cancellation_token, + pause_state, deadline, ) .await; @@ -442,6 +451,10 @@ impl UnifiedExecProcessManager { output_closed_notify, cancellation_token, } = entry.process.output_handles(); + let pause_state = entry + .session + .upgrade() + .map(|session| session.subscribe_out_of_band_elicitation_pause_state()); Ok(PreparedProcessHandles { writer_tx: entry.process.writer_sender(), @@ -450,6 +463,7 @@ impl UnifiedExecProcessManager { output_closed, output_closed_notify, cancellation_token, + pause_state, command: entry.command.clone(), process_id: entry.process_id.clone(), tty: entry.tty, @@ -624,7 +638,8 @@ impl UnifiedExecProcessManager { output_closed: &Arc, output_closed_notify: &Arc, cancellation_token: &CancellationToken, - deadline: Instant, + mut pause_state: Option>, + mut deadline: Instant, ) -> Vec { const POST_EXIT_CLOSE_WAIT_CAP: Duration = Duration::from_millis(50); @@ -632,6 +647,12 @@ impl UnifiedExecProcessManager { let mut exit_signal_received = cancellation_token.is_cancelled(); let mut post_exit_deadline: Option = None; loop { + Self::extend_deadlines_while_paused( + &mut pause_state, + &mut deadline, + &mut post_exit_deadline, + ) + .await; let drained_chunks: Vec>; let mut wait_for_output = None; { @@ -669,6 +690,7 @@ impl UnifiedExecProcessManager { _ = &mut notified => {} _ = &mut closed => {} _ = tokio::time::sleep(close_wait_remaining) => break, + _ = Self::wait_for_pause_change(pause_state.as_ref()) => {} } continue; } @@ -681,6 +703,7 @@ impl UnifiedExecProcessManager { _ = &mut notified => {} _ = &mut exit_notified => exit_signal_received = true, _ = tokio::time::sleep(remaining) => break, + _ = Self::wait_for_pause_change(pause_state.as_ref()) => {} } continue; } @@ -698,6 +721,42 @@ impl UnifiedExecProcessManager { collected } + async fn extend_deadlines_while_paused( + pause_state: &mut Option>, + deadline: &mut Instant, + post_exit_deadline: &mut Option, + ) { + let Some(receiver) = pause_state.as_mut() else { + return; + }; + if !*receiver.borrow() { + return; + } + + let paused_at = Instant::now(); + while *receiver.borrow() { + if receiver.changed().await.is_err() { + break; + } + } + + let paused_for = paused_at.elapsed(); + *deadline += paused_for; + if let Some(post_exit_deadline) = post_exit_deadline.as_mut() { + *post_exit_deadline += paused_for; + } + } + + async fn wait_for_pause_change(pause_state: Option<&watch::Receiver>) { + match pause_state { + Some(pause_state) => { + let mut receiver = pause_state.clone(); + let _ = receiver.changed().await; + } + None => std::future::pending::<()>().await, + } + } + fn prune_processes_if_needed(store: &mut ProcessStore) -> Option { if store.processes.len() < MAX_UNIFIED_EXEC_PROCESSES { return None;