diff --git a/codex-rs/app-server-protocol/schema/json/ServerNotification.json b/codex-rs/app-server-protocol/schema/json/ServerNotification.json index 4f5410c782c..16a030e3da9 100644 --- a/codex-rs/app-server-protocol/schema/json/ServerNotification.json +++ b/codex-rs/app-server-protocol/schema/json/ServerNotification.json @@ -2865,6 +2865,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json index e4ced5070cc..02715050df9 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.schemas.json @@ -13127,6 +13127,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json index d1f3cbda550..e74d3568e48 100644 --- a/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json +++ b/codex-rs/app-server-protocol/schema/json/codex_app_server_protocol.v2.schemas.json @@ -10982,6 +10982,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json index 2883670c88f..5bf26c13961 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ItemCompletedNotification.json @@ -684,6 +684,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json index c2e71ccba91..ef4b58811e1 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ItemStartedNotification.json @@ -684,6 +684,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json index a7fe2e8d601..75a3a347178 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ReviewStartResponse.json @@ -827,6 +827,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json index 9b1870198a7..efb40b52192 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadForkResponse.json @@ -1341,6 +1341,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json index 426f34ce35c..27aa211fa1a 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadListResponse.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json index c869a797498..22095e8605d 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadMetadataUpdateResponse.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json index 9569860c387..805598de498 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadReadResponse.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json index 5143545ec18..b6c1fe93612 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadResumeResponse.json @@ -1341,6 +1341,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json index 502dd3961fd..da605b1ddba 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadRollbackResponse.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json index c07b4d82584..4e5a2e9f70c 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartResponse.json @@ -1341,6 +1341,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json index ff87af20698..e239288d568 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadStartedNotification.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json index daf821c3747..f0b7c1b8f65 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/ThreadUnarchiveResponse.json @@ -1099,6 +1099,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json index 82c2b3c76ce..b80896d8caa 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnCompletedNotification.json @@ -827,6 +827,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json b/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json index ebb2065cb8a..b2bf7887edc 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnStartResponse.json @@ -827,6 +827,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json b/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json index 8b7c2bc4105..338b1d57e39 100644 --- a/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json +++ b/codex-rs/app-server-protocol/schema/json/v2/TurnStartedNotification.json @@ -827,6 +827,14 @@ "null" ] }, + "hostId": { + "default": null, + "description": "The configured exec host, when the command ran outside the local host.", + "type": [ + "string", + "null" + ] + }, "id": { "type": "string" }, diff --git a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts index 9202f3728f0..a47c30cbcca 100644 --- a/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts +++ b/codex-rs/app-server-protocol/schema/typescript/v2/ThreadItem.ts @@ -34,7 +34,11 @@ cwd: string, /** * Identifier for the underlying PTY process (when available). */ -processId: string | null, source: CommandExecutionSource, status: CommandExecutionStatus, +processId: string | null, +/** + * The configured exec host, when the command ran outside the local host. + */ +hostId: string | null, source: CommandExecutionSource, status: CommandExecutionStatus, /** * A best-effort parsing of the command to understand the action(s) it will perform. * This returns a list of CommandAction objects because a single shell command may diff --git a/codex-rs/app-server-protocol/src/protocol/item_builders.rs b/codex-rs/app-server-protocol/src/protocol/item_builders.rs index 9853f69f408..80520b13ad6 100644 --- a/codex-rs/app-server-protocol/src/protocol/item_builders.rs +++ b/codex-rs/app-server-protocol/src/protocol/item_builders.rs @@ -71,6 +71,7 @@ pub fn build_command_execution_approval_request_item( command: shlex_join(&payload.command), cwd: payload.cwd.clone(), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::InProgress, command_actions: payload @@ -91,6 +92,7 @@ pub fn build_command_execution_begin_item(payload: &ExecCommandBeginEvent) -> Th command: shlex_join(&payload.command), cwd: payload.cwd.clone(), process_id: payload.process_id.clone(), + host_id: payload.host_id.clone(), source: payload.source.into(), status: CommandExecutionStatus::InProgress, command_actions: payload @@ -118,6 +120,7 @@ pub fn build_command_execution_end_item(payload: &ExecCommandEndEvent) -> Thread command: shlex_join(&payload.command), cwd: payload.cwd.clone(), process_id: payload.process_id.clone(), + host_id: payload.host_id.clone(), source: payload.source.into(), status: (&payload.status).into(), command_actions: payload @@ -151,6 +154,7 @@ pub fn build_item_from_guardian_event( command, cwd: cwd.clone(), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status, command_actions, @@ -183,6 +187,7 @@ pub fn build_item_from_guardian_event( command, cwd: cwd.clone(), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status, command_actions, diff --git a/codex-rs/app-server-protocol/src/protocol/thread_history.rs b/codex-rs/app-server-protocol/src/protocol/thread_history.rs index b0056579687..24770b425bc 100644 --- a/codex-rs/app-server-protocol/src/protocol/thread_history.rs +++ b/codex-rs/app-server-protocol/src/protocol/thread_history.rs @@ -1790,6 +1790,7 @@ mod tests { cmd: "echo hello world".into(), }], source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: String::new(), stderr: String::new(), @@ -1836,6 +1837,7 @@ mod tests { command: "echo 'hello world'".into(), cwd: PathBuf::from("/tmp"), process_id: Some("pid-1".into()), + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Completed, command_actions: vec![CommandAction::Unknown { @@ -2007,6 +2009,7 @@ mod tests { cwd: PathBuf::from("/tmp"), parsed_cmd: vec![ParsedCommand::Unknown { cmd: "ls".into() }], source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: String::new(), stderr: "exec command rejected by user".into(), @@ -2048,6 +2051,7 @@ mod tests { command: "ls".into(), cwd: PathBuf::from("/tmp"), process_id: Some("pid-2".into()), + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Declined, command_actions: vec![CommandAction::Unknown { @@ -2133,6 +2137,7 @@ mod tests { command: "rm -rf /tmp/guardian".into(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Declined, command_actions: vec![CommandAction::Unknown { @@ -2192,6 +2197,7 @@ mod tests { command: "/bin/rm -f /tmp/file.sqlite".into(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::InProgress, command_actions: vec![CommandAction::Unknown { @@ -2247,6 +2253,7 @@ mod tests { cmd: "echo done".into(), }], source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: "done\n".into(), stderr: String::new(), @@ -2281,6 +2288,7 @@ mod tests { command: "echo done".into(), cwd: PathBuf::from("/tmp"), process_id: Some("pid-42".into()), + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::Completed, command_actions: vec![CommandAction::Unknown { @@ -2336,6 +2344,7 @@ mod tests { cmd: "echo done".into(), }], source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: "done\n".into(), stderr: String::new(), diff --git a/codex-rs/app-server-protocol/src/protocol/v2.rs b/codex-rs/app-server-protocol/src/protocol/v2.rs index 66ac1a60201..13b01cce4cb 100644 --- a/codex-rs/app-server-protocol/src/protocol/v2.rs +++ b/codex-rs/app-server-protocol/src/protocol/v2.rs @@ -4409,6 +4409,9 @@ pub enum ThreadItem { cwd: PathBuf, /// Identifier for the underlying PTY process (when available). process_id: Option, + /// The configured exec host, when the command ran outside the local host. + #[serde(default)] + host_id: Option, #[serde(default)] source: CommandExecutionSource, status: CommandExecutionStatus, diff --git a/codex-rs/app-server-protocol/src/schema_fixtures.rs b/codex-rs/app-server-protocol/src/schema_fixtures.rs index 56dcf33a4d4..731a9bcf950 100644 --- a/codex-rs/app-server-protocol/src/schema_fixtures.rs +++ b/codex-rs/app-server-protocol/src/schema_fixtures.rs @@ -133,7 +133,7 @@ fn read_file_bytes(path: &Path) -> Result> { // fixture test is platform-independent. let text = String::from_utf8(bytes) .with_context(|| format!("expected UTF-8 TypeScript in {}", path.display()))?; - let text = text.replace("\r\n", "\n").replace('\r', "\n"); + let text = normalize_typescript_fixture_text(&text); // Fixture comparisons care about schema content, not whether the generator // re-prepended the standard banner to every TypeScript file. let text = text @@ -276,10 +276,7 @@ fn collect_typescript_fixture_file( let contents = T::export_to_string().context("export TypeScript fixture content")?; let output_path = normalize_relative_fixture_path(&output_path); - files.insert( - output_path, - contents.replace("\r\n", "\n").replace('\r', "\n"), - ); + files.insert(output_path, normalize_typescript_fixture_text(&contents)); let mut visitor = TypeScriptFixtureCollector { files, @@ -294,6 +291,20 @@ fn collect_typescript_fixture_file( Ok(()) } +fn normalize_typescript_fixture_text(text: &str) -> String { + let text = text.replace("\r\n", "\n").replace('\r', "\n"); + let mut normalized = String::with_capacity(text.len()); + for line in text.split_inclusive('\n') { + if let Some(line) = line.strip_suffix('\n') { + normalized.push_str(line.trim_end_matches([' ', '\t'])); + normalized.push('\n'); + } else { + normalized.push_str(line.trim_end_matches([' ', '\t'])); + } + } + normalized +} + fn normalize_relative_fixture_path(path: &Path) -> PathBuf { path.components().collect() } diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index ce71e8974dd..7dd2427af92 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -1618,6 +1618,7 @@ pub(crate) async fn apply_bespoke_event_handling( let command = shlex_join(&exec_command_begin_event.command); let cwd = exec_command_begin_event.cwd; let process_id = exec_command_begin_event.process_id; + let host_id = exec_command_begin_event.host_id; let first_start = { let mut state = thread_state.lock().await; state @@ -1631,6 +1632,7 @@ pub(crate) async fn apply_bespoke_event_handling( command, cwd, process_id, + host_id, source: exec_command_begin_event.source.into(), status: CommandExecutionStatus::InProgress, command_actions, @@ -2002,6 +2004,7 @@ async fn start_command_execution_item( command, cwd, process_id: None, + host_id: None, source, status: CommandExecutionStatus::InProgress, command_actions, @@ -2046,6 +2049,7 @@ async fn complete_command_execution_item( command, cwd, process_id, + host_id: None, source, status, command_actions, @@ -3228,6 +3232,7 @@ mod tests { command: completion_item.command.clone(), cwd: completion_item.cwd.clone(), process_id: None, + host_id: None, source: CommandExecutionSource::Agent, status: CommandExecutionStatus::InProgress, command_actions: completion_item.command_actions.clone(), diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 68de25b6f42..fe75410779d 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -675,6 +675,7 @@ impl Codex { mcp_manager.clone(), skills_watcher, agent_control, + environment_manager, environment, ) .await @@ -1603,6 +1604,7 @@ impl Session { mcp_manager: Arc, skills_watcher: Arc, agent_control: AgentControl, + environment_manager: Arc, environment: Option>, ) -> anyhow::Result> { debug!( @@ -2040,6 +2042,7 @@ impl Session { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment_manager, environment, }; services diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 1e07c6d1ff0..3a8a7302458 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use async_channel::Receiver; use async_channel::Sender; use codex_async_utils::OrCancelExt; -use codex_exec_server::EnvironmentManager; use codex_protocol::protocol::ApplyPatchApprovalRequestEvent; use codex_protocol::protocol::Event; use codex_protocol::protocol::EventMsg; @@ -78,9 +77,7 @@ pub(crate) async fn run_codex_thread_interactive( config, auth_manager, models_manager, - environment_manager: Arc::new(EnvironmentManager::from_environment( - parent_ctx.environment.as_deref(), - )), + environment_manager: Arc::clone(&parent_session.services.environment_manager), skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), diff --git a/codex-rs/core/src/codex_tests.rs b/codex-rs/core/src/codex_tests.rs index cd0895a8b5f..42bb2f5f3b5 100644 --- a/codex-rs/core/src/codex_tests.rs +++ b/codex-rs/core/src/codex_tests.rs @@ -2743,6 +2743,9 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { mcp_manager, Arc::new(SkillsWatcher::noop()), AgentControl::default(), + Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), Some(Arc::new( codex_exec_server::Environment::create(/*exec_server_url*/ None) .await @@ -2901,6 +2904,9 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment_manager: Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), environment: Some(Arc::clone(&environment)), }; let js_repl = Arc::new(JsReplHandle::with_node_path( @@ -3746,6 +3752,9 @@ pub(crate) async fn make_session_and_context_with_dynamic_tools_and_rx( code_mode_service: crate::tools::code_mode::CodeModeService::new( config.js_repl_node_path.clone(), ), + environment_manager: Arc::new(codex_exec_server::EnvironmentManager::new( + /*exec_server_url*/ None, + )), environment: Some(Arc::clone(&environment)), }; let js_repl = Arc::new(JsReplHandle::with_node_path( diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index b64a3b6d6a5..d89985b9a1a 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -16,6 +16,7 @@ use crate::tools::sandboxing::ApprovalStore; use crate::unified_exec::UnifiedExecProcessManager; use codex_analytics::AnalyticsEventsClient; use codex_exec_server::Environment; +use codex_exec_server::EnvironmentManager; use codex_hooks::Hooks; use codex_login::AuthManager; use codex_mcp::McpConnectionManager; @@ -59,5 +60,6 @@ pub(crate) struct SessionServices { /// Session-scoped model client shared across turns. pub(crate) model_client: ModelClient, pub(crate) code_mode_service: CodeModeService, + pub(crate) environment_manager: Arc, pub(crate) environment: Option>, } diff --git a/codex-rs/core/src/tasks/user_shell.rs b/codex-rs/core/src/tasks/user_shell.rs index e0a7703879a..c2eb4b6cf9d 100644 --- a/codex-rs/core/src/tasks/user_shell.rs +++ b/codex-rs/core/src/tasks/user_shell.rs @@ -152,6 +152,7 @@ pub(crate) async fn execute_user_shell_command( cwd: cwd.to_path_buf(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, + host_id: None, interaction_input: None, }), ) @@ -220,6 +221,7 @@ pub(crate) async fn execute_user_shell_command( cwd: cwd.to_path_buf(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, + host_id: None, interaction_input: None, stdout: String::new(), stderr: aborted_message.clone(), @@ -244,6 +246,7 @@ pub(crate) async fn execute_user_shell_command( cwd: cwd.to_path_buf(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::UserShell, + host_id: None, interaction_input: None, stdout: output.stdout.text.clone(), stderr: output.stderr.text.clone(), @@ -288,6 +291,7 @@ pub(crate) async fn execute_user_shell_command( cwd: cwd.to_path_buf(), parsed_cmd, source: ExecCommandSource::UserShell, + host_id: None, interaction_input: None, stdout: exec_output.stdout.text.clone(), stderr: exec_output.stderr.text.clone(), diff --git a/codex-rs/core/src/tools/events.rs b/codex-rs/core/src/tools/events.rs index 58438cf5636..28c6bb20c4b 100644 --- a/codex-rs/core/src/tools/events.rs +++ b/codex-rs/core/src/tools/events.rs @@ -61,27 +61,20 @@ pub(crate) enum ToolEventFailure { Rejected(String), } -pub(crate) async fn emit_exec_command_begin( - ctx: ToolEventCtx<'_>, - command: &[String], - cwd: &Path, - parsed_cmd: &[ParsedCommand], - source: ExecCommandSource, - interaction_input: Option, - process_id: Option<&str>, -) { +async fn emit_exec_command_begin(ctx: ToolEventCtx<'_>, exec_input: &ExecCommandInput<'_>) { ctx.session .send_event( ctx.turn, EventMsg::ExecCommandBegin(ExecCommandBeginEvent { call_id: ctx.call_id.to_string(), - process_id: process_id.map(str::to_owned), + process_id: exec_input.process_id.map(str::to_owned), turn_id: ctx.turn.sub_id.clone(), - command: command.to_vec(), - cwd: cwd.to_path_buf(), - parsed_cmd: parsed_cmd.to_vec(), - source, - interaction_input, + command: exec_input.command.to_vec(), + cwd: exec_input.cwd.to_path_buf(), + parsed_cmd: exec_input.parsed_cmd.to_vec(), + source: exec_input.source, + host_id: exec_input.host_id.map(str::to_owned), + interaction_input: exec_input.interaction_input.map(str::to_owned), }), ) .await; @@ -105,6 +98,7 @@ pub(crate) enum ToolEmitter { source: ExecCommandSource, parsed_cmd: Vec, process_id: Option, + host_id: Option, }, } @@ -137,6 +131,7 @@ impl ToolEmitter { cwd: PathBuf, source: ExecCommandSource, process_id: Option, + host_id: Option, ) -> Self { let parsed_cmd = parse_command(command); Self::UnifiedExec { @@ -145,6 +140,7 @@ impl ToolEmitter { source, parsed_cmd, process_id, + host_id, } } @@ -169,6 +165,7 @@ impl ToolEmitter { *source, /*interaction_input*/ None, /*process_id*/ None, + /*host_id*/ None, ), stage, ) @@ -266,6 +263,7 @@ impl ToolEmitter { source, parsed_cmd, process_id, + host_id, }, stage, ) => { @@ -278,6 +276,7 @@ impl ToolEmitter { *source, /*interaction_input*/ None, process_id.as_deref(), + host_id.as_deref(), ), stage, ) @@ -370,6 +369,7 @@ struct ExecCommandInput<'a> { source: ExecCommandSource, interaction_input: Option<&'a str>, process_id: Option<&'a str>, + host_id: Option<&'a str>, } impl<'a> ExecCommandInput<'a> { @@ -380,6 +380,7 @@ impl<'a> ExecCommandInput<'a> { source: ExecCommandSource, interaction_input: Option<&'a str>, process_id: Option<&'a str>, + host_id: Option<&'a str>, ) -> Self { Self { command, @@ -388,6 +389,7 @@ impl<'a> ExecCommandInput<'a> { source, interaction_input, process_id, + host_id, } } } @@ -409,16 +411,7 @@ async fn emit_exec_stage( ) { match stage { ToolEventStage::Begin => { - emit_exec_command_begin( - ctx, - exec_input.command, - exec_input.cwd, - exec_input.parsed_cmd, - exec_input.source, - exec_input.interaction_input.map(str::to_owned), - exec_input.process_id, - ) - .await; + emit_exec_command_begin(ctx, &exec_input).await; } ToolEventStage::Success(output) | ToolEventStage::Failure(ToolEventFailure::Output(output)) => { @@ -482,6 +475,7 @@ async fn emit_exec_end( cwd: exec_input.cwd.to_path_buf(), parsed_cmd: exec_input.parsed_cmd.to_vec(), source: exec_input.source, + host_id: exec_input.host_id.map(str::to_owned), interaction_input: exec_input.interaction_input.map(str::to_owned), stdout: exec_result.stdout, stderr: exec_result.stderr, diff --git a/codex-rs/core/src/tools/handlers/unified_exec.rs b/codex-rs/core/src/tools/handlers/unified_exec.rs index fc1ad94b4d7..2d7d5c2b34a 100644 --- a/codex-rs/core/src/tools/handlers/unified_exec.rs +++ b/codex-rs/core/src/tools/handlers/unified_exec.rs @@ -44,6 +44,8 @@ pub(crate) struct ExecCommandArgs { #[serde(default)] shell: Option, #[serde(default)] + host_id: Option, + #[serde(default)] login: Option, #[serde(default = "default_tty")] tty: bool, @@ -176,13 +178,6 @@ impl ToolHandler for UnifiedExecHandler { } }; - let Some(environment) = turn.environment.as_ref() else { - return Err(FunctionCallError::RespondToModel( - "unified exec is unavailable in this session".to_string(), - )); - }; - let fs = environment.get_filesystem(); - let manager: &UnifiedExecProcessManager = &session.services.unified_exec_manager; let context = UnifiedExecContext::new(session.clone(), turn.clone(), call_id.clone()); @@ -190,7 +185,33 @@ impl ToolHandler for UnifiedExecHandler { "exec_command" => { let cwd = resolve_workdir_base_path(&arguments, &context.turn.cwd)?; let args: ExecCommandArgs = parse_arguments_with_base_path(&arguments, &cwd)?; - let workdir = context.turn.resolve_path(args.workdir.clone()); + let resolved_environment = resolve_exec_environment( + session.as_ref(), + turn.as_ref(), + args.host_id.as_deref(), + ) + .await?; + let environment = resolved_environment.environment; + let default_host_cwd = if environment.is_remote() && args.workdir.is_none() { + environment + .default_cwd() + .map(|cwd| { + codex_utils_absolute_path::AbsolutePathBuf::from_absolute_path(cwd) + .map_err(|err| { + FunctionCallError::RespondToModel(format!( + "exec host default cwd `{}` is not usable on this client: {err}", + cwd.display() + )) + }) + }) + .transpose()? + } else { + None + }; + let fs = environment.get_filesystem(); + let workdir = default_host_cwd + .clone() + .unwrap_or_else(|| context.turn.resolve_path(args.workdir.clone())); maybe_emit_implicit_skill_invocation( session.as_ref(), context.turn.as_ref(), @@ -199,14 +220,20 @@ impl ToolHandler for UnifiedExecHandler { ) .await; let process_id = manager.allocate_process_id().await; - let command = get_command( + let command = get_command_with_shell_override( &args, session.user_shell(), &turn.tools_config.unified_exec_shell_mode, turn.tools_config.allow_login_shell, + if environment.is_remote() { + environment.default_shell() + } else { + None + }, ) .map_err(FunctionCallError::RespondToModel)?; let command_for_display = codex_shell_command::parse_command::shlex_join(&command); + let request_host_id = resolved_environment.host_id; let ExecCommandArgs { workdir, @@ -253,7 +280,9 @@ impl ToolHandler for UnifiedExecHandler { let workdir = workdir.filter(|value| !value.is_empty()); - let workdir = workdir.map(|dir| context.turn.resolve_path(Some(dir))); + let workdir = workdir + .map(|dir| context.turn.resolve_path(Some(dir))) + .or(default_host_cwd); let cwd = workdir.clone().unwrap_or(cwd); let normalized_additional_permissions = match implicit_granted_permissions( sandbox_permissions, @@ -312,6 +341,8 @@ impl ToolHandler for UnifiedExecHandler { .exec_command( ExecCommandRequest { command, + environment, + host_id: request_host_id, process_id, yield_time_ms, max_output_tokens, @@ -379,11 +410,76 @@ fn emit_unified_exec_tty_metric(session_telemetry: &SessionTelemetry, tty: bool) ); } +struct ResolvedExecEnvironment { + environment: Arc, + host_id: Option, +} + +async fn resolve_exec_environment( + session: &crate::codex::Session, + turn: &crate::codex::TurnContext, + host_id: Option<&str>, +) -> Result { + let environment = match host_id { + Some(host_id) => session + .services + .environment_manager + .current_for_host(host_id) + .await + .map_err(|err| { + FunctionCallError::RespondToModel(format!( + "failed to resolve exec host `{host_id}`: {err}" + )) + })?, + None => turn.environment.clone(), + }; + + let environment = environment.ok_or_else(|| { + FunctionCallError::RespondToModel("unified exec is unavailable in this session".to_string()) + })?; + let host_id = if environment.is_remote() { + Some(match host_id { + Some(host_id) => host_id.to_string(), + None => { + let default_host_id = session.services.environment_manager.default_host_id(); + if default_host_id == codex_exec_server::LOCAL_HOST_ID { + "remote".to_string() + } else { + default_host_id + } + } + }) + } else { + None + }; + + Ok(ResolvedExecEnvironment { + environment, + host_id, + }) +} + pub(crate) fn get_command( args: &ExecCommandArgs, session_shell: Arc, shell_mode: &UnifiedExecShellMode, allow_login_shell: bool, +) -> Result, String> { + get_command_with_shell_override( + args, + session_shell, + shell_mode, + allow_login_shell, + /*shell_override*/ None, + ) +} + +fn get_command_with_shell_override( + args: &ExecCommandArgs, + session_shell: Arc, + shell_mode: &UnifiedExecShellMode, + allow_login_shell: bool, + shell_override: Option<&str>, ) -> Result, String> { let use_login_shell = match args.login { Some(true) if !allow_login_shell => { @@ -397,7 +493,8 @@ pub(crate) fn get_command( match shell_mode { UnifiedExecShellMode::Direct => { - let model_shell = args.shell.as_ref().map(|shell_str| { + let selected_shell = args.shell.as_deref().or(shell_override); + let model_shell = selected_shell.map(|shell_str| { let mut shell = get_shell_by_model_provided_path(&PathBuf::from(shell_str)); shell.shell_snapshot = crate::shell::empty_shell_snapshot_receiver(); shell diff --git a/codex-rs/core/src/tools/runtimes/unified_exec.rs b/codex-rs/core/src/tools/runtimes/unified_exec.rs index fd12607c1b7..c08015ddff4 100644 --- a/codex-rs/core/src/tools/runtimes/unified_exec.rs +++ b/codex-rs/core/src/tools/runtimes/unified_exec.rs @@ -44,12 +44,15 @@ use codex_tools::UnifiedExecShellMode; use codex_utils_absolute_path::AbsolutePathBuf; use futures::future::BoxFuture; use std::collections::HashMap; +use std::sync::Arc; /// Request payload used by the unified-exec runtime after approvals and /// sandbox preferences have been resolved for the current turn. #[derive(Clone, Debug)] pub struct UnifiedExecRequest { pub command: Vec, + pub environment: Arc, + pub host_id: Option, pub process_id: i32, pub cwd: AbsolutePathBuf, pub env: HashMap, @@ -69,6 +72,7 @@ pub struct UnifiedExecRequest { #[derive(serde::Serialize, Clone, Debug, Eq, PartialEq, Hash)] pub struct UnifiedExecApprovalKey { pub command: Vec, + pub host_id: Option, pub cwd: AbsolutePathBuf, pub tty: bool, pub sandbox_permissions: SandboxPermissions, @@ -108,6 +112,7 @@ impl Approvable for UnifiedExecRuntime<'_> { fn approval_keys(&self, req: &UnifiedExecRequest) -> Vec { vec![UnifiedExecApprovalKey { command: canonicalize_command_for_approval(&req.command), + host_id: req.host_id.clone(), cwd: req.cwd.clone(), tty: req.tty, sandbox_permissions: req.sandbox_permissions, @@ -202,11 +207,7 @@ impl<'a> ToolRuntime for UnifiedExecRunt ) -> Result { let base_command = &req.command; let session_shell = ctx.session.user_shell(); - let environment_is_remote = ctx - .turn - .environment - .as_ref() - .is_some_and(|environment| environment.is_remote()); + let environment_is_remote = req.environment.is_remote(); let command = if environment_is_remote { base_command.to_vec() } else { @@ -249,12 +250,7 @@ impl<'a> ToolRuntime for UnifiedExecRunt .await? { Some(prepared) => { - let Some(environment) = ctx.turn.environment.as_ref() else { - return Err(ToolError::Rejected( - "exec_command is unavailable in this session".to_string(), - )); - }; - if environment.is_remote() { + if req.environment.is_remote() { return Err(ToolError::Rejected( "unified_exec zsh-fork is not supported when exec_server_url is configured".to_string(), )); @@ -266,7 +262,7 @@ impl<'a> ToolRuntime for UnifiedExecRunt &prepared.exec_request, req.tty, prepared.spawn_lifecycle, - environment.as_ref(), + req.environment.as_ref(), ) .await .map_err(|err| match err { @@ -296,18 +292,13 @@ impl<'a> ToolRuntime for UnifiedExecRunt let exec_env = attempt .env_for(command, options, req.network.as_ref()) .map_err(|err| ToolError::Codex(err.into()))?; - let Some(environment) = ctx.turn.environment.as_ref() else { - return Err(ToolError::Rejected( - "exec_command is unavailable in this session".to_string(), - )); - }; self.manager .open_session_with_exec_env( req.process_id, &exec_env, req.tty, Box::new(NoopSpawnLifecycle), - environment.as_ref(), + req.environment.as_ref(), ) .await .map_err(|err| match err { diff --git a/codex-rs/core/src/unified_exec/async_watcher.rs b/codex-rs/core/src/unified_exec/async_watcher.rs index a5da0409760..a15bf707661 100644 --- a/codex-rs/core/src/unified_exec/async_watcher.rs +++ b/codex-rs/core/src/unified_exec/async_watcher.rs @@ -110,6 +110,7 @@ pub(crate) fn spawn_exit_watcher( turn_ref: Arc, call_id: String, command: Vec, + host_id: Option, cwd: PathBuf, process_id: i32, transcript: Arc>, @@ -129,6 +130,7 @@ pub(crate) fn spawn_exit_watcher( turn_ref, call_id, command, + host_id, cwd, Some(process_id.to_string()), transcript, @@ -143,6 +145,7 @@ pub(crate) fn spawn_exit_watcher( turn_ref, call_id, command, + host_id, cwd, Some(process_id.to_string()), transcript, @@ -196,6 +199,7 @@ pub(crate) async fn emit_exec_end_for_unified_exec( turn_ref: Arc, call_id: String, command: Vec, + host_id: Option, cwd: PathBuf, process_id: Option, transcript: Arc>, @@ -223,6 +227,7 @@ pub(crate) async fn emit_exec_end_for_unified_exec( cwd, ExecCommandSource::UnifiedExecStartup, process_id, + host_id, ); emitter .emit(event_ctx, ToolEventStage::Success(output)) @@ -235,6 +240,7 @@ pub(crate) async fn emit_failed_exec_end_for_unified_exec( turn_ref: Arc, call_id: String, command: Vec, + host_id: Option, cwd: PathBuf, process_id: Option, transcript: Arc>, @@ -266,6 +272,7 @@ pub(crate) async fn emit_failed_exec_end_for_unified_exec( cwd, ExecCommandSource::UnifiedExecStartup, process_id, + host_id, ); emitter .emit( diff --git a/codex-rs/core/src/unified_exec/mod.rs b/codex-rs/core/src/unified_exec/mod.rs index 3f3b018df4e..fdc6ca78008 100644 --- a/codex-rs/core/src/unified_exec/mod.rs +++ b/codex-rs/core/src/unified_exec/mod.rs @@ -27,6 +27,7 @@ use std::collections::HashSet; use std::sync::Arc; use std::sync::Weak; +use codex_exec_server::Environment; use codex_network_proxy::NetworkProxy; use codex_protocol::models::PermissionProfile; use codex_utils_absolute_path::AbsolutePathBuf; @@ -88,6 +89,8 @@ impl UnifiedExecContext { #[derive(Debug)] pub(crate) struct ExecCommandRequest { pub command: Vec, + pub environment: Arc, + pub host_id: Option, pub process_id: i32, pub yield_time_ms: u64, pub max_output_tokens: Option, diff --git a/codex-rs/core/src/unified_exec/process_manager.rs b/codex-rs/core/src/unified_exec/process_manager.rs index 9d77ac4e973..7aa502b4b34 100644 --- a/codex-rs/core/src/unified_exec/process_manager.rs +++ b/codex-rs/core/src/unified_exec/process_manager.rs @@ -192,6 +192,7 @@ impl UnifiedExecProcessManager { cwd.to_path_buf(), ExecCommandSource::UnifiedExecStartup, Some(request.process_id.to_string()), + request.host_id.clone(), ); emitter.emit(event_ctx, ToolEventStage::Begin).await; @@ -208,6 +209,7 @@ impl UnifiedExecProcessManager { Arc::clone(&process), context, &request.command, + request.host_id.clone(), cwd.clone(), start, request.process_id, @@ -255,6 +257,7 @@ impl UnifiedExecProcessManager { Arc::clone(&context.turn), context.call_id.clone(), request.command.clone(), + request.host_id.clone(), cwd.to_path_buf(), Some(request.process_id.to_string()), Arc::clone(&transcript), @@ -298,6 +301,7 @@ impl UnifiedExecProcessManager { Arc::clone(&context.turn), context.call_id.clone(), request.command.clone(), + request.host_id.clone(), cwd.to_path_buf(), Some(process_id.to_string()), Arc::clone(&transcript), @@ -526,6 +530,7 @@ impl UnifiedExecProcessManager { process: Arc, context: &UnifiedExecContext, command: &[String], + host_id: Option, cwd: AbsolutePathBuf, started_at: Instant, process_id: i32, @@ -572,6 +577,7 @@ impl UnifiedExecProcessManager { Arc::clone(&context.turn), context.call_id.clone(), command.to_vec(), + host_id, cwd.to_path_buf(), process_id, transcript, @@ -677,6 +683,8 @@ impl UnifiedExecProcessManager { .await; let req = UnifiedExecToolRequest { command: request.command.clone(), + environment: Arc::clone(&request.environment), + host_id: request.host_id.clone(), process_id: request.process_id, cwd, env, diff --git a/codex-rs/exec-server/src/client.rs b/codex-rs/exec-server/src/client.rs index 993d2ae018a..71804273a73 100644 --- a/codex-rs/exec-server/src/client.rs +++ b/codex-rs/exec-server/src/client.rs @@ -5,10 +5,13 @@ use std::time::Duration; use arc_swap::ArcSwap; use codex_app_server_protocol::JSONRPCNotification; use serde_json::Value; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; use tokio::sync::Mutex; use tokio::sync::watch; use tokio::time::timeout; +use tokio_tungstenite::client_async; use tokio_tungstenite::connect_async; use tracing::debug; @@ -187,6 +190,31 @@ impl ExecServerClient { .await } + pub async fn connect_websocket_stream( + websocket_url: String, + stream: S, + options: ExecServerClientConnectOptions, + ) -> Result + where + S: AsyncRead + AsyncWrite + Unpin + Send + 'static, + { + let (stream, _) = client_async(websocket_url.as_str(), stream) + .await + .map_err(|source| ExecServerError::WebSocketConnect { + url: websocket_url.clone(), + source, + })?; + + Self::connect( + JsonRpcConnection::from_websocket( + stream, + format!("exec-server websocket stream {websocket_url}"), + ), + options, + ) + .await + } + pub async fn initialize( &self, options: ExecServerClientConnectOptions, @@ -209,7 +237,7 @@ impl ExecServerClient { }, ) .await?; - { + if !response.session_id.is_empty() { let mut session_id = self .inner .session_id diff --git a/codex-rs/exec-server/src/environment.rs b/codex-rs/exec-server/src/environment.rs index 00b7fbb0ce2..e5409c87d10 100644 --- a/codex-rs/exec-server/src/environment.rs +++ b/codex-rs/exec-server/src/environment.rs @@ -1,6 +1,28 @@ +use std::collections::HashMap; +use std::path::Path; +use std::path::PathBuf; +use std::pin::Pin; +use std::process::Stdio; use std::sync::Arc; +use std::sync::Mutex; +use std::sync::RwLock; +use std::task::Context; +use std::task::Poll; +use std::time::Duration; +use tokio::io::AsyncBufReadExt; +use tokio::io::AsyncRead; +use tokio::io::AsyncWrite; +use tokio::io::BufReader; +use tokio::io::ReadBuf; +use tokio::process::Child; +use tokio::process::ChildStdin; +use tokio::process::ChildStdout; +use tokio::process::Command; use tokio::sync::OnceCell; +use tokio::time::Instant; +use tokio::time::sleep; +use tokio::time::timeout; use crate::ExecServerClient; use crate::ExecServerError; @@ -13,16 +35,147 @@ use crate::remote_file_system::RemoteFileSystem; use crate::remote_process::RemoteProcess; pub const CODEX_EXEC_SERVER_URL_ENV_VAR: &str = "CODEX_EXEC_SERVER_URL"; +pub const CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR: &str = "CODEX_EXEC_SERVER_SSH_HOSTS"; +pub const CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR: &str = "CODEX_EXEC_SERVER_DEFAULT_HOST"; +pub const LOCAL_HOST_ID: &str = "local"; +pub const DEFAULT_HOST_ALIAS: &str = "default"; -/// Lazily creates and caches the active environment for a session. +const SSH_BOOTSTRAP_TIMEOUT: Duration = Duration::from_secs(20); +const EXEC_SERVER_CONNECT_TIMEOUT: Duration = Duration::from_secs(5); +const SSH_EXEC_SERVER_CONNECT_TIMEOUT: Duration = Duration::from_secs(20); +const EXEC_SERVER_CONNECT_RETRY_DELAY: Duration = Duration::from_millis(100); +const SSH_BOOTSTRAP_PWD_MARKER: &str = "__codex_exec_server_pwd__="; +const SSH_BOOTSTRAP_SHELL_MARKER: &str = "__codex_exec_server_shell__="; + +/// A host that can provide command execution and filesystem access. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct HostConfig { + pub id: String, + pub connection: HostConnection, +} + +impl HostConfig { + pub fn local(id: impl Into) -> Self { + Self { + id: id.into(), + connection: HostConnection::Local, + } + } + + pub fn exec_server_url(id: impl Into, url: impl Into) -> Self { + Self { + id: id.into(), + connection: HostConnection::ExecServerUrl { url: url.into() }, + } + } + + pub fn ssh(id: impl Into, ssh_host: impl Into) -> Self { + Self { + id: id.into(), + connection: HostConnection::Ssh { + ssh_host: ssh_host.into(), + }, + } + } +} + +/// How Codex connects to a host. +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum HostConnection { + Local, + ExecServerUrl { url: String }, + Ssh { ssh_host: String }, +} + +struct HostEntry { + config: HostConfig, + environment: OnceCell>>, +} + +impl HostEntry { + fn new(config: HostConfig) -> Self { + Self { + config, + environment: OnceCell::new(), + } + } + + async fn environment( + &self, + disabled: bool, + ) -> Result>, ExecServerError> { + self.environment + .get_or_try_init(|| async { + if disabled { + Ok(None) + } else { + Ok(Some(Arc::new( + Environment::create_for_host_config(&self.config).await?, + ))) + } + }) + .await + .map(Option::as_ref) + .map(std::option::Option::<&Arc>::cloned) + } +} + +struct EnvironmentRegistry { + default_host_id: String, + disabled: bool, + hosts: HashMap>, +} + +impl EnvironmentRegistry { + fn new(exec_server_url: Option, disabled: bool) -> Self { + let mut hosts = HashMap::new(); + hosts.insert( + LOCAL_HOST_ID.to_string(), + Arc::new(HostEntry::new(HostConfig::local(LOCAL_HOST_ID))), + ); + + let default_host_id = if let Some(exec_server_url) = exec_server_url { + hosts.insert( + DEFAULT_HOST_ALIAS.to_string(), + Arc::new(HostEntry::new(HostConfig::exec_server_url( + DEFAULT_HOST_ALIAS, + exec_server_url, + ))), + ); + DEFAULT_HOST_ALIAS.to_string() + } else { + LOCAL_HOST_ID.to_string() + }; + + Self { + default_host_id, + disabled, + hosts, + } + } +} + +/// Lazily creates and caches execution environments for registered hosts. /// -/// The manager keeps the session's environment selection stable so subagents -/// and follow-up turns preserve an explicit disabled state. -#[derive(Debug)] +/// The registry lives above any individual thread/session. `current()` keeps +/// existing single-environment behavior, while `current_for_host()` lets a turn +/// route one command to any host already registered with the manager. pub struct EnvironmentManager { - exec_server_url: Option, - disabled: bool, - current_environment: OnceCell>>, + registry: RwLock, +} + +impl std::fmt::Debug for EnvironmentManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + f.debug_struct("EnvironmentManager") + .field("default_host_id", ®istry.default_host_id) + .field("disabled", ®istry.disabled) + .field("hosts", ®istry.hosts.keys().collect::>()) + .finish() + } } impl Default for EnvironmentManager { @@ -36,71 +189,194 @@ impl EnvironmentManager { pub fn new(exec_server_url: Option) -> Self { let (exec_server_url, disabled) = normalize_exec_server_url(exec_server_url); Self { - exec_server_url, - disabled, - current_environment: OnceCell::new(), + registry: RwLock::new(EnvironmentRegistry::new(exec_server_url, disabled)), } } /// Builds a manager from process environment variables. pub fn from_env() -> Self { - Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok()) + let manager = Self::new(std::env::var(CODEX_EXEC_SERVER_URL_ENV_VAR).ok()); + if let Ok(ssh_hosts) = std::env::var(CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR) { + for host_config in parse_ssh_hosts_env(&ssh_hosts) { + if let Err(err) = manager.register_host(host_config) { + tracing::warn!( + "ignoring invalid {CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR} entry: {err}" + ); + } + } + } + if let Ok(default_host_id) = std::env::var(CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR) + && !default_host_id.trim().is_empty() + && let Err(err) = manager.set_default_host(default_host_id.trim()) + { + tracing::warn!( + "ignoring invalid {CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR} value: {err}" + ); + } + manager } /// Builds a manager from the currently selected environment, or from the /// disabled mode when no environment is available. pub fn from_environment(environment: Option<&Environment>) -> Self { match environment { - Some(environment) => Self { - exec_server_url: environment.exec_server_url().map(str::to_owned), - disabled: false, - current_environment: OnceCell::new(), - }, - None => Self { - exec_server_url: None, - disabled: true, - current_environment: OnceCell::new(), - }, + Some(environment) => { + if let Some(ssh_host) = environment + .exec_server_url() + .and_then(|url| url.strip_prefix("ssh://")) + .filter(|host| !host.is_empty()) + { + let manager = Self::new(/*exec_server_url*/ None); + if manager.register_ssh_host(ssh_host, ssh_host).is_ok() + && manager.set_default_host(ssh_host).is_ok() + { + manager + } else { + Self::new(environment.exec_server_url().map(str::to_owned)) + } + } else { + Self::new(environment.exec_server_url().map(str::to_owned)) + } + } + None => { + let manager = Self::new(/*exec_server_url*/ None); + { + let mut registry = manager + .registry + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + registry.disabled = true; + } + manager + } } } - /// Returns the remote exec-server URL when one is configured. - pub fn exec_server_url(&self) -> Option<&str> { - self.exec_server_url.as_deref() + /// Registers or replaces a host in the shared registry. + pub fn register_host(&self, host_config: HostConfig) -> Result<(), ExecServerError> { + let id = normalize_host_id(&host_config.id)?; + let mut host_config = host_config; + host_config.id = id.clone(); + let mut registry = self + .registry + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + registry + .hosts + .insert(id, Arc::new(HostEntry::new(host_config))); + Ok(()) + } + + /// Registers an SSH host. The host id defaults to the same value that SSH + /// resolves via `~/.ssh/config`. + pub fn register_ssh_host( + &self, + id: impl Into, + ssh_host: impl Into, + ) -> Result<(), ExecServerError> { + self.register_host(HostConfig::ssh(id, ssh_host)) + } + + pub fn set_default_host(&self, host_id: &str) -> Result<(), ExecServerError> { + let mut registry = self + .registry + .write() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let resolved = resolve_host_id(®istry, host_id)?; + if !registry.hosts.contains_key(&resolved) { + return Err(unknown_host_error(host_id)); + } + registry.default_host_id = resolved; + Ok(()) + } + + pub fn default_host_id(&self) -> String { + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + registry.default_host_id.clone() + } + + pub fn registered_host_ids(&self) -> Vec { + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let mut ids = registry.hosts.keys().cloned().collect::>(); + ids.sort(); + ids + } + + /// Returns the remote exec-server URL when the default host is URL-backed. + pub fn exec_server_url(&self) -> Option { + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + registry + .hosts + .get(®istry.default_host_id) + .and_then(|entry| match &entry.config.connection { + HostConnection::ExecServerUrl { url } => Some(url.clone()), + _ => None, + }) } - /// Returns true when this manager is configured to use a remote exec server. + /// Returns true when the default host is configured to use a remote backend. pub fn is_remote(&self) -> bool { - self.exec_server_url.is_some() + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + registry + .hosts + .get(®istry.default_host_id) + .is_some_and(|entry| !matches!(entry.config.connection, HostConnection::Local)) } - /// Returns the cached environment, creating it on first access. + /// Returns the cached default environment, creating it on first access. pub async fn current(&self) -> Result>, ExecServerError> { - self.current_environment - .get_or_try_init(|| async { - if self.disabled { - Ok(None) - } else { - Ok(Some(Arc::new( - Environment::create(self.exec_server_url.clone()).await?, - ))) - } - }) - .await - .map(Option::as_ref) - .map(std::option::Option::<&Arc>::cloned) + self.current_for_host(DEFAULT_HOST_ALIAS).await + } + + /// Returns the cached environment for a registered host, creating it on + /// first access. The special host id `default` resolves to the current + /// registry default. + pub async fn current_for_host( + &self, + host_id: &str, + ) -> Result>, ExecServerError> { + let (entry, disabled) = { + let registry = self + .registry + .read() + .unwrap_or_else(std::sync::PoisonError::into_inner); + let resolved_host_id = resolve_host_id(®istry, host_id)?; + let entry = registry + .hosts + .get(&resolved_host_id) + .cloned() + .ok_or_else(|| unknown_host_error(host_id))?; + (entry, registry.disabled) + }; + entry.environment(disabled).await } } -/// Concrete execution/filesystem environment selected for a session. +/// Concrete execution/filesystem environment selected for a host. /// /// This bundles the selected backend together with the corresponding remote -/// client, if any. +/// client, if any. SSH-backed environments also retain the child processes that +/// keep the remote exec-server SSH process alive. #[derive(Clone)] pub struct Environment { exec_server_url: Option, remote_exec_server_client: Option, exec_backend: Arc, + default_cwd: Option, + default_shell: Option, + _ssh_exec_server: Option>, } impl Default for Environment { @@ -109,6 +385,9 @@ impl Default for Environment { exec_server_url: None, remote_exec_server_client: None, exec_backend: Arc::new(LocalProcess::default()), + default_cwd: None, + default_shell: None, + _ssh_exec_server: None, } } } @@ -117,6 +396,8 @@ impl std::fmt::Debug for Environment { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Environment") .field("exec_server_url", &self.exec_server_url) + .field("default_cwd", &self.default_cwd) + .field("default_shell", &self.default_shell) .finish_non_exhaustive() } } @@ -131,32 +412,86 @@ impl Environment { )); } - let remote_exec_server_client = if let Some(exec_server_url) = &exec_server_url { - Some( - ExecServerClient::connect_websocket(RemoteExecServerConnectArgs { - websocket_url: exec_server_url.clone(), - client_name: "codex-environment".to_string(), - connect_timeout: std::time::Duration::from_secs(5), - initialize_timeout: std::time::Duration::from_secs(5), - resume_session_id: None, - }) - .await?, - ) - } else { - None - }; + match exec_server_url { + Some(exec_server_url) => { + Self::create_remote( + exec_server_url, + /*default_cwd*/ None, + /*default_shell*/ None, + /*ssh_exec_server*/ None, + "codex-environment", + ) + .await + } + None => Ok(Self::default()), + } + } + async fn create_for_host_config(host_config: &HostConfig) -> Result { + match &host_config.connection { + HostConnection::Local => Self::create(/*exec_server_url*/ None).await, + HostConnection::ExecServerUrl { url } => { + Self::create_remote( + url.clone(), + /*default_cwd*/ None, + /*default_shell*/ None, + /*ssh_exec_server*/ None, + format!("codex-environment-{}", host_config.id), + ) + .await + } + HostConnection::Ssh { ssh_host } => { + let bootstrap = SshExecServer::connect(ssh_host).await?; + Self::create_remote_from_client( + format!("ssh://{ssh_host}"), + bootstrap.client, + bootstrap.default_cwd.clone(), + bootstrap.default_shell.clone(), + Some(Arc::clone(&bootstrap.session)), + ) + } + } + } + + async fn create_remote( + exec_server_url: String, + default_cwd: Option, + default_shell: Option, + ssh_exec_server: Option>, + client_name: impl Into, + ) -> Result { + let remote_exec_server_client = + connect_websocket_with_retry(exec_server_url.clone(), client_name.into()).await?; let exec_backend: Arc = - if let Some(client) = remote_exec_server_client.clone() { - Arc::new(RemoteProcess::new(client)) - } else { - Arc::new(LocalProcess::default()) - }; + Arc::new(RemoteProcess::new(remote_exec_server_client.clone())); + + Ok(Self { + exec_server_url: Some(exec_server_url), + remote_exec_server_client: Some(remote_exec_server_client), + exec_backend, + default_cwd, + default_shell, + _ssh_exec_server: ssh_exec_server, + }) + } + + fn create_remote_from_client( + exec_server_url: String, + remote_exec_server_client: ExecServerClient, + default_cwd: Option, + default_shell: Option, + ssh_exec_server: Option>, + ) -> Result { + let exec_backend: Arc = + Arc::new(RemoteProcess::new(remote_exec_server_client.clone())); Ok(Self { - exec_server_url, - remote_exec_server_client, + exec_server_url: Some(exec_server_url), + remote_exec_server_client: Some(remote_exec_server_client), exec_backend, + default_cwd, + default_shell, + _ssh_exec_server: ssh_exec_server, }) } @@ -169,6 +504,14 @@ impl Environment { self.exec_server_url.as_deref() } + pub fn default_cwd(&self) -> Option<&Path> { + self.default_cwd.as_deref() + } + + pub fn default_shell(&self) -> Option<&str> { + self.default_shell.as_deref() + } + pub fn get_exec_backend(&self) -> Arc { Arc::clone(&self.exec_backend) } @@ -181,6 +524,234 @@ impl Environment { } } +struct SshExecServerBootstrap { + client: ExecServerClient, + default_cwd: Option, + default_shell: Option, + session: Arc, +} + +struct SshExecServer { + child: Mutex>, +} + +impl std::fmt::Debug for SshExecServer { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("SshExecServer").finish_non_exhaustive() + } +} + +impl SshExecServer { + async fn connect(ssh_host: &str) -> Result { + let metadata = read_ssh_metadata(ssh_host).await?; + let mut child = Command::new("ssh") + .arg(ssh_host) + .arg(build_ssh_websocket_proxy_command()) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .map_err(ExecServerError::Spawn)?; + + let stdin = child.stdin.take().ok_or_else(|| { + ExecServerError::Protocol("ssh exec-server proxy stdin was not piped".to_string()) + })?; + let stdout = child.stdout.take().ok_or_else(|| { + ExecServerError::Protocol("ssh exec-server proxy stdout was not piped".to_string()) + })?; + if let Some(stderr) = child.stderr.take() { + tokio::spawn(drain_reader(stderr)); + } + + let session = Arc::new(Self { + child: Mutex::new(Some(child)), + }); + let client = timeout( + SSH_EXEC_SERVER_CONNECT_TIMEOUT, + ExecServerClient::connect_websocket_stream( + "ws://127.0.0.1/".to_string(), + ChildWebSocketStream { stdin, stdout }, + crate::ExecServerClientConnectOptions { + client_name: format!("codex-environment-ssh-{ssh_host}"), + initialize_timeout: SSH_EXEC_SERVER_CONNECT_TIMEOUT, + resume_session_id: None, + }, + ), + ) + .await + .map_err(|_| ExecServerError::WebSocketConnectTimeout { + url: format!("ssh://{ssh_host}"), + timeout: SSH_EXEC_SERVER_CONNECT_TIMEOUT, + })??; + + Ok(SshExecServerBootstrap { + client, + default_cwd: metadata.default_cwd, + default_shell: metadata.default_shell, + session, + }) + } +} + +impl Drop for SshExecServer { + fn drop(&mut self) { + if let Ok(mut child) = self.child.lock() + && let Some(child) = child.as_mut() + { + let _ = child.start_kill(); + } + } +} + +struct ChildWebSocketStream { + stdin: ChildStdin, + stdout: ChildStdout, +} + +impl AsyncRead for ChildWebSocketStream { + fn poll_read( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &mut ReadBuf<'_>, + ) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stdout).poll_read(cx, buf) + } +} + +impl AsyncWrite for ChildWebSocketStream { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stdin).poll_write(cx, buf) + } + + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stdin).poll_flush(cx) + } + + fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.get_mut(); + Pin::new(&mut this.stdin).poll_shutdown(cx) + } +} + +struct SshMetadata { + default_cwd: Option, + default_shell: Option, +} + +async fn drain_reader(reader: R) +where + R: AsyncRead + Unpin + Send + 'static, +{ + drain_buf_reader(BufReader::new(reader)).await; +} + +async fn drain_buf_reader(mut reader: BufReader) +where + R: AsyncRead + Unpin + Send + 'static, +{ + let mut line = String::new(); + while reader.read_line(&mut line).await.unwrap_or(0) > 0 { + line.clear(); + } +} + +async fn read_ssh_metadata(ssh_host: &str) -> Result { + let output = timeout( + SSH_BOOTSTRAP_TIMEOUT, + Command::new("ssh") + .arg(ssh_host) + .arg(build_ssh_metadata_command()) + .output(), + ) + .await + .map_err(|_| { + ExecServerError::Protocol(format!( + "timed out waiting for ssh metadata after {SSH_BOOTSTRAP_TIMEOUT:?}" + )) + })? + .map_err(ExecServerError::Spawn)?; + + if !output.status.success() { + return Err(ExecServerError::Protocol(format!( + "ssh metadata command failed with status {}", + output.status + ))); + } + + let mut default_cwd = None; + let mut default_shell = None; + for line in String::from_utf8_lossy(&output.stdout).lines() { + let line = line.trim(); + if let Some(value) = line.strip_prefix(SSH_BOOTSTRAP_PWD_MARKER) { + if !value.is_empty() { + default_cwd = Some(PathBuf::from(value)); + } + } else if let Some(value) = line.strip_prefix(SSH_BOOTSTRAP_SHELL_MARKER) + && !value.is_empty() + { + default_shell = Some(value.to_string()); + } + } + + Ok(SshMetadata { + default_cwd, + default_shell, + }) +} + +fn build_ssh_metadata_command() -> String { + let script = format!( + "printf '{SSH_BOOTSTRAP_PWD_MARKER}%s\\n' \"$PWD\"; printf '{SSH_BOOTSTRAP_SHELL_MARKER}%s\\n' \"${{SHELL:-}}\"" + ); + let quoted_script = shell_single_quote(&script); + format!( + "if command -v zsh >/dev/null 2>&1; then exec zsh -lc {quoted_script}; fi; if command -v bash >/dev/null 2>&1; then exec bash -lc {quoted_script}; fi; exec sh -lc {quoted_script}" + ) +} + +fn build_ssh_websocket_proxy_command() -> String { + let script = r#"out="${TMPDIR:-/tmp}/codex-exec-server.$$.out"; err="${TMPDIR:-/tmp}/codex-exec-server.$$.err"; rm -f "$out" "$err"; cleanup() { if [ -n "${pid:-}" ]; then kill "$pid" >/dev/null 2>&1 || true; fi; rm -f "$out" "$err"; }; trap cleanup EXIT HUP INT TERM; codex exec-server --listen ws://127.0.0.1:0 >"$out" 2>"$err" & pid=$!; i=0; port=""; while [ "$i" -lt 100 ]; do if ! kill -0 "$pid" >/dev/null 2>&1; then cat "$err" >&2; exit 1; fi; port="$(sed -n 's#^ws://127\.0\.0\.1:\([0-9][0-9]*\)$#\1#p' "$out" | tail -1)"; if [ -n "$port" ]; then break; fi; i=$((i+1)); sleep 0.1; done; if [ -z "$port" ]; then echo "timed out waiting for codex exec-server URL" >&2; cat "$err" >&2; exit 1; fi; nc 127.0.0.1 "$port"; status=$?; cleanup; exit "$status""#; + let quoted_script = shell_single_quote(script); + format!( + "if command -v zsh >/dev/null 2>&1; then exec zsh -lc {quoted_script}; fi; if command -v bash >/dev/null 2>&1; then exec bash -lc {quoted_script}; fi; exec sh -lc {quoted_script}" + ) +} + +fn shell_single_quote(value: &str) -> String { + format!("'{}'", value.replace('\'', "'\\''")) +} + +async fn connect_websocket_with_retry( + exec_server_url: String, + client_name: String, +) -> Result { + let deadline = Instant::now() + EXEC_SERVER_CONNECT_TIMEOUT; + loop { + match ExecServerClient::connect_websocket(RemoteExecServerConnectArgs { + websocket_url: exec_server_url.clone(), + client_name: client_name.clone(), + connect_timeout: Duration::from_secs(1), + initialize_timeout: EXEC_SERVER_CONNECT_TIMEOUT, + resume_session_id: None, + }) + .await + { + Ok(client) => return Ok(client), + Err(_err) if Instant::now() < deadline => { + sleep(EXEC_SERVER_CONNECT_RETRY_DELAY).await; + } + Err(err) => return Err(err), + } + } +} + fn normalize_exec_server_url(exec_server_url: Option) -> (Option, bool) { match exec_server_url.as_deref().map(str::trim) { None | Some("") => (None, false), @@ -188,12 +759,70 @@ fn normalize_exec_server_url(exec_server_url: Option) -> (Option Some(url) => (Some(url.to_string()), false), } } + +fn normalize_host_id(host_id: &str) -> Result { + let host_id = host_id.trim(); + if host_id.is_empty() { + return Err(ExecServerError::Protocol( + "host id cannot be empty".to_string(), + )); + } + Ok(host_id.to_string()) +} + +fn resolve_host_id( + registry: &EnvironmentRegistry, + host_id: &str, +) -> Result { + let host_id = normalize_host_id(host_id)?; + if host_id == DEFAULT_HOST_ALIAS { + Ok(registry.default_host_id.clone()) + } else { + Ok(host_id) + } +} + +fn unknown_host_error(host_id: &str) -> ExecServerError { + ExecServerError::Protocol(format!( + "unknown exec host `{host_id}`; registered hosts are configured through `{CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR}`" + )) +} + +fn parse_ssh_hosts_env(value: &str) -> Vec { + value + .split(',') + .filter_map(|entry| { + let entry = entry.trim(); + if entry.is_empty() { + return None; + } + let (id, ssh_host) = entry + .split_once('=') + .or_else(|| entry.split_once(':')) + .unwrap_or((entry, entry)); + let id = id.trim(); + let ssh_host = ssh_host.trim(); + if id.is_empty() || ssh_host.is_empty() { + None + } else { + Some(HostConfig::ssh(id, ssh_host)) + } + }) + .collect() +} + #[cfg(test)] mod tests { use std::sync::Arc; + use super::CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR; + use super::CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR; + use super::DEFAULT_HOST_ALIAS; use super::Environment; use super::EnvironmentManager; + use super::HostConfig; + use super::LOCAL_HOST_ID; + use super::parse_ssh_hosts_env; use crate::ProcessId; use pretty_assertions::assert_eq; @@ -211,16 +840,15 @@ mod tests { fn environment_manager_normalizes_empty_url() { let manager = EnvironmentManager::new(Some(String::new())); - assert!(!manager.disabled); assert_eq!(manager.exec_server_url(), None); assert!(!manager.is_remote()); + assert_eq!(manager.default_host_id(), LOCAL_HOST_ID); } #[test] fn environment_manager_treats_none_value_as_disabled() { let manager = EnvironmentManager::new(Some("none".to_string())); - assert!(manager.disabled); assert_eq!(manager.exec_server_url(), None); assert!(!manager.is_remote()); } @@ -230,7 +858,43 @@ mod tests { let manager = EnvironmentManager::new(Some("ws://127.0.0.1:8765".to_string())); assert!(manager.is_remote()); - assert_eq!(manager.exec_server_url(), Some("ws://127.0.0.1:8765")); + assert_eq!( + manager.exec_server_url().as_deref(), + Some("ws://127.0.0.1:8765") + ); + assert_eq!(manager.default_host_id(), DEFAULT_HOST_ALIAS); + } + + #[test] + fn environment_manager_registers_ssh_hosts() { + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + + manager + .register_ssh_host("host-a", "host-a.example") + .expect("register ssh host"); + manager + .set_default_host("host-a") + .expect("set default host"); + + assert_eq!(manager.default_host_id(), "host-a"); + assert_eq!( + manager.registered_host_ids(), + vec!["host-a".to_string(), LOCAL_HOST_ID.to_string()] + ); + assert!(manager.is_remote()); + assert_eq!(manager.exec_server_url(), None); + } + + #[test] + fn parse_ssh_hosts_env_accepts_aliases() { + assert_eq!( + parse_ssh_hosts_env("host-a, build=build-host, staging:staging-host"), + vec![ + HostConfig::ssh("host-a", "host-a"), + HostConfig::ssh("build", "build-host"), + HostConfig::ssh("staging", "staging-host"), + ] + ); } #[tokio::test] @@ -246,6 +910,24 @@ mod tests { assert!(Arc::ptr_eq(&first, &second)); } + #[tokio::test] + async fn environment_manager_current_for_host_caches_per_host() { + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + + let default = manager + .current_for_host(DEFAULT_HOST_ALIAS) + .await + .expect("get default environment") + .expect("default environment"); + let local = manager + .current_for_host(LOCAL_HOST_ID) + .await + .expect("get local environment") + .expect("local environment"); + + assert!(Arc::ptr_eq(&default, &local)); + } + #[tokio::test] async fn disabled_environment_manager_has_no_current_environment() { let manager = EnvironmentManager::new(Some("none".to_string())); @@ -278,4 +960,76 @@ mod tests { assert_eq!(response.process.process_id().as_str(), "default-env-proc"); } + + #[tokio::test] + #[ignore] + async fn ssh_environment_manager_bootstraps_configured_host() { + let Ok(host) = std::env::var("CODEX_TEST_SSH_HOST") else { + eprintln!("set CODEX_TEST_SSH_HOST to an SSH config host to run this smoke test"); + return; + }; + let manager = EnvironmentManager::new(/*exec_server_url*/ None); + manager + .register_ssh_host(&host, &host) + .expect("register ssh host"); + manager.set_default_host(&host).expect("set default host"); + + let environment = manager + .current() + .await + .expect("bootstrap ssh environment") + .expect("ssh environment"); + let cwd = environment + .default_cwd() + .expect("ssh bootstrap should report cwd") + .to_path_buf(); + let response = environment + .get_exec_backend() + .start(crate::ExecParams { + process_id: ProcessId::from("ssh-env-proc"), + argv: vec![ + environment.default_shell().unwrap_or("/bin/sh").to_string(), + "-lc".to_string(), + "printf remote:%s \"$(uname -s)\"".to_string(), + ], + cwd, + env: Default::default(), + tty: false, + arg0: None, + }) + .await + .expect("start remote process"); + + assert_eq!(response.process.process_id().as_str(), "ssh-env-proc"); + let output = response + .process + .read( + /*after_seq*/ None, + /*max_bytes*/ Some(1024), + /*wait_ms*/ Some(5000), + ) + .await + .expect("read remote process"); + let text = output + .chunks + .iter() + .map(|chunk| String::from_utf8_lossy(&chunk.chunk.0).to_string()) + .collect::(); + assert!( + text.starts_with("remote:"), + "expected remote uname output, got {text:?}" + ); + } + + #[test] + fn env_var_names_are_stable() { + assert_eq!( + CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR, + "CODEX_EXEC_SERVER_SSH_HOSTS" + ); + assert_eq!( + CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR, + "CODEX_EXEC_SERVER_DEFAULT_HOST" + ); + } } diff --git a/codex-rs/exec-server/src/lib.rs b/codex-rs/exec-server/src/lib.rs index eccb9c91bb5..1fac05352ce 100644 --- a/codex-rs/exec-server/src/lib.rs +++ b/codex-rs/exec-server/src/lib.rs @@ -17,9 +17,15 @@ pub use client::ExecServerClient; pub use client::ExecServerError; pub use client_api::ExecServerClientConnectOptions; pub use client_api::RemoteExecServerConnectArgs; +pub use environment::CODEX_EXEC_SERVER_DEFAULT_HOST_ENV_VAR; +pub use environment::CODEX_EXEC_SERVER_SSH_HOSTS_ENV_VAR; pub use environment::CODEX_EXEC_SERVER_URL_ENV_VAR; +pub use environment::DEFAULT_HOST_ALIAS; pub use environment::Environment; pub use environment::EnvironmentManager; +pub use environment::HostConfig; +pub use environment::HostConnection; +pub use environment::LOCAL_HOST_ID; pub use file_system::CopyOptions; pub use file_system::CreateDirectoryOptions; pub use file_system::ExecutorFileSystem; diff --git a/codex-rs/exec-server/src/protocol.rs b/codex-rs/exec-server/src/protocol.rs index 54034bdc566..f30ec03845e 100644 --- a/codex-rs/exec-server/src/protocol.rs +++ b/codex-rs/exec-server/src/protocol.rs @@ -53,6 +53,7 @@ pub struct InitializeParams { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct InitializeResponse { + #[serde(default, alias = "session_id")] pub session_id: String, } @@ -293,3 +294,15 @@ mod base64_bytes { .map_err(serde::de::Error::custom) } } + +#[cfg(test)] +mod tests { + use super::InitializeResponse; + + #[test] + fn initialize_response_accepts_legacy_empty_payload() { + let response: InitializeResponse = + serde_json::from_str("{}").expect("legacy initialize response should deserialize"); + assert!(response.session_id.is_empty()); + } +} diff --git a/codex-rs/exec-server/src/server/handler/tests.rs b/codex-rs/exec-server/src/server/handler/tests.rs index 7b16ae53514..17867bbb04e 100644 --- a/codex-rs/exec-server/src/server/handler/tests.rs +++ b/codex-rs/exec-server/src/server/handler/tests.rs @@ -275,7 +275,7 @@ async fn output_and_exit_are_retained_after_notification_receiver_closes() { process_id.as_str(), shell_argv( "sleep 0.05; printf 'first\\n'; sleep 0.05; printf 'second\\n'", - "echo first && ping -n 2 127.0.0.1 >NUL && echo second", + "echo first&& ping -n 2 127.0.0.1 >NUL&& echo second", ), )) .await diff --git a/codex-rs/exec/tests/event_processor_with_json_output.rs b/codex-rs/exec/tests/event_processor_with_json_output.rs index bb293665619..fd708298318 100644 --- a/codex-rs/exec/tests/event_processor_with_json_output.rs +++ b/codex-rs/exec/tests/event_processor_with_json_output.rs @@ -167,6 +167,7 @@ fn command_execution_started_and_completed_translate_to_thread_events() { command: "ls".to_string(), cwd: PathBuf::from("/tmp/project"), process_id: Some("123".to_string()), + host_id: None, source: CommandExecutionSource::UserShell, status: ApiCommandExecutionStatus::InProgress, command_actions: Vec::::new(), @@ -206,6 +207,7 @@ fn command_execution_started_and_completed_translate_to_thread_events() { command: "ls".to_string(), cwd: PathBuf::from("/tmp/project"), process_id: Some("123".to_string()), + host_id: None, source: CommandExecutionSource::UserShell, status: ApiCommandExecutionStatus::Completed, command_actions: Vec::::new(), @@ -1280,6 +1282,7 @@ fn turn_completion_reconciles_started_items_from_turn_items() { command: "ls".to_string(), cwd: PathBuf::from("/tmp/project"), process_id: Some("123".to_string()), + host_id: None, source: CommandExecutionSource::UserShell, status: ApiCommandExecutionStatus::InProgress, command_actions: Vec::::new(), @@ -1318,6 +1321,7 @@ fn turn_completion_reconciles_started_items_from_turn_items() { command: "ls".to_string(), cwd: PathBuf::from("/tmp/project"), process_id: Some("123".to_string()), + host_id: None, source: CommandExecutionSource::UserShell, status: ApiCommandExecutionStatus::Completed, command_actions: Vec::::new(), diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index 44693033d81..613cddfa04c 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -3027,6 +3027,10 @@ pub struct ExecCommandBeginEvent { /// Where the command originated. Defaults to Agent for backward compatibility. #[serde(default)] pub source: ExecCommandSource, + /// Remote host id when the command is executed outside the local environment. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub host_id: Option, /// Raw input sent to a unified exec session (if this is an interaction event). #[serde(default, skip_serializing_if = "Option::is_none")] #[ts(optional)] @@ -3051,6 +3055,10 @@ pub struct ExecCommandEndEvent { /// Where the command originated. Defaults to Agent for backward compatibility. #[serde(default)] pub source: ExecCommandSource, + /// Remote host id when the command is executed outside the local environment. + #[serde(default, skip_serializing_if = "Option::is_none")] + #[ts(optional)] + pub host_id: Option, /// Raw input sent to a unified exec session (if this is an interaction event). #[serde(default, skip_serializing_if = "Option::is_none")] #[ts(optional)] diff --git a/codex-rs/tools/src/local_tool.rs b/codex-rs/tools/src/local_tool.rs index 3e369ab1e33..55b6f7f2d93 100644 --- a/codex-rs/tools/src/local_tool.rs +++ b/codex-rs/tools/src/local_tool.rs @@ -35,6 +35,13 @@ pub fn create_exec_command_tool(options: CommandToolOptions) -> ToolSpec { "Shell binary to launch. Defaults to the user's default shell.".to_string(), )), ), + ( + "host_id".to_string(), + JsonSchema::string(Some( + "Optional registered exec host to run the command on; omit for the default host." + .to_string(), + )), + ), ( "tty".to_string(), JsonSchema::boolean(Some( diff --git a/codex-rs/tools/src/local_tool_tests.rs b/codex-rs/tools/src/local_tool_tests.rs index b751545b3ac..3abea0ba4ba 100644 --- a/codex-rs/tools/src/local_tool_tests.rs +++ b/codex-rs/tools/src/local_tool_tests.rs @@ -126,6 +126,13 @@ fn exec_command_tool_matches_expected_spec() { "Shell binary to launch. Defaults to the user's default shell.".to_string(), )), ), + ( + "host_id".to_string(), + JsonSchema::string(Some( + "Optional registered exec host to run the command on; omit for the default host." + .to_string(), + )), + ), ( "tty".to_string(), JsonSchema::boolean(Some( diff --git a/codex-rs/tui/src/app/app_server_adapter.rs b/codex-rs/tui/src/app/app_server_adapter.rs index 5df4c18e54a..190a1f0c375 100644 --- a/codex-rs/tui/src/app/app_server_adapter.rs +++ b/codex-rs/tui/src/app/app_server_adapter.rs @@ -900,6 +900,7 @@ fn command_execution_started_event(turn_id: &str, item: &ThreadItem) -> Option Option Option command, cwd, process_id, + host_id, source, status, command_actions, @@ -982,6 +985,7 @@ fn command_execution_completed_event(turn_id: &str, item: &ThreadItem) -> Option .map(codex_app_server_protocol::CommandAction::into_core) .collect(), source: source.to_core(), + host_id: host_id.clone(), interaction_input: None, stdout: String::new(), stderr: String::new(), @@ -1181,6 +1185,7 @@ mod tests { command: "printf 'hello world\\n'".to_string(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: CommandExecutionSource::UserShell, status: CommandExecutionStatus::InProgress, command_actions: vec![CommandAction::Unknown { @@ -1237,6 +1242,7 @@ mod tests { command: "printf 'hello world\\n'".to_string(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: CommandExecutionSource::UserShell, status: CommandExecutionStatus::Completed, command_actions: vec![CommandAction::Unknown { @@ -1274,6 +1280,7 @@ mod tests { command: r#"C:\Program Files\Git\bin\bash.exe -lc "echo hi""#.to_string(), cwd: PathBuf::from("C:\\repo"), process_id: None, + host_id: None, source: CommandExecutionSource::UserShell, status: CommandExecutionStatus::InProgress, command_actions: vec![], @@ -1322,6 +1329,7 @@ mod tests { command: "printf 'hello world\\n'".to_string(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: CommandExecutionSource::UserShell, status: CommandExecutionStatus::Completed, command_actions: vec![CommandAction::Unknown { diff --git a/codex-rs/tui/src/chatwidget.rs b/codex-rs/tui/src/chatwidget.rs index 74b20328923..c33edceba46 100644 --- a/codex-rs/tui/src/chatwidget.rs +++ b/codex-rs/tui/src/chatwidget.rs @@ -397,6 +397,7 @@ struct RunningCommand { command: Vec, parsed_cmd: Vec, source: ExecCommandSource, + host_id: Option, } struct UnifiedExecProcessSummary { @@ -4319,9 +4320,14 @@ impl ChatWidget { if self.suppressed_exec_calls.remove(&ev.call_id) { return; } - let (command, parsed, source) = match running { - Some(rc) => (rc.command, rc.parsed_cmd, rc.source), - None => (ev.command.clone(), ev.parsed_cmd.clone(), ev.source), + let (command, parsed, source, host_id) = match running { + Some(rc) => (rc.command, rc.parsed_cmd, rc.source, rc.host_id), + None => ( + ev.command.clone(), + ev.parsed_cmd.clone(), + ev.source, + ev.host_id.clone(), + ), }; let parsed = self.annotate_skill_reads_in_parsed_cmd(parsed); let is_unified_exec_interaction = @@ -4382,6 +4388,7 @@ impl ChatWidget { command, parsed, source, + host_id, ev.interaction_input.clone(), self.config.animations, ); @@ -4403,6 +4410,7 @@ impl ChatWidget { command, parsed, source, + host_id, ev.interaction_input.clone(), self.config.animations, ); @@ -4550,6 +4558,7 @@ impl ChatWidget { command: ev.command.clone(), parsed_cmd: parsed_cmd.clone(), source: ev.source, + host_id: ev.host_id.clone(), }, ); let is_wait_interaction = matches!(ev.source, ExecCommandSource::UnifiedExecInteraction) @@ -4583,6 +4592,7 @@ impl ChatWidget { ev.command.clone(), parsed_cmd.clone(), ev.source, + ev.host_id.clone(), interaction_input.clone(), ) { @@ -4596,6 +4606,7 @@ impl ChatWidget { ev.command.clone(), parsed_cmd, ev.source, + ev.host_id, interaction_input, self.config.animations, ))); @@ -6184,6 +6195,7 @@ impl ChatWidget { command, cwd, process_id, + host_id, source, status, command_actions, @@ -6206,6 +6218,7 @@ impl ChatWidget { .map(codex_app_server_protocol::CommandAction::into_core) .collect(), source: source.to_core(), + host_id, interaction_input: None, }); } else { @@ -6221,6 +6234,7 @@ impl ChatWidget { .map(codex_app_server_protocol::CommandAction::into_core) .collect(), source: source.to_core(), + host_id, interaction_input: None, stdout: String::new(), stderr: String::new(), @@ -6777,6 +6791,7 @@ impl ChatWidget { command, cwd, process_id, + host_id, source, command_actions, .. @@ -6792,6 +6807,7 @@ impl ChatWidget { .map(codex_app_server_protocol::CommandAction::into_core) .collect(), source: source.to_core(), + host_id, interaction_input: None, }); } diff --git a/codex-rs/tui/src/chatwidget/tests/app_server.rs b/codex-rs/tui/src/chatwidget/tests/app_server.rs index 085e4976d49..825ae5062f9 100644 --- a/codex-rs/tui/src/chatwidget/tests/app_server.rs +++ b/codex-rs/tui/src/chatwidget/tests/app_server.rs @@ -193,6 +193,7 @@ async fn live_app_server_command_execution_strips_shell_wrapper() { command: command.clone(), cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: AppServerCommandExecutionSource::UserShell, status: AppServerCommandExecutionStatus::InProgress, command_actions: vec![AppServerCommandAction::Unknown { @@ -214,6 +215,7 @@ async fn live_app_server_command_execution_strips_shell_wrapper() { command, cwd: PathBuf::from("/tmp"), process_id: None, + host_id: None, source: AppServerCommandExecutionSource::UserShell, status: AppServerCommandExecutionStatus::Completed, command_actions: vec![AppServerCommandAction::Unknown { @@ -240,6 +242,51 @@ async fn live_app_server_command_execution_strips_shell_wrapper() { ); } +#[tokio::test] +async fn live_app_server_command_execution_shows_remote_host() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; + chat.on_task_started(); + + let script = "echo remote"; + let command = + shlex::try_join(["/bin/zsh", "-lc", script]).expect("round-trippable shell wrapper"); + + chat.handle_server_notification( + ServerNotification::ItemCompleted(ItemCompletedNotification { + thread_id: "thread-1".to_string(), + turn_id: "turn-1".to_string(), + item: AppServerThreadItem::CommandExecution { + id: "cmd-remote".to_string(), + command, + cwd: PathBuf::from("/Users/nlieb"), + process_id: Some("proc-remote".to_string()), + host_id: Some("mac".to_string()), + source: AppServerCommandExecutionSource::UnifiedExecStartup, + status: AppServerCommandExecutionStatus::Completed, + command_actions: vec![AppServerCommandAction::Unknown { + command: script.to_string(), + }], + aggregated_output: Some("remote\n".to_string()), + exit_code: Some(0), + duration_ms: Some(5), + }, + }), + /*replay_kind*/ None, + ); + + let cells = drain_insert_history(&mut rx); + assert_eq!( + cells.len(), + 1, + "expected one completed command history cell" + ); + let blob = lines_to_single_string(cells.first().expect("command cell")); + assert!( + blob.contains("• Ran on mac echo remote"), + "expected remote host in app-server command row: {blob:?}" + ); +} + #[test] fn app_server_patch_changes_to_core_preserves_diffs() { let changes = app_server_patch_changes_to_core(vec![FileUpdateChange { diff --git a/codex-rs/tui/src/chatwidget/tests/exec_flow.rs b/codex-rs/tui/src/chatwidget/tests/exec_flow.rs index d271fc0076c..d7fe16057f1 100644 --- a/codex-rs/tui/src/chatwidget/tests/exec_flow.rs +++ b/codex-rs/tui/src/chatwidget/tests/exec_flow.rs @@ -346,6 +346,82 @@ async fn exec_history_cell_shows_working_then_failed() { assert!(blob.to_lowercase().contains("bloop"), "expected error text"); } +#[tokio::test] +async fn exec_history_cell_shows_remote_host_while_running() { + let (mut chat, _rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; + + let command = vec![ + "bash".to_string(), + "-lc".to_string(), + "printf remote".to_string(), + ]; + let parsed_cmd = vec![ParsedCommand::Unknown { + cmd: "printf remote".to_string(), + }]; + chat.handle_codex_event(Event { + id: "call-remote-running".to_string(), + msg: EventMsg::ExecCommandBegin(ExecCommandBeginEvent { + call_id: "call-remote-running".to_string(), + process_id: Some("proc-remote".to_string()), + turn_id: "turn-1".to_string(), + command, + cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")), + parsed_cmd, + source: ExecCommandSource::Agent, + host_id: Some("remote-host".to_string()), + interaction_input: None, + }), + }); + + let blob = active_blob(&chat); + assert!( + blob.contains("• Running on remote-host"), + "expected remote host in running header: {blob:?}" + ); +} + +#[tokio::test] +async fn exec_history_cell_shows_remote_host_after_completion() { + let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; + chat.on_task_started(); + + let command = vec![ + "bash".to_string(), + "-lc".to_string(), + "echo remote".to_string(), + ]; + let parsed_cmd = codex_shell_command::parse_command::parse_command(&command); + chat.handle_codex_event(Event { + id: "call-remote-complete".to_string(), + msg: EventMsg::ExecCommandEnd(ExecCommandEndEvent { + call_id: "call-remote-complete".to_string(), + process_id: Some("proc-remote".to_string()), + turn_id: "turn-1".to_string(), + command, + cwd: std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")), + parsed_cmd, + source: ExecCommandSource::UnifiedExecStartup, + host_id: Some("remote-host".to_string()), + interaction_input: None, + stdout: "remote\n".to_string(), + stderr: String::new(), + aggregated_output: "remote\n".to_string(), + exit_code: 0, + duration: std::time::Duration::from_millis(5), + formatted_output: "remote\n".to_string(), + status: CoreExecCommandStatus::Completed, + }), + }); + + let cells = drain_insert_history(&mut rx); + assert_eq!(cells.len(), 1, "expected finalized remote exec cell"); + let blob = lines_to_single_string(&cells[0]); + assert!( + blob.contains("• Ran on remote-host echo remote"), + "expected remote host in completed header: {blob:?}" + ); +} + #[tokio::test] async fn exec_end_without_begin_uses_event_command() { let (mut chat, mut rx, _op_rx) = make_chatwidget_manual(/*model_override*/ None).await; @@ -366,6 +442,7 @@ async fn exec_end_without_begin_uses_event_command() { cwd, parsed_cmd, source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: "done".to_string(), stderr: String::new(), diff --git a/codex-rs/tui/src/chatwidget/tests/helpers.rs b/codex-rs/tui/src/chatwidget/tests/helpers.rs index f5241fac5d5..110f5ac988c 100644 --- a/codex-rs/tui/src/chatwidget/tests/helpers.rs +++ b/codex-rs/tui/src/chatwidget/tests/helpers.rs @@ -493,6 +493,7 @@ pub(super) fn begin_exec_with_source( cwd, parsed_cmd, source, + host_id: None, interaction_input, }; chat.handle_codex_event(Event { @@ -518,6 +519,7 @@ pub(super) fn begin_unified_exec_startup( cwd, parsed_cmd: Vec::new(), source: ExecCommandSource::UnifiedExecStartup, + host_id: None, interaction_input: None, }; chat.handle_codex_event(Event { @@ -632,6 +634,7 @@ pub(super) fn end_exec( cwd, parsed_cmd, source, + host_id, interaction_input, process_id, } = begin_event; @@ -645,6 +648,7 @@ pub(super) fn end_exec( cwd, parsed_cmd, source, + host_id, interaction_input, stdout: stdout.to_string(), stderr: stderr.to_string(), diff --git a/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs b/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs index 50de7064cc5..973c9e6054a 100644 --- a/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs +++ b/codex-rs/tui/src/chatwidget/tests/status_and_layout.rs @@ -1566,6 +1566,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { cwd: cwd.clone(), parsed_cmd: parsed_cmd.clone(), source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, }), }); @@ -1579,6 +1580,7 @@ async fn chatwidget_exec_and_status_layout_vt100_snapshot() { cwd, parsed_cmd, source: ExecCommandSource::Agent, + host_id: None, interaction_input: None, stdout: String::new(), stderr: String::new(), diff --git a/codex-rs/tui/src/exec_cell/model.rs b/codex-rs/tui/src/exec_cell/model.rs index 878d42c711b..3bf6504eb8b 100644 --- a/codex-rs/tui/src/exec_cell/model.rs +++ b/codex-rs/tui/src/exec_cell/model.rs @@ -27,6 +27,7 @@ pub(crate) struct ExecCall { pub(crate) parsed: Vec, pub(crate) output: Option, pub(crate) source: ExecCommandSource, + pub(crate) host_id: Option, pub(crate) start_time: Option, pub(crate) duration: Option, pub(crate) interaction_input: Option, @@ -52,6 +53,7 @@ impl ExecCell { command: Vec, parsed: Vec, source: ExecCommandSource, + host_id: Option, interaction_input: Option, ) -> Option { let call = ExecCall { @@ -60,11 +62,18 @@ impl ExecCell { parsed, output: None, source, + host_id, start_time: Some(Instant::now()), duration: None, interaction_input, }; - if self.is_exploring_cell() && Self::is_exploring_call(&call) { + if self.is_exploring_cell() + && Self::is_exploring_call(&call) + && self + .calls + .iter() + .all(|existing| existing.host_id == call.host_id) + { Some(Self { calls: [self.calls.clone(), vec![call]].concat(), animations_enabled: self.animations_enabled, diff --git a/codex-rs/tui/src/exec_cell/render.rs b/codex-rs/tui/src/exec_cell/render.rs index 423217a6f66..d48f25cd37a 100644 --- a/codex-rs/tui/src/exec_cell/render.rs +++ b/codex-rs/tui/src/exec_cell/render.rs @@ -43,6 +43,7 @@ pub(crate) fn new_active_exec_command( command: Vec, parsed: Vec, source: ExecCommandSource, + host_id: Option, interaction_input: Option, animations_enabled: bool, ) -> ExecCell { @@ -53,6 +54,7 @@ pub(crate) fn new_active_exec_command( parsed, output: None, source, + host_id, start_time: Some(Instant::now()), duration: None, interaction_input, @@ -61,6 +63,13 @@ pub(crate) fn new_active_exec_command( ) } +fn remote_host_title_suffix(host_id: Option<&str>) -> String { + host_id + .filter(|host_id| !host_id.trim().is_empty()) + .map(|host_id| format!(" on {host_id}")) + .unwrap_or_default() +} + fn format_unified_exec_interaction(command: &[String], input: Option<&str>) -> String { let command_display = if let Some((_, script)) = extract_bash_command(command) { script.to_string() @@ -261,6 +270,8 @@ impl ExecCell { fn exploring_display_lines(&self, width: u16) -> Vec> { let mut out: Vec> = Vec::new(); + let host_suffix = + remote_host_title_suffix(self.calls.first().and_then(|call| call.host_id.as_deref())); out.push(Line::from(vec![ if self.is_active() { spinner(self.active_start_time(), self.animations_enabled()) @@ -269,9 +280,9 @@ impl ExecCell { }, " ".into(), if self.is_active() { - "Exploring".bold() + format!("Exploring{host_suffix}").bold() } else { - "Explored".bold() + format!("Explored{host_suffix}").bold() }, ])); @@ -375,13 +386,16 @@ impl ExecCell { }; let is_interaction = call.is_unified_exec_interaction(); let title = if is_interaction { - "" + String::new() } else if self.is_active() { - "Running" + format!( + "Running{}", + remote_host_title_suffix(call.host_id.as_deref()) + ) } else if call.is_user_shell_command() { - "You ran" + "You ran".to_string() } else { - "Ran" + format!("Ran{}", remote_host_title_suffix(call.host_id.as_deref())) }; let mut header_line = if is_interaction { @@ -784,6 +798,7 @@ mod tests { parsed: Vec::new(), output: Some(output), source: ExecCommandSource::UserShell, + host_id: None, start_time: None, duration: None, interaction_input: None, @@ -933,6 +948,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::UserShell, + host_id: None, start_time: None, duration: None, interaction_input: None, @@ -970,6 +986,7 @@ mod tests { }], output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: None, duration: None, interaction_input: None, @@ -1011,6 +1028,7 @@ mod tests { aggregated_output: url.to_string(), }), source: ExecCommandSource::UserShell, + host_id: None, start_time: None, duration: None, interaction_input: None, @@ -1048,6 +1066,7 @@ mod tests { aggregated_output: url.to_string(), }), source: ExecCommandSource::Agent, + host_id: None, start_time: None, duration: None, interaction_input: None, diff --git a/codex-rs/tui/src/history_cell.rs b/codex-rs/tui/src/history_cell.rs index 987005db02e..0a1afee0601 100644 --- a/codex-rs/tui/src/history_cell.rs +++ b/codex-rs/tui/src/history_cell.rs @@ -3959,6 +3959,7 @@ mod tests { ], output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -3986,6 +3987,7 @@ mod tests { }], output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4005,6 +4007,7 @@ mod tests { path: "shimmer.rs".into(), }], ExecCommandSource::Agent, + /*host_id*/ None, /*interaction_input*/ None, ) .unwrap(); @@ -4020,6 +4023,7 @@ mod tests { path: "status_indicator_widget.rs".into(), }], ExecCommandSource::Agent, + /*host_id*/ None, /*interaction_input*/ None, ) .unwrap(); @@ -4055,6 +4059,7 @@ mod tests { ], output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4079,6 +4084,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4105,6 +4111,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4129,6 +4136,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4152,6 +4160,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4176,6 +4185,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4200,6 +4210,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, @@ -4250,6 +4261,7 @@ mod tests { parsed: Vec::new(), output: None, source: ExecCommandSource::Agent, + host_id: None, start_time: Some(Instant::now()), duration: None, interaction_input: None, diff --git a/codex-rs/tui/src/pager_overlay.rs b/codex-rs/tui/src/pager_overlay.rs index bca5f1f360a..c39590c083e 100644 --- a/codex-rs/tui/src/pager_overlay.rs +++ b/codex-rs/tui/src/pager_overlay.rs @@ -1012,6 +1012,7 @@ mod tests { vec!["bash".into(), "-lc".into(), "ls".into()], vec![ParsedCommand::Unknown { cmd: "ls".into() }], ExecCommandSource::Agent, + /*host_id*/ None, /*interaction_input*/ None, /*animations_enabled*/ true, );