From 331721f2c39ab57ca25a25b2990bd98f89dd99b9 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 13 Oct 2025 21:45:16 -0700 Subject: [PATCH 1/2] fix(core): isolate errors in serverless to each individual runner config --- packages/core/pegboard-serverless/src/lib.rs | 190 +++++++++++-------- 1 file changed, 112 insertions(+), 78 deletions(-) diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 24db9825a6..94f7922005 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use anyhow::Result; +use anyhow::{Context, Result, bail}; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64; use futures_util::{StreamExt, TryStreamExt}; @@ -15,7 +15,7 @@ use pegboard::keys; use reqwest::header::{HeaderName, HeaderValue}; use reqwest_eventsource as sse; use rivet_runner_protocol as protocol; -use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind}; +use rivet_types::runner_configs::RunnerConfigKind; use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; @@ -104,95 +104,38 @@ async fn tick( }) .await?; + // Process each runner config with error handling for (ns_id, runner_name, desired_slots) in &serverless_data { - let runner_config = runner_configs - .iter() - .find(|rc| rc.namespace_id == *ns_id) - .context("runner config not found")?; - - let namespace = ctx - .op(namespace::ops::get_global::Input { - namespace_ids: vec![ns_id.clone()], - }) - .await - .context("runner namespace not found")?; - let namespace = namespace.first().context("runner namespace not found")?; - let namespace_name = &namespace.name; + let runner_config = runner_configs.iter().find(|rc| rc.namespace_id == *ns_id); - let RunnerConfigKind::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = &runner_config.config.kind - else { + let Some(runner_config) = runner_config else { tracing::warn!( ?ns_id, - "this runner config should not be in the serverless subspace (wrong config kind)" + ?runner_name, + "runner config not found, likely deleted" ); continue; }; - let curr = outbound_connections - .entry((*ns_id, runner_name.clone())) - .or_insert_with(Vec::new); - - // Remove finished and draining connections from list - curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - - // Log warning and reset to 0 if negative - let adjusted_desired_slots = if *desired_slots < 0 { + if let Err(err) = tick_runner_config( + ctx, + *ns_id, + runner_name.clone(), + *desired_slots, + runner_config, + outbound_connections, + ) + .await + { tracing::error!( ?ns_id, ?runner_name, - ?desired_slots, - "negative desired slots, scaling to 0" + ?err, + "failed to process runner config, continuing with others" ); - 0 - } else { - *desired_slots - }; - - let desired_count = - (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) - .max(*min_runners as i64) - + *runners_margin as i64) - .min(*max_runners as i64) - .try_into()?; - - // Calculate diff - let drain_count = curr.len().saturating_sub(desired_count); - let start_count = desired_count.saturating_sub(curr.len()); - - if drain_count != 0 { - // TODO: Implement smart logic of draining runners with the lowest allocated actors - let draining_connections = curr.split_off(desired_count); - - for conn in draining_connections { - if conn.shutdown_tx.send(()).is_err() { - tracing::warn!( - "serverless connection shutdown channel dropped, likely already stopped" - ); - } - } + // Continue processing other runner configs even if this one failed + continue; } - - let starting_connections = std::iter::repeat_with(|| { - spawn_connection( - ctx.clone(), - url.clone(), - headers.clone(), - Duration::from_secs(*request_lifespan as u64), - *slots_per_runner, - runner_name.clone(), - namespace_name.clone(), - ) - }) - .take(start_count); - curr.extend(starting_connections); } // Remove entries that aren't returned from udb @@ -209,6 +152,97 @@ async fn tick( Ok(()) } +async fn tick_runner_config( + ctx: &StandaloneCtx, + ns_id: Id, + runner_name: String, + desired_slots: i64, + runner_config: &namespace::ops::runner_config::get::RunnerConfig, + outbound_connections: &mut HashMap<(Id, String), Vec>, +) -> Result<()> { + let namespace = ctx + .op(namespace::ops::get_global::Input { + namespace_ids: vec![ns_id.clone()], + }) + .await + .context("runner namespace not found")?; + let namespace = namespace.first().context("runner namespace not found")?; + let namespace_name = &namespace.name; + + let RunnerConfigKind::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = &runner_config.config.kind + else { + bail!("runner config should not be in the serverless subspace (wrong config kind)"); + }; + + let curr = outbound_connections + .entry((ns_id, runner_name.clone())) + .or_insert_with(Vec::new); + + // Remove finished and draining connections from list + curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); + + // Log warning and reset to 0 if negative + let adjusted_desired_slots = if desired_slots < 0 { + tracing::error!( + ?ns_id, + ?runner_name, + ?desired_slots, + "negative desired slots, scaling to 0" + ); + 0 + } else { + desired_slots + }; + + let desired_count = + (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) + .max(*min_runners as i64) + + *runners_margin as i64) + .min(*max_runners as i64) + .try_into()?; + + // Calculate diff + let drain_count = curr.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(curr.len()); + + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_connections = curr.split_off(desired_count); + + for conn in draining_connections { + if conn.shutdown_tx.send(()).is_err() { + tracing::warn!( + "serverless connection shutdown channel dropped, likely already stopped" + ); + } + } + } + + let starting_connections = std::iter::repeat_with(|| { + spawn_connection( + ctx.clone(), + url.clone(), + headers.clone(), + Duration::from_secs(*request_lifespan as u64), + *slots_per_runner, + runner_name.clone(), + namespace_name.clone(), + ) + }) + .take(start_count); + curr.extend(starting_connections); + + Ok(()) +} + fn spawn_connection( ctx: StandaloneCtx, url: String, From 0fe9d193d94234cbad97a6e1830ffd0988a32e98 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 13 Oct 2025 21:46:30 -0700 Subject: [PATCH 2/2] fix(pegboard): request /start endpoint for serverless runners --- packages/core/pegboard-serverless/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 94f7922005..978442ac1b 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -338,7 +338,8 @@ async fn outbound_handler( .chain(token) .collect(); - let req = client.get(url).headers(headers); + let endpoint_url = format!("{}/start", url.trim_end_matches('/')); + let req = client.get(endpoint_url).headers(headers); let mut source = sse::EventSource::new(req).context("failed creating event source")?; let mut runner_id = None;