From ff3b11f5aeb4210ef6d119aa063845a248e6ae33 Mon Sep 17 00:00:00 2001 From: Xin Lin Date: Tue, 28 Apr 2026 01:02:25 -0700 Subject: [PATCH] feat: Use remote installed plugin cache for skills and MCP --- .../app-server/src/codex_message_processor.rs | 50 ++- .../src/codex_message_processor/plugins.rs | 67 ++-- codex-rs/app-server/src/message_processor.rs | 9 +- .../app-server/tests/suite/v2/skills_list.rs | 220 +++++++++++++ codex-rs/core-plugins/src/loader.rs | 42 ++- codex-rs/core-plugins/src/remote.rs | 57 ++++ codex-rs/core/src/plugins/manager.rs | 289 +++++++++++++++++- codex-rs/core/src/plugins/manager_tests.rs | 63 ++++ codex-rs/core/src/session/handlers.rs | 6 +- 9 files changed, 750 insertions(+), 53 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 362f8692b67b..7a6bc5feb52b 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -722,6 +722,35 @@ impl CodexMessageProcessor { self.clear_plugin_related_caches(); } + pub(crate) fn effective_plugins_changed_callback( + &self, + config: Config, + ) -> Arc { + let thread_manager = Arc::clone(&self.thread_manager); + Arc::new(move || { + Self::spawn_effective_plugins_changed_task(Arc::clone(&thread_manager), config.clone()); + }) + } + + fn on_effective_plugins_changed(&self, config: Config) { + Self::spawn_effective_plugins_changed_task(Arc::clone(&self.thread_manager), config); + } + + fn spawn_effective_plugins_changed_task(thread_manager: Arc, config: Config) { + tokio::spawn(async move { + thread_manager.plugins_manager().clear_cache(); + thread_manager.skills_manager().clear_cache(); + if thread_manager.list_thread_ids().await.is_empty() { + return; + } + if let Err(err) = + Self::queue_mcp_server_refresh_for_config(&thread_manager, &config).await + { + warn!("failed to queue MCP refresh after effective plugins changed: {err:?}"); + } + }); + } + fn clear_plugin_related_caches(&self) { self.thread_manager.plugins_manager().clear_cache(); self.thread_manager.skills_manager().clear_cache(); @@ -5362,7 +5391,7 @@ impl CodexMessageProcessor { async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) { let result = async { let config = self.load_latest_config(/*fallback_cwd*/ None).await?; - self.queue_mcp_server_refresh_for_config(&config).await?; + Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?; Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {}) } .await; @@ -5370,11 +5399,10 @@ impl CodexMessageProcessor { } async fn queue_mcp_server_refresh_for_config( - &self, + thread_manager: &Arc, config: &Config, ) -> Result<(), JSONRPCErrorError> { - let configured_servers = self - .thread_manager + let configured_servers = thread_manager .mcp_manager() .configured_servers(config) .await; @@ -5410,7 +5438,6 @@ impl CodexMessageProcessor { // Refresh requests are queued per thread; each thread rebuilds MCP connections on its next // active turn to avoid work for threads that never resume. - let thread_manager = Arc::clone(&self.thread_manager); thread_manager.refresh_mcp_servers(refresh_config).await; Ok(()) } @@ -6259,12 +6286,13 @@ impl CodexMessageProcessor { continue; } }; - let effective_skill_roots = plugins_manager - .effective_skill_roots_for_layer_stack( - &config_layer_stack, - config.features.enabled(Feature::Plugins) && workspace_codex_plugins_enabled, - ) - .await; + let effective_skill_roots = if workspace_codex_plugins_enabled { + plugins_manager + .effective_skill_roots_for_layer_stack(&config_layer_stack, &config) + .await + } else { + Vec::new() + }; let skills_input = codex_core::skills::SkillsLoadInput::new( cwd_abs.clone(), effective_skill_roots, diff --git a/codex-rs/app-server/src/codex_message_processor/plugins.rs b/codex-rs/app-server/src/codex_message_processor/plugins.rs index db8ade8c673f..cb11e20e6c92 100644 --- a/codex-rs/app-server/src/codex_message_processor/plugins.rs +++ b/codex-rs/app-server/src/codex_message_processor/plugins.rs @@ -37,7 +37,12 @@ impl CodexMessageProcessor { { return Ok(empty_response()); } - plugins_manager.maybe_start_non_curated_plugin_cache_refresh(&roots); + plugins_manager.maybe_start_plugin_list_background_tasks_for_config( + &config, + auth.clone(), + &roots, + Some(self.effective_plugins_changed_callback(config.clone())), + ); let config_for_marketplace_listing = config.clone(); let plugins_manager_for_marketplace_listing = plugins_manager.clone(); @@ -362,17 +367,10 @@ impl CodexMessageProcessor { } }; - self.clear_plugin_related_caches(); + self.on_effective_plugins_changed(config.clone()); let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await; - if !plugin_mcp_servers.is_empty() { - if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await { - warn!( - plugin = result.plugin_id.as_key(), - "failed to queue MCP refresh after plugin install: {err:?}" - ); - } self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers) .await; } @@ -464,19 +462,16 @@ impl CodexMessageProcessor { .await .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?; - // TODO(remote plugins): remote marketplaces do not yet have a local - // marketplace/read-path sync, so this install path reads MCP/apps directly - // from the just-cached bundle. - self.clear_plugin_related_caches(); + self.thread_manager + .plugins_manager() + .maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + &config, + auth.clone(), + Some(self.effective_plugins_changed_callback(config.clone())), + ); let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await; if !plugin_mcp_servers.is_empty() { - if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await { - warn!( - plugin = result.plugin_id.as_key(), - "failed to queue MCP refresh after remote plugin install: {err:?}" - ); - } self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers) .await; } @@ -591,7 +586,15 @@ impl CodexMessageProcessor { .uninstall_plugin(plugin_id) .await .map_err(Self::plugin_uninstall_error)?; - self.clear_plugin_related_caches(); + match self.load_latest_config(/*fallback_cwd*/ None).await { + Ok(config) => self.on_effective_plugins_changed(config), + Err(err) => { + warn!( + "failed to reload config after plugin uninstall, clearing plugin-related caches only: {err:?}" + ); + self.clear_plugin_related_caches(); + } + } Ok(PluginUninstallResponse {}) } @@ -675,16 +678,32 @@ impl CodexMessageProcessor { let remote_plugin_service_config = RemotePluginServiceConfig { chatgpt_base_url: config.chatgpt_base_url.clone(), }; - codex_core_plugins::remote::uninstall_remote_plugin( + let uninstall_result = codex_core_plugins::remote::uninstall_remote_plugin( &remote_plugin_service_config, auth.as_ref(), config.codex_home.to_path_buf(), &plugin_id, ) - .await - .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin"))?; + .await; + + if matches!( + &uninstall_result, + Ok(()) | Err(RemotePluginCatalogError::CacheRemove(_)) + ) { + let plugins_manager = self.thread_manager.plugins_manager(); + if plugins_manager.clear_remote_installed_plugins_cache() { + self.on_effective_plugins_changed(config.clone()); + } + plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + &config, + auth.clone(), + Some(self.effective_plugins_changed_callback(config.clone())), + ); + } - self.clear_plugin_related_caches(); + uninstall_result.map_err(|err| { + remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin") + })?; Ok(PluginUninstallResponse {}) } } diff --git a/codex-rs/app-server/src/message_processor.rs b/codex-rs/app-server/src/message_processor.rs index df77e8ca2b5c..9b0e1ada37f7 100644 --- a/codex-rs/app-server/src/message_processor.rs +++ b/codex-rs/app-server/src/message_processor.rs @@ -314,10 +314,15 @@ impl MessageProcessor { }); if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) { // Keep plugin startup warmups aligned at app-server startup. - // TODO(xl): Move into PluginManager once this no longer depends on config feature gating. + let on_effective_plugins_changed = + codex_message_processor.effective_plugins_changed_callback((*config).clone()); thread_manager .plugins_manager() - .maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone()); + .maybe_start_plugin_startup_tasks_for_config( + &config, + auth_manager.clone(), + Some(on_effective_plugins_changed), + ); } let config_api = ConfigApi::new( config_manager, diff --git a/codex-rs/app-server/tests/suite/v2/skills_list.rs b/codex-rs/app-server/tests/suite/v2/skills_list.rs index e9c6e3bc0057..1105df37ca88 100644 --- a/codex-rs/app-server/tests/suite/v2/skills_list.rs +++ b/codex-rs/app-server/tests/suite/v2/skills_list.rs @@ -7,6 +7,8 @@ use app_test_support::McpProcess; use app_test_support::to_response; use app_test_support::write_chatgpt_auth; use codex_app_server_protocol::JSONRPCResponse; +use codex_app_server_protocol::PluginListParams; +use codex_app_server_protocol::PluginListResponse; use codex_app_server_protocol::RequestId; use codex_app_server_protocol::SkillsChangedNotification; use codex_app_server_protocol::SkillsListExtraRootsForCwd; @@ -24,6 +26,7 @@ use wiremock::ResponseTemplate; use wiremock::matchers::header; use wiremock::matchers::method; use wiremock::matchers::path; +use wiremock::matchers::query_param; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30); const WATCHER_TIMEOUT: Duration = Duration::from_secs(20); @@ -52,6 +55,23 @@ plugins = true ) } +fn write_remote_plugins_enabled_config_with_base_url( + codex_home: &std::path::Path, + base_url: &str, +) -> std::io::Result<()> { + std::fs::write( + codex_home.join("config.toml"), + format!( + r#"chatgpt_base_url = "{base_url}" + +[features] +plugins = true +remote_plugin = true +"#, + ), + ) +} + fn write_plugin_with_skill( repo_root: &std::path::Path, plugin_name: &str, @@ -93,6 +113,26 @@ fn write_plugin_with_skill( Ok(()) } +fn write_cached_remote_plugin_with_skill( + codex_home: &std::path::Path, +) -> Result { + let plugin_root = codex_home.join("plugins/cache/chatgpt-global/linear/local"); + std::fs::create_dir_all(plugin_root.join(".codex-plugin"))?; + std::fs::write( + plugin_root.join(".codex-plugin/plugin.json"), + r#"{"name":"linear"}"#, + )?; + + let skill_dir = plugin_root.join("skills/triage-issues"); + std::fs::create_dir_all(&skill_dir)?; + let skill_path = skill_dir.join("SKILL.md"); + std::fs::write( + &skill_path, + "---\nname: triage-issues\ndescription: Triage Linear issues\n---\n\n# Body\n", + )?; + Ok(skill_path) +} + #[tokio::test] async fn skills_list_includes_skills_from_per_cwd_extra_user_roots() -> Result<()> { let codex_home = TempDir::new()?; @@ -131,6 +171,186 @@ async fn skills_list_includes_skills_from_per_cwd_extra_user_roots() -> Result<( Ok(()) } +#[tokio::test] +async fn skills_list_loads_remote_installed_plugin_skills_from_cache() -> Result<()> { + let codex_home = TempDir::new()?; + let cwd = TempDir::new()?; + let server = MockServer::start().await; + let expected_skill_path = + std::fs::canonicalize(write_cached_remote_plugin_with_skill(codex_home.path())?)?; + write_remote_plugins_enabled_config_with_base_url( + codex_home.path(), + &format!("{}/backend-api/", server.uri()), + )?; + write_chatgpt_auth( + codex_home.path(), + ChatGptAuthFixture::new("chatgpt-token") + .account_id("account-123") + .chatgpt_user_id("user-123") + .chatgpt_account_id("account-123"), + AuthCredentialsStoreMode::File, + )?; + + let global_directory_body = r#"{ + "plugins": [ + { + "id": "plugins~Plugin_linear", + "name": "linear", + "scope": "GLOBAL", + "installation_policy": "AVAILABLE", + "authentication_policy": "ON_USE", + "release": { + "display_name": "Linear", + "description": "Track work in Linear", + "app_ids": [], + "interface": {}, + "skills": [] + } + } + ], + "pagination": { + "limit": 50, + "next_page_token": null + } +}"#; + let global_installed_body = r#"{ + "plugins": [ + { + "id": "plugins~Plugin_linear", + "name": "linear", + "scope": "GLOBAL", + "installation_policy": "AVAILABLE", + "authentication_policy": "ON_USE", + "release": { + "display_name": "Linear", + "description": "Track work in Linear", + "app_ids": [], + "interface": {}, + "skills": [] + }, + "enabled": true, + "disabled_skill_names": [] + } + ], + "pagination": { + "limit": 50, + "next_page_token": null + } +}"#; + let empty_page_body = r#"{ + "plugins": [], + "pagination": { + "limit": 50, + "next_page_token": null + } +}"#; + + for (scope, body) in [ + ("GLOBAL", global_directory_body), + ("WORKSPACE", empty_page_body), + ] { + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/list")) + .and(query_param("scope", scope)) + .and(query_param("limit", "200")) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(body)) + .mount(&server) + .await; + } + let mut mcp = McpProcess::new(codex_home.path()).await?; + timeout(DEFAULT_TIMEOUT, mcp.initialize()).await??; + + let stale_skills_list_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![cwd.path().to_path_buf()], + force_reload: true, + per_cwd_extra_user_roots: None, + }) + .await?; + let stale_skills_list_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(stale_skills_list_request_id)), + ) + .await??; + let SkillsListResponse { data } = to_response(stale_skills_list_response)?; + assert_eq!(data.len(), 1); + assert!( + data[0] + .skills + .iter() + .all(|skill| skill.name != "linear:triage-issues"), + "remote installed plugin cache has not been refreshed yet" + ); + + for (scope, body) in [ + ("GLOBAL", global_installed_body), + ("WORKSPACE", empty_page_body), + ] { + Mock::given(method("GET")) + .and(path("/backend-api/ps/plugins/installed")) + .and(query_param("scope", scope)) + .and(header("authorization", "Bearer chatgpt-token")) + .and(header("chatgpt-account-id", "account-123")) + .respond_with(ResponseTemplate::new(200).set_body_string(body)) + .mount(&server) + .await; + } + + let plugin_list_request_id = mcp + .send_plugin_list_request(PluginListParams { cwds: None }) + .await?; + let plugin_list_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(plugin_list_request_id)), + ) + .await??; + let _: PluginListResponse = to_response(plugin_list_response)?; + + let SkillsListResponse { data } = timeout(DEFAULT_TIMEOUT, async { + loop { + let skills_list_request_id = mcp + .send_skills_list_request(SkillsListParams { + cwds: vec![cwd.path().to_path_buf()], + force_reload: false, + per_cwd_extra_user_roots: None, + }) + .await?; + let skills_list_response: JSONRPCResponse = timeout( + DEFAULT_TIMEOUT, + mcp.read_stream_until_response_message(RequestId::Integer(skills_list_request_id)), + ) + .await??; + let response: SkillsListResponse = to_response(skills_list_response)?; + if response.data.iter().any(|entry| { + entry + .skills + .iter() + .any(|skill| skill.name == "linear:triage-issues") + }) { + break Ok::(response); + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }) + .await??; + + assert_eq!(data.len(), 1); + assert_eq!(data[0].errors, Vec::new()); + let skill = data[0] + .skills + .iter() + .find(|skill| skill.name == "linear:triage-issues") + .expect("expected skill from cached remote plugin"); + assert_eq!( + std::fs::canonicalize(skill.path.as_path())?, + expected_skill_path + ); + assert_eq!(skill.enabled, true); + Ok(()) +} + #[tokio::test] async fn skills_list_excludes_plugin_skills_when_workspace_codex_plugins_disabled() -> Result<()> { let codex_home = TempDir::new()?; diff --git a/codex-rs/core-plugins/src/loader.rs b/codex-rs/core-plugins/src/loader.rs index 55b8c0b57c76..a2b3c8daa6c0 100644 --- a/codex-rs/core-plugins/src/loader.rs +++ b/codex-rs/core-plugins/src/loader.rs @@ -5,6 +5,7 @@ use crate::manifest::load_plugin_manifest; use crate::marketplace::MarketplacePluginSource; use crate::marketplace::list_marketplaces; use crate::marketplace::load_marketplace; +use crate::remote::RemoteInstalledPlugin; use crate::store::PluginStore; use crate::store::plugin_version_for_source; use codex_config::ConfigLayerStack; @@ -107,13 +108,14 @@ struct PluginAppConfig { pub async fn load_plugins_from_layer_stack( config_layer_stack: &ConfigLayerStack, + extra_plugins: HashMap, store: &PluginStore, restriction_product: Option, ) -> PluginLoadOutcome { let skill_config_rules = skill_config_rules_from_stack(config_layer_stack); - let mut configured_plugins: Vec<_> = configured_plugins_from_stack(config_layer_stack) - .into_iter() - .collect(); + let mut configured_plugins = configured_plugins_from_stack(config_layer_stack); + configured_plugins.extend(extra_plugins); + let mut configured_plugins: Vec<_> = configured_plugins.into_iter().collect(); configured_plugins.sort_unstable_by(|(a, _), (b, _)| a.cmp(b)); let mut plugins = Vec::with_capacity(configured_plugins.len()); @@ -145,6 +147,40 @@ pub async fn load_plugins_from_layer_stack( PluginLoadOutcome::from_plugins(plugins) } +pub fn remote_installed_plugins_to_config( + plugins: &[RemoteInstalledPlugin], + store: &PluginStore, +) -> HashMap { + plugins + .iter() + .filter_map(|plugin| { + let plugin_id = + match PluginId::new(plugin.name.clone(), plugin.marketplace_name.clone()) { + Ok(plugin_id) => plugin_id, + Err(err) => { + warn!( + plugin = %plugin.name, + remote_id = %plugin.id, + error = %err, + "ignoring invalid remote installed plugin name" + ); + return None; + } + }; + // TODO(remote plugins): download or update missing local bundles during remote + // installed reconciliation. Until then, only publish remote installed state for + // bundles already present in the local plugin cache. + store.active_plugin_root(&plugin_id)?; + Some(( + plugin_id.as_key(), + PluginConfig { + enabled: plugin.enabled, + }, + )) + }) + .collect() +} + pub fn refresh_curated_plugin_cache( codex_home: &Path, plugin_version: &str, diff --git a/codex-rs/core-plugins/src/remote.rs b/codex-rs/core-plugins/src/remote.rs index 6c87f4dc3036..032313ca1fad 100644 --- a/codex-rs/core-plugins/src/remote.rs +++ b/codex-rs/core-plugins/src/remote.rs @@ -39,6 +39,14 @@ pub struct RemoteMarketplace { pub plugins: Vec, } +#[derive(Debug, Clone, PartialEq)] +pub struct RemoteInstalledPlugin { + pub marketplace_name: String, + pub id: String, + pub name: String, + pub enabled: bool, +} + #[derive(Debug, Clone, PartialEq)] pub struct RemotePluginSummary { pub id: String, @@ -369,6 +377,39 @@ pub async fn fetch_remote_marketplaces( Ok(marketplaces) } +pub async fn fetch_remote_installed_plugins( + config: &RemotePluginServiceConfig, + auth: Option<&CodexAuth>, +) -> Result, RemotePluginCatalogError> { + let auth = ensure_chatgpt_auth(auth)?; + let global = async { + let scope = RemotePluginScope::Global; + let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?; + Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) + }; + let workspace = async { + let scope = RemotePluginScope::Workspace; + let installed_plugins = fetch_installed_plugins_for_scope(config, auth, scope).await?; + Ok::<_, RemotePluginCatalogError>((scope, installed_plugins)) + }; + + let (global, workspace) = tokio::try_join!(global, workspace)?; + let mut installed_plugins = [global, workspace] + .into_iter() + .flat_map(|(scope, plugins)| { + plugins + .into_iter() + .map(move |plugin| remote_installed_plugin_to_info(scope, &plugin)) + }) + .collect::>(); + installed_plugins.sort_by(|left, right| { + left.marketplace_name + .cmp(&right.marketplace_name) + .then_with(|| left.id.cmp(&right.id)) + }); + Ok(installed_plugins) +} + pub async fn fetch_remote_plugin_detail( config: &RemotePluginServiceConfig, auth: Option<&CodexAuth>, @@ -671,6 +712,22 @@ fn build_remote_plugin_summary( } } +fn remote_installed_plugin_to_info( + scope: RemotePluginScope, + installed_plugin: &RemotePluginInstalledItem, +) -> RemoteInstalledPlugin { + let plugin = &installed_plugin.plugin; + // Remote per-skill disabled state (`disabled_skill_names`) is intentionally + // not projected into skills/list yet; local skills.config remains the + // supported source for skill enablement. + RemoteInstalledPlugin { + marketplace_name: scope.marketplace_name().to_string(), + id: plugin.id.clone(), + name: plugin.name.clone(), + enabled: installed_plugin.enabled, + } +} + fn remote_plugin_interface_to_info(plugin: &RemotePluginDirectoryItem) -> Option { let interface = &plugin.release.interface; let display_name = non_empty_string(Some(&plugin.release.display_name)); diff --git a/codex-rs/core/src/plugins/manager.rs b/codex-rs/core/src/plugins/manager.rs index 880ad8ed224a..f59bbad558ce 100644 --- a/codex-rs/core/src/plugins/manager.rs +++ b/codex-rs/core/src/plugins/manager.rs @@ -22,6 +22,7 @@ use codex_core_plugins::loader::plugin_telemetry_metadata_from_root; use codex_core_plugins::loader::refresh_curated_plugin_cache; use codex_core_plugins::loader::refresh_non_curated_plugin_cache; use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall; +use codex_core_plugins::loader::remote_installed_plugins_to_config; use codex_core_plugins::manifest::PluginManifestInterface; use codex_core_plugins::manifest::load_plugin_manifest; use codex_core_plugins::marketplace::MarketplaceError; @@ -40,6 +41,8 @@ use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeError; use codex_core_plugins::marketplace_upgrade::ConfiguredMarketplaceUpgradeOutcome; use codex_core_plugins::marketplace_upgrade::configured_git_marketplace_names; use codex_core_plugins::marketplace_upgrade::upgrade_configured_git_marketplaces; +use codex_core_plugins::remote::RemoteInstalledPlugin; +use codex_core_plugins::remote::RemotePluginCatalogError; use codex_core_plugins::remote::RemotePluginServiceConfig; use codex_core_plugins::remote_legacy::RemotePluginFetchError; use codex_core_plugins::remote_legacy::RemotePluginMutationError; @@ -91,6 +94,30 @@ struct CachedFeaturedPluginIds { featured_plugin_ids: Vec, } +struct RemoteInstalledPluginsCacheRefreshRequest { + service_config: RemotePluginServiceConfig, + auth: Option, + notify: RemoteInstalledPluginsCacheRefreshNotify, + // App-server attaches side effects such as skills metadata invalidation and MCP refreshes when + // remote installed state changes. + on_effective_plugins_changed: Option>, +} + +#[derive(Clone, Copy)] +enum RemoteInstalledPluginsCacheRefreshNotify { + IfCacheChanged, + // Remote mutations may change local bundles or active MCP state even when the installed set is + // unchanged. Notify after `/installed` succeeds so MCP refreshes are ordered after the remote + // installed cache. + AfterSuccessfulRefresh, +} + +#[derive(Default)] +struct RemoteInstalledPluginsCacheRefreshState { + requested: Option, + in_flight: bool, +} + #[derive(Clone, PartialEq, Eq)] struct NonCuratedCacheRefreshRequest { roots: Vec, @@ -333,6 +360,10 @@ pub struct PluginsManager { configured_marketplace_upgrade_state: RwLock, non_curated_cache_refresh_state: RwLock, cached_enabled_outcome: RwLock>, + // TODO(remote plugins): reset this cache when ChatGPT auth/account state changes so stale + // remote installed state cannot remain effective for a different account. + remote_installed_plugins_cache: RwLock>>, + remote_installed_plugins_cache_refresh_state: RwLock, remote_sync_lock: Semaphore, restriction_product: Option, analytics_events_client: RwLock>, @@ -363,6 +394,10 @@ impl PluginsManager { ), non_curated_cache_refresh_state: RwLock::new(NonCuratedCacheRefreshState::default()), cached_enabled_outcome: RwLock::new(None), + remote_installed_plugins_cache: RwLock::new(None), + remote_installed_plugins_cache_refresh_state: RwLock::new( + RemoteInstalledPluginsCacheRefreshState::default(), + ), remote_sync_lock: Semaphore::new(/*permits*/ 1), restriction_product, analytics_events_client: RwLock::new(None), @@ -407,6 +442,7 @@ impl PluginsManager { let outcome = load_plugins_from_layer_stack( &config.config_layer_stack, + self.remote_installed_plugin_configs(config), &self.store, self.restriction_product, ) @@ -421,15 +457,19 @@ impl PluginsManager { } pub fn clear_cache(&self) { - let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() { + self.clear_enabled_outcome_cache(); + let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; - let mut featured_plugin_ids_cache = match self.featured_plugin_ids_cache.write() { + *featured_plugin_ids_cache = None; + } + + fn clear_enabled_outcome_cache(&self) { + let mut cached_enabled_outcome = match self.cached_enabled_outcome.write() { Ok(cache) => cache, Err(err) => err.into_inner(), }; - *featured_plugin_ids_cache = None; *cached_enabled_outcome = None; } @@ -437,14 +477,19 @@ impl PluginsManager { pub async fn effective_skill_roots_for_layer_stack( &self, config_layer_stack: &ConfigLayerStack, - plugins_feature_enabled: bool, + config: &Config, ) -> Vec { - if !plugins_feature_enabled { + if !config.features.enabled(Feature::Plugins) { return Vec::new(); } - load_plugins_from_layer_stack(config_layer_stack, &self.store, self.restriction_product) - .await - .effective_skill_roots() + load_plugins_from_layer_stack( + config_layer_stack, + self.remote_installed_plugin_configs(config), + &self.store, + self.restriction_product, + ) + .await + .effective_skill_roots() } fn cached_enabled_outcome(&self) -> Option { @@ -454,6 +499,116 @@ impl PluginsManager { } } + fn remote_installed_plugin_configs(&self, config: &Config) -> HashMap { + if !config.features.enabled(Feature::RemotePlugin) { + return HashMap::new(); + } + + let cache = match self.remote_installed_plugins_cache.read() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + let Some(plugins) = cache.as_ref() else { + return HashMap::new(); + }; + + remote_installed_plugins_to_config(plugins, &self.store) + } + + fn write_remote_installed_plugins_cache(&self, plugins: Vec) -> bool { + let mut cache = match self.remote_installed_plugins_cache.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + if cache.as_ref().is_some_and(|cache| cache.eq(&plugins)) { + return false; + } + *cache = Some(plugins); + drop(cache); + self.clear_enabled_outcome_cache(); + true + } + + pub fn clear_remote_installed_plugins_cache(&self) -> bool { + let mut cache = match self.remote_installed_plugins_cache.write() { + Ok(cache) => cache, + Err(err) => err.into_inner(), + }; + if cache.is_none() { + return false; + } + *cache = None; + drop(cache); + self.clear_enabled_outcome_cache(); + true + } + + pub fn maybe_start_remote_installed_plugins_cache_refresh( + self: &Arc, + config: &Config, + auth: Option, + on_effective_plugins_changed: Option>, + ) { + self.maybe_start_remote_installed_plugins_cache_refresh_with_notify( + config, + auth, + RemoteInstalledPluginsCacheRefreshNotify::IfCacheChanged, + on_effective_plugins_changed, + ); + } + + pub fn maybe_start_remote_installed_plugins_cache_refresh_after_mutation( + self: &Arc, + config: &Config, + auth: Option, + on_effective_plugins_changed: Option>, + ) { + self.maybe_start_remote_installed_plugins_cache_refresh_with_notify( + config, + auth, + RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh, + on_effective_plugins_changed, + ); + } + + fn maybe_start_remote_installed_plugins_cache_refresh_with_notify( + self: &Arc, + config: &Config, + auth: Option, + notify: RemoteInstalledPluginsCacheRefreshNotify, + on_effective_plugins_changed: Option>, + ) { + if !config.features.enabled(Feature::Plugins) + || !config.features.enabled(Feature::RemotePlugin) + { + return; + } + + self.schedule_remote_installed_plugins_cache_refresh( + RemoteInstalledPluginsCacheRefreshRequest { + service_config: remote_plugin_service_config(config), + auth, + notify, + on_effective_plugins_changed, + }, + ); + } + + pub fn maybe_start_plugin_list_background_tasks_for_config( + self: &Arc, + config: &Config, + auth: Option, + roots: &[AbsolutePathBuf], + on_effective_plugins_changed: Option>, + ) { + self.maybe_start_non_curated_plugin_cache_refresh(roots); + self.maybe_start_remote_installed_plugins_cache_refresh( + config, + auth, + on_effective_plugins_changed, + ); + } + fn cached_featured_plugin_ids( &self, cache_key: &FeaturedPluginIdsCacheKey, @@ -1128,6 +1283,7 @@ impl PluginsManager { self: &Arc, config: &Config, auth_manager: Arc, + on_effective_plugins_changed: Option>, ) { if config.features.enabled(Feature::Plugins) { self.start_curated_repo_sync(); @@ -1189,6 +1345,21 @@ impl PluginsManager { auth_manager.clone(), ); + if config.features.enabled(Feature::RemotePlugin) { + let config = config.clone(); + let manager = Arc::clone(self); + let auth_manager = auth_manager.clone(); + let on_effective_plugins_changed = on_effective_plugins_changed.clone(); + tokio::spawn(async move { + let auth = auth_manager.auth().await; + manager.maybe_start_remote_installed_plugins_cache_refresh( + &config, + auth, + on_effective_plugins_changed, + ); + }); + } + let config = config.clone(); let manager = Arc::clone(self); tokio::spawn(async move { @@ -1262,6 +1433,48 @@ impl PluginsManager { ); } + fn schedule_remote_installed_plugins_cache_refresh( + self: &Arc, + mut request: RemoteInstalledPluginsCacheRefreshRequest, + ) { + let should_spawn = { + let mut state = match self.remote_installed_plugins_cache_refresh_state.write() { + Ok(state) => state, + Err(err) => err.into_inner(), + }; + if let Some(existing_request) = state.requested.as_ref() { + if matches!( + existing_request.notify, + RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh + ) { + request.notify = + RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh; + } + if request.on_effective_plugins_changed.is_none() { + request.on_effective_plugins_changed = + existing_request.on_effective_plugins_changed.clone(); + } + } + state.requested = Some(request); + if state.in_flight { + false + } else { + state.in_flight = true; + true + } + }; + if !should_spawn { + return; + } + + let manager = Arc::clone(self); + tokio::spawn(async move { + manager + .run_remote_installed_plugins_cache_refresh_loop() + .await; + }); + } + fn schedule_non_curated_plugin_cache_refresh( self: &Arc, roots: &[AbsolutePathBuf], @@ -1368,6 +1581,66 @@ impl PluginsManager { } } + async fn run_remote_installed_plugins_cache_refresh_loop(self: Arc) { + loop { + let request = { + let mut state = match self.remote_installed_plugins_cache_refresh_state.write() { + Ok(state) => state, + Err(err) => err.into_inner(), + }; + match state.requested.take() { + Some(request) => request, + None => { + state.in_flight = false; + return; + } + } + }; + + let installed_plugins = codex_core_plugins::remote::fetch_remote_installed_plugins( + &request.service_config, + request.auth.as_ref(), + ) + .await; + match installed_plugins { + Ok(installed_plugins) => { + // TODO(remote plugins): reconcile missing or stale local bundles before + // publishing remote installed state as effective local plugin config. + let changed = self.write_remote_installed_plugins_cache(installed_plugins); + let should_notify = changed + || matches!( + request.notify, + RemoteInstalledPluginsCacheRefreshNotify::AfterSuccessfulRefresh + ); + if should_notify + && let Some(on_effective_plugins_changed) = + request.on_effective_plugins_changed + { + on_effective_plugins_changed(); + } + } + Err( + RemotePluginCatalogError::AuthRequired + | RemotePluginCatalogError::UnsupportedAuthMode, + ) => { + let changed = self.clear_remote_installed_plugins_cache(); + if changed + && let Some(on_effective_plugins_changed) = + request.on_effective_plugins_changed + { + on_effective_plugins_changed(); + } + } + Err(err) => { + warn!( + error = %err, + "failed to refresh remote installed plugins cache" + ); + } + } + } + } + fn run_non_curated_plugin_cache_refresh_loop(self: Arc) { loop { let request = { diff --git a/codex-rs/core/src/plugins/manager_tests.rs b/codex-rs/core/src/plugins/manager_tests.rs index fb4b5a62125a..a72df1881c8d 100644 --- a/codex-rs/core/src/plugins/manager_tests.rs +++ b/codex-rs/core/src/plugins/manager_tests.rs @@ -16,6 +16,7 @@ use codex_config::ConfigRequirementsToml; use codex_config::McpServerConfig; use codex_config::types::McpServerTransportConfig; use codex_core_plugins::installed_marketplaces::marketplace_install_root; +use codex_core_plugins::loader::load_plugins_from_layer_stack; use codex_core_plugins::loader::refresh_non_curated_plugin_cache; use codex_core_plugins::loader::refresh_non_curated_plugin_cache_force_reinstall; use codex_core_plugins::marketplace::MarketplacePluginInstallPolicy; @@ -246,6 +247,67 @@ async fn load_plugins_loads_default_skills_and_mcp_servers() { ); } +#[tokio::test] +async fn remote_installed_cache_adds_plugin_skill_roots_without_marketplace_config() { + let codex_home = TempDir::new().unwrap(); + let plugin_base = codex_home + .path() + .join("plugins/cache/chatgpt-global/linear"); + write_plugin(&plugin_base, "local", "linear"); + write_file( + &codex_home.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +remote_plugin = true +"#, + ); + + let config = load_config(codex_home.path(), codex_home.path()).await; + let manager = PluginsManager::new(codex_home.path().to_path_buf()); + manager.write_remote_installed_plugins_cache(vec![ + codex_core_plugins::remote::RemoteInstalledPlugin { + marketplace_name: "chatgpt-global".to_string(), + id: "plugins~Plugin_linear".to_string(), + name: "linear".to_string(), + enabled: true, + }, + ]); + + let outcome = manager.plugins_for_config(&config).await; + assert_eq!( + outcome.effective_skill_roots(), + vec![AbsolutePathBuf::try_from(plugin_base.join("local/skills")).unwrap()] + ); + assert_eq!(outcome.plugins().len(), 1); + assert_eq!(outcome.plugins()[0].config_name, "linear@chatgpt-global"); +} + +#[tokio::test] +async fn remote_installed_cache_ignores_plugins_missing_local_cache() { + let codex_home = TempDir::new().unwrap(); + write_file( + &codex_home.path().join(CONFIG_TOML_FILE), + r#"[features] +plugins = true +remote_plugin = true +"#, + ); + + let config = load_config(codex_home.path(), codex_home.path()).await; + let manager = PluginsManager::new(codex_home.path().to_path_buf()); + manager.write_remote_installed_plugins_cache(vec![ + codex_core_plugins::remote::RemoteInstalledPlugin { + marketplace_name: "chatgpt-global".to_string(), + id: "plugins~Plugin_linear".to_string(), + name: "linear".to_string(), + enabled: true, + }, + ]); + + let outcome = manager.plugins_for_config(&config).await; + assert_eq!(outcome, PluginLoadOutcome::default()); +} + #[tokio::test] async fn load_plugins_resolves_disabled_skill_names_against_loaded_plugin_skills() { let codex_home = TempDir::new().unwrap(); @@ -3300,6 +3362,7 @@ async fn load_plugins_ignores_project_config_files() { let outcome = load_plugins_from_layer_stack( &stack, + std::collections::HashMap::new(), &PluginStore::new(codex_home.path().to_path_buf()), Some(Product::Codex), ) diff --git a/codex-rs/core/src/session/handlers.rs b/codex-rs/core/src/session/handlers.rs index 4965cdb38919..b35486160590 100644 --- a/codex-rs/core/src/session/handlers.rs +++ b/codex-rs/core/src/session/handlers.rs @@ -23,7 +23,6 @@ use codex_config::CloudRequirementsLoader; use codex_config::LoaderOverrides; use codex_config::loader::load_config_layers_state; use codex_exec_server::LOCAL_FS; -use codex_features::Feature; use codex_utils_absolute_path::AbsolutePathBuf; use crate::review_prompts::resolve_review_request; @@ -619,10 +618,7 @@ pub async fn list_skills(sess: &Session, sub_id: String, cwds: Vec, for } }; let effective_skill_roots = plugins_manager - .effective_skill_roots_for_layer_stack( - &config_layer_stack, - config.features.enabled(Feature::Plugins), - ) + .effective_skill_roots_for_layer_stack(&config_layer_stack, &config) .await; let skills_input = crate::SkillsLoadInput::new( cwd_abs.clone(),