diff --git a/engine/packages/api-peer/src/runner_configs.rs b/engine/packages/api-peer/src/runner_configs.rs index e10fcd92b7..8324984794 100644 --- a/engine/packages/api-peer/src/runner_configs.rs +++ b/engine/packages/api-peer/src/runner_configs.rs @@ -124,14 +124,14 @@ pub async fn upsert( }) } -#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[derive(Debug, Serialize, Clone, Deserialize, IntoParams)] #[serde(deny_unknown_fields)] #[into_params(parameter_in = Query)] pub struct DeleteQuery { pub namespace: String, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] #[serde(deny_unknown_fields)] pub struct DeletePath { pub runner_name: String, diff --git a/engine/packages/api-public/src/runner_configs/delete.rs b/engine/packages/api-public/src/runner_configs/delete.rs index 0d7f2245c0..caa966cfd0 100644 --- a/engine/packages/api-public/src/runner_configs/delete.rs +++ b/engine/packages/api-public/src/runner_configs/delete.rs @@ -1,5 +1,6 @@ -use anyhow::Result; +use anyhow::{Context, Result}; use axum::response::{IntoResponse, Response}; +use futures_util::{StreamExt, TryStreamExt}; use rivet_api_builder::{ ApiError, extract::{Extension, Json, Path, Query}, @@ -38,30 +39,43 @@ pub async fn delete( async fn delete_inner(ctx: ApiCtx, path: DeletePath, query: DeleteQuery) -> Result { ctx.auth().await?; - for dc in &ctx.config().topology().datacenters { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - Some(&query), - Option::<&()>::None, - ) - .await?; - } - } + let dcs = ctx.config().topology().datacenters.clone(); + futures_util::stream::iter(dcs) + .map(|dc| { + let ctx = ctx.clone(); + let query = query.clone(); + let path = path.clone(); + async move { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + Some(&query), + Option::<&()>::None, + ) + .await?; + } + + anyhow::Ok(()) + } + }) + .buffer_unordered(16) + .try_collect::>() + // NOTE: We must error when any peer request fails, not all + .await?; // Resolve namespace let namespace = ctx diff --git a/engine/packages/api-public/src/runner_configs/upsert.rs b/engine/packages/api-public/src/runner_configs/upsert.rs index d8d2b67049..b13d311ca4 100644 --- a/engine/packages/api-public/src/runner_configs/upsert.rs +++ b/engine/packages/api-public/src/runner_configs/upsert.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use anyhow::Result; use axum::response::{IntoResponse, Response}; +use futures_util::{StreamExt, TryStreamExt}; use rivet_api_builder::{ ApiError, extract::{Extension, Json, Path, Query}, @@ -85,58 +86,75 @@ async fn upsert_inner( }) .next(); - // Apply config - let mut any_endpoint_config_changed = false; - for dc in &ctx.config().topology().datacenters { - if let Some(runner_config) = body.datacenters.remove(&dc.name) { - let response = if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::upsert( - ctx.clone().into(), - path.clone(), - query.clone(), - rivet_api_peer::runner_configs::UpsertRequest(runner_config), - ) - .await? - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::PUT, - Some(&query), - Some(&runner_config), - ) - .await? - }; + let dcs = ctx + .config() + .topology() + .datacenters + .iter() + .map(|dc| (dc.clone(), body.datacenters.remove(&dc.name))) + .collect::>(); + let any_endpoint_config_changed = futures_util::stream::iter(dcs) + .map(|(dc, runner_config)| { + let ctx = ctx.clone(); + let query = query.clone(); + let path = path.clone(); + async move { + if let Some(runner_config) = runner_config { + let response = if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::upsert( + ctx.clone().into(), + path.clone(), + query.clone(), + rivet_api_peer::runner_configs::UpsertRequest(runner_config), + ) + .await? + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::PUT, + Some(&query), + Some(&runner_config), + ) + .await? + }; - if response.endpoint_config_changed { - any_endpoint_config_changed = true; - } - } else { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - Some(&query), - Option::<&()>::None, - ) - .await?; + anyhow::Ok(response.endpoint_config_changed) + } else { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + Some(&query), + Option::<&()>::None, + ) + .await?; + } + + Ok(false) + } } - } - } + }) + .buffer_unordered(16) + .try_collect::>() + // NOTE: We must error when any peer request fails, not all + .await? + .into_iter() + .any(|endpoint_config_changed| endpoint_config_changed); // Update runner metadata // diff --git a/engine/packages/api-public/src/runner_configs/utils.rs b/engine/packages/api-public/src/runner_configs/utils.rs index 5c11284f34..cb2d0939c3 100644 --- a/engine/packages/api-public/src/runner_configs/utils.rs +++ b/engine/packages/api-public/src/runner_configs/utils.rs @@ -89,6 +89,7 @@ pub async fn fetch_serverless_runner_metadata( .headers(header_map) .timeout(Duration::from_secs(10)) .send() + .custom_instrument(tracing::info_span!("fetch_metadata_request")) .await .map_err(|err| { if err.is_timeout() { diff --git a/engine/packages/api-util/src/lib.rs b/engine/packages/api-util/src/lib.rs index 1c67533c45..b67a2fb3cd 100644 --- a/engine/packages/api-util/src/lib.rs +++ b/engine/packages/api-util/src/lib.rs @@ -109,38 +109,39 @@ where A: Fn(u16, I, &mut R), R: Default + Send + 'static, { - let dcs = &ctx.config().topology().datacenters; - - let results = futures_util::stream::iter(dcs.clone().into_iter().map(|dc| { - let ctx = ctx.clone(); - let query = query.clone(); - let endpoint = endpoint.to_string(); - let local_handler = local_handler.clone(); - - async move { - if dc.datacenter_label == ctx.config().dc_label() { - // Local datacenter - use direct API call - (dc.datacenter_label, local_handler(ctx, query).await) - } else { - // Remote datacenter - HTTP request - ( - dc.datacenter_label, - request_remote_datacenter::( - ctx.config(), + let dcs = ctx.config().topology().datacenters.clone(); + + let results = futures_util::stream::iter(dcs) + .map(|dc| { + let ctx = ctx.clone(); + let query = query.clone(); + let endpoint = endpoint.to_string(); + let local_handler = local_handler.clone(); + + async move { + if dc.datacenter_label == ctx.config().dc_label() { + // Local datacenter - use direct API call + (dc.datacenter_label, local_handler(ctx, query).await) + } else { + // Remote datacenter - HTTP request + ( dc.datacenter_label, - &endpoint, - Method::GET, - Some(&query), - Option::<&()>::None, + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &endpoint, + Method::GET, + Some(&query), + Option::<&()>::None, + ) + .await, ) - .await, - ) + } } - } - })) - .buffer_unordered(16) - .collect::>() - .await; + }) + .buffer_unordered(16) + .collect::>() + .await; // Aggregate results let result_count = results.len(); @@ -159,7 +160,7 @@ where // Error only if all requests failed if result_count == errors.len() { if let Some(res) = errors.into_iter().next() { - return Err(res).with_context(|| "all datacenter requests failed"); + return Err(res).context("all datacenter requests failed"); } } diff --git a/engine/packages/namespace/src/ops/resolve_for_name_global.rs b/engine/packages/namespace/src/ops/resolve_for_name_global.rs index 296ecffa9e..c78089b6af 100644 --- a/engine/packages/namespace/src/ops/resolve_for_name_global.rs +++ b/engine/packages/namespace/src/ops/resolve_for_name_global.rs @@ -35,6 +35,7 @@ pub async fn namespace_resolve_for_name_global( .get(url) .query(&[("name", &input.name)]) .send() + .custom_instrument(tracing::info_span!("namespaces_http_request")) .await?; let res = rivet_api_util::parse_response::<