Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 9 additions & 5 deletions codex-rs/core/src/agent_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use rand::TryRngCore;
use rand::rngs::OsRng;
use serde::Deserialize;
use serde::Serialize;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tracing::debug;
use tracing::info;
use tracing::warn;
Expand All @@ -42,7 +42,7 @@ pub(crate) struct AgentIdentityManager {
chatgpt_base_url: String,
feature_enabled: bool,
abom: AgentBillOfMaterials,
ensure_lock: Arc<Mutex<()>>,
ensure_lock: Arc<Semaphore>,
}

impl std::fmt::Debug for AgentIdentityManager {
Expand Down Expand Up @@ -110,7 +110,7 @@ impl AgentIdentityManager {
chatgpt_base_url: config.chatgpt_base_url.clone(),
feature_enabled: config.features.enabled(Feature::UseAgentIdentity),
abom: build_abom(session_source),
ensure_lock: Arc::new(Mutex::new(())),
ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)),
}
}

Expand All @@ -137,7 +137,11 @@ impl AgentIdentityManager {
auth: &CodexAuth,
binding: &AgentIdentityBinding,
) -> Result<StoredAgentIdentity> {
let _guard = self.ensure_lock.lock().await;
let _guard = self
.ensure_lock
.acquire()
.await
.map_err(|_| anyhow::anyhow!("agent identity ensure semaphore closed"))?;

if let Some(stored_identity) = self.load_stored_identity(auth, binding)? {
info!(
Expand Down Expand Up @@ -346,7 +350,7 @@ impl AgentIdentityManager {
chatgpt_base_url,
feature_enabled,
abom: build_abom(session_source),
ensure_lock: Arc::new(Mutex::new(())),
ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)),
}
}

Expand Down
25 changes: 21 additions & 4 deletions codex-rs/core/src/exec_policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use codex_shell_command::is_dangerous_command::command_might_be_dangerous;
use codex_shell_command::is_safe_command::is_known_safe_command;
use thiserror::Error;
use tokio::fs;
use tokio::sync::Semaphore;
use tokio::task::spawn_blocking;
use tracing::instrument;

Expand Down Expand Up @@ -197,7 +198,7 @@ pub enum ExecPolicyUpdateError {

pub(crate) struct ExecPolicyManager {
policy: ArcSwap<Policy>,
update_lock: tokio::sync::Mutex<()>,
update_lock: Semaphore,
}

pub(crate) struct ExecApprovalRequest<'a> {
Expand All @@ -213,7 +214,7 @@ impl ExecPolicyManager {
pub(crate) fn new(policy: Arc<Policy>) -> Self {
Self {
policy: ArcSwap::from(policy),
update_lock: tokio::sync::Mutex::new(()),
update_lock: Semaphore::new(/*permits*/ 1),
}
}

Expand Down Expand Up @@ -331,7 +332,15 @@ impl ExecPolicyManager {
codex_home: &Path,
amendment: &ExecPolicyAmendment,
) -> Result<(), ExecPolicyUpdateError> {
let _update_guard = self.update_lock.lock().await;
let _update_guard =
self.update_lock
.acquire()
.await
.map_err(|_| ExecPolicyUpdateError::AddRule {
source: ExecPolicyRuleError::InvalidRule(
"exec policy update semaphore closed".to_string(),
),
})?;
let policy_path = default_policy_path(codex_home);
spawn_blocking({
let policy_path = policy_path.clone();
Expand Down Expand Up @@ -376,7 +385,15 @@ impl ExecPolicyManager {
decision: Decision,
justification: Option<String>,
) -> Result<(), ExecPolicyUpdateError> {
let _update_guard = self.update_lock.lock().await;
let _update_guard =
self.update_lock
.acquire()
.await
.map_err(|_| ExecPolicyUpdateError::AddRule {
source: ExecPolicyRuleError::InvalidRule(
"exec policy update semaphore closed".to_string(),
),
})?;
let policy_path = default_policy_path(codex_home);
let host = host.to_string();
spawn_blocking({
Expand Down
13 changes: 7 additions & 6 deletions codex-rs/core/src/guardian/review_session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use codex_protocol::protocol::SandboxPolicy;
use codex_protocol::protocol::SubAgentSource;
use serde_json::Value;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tokio_util::sync::CancellationToken;
use tracing::warn;

Expand Down Expand Up @@ -91,7 +92,7 @@ struct GuardianReviewSession {
codex: Codex,
cancel_token: CancellationToken,
reuse_key: GuardianReviewSessionReuseKey,
review_lock: Mutex<()>,
review_lock: Semaphore,
state: Mutex<GuardianReviewState>,
}

Expand Down Expand Up @@ -281,7 +282,7 @@ impl GuardianReviewSessionManager {
Ok(mut state) => {
if let Some(trunk) = state.trunk.as_ref()
&& trunk.reuse_key != next_reuse_key
&& trunk.review_lock.try_lock().is_ok()
&& trunk.review_lock.try_acquire().is_ok()
{
stale_trunk_to_shutdown = state.trunk.take();
}
Expand Down Expand Up @@ -336,7 +337,7 @@ impl GuardianReviewSessionManager {
.await;
}

let trunk_guard = match trunk.review_lock.try_lock() {
let trunk_guard = match trunk.review_lock.try_acquire() {
Ok(trunk_guard) => trunk_guard,
Err(_) => {
return Box::pin(self.run_ephemeral_review(
Expand Down Expand Up @@ -375,7 +376,7 @@ impl GuardianReviewSessionManager {
reuse_key,
codex,
cancel_token: CancellationToken::new(),
review_lock: Mutex::new(()),
review_lock: Semaphore::new(/*permits*/ 1),
state: Mutex::new(GuardianReviewState {
prior_review_count: 0,
last_reviewed_transcript_cursor: None,
Expand All @@ -397,7 +398,7 @@ impl GuardianReviewSessionManager {
reuse_key,
codex,
cancel_token: CancellationToken::new(),
review_lock: Mutex::new(()),
review_lock: Semaphore::new(/*permits*/ 1),
state: Mutex::new(GuardianReviewState {
prior_review_count: 0,
last_reviewed_transcript_cursor: None,
Expand Down Expand Up @@ -531,7 +532,7 @@ async fn spawn_guardian_review_session(
codex,
cancel_token,
reuse_key,
review_lock: Mutex::new(()),
review_lock: Semaphore::new(/*permits*/ 1),
state: Mutex::new(GuardianReviewState {
prior_review_count,
last_reviewed_transcript_cursor: initial_transcript_cursor,
Expand Down
10 changes: 6 additions & 4 deletions codex-rs/core/src/plugins/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ use std::sync::RwLock;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::time::Instant;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use toml_edit::value;
use tracing::info;
use tracing::warn;
Expand Down Expand Up @@ -349,7 +349,7 @@ pub struct PluginsManager {
configured_marketplace_upgrade_state: RwLock<ConfiguredMarketplaceUpgradeState>,
non_curated_cache_refresh_state: RwLock<NonCuratedCacheRefreshState>,
cached_enabled_outcome: RwLock<Option<PluginLoadOutcome>>,
remote_sync_lock: Mutex<()>,
remote_sync_lock: Semaphore,
restriction_product: Option<Product>,
analytics_events_client: RwLock<Option<AnalyticsEventsClient>>,
}
Expand Down Expand Up @@ -379,7 +379,7 @@ impl PluginsManager {
),
non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()),
cached_enabled_outcome: RwLock::new(None),
remote_sync_lock: Mutex::new(()),
remote_sync_lock: Semaphore::new(/*permits*/ 1),
restriction_product,
analytics_events_client: RwLock::new(None),
}
Expand Down Expand Up @@ -706,7 +706,9 @@ impl PluginsManager {
auth: Option<&CodexAuth>,
additive_only: bool,
) -> Result<RemotePluginSyncResult, PluginRemoteSyncError> {
let _remote_sync_guard = self.remote_sync_lock.lock().await;
let _remote_sync_guard = self.remote_sync_lock.acquire().await.map_err(|_| {
PluginRemoteSyncError::Config(anyhow::anyhow!("remote plugin sync semaphore closed"))
})?;

if !config.features.enabled(Feature::Plugins) {
return Ok(RemotePluginSyncResult::default());
Expand Down
6 changes: 5 additions & 1 deletion codex-rs/core/src/session/agent_task_lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,11 @@ impl Session {
return Ok(Some(agent_task));
}

let _guard = self.agent_task_registration_lock.lock().await;
let _guard = self
.agent_task_registration_lock
.acquire()
.await
.map_err(|_| anyhow::anyhow!("agent task registration semaphore closed"))?;
if let Some(agent_task) = self.cached_agent_task_for_current_identity().await {
return Ok(Some(agent_task));
}
Expand Down
11 changes: 9 additions & 2 deletions codex-rs/core/src/session/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,10 @@ impl Session {
let Some(started_proxy) = self.services.network_proxy.as_ref() else {
return;
};
let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await;
let Ok(_refresh_guard) = self.managed_network_proxy_refresh_lock.acquire().await else {
error!("managed network proxy refresh semaphore closed");
return;
};
let session_configuration = {
let state = self.state.lock().await;
state.session_configuration.clone()
Expand Down Expand Up @@ -1675,7 +1678,11 @@ impl Session {
amendment: &NetworkPolicyAmendment,
network_approval_context: &NetworkApprovalContext,
) -> anyhow::Result<()> {
let _refresh_guard = self.managed_network_proxy_refresh_lock.lock().await;
let _refresh_guard = self
.managed_network_proxy_refresh_lock
.acquire()
.await
.map_err(|_| anyhow::anyhow!("managed network proxy refresh semaphore closed"))?;
let host =
Self::validated_network_policy_amendment_host(amendment, network_approval_context)?;
let codex_home = self
Expand Down
9 changes: 5 additions & 4 deletions codex-rs/core/src/session/session.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use super::*;
use tokio::sync::Semaphore;

/// Context for an initialized model agent
///
Expand All @@ -11,7 +12,7 @@ pub(crate) struct Session {
pub(super) state: Mutex<SessionState>,
/// Serializes rebuild/apply cycles for the running proxy; each cycle
/// rebuilds from the current SessionState while holding this lock.
pub(super) managed_network_proxy_refresh_lock: Mutex<()>,
pub(super) managed_network_proxy_refresh_lock: Semaphore,
/// The set of enabled features should be invariant for the lifetime of the
/// session.
pub(super) features: ManagedFeatures,
Expand All @@ -25,7 +26,7 @@ pub(crate) struct Session {
pub(crate) services: SessionServices,
pub(super) js_repl: Arc<JsReplHandle>,
pub(super) next_internal_sub_id: AtomicU64,
pub(super) agent_task_registration_lock: Mutex<()>,
pub(super) agent_task_registration_lock: Semaphore,
}

#[derive(Clone)]
Expand Down Expand Up @@ -709,7 +710,7 @@ impl Session {
agent_status,
out_of_band_elicitation_paused,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
Expand All @@ -721,7 +722,7 @@ impl Session {
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
agent_task_registration_lock: Mutex::new(()),
agent_task_registration_lock: Semaphore::new(/*permits*/ 1),
});
if let Some(network_policy_decider_session) = network_policy_decider_session {
let mut guard = network_policy_decider_session.write().await;
Expand Down
9 changes: 5 additions & 4 deletions codex-rs/core/src/session/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ use sha2::Digest as _;
use sha2::Sha512;
use std::path::Path;
use std::time::Duration;
use tokio::sync::Semaphore;
use tokio::time::sleep;
use tokio::time::timeout;
use tracing_opentelemetry::OpenTelemetrySpanExt;
Expand Down Expand Up @@ -3293,7 +3294,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
agent_status: agent_status_tx,
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
Expand All @@ -3305,7 +3306,7 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) {
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
agent_task_registration_lock: Mutex::new(()),
agent_task_registration_lock: Semaphore::new(/*permits*/ 1),
};

(session, turn_context)
Expand Down Expand Up @@ -4263,7 +4264,7 @@ where
agent_status: agent_status_tx,
out_of_band_elicitation_paused: watch::channel(false).0,
state: Mutex::new(state),
managed_network_proxy_refresh_lock: Mutex::new(()),
managed_network_proxy_refresh_lock: Semaphore::new(/*permits*/ 1),
features: config.features.clone(),
pending_mcp_server_refresh_config: Mutex::new(None),
conversation: Arc::new(RealtimeConversationManager::new()),
Expand All @@ -4275,7 +4276,7 @@ where
services,
js_repl,
next_internal_sub_id: AtomicU64::new(0),
agent_task_registration_lock: Mutex::new(()),
agent_task_registration_lock: Semaphore::new(/*permits*/ 1),
});

(session, turn_context, rx_event)
Expand Down
12 changes: 8 additions & 4 deletions codex-rs/login/src/agent_identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use serde::Deserialize;
use serde::Serialize;
use sha2::Digest as _;
use sha2::Sha512;
use tokio::sync::Mutex;
use tokio::sync::Semaphore;
use tracing::debug;
use tracing::info;
use tracing::warn;
Expand All @@ -42,7 +42,7 @@ pub(crate) struct BackgroundAgentTaskManager {
chatgpt_base_url: String,
auth_mode: BackgroundAgentTaskAuthMode,
abom: AgentBillOfMaterials,
ensure_lock: Arc<Mutex<()>>,
ensure_lock: Arc<Semaphore>,
}

impl std::fmt::Debug for BackgroundAgentTaskManager {
Expand Down Expand Up @@ -158,7 +158,7 @@ impl BackgroundAgentTaskManager {
chatgpt_base_url: normalize_chatgpt_base_url(&chatgpt_base_url),
auth_mode,
abom: build_abom(session_source),
ensure_lock: Arc::new(Mutex::new(())),
ensure_lock: Arc::new(Semaphore::new(/*permits*/ 1)),
}
}

Expand Down Expand Up @@ -186,7 +186,11 @@ impl BackgroundAgentTaskManager {
return Ok(None);
};

let _guard = self.ensure_lock.lock().await;
let _guard = self
.ensure_lock
.acquire()
.await
.context("background agent task ensure semaphore closed")?;
let mut stored_identity = self
.ensure_registered_identity_for_binding(auth, &binding)
.await?;
Expand Down
1 change: 1 addition & 0 deletions codex-rs/login/src/auth/auth_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ fn dummy_chatgpt_auth_does_not_create_cwd_auth_json_when_identity_is_set() {
agent_runtime_id: "agent_123".to_string(),
agent_private_key: "pkcs8-base64".to_string(),
registered_at: "2026-04-13T12:00:00Z".to_string(),
background_task_id: None,
};

auth.set_agent_identity(record.clone())
Expand Down
Loading
Loading