diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 1773694e3f..f86fd50ff8 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -35,6 +35,21 @@ pub struct Pegboard { /// /// **Experimental** pub reschedule_backoff_max_exponent: Option, + /// How long after last ping before considering a runner ineligible for allocation. + /// + /// Unit is in milliseconds. + /// + /// **Experimental** + pub runner_eligible_threshold: Option, + /// How long to wait after last ping before forcibly removing a runner from the database + /// and deleting its workflow, evicting all actors. + /// + /// Note that the runner may still be running and can reconnect. + /// + /// Unit is in milliseconds. + /// + /// **Experimental** + pub runner_lost_threshold: Option, } impl Pegboard { @@ -57,4 +72,14 @@ impl Pegboard { pub fn reschedule_backoff_max_exponent(&self) -> usize { self.reschedule_backoff_max_exponent.unwrap_or(8) } + + pub fn runner_eligible_threshold(&self) -> i64 { + self.runner_eligible_threshold + .unwrap_or(util::duration::seconds(10)) + } + + pub fn runner_lost_threshold(&self) -> i64 { + self.runner_lost_threshold + .unwrap_or(util::duration::seconds(15)) + } } diff --git a/engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs b/engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs index 0682db32b7..be31d4c962 100644 --- a/engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs +++ b/engine/packages/pegboard/src/ops/runner/update_alloc_idx.rs @@ -2,7 +2,7 @@ use gas::prelude::*; use universaldb::options::ConflictRangeType; use universaldb::utils::IsolationLevel::*; -use crate::{keys, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS}; +use crate::keys; #[derive(Debug)] pub struct Input { @@ -45,6 +45,8 @@ pub enum RunnerEligibility { #[operation] pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) -> Result { + let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold(); + let notifications = ctx .udb()? .run(|tx| { @@ -183,7 +185,7 @@ pub async fn pegboard_runner_update_alloc_idx(ctx: &OperationCtx, input: &Input) )?; if last_ping_ts.saturating_sub(old_last_ping_ts) - > RUNNER_ELIGIBLE_THRESHOLD_MS + > runner_eligible_threshold { notifications.push(RunnerNotification { runner_id: runner.runner_id, diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 71d1d2956b..751c39adb4 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -13,7 +13,7 @@ use std::time::Instant; use universaldb::options::{ConflictRangeType, MutationType, StreamingMode}; use universaldb::utils::{FormalKey, IsolationLevel::*}; -use crate::{keys, metrics, workflows::runner::RUNNER_ELIGIBLE_THRESHOLD_MS}; +use crate::{keys, metrics}; use super::{Allocate, Destroy, Input, PendingAllocation, State, destroy}; @@ -148,12 +148,14 @@ async fn allocate_actor( }) .unwrap_or_default(); + let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold(); + // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf let (for_serverless, res) = ctx .udb()? .run(|tx| async move { - let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS; + let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold; // Check if runner is an serverless runner let for_serverless = tx diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index c25c11a3d1..5b11f6fb97 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -12,13 +12,6 @@ use vbare::OwnedVersionedData; use crate::{keys, metrics, workflows::actor::Allocate}; -/// How long after last ping before considering a runner ineligible for allocation. -pub const RUNNER_ELIGIBLE_THRESHOLD_MS: i64 = util::duration::seconds(10); -/// How long to wait after last ping before forcibly removing a runner from the database and deleting its -/// workflow, evicting all actors. Note that the runner may still be running and can reconnect. -/// -/// Runner ping interval is currently set to 3s. -const RUNNER_LOST_THRESHOLD_MS: i64 = util::duration::seconds(15); /// Batch size of how many events to ack. const EVENT_ACK_BATCH_SIZE: i64 = 500; @@ -88,8 +81,10 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> let input = input.clone(); async move { + let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold(); + match ctx - .listen_with_timeout::
(RUNNER_LOST_THRESHOLD_MS) + .listen_with_timeout::
(runner_lost_threshold) .await? { Some(Main::Forward(sig)) => { @@ -117,7 +112,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> runner_id: input.runner_id.to_string(), last_event_idx: init_data.last_event_idx, metadata: protocol::ProtocolMetadata { - runner_lost_threshold: RUNNER_LOST_THRESHOLD_MS, + runner_lost_threshold: runner_lost_threshold, }, }), }) @@ -403,7 +398,7 @@ async fn handle_stopping( ) -> Result<()> { if !state.draining { // The workflow will enter a draining state where it can still process signals if - // needed. After RUNNER_LOST_THRESHOLD_MS it will exit this loop and stop. + // needed. After the runner lost threshold it will exit this loop and stop. state.draining = true; // Can't parallelize these two activities, requires reading from state @@ -942,6 +937,8 @@ struct CheckExpiredInput { #[activity(CheckExpired)] async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result { + let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold(); + ctx.udb()? .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); @@ -954,7 +951,7 @@ async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result Result { + let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold(); + // NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf let (allocations, pending_actor_count) = ctx .udb()? @@ -1012,7 +1011,7 @@ pub(crate) async fn allocate_pending_actors( Snapshot, ); let mut pending_actor_count = 0; - let ping_threshold_ts = util::timestamp::now() - RUNNER_ELIGIBLE_THRESHOLD_MS; + let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold; 'queue_loop: loop { let Some(queue_entry) = queue_stream.try_next().await? else { diff --git a/rivetkit-typescript/packages/rivetkit/src/engine-process/mod.ts b/rivetkit-typescript/packages/rivetkit/src/engine-process/mod.ts index bcd73a123d..1b9c91f62c 100644 --- a/rivetkit-typescript/packages/rivetkit/src/engine-process/mod.ts +++ b/rivetkit-typescript/packages/rivetkit/src/engine-process/mod.ts @@ -99,6 +99,11 @@ export async function ensureEngineProcess( RIVET__PEGBOARD__BASE_RETRY_TIMEOUT: "100", // Set max exponent to 1 to have a maximum of base_retry_timeout RIVET__PEGBOARD__RESCHEDULE_BACKOFF_MAX_EXPONENT: "1", + // Reduce thresholds for faster development iteration + // + // Default ping interval is 3s, this gives a 2s & 4s grace + RIVET__PEGBOARD__RUNNER_ELIGIBLE_THRESHOLD: "5000", + RIVET__PEGBOARD__RUNNER_LOST_THRESHOLD: "7000", }, });