diff --git a/MODULE.bazel.lock b/MODULE.bazel.lock index ef80693c70bf..7a1acc3816b7 100644 --- a/MODULE.bazel.lock +++ b/MODULE.bazel.lock @@ -663,6 +663,7 @@ "bitflags_1.3.2": "{\"dependencies\":[{\"name\":\"compiler_builtins\",\"optional\":true,\"req\":\"^0.1.2\"},{\"name\":\"core\",\"optional\":true,\"package\":\"rustc-std-workspace-core\",\"req\":\"^1.0.0\"},{\"kind\":\"dev\",\"name\":\"rustversion\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde_derive\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0\"},{\"kind\":\"dev\",\"name\":\"walkdir\",\"req\":\"^2.3\"}],\"features\":{\"default\":[],\"example_generated\":[],\"rustc-dep-of-std\":[\"core\",\"compiler_builtins\"]}}", "bitflags_2.10.0": "{\"dependencies\":[{\"name\":\"arbitrary\",\"optional\":true,\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"arbitrary\",\"req\":\"^1.0\"},{\"name\":\"bytemuck\",\"optional\":true,\"req\":\"^1.12\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"bytemuck\",\"req\":\"^1.12.2\"},{\"kind\":\"dev\",\"name\":\"rustversion\",\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"serde_core\",\"optional\":true,\"req\":\"^1.0.228\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"serde_lib\",\"package\":\"serde\",\"req\":\"^1.0.103\"},{\"kind\":\"dev\",\"name\":\"serde_test\",\"req\":\"^1.0.19\"},{\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0.18\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"zerocopy\",\"req\":\"^0.8\"}],\"features\":{\"example_generated\":[],\"serde\":[\"serde_core\"],\"std\":[]}}", "bitflags_2.11.0": "{\"dependencies\":[{\"name\":\"arbitrary\",\"optional\":true,\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"arbitrary\",\"req\":\"^1.0\"},{\"name\":\"bytemuck\",\"optional\":true,\"req\":\"^1.12\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"bytemuck\",\"req\":\"^1.12.2\"},{\"kind\":\"dev\",\"name\":\"rustversion\",\"req\":\"^1.0\"},{\"default_features\":false,\"name\":\"serde_core\",\"optional\":true,\"req\":\"^1.0.228\"},{\"kind\":\"dev\",\"name\":\"serde_json\",\"req\":\"^1.0\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"serde_lib\",\"package\":\"serde\",\"req\":\"^1.0.103\"},{\"kind\":\"dev\",\"name\":\"serde_test\",\"req\":\"^1.0.19\"},{\"kind\":\"dev\",\"name\":\"trybuild\",\"req\":\"^1.0.18\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"zerocopy\",\"req\":\"^0.8\"}],\"features\":{\"example_generated\":[],\"serde\":[\"serde_core\"],\"std\":[]}}", + "blake2_0.10.6": "{\"dependencies\":[{\"features\":[\"mac\"],\"name\":\"digest\",\"req\":\"^0.10.3\"},{\"features\":[\"dev\"],\"kind\":\"dev\",\"name\":\"digest\",\"req\":\"^0.10.3\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.2.2\"}],\"features\":{\"default\":[\"std\"],\"reset\":[],\"simd\":[],\"simd_asm\":[\"simd_opt\"],\"simd_opt\":[\"simd\"],\"size_opt\":[],\"std\":[\"digest/std\"]}}", "block-buffer_0.10.4": "{\"dependencies\":[{\"name\":\"generic-array\",\"req\":\"^0.14\"}],\"features\":{}}", "block-padding_0.3.3": "{\"dependencies\":[{\"name\":\"generic-array\",\"req\":\"^0.14\"}],\"features\":{\"std\":[]}}", "block2_0.6.2": "{\"dependencies\":[{\"default_features\":false,\"features\":[\"std\"],\"name\":\"objc2\",\"req\":\">=0.6.2, <0.8.0\"}],\"features\":{\"alloc\":[],\"compiler-rt\":[\"objc2/unstable-compiler-rt\"],\"default\":[\"std\"],\"gnustep-1-7\":[\"objc2/gnustep-1-7\"],\"gnustep-1-8\":[\"gnustep-1-7\",\"objc2/gnustep-1-8\"],\"gnustep-1-9\":[\"gnustep-1-8\",\"objc2/gnustep-1-9\"],\"gnustep-2-0\":[\"gnustep-1-9\",\"objc2/gnustep-2-0\"],\"gnustep-2-1\":[\"gnustep-2-0\",\"objc2/gnustep-2-1\"],\"std\":[\"alloc\"],\"unstable-coerce-pointee\":[],\"unstable-objfw\":[],\"unstable-private\":[],\"unstable-winobjc\":[\"gnustep-1-8\"]}}", @@ -752,6 +753,8 @@ "crossterm_winapi_0.9.1": "{\"dependencies\":[{\"features\":[\"winbase\",\"consoleapi\",\"processenv\",\"handleapi\",\"synchapi\",\"impl-default\"],\"name\":\"winapi\",\"req\":\"^0.3.8\",\"target\":\"cfg(windows)\"}],\"features\":{}}", "crunchy_0.2.4": "{\"dependencies\":[],\"features\":{\"default\":[\"limit_128\"],\"limit_1024\":[],\"limit_128\":[],\"limit_2048\":[],\"limit_256\":[],\"limit_512\":[],\"limit_64\":[],\"std\":[]}}", "crypto-common_0.1.7": "{\"dependencies\":[{\"features\":[\"more_lengths\"],\"name\":\"generic-array\",\"req\":\"=0.14.7\"},{\"name\":\"rand_core\",\"optional\":true,\"req\":\"^0.6\"},{\"name\":\"typenum\",\"req\":\"^1.14\"}],\"features\":{\"getrandom\":[\"rand_core/getrandom\"],\"std\":[]}}", + "crypto_box_0.9.1": "{\"dependencies\":[{\"default_features\":false,\"name\":\"aead\",\"req\":\"^0.5.2\"},{\"kind\":\"dev\",\"name\":\"bincode\",\"req\":\"^1\"},{\"default_features\":false,\"name\":\"blake2\",\"optional\":true,\"req\":\"^0.10\"},{\"name\":\"chacha20\",\"optional\":true,\"req\":\"^0.9\"},{\"default_features\":false,\"name\":\"crypto_secretbox\",\"req\":\"^0.1.1\"},{\"default_features\":false,\"features\":[\"zeroize\"],\"name\":\"curve25519-dalek\",\"req\":\"^4\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"kind\":\"dev\",\"name\":\"rand\",\"req\":\"^0.8\"},{\"kind\":\"dev\",\"name\":\"rmp-serde\",\"req\":\"^1\"},{\"name\":\"salsa20\",\"optional\":true,\"req\":\"^0.10\"},{\"default_features\":false,\"name\":\"serdect\",\"optional\":true,\"req\":\"^0.2\"},{\"default_features\":false,\"name\":\"subtle\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"req\":\"^1\"}],\"features\":{\"alloc\":[\"aead/alloc\"],\"chacha20\":[\"dep:chacha20\",\"crypto_secretbox/chacha20\"],\"default\":[\"alloc\",\"getrandom\",\"salsa20\"],\"getrandom\":[\"aead/getrandom\",\"rand_core\"],\"heapless\":[\"aead/heapless\"],\"rand_core\":[\"aead/rand_core\"],\"salsa20\":[\"dep:salsa20\",\"crypto_secretbox/salsa20\"],\"seal\":[\"dep:blake2\",\"alloc\"],\"serde\":[\"dep:serdect\"],\"std\":[\"aead/std\"]}}", + "crypto_secretbox_0.1.1": "{\"dependencies\":[{\"default_features\":false,\"name\":\"aead\",\"req\":\"^0.5\"},{\"features\":[\"zeroize\"],\"name\":\"chacha20\",\"optional\":true,\"req\":\"^0.9\"},{\"default_features\":false,\"name\":\"cipher\",\"req\":\"^0.4\"},{\"default_features\":false,\"features\":[\"zeroize\"],\"name\":\"generic-array\",\"req\":\"^0.14.7\"},{\"kind\":\"dev\",\"name\":\"hex-literal\",\"req\":\"^0.4\"},{\"name\":\"poly1305\",\"req\":\"^0.8\"},{\"features\":[\"zeroize\"],\"name\":\"salsa20\",\"optional\":true,\"req\":\"^0.10\"},{\"default_features\":false,\"name\":\"subtle\",\"req\":\"^2\"},{\"default_features\":false,\"name\":\"zeroize\",\"req\":\"^1\"}],\"features\":{\"alloc\":[\"aead/alloc\"],\"default\":[\"alloc\",\"getrandom\",\"salsa20\"],\"getrandom\":[\"aead/getrandom\",\"rand_core\"],\"heapless\":[\"aead/heapless\"],\"rand_core\":[\"aead/rand_core\"],\"std\":[\"aead/std\",\"alloc\"],\"stream\":[\"aead/stream\"]}}", "csv-core_0.1.13": "{\"dependencies\":[{\"default_features\":false,\"kind\":\"dev\",\"name\":\"arrayvec\",\"req\":\"^0.5\"},{\"default_features\":false,\"name\":\"memchr\",\"req\":\"^2\"}],\"features\":{\"default\":[],\"libc\":[\"memchr/libc\"]}}", "csv_1.4.0": "{\"dependencies\":[{\"default_features\":false,\"features\":[\"alloc\",\"serde\"],\"kind\":\"dev\",\"name\":\"bstr\",\"req\":\"^1.7.0\"},{\"name\":\"csv-core\",\"req\":\"^0.1.11\"},{\"name\":\"itoa\",\"req\":\"^1\"},{\"name\":\"ryu\",\"req\":\"^1\"},{\"features\":[\"derive\"],\"kind\":\"dev\",\"name\":\"serde\",\"req\":\"^1.0.221\"},{\"name\":\"serde_core\",\"req\":\"^1.0.221\"}],\"features\":{}}", "ctor-proc-macro_0.0.7": "{\"dependencies\":[],\"features\":{\"default\":[]}}", diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index a499a511ca9f..ded2160d3aae 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -940,6 +940,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "blake2" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46502ad458c9a52b69d4d4d32775c788b7a1b85e8bc9d482d92250fc0e3f8efe" +dependencies = [ + "digest", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -1959,6 +1968,7 @@ dependencies = [ "codex-windows-sandbox", "core-foundation 0.9.4", "core_test_support", + "crypto_box", "csv", "ctor 0.6.3", "dirs", @@ -1989,6 +1999,7 @@ dependencies = [ "serde_json", "serial_test", "sha1", + "sha2", "shlex", "similar", "tempfile", @@ -3654,9 +3665,40 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78c8292055d1c1df0cce5d180393dc8cce0abec0a7102adb6c7b1eef6016d60a" dependencies = [ "generic-array", + "rand_core 0.6.4", "typenum", ] +[[package]] +name = "crypto_box" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16182b4f39a82ec8a6851155cc4c0cda3065bb1db33651726a29e1951de0f009" +dependencies = [ + "aead", + "blake2", + "crypto_secretbox", + "curve25519-dalek", + "salsa20", + "subtle", + "zeroize", +] + +[[package]] +name = "crypto_secretbox" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9d6cf87adf719ddf43a805e92c6870a531aedda35ff640442cbaf8674e141e1" +dependencies = [ + "aead", + "cipher", + "generic-array", + "poly1305", + "salsa20", + "subtle", + "zeroize", +] + [[package]] name = "csv" version = "1.4.0" @@ -4946,6 +4988,7 @@ checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" dependencies = [ "typenum", "version_check", + "zeroize", ] [[package]] diff --git a/codex-rs/Cargo.toml b/codex-rs/Cargo.toml index deb373881227..c8f6b9e5a7a4 100644 --- a/codex-rs/Cargo.toml +++ b/codex-rs/Cargo.toml @@ -224,6 +224,7 @@ color-eyre = "0.6.3" constant_time_eq = "0.3.1" crossbeam-channel = "0.5.15" crossterm = "0.28.1" +crypto_box = { version = "0.9.1", features = ["seal"] } csv = "1.3.1" ctor = "0.6.3" deno_core_icudata = "0.77.0" diff --git a/codex-rs/app-server/tests/suite/auth.rs b/codex-rs/app-server/tests/suite/auth.rs index 1e608710126d..b35a6fcba152 100644 --- a/codex-rs/app-server/tests/suite/auth.rs +++ b/codex-rs/app-server/tests/suite/auth.rs @@ -351,6 +351,8 @@ async fn get_auth_status_omits_token_after_proactive_refresh_failure() -> Result )?; let server = MockServer::start().await; + // App-server startup may proactively read stale auth before this test sends + // getAuthStatus; require the refresh path without depending on that race. Mock::given(method("POST")) .and(path("/oauth/token")) .respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({ @@ -358,7 +360,7 @@ async fn get_auth_status_omits_token_after_proactive_refresh_failure() -> Result "code": "refresh_token_reused" } }))) - .expect(2) + .expect(1..=2) .mount(&server) .await; @@ -418,6 +420,8 @@ async fn get_auth_status_returns_token_after_proactive_refresh_recovery() -> Res )?; let server = MockServer::start().await; + // App-server startup may proactively read stale auth before this test sends + // getAuthStatus; require the refresh path without depending on that race. Mock::given(method("POST")) .and(path("/oauth/token")) .respond_with(ResponseTemplate::new(401).set_body_json(serde_json::json!({ @@ -425,7 +429,7 @@ async fn get_auth_status_returns_token_after_proactive_refresh_recovery() -> Res "code": "refresh_token_reused" } }))) - .expect(2) + .expect(1..=2) .mount(&server) .await; diff --git a/codex-rs/app-server/tests/suite/v2/plugin_list.rs b/codex-rs/app-server/tests/suite/v2/plugin_list.rs index 8bc4a8598ed7..5036101de4ec 100644 --- a/codex-rs/app-server/tests/suite/v2/plugin_list.rs +++ b/codex-rs/app-server/tests/suite/v2/plugin_list.rs @@ -30,7 +30,8 @@ use wiremock::matchers::method; use wiremock::matchers::path; use wiremock::matchers::query_param; -const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +// These tests start full app-server processes and can also run plugin startup warmers. +const DEFAULT_TIMEOUT: Duration = Duration::from_secs(60); const TEST_CURATED_PLUGIN_SHA: &str = "0123456789abcdef0123456789abcdef01234567"; const STARTUP_REMOTE_PLUGIN_SYNC_MARKER_FILE: &str = ".tmp/app-server-remote-plugin-sync-v1"; diff --git a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs index 1f7b8a3e4bdb..11dc0ac4cf2c 100644 --- a/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs +++ b/codex-rs/app-server/tests/suite/v2/realtime_conversation.rs @@ -72,6 +72,7 @@ use wiremock::matchers::path; use wiremock::matchers::path_regex; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); +const DELEGATED_SHELL_TOOL_TIMEOUT_MS: u64 = 30_000; const STARTUP_CONTEXT_HEADER: &str = "Startup context from Codex."; const V2_STEERING_ACKNOWLEDGEMENT: &str = "This was sent to steer the previous background agent task."; @@ -1717,7 +1718,9 @@ async fn webrtc_v2_tool_call_delegated_turn_can_execute_shell_tool() -> Result<( create_shell_command_sse_response( realtime_tool_ok_command(), /*workdir*/ None, - Some(5000), + // Windows CI can spend several seconds starting the nested PowerShell command. This + // test verifies delegated shell-tool plumbing, not timeout enforcement. + Some(DELEGATED_SHELL_TOOL_TIMEOUT_MS), "shell_call", )?, create_final_assistant_message_sse_response("shell tool finished")?, diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index 4ce4192b8190..a0d42598105b 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -35,6 +35,7 @@ codex-connectors = { workspace = true } codex-config = { workspace = true } codex-core-plugins = { workspace = true } codex-core-skills = { workspace = true } +crypto_box = { workspace = true } codex-exec-server = { workspace = true } codex-features = { workspace = true } codex-feedback = { workspace = true } @@ -99,6 +100,7 @@ rmcp = { workspace = true, default-features = false, features = [ serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } sha1 = { workspace = true } +sha2 = { workspace = true } shlex = { workspace = true } similar = { workspace = true } tempfile = { workspace = true } diff --git a/codex-rs/core/src/agent_identity.rs b/codex-rs/core/src/agent_identity.rs index 2c897b54565d..cfcab507bb91 100644 --- a/codex-rs/core/src/agent_identity.rs +++ b/codex-rs/core/src/agent_identity.rs @@ -27,6 +27,10 @@ use tracing::debug; use tracing::info; use tracing::warn; +mod task_registration; + +pub(crate) use task_registration::RegisteredAgentTask; + use crate::config::Config; const AGENT_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15); @@ -119,32 +123,58 @@ impl AgentIdentityManager { return Ok(None); } - let Some(auth) = self.auth_manager.auth().await else { - debug!("skipping agent identity registration because no auth is available"); + let Some((auth, binding)) = self.current_auth_binding().await else { return Ok(None); }; - let Some(binding) = - AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id()) - else { - debug!("skipping agent identity registration because ChatGPT auth is unavailable"); - return Ok(None); - }; + self.ensure_registered_identity_for_binding(&auth, &binding) + .await + .map(Some) + } + async fn ensure_registered_identity_for_binding( + &self, + auth: &CodexAuth, + binding: &AgentIdentityBinding, + ) -> Result { let _guard = self.ensure_lock.lock().await; - if let Some(stored_identity) = self.load_stored_identity(&auth, &binding)? { + if let Some(stored_identity) = self.load_stored_identity(auth, binding)? { info!( agent_runtime_id = %stored_identity.agent_runtime_id, binding_id = %binding.binding_id, "reusing stored agent identity" ); - return Ok(Some(stored_identity)); + return Ok(stored_identity); } - let stored_identity = self.register_agent_identity(&binding).await?; - self.store_identity(&auth, &stored_identity)?; - Ok(Some(stored_identity)) + let stored_identity = self.register_agent_identity(binding).await?; + self.store_identity(auth, &stored_identity)?; + Ok(stored_identity) + } + + pub(crate) async fn task_matches_current_binding(&self, task: &RegisteredAgentTask) -> bool { + if !self.feature_enabled { + return false; + } + + self.current_auth_binding() + .await + .is_some_and(|(_, binding)| task.matches_binding(&binding)) + } + + async fn current_auth_binding(&self) -> Option<(CodexAuth, AgentIdentityBinding)> { + let Some(auth) = self.auth_manager.auth().await else { + debug!("skipping agent identity flow because no auth is available"); + return None; + }; + + let binding = + AgentIdentityBinding::from_auth(&auth, self.auth_manager.forced_chatgpt_workspace_id()); + if binding.is_none() { + debug!("skipping agent identity flow because ChatGPT auth is unavailable"); + } + binding.map(|binding| (auth, binding)) } async fn register_agent_identity( @@ -351,12 +381,11 @@ impl StoredAgentIdentity { } fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool { - self.binding_id == binding.binding_id - && self.chatgpt_account_id == binding.chatgpt_account_id - && match binding.chatgpt_user_id.as_deref() { - Some(chatgpt_user_id) => self.chatgpt_user_id.as_deref() == Some(chatgpt_user_id), - None => true, - } + binding.matches_parts( + &self.binding_id, + &self.chatgpt_account_id, + self.chatgpt_user_id.as_deref(), + ) } fn validate_key_material(&self) -> Result<()> { @@ -375,6 +404,20 @@ impl StoredAgentIdentity { } impl AgentIdentityBinding { + fn matches_parts( + &self, + binding_id: &str, + chatgpt_account_id: &str, + chatgpt_user_id: Option<&str>, + ) -> bool { + binding_id == self.binding_id + && chatgpt_account_id == self.chatgpt_account_id + && match self.chatgpt_user_id.as_deref() { + Some(expected_user_id) => chatgpt_user_id == Some(expected_user_id), + None => true, + } + } + fn from_auth(auth: &CodexAuth, forced_workspace_id: Option) -> Option { if !auth.is_chatgpt_auth() { return None; diff --git a/codex-rs/core/src/agent_identity/task_registration.rs b/codex-rs/core/src/agent_identity/task_registration.rs new file mode 100644 index 000000000000..4fc5d51282d3 --- /dev/null +++ b/codex-rs/core/src/agent_identity/task_registration.rs @@ -0,0 +1,470 @@ +use std::time::Duration; + +use anyhow::Context; +use anyhow::Result; +use crypto_box::SecretKey as Curve25519SecretKey; +use ed25519_dalek::Signer as _; +use serde::Deserialize; +use serde::Serialize; +use sha2::Digest as _; +use sha2::Sha512; +use tracing::info; + +use super::*; + +const AGENT_TASK_REGISTRATION_TIMEOUT: Duration = Duration::from_secs(15); + +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) struct RegisteredAgentTask { + pub(crate) binding_id: String, + pub(crate) chatgpt_account_id: String, + pub(crate) chatgpt_user_id: Option, + pub(crate) agent_runtime_id: String, + pub(crate) task_id: String, + pub(crate) registered_at: String, +} + +#[derive(Debug, Serialize)] +struct RegisterTaskRequest { + signature: String, + timestamp: String, +} + +#[derive(Debug, Deserialize)] +struct RegisterTaskResponse { + encrypted_task_id: String, +} + +impl AgentIdentityManager { + pub(crate) async fn register_task(&self) -> Result> { + if !self.feature_enabled { + return Ok(None); + } + + let Some((auth, binding)) = self.current_auth_binding().await else { + return Ok(None); + }; + + self.register_task_for_binding(auth, binding).await + } + + async fn register_task_for_binding( + &self, + auth: CodexAuth, + binding: AgentIdentityBinding, + ) -> Result> { + let stored_identity = self + .ensure_registered_identity_for_binding(&auth, &binding) + .await?; + + let timestamp = Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true); + let request_body = RegisterTaskRequest { + signature: sign_task_registration_payload(&stored_identity, ×tamp)?, + timestamp, + }; + + let client = create_client(); + let url = + agent_task_registration_url(&self.chatgpt_base_url, &stored_identity.agent_runtime_id); + let human_biscuit = self.mint_human_biscuit(&binding, "POST", &url).await?; + let response = client + .post(&url) + .header("X-OpenAI-Authorization", human_biscuit) + .json(&request_body) + .timeout(AGENT_TASK_REGISTRATION_TIMEOUT) + .send() + .await + .with_context(|| format!("failed to send agent task registration request to {url}"))?; + + if response.status().is_success() { + let response_body = response + .json::() + .await + .with_context(|| format!("failed to parse agent task response from {url}"))?; + let registered_task = RegisteredAgentTask { + binding_id: stored_identity.binding_id.clone(), + chatgpt_account_id: stored_identity.chatgpt_account_id.clone(), + chatgpt_user_id: stored_identity.chatgpt_user_id.clone(), + agent_runtime_id: stored_identity.agent_runtime_id.clone(), + task_id: decrypt_task_id_response( + &stored_identity, + &response_body.encrypted_task_id, + )?, + registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + }; + info!( + agent_runtime_id = %registered_task.agent_runtime_id, + task_id = %registered_task.task_id, + "registered agent task" + ); + return Ok(Some(registered_task)); + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + anyhow::bail!("agent task registration failed with status {status} from {url}: {body}") + } +} + +impl RegisteredAgentTask { + pub(super) fn matches_binding(&self, binding: &AgentIdentityBinding) -> bool { + binding.matches_parts( + &self.binding_id, + &self.chatgpt_account_id, + self.chatgpt_user_id.as_deref(), + ) + } + + pub(crate) fn has_same_binding(&self, other: &Self) -> bool { + self.binding_id == other.binding_id + && self.chatgpt_account_id == other.chatgpt_account_id + && self.chatgpt_user_id == other.chatgpt_user_id + } +} + +fn sign_task_registration_payload( + stored_identity: &StoredAgentIdentity, + timestamp: &str, +) -> Result { + let signing_key = stored_identity.signing_key()?; + let payload = format!("{}:{timestamp}", stored_identity.agent_runtime_id); + Ok(BASE64_STANDARD.encode(signing_key.sign(payload.as_bytes()).to_bytes())) +} + +fn decrypt_task_id_response( + stored_identity: &StoredAgentIdentity, + encrypted_task_id: &str, +) -> Result { + let signing_key = stored_identity.signing_key()?; + let ciphertext = BASE64_STANDARD + .decode(encrypted_task_id) + .context("encrypted task id is not valid base64")?; + let plaintext = curve25519_secret_key_from_signing_key(&signing_key) + .unseal(&ciphertext) + .map_err(|_| anyhow::anyhow!("failed to decrypt encrypted task id"))?; + String::from_utf8(plaintext).context("decrypted task id is not valid UTF-8") +} + +fn curve25519_secret_key_from_signing_key(signing_key: &SigningKey) -> Curve25519SecretKey { + let digest = Sha512::digest(signing_key.to_bytes()); + let mut secret_key = [0u8; 32]; + secret_key.copy_from_slice(&digest[..32]); + secret_key[0] &= 248; + secret_key[31] &= 127; + secret_key[31] |= 64; + Curve25519SecretKey::from(secret_key) +} + +fn agent_task_registration_url(chatgpt_base_url: &str, agent_runtime_id: &str) -> String { + let trimmed = chatgpt_base_url.trim_end_matches('/'); + format!("{trimmed}/v1/agent/{agent_runtime_id}/task/register") +} + +#[cfg(test)] +mod tests { + use base64::engine::general_purpose::URL_SAFE_NO_PAD; + use codex_app_server_protocol::AuthMode as ApiAuthMode; + use codex_login::AuthCredentialsStoreMode; + use codex_login::AuthDotJson; + use codex_login::save_auth; + use codex_login::token_data::IdTokenInfo; + use codex_login::token_data::TokenData; + use pretty_assertions::assert_eq; + use wiremock::Mock; + use wiremock::MockServer; + use wiremock::ResponseTemplate; + use wiremock::matchers::header; + use wiremock::matchers::method; + use wiremock::matchers::path; + + use super::*; + + #[tokio::test] + async fn register_task_skips_when_feature_is_disabled() { + let auth_manager = + AuthManager::from_auth_for_testing(make_chatgpt_auth("account-123", Some("user-123"))); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ false, + "https://chatgpt.com/backend-api/".to_string(), + SessionSource::Cli, + ); + + assert_eq!(manager.register_task().await.unwrap(), None); + } + + #[tokio::test] + async fn register_task_skips_for_api_key_auth() { + let auth_manager = AuthManager::from_auth_for_testing(CodexAuth::from_api_key("test-key")); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + "https://chatgpt.com/backend-api/".to_string(), + SessionSource::Cli, + ); + + assert_eq!(manager.register_task().await.unwrap(), None); + } + + #[tokio::test] + async fn register_task_registers_and_decrypts_plaintext_task_id() { + let server = MockServer::start().await; + let chatgpt_base_url = server.uri(); + mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await; + let auth = make_chatgpt_auth("account-123", Some("user-123")); + let auth_manager = AuthManager::from_auth_for_testing(auth.clone()); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + chatgpt_base_url, + SessionSource::Cli, + ); + let stored_identity = seed_stored_identity(&manager, &auth, "agent-123", "account-123"); + let encrypted_task_id = + encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext"); + + Mock::given(method("POST")) + .and(path("/v1/agent/agent-123/task/register")) + .and(header("x-openai-authorization", "human-biscuit")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let task = manager + .register_task() + .await + .unwrap() + .expect("task should be registered"); + + assert_eq!( + task, + RegisteredAgentTask { + binding_id: "chatgpt-account-account-123".to_string(), + chatgpt_account_id: "account-123".to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: "agent-123".to_string(), + task_id: "task_123".to_string(), + registered_at: task.registered_at.clone(), + } + ); + } + + #[tokio::test] + async fn register_task_uses_chatgpt_base_url() { + let server = MockServer::start().await; + let chatgpt_base_url = format!("{}/backend-api", server.uri()); + mount_human_biscuit(&server, &chatgpt_base_url, "agent-fallback").await; + let auth = make_chatgpt_auth("account-123", Some("user-123")); + let auth_manager = AuthManager::from_auth_for_testing(auth.clone()); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + chatgpt_base_url, + SessionSource::Cli, + ); + let stored_identity = + seed_stored_identity(&manager, &auth, "agent-fallback", "account-123"); + let encrypted_task_id = encrypt_task_id_for_identity(&stored_identity, "task_fallback") + .expect("task ciphertext"); + + Mock::given(method("POST")) + .and(path("/backend-api/v1/agent/agent-fallback/task/register")) + .and(header("x-openai-authorization", "human-biscuit")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let task = manager + .register_task() + .await + .unwrap() + .expect("task should be registered"); + + assert_eq!(task.agent_runtime_id, "agent-fallback"); + assert_eq!(task.task_id, "task_fallback"); + } + + #[tokio::test] + async fn register_task_for_binding_keeps_one_auth_snapshot() { + let server = MockServer::start().await; + let chatgpt_base_url = server.uri(); + mount_human_biscuit(&server, &chatgpt_base_url, "agent-123").await; + let binding_auth = make_chatgpt_auth("account-123", Some("user-123")); + let auth_manager = + AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456"))); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + chatgpt_base_url, + SessionSource::Cli, + ); + let stored_identity = + seed_stored_identity(&manager, &binding_auth, "agent-123", "account-123"); + let encrypted_task_id = + encrypt_task_id_for_identity(&stored_identity, "task_123").expect("task ciphertext"); + let binding = + AgentIdentityBinding::from_auth(&binding_auth, /*forced_workspace_id*/ None) + .expect("binding"); + + Mock::given(method("POST")) + .and(path("/v1/agent/agent-123/task/register")) + .and(header("x-openai-authorization", "human-biscuit")) + .respond_with(ResponseTemplate::new(200).set_body_json(serde_json::json!({ + "encrypted_task_id": encrypted_task_id, + }))) + .expect(1) + .mount(&server) + .await; + + let task = manager + .register_task_for_binding(binding_auth, binding) + .await + .unwrap() + .expect("task should be registered"); + + assert_eq!( + task, + RegisteredAgentTask { + binding_id: "chatgpt-account-account-123".to_string(), + chatgpt_account_id: "account-123".to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: "agent-123".to_string(), + task_id: "task_123".to_string(), + registered_at: task.registered_at.clone(), + } + ); + } + + #[tokio::test] + async fn task_matches_current_binding_rejects_stale_auth_binding() { + let auth_manager = + AuthManager::from_auth_for_testing(make_chatgpt_auth("account-456", Some("user-456"))); + let manager = AgentIdentityManager::new_for_tests( + auth_manager, + /*feature_enabled*/ true, + "https://chatgpt.com/backend-api/".to_string(), + SessionSource::Cli, + ); + let task = RegisteredAgentTask { + binding_id: "chatgpt-account-account-123".to_string(), + chatgpt_account_id: "account-123".to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: "agent-123".to_string(), + task_id: "task_123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + }; + + assert!(!manager.task_matches_current_binding(&task).await); + } + + async fn mount_human_biscuit( + server: &MockServer, + chatgpt_base_url: &str, + agent_runtime_id: &str, + ) { + let biscuit_url = agent_identity_biscuit_url(chatgpt_base_url); + let biscuit_path = reqwest::Url::parse(&biscuit_url) + .expect("biscuit URL parses") + .path() + .to_string(); + let target_url = agent_task_registration_url(chatgpt_base_url, agent_runtime_id); + Mock::given(method("GET")) + .and(path(biscuit_path)) + .and(header("authorization", "Bearer access-token-account-123")) + .and(header("x-original-method", "POST")) + .and(header("x-original-url", target_url)) + .respond_with( + ResponseTemplate::new(200).insert_header("x-openai-authorization", "human-biscuit"), + ) + .expect(1) + .mount(server) + .await; + } + + fn seed_stored_identity( + manager: &AgentIdentityManager, + auth: &CodexAuth, + agent_runtime_id: &str, + account_id: &str, + ) -> StoredAgentIdentity { + let key_material = generate_agent_key_material().expect("key material"); + let binding = + AgentIdentityBinding::from_auth(auth, /*forced_workspace_id*/ None).expect("binding"); + let stored_identity = StoredAgentIdentity { + binding_id: binding.binding_id, + chatgpt_account_id: account_id.to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: agent_runtime_id.to_string(), + private_key_pkcs8_base64: key_material.private_key_pkcs8_base64, + public_key_ssh: key_material.public_key_ssh, + registered_at: Utc::now().to_rfc3339_opts(SecondsFormat::Secs, true), + abom: manager.abom.clone(), + }; + manager + .store_identity(auth, &stored_identity) + .expect("store identity"); + let persisted = auth + .get_agent_identity(account_id) + .expect("persisted identity"); + assert_eq!(persisted.agent_runtime_id, agent_runtime_id); + stored_identity + } + + fn encrypt_task_id_for_identity( + stored_identity: &StoredAgentIdentity, + task_id: &str, + ) -> Result { + let mut rng = crypto_box::aead::OsRng; + let public_key = + curve25519_secret_key_from_signing_key(&stored_identity.signing_key()?).public_key(); + let ciphertext = public_key + .seal(&mut rng, task_id.as_bytes()) + .map_err(|_| anyhow::anyhow!("failed to encrypt test task id"))?; + Ok(BASE64_STANDARD.encode(ciphertext)) + } + + fn make_chatgpt_auth(account_id: &str, user_id: Option<&str>) -> CodexAuth { + let tempdir = tempfile::tempdir().expect("tempdir"); + let auth_json = AuthDotJson { + auth_mode: Some(ApiAuthMode::Chatgpt), + openai_api_key: None, + tokens: Some(TokenData { + id_token: IdTokenInfo { + email: None, + chatgpt_plan_type: None, + chatgpt_user_id: user_id.map(ToOwned::to_owned), + chatgpt_account_id: Some(account_id.to_string()), + chatgpt_account_is_fedramp: false, + raw_jwt: fake_id_token(account_id, user_id), + }, + access_token: format!("access-token-{account_id}"), + refresh_token: "refresh-token".to_string(), + account_id: Some(account_id.to_string()), + }), + last_refresh: Some(Utc::now()), + agent_identity: None, + }; + save_auth(tempdir.path(), &auth_json, AuthCredentialsStoreMode::File).expect("save auth"); + CodexAuth::from_auth_storage(tempdir.path(), AuthCredentialsStoreMode::File) + .expect("load auth") + .expect("auth") + } + + fn fake_id_token(account_id: &str, user_id: Option<&str>) -> String { + let header = URL_SAFE_NO_PAD.encode(r#"{"alg":"none","typ":"JWT"}"#); + let payload = serde_json::json!({ + "https://api.openai.com/auth": { + "chatgpt_user_id": user_id, + "chatgpt_account_id": account_id, + } + }); + let payload = URL_SAFE_NO_PAD.encode(payload.to_string()); + format!("{header}.{payload}.signature") + } +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 50a4c25abd1c..67bbed1c7e64 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -14,6 +14,7 @@ use crate::agent::MailboxReceiver; use crate::agent::agent_status_from_event; use crate::agent::status::is_final; use crate::agent_identity::AgentIdentityManager; +use crate::agent_identity::RegisteredAgentTask; use crate::apps::render_apps_section; use crate::commit_attribution::commit_message_trailer_instruction; use crate::compact; @@ -1577,7 +1578,7 @@ impl Session { async fn fail_agent_identity_registration(self: &Arc, error: anyhow::Error) { warn!(error = %error, "agent identity registration failed"); let message = format!( - "Agent identity registration failed. Codex cannot continue while `features.use_agent_identity` is enabled: {error}" + "Agent identity registration failed while `features.use_agent_identity` is enabled: {error}" ); self.send_event_raw(Event { id: self.next_internal_sub_id(), @@ -1587,7 +1588,90 @@ impl Session { }), }) .await; - handlers::shutdown(self, self.next_internal_sub_id()).await; + } + + async fn cached_agent_task_for_current_binding(&self) -> Option { + let agent_task = { + let state = self.state.lock().await; + state.agent_task() + }?; + + if self + .services + .agent_identity_manager + .task_matches_current_binding(&agent_task) + .await + { + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "reusing cached agent task" + ); + return Some(agent_task); + } + + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "discarding cached agent task because auth binding changed" + ); + let mut state = self.state.lock().await; + if state.agent_task().as_ref() == Some(&agent_task) { + state.clear_agent_task(); + } + None + } + + async fn ensure_agent_task_registered(&self) -> anyhow::Result> { + if let Some(agent_task) = self.cached_agent_task_for_current_binding().await { + return Ok(Some(agent_task)); + } + + for _ in 0..2 { + let Some(agent_task) = self.services.agent_identity_manager.register_task().await? + else { + return Ok(None); + }; + + if !self + .services + .agent_identity_manager + .task_matches_current_binding(&agent_task) + .await + { + debug!( + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "discarding newly registered agent task because auth binding changed" + ); + continue; + } + + { + let mut state = self.state.lock().await; + if let Some(existing_agent_task) = state.agent_task() { + if existing_agent_task.has_same_binding(&agent_task) { + return Ok(Some(existing_agent_task)); + } + debug!( + agent_runtime_id = %existing_agent_task.agent_runtime_id, + task_id = %existing_agent_task.task_id, + "replacing cached agent task because auth binding changed" + ); + } + state.set_agent_task(agent_task.clone()); + } + + info!( + thread_id = %self.conversation_id, + agent_runtime_id = %agent_task.agent_runtime_id, + task_id = %agent_task.task_id, + "registered agent task for thread" + ); + return Ok(Some(agent_task)); + } + + Ok(None) } #[allow(clippy::too_many_arguments)] @@ -6406,6 +6490,20 @@ pub(crate) async fn run_turn( })) .await; } + if let Err(error) = sess.ensure_agent_task_registered().await { + warn!(error = %error, "agent task registration failed"); + sess.send_event( + turn_context.as_ref(), + EventMsg::Error(ErrorEvent { + message: format!( + "Agent task registration failed. Please try again; Codex will attempt to register the task again on the next turn: {error}" + ), + codex_error_info: Some(CodexErrorInfo::Other), + }), + ) + .await; + return None; + } if !skill_items.is_empty() { sess.record_conversation_items(&turn_context, &skill_items) diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index 6f4276b07748..404f64c53fe2 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -4182,7 +4182,7 @@ pub(crate) async fn make_session_and_context_with_rx() -> ( } #[tokio::test] -async fn fail_agent_identity_registration_emits_error_and_shutdown() { +async fn fail_agent_identity_registration_emits_error_without_shutdown() { let (session, _turn_context, rx_event) = make_session_and_context_with_rx().await; session @@ -4200,21 +4200,14 @@ async fn fail_agent_identity_registration_emits_error_and_shutdown() { }) => { assert_eq!( message, - "Agent identity registration failed. Codex cannot continue while `features.use_agent_identity` is enabled: registration exploded".to_string() + "Agent identity registration failed while `features.use_agent_identity` is enabled: registration exploded".to_string() ); assert_eq!(codex_error_info, Some(CodexErrorInfo::Other)); } other => panic!("expected error event, got {other:?}"), } - let shutdown_event = timeout(Duration::from_secs(1), rx_event.recv()) - .await - .expect("shutdown event should arrive") - .expect("shutdown event should be readable"); - match shutdown_event.msg { - EventMsg::ShutdownComplete => {} - other => panic!("expected shutdown event, got {other:?}"), - } + assert!(rx_event.try_recv().is_err()); } #[tokio::test] diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index 4360b16de4cc..42ed15e87ae7 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -6,6 +6,7 @@ use codex_sandboxing::policy_transforms::merge_permission_profiles; use std::collections::HashMap; use std::collections::HashSet; +use crate::agent_identity::RegisteredAgentTask; use crate::codex::PreviousTurnSettings; use crate::codex::SessionConfiguration; use crate::context_manager::ContextManager; @@ -30,6 +31,7 @@ pub(crate) struct SessionState { previous_turn_settings: Option, /// Startup prewarmed session prepared during session initialization. pub(crate) startup_prewarm: Option, + pub(crate) agent_task: Option, pub(crate) active_connector_selection: HashSet, pub(crate) pending_session_start_source: Option, granted_permissions: Option, @@ -49,6 +51,7 @@ impl SessionState { mcp_dependency_prompted: HashSet::new(), previous_turn_settings: None, startup_prewarm: None, + agent_task: None, active_connector_selection: HashSet::new(), pending_session_start_source: None, granted_permissions: None, @@ -186,6 +189,18 @@ impl SessionState { self.startup_prewarm.take() } + pub(crate) fn agent_task(&self) -> Option { + self.agent_task.clone() + } + + pub(crate) fn set_agent_task(&mut self, agent_task: RegisteredAgentTask) { + self.agent_task = Some(agent_task); + } + + pub(crate) fn clear_agent_task(&mut self) { + self.agent_task = None; + } + // Adds connector IDs to the active set and returns the merged selection. pub(crate) fn merge_connector_selection(&mut self, connector_ids: I) -> HashSet where diff --git a/codex-rs/core/src/state/session_tests.rs b/codex-rs/core/src/state/session_tests.rs index 1af7ccc8f60a..6816c8731dab 100644 --- a/codex-rs/core/src/state/session_tests.rs +++ b/codex-rs/core/src/state/session_tests.rs @@ -1,4 +1,5 @@ use super::*; +use crate::agent_identity::RegisteredAgentTask; use crate::codex::make_session_configuration_for_tests; use codex_protocol::protocol::CreditsSnapshot; use codex_protocol::protocol::RateLimitWindow; @@ -33,6 +34,43 @@ async fn clear_connector_selection_removes_entries() { assert_eq!(state.get_connector_selection(), HashSet::new()); } +#[tokio::test] +async fn set_agent_task_persists_plaintext_task_for_session_reuse() { + let session_configuration = make_session_configuration_for_tests().await; + let mut state = SessionState::new(session_configuration); + let agent_task = RegisteredAgentTask { + binding_id: "chatgpt-account-account-123".to_string(), + chatgpt_account_id: "account-123".to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: "agent_123".to_string(), + task_id: "task_123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + }; + + state.set_agent_task(agent_task.clone()); + + assert_eq!(state.agent_task(), Some(agent_task)); +} + +#[tokio::test] +async fn clear_agent_task_removes_cached_task() { + let session_configuration = make_session_configuration_for_tests().await; + let mut state = SessionState::new(session_configuration); + let agent_task = RegisteredAgentTask { + binding_id: "chatgpt-account-account-123".to_string(), + chatgpt_account_id: "account-123".to_string(), + chatgpt_user_id: Some("user-123".to_string()), + agent_runtime_id: "agent_123".to_string(), + task_id: "task_123".to_string(), + registered_at: "2026-03-23T12:00:00Z".to_string(), + }; + + state.set_agent_task(agent_task); + state.clear_agent_task(); + + assert_eq!(state.agent_task(), None); +} + #[tokio::test] async fn set_rate_limits_defaults_limit_id_to_codex_when_missing() { let session_configuration = make_session_configuration_for_tests().await; diff --git a/codex-rs/core/tests/suite/realtime_conversation.rs b/codex-rs/core/tests/suite/realtime_conversation.rs index 1cd97391bbe6..35ec9668a3e2 100644 --- a/codex-rs/core/tests/suite/realtime_conversation.rs +++ b/codex-rs/core/tests/suite/realtime_conversation.rs @@ -15,6 +15,7 @@ use codex_protocol::protocol::ConversationStartTransport; use codex_protocol::protocol::ConversationTextParams; use codex_protocol::protocol::ErrorEvent; use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::GitInfo; use codex_protocol::protocol::InitialHistory; use codex_protocol::protocol::Op; use codex_protocol::protocol::RealtimeAudioFrame; @@ -24,7 +25,11 @@ use codex_protocol::protocol::RealtimeEvent; use codex_protocol::protocol::RealtimeOutputModality; use codex_protocol::protocol::RealtimeVoice; use codex_protocol::protocol::RolloutItem; +use codex_protocol::protocol::RolloutLine; +use codex_protocol::protocol::SessionMeta; +use codex_protocol::protocol::SessionMetaLine; use codex_protocol::protocol::SessionSource; +use codex_protocol::protocol::UserMessageEvent; use codex_protocol::user_input::UserInput; use codex_utils_output_truncation::approx_token_count; use core_test_support::responses; @@ -65,6 +70,7 @@ const MEMORY_PROMPT_PHRASE: &str = "You have access to a memory folder with guidance from prior runs."; const REALTIME_CONVERSATION_TEST_SUBPROCESS_ENV_VAR: &str = "CODEX_REALTIME_CONVERSATION_TEST_SUBPROCESS"; +const WEBSOCKET_REQUEST_TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug, Clone)] struct RealtimeCallRequestCapture { @@ -141,7 +147,7 @@ async fn wait_for_matching_websocket_request( where F: Fn(&core_test_support::responses::WebSocketRequest) -> bool, { - let deadline = tokio::time::Instant::now() + Duration::from_secs(10); + let deadline = tokio::time::Instant::now() + WEBSOCKET_REQUEST_TIMEOUT; loop { if let Some(request) = server .connections() @@ -196,16 +202,18 @@ async fn seed_recent_thread( let db = test.codex.state_db().context("state db enabled")?; let thread_id = ThreadId::new(); let updated_at = Utc::now(); - let rollout_path = test + let rollout_dir = test .codex_home_path() - .join(format!("rollout-{thread_id}.jsonl")); - // This helper seeds SQLite metadata directly. Local listing drops stale metadata rows whose - // rollout path no longer exists, so create the placeholder path that the test metadata points - // at without exercising rollout writing in this realtime-context test. - std::fs::write(&rollout_path, "")?; + .join("sessions") + .join(updated_at.format("%Y/%m/%d").to_string()); + fs::create_dir_all(&rollout_dir)?; + let rollout_path = rollout_dir.join(format!( + "rollout-{}-{thread_id}.jsonl", + updated_at.format("%Y-%m-%dT%H-%M-%S") + )); let mut metadata_builder = codex_state::ThreadMetadataBuilder::new( thread_id, - rollout_path, + rollout_path.clone(), updated_at, SessionSource::Cli, ); @@ -215,6 +223,45 @@ async fn seed_recent_thread( let mut metadata = metadata_builder.build("test-provider"); metadata.title = title.to_string(); metadata.first_user_message = Some(first_user_message.to_string()); + + let timestamp = updated_at.to_rfc3339(); + let session_meta = RolloutLine { + timestamp: timestamp.clone(), + item: RolloutItem::SessionMeta(SessionMetaLine { + meta: SessionMeta { + id: thread_id, + timestamp: timestamp.clone(), + cwd: metadata.cwd.clone(), + originator: "cli".to_string(), + cli_version: "0.0.0".to_string(), + source: SessionSource::Cli, + model_provider: Some("test-provider".to_string()), + ..Default::default() + }, + git: Some(GitInfo { + commit_hash: None, + branch: metadata.git_branch.clone(), + repository_url: None, + }), + }), + }; + let user_message = RolloutLine { + timestamp, + item: RolloutItem::EventMsg(EventMsg::UserMessage(UserMessageEvent { + message: first_user_message.to_string(), + images: None, + local_images: Vec::new(), + text_elements: Vec::new(), + })), + }; + fs::write( + &rollout_path, + format!( + "{}\n{}\n", + serde_json::to_string(&session_meta)?, + serde_json::to_string(&user_message)? + ), + )?; db.upsert_thread(&metadata).await?; Ok(())