Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions codex-rs/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

37 changes: 35 additions & 2 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,7 @@ use codex_login::default_client::set_default_client_residency_requirement;
use codex_login::login_with_api_key;
use codex_login::request_device_code;
use codex_login::run_login_server;
use codex_mcp::McpRuntimeEnvironment;
use codex_mcp::McpServerStatusSnapshot;
use codex_mcp::McpSnapshotDetail;
use codex_mcp::collect_mcp_server_status_snapshot_with_detail;
Expand Down Expand Up @@ -5648,10 +5649,40 @@ impl CodexMessageProcessor {
.to_mcp_config(self.thread_manager.plugins_manager().as_ref())
.await;
let auth = self.auth_manager.auth().await;
let runtime_environment = match self.thread_manager.environment_manager().current().await {
Ok(Some(environment)) => {
// Status listing has no turn cwd. This fallback is used only
// by executor-backed stdio MCPs whose config omits `cwd`.
McpRuntimeEnvironment::new(environment, config.cwd.to_path_buf())
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, do we always start mcps in cwd?

}
Ok(None) => McpRuntimeEnvironment::new(
Arc::new(codex_exec_server::Environment::default()),
config.cwd.to_path_buf(),
),
Err(err) => {
// TODO(aibrahim): Investigate degrading MCP status listing when
// executor environment creation fails.
let error = JSONRPCErrorError {
code: INTERNAL_ERROR_CODE,
message: format!("failed to create environment: {err}"),
data: None,
};
self.outgoing.send_error(request, error).await;
return;
Comment on lines +5662 to +5671
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Degrade MCP status listing when env creation fails

list_mcp_server_status now returns a top-level internal error when environment_manager.current() fails. That aborts status listing for all servers, including local/HTTP servers that do not need executor-backed stdio, turning a recoverable environment issue into complete API failure.

Useful? React with 👍 / 👎.

}
};

tokio::spawn(async move {
Self::list_mcp_server_status_task(outgoing, request, params, config, mcp_config, auth)
.await;
Self::list_mcp_server_status_task(
outgoing,
request,
params,
config,
mcp_config,
auth,
runtime_environment,
)
.await;
});
}

