From 331721f2c39ab57ca25a25b2990bd98f89dd99b9 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 13 Oct 2025 21:45:16 -0700 Subject: [PATCH 1/3] fix(core): isolate errors in serverless to each individual runner config --- packages/core/pegboard-serverless/src/lib.rs | 190 +++++++++++-------- 1 file changed, 112 insertions(+), 78 deletions(-) diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 24db9825a6..94f7922005 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -6,7 +6,7 @@ use std::{ }, }; -use anyhow::Result; +use anyhow::{Context, Result, bail}; use base64::Engine; use base64::engine::general_purpose::STANDARD as BASE64; use futures_util::{StreamExt, TryStreamExt}; @@ -15,7 +15,7 @@ use pegboard::keys; use reqwest::header::{HeaderName, HeaderValue}; use reqwest_eventsource as sse; use rivet_runner_protocol as protocol; -use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind}; +use rivet_types::runner_configs::RunnerConfigKind; use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; @@ -104,95 +104,38 @@ async fn tick( }) .await?; + // Process each runner config with error handling for (ns_id, runner_name, desired_slots) in &serverless_data { - let runner_config = runner_configs - .iter() - .find(|rc| rc.namespace_id == *ns_id) - .context("runner config not found")?; - - let namespace = ctx - .op(namespace::ops::get_global::Input { - namespace_ids: vec![ns_id.clone()], - }) - .await - .context("runner namespace not found")?; - let namespace = namespace.first().context("runner namespace not found")?; - let namespace_name = &namespace.name; + let runner_config = runner_configs.iter().find(|rc| rc.namespace_id == *ns_id); - let RunnerConfigKind::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } = &runner_config.config.kind - else { + let Some(runner_config) = runner_config else { tracing::warn!( ?ns_id, - "this runner config should not be in the serverless subspace (wrong config kind)" + ?runner_name, + "runner config not found, likely deleted" ); continue; }; - let curr = outbound_connections - .entry((*ns_id, runner_name.clone())) - .or_insert_with(Vec::new); - - // Remove finished and draining connections from list - curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - - // Log warning and reset to 0 if negative - let adjusted_desired_slots = if *desired_slots < 0 { + if let Err(err) = tick_runner_config( + ctx, + *ns_id, + runner_name.clone(), + *desired_slots, + runner_config, + outbound_connections, + ) + .await + { tracing::error!( ?ns_id, ?runner_name, - ?desired_slots, - "negative desired slots, scaling to 0" + ?err, + "failed to process runner config, continuing with others" ); - 0 - } else { - *desired_slots - }; - - let desired_count = - (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) - .max(*min_runners as i64) - + *runners_margin as i64) - .min(*max_runners as i64) - .try_into()?; - - // Calculate diff - let drain_count = curr.len().saturating_sub(desired_count); - let start_count = desired_count.saturating_sub(curr.len()); - - if drain_count != 0 { - // TODO: Implement smart logic of draining runners with the lowest allocated actors - let draining_connections = curr.split_off(desired_count); - - for conn in draining_connections { - if conn.shutdown_tx.send(()).is_err() { - tracing::warn!( - "serverless connection shutdown channel dropped, likely already stopped" - ); - } - } + // Continue processing other runner configs even if this one failed + continue; } - - let starting_connections = std::iter::repeat_with(|| { - spawn_connection( - ctx.clone(), - url.clone(), - headers.clone(), - Duration::from_secs(*request_lifespan as u64), - *slots_per_runner, - runner_name.clone(), - namespace_name.clone(), - ) - }) - .take(start_count); - curr.extend(starting_connections); } // Remove entries that aren't returned from udb @@ -209,6 +152,97 @@ async fn tick( Ok(()) } +async fn tick_runner_config( + ctx: &StandaloneCtx, + ns_id: Id, + runner_name: String, + desired_slots: i64, + runner_config: &namespace::ops::runner_config::get::RunnerConfig, + outbound_connections: &mut HashMap<(Id, String), Vec>, +) -> Result<()> { + let namespace = ctx + .op(namespace::ops::get_global::Input { + namespace_ids: vec![ns_id.clone()], + }) + .await + .context("runner namespace not found")?; + let namespace = namespace.first().context("runner namespace not found")?; + let namespace_name = &namespace.name; + + let RunnerConfigKind::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = &runner_config.config.kind + else { + bail!("runner config should not be in the serverless subspace (wrong config kind)"); + }; + + let curr = outbound_connections + .entry((ns_id, runner_name.clone())) + .or_insert_with(Vec::new); + + // Remove finished and draining connections from list + curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); + + // Log warning and reset to 0 if negative + let adjusted_desired_slots = if desired_slots < 0 { + tracing::error!( + ?ns_id, + ?runner_name, + ?desired_slots, + "negative desired slots, scaling to 0" + ); + 0 + } else { + desired_slots + }; + + let desired_count = + (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) + .max(*min_runners as i64) + + *runners_margin as i64) + .min(*max_runners as i64) + .try_into()?; + + // Calculate diff + let drain_count = curr.len().saturating_sub(desired_count); + let start_count = desired_count.saturating_sub(curr.len()); + + if drain_count != 0 { + // TODO: Implement smart logic of draining runners with the lowest allocated actors + let draining_connections = curr.split_off(desired_count); + + for conn in draining_connections { + if conn.shutdown_tx.send(()).is_err() { + tracing::warn!( + "serverless connection shutdown channel dropped, likely already stopped" + ); + } + } + } + + let starting_connections = std::iter::repeat_with(|| { + spawn_connection( + ctx.clone(), + url.clone(), + headers.clone(), + Duration::from_secs(*request_lifespan as u64), + *slots_per_runner, + runner_name.clone(), + namespace_name.clone(), + ) + }) + .take(start_count); + curr.extend(starting_connections); + + Ok(()) +} + fn spawn_connection( ctx: StandaloneCtx, url: String, From 0fe9d193d94234cbad97a6e1830ffd0988a32e98 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 13 Oct 2025 21:46:30 -0700 Subject: [PATCH 2/3] fix(pegboard): request /start endpoint for serverless runners --- packages/core/pegboard-serverless/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 94f7922005..978442ac1b 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -338,7 +338,8 @@ async fn outbound_handler( .chain(token) .collect(); - let req = client.get(url).headers(headers); + let endpoint_url = format!("{}/start", url.trim_end_matches('/')); + let req = client.get(endpoint_url).headers(headers); let mut source = sse::EventSource::new(req).context("failed creating event source")?; let mut runner_id = None; From 3b2f1c6c9b00a1606bf1c6d22c79600d937b423f Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Mon, 13 Oct 2025 19:24:59 -0700 Subject: [PATCH 3/3] chore(core): auto-populate actor names for serverless runners --- Cargo.lock | 1 + packages/common/pools/src/reqwest.rs | 4 +- packages/core/api-public/Cargo.toml | 1 + packages/core/api-public/src/router.rs | 13 +- .../core/api-public/src/runner_configs.rs | 467 ------------------ .../api-public/src/runner_configs/delete.rs | 75 +++ .../api-public/src/runner_configs/list.rs | 106 ++++ .../core/api-public/src/runner_configs/mod.rs | 12 + .../src/runner_configs/refresh_metadata.rs | 114 +++++ .../runner_configs/serverless_health_check.rs | 80 +++ .../api-public/src/runner_configs/upsert.rs | 159 ++++++ .../api-public/src/runner_configs/utils.rs | 217 ++++++++ .../namespace/src/keys/runner_config.rs | 7 + 13 files changed, 784 insertions(+), 472 deletions(-) delete mode 100644 packages/core/api-public/src/runner_configs.rs create mode 100644 packages/core/api-public/src/runner_configs/delete.rs create mode 100644 packages/core/api-public/src/runner_configs/list.rs create mode 100644 packages/core/api-public/src/runner_configs/mod.rs create mode 100644 packages/core/api-public/src/runner_configs/refresh_metadata.rs create mode 100644 packages/core/api-public/src/runner_configs/serverless_health_check.rs create mode 100644 packages/core/api-public/src/runner_configs/upsert.rs create mode 100644 packages/core/api-public/src/runner_configs/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 46bf46693e..71d2ee1d42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4138,6 +4138,7 @@ dependencies = [ "rivet-api-types", "rivet-api-util", "rivet-config", + "rivet-data", "rivet-error", "rivet-pools", "rivet-types", diff --git a/packages/common/pools/src/reqwest.rs b/packages/common/pools/src/reqwest.rs index 23476203dd..779c08e219 100644 --- a/packages/common/pools/src/reqwest.rs +++ b/packages/common/pools/src/reqwest.rs @@ -3,11 +3,13 @@ use tokio::sync::OnceCell; static CLIENT: OnceCell = OnceCell::const_new(); static CLIENT_NO_TIMEOUT: OnceCell = OnceCell::const_new(); +static CLIENT_USER_AGENT: &str = concat!("RivetEngine/", env!("CARGO_PKG_VERSION")); pub async fn client() -> Result { CLIENT .get_or_try_init(|| async { Client::builder() + .user_agent(CLIENT_USER_AGENT) .timeout(std::time::Duration::from_secs(30)) .build() }) @@ -17,7 +19,7 @@ pub async fn client() -> Result { pub async fn client_no_timeout() -> Result { CLIENT_NO_TIMEOUT - .get_or_try_init(|| async { Client::builder().build() }) + .get_or_try_init(|| async { Client::builder().user_agent(CLIENT_USER_AGENT).build() }) .await .cloned() } diff --git a/packages/core/api-public/Cargo.toml b/packages/core/api-public/Cargo.toml index f1b546c253..ad9a71d21c 100644 --- a/packages/core/api-public/Cargo.toml +++ b/packages/core/api-public/Cargo.toml @@ -20,6 +20,7 @@ rivet-api-peer.workspace = true rivet-api-types.workspace = true rivet-api-util.workspace = true rivet-config.workspace = true +rivet-data.workspace = true rivet-error.workspace = true rivet-pools.workspace = true rivet-types.workspace = true diff --git a/packages/core/api-public/src/router.rs b/packages/core/api-public/src/router.rs index 962e70c7a9..ae7c64a66a 100644 --- a/packages/core/api-public/src/router.rs +++ b/packages/core/api-public/src/router.rs @@ -22,10 +22,11 @@ use crate::{actors, ctx, datacenters, metadata, namespaces, runner_configs, runn runners::list_names, namespaces::list, namespaces::create, - runner_configs::list, - runner_configs::upsert, - runner_configs::delete, - runner_configs::serverless_health_check, + runner_configs::list::list, + runner_configs::upsert::upsert, + runner_configs::delete::delete, + runner_configs::serverless_health_check::serverless_health_check, + runner_configs::refresh_metadata::refresh_metadata, datacenters::list, ), components( @@ -66,6 +67,10 @@ pub async fn router( "/runner-configs/{runner_name}", axum::routing::delete(runner_configs::delete), ) + .route( + "/runner-configs/{runner_name}/refresh-metadata", + axum::routing::post(runner_configs::refresh_metadata), + ) // MARK: Actors .route("/actors", axum::routing::get(actors::list::list)) .route("/actors", axum::routing::post(actors::create::create)) diff --git a/packages/core/api-public/src/runner_configs.rs b/packages/core/api-public/src/runner_configs.rs deleted file mode 100644 index 88a1b6d98d..0000000000 --- a/packages/core/api-public/src/runner_configs.rs +++ /dev/null @@ -1,467 +0,0 @@ -use std::{collections::HashMap, time::Duration}; - -use anyhow::Result; -use axum::{ - http::HeaderMap, - response::{IntoResponse, Response}, -}; -use reqwest::header::{HeaderMap as ReqwestHeaderMap, HeaderName, HeaderValue}; -use rivet_api_builder::{ - ApiError, - extract::{Extension, Json, Path, Query}, -}; -use rivet_api_peer::runner_configs::*; -use rivet_api_types::{pagination::Pagination, runner_configs::list::*}; -use rivet_api_util::{fanout_to_datacenters, request_remote_datacenter}; -use serde::{Deserialize, Serialize}; -use utoipa::IntoParams; -use utoipa::ToSchema; - -use crate::ctx::ApiCtx; - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = RunnerConfigsListResponse)] -pub struct ListResponse { - pub runner_configs: HashMap, - pub pagination: Pagination, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[schema(as = RunnerConfigsListResponseRunnerConfigsValue)] -pub struct RunnerConfigDatacenters { - pub datacenters: HashMap, -} - -#[utoipa::path( - get, - operation_id = "runner_configs_list", - path = "/runner-configs", - params( - ListQuery, - ), - responses( - (status = 200, body = ListResponse), - ), - security(("bearer_auth" = [])), -)] -pub async fn list( - Extension(ctx): Extension, - headers: HeaderMap, - Path(path): Path, - Query(query): Query, -) -> Response { - match list_inner(ctx, headers, path, query).await { - Ok(response) => Json(response).into_response(), - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn list_inner( - ctx: ApiCtx, - headers: HeaderMap, - path: ListPath, - query: ListQuery, -) -> Result { - ctx.auth().await?; - - let runner_configs = fanout_to_datacenters::< - rivet_api_types::runner_configs::list::ListResponse, - _, - _, - _, - _, - HashMap, - >( - ctx.clone().into(), - headers, - "/runner-configs", - query.clone(), - move |ctx, query| { - let path = path.clone(); - async move { rivet_api_peer::runner_configs::list(ctx, path, query).await } - }, - |dc_label, res, agg| { - for (runner_name, runner_config) in res.runner_configs { - let entry = agg - .entry(runner_name) - .or_insert_with(|| RunnerConfigDatacenters { - datacenters: HashMap::new(), - }); - - entry.datacenters.insert( - ctx.config() - .dc_for_label(dc_label) - .expect("dc should exist") - .name - .clone(), - runner_config, - ); - } - }, - ) - .await?; - - Ok(ListResponse { - runner_configs, - pagination: Pagination { cursor: None }, - }) -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = RunnerConfigsUpsertRequestBody)] -pub struct UpsertRequest { - pub datacenters: HashMap, -} - -#[utoipa::path( - put, - operation_id = "runner_configs_upsert", - path = "/runner-configs/{runner_name}", - params( - ("runner_name" = String, Path), - UpsertQuery, - ), - request_body(content = UpsertRequest, content_type = "application/json"), - responses( - (status = 200, body = UpsertResponse), - ), - security(("bearer_auth" = [])), -)] -pub async fn upsert( - Extension(ctx): Extension, - headers: HeaderMap, - Path(path): Path, - Query(query): Query, - Json(body): Json, -) -> Response { - match upsert_inner(ctx, headers, path, query, body).await { - Ok(response) => Json(response).into_response(), - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn upsert_inner( - ctx: ApiCtx, - headers: HeaderMap, - path: UpsertPath, - query: UpsertQuery, - mut body: UpsertRequest, -) -> Result { - ctx.auth().await?; - - for dc in &ctx.config().topology().datacenters { - if let Some(runner_config) = body.datacenters.remove(&dc.name) { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::upsert( - ctx.clone().into(), - path.clone(), - query.clone(), - rivet_api_peer::runner_configs::UpsertRequest(runner_config), - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::PUT, - headers.clone(), - Some(&query), - Some(&runner_config), - ) - .await?; - } - } else { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - headers.clone(), - Some(&query), - Option::<&()>::None, - ) - .await?; - } - } - } - - Ok(UpsertResponse {}) -} - -#[utoipa::path( - delete, - operation_id = "runner_configs_delete", - path = "/runner-configs/{runner_name}", - params( - ("runner_name" = String, Path), - DeleteQuery, - ), - responses( - (status = 200, body = DeleteResponse), - ), - security(("bearer_auth" = [])), -)] -pub async fn delete( - Extension(ctx): Extension, - headers: HeaderMap, - Path(path): Path, - Query(query): Query, -) -> Response { - match delete_inner(ctx, headers, path, query).await { - Ok(response) => Json(response).into_response(), - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn delete_inner( - ctx: ApiCtx, - headers: HeaderMap, - path: DeletePath, - query: DeleteQuery, -) -> Result { - ctx.auth().await?; - - for dc in &ctx.config().topology().datacenters { - if ctx.config().dc_label() == dc.datacenter_label { - rivet_api_peer::runner_configs::delete( - ctx.clone().into(), - DeletePath { - runner_name: path.runner_name.clone(), - }, - DeleteQuery { - namespace: query.namespace.clone(), - }, - ) - .await?; - } else { - request_remote_datacenter::( - ctx.config(), - dc.datacenter_label, - &format!("/runner-configs/{}", path.runner_name), - axum::http::Method::DELETE, - headers.clone(), - Some(&query), - Option::<&()>::None, - ) - .await?; - } - } - - Ok(DeleteResponse {}) -} - -#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] -#[serde(deny_unknown_fields)] -#[into_params(parameter_in = Query)] -pub struct ServerlessHealthCheckQuery { - // NOTE: Only used in ee for ACL - pub namespace: String, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(deny_unknown_fields)] -#[schema(as = RunnerConfigsServerlessHealthCheckRequest)] -pub struct ServerlessHealthCheckRequest { - pub url: String, - #[serde(default)] - pub headers: HashMap, -} - -#[derive(Deserialize, Serialize, ToSchema, Clone, Debug, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -#[schema(as = RunnerConfigsServerlessHealthCheckError)] -pub enum ServerlessHealthCheckError { - InvalidRequest {}, - RequestFailed {}, - RequestTimedOut {}, - NonSuccessStatus { status_code: u16, body: String }, - InvalidResponseJson { body: String }, - InvalidResponseSchema { runtime: String, version: String }, -} - -#[derive(Deserialize, Serialize, ToSchema)] -#[serde(rename_all = "snake_case")] -#[schema(as = RunnerConfigsServerlessHealthCheckResponse)] -pub enum ServerlessHealthCheckResponse { - Success { version: String }, - Failure { error: ServerlessHealthCheckError }, -} - -impl ServerlessHealthCheckResponse { - fn success(version: String) -> Self { - Self::Success { version } - } - - fn failure(error: ServerlessHealthCheckError) -> Self { - Self::Failure { error } - } -} - -const RESPONSE_BODY_MAX_LEN: usize = 1024; - -fn truncate_response_body(body: &str) -> String { - let mut chars = body.chars(); - let mut truncated: String = chars.by_ref().take(RESPONSE_BODY_MAX_LEN).collect(); - if chars.next().is_some() { - truncated.push_str("...[truncated]"); - } - - truncated -} - -#[derive(Deserialize)] -struct ServerlessMetadataPayload { - runtime: String, - version: String, -} - -#[utoipa::path( - post, - operation_id = "runner_configs_serverless_health_check", - path = "/runner-configs/serverless-health-check", - params( - UpsertQuery, - ), - request_body(content = ServerlessHealthCheckRequest, content_type = "application/json"), - responses( - (status = 200, body = ServerlessHealthCheckResponse), - ), - security(("bearer_auth" = [])), -)] -pub async fn serverless_health_check( - Extension(ctx): Extension, - Query(query): Query, - Json(body): Json, -) -> Response { - match serverless_health_check_inner(ctx, query, body).await { - Ok(response) => Json(response).into_response(), - Err(err) => ApiError::from(err).into_response(), - } -} - -async fn serverless_health_check_inner( - ctx: ApiCtx, - _query: ServerlessHealthCheckQuery, - body: ServerlessHealthCheckRequest, -) -> Result { - ctx.auth().await?; - - let ServerlessHealthCheckRequest { url, headers } = body; - let trimmed_url = url.trim(); - if trimmed_url.is_empty() { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidRequest {}, - )); - } - - let health_url = format!("{}/metadata", trimmed_url.trim_end_matches('/')); - - if reqwest::Url::parse(&health_url).is_err() { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidRequest {}, - )); - } - - let mut header_map = ReqwestHeaderMap::new(); - for (name, value) in headers { - let header_name = match HeaderName::from_bytes(name.trim().as_bytes()) { - Ok(name) => name, - Err(_) => { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidRequest {}, - )); - } - }; - - let header_value = match HeaderValue::from_str(value.trim()) { - Ok(value) => value, - Err(_) => { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidRequest {}, - )); - } - }; - - header_map.insert(header_name, header_value); - } - - let client = match reqwest::Client::builder() - .timeout(Duration::from_secs(10)) - .build() - { - Ok(client) => client, - Err(_) => { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::RequestFailed {}, - )); - } - }; - - let response = match client.get(&health_url).headers(header_map).send().await { - Ok(response) => response, - Err(err) => { - let error = if err.is_timeout() { - ServerlessHealthCheckError::RequestTimedOut {} - } else { - ServerlessHealthCheckError::RequestFailed {} - }; - - return Ok(ServerlessHealthCheckResponse::failure(error)); - } - }; - - let status = response.status(); - let body_raw = response - .text() - .await - .unwrap_or_else(|_| String::from("")); - let body_for_user = truncate_response_body(&body_raw); - - if !status.is_success() { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::NonSuccessStatus { - status_code: status.as_u16(), - body: body_for_user, - }, - )); - } - - let payload = match serde_json::from_str::(&body_raw) { - Ok(payload) => payload, - Err(_) => { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidResponseJson { - body: body_for_user, - }, - )); - } - }; - - let ServerlessMetadataPayload { runtime, version } = payload; - - let trimmed_version = version.trim(); - if runtime != "rivetkit" || trimmed_version.is_empty() { - return Ok(ServerlessHealthCheckResponse::failure( - ServerlessHealthCheckError::InvalidResponseSchema { runtime, version }, - )); - } - - Ok(ServerlessHealthCheckResponse::success( - trimmed_version.to_owned(), - )) -} diff --git a/packages/core/api-public/src/runner_configs/delete.rs b/packages/core/api-public/src/runner_configs/delete.rs new file mode 100644 index 0000000000..faaa45ea5e --- /dev/null +++ b/packages/core/api-public/src/runner_configs/delete.rs @@ -0,0 +1,75 @@ +use anyhow::Result; +use axum::{ + http::HeaderMap, + response::{IntoResponse, Response}, +}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json, Path, Query}, +}; +use rivet_api_peer::runner_configs::*; +use rivet_api_util::request_remote_datacenter; + +use crate::ctx::ApiCtx; + +#[utoipa::path( + delete, + operation_id = "runner_configs_delete", + path = "/runner-configs/{runner_name}", + params( + ("runner_name" = String, Path), + DeleteQuery, + ), + responses( + (status = 200, body = DeleteResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn delete( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, +) -> Response { + match delete_inner(ctx, headers, path, query).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn delete_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: DeletePath, + query: DeleteQuery, +) -> Result { + ctx.auth().await?; + + for dc in &ctx.config().topology().datacenters { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + headers.clone(), + Some(&query), + Option::<&()>::None, + ) + .await?; + } + } + + Ok(DeleteResponse {}) +} diff --git a/packages/core/api-public/src/runner_configs/list.rs b/packages/core/api-public/src/runner_configs/list.rs new file mode 100644 index 0000000000..8a62446185 --- /dev/null +++ b/packages/core/api-public/src/runner_configs/list.rs @@ -0,0 +1,106 @@ +use std::collections::HashMap; + +use anyhow::Result; +use axum::{ + http::HeaderMap, + response::{IntoResponse, Response}, +}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json, Path, Query}, +}; +use rivet_api_types::{pagination::Pagination, runner_configs::list::*}; +use rivet_api_util::fanout_to_datacenters; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::ctx::ApiCtx; + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsListResponse)] +pub struct ListResponse { + pub runner_configs: HashMap, + pub pagination: Pagination, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[schema(as = RunnerConfigsListResponseRunnerConfigsValue)] +pub struct RunnerConfigDatacenters { + pub datacenters: HashMap, +} + +#[utoipa::path( + get, + operation_id = "runner_configs_list", + path = "/runner-configs", + params( + ListQuery, + ), + responses( + (status = 200, body = ListResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn list( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, +) -> Response { + match list_inner(ctx, headers, path, query).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn list_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: ListPath, + query: ListQuery, +) -> Result { + ctx.auth().await?; + + let runner_configs = fanout_to_datacenters::< + rivet_api_types::runner_configs::list::ListResponse, + _, + _, + _, + _, + HashMap, + >( + ctx.clone().into(), + headers, + "/runner-configs", + query.clone(), + move |ctx, query| { + let path = path.clone(); + async move { rivet_api_peer::runner_configs::list(ctx, path, query).await } + }, + |dc_label, res, agg| { + for (runner_name, runner_config) in res.runner_configs { + let entry = agg + .entry(runner_name) + .or_insert_with(|| RunnerConfigDatacenters { + datacenters: HashMap::new(), + }); + + entry.datacenters.insert( + ctx.config() + .dc_for_label(dc_label) + .expect("dc should exist") + .name + .clone(), + runner_config, + ); + } + }, + ) + .await?; + + Ok(ListResponse { + runner_configs, + pagination: Pagination { cursor: None }, + }) +} diff --git a/packages/core/api-public/src/runner_configs/mod.rs b/packages/core/api-public/src/runner_configs/mod.rs new file mode 100644 index 0000000000..c386e70b90 --- /dev/null +++ b/packages/core/api-public/src/runner_configs/mod.rs @@ -0,0 +1,12 @@ +pub mod delete; +pub mod list; +pub mod refresh_metadata; +pub mod serverless_health_check; +pub mod upsert; +pub mod utils; + +pub use delete::delete; +pub use list::list; +pub use refresh_metadata::refresh_metadata; +pub use serverless_health_check::serverless_health_check; +pub use upsert::upsert; diff --git a/packages/core/api-public/src/runner_configs/refresh_metadata.rs b/packages/core/api-public/src/runner_configs/refresh_metadata.rs new file mode 100644 index 0000000000..7abfe20d39 --- /dev/null +++ b/packages/core/api-public/src/runner_configs/refresh_metadata.rs @@ -0,0 +1,114 @@ +use anyhow::*; +use axum::response::{IntoResponse, Response}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json, Path, Query}, +}; +use serde::{Deserialize, Serialize}; +use utoipa::IntoParams; +use utoipa::ToSchema; + +use super::utils::refresh_runner_config_metadata; +use crate::ctx::ApiCtx; + +#[derive(Debug, Serialize, Deserialize, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct RefreshMetadataQuery { + pub namespace: String, +} + +#[derive(Deserialize)] +#[serde(deny_unknown_fields)] +pub struct RefreshMetadataPath { + pub runner_name: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsRefreshMetadataRequest)] +pub struct RefreshMetadataRequest {} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsRefreshMetadataResponse)] +pub struct RefreshMetadataResponse {} + +#[utoipa::path( + post, + operation_id = "runner_configs_refresh_metadata", + path = "/runner-configs/{runner_name}/refresh-metadata", + params( + ("runner_name" = String, Path), + RefreshMetadataQuery, + ), + request_body(content = RefreshMetadataRequest, content_type = "application/json"), + responses( + (status = 200, body = RefreshMetadataResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn refresh_metadata( + Extension(ctx): Extension, + Path(path): Path, + Query(query): Query, + Json(body): Json, +) -> Response { + match refresh_metadata_inner(ctx, path, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn refresh_metadata_inner( + ctx: ApiCtx, + path: RefreshMetadataPath, + query: RefreshMetadataQuery, + _body: RefreshMetadataRequest, +) -> Result { + ctx.auth().await?; + + // Resolve namespace + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + // Fetch runner configs for all datacenters + let runners: Vec<_> = ctx + .config() + .topology() + .datacenters + .iter() + .map(|_dc| (namespace.namespace_id, path.runner_name.clone())) + .collect(); + + let runner_configs = ctx + .op(namespace::ops::runner_config::get::Input { + runners, + bypass_cache: true, + }) + .await?; + + // Find first serverless config + let (url, headers) = runner_configs + .iter() + .find_map(|runner_config| { + if let rivet_types::runner_configs::RunnerConfigKind::Serverless { + url, headers, .. + } = &runner_config.config.kind + { + Some((url.clone(), headers.clone())) + } else { + None + } + }) + .ok_or_else(|| anyhow!("no serverless runner config found"))?; + + refresh_runner_config_metadata(ctx, namespace.namespace_id, path.runner_name, url, headers) + .await?; + + Ok(RefreshMetadataResponse {}) +} diff --git a/packages/core/api-public/src/runner_configs/serverless_health_check.rs b/packages/core/api-public/src/runner_configs/serverless_health_check.rs new file mode 100644 index 0000000000..ab593a55e9 --- /dev/null +++ b/packages/core/api-public/src/runner_configs/serverless_health_check.rs @@ -0,0 +1,80 @@ +use std::collections::HashMap; + +use anyhow::Result; +use axum::response::{IntoResponse, Response}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json, Query}, +}; +use serde::{Deserialize, Serialize}; +use utoipa::IntoParams; +use utoipa::ToSchema; + +use super::utils::{ServerlessMetadataError, fetch_serverless_runner_metadata}; +use crate::ctx::ApiCtx; + +#[derive(Debug, Serialize, Deserialize, Clone, IntoParams)] +#[serde(deny_unknown_fields)] +#[into_params(parameter_in = Query)] +pub struct ServerlessHealthCheckQuery { + // NOTE: Only used in ee for ACL + pub namespace: String, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsServerlessHealthCheckRequest)] +pub struct ServerlessHealthCheckRequest { + pub url: String, + #[serde(default)] + pub headers: HashMap, +} + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(rename_all = "snake_case")] +#[schema(as = RunnerConfigsServerlessHealthCheckResponse)] +pub enum ServerlessHealthCheckResponse { + Success { version: String }, + Failure { error: ServerlessMetadataError }, +} + +#[utoipa::path( + post, + operation_id = "runner_configs_serverless_health_check", + path = "/runner-configs/serverless-health-check", + params( + ServerlessHealthCheckQuery, + ), + request_body(content = ServerlessHealthCheckRequest, content_type = "application/json"), + responses( + (status = 200, body = ServerlessHealthCheckResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn serverless_health_check( + Extension(ctx): Extension, + Query(query): Query, + Json(body): Json, +) -> Response { + match serverless_health_check_inner(ctx, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn serverless_health_check_inner( + ctx: ApiCtx, + _query: ServerlessHealthCheckQuery, + body: ServerlessHealthCheckRequest, +) -> Result { + ctx.auth().await?; + + let ServerlessHealthCheckRequest { url, headers } = body; + + match fetch_serverless_runner_metadata(url, headers).await { + Ok(metadata) => Ok(ServerlessHealthCheckResponse::Success { + version: metadata.version, + }), + Err(error) => Ok(ServerlessHealthCheckResponse::Failure { error }), + } +} diff --git a/packages/core/api-public/src/runner_configs/upsert.rs b/packages/core/api-public/src/runner_configs/upsert.rs new file mode 100644 index 0000000000..0509a3c5ce --- /dev/null +++ b/packages/core/api-public/src/runner_configs/upsert.rs @@ -0,0 +1,159 @@ +use std::collections::HashMap; + +use anyhow::Result; +use axum::{ + http::HeaderMap, + response::{IntoResponse, Response}, +}; +use rivet_api_builder::{ + ApiError, + extract::{Extension, Json, Path, Query}, +}; +use rivet_api_peer::runner_configs::*; +use rivet_api_util::request_remote_datacenter; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use super::utils; +use crate::ctx::ApiCtx; + +#[derive(Deserialize, Serialize, ToSchema)] +#[serde(deny_unknown_fields)] +#[schema(as = RunnerConfigsUpsertRequestBody)] +pub struct UpsertRequest { + pub datacenters: HashMap, +} + +#[utoipa::path( + put, + operation_id = "runner_configs_upsert", + path = "/runner-configs/{runner_name}", + params( + ("runner_name" = String, Path), + UpsertQuery, + ), + request_body(content = UpsertRequest, content_type = "application/json"), + responses( + (status = 200, body = UpsertResponse), + ), + security(("bearer_auth" = [])), +)] +pub async fn upsert( + Extension(ctx): Extension, + headers: HeaderMap, + Path(path): Path, + Query(query): Query, + Json(body): Json, +) -> Response { + match upsert_inner(ctx, headers, path, query, body).await { + Ok(response) => Json(response).into_response(), + Err(err) => ApiError::from(err).into_response(), + } +} + +async fn upsert_inner( + ctx: ApiCtx, + headers: HeaderMap, + path: UpsertPath, + query: UpsertQuery, + mut body: UpsertRequest, +) -> Result { + ctx.auth().await?; + + tracing::debug!(runner_name = ?path.runner_name, datacenters_count = body.datacenters.len(), "starting upsert"); + + // Store serverless config before processing (since we'll remove from body.datacenters) + let serverless_config = body + .datacenters + .iter() + .filter_map(|(dc_name, runner_config)| { + if let rivet_api_types::namespaces::runner_configs::RunnerConfigKind::Serverless { + url, + headers, + .. + } = &runner_config.kind + { + Some((url.clone(), headers.clone().unwrap_or_default())) + } else { + None + } + }) + .next(); + + // Apply config + for dc in &ctx.config().topology().datacenters { + if let Some(runner_config) = body.datacenters.remove(&dc.name) { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::upsert( + ctx.clone().into(), + path.clone(), + query.clone(), + rivet_api_peer::runner_configs::UpsertRequest(runner_config), + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::PUT, + headers.clone(), + Some(&query), + Some(&runner_config), + ) + .await?; + } + } else { + if ctx.config().dc_label() == dc.datacenter_label { + rivet_api_peer::runner_configs::delete( + ctx.clone().into(), + DeletePath { + runner_name: path.runner_name.clone(), + }, + DeleteQuery { + namespace: query.namespace.clone(), + }, + ) + .await?; + } else { + request_remote_datacenter::( + ctx.config(), + dc.datacenter_label, + &format!("/runner-configs/{}", path.runner_name), + axum::http::Method::DELETE, + headers.clone(), + Some(&query), + Option::<&()>::None, + ) + .await?; + } + } + } + + // Update runner metadata + // + // This allows us to populate the actor names immediately upon configuring a serverless runner + if let Some((url, metadata_headers)) = serverless_config { + // Resolve namespace + let namespace = ctx + .op(namespace::ops::resolve_for_name_global::Input { + name: query.namespace.clone(), + }) + .await? + .ok_or_else(|| namespace::errors::Namespace::NotFound.build())?; + + if let Err(err) = utils::refresh_runner_config_metadata( + ctx.clone(), + namespace.namespace_id, + path.runner_name.clone(), + url, + metadata_headers, + ) + .await + { + tracing::warn!(?err, runner_name = ?path.runner_name, "failed to refresh runner config metadata"); + } + } + + Ok(UpsertResponse {}) +} diff --git a/packages/core/api-public/src/runner_configs/utils.rs b/packages/core/api-public/src/runner_configs/utils.rs new file mode 100644 index 0000000000..7ccfcfd100 --- /dev/null +++ b/packages/core/api-public/src/runner_configs/utils.rs @@ -0,0 +1,217 @@ +use std::{collections::HashMap, time::Duration}; + +use anyhow::anyhow; +use gas::prelude::*; +use reqwest::header::{HeaderMap as ReqwestHeaderMap, HeaderName, HeaderValue}; +use serde::{Deserialize, Serialize}; +use utoipa::ToSchema; + +use crate::ctx::ApiCtx; + +const RESPONSE_BODY_MAX_LEN: usize = 1024; + +#[derive(Deserialize, Serialize, ToSchema, Clone, Debug, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +#[schema(as = RunnerConfigsServerlessMetadataError)] +pub enum ServerlessMetadataError { + InvalidRequest {}, + RequestFailed {}, + RequestTimedOut {}, + NonSuccessStatus { status_code: u16, body: String }, + InvalidResponseJson { body: String }, + InvalidResponseSchema { runtime: String, version: String }, +} + +#[derive(Debug, Clone)] +pub struct ServerlessMetadata { + pub runtime: String, + pub version: String, + pub actor_names: HashMap, +} + +#[derive(Deserialize)] +struct ServerlessMetadataPayload { + runtime: String, + version: String, + #[serde(rename = "actorNames", default)] + actor_names: HashMap, +} + +fn truncate_response_body(body: &str) -> String { + let mut chars = body.chars(); + let mut truncated: String = chars.by_ref().take(RESPONSE_BODY_MAX_LEN).collect(); + if chars.next().is_some() { + truncated.push_str("...[truncated]"); + } + + truncated +} + +/// Fetches metadata from a serverless runner at the given URL. +/// +/// Returns metadata including runtime, version, and actor names if available. +pub async fn fetch_serverless_runner_metadata( + url: String, + headers: HashMap, +) -> Result { + tracing::debug!(?url, "fetching serverless runner metadata"); + + let trimmed_url = url.trim(); + if trimmed_url.is_empty() { + return Err(ServerlessMetadataError::InvalidRequest {}); + } + + let metadata_url = format!("{}/metadata", trimmed_url.trim_end_matches('/')); + + if reqwest::Url::parse(&metadata_url).is_err() { + return Err(ServerlessMetadataError::InvalidRequest {}); + } + + let mut header_map = ReqwestHeaderMap::new(); + for (name, value) in headers { + let header_name = HeaderName::from_bytes(name.trim().as_bytes()) + .map_err(|_| ServerlessMetadataError::InvalidRequest {})?; + + let header_value = HeaderValue::from_str(value.trim()) + .map_err(|_| ServerlessMetadataError::InvalidRequest {})?; + + header_map.insert(header_name, header_value); + } + + let client = rivet_pools::reqwest::client() + .await + .map_err(|_| ServerlessMetadataError::RequestFailed {})?; + + tracing::debug!("sending metadata request"); + let response = client + .get(&metadata_url) + .headers(header_map) + .timeout(Duration::from_secs(10)) + .send() + .await + .map_err(|err| { + if err.is_timeout() { + ServerlessMetadataError::RequestTimedOut {} + } else { + ServerlessMetadataError::RequestFailed {} + } + })?; + + let status = response.status(); + tracing::debug!(?status, "received metadata response"); + let body_raw = response + .text() + .await + .unwrap_or_else(|_| String::from("")); + let body_for_user = truncate_response_body(&body_raw); + + if !status.is_success() { + return Err(ServerlessMetadataError::NonSuccessStatus { + status_code: status.as_u16(), + body: body_for_user, + }); + } + + let payload = serde_json::from_str::(&body_raw).map_err(|err| { + ServerlessMetadataError::InvalidResponseJson { + body: body_for_user, + } + })?; + + let ServerlessMetadataPayload { + runtime, + version, + actor_names, + } = payload; + + tracing::debug!( + ?runtime, + ?version, + actor_names_count = actor_names.len(), + "parsed metadata payload" + ); + + let trimmed_version = version.trim(); + if runtime != "rivetkit" || trimmed_version.is_empty() { + return Err(ServerlessMetadataError::InvalidResponseSchema { runtime, version }); + } + + Ok(ServerlessMetadata { + runtime, + version: trimmed_version.to_owned(), + actor_names, + }) +} + +/// Fetches metadata from the given URL and populates actor names in the database. +pub async fn refresh_runner_config_metadata( + ctx: ApiCtx, + namespace_id: Id, + runner_name: String, + url: String, + headers: HashMap, +) -> anyhow::Result<()> { + tracing::debug!( + ?namespace_id, + ?runner_name, + "refreshing runner config metadata" + ); + + // Fetch metadata + let metadata = fetch_serverless_runner_metadata(url, headers) + .await + .map_err(|e| anyhow!("failed to fetch serverless runner metadata: {:?}", e))?; + + if !metadata.actor_names.is_empty() { + tracing::debug!( + actor_names_count = metadata.actor_names.len(), + "storing actor names metadata" + ); + // Convert actor names to the format needed for database operations + let actor_names: Vec<(String, serde_json::Map)> = metadata + .actor_names + .into_iter() + .map(|(name, value)| { + if let serde_json::Value::Object(map) = value { + Ok((name, map)) + } else { + Err(anyhow!( + "actor name '{}' metadata must be an object, got: {:?}", + name, + value + )) + } + }) + .collect::>>()?; + + // Store actor names metadata with the runner config + ctx.udb()? + .run(|tx| { + let actor_names = actor_names.clone(); + async move { + let tx = tx.with_subspace(pegboard::keys::subspace()); + + // Write actor names + for (name, metadata) in actor_names { + let name_key = + pegboard::keys::ns::ActorNameKey::new(namespace_id, name.clone()); + + tx.write( + &name_key, + rivet_data::converted::ActorNameKeyData { metadata }, + )?; + } + + Ok(()) + } + }) + .custom_instrument(tracing::info_span!("runner_config_populate_actor_names_tx")) + .await?; + + tracing::debug!("successfully stored actor names metadata"); + } else { + tracing::debug!("no actor names to store"); + } + + Ok(()) +} diff --git a/packages/services/namespace/src/keys/runner_config.rs b/packages/services/namespace/src/keys/runner_config.rs index e0f5533ba4..1d232805e6 100644 --- a/packages/services/namespace/src/keys/runner_config.rs +++ b/packages/services/namespace/src/keys/runner_config.rs @@ -1,9 +1,16 @@ use anyhow::Result; use gas::prelude::*; use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; +use serde::{Deserialize, Serialize}; use universaldb::prelude::*; use vbare::OwnedVersionedData; +#[derive(Debug, Serialize, Deserialize)] +pub struct ActorNamesMetadata { + pub actor_names: Vec<(String, serde_json::Map)>, + pub fetched_at: i64, +} + #[derive(Debug)] pub struct DataKey { pub namespace_id: Id,