From d8a19e4967036a176a50c278e3ecdb47a9552a21 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sat, 18 Oct 2025 00:35:24 -0700 Subject: [PATCH] chore(pegboard): only refresh serverless runner metadata when new or udpated endpoint --- out/openapi.json | 10 +++- packages/core/api-peer/src/runner_configs.rs | 21 ++++--- .../api-public/src/runner_configs/upsert.rs | 58 +++++++++++-------- .../namespace/src/ops/runner_config/upsert.rs | 41 ++++++++++--- 4 files changed, 91 insertions(+), 39 deletions(-) diff --git a/out/openapi.json b/out/openapi.json index 904e986439..d65a71c1d0 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -1696,7 +1696,15 @@ "additionalProperties": false }, "RunnerConfigsUpsertResponse": { - "type": "object" + "type": "object", + "required": [ + "endpoint_config_changed" + ], + "properties": { + "endpoint_config_changed": { + "type": "boolean" + } + } }, "RunnersListNamesResponse": { "type": "object", diff --git a/packages/core/api-peer/src/runner_configs.rs b/packages/core/api-peer/src/runner_configs.rs index fa992d7d02..e10fcd92b7 100644 --- a/packages/core/api-peer/src/runner_configs.rs +++ b/packages/core/api-peer/src/runner_configs.rs @@ -93,7 +93,9 @@ pub struct UpsertRequest(pub rivet_api_types::namespaces::runner_configs::Runner #[derive(Deserialize, Serialize, ToSchema)] #[schema(as = RunnerConfigsUpsertResponse)] -pub struct UpsertResponse {} +pub struct UpsertResponse { + pub endpoint_config_changed: bool, +} #[tracing::instrument(skip_all)] pub async fn upsert( @@ -109,14 +111,17 @@ pub async fn upsert( .await? .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - ctx.op(namespace::ops::runner_config::upsert::Input { - namespace_id: namespace.namespace_id, - name: path.runner_name, - config: body.0.into(), - }) - .await?; + let endpoint_config_changed = ctx + .op(namespace::ops::runner_config::upsert::Input { + namespace_id: namespace.namespace_id, + name: path.runner_name, + config: body.0.into(), + }) + .await?; - Ok(UpsertResponse {}) + Ok(UpsertResponse { + endpoint_config_changed, + }) } #[derive(Debug, Serialize, Deserialize, IntoParams)] diff --git a/packages/core/api-public/src/runner_configs/upsert.rs b/packages/core/api-public/src/runner_configs/upsert.rs index 0e225ba97b..d8d2b67049 100644 --- a/packages/core/api-public/src/runner_configs/upsert.rs +++ b/packages/core/api-public/src/runner_configs/upsert.rs @@ -59,11 +59,19 @@ async fn upsert_inner( tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert"); + // Resolve namespace + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + // Store serverless config before processing (since we'll remove from body.datacenters) let serverless_config = body .datacenters .iter() - .filter_map(|(dc_name, runner_config)| { + .filter_map(|(_dc_name, runner_config)| { if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless { url, headers, @@ -78,16 +86,17 @@ async fn upsert_inner( .next(); // Apply config + let mut any_endpoint_config_changed = false; for dc in &ctx.config().topology().datacenters { if let Some(runner_config) = body.datacenters.remove(&dc.name) { - if ctx.config().dc_label() == dc.datacenter_label { + let response = if ctx.config().dc_label() == dc.datacenter_label { rivet_api_peer::runner_configs::upsert( ctx.clone().into(), path.clone(), query.clone(), rivet_api_peer::runner_configs::UpsertRequest(runner_config), ) - .await?; + .await? } else { request_remote_datacenter::( ctx.config(), @@ -97,7 +106,11 @@ async fn upsert_inner( Some(&query), Some(&runner_config), ) - .await?; + .await? + }; + + if response.endpoint_config_changed { + any_endpoint_config_changed = true; } } else { if ctx.config().dc_label() == dc.datacenter_label { @@ -125,28 +138,25 @@ async fn upsert_inner( } } - // Resolve namespace - let namespace = ctx - .op(namespace::ops::resolve_for_name_global::Input { - name: query.namespace.clone(), - }) - .await? - .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; - // Update runner metadata // // This allows us to populate the actor names immediately upon configuring a serverless runner if let Some((url, metadata_headers)) = serverless_config { - if let Err(err) = utils::refresh_runner_config_metadata( - ctx.clone(), - namespace.namespace_id, - path.runner_name.clone(), - url, - metadata_headers, - ) - .await - { - tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata"); + if any_endpoint_config_changed { + tracing::debug!("endpoint config changed, refreshing metadata"); + if let Err(err) = utils::refresh_runner_config_metadata( + ctx.clone(), + namespace.namespace_id, + path.runner_name.clone(), + url, + metadata_headers, + ) + .await + { + tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata"); + } + } else { + tracing::debug!("endpoint config unchanged, skipping metadata refresh"); } } @@ -160,5 +170,7 @@ async fn upsert_inner( ) .await?; - Ok(UpsertResponse {}) + Ok(UpsertResponse { + endpoint_config_changed: any_endpoint_config_changed, + }) } diff --git a/packages/services/namespace/src/ops/runner_config/upsert.rs b/packages/services/namespace/src/ops/runner_config/upsert.rs index f6a45c7273..3b745d901a 100644 --- a/packages/services/namespace/src/ops/runner_config/upsert.rs +++ b/packages/services/namespace/src/ops/runner_config/upsert.rs @@ -12,22 +12,49 @@ pub struct Input { } #[operation] -pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result<()> { - ctx.udb()? +pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -> Result { + let endpoint_config_changed = ctx + .udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); - // Delete previous index - if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? { + // Check if config changed (for serverless, compare URL and headers) + let endpoint_config_changed = if let Some(existing_config) = + tx.read_opt(&runner_config_key, Serializable).await? + { + // Delete previous index tx.delete(&keys::runner_config::ByVariantKey::new( input.namespace_id, runner_config_variant(&existing_config), input.name.clone(), )); - } + + // Check if serverless endpoint config changed + match (&existing_config.kind, &input.config.kind) { + ( + RunnerConfigKind::Serverless { + url: old_url, + headers: old_headers, + .. + }, + RunnerConfigKind::Serverless { + url: new_url, + headers: new_headers, + .. + }, + ) => old_url != new_url || old_headers != new_headers, + _ => { + // Config type changed or is not serverless + true + } + } + } else { + // New config + true + }; // Write new config tx.write(&runner_config_key, input.config.clone())?; @@ -104,7 +131,7 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - } } - Ok(Ok(())) + Ok(Ok(endpoint_config_changed)) }) .custom_instrument(tracing::info_span!("runner_config_upsert_tx")) .await? @@ -117,5 +144,5 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - .await?; } - Ok(()) + Ok(endpoint_config_changed) }