From 4f58c50ced3a761b6e22f8448d07e75b4cecbe85 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 20 Nov 2025 12:15:22 -0800 Subject: [PATCH] fix(pb): add ability to timeout force rescheduling pending state --- .../guard/src/routing/pegboard_gateway.rs | 25 ++-- .../pegboard/src/workflows/actor/mod.rs | 107 ++++++++++-------- .../pegboard/src/workflows/actor/runtime.rs | 102 +++++++++++++---- 3 files changed, 158 insertions(+), 76 deletions(-) diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index 22054a9ac2..67d02102b5 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -8,6 +8,7 @@ use rivet_guard_core::proxy_service::{RouteConfig, RouteTarget, RoutingOutput, R use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_ACTOR, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN}; use crate::{errors, shared_state::SharedState}; +const ACTOR_FORCE_WAKE_PENDING_TIMEOUT: i64 = util::duration::seconds(60); const ACTOR_READY_TIMEOUT: Duration = Duration::from_secs(10); pub const X_RIVET_ACTOR: HeaderName = HeaderName::from_static("x-rivet-actor"); @@ -169,10 +170,14 @@ async fn route_request_inner( if actor.sleeping { tracing::debug!(?actor_id, "actor sleeping, waking"); - ctx.signal(pegboard::workflows::actor::Wake {}) - .to_workflow_id(actor.workflow_id) - .send() - .await?; + ctx.signal(pegboard::workflows::actor::Wake { + allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep { + pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT), + }, + }) + .to_workflow_id(actor.workflow_id) + .send() + .await?; } let runner_id = if let (Some(runner_id), true) = (actor.runner_id, actor.connectable) { @@ -193,10 +198,14 @@ async fn route_request_inner( tracing::debug!(?actor_id, ?wake_retries, "actor stopped while we were waiting for it to become ready, attempting rewake"); wake_retries += 1; - let res = 
ctx.signal(pegboard::workflows::actor::Wake {}) - .to_workflow_id(actor.workflow_id) - .send() - .await; + let res = ctx.signal(pegboard::workflows::actor::Wake { + allocation_override: pegboard::workflows::actor::AllocationOverride::DontSleep { + pending_timeout: Some(ACTOR_FORCE_WAKE_PENDING_TIMEOUT), + }, + }) + .to_workflow_id(actor.workflow_id) + .send() + .await; if let Some(WorkflowError::WorkflowNotFound) = res .as_ref() diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index fea15c712f..53fb5d9159 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -10,6 +10,8 @@ mod keys; mod runtime; mod setup; +pub use runtime::AllocationOverride; + #[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Input { pub actor_id: Id, @@ -211,38 +213,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .send() .await?; - let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? { - runtime::SpawnActorOutput::Allocated { - runner_id, - runner_workflow_id, - } => runtime::LifecycleState::new( - runner_id, - runner_workflow_id, - ctx.config().pegboard().actor_start_threshold(), - ), - runtime::SpawnActorOutput::Sleep => { - ctx.activity(runtime::SetSleepingInput { - actor_id: input.actor_id, - }) - .await?; + let lifecycle_state = + match runtime::spawn_actor(ctx, input, 0, AllocationOverride::None).await? 
{ + runtime::SpawnActorOutput::Allocated { + runner_id, + runner_workflow_id, + } => runtime::LifecycleState::new( + runner_id, + runner_workflow_id, + ctx.config().pegboard().actor_start_threshold(), + ), + runtime::SpawnActorOutput::Sleep => { + ctx.activity(runtime::SetSleepingInput { + actor_id: input.actor_id, + }) + .await?; - runtime::LifecycleState::new_sleeping() - } - runtime::SpawnActorOutput::Destroy => { - // Destroyed early - ctx.workflow(destroy::Input { - namespace_id: input.namespace_id, - actor_id: input.actor_id, - name: input.name.clone(), - key: input.key.clone(), - generation: 0, - }) - .output() - .await?; + runtime::LifecycleState::new_sleeping() + } + runtime::SpawnActorOutput::Destroy => { + // Destroyed early + ctx.workflow(destroy::Input { + namespace_id: input.namespace_id, + actor_id: input.actor_id, + name: input.name.clone(), + key: input.key.clone(), + generation: 0, + }) + .output() + .await?; - return Ok(()); - } - }; + return Ok(()); + } + }; let lifecycle_res = ctx .loope( @@ -273,10 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } else { tracing::debug!(actor_id=?input.actor_id, "actor wake"); - state.wake_for_alarm = true; - // Fake signal - Main::Wake(Wake {}) + Main::Wake(Wake { allocation_override: AllocationOverride::DontSleep { pending_timeout: None } }) } } else { // Listen for signal normally @@ -404,14 +405,14 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } } } - Main::Wake(_sig) => { + Main::Wake(sig) => { if state.sleeping { if state.runner_id.is_none() { state.alarm_ts = None; state.sleeping = false; state.will_wake = false; - match runtime::reschedule_actor(ctx, &input, state, false) + match runtime::reschedule_actor(ctx, &input, state, sig.allocation_override) .await? { runtime::SpawnActorOutput::Allocated { .. 
} => {} @@ -425,8 +426,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> })); } } - - state.wake_for_alarm = false; } else if !state.will_wake { state.will_wake = true; @@ -440,8 +439,6 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> actor_id=?input.actor_id, "cannot wake actor that is not sleeping", ); - - state.wake_for_alarm = false; } } Main::Lost(sig) => { @@ -569,16 +566,15 @@ async fn handle_stopped( tracing::debug!(?variant, "actor stopped"); let force_reschedule = match &variant { - StoppedVariant::Normal { - code: protocol::StopCode::Ok, - } => { + StoppedVariant::Normal { code } => { // Reset retry count on successful exit - state.reschedule_state = Default::default(); + if let protocol::StopCode::Ok = code { + state.reschedule_state = Default::default(); + } false } StoppedVariant::Lost { force_reschedule } => *force_reschedule, - _ => false, }; // Clear stop gc timeout to prevent being marked as lost in the lifecycle loop @@ -640,7 +636,16 @@ async fn handle_stopped( // Reschedule no matter what if force_reschedule { - match runtime::reschedule_actor(ctx, &input, state, true).await? { + match runtime::reschedule_actor( + ctx, + &input, + state, + AllocationOverride::DontSleep { + pending_timeout: None, + }, + ) + .await? + { runtime::SpawnActorOutput::Allocated { .. } => {} // NOTE: This should be unreachable because force_reschedule is true runtime::SpawnActorOutput::Sleep => { @@ -667,7 +672,9 @@ async fn handle_stopped( match (input.crash_policy, graceful_exit) { (CrashPolicy::Restart, false) => { - match runtime::reschedule_actor(ctx, &input, state, false).await? { + match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None) + .await? + { runtime::SpawnActorOutput::Allocated { .. } => {} // NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash // policy is `Restart`. 
@@ -698,7 +705,7 @@ async fn handle_stopped( else if state.will_wake { state.sleeping = false; - match runtime::reschedule_actor(ctx, &input, state, false).await? { + match runtime::reschedule_actor(ctx, &input, state, AllocationOverride::None).await? { runtime::SpawnActorOutput::Allocated { .. } => {} runtime::SpawnActorOutput::Sleep => { state.sleeping = true; @@ -708,7 +715,6 @@ async fn handle_stopped( } } - state.wake_for_alarm = false; state.will_wake = false; state.going_away = false; @@ -749,7 +755,10 @@ pub struct Event { } #[signal("pegboard_actor_wake")] -pub struct Wake {} +pub struct Wake { + #[serde(default)] + pub allocation_override: AllocationOverride, +} #[derive(Debug)] #[signal("pegboard_actor_lost")] diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 7f062d03e6..71f4689390 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -36,9 +36,6 @@ pub struct LifecycleState { /// If a wake was received in between an actor's intent to sleep and actor stop. #[serde(default)] pub will_wake: bool, - /// Whether or not the last wake was triggered by an alarm. - #[serde(default)] - pub wake_for_alarm: bool, pub alarm_ts: Option, /// Handles cleaning up the actor if it does not receive a certain state before the timeout (ex. /// created -> running event, stop intent -> stop event). 
If the timeout is reached, the actor is @@ -58,7 +55,6 @@ impl LifecycleState { stopping: false, going_away: false, will_wake: false, - wake_for_alarm: false, alarm_ts: None, gc_timeout_ts: Some(util::timestamp::now() + actor_start_threshold), reschedule_state: RescheduleState::default(), @@ -74,7 +70,6 @@ impl LifecycleState { stopping: false, going_away: false, will_wake: false, - wake_for_alarm: false, alarm_ts: None, gc_timeout_ts: None, reschedule_state: RescheduleState::default(), @@ -117,7 +112,6 @@ async fn update_runner(ctx: &ActivityCtx, input: &UpdateRunnerInput) -> Result<( struct AllocateActorInput { actor_id: Id, generation: u32, - /// When set, forces actors with CrashPolicy::Sleep to pend instead of sleep. force_allocate: bool, } @@ -478,6 +472,15 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result }, +} + #[derive(Debug)] pub enum SpawnActorOutput { Allocated { @@ -493,14 +496,14 @@ pub async fn spawn_actor( ctx: &mut WorkflowCtx, input: &Input, generation: u32, - force_allocate: bool, + allocation_override: AllocationOverride, ) -> Result { // Attempt allocation let allocate_res = ctx .activity(AllocateActorInput { actor_id: input.actor_id, generation, - force_allocate, + force_allocate: matches!(&allocation_override, AllocationOverride::DontSleep { .. }), }) .await?; @@ -552,10 +555,20 @@ pub async fn spawn_actor( .send() .await?; + let signal = if let AllocationOverride::DontSleep { + pending_timeout: Some(timeout), + } = allocation_override + { + ctx.listen_with_timeout::(timeout) + .await? + } else { + Some(ctx.listen::().await?) + }; + // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for // an `Allocate` signal - match ctx.listen::().await? 
{ - PendingAllocation::Allocate(sig) => { + match signal { + Some(PendingAllocation::Allocate(sig)) => { ctx.activity(UpdateRunnerInput { actor_id: input.actor_id, runner_id: sig.runner_id, @@ -591,7 +604,7 @@ pub async fn spawn_actor( runner_workflow_id: sig.runner_workflow_id, }) } - PendingAllocation::Destroy(_) => { + Some(PendingAllocation::Destroy(_)) => { tracing::debug!(actor_id=?input.actor_id, "destroying before actor allocated"); let cleared = ctx @@ -618,6 +631,63 @@ pub async fn spawn_actor( Ok(SpawnActorOutput::Destroy) } + None => { + tracing::debug!(actor_id=?input.actor_id, "timed out before actor allocated"); + + let cleared = ctx + .activity(ClearPendingAllocationInput { + actor_id: input.actor_id, + namespace_id: input.namespace_id, + runner_name_selector: input.runner_name_selector.clone(), + pending_allocation_ts, + }) + .await?; + + // If this actor was no longer present in the queue it means it was allocated. We must now + // wait for the allocated signal to prevent a race condition. + if !cleared { + let sig = ctx.listen::().await?; + + ctx.activity(UpdateRunnerInput { + actor_id: input.actor_id, + runner_id: sig.runner_id, + runner_workflow_id: sig.runner_workflow_id, + }) + .await?; + + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStartActor( + protocol::CommandStartActor { + actor_id: input.actor_id.to_string(), + generation, + config: protocol::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. 
This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }, + ), + }) + .to_workflow_id(sig.runner_workflow_id) + .send() + .await?; + + Ok(SpawnActorOutput::Allocated { + runner_id: sig.runner_id, + runner_workflow_id: sig.runner_workflow_id, + }) + } else { + Ok(SpawnActorOutput::Sleep) + } + } } } AllocateActorOutput::Sleep => Ok(SpawnActorOutput::Sleep), @@ -630,7 +700,7 @@ pub async fn reschedule_actor( ctx: &mut WorkflowCtx, input: &Input, state: &mut LifecycleState, - force_reschedule: bool, + allocation_override: AllocationOverride, ) -> Result { tracing::debug!(actor_id=?input.actor_id, "rescheduling actor"); @@ -672,13 +742,7 @@ pub async fn reschedule_actor( } let next_generation = state.generation + 1; - let spawn_res = spawn_actor( - ctx, - &input, - next_generation, - force_reschedule || state.wake_for_alarm, - ) - .await?; + let spawn_res = spawn_actor(ctx, &input, next_generation, allocation_override).await?; if let SpawnActorOutput::Allocated { runner_id,