From 4eabd07a6add872f3a5b81e2e024aba162c958de Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 17 Nov 2025 18:29:34 -0800 Subject: [PATCH] fix(pb): rewrite runner wf to handle batch signals --- engine/packages/pegboard-runner/src/conn.rs | 2 +- .../packages/pegboard-runner/src/ping_task.rs | 2 +- .../pegboard-runner/src/ws_to_tunnel_task.rs | 2 +- .../pegboard/src/workflows/actor/mod.rs | 10 +- .../pegboard/src/workflows/actor/runtime.rs | 4 +- engine/packages/pegboard/src/workflows/mod.rs | 1 + .../pegboard/src/workflows/runner2.rs | 1176 +++++++++++++++++ 7 files changed, 1187 insertions(+), 10 deletions(-) create mode 100644 engine/packages/pegboard/src/workflows/runner2.rs diff --git a/engine/packages/pegboard-runner/src/conn.rs b/engine/packages/pegboard-runner/src/conn.rs index 8a47743f2e..4602115fdd 100644 --- a/engine/packages/pegboard-runner/src/conn.rs +++ b/engine/packages/pegboard-runner/src/conn.rs @@ -166,7 +166,7 @@ pub async fn init_conn( }; // Forward to runner wf - ctx.signal(pegboard::workflows::runner::Forward { inner: packet }) + ctx.signal(pegboard::workflows::runner2::Forward { inner: packet }) .to_workflow_id(workflow_id) .send() .await diff --git a/engine/packages/pegboard-runner/src/ping_task.rs b/engine/packages/pegboard-runner/src/ping_task.rs index 5a0e783481..445240e777 100644 --- a/engine/packages/pegboard-runner/src/ping_task.rs +++ b/engine/packages/pegboard-runner/src/ping_task.rs @@ -50,7 +50,7 @@ pub async fn task( if let RunnerEligibility::ReEligible = notif.eligibility { tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again"); - ctx.signal(pegboard::workflows::runner::CheckQueue {}) + ctx.signal(pegboard::workflows::runner2::CheckQueue {}) .to_workflow_id(notif.workflow_id) .send() .await?; diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 5d7f63df04..da41e8b8a3 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -342,7 +342,7 @@ async fn handle_message( | protocol::ToServer::ToServerEvents(_) | protocol::ToServer::ToServerAckCommands(_) | protocol::ToServer::ToServerStopping => { - ctx.signal(pegboard::workflows::runner::Forward { + ctx.signal(pegboard::workflows::runner2::Forward { inner: protocol::ToServer::try_from(msg) .context("failed to convert message for workflow forwarding")?, }) diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index c90f614051..8e92c9e1c9 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -319,7 +319,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; // Send signal to stop actor now that we know it will be sleeping - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStopActor( protocol::CommandStopActor { actor_id: input.actor_id.to_string(), @@ -348,7 +348,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStopActor( protocol::CommandStopActor { actor_id: input.actor_id.to_string(), @@ -495,7 +495,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; 
- ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { actor_id: input.actor_id.to_string(), generation: state.generation, @@ -509,7 +509,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> Main::Destroy(_) => { // If allocated, send stop actor command if let Some(runner_workflow_id) = state.runner_workflow_id { - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { actor_id: input.actor_id.to_string(), generation: state.generation, @@ -628,7 +628,7 @@ async fn handle_stopped( if let (StoppedVariant::Lost { .. }, Some(old_runner_workflow_id)) = (&variant, old_runner_workflow_id) { - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { actor_id: input.actor_id.to_string(), generation: state.generation, diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index ce9bc1acec..bbab70bf2a 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -514,7 +514,7 @@ pub async fn spawn_actor( .send() .await?; - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { actor_id: input.actor_id.to_string(), generation, @@ -563,7 +563,7 @@ pub async fn spawn_actor( }) .await?; - ctx.signal(crate::workflows::runner::Command { + ctx.signal(crate::workflows::runner2::Command { inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { actor_id: input.actor_id.to_string(), generation, diff --git a/engine/packages/pegboard/src/workflows/mod.rs b/engine/packages/pegboard/src/workflows/mod.rs index f8878061f9..9cdac04acc 100644 --- a/engine/packages/pegboard/src/workflows/mod.rs +++ b/engine/packages/pegboard/src/workflows/mod.rs @@ -1,2 +1,3 @@ pub mod actor; pub mod runner; +pub mod runner2; diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs new file mode 100644 index 0000000000..51dca7bef0 --- /dev/null +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -0,0 +1,1176 @@ +use futures_util::{FutureExt, StreamExt, TryStreamExt}; +use gas::prelude::*; +use rivet_data::converted::{ActorNameKeyData, MetadataKeyData, RunnerByKeyKeyData}; +use rivet_metrics::KeyValue; +use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned}; +use universaldb::{ + options::{ConflictRangeType, StreamingMode}, + utils::{FormalChunkedKey, IsolationLevel::*}, +}; +use universalpubsub::PublishOpts; +use vbare::OwnedVersionedData; + +use crate::{keys, metrics, workflows::actor::Allocate}; + +/// Batch size of how many events to ack. 
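+///
+/// An ack is only pushed to the runner once the newest event index is more than
+/// `EVENT_ACK_BATCH_SIZE` past the last acked index (see the event handling in
+/// the lifecycle loop below), which keeps ack traffic low; the runner
+/// presumably uses the ack to trim its outbound event buffer.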
+const EVENT_ACK_BATCH_SIZE: i64 = 500;
+
+#[derive(Debug, Serialize, Deserialize, Clone)]
+pub struct Input {
+	pub runner_id: Id,
+	pub namespace_id: Id,
+	pub name: String,
+	pub key: String,
+	pub version: u32,
+	pub total_slots: u32,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct State {
+	namespace_id: Id,
+	create_ts: i64,
+
+	last_event_idx: i64,
+	last_command_idx: i64,
+	commands: Vec<CommandRow>,
+	// events: Vec<EventRow>,
+}
+
+impl State {
+	fn new(namespace_id: Id, create_ts: i64) -> Self {
+		State {
+			namespace_id,
+			create_ts,
+			last_event_idx: -1,
+			last_command_idx: -1,
+			commands: Vec::new(),
+		}
+	}
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct CommandRow {
+	index: i64,
+	command: protocol::Command,
+	create_ts: i64,
+}
+
+#[workflow]
+pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> {
+	ctx.activity(InitInput {
+		runner_id: input.runner_id,
+		name: input.name.clone(),
+		key: input.key.clone(),
+		namespace_id: input.namespace_id,
+		create_ts: ctx.create_ts(),
+	})
+	.await?;
+
+	ctx.loope(LifecycleState::new(), |ctx, state| {
+		let input = input.clone();
+
+		async move {
+			let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();
+
+			// Batch fetch pending signals
+			let signals = ctx
+				.listen_n_with_timeout::<Main>(runner_lost_threshold, 1024)
+				.await?;
+
+			if signals.is_empty() {
+				let expired = ctx
+					.activity(CheckExpiredInput {
+						runner_id: input.runner_id,
+					})
+					.await?;
+
+				if state.draining || expired {
+					return Ok(Loop::Break(()));
+				} else {
+					return Ok(Loop::Continue);
+				}
+			}
+
+			let mut init = None;
+			let mut events = Vec::new();
+			let mut commands = Vec::new();
+			let mut ack_last_command_idx = -1;
+			let mut check_queue = false;
+
+			// Batch process signals
+			for signal in signals {
+				match signal {
+					Main::Forward(sig) => {
+						match sig.inner {
+							protocol::ToServer::ToServerInit(init_sig) => {
+								if init.is_none() {
+									init = Some(init_sig);
+									check_queue = true;
+								}
+							}
+							protocol::ToServer::ToServerEvents(new_events) => {
+								// Ignore events that were already received
+								events.extend(
+									new_events
+										.into_iter()
+										.filter(|event| event.index > state.last_event_idx),
+								);
+							}
+							protocol::ToServer::ToServerAckCommands(
+								protocol::ToServerAckCommands { last_command_idx },
+							) => {
+								ack_last_command_idx = ack_last_command_idx.max(last_command_idx);
+							}
+							protocol::ToServer::ToServerStopping => {
+								handle_stopping(ctx, &input, state, false).await?;
+							}
+							protocol::ToServer::ToServerPing(_)
+							| protocol::ToServer::ToServerKvRequest(_)
+							| protocol::ToServer::ToServerTunnelMessage(_) => {
+								bail!(
+									"received message that should not be sent to runner workflow: {:?}",
+									sig.inner
+								)
+							}
+						}
+					}
+					Main::Command(command) => {
+						// If draining, ignore start actor command and inform the actor wf that it is lost
+						if let (
+							protocol::Command::CommandStartActor(protocol::CommandStartActor {
+								actor_id,
+								generation,
+								..
+							}),
+							true,
+						) = (&command.inner, state.draining)
+						{
+							tracing::warn!(
+								?actor_id,
+								"attempt to schedule actor to draining runner, reallocating"
+							);
+
+							let res = ctx
+								.signal(crate::workflows::actor::Lost {
+									generation: *generation,
+									// Because this is a race condition, we want the actor to reschedule
+									// regardless of its crash policy
+									force_reschedule: true,
+									reset_rescheduling: true,
+								})
+								.to_workflow::<crate::workflows::actor::Workflow>()
+								.tag("actor_id", actor_id)
+								.graceful_not_found()
+								.send()
+								.await?;
+							if res.is_none() {
+								tracing::warn!(
+									?actor_id,
+									"actor workflow not found, likely already stopped"
+								);
+							}
+						} else {
+							commands.push(command.inner);
+						}
+					}
+					Main::CheckQueue(_) => {
+						check_queue = true;
+					}
+					Main::Stop(sig) => {
+						handle_stopping(ctx, &input, state, sig.reset_actor_rescheduling).await?;
+					}
+				}
+			}
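+
+			// A single loop iteration may produce multiple outbound packets
+			// (init response, missed commands, event acks, new commands).
+			// They are accumulated in `messages` and flushed to the runner in
+			// one `SendMessagesToRunner` activity at the end of the iteration.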
+			let mut messages = Vec::new();
+
+			// Process init packet
+			if let Some(protocol::ToServerInit {
+				last_command_idx,
+				prepopulate_actor_names,
+				metadata,
+				..
+			}) = init
+			{
+				let init_data = ctx
+					.activity(ProcessInitInput {
+						runner_id: input.runner_id,
+						namespace_id: input.namespace_id,
+						last_command_idx: last_command_idx.unwrap_or(-1),
+						prepopulate_actor_names,
+						metadata,
+					})
+					.await?;
+
+				messages.push(protocol::ToClient::ToClientInit(protocol::ToClientInit {
+					runner_id: input.runner_id.to_string(),
+					last_event_idx: init_data.last_event_idx,
+					metadata: protocol::ProtocolMetadata {
+						runner_lost_threshold,
+					},
+				}));
+
+				// Send missed commands
+				if !init_data.missed_commands.is_empty() {
+					messages.push(protocol::ToClient::ToClientCommands(
+						init_data.missed_commands,
+					));
+				}
+
+				if !state.draining {
+					ctx.activity(InsertDbInput {
+						runner_id: input.runner_id,
+						namespace_id: input.namespace_id,
+						name: input.name.clone(),
+						key: input.key.clone(),
+						version: input.version,
+						total_slots: input.total_slots,
+						create_ts: ctx.create_ts(),
+					})
+					.await?;
+				}
+			}
+
+			let last_event_idx = events.last().map(|event| event.index);
+
+			// Forward events to actor workflows
+			// NOTE: This should not be parallelized because signals must be sent in order
+			for event in &events {
+				let actor_id = crate::utils::event_actor_id(&event.inner).to_string();
+				let res = ctx
+					.signal(crate::workflows::actor::Event {
+						inner: event.inner.clone(),
+					})
+					.to_workflow::<crate::workflows::actor::Workflow>()
+					.tag("actor_id", &actor_id)
+					.graceful_not_found()
+					.send()
+					.await?;
+				if res.is_none() {
+					tracing::warn!(
+						?actor_id,
+						"actor workflow not found, likely already stopped"
+					);
+				}
+			}
+
+			// If any events were received, persist and ack them
+			if let Some(last_event_idx) = last_event_idx {
+				ctx.activity(InsertEventsInput { events }).await?;
+
+				state.last_event_idx = last_event_idx;
+
+				// Ack events in batch
+				if last_event_idx
+					> state
+						.last_event_ack_idx
+						.saturating_add(EVENT_ACK_BATCH_SIZE)
+				{
+					state.last_event_ack_idx = last_event_idx;
+
+					messages.push(protocol::ToClient::ToClientAckEvents(
+						protocol::ToClientAckEvents {
+							last_event_idx: state.last_event_ack_idx,
+						},
+					));
+				}
+			}
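+
+			// Commands get monotonically increasing indices. `InsertCommands`
+			// both trims acked commands from state and appends the new batch,
+			// returning the index of the first new command so the wrappers
+			// below can be numbered; replay of unacked commands happens via
+			// `ProcessInit` when a runner reconnects.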
+			// Insert/ack commands
+			let batch_start_index = if ack_last_command_idx != -1 || !commands.is_empty() {
+				ctx.activity(InsertCommandsInput {
+					commands: commands.clone(),
+					ack_last_command_idx,
+				})
+				.await?
+			} else {
+				None
+			};
+
+			// If commands were present during this loop iteration, forward to runner
+			if let Some(batch_start_index) = batch_start_index {
+				messages.push(protocol::ToClient::ToClientCommands(
+					commands
+						.into_iter()
+						.enumerate()
+						.map(|(index, cmd)| protocol::CommandWrapper {
+							index: batch_start_index + index as i64,
+							inner: cmd,
+						})
+						.collect(),
+				));
+			}
+
+			if check_queue {
+				// Check for pending actors
+				let res = ctx
+					.activity(AllocatePendingActorsInput {
+						namespace_id: input.namespace_id,
+						name: input.name.clone(),
+					})
+					.await?;
+
+				// Dispatch pending allocs
+				for alloc in res.allocations {
+					ctx.signal(alloc.signal)
+						.to_workflow::<crate::workflows::actor::Workflow>()
+						.tag("actor_id", alloc.actor_id)
+						.send()
+						.await?;
+				}
+			}
+
+			if !messages.is_empty() {
+				ctx.activity(SendMessagesToRunnerInput {
+					runner_id: input.runner_id,
+					messages,
+				})
+				.await?;
+			}
+
+			Ok(Loop::Continue)
+		}
+		.boxed()
+	})
+	.await?;
+
+	ctx.activity(ClearDbInput {
+		runner_id: input.runner_id,
+		name: input.name.clone(),
+		key: input.key.clone(),
+		update_state: RunnerState::Stopped,
+	})
+	.await?;
+
+	let actors = ctx
+		.activity(FetchRemainingActorsInput {
+			runner_id: input.runner_id,
+		})
+		.await?;
+
+	// Set all remaining actors as lost
+	for (actor_id, generation) in actors {
+		let res = ctx
+			.signal(crate::workflows::actor::Lost {
+				generation,
+				force_reschedule: false,
+				reset_rescheduling: false,
+			})
+			.to_workflow::<crate::workflows::actor::Workflow>()
+			.tag("actor_id", actor_id)
+			.graceful_not_found()
+			.send()
+			.await?;
+		if res.is_none() {
+			tracing::warn!(
+				?actor_id,
+				"actor workflow not found, likely already stopped"
+			);
+		}
+	}
+
+	// Close websocket connection (it's unlikely to be open)
+	ctx.activity(SendMessagesToRunnerInput {
+		runner_id: input.runner_id,
+		messages: vec![protocol::ToClient::ToClientClose],
+	})
+	.await?;
+
+	Ok(())
+}
+
+async fn handle_stopping(
+	ctx: &mut WorkflowCtx,
+	input: &Input,
+	state: &mut LifecycleState,
+	reset_actor_rescheduling: bool,
+) -> Result<()> {
+	if !state.draining {
+		// The workflow will enter a draining state where it can still process signals if
+		// needed. After the runner lost threshold it will exit this loop and stop.
+		state.draining = true;
+
+		// Can't parallelize these two activities, requires reading from state
+		ctx.activity(ClearDbInput {
+			runner_id: input.runner_id,
+			name: input.name.clone(),
+			key: input.key.clone(),
+			update_state: RunnerState::Draining,
+		})
+		.await?;
+
+		let actors = ctx
+			.activity(FetchRemainingActorsInput {
+				runner_id: input.runner_id,
+			})
+			.await?;
+
+		// Set all remaining actors as going away immediately
+		if !actors.is_empty() {
+			for (actor_id, generation) in &actors {
+				ctx.signal(crate::workflows::actor::GoingAway {
+					generation: *generation,
+					reset_rescheduling: reset_actor_rescheduling,
+				})
+				.to_workflow::<crate::workflows::actor::Workflow>()
+				.tag("actor_id", actor_id)
+				.send()
+				.await?;
+			}
+		}
+	}
+
+	Ok(())
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct LifecycleState {
+	draining: bool,
+	last_event_idx: i64,
+	last_event_ack_idx: i64,
+}
+
+impl LifecycleState {
+	fn new() -> Self {
+		LifecycleState {
+			draining: false,
+			last_event_idx: -1,
+			last_event_ack_idx: -1,
+		}
+	}
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct InitInput {
+	runner_id: Id,
+	name: String,
+	key: String,
+	namespace_id: Id,
+	create_ts: i64,
+}
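+
+// Retained for compatibility with the original runner workflow's init output;
+// runner2 never populates `evict_workflow_id`.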
+#[derive(Debug, Serialize, Deserialize)]
+struct InitOutput {
+	/// Deprecated.
+	evict_workflow_id: Option<Id>,
+}
+
+#[activity(Init)]
+async fn init(ctx: &ActivityCtx, input: &InitInput) -> Result<InitOutput> {
+	let mut state = ctx.state::<Option<State>>()?;
+
+	*state = Some(State::new(input.namespace_id, input.create_ts));
+
+	ctx.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
+			let runner_by_key_key = keys::ns::RunnerByKeyKey::new(
+				input.namespace_id,
+				input.name.clone(),
+				input.key.clone(),
+			);
+
+			// Allocate self
+			tx.write(
+				&runner_by_key_key,
+				RunnerByKeyKeyData {
+					runner_id: input.runner_id,
+					workflow_id: ctx.workflow_id(),
+				},
+			)?;
+
+			Ok(())
+		})
+		.custom_instrument(tracing::info_span!("runner_init_tx"))
+		.await?;
+
+	Ok(InitOutput {
+		evict_workflow_id: None,
+	})
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct InsertDbInput {
+	runner_id: Id,
+	namespace_id: Id,
+	name: String,
+	key: String,
+	version: u32,
+	total_slots: u32,
+	create_ts: i64,
+}
+
+#[activity(InsertDb)]
+async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> Result<()> {
+	ctx.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
+			let remaining_slots_key = keys::runner::RemainingSlotsKey::new(input.runner_id);
+			let last_ping_ts_key = keys::runner::LastPingTsKey::new(input.runner_id);
+			let workflow_id_key = keys::runner::WorkflowIdKey::new(input.runner_id);
+
+			let (remaining_slots_entry, last_ping_ts_entry) = tokio::try_join!(
+				tx.read_opt(&remaining_slots_key, Serializable),
+				tx.read_opt(&last_ping_ts_key, Serializable),
+			)?;
+			let now = util::timestamp::now();
+
+			// See if key already exists
+			let existing = if let (Some(remaining_slots), Some(last_ping_ts)) =
+				(remaining_slots_entry, last_ping_ts_entry)
+			{
+				Some((remaining_slots, last_ping_ts))
+			} else {
+				// Initial insert
+				None
+			};
+
+			let (remaining_slots, last_ping_ts) = if let Some(existing) = existing {
+				existing
+			}
+			// NOTE: These properties are only inserted once
+			else {
+				tx.write(&workflow_id_key, ctx.workflow_id())?;
+
+				tx.write(
+					&keys::runner::NamespaceIdKey::new(input.runner_id),
+					input.namespace_id,
+				)?;
+
+				tx.write(
+					&keys::runner::NameKey::new(input.runner_id),
+					input.name.clone(),
+				)?;
+
+				tx.write(
+					&keys::runner::KeyKey::new(input.runner_id),
+					input.key.clone(),
+				)?;
+
+				tx.write(
+					&keys::runner::VersionKey::new(input.runner_id),
+					input.version,
+				)?;
+
+				tx.write(&remaining_slots_key, input.total_slots)?;
+
+				tx.write(
+					&keys::runner::TotalSlotsKey::new(input.runner_id),
+					input.total_slots,
+				)?;
+
+				tx.write(
+					&keys::runner::CreateTsKey::new(input.runner_id),
+					input.create_ts,
+				)?;
+
+				tx.write(&last_ping_ts_key, now)?;
+
+				// Populate ns indexes
+				tx.write(
+					&keys::ns::ActiveRunnerKey::new(
+						input.namespace_id,
+						input.create_ts,
+						input.runner_id,
+					),
+					ctx.workflow_id(),
+				)?;
+				tx.write(
+					&keys::ns::ActiveRunnerByNameKey::new(
+						input.namespace_id,
+						input.name.clone(),
+						input.create_ts,
+						input.runner_id,
+					),
+					ctx.workflow_id(),
+				)?;
+				tx.write(
+					&keys::ns::AllRunnerKey::new(
+						input.namespace_id,
+						input.create_ts,
+						input.runner_id,
+					),
+					ctx.workflow_id(),
+				)?;
+				tx.write(
+					&keys::ns::AllRunnerByNameKey::new(
+						input.namespace_id,
+						input.name.clone(),
+						input.create_ts,
+						input.runner_id,
+					),
+					ctx.workflow_id(),
+				)?;
+
+				// Write name into namespace runner names list
+				tx.write(
+					&keys::ns::RunnerNameKey::new(input.namespace_id, input.name.clone()),
+					(),
+				)?;
+
+				(input.total_slots, now)
+			};
+
+			// Set last connect ts
+			tx.write(&keys::runner::ConnectedTsKey::new(input.runner_id), now)?;
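+
+			// Millislots track occupancy at 1/1000 granularity, e.g. 5 of 10
+			// slots remaining -> (5 * 1000) / 10 = 500 millislots. This lets
+			// the alloc index order runners by fractional load regardless of
+			// their total slot counts.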
+			let remaining_millislots = (remaining_slots * 1000) / input.total_slots;
+
+			// Insert into index (same as the `update_alloc_idx` op with `AddIdx`)
+			tx.write(
+				&keys::ns::RunnerAllocIdxKey::new(
+					input.namespace_id,
+					input.name.clone(),
+					input.version,
+					remaining_millislots,
+					last_ping_ts,
+					input.runner_id,
+				),
+				rivet_data::converted::RunnerAllocIdxKeyData {
+					workflow_id: ctx.workflow_id(),
+					remaining_slots,
+					total_slots: input.total_slots,
+				},
+			)?;
+
+			Ok(())
+		})
+		.custom_instrument(tracing::info_span!("runner_insert_tx"))
+		.await?;
+
+	Ok(())
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct ClearDbInput {
+	runner_id: Id,
+	name: String,
+	key: String,
+	update_state: RunnerState,
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+enum RunnerState {
+	Draining,
+	Stopped,
+}
+
+#[activity(ClearDb)]
+async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> {
+	let state = ctx.state::<State>()?;
+	let namespace_id = state.namespace_id;
+	let create_ts = state.create_ts;
+
+	// TODO: Combine into a single udb txn
+	ctx.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+			let now = util::timestamp::now();
+
+			// Clear runner by key idx if it's still the current runner
+			let runner_by_key_key =
+				keys::ns::RunnerByKeyKey::new(namespace_id, input.name.clone(), input.key.clone());
+			let runner_id = tx
+				.read_opt(&runner_by_key_key, Serializable)
+				.await?
+				.map(|x| x.runner_id);
+			if runner_id == Some(input.runner_id) {
+				tx.delete(&runner_by_key_key);
+			}
+
+			match input.update_state {
+				RunnerState::Draining => {
+					tx.write(&keys::runner::DrainTsKey::new(input.runner_id), now)?;
+					tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
+				}
+				RunnerState::Stopped => {
+					tx.write(&keys::runner::StopTsKey::new(input.runner_id), now)?;
+
+					// Update namespace indexes
+					tx.delete(&keys::ns::ActiveRunnerKey::new(
+						namespace_id,
+						create_ts,
+						input.runner_id,
+					));
+					tx.delete(&keys::ns::ActiveRunnerByNameKey::new(
+						namespace_id,
+						input.name.clone(),
+						create_ts,
+						input.runner_id,
+					));
+				}
+			}
+
+			Ok(())
+		})
+		.custom_instrument(tracing::info_span!("runner_clear_tx"))
+		.await?;
+
+	// Does not clear the data keys like last ping ts, just the allocation idx
+	ctx.op(crate::ops::runner::update_alloc_idx::Input {
+		runners: vec![crate::ops::runner::update_alloc_idx::Runner {
+			runner_id: input.runner_id,
+			action: crate::ops::runner::update_alloc_idx::Action::ClearIdx,
+		}],
+	})
+	.await?;
+
+	Ok(())
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct ProcessInitInput {
+	runner_id: Id,
+	namespace_id: Id,
+	last_command_idx: i64,
+	prepopulate_actor_names: Option<util::serde::HashableMap<String, protocol::ActorName>>,
+	metadata: Option<String>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+struct ProcessInitOutput {
+	last_event_idx: i64,
+	missed_commands: Vec<protocol::CommandWrapper>,
+}
+
+#[activity(ProcessInit)]
+async fn process_init(ctx: &ActivityCtx, input: &ProcessInitInput) -> Result<ProcessInitOutput> {
+	let state = ctx.state::<State>()?;
+
+	ctx.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
+			// Populate actor names if provided
+			if let Some(actor_names) = &input.prepopulate_actor_names {
+				// Write each actor name into the namespace actor names list
+				for (name, data) in actor_names {
+					let metadata =
+						serde_json::from_str::<util::serde::HashableMap<String, serde_json::Value>>(
+							&data.metadata,
+						)
+						.unwrap_or_default();
+
+					tx.write(
+						&keys::ns::ActorNameKey::new(input.namespace_id, name.clone()),
+						ActorNameKeyData { metadata },
+					)?;
+				}
+			}
+
+			if let Some(metadata) = &input.metadata {
+				let metadata = MetadataKeyData {
+					metadata: serde_json::from_str::<util::serde::HashableMap<String, serde_json::Value>>(
+						&metadata,
+					)
+					.unwrap_or_default(),
+				};
+
+				let metadata_key = keys::runner::MetadataKey::new(input.runner_id);
+
+				// Clear old metadata
+				tx.delete_key_subspace(&metadata_key);
+
+				// Write metadata
+				for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() {
+					let chunk_key = metadata_key.chunk(i);
+
+					tx.set(&tx.pack(&chunk_key), &chunk);
+				}
+			}
+
+			Ok(())
+		})
+		.custom_instrument(tracing::info_span!("runner_process_init_tx"))
+		.await?;
+
+	Ok(ProcessInitOutput {
+		last_event_idx: state.last_event_idx,
+		missed_commands: state
+			.commands
+			.iter()
+			.filter(|row| row.index > input.last_command_idx)
+			.map(|row| protocol::CommandWrapper {
+				index: row.index,
+				inner: row.command.clone(),
+			})
+			.collect(),
+	})
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct InsertEventsInput {
+	events: Vec<protocol::EventWrapper>,
+}
+
+#[activity(InsertEvents)]
+async fn insert_events(ctx: &ActivityCtx, input: &InsertEventsInput) -> Result<()> {
+	let last_event_idx = if let Some(last_event_wrapper) = input.events.last() {
+		last_event_wrapper.index
+	} else {
+		return Ok(());
+	};
+
+	let mut state = ctx.state::<State>()?;
+
+	// TODO: Storing events is disabled for now, otherwise state will grow indefinitely. This is only used
+	// for debugging anyway
+	// state.events.extend(input.events.into_iter().enumerate().map(|(i, event)| EventRow {
+	// 	index: event.index,
+	// 	event: event.inner,
+	// 	ack_ts: util::timestamp::now(),
+	// }));
+
+	state.last_event_idx = last_event_idx;
+
+	Ok(())
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct InsertCommandsInput {
+	commands: Vec<protocol::Command>,
+	ack_last_command_idx: i64,
+}
+
+#[activity(InsertCommands)]
+async fn insert_commands(ctx: &ActivityCtx, input: &InsertCommandsInput) -> Result<Option<i64>> {
+	let mut state = ctx.state::<State>()?;
+
+	if input.ack_last_command_idx != -1 {
+		state
+			.commands
+			.retain(|row| row.index > input.ack_last_command_idx);
+	}
+
+	if input.commands.is_empty() {
+		return Ok(None);
+	}
+
+	let old_last_command_idx = state.last_command_idx;
+	state.commands.extend(
+		input
+			.commands
+			.iter()
+			.enumerate()
+			.map(|(i, command)| CommandRow {
+				index: old_last_command_idx + i as i64 + 1,
+				command: command.clone(),
+				create_ts: util::timestamp::now(),
+			}),
+	);
+
+	state.last_command_idx += input.commands.len() as i64;
+
+	Ok(Some(old_last_command_idx + 1))
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct FetchRemainingActorsInput {
+	runner_id: Id,
+}
+
+#[activity(FetchRemainingActors)]
+async fn fetch_remaining_actors(
+	ctx: &ActivityCtx,
+	input: &FetchRemainingActorsInput,
+) -> Result<Vec<(Id, u32)>> {
+	let actors = ctx
+		.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
+			let actor_subspace =
+				keys::subspace().subspace(&keys::runner::ActorKey::subspace(input.runner_id));
+
+			tx.get_ranges_keyvalues(
+				universaldb::RangeOption {
+					mode: StreamingMode::WantAll,
+					..(&actor_subspace).into()
+				},
+				Serializable,
+			)
+			.map(|res| {
+				let (key, generation) = tx.read_entry::<keys::runner::ActorKey>(&res?)?;
+
+				Ok((key.actor_id.into(), generation))
+			})
+			.try_collect::<Vec<_>>()
+			.await
+		})
+		.custom_instrument(tracing::info_span!("runner_fetch_remaining_actors_tx"))
+		.await?;
+
+	Ok(actors)
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct CheckExpiredInput {
+	runner_id: Id,
+}
+
+#[activity(CheckExpired)]
+async fn check_expired(ctx: &ActivityCtx, input: &CheckExpiredInput) -> Result<bool> {
+	let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold();
+
+	ctx.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+
+			let last_ping_ts = tx
+				.read(
+					&keys::runner::LastPingTsKey::new(input.runner_id),
+					Serializable,
+				)
+				.await?;
+
+			let now = util::timestamp::now();
+			let expired = last_ping_ts < now - runner_lost_threshold;
+
+			if expired {
+				tx.write(&keys::runner::ExpiredTsKey::new(input.runner_id), now)?;
+			}
+
+			Ok(expired)
+		})
+		.custom_instrument(tracing::info_span!("runner_check_expired_tx"))
+		.await
+		.map_err(Into::into)
+}
+
+#[derive(Debug, Serialize, Deserialize, Hash)]
+pub(crate) struct AllocatePendingActorsInput {
+	pub namespace_id: Id,
+	pub name: String,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct AllocatePendingActorsOutput {
+	pub allocations: Vec<ActorAllocation>,
+}
+
+#[derive(Debug, Serialize, Deserialize)]
+pub(crate) struct ActorAllocation {
+	pub actor_id: Id,
+	pub signal: Allocate,
+}
+
+#[activity(AllocatePendingActors)]
+pub(crate) async fn allocate_pending_actors(
+	ctx: &ActivityCtx,
+	input: &AllocatePendingActorsInput,
+) -> Result<AllocatePendingActorsOutput> {
+	let runner_eligible_threshold = ctx.config().pegboard().runner_eligible_threshold();
+
+	// NOTE: This txn should closely resemble the one found in the allocate_actor activity of the actor wf
+	let (allocations, pending_actor_count) = ctx
+		.udb()?
+		.run(|tx| async move {
+			let tx = tx.with_subspace(keys::subspace());
+			let mut allocations = Vec::new();
+
+			let pending_actor_subspace = keys::subspace().subspace(
+				&keys::ns::PendingActorByRunnerNameSelectorKey::subspace(
+					input.namespace_id,
+					input.name.clone(),
+				),
+			);
+			let mut queue_stream = tx.get_ranges_keyvalues(
+				universaldb::RangeOption {
+					mode: StreamingMode::Iterator,
+					..(&pending_actor_subspace).into()
+				},
+				// NOTE: This is not Serializable because we don't want to conflict with all of the keys, just
+				// the one we choose
+				Snapshot,
+			);
+			let mut pending_actor_count = 0;
+			let ping_threshold_ts = util::timestamp::now() - runner_eligible_threshold;
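+
+			// For each pending actor, scan the alloc index and take the first
+			// eligible runner: highest version first, preferring runners with
+			// the most remaining millislots, skipping any with a stale ping.
+			// Snapshot reads plus manually added per-key conflicts avoid
+			// contending on the entire index.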
+			'queue_loop: loop {
+				let Some(queue_entry) = queue_stream.try_next().await? else {
+					break;
+				};
+
+				pending_actor_count += 1;
+
+				let (queue_key, generation) =
+					tx.read_entry::<keys::ns::PendingActorByRunnerNameSelectorKey>(&queue_entry)?;
+
+				let runner_alloc_subspace = keys::subspace().subspace(
+					&keys::ns::RunnerAllocIdxKey::subspace(input.namespace_id, input.name.clone()),
+				);
+
+				let mut stream = tx.get_ranges_keyvalues(
+					universaldb::RangeOption {
+						mode: StreamingMode::Iterator,
+						..(&runner_alloc_subspace).into()
+					},
+					// NOTE: This is not Serializable because we don't want to conflict with all of the
+					// keys, just the one we choose
+					Snapshot,
+				);
+
+				let mut highest_version = None;
+
+				loop {
+					let Some(entry) = stream.try_next().await? else {
+						break;
+					};
+
+					let (old_runner_alloc_key, old_runner_alloc_key_data) =
+						tx.read_entry::<keys::ns::RunnerAllocIdxKey>(&entry)?;
+
+					if let Some(highest_version) = highest_version {
+						// We have passed all of the runners with the highest version. This is reachable if
+						// the ping of the highest version runners makes them ineligible
+						if old_runner_alloc_key.version < highest_version {
+							break;
+						}
+					} else {
+						highest_version = Some(old_runner_alloc_key.version);
+					}
+
+					// An empty runner means we have reached the end of the runners with the highest version
+					if old_runner_alloc_key.remaining_millislots == 0 {
+						break;
+					}
+
+					// Skip runners whose last ping is too old
+					if old_runner_alloc_key.last_ping_ts < ping_threshold_ts {
+						continue;
+					}
+
+					// Add read conflict only for this runner key
+					tx.add_conflict_key(&old_runner_alloc_key, ConflictRangeType::Read)?;
+					tx.delete(&old_runner_alloc_key);
+
+					// Add read conflict and delete the queue key
+					tx.add_conflict_key(&queue_key, ConflictRangeType::Read)?;
+					tx.delete(&queue_key);
+
+					let new_remaining_slots =
+						old_runner_alloc_key_data.remaining_slots.saturating_sub(1);
+					let new_remaining_millislots =
+						(new_remaining_slots * 1000) / old_runner_alloc_key_data.total_slots;
+
+					// Write new allocation key with 1 less slot
+					tx.write(
+						&keys::ns::RunnerAllocIdxKey::new(
+							input.namespace_id,
+							input.name.clone(),
+							old_runner_alloc_key.version,
+							new_remaining_millislots,
+							old_runner_alloc_key.last_ping_ts,
+							old_runner_alloc_key.runner_id,
+						),
+						rivet_data::converted::RunnerAllocIdxKeyData {
+							workflow_id: old_runner_alloc_key_data.workflow_id,
+							remaining_slots: new_remaining_slots,
+							total_slots: old_runner_alloc_key_data.total_slots,
+						},
+					)?;
+
+					// Update runner record
+					tx.write(
+						&keys::runner::RemainingSlotsKey::new(old_runner_alloc_key.runner_id),
+						new_remaining_slots,
+					)?;
+
+					// Set runner id of actor
+					tx.write(
+						&keys::actor::RunnerIdKey::new(queue_key.actor_id),
+						old_runner_alloc_key.runner_id,
+					)?;
+
+					// Insert actor index key
+					tx.write(
+						&keys::runner::ActorKey::new(
+							old_runner_alloc_key.runner_id,
+							queue_key.actor_id,
+						),
+						generation,
+					)?;
+
+					allocations.push(ActorAllocation {
+						actor_id: queue_key.actor_id,
+						signal: Allocate {
+							runner_id: old_runner_alloc_key.runner_id,
+							runner_workflow_id: old_runner_alloc_key_data.workflow_id,
+						},
+					});
+
+					pending_actor_count -= 1;
+					continue 'queue_loop;
+				}
+			}
+
+			Ok((allocations, pending_actor_count))
+		})
+		.custom_instrument(tracing::info_span!("runner_allocate_pending_actors_tx"))
+		.await?;
+
+	metrics::ACTOR_PENDING_ALLOCATION.record(
+		pending_actor_count as f64,
+		&[
+			KeyValue::new("namespace_id", input.namespace_id.to_string()),
+			KeyValue::new("runner_name", input.name.to_string()),
+		],
+	);
+
+	Ok(AllocatePendingActorsOutput { allocations })
+}
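+
+// Publishes each message to the runner's pubsub receiver subject one at a
+// time, preserving relative order; the pegboard-runner connection layer is
+// presumably what relays these to the runner's websocket.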
+#[derive(Debug, Serialize, Deserialize, Hash)]
+struct SendMessagesToRunnerInput {
+	runner_id: Id,
+	messages: Vec<protocol::ToClient>,
+}
+
+#[activity(SendMessagesToRunner)]
+async fn send_messages_to_runner(
+	ctx: &ActivityCtx,
+	input: &SendMessagesToRunnerInput,
+) -> Result<()> {
+	let receiver_subject =
+		crate::pubsub_subjects::RunnerReceiverSubject::new(input.runner_id).to_string();
+
+	for message in &input.messages {
+		let message_serialized = versioned::ToClient::wrap_latest(message.clone())
+			.serialize_with_embedded_version(PROTOCOL_VERSION)?;
+
+		ctx.ups()?
+			.publish(&receiver_subject, &message_serialized, PublishOpts::one())
+			.await?;
+	}
+
+	Ok(())
+}
+
+#[signal("pegboard_runner_check_queue")]
+pub struct CheckQueue {}
+
+#[signal("pegboard_runner_stop")]
+pub struct Stop {
+	pub reset_actor_rescheduling: bool,
+}
+
+#[signal("pegboard_runner_command")]
+pub struct Command {
+	pub inner: protocol::Command,
+}
+
+#[signal("pegboard_runner_forward")]
+pub struct Forward {
+	pub inner: protocol::ToServer,
+}
+
+join_signal!(Main {
+	Command(Command),
+	// Forwarded from the ws to this workflow
+	Forward(Forward),
+	CheckQueue,
+	Stop,
+});