diff --git a/codex-rs/app-server/tests/common/mcp_process.rs b/codex-rs/app-server/tests/common/mcp_process.rs index bdc96cad0a..29768df81c 100644 --- a/codex-rs/app-server/tests/common/mcp_process.rs +++ b/codex-rs/app-server/tests/common/mcp_process.rs @@ -1,3 +1,4 @@ +use std::collections::VecDeque; use std::path::Path; use std::process::Stdio; use std::sync::atomic::AtomicI64; @@ -47,6 +48,7 @@ pub struct McpProcess { process: Child, stdin: ChildStdin, stdout: BufReader, + pending_user_messages: VecDeque, } impl McpProcess { @@ -117,6 +119,7 @@ impl McpProcess { process, stdin, stdout, + pending_user_messages: VecDeque::new(), }) } @@ -375,8 +378,9 @@ impl McpProcess { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(_) => { - eprintln!("notification: {message:?}"); + JSONRPCMessage::Notification(notification) => { + eprintln!("notification: {notification:?}"); + self.enqueue_user_message(notification); } JSONRPCMessage::Request(jsonrpc_request) => { return jsonrpc_request.try_into().with_context( @@ -402,8 +406,9 @@ impl McpProcess { loop { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(_) => { - eprintln!("notification: {message:?}"); + JSONRPCMessage::Notification(notification) => { + eprintln!("notification: {notification:?}"); + self.enqueue_user_message(notification); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -427,8 +432,9 @@ impl McpProcess { loop { let message = self.read_jsonrpc_message().await?; match message { - JSONRPCMessage::Notification(_) => { - eprintln!("notification: {message:?}"); + JSONRPCMessage::Notification(notification) => { + eprintln!("notification: {notification:?}"); + self.enqueue_user_message(notification); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -451,6 +457,10 @@ impl McpProcess { ) -> anyhow::Result { eprintln!("in read_stream_until_notification_message({method})"); + if let Some(notification) = self.take_pending_notification_by_method(method) { + return Ok(notification); + } + loop { let message = self.read_jsonrpc_message().await?; match message { @@ -458,6 +468,7 @@ impl McpProcess { if notification.method == method { return Ok(notification); } + self.enqueue_user_message(notification); } JSONRPCMessage::Request(_) => { anyhow::bail!("unexpected JSONRPCMessage::Request: {message:?}"); @@ -471,4 +482,21 @@ impl McpProcess { } } } + + fn take_pending_notification_by_method(&mut self, method: &str) -> Option { + if let Some(pos) = self + .pending_user_messages + .iter() + .position(|notification| notification.method == method) + { + return self.pending_user_messages.remove(pos); + } + None + } + + fn enqueue_user_message(&mut self, notification: JSONRPCNotification) { + if notification.method == "codex/event/user_message" { + self.pending_user_messages.push_back(notification); + } + } } diff --git a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs index f1f34f952f..4dff2a1575 100644 --- a/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs +++ b/codex-rs/app-server/tests/suite/codex_message_processor_flow.rs @@ -8,6 +8,7 @@ use app_test_support::to_response; use codex_app_server_protocol::AddConversationListenerParams; use codex_app_server_protocol::AddConversationSubscriptionResponse; use codex_app_server_protocol::ExecCommandApprovalParams; +use codex_app_server_protocol::InputItem; use codex_app_server_protocol::JSONRPCNotification; use codex_app_server_protocol::JSONRPCResponse; use codex_app_server_protocol::NewConversationParams; @@ -25,6 +26,10 @@ use codex_core::protocol::SandboxPolicy; use codex_core::protocol_config_types::ReasoningEffort; use codex_core::protocol_config_types::ReasoningSummary; use codex_core::spawn::CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR; +use codex_protocol::config_types::SandboxMode; +use codex_protocol::protocol::Event; +use codex_protocol::protocol::EventMsg; +use codex_protocol::protocol::InputMessageKind; use pretty_assertions::assert_eq; use std::env; use tempfile::TempDir; @@ -367,6 +372,234 @@ async fn test_send_user_turn_changes_approval_policy_behavior() { } // Helper: minimal config.toml pointing at mock provider. + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_send_user_turn_updates_sandbox_and_cwd_between_turns() { + if env::var(CODEX_SANDBOX_NETWORK_DISABLED_ENV_VAR).is_ok() { + println!( + "Skipping test because it cannot execute when network is disabled in a Codex sandbox." + ); + return; + } + + let tmp = TempDir::new().expect("tmp dir"); + let codex_home = tmp.path().join("codex_home"); + std::fs::create_dir(&codex_home).expect("create codex home dir"); + let workspace_root = tmp.path().join("workspace"); + std::fs::create_dir(&workspace_root).expect("create workspace root"); + let first_cwd = workspace_root.join("turn1"); + let second_cwd = workspace_root.join("turn2"); + std::fs::create_dir(&first_cwd).expect("create first cwd"); + std::fs::create_dir(&second_cwd).expect("create second cwd"); + + let responses = vec![ + create_shell_sse_response( + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo first turn".to_string(), + ], + None, + Some(5000), + "call-first", + ) + .expect("create first shell response"), + create_final_assistant_message_sse_response("done first") + .expect("create first final assistant message"), + create_shell_sse_response( + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo second turn".to_string(), + ], + None, + Some(5000), + "call-second", + ) + .expect("create second shell response"), + create_final_assistant_message_sse_response("done second") + .expect("create second final assistant message"), + ]; + let server = create_mock_chat_completions_server(responses).await; + create_config_toml(&codex_home, &server.uri()).expect("write config"); + + let mut mcp = McpProcess::new(&codex_home) + .await + .expect("spawn mcp process"); + timeout(DEFAULT_READ_TIMEOUT, mcp.initialize()) + .await + .expect("init timeout") + .expect("init failed"); + + let new_conv_id = mcp + .send_new_conversation_request(NewConversationParams { + cwd: Some(first_cwd.to_string_lossy().into_owned()), + approval_policy: Some(AskForApproval::Never), + sandbox: Some(SandboxMode::WorkspaceWrite), + ..Default::default() + }) + .await + .expect("send newConversation"); + let new_conv_resp: JSONRPCResponse = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(new_conv_id)), + ) + .await + .expect("newConversation timeout") + .expect("newConversation resp"); + let NewConversationResponse { + conversation_id, + model, + .. + } = to_response::(new_conv_resp) + .expect("deserialize newConversation response"); + + let add_listener_id = mcp + .send_add_conversation_listener_request(AddConversationListenerParams { conversation_id }) + .await + .expect("send addConversationListener"); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(add_listener_id)), + ) + .await + .expect("addConversationListener timeout") + .expect("addConversationListener resp"); + + let first_turn_id = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![InputItem::Text { + text: "first turn".to_string(), + }], + cwd: first_cwd.clone(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::WorkspaceWrite { + writable_roots: vec![first_cwd.clone()], + network_access: false, + exclude_tmpdir_env_var: false, + exclude_slash_tmp: false, + }, + model: model.clone(), + effort: Some(ReasoningEffort::Medium), + summary: ReasoningSummary::Auto, + }) + .await + .expect("send first sendUserTurn"); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(first_turn_id)), + ) + .await + .expect("sendUserTurn 1 timeout") + .expect("sendUserTurn 1 resp"); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await + .expect("task_complete 1 timeout") + .expect("task_complete 1 notification"); + + let second_turn_id = mcp + .send_send_user_turn_request(SendUserTurnParams { + conversation_id, + items: vec![InputItem::Text { + text: "second turn".to_string(), + }], + cwd: second_cwd.clone(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: model.clone(), + effort: Some(ReasoningEffort::Medium), + summary: ReasoningSummary::Auto, + }) + .await + .expect("send second sendUserTurn"); + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(second_turn_id)), + ) + .await + .expect("sendUserTurn 2 timeout") + .expect("sendUserTurn 2 resp"); + + let mut env_message: Option = None; + let second_cwd_str = second_cwd.to_string_lossy().into_owned(); + for _ in 0..10 { + let notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/user_message"), + ) + .await + .expect("user_message timeout") + .expect("user_message notification"); + let params = notification + .params + .clone() + .expect("user_message should include params"); + let event: Event = serde_json::from_value(params).expect("deserialize user_message event"); + if let EventMsg::UserMessage(user) = event.msg + && matches!(user.kind, Some(InputMessageKind::EnvironmentContext)) + && user.message.contains(&second_cwd_str) + { + env_message = Some(user.message); + break; + } + } + let env_message = env_message.expect("expected environment context update"); + assert!( + env_message.contains("danger-full-access"), + "env context should reflect new sandbox mode: {env_message}" + ); + assert!( + env_message.contains("enabled"), + "env context should enable network access for danger-full-access policy: {env_message}" + ); + assert!( + env_message.contains(&second_cwd_str), + "env context should include updated cwd: {env_message}" + ); + + let exec_begin_notification = timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/exec_command_begin"), + ) + .await + .expect("exec_command_begin timeout") + .expect("exec_command_begin notification"); + let params = exec_begin_notification + .params + .clone() + .expect("exec_command_begin params"); + let event: Event = serde_json::from_value(params).expect("deserialize exec begin event"); + let exec_begin = match event.msg { + EventMsg::ExecCommandBegin(exec_begin) => exec_begin, + other => panic!("expected ExecCommandBegin event, got {other:?}"), + }; + assert_eq!( + exec_begin.cwd, second_cwd, + "exec turn should run from updated cwd" + ); + assert_eq!( + exec_begin.command, + vec![ + "bash".to_string(), + "-lc".to_string(), + "echo second turn".to_string() + ], + "exec turn should run expected command" + ); + + timeout( + DEFAULT_READ_TIMEOUT, + mcp.read_stream_until_notification_message("codex/event/task_complete"), + ) + .await + .expect("task_complete 2 timeout") + .expect("task_complete 2 notification"); +} + fn create_config_toml(codex_home: &Path, server_uri: &str) -> std::io::Result<()> { let config_toml = codex_home.join("config.toml"); std::fs::write( diff --git a/codex-rs/core/src/apply_patch.rs b/codex-rs/core/src/apply_patch.rs index 836b859633..5b6728ad0f 100644 --- a/codex-rs/core/src/apply_patch.rs +++ b/codex-rs/core/src/apply_patch.rs @@ -27,6 +27,7 @@ pub(crate) enum InternalApplyPatchInvocation { DelegateToExec(ApplyPatchExec), } +#[derive(Debug)] pub(crate) struct ApplyPatchExec { pub(crate) action: ApplyPatchAction, pub(crate) user_explicitly_approved_this_action: bool, @@ -109,3 +110,28 @@ pub(crate) fn convert_apply_patch_to_protocol( } result } + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + use tempfile::tempdir; + + #[test] + fn convert_apply_patch_maps_add_variant() { + let tmp = tempdir().expect("tmp"); + let p = tmp.path().join("a.txt"); + // Create an action with a single Add change + let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); + + let got = convert_apply_patch_to_protocol(&action); + + assert_eq!( + got.get(&p), + Some(&FileChange::Add { + content: "hello".to_string() + }) + ); + } +} diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index 1fbe701884..9f6b02a556 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -1,11 +1,9 @@ use std::borrow::Cow; use std::collections::HashMap; use std::fmt::Debug; -use std::path::Path; use std::path::PathBuf; use std::sync::Arc; use std::sync::atomic::AtomicU64; -use std::time::Duration; use crate::AuthManager; use crate::client_common::REVIEW_PROMPT; @@ -44,7 +42,6 @@ use tracing::warn; use crate::ModelProviderInfo; use crate::apply_patch; use crate::apply_patch::ApplyPatchExec; -use crate::apply_patch::CODEX_APPLY_PATCH_ARG1; use crate::apply_patch::InternalApplyPatchInvocation; use crate::apply_patch::convert_apply_patch_to_protocol; use crate::client::ModelClient; @@ -57,19 +54,21 @@ use crate::environment_context::EnvironmentContext; use crate::error::CodexErr; use crate::error::Result as CodexResult; use crate::error::SandboxErr; -use crate::error::get_error_message_ui; use crate::exec::ExecParams; use crate::exec::ExecToolCallOutput; -use crate::exec::SandboxType; use crate::exec::StdoutStream; +#[cfg(test)] use crate::exec::StreamOutput; -use crate::exec::process_exec_tool_call; use crate::exec_command::EXEC_COMMAND_TOOL_NAME; use crate::exec_command::ExecCommandParams; use crate::exec_command::ExecSessionManager; use crate::exec_command::WRITE_STDIN_TOOL_NAME; use crate::exec_command::WriteStdinParams; use crate::exec_env::create_env; +use crate::executor::ExecutionMode; +use crate::executor::Executor; +use crate::executor::ExecutorConfig; +use crate::executor::normalize_exec_result; use crate::mcp_connection_manager::McpConnectionManager; use crate::mcp_tool_call::handle_mcp_tool_call; use crate::model_family::find_family_for_model; @@ -113,9 +112,6 @@ use crate::protocol::TurnDiffEvent; use crate::protocol::WebSearchBeginEvent; use crate::rollout::RolloutRecorder; use crate::rollout::RolloutRecorderParams; -use crate::safety::SafetyCheck; -use crate::safety::assess_command_safety; -use crate::safety::assess_safety_for_untrusted_command; use crate::shell; use crate::state::ActiveTurn; use crate::state::SessionServices; @@ -128,7 +124,6 @@ use crate::user_instructions::UserInstructions; use crate::user_notification::UserNotification; use crate::util::backoff; use codex_otel::otel_event_manager::OtelEventManager; -use codex_otel::otel_event_manager::ToolDecisionSource; use codex_protocol::config_types::ReasoningEffort as ReasoningEffortConfig; use codex_protocol::config_types::ReasoningSummary as ReasoningSummaryConfig; use codex_protocol::custom_prompts::CustomPrompt; @@ -486,9 +481,13 @@ impl Session { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: notify, rollout: Mutex::new(Some(rollout_recorder)), - codex_linux_sandbox_exe: config.codex_linux_sandbox_exe.clone(), user_shell: default_shell, show_raw_agent_reasoning: config.show_raw_agent_reasoning, + executor: Executor::new(ExecutorConfig::new( + turn_context.sandbox_policy.clone(), + turn_context.cwd.clone(), + config.codex_linux_sandbox_exe.clone(), + )), }; let sess = Arc::new(Session { @@ -573,6 +572,11 @@ impl Session { } } + /// Emit an exec approval request event and await the user's decision. + /// + /// The request is keyed by `sub_id`/`call_id` so matching responses are delivered + /// to the correct in-flight turn. If the task is aborted, this returns the + /// default `ReviewDecision` (`Denied`). pub async fn request_command_approval( &self, sub_id: String, @@ -670,11 +674,6 @@ impl Session { } } - pub async fn add_approved_command(&self, cmd: Vec) { - let mut state = self.state.lock().await; - state.add_approved_command(cmd); - } - /// Records input items: always append to conversation history and /// persist these response items to rollout. async fn record_conversation_items(&self, items: &[ResponseItem]) { @@ -832,6 +831,7 @@ impl Session { command_for_display, cwd, apply_patch, + .. } = exec_command_context; let msg = match apply_patch { Some(ApplyPatchCommandContext { @@ -928,45 +928,29 @@ impl Session { /// command even on error. /// /// Returns the output of the exec tool call. - async fn run_exec_with_events<'a>( + async fn run_exec_with_events( &self, turn_diff_tracker: &mut TurnDiffTracker, - begin_ctx: ExecCommandContext, - exec_args: ExecInvokeArgs<'a>, - ) -> crate::error::Result { - let is_apply_patch = begin_ctx.apply_patch.is_some(); - let sub_id = begin_ctx.sub_id.clone(); - let call_id = begin_ctx.call_id.clone(); - - self.on_exec_command_begin(turn_diff_tracker, begin_ctx.clone()) + prepared: PreparedExec, + approval_policy: AskForApproval, + ) -> Result { + let PreparedExec { context, request } = prepared; + let is_apply_patch = context.apply_patch.is_some(); + let sub_id = context.sub_id.clone(); + let call_id = context.call_id.clone(); + + self.on_exec_command_begin(turn_diff_tracker, context.clone()) .await; - let result = process_exec_tool_call( - exec_args.params, - exec_args.sandbox_type, - exec_args.sandbox_policy, - exec_args.sandbox_cwd, - exec_args.codex_linux_sandbox_exe, - exec_args.stdout_stream, - ) - .await; + let result = self + .services + .executor + .run(request, self, approval_policy, &context) + .await; + + let normalized = normalize_exec_result(&result); + let borrowed = normalized.event_output(); - let output_stderr; - let borrowed: &ExecToolCallOutput = match &result { - Ok(output) => output, - Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => output, - Err(e) => { - output_stderr = ExecToolCallOutput { - exit_code: -1, - stdout: StreamOutput::new(String::new()), - stderr: StreamOutput::new(get_error_message_ui(e)), - aggregated_output: StreamOutput::new(get_error_message_ui(e)), - duration: Duration::default(), - timed_out: false, - }; - &output_stderr - } - }; self.on_exec_command_end( turn_diff_tracker, &sub_id, @@ -976,13 +960,15 @@ impl Session { ) .await; + drop(normalized); + result } /// Helper that emits a BackgroundEvent with the given message. This keeps /// the call‑sites terse so adding more diagnostics does not clutter the /// core agent logic. - async fn notify_background_event(&self, sub_id: &str, message: impl Into) { + pub(crate) async fn notify_background_event(&self, sub_id: &str, message: impl Into) { let event = Event { id: sub_id.to_string(), msg: EventMsg::BackgroundEvent(BackgroundEventEvent { @@ -1070,7 +1056,7 @@ impl Session { &self.services.notifier } - fn user_shell(&self) -> &shell::Shell { + pub(crate) fn user_shell(&self) -> &shell::Shell { &self.services.user_shell } @@ -1092,6 +1078,8 @@ pub(crate) struct ExecCommandContext { pub(crate) command_for_display: Vec, pub(crate) cwd: PathBuf, pub(crate) apply_patch: Option, + pub(crate) tool_name: String, + pub(crate) otel_event_manager: OtelEventManager, } #[derive(Clone, Debug)] @@ -1298,8 +1286,19 @@ async fn submission_loop( let previous_env_context = EnvironmentContext::from(turn_context.as_ref()); let new_env_context = EnvironmentContext::from(&fresh_turn_context); if !new_env_context.equals_except_shell(&previous_env_context) { - sess.record_conversation_items(&[ResponseItem::from(new_env_context)]) + let env_response_item = ResponseItem::from(new_env_context); + sess.record_conversation_items(std::slice::from_ref(&env_response_item)) .await; + for msg in map_response_item_to_event_messages( + &env_response_item, + sess.show_raw_agent_reasoning(), + ) { + let event = Event { + id: sub.id.clone(), + msg, + }; + sess.send_event(event).await; + } } // Install the new persistent context for subsequent tasks/turns. @@ -2610,33 +2609,6 @@ fn parse_container_exec_arguments( }) } -pub struct ExecInvokeArgs<'a> { - pub params: ExecParams, - pub sandbox_type: SandboxType, - pub sandbox_policy: &'a SandboxPolicy, - pub sandbox_cwd: &'a Path, - pub codex_linux_sandbox_exe: &'a Option, - pub stdout_stream: Option, -} - -fn maybe_translate_shell_command( - params: ExecParams, - sess: &Session, - turn_context: &TurnContext, -) -> ExecParams { - let should_translate = matches!(sess.user_shell(), crate::shell::Shell::PowerShell(_)) - || turn_context.shell_environment_policy.use_profile; - - if should_translate - && let Some(command) = sess - .user_shell() - .format_default_shell_invocation(params.command.clone()) - { - return ExecParams { command, ..params }; - } - params -} - async fn handle_container_exec_with_params( tool_name: &str, params: ExecParams, @@ -2682,152 +2654,10 @@ async fn handle_container_exec_with_params( MaybeApplyPatchVerified::NotApplyPatch => None, }; - let (params, safety, command_for_display) = match &apply_patch_exec { - Some(ApplyPatchExec { - action: ApplyPatchAction { patch, cwd, .. }, - user_explicitly_approved_this_action, - }) => { - let path_to_codex = std::env::current_exe() - .ok() - .map(|p| p.to_string_lossy().to_string()); - let Some(path_to_codex) = path_to_codex else { - return Err(FunctionCallError::RespondToModel( - "failed to determine path to codex executable".to_string(), - )); - }; - - let params = ExecParams { - command: vec![ - path_to_codex, - CODEX_APPLY_PATCH_ARG1.to_string(), - patch.clone(), - ], - cwd: cwd.clone(), - timeout_ms: params.timeout_ms, - env: HashMap::new(), - with_escalated_permissions: params.with_escalated_permissions, - justification: params.justification.clone(), - }; - let safety = if *user_explicitly_approved_this_action { - SafetyCheck::AutoApprove { - sandbox_type: SandboxType::None, - user_explicitly_approved: true, - } - } else { - assess_safety_for_untrusted_command( - turn_context.approval_policy, - &turn_context.sandbox_policy, - params.with_escalated_permissions.unwrap_or(false), - ) - }; - ( - params, - safety, - vec!["apply_patch".to_string(), patch.clone()], - ) - } - None => { - let safety = { - let state = sess.state.lock().await; - assess_command_safety( - ¶ms.command, - turn_context.approval_policy, - &turn_context.sandbox_policy, - state.approved_commands_ref(), - params.with_escalated_permissions.unwrap_or(false), - ) - }; - let command_for_display = params.command.clone(); - (params, safety, command_for_display) - } - }; - - let sandbox_type = match safety { - SafetyCheck::AutoApprove { - sandbox_type, - user_explicitly_approved, - } => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::Approved, - if user_explicitly_approved { - ToolDecisionSource::User - } else { - ToolDecisionSource::Config - }, - ); - - sandbox_type - } - SafetyCheck::AskUser => { - let decision = sess - .request_command_approval( - sub_id.clone(), - call_id.clone(), - params.command.clone(), - params.cwd.clone(), - params.justification.clone(), - ) - .await; - match decision { - ReviewDecision::Approved => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::Approved, - ToolDecisionSource::User, - ); - } - ReviewDecision::ApprovedForSession => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::ApprovedForSession, - ToolDecisionSource::User, - ); - sess.add_approved_command(params.command.clone()).await; - } - ReviewDecision::Denied => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::Denied, - ToolDecisionSource::User, - ); - return Err(FunctionCallError::RespondToModel( - "exec command rejected by user".to_string(), - )); - } - ReviewDecision::Abort => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::Abort, - ToolDecisionSource::User, - ); - return Err(FunctionCallError::RespondToModel( - "exec command aborted by user".to_string(), - )); - } - } - // No sandboxing is applied because the user has given - // explicit approval. Often, we end up in this case because - // the command cannot be run in a sandbox, such as - // installing a new dependency that requires network access. - SandboxType::None - } - SafetyCheck::Reject { reason } => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - ReviewDecision::Denied, - ToolDecisionSource::Config, - ); - return Err(FunctionCallError::RespondToModel(format!( - "exec command rejected: {reason:?}" - ))); - } + let command_for_display = if let Some(exec) = apply_patch_exec.as_ref() { + vec!["apply_patch".to_string(), exec.action.patch.clone()] + } else { + params.command.clone() }; let exec_command_context = ExecCommandContext { @@ -2835,38 +2665,47 @@ async fn handle_container_exec_with_params( call_id: call_id.clone(), command_for_display: command_for_display.clone(), cwd: params.cwd.clone(), - apply_patch: apply_patch_exec.map( + apply_patch: apply_patch_exec.as_ref().map( |ApplyPatchExec { action, user_explicitly_approved_this_action, }| ApplyPatchCommandContext { - user_explicitly_approved_this_action, - changes: convert_apply_patch_to_protocol(&action), + user_explicitly_approved_this_action: *user_explicitly_approved_this_action, + changes: convert_apply_patch_to_protocol(action), }, ), + tool_name: tool_name.to_string(), + otel_event_manager, }; - let params = maybe_translate_shell_command(params, sess, turn_context); + let mode = match apply_patch_exec { + Some(exec) => ExecutionMode::ApplyPatch(exec), + None => ExecutionMode::Shell, + }; + + sess.services.executor.update_environment( + turn_context.sandbox_policy.clone(), + turn_context.cwd.clone(), + ); + + let prepared_exec = PreparedExec::new( + exec_command_context, + params, + command_for_display, + mode, + Some(StdoutStream { + sub_id: sub_id.clone(), + call_id: call_id.clone(), + tx_event: sess.tx_event.clone(), + }), + turn_context.shell_environment_policy.use_profile, + ); + let output_result = sess .run_exec_with_events( turn_diff_tracker, - exec_command_context.clone(), - ExecInvokeArgs { - params: params.clone(), - sandbox_type, - sandbox_policy: &turn_context.sandbox_policy, - sandbox_cwd: &turn_context.cwd, - codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe, - stdout_stream: if exec_command_context.apply_patch.is_some() { - None - } else { - Some(StdoutStream { - sub_id: sub_id.clone(), - call_id: call_id.clone(), - tx_event: sess.tx_event.clone(), - }) - }, - }, + prepared_exec, + turn_context.approval_policy, ) .await; @@ -2880,154 +2719,16 @@ async fn handle_container_exec_with_params( Err(FunctionCallError::RespondToModel(content)) } } - Err(CodexErr::Sandbox(error)) => { - handle_sandbox_error( - tool_name, - turn_diff_tracker, - params, - exec_command_context, - error, - sandbox_type, - sess, - turn_context, - &otel_event_manager, - ) - .await - } - Err(e) => Err(FunctionCallError::RespondToModel(format!( - "execution error: {e:?}" + Err(ExecError::Function(err)) => Err(err), + Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { output }))) => Err( + FunctionCallError::RespondToModel(format_exec_output(&output)), + ), + Err(ExecError::Codex(err)) => Err(FunctionCallError::RespondToModel(format!( + "execution error: {err:?}" ))), } } -#[allow(clippy::too_many_arguments)] -async fn handle_sandbox_error( - tool_name: &str, - turn_diff_tracker: &mut TurnDiffTracker, - params: ExecParams, - exec_command_context: ExecCommandContext, - error: SandboxErr, - sandbox_type: SandboxType, - sess: &Session, - turn_context: &TurnContext, - otel_event_manager: &OtelEventManager, -) -> Result { - let call_id = exec_command_context.call_id.clone(); - let sub_id = exec_command_context.sub_id.clone(); - let cwd = exec_command_context.cwd.clone(); - - if let SandboxErr::Timeout { output } = &error { - let content = format_exec_output(output); - return Err(FunctionCallError::RespondToModel(content)); - } - - // Early out if either the user never wants to be asked for approval, or - // we're letting the model manage escalation requests. Otherwise, continue - match turn_context.approval_policy { - AskForApproval::Never | AskForApproval::OnRequest => { - return Err(FunctionCallError::RespondToModel(format!( - "failed in sandbox {sandbox_type:?} with execution error: {error:?}" - ))); - } - AskForApproval::UnlessTrusted | AskForApproval::OnFailure => (), - } - - // Note that when `error` is `SandboxErr::Denied`, it could be a false - // positive. That is, it may have exited with a non-zero exit code, not - // because the sandbox denied it, but because that is its expected behavior, - // i.e., a grep command that did not match anything. Ideally we would - // include additional metadata on the command to indicate whether non-zero - // exit codes merit a retry. - - // For now, we categorically ask the user to retry without sandbox and - // emit the raw error as a background event. - sess.notify_background_event(&sub_id, format!("Execution failed: {error}")) - .await; - - let decision = sess - .request_command_approval( - sub_id.clone(), - call_id.clone(), - params.command.clone(), - cwd.clone(), - Some("command failed; retry without sandbox?".to_string()), - ) - .await; - - match decision { - ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { - // Persist this command as pre‑approved for the - // remainder of the session so future - // executions skip the sandbox directly. - // TODO(ragona): Isn't this a bug? It always saves the command in an | fork? - sess.add_approved_command(params.command.clone()).await; - // Inform UI we are retrying without sandbox. - sess.notify_background_event(&sub_id, "retrying command without sandbox") - .await; - - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - decision, - ToolDecisionSource::User, - ); - - // This is an escalated retry; the policy will not be - // examined and the sandbox has been set to `None`. - let retry_output_result = sess - .run_exec_with_events( - turn_diff_tracker, - exec_command_context.clone(), - ExecInvokeArgs { - params, - sandbox_type: SandboxType::None, - sandbox_policy: &turn_context.sandbox_policy, - sandbox_cwd: &turn_context.cwd, - codex_linux_sandbox_exe: &sess.services.codex_linux_sandbox_exe, - stdout_stream: if exec_command_context.apply_patch.is_some() { - None - } else { - Some(StdoutStream { - sub_id: sub_id.clone(), - call_id: call_id.clone(), - tx_event: sess.tx_event.clone(), - }) - }, - }, - ) - .await; - - match retry_output_result { - Ok(retry_output) => { - let ExecToolCallOutput { exit_code, .. } = &retry_output; - let content = format_exec_output(&retry_output); - if *exit_code == 0 { - Ok(content) - } else { - Err(FunctionCallError::RespondToModel(content)) - } - } - Err(e) => Err(FunctionCallError::RespondToModel(format!( - "retry failed: {e}" - ))), - } - } - decision @ (ReviewDecision::Denied | ReviewDecision::Abort) => { - otel_event_manager.tool_decision( - tool_name, - call_id.as_str(), - decision, - ToolDecisionSource::User, - ); - - // Fall through to original failure handling. - Err(FunctionCallError::RespondToModel( - "exec command rejected by user".to_string(), - )) - } - } -} - fn format_exec_output_str(exec_output: &ExecToolCallOutput) -> String { let ExecToolCallOutput { aggregated_output, .. @@ -3286,6 +2987,8 @@ pub(crate) async fn exit_review_mode( .await; } +use crate::executor::errors::ExecError; +use crate::executor::linkers::PreparedExec; #[cfg(test)] pub(crate) use tests::make_session_and_context; @@ -3599,9 +3302,13 @@ mod tests { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: UserNotifier::default(), rollout: Mutex::new(None), - codex_linux_sandbox_exe: None, user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, + executor: Executor::new(ExecutorConfig::new( + turn_context.sandbox_policy.clone(), + turn_context.cwd.clone(), + None, + )), }; let session = Session { conversation_id, @@ -3668,9 +3375,13 @@ mod tests { unified_exec_manager: UnifiedExecSessionManager::default(), notifier: UserNotifier::default(), rollout: Mutex::new(None), - codex_linux_sandbox_exe: None, user_shell: shell::Shell::Unknown, show_raw_agent_reasoning: config.show_raw_agent_reasoning, + executor: Executor::new(ExecutorConfig::new( + config.sandbox_policy.clone(), + config.cwd.clone(), + None, + )), }; let session = Arc::new(Session { conversation_id, diff --git a/codex-rs/core/src/executor/backends.rs b/codex-rs/core/src/executor/backends.rs new file mode 100644 index 0000000000..95cdb3cacb --- /dev/null +++ b/codex-rs/core/src/executor/backends.rs @@ -0,0 +1,101 @@ +use std::collections::HashMap; +use std::env; + +use async_trait::async_trait; + +use crate::CODEX_APPLY_PATCH_ARG1; +use crate::apply_patch::ApplyPatchExec; +use crate::exec::ExecParams; +use crate::function_tool::FunctionCallError; + +pub(crate) enum ExecutionMode { + Shell, + ApplyPatch(ApplyPatchExec), +} + +#[async_trait] +/// Backend-specific hooks that prepare and post-process execution requests for a +/// given [`ExecutionMode`]. +pub(crate) trait ExecutionBackend: Send + Sync { + fn prepare( + &self, + params: ExecParams, + // Required for downcasting the apply_patch. + mode: &ExecutionMode, + ) -> Result; + + fn stream_stdout(&self, _mode: &ExecutionMode) -> bool { + true + } +} + +static SHELL_BACKEND: ShellBackend = ShellBackend; +static APPLY_PATCH_BACKEND: ApplyPatchBackend = ApplyPatchBackend; + +pub(crate) fn backend_for_mode(mode: &ExecutionMode) -> &'static dyn ExecutionBackend { + match mode { + ExecutionMode::Shell => &SHELL_BACKEND, + ExecutionMode::ApplyPatch(_) => &APPLY_PATCH_BACKEND, + } +} + +struct ShellBackend; + +#[async_trait] +impl ExecutionBackend for ShellBackend { + fn prepare( + &self, + params: ExecParams, + mode: &ExecutionMode, + ) -> Result { + match mode { + ExecutionMode::Shell => Ok(params), + _ => Err(FunctionCallError::RespondToModel( + "shell backend invoked with non-shell mode".to_string(), + )), + } + } +} + +struct ApplyPatchBackend; + +#[async_trait] +impl ExecutionBackend for ApplyPatchBackend { + fn prepare( + &self, + params: ExecParams, + mode: &ExecutionMode, + ) -> Result { + match mode { + ExecutionMode::ApplyPatch(exec) => { + let path_to_codex = env::current_exe() + .ok() + .map(|p| p.to_string_lossy().to_string()) + .ok_or_else(|| { + FunctionCallError::RespondToModel( + "failed to determine path to codex executable".to_string(), + ) + })?; + + let patch = exec.action.patch.clone(); + Ok(ExecParams { + command: vec![path_to_codex, CODEX_APPLY_PATCH_ARG1.to_string(), patch], + cwd: exec.action.cwd.clone(), + timeout_ms: params.timeout_ms, + // Run apply_patch with a minimal environment for determinism and to + // avoid leaking host environment variables into the patch process. + env: HashMap::new(), + with_escalated_permissions: params.with_escalated_permissions, + justification: params.justification, + }) + } + ExecutionMode::Shell => Err(FunctionCallError::RespondToModel( + "apply_patch backend invoked without patch context".to_string(), + )), + } + } + + fn stream_stdout(&self, _mode: &ExecutionMode) -> bool { + false + } +} diff --git a/codex-rs/core/src/executor/cache.rs b/codex-rs/core/src/executor/cache.rs new file mode 100644 index 0000000000..737ecb927c --- /dev/null +++ b/codex-rs/core/src/executor/cache.rs @@ -0,0 +1,51 @@ +use std::collections::HashSet; +use std::sync::Arc; +use std::sync::Mutex; + +#[derive(Clone, Debug, Default)] +/// Thread-safe store of user approvals so repeated commands can reuse +/// previously granted trust. +pub(crate) struct ApprovalCache { + inner: Arc>>>, +} + +impl ApprovalCache { + pub(crate) fn insert(&self, command: Vec) { + if command.is_empty() { + return; + } + if let Ok(mut guard) = self.inner.lock() { + guard.insert(command); + } + } + + pub(crate) fn snapshot(&self) -> HashSet> { + self.inner.lock().map(|g| g.clone()).unwrap_or_default() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use pretty_assertions::assert_eq; + + #[test] + fn insert_ignores_empty_and_dedupes() { + let cache = ApprovalCache::default(); + + // Empty should be ignored + cache.insert(vec![]); + assert!(cache.snapshot().is_empty()); + + // Insert a command and verify snapshot contains it + let cmd = vec!["foo".to_string(), "bar".to_string()]; + cache.insert(cmd.clone()); + let snap1 = cache.snapshot(); + assert!(snap1.contains(&cmd)); + + // Reinserting should not create duplicates + cache.insert(cmd); + let snap2 = cache.snapshot(); + assert_eq!(snap1, snap2); + } +} diff --git a/codex-rs/core/src/executor/mod.rs b/codex-rs/core/src/executor/mod.rs new file mode 100644 index 0000000000..a5a305c604 --- /dev/null +++ b/codex-rs/core/src/executor/mod.rs @@ -0,0 +1,64 @@ +mod backends; +mod cache; +mod runner; +mod sandbox; + +pub(crate) use backends::ExecutionMode; +pub(crate) use runner::ExecutionRequest; +pub(crate) use runner::Executor; +pub(crate) use runner::ExecutorConfig; +pub(crate) use runner::normalize_exec_result; + +pub(crate) mod linkers { + use crate::codex::ExecCommandContext; + use crate::exec::ExecParams; + use crate::exec::StdoutStream; + use crate::executor::backends::ExecutionMode; + use crate::executor::runner::ExecutionRequest; + + pub struct PreparedExec { + pub(crate) context: ExecCommandContext, + pub(crate) request: ExecutionRequest, + } + + impl PreparedExec { + pub fn new( + context: ExecCommandContext, + params: ExecParams, + approval_command: Vec, + mode: ExecutionMode, + stdout_stream: Option, + use_shell_profile: bool, + ) -> Self { + let request = ExecutionRequest { + params, + approval_command, + mode, + stdout_stream, + use_shell_profile, + }; + + Self { context, request } + } + } +} + +pub mod errors { + use crate::error::CodexErr; + use crate::function_tool::FunctionCallError; + use thiserror::Error; + + #[derive(Debug, Error)] + pub enum ExecError { + #[error(transparent)] + Function(#[from] FunctionCallError), + #[error(transparent)] + Codex(#[from] CodexErr), + } + + impl ExecError { + pub(crate) fn rejection(msg: impl Into) -> Self { + FunctionCallError::RespondToModel(msg.into()).into() + } + } +} diff --git a/codex-rs/core/src/executor/runner.rs b/codex-rs/core/src/executor/runner.rs new file mode 100644 index 0000000000..befa83360d --- /dev/null +++ b/codex-rs/core/src/executor/runner.rs @@ -0,0 +1,387 @@ +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::RwLock; +use std::time::Duration; + +use super::backends::ExecutionMode; +use super::backends::backend_for_mode; +use super::cache::ApprovalCache; +use crate::codex::ExecCommandContext; +use crate::codex::Session; +use crate::error::CodexErr; +use crate::error::SandboxErr; +use crate::error::get_error_message_ui; +use crate::exec::ExecParams; +use crate::exec::ExecToolCallOutput; +use crate::exec::SandboxType; +use crate::exec::StdoutStream; +use crate::exec::StreamOutput; +use crate::exec::process_exec_tool_call; +use crate::executor::errors::ExecError; +use crate::executor::sandbox::select_sandbox; +use crate::function_tool::FunctionCallError; +use crate::protocol::AskForApproval; +use crate::protocol::ReviewDecision; +use crate::protocol::SandboxPolicy; +use crate::shell; +use codex_otel::otel_event_manager::ToolDecisionSource; + +#[derive(Clone, Debug)] +pub(crate) struct ExecutorConfig { + pub(crate) sandbox_policy: SandboxPolicy, + pub(crate) sandbox_cwd: PathBuf, + codex_linux_sandbox_exe: Option, +} + +impl ExecutorConfig { + pub(crate) fn new( + sandbox_policy: SandboxPolicy, + sandbox_cwd: PathBuf, + codex_linux_sandbox_exe: Option, + ) -> Self { + Self { + sandbox_policy, + sandbox_cwd, + codex_linux_sandbox_exe, + } + } +} + +/// Coordinates sandbox selection, backend-specific preparation, and command +/// execution for tool calls requested by the model. +pub(crate) struct Executor { + approval_cache: ApprovalCache, + config: Arc>, +} + +impl Executor { + pub(crate) fn new(config: ExecutorConfig) -> Self { + Self { + approval_cache: ApprovalCache::default(), + config: Arc::new(RwLock::new(config)), + } + } + + /// Updates the sandbox policy and working directory used for future + /// executions without recreating the executor. + pub(crate) fn update_environment(&self, sandbox_policy: SandboxPolicy, sandbox_cwd: PathBuf) { + if let Ok(mut cfg) = self.config.write() { + cfg.sandbox_policy = sandbox_policy; + cfg.sandbox_cwd = sandbox_cwd; + } + } + + /// Runs a prepared execution request end-to-end: prepares parameters, decides on + /// sandbox placement (prompting the user when necessary), launches the command, + /// and lets the backend post-process the final output. + pub(crate) async fn run( + &self, + mut request: ExecutionRequest, + session: &Session, + approval_policy: AskForApproval, + context: &ExecCommandContext, + ) -> Result { + if matches!(request.mode, ExecutionMode::Shell) { + request.params = + maybe_translate_shell_command(request.params, session, request.use_shell_profile); + } + + // Step 1: Normalise parameters via the selected backend. + let backend = backend_for_mode(&request.mode); + let stdout_stream = if backend.stream_stdout(&request.mode) { + request.stdout_stream.clone() + } else { + None + }; + request.params = backend + .prepare(request.params, &request.mode) + .map_err(ExecError::from)?; + + // Step 2: Snapshot sandbox configuration so it stays stable for this run. + let config = self + .config + .read() + .map_err(|_| ExecError::rejection("executor config poisoned"))? + .clone(); + + // Step 3: Decide sandbox placement, prompting for approval when needed. + let sandbox_decision = select_sandbox( + &request, + approval_policy, + self.approval_cache.snapshot(), + &config, + session, + &context.sub_id, + &context.call_id, + &context.otel_event_manager, + ) + .await?; + if sandbox_decision.record_session_approval { + self.approval_cache.insert(request.approval_command.clone()); + } + + // Step 4: Launch the command within the chosen sandbox. + let first_attempt = self + .spawn( + request.params.clone(), + sandbox_decision.initial_sandbox, + &config, + stdout_stream.clone(), + ) + .await; + + // Step 5: Handle sandbox outcomes, optionally escalating to an unsandboxed retry. + match first_attempt { + Ok(output) => Ok(output), + Err(CodexErr::Sandbox(SandboxErr::Timeout { output })) => { + Err(CodexErr::Sandbox(SandboxErr::Timeout { output }).into()) + } + Err(CodexErr::Sandbox(error)) => { + if sandbox_decision.escalate_on_failure { + self.retry_without_sandbox( + &request, + &config, + session, + context, + stdout_stream, + error, + ) + .await + } else { + Err(ExecError::rejection(format!( + "failed in sandbox {:?} with execution error: {error:?}", + sandbox_decision.initial_sandbox + ))) + } + } + Err(err) => Err(err.into()), + } + } + + /// Fallback path invoked when a sandboxed run is denied so the user can + /// approve rerunning without isolation. + async fn retry_without_sandbox( + &self, + request: &ExecutionRequest, + config: &ExecutorConfig, + session: &Session, + context: &ExecCommandContext, + stdout_stream: Option, + sandbox_error: SandboxErr, + ) -> Result { + session + .notify_background_event( + &context.sub_id, + format!("Execution failed: {sandbox_error}"), + ) + .await; + let decision = session + .request_command_approval( + context.sub_id.to_string(), + context.call_id.to_string(), + request.approval_command.clone(), + request.params.cwd.clone(), + Some("command failed; retry without sandbox?".to_string()), + ) + .await; + + context.otel_event_manager.tool_decision( + &context.tool_name, + &context.call_id, + decision, + ToolDecisionSource::User, + ); + match decision { + ReviewDecision::Approved | ReviewDecision::ApprovedForSession => { + if matches!(decision, ReviewDecision::ApprovedForSession) { + self.approval_cache.insert(request.approval_command.clone()); + } + session + .notify_background_event(&context.sub_id, "retrying command without sandbox") + .await; + + let retry_output = self + .spawn( + request.params.clone(), + SandboxType::None, + config, + stdout_stream, + ) + .await?; + + Ok(retry_output) + } + ReviewDecision::Denied | ReviewDecision::Abort => { + Err(ExecError::rejection("exec command rejected by user")) + } + } + } + + async fn spawn( + &self, + params: ExecParams, + sandbox: SandboxType, + config: &ExecutorConfig, + stdout_stream: Option, + ) -> Result { + process_exec_tool_call( + params, + sandbox, + &config.sandbox_policy, + &config.sandbox_cwd, + &config.codex_linux_sandbox_exe, + stdout_stream, + ) + .await + } +} + +fn maybe_translate_shell_command( + params: ExecParams, + session: &Session, + use_shell_profile: bool, +) -> ExecParams { + let should_translate = + matches!(session.user_shell(), shell::Shell::PowerShell(_)) || use_shell_profile; + + if should_translate + && let Some(command) = session + .user_shell() + .format_default_shell_invocation(params.command.clone()) + { + return ExecParams { command, ..params }; + } + + params +} + +pub(crate) struct ExecutionRequest { + pub params: ExecParams, + pub approval_command: Vec, + pub mode: ExecutionMode, + pub stdout_stream: Option, + pub use_shell_profile: bool, +} + +pub(crate) struct NormalizedExecOutput<'a> { + borrowed: Option<&'a ExecToolCallOutput>, + synthetic: Option, +} + +impl<'a> NormalizedExecOutput<'a> { + pub(crate) fn event_output(&'a self) -> &'a ExecToolCallOutput { + match (self.borrowed, self.synthetic.as_ref()) { + (Some(output), _) => output, + (None, Some(output)) => output, + (None, None) => unreachable!("normalized exec output missing data"), + } + } +} + +/// Converts a raw execution result into a uniform view that always exposes an +/// [`ExecToolCallOutput`], synthesizing error output when the command fails +/// before producing a response. +pub(crate) fn normalize_exec_result( + result: &Result, +) -> NormalizedExecOutput<'_> { + match result { + Ok(output) => NormalizedExecOutput { + borrowed: Some(output), + synthetic: None, + }, + Err(ExecError::Codex(CodexErr::Sandbox(SandboxErr::Timeout { output }))) => { + NormalizedExecOutput { + borrowed: Some(output.as_ref()), + synthetic: None, + } + } + Err(err) => { + let message = match err { + ExecError::Function(FunctionCallError::RespondToModel(msg)) => msg.clone(), + ExecError::Codex(e) => get_error_message_ui(e), + }; + let synthetic = ExecToolCallOutput { + exit_code: -1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(message.clone()), + aggregated_output: StreamOutput::new(message), + duration: Duration::default(), + timed_out: false, + }; + NormalizedExecOutput { + borrowed: None, + synthetic: Some(synthetic), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::error::CodexErr; + use crate::error::EnvVarError; + use crate::error::SandboxErr; + use crate::exec::StreamOutput; + use pretty_assertions::assert_eq; + + fn make_output(text: &str) -> ExecToolCallOutput { + ExecToolCallOutput { + exit_code: 1, + stdout: StreamOutput::new(String::new()), + stderr: StreamOutput::new(String::new()), + aggregated_output: StreamOutput::new(text.to_string()), + duration: Duration::from_millis(123), + timed_out: false, + } + } + + #[test] + fn normalize_success_borrows() { + let out = make_output("ok"); + let result: Result = Ok(out); + let normalized = normalize_exec_result(&result); + assert_eq!(normalized.event_output().aggregated_output.text, "ok"); + } + + #[test] + fn normalize_timeout_borrows_embedded_output() { + let out = make_output("timed out payload"); + let err = CodexErr::Sandbox(SandboxErr::Timeout { + output: Box::new(out), + }); + let result: Result = Err(ExecError::Codex(err)); + let normalized = normalize_exec_result(&result); + assert_eq!( + normalized.event_output().aggregated_output.text, + "timed out payload" + ); + } + + #[test] + fn normalize_function_error_synthesizes_payload() { + let err = FunctionCallError::RespondToModel("boom".to_string()); + let result: Result = Err(ExecError::Function(err)); + let normalized = normalize_exec_result(&result); + assert_eq!(normalized.event_output().aggregated_output.text, "boom"); + } + + #[test] + fn normalize_codex_error_synthesizes_user_message() { + // Use a simple EnvVar error which formats to a clear message + let e = CodexErr::EnvVar(EnvVarError { + var: "FOO".to_string(), + instructions: Some("set it".to_string()), + }); + let result: Result = Err(ExecError::Codex(e)); + let normalized = normalize_exec_result(&result); + assert!( + normalized + .event_output() + .aggregated_output + .text + .contains("Missing environment variable: `FOO`"), + "expected synthesized user-friendly message" + ); + } +} diff --git a/codex-rs/core/src/executor/sandbox.rs b/codex-rs/core/src/executor/sandbox.rs new file mode 100644 index 0000000000..5c01ff69b4 --- /dev/null +++ b/codex-rs/core/src/executor/sandbox.rs @@ -0,0 +1,405 @@ +use crate::apply_patch::ApplyPatchExec; +use crate::codex::Session; +use crate::exec::SandboxType; +use crate::executor::ExecutionMode; +use crate::executor::ExecutionRequest; +use crate::executor::ExecutorConfig; +use crate::executor::errors::ExecError; +use crate::safety::SafetyCheck; +use crate::safety::assess_command_safety; +use crate::safety::assess_patch_safety; +use codex_otel::otel_event_manager::OtelEventManager; +use codex_otel::otel_event_manager::ToolDecisionSource; +use codex_protocol::protocol::AskForApproval; +use codex_protocol::protocol::ReviewDecision; +use std::collections::HashSet; + +/// Sandbox placement options selected for an execution run, including whether +/// to escalate after failures and whether approvals should persist. +pub(crate) struct SandboxDecision { + pub(crate) initial_sandbox: SandboxType, + pub(crate) escalate_on_failure: bool, + pub(crate) record_session_approval: bool, +} + +impl SandboxDecision { + fn auto(sandbox: SandboxType, escalate_on_failure: bool) -> Self { + Self { + initial_sandbox: sandbox, + escalate_on_failure, + record_session_approval: false, + } + } + + fn user_override(record_session_approval: bool) -> Self { + Self { + initial_sandbox: SandboxType::None, + escalate_on_failure: false, + record_session_approval, + } + } +} + +fn should_escalate_on_failure(approval: AskForApproval, sandbox: SandboxType) -> bool { + matches!( + (approval, sandbox), + ( + AskForApproval::UnlessTrusted | AskForApproval::OnFailure, + SandboxType::MacosSeatbelt | SandboxType::LinuxSeccomp + ) + ) +} + +/// Determines how a command should be sandboxed, prompting the user when +/// policy requires explicit approval. +#[allow(clippy::too_many_arguments)] +pub async fn select_sandbox( + request: &ExecutionRequest, + approval_policy: AskForApproval, + approval_cache: HashSet>, + config: &ExecutorConfig, + session: &Session, + sub_id: &str, + call_id: &str, + otel_event_manager: &OtelEventManager, +) -> Result { + match &request.mode { + ExecutionMode::Shell => { + select_shell_sandbox( + request, + approval_policy, + approval_cache, + config, + session, + sub_id, + call_id, + otel_event_manager, + ) + .await + } + ExecutionMode::ApplyPatch(exec) => { + select_apply_patch_sandbox(exec, approval_policy, config) + } + } +} + +#[allow(clippy::too_many_arguments)] +async fn select_shell_sandbox( + request: &ExecutionRequest, + approval_policy: AskForApproval, + approved_snapshot: HashSet>, + config: &ExecutorConfig, + session: &Session, + sub_id: &str, + call_id: &str, + otel_event_manager: &OtelEventManager, +) -> Result { + let command_for_safety = if request.approval_command.is_empty() { + request.params.command.clone() + } else { + request.approval_command.clone() + }; + + let safety = assess_command_safety( + &command_for_safety, + approval_policy, + &config.sandbox_policy, + &approved_snapshot, + request.params.with_escalated_permissions.unwrap_or(false), + ); + + match safety { + SafetyCheck::AutoApprove { + sandbox_type, + user_explicitly_approved, + } => { + let mut decision = SandboxDecision::auto( + sandbox_type, + should_escalate_on_failure(approval_policy, sandbox_type), + ); + if user_explicitly_approved { + decision.record_session_approval = true; + } + let (decision_for_event, source) = if user_explicitly_approved { + (ReviewDecision::ApprovedForSession, ToolDecisionSource::User) + } else { + (ReviewDecision::Approved, ToolDecisionSource::Config) + }; + otel_event_manager.tool_decision("local_shell", call_id, decision_for_event, source); + Ok(decision) + } + SafetyCheck::AskUser => { + let decision = session + .request_command_approval( + sub_id.to_string(), + call_id.to_string(), + request.approval_command.clone(), + request.params.cwd.clone(), + request.params.justification.clone(), + ) + .await; + + otel_event_manager.tool_decision( + "local_shell", + call_id, + decision, + ToolDecisionSource::User, + ); + match decision { + ReviewDecision::Approved => Ok(SandboxDecision::user_override(false)), + ReviewDecision::ApprovedForSession => Ok(SandboxDecision::user_override(true)), + ReviewDecision::Denied | ReviewDecision::Abort => { + Err(ExecError::rejection("exec command rejected by user")) + } + } + } + SafetyCheck::Reject { reason } => Err(ExecError::rejection(format!( + "exec command rejected: {reason}" + ))), + } +} + +fn select_apply_patch_sandbox( + exec: &ApplyPatchExec, + approval_policy: AskForApproval, + config: &ExecutorConfig, +) -> Result { + if exec.user_explicitly_approved_this_action { + return Ok(SandboxDecision::user_override(false)); + } + + match assess_patch_safety( + &exec.action, + approval_policy, + &config.sandbox_policy, + &config.sandbox_cwd, + ) { + SafetyCheck::AutoApprove { sandbox_type, .. } => Ok(SandboxDecision::auto( + sandbox_type, + should_escalate_on_failure(approval_policy, sandbox_type), + )), + SafetyCheck::AskUser => Err(ExecError::rejection( + "patch requires approval but none was recorded", + )), + SafetyCheck::Reject { reason } => { + Err(ExecError::rejection(format!("patch rejected: {reason}"))) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::codex::make_session_and_context; + use crate::exec::ExecParams; + use crate::function_tool::FunctionCallError; + use crate::protocol::SandboxPolicy; + use codex_apply_patch::ApplyPatchAction; + use pretty_assertions::assert_eq; + + #[tokio::test] + async fn select_apply_patch_user_override_when_explicit() { + let (session, ctx) = make_session_and_context(); + let tmp = tempfile::tempdir().expect("tmp"); + let p = tmp.path().join("a.txt"); + let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); + let exec = ApplyPatchExec { + action, + user_explicitly_approved_this_action: true, + }; + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let request = ExecutionRequest { + params: ExecParams { + command: vec!["apply_patch".into()], + cwd: std::env::temp_dir(), + timeout_ms: None, + env: std::collections::HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command: vec!["apply_patch".into()], + mode: ExecutionMode::ApplyPatch(exec), + stdout_stream: None, + use_shell_profile: false, + }; + let otel_event_manager = ctx.client.get_otel_event_manager(); + let decision = select_sandbox( + &request, + AskForApproval::OnRequest, + Default::default(), + &cfg, + &session, + "sub", + "call", + &otel_event_manager, + ) + .await + .expect("ok"); + // Explicit user override runs without sandbox + assert_eq!(decision.initial_sandbox, SandboxType::None); + assert_eq!(decision.escalate_on_failure, false); + } + + #[tokio::test] + async fn select_apply_patch_autoapprove_in_danger() { + let (session, ctx) = make_session_and_context(); + let tmp = tempfile::tempdir().expect("tmp"); + let p = tmp.path().join("a.txt"); + let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); + let exec = ApplyPatchExec { + action, + user_explicitly_approved_this_action: false, + }; + let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); + let request = ExecutionRequest { + params: ExecParams { + command: vec!["apply_patch".into()], + cwd: std::env::temp_dir(), + timeout_ms: None, + env: std::collections::HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command: vec!["apply_patch".into()], + mode: ExecutionMode::ApplyPatch(exec), + stdout_stream: None, + use_shell_profile: false, + }; + let otel_event_manager = ctx.client.get_otel_event_manager(); + let decision = select_sandbox( + &request, + AskForApproval::OnRequest, + Default::default(), + &cfg, + &session, + "sub", + "call", + &otel_event_manager, + ) + .await + .expect("ok"); + // On platforms with a sandbox, DangerFullAccess still prefers it + let expected = crate::safety::get_platform_sandbox().unwrap_or(SandboxType::None); + assert_eq!(decision.initial_sandbox, expected); + assert_eq!(decision.escalate_on_failure, false); + } + + #[tokio::test] + async fn select_apply_patch_requires_approval_on_unless_trusted() { + let (session, ctx) = make_session_and_context(); + let tempdir = tempfile::tempdir().expect("tmpdir"); + let p = tempdir.path().join("a.txt"); + let action = ApplyPatchAction::new_add_for_test(&p, "hello".to_string()); + let exec = ApplyPatchExec { + action, + user_explicitly_approved_this_action: false, + }; + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let request = ExecutionRequest { + params: ExecParams { + command: vec!["apply_patch".into()], + cwd: std::env::temp_dir(), + timeout_ms: None, + env: std::collections::HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command: vec!["apply_patch".into()], + mode: ExecutionMode::ApplyPatch(exec), + stdout_stream: None, + use_shell_profile: false, + }; + let otel_event_manager = ctx.client.get_otel_event_manager(); + let result = select_sandbox( + &request, + AskForApproval::UnlessTrusted, + Default::default(), + &cfg, + &session, + "sub", + "call", + &otel_event_manager, + ) + .await; + match result { + Ok(_) => panic!("expected error"), + Err(ExecError::Function(FunctionCallError::RespondToModel(msg))) => { + assert!(msg.contains("requires approval")) + } + Err(other) => panic!("unexpected error: {other:?}"), + } + } + + #[tokio::test] + async fn select_shell_autoapprove_in_danger_mode() { + let (session, ctx) = make_session_and_context(); + let cfg = ExecutorConfig::new(SandboxPolicy::DangerFullAccess, std::env::temp_dir(), None); + let request = ExecutionRequest { + params: ExecParams { + command: vec!["some-unknown".into()], + cwd: std::env::temp_dir(), + timeout_ms: None, + env: std::collections::HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command: vec!["some-unknown".into()], + mode: ExecutionMode::Shell, + stdout_stream: None, + use_shell_profile: false, + }; + let otel_event_manager = ctx.client.get_otel_event_manager(); + let decision = select_sandbox( + &request, + AskForApproval::OnRequest, + Default::default(), + &cfg, + &session, + "sub", + "call", + &otel_event_manager, + ) + .await + .expect("ok"); + assert_eq!(decision.initial_sandbox, SandboxType::None); + assert_eq!(decision.escalate_on_failure, false); + } + + #[cfg(any(target_os = "macos", target_os = "linux"))] + #[tokio::test] + async fn select_shell_escalates_on_failure_with_platform_sandbox() { + let (session, ctx) = make_session_and_context(); + let cfg = ExecutorConfig::new(SandboxPolicy::ReadOnly, std::env::temp_dir(), None); + let request = ExecutionRequest { + params: ExecParams { + // Unknown command => untrusted but not flagged dangerous + command: vec!["some-unknown".into()], + cwd: std::env::temp_dir(), + timeout_ms: None, + env: std::collections::HashMap::new(), + with_escalated_permissions: None, + justification: None, + }, + approval_command: vec!["some-unknown".into()], + mode: ExecutionMode::Shell, + stdout_stream: None, + use_shell_profile: false, + }; + let otel_event_manager = ctx.client.get_otel_event_manager(); + let decision = select_sandbox( + &request, + AskForApproval::OnFailure, + Default::default(), + &cfg, + &session, + "sub", + "call", + &otel_event_manager, + ) + .await + .expect("ok"); + // On macOS/Linux we should have a platform sandbox and escalate on failure + assert_ne!(decision.initial_sandbox, SandboxType::None); + assert_eq!(decision.escalate_on_failure, true); + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index 7d087a0da2..4c7dfdcb3c 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -27,6 +27,7 @@ pub mod error; pub mod exec; mod exec_command; pub mod exec_env; +pub mod executor; mod flags; pub mod git_info; pub mod landlock; diff --git a/codex-rs/core/src/safety.rs b/codex-rs/core/src/safety.rs index b976ae4a4c..0ed0f929ff 100644 --- a/codex-rs/core/src/safety.rs +++ b/codex-rs/core/src/safety.rs @@ -125,9 +125,10 @@ pub fn assess_command_safety( // the session _because_ they know it needs to run outside a sandbox. if is_known_safe_command(command) || approved.contains(command) { + let user_explicitly_approved = approved.contains(command); return SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, - user_explicitly_approved: false, + user_explicitly_approved, }; } @@ -380,7 +381,7 @@ mod tests { safety_check, SafetyCheck::AutoApprove { sandbox_type: SandboxType::None, - user_explicitly_approved: false, + user_explicitly_approved: true, } ); } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index a67b9dda93..994352eddf 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -1,9 +1,9 @@ use crate::RolloutRecorder; use crate::exec_command::ExecSessionManager; +use crate::executor::Executor; use crate::mcp_connection_manager::McpConnectionManager; use crate::unified_exec::UnifiedExecSessionManager; use crate::user_notification::UserNotifier; -use std::path::PathBuf; use tokio::sync::Mutex; pub(crate) struct SessionServices { @@ -12,7 +12,7 @@ pub(crate) struct SessionServices { pub(crate) unified_exec_manager: UnifiedExecSessionManager, pub(crate) notifier: UserNotifier, pub(crate) rollout: Mutex>, - pub(crate) codex_linux_sandbox_exe: Option, pub(crate) user_shell: crate::shell::Shell, pub(crate) show_raw_agent_reasoning: bool, + pub(crate) executor: Executor, } diff --git a/codex-rs/core/src/state/session.rs b/codex-rs/core/src/state/session.rs index ee0c5fc976..f170a10c18 100644 --- a/codex-rs/core/src/state/session.rs +++ b/codex-rs/core/src/state/session.rs @@ -1,7 +1,5 @@ //! Session-wide mutable state. -use std::collections::HashSet; - use codex_protocol::models::ResponseItem; use crate::conversation_history::ConversationHistory; @@ -12,7 +10,6 @@ use crate::protocol::TokenUsageInfo; /// Persistent, session-scoped state previously stored directly on `Session`. #[derive(Default)] pub(crate) struct SessionState { - pub(crate) approved_commands: HashSet>, pub(crate) history: ConversationHistory, pub(crate) token_info: Option, pub(crate) latest_rate_limits: Option, @@ -44,15 +41,6 @@ impl SessionState { self.history.replace(items); } - // Approved command helpers - pub(crate) fn add_approved_command(&mut self, cmd: Vec) { - self.approved_commands.insert(cmd); - } - - pub(crate) fn approved_commands_ref(&self) -> &HashSet> { - &self.approved_commands - } - // Token/rate limit helpers pub(crate) fn update_token_info_from_usage( &mut self, diff --git a/codex-rs/core/tests/suite/seatbelt.rs b/codex-rs/core/tests/suite/seatbelt.rs index 78f599d42e..a879d3e952 100644 --- a/codex-rs/core/tests/suite/seatbelt.rs +++ b/codex-rs/core/tests/suite/seatbelt.rs @@ -169,6 +169,12 @@ async fn python_getpwuid_works_under_seatbelt() { return; } + // For local dev. + if which::which("python3").is_err() { + eprintln!("python3 not found in PATH, skipping test."); + return; + } + // ReadOnly is sufficient here since we are only exercising user lookup. let policy = SandboxPolicy::ReadOnly; let command_cwd = std::env::current_dir().expect("getcwd");