diff --git a/out/openapi.json b/out/openapi.json index d50e3558e0..116ccaa826 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -1192,7 +1192,31 @@ "additionalProperties": false }, "RunnerConfig": { + "allOf": [ + { + "$ref": "#/components/schemas/RunnerConfigKind" + }, + { + "type": "object", + "properties": { + "metadata": {} + } + } + ] + }, + "RunnerConfigKind": { "oneOf": [ + { + "type": "object", + "required": [ + "normal" + ], + "properties": { + "normal": { + "type": "object" + } + } + }, { "type": "object", "required": [ @@ -1203,16 +1227,16 @@ "type": "object", "required": [ "url", - "headers", "request_lifespan", "slots_per_runner", - "min_runners", - "max_runners", - "runners_margin" + "max_runners" ], "properties": { "headers": { - "type": "object", + "type": [ + "object", + "null" + ], "additionalProperties": { "type": "string" }, @@ -1226,7 +1250,10 @@ "minimum": 0 }, "min_runners": { - "type": "integer", + "type": [ + "integer", + "null" + ], "format": "int32", "minimum": 0 }, @@ -1237,7 +1264,10 @@ "minimum": 0 }, "runners_margin": { - "type": "integer", + "type": [ + "integer", + "null" + ], "format": "int32", "minimum": 0 }, @@ -1258,7 +1288,8 @@ "RunnerConfigVariant": { "type": "string", "enum": [ - "serverless" + "serverless", + "normal" ] }, "RunnerConfigsDeleteResponse": { @@ -1466,71 +1497,14 @@ "RunnerConfigsUpsertRequestBody": { "type": "object", "additionalProperties": { - "oneOf": [ + "allOf": [ + { + "$ref": "#/components/schemas/RunnerConfigKind" + }, { "type": "object", - "required": [ - "serverless" - ], "properties": { - "serverless": { - "type": "object", - "required": [ - "url", - "request_lifespan", - "slots_per_runner", - "max_runners" - ], - "properties": { - "headers": { - "type": [ - "object", - "null" - ], - "additionalProperties": { - "type": "string" - }, - "propertyNames": { - "type": "string" - } - }, - "max_runners": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "min_runners": { - "type": [ - "integer", - "null" - ], - "format": "int32", - "minimum": 0 - }, - "request_lifespan": { - "type": "integer", - "format": "int32", - "description": "Seconds.", - "minimum": 0 - }, - "runners_margin": { - "type": [ - "integer", - "null" - ], - "format": "int32", - "minimum": 0 - }, - "slots_per_runner": { - "type": "integer", - "format": "int32", - "minimum": 0 - }, - "url": { - "type": "string" - } - } - } + "metadata": {} } } ] diff --git a/packages/common/api-types/src/namespaces/runner_configs.rs b/packages/common/api-types/src/namespaces/runner_configs.rs index 040587cea8..ffe64c1ed1 100644 --- a/packages/common/api-types/src/namespaces/runner_configs.rs +++ b/packages/common/api-types/src/namespaces/runner_configs.rs @@ -3,9 +3,18 @@ use std::collections::HashMap; use gas::prelude::*; use utoipa::ToSchema; +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct RunnerConfig { + #[serde(flatten)] + pub kind: RunnerConfigKind, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "snake_case")] -pub enum RunnerConfig { +pub enum RunnerConfigKind { + Normal {}, Serverless { url: String, headers: Option>, @@ -20,8 +29,10 @@ pub enum RunnerConfig { impl Into for RunnerConfig { fn into(self) -> rivet_types::runner_configs::RunnerConfig { - match self { - RunnerConfig::Serverless { + let RunnerConfig { kind, metadata } = self; + let kind = match kind { + RunnerConfigKind::Normal {} => rivet_types::runner_configs::RunnerConfigKind::Normal {}, + RunnerConfigKind::Serverless { url, headers, request_lifespan, @@ -29,7 +40,7 @@ impl Into for RunnerConfig { min_runners, max_runners, runners_margin, - } => rivet_types::runner_configs::RunnerConfig::Serverless { + } => rivet_types::runner_configs::RunnerConfigKind::Serverless { url, headers: headers.unwrap_or_default(), request_lifespan, @@ -38,6 +49,7 @@ impl Into for RunnerConfig { max_runners, runners_margin: runners_margin.unwrap_or_default(), }, - } + }; + rivet_types::runner_configs::RunnerConfig { kind, metadata } } } diff --git a/packages/common/types/src/keys/namespace/runner_config.rs b/packages/common/types/src/keys/namespace/runner_config.rs index 7dc326fb8b..b326177dac 100644 --- a/packages/common/types/src/keys/namespace/runner_config.rs +++ b/packages/common/types/src/keys/namespace/runner_config.rs @@ -5,12 +5,14 @@ use utoipa::ToSchema; #[serde(rename_all = "snake_case")] pub enum RunnerConfigVariant { Serverless = 0, + Normal = 1, } impl RunnerConfigVariant { pub fn parse(v: &str) -> Option { match v { "serverless" => Some(RunnerConfigVariant::Serverless), + "normal" => Some(RunnerConfigVariant::Normal), _ => None, } } @@ -20,6 +22,7 @@ impl std::fmt::Display for RunnerConfigVariant { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { RunnerConfigVariant::Serverless => write!(f, "serverless"), + RunnerConfigVariant::Normal => write!(f, "normal"), } } } diff --git a/packages/common/types/src/runner_configs.rs b/packages/common/types/src/runner_configs.rs index 4cd1be9d99..3cb4dd18f5 100644 --- a/packages/common/types/src/runner_configs.rs +++ b/packages/common/types/src/runner_configs.rs @@ -3,9 +3,18 @@ use std::collections::HashMap; use gas::prelude::*; use utoipa::ToSchema; +#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] +pub struct RunnerConfig { + #[serde(flatten)] + pub kind: RunnerConfigKind, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub metadata: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "snake_case")] -pub enum RunnerConfig { +pub enum RunnerConfigKind { + Normal {}, Serverless { url: String, headers: HashMap, @@ -18,37 +27,54 @@ pub enum RunnerConfig { }, } -impl From for rivet_data::generated::namespace_runner_config_v1::Data { +impl From for rivet_data::generated::namespace_runner_config_v2::RunnerConfig { fn from(value: RunnerConfig) -> Self { - match value { - RunnerConfig::Serverless { - url, - headers, - request_lifespan, - slots_per_runner, - min_runners, - max_runners, - runners_margin, - } => rivet_data::generated::namespace_runner_config_v1::Data::Serverless( - rivet_data::generated::namespace_runner_config_v1::Serverless { + let RunnerConfig { kind, metadata } = value; + rivet_data::generated::namespace_runner_config_v2::RunnerConfig { + metadata: metadata.and_then(|value| serde_json::to_string(&value).ok()), + kind: match kind { + RunnerConfigKind::Normal {} => { + rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal + } + RunnerConfigKind::Serverless { url, - headers: headers.into(), + headers, request_lifespan, slots_per_runner, min_runners, max_runners, runners_margin, - }, - ), + } => { + rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless( + rivet_data::generated::namespace_runner_config_v2::Serverless { + url, + headers: headers.into(), + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ) + } + }, } } } -impl From for RunnerConfig { - fn from(value: rivet_data::generated::namespace_runner_config_v1::Data) -> Self { - match value { - rivet_data::generated::namespace_runner_config_v1::Data::Serverless(o) => { - RunnerConfig::Serverless { +impl From for RunnerConfig { + fn from(value: rivet_data::generated::namespace_runner_config_v2::RunnerConfig) -> Self { + let rivet_data::generated::namespace_runner_config_v2::RunnerConfig { metadata, kind } = + value; + RunnerConfig { + metadata: metadata.and_then(|raw| serde_json::from_str(&raw).ok()), + kind: match kind { + rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Normal => { + RunnerConfigKind::Normal {} + } + rivet_data::generated::namespace_runner_config_v2::RunnerConfigKind::Serverless( + o, + ) => RunnerConfigKind::Serverless { url: o.url, headers: o.headers.into(), request_lifespan: o.request_lifespan, @@ -56,8 +82,15 @@ impl From for RunnerCon min_runners: o.min_runners, max_runners: o.max_runners, runners_margin: o.runners_margin, - } - } + }, + }, } } } + +impl RunnerConfig { + /// If updates to this run config affects the autoscaler. + pub fn affects_autoscaler(&self) -> bool { + matches!(self.kind, RunnerConfigKind::Serverless { .. }) + } +} diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 3dedfcb079..24db9825a6 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -15,7 +15,7 @@ use pegboard::keys; use reqwest::header::{HeaderName, HeaderValue}; use reqwest_eventsource as sse; use rivet_runner_protocol as protocol; -use rivet_types::runner_configs::RunnerConfig; +use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind}; use tokio::{sync::oneshot, task::JoinHandle, time::Duration}; use universaldb::options::StreamingMode; use universaldb::utils::IsolationLevel::*; @@ -119,7 +119,7 @@ async fn tick( let namespace = namespace.first().context("runner namespace not found")?; let namespace_name = &namespace.name; - let RunnerConfig::Serverless { + let RunnerConfigKind::Serverless { url, headers, request_lifespan, @@ -127,7 +127,7 @@ async fn tick( min_runners, max_runners, runners_margin, - } = &runner_config.config + } = &runner_config.config.kind else { tracing::warn!( ?ns_id, diff --git a/packages/services/namespace/src/keys/runner_config.rs b/packages/services/namespace/src/keys/runner_config.rs index 8bad0a799b..e0f5533ba4 100644 --- a/packages/services/namespace/src/keys/runner_config.rs +++ b/packages/services/namespace/src/keys/runner_config.rs @@ -32,9 +32,7 @@ impl FormalKey for DataKey { fn serialize(&self, value: Self::Value) -> Result> { rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) - .serialize_with_embedded_version( - rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, - ) + .serialize_with_embedded_version(rivet_data::PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION) } } @@ -125,9 +123,7 @@ impl FormalKey for ByVariantKey { fn serialize(&self, value: Self::Value) -> Result> { rivet_data::versioned::NamespaceRunnerConfig::latest(value.into()) - .serialize_with_embedded_version( - rivet_data::PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION, - ) + .serialize_with_embedded_version(rivet_data::PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION) } } diff --git a/packages/services/namespace/src/ops/runner_config/delete.rs b/packages/services/namespace/src/ops/runner_config/delete.rs index 517741d0da..b5f7e4597d 100644 --- a/packages/services/namespace/src/ops/runner_config/delete.rs +++ b/packages/services/namespace/src/ops/runner_config/delete.rs @@ -11,7 +11,8 @@ pub struct Input { #[operation] pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) -> Result<()> { - ctx.udb()? + let bump_autoscaler = ctx + .udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); @@ -19,26 +20,34 @@ pub async fn namespace_runner_config_delete(ctx: &OperationCtx, input: &Input) - let runner_config_key = keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); - if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { - tx.delete(&runner_config_key); + let bump_autoscaler = + if let Some(config) = tx.read_opt(&runner_config_key, Serializable).await? { + tx.delete(&runner_config_key); - // Clear secondary idx - tx.delete(&keys::runner_config::ByVariantKey::new( - input.namespace_id, - runner_config_variant(&config), - input.name.clone(), - )); - } + // Clear secondary idx + let variant = runner_config_variant(&config); + tx.delete(&keys::runner_config::ByVariantKey::new( + input.namespace_id, + variant, + input.name.clone(), + )); - Ok(()) + config.affects_autoscaler() + } else { + false + }; + + Ok(bump_autoscaler) }) .custom_instrument(tracing::info_span!("runner_config_delete_tx")) .await?; - // Bump autoscaler - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; + // Bump autoscaler when a serverless config is modified + if bump_autoscaler { + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await?; + } Ok(()) } diff --git a/packages/services/namespace/src/ops/runner_config/upsert.rs b/packages/services/namespace/src/ops/runner_config/upsert.rs index 9f801b2fbf..5bf9f805e1 100644 --- a/packages/services/namespace/src/ops/runner_config/upsert.rs +++ b/packages/services/namespace/src/ops/runner_config/upsert.rs @@ -1,6 +1,6 @@ use gas::prelude::*; -use rivet_types::runner_configs::RunnerConfig; -use universaldb::options::MutationType; +use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind}; +use universaldb::{options::MutationType, utils::IsolationLevel::*}; use crate::{errors, keys, utils::runner_config_variant}; @@ -17,13 +17,21 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); - // TODO: Once other types of configs get added, delete previous config before writing - tx.write( - &keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()), - input.config.clone(), - )?; + let runner_config_key = + keys::runner_config::DataKey::new(input.namespace_id, input.name.clone()); + + // Delete previous config + if let Some(existing_config) = tx.read_opt(&runner_config_key, Serializable).await? { + tx.delete(&runner_config_key); + tx.delete(&keys::runner_config::ByVariantKey::new( + input.namespace_id, + runner_config_variant(&existing_config), + input.name.clone(), + )); + } - // Write to secondary idx + // Write new config + tx.write(&runner_config_key, input.config.clone())?; tx.write( &keys::runner_config::ByVariantKey::new( input.namespace_id, @@ -33,8 +41,9 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - input.config.clone(), )?; - match &input.config { - RunnerConfig::Serverless { + match &input.config.kind { + RunnerConfigKind::Normal { .. } => {} + RunnerConfigKind::Serverless { url, headers, slots_per_runner, @@ -103,9 +112,11 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - .map_err(|err| err.build())?; // Bump autoscaler - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; + if input.config.affects_autoscaler() { + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await?; + } Ok(()) } diff --git a/packages/services/namespace/src/utils.rs b/packages/services/namespace/src/utils.rs index 03defad796..65e91399b3 100644 --- a/packages/services/namespace/src/utils.rs +++ b/packages/services/namespace/src/utils.rs @@ -1,9 +1,11 @@ use rivet_types::{ - keys::namespace::runner_config::RunnerConfigVariant, runner_configs::RunnerConfig, + keys::namespace::runner_config::RunnerConfigVariant, + runner_configs::{RunnerConfig, RunnerConfigKind}, }; pub fn runner_config_variant(runner_config: &RunnerConfig) -> RunnerConfigVariant { - match runner_config { - RunnerConfig::Serverless { .. } => RunnerConfigVariant::Serverless, + match runner_config.kind { + RunnerConfigKind::Normal { .. } => RunnerConfigVariant::Normal, + RunnerConfigKind::Serverless { .. } => RunnerConfigVariant::Serverless, } } diff --git a/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs index 4ea34d31cc..9bc8f97b22 100644 --- a/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs +++ b/packages/services/pegboard/src/ops/runner/find_dc_with_runner.rs @@ -5,7 +5,7 @@ use futures_util::{FutureExt, StreamExt, TryFutureExt, stream::FuturesUnordered} use gas::prelude::*; use rivet_api_types::{runner_configs::list as runner_configs_list, runners::list as runners_list}; use rivet_api_util::{HeaderMap, Method, request_remote_datacenter}; -use rivet_types::runner_configs::RunnerConfig; +use rivet_types::runner_configs::{RunnerConfig, RunnerConfigKind}; use serde::de::DeserializeOwned; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -79,12 +79,13 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< }) .await?; if let Some(runner) = res.first() { - match &runner.config { - RunnerConfig::Serverless { max_runners, .. } => { + match &runner.config.kind { + RunnerConfigKind::Serverless { max_runners, .. } => { if *max_runners != 0 { return Ok(Some(ctx.config().dc_label())); } } + _ => {} } } @@ -142,10 +143,9 @@ async fn find_dc_with_runner_inner(ctx: &OperationCtx, input: &Input) -> Result< |res| { res.runner_configs .iter() - .filter(|(_, rc)| match rc { - rivet_types::runner_configs::RunnerConfig::Serverless { - max_runners, .. - } => *max_runners != 0, + .filter(|(_, rc)| match rc.kind { + RunnerConfigKind::Serverless { max_runners, .. } => max_runners != 0, + _ => false, }) .count() != 0 }, diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index 6d16343aaa..30bab87861 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -6,8 +6,9 @@ use gas::prelude::*; use rivet_metrics::KeyValue; use rivet_runner_protocol as protocol; use rivet_types::{ - actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant, - runner_configs::RunnerConfig, + actors::CrashPolicy, + keys::namespace::runner_config::RunnerConfigVariant, + runner_configs::{RunnerConfig, RunnerConfigKind}, }; use std::time::Instant; use universaldb::options::{ConflictRangeType, MutationType, StreamingMode}; @@ -145,8 +146,9 @@ async fn allocate_actor( .await?; let has_valid_serverless = runner_config_res .first() - .map(|runner| match &runner.config { - RunnerConfig::Serverless { max_runners, .. } => *max_runners != 0, + .map(|runner| match &runner.config.kind { + RunnerConfigKind::Serverless { max_runners, .. } => *max_runners != 0, + _ => false, }) .unwrap_or_default(); diff --git a/scripts/api/add-serverless.ts b/scripts/api/add-serverless.ts deleted file mode 100755 index 38f16400dd..0000000000 --- a/scripts/api/add-serverless.ts +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env tsx - -import * as readline from "readline/promises"; - -const rl = readline.createInterface({ - input: process.stdin, - output: process.stdout, -}); - -const rivetToken = process.env.RIVET_TOKEN; -if (!rivetToken) { - console.error("Error: RIVET_TOKEN environment variable is not set"); - process.exit(1); -} - -const endpoint = - process.env.RIVET_ENDPOINT || - (await rl.question("Rivet Endpoint (default: http://localhost:6420): ")) || - "http://localhost:6420"; -const namespace = - (await rl.question("Namespace (default: default): ")) || "default"; -const runnerName = - (await rl.question("Runner name (default: serverless): ")) || "serverless"; -const serverlessUrl = - (await rl.question( - "Serverless URL (default: http://localhost:3000/api/rivet/start): ", - )) || "http://localhost:3000/api/rivet/start"; - -rl.close(); - -const response = await fetch( - `${endpoint}/runner-configs/${runnerName}?namespace=${namespace}`, - { - method: "PUT", - headers: { - Authorization: `Bearer ${rivetToken}`, - "Content-Type": "application/json", - }, - body: JSON.stringify({ - default: { - serverless: { - url: serverlessUrl, - headers: {}, - runners_margin: 1, - min_runners: 1, - max_runners: 3, - slots_per_runner: 100, - request_lifespan: 15 * 60, - }, - } - }), - }, -); - -if (!response.ok) { - console.error(`Error: ${response.status} ${response.statusText}`); - console.error(await response.text()); - process.exit(1); -} - -console.log("✅ Successfully configured serverless runner!"); - diff --git a/scripts/api/delete-run-config.ts b/scripts/api/runner-configs/delete.ts similarity index 89% rename from scripts/api/delete-run-config.ts rename to scripts/api/runner-configs/delete.ts index 3a53c4f109..dd59ddc4fa 100755 --- a/scripts/api/delete-run-config.ts +++ b/scripts/api/runner-configs/delete.ts @@ -7,11 +7,10 @@ const rl = readline.createInterface({ output: process.stdout, }); -const rivetToken = process.env.RIVET_TOKEN; -if (!rivetToken) { - console.error("Error: RIVET_TOKEN environment variable is not set"); - process.exit(1); -} +const rivetToken = + process.env.RIVET_TOKEN || + (await rl.question("Rivet Token (default: dev): ")).trim() || + "dev"; const endpoint = process.env.RIVET_ENDPOINT || diff --git a/scripts/api/list-run-config.ts b/scripts/api/runner-configs/list.ts similarity index 84% rename from scripts/api/list-run-config.ts rename to scripts/api/runner-configs/list.ts index 27f3c147f7..b2e9c0a603 100755 --- a/scripts/api/list-run-config.ts +++ b/scripts/api/runner-configs/list.ts @@ -7,11 +7,10 @@ const rl = readline.createInterface({ output: process.stdout, }); -const rivetToken = process.env.RIVET_TOKEN; -if (!rivetToken) { - console.error("Error: RIVET_TOKEN environment variable is not set"); - process.exit(1); -} +const rivetToken = + process.env.RIVET_TOKEN || + (await rl.question("Rivet Token (default: dev): ")).trim() || + "dev"; const endpoint = process.env.RIVET_ENDPOINT || diff --git a/scripts/api/runner-configs/upsert.ts b/scripts/api/runner-configs/upsert.ts new file mode 100755 index 0000000000..1b6a90da67 --- /dev/null +++ b/scripts/api/runner-configs/upsert.ts @@ -0,0 +1,147 @@ +#!/usr/bin/env tsx + +import * as readline from "readline/promises"; + +const rl = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +async function ask(question: string, options: { default?: string; allowEmpty?: boolean } = {}) { + const suffix = options.default !== undefined && options.default !== "" + ? ` (default: ${options.default})` + : ""; + const answer = (await rl.question(`${question}${suffix}: `)).trim(); + if (answer === "" && options.default !== undefined) { + return options.default; + } + if (answer === "" && options.allowEmpty) { + return ""; + } + return answer; +} + +async function askNumber(question: string, defaultValue: number) { + const answer = (await rl.question(`${question} (default: ${defaultValue}): `)).trim(); + if (answer === "") { + return defaultValue; + } + const value = Number(answer); + if (!Number.isFinite(value) || Number.isNaN(value)) { + console.error(`Error: ${question} must be a number`); + rl.close(); + process.exit(1); + } + return value; +} + +function parseJson(input: string, context: string) { + try { + return JSON.parse(input); + } catch (error) { + console.error(`Error: unable to parse ${context} JSON`); + rl.close(); + process.exit(1); + } +} + +function ensureStringRecord(value: unknown, context: string): Record { + if (value === null || typeof value !== "object" || Array.isArray(value)) { + console.error(`Error: ${context} must be a JSON object with string values`); + rl.close(); + process.exit(1); + } + const record: Record = {}; + for (const [key, val] of Object.entries(value)) { + if (typeof val !== "string") { + console.error(`Error: ${context} value for key "${key}" must be a string`); + rl.close(); + process.exit(1); + } + record[key] = val; + } + return record; +} + +const rivetToken = + process.env.RIVET_TOKEN || + (await ask("Rivet token", { default: "dev" })); + +const endpoint = + process.env.RIVET_ENDPOINT || + (await ask("Rivet endpoint", { default: "http://localhost:6420" })); +const namespace = await ask("Namespace", { default: "default" }); +const datacenter = await ask("Datacenter", { default: "default" }); +const runnerName = await ask("Runner name", { default: "serverless" }); +const runnerType = (await ask("Runner config type (normal/serverless)", { + default: "serverless", +})).toLowerCase(); + +if (runnerType !== "normal" && runnerType !== "serverless") { + console.error("Error: runner config type must be either 'normal' or 'serverless'"); + rl.close(); + process.exit(1); +} + +const metadataInput = await ask("Metadata JSON (optional)", { allowEmpty: true }); +let metadata: unknown; +if (metadataInput) { + metadata = parseJson(metadataInput, "metadata"); +} + +let dcRunnerConfig: Record; + +if (runnerType === "normal") { + dcRunnerConfig = { + normal: {}, + ...(metadata !== undefined ? { metadata } : {}), + }; +} else { + const serverlessUrl = await ask("Serverless URL", { + default: "http://localhost:3000/api/rivet/start", + }); + const headersInput = await ask("Serverless headers JSON", { default: "{}" }); + const headers = ensureStringRecord(parseJson(headersInput, "headers"), "headers"); + const requestLifespan = await askNumber("Request lifespan (seconds)", 15 * 60); + const slotsPerRunner = await askNumber("Slots per runner", 100); + const minRunners = await askNumber("Min runners", 1); + const maxRunners = await askNumber("Max runners", 3); + const runnersMargin = await askNumber("Runners margin", 1); + + dcRunnerConfig = { + serverless: { + url: serverlessUrl, + headers, + request_lifespan: requestLifespan, + slots_per_runner: slotsPerRunner, + min_runners: minRunners, + max_runners: maxRunners, + runners_margin: runnersMargin, + }, + ...(metadata !== undefined ? { metadata } : {}), + }; +} + +rl.close(); + +const response = await fetch( + `${endpoint}/runner-configs/${runnerName}?namespace=${namespace}`, + { + method: "PUT", + headers: { + Authorization: `Bearer ${rivetToken}`, + "Content-Type": "application/json", + }, + body: JSON.stringify({ + [datacenter]: dcRunnerConfig, + }), + }, +); + +if (!response.ok) { + console.error(`Error: ${response.status} ${response.statusText}`); + console.error(await response.text()); + process.exit(1); +} + +console.log("✅ Successfully upserted runner config!"); diff --git a/scripts/api/list-runners.ts b/scripts/api/runners/list.ts similarity index 80% rename from scripts/api/list-runners.ts rename to scripts/api/runners/list.ts index 129ddb43c9..c1a7604161 100755 --- a/scripts/api/list-runners.ts +++ b/scripts/api/runners/list.ts @@ -7,11 +7,10 @@ const rl = readline.createInterface({ output: process.stdout, }); -const rivetToken = process.env.RIVET_TOKEN; -if (!rivetToken) { - console.error("Error: RIVET_TOKEN environment variable is not set"); - process.exit(1); -} +const rivetToken = + process.env.RIVET_TOKEN || + (await rl.question("Rivet Token (default: dev): ")).trim() || + "dev"; const endpoint = process.env.RIVET_ENDPOINT || @@ -41,4 +40,4 @@ if (!response.ok) { const data = await response.json(); // Just show the raw formatted JSON -console.log(JSON.stringify(data, null, 2)); \ No newline at end of file +console.log(JSON.stringify(data, null, 2)); diff --git a/sdks/rust/data/src/lib.rs b/sdks/rust/data/src/lib.rs index a93ceaea1d..becd383e30 100644 --- a/sdks/rust/data/src/lib.rs +++ b/sdks/rust/data/src/lib.rs @@ -6,5 +6,6 @@ pub const PEGBOARD_RUNNER_ADDRESS_VERSION: u16 = 1; pub const PEGBOARD_RUNNER_METADATA_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION: u16 = 1; +pub const PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION: u16 = 2; pub const PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION: u16 = 1; diff --git a/sdks/rust/data/src/versioned.rs b/sdks/rust/data/src/versioned/mod.rs similarity index 83% rename from sdks/rust/data/src/versioned.rs rename to sdks/rust/data/src/versioned/mod.rs index 35d827211a..ca5b6ced97 100644 --- a/sdks/rust/data/src/versioned.rs +++ b/sdks/rust/data/src/versioned/mod.rs @@ -3,6 +3,10 @@ use vbare::OwnedVersionedData; use crate::generated::*; +mod namespace_runner_config; + +pub use namespace_runner_config::*; + pub enum RunnerAllocIdxKeyData { V1(pegboard_namespace_runner_alloc_idx_v1::Data), } @@ -172,37 +176,3 @@ impl OwnedVersionedData for ActorNameKeyData { } } } - -pub enum NamespaceRunnerConfig { - V1(namespace_runner_config_v1::Data), -} - -impl OwnedVersionedData for NamespaceRunnerConfig { - type Latest = namespace_runner_config_v1::Data; - - fn latest(latest: namespace_runner_config_v1::Data) -> Self { - NamespaceRunnerConfig::V1(latest) - } - - fn into_latest(self) -> Result { - #[allow(irrefutable_let_patterns)] - if let NamespaceRunnerConfig::V1(data) = self { - Ok(data) - } else { - bail!("version not latest"); - } - } - - fn deserialize_version(payload: &[u8], version: u16) -> Result { - match version { - 1 => Ok(NamespaceRunnerConfig::V1(serde_bare::from_slice(payload)?)), - _ => bail!("invalid version: {version}"), - } - } - - fn serialize_version(self, _version: u16) -> Result> { - match self { - NamespaceRunnerConfig::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), - } - } -} diff --git a/sdks/rust/data/src/versioned/namespace_runner_config.rs b/sdks/rust/data/src/versioned/namespace_runner_config.rs new file mode 100644 index 0000000000..a7e104655f --- /dev/null +++ b/sdks/rust/data/src/versioned/namespace_runner_config.rs @@ -0,0 +1,129 @@ +use anyhow::{Ok, Result, bail}; +use vbare::OwnedVersionedData; + +use crate::generated::*; + +pub enum NamespaceRunnerConfig { + V1(namespace_runner_config_v1::Data), + V2(namespace_runner_config_v2::RunnerConfig), +} + +impl OwnedVersionedData for NamespaceRunnerConfig { + type Latest = namespace_runner_config_v2::RunnerConfig; + + fn latest(latest: namespace_runner_config_v2::RunnerConfig) -> Self { + NamespaceRunnerConfig::V2(latest) + } + + fn into_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let NamespaceRunnerConfig::V2(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 1 => Ok(NamespaceRunnerConfig::V1(serde_bare::from_slice(payload)?)), + 2 => Ok(NamespaceRunnerConfig::V2(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + NamespaceRunnerConfig::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), + NamespaceRunnerConfig::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } + + fn deserialize_converters() -> Vec Result> { + vec![Self::v1_to_v2] + } + + fn serialize_converters() -> Vec Result> { + vec![Self::v2_to_v1] + } +} + +impl NamespaceRunnerConfig { + fn v1_to_v2(self) -> Result { + match self { + NamespaceRunnerConfig::V1(namespace_runner_config_v1::Data::Serverless(serverless)) => { + let namespace_runner_config_v1::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = serverless; + + Ok(NamespaceRunnerConfig::V2( + namespace_runner_config_v2::RunnerConfig { + metadata: None, + kind: namespace_runner_config_v2::RunnerConfigKind::Serverless( + namespace_runner_config_v2::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ), + }, + )) + } + value @ NamespaceRunnerConfig::V2(_) => Ok(value), + } + } + + fn v2_to_v1(self) -> Result { + match self { + NamespaceRunnerConfig::V1(_) => Ok(self), + NamespaceRunnerConfig::V2(config) => { + let namespace_runner_config_v2::RunnerConfig { metadata, kind } = config; + + if metadata.is_some() { + bail!("namespace runner config v1 does not support metadata"); + } + + match kind { + namespace_runner_config_v2::RunnerConfigKind::Serverless(serverless) => { + let namespace_runner_config_v2::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + } = serverless; + + Ok(NamespaceRunnerConfig::V1( + namespace_runner_config_v1::Data::Serverless( + namespace_runner_config_v1::Serverless { + url, + headers, + request_lifespan, + slots_per_runner, + min_runners, + max_runners, + runners_margin, + }, + ), + )) + } + namespace_runner_config_v2::RunnerConfigKind::Normal => { + bail!("namespace runner config v1 does not support normal runner config") + } + } + } + } + } +} diff --git a/sdks/schemas/data/namespace.runner_config.v2.bare b/sdks/schemas/data/namespace.runner_config.v2.bare new file mode 100644 index 0000000000..9faaab2bf2 --- /dev/null +++ b/sdks/schemas/data/namespace.runner_config.v2.bare @@ -0,0 +1,23 @@ +type Json str + +type Serverless struct { + url: str + headers: map + request_lifespan: u32 + slots_per_runner: u32 + min_runners: u32 + max_runners: u32 + runners_margin: u32 +} + +type Normal void + +type RunnerConfigKind union { + Serverless | + Normal +} + +type RunnerConfig struct { + kind: RunnerConfigKind + metadata: optional +}