From a09b6164e3070cb7db3149de345be449419b5759 Mon Sep 17 00:00:00 2001 From: pakrym-oai Date: Fri, 24 Apr 2026 19:57:49 -0700 Subject: [PATCH] Streamline plugin app-server handlers --- .../app-server/src/codex_message_processor.rs | 369 +++++------- .../src/codex_message_processor/plugins.rs | 545 +++++++----------- 2 files changed, 327 insertions(+), 587 deletions(-) diff --git a/codex-rs/app-server/src/codex_message_processor.rs b/codex-rs/app-server/src/codex_message_processor.rs index 44b9a398cca5..696ecaaf4ef1 100644 --- a/codex-rs/app-server/src/codex_message_processor.rs +++ b/codex-rs/app-server/src/codex_message_processor.rs @@ -7,6 +7,7 @@ use crate::error_code::INPUT_TOO_LARGE_ERROR_CODE; use crate::error_code::INTERNAL_ERROR_CODE; use crate::error_code::INVALID_PARAMS_ERROR_CODE; use crate::error_code::INVALID_REQUEST_ERROR_CODE; +use crate::error_code::invalid_params; use crate::fuzzy_file_search::FuzzyFileSearchSession; use crate::fuzzy_file_search::run_fuzzy_file_search; use crate::fuzzy_file_search::start_fuzzy_file_search_session; @@ -6322,6 +6323,15 @@ impl CodexMessageProcessor { self.outgoing.send_error(request_id, error).await; } + async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) { + let error = JSONRPCErrorError { + code: INTERNAL_ERROR_CODE, + message, + data: None, + }; + self.outgoing.send_error(request_id, error).await; + } + fn input_too_large_error(actual_chars: usize) -> JSONRPCErrorError { JSONRPCErrorError { code: INVALID_PARAMS_ERROR_CODE, @@ -6344,41 +6354,6 @@ impl CodexMessageProcessor { Ok(()) } - async fn send_internal_error(&self, request_id: ConnectionRequestId, message: String) { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message, - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - - async fn send_marketplace_error( - &self, - request_id: ConnectionRequestId, - err: MarketplaceError, - action: &str, - ) { - match err { - MarketplaceError::MarketplaceNotFound { .. } => { - self.send_invalid_request_error(request_id, err.to_string()) - .await; - } - MarketplaceError::Io { .. } => { - self.send_internal_error(request_id, format!("failed to {action}: {err}")) - .await; - } - MarketplaceError::InvalidMarketplaceFile { .. } - | MarketplaceError::PluginNotFound { .. } - | MarketplaceError::PluginNotAvailable { .. } - | MarketplaceError::PluginsDisabled - | MarketplaceError::InvalidPlugin(_) => { - self.send_invalid_request_error(request_id, err.to_string()) - .await; - } - } - } - async fn wait_for_thread_shutdown(thread: &Arc) -> ThreadShutdownResult { match tokio::time::timeout(Duration::from_secs(10), thread.shutdown_and_wait()).await { Ok(Ok(())) => ThreadShutdownResult::Complete, @@ -6457,34 +6432,33 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: ThreadUnsubscribeParams, ) { - let thread_id = match ThreadId::from_string(¶ms.thread_id) { - Ok(id) => id, - Err(err) => { - self.send_invalid_request_error(request_id, format!("invalid thread id: {err}")) - .await; - return; - } - }; + let result = self + .thread_unsubscribe_response(params, request_id.connection_id) + .await; + self.outgoing.send_result(request_id, result).await; + } + + async fn thread_unsubscribe_response( + &self, + params: ThreadUnsubscribeParams, + connection_id: ConnectionId, + ) -> Result { + let thread_id = ThreadId::from_string(¶ms.thread_id) + .map_err(|err| invalid_request(format!("invalid thread id: {err}")))?; if self.thread_manager.get_thread(thread_id).await.is_err() { // Reconcile stale app-server bookkeeping when the thread has already been // removed from the core manager. This keeps loaded-status/subscription state // consistent with the source of truth before reporting NotLoaded. self.finalize_thread_teardown(thread_id).await; - self.outgoing - .send_response( - request_id, - ThreadUnsubscribeResponse { - status: ThreadUnsubscribeStatus::NotLoaded, - }, - ) - .await; - return; + return Ok(ThreadUnsubscribeResponse { + status: ThreadUnsubscribeStatus::NotLoaded, + }); }; let was_subscribed = self .thread_state_manager - .unsubscribe_connection_from_thread(thread_id, request_id.connection_id) + .unsubscribe_connection_from_thread(thread_id, connection_id) .await; let status = if was_subscribed { @@ -6492,9 +6466,7 @@ impl CodexMessageProcessor { } else { ThreadUnsubscribeStatus::NotSubscribed }; - self.outgoing - .send_response(request_id, ThreadUnsubscribeResponse { status }) - .await; + Ok(ThreadUnsubscribeResponse { status }) } async fn prepare_thread_for_archive(&self, thread_id: ThreadId) { @@ -6588,6 +6560,16 @@ impl CodexMessageProcessor { config: Config, environment_manager: Arc, ) { + let result = Self::apps_list_response(&outgoing, params, config, environment_manager).await; + outgoing.send_result(request_id, result).await; + } + + async fn apps_list_response( + outgoing: &Arc, + params: AppsListParams, + config: Config, + environment_manager: Arc, + ) -> Result { let AppsListParams { cursor, limit, @@ -6597,15 +6579,7 @@ impl CodexMessageProcessor { let start = match cursor { Some(cursor) => match cursor.parse::() { Ok(idx) => idx, - Err(_) => { - let error = JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!("invalid cursor: {cursor}"), - data: None, - }; - outgoing.send_error(request_id, error).await; - return; - } + Err(_) => return Err(invalid_request(format!("invalid cursor: {cursor}"))), }, None => 0, }; @@ -6659,7 +6633,7 @@ impl CodexMessageProcessor { accessible_loaded, all_loaded, ) { - apps_list_helpers::send_app_list_updated_notification(&outgoing, merged.clone()) + apps_list_helpers::send_app_list_updated_notification(outgoing, merged.clone()) .await; last_notified_apps = Some(merged); } @@ -6669,25 +6643,13 @@ impl CodexMessageProcessor { let result = match tokio::time::timeout_at(app_list_deadline, rx.recv()).await { Ok(Some(result)) => result, Ok(None) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: "failed to load app lists".to_string(), - data: None, - }; - outgoing.send_error(request_id, error).await; - return; + return Err(internal_error("failed to load app lists")); } Err(_) => { let timeout_seconds = APP_LIST_LOAD_TIMEOUT.as_secs(); - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!( - "timed out waiting for app lists after {timeout_seconds} seconds" - ), - data: None, - }; - outgoing.send_error(request_id, error).await; - return; + return Err(internal_error(format!( + "timed out waiting for app lists after {timeout_seconds} seconds" + ))); } }; @@ -6697,26 +6659,14 @@ impl CodexMessageProcessor { accessible_loaded = true; } AppListLoadResult::Accessible(Err(err)) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: err, - data: None, - }; - outgoing.send_error(request_id, error).await; - return; + return Err(internal_error(err)); } AppListLoadResult::Directory(Ok(connectors)) => { all_connectors = Some(connectors); all_loaded = true; } AppListLoadResult::Directory(Err(err)) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: err, - data: None, - }; - outgoing.send_error(request_id, error).await; - return; + return Err(internal_error(err)); } } @@ -6746,27 +6696,26 @@ impl CodexMessageProcessor { all_loaded, ) && last_notified_apps.as_ref() != Some(&merged) { - apps_list_helpers::send_app_list_updated_notification(&outgoing, merged.clone()) + apps_list_helpers::send_app_list_updated_notification(outgoing, merged.clone()) .await; last_notified_apps = Some(merged.clone()); } if accessible_loaded && all_loaded { - match apps_list_helpers::paginate_apps(merged.as_slice(), start, limit) { - Ok(response) => { - outgoing.send_response(request_id, response).await; - return; - } - Err(error) => { - outgoing.send_error(request_id, error).await; - return; - } - } + return apps_list_helpers::paginate_apps(merged.as_slice(), start, limit); } } } async fn skills_list(&self, request_id: ConnectionRequestId, params: SkillsListParams) { + let result = self.skills_list_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn skills_list_response( + &self, + params: SkillsListParams, + ) -> Result { let SkillsListParams { cwds, force_reload, @@ -6791,17 +6740,13 @@ impl CodexMessageProcessor { let mut valid_extra_roots = Vec::new(); for root in entry.extra_user_roots { - let Ok(root) = AbsolutePathBuf::from_absolute_path_checked(root.as_path()) else { - self.send_invalid_request_error( - request_id, - format!( + let root = + AbsolutePathBuf::from_absolute_path_checked(root.as_path()).map_err(|_| { + invalid_request(format!( "skills/list perCwdExtraUserRoots extraUserRoots paths must be absolute: {}", root.display() - ), - ) - .await; - return; - }; + )) + })?; valid_extra_roots.push(root); } extra_roots_by_cwd @@ -6810,13 +6755,7 @@ impl CodexMessageProcessor { .extend(valid_extra_roots); } - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(error) => { - self.outgoing.send_error(request_id, error).await; - return; - } - }; + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; let auth = self.auth_manager.auth().await; let workspace_codex_plugins_enabled = self .workspace_codex_plugins_enabled(&config, auth.as_ref()) @@ -6895,9 +6834,7 @@ impl CodexMessageProcessor { errors, }); } - self.outgoing - .send_response(request_id, SkillsListResponse { data }) - .await; + Ok(SkillsListResponse { data }) } async fn marketplace_remove( &self, @@ -6910,27 +6847,16 @@ impl CodexMessageProcessor { marketplace_name: params.marketplace_name, }, ) - .await; - - match result { - Ok(outcome) => { - self.outgoing - .send_response( - request_id, - MarketplaceRemoveResponse { - marketplace_name: outcome.marketplace_name, - installed_root: outcome.removed_installed_root, - }, - ) - .await; - } - Err(MarketplaceRemoveError::InvalidRequest(message)) => { - self.send_invalid_request_error(request_id, message).await; - } - Err(MarketplaceRemoveError::Internal(message)) => { - self.send_internal_error(request_id, message).await; - } - } + .await + .map(|outcome| MarketplaceRemoveResponse { + marketplace_name: outcome.marketplace_name, + installed_root: outcome.removed_installed_root, + }) + .map_err(|err| match err { + MarketplaceRemoveError::InvalidRequest(message) => invalid_request(message), + MarketplaceRemoveError::Internal(message) => internal_error(message), + }); + self.outgoing.send_result(request_id, result).await; } async fn marketplace_upgrade( @@ -6938,53 +6864,38 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: MarketplaceUpgradeParams, ) { - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - }; + let result = self.marketplace_upgrade_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn marketplace_upgrade_response( + &self, + params: MarketplaceUpgradeParams, + ) -> Result { + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; let plugins_manager = self.thread_manager.plugins_manager(); let MarketplaceUpgradeParams { marketplace_name } = params; - let result = tokio::task::spawn_blocking(move || { + let outcome = tokio::task::spawn_blocking(move || { plugins_manager .upgrade_configured_marketplaces_for_config(&config, marketplace_name.as_deref()) }) - .await; - - match result { - Ok(Ok(outcome)) => { - self.outgoing - .send_response( - request_id, - MarketplaceUpgradeResponse { - selected_marketplaces: outcome.selected_marketplaces, - upgraded_roots: outcome.upgraded_roots, - errors: outcome - .errors - .into_iter() - .map(|err| MarketplaceUpgradeErrorInfo { - marketplace_name: err.marketplace_name, - message: err.message, - }) - .collect(), - }, - ) - .await; - } - Ok(Err(message)) => { - self.send_invalid_request_error(request_id, message).await; - } - Err(err) => { - self.send_internal_error( - request_id, - format!("failed to upgrade marketplaces: {err}"), - ) - .await; - } - } + .await + .map_err(|err| internal_error(format!("failed to upgrade marketplaces: {err}")))? + .map_err(invalid_request)?; + + Ok(MarketplaceUpgradeResponse { + selected_marketplaces: outcome.selected_marketplaces, + upgraded_roots: outcome.upgraded_roots, + errors: outcome + .errors + .into_iter() + .map(|err| MarketplaceUpgradeErrorInfo { + marketplace_name: err.marketplace_name, + message: err.message, + }) + .collect(), + }) } async fn marketplace_add(&self, request_id: ConnectionRequestId, params: MarketplaceAddParams) { @@ -6996,28 +6907,17 @@ impl CodexMessageProcessor { sparse_paths: params.sparse_paths.unwrap_or_default(), }, ) - .await; - - match result { - Ok(outcome) => { - self.outgoing - .send_response( - request_id, - MarketplaceAddResponse { - marketplace_name: outcome.marketplace_name, - installed_root: outcome.installed_root, - already_added: outcome.already_added, - }, - ) - .await; - } - Err(MarketplaceAddError::InvalidRequest(message)) => { - self.send_invalid_request_error(request_id, message).await; - } - Err(MarketplaceAddError::Internal(message)) => { - self.send_internal_error(request_id, message).await; - } - } + .await + .map(|outcome| MarketplaceAddResponse { + marketplace_name: outcome.marketplace_name, + installed_root: outcome.installed_root, + already_added: outcome.already_added, + }) + .map_err(|err| match err { + MarketplaceAddError::InvalidRequest(message) => invalid_request(message), + MarketplaceAddError::Internal(message) => internal_error(message), + }); + self.outgoing.send_result(request_id, result).await; } async fn skills_config_write( @@ -7025,6 +6925,14 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: SkillsConfigWriteParams, ) { + let result = self.skills_config_write_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn skills_config_write_response( + &self, + params: SkillsConfigWriteParams, + ) -> Result { let SkillsConfigWriteParams { path, name, @@ -7039,43 +6947,24 @@ impl CodexMessageProcessor { ConfigEdit::SetSkillConfigByName { name, enabled } } _ => { - let error = JSONRPCErrorError { - code: INVALID_PARAMS_ERROR_CODE, - message: "skills/config/write requires exactly one of path or name".to_string(), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - return; + return Err(invalid_params( + "skills/config/write requires exactly one of path or name", + )); } }; let edits = vec![edit]; - let result = ConfigEditsBuilder::new(&self.config.codex_home) + ConfigEditsBuilder::new(&self.config.codex_home) .with_edits(edits) .apply() - .await; - - match result { - Ok(()) => { + .await + .map(|()| { self.thread_manager.plugins_manager().clear_cache(); self.thread_manager.skills_manager().clear_cache(); - self.outgoing - .send_response( - request_id, - SkillsConfigWriteResponse { - effective_enabled: enabled, - }, - ) - .await; - } - Err(err) => { - let error = JSONRPCErrorError { - code: INTERNAL_ERROR_CODE, - message: format!("failed to update skill settings: {err}"), - data: None, - }; - self.outgoing.send_error(request_id, error).await; - } - } + SkillsConfigWriteResponse { + effective_enabled: enabled, + } + }) + .map_err(|err| internal_error(format!("failed to update skill settings: {err}"))) } async fn turn_start( diff --git a/codex-rs/app-server/src/codex_message_processor/plugins.rs b/codex-rs/app-server/src/codex_message_processor/plugins.rs index 8f0f4dea9a8f..e1d0fffad37e 100644 --- a/codex-rs/app-server/src/codex_message_processor/plugins.rs +++ b/codex-rs/app-server/src/codex_message_processor/plugins.rs @@ -1,4 +1,6 @@ use super::*; +use crate::error_code::internal_error; +use crate::error_code::invalid_request; use codex_app_server_protocol::PluginInstallPolicy; impl CodexMessageProcessor { @@ -7,46 +9,33 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: PluginListParams, ) { + let result = self.plugin_list_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn plugin_list_response( + &self, + params: PluginListParams, + ) -> Result { let plugins_manager = self.thread_manager.plugins_manager(); let PluginListParams { cwds } = params; let roots = cwds.unwrap_or_default(); - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; + let empty_response = || PluginListResponse { + marketplaces: Vec::new(), + marketplace_load_errors: Vec::new(), + featured_plugin_ids: Vec::new(), }; if !config.features.enabled(Feature::Plugins) { - self.outgoing - .send_response( - request_id, - PluginListResponse { - marketplaces: Vec::new(), - marketplace_load_errors: Vec::new(), - featured_plugin_ids: Vec::new(), - }, - ) - .await; - return; + return Ok(empty_response()); } let auth = self.auth_manager.auth().await; if !self .workspace_codex_plugins_enabled(&config, auth.as_ref()) .await { - self.outgoing - .send_response( - request_id, - PluginListResponse { - marketplaces: Vec::new(), - marketplace_load_errors: Vec::new(), - featured_plugin_ids: Vec::new(), - }, - ) - .await; - return; + return Ok(empty_response()); } plugins_manager.maybe_start_non_curated_plugin_cache_refresh(&roots); @@ -100,18 +89,11 @@ impl CodexMessageProcessor { .await { Ok(Ok(outcome)) => outcome, - Ok(Err(err)) => { - self.send_marketplace_error(request_id, err, "list marketplace plugins") - .await; - return; - } + Ok(Err(err)) => return Err(Self::marketplace_error(err, "list marketplace plugins")), Err(err) => { - self.send_internal_error( - request_id, - format!("failed to list marketplace plugins: {err}"), - ) - .await; - return; + return Err(internal_error(format!( + "failed to list marketplace plugins: {err}" + ))); } }; @@ -174,16 +156,11 @@ impl CodexMessageProcessor { Vec::new() }; - self.outgoing - .send_response( - request_id, - PluginListResponse { - marketplaces: data, - marketplace_load_errors, - featured_plugin_ids, - }, - ) - .await; + Ok(PluginListResponse { + marketplaces: data, + marketplace_load_errors, + featured_plugin_ids, + }) } pub(super) async fn plugin_read( @@ -191,6 +168,14 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: PluginReadParams, ) { + let result = self.plugin_read_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn plugin_read_response( + &self, + params: PluginReadParams, + ) -> Result { let plugins_manager = self.thread_manager.plugins_manager(); let PluginReadParams { marketplace_path, @@ -201,30 +186,16 @@ impl CodexMessageProcessor { (Some(marketplace_path), None) => Ok(marketplace_path), (None, Some(remote_marketplace_name)) => Err(remote_marketplace_name), (Some(_), Some(_)) | (None, None) => { - self.outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "plugin/read requires exactly one of marketplacePath or remoteMarketplaceName".to_string(), - data: None, - }, - ) - .await; - return; + return Err(invalid_request( + "plugin/read requires exactly one of marketplacePath or remoteMarketplaceName", + )); } }; let config_cwd = read_source.as_ref().ok().and_then(|marketplace_path| { marketplace_path.as_path().parent().map(Path::to_path_buf) }); - let config = match self.load_latest_config(config_cwd).await { - Ok(config) => config, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - }; + let config = self.load_latest_config(config_cwd).await?; let plugin = match read_source { Ok(marketplace_path) => { @@ -232,17 +203,10 @@ impl CodexMessageProcessor { plugin_name, marketplace_path, }; - let outcome = match plugins_manager + let outcome = plugins_manager .read_plugin_for_config(&config, &request) .await - { - Ok(outcome) => outcome, - Err(err) => { - self.send_marketplace_error(request_id, err, "read plugin details") - .await; - return; - } - }; + .map_err(|err| Self::marketplace_error(err, "read plugin details"))?; let environment_manager = self.thread_manager.environment_manager(); let app_summaries = plugin_app_helpers::load_plugin_app_summaries( &config, @@ -287,19 +251,9 @@ impl CodexMessageProcessor { if !config.features.enabled(Feature::Plugins) || !config.features.enabled(Feature::RemotePlugin) { - self.outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "remote plugin read is not enabled for marketplace {remote_marketplace_name}" - ), - data: None, - }, - ) - .await; - return; + return Err(invalid_request(format!( + "remote plugin read is not enabled for marketplace {remote_marketplace_name}" + ))); } let auth = self.auth_manager.auth().await; let remote_plugin_service_config = RemotePluginServiceConfig { @@ -310,36 +264,20 @@ impl CodexMessageProcessor { .chars() .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~') { - self.send_invalid_request_error( - request_id, - "invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed" - .to_string(), - ) - .await; - return; + return Err(invalid_request( + "invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed", + )); } - let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail( + let remote_detail = codex_core_plugins::remote::fetch_remote_plugin_detail( &remote_plugin_service_config, auth.as_ref(), &remote_marketplace_name, &plugin_name, ) .await - { - Ok(remote_detail) => remote_detail, - Err(err) => { - self.outgoing - .send_error( - request_id, - remote_plugin_catalog_error_to_jsonrpc( - err, - "read remote plugin details", - ), - ) - .await; - return; - } - }; + .map_err(|err| { + remote_plugin_catalog_error_to_jsonrpc(err, "read remote plugin details") + })?; let plugin_apps = remote_detail .app_ids .iter() @@ -357,9 +295,7 @@ impl CodexMessageProcessor { } }; - self.outgoing - .send_response(request_id, PluginReadResponse { plugin }) - .await; + Ok(PluginReadResponse { plugin }) } pub(super) async fn plugin_install( @@ -367,6 +303,14 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: PluginInstallParams, ) { + let result = self.plugin_install_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn plugin_install_response( + &self, + params: PluginInstallParams, + ) -> Result { let PluginInstallParams { marketplace_path, remote_marketplace_name, @@ -375,44 +319,27 @@ impl CodexMessageProcessor { let marketplace_path = match (marketplace_path, remote_marketplace_name) { (Some(marketplace_path), None) => marketplace_path, (None, Some(remote_marketplace_name)) => { - self.remote_plugin_install(request_id, remote_marketplace_name, plugin_name) + return self + .remote_plugin_install_response(remote_marketplace_name, plugin_name) .await; - return; } (Some(_), Some(_)) | (None, None) => { - self.outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: "plugin/install requires exactly one of marketplacePath or remoteMarketplaceName".to_string(), - data: None, - }, - ) - .await; - return; + return Err(invalid_request( + "plugin/install requires exactly one of marketplacePath or remoteMarketplaceName", + )); } }; let config_cwd = marketplace_path.as_path().parent().map(Path::to_path_buf); - let config = match self.load_latest_config(config_cwd.clone()).await { - Ok(config) => config, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - }; + let config = self.load_latest_config(config_cwd.clone()).await?; let auth = self.auth_manager.auth().await; if !self .workspace_codex_plugins_enabled(&config, auth.as_ref()) .await { - self.send_invalid_request_error( - request_id, - "Codex plugins are disabled for this workspace".to_string(), - ) - .await; - return; + return Err(invalid_request( + "Codex plugins are disabled for this workspace", + )); } let plugins_manager = self.thread_manager.plugins_manager(); @@ -421,197 +348,103 @@ impl CodexMessageProcessor { marketplace_path, }; - let install_result = plugins_manager.install_plugin(request).await; - - match install_result { - Ok(result) => { - let config = match self.load_latest_config(config_cwd).await { - Ok(config) => config, - Err(err) => { - warn!( - "failed to reload config after plugin install, using current config: {err:?}" - ); - config - } - }; - - self.clear_plugin_related_caches(); - - let plugin_mcp_servers = - load_plugin_mcp_servers(result.installed_path.as_path()).await; - - if !plugin_mcp_servers.is_empty() { - if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await { - warn!( - plugin = result.plugin_id.as_key(), - "failed to queue MCP refresh after plugin install: {err:?}" - ); - } - self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers) - .await; - } + let result = plugins_manager + .install_plugin(request) + .await + .map_err(Self::plugin_install_error)?; + let config = match self.load_latest_config(config_cwd).await { + Ok(config) => config, + Err(err) => { + warn!( + "failed to reload config after plugin install, using current config: {err:?}" + ); + config + } + }; - let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await; - let auth = self.auth_manager.auth().await; - let apps_needing_auth = self - .plugin_apps_needing_auth_for_install( - &config, - auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth), - &result.plugin_id.as_key(), - &plugin_apps, - ) - .await; + self.clear_plugin_related_caches(); - self.outgoing - .send_response( - request_id, - PluginInstallResponse { - auth_policy: result.auth_policy.into(), - apps_needing_auth, - }, - ) - .await; - } - Err(err) => { - if err.is_invalid_request() { - self.send_invalid_request_error(request_id, err.to_string()) - .await; - return; - } + let plugin_mcp_servers = load_plugin_mcp_servers(result.installed_path.as_path()).await; - match err { - CorePluginInstallError::Marketplace(err) => { - self.send_marketplace_error(request_id, err, "install plugin") - .await; - } - CorePluginInstallError::Config(err) => { - self.send_internal_error( - request_id, - format!("failed to persist installed plugin config: {err}"), - ) - .await; - } - CorePluginInstallError::Remote(err) => { - self.send_internal_error( - request_id, - format!("failed to enable remote plugin: {err}"), - ) - .await; - } - CorePluginInstallError::Join(err) => { - self.send_internal_error( - request_id, - format!("failed to install plugin: {err}"), - ) - .await; - } - CorePluginInstallError::Store(err) => { - self.send_internal_error( - request_id, - format!("failed to install plugin: {err}"), - ) - .await; - } - } + if !plugin_mcp_servers.is_empty() { + if let Err(err) = self.queue_mcp_server_refresh_for_config(&config).await { + warn!( + plugin = result.plugin_id.as_key(), + "failed to queue MCP refresh after plugin install: {err:?}" + ); } + self.start_plugin_mcp_oauth_logins(&config, plugin_mcp_servers) + .await; } + + let plugin_apps = load_plugin_apps(result.installed_path.as_path()).await; + let auth = self.auth_manager.auth().await; + let apps_needing_auth = self + .plugin_apps_needing_auth_for_install( + &config, + auth.as_ref().is_some_and(CodexAuth::is_chatgpt_auth), + &result.plugin_id.as_key(), + &plugin_apps, + ) + .await; + + Ok(PluginInstallResponse { + auth_policy: result.auth_policy.into(), + apps_needing_auth, + }) } - async fn remote_plugin_install( + async fn remote_plugin_install_response( &self, - request_id: ConnectionRequestId, remote_marketplace_name: String, plugin_name: String, - ) { - let config = match self.load_latest_config(/*fallback_cwd*/ None).await { - Ok(config) => config, - Err(err) => { - self.outgoing.send_error(request_id, err).await; - return; - } - }; + ) -> Result { + let config = self.load_latest_config(/*fallback_cwd*/ None).await?; if !config.features.enabled(Feature::Plugins) || !config.features.enabled(Feature::RemotePlugin) { - self.outgoing - .send_error( - request_id, - JSONRPCErrorError { - code: INVALID_REQUEST_ERROR_CODE, - message: format!( - "remote plugin install is not enabled for marketplace {remote_marketplace_name}" - ), - data: None, - }, - ) - .await; - return; + return Err(invalid_request(format!( + "remote plugin install is not enabled for marketplace {remote_marketplace_name}" + ))); } if plugin_name.is_empty() || !plugin_name .chars() .all(|ch| ch.is_ascii_alphanumeric() || ch == '-' || ch == '_' || ch == '~') { - self.send_invalid_request_error( - request_id, - "invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed" - .to_string(), - ) - .await; - return; + return Err(invalid_request( + "invalid remote plugin id: only ASCII letters, digits, `_`, `-`, and `~` are allowed", + )); } let auth = self.auth_manager.auth().await; let remote_plugin_service_config = RemotePluginServiceConfig { chatgpt_base_url: config.chatgpt_base_url.clone(), }; - let remote_detail = match codex_core_plugins::remote::fetch_remote_plugin_detail( + let remote_detail = codex_core_plugins::remote::fetch_remote_plugin_detail( &remote_plugin_service_config, auth.as_ref(), &remote_marketplace_name, &plugin_name, ) .await - { - Ok(remote_detail) => remote_detail, - Err(err) => { - self.outgoing - .send_error( - request_id, - remote_plugin_catalog_error_to_jsonrpc( - err, - "read remote plugin details before install", - ), - ) - .await; - return; - } - }; + .map_err(|err| { + remote_plugin_catalog_error_to_jsonrpc(err, "read remote plugin details before install") + })?; if remote_detail.summary.install_policy == PluginInstallPolicy::NotAvailable { - self.send_invalid_request_error( - request_id, - format!("remote plugin {plugin_name} is not available for install"), - ) - .await; - return; + return Err(invalid_request(format!( + "remote plugin {plugin_name} is not available for install" + ))); } - if let Err(err) = codex_core_plugins::remote::install_remote_plugin( + codex_core_plugins::remote::install_remote_plugin( &remote_plugin_service_config, auth.as_ref(), &remote_marketplace_name, &plugin_name, ) .await - { - self.outgoing - .send_error( - request_id, - remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"), - ) - .await; - return; - } + .map_err(|err| remote_plugin_catalog_error_to_jsonrpc(err, "install remote plugin"))?; self.clear_plugin_related_caches(); @@ -629,15 +462,10 @@ impl CodexMessageProcessor { ) .await; - self.outgoing - .send_response( - request_id, - PluginInstallResponse { - auth_policy: remote_detail.summary.auth_policy, - apps_needing_auth, - }, - ) - .await; + Ok(PluginInstallResponse { + auth_policy: remote_detail.summary.auth_policy, + apps_needing_auth, + }) } async fn plugin_apps_needing_auth_for_install( @@ -709,59 +537,82 @@ impl CodexMessageProcessor { request_id: ConnectionRequestId, params: PluginUninstallParams, ) { + let result = self.plugin_uninstall_response(params).await; + self.outgoing.send_result(request_id, result).await; + } + + async fn plugin_uninstall_response( + &self, + params: PluginUninstallParams, + ) -> Result { let PluginUninstallParams { plugin_id } = params; let plugins_manager = self.thread_manager.plugins_manager(); - let uninstall_result = plugins_manager.uninstall_plugin(plugin_id).await; + plugins_manager + .uninstall_plugin(plugin_id) + .await + .map_err(Self::plugin_uninstall_error)?; + self.clear_plugin_related_caches(); + Ok(PluginUninstallResponse {}) + } - match uninstall_result { - Ok(()) => { - self.clear_plugin_related_caches(); - self.outgoing - .send_response(request_id, PluginUninstallResponse {}) - .await; + fn plugin_install_error(err: CorePluginInstallError) -> JSONRPCErrorError { + if err.is_invalid_request() { + return invalid_request(err.to_string()); + } + + match err { + CorePluginInstallError::Marketplace(err) => { + Self::marketplace_error(err, "install plugin") } - Err(err) => { - if err.is_invalid_request() { - self.send_invalid_request_error(request_id, err.to_string()) - .await; - return; - } + CorePluginInstallError::Config(err) => { + internal_error(format!("failed to persist installed plugin config: {err}")) + } + CorePluginInstallError::Remote(err) => { + internal_error(format!("failed to enable remote plugin: {err}")) + } + CorePluginInstallError::Join(err) => { + internal_error(format!("failed to install plugin: {err}")) + } + CorePluginInstallError::Store(err) => { + internal_error(format!("failed to install plugin: {err}")) + } + } + } - match err { - CorePluginUninstallError::Config(err) => { - self.send_internal_error( - request_id, - format!("failed to clear plugin config: {err}"), - ) - .await; - } - CorePluginUninstallError::Remote(err) => { - self.send_internal_error( - request_id, - format!("failed to uninstall remote plugin: {err}"), - ) - .await; - } - CorePluginUninstallError::Join(err) => { - self.send_internal_error( - request_id, - format!("failed to uninstall plugin: {err}"), - ) - .await; - } - CorePluginUninstallError::Store(err) => { - self.send_internal_error( - request_id, - format!("failed to uninstall plugin: {err}"), - ) - .await; - } - CorePluginUninstallError::InvalidPluginId(_) => { - unreachable!("invalid plugin ids are handled above"); - } - } + fn plugin_uninstall_error(err: CorePluginUninstallError) -> JSONRPCErrorError { + if err.is_invalid_request() { + return invalid_request(err.to_string()); + } + + match err { + CorePluginUninstallError::Config(err) => { + internal_error(format!("failed to clear plugin config: {err}")) + } + CorePluginUninstallError::Remote(err) => { + internal_error(format!("failed to uninstall remote plugin: {err}")) + } + CorePluginUninstallError::Join(err) => { + internal_error(format!("failed to uninstall plugin: {err}")) + } + CorePluginUninstallError::Store(err) => { + internal_error(format!("failed to uninstall plugin: {err}")) } + CorePluginUninstallError::InvalidPluginId(_) => { + unreachable!("invalid plugin ids are handled above"); + } + } + } + + fn marketplace_error(err: MarketplaceError, action: &str) -> JSONRPCErrorError { + match err { + MarketplaceError::MarketplaceNotFound { .. } + | MarketplaceError::InvalidMarketplaceFile { .. } + | MarketplaceError::PluginNotFound { .. } + | MarketplaceError::PluginNotAvailable { .. } + | MarketplaceError::PluginsDisabled + | MarketplaceError::InvalidPlugin(_) => invalid_request(err.to_string()), + MarketplaceError::Io { .. } => internal_error(format!("failed to {action}: {err}")), } } }