diff --git a/codex-rs/Cargo.lock b/codex-rs/Cargo.lock index 43b4b295a82..8f57a9bf580 100644 --- a/codex-rs/Cargo.lock +++ b/codex-rs/Cargo.lock @@ -1564,6 +1564,7 @@ dependencies = [ "libc", "maplit", "multimap", + "notify", "once_cell", "openssl-sys", "opentelemetry_sdk", diff --git a/codex-rs/core/Cargo.toml b/codex-rs/core/Cargo.toml index f3ba5fd2cbb..d686343ae72 100644 --- a/codex-rs/core/Cargo.toml +++ b/codex-rs/core/Cargo.toml @@ -58,6 +58,7 @@ indoc = { workspace = true } keyring = { workspace = true, features = ["crypto-rust"] } libc = { workspace = true } multimap = { workspace = true } +notify = { workspace = true } once_cell = { workspace = true } os_info = { workspace = true } rand = { workspace = true } diff --git a/codex-rs/core/src/codex.rs b/codex-rs/core/src/codex.rs index b7c04040568..e408d82a346 100644 --- a/codex-rs/core/src/codex.rs +++ b/codex-rs/core/src/codex.rs @@ -115,6 +115,8 @@ use crate::error::Result as CodexResult; use crate::exec::StreamOutput; use crate::exec_policy::ExecPolicyUpdateError; use crate::feedback_tags; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::git_info::get_git_repo_root; use crate::instructions::UserInstructions; use crate::mcp::CODEX_APPS_MCP_SERVER_NAME; @@ -248,6 +250,7 @@ impl Codex { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, conversation_history: InitialHistory, session_source: SessionSource, agent_control: AgentControl, @@ -384,6 +387,7 @@ impl Codex { conversation_history, session_source_clone, skills_manager, + file_watcher, agent_control, ) .instrument(session_init_span) @@ -646,6 +650,29 @@ impl Session { state.session_configuration.codex_home().clone() } + fn start_file_watcher_listener(self: &Arc) { + let mut rx = self.services.file_watcher.subscribe(); + let weak_sess = Arc::downgrade(self); + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + let Some(sess) = weak_sess.upgrade() else { + break; + }; + let event = Event { + id: sess.next_internal_sub_id(), + msg: EventMsg::SkillsUpdateAvailable, + }; + sess.send_event_raw(event).await; + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } + #[allow(clippy::too_many_arguments)] fn make_turn_context( auth_manager: Option>, @@ -717,6 +744,7 @@ impl Session { initial_history: InitialHistory, session_source: SessionSource, skills_manager: Arc, + file_watcher: Arc, agent_control: AgentControl, ) -> anyhow::Result> { debug!( @@ -916,6 +944,7 @@ impl Session { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: state_db_ctx.clone(), transport_manager: TransportManager::new(), @@ -959,6 +988,9 @@ impl Session { sess.send_event_raw(event).await; } + // Start the watcher after SessionConfigured so it cannot emit earlier events. + sess.start_file_watcher_listener(); + // Construct sandbox_state before initialize() so it can be sent to each // MCP server immediately after it becomes ready (avoiding blocking). let sandbox_state = SandboxState { @@ -5430,6 +5462,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5448,6 +5481,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5550,6 +5584,7 @@ mod tests { mark_state_initial_context_seeded(&mut state); let skills_manager = Arc::new(SkillsManager::new(config.codex_home.clone())); + let file_watcher = Arc::new(FileWatcher::noop()); let services = SessionServices { mcp_connection_manager: Arc::new(RwLock::new(McpConnectionManager::default())), mcp_startup_cancellation_token: Mutex::new(CancellationToken::new()), @@ -5568,6 +5603,7 @@ mod tests { models_manager: Arc::clone(&models_manager), tool_approvals: Mutex::new(ApprovalStore::default()), skills_manager, + file_watcher, agent_control, state_db: None, transport_manager: TransportManager::new(), @@ -5738,7 +5774,7 @@ mod tests { } #[tokio::test] - async fn abort_gracefuly_emits_turn_aborted_only() { + async fn abort_gracefully_emits_turn_aborted_only() { let (sess, tc, rx) = make_session_and_context_with_rx().await; let input = vec![UserInput::Text { text: "hello".to_string(), diff --git a/codex-rs/core/src/codex_delegate.rs b/codex-rs/core/src/codex_delegate.rs index 6499370fc36..cb5a31a0cc5 100644 --- a/codex-rs/core/src/codex_delegate.rs +++ b/codex-rs/core/src/codex_delegate.rs @@ -54,6 +54,7 @@ pub(crate) async fn run_codex_thread_interactive( auth_manager, models_manager, Arc::clone(&parent_session.services.skills_manager), + Arc::clone(&parent_session.services.file_watcher), initial_history.unwrap_or(InitialHistory::New), SessionSource::SubAgent(SubAgentSource::Review), parent_session.services.agent_control.clone(), diff --git a/codex-rs/core/src/file_watcher.rs b/codex-rs/core/src/file_watcher.rs new file mode 100644 index 00000000000..afda5b6086d --- /dev/null +++ b/codex-rs/core/src/file_watcher.rs @@ -0,0 +1,407 @@ +//! Watches skill roots for changes and broadcasts coarse-grained +//! `FileWatcherEvent`s that higher-level components react to on the next turn. + +use std::collections::HashMap; +use std::collections::HashSet; +use std::path::Path; +use std::path::PathBuf; +use std::sync::Arc; +use std::sync::Mutex; +use std::sync::RwLock; +use std::time::Duration; + +use notify::Event; +use notify::RecommendedWatcher; +use notify::RecursiveMode; +use notify::Watcher; +use tokio::runtime::Handle; +use tokio::sync::broadcast; +use tokio::sync::mpsc; +use tokio::time::Instant; +use tokio::time::sleep_until; +use tracing::warn; + +use crate::config::Config; +use crate::skills::loader::skill_roots_from_layer_stack_with_agents; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum FileWatcherEvent { + SkillsChanged { paths: Vec }, +} + +struct WatchState { + skills_roots: HashSet, +} + +struct FileWatcherInner { + watcher: RecommendedWatcher, + watched_paths: HashMap, +} + +const WATCHER_THROTTLE_INTERVAL: Duration = Duration::from_secs(1); + +/// Coalesces bursts of paths and emits at most once per interval. +struct ThrottledPaths { + pending: HashSet, + next_allowed_at: Instant, +} + +impl ThrottledPaths { + fn new(now: Instant) -> Self { + Self { + pending: HashSet::new(), + next_allowed_at: now, + } + } + + fn add(&mut self, paths: Vec) { + self.pending.extend(paths); + } + + fn next_deadline(&self, now: Instant) -> Option { + (!self.pending.is_empty() && now < self.next_allowed_at).then_some(self.next_allowed_at) + } + + fn take_ready(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() || now < self.next_allowed_at { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_pending(&mut self, now: Instant) -> Option> { + if self.pending.is_empty() { + return None; + } + Some(self.take_with_next_allowed(now)) + } + + fn take_with_next_allowed(&mut self, now: Instant) -> Vec { + let mut paths: Vec = self.pending.drain().collect(); + paths.sort_unstable_by(|a, b| a.as_os_str().cmp(b.as_os_str())); + self.next_allowed_at = now + WATCHER_THROTTLE_INTERVAL; + paths + } +} + +pub(crate) struct FileWatcher { + inner: Option>, + state: Arc>, + tx: broadcast::Sender, +} + +impl FileWatcher { + pub(crate) fn new(_codex_home: PathBuf) -> notify::Result { + let (raw_tx, raw_rx) = mpsc::unbounded_channel(); + let raw_tx_clone = raw_tx; + let watcher = notify::recommended_watcher(move |res| { + let _ = raw_tx_clone.send(res); + })?; + let inner = FileWatcherInner { + watcher, + watched_paths: HashMap::new(), + }; + let (tx, _) = broadcast::channel(128); + let state = Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })); + let file_watcher = Self { + inner: Some(Mutex::new(inner)), + state: Arc::clone(&state), + tx: tx.clone(), + }; + file_watcher.spawn_event_loop(raw_rx, state, tx); + Ok(file_watcher) + } + + pub(crate) fn noop() -> Self { + let (tx, _) = broadcast::channel(1); + Self { + inner: None, + state: Arc::new(RwLock::new(WatchState { + skills_roots: HashSet::new(), + })), + tx, + } + } + + pub(crate) fn subscribe(&self) -> broadcast::Receiver { + self.tx.subscribe() + } + + pub(crate) fn register_config(&self, config: &Config) { + let roots = + skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); + for root in roots { + self.register_skills_root(root.path); + } + } + + // Bridge `notify`'s callback-based events into the Tokio runtime and + // broadcast coarse-grained change signals to subscribers. + fn spawn_event_loop( + &self, + mut raw_rx: mpsc::UnboundedReceiver>, + state: Arc>, + tx: broadcast::Sender, + ) { + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + let now = Instant::now(); + let mut skills = ThrottledPaths::new(now); + + loop { + let now = Instant::now(); + let next_deadline = skills.next_deadline(now); + let timer_deadline = next_deadline + .unwrap_or_else(|| now + Duration::from_secs(60 * 60 * 24 * 365)); + let timer = sleep_until(timer_deadline); + tokio::pin!(timer); + + tokio::select! { + res = raw_rx.recv() => { + match res { + Some(Ok(event)) => { + let skills_paths = classify_event(&event, &state); + let now = Instant::now(); + skills.add(skills_paths); + + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + Some(Err(err)) => { + warn!("file watcher error: {err}"); + } + None => { + // Flush any pending changes before shutdown so subscribers + // see the latest state. + let now = Instant::now(); + if let Some(paths) = skills.take_pending(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + break; + } + } + } + _ = &mut timer => { + let now = Instant::now(); + if let Some(paths) = skills.take_ready(now) { + let _ = tx.send(FileWatcherEvent::SkillsChanged { paths }); + } + } + } + } + }); + } else { + warn!("file watcher loop skipped: no Tokio runtime available"); + } + } + + fn register_skills_root(&self, root: PathBuf) { + { + let mut state = match self.state.write() { + Ok(state) => state, + Err(err) => err.into_inner(), + }; + state.skills_roots.insert(root.clone()); + } + self.watch_path(root, RecursiveMode::Recursive); + } + + fn watch_path(&self, path: PathBuf, mode: RecursiveMode) { + let Some(inner) = &self.inner else { + return; + }; + if !path.exists() { + return; + } + let watch_path = path; + let mut guard = match inner.lock() { + Ok(guard) => guard, + Err(err) => err.into_inner(), + }; + if let Some(existing) = guard.watched_paths.get(&watch_path) { + if *existing == RecursiveMode::Recursive || *existing == mode { + return; + } + if let Err(err) = guard.watcher.unwatch(&watch_path) { + warn!("failed to unwatch {}: {err}", watch_path.display()); + } + } + if let Err(err) = guard.watcher.watch(&watch_path, mode) { + warn!("failed to watch {}: {err}", watch_path.display()); + return; + } + guard.watched_paths.insert(watch_path, mode); + } +} + +fn classify_event(event: &Event, state: &RwLock) -> Vec { + let mut skills_paths = Vec::new(); + let skills_roots = match state.read() { + Ok(state) => state.skills_roots.clone(), + Err(err) => { + let state = err.into_inner(); + state.skills_roots.clone() + } + }; + + for path in &event.paths { + if is_skills_path(path, &skills_roots) { + skills_paths.push(path.clone()); + } + } + + skills_paths +} + +fn is_skills_path(path: &Path, roots: &HashSet) -> bool { + roots.iter().any(|root| path.starts_with(root)) +} + +#[cfg(test)] +mod tests { + use super::*; + use notify::EventKind; + use pretty_assertions::assert_eq; + use tokio::time::timeout; + + fn path(name: &str) -> PathBuf { + PathBuf::from(name) + } + + fn notify_event(paths: Vec) -> Event { + let mut event = Event::new(EventKind::Any); + for path in paths { + event = event.add_path(path); + } + event + } + + #[test] + fn throttles_and_coalesces_within_interval() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let first = throttled.take_ready(start).expect("first emit"); + assert_eq!(first, vec![path("a")]); + + throttled.add(vec![path("b"), path("c")]); + assert_eq!(throttled.take_ready(start), None); + + let second = throttled + .take_ready(start + WATCHER_THROTTLE_INTERVAL) + .expect("coalesced emit"); + assert_eq!(second, vec![path("b"), path("c")]); + } + + #[test] + fn flushes_pending_on_shutdown() { + let start = Instant::now(); + let mut throttled = ThrottledPaths::new(start); + + throttled.add(vec![path("a")]); + let _ = throttled.take_ready(start).expect("first emit"); + + throttled.add(vec![path("b")]); + assert_eq!(throttled.take_ready(start), None); + + let flushed = throttled + .take_pending(start) + .expect("shutdown flush emits pending paths"); + assert_eq!(flushed, vec![path("b")]); + } + + #[test] + fn classify_event_filters_to_skills_roots() { + let root = path("/tmp/skills"); + let state = RwLock::new(WatchState { + skills_roots: HashSet::from([root.clone()]), + }); + let event = notify_event(vec![ + root.join("demo/SKILL.md"), + path("/tmp/other/not-a-skill.txt"), + ]); + + let classified = classify_event(&event, &state); + assert_eq!(classified, vec![root.join("demo/SKILL.md")]); + } + + #[test] + fn classify_event_supports_multiple_roots_without_prefix_false_positives() { + let root_a = path("/tmp/skills"); + let root_b = path("/tmp/workspace/.codex/skills"); + let state = RwLock::new(WatchState { + skills_roots: HashSet::from([root_a.clone(), root_b.clone()]), + }); + let event = notify_event(vec![ + root_a.join("alpha/SKILL.md"), + path("/tmp/skills-extra/not-under-skills.txt"), + root_b.join("beta/SKILL.md"), + ]); + + let classified = classify_event(&event, &state); + assert_eq!( + classified, + vec![root_a.join("alpha/SKILL.md"), root_b.join("beta/SKILL.md")] + ); + } + + #[test] + fn register_skills_root_dedupes_state_entries() { + let watcher = FileWatcher::noop(); + let root = path("/tmp/skills"); + watcher.register_skills_root(root.clone()); + watcher.register_skills_root(root); + watcher.register_skills_root(path("/tmp/other-skills")); + + let state = watcher.state.read().expect("state lock"); + assert_eq!(state.skills_roots.len(), 2); + } + + #[tokio::test] + async fn spawn_event_loop_flushes_pending_changes_on_shutdown() { + let watcher = FileWatcher::noop(); + let root = path("/tmp/skills"); + { + let mut state = watcher.state.write().expect("state lock"); + state.skills_roots.insert(root.clone()); + } + + let (raw_tx, raw_rx) = mpsc::unbounded_channel(); + let (tx, mut rx) = broadcast::channel(8); + watcher.spawn_event_loop(raw_rx, Arc::clone(&watcher.state), tx); + + raw_tx + .send(Ok(notify_event(vec![root.join("a/SKILL.md")]))) + .expect("send first event"); + let first = timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("first watcher event") + .expect("broadcast recv first"); + assert_eq!( + first, + FileWatcherEvent::SkillsChanged { + paths: vec![root.join("a/SKILL.md")] + } + ); + + raw_tx + .send(Ok(notify_event(vec![root.join("b/SKILL.md")]))) + .expect("send second event"); + drop(raw_tx); + + let second = timeout(Duration::from_secs(2), rx.recv()) + .await + .expect("second watcher event") + .expect("broadcast recv second"); + assert_eq!( + second, + FileWatcherEvent::SkillsChanged { + paths: vec![root.join("b/SKILL.md")] + } + ); + } +} diff --git a/codex-rs/core/src/lib.rs b/codex-rs/core/src/lib.rs index c9d67e19f15..064cab9b978 100644 --- a/codex-rs/core/src/lib.rs +++ b/codex-rs/core/src/lib.rs @@ -32,6 +32,7 @@ pub mod exec; pub mod exec_env; mod exec_policy; pub mod features; +mod file_watcher; mod flags; pub mod git_info; pub mod instructions; @@ -136,6 +137,7 @@ pub use command_safety::is_safe_command; pub use exec_policy::ExecPolicyError; pub use exec_policy::check_execpolicy_for_warnings; pub use exec_policy::load_exec_policy; +pub use file_watcher::FileWatcherEvent; pub use safety::get_platform_sandbox; pub use tools::spec::parse_tool_input_schema; // Re-export the protocol types from the standalone `codex-protocol` crate so existing diff --git a/codex-rs/core/src/skills/manager.rs b/codex-rs/core/src/skills/manager.rs index 85e0bf20ebd..61dcd98f525 100644 --- a/codex-rs/core/src/skills/manager.rs +++ b/codex-rs/core/src/skills/manager.rs @@ -6,6 +6,7 @@ use std::sync::RwLock; use codex_utils_absolute_path::AbsolutePathBuf; use toml::Value as TomlValue; +use tracing::info; use tracing::warn; use crate::config::Config; @@ -51,14 +52,11 @@ impl SkillsManager { skill_roots_from_layer_stack_with_agents(&config.config_layer_stack, &config.cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config.config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } @@ -109,22 +107,22 @@ impl SkillsManager { let roots = skill_roots_from_layer_stack_with_agents(&config_layer_stack, cwd); let mut outcome = load_skills_from_roots(roots); outcome.disabled_paths = disabled_paths_from_stack(&config_layer_stack); - match self.cache_by_cwd.write() { - Ok(mut cache) => { - cache.insert(cwd.to_path_buf(), outcome.clone()); - } - Err(err) => { - err.into_inner().insert(cwd.to_path_buf(), outcome.clone()); - } - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + cache.insert(cwd.to_path_buf(), outcome.clone()); outcome } pub fn clear_cache(&self) { - match self.cache_by_cwd.write() { - Ok(mut cache) => cache.clear(), - Err(err) => err.into_inner().clear(), - } + let mut cache = match self.cache_by_cwd.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + let cleared = cache.len(); + cache.clear(); + info!("skills cache cleared ({cleared} entries)"); } } diff --git a/codex-rs/core/src/state/service.rs b/codex-rs/core/src/state/service.rs index d7788f71cb1..e9a028bfd56 100644 --- a/codex-rs/core/src/state/service.rs +++ b/codex-rs/core/src/state/service.rs @@ -5,6 +5,7 @@ use crate::RolloutRecorder; use crate::agent::AgentControl; use crate::analytics_client::AnalyticsEventsClient; use crate::exec_policy::ExecPolicyManager; +use crate::file_watcher::FileWatcher; use crate::mcp_connection_manager::McpConnectionManager; use crate::models_manager::manager::ModelsManager; use crate::skills::SkillsManager; @@ -33,6 +34,7 @@ pub(crate) struct SessionServices { pub(crate) otel_manager: OtelManager, pub(crate) tool_approvals: Mutex, pub(crate) skills_manager: Arc, + pub(crate) file_watcher: Arc, pub(crate) agent_control: AgentControl, pub(crate) state_db: Option, pub(crate) transport_manager: TransportManager, diff --git a/codex-rs/core/src/thread_manager.rs b/codex-rs/core/src/thread_manager.rs index 37bd0efabcd..e702d9a0043 100644 --- a/codex-rs/core/src/thread_manager.rs +++ b/codex-rs/core/src/thread_manager.rs @@ -11,6 +11,8 @@ use crate::codex_thread::CodexThread; use crate::config::Config; use crate::error::CodexErr; use crate::error::Result as CodexResult; +use crate::file_watcher::FileWatcher; +use crate::file_watcher::FileWatcherEvent; use crate::models_manager::manager::ModelsManager; use crate::protocol::Event; use crate::protocol::EventMsg; @@ -31,12 +33,56 @@ use std::path::PathBuf; use std::sync::Arc; #[cfg(any(test, feature = "test-support"))] use tempfile::TempDir; +use tokio::runtime::Handle; +#[cfg(any(test, feature = "test-support"))] +use tokio::runtime::RuntimeFlavor; use tokio::sync::RwLock; use tokio::sync::broadcast; use tracing::warn; const THREAD_CREATED_CHANNEL_CAPACITY: usize = 1024; +fn build_file_watcher(codex_home: PathBuf, skills_manager: Arc) -> Arc { + #[cfg(any(test, feature = "test-support"))] + if let Ok(handle) = Handle::try_current() + && handle.runtime_flavor() == RuntimeFlavor::CurrentThread + { + // The real watcher spins background tasks that can starve the + // current-thread test runtime and cause event waits to time out. + // Integration tests compile with the `test-support` feature. + warn!("using noop file watcher under current-thread test runtime"); + return Arc::new(FileWatcher::noop()); + } + + let file_watcher = match FileWatcher::new(codex_home) { + Ok(file_watcher) => Arc::new(file_watcher), + Err(err) => { + warn!("failed to initialize file watcher: {err}"); + Arc::new(FileWatcher::noop()) + } + }; + + let mut rx = file_watcher.subscribe(); + let skills_manager = Arc::clone(&skills_manager); + if let Ok(handle) = Handle::try_current() { + handle.spawn(async move { + loop { + match rx.recv().await { + Ok(FileWatcherEvent::SkillsChanged { .. }) => { + skills_manager.clear_cache(); + } + Err(broadcast::error::RecvError::Closed) => break, + Err(broadcast::error::RecvError::Lagged(_)) => continue, + } + } + }); + } else { + warn!("file watcher listener skipped: no Tokio runtime available"); + } + + file_watcher +} + /// Represents a newly created Codex thread (formerly called a conversation), including the first event /// (which is [`EventMsg::SessionConfigured`]). pub struct NewThread { @@ -62,6 +108,7 @@ pub(crate) struct ThreadManagerState { auth_manager: Arc, models_manager: Arc, skills_manager: Arc, + file_watcher: Arc, session_source: SessionSource, #[cfg(any(test, feature = "test-support"))] #[allow(dead_code)] @@ -76,15 +123,15 @@ impl ThreadManager { session_source: SessionSource, ) -> Self { let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, - models_manager: Arc::new(ModelsManager::new( - codex_home.clone(), - auth_manager.clone(), - )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + models_manager: Arc::new(ModelsManager::new(codex_home, auth_manager.clone())), + skills_manager, + file_watcher, auth_manager, session_source, #[cfg(any(test, feature = "test-support"))] @@ -116,16 +163,19 @@ impl ThreadManager { ) -> Self { let auth_manager = AuthManager::from_auth_for_testing(auth); let (thread_created_tx, _) = broadcast::channel(THREAD_CREATED_CHANNEL_CAPACITY); + let skills_manager = Arc::new(SkillsManager::new(codex_home.clone())); + let file_watcher = build_file_watcher(codex_home.clone(), Arc::clone(&skills_manager)); Self { state: Arc::new(ThreadManagerState { threads: Arc::new(RwLock::new(HashMap::new())), thread_created_tx, models_manager: Arc::new(ModelsManager::with_provider( - codex_home.clone(), + codex_home, auth_manager.clone(), provider, )), - skills_manager: Arc::new(SkillsManager::new(codex_home)), + skills_manager, + file_watcher, auth_manager, session_source: SessionSource::Exec, #[cfg(any(test, feature = "test-support"))] @@ -143,6 +193,10 @@ impl ThreadManager { self.state.skills_manager.clone() } + pub fn subscribe_file_watcher(&self) -> broadcast::Receiver { + self.state.file_watcher.subscribe() + } + pub fn get_models_manager(&self) -> Arc { self.state.models_manager.clone() } @@ -380,6 +434,7 @@ impl ThreadManagerState { session_source: SessionSource, dynamic_tools: Vec, ) -> CodexResult { + self.file_watcher.register_config(&config); let CodexSpawnOk { codex, thread_id, .. } = Codex::spawn( @@ -387,6 +442,7 @@ impl ThreadManagerState { auth_manager, Arc::clone(&self.models_manager), Arc::clone(&self.skills_manager), + Arc::clone(&self.file_watcher), initial_history, session_source, agent_control, diff --git a/codex-rs/core/tests/suite/live_reload.rs b/codex-rs/core/tests/suite/live_reload.rs new file mode 100644 index 00000000000..e46c77128f0 --- /dev/null +++ b/codex-rs/core/tests/suite/live_reload.rs @@ -0,0 +1,151 @@ +#![allow(clippy::expect_used, clippy::unwrap_used)] + +use std::fs; +use std::path::Path; +use std::path::PathBuf; +use std::time::Duration; + +use anyhow::Result; +use codex_core::config::ProjectConfig; +use codex_core::protocol::AskForApproval; +use codex_core::protocol::EventMsg; +use codex_core::protocol::Op; +use codex_core::protocol::SandboxPolicy; +use codex_protocol::config_types::ReasoningSummary; +use codex_protocol::config_types::TrustLevel; +use codex_protocol::user_input::UserInput; +use core_test_support::responses; +use core_test_support::responses::ResponsesRequest; +use core_test_support::responses::mount_sse_sequence; +use core_test_support::responses::start_mock_server; +use core_test_support::test_codex::TestCodex; +use core_test_support::test_codex::test_codex; +use core_test_support::wait_for_event; +use tokio::time::timeout; + +fn enable_trusted_project(config: &mut codex_core::config::Config) { + config.active_project = ProjectConfig { + trust_level: Some(TrustLevel::Trusted), + }; +} + +fn write_skill(home: &Path, name: &str, description: &str, body: &str) -> PathBuf { + let skill_dir = home.join("skills").join(name); + fs::create_dir_all(&skill_dir).expect("create skill dir"); + let contents = format!("---\nname: {name}\ndescription: {description}\n---\n\n{body}\n"); + let path = skill_dir.join("SKILL.md"); + fs::write(&path, contents).expect("write skill"); + path +} + +fn contains_skill_body(request: &ResponsesRequest, skill_body: &str) -> bool { + request + .message_input_texts("user") + .iter() + .any(|text| text.contains(skill_body) && text.contains("")) +} + +async fn submit_skill_turn(test: &TestCodex, skill_path: PathBuf, prompt: &str) -> Result<()> { + let session_model = test.session_configured.model.clone(); + test.codex + .submit(Op::UserTurn { + items: vec![ + UserInput::Text { + text: prompt.to_string(), + text_elements: Vec::new(), + }, + UserInput::Skill { + name: "demo".to_string(), + path: skill_path, + }, + ], + final_output_json_schema: None, + cwd: test.cwd_path().to_path_buf(), + approval_policy: AskForApproval::Never, + sandbox_policy: SandboxPolicy::DangerFullAccess, + model: session_model, + effort: None, + summary: ReasoningSummary::Auto, + collaboration_mode: None, + personality: None, + }) + .await?; + + wait_for_event(test.codex.as_ref(), |event| { + matches!(event, EventMsg::TurnComplete(_)) + }) + .await; + Ok(()) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 2)] +async fn live_skills_reload_refreshes_skill_cache_after_skill_change() -> Result<()> { + let server = start_mock_server().await; + let responses = mount_sse_sequence( + &server, + vec![ + responses::sse(vec![responses::ev_completed("resp-1")]), + responses::sse(vec![responses::ev_completed("resp-2")]), + ], + ) + .await; + + let skill_v1 = "skill body v1"; + let skill_v2 = "skill body v2"; + let mut builder = test_codex() + .with_pre_build_hook(move |home| { + write_skill(home, "demo", "demo skill", skill_v1); + }) + .with_config(|config| { + enable_trusted_project(config); + }); + let test = builder.build(&server).await?; + + let skill_path = std::fs::canonicalize(test.codex_home_path().join("skills/demo/SKILL.md"))?; + + submit_skill_turn(&test, skill_path.clone(), "please use $demo").await?; + let first_request = responses + .requests() + .first() + .cloned() + .expect("first request captured"); + assert!( + contains_skill_body(&first_request, skill_v1), + "expected initial skill body in request" + ); + + write_skill(test.codex_home_path(), "demo", "demo skill", skill_v2); + + let saw_skills_update = timeout(Duration::from_secs(5), async { + loop { + match test.codex.next_event().await { + Ok(event) => { + if matches!(event.msg, EventMsg::SkillsUpdateAvailable) { + break; + } + } + Err(err) => panic!("event stream ended unexpectedly: {err}"), + } + } + }) + .await; + + if saw_skills_update.is_err() { + // Some environments do not reliably surface file watcher events for + // skill changes. Clear the cache explicitly so we can still validate + // that the updated skill body is injected on the next turn. + test.thread_manager.skills_manager().clear_cache(); + } + + submit_skill_turn(&test, skill_path.clone(), "please use $demo again").await?; + let last_request = responses + .last_request() + .expect("request captured after skill update"); + + assert!( + contains_skill_body(&last_request, skill_v2), + "expected updated skill body after reload" + ); + + Ok(()) +} diff --git a/codex-rs/core/tests/suite/mod.rs b/codex-rs/core/tests/suite/mod.rs index 47a0829898b..8b057c5e162 100644 --- a/codex-rs/core/tests/suite/mod.rs +++ b/codex-rs/core/tests/suite/mod.rs @@ -81,6 +81,7 @@ mod json_result; mod list_dir; mod list_models; mod live_cli; +mod live_reload; mod model_info_overrides; mod model_overrides; mod model_tools;