Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,21 @@
///
/// **Experimental**
pub reschedule_backoff_max_exponent: Option<usize>,
/// How long after last ping before considering a runner ineligible for allocation.
///
/// Unit is in milliseconds.
///
/// **Experimental**
pub runner_eligible_threshold: Option<i64>,
/// How long to wait after last ping before forcibly removing a runner from the database
/// and deleting its workflow, evicting all actors.
///
/// Note that the runner may still be running and can reconnect.
///
/// Unit is in milliseconds.
///
/// **Experimental**
pub runner_lost_threshold: Option<i64>,
}

impl Pegboard {
Expand All @@ -57,4 +72,14 @@
/// Returns the configured maximum reschedule backoff exponent, or the built-in default
/// when the option is unset.
pub fn reschedule_backoff_max_exponent(&self) -> usize {
	// Exponent used when the config leaves this field unset.
	const DEFAULT_MAX_EXPONENT: usize = 8;

	self.reschedule_backoff_max_exponent
		.unwrap_or(DEFAULT_MAX_EXPONENT)
}

pub fn runner_eligible_threshold(&self) -> i64 {
self.runner_eligible_threshold
.unwrap_or(util::duration::seconds(10))

Check failure on line 78 in engine/packages/config/src/config/pegboard.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of unresolved module or unlinked crate `util`
}

pub fn runner_lost_threshold(&self) -> i64 {
self.runner_lost_threshold
.unwrap_or(util::duration::seconds(15))

Check failure on line 83 in engine/packages/config/src/config/pegboard.rs

View workflow job for this annotation

GitHub Actions / Check

failed to resolve: use of unresolved module or unlinked crate `util`
}
}
6 changes: 4 additions & 2 deletions engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use gas::prelude::*;
use universaldb::options::ConflictRangeType;
use universaldb::utils::IsolationLevel::*;

use crate::{keys, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
use crate::keys;

#[derive(Debug)]
pub struct Input {
Expand Down Expand Up @@ -45,6 +45,8 @@ pub enum RunnerEligibility {

#[operation]
pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();

let notifications = ctx
.udb()?
.run(|tx| {
Expand Down Expand Up @@ -183,7 +185,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input)
)?;

if last_ping_ts.saturating_sub(old_last_ping_ts)
> RUNNER_ELIGIBLE_THRESHOLD_MS
> runner_eligible_threshold
{
notifications.push(RunnerNotification {
runner_id: runner.runner_id,
Expand Down
6 changes: 4 additions & 2 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::time::Instant;
use universaldb::options::{ConflictRangeType, MutationType, StreamingMode};
use universaldb::utils::{FormalKey, IsolationLevel::*};

use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS};
use crate::{keys, metrics};

use super::{Allocate, Destroy, Input, PendingAllocation, State, destroy};

Expand Down Expand Up @@ -148,12 +148,14 @@ async fn allocate_actor(
})
.unwrap_or_default();

let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();

// NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the
// client wf
let (for_serverless, res) = ctx
.udb()?
.run(|tx| async move {
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;

// Check if runner is a serverless runner
let for_serverless = tx
Expand Down
23 changes: 11 additions & 12 deletions engine/packages/pegboard/src/workflows/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,6 @@ use vbare::OwnedVersionedData;

use crate::{keys, metrics, workflows::actor::Allocate};

/// How long after last ping before considering a runner ineligible for allocation.
pub const RUNNER_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10);
/// How long to wait after last ping before forcibly removing a runner from the database and deleting its
/// workflow, evicting all actors. Note that the runner may still be running and can reconnect.
///
/// Runner ping interval is currently set to 3s.
const RUNNER_LOST_THRESHOLD_MS: i64 = util::duration::seconds(15);
/// Batch size of how many events to ack.
const EVENT_ACK_BATCH_SIZE: i64 = 500;

Expand Down Expand Up @@ -88,8 +81,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
let input = input.clone();

async move {
let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();

match ctx
.listen_with_timeout::<Main>(RUNNER_LOST_THRESHOLD_MS)
.listen_with_timeout::<Main>(runner_lost_threshold)
.await?
{
Some(Main::Forward(sig)) => {
Expand Down Expand Up @@ -117,7 +112,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()>
runner_id: input.runner_id.to_string(),
last_event_idx: init_data.last_event_idx,
metadata: protocol::ProtocolMetadata {
runner_lost_threshold: RUNNER_LOST_THRESHOLD_MS,
runner_lost_threshold: runner_lost_threshold,
},
}),
})
Expand Down Expand Up @@ -403,7 +398,7 @@ async fn handle_stopping(
) -> Result<()> {
if !state.draining {
// The workflow will enter a draining state where it can still process signals if
// needed. After RUNNER_LOST_THRESHOLD_MS it will exit this loop and stop.
// needed. After the runner lost threshold it will exit this loop and stop.
state.draining = true;

// Can't parallelize these two activities, requires reading from state
Expand Down Expand Up @@ -942,6 +937,8 @@ struct CheckExpiredInput {

#[activity(CheckExpired)]
async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result<bool> {
let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();

ctx.udb()?
.run(|tx| async move {
let tx = tx.with_subspace(keys::subspace());
Expand All @@ -954,7 +951,7 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result<b
.await?;

let now = util::timestamp::now();
let expired = last_ping_ts < now - RUNNER_LOST_THRESHOLD_MS;
let expired = last_ping_ts < now - runner_lost_threshold;

if expired {
tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
Expand Down Expand Up @@ -989,6 +986,8 @@ pub(crate) async fn allocate_pending_actors(
ctx: &ActivityCtx,
input: &AllocatePendingActorsInput,
) -> Result<AllocatePendingActorsOutput> {
let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();

// NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf
let (allocations, pending_actor_count) = ctx
.udb()?
Expand All @@ -1012,7 +1011,7 @@ pub(crate) async fn allocate_pending_actors(
Snapshot,
);
let mut pending_actor_count = 0;
let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS;
let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;

'queue_loop: loop {
let Some(queue_entry) = queue_stream.try_next().await? else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ export async function ensureEngineProcess(
RIVET__PEGBOARD__BASE_RETRY_TIMEOUT: "100",
// Set max exponent to 1 to have a maximum of base_retry_timeout
RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT: "1",
// Reduce thresholds for faster development iteration
//
// Default ping interval is 3s, this gives a 2s & 4s grace
RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD: "5000",
RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD: "7000",
},
});

Expand Down
Loading