diff --git a/codex-rs/core/src/agent_identity.rs b/codex-rs/core/src/agent_identity.rs index e2ee5e6b3119..3a4c6787f21a 100644 --- a/codex-rs/core/src/agent_identity.rs +++ b/codex-rs/core/src/agent_identity.rs @@ -22,7 +22,7 @@ use rand::TryRngCore; use rand::rngs::OsRng; use serde::Deserialize; use serde::Serialize; -use tokio::sync::Mutex; +use tokio::sync::Semaphore; use tracing::debug; use tracing::info; use tracing::warn; @@ -42,7 +42,7 @@ pub(crate) struct AgentIdentityManager { chatgpt_base_url: String, feature_enabled: bool, abom: AgentBillOfMaterials, - ensure_lock: Arc<Mutex<()>>, + ensure_lock: Arc<Semaphore>, } impl std::fmt::Debug for AgentIdentityManager { @@ -110,7 +110,7 @@ impl AgentIdentityManager { chatgpt_base_url: config.chatgpt_base_url.clone(), feature_enabled: config.features.enabled(Feature::UseAgentIdentity), abom: build_abom(session_source), - ensure_lock: Arc::new(Mutex::new(())), + ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)), } } @@ -137,7 +137,11 @@ impl AgentIdentityManager { auth: &CodexAuth, binding: &AgentIdentityBinding, ) -> Result { - let _guard = self.ensure_lock.lock().await; + let _guard = self + .ensure_lock + .acquire() + .await + .map_err(|_| anyhow::anyhow!("agent identity ensure semaphore closed"))?; if let Some(stored_identity) = self.load_stored_identity(auth, binding)?
{ info!( @@ -346,7 +350,7 @@ impl AgentIdentityManager { chatgpt_base_url, feature_enabled, abom: build_abom(session_source), - ensure_lock: Arc::new(Mutex::new(())), + ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)), } } diff --git a/codex-rs/core/src/exec_policy.rs b/codex-rs/core/src/exec_policy.rs index 981b2d11d37e..54ad8058d0f0 100644 --- a/codex-rs/core/src/exec_policy.rs +++ b/codex-rs/core/src/exec_policy.rs @@ -28,6 +28,7 @@ use codex_shell_command::is_dangerous_command::command_might_be_dangerous; use codex_shell_command::is_safe_command::is_known_safe_command; use thiserror::Error; use tokio::fs; +use tokio::sync::Semaphore; use tokio::task::spawn_blocking; use tracing::instrument; @@ -197,7 +198,7 @@ pub enum ExecPolicyUpdateError { pub(crate) struct ExecPolicyManager { policy: ArcSwap, - update_lock: tokio::sync::Mutex<()>, + update_lock: Semaphore, } pub(crate) struct ExecApprovalRequest<'a> { @@ -213,7 +214,7 @@ impl ExecPolicyManager { pub(crate) fn new(policy: Arc) -> Self { Self { policy: ArcSwap::from(policy), - update_lock: tokio::sync::Mutex::new(()), + update_lock: Semaphore::new(/*permits*/ 1), } } @@ -331,7 +332,15 @@ impl ExecPolicyManager { codex_home: &Path, amendment: &ExecPolicyAmendment, ) -> Result<(), ExecPolicyUpdateError> { - let _update_guard = self.update_lock.lock().await; + let _update_guard = + self.update_lock + .acquire() + .await + .map_err(|_| ExecPolicyUpdateError::AddRule { + source: ExecPolicyRuleError::InvalidRule( + "exec policy update semaphore closed".to_string(), + ), + })?; let policy_path = default_policy_path(codex_home); spawn_blocking({ let policy_path = policy_path.clone(); @@ -376,7 +385,15 @@ impl ExecPolicyManager { decision: Decision, justification: Option, ) -> Result<(), ExecPolicyUpdateError> { - let _update_guard = self.update_lock.lock().await; + let _update_guard = + self.update_lock + .acquire() + .await + .map_err(|_| ExecPolicyUpdateError::AddRule { + source: 
ExecPolicyRuleError::InvalidRule( + "exec policy update semaphore closed".to_string(), + ), + })?; let policy_path = default_policy_path(codex_home); let host = host.to_string(); spawn_blocking({ diff --git a/codex-rs/core/src/guardian/review_session.rs b/codex-rs/core/src/guardian/review_session.rs index d5bc96b71212..7aea5978f1c1 100644 --- a/codex-rs/core/src/guardian/review_session.rs +++ b/codex-rs/core/src/guardian/review_session.rs @@ -19,6 +19,7 @@ use codex_protocol::protocol::SandboxPolicy; use codex_protocol::protocol::SubAgentSource; use serde_json::Value; use tokio::sync::Mutex; +use tokio::sync::Semaphore; use tokio_util::sync::CancellationToken; use tracing::warn; @@ -91,7 +92,7 @@ struct GuardianReviewSession { codex: Codex, cancel_token: CancellationToken, reuse_key: GuardianReviewSessionReuseKey, - review_lock: Mutex<()>, + review_lock: Semaphore, state: Mutex, } @@ -281,7 +282,7 @@ impl GuardianReviewSessionManager { Ok(mut state) => { if let Some(trunk) = state.trunk.as_ref() && trunk.reuse_key != next_reuse_key - && trunk.review_lock.try_lock().is_ok() + && trunk.review_lock.try_acquire().is_ok() { stale_trunk_to_shutdown = state.trunk.take(); } @@ -336,7 +337,7 @@ impl GuardianReviewSessionManager { .await; } - let trunk_guard = match trunk.review_lock.try_lock() { + let trunk_guard = match trunk.review_lock.try_acquire() { Ok(trunk_guard) => trunk_guard, Err(_) => { return Box::pin(self.run_ephemeral_review( @@ -375,7 +376,7 @@ impl GuardianReviewSessionManager { reuse_key, codex, cancel_token: CancellationToken::new(), - review_lock: Mutex::new(()), + review_lock: Semaphore::new(/*permits*/ 1), state: Mutex::new(GuardianReviewState { prior_review_count: 0, last_reviewed_transcript_cursor: None, @@ -397,7 +398,7 @@ impl GuardianReviewSessionManager { reuse_key, codex, cancel_token: CancellationToken::new(), - review_lock: Mutex::new(()), + review_lock: Semaphore::new(/*permits*/ 1), state: Mutex::new(GuardianReviewState { prior_review_count: 
0, last_reviewed_transcript_cursor: None, @@ -531,7 +532,7 @@ async fn spawn_guardian_review_session( codex, cancel_token, reuse_key, - review_lock: Mutex::new(()), + review_lock: Semaphore::new(/*permits*/ 1), state: Mutex::new(GuardianReviewState { prior_review_count, last_reviewed_transcript_cursor: initial_transcript_cursor, diff --git a/codex-rs/core/src/plugins/manager.rs b/codex-rs/core/src/plugins/manager.rs index 794d1f243455..3698b4b19183 100644 --- a/codex-rs/core/src/plugins/manager.rs +++ b/codex-rs/core/src/plugins/manager.rs @@ -70,7 +70,7 @@ use std::sync::RwLock; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Instant; -use tokio::sync::Mutex; +use tokio::sync::Semaphore; use toml_edit::value; use tracing::info; use tracing::warn; @@ -349,7 +349,7 @@ pub struct PluginsManager { configured_marketplace_upgrade_state: RwLock, non_curated_cache_refresh_state: RwLock, cached_enabled_outcome: RwLock>, - remote_sync_lock: Mutex<()>, + remote_sync_lock: Semaphore, restriction_product: Option, analytics_events_client: RwLock>, } @@ -379,7 +379,7 @@ impl PluginsManager { ), non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()), cached_enabled_outcome: RwLock::new(None), - remote_sync_lock: Mutex::new(()), + remote_sync_lock: Semaphore::new(/*permits*/ 1), restriction_product, analytics_events_client: RwLock::new(None), } @@ -706,7 +706,9 @@ impl PluginsManager { auth: Option<&CodexAuth>, additive_only: bool, ) -> Result { - let _remote_sync_guard = self.remote_sync_lock.lock().await; + let _remote_sync_guard = self.remote_sync_lock.acquire().await.map_err(|_| { + PluginRemoteSyncError::Config(anyhow::anyhow!("remote plugin sync semaphore closed")) + })?; if !config.features.enabled(Feature::Plugins) { return Ok(RemotePluginSyncResult::default()); diff --git a/codex-rs/core/src/session/agent_task_lifecycle.rs b/codex-rs/core/src/session/agent_task_lifecycle.rs index 888848741da7..72d0cf0459c6 
100644 --- a/codex-rs/core/src/session/agent_task_lifecycle.rs +++ b/codex-rs/core/src/session/agent_task_lifecycle.rs @@ -173,7 +173,11 @@ impl Session { return Ok(Some(agent_task)); } - let _guard = self.agent_task_registration_lock.lock().await; + let _guard = self + .agent_task_registration_lock + .acquire() + .await + .map_err(|_| anyhow::anyhow!("agent task registration semaphore closed"))?; if let Some(agent_task) = self.cached_agent_task_for_current_identity().await { return Ok(Some(agent_task)); } diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 80b6ac95db00..f159546575d4 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -897,7 +897,10 @@ impl Session { let Some(started_proxy) = self.services.network_proxy.as_ref() else { return; }; - let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await; + let Ok(_refresh_guard) = self.managed_network_proxy_refresh_lock.acquire().await else { + error!("managed network proxy refresh semaphore closed"); + return; + }; let session_configuration = { let state = self.state.lock().await; state.session_configuration.clone() @@ -1675,7 +1678,11 @@ impl Session { amendment: &NetworkPolicyAmendment, network_approval_context: &NetworkApprovalContext, ) -> anyhow::Result<()> { - let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await; + let _refresh_guard = self + .managed_network_proxy_refresh_lock + .acquire() + .await + .map_err(|_| anyhow::anyhow!("managed network proxy refresh semaphore closed"))?; let host = Self::validated_network_policy_amendment_host(amendment, network_approval_context)?; let codex_home = self diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 5cc0d2df7247..a2080bbeab26 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -1,4 +1,5 @@ use super::*; +use tokio::sync::Semaphore; /// Context for an initialized 
model agent /// @@ -11,7 +12,7 @@ pub(crate) struct Session { pub(super) state: Mutex, /// Serializes rebuild/apply cycles for the running proxy; each cycle /// rebuilds from the current SessionState while holding this lock. - pub(super) managed_network_proxy_refresh_lock: Mutex<()>, + pub(super) managed_network_proxy_refresh_lock: Semaphore, /// The set of enabled features should be invariant for the lifetime of the /// session. pub(super) features: ManagedFeatures, @@ -25,7 +26,7 @@ pub(crate) struct Session { pub(crate) services: SessionServices, pub(super) js_repl: Arc, pub(super) next_internal_sub_id: AtomicU64, - pub(super) agent_task_registration_lock: Mutex<()>, + pub(super) agent_task_registration_lock: Semaphore, } #[derive(Clone)] @@ -709,7 +710,7 @@ impl Session { agent_status, out_of_band_elicitation_paused, state: Mutex::new(state), - managed_network_proxy_refresh_lock: Mutex::new(()), + managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), @@ -721,7 +722,7 @@ impl Session { services, js_repl, next_internal_sub_id: AtomicU64::new(0), - agent_task_registration_lock: Mutex::new(()), + agent_task_registration_lock: Semaphore::new(/*permits*/ 1), }); if let Some(network_policy_decider_session) = network_policy_decider_session { let mut guard = network_policy_decider_session.write().await; diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index acde30560dcf..6aa4a366a98b 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -143,6 +143,7 @@ use sha2::Digest as _; use sha2::Sha512; use std::path::Path; use std::time::Duration; +use tokio::sync::Semaphore; use tokio::time::sleep; use tokio::time::timeout; use tracing_opentelemetry::OpenTelemetrySpanExt; @@ -3293,7 +3294,7 @@ pub(crate) async fn make_session_and_context() -> 
(Session, TurnContext) { agent_status: agent_status_tx, out_of_band_elicitation_paused: watch::channel(false).0, state: Mutex::new(state), - managed_network_proxy_refresh_lock: Mutex::new(()), + managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), @@ -3305,7 +3306,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { services, js_repl, next_internal_sub_id: AtomicU64::new(0), - agent_task_registration_lock: Mutex::new(()), + agent_task_registration_lock: Semaphore::new(/*permits*/ 1), }; (session, turn_context) @@ -4263,7 +4264,7 @@ where agent_status: agent_status_tx, out_of_band_elicitation_paused: watch::channel(false).0, state: Mutex::new(state), - managed_network_proxy_refresh_lock: Mutex::new(()), + managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1), features: config.features.clone(), pending_mcp_server_refresh_config: Mutex::new(None), conversation: Arc::new(RealtimeConversationManager::new()), @@ -4275,7 +4276,7 @@ where services, js_repl, next_internal_sub_id: AtomicU64::new(0), - agent_task_registration_lock: Mutex::new(()), + agent_task_registration_lock: Semaphore::new(/*permits*/ 1), }); (session, turn_context, rx_event) diff --git a/codex-rs/login/src/agent_identity.rs b/codex-rs/login/src/agent_identity.rs index 751dd4538a75..864c45ad0fa3 100644 --- a/codex-rs/login/src/agent_identity.rs +++ b/codex-rs/login/src/agent_identity.rs @@ -22,7 +22,7 @@ use serde::Deserialize; use serde::Serialize; use sha2::Digest as _; use sha2::Sha512; -use tokio::sync::Mutex; +use tokio::sync::Semaphore; use tracing::debug; use tracing::info; use tracing::warn; @@ -42,7 +42,7 @@ pub(crate) struct BackgroundAgentTaskManager { chatgpt_base_url: String, auth_mode: BackgroundAgentTaskAuthMode, abom: AgentBillOfMaterials, - ensure_lock: Arc<Mutex<()>>, + ensure_lock: Arc<Semaphore>, } impl
std::fmt::Debug for BackgroundAgentTaskManager { @@ -158,7 +158,7 @@ impl BackgroundAgentTaskManager { chatgpt_base_url: normalize_chatgpt_base_url(&chatgpt_base_url), auth_mode, abom: build_abom(session_source), - ensure_lock: Arc::new(Mutex::new(())), + ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)), } } @@ -186,7 +186,11 @@ impl BackgroundAgentTaskManager { return Ok(None); }; - let _guard = self.ensure_lock.lock().await; + let _guard = self + .ensure_lock + .acquire() + .await + .context("background agent task ensure semaphore closed")?; let mut stored_identity = self .ensure_registered_identity_for_binding(auth, &binding) .await?; diff --git a/codex-rs/login/src/auth/auth_tests.rs b/codex-rs/login/src/auth/auth_tests.rs index 50dcc90e196c..326adea6e438 100644 --- a/codex-rs/login/src/auth/auth_tests.rs +++ b/codex-rs/login/src/auth/auth_tests.rs @@ -243,6 +243,7 @@ fn dummy_chatgpt_auth_does_not_create_cwd_auth_json_when_identity_is_set() { agent_runtime_id: "agent_123".to_string(), agent_private_key: "pkcs8-base64".to_string(), registered_at: "2026-04-13T12:00:00Z".to_string(), + background_task_id: None, }; auth.set_agent_identity(record.clone()) diff --git a/codex-rs/login/src/auth/manager.rs b/codex-rs/login/src/auth/manager.rs index 0731bf97db52..29733f13da9d 100644 --- a/codex-rs/login/src/auth/manager.rs +++ b/codex-rs/login/src/auth/manager.rs @@ -15,7 +15,7 @@ use std::sync::Mutex; use std::sync::RwLock; use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; -use tokio::sync::Mutex as AsyncMutex; +use tokio::sync::Semaphore; use tokio::sync::watch; use codex_app_server_protocol::AuthMode; @@ -1200,7 +1200,7 @@ pub struct AuthManager { forced_chatgpt_workspace_id: RwLock>, chatgpt_base_url: RwLock>, background_agent_task_auth_mode: RwLock, - refresh_lock: AsyncMutex<()>, + refresh_lock: Semaphore, external_auth: RwLock>>, auth_state_tx: watch::Sender<()>, } @@ -1285,7 +1285,7 @@ impl AuthManager { forced_chatgpt_workspace_id: 
RwLock::new(None), chatgpt_base_url: RwLock::new(None), background_agent_task_auth_mode: RwLock::new(BackgroundAgentTaskAuthMode::Disabled), - refresh_lock: AsyncMutex::new(()), + refresh_lock: Semaphore::new(/*permits*/ 1), external_auth: RwLock::new(None), auth_state_tx, } @@ -1307,7 +1307,7 @@ impl AuthManager { forced_chatgpt_workspace_id: RwLock::new(None), chatgpt_base_url: RwLock::new(None), background_agent_task_auth_mode: RwLock::new(BackgroundAgentTaskAuthMode::Disabled), - refresh_lock: AsyncMutex::new(()), + refresh_lock: Semaphore::new(/*permits*/ 1), external_auth: RwLock::new(None), auth_state_tx, }) @@ -1328,7 +1328,7 @@ impl AuthManager { forced_chatgpt_workspace_id: RwLock::new(None), chatgpt_base_url: RwLock::new(None), background_agent_task_auth_mode: RwLock::new(BackgroundAgentTaskAuthMode::Disabled), - refresh_lock: AsyncMutex::new(()), + refresh_lock: Semaphore::new(/*permits*/ 1), external_auth: RwLock::new(None), auth_state_tx, }) @@ -1347,7 +1347,7 @@ impl AuthManager { forced_chatgpt_workspace_id: RwLock::new(None), chatgpt_base_url: RwLock::new(None), background_agent_task_auth_mode: RwLock::new(BackgroundAgentTaskAuthMode::Disabled), - refresh_lock: AsyncMutex::new(()), + refresh_lock: Semaphore::new(/*permits*/ 1), external_auth: RwLock::new(Some( Arc::new(BearerTokenRefresher::new(config)) as Arc )), @@ -1742,7 +1742,12 @@ impl AuthManager { /// can assume that some other instance already refreshed it. If the persisted /// token is the same as the cached, then ask the token authority to refresh. 
pub async fn refresh_token(&self) -> Result<(), RefreshTokenError> { - let _refresh_guard = self.refresh_lock.lock().await; + let _refresh_guard = self.refresh_lock.acquire().await.map_err(|_| { + RefreshTokenError::Permanent(RefreshTokenFailedError::new( + RefreshTokenFailedReason::Other, + REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(), + )) + })?; let auth_before_reload = self.auth_cached(); if auth_before_reload .as_ref() @@ -1774,7 +1779,12 @@ impl AuthManager { /// observe refreshed token. If the token refresh fails, returns the error to /// the caller. pub async fn refresh_token_from_authority(&self) -> Result<(), RefreshTokenError> { - let _refresh_guard = self.refresh_lock.lock().await; + let _refresh_guard = self.refresh_lock.acquire().await.map_err(|_| { + RefreshTokenError::Permanent(RefreshTokenFailedError::new( + RefreshTokenFailedReason::Other, + REFRESH_TOKEN_UNKNOWN_MESSAGE.to_string(), + )) + })?; self.refresh_token_from_authority_impl().await } diff --git a/codex-rs/rmcp-client/src/rmcp_client.rs b/codex-rs/rmcp-client/src/rmcp_client.rs index b80e335a59d0..50891ec8c0cb 100644 --- a/codex-rs/rmcp-client/src/rmcp_client.rs +++ b/codex-rs/rmcp-client/src/rmcp_client.rs @@ -63,6 +63,7 @@ use serde_json::Value; use sse_stream::Sse; use sse_stream::SseStream; use tokio::sync::Mutex; +use tokio::sync::Semaphore; use tokio::sync::watch; use tokio::time; use tracing::warn; @@ -495,7 +496,7 @@ pub struct RmcpClient { state: Mutex, transport_recipe: TransportRecipe, initialize_context: Mutex>, - session_recovery_lock: Mutex<()>, + session_recovery_lock: Semaphore, elicitation_pause_state: ElicitationPauseState, } @@ -522,7 +523,7 @@ impl RmcpClient { }), transport_recipe, initialize_context: Mutex::new(None), - session_recovery_lock: Mutex::new(()), + session_recovery_lock: Semaphore::new(/*permits*/ 1), elicitation_pause_state: ElicitationPauseState::new(), }) } @@ -551,7 +552,7 @@ impl RmcpClient { }), transport_recipe, initialize_context: Mutex::new(None), - 
session_recovery_lock: Mutex::new(()), + session_recovery_lock: Semaphore::new(/*permits*/ 1), elicitation_pause_state: ElicitationPauseState::new(), }) } @@ -1098,7 +1099,11 @@ impl RmcpClient { &self, failed_service: &Arc>, ) -> Result<()> { - let _recovery_guard = self.session_recovery_lock.lock().await; + let _recovery_guard = self + .session_recovery_lock + .acquire() + .await + .map_err(|_| anyhow!("MCP client recovery semaphore closed"))?; { let guard = self.state.lock().await;