diff --git a/Cargo.lock b/Cargo.lock index bafa6976f1..752a259544 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3360,6 +3360,7 @@ dependencies = [ "rivet-config", "rivet-runner-protocol", "rivet-types", + "rivet-util", "tracing", "universaldb", "vbare", diff --git a/out/openapi.json b/out/openapi.json index b92675ce3e..e56645a2ad 100644 --- a/out/openapi.json +++ b/out/openapi.json @@ -11,7 +11,7 @@ "name": "Apache-2.0", "identifier": "Apache-2.0" }, - "version": "25.7.2" + "version": "25.7.3" }, "paths": { "/actors": { diff --git a/packages/common/types/src/keys/pegboard/ns.rs b/packages/common/types/src/keys/pegboard/ns.rs index f513eae2fc..ef1fe086d6 100644 --- a/packages/common/types/src/keys/pegboard/ns.rs +++ b/packages/common/types/src/keys/pegboard/ns.rs @@ -29,11 +29,11 @@ impl ServerlessDesiredSlotsKey { impl FormalKey for ServerlessDesiredSlotsKey { /// Count. - type Value = u32; + type Value = i64; fn deserialize(&self, raw: &[u8]) -> Result { // NOTE: Atomic ops use little endian - Ok(u32::from_le_bytes(raw.try_into()?)) + Ok(i64::from_le_bytes(raw.try_into()?)) } fn serialize(&self, value: Self::Value) -> Result> { diff --git a/packages/common/util/core/src/math.rs b/packages/common/util/core/src/math.rs index a25e97ef0c..0bb1f7d365 100644 --- a/packages/common/util/core/src/math.rs +++ b/packages/common/util/core/src/math.rs @@ -6,3 +6,30 @@ macro_rules! div_up { ($a + ($b - 1)) / $b }; } + +/// Performs ceiling division for i64 values. +/// +/// Returns the smallest integer greater than or equal to `a / b`. +/// +/// # Examples +/// ``` +/// assert_eq!(div_ceil_i64(10, 3), 4); // 10/3 = 3.33.. -> 4 +/// assert_eq!(div_ceil_i64(9, 3), 3); // 9/3 = 3 -> 3 +/// assert_eq!(div_ceil_i64(-10, 3), -3); // -10/3 = -3.33.. -> -3 +/// ``` +/// +/// # Panics +/// Panics if `b` is zero. +pub fn div_ceil_i64(a: i64, b: i64) -> i64 { + if b == 0 { + panic!("attempt to divide by zero"); + } + + if a == 0 || (a > 0 && b > 0) || (a < 0 && b < 0) { + // Standard ceiling division when signs match or a is zero + (a + b - 1) / b + } else { + // When signs differ, regular division gives the ceiling + a / b + } +} diff --git a/packages/core/pegboard-serverless/Cargo.toml b/packages/core/pegboard-serverless/Cargo.toml index 6ec4861e0c..ad985a1b4c 100644 --- a/packages/core/pegboard-serverless/Cargo.toml +++ b/packages/core/pegboard-serverless/Cargo.toml @@ -15,6 +15,7 @@ reqwest.workspace = true rivet-config.workspace = true rivet-runner-protocol.workspace = true rivet-types.workspace = true +rivet-util.workspace = true tracing.workspace = true universaldb.workspace = true vbare.workspace = true diff --git a/packages/core/pegboard-serverless/src/lib.rs b/packages/core/pegboard-serverless/src/lib.rs index 57a780c9eb..4e6ef41c05 100644 --- a/packages/core/pegboard-serverless/src/lib.rs +++ b/packages/core/pegboard-serverless/src/lib.rs @@ -142,10 +142,25 @@ async fn tick( // Remove finished and draining connections from list curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst)); - let desired_count = (desired_slots.div_ceil(*slots_per_runner).max(*min_runners) - + *runners_margin) - .min(*max_runners) - .try_into()?; + // Log warning and reset to 0 if negative + let adjusted_desired_slots = if *desired_slots < 0 { + tracing::warn!( + ?ns_id, + ?runner_name, + desired_slots = ?desired_slots, + "Negative desired_slots detected, resetting to 0" + ); + 0 + } else { + *desired_slots + }; + + let desired_count = + (rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64) + .max(*min_runners as i64) + + *runners_margin as i64) + .min(*max_runners as i64) + .try_into()?; // Calculate diff let drain_count = curr.len().saturating_sub(desired_count); diff --git a/packages/services/namespace/src/ops/runner_config/upsert.rs b/packages/services/namespace/src/ops/runner_config/upsert.rs index 416796fc0e..0d2cb0b798 100644 --- a/packages/services/namespace/src/ops/runner_config/upsert.rs +++ b/packages/services/namespace/src/ops/runner_config/upsert.rs @@ -95,7 +95,7 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) - input.namespace_id, input.name.clone(), ), - &0u32.to_le_bytes(), + &0i64.to_le_bytes(), MutationType::Add, ); } diff --git a/packages/services/pegboard/src/workflows/actor/destroy.rs b/packages/services/pegboard/src/workflows/actor/destroy.rs index a410cef164..fe85bfb517 100644 --- a/packages/services/pegboard/src/workflows/actor/destroy.rs +++ b/packages/services/pegboard/src/workflows/actor/destroy.rs @@ -240,7 +240,7 @@ pub(crate) async fn clear_slot( namespace_id, runner_name_selector.to_string(), ), - &(-1i32).to_le_bytes(), + &(-1i64).to_le_bytes(), MutationType::Add, ); } diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index f33d61b252..b9228919af 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -129,7 +129,7 @@ async fn allocate_actor( namespace_id, input.runner_name_selector.clone(), ), - &1u32.to_le_bytes(), + &1i64.to_le_bytes(), MutationType::Add, ); } @@ -367,7 +367,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<() namespace_id, runner_name_selector.clone(), ), - &(-1i32).to_le_bytes(), + &(-1i64).to_le_bytes(), MutationType::Add, ); }