Expand All @@ -5662,6 +5693,7 @@ impl CodexMessageProcessor {
config: Config,
mcp_config: codex_mcp::McpConfig,
auth: Option<CodexAuth>,
runtime_environment: McpRuntimeEnvironment,
) {
let detail = match params.detail.unwrap_or(McpServerStatusDetail::Full) {
McpServerStatusDetail::Full => McpSnapshotDetail::Full,
Expand All @@ -5672,6 +5704,7 @@ impl CodexMessageProcessor {
&mcp_config,
auth.as_ref(),
request_id.request_id.to_string(),
runtime_environment,
detail,
)
.await;
Expand Down
2 changes: 1 addition & 1 deletion codex-rs/cli/tests/mcp_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn list_and_get_render_expected_output() -> Result<()> {
.expect("docs server should exist after add");
match &mut docs_entry.transport {
McpServerTransportConfig::Stdio { env_vars, .. } => {
*env_vars = vec!["APP_TOKEN".to_string(), "WORKSPACE_ID".to_string()];
*env_vars = vec!["APP_TOKEN".into(), "WORKSPACE_ID".into()];
}
other => panic!("unexpected transport: {other:?}"),
}
Expand Down
1 change: 1 addition & 0 deletions codex-rs/codex-mcp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ anyhow = { workspace = true }
async-channel = { workspace = true }
codex-async-utils = { workspace = true }
codex-config = { workspace = true }
codex-exec-server = { workspace = true }
codex-login = { workspace = true }
codex-otel = { workspace = true }
codex-plugin = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions codex-rs/codex-mcp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub use mcp_connection_manager::CodexAppsToolsCacheKey;
pub use mcp_connection_manager::DEFAULT_STARTUP_TIMEOUT;
pub use mcp_connection_manager::MCP_SANDBOX_STATE_META_CAPABILITY;
pub use mcp_connection_manager::McpConnectionManager;
pub use mcp_connection_manager::McpRuntimeEnvironment;
pub use mcp_connection_manager::SandboxState;
pub use mcp_connection_manager::ToolInfo;
pub use mcp_connection_manager::codex_apps_tools_cache_key;
Expand Down
26 changes: 23 additions & 3 deletions codex-rs/codex-mcp/src/mcp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use codex_protocol::protocol::SandboxPolicy;
use serde_json::Value;

use crate::mcp_connection_manager::McpConnectionManager;
use crate::mcp_connection_manager::McpRuntimeEnvironment;
use crate::mcp_connection_manager::codex_apps_tools_cache_key;
pub type McpManager = McpConnectionManager;

Expand Down Expand Up @@ -321,14 +322,23 @@ pub async fn collect_mcp_snapshot(
config: &McpConfig,
auth: Option<&CodexAuth>,
submit_id: String,
runtime_environment: McpRuntimeEnvironment,
) -> McpListToolsResponseEvent {
collect_mcp_snapshot_with_detail(config, auth, submit_id, McpSnapshotDetail::Full).await
collect_mcp_snapshot_with_detail(
config,
auth,
submit_id,
runtime_environment,
McpSnapshotDetail::Full,
)
.await
}

pub async fn collect_mcp_snapshot_with_detail(
config: &McpConfig,
auth: Option<&CodexAuth>,
submit_id: String,
runtime_environment: McpRuntimeEnvironment,
detail: McpSnapshotDetail,
) -> McpListToolsResponseEvent {
let mcp_servers = effective_mcp_servers(config, auth);
Expand Down Expand Up @@ -356,6 +366,7 @@ pub async fn collect_mcp_snapshot_with_detail(
submit_id,
tx_event,
SandboxPolicy::new_read_only_policy(),
runtime_environment,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
Expand Down Expand Up @@ -386,15 +397,23 @@ pub async fn collect_mcp_server_status_snapshot(
config: &McpConfig,
auth: Option<&CodexAuth>,
submit_id: String,
runtime_environment: McpRuntimeEnvironment,
) -> McpServerStatusSnapshot {
collect_mcp_server_status_snapshot_with_detail(config, auth, submit_id, McpSnapshotDetail::Full)
.await
collect_mcp_server_status_snapshot_with_detail(
config,
auth,
submit_id,
runtime_environment,
McpSnapshotDetail::Full,
)
.await
}

pub async fn collect_mcp_server_status_snapshot_with_detail(
config: &McpConfig,
auth: Option<&CodexAuth>,
submit_id: String,
runtime_environment: McpRuntimeEnvironment,
detail: McpSnapshotDetail,
) -> McpServerStatusSnapshot {
let mcp_servers = effective_mcp_servers(config, auth);
Expand Down Expand Up @@ -422,6 +441,7 @@ pub async fn collect_mcp_server_status_snapshot_with_detail(
submit_id,
tx_event,
SandboxPolicy::new_read_only_policy(),
runtime_environment,
config.codex_home.clone(),
codex_apps_tools_cache_key(auth),
tool_plugin_provenance,
Expand Down
101 changes: 97 additions & 4 deletions codex-rs/codex-mcp/src/mcp_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use codex_async_utils::CancelErr;
use codex_async_utils::OrCancelExt;
use codex_config::Constrained;
use codex_config::types::OAuthCredentialsStoreMode;
use codex_exec_server::Environment;
use codex_protocol::ToolName;
use codex_protocol::approvals::ElicitationRequest;
use codex_protocol::approvals::ElicitationRequestEvent;
Expand All @@ -50,6 +51,7 @@ use codex_protocol::protocol::McpStartupStatus;
use codex_protocol::protocol::McpStartupUpdateEvent;
use codex_protocol::protocol::SandboxPolicy;
use codex_rmcp_client::ElicitationResponse;
use codex_rmcp_client::ExecutorStdioServerLauncher;
use codex_rmcp_client::LocalStdioServerLauncher;
use codex_rmcp_client::RmcpClient;
use codex_rmcp_client::SendElicitation;
Expand Down Expand Up @@ -493,6 +495,7 @@ impl AsyncManagedClient {
elicitation_requests: ElicitationRequestManager,
codex_apps_tools_cache_context: Option<CodexAppsToolsCacheContext>,
tool_plugin_provenance: Arc<ToolPluginProvenance>,
runtime_environment: McpRuntimeEnvironment,
) -> Self {
let tool_filter = ToolFilter::from_config(&config);
let startup_snapshot = load_startup_cached_codex_apps_tools_snapshot(
Expand All @@ -509,8 +512,15 @@ impl AsyncManagedClient {
return Err(error.into());
}

let client =
Arc::new(make_rmcp_client(&server_name, config.transport, store_mode).await?);
let client = Arc::new(
make_rmcp_client(
&server_name,
config.clone(),
store_mode,
runtime_environment,
)
.await?,
);
match start_server_task(
server_name,
client,
Expand Down Expand Up @@ -650,6 +660,37 @@ pub struct McpConnectionManager {
elicitation_requests: ElicitationRequestManager,
}

/// Runtime placement information used when starting MCP server transports.
///
/// `McpConfig` describes what servers exist. This value describes where those
/// servers should run for the current caller. Keep it explicit at manager
/// construction time so status/snapshot paths and real sessions make the same
/// local-vs-remote decision. `fallback_cwd` is not a per-server override; it is
/// used only when an executor-backed stdio server omits `cwd` and the executor
/// API still needs a concrete process working directory.
#[derive(Clone)]
pub struct McpRuntimeEnvironment {
environment: Arc<Environment>,
fallback_cwd: PathBuf,
}

impl McpRuntimeEnvironment {
pub fn new(environment: Arc<Environment>, fallback_cwd: PathBuf) -> Self {
Self {
environment,
fallback_cwd,
}
}

fn environment(&self) -> Arc<Environment> {
Arc::clone(&self.environment)
}

fn fallback_cwd(&self) -> PathBuf {
self.fallback_cwd.clone()
}
}

impl McpConnectionManager {
pub fn configured_servers(&self, config: &McpConfig) -> HashMap<String, McpServerConfig> {
configured_mcp_servers(config)
Expand Down Expand Up @@ -710,6 +751,7 @@ impl McpConnectionManager {
submit_id: String,
tx_event: Sender<Event>,
initial_sandbox_policy: SandboxPolicy,
runtime_environment: McpRuntimeEnvironment,
codex_home: PathBuf,
codex_apps_tools_cache_key: CodexAppsToolsCacheKey,
tool_plugin_provenance: ToolPluginProvenance,
Expand Down Expand Up @@ -754,6 +796,7 @@ impl McpConnectionManager {
elicitation_requests.clone(),
codex_apps_tools_cache_context,
Arc::clone(&tool_plugin_provenance),
runtime_environment.clone(),
);
clients.insert(server_name.clone(), async_managed_client.clone());
let tx_event = tx_event.clone();
Expand Down Expand Up @@ -1484,9 +1527,25 @@ struct StartServerTaskParams {

async fn make_rmcp_client(
server_name: &str,
transport: McpServerTransportConfig,
config: McpServerConfig,
store_mode: OAuthCredentialsStoreMode,
runtime_environment: McpRuntimeEnvironment,
) -> Result<RmcpClient, StartupOutcomeError> {
let McpServerConfig {
transport,
experimental_environment,
..
} = config;
let remote_environment = match experimental_environment.as_deref() {
None | Some("local") => false,
Some("remote") => true,
Some(environment) => {
return Err(StartupOutcomeError::from(anyhow!(
"unsupported experimental_environment `{environment}` for MCP server `{server_name}`"
)));
}
};

match transport {
McpServerTransportConfig::Stdio {
command,
Expand All @@ -1502,7 +1561,24 @@ async fn make_rmcp_client(
.map(|(key, value)| (key.into(), value.into()))
.collect::<HashMap<_, _>>()
});
let launcher = Arc::new(LocalStdioServerLauncher) as Arc<dyn StdioServerLauncher>;
let launcher = if remote_environment {
let exec_environment = runtime_environment.environment();
if !exec_environment.is_remote() {
return Err(StartupOutcomeError::from(anyhow!(
"remote MCP server `{server_name}` requires a remote executor environment"
)));
}
Arc::new(ExecutorStdioServerLauncher::new(
exec_environment.get_exec_backend(),
runtime_environment.fallback_cwd(),
))
} else {
Arc::new(LocalStdioServerLauncher) as Arc<dyn StdioServerLauncher>
};

// `RmcpClient` always sees a launched MCP stdio server. The
// launcher hides whether that means a local child process or an
// executor process whose stdin/stdout bytes cross the process API.
RmcpClient::new_stdio_client(command_os, args_os, env_os, &env_vars, cwd, launcher)
.await
.map_err(|err| StartupOutcomeError::from(anyhow!(err)))
Expand All @@ -1513,6 +1589,23 @@ async fn make_rmcp_client(
env_http_headers,
bearer_token_env_var,
} => {
if remote_environment {
if !runtime_environment.environment().is_remote() {
return Err(StartupOutcomeError::from(anyhow!(
"remote MCP server `{server_name}` requires a remote executor environment"
)));
}
return Err(StartupOutcomeError::from(anyhow!(
// Remote HTTP needs the future low-level executor
// `network/request` API so reqwest runs on the executor side.
// Do not fall back to local HTTP here; the config explicitly
// asked for remote placement.
"remote streamable HTTP MCP server `{server_name}` is not implemented yet"
)));
}

// Local streamable HTTP remains the existing reqwest path from
// the orchestrator process.
let resolved_bearer_token =
match resolve_bearer_token(server_name, bearer_token_env_var.as_deref()) {
Ok(token) => token,
Expand Down
1 change: 1 addition & 0 deletions codex-rs/config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub use mcp_edit::load_global_mcp_servers;
pub use mcp_types::AppToolApproval;
pub use mcp_types::McpServerConfig;
pub use mcp_types::McpServerDisabledReason;
pub use mcp_types::McpServerEnvVar;
pub use mcp_types::McpServerToolConfig;
pub use mcp_types::McpServerTransportConfig;
pub use mcp_types::RawMcpServerConfig;
Expand Down
Loading
Loading