From 6fcdafdc150a2a1581e733e4dd80ad45a8d46c10 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Tue, 5 May 2026 18:52:37 -0700 Subject: [PATCH 1/2] Move skills watcher to app server --- .../app-server/src/bespoke_event_handling.rs | 8 - codex-rs/app-server/src/lib.rs | 1 + codex-rs/app-server/src/message_processor.rs | 4 + codex-rs/app-server/src/request_processors.rs | 1 + .../request_processors/thread_lifecycle.rs | 14 +- .../request_processors/thread_processor.rs | 6 +- .../src/request_processors/turn_processor.rs | 4 + codex-rs/app-server/src/skills_watcher.rs | 112 +++++++++++++ codex-rs/app-server/src/thread_state.rs | 5 + .../app-server/tests/suite/v2/skills_list.rs | 41 +++++ codex-rs/core/src/codex_delegate.rs | 1 - codex-rs/core/src/codex_thread.rs | 9 +- codex-rs/core/src/lib.rs | 1 - codex-rs/core/src/session/mod.rs | 34 +--- codex-rs/core/src/session/session.rs | 4 - codex-rs/core/src/session/tests.rs | 6 - .../core/src/session/tests/guardian_tests.rs | 2 - codex-rs/core/src/session/turn.rs | 1 - codex-rs/core/src/skills_watcher.rs | 125 -------------- codex-rs/core/src/state/service.rs | 2 - codex-rs/core/src/thread_manager.rs | 69 +------- codex-rs/core/tests/suite/live_reload.rs | 157 ------------------ codex-rs/core/tests/suite/mod.rs | 1 - codex-rs/mcp-server/src/codex_tool_runner.rs | 1 - codex-rs/protocol/src/protocol.rs | 3 - codex-rs/rollout-trace/src/protocol_event.rs | 2 - codex-rs/rollout/src/policy.rs | 1 - 27 files changed, 198 insertions(+), 417 deletions(-) create mode 100644 codex-rs/app-server/src/skills_watcher.rs delete mode 100644 codex-rs/core/src/skills_watcher.rs delete mode 100644 codex-rs/core/tests/suite/live_reload.rs diff --git a/codex-rs/app-server/src/bespoke_event_handling.rs b/codex-rs/app-server/src/bespoke_event_handling.rs index 18fcc1409115..a21a644dcf18 100644 --- a/codex-rs/app-server/src/bespoke_event_handling.rs +++ b/codex-rs/app-server/src/bespoke_event_handling.rs @@ -50,7 +50,6 @@ use codex_app_server_protocol::RawResponseItemCompletedNotification; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::ServerNotification; use codex_app_server_protocol::ServerRequestPayload; -use codex_app_server_protocol::SkillsChangedNotification; use codex_app_server_protocol::ThreadGoalUpdatedNotification; use codex_app_server_protocol::ThreadItem; use codex_app_server_protocol::ThreadRealtimeClosedNotification; @@ -201,13 +200,6 @@ pub(crate) async fn apply_bespoke_event_handling( ) .await; } - EventMsg::SkillsUpdateAvailable => { - outgoing - .send_server_notification(ServerNotification::SkillsChanged( - SkillsChangedNotification {}, - )) - .await; - } EventMsg::McpStartupUpdate(update) => { let (status, error) = match update.status { codex_protocol::protocol::McpStartupStatus::Starting => { diff --git a/codex-rs/app-server/src/lib.rs b/codex-rs/app-server/src/lib.rs index 4013bbe76bc9..34b5c7dd0d39 100644 --- a/codex-rs/app-server/src/lib.rs +++ b/codex-rs/app-server/src/lib.rs @@ -91,6 +91,7 @@ mod outgoing_message; mod request_processors; mod request_serialization; mod server_request_error; +mod skills_watcher; mod thread_state; mod thread_status; mod transport; diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index 4c1d16eeac95..40364720f99e 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -35,6 +35,7 @@ use crate::request_processors::WindowsSandboxRequestProcessor; use crate::request_serialization::QueuedInitializedRequest; use crate::request_serialization::RequestSerializationQueueKey; use crate::request_serialization::RequestSerializationQueues; +use crate::skills_watcher::SkillsWatcher; use crate::thread_state::ThreadStateManager; use crate::transport::AppServerTransport; use crate::transport::ConnectionOrigin; @@ -306,6 +307,7 @@ impl MessageProcessor { thread_manager .plugins_manager() .set_analytics_events_client(analytics_events_client.clone()); + let skills_watcher = SkillsWatcher::new(thread_manager.skills_manager(), outgoing.clone()); let pending_thread_unloads = Arc::new(Mutex::new(HashSet::new())); let thread_state_manager = ThreadStateManager::new(); @@ -399,6 +401,7 @@ impl MessageProcessor { Arc::clone(&thread_list_state_permit), thread_goal_processor.clone(), Some(state_db.clone()), + Arc::clone(&skills_watcher), ); let turn_processor = TurnRequestProcessor::new( auth_manager.clone(), @@ -412,6 +415,7 @@ impl MessageProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, + Arc::clone(&skills_watcher), ); if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) { // Keep plugin startup warmups aligned at app-server startup. diff --git a/codex-rs/app-server/src/request_processors.rs b/codex-rs/app-server/src/request_processors.rs index be6d55986629..6fef0ba78094 100644 --- a/codex-rs/app-server/src/request_processors.rs +++ b/codex-rs/app-server/src/request_processors.rs @@ -11,6 +11,7 @@ use crate::outgoing_message::ConnectionRequestId; use crate::outgoing_message::OutgoingMessageSender; use crate::outgoing_message::RequestContext; use crate::outgoing_message::ThreadScopedOutgoingMessageSender; +use crate::skills_watcher::SkillsWatcher; use crate::thread_status::ThreadWatchManager; use crate::thread_status::resolve_thread_status; use chrono::DateTime; diff --git a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs index 4a677d91ab4f..822392774b2c 100644 --- a/codex-rs/app-server/src/request_processors/thread_lifecycle.rs +++ b/codex-rs/app-server/src/request_processors/thread_lifecycle.rs @@ -13,6 +13,7 @@ pub(super) struct ListenerTaskContext { pub(super) thread_list_state_permit: Arc, pub(super) fallback_model_provider: String, pub(super) codex_home: PathBuf, + pub(super) skills_watcher: Arc, } struct UnloadingState { @@ -227,12 +228,22 @@ pub(super) async fn ensure_listener_task_running( "thread {conversation_id} is closing; retry after the thread is closed" ))); }; + let config = conversation.config().await; + let environments = conversation.environment_selections().await; + let watch_registration = listener_task_context + .skills_watcher + .register_thread_config( + config.as_ref(), + listener_task_context.thread_manager.as_ref(), + &environments, + ) + .await; let (mut listener_command_rx, listener_generation) = { let mut thread_state = thread_state.lock().await; if thread_state.listener_matches(&conversation) { return Ok(()); } - thread_state.set_listener(cancel_tx, &conversation) + thread_state.set_listener(cancel_tx, &conversation, watch_registration) }; let ListenerTaskContext { outgoing, @@ -244,6 +255,7 @@ pub(super) async fn ensure_listener_task_running( thread_list_state_permit, fallback_model_provider, codex_home, + .. } = listener_task_context; let outgoing_for_task = Arc::clone(&outgoing); tokio::spawn(async move { diff --git a/codex-rs/app-server/src/request_processors/thread_processor.rs b/codex-rs/app-server/src/request_processors/thread_processor.rs index a9c7ab90d129..12885fb0c704 100644 --- a/codex-rs/app-server/src/request_processors/thread_processor.rs +++ b/codex-rs/app-server/src/request_processors/thread_processor.rs @@ -257,6 +257,7 @@ pub(crate) struct ThreadRequestProcessor { pub(super) thread_goal_processor: ThreadGoalRequestProcessor, pub(super) state_db: Option, pub(super) background_tasks: TaskTracker, + pub(super) skills_watcher: Arc, } impl ThreadRequestProcessor { @@ -276,6 +277,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: Arc, thread_goal_processor: ThreadGoalRequestProcessor, state_db: Option, + skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -293,6 +295,7 @@ impl ThreadRequestProcessor { thread_goal_processor, state_db, background_tasks: TaskTracker::new(), + skills_watcher, } } @@ -686,6 +689,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), } } @@ -783,6 +787,7 @@ impl ThreadRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), }; let request_trace = request_context.request_trace(); let config_manager = self.config_manager.clone(); @@ -981,7 +986,6 @@ impl ThreadRequestProcessor { .collect() }; let core_dynamic_tool_count = core_dynamic_tools.len(); - let NewThread { thread_id, thread, diff --git a/codex-rs/app-server/src/request_processors/turn_processor.rs b/codex-rs/app-server/src/request_processors/turn_processor.rs index 05997815ef38..90a66fa77ea7 100644 --- a/codex-rs/app-server/src/request_processors/turn_processor.rs +++ b/codex-rs/app-server/src/request_processors/turn_processor.rs @@ -13,6 +13,7 @@ pub(crate) struct TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, + skills_watcher: Arc, } impl TurnRequestProcessor { @@ -29,6 +30,7 @@ impl TurnRequestProcessor { thread_state_manager: ThreadStateManager, thread_watch_manager: ThreadWatchManager, thread_list_state_permit: Arc, + skills_watcher: Arc, ) -> Self { Self { auth_manager, @@ -42,6 +44,7 @@ impl TurnRequestProcessor { thread_state_manager, thread_watch_manager, thread_list_state_permit, + skills_watcher, } } @@ -1086,6 +1089,7 @@ impl TurnRequestProcessor { thread_list_state_permit: self.thread_list_state_permit.clone(), fallback_model_provider: self.config.model_provider_id.clone(), codex_home: self.config.codex_home.to_path_buf(), + skills_watcher: Arc::clone(&self.skills_watcher), } } diff --git a/codex-rs/app-server/src/skills_watcher.rs b/codex-rs/app-server/src/skills_watcher.rs new file mode 100644 index 000000000000..33acf653355f --- /dev/null +++ b/codex-rs/app-server/src/skills_watcher.rs @@ -0,0 +1,112 @@ +use std::sync::Arc; +use std::time::Duration; + +use crate::outgoing_message::OutgoingMessageSender; +use codex_app_server_protocol::ServerNotification; +use codex_app_server_protocol::SkillsChangedNotification; +use codex_core::ThreadManager; +use codex_core::config::Config; +use codex_core::file_watcher::FileWatcher; +use codex_core::file_watcher::FileWatcherSubscriber; +use codex_core::file_watcher::Receiver; +use codex_core::file_watcher::ThrottledWatchReceiver; +use codex_core::file_watcher::WatchPath; +use codex_core::file_watcher::WatchRegistration; +use codex_core::skills::SkillsLoadInput; +use codex_core::skills::SkillsManager; +use codex_protocol::protocol::TurnEnvironmentSelection; +use tracing::warn; + +#[cfg(not(test))] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); +#[cfg(test)] +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); + +pub(crate) struct SkillsWatcher { + subscriber: FileWatcherSubscriber, +} + +impl SkillsWatcher { + pub(crate) fn new( + skills_manager: Arc, + outgoing: Arc, + ) -> Arc { + let file_watcher = match FileWatcher::new() { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize skills file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + let (subscriber, rx) = file_watcher.add_subscriber(); + Self::spawn_event_loop(rx, skills_manager, outgoing); + Arc::new(Self { subscriber }) + } + + pub(crate) async fn register_thread_config( + &self, + config: &Config, + thread_manager: &ThreadManager, + environments: &[TurnEnvironmentSelection], + ) -> WatchRegistration { + let Some(environment_selection) = environments.first() else { + return WatchRegistration::default(); + }; + let Some(environment) = thread_manager + .environment_manager() + .get_environment(&environment_selection.environment_id) + else { + warn!( + "failed to register skills watcher for unknown environment `{}`", + environment_selection.environment_id + ); + return WatchRegistration::default(); + }; + if environment.is_remote() { + return WatchRegistration::default(); + } + + let plugins_input = config.plugins_config_input(); + let plugins_manager = thread_manager.plugins_manager(); + let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; + let skills_input = SkillsLoadInput::new( + config.cwd.clone(), + plugin_outcome.effective_plugin_skill_roots(), + config.config_layer_stack.clone(), + config.bundled_skills_enabled(), + ); + let roots = thread_manager + .skills_manager() + .skill_roots_for_config(&skills_input, Some(environment.get_filesystem())) + .await + .into_iter() + .map(|root| WatchPath { + path: root.path.into_path_buf(), + recursive: true, + }) + .collect(); + self.subscriber.register_paths(roots) + } + + fn spawn_event_loop( + rx: Receiver, + skills_manager: Arc, + outgoing: Arc, + ) { + let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); + let Ok(handle) = tokio::runtime::Handle::try_current() else { + warn!("skills watcher listener skipped: no Tokio runtime available"); + return; + }; + handle.spawn(async move { + while rx.recv().await.is_some() { + skills_manager.clear_cache(); + outgoing + .send_server_notification(ServerNotification::SkillsChanged( + SkillsChangedNotification {}, + )) + .await; + } + }); + } +} diff --git a/codex-rs/app-server/src/thread_state.rs b/codex-rs/app-server/src/thread_state.rs index dddbcf483b09..9b26f6ad9003 100644 --- a/codex-rs/app-server/src/thread_state.rs +++ b/codex-rs/app-server/src/thread_state.rs @@ -7,6 +7,7 @@ use codex_app_server_protocol::Turn; use codex_app_server_protocol::TurnError; use codex_core::CodexThread; use codex_core::ThreadConfigSnapshot; +use codex_core::file_watcher::WatchRegistration; use codex_protocol::ThreadId; use codex_protocol::protocol::EventMsg; use codex_protocol::protocol::RolloutItem; @@ -77,6 +78,7 @@ pub(crate) struct ThreadState { listener_command_tx: Option>, current_turn_history: ThreadHistoryBuilder, listener_thread: Option>, + watch_registration: WatchRegistration, } impl ThreadState { @@ -91,6 +93,7 @@ impl ThreadState { &mut self, cancel_tx: oneshot::Sender<()>, conversation: &Arc, + watch_registration: WatchRegistration, ) -> (mpsc::UnboundedReceiver, u64) { if let Some(previous) = self.cancel_tx.replace(cancel_tx) { let _ = previous.send(()); @@ -99,6 +102,7 @@ impl ThreadState { let (listener_command_tx, listener_command_rx) = mpsc::unbounded_channel(); self.listener_command_tx = Some(listener_command_tx); self.listener_thread = Some(Arc::downgrade(conversation)); + self.watch_registration = watch_registration; (listener_command_rx, self.listener_generation) } @@ -109,6 +113,7 @@ impl ThreadState { self.listener_command_tx = None; self.current_turn_history.reset(); self.listener_thread = None; + self.watch_registration = WatchRegistration::default(); } pub(crate) fn set_experimental_raw_events(&mut self, enabled: bool) { diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index b95adb9044d0..a0d48502fcaa 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -658,6 +658,27 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( let mut mcp = McpProcess::new(codex_home.path()).await?; timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + let initial_skills_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![codex_home.path().to_path_buf()], + force_reload: true, + per_cwd_extra_user_roots: None, + }) + .await?; + let initial_skills_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(initial_skills_request_id)), + ) + .await??; + let SkillsListResponse { data } = to_response(initial_skills_response)?; + assert_eq!(data.len(), 1); + assert!( + data[0] + .skills + .iter() + .any(|skill| { skill.name == "demo" && skill.description == "demo description" }) + ); + let thread_start_request_id = mcp .send_thread_start_request(ThreadStartParams { model: None, @@ -709,5 +730,25 @@ async fn skills_changed_notification_is_emitted_after_skill_change() -> Result<( let notification: SkillsChangedNotification = serde_json::from_value(params)?; assert_eq!(notification, SkillsChangedNotification {}); + let updated_skills_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![codex_home.path().to_path_buf()], + force_reload: false, + per_cwd_extra_user_roots: None, + }) + .await?; + let updated_skills_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(updated_skills_request_id)), + ) + .await??; + let SkillsListResponse { data } = to_response(updated_skills_response)?; + assert_eq!(data.len(), 1); + assert!( + data[0] + .skills + .iter() + .any(|skill| skill.name == "demo" && skill.description == "updated") + ); Ok(()) } diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 6aae36d2e7b7..3d65b0352a70 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -81,7 +81,6 @@ pub(crate) async fn run_codex_thread_interactive( skills_manager: Arc::clone(&parent_session.services.skills_manager), plugins_manager: Arc::clone(&parent_session.services.plugins_manager), mcp_manager: Arc::clone(&parent_session.services.mcp_manager), - skills_watcher: Arc::clone(&parent_session.services.skills_watcher), conversation_history: initial_history.unwrap_or(InitialHistory::New), session_source: SessionSource::SubAgent(subagent_source.clone()), agent_control: parent_session.services.agent_control.clone(), diff --git a/codex-rs/core/src/codex_thread.rs b/codex-rs/core/src/codex_thread.rs index 3d1ce3ea760b..7f71c812ff83 100644 --- a/codex-rs/core/src/codex_thread.rs +++ b/codex-rs/core/src/codex_thread.rs @@ -1,6 +1,5 @@ use crate::agent::AgentStatus; use crate::config::ConstraintResult; -use crate::file_watcher::WatchRegistration; use crate::goals::ExternalGoalSet; use crate::goals::GoalRuntimeEvent; use crate::session::Codex; @@ -31,6 +30,7 @@ use codex_protocol::protocol::SessionSource; use codex_protocol::protocol::Submission; use codex_protocol::protocol::ThreadMemoryMode; use codex_protocol::protocol::TokenUsageInfo; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::user_input::UserInput; use codex_thread_store::StoredThread; @@ -100,7 +100,6 @@ pub struct CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, out_of_band_elicitation_count: Mutex, - _watch_registration: WatchRegistration, } /// Conduit for the bidirectional stream of messages that compose a thread @@ -111,7 +110,6 @@ impl CodexThread { session_configured: SessionConfiguredEvent, rollout_path: Option, session_source: SessionSource, - watch_registration: WatchRegistration, ) -> Self { Self { codex, @@ -119,7 +117,6 @@ impl CodexThread { session_configured, rollout_path, out_of_band_elicitation_count: Mutex::new(0), - _watch_registration: watch_registration, } } @@ -463,6 +460,10 @@ impl CodexThread { self.codex.session.get_config().await } + pub async fn environment_selections(&self) -> Vec { + self.codex.thread_environment_selections().await + } + pub async fn read_mcp_resource( &self, server: &str, diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index cbada9a26ced..76ffb93fbb57 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -106,7 +106,6 @@ pub(crate) use skills::manager; pub(crate) use skills::maybe_emit_implicit_skill_invocation; pub(crate) use skills::resolve_skill_dependencies_for_turn; pub(crate) use skills::skills_load_input_from_config; -mod skills_watcher; mod stream_events_utils; pub mod test_support; mod unified_exec; diff --git a/codex-rs/core/src/session/mod.rs b/codex-rs/core/src/session/mod.rs index 85f2ef438bab..2b5923c85a42 100644 --- a/codex-rs/core/src/session/mod.rs +++ b/codex-rs/core/src/session/mod.rs @@ -113,6 +113,7 @@ use codex_protocol::protocol::SubAgentSource; use codex_protocol::protocol::TurnAbortReason; use codex_protocol::protocol::TurnContextItem; use codex_protocol::protocol::TurnContextNetworkItem; +use codex_protocol::protocol::TurnEnvironmentSelection; use codex_protocol::protocol::W3cTraceContext; use codex_protocol::request_permissions::PermissionGrantScope; use codex_protocol::request_permissions::RequestPermissionProfile; @@ -280,8 +281,6 @@ use crate::rollout::map_session_init_error; use crate::session_startup_prewarm::SessionStartupPrewarmHandle; use crate::shell; use crate::shell_snapshot::ShellSnapshot; -use crate::skills_watcher::SkillsWatcher; -use crate::skills_watcher::SkillsWatcherEvent; use crate::state::ActiveTurn; use crate::state::MailboxDeliveryPhase; use crate::state::PendingRequestPermissions; @@ -392,7 +391,6 @@ pub(crate) struct CodexSpawnArgs { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, - pub(crate) skills_watcher: Arc, pub(crate) conversation_history: InitialHistory, pub(crate) session_source: SessionSource, pub(crate) agent_control: AgentControl, @@ -454,7 +452,6 @@ impl Codex { skills_manager, plugins_manager, mcp_manager, - skills_watcher, conversation_history, session_source, agent_control, @@ -639,7 +636,6 @@ impl Codex { skills_manager, plugins_manager, mcp_manager.clone(), - skills_watcher, agent_control, environment_manager, analytics_events_client, @@ -774,6 +770,11 @@ impl Codex { state.session_configuration.thread_config_snapshot() } + pub(crate) async fn thread_environment_selections(&self) -> Vec { + let state = self.session.state.lock().await; + state.session_configuration.environments.clone() + } + pub(crate) fn state_db(&self) -> Option { self.session.state_db() } @@ -997,29 +998,6 @@ impl Session { self.out_of_band_elicitation_paused.send_replace(paused); } - fn start_skills_watcher_listener(self: &Arc) { - let mut rx = self.services.skills_watcher.subscribe(); - let weak_sess = Arc::downgrade(self); - tokio::spawn(async move { - loop { - match rx.recv().await { - Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { - let Some(sess) = weak_sess.upgrade() else { - break; - }; - let event = Event { - id: sess.next_internal_sub_id(), - msg: EventMsg::SkillsUpdateAvailable, - }; - sess.send_event_raw(event).await; - } - Err(tokio::sync::broadcast::error::RecvError::Closed) => break, - Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, - } - } - }); - } - pub(crate) fn get_tx_event(&self) -> Sender { self.tx_event.clone() } diff --git a/codex-rs/core/src/session/session.rs b/codex-rs/core/src/session/session.rs index 23621f87cfdf..71e16cb4c61a 100644 --- a/codex-rs/core/src/session/session.rs +++ b/codex-rs/core/src/session/session.rs @@ -339,7 +339,6 @@ impl Session { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, - skills_watcher: Arc, agent_control: AgentControl, environment_manager: Arc, analytics_events_client: Option, @@ -816,7 +815,6 @@ impl Session { skills_manager, plugins_manager: Arc::clone(&plugins_manager), mcp_manager: Arc::clone(&mcp_manager), - skills_watcher, agent_control, network_proxy, network_approval: Arc::clone(&network_approval), @@ -901,8 +899,6 @@ impl Session { sess.send_event_raw(event).await; } - // Start the watcher after SessionConfigured so it cannot emit earlier events. - sess.start_skills_watcher_listener(); let mut required_mcp_servers: Vec = mcp_servers .iter() .filter(|(_, server)| server.enabled && server.required) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index f3a8d95107f6..215088d363d0 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -3522,7 +3522,6 @@ async fn session_new_fails_when_zsh_fork_enabled_without_zsh_path() { skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -3638,7 +3637,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { .expect("create environment"), ); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -3674,7 +3672,6 @@ pub(crate) async fn make_session_and_context() -> (Session, TurnContext) { skills_manager, plugins_manager, mcp_manager, - skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), @@ -3857,7 +3854,6 @@ async fn make_session_with_config_and_rx( skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), AgentControl::default(), Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None, @@ -5153,7 +5149,6 @@ where ) .await .expect("state db should initialize"); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::new_uninitialized( &config.permissions.approval_policy, @@ -5189,7 +5184,6 @@ where skills_manager, plugins_manager, mcp_manager, - skills_watcher, agent_control, network_proxy: None, network_approval: Arc::clone(&network_approval), diff --git a/codex-rs/core/src/session/tests/guardian_tests.rs b/codex-rs/core/src/session/tests/guardian_tests.rs index d7ec5937d254..5b6df6288ed1 100644 --- a/codex-rs/core/src/session/tests/guardian_tests.rs +++ b/codex-rs/core/src/session/tests/guardian_tests.rs @@ -728,7 +728,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { /*bundled_skills_enabled*/ true, )); let mcp_manager = Arc::new(McpManager::new(Arc::clone(&plugins_manager))); - let skills_watcher = Arc::new(SkillsWatcher::noop()); let thread_store = Arc::new(codex_thread_store::LocalThreadStore::new( codex_thread_store::LocalThreadStoreConfig::from_config(&config), codex_state::StateRuntime::init( @@ -747,7 +746,6 @@ async fn guardian_subagent_does_not_inherit_parent_exec_policy_rules() { skills_manager, plugins_manager, mcp_manager, - skills_watcher, conversation_history: InitialHistory::New, session_source: SessionSource::SubAgent(SubAgentSource::Other( GUARDIAN_REVIEWER_NAME.to_string(), diff --git a/codex-rs/core/src/session/turn.rs b/codex-rs/core/src/session/turn.rs index 75f4762c9e78..6199af82d908 100644 --- a/codex-rs/core/src/session/turn.rs +++ b/codex-rs/core/src/session/turn.rs @@ -1508,7 +1508,6 @@ pub(super) fn realtime_text_for_event(msg: &EventMsg) -> Option { | EventMsg::McpListToolsResponse(_) | EventMsg::ListSkillsResponse(_) | EventMsg::RealtimeConversationListVoicesResponse(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::TurnAborted(_) | EventMsg::ShutdownComplete diff --git a/codex-rs/core/src/skills_watcher.rs b/codex-rs/core/src/skills_watcher.rs deleted file mode 100644 index fb271ca87651..000000000000 --- a/codex-rs/core/src/skills_watcher.rs +++ /dev/null @@ -1,125 +0,0 @@ -//! Skills-specific watcher built on top of the generic [`FileWatcher`]. - -use std::path::PathBuf; -use std::sync::Arc; -use std::time::Duration; - -use tokio::runtime::Handle; -use tokio::sync::broadcast; -use tracing::warn; - -use crate::SkillsManager; -use crate::config::Config; -use crate::file_watcher::FileWatcher; -use crate::file_watcher::FileWatcherSubscriber; -use crate::file_watcher::Receiver; -use crate::file_watcher::ThrottledWatchReceiver; -use crate::file_watcher::WatchPath; -use crate::file_watcher::WatchRegistration; -use crate::skills_load_input_from_config; -use codex_core_plugins::PluginsManager; - -#[cfg(not(test))] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(10); -#[cfg(test)] -const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_millis(50); - -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum SkillsWatcherEvent { - SkillsChanged { paths: Vec }, -} - -pub(crate) struct SkillsWatcher { - subscriber: FileWatcherSubscriber, - tx: broadcast::Sender, -} - -impl SkillsWatcher { - pub(crate) fn new(file_watcher: &Arc) -> Self { - let (subscriber, rx) = file_watcher.add_subscriber(); - let (tx, _) = broadcast::channel(128); - let skills_watcher = Self { - subscriber, - tx: tx.clone(), - }; - Self::spawn_event_loop(rx, tx); - skills_watcher - } - - pub(crate) fn noop() -> Self { - Self::new(&Arc::new(FileWatcher::noop())) - } - - pub(crate) fn subscribe(&self) -> broadcast::Receiver { - self.tx.subscribe() - } - - pub(crate) async fn register_config( - &self, - config: &Config, - skills_manager: &SkillsManager, - plugins_manager: &PluginsManager, - fs: Option>, - ) -> WatchRegistration { - let plugins_input = config.plugins_config_input(); - let plugin_outcome = plugins_manager.plugins_for_config(&plugins_input).await; - let effective_skill_roots = plugin_outcome.effective_plugin_skill_roots(); - let skills_input = skills_load_input_from_config(config, effective_skill_roots); - let roots = skills_manager - .skill_roots_for_config(&skills_input, fs) - .await - .into_iter() - .map(|root| WatchPath { - path: root.path.into_path_buf(), - recursive: true, - }) - .collect(); - self.subscriber.register_paths(roots) - } - - fn spawn_event_loop(rx: Receiver, tx: broadcast::Sender) { - let mut rx = ThrottledWatchReceiver::new(rx, WATCHER_THROTTLE_INTERVAL); - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - while let Some(event) = rx.recv().await { - let _ = tx.send(SkillsWatcherEvent::SkillsChanged { paths: event.paths }); - } - }); - } else { - warn!("skills watcher listener skipped: no Tokio runtime available"); - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - use pretty_assertions::assert_eq; - use tokio::time::Duration; - use tokio::time::timeout; - - #[tokio::test] - async fn forwards_file_watcher_events() { - let file_watcher = Arc::new(FileWatcher::noop()); - let skills_watcher = SkillsWatcher::new(&file_watcher); - let mut rx = skills_watcher.subscribe(); - let _registration = skills_watcher - .subscriber - .register_path(PathBuf::from("/tmp/skill"), /*recursive*/ true); - - file_watcher - .send_paths_for_test(vec![PathBuf::from("/tmp/skill/SKILL.md")]) - .await; - - let event = timeout(Duration::from_secs(2), rx.recv()) - .await - .expect("skills watcher event") - .expect("broadcast recv"); - assert_eq!( - event, - SkillsWatcherEvent::SkillsChanged { - paths: vec![PathBuf::from("/tmp/skill/SKILL.md")], - } - ); - } -} diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index 9cd9e97fbba7..64a1810740c8 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -9,7 +9,6 @@ use crate::exec_policy::ExecPolicyManager; use crate::guardian::GuardianRejection; use crate::guardian::GuardianRejectionCircuitBreaker; use crate::mcp::McpManager; -use crate::skills_watcher::SkillsWatcher; use crate::tools::code_mode::CodeModeService; use crate::tools::network_approval::NetworkApprovalService; use crate::tools::sandboxing::ApprovalStore; @@ -59,7 +58,6 @@ pub(crate) struct SessionServices { pub(crate) skills_manager: Arc, pub(crate) plugins_manager: Arc, pub(crate) mcp_manager: Arc, - pub(crate) skills_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) network_proxy: Option, pub(crate) network_approval: Arc, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 06f8118db7a2..cb4538ad0c21 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -5,7 +5,6 @@ use crate::config::Config; use crate::config::ThreadStoreConfig; use crate::environment_selection::default_thread_environment_selections; use crate::environment_selection::resolve_environment_selections; -use crate::file_watcher::FileWatcher; use crate::mcp::McpManager; use crate::rollout::RolloutRecorder; use crate::rollout::truncation; @@ -14,8 +13,6 @@ use crate::session::CodexSpawnArgs; use crate::session::CodexSpawnOk; use crate::session::INITIAL_SUBMIT_ID; use crate::shell_snapshot::ShellSnapshot; -use crate::skills_watcher::SkillsWatcher; -use crate::skills_watcher::SkillsWatcherEvent; use crate::tasks::InterruptedTurnHistoryMarker; use crate::tasks::interrupted_turn_history_marker; use codex_agent_graph_store::AgentGraphStore; @@ -72,8 +69,6 @@ use std::sync::Arc; use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::time::Duration; -use tokio::runtime::Handle; -use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; @@ -107,47 +102,6 @@ impl Drop for TempCodexHomeGuard { } } -fn build_skills_watcher(skills_manager: Arc) -> Arc { - if should_use_test_thread_manager_behavior() - && let Ok(handle) = Handle::try_current() - && handle.runtime_flavor() == RuntimeFlavor::CurrentThread - { - // The real watcher spins background tasks that can starve the - // current-thread test runtime and cause event waits to time out. - warn!("using noop skills watcher under current-thread test runtime"); - return Arc::new(SkillsWatcher::noop()); - } - - let file_watcher = match FileWatcher::new() { - Ok(file_watcher) => Arc::new(file_watcher), - Err(err) => { - warn!("failed to initialize file watcher: {err}"); - Arc::new(FileWatcher::noop()) - } - }; - let skills_watcher = Arc::new(SkillsWatcher::new(&file_watcher)); - - let mut rx = skills_watcher.subscribe(); - let skills_manager = Arc::clone(&skills_manager); - if let Ok(handle) = Handle::try_current() { - handle.spawn(async move { - loop { - match rx.recv().await { - Ok(SkillsWatcherEvent::SkillsChanged { .. }) => { - skills_manager.clear_cache(); - } - Err(broadcast::error::RecvError::Closed) => break, - Err(broadcast::error::RecvError::Lagged(_)) => continue, - } - } - }); - } else { - warn!("skills watcher listener skipped: no Tokio runtime available"); - } - - skills_watcher -} - /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -247,7 +201,6 @@ pub(crate) struct ThreadManagerState { skills_manager: Arc, plugins_manager: Arc, mcp_manager: Arc, - skills_watcher: Arc, thread_store: Arc, state_db: StateDbHandle, agent_graph_store: Arc, @@ -329,7 +282,6 @@ impl ThreadManager { config.bundled_skills_enabled(), restriction_product, )); - let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), @@ -339,7 +291,6 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, - skills_watcher, thread_store, state_db, agent_graph_store, @@ -431,7 +382,6 @@ impl ThreadManager { /*bundled_skills_enabled*/ true, restriction_product, )); - let skills_watcher = build_skills_watcher(Arc::clone(&skills_manager)); // This test constructor has no Config input. Tests that need a non-local // process store should construct ThreadManager::new with an explicit store. let thread_store: Arc = Arc::new(LocalThreadStore::new( @@ -452,7 +402,6 @@ impl ThreadManager { skills_manager, plugins_manager, mcp_manager, - skills_watcher, thread_store, state_db, agent_graph_store, @@ -1172,19 +1121,6 @@ impl ThreadManagerState { } let environment_selections = resolve_environment_selections(self.environment_manager.as_ref(), &environments)?; - let watch_registration = match environment_selections.primary() { - Some(turn_environment) if !turn_environment.environment.is_remote() => { - self.skills_watcher - .register_config( - &config, - self.skills_manager.as_ref(), - self.plugins_manager.as_ref(), - Some(turn_environment.environment.get_filesystem()), - ) - .await - } - Some(_) | None => crate::file_watcher::WatchRegistration::default(), - }; let parent_rollout_thread_trace = self .parent_rollout_thread_trace_for_source(&session_source, &initial_history) .await; @@ -1199,7 +1135,6 @@ impl ThreadManagerState { skills_manager: Arc::clone(&self.skills_manager), plugins_manager: Arc::clone(&self.plugins_manager), mcp_manager: Arc::clone(&self.mcp_manager), - skills_watcher: Arc::clone(&self.skills_watcher), conversation_history: initial_history, session_source, agent_control, @@ -1218,7 +1153,7 @@ impl ThreadManagerState { }) .await?; let new_thread = self - .finalize_thread_spawn(codex, thread_id, tracked_session_source, watch_registration) + .finalize_thread_spawn(codex, thread_id, tracked_session_source) .await?; if is_resumed_thread && let Err(err) = new_thread.thread.apply_goal_resume_runtime_effects().await @@ -1233,7 +1168,6 @@ impl ThreadManagerState { codex: Codex, thread_id: ThreadId, session_source: SessionSource, - watch_registration: crate::file_watcher::WatchRegistration, ) -> CodexResult { let event = codex.next_event().await?; let session_configured = match event { @@ -1254,7 +1188,6 @@ impl ThreadManagerState { session_configured.clone(), session_configured.rollout_path.clone(), session_source, - watch_registration, )); e.insert(thread.clone()); return Ok(NewThread { diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs deleted file mode 100644 index c422073e4f29..000000000000 --- a/codex-rs/core/tests/suite/live_reload.rs +++ /dev/null @@ -1,157 +0,0 @@ -#![allow(clippy::expect_used, clippy::unwrap_used)] - -use std::fs; -use std::path::Path; -use std::path::PathBuf; -use std::time::Duration; - -use anyhow::Result; -use codex_config::config_toml::ProjectConfig; -use codex_protocol::config_types::TrustLevel; -use codex_protocol::models::PermissionProfile; -use codex_protocol::protocol::AskForApproval; -use codex_protocol::protocol::EventMsg; -use codex_protocol::protocol::Op; -use codex_protocol::user_input::UserInput; -use core_test_support::responses; -use core_test_support::responses::ResponsesRequest; -use core_test_support::responses::mount_sse_sequence; -use core_test_support::responses::start_mock_server; -use core_test_support::test_codex::TestCodex; -use core_test_support::test_codex::test_codex; -use core_test_support::test_codex::turn_permission_fields; -use core_test_support::wait_for_event; -use tokio::time::timeout; - -fn enable_trusted_project(config: &mut codex_core::config::Config) { - config.active_project = ProjectConfig { - trust_level: Some(TrustLevel::Trusted), - }; -} - -fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { - let skill_dir = home.join("skills").join(name); - fs::create_dir_all(&skill_dir).expect("create skill dir"); - let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); - let path = skill_dir.join("SKILL.md"); - fs::write(&path, contents).expect("write skill"); - path -} - -fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { - request - .message_input_texts("user") - .iter() - .any(|text| text.contains(skill_body) && text.contains("")) -} - -async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { - let session_model = test.session_configured.model.clone(); - let (sandbox_policy, permission_profile) = - turn_permission_fields(PermissionProfile::Disabled, test.cwd_path()); - test.codex - .submit(Op::UserTurn { - environments: None, - items: vec![ - UserInput::Text { - text: prompt.to_string(), - text_elements: Vec::new(), - }, - UserInput::Skill { - name: "demo".to_string(), - path: skill_path, - }, - ], - final_output_json_schema: None, - cwd: test.cwd_path().to_path_buf(), - approval_policy: AskForApproval::Never, - approvals_reviewer: None, - sandbox_policy, - permission_profile, - model: session_model, - effort: None, - summary: None, - service_tier: None, - collaboration_mode: None, - personality: None, - }) - .await?; - - wait_for_event(test.codex.as_ref(), |event| { - matches!(event, EventMsg::TurnComplete(_)) - }) - .await; - Ok(()) -} - -#[tokio::test(flavor = "multi_thread", worker_threads = 2)] -async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { - let server = start_mock_server().await; - let responses = mount_sse_sequence( - &server, - vec![ - responses::sse(vec![responses::ev_completed("resp-1")]), - responses::sse(vec![responses::ev_completed("resp-2")]), - ], - ) - .await; - - let skill_v1 = "skill body v1"; - let skill_v2 = "skill body v2"; - let mut builder = test_codex() - .with_pre_build_hook(move |home| { - write_skill(home, "demo", "demo skill", skill_v1); - }) - .with_config(|config| { - enable_trusted_project(config); - }); - let test = builder.build(&server).await?; - - let skill_path = dunce::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; - - submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; - let first_request = responses - .requests() - .first() - .cloned() - .expect("first request captured"); - assert!( - contains_skill_body(&first_request, skill_v1), - "expected initial skill body in request" - ); - - write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); - - let saw_skills_update = timeout(Duration::from_secs(5), async { - loop { - match test.codex.next_event().await { - Ok(event) => { - if matches!(event.msg, EventMsg::SkillsUpdateAvailable) { - break; - } - } - Err(err) => panic!("event stream ended unexpectedly: {err}"), - } - } - }) - .await; - - if saw_skills_update.is_err() { - // Some environments do not reliably surface file watcher events for - // skill changes. Clear the cache explicitly so we can still validate - // that the updated skill body is injected on the next turn. - test.thread_manager.skills_manager().clear_cache(); - } - - submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; - let last_request = responses - .last_request() - .expect("request captured after skill update"); - - assert!( - contains_skill_body(&last_request, skill_v2), - "expected updated skill body after reload" - ); - - Ok(()) -} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index ad3280ebf080..a914af3021f7 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -57,7 +57,6 @@ mod image_rollout; mod items; mod json_result; mod live_cli; -mod live_reload; mod model_overrides; mod model_switching; mod model_visible_layout; diff --git a/codex-rs/mcp-server/src/codex_tool_runner.rs b/codex-rs/mcp-server/src/codex_tool_runner.rs index 4ec8c514340e..2112dfcba4ba 100644 --- a/codex-rs/mcp-server/src/codex_tool_runner.rs +++ b/codex-rs/mcp-server/src/codex_tool_runner.rs @@ -365,7 +365,6 @@ async fn run_codex_tool_session_inner( | EventMsg::AgentMessageContentDelta(_) | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::ExitedReviewMode(_) | EventMsg::RequestUserInput(_) | EventMsg::RequestPermissions(_) diff --git a/codex-rs/protocol/src/protocol.rs b/codex-rs/protocol/src/protocol.rs index ab294ab157c9..22de50a159b3 100644 --- a/codex-rs/protocol/src/protocol.rs +++ b/codex-rs/protocol/src/protocol.rs @@ -1451,9 +1451,6 @@ pub enum EventMsg { /// List of voices supported by realtime conversation streams. RealtimeConversationListVoicesResponse(RealtimeConversationListVoicesResponseEvent), - /// Notification that skill data may have been updated and clients may want to reload. - SkillsUpdateAvailable, - PlanUpdate(UpdatePlanArgs), TurnAborted(TurnAbortedEvent), diff --git a/codex-rs/rollout-trace/src/protocol_event.rs b/codex-rs/rollout-trace/src/protocol_event.rs index 9132ba91e7c1..09b0bc18f671 100644 --- a/codex-rs/rollout-trace/src/protocol_event.rs +++ b/codex-rs/rollout-trace/src/protocol_event.rs @@ -263,7 +263,6 @@ pub(crate) fn tool_runtime_trace_event(event: &EventMsg) -> Option Option<&'static s | EventMsg::McpListToolsResponse(_) | EventMsg::ListSkillsResponse(_) | EventMsg::RealtimeConversationListVoicesResponse(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::PlanUpdate(_) | EventMsg::EnteredReviewMode(_) | EventMsg::ExitedReviewMode(_) diff --git a/codex-rs/rollout/src/policy.rs b/codex-rs/rollout/src/policy.rs index 6aefff748017..d99c94333e7f 100644 --- a/codex-rs/rollout/src/policy.rs +++ b/codex-rs/rollout/src/policy.rs @@ -172,7 +172,6 @@ fn event_msg_persistence_mode(ev: &EventMsg) -> Option { | EventMsg::ReasoningContentDelta(_) | EventMsg::ReasoningRawContentDelta(_) | EventMsg::ImageGenerationBegin(_) - | EventMsg::SkillsUpdateAvailable | EventMsg::CollabAgentSpawnBegin(_) | EventMsg::CollabAgentInteractionBegin(_) | EventMsg::CollabWaitingBegin(_) From 86a5b7fca0d54927a918c63b22c5a0a5aad70417 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Wed, 6 May 2026 15:28:23 -0700 Subject: [PATCH 2/2] codex: fix CI failure on PR #21287 --- codex-rs/core/src/session/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/codex-rs/core/src/session/tests.rs b/codex-rs/core/src/session/tests.rs index 33e66a6c9002..72ce91c7a91f 100644 --- a/codex-rs/core/src/session/tests.rs +++ b/codex-rs/core/src/session/tests.rs @@ -4039,7 +4039,6 @@ async fn make_session_with_history_source_and_agent_control_and_rx( skills_manager, plugins_manager, mcp_manager, - Arc::new(SkillsWatcher::noop()), agent_control, Arc::new(codex_exec_server::EnvironmentManager::default_for_tests()), /*analytics_events_client*/ None,