Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 39 additions & 11 deletions codex-rs/app-server/src/codex_message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -722,6 +722,35 @@ impl CodexMessageProcessor {
self.clear_plugin_related_caches();
}

pub(crate) fn effective_plugins_changed_callback(
&self,
config: Config,
) -> Arc<dyn Fn() + Send + Sync> {
let thread_manager = Arc::clone(&self.thread_manager);
Arc::new(move || {
Self::spawn_effective_plugins_changed_task(Arc::clone(&thread_manager), config.clone());
})
Comment thread
xl-openai marked this conversation as resolved.
}

fn on_effective_plugins_changed(&self, config: Config) {
Self::spawn_effective_plugins_changed_task(Arc::clone(&self.thread_manager), config);
}

fn spawn_effective_plugins_changed_task(thread_manager: Arc<ThreadManager>, config: Config) {
tokio::spawn(async move {
thread_manager.plugins_manager().clear_cache();
thread_manager.skills_manager().clear_cache();
if thread_manager.list_thread_ids().await.is_empty() {
return;
}
if let Err(err) =
Self::queue_mcp_server_refresh_for_config(&thread_manager, &config).await
{
warn!("failed to queue MCP refresh after effective plugins changed: {err:?}");
}
});
}

fn clear_plugin_related_caches(&self) {
self.thread_manager.plugins_manager().clear_cache();
self.thread_manager.skills_manager().clear_cache();
Expand Down Expand Up @@ -5362,19 +5391,18 @@ impl CodexMessageProcessor {
async fn mcp_server_refresh(&self, request_id: ConnectionRequestId, _params: Option<()>) {
let result = async {
let config = self.load_latest_config(/*fallback_cwd*/ None).await?;
self.queue_mcp_server_refresh_for_config(&config).await?;
Self::queue_mcp_server_refresh_for_config(&self.thread_manager, &config).await?;
Ok::<_, JSONRPCErrorError>(McpServerRefreshResponse {})
}
.await;
self.outgoing.send_result(request_id, result).await;
}

async fn queue_mcp_server_refresh_for_config(
&self,
thread_manager: &Arc<ThreadManager>,
config: &Config,
) -> Result<(), JSONRPCErrorError> {
let configured_servers = self
.thread_manager
let configured_servers = thread_manager
.mcp_manager()
.configured_servers(config)
.await;
Expand Down Expand Up @@ -5410,7 +5438,6 @@ impl CodexMessageProcessor {

// Refresh requests are queued per thread; each thread rebuilds MCP connections on its next
// active turn to avoid work for threads that never resume.
let thread_manager = Arc::clone(&self.thread_manager);
thread_manager.refresh_mcp_servers(refresh_config).await;
Ok(())
}
Expand Down Expand Up @@ -6259,12 +6286,13 @@ impl CodexMessageProcessor {
continue;
}
};
let effective_skill_roots = plugins_manager
.effective_skill_roots_for_layer_stack(
&config_layer_stack,
config.features.enabled(Feature::Plugins) && workspace_codex_plugins_enabled,
)
.await;
let effective_skill_roots = if workspace_codex_plugins_enabled {
plugins_manager
.effective_skill_roots_for_layer_stack(&config_layer_stack, &config)
.await
} else {
Vec::new()
};
let skills_input = codex_core::skills::SkillsLoadInput::new(
cwd_abs.clone(),
effective_skill_roots,
Expand Down
67 changes: 43 additions & 24 deletions codex-rs/app-server/src/codex_message_processor/plugins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,12 @@ impl CodexMessageProcessor {
{
return Ok(empty_response());
}
plugins_manager.maybe_start_non_curated_plugin_cache_refresh(&roots);
plugins_manager.maybe_start_plugin_list_background_tasks_for_config(
&config,
auth.clone(),
&roots,
Some(self.effective_plugins_changed_callback(config.clone())),
);

let config_for_marketplace_listing = config.clone();
let plugins_manager_for_marketplace_listing = plugins_manager.clone();
Expand Down Expand Up @@ -362,17 +367,10 @@ impl CodexMessageProcessor {
}
};

self.clear_plugin_related_caches();
self.on_effective_plugins_changed(config.clone());

let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;

if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
Expand Down Expand Up @@ -464,19 +462,16 @@ impl CodexMessageProcessor {
.await
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?;

// TODO(remote plugins): remote marketplaces do not yet have a local
// marketplace/read-path sync, so this install path reads MCP/apps directly
// from the just-cached bundle.
self.clear_plugin_related_caches();
self.thread_manager
.plugins_manager()
.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config,
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
);

let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await;
if !plugin_mcp_servers.is_empty() {
if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await {
warn!(
plugin = result.plugin_id.as_key(),
"failed to queue MCP refresh after remote plugin install: {err:?}"
);
}
self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers)
.await;
}
Expand Down Expand Up @@ -591,7 +586,15 @@ impl CodexMessageProcessor {
.uninstall_plugin(plugin_id)
.await
.map_err(Self::plugin_uninstall_error)?;
self.clear_plugin_related_caches();
match self.load_latest_config(/*fallback_cwd*/ None).await {
Ok(config) => self.on_effective_plugins_changed(config),
Err(err) => {
warn!(
"failed to reload config after plugin uninstall, clearing plugin-related caches only: {err:?}"
);
self.clear_plugin_related_caches();
}
}
Ok(PluginUninstallResponse {})
}

Expand Down Expand Up @@ -675,16 +678,32 @@ impl CodexMessageProcessor {
let remote_plugin_service_config = RemotePluginServiceConfig {
chatgpt_base_url: config.chatgpt_base_url.clone(),
};
codex_core_plugins::remote::uninstall_remote_plugin(
let uninstall_result = codex_core_plugins::remote::uninstall_remote_plugin(
&remote_plugin_service_config,
auth.as_ref(),
config.codex_home.to_path_buf(),
&plugin_id,
)
.await
.map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin"))?;
.await;

if matches!(
&uninstall_result,
Ok(()) | Err(RemotePluginCatalogError::CacheRemove(_))
) {
let plugins_manager = self.thread_manager.plugins_manager();
if plugins_manager.clear_remote_installed_plugins_cache() {
self.on_effective_plugins_changed(config.clone());
}
plugins_manager.maybe_start_remote_installed_plugins_cache_refresh_after_mutation(
&config,
auth.clone(),
Some(self.effective_plugins_changed_callback(config.clone())),
);
}

self.clear_plugin_related_caches();
uninstall_result.map_err(|err| {
remote_plugin_catalog_error_to_jsonrpc(err, "uninstall remote plugin")
})?;
Ok(PluginUninstallResponse {})
}
}
Expand Down
9 changes: 7 additions & 2 deletions codex-rs/app-server/src/message_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,10 +314,15 @@ impl MessageProcessor {
});
if matches!(plugin_startup_tasks, crate::PluginStartupTasks::Start) {
// Keep plugin startup warmups aligned at app-server startup.
// TODO(xl): Move into PluginManager once this no longer depends on config feature gating.
let on_effective_plugins_changed =
codex_message_processor.effective_plugins_changed_callback((*config).clone());
thread_manager
.plugins_manager()
.maybe_start_plugin_startup_tasks_for_config(&config, auth_manager.clone());
.maybe_start_plugin_startup_tasks_for_config(
&config,
auth_manager.clone(),
Some(on_effective_plugins_changed),
);
}
let config_api = ConfigApi::new(
config_manager,
Expand Down
Loading
Loading