diff --git a/packages/services/pegboard/src/workflows/actor/mod.rs b/packages/services/pegboard/src/workflows/actor/mod.rs index 709313735e..c594443b68 100644 --- a/packages/services/pegboard/src/workflows/actor/mod.rs +++ b/packages/services/pegboard/src/workflows/actor/mod.rs @@ -219,25 +219,39 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .send() .await?; - let Some(allocate_res) = runtime::spawn_actor(ctx, input, 0).await? else { - // Destroyed early - ctx.workflow(destroy::Input { - namespace_id: input.namespace_id, - actor_id: input.actor_id, - name: input.name.clone(), - key: input.key.clone(), - generation: 0, - kill: false, - }) - .output() - .await?; + let lifecycle_state = match runtime::spawn_actor(ctx, input, 0, false).await? { + runtime::SpawnActorOutput::Allocated { + runner_id, + runner_workflow_id, + } => runtime::LifecycleState::new(runner_id, runner_workflow_id), + runtime::SpawnActorOutput::Sleep => { + ctx.activity(runtime::SetSleepingInput { + actor_id: input.actor_id, + }) + .await?; - return Ok(()); + runtime::LifecycleState::new_sleeping() + } + runtime::SpawnActorOutput::Destroy => { + // Destroyed early + ctx.workflow(destroy::Input { + namespace_id: input.namespace_id, + actor_id: input.actor_id, + name: input.name.clone(), + key: input.key.clone(), + generation: 0, + kill: false, + }) + .output() + .await?; + + return Ok(()); + } }; let lifecycle_res = ctx .loope( - runtime::LifecycleState::new(allocate_res.runner_id, allocate_res.runner_workflow_id), + lifecycle_state, |ctx, state| { let input = input.clone(); @@ -262,6 +276,8 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } else { tracing::debug!(actor_id=?input.actor_id, "actor wake"); + state.wake_for_alarm = true; + // Fake signal Main::Wake(Wake {}) } @@ -370,16 +386,24 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> state.sleeping = false; state.will_wake = false; - if runtime::reschedule_actor(ctx, &input, state).await? { - // Destroyed early - return Ok(Loop::Break(runtime::LifecycleRes { - generation: state.generation, - // False here because if we received the destroy signal, it is - // guaranteed that we did not allocate another actor. - kill: false, - })); + match runtime::reschedule_actor(ctx, &input, state).await? { + runtime::SpawnActorOutput::Allocated { .. } => {}, + runtime::SpawnActorOutput::Sleep => { + state.sleeping = true; + } + runtime::SpawnActorOutput::Destroy => { + // Destroyed early + return Ok(Loop::Break(runtime::LifecycleRes { + generation: state.generation, + // False here because if we received the destroy signal, it is + // guaranteed that we did not allocate another actor. + kill: false, + })); + } } - } else { + + state.wake_for_alarm = false; + } else if !state.will_wake { state.will_wake = true; tracing::debug!( @@ -525,14 +549,19 @@ async fn handle_stopped( .await?; } - if runtime::reschedule_actor(ctx, &input, state).await? { - // Destroyed early - return Ok(Some(runtime::LifecycleRes { - generation: state.generation, - // False here because if we received the destroy signal, it is - // guaranteed that we did not allocate another actor. - kill: false, - })); + match runtime::reschedule_actor(ctx, &input, state).await? { + runtime::SpawnActorOutput::Allocated { .. } => {} + // NOTE: Its not possible for `SpawnActorOutput::Sleep` to be returned here, the crash + // policy is `Restart`. + runtime::SpawnActorOutput::Sleep | runtime::SpawnActorOutput::Destroy => { + // Destroyed early + return Ok(Some(runtime::LifecycleRes { + generation: state.generation, + // False here because if we received the destroy signal, it is + // guaranteed that we did not allocate another actor. + kill: false, + })); + } } } (true, CrashPolicy::Sleep) => { @@ -566,17 +595,26 @@ async fn handle_stopped( } // Rewake actor immediately after stopping if `will_wake` was set else if state.will_wake { + state.sleeping = false; state.will_wake = false; - if runtime::reschedule_actor(ctx, &input, state).await? { - // Destroyed early - return Ok(Some(runtime::LifecycleRes { - generation: state.generation, - // False here because if we received the destroy signal, it is - // guaranteed that we did not allocate another actor. - kill: false, - })); + match runtime::reschedule_actor(ctx, &input, state).await? { + runtime::SpawnActorOutput::Allocated { .. } => {} + runtime::SpawnActorOutput::Sleep => { + state.sleeping = true; + } + runtime::SpawnActorOutput::Destroy => { + // Destroyed early + return Ok(Some(runtime::LifecycleRes { + generation: state.generation, + // False here because if we received the destroy signal, it is + // guaranteed that we did not allocate another actor. + kill: false, + })); + } } + + state.wake_for_alarm = false; } Ok(None) diff --git a/packages/services/pegboard/src/workflows/actor/runtime.rs b/packages/services/pegboard/src/workflows/actor/runtime.rs index ce608db648..23bdadfeea 100644 --- a/packages/services/pegboard/src/workflows/actor/runtime.rs +++ b/packages/services/pegboard/src/workflows/actor/runtime.rs @@ -1,11 +1,11 @@ use base64::Engine; use base64::prelude::BASE64_STANDARD; use futures_util::StreamExt; -use futures_util::{FutureExt, TryStreamExt}; +use futures_util::TryStreamExt; use gas::prelude::*; use rivet_metrics::KeyValue; use rivet_runner_protocol as protocol; -use rivet_types::keys::namespace::runner_config::RunnerConfigVariant; +use rivet_types::{actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant}; use std::time::Instant; use universaldb::options::{ConflictRangeType, MutationType, StreamingMode}; use universaldb::utils::{FormalKey, IsolationLevel::*}; @@ -26,8 +26,12 @@ pub struct LifecycleState { pub runner_workflow_id: Option, pub sleeping: bool, + /// If a wake was received in between an actor's intent to sleep and actor stop. #[serde(default)] pub will_wake: bool, + /// Whether or not the last wake was triggered by an alarm. + #[serde(default)] + pub wake_for_alarm: bool, pub alarm_ts: Option, pub gc_timeout_ts: Option, @@ -42,11 +46,26 @@ impl LifecycleState { runner_workflow_id: Some(runner_workflow_id), sleeping: false, will_wake: false, + wake_for_alarm: false, alarm_ts: None, gc_timeout_ts: Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS), reschedule_state: RescheduleState::default(), } } + + pub fn new_sleeping() -> Self { + LifecycleState { + generation: 0, + runner_id: None, + runner_workflow_id: None, + sleeping: true, + will_wake: false, + wake_for_alarm: false, + alarm_ts: None, + gc_timeout_ts: None, + reschedule_state: RescheduleState::default(), + } + } } #[derive(Serialize, Deserialize)] @@ -84,14 +103,21 @@ async fn update_runner(ctx: &ActivityCtx, input: &UpdateRunnerInput) -> Result<( #[derive(Debug, Serialize, Deserialize, Hash)] struct AllocateActorInput { actor_id: Id, - generation: u32, runner_name_selector: String, + generation: u32, + from_alarm: bool, } #[derive(Debug, Serialize, Deserialize)] -pub struct AllocateActorOutput { - pub runner_id: Id, - pub runner_workflow_id: Id, +pub enum AllocateActorOutput { + Allocated { + runner_id: Id, + runner_workflow_id: Id, + }, + Pending { + pending_allocation_ts: i64, + }, + Sleep, } // If no availability, returns the timestamp of the actor's queue key @@ -99,10 +125,11 @@ pub struct AllocateActorOutput { async fn allocate_actor( ctx: &ActivityCtx, input: &AllocateActorInput, -) -> Result> { +) -> Result { let start_instant = Instant::now(); let mut state = ctx.state::()?; let namespace_id = state.namespace_id; + let crash_policy = state.crash_policy; // NOTE: This txn should closely resemble the one found in the allocate_pending_actors activity of the // client wf @@ -260,53 +287,75 @@ async fn allocate_actor( return Ok(( for_serverless, - Ok(AllocateActorOutput { + AllocateActorOutput::Allocated { runner_id: old_runner_alloc_key.runner_id, runner_workflow_id: old_runner_alloc_key_data.workflow_id, - }), + }, )); } } - // At this point in the txn there is no availability. Write the actor to the alloc queue to wait. + // At this point in the txn there is no availability - let pending_ts = util::timestamp::now(); + match (crash_policy, input.from_alarm) { + (CrashPolicy::Sleep, false) => Ok((for_serverless, AllocateActorOutput::Sleep)), + // Write the actor to the alloc queue to wait + _ => { + let pending_allocation_ts = util::timestamp::now(); - // NOTE: This will conflict with serializable reads to the alloc queue, which is the behavior we - // want. If a runner reads from the queue while this is being inserted, one of the two txns will - // retry and we ensure the actor does not end up in queue limbo. - tx.write( - &keys::ns::PendingActorByRunnerNameSelectorKey::new( - namespace_id, - input.runner_name_selector.clone(), - pending_ts, - input.actor_id, - ), - input.generation, - )?; + // NOTE: This will conflict with serializable reads to the alloc queue, which is the behavior we + // want. If a runner reads from the queue while this is being inserted, one of the two txns will + // retry and we ensure the actor does not end up in queue limbo. + tx.write( + &keys::ns::PendingActorByRunnerNameSelectorKey::new( + namespace_id, + input.runner_name_selector.clone(), + pending_allocation_ts, + input.actor_id, + ), + input.generation, + )?; - return Ok((for_serverless, Err(pending_ts))); + Ok(( + for_serverless, + AllocateActorOutput::Pending { + pending_allocation_ts, + }, + )) + } + } }) .custom_instrument(tracing::info_span!("actor_allocate_tx")) .await?; let dt = start_instant.elapsed().as_secs_f64(); - metrics::ACTOR_ALLOCATE_DURATION - .record(dt, &[KeyValue::new("did_reserve", res.is_ok().to_string())]); + metrics::ACTOR_ALLOCATE_DURATION.record( + dt, + &[KeyValue::new( + "did_reserve", + matches!(res, AllocateActorOutput::Allocated { .. }).to_string(), + )], + ); state.for_serverless = for_serverless; state.allocated_slot = true; match &res { - Ok(res) => { + AllocateActorOutput::Allocated { + runner_id, + runner_workflow_id, + } => { state.sleep_ts = None; state.pending_allocation_ts = None; - state.runner_id = Some(res.runner_id); - state.runner_workflow_id = Some(res.runner_workflow_id); + state.runner_id = Some(*runner_id); + state.runner_workflow_id = Some(*runner_workflow_id); } - Err(pending_allocation_ts) => { + AllocateActorOutput::Pending { + pending_allocation_ts, + } => { state.pending_allocation_ts = Some(*pending_allocation_ts); } + AllocateActorOutput::Sleep => {} } Ok(res) @@ -389,34 +438,83 @@ pub async fn deallocate(ctx: &ActivityCtx, input: &DeallocateInput) -> Result Result> { + from_alarm: bool, +) -> Result { // Attempt allocation let allocate_res = ctx .activity(AllocateActorInput { actor_id: input.actor_id, runner_name_selector: input.runner_name_selector.clone(), generation, + from_alarm, }) .await?; - // Always bump the autoscaler so it can scale up - ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) - .send() - .await?; + match allocate_res { + AllocateActorOutput::Allocated { + runner_id, + runner_workflow_id, + } => { + // Bump the autoscaler so it can scale up + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await?; - let allocate_res = match allocate_res { - Ok(x) => x, - Err(pending_allocation_ts) => { + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { + actor_id: input.actor_id.to_string(), + generation, + config: protocol::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + // HACK: We should not use dynamic timestamp here, but we don't validate if signal data + // changes (like activity inputs) so this is fine for now. + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + }), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; + + Ok(SpawnActorOutput::Allocated { + runner_id, + runner_workflow_id, + }) + } + AllocateActorOutput::Pending { + pending_allocation_ts, + } => { tracing::warn!( actor_id=?input.actor_id, "failed to allocate (no availability), waiting for allocation", ); + // Bump the autoscaler so it can scale up + ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) + .send() + .await?; + // If allocation fails, the allocate txn already inserted this actor into the queue. Now we wait for // an `Allocate` signal match ctx.listen::().await? { @@ -428,10 +526,32 @@ pub async fn spawn_actor( }) .await?; - AllocateActorOutput { + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { + actor_id: input.actor_id.to_string(), + generation, + config: protocol::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + // HACK: We should not use dynamic timestamp here, but we don't validate if signal data + // changes (like activity inputs) so this is fine for now. + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + }), + }) + .to_workflow_id(sig.runner_workflow_id) + .send() + .await?; + + Ok(SpawnActorOutput::Allocated { runner_id: sig.runner_id, runner_workflow_id: sig.runner_workflow_id, - } + }) } PendingAllocation::Destroy(_) => { tracing::debug!(actor_id=?input.actor_id, "destroying before actor allocated"); @@ -458,118 +578,78 @@ pub async fn spawn_actor( .await?; } - return Ok(None); + Ok(SpawnActorOutput::Destroy) } } } - }; - - ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { - actor_id: input.actor_id.to_string(), - generation, - config: protocol::ActorConfig { - name: input.name.clone(), - key: input.key.clone(), - // HACK: We should not use dynamic timestamp here, but we don't validate if signal data - // changes (like activity inputs) so this is fine for now. - create_ts: util::timestamp::now(), - input: input - .input - .as_ref() - .map(|x| BASE64_STANDARD.decode(x)) - .transpose()?, - }, - }), - }) - .to_workflow_id(allocate_res.runner_workflow_id) - .send() - .await?; - - Ok(Some(allocate_res)) + AllocateActorOutput::Sleep => Ok(SpawnActorOutput::Sleep), + } } -/// Returns true if the actor should be destroyed. +/// Wrapper around `spawn_actor` that handles rescheduling retries. Returns true if the actor should be +/// destroyed. pub async fn reschedule_actor( ctx: &mut WorkflowCtx, input: &Input, state: &mut LifecycleState, -) -> Result { +) -> Result { tracing::debug!(actor_id=?input.actor_id, "rescheduling actor"); - let next_generation = state.generation + 1; + // Determine next backoff sleep duration + let mut backoff = util::backoff::Backoff::new_at( + 8, + None, + BASE_RETRY_TIMEOUT_MS, + 500, + state.reschedule_state.retry_count, + ); + + let (now, reset) = ctx + .v(2) + .activity(CompareRetryInput { + last_retry_ts: state.reschedule_state.last_retry_ts, + }) + .await?; - // Waits for the actor to be ready (or destroyed) and automatically retries if failed to allocate. - let res = ctx - .loope(state.reschedule_state.clone(), |ctx, resched_state| { - let input = input.clone(); - - async move { - // Determine next backoff sleep duration - let mut backoff = util::backoff::Backoff::new_at( - 8, - None, - BASE_RETRY_TIMEOUT_MS, - 500, - resched_state.retry_count, - ); + state.reschedule_state.retry_count = if reset { + 0 + } else { + state.reschedule_state.retry_count + 1 + }; + state.reschedule_state.last_retry_ts = now; - let (now, reset) = ctx - .v(2) - .activity(CompareRetryInput { - last_retry_ts: resched_state.last_retry_ts, - }) - .await?; + // Don't sleep for first retry + if state.reschedule_state.retry_count > 0 { + let next = backoff.step().expect("should not have max retry"); - resched_state.retry_count = if reset { - 0 - } else { - resched_state.retry_count + 1 - }; - resched_state.last_retry_ts = now; - - // Don't sleep for first retry - if resched_state.retry_count > 0 { - let next = backoff.step().expect("should not have max retry"); - - // Sleep for backoff or destroy early - if let Some(_sig) = ctx - .listen_with_timeout::(Instant::from(next) - Instant::now()) - .await? - { - tracing::debug!("destroying before actor start"); - - return Ok(Loop::Break(None)); - } - } + // Sleep for backoff or destroy early + if let Some(_sig) = ctx + .listen_with_timeout::(Instant::from(next) - Instant::now()) + .await? + { + tracing::debug!("destroying before actor start"); - if let Some(res) = spawn_actor(ctx, &input, next_generation).await? { - Ok(Loop::Break(Some((resched_state.clone(), res)))) - } else { - // Destroyed early - Ok(Loop::Break(None)) - } - } - .boxed() - }) - .await?; + return Ok(SpawnActorOutput::Destroy); + } + } - // Update loop state - if let Some((reschedule_state, res)) = res { - state.generation = next_generation; - state.runner_id = Some(res.runner_id); - state.runner_workflow_id = Some(res.runner_workflow_id); + let next_generation = state.generation + 1; + let spawn_res = spawn_actor(ctx, &input, next_generation, state.wake_for_alarm).await?; - // Save reschedule state in global state - state.reschedule_state = reschedule_state; + if let SpawnActorOutput::Allocated { + runner_id, + runner_workflow_id, + } = &spawn_res + { + state.generation = next_generation; + state.runner_id = Some(*runner_id); + state.runner_workflow_id = Some(*runner_workflow_id); // Reset gc timeout once allocated state.gc_timeout_ts = Some(util::timestamp::now() + ACTOR_START_THRESHOLD_MS); - - Ok(false) - } else { - Ok(true) } + + Ok(spawn_res) } #[derive(Debug, Serialize, Deserialize, Hash)]