Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion out/openapi.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions packages/common/types/src/keys/pegboard/ns.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ impl ServerlessDesiredSlotsKey {

impl FormalKey for ServerlessDesiredSlotsKey {
/// Count.
type Value = u32;
type Value = i64;

fn deserialize(&self, raw: &[u8]) -> Result<Self::Value> {
// NOTE: Atomic ops use little endian
Ok(u32::from_le_bytes(raw.try_into()?))
Ok(i64::from_le_bytes(raw.try_into()?))
}

fn serialize(&self, value: Self::Value) -> Result<Vec<u8>> {
Expand Down
27 changes: 27 additions & 0 deletions packages/common/util/core/src/math.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,30 @@ macro_rules! div_up {
($a + ($b - 1)) / $b
};
}

/// Performs ceiling division for i64 values.
///
/// Returns the smallest integer greater than or equal to `a / b`.
///
/// # Examples
/// ```
/// assert_eq!(div_ceil_i64(10, 3), 4); // 10/3 = 3.33.. -> 4
/// assert_eq!(div_ceil_i64(9, 3), 3); // 9/3 = 3 -> 3
/// assert_eq!(div_ceil_i64(-10, 3), -3); // -10/3 = -3.33.. -> -3
/// ```
///
/// # Panics
/// Panics if `b` is zero.
pub fn div_ceil_i64(a: i64, b: i64) -> i64 {
if b == 0 {
panic!("attempt to divide by zero");
}

if a == 0 || (a > 0 && b > 0) || (a < 0 && b < 0) {
// Standard ceiling division when signs match or a is zero
(a + b - 1) / b
} else {
// When signs differ, regular division gives the ceiling
a / b
}
}
1 change: 1 addition & 0 deletions packages/core/pegboard-serverless/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ reqwest.workspace = true
rivet-config.workspace = true
rivet-runner-protocol.workspace = true
rivet-types.workspace = true
rivet-util.workspace = true
tracing.workspace = true
universaldb.workspace = true
vbare.workspace = true
Expand Down
23 changes: 19 additions & 4 deletions packages/core/pegboard-serverless/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,25 @@ async fn tick(
// Remove finished and draining connections from list
curr.retain(|conn| !conn.handle.is_finished() && !conn.draining.load(Ordering::SeqCst));

let desired_count = (desired_slots.div_ceil(*slots_per_runner).max(*min_runners)
+ *runners_margin)
.min(*max_runners)
.try_into()?;
// Log warning and reset to 0 if negative
let adjusted_desired_slots = if *desired_slots < 0 {
tracing::warn!(
?ns_id,
?runner_name,
desired_slots = ?desired_slots,
"Negative desired_slots detected, resetting to 0"
);
0
} else {
*desired_slots
};

let desired_count =
(rivet_util::math::div_ceil_i64(adjusted_desired_slots, *slots_per_runner as i64)
.max(*min_runners as i64)
+ *runners_margin as i64)
.min(*max_runners as i64)
.try_into()?;

// Calculate diff
let drain_count = curr.len().saturating_sub(desired_count);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ pub async fn namespace_runner_config_upsert(ctx: &OperationCtx, input: &Input) -
input.namespace_id,
input.name.clone(),
),
&0u32.to_le_bytes(),
&0i64.to_le_bytes(),
MutationType::Add,
);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/services/pegboard/src/workflows/actor/destroy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ pub(crate) async fn clear_slot(
namespace_id,
runner_name_selector.to_string(),
),
&(-1i32).to_le_bytes(),
&(-1i64).to_le_bytes(),
MutationType::Add,
);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/services/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ async fn allocate_actor(
namespace_id,
input.runner_name_selector.clone(),
),
&1u32.to_le_bytes(),
&1i64.to_le_bytes(),
MutationType::Add,
);
}
Expand Down Expand Up @@ -367,7 +367,7 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result<()
namespace_id,
runner_name_selector.clone(),
),
&(-1i32).to_le_bytes(),
&(-1i64).to_le_bytes(),
MutationType::Add,
);
}
Expand Down
Loading