diff --git a/Cargo.lock b/Cargo.lock index a941174f1f..06c3aa51e1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3523,6 +3523,7 @@ dependencies = [ "pegboard", "pegboard-actor-kv", "rivet-config", + "rivet-data", "rivet-error", "rivet-guard-core", "rivet-metrics", @@ -3534,6 +3535,7 @@ dependencies = [ "tokio", "tokio-tungstenite", "tracing", + "universaldb", "universalpubsub", "url", "vbare", diff --git a/engine/packages/api-builder/src/middleware.rs b/engine/packages/api-builder/src/middleware.rs index 67168cd2cc..df834fe88d 100644 --- a/engine/packages/api-builder/src/middleware.rs +++ b/engine/packages/api-builder/src/middleware.rs @@ -186,13 +186,7 @@ pub async fn http_logging_middleware( ); // Update metrics - metrics::API_REQUEST_PENDING.add( - -1, - &[ - KeyValue::new("method", method_clone.to_string()), - KeyValue::new("path", path_clone.clone()), - ], - ); + metrics::API_REQUEST_PENDING.add(-1, &[KeyValue::new("method", method_clone.to_string()), KeyValue::new("path", path_clone.clone())]); let error_code: String = if status.is_success() { String::new() diff --git a/engine/packages/engine/src/commands/db/mod.rs b/engine/packages/engine/src/commands/db/mod.rs index d3caff16aa..686103ee54 100644 --- a/engine/packages/engine/src/commands/db/mod.rs +++ b/engine/packages/engine/src/commands/db/mod.rs @@ -22,10 +22,6 @@ pub enum SubCommand { pub enum DatabaseType { #[clap(alias = "ch")] Clickhouse, - #[clap(alias = "wfd")] - WorkflowData, - #[clap(alias = "wfi")] - WorkflowInternal, } impl SubCommand { @@ -48,12 +44,6 @@ impl SubCommand { DatabaseType::Clickhouse => { crate::util::db::clickhouse_shell(config, shell_ctx).await? } - DatabaseType::WorkflowData => { - crate::util::db::wf_sqlite_shell(config, shell_ctx, false).await? - } - DatabaseType::WorkflowInternal => { - crate::util::db::wf_sqlite_shell(config, shell_ctx, true).await? - } } Ok(()) diff --git a/engine/packages/engine/src/commands/start.rs b/engine/packages/engine/src/commands/start.rs index b6f9200c72..c0d2922b20 100644 --- a/engine/packages/engine/src/commands/start.rs +++ b/engine/packages/engine/src/commands/start.rs @@ -115,15 +115,14 @@ async fn verify_engine_version( pools .udb()? .run(|tx| async move { - let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION")) - .context("failed to parse cargo pkg version as semver")?; + let current_version = semver::Version::parse(env!("CARGO_PKG_VERSION")).context("failed to parse cargo pkg version as semver")?; - if let Some(existing_version) = - tx.read_opt(&keys::EngineVersionKey {}, Serializable).await? - { + if let Some(existing_version) = tx.read_opt(&keys::EngineVersionKey {}, Serializable).await? { if current_version < existing_version { - return Ok(Err(anyhow!("{}", formatdoc!( - " + return Ok(Err(anyhow!( + "{}", + formatdoc!( + " Rivet Engine has been rolled back to a previous version: - Last Used Version: {existing_version} - Current Version: {current_version} @@ -131,7 +130,8 @@ async fn verify_engine_version( (If you know what you're doing, this error can be disabled in the Rivet config via `allow_version_rollback: true`) " - )))); + ) + ))); } } diff --git a/engine/packages/engine/src/util/db.rs b/engine/packages/engine/src/util/db.rs index 788cc837d3..5c03646c68 100644 --- a/engine/packages/engine/src/util/db.rs +++ b/engine/packages/engine/src/util/db.rs @@ -1,7 +1,6 @@ -use std::{path::Path, result::Result::Ok, str::FromStr}; +use std::{path::Path, result::Result::Ok}; use anyhow::*; -use rivet_util::Id; use serde_json::json; pub struct ShellQuery { @@ -74,26 +73,3 @@ pub async fn clickhouse_shell( Ok(()) } - -pub async fn wf_sqlite_shell( - config: rivet_config::Config, - shell_ctx: ShellContext<'_>, - _internal: bool, -) -> Result<()> { - let ShellContext { queries, .. } = shell_ctx; - - let _pools = rivet_pools::Pools::new(config.clone()).await?; - - // Combine all queries into one command - for ShellQuery { - svc: workflow_id, - query: _query, - } in queries - { - let _workflow_id = Id::from_str(workflow_id).context("could not parse input as Id")?; - - todo!(); - } - - Ok(()) -} diff --git a/engine/packages/guard-core/src/websocket_handle.rs b/engine/packages/guard-core/src/websocket_handle.rs index 2a3c50a4b3..fdb317a411 100644 --- a/engine/packages/guard-core/src/websocket_handle.rs +++ b/engine/packages/guard-core/src/websocket_handle.rs @@ -2,7 +2,7 @@ use anyhow::*; use futures_util::{SinkExt, StreamExt, stream::Peekable}; use hyper::upgrade::Upgraded; use hyper_tungstenite::HyperWebsocket; -use hyper_tungstenite::tungstenite::Message as WsMessage; +use hyper_tungstenite::tungstenite::Message; use hyper_util::rt::TokioIo; use std::sync::Arc; use tokio::sync::Mutex; @@ -12,7 +12,7 @@ pub type WebSocketReceiver = Peekable>>>; pub type WebSocketSender = - futures_util::stream::SplitSink>, WsMessage>; + futures_util::stream::SplitSink>, Message>; #[derive(Clone)] pub struct WebSocketHandle { @@ -31,7 +31,7 @@ impl WebSocketHandle { }) } - pub async fn send(&self, message: WsMessage) -> Result<()> { + pub async fn send(&self, message: Message) -> Result<()> { self.ws_tx.lock().await.send(message).await?; Ok(()) } diff --git a/engine/packages/guard-core/tests/simple_websocket.rs b/engine/packages/guard-core/tests/simple_websocket.rs index ead8c7a5ce..2672993d52 100644 --- a/engine/packages/guard-core/tests/simple_websocket.rs +++ b/engine/packages/guard-core/tests/simple_websocket.rs @@ -175,25 +175,25 @@ async fn start_websocket_server() -> SocketAddr { match message_result { Ok(msg) => { match &msg { - hyper_tungstenite::tungstenite::Message::Text(text) => { - println!("Server: Received text message: {}", text); - }, - hyper_tungstenite::tungstenite::Message::Binary(data) => { - println!("Server: Received binary message of {} bytes", data.len()); - }, - hyper_tungstenite::tungstenite::Message::Ping(_) => { - println!("Server: Received ping"); - }, - hyper_tungstenite::tungstenite::Message::Pong(_) => { - println!("Server: Received pong"); - }, - hyper_tungstenite::tungstenite::Message::Close(_) => { - println!("Server: Received close message"); - }, - _ => { - println!("Server: Received unknown message type"); - } - } + hyper_tungstenite::tungstenite::Message::Text(text) => { + println!("Server: Received text message: {}", text); + } + hyper_tungstenite::tungstenite::Message::Binary(data) => { + println!("Server: Received binary message of {} bytes", data.len()); + } + hyper_tungstenite::tungstenite::Message::Ping(_) => { + println!("Server: Received ping"); + } + hyper_tungstenite::tungstenite::Message::Pong(_) => { + println!("Server: Received pong"); + } + hyper_tungstenite::tungstenite::Message::Close(_) => { + println!("Server: Received close message"); + } + _ => { + println!("Server: Received unknown message type"); + } + } println!("Server: Echoing message back"); match write.send(msg).await { diff --git a/engine/packages/guard/src/routing/pegboard_gateway.rs b/engine/packages/guard/src/routing/pegboard_gateway.rs index b3501b4bd2..45ed922d24 100644 --- a/engine/packages/guard/src/routing/pegboard_gateway.rs +++ b/engine/packages/guard/src/routing/pegboard_gateway.rs @@ -210,7 +210,7 @@ async fn route_request_inner( .to_workflow_id(actor.workflow_id) .graceful_not_found() .send() - .await; + .await?; if res.is_none() { tracing::warn!( diff --git a/engine/packages/metrics/src/buckets.rs b/engine/packages/metrics/src/buckets.rs index b719898714..720e459ff8 100644 --- a/engine/packages/metrics/src/buckets.rs +++ b/engine/packages/metrics/src/buckets.rs @@ -1,8 +1,8 @@ pub const BUCKETS: &[f64] = &[ // For otel 0.0, // Added - 0.001, 0.0025, - // Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27 + 0.001, + 0.0025, // Copied from https://docs.rs/prometheus/latest/src/prometheus/histogram.rs.html#25-27 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, // Added 25.0, 50.0, 100.0, 250.0, 500.0, ]; diff --git a/engine/packages/pegboard-gateway/src/keepalive_task.rs b/engine/packages/pegboard-gateway/src/keepalive_task.rs index 1632cd68b0..3ea3378956 100644 --- a/engine/packages/pegboard-gateway/src/keepalive_task.rs +++ b/engine/packages/pegboard-gateway/src/keepalive_task.rs @@ -52,7 +52,7 @@ pub async fn task( ctx.op(pegboard::ops::actor::hibernating_request::upsert::Input { actor_id, gateway_id, - request_id, + request_id }), // Keep alive in flight req during hibernation shared_state.keepalive_hws(request_id), diff --git a/engine/packages/pegboard-gateway/src/shared_state.rs b/engine/packages/pegboard-gateway/src/shared_state.rs index 535b3f907d..0ee669698e 100644 --- a/engine/packages/pegboard-gateway/src/shared_state.rs +++ b/engine/packages/pegboard-gateway/src/shared_state.rs @@ -527,12 +527,10 @@ impl SharedState { let reason = 'reason: { if let Some(hs) = &req.hibernation_state { if let Some(earliest_pending_ws_msg) = hs.pending_ws_msgs.first() { - if now.duration_since(earliest_pending_ws_msg.send_instant) - > HWS_MESSAGE_ACK_TIMEOUT - { + if now.duration_since(earliest_pending_ws_msg.send_instant) > HWS_MESSAGE_ACK_TIMEOUT { break 'reason Some(MsgGcReason::WebSocketMessageNotAcked { first_msg_index: earliest_pending_ws_msg.message_index, - last_msg_index: req.message_index + last_msg_index: req.message_index, }); } } @@ -543,7 +541,7 @@ impl SharedState { timeout=%hibernation_timeout.as_secs_f64(), "checking hibernating state elapsed time" ); - if hs_elapsed> hibernation_timeout { + if hs_elapsed > hibernation_timeout { break 'reason Some(MsgGcReason::HibernationTimeout); } } else if req.msg_tx.is_closed() { diff --git a/engine/packages/pegboard-runner/Cargo.toml b/engine/packages/pegboard-runner/Cargo.toml index b2df569e73..6a618c779f 100644 --- a/engine/packages/pegboard-runner/Cargo.toml +++ b/engine/packages/pegboard-runner/Cargo.toml @@ -23,15 +23,17 @@ rivet-guard-core.workspace = true rivet-metrics.workspace = true rivet-runner-protocol.workspace = true rivet-runtime.workspace = true -serde.workspace = true -serde_json.workspace = true +rivet-data.workspace = true serde_bare.workspace = true -tokio.workspace = true +serde_json.workspace = true +serde.workspace = true tokio-tungstenite.workspace = true +tokio.workspace = true tracing.workspace = true +universaldb.workspace = true +universalpubsub.workspace = true url.workspace = true vbare.workspace = true -universalpubsub.workspace = true pegboard.workspace = true pegboard-actor-kv.workspace = true diff --git a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs index 8244d3fcb7..678e21d1d4 100644 --- a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs +++ b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs @@ -11,21 +11,23 @@ const GC_INTERVAL: Duration = Duration::from_secs(30); const MAX_LAST_SEEN: Duration = Duration::from_secs(30); struct Channel { - tx: mpsc::UnboundedSender, + tx: mpsc::UnboundedSender, handle: JoinHandle<()>, last_seen: Instant, } pub struct ActorEventDemuxer { ctx: StandaloneCtx, + runner_id: Id, channels: HashMap, last_gc: Instant, } impl ActorEventDemuxer { - pub fn new(ctx: StandaloneCtx) -> Self { + pub fn new(ctx: StandaloneCtx, runner_id: Id) -> Self { Self { ctx, + runner_id, channels: HashMap::new(), last_gc: Instant::now(), } @@ -33,13 +35,14 @@ impl ActorEventDemuxer { /// Process an event by routing it to the appropriate actor's queue #[tracing::instrument(skip_all)] - pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::Event) { + pub fn ingest(&mut self, actor_id: Id, event: protocol::mk2::EventWrapper) { if let Some(channel) = self.channels.get(&actor_id) { let _ = channel.tx.send(event); } else { let (tx, mut rx) = mpsc::unbounded_channel(); let ctx = self.ctx.clone(); + let runner_id = self.runner_id; let handle = tokio::spawn(async move { loop { let mut buffer = Vec::new(); @@ -49,7 +52,7 @@ impl ActorEventDemuxer { break; } - if let Err(err) = dispatch_events(&ctx, actor_id, buffer).await { + if let Err(err) = dispatch_events(&ctx, runner_id, actor_id, buffer).await { tracing::error!(?err, "actor event processor failed"); break; } @@ -106,11 +109,12 @@ impl ActorEventDemuxer { async fn dispatch_events( ctx: &StandaloneCtx, + runner_id: Id, actor_id: Id, - events: Vec, + events: Vec, ) -> Result<()> { let res = ctx - .signal(pegboard::workflows::actor::Events { inner: events }) + .signal(pegboard::workflows::actor::Events { runner_id, events }) .tag("actor_id", actor_id) .graceful_not_found() .send() diff --git a/engine/packages/pegboard-runner/src/conn.rs b/engine/packages/pegboard-runner/src/conn.rs index 573ff52d44..e9310cbc96 100644 --- a/engine/packages/pegboard-runner/src/conn.rs +++ b/engine/packages/pegboard-runner/src/conn.rs @@ -5,12 +5,15 @@ use std::{ use anyhow::Context; use futures_util::StreamExt; +use futures_util::TryStreamExt; use gas::prelude::Id; use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; use pegboard::ops::runner::update_alloc_idx::{Action, RunnerEligibility}; +use rivet_data::converted::{ActorNameKeyData, MetadataKeyData}; use rivet_guard_core::WebSocketHandle; use rivet_runner_protocol::{self as protocol, versioned}; +use universaldb::prelude::*; use vbare::OwnedVersionedData; use crate::{errors::WsError, utils::UrlData}; @@ -50,7 +53,7 @@ pub async fn init_conn( let mut ws_rx = ws_rx.lock().await; // Receive init packet - let (runner_name, runner_id, workflow_id) = if let Some(msg) = + let (runner_name, runner_id, workflow_id, init) = if let Some(msg) = tokio::time::timeout(Duration::from_secs(5), ws_rx.next()) .await .map_err(|_| WsError::TimedOutWaitingForInit.build())? @@ -64,147 +67,107 @@ pub async fn init_conn( } }; - let init_packet = versioned::ToServer::deserialize(&buf, protocol_version) - .map_err(|err| WsError::InvalidPacket(err.to_string()).build()) - .context("failed to deserialize initial packet from client")?; - - let (runner_name, runner_id, workflow_id) = - if let protocol::ToServer::ToServerInit(protocol::ToServerInit { - name, - version, - total_slots, - .. - }) = &init_packet - { - // Look up existing runner by key - let existing_runner = ctx - .op(pegboard::ops::runner::get_by_key::Input { - namespace_id: namespace.namespace_id, - name: name.clone(), - key: runner_key.clone(), - }) - .await - .with_context(|| { - format!( - "failed to get existing runner by key: {}:{}", - name, runner_key - ) - })?; - - let runner_id = if let Some(runner) = existing_runner.runner { - // IMPORTANT: Before we spawn/get the workflow, we try to update the runner's last ping ts. - // This ensures if the workflow is currently checking for expiry that it will not expire - // (because we are about to send signals to it) and if it is already expired (but not - // completed) we can choose a new runner id. - let update_ping_res = ctx - .op(pegboard::ops::runner::update_alloc_idx::Input { - runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { - runner_id: runner.runner_id, - action: Action::UpdatePing { rtt: 0 }, - }], - }) - .await - .with_context(|| { - format!("failed to update ping for runner: {}", runner.runner_id) - })?; - - if update_ping_res - .notifications - .into_iter() - .next() - .map(|notif| matches!(notif.eligibility, RunnerEligibility::Expired)) - .unwrap_or_default() - { - // Runner expired, create a new one - Id::new_v1(ctx.config().dc_label()) - } else { - // Use existing runner - runner.runner_id - } - } else { - // No existing runner for this key, create a new one - Id::new_v1(ctx.config().dc_label()) - }; - - // Spawn a new runner workflow if one doesn't already exist - let workflow_id = if protocol::is_mk2(protocol_version) { - ctx.workflow(pegboard::workflows::runner2::Input { - runner_id, - namespace_id: namespace.namespace_id, - name: name.clone(), - key: runner_key.clone(), - version: version.clone(), - total_slots: *total_slots, - protocol_version, - }) - .tag("runner_id", runner_id) - .unique() - .dispatch() - .await - .with_context(|| { - format!( - "failed to dispatch runner workflow for runner: {}", - runner_id - ) - })? - } else { - ctx.workflow(pegboard::workflows::runner::Input { - runner_id, - namespace_id: namespace.namespace_id, - name: name.clone(), - key: runner_key.clone(), - version: version.clone(), - total_slots: *total_slots, - }) - .tag("runner_id", runner_id) - .unique() - .dispatch() - .await - .with_context(|| { - format!( - "failed to dispatch runner workflow for runner: {}", - runner_id - ) - })? - }; - - (name.clone(), runner_id, workflow_id) - } else { - tracing::debug!(?init_packet, "invalid initial packet"); - return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build()); - }; + let init = Init::new(&buf, protocol_version)?; - if protocol::is_mk2(protocol_version) { - ctx.signal(pegboard::workflows::runner2::Init {}) - .to_workflow_id(workflow_id) - .send() + // Look up existing runner by key + let existing_runner = ctx + .op(pegboard::ops::runner::get_by_key::Input { + namespace_id: namespace.namespace_id, + name: init.name().to_string(), + key: runner_key.clone(), + }) + .await + .with_context(|| { + format!( + "failed to get existing runner by key: {}:{}", + init.name(), + runner_key + ) + })?; + + let runner_id = if let Some(runner) = existing_runner.runner { + // IMPORTANT: Before we spawn/get the workflow, we try to update the runner's last ping ts. + // This ensures if the workflow is currently checking for expiry that it will not expire + // (because we are about to send signals to it) and if it is already expired (but not + // completed) we can choose a new runner id. + let update_ping_res = ctx + .op(pegboard::ops::runner::update_alloc_idx::Input { + runners: vec![pegboard::ops::runner::update_alloc_idx::Runner { + runner_id: runner.runner_id, + action: Action::UpdatePing { rtt: 0 }, + }], + }) .await .with_context(|| { - format!( - "failed to forward initial packet to workflow: {}", - workflow_id - ) + format!("failed to update ping for runner: {}", runner.runner_id) })?; + + if update_ping_res + .notifications + .into_iter() + .next() + .map(|notif| matches!(notif.eligibility, RunnerEligibility::Expired)) + .unwrap_or_default() + { + // Runner expired, create a new one + Id::new_v1(ctx.config().dc_label()) + } else { + // Use existing runner + runner.runner_id + } } else { - // Forward to runner wf - ctx.signal(pegboard::workflows::runner::Forward { inner: init_packet }) - .to_workflow_id(workflow_id) - .send() - .await - .with_context(|| { - format!( - "failed to forward initial packet to workflow: {}", - workflow_id - ) - })?; - } + // No existing runner for this key, create a new one + Id::new_v1(ctx.config().dc_label()) + }; - (runner_name, runner_id, workflow_id) + // Spawn a new runner workflow if one doesn't already exist + let workflow_id = if protocol::is_mk2(protocol_version) { + ctx.workflow(pegboard::workflows::runner2::Input { + runner_id, + namespace_id: namespace.namespace_id, + name: init.name().to_string(), + key: runner_key.clone(), + version: init.version(), + total_slots: init.total_slots(), + protocol_version, + }) + .tag("runner_id", runner_id) + .unique() + .dispatch() + .await + .with_context(|| { + format!( + "failed to dispatch runner workflow for runner: {}", + runner_id + ) + })? + } else { + ctx.workflow(pegboard::workflows::runner::Input { + runner_id, + namespace_id: namespace.namespace_id, + name: init.name().to_string(), + key: runner_key.clone(), + version: init.version(), + total_slots: init.total_slots(), + }) + .tag("runner_id", runner_id) + .unique() + .dispatch() + .await + .with_context(|| { + format!( + "failed to dispatch runner workflow for runner: {}", + runner_id + ) + })? + }; + + (init.name().to_string(), runner_id, workflow_id, init) } else { return Err(WsError::ConnectionClosed.build()); }; - Ok(Arc::new(Conn { + let conn = Arc::new(Conn { namespace_id: namespace.namespace_id, runner_name, runner_key, @@ -213,5 +176,213 @@ pub async fn init_conn( protocol_version, ws_handle, last_rtt: AtomicU32::new(0), - })) + }); + + match init { + Init::Mk2(init) => handle_init(ctx, &conn, init).await?, + Init::Mk1(init) => { + // Forward to runner wf + ctx.signal(pegboard::workflows::runner::Forward { + inner: protocol::ToServer::ToServerInit(init), + }) + .to_workflow_id(workflow_id) + .send() + .await + .with_context(|| { + format!( + "failed to forward initial packet to workflow: {}", + workflow_id + ) + })?; + } + } + + Ok(conn) +} + +enum Init { + Mk2(protocol::mk2::ToServerInit), + Mk1(protocol::ToServerInit), +} + +impl Init { + fn new(buf: &[u8], protocol_version: u16) -> Result { + if protocol::is_mk2(protocol_version) { + let init_packet = versioned::ToServerMk2::deserialize(&buf, protocol_version) + .map_err(|err| WsError::InvalidPacket(err.to_string()).build()) + .context("failed to deserialize initial packet from client")?; + + let protocol::mk2::ToServer::ToServerInit(init) = init_packet else { + tracing::debug!(?init_packet, "invalid initial packet"); + return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build()); + }; + + Ok(Init::Mk2(init)) + } else { + let init_packet = versioned::ToServer::deserialize(&buf, protocol_version) + .map_err(|err| WsError::InvalidPacket(err.to_string()).build()) + .context("failed to deserialize initial packet from client")?; + + let protocol::ToServer::ToServerInit(init) = init_packet else { + tracing::debug!(?init_packet, "invalid initial packet"); + return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build()); + }; + + Ok(Init::Mk1(init)) + } + } + + fn name(&self) -> &str { + match self { + Init::Mk2(init) => &init.name, + Init::Mk1(init) => &init.name, + } + } + + fn version(&self) -> u32 { + match self { + Init::Mk2(init) => init.version, + Init::Mk1(init) => init.version, + } + } + + fn total_slots(&self) -> u32 { + match self { + Init::Mk2(init) => init.total_slots, + Init::Mk1(init) => init.total_slots, + } + } +} + +pub async fn handle_init( + ctx: &StandaloneCtx, + conn: &Conn, + init: protocol::mk2::ToServerInit, +) -> Result<()> { + // We send the signal first because we don't want to continue if this fails + ctx.signal(pegboard::workflows::runner2::Init {}) + .to_workflow_id(conn.workflow_id) + .send() + .await + .with_context(|| { + format!( + "failed to send signal to runner workflow: {}", + conn.workflow_id + ) + })?; + + let missed_commands = ctx + .udb()? + .run(|tx| { + let init = init.clone(); + async move { + let tx = tx.with_subspace(pegboard::keys::subspace()); + + // Populate actor names if provided + if let Some(actor_names) = &init.prepopulate_actor_names { + // Write each actor name into the namespace actor names list + for (name, data) in actor_names { + let metadata = serde_json::from_str::< + serde_json::Map, + >(&data.metadata) + .unwrap_or_default(); + + tx.write( + &pegboard::keys::ns::ActorNameKey::new(conn.namespace_id, name.clone()), + ActorNameKeyData { metadata }, + )?; + } + } + + if let Some(metadata) = &init.metadata { + let metadata = MetadataKeyData { + metadata: + serde_json::from_str::>( + &metadata, + ) + .unwrap_or_default(), + }; + + let metadata_key = pegboard::keys::runner::MetadataKey::new(conn.runner_id); + + // Clear old metadata + tx.delete_key_subspace(&metadata_key); + + // Write metadata + for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() { + let chunk_key = metadata_key.chunk(i); + + tx.set(&tx.pack(&chunk_key), &chunk); + } + } + + let runner_actor_commands_subspace = pegboard::keys::subspace().subspace( + &pegboard::keys::runner::ActorCommandKey::subspace(conn.runner_id), + ); + + // Read missed commands + tx.get_ranges_keyvalues( + RangeOption { + mode: StreamingMode::WantAll, + ..(&runner_actor_commands_subspace).into() + }, + Serializable, + ) + .map(|res| { + let (key, command) = + tx.read_entry::(&res?)?; + match command { + protocol::mk2::ActorCommandKeyData::CommandStartActor(x) => { + Ok(protocol::mk2::CommandWrapper { + checkpoint: protocol::mk2::ActorCheckpoint { + actor_id: key.actor_id.to_string(), + index: key.index, + }, + inner: protocol::mk2::Command::CommandStartActor(x), + }) + } + protocol::mk2::ActorCommandKeyData::CommandStopActor(x) => { + Ok(protocol::mk2::CommandWrapper { + checkpoint: protocol::mk2::ActorCheckpoint { + actor_id: key.actor_id.to_string(), + index: key.index, + }, + inner: protocol::mk2::Command::CommandStopActor(x), + }) + } + } + }) + .try_collect::>() + .await + } + }) + .custom_instrument(tracing::info_span!("runner_process_init_tx")) + .await?; + + // Send init packet + let init_msg = versioned::ToClientMk2::wrap_latest(protocol::mk2::ToClient::ToClientInit( + protocol::mk2::ToClientInit { + runner_id: conn.runner_id.to_string(), + metadata: protocol::mk2::ProtocolMetadata { + runner_lost_threshold: ctx.config().pegboard().runner_lost_threshold(), + }, + }, + )); + let init_msg_serialized = init_msg.serialize(conn.protocol_version)?; + conn.ws_handle + .send(Message::Binary(init_msg_serialized.into())) + .await?; + + // Send missed commands + if !missed_commands.is_empty() { + let msg = versioned::ToClientMk2::wrap_latest(protocol::mk2::ToClient::ToClientCommands( + missed_commands, + )); + let msg_serialized = msg.serialize(conn.protocol_version)?; + conn.ws_handle + .send(Message::Binary(msg_serialized.into())) + .await?; + } + + Ok(()) } diff --git a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs index 5920570ad4..2dee97fe1c 100644 --- a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs +++ b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs @@ -2,9 +2,7 @@ use anyhow::Result; use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; use pegboard::pubsub_subjects::GatewayReceiverSubject; -use rivet_runner_protocol::{ - self as protocol, PROTOCOL_MK1_VERSION, PROTOCOL_MK2_VERSION, versioned, -}; +use rivet_runner_protocol::{self as protocol, PROTOCOL_MK2_VERSION, versioned}; use std::sync::Arc; use tokio::sync::watch; use universalpubsub as ups; diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 7bb98b9466..67db731aab 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -3,17 +3,15 @@ use bytes::Bytes; use futures_util::TryStreamExt; use gas::prelude::Id; use gas::prelude::*; -use hyper_tungstenite::tungstenite::Message as WsMessage; use hyper_tungstenite::tungstenite::Message; use pegboard::pubsub_subjects::GatewayReceiverSubject; use pegboard::utils::event_actor_id; use pegboard_actor_kv as kv; use rivet_guard_core::websocket_handle::WebSocketReceiver; -use rivet_runner_protocol::{ - self as protocol, PROTOCOL_MK1_VERSION, PROTOCOL_MK2_VERSION, versioned, -}; +use rivet_runner_protocol::{self as protocol, PROTOCOL_MK2_VERSION, versioned}; use std::sync::{Arc, atomic::Ordering}; use tokio::sync::{Mutex, MutexGuard, watch}; +use universaldb::utils::end_of_key_range; use universalpubsub::PublishOpts; use universalpubsub::Subscriber; use vbare::OwnedVersionedData; @@ -28,7 +26,7 @@ pub async fn task( eviction_sub2: Subscriber, ws_to_tunnel_abort_rx: watch::Receiver<()>, ) -> Result { - let mut event_demuxer = ActorEventDemuxer::new(ctx.clone()); + let mut event_demuxer = ActorEventDemuxer::new(ctx.clone(), conn.runner_id); let res = task_inner( ctx, @@ -97,7 +95,7 @@ async fn recv_msg( }; match msg { - WsMessage::Binary(data) => { + Message::Binary(data) => { tracing::trace!( data_len = data.len(), "received binary message from WebSocket" @@ -105,7 +103,7 @@ async fn recv_msg( Ok(Ok(Some(data))) } - WsMessage::Close(_) => { + Message::Close(_) => { tracing::debug!("websocket closed"); return Ok(Err(LifecycleResult::Closed)); } @@ -388,67 +386,22 @@ async fn handle_message_mk2( } } protocol::mk2::ToServer::ToServerTunnelMessage(tunnel_msg) => { - handle_tunnel_message_mk2(&ctx, &conn, tunnel_msg) + handle_tunnel_message_mk2(&ctx, tunnel_msg) .await .context("failed to handle tunnel message")?; } - protocol::mk2::ToServer::ToServerInit(init) => { - // We send the signal first because we don't want to continue if this fails - ctx.signal(pegboard::workflows::runner2::Init {}) - .to_workflow_id(conn.workflow_id) - .send() - .await - .with_context(|| { - format!( - "failed to send signal to runner workflow: {}", - conn.workflow_id - ) - })?; - - let init_data = ctx - .activity(ProcessInitInput { - runner_id: conn.runner_id, - namespace_id: conn.namespace_id, - last_command_idx: init.last_command_idx.unwrap_or(-1), - prepopulate_actor_names: init.prepopulate_actor_names, - metadata: init.metadata, - }) - .await?; - - // Send init packet - let init_msg = versioned::ToClientMk2::wrap_latest( - protocol::mk2::ToClient::ToClientInit(protocol::mk2::ToClientInit { - runner_id: conn.runner_id.to_string(), - last_event_idx: init_data.last_event_idx, - metadata: protocol::mk2::ProtocolMetadata { - runner_lost_threshold: ctx.config().pegboard().runner_lost_threshold(), - }, - }), - ); - let init_msg_serialized = init_msg.serialize(conn.protocol_version)?; - conn.ws_handle - .send(Message::Binary(init_msg_serialized.into())) - .await?; - - // Send missed commands - if !init_data.missed_commands.is_empty() { - let msg = versioned::ToClientMk2::wrap_latest( - protocol::mk2::ToClient::ToClientCommands(init_data.missed_commands), - ); - let msg_serialized = msg.serialize(conn.protocol_version)?; - conn.ws_handle - .send(Message::Binary(msg_serialized.into())) - .await?; - } + // NOTE: This does not process the first init event. See `conn::init_conn` + protocol::mk2::ToServer::ToServerInit(_) => { + tracing::debug!("received additional init packet, ignoring"); } // Forward to actor wf protocol::mk2::ToServer::ToServerEvents(events) => { for event in events { - event_demuxer.ingest(Id::parse(event_actor_id(&event.inner))?, event.inner); + event_demuxer.ingest(Id::parse(event_actor_id(&event.inner))?, event); } } - protocol::mk2::ToServer::ToServerAckCommands(_) => { - ack_commands(&ctx).await?; + protocol::mk2::ToServer::ToServerAckCommands(ack) => { + ack_commands(&ctx, conn.runner_id, ack).await?; } protocol::mk2::ToServer::ToServerStopping => { ctx.signal(pegboard::workflows::runner2::Stop { @@ -793,21 +746,43 @@ async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Conn, msg: Bytes) -> Res Ok(()) } -async fn ack_commands(ctx: &StandaloneCtx) -> Result<()> { - // ctx.udb()?.run(|| { - // let last_ack = ; - // let stream = .read_ranges_keyvalues({ - // limit: - // }); - // }).await?; +async fn ack_commands( + ctx: &StandaloneCtx, + runner_id: Id, + ack: protocol::mk2::ToServerAckCommands, +) -> Result<()> { + ctx.udb()? + .run(|tx| { + let ack = ack.clone(); + async move { + tx.with_subspace(pegboard::keys::subspace()); + + for checkpoint in &ack.last_command_checkpoints { + let start = tx.pack( + &pegboard::keys::runner::ActorCommandKey::subspace_with_actor( + runner_id, + Id::parse(&checkpoint.actor_id)?, + ), + ); + let end = end_of_key_range(&tx.pack( + &pegboard::keys::runner::ActorCommandKey::subspace_with_index( + runner_id, + Id::parse(&checkpoint.actor_id)?, + checkpoint.index, + ), + )); + tx.clear_range(&start, &end); + } - todo!(); + Ok(()) + } + }) + .await } #[tracing::instrument(skip_all)] async fn handle_tunnel_message_mk2( ctx: &StandaloneCtx, - conn: &Conn, msg: protocol::mk2::ToServerTunnelMessage, ) -> Result<()> { // Publish message to UPS diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index 720d43461d..c2c1c7bbf9 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -70,9 +70,7 @@ async fn tick( .run(|tx| async move { let tx = tx.with_subspace(keys::subspace()); - let serverless_desired_subspace = keys::subspace().subspace( - &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace(), - ); + let serverless_desired_subspace = keys::subspace().subspace(&rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::entire_subspace()); tx.get_ranges_keyvalues( universaldb::RangeOption { @@ -84,8 +82,7 @@ async fn tick( ) .map(|res| match res { Ok(entry) => { - let (key, desired_slots) = - tx.read_entry::(&entry)?; + let (key, desired_slots) = tx.read_entry::(&entry)?; Ok((key.namespace_id, key.runner_name, desired_slots)) } @@ -372,9 +369,7 @@ async fn outbound_handler( if runner_id.is_none() { let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data) - .context("invalid payload")?; + let payload = protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data).context("invalid payload")?; match payload { protocol::mk2::ToServerlessServer::ToServerlessServerInit(init) => { @@ -437,11 +432,7 @@ async fn outbound_handler( // send it now if runner_id.is_none() { let data = BASE64.decode(msg.data).context("invalid base64 message")?; - let payload = - protocol::versioned::ToServerlessServer::deserialize_with_embedded_version( - &data, - ) - .context("invalid payload")?; + let payload = protocol::versioned::ToServerlessServer::deserialize_with_embedded_version(&data).context("invalid payload")?; match payload { protocol::mk2::ToServerlessServer::ToServerlessServerInit(init) => { diff --git a/engine/packages/pegboard/src/keys/runner.rs b/engine/packages/pegboard/src/keys/runner.rs index e51555e4c4..50f7771f0f 100644 --- a/engine/packages/pegboard/src/keys/runner.rs +++ b/engine/packages/pegboard/src/keys/runner.rs @@ -860,3 +860,253 @@ impl<'de> TupleUnpack<'de> for MetadataChunkKey { Ok((input, v)) } } + +#[derive(Debug)] +pub struct ActorLastCommandIdxKey { + runner_id: Id, + actor_id: Id, +} + +impl ActorLastCommandIdxKey { + pub fn new(runner_id: Id, actor_id: Id) -> Self { + ActorLastCommandIdxKey { + runner_id, + actor_id, + } + } +} + +impl FormalKey for ActorLastCommandIdxKey { + // Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for ActorLastCommandIdxKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + RUNNER, + DATA, + self.runner_id, + ACTOR, + LAST_COMMAND_IDX, + self.actor_id, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ActorLastCommandIdxKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, runner_id, _, _, actor_id)) = + <(usize, usize, Id, usize, usize, Id)>::unpack(input, tuple_depth)?; + let v = ActorLastCommandIdxKey { + runner_id, + actor_id, + }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct ActorLastEventIdxKey { + runner_id: Id, + actor_id: Id, +} + +impl ActorLastEventIdxKey { + pub fn new(runner_id: Id, actor_id: Id) -> Self { + ActorLastEventIdxKey { + runner_id, + actor_id, + } + } +} + +impl FormalKey for ActorLastEventIdxKey { + // Timestamp. + type Value = i64; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(i64::from_be_bytes(raw.try_into()?)) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.to_be_bytes().to_vec()) + } +} + +impl TuplePack for ActorLastEventIdxKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + RUNNER, + DATA, + self.runner_id, + ACTOR, + LAST_EVENT_IDX, + self.actor_id, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ActorLastEventIdxKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, runner_id, _, _, actor_id)) = + <(usize, usize, Id, usize, usize, Id)>::unpack(input, tuple_depth)?; + let v = ActorLastEventIdxKey { + runner_id, + actor_id, + }; + + Ok((input, v)) + } +} + +#[derive(Debug)] +pub struct ActorCommandKey { + pub runner_id: Id, + pub actor_id: Id, + pub index: i64, +} + +impl ActorCommandKey { + pub fn new(runner_id: Id, actor_id: Id, index: i64) -> Self { + ActorCommandKey { + runner_id, + actor_id, + index, + } + } + + pub fn subspace(runner_id: Id) -> ActorCommandSubspaceKey { + ActorCommandSubspaceKey::new(runner_id) + } + + pub fn subspace_with_actor(runner_id: Id, actor_id: Id) -> ActorCommandSubspaceKey { + ActorCommandSubspaceKey::new_with_actor(runner_id, actor_id) + } + + pub fn subspace_with_index(runner_id: Id, actor_id: Id, index: i64) -> ActorCommandSubspaceKey { + ActorCommandSubspaceKey::new_with_index(runner_id, actor_id, index) + } +} + +impl FormalKey for ActorCommandKey { + type Value = rivet_runner_protocol::mk2::ActorCommandKeyData; + + fn deserialize(&self, raw: &[u8]) -> Result { + rivet_runner_protocol::versioned::ActorCommandKeyData::deserialize_with_embedded_version( + raw, + ) + } + + fn serialize(&self, value: Self::Value) -> Result> { + rivet_runner_protocol::versioned::ActorCommandKeyData::wrap_latest(value) + .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_MK2_VERSION) + } +} + +impl TuplePack for ActorCommandKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let t = ( + RUNNER, + DATA, + self.runner_id, + ACTOR, + COMMAND, + self.actor_id, + self.index, + ); + t.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for ActorCommandKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (_, _, runner_id, _, _, actor_id, index)) = + <(usize, usize, Id, usize, usize, Id, i64)>::unpack(input, tuple_depth)?; + let v = ActorCommandKey { + runner_id, + actor_id, + index, + }; + + Ok((input, v)) + } +} + +pub struct ActorCommandSubspaceKey { + runner_id: Id, + actor_id: Option, + index: Option, +} + +impl ActorCommandSubspaceKey { + pub fn new(runner_id: Id) -> Self { + ActorCommandSubspaceKey { + runner_id, + actor_id: None, + index: None, + } + } + + pub fn new_with_actor(runner_id: Id, actor_id: Id) -> Self { + ActorCommandSubspaceKey { + runner_id, + actor_id: Some(actor_id), + index: None, + } + } + + pub fn new_with_index(runner_id: Id, actor_id: Id, index: i64) -> Self { + ActorCommandSubspaceKey { + runner_id, + actor_id: Some(actor_id), + index: Some(index), + } + } +} + +impl TuplePack for ActorCommandSubspaceKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = (RUNNER, DATA, self.runner_id, ACTOR, COMMAND); + offset += t.pack(w, tuple_depth)?; + + if let Some(actor_id) = &self.actor_id { + offset += actor_id.pack(w, tuple_depth)?; + + if let Some(index) = &self.index { + offset += index.pack(w, tuple_depth)?; + } + } + + Ok(offset) + } +} diff --git a/engine/packages/pegboard/src/utils.rs b/engine/packages/pegboard/src/utils.rs index 0bf3e91be7..7897c86478 100644 --- a/engine/packages/pegboard/src/utils.rs +++ b/engine/packages/pegboard/src/utils.rs @@ -30,7 +30,24 @@ pub fn event_actor_id_mk1(event: &protocol::Event) -> &str { } } -pub fn event_generation(event: &protocol::Event) -> u32 { +pub fn event_generation(event: &protocol::mk2::Event) -> u32 { + match event { + protocol::mk2::Event::EventActorIntent(protocol::mk2::EventActorIntent { + generation, + .. + }) => *generation, + protocol::mk2::Event::EventActorStateUpdate(protocol::mk2::EventActorStateUpdate { + generation, + .. + }) => *generation, + protocol::mk2::Event::EventActorSetAlarm(protocol::mk2::EventActorSetAlarm { + generation, + .. + }) => *generation, + } +} + +pub fn event_generation_mk1(event: &protocol::Event) -> u32 { match event { protocol::Event::EventActorIntent(protocol::EventActorIntent { generation, .. }) => { *generation diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index e7391a6164..74bc8434d6 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -72,30 +72,37 @@ async fn update_state_and_db( ctx.udb()? .run(|tx| { - let state = (*state).clone(); + let runner_id = state.runner_id.clone(); + let namespace_id = state.namespace_id.clone(); + let runner_name_selector = state.runner_name_selector.clone(); + let for_serverless = state.for_serverless.clone(); + let allocated_serverless_slot = state.allocated_serverless_slot.clone(); + let name = state.name.clone(); + let create_ts = state.create_ts.clone(); + let key = state.key.clone(); async move { let tx = tx.with_subspace(keys::subspace()); tx.write(&keys::actor::DestroyTsKey::new(input.actor_id), destroy_ts)?; - if let Some(runner_id) = state.runner_id { + if let Some(runner_id) = runner_id { clear_slot( input.actor_id, - state.namespace_id, - &state.runner_name_selector, + namespace_id, + &runner_name_selector, runner_id, - state.for_serverless, + for_serverless, &tx, ) .await?; - } else if state.allocated_serverless_slot { + } else if allocated_serverless_slot { // Clear the serverless slot even if we do not have a runner id. This happens when the // actor is destroyed while pending allocation tx.atomic_op( &rivet_types::keys::pegboard::ns::ServerlessDesiredSlotsKey::new( - state.namespace_id, - state.runner_name_selector.clone(), + namespace_id, + runner_name_selector.clone(), ), &(-1i64).to_le_bytes(), MutationType::Add, @@ -104,19 +111,19 @@ async fn update_state_and_db( // Update namespace indexes tx.delete(&keys::ns::ActiveActorKey::new( - state.namespace_id, - state.name.clone(), - state.create_ts, + namespace_id, + name.clone(), + create_ts, input.actor_id, )); - if let Some(k) = &state.key { + if let Some(k) = &key { tx.write( &keys::ns::ActorByKeyKey::new( - state.namespace_id, - state.name.clone(), + namespace_id, + name.clone(), k.clone(), - state.create_ts, + create_ts, input.actor_id, ), ActorByKeyKeyData { diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 79df340e42..9cadcc2a26 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use futures_util::FutureExt; use gas::prelude::*; use rivet_runner_protocol as protocol; @@ -12,6 +14,9 @@ mod setup; pub use runtime::AllocationOverride; +/// Batch size of how many events to ack. +const EVENT_ACK_BATCH_SIZE: i64 = 500; + #[derive(Clone, Debug, Serialize, Deserialize, Hash)] pub struct Input { pub actor_id: Id, @@ -26,7 +31,7 @@ pub struct Input { pub input: Option, } -#[derive(Deserialize, Serialize, Clone)] +#[derive(Deserialize, Serialize)] pub struct State { pub name: String, pub key: Option, @@ -57,6 +62,8 @@ pub struct State { // Null if not allocated pub runner_id: Option, pub runner_workflow_id: Option, + #[serde(default)] + pub runner_states: HashMap, } impl State { @@ -92,6 +99,20 @@ impl State { runner_id: None, runner_workflow_id: None, + runner_states: HashMap::new(), + } + } +} + +#[derive(Deserialize, Serialize)] +pub struct RunnerState { + pub last_command_idx: i64, +} + +impl Default for RunnerState { + fn default() -> Self { + RunnerState { + last_command_idx: -1, } } } @@ -250,56 +271,67 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }; let lifecycle_res = ctx - .loope( - lifecycle_state, - |ctx, state| { - let input = input.clone(); - - async move { - let sig = if let Some(gc_timeout_ts) = state.gc_timeout_ts { - // Listen for signal with gc timeout. if a timeout happens, it means this actor is lost - if let Some(sig) = ctx.listen_until::
(gc_timeout_ts).await? { - sig - } else { - tracing::warn!(actor_id=?input.actor_id, "actor lost"); - - // Fake signal - Main::Lost(Lost { - generation: state.generation, - force_reschedule: false, - reset_rescheduling: false, - }) - } - } else if let Some(alarm_ts) = state.alarm_ts { - // Listen for signal with timeout. if a timeout happens, it means this actor should - // wake up - if let Some(sig) = ctx.listen_until::
(alarm_ts).await? { - sig - } else { - tracing::debug!(actor_id=?input.actor_id, "actor wake"); - - // Fake signal - Main::Wake(Wake { allocation_override: AllocationOverride::DontSleep { pending_timeout: None } }) - } + .loope(lifecycle_state, |ctx, state| { + let input = input.clone(); + + async move { + let signals = if let Some(gc_timeout_ts) = state.gc_timeout_ts { + // Listen for signals with gc timeout. if a timeout happens, it means this actor is lost + let signals = ctx.listen_n_until::
(gc_timeout_ts, 1024).await?; + if signals.is_empty() { + tracing::warn!(actor_id=?input.actor_id, "actor lost"); + + // Fake signal + vec![Main::Lost(Lost { + generation: state.generation, + force_reschedule: false, + reset_rescheduling: false, + })] + } else { + signals + } + } else if let Some(alarm_ts) = state.alarm_ts { + // Listen for signals with timeout. if a timeout happens, it means this actor should + // wake up + let signals = ctx.listen_n_until::
(alarm_ts, 1024).await?; + if signals.is_empty() { + tracing::debug!(actor_id=?input.actor_id, "actor wake"); + + // Fake signal + vec![Main::Wake(Wake { + allocation_override: AllocationOverride::DontSleep { + pending_timeout: None, + }, + })] } else { - // Listen for signal normally - ctx.listen::
().await? - }; + signals + } + } else { + // Listen for signals normally + ctx.listen_n::
(1024).await? + }; + for sig in signals { match sig { + // NOTE: This is only received when allocated to mk1 runner Main::Event(sig) => { - // Ignore state updates for previous generations - if crate::utils::event_generation(&sig.inner) != state.generation { - return Ok(Loop::Continue); - } - - let (Some(runner_id), Some(runner_workflow_id), Some(runner_protocol_version)) = - (state.runner_id, state.runner_workflow_id, state.runner_protocol_version) + let ( + Some(runner_id), + Some(runner_workflow_id), + ) = ( + state.runner_id, + state.runner_workflow_id, + ) else { tracing::warn!("actor not allocated, ignoring event"); - return Ok(Loop::Continue); + continue; }; + // Ignore events for previous generations + if crate::utils::event_generation_mk1(&sig.inner) != state.generation { + continue; + } + match sig.inner { protocol::Event::EventActorIntent(protocol::EventActorIntent { intent, @@ -321,22 +353,18 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel - } else { - // Send signal to stop actor now that we know it will be sleeping - ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStopActor( - protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }, - ), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; - } + // Send signal to stop actor now that we know it will be sleeping + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor( + protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }, + ), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; } } protocol::ActorIntent::ActorIntentStop => { @@ -355,21 +383,17 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel - } else { - ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStopActor( - protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }, - ), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; - } + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStopActor( + protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }, + ), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; } } }, @@ -398,7 +422,12 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ctx, &input, state, - StoppedVariant::Normal { code }, + StoppedVariant::Normal { + code: match code { + protocol::StopCode::Ok => protocol::mk2::StopCode::Ok, + protocol::StopCode::Error => protocol::mk2::StopCode::Error, + } + }, ) .await? { @@ -415,6 +444,187 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } } } + // NOTE: This signal is only received when allocated to a mk2 runner + Main::Events(sig) => { + let Some(runner_id) = state.runner_id else { + tracing::warn!("actor not allocated, ignoring events"); + continue; + }; + + if sig.runner_id != runner_id { + tracing::debug!("events not from current runner, ignoring"); + continue; + } + + let seen_runner_ids = state + .runner_states + .iter() + .map(|(runner_id, _)| *runner_id) + .collect(); + // Fetch the last event index for the current runner + let last_event_idx = + state.runner_states.entry(runner_id).or_default().last_event_idx; + + // Filter already received events and events from previous generations + let generation = state.generation; + let new_events = sig.events + .iter() + .filter(|event| { + event.checkpoint.index > last_event_idx && + crate::utils::event_generation(&event.inner) == generation + }); + let new_last_event_idx = + new_events.clone().last().map(|event| event.checkpoint.index); + + for event in new_events { + match &event.inner { + protocol::mk2::Event::EventActorIntent( + protocol::mk2::EventActorIntent { intent, .. }, + ) => match intent { + protocol::mk2::ActorIntent::ActorIntentSleep => { + if !state.sleeping { + state.gc_timeout_ts = Some( + util::timestamp::now() + + ctx + .config() + .pegboard() + .actor_stop_threshold(), + ); + state.sleeping = true; + + ctx.activity(runtime::SetSleepingInput { + actor_id: input.actor_id, + }) + .await?; + + ctx.activity(runtime::InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id, + commands: vec![protocol::mk2::Command::CommandStopActor( + protocol::mk2::CommandStopActor { + generation: state.generation, + }, + )], + }) + .await?; + } + } + protocol::mk2::ActorIntent::ActorIntentStop => { + if !state.stopping { + state.gc_timeout_ts = Some( + util::timestamp::now() + + ctx + .config() + .pegboard() + .actor_stop_threshold(), + ); + state.stopping = true; + + ctx.activity(runtime::SetNotConnectableInput { + actor_id: input.actor_id, + }) + .await?; + + ctx.activity(runtime::InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id, + commands: vec![protocol::mk2::Command::CommandStopActor( + protocol::mk2::CommandStopActor { + generation: state.generation, + }, + )], + }) + .await?; + } + } + }, + protocol::mk2::Event::EventActorStateUpdate( + protocol::mk2::EventActorStateUpdate { + state: actor_state, + .. + }, + ) => match actor_state { + protocol::mk2::ActorState::ActorStateRunning => { + state.gc_timeout_ts = None; + + ctx.activity(runtime::SetStartedInput { + actor_id: input.actor_id, + }) + .await?; + + ctx.msg(Ready { runner_id }) + .tag("actor_id", input.actor_id) + .send() + .await?; + } + protocol::mk2::ActorState::ActorStateStopped( + protocol::mk2::ActorStateStopped { code, .. }, + ) => { + if let StoppedResult::Destroy = handle_stopped( + ctx, + &input, + state, + StoppedVariant::Normal { code: code.clone() }, + ) + .await? + { + return Ok(Loop::Break(runtime::LifecycleResult { + generation: state.generation, + })); + } + } + }, + protocol::mk2::Event::EventActorSetAlarm( + protocol::mk2::EventActorSetAlarm { alarm_ts, .. }, + ) => { + state.alarm_ts = *alarm_ts; + } + } + } + + if let Some(new_last_event_idx) = new_last_event_idx { + // Reborrow for mutability guarantees + let runner_state = state.runner_states.entry(runner_id).or_default(); + + runner_state.last_event_idx = runner_state.last_event_idx.max(new_last_event_idx); + + let res = ctx + .activity(runtime::InsertLastEventIdxInput { + actor_id: input.actor_id, + runner_id, + last_event_idx: runner_state.last_event_idx, + seen_runners: seen_runner_ids, + }) + .await?; + + // Ack events in batch + if runner_state.last_event_idx + > runner_state.last_event_ack_idx.saturating_add(EVENT_ACK_BATCH_SIZE) + { + runner_state.last_event_ack_idx = runner_state.last_event_idx; + + ctx.activity(runtime::SendMessagesToRunnerInput { + runner_id, + messages: vec![protocol::mk2::ToRunner::ToClientAckEvents( + protocol::mk2::ToClientAckEvents { + last_event_checkpoints: vec![ + protocol::mk2::ActorCheckpoint { + actor_id: input.actor_id.to_string(), + index: runner_state.last_event_ack_idx, + }, + ], + }, + )], + }) + .await?; + } + + // Clean up state + for runner_id in res.remove_runners { + state.runner_states.remove(&runner_id); + } + } + } Main::Wake(sig) => { if state.sleeping { if state.runner_id.is_none() { @@ -422,8 +632,13 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> state.sleeping = false; state.will_wake = false; - match runtime::reschedule_actor(ctx, &input, state, sig.allocation_override) - .await? + match runtime::reschedule_actor( + ctx, + &input, + state, + sig.allocation_override, + ) + .await? { runtime::SpawnActorOutput::Allocated { .. } => {} runtime::SpawnActorOutput::Sleep => { @@ -454,7 +669,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> Main::Lost(sig) => { // Ignore signals for previous generations if sig.generation != state.generation { - return Ok(Loop::Continue); + continue; } if sig.reset_rescheduling { @@ -479,7 +694,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> Main::GoingAway(sig) => { // Ignore signals for previous generations if sig.generation != state.generation { - return Ok(Loop::Continue); + continue; } if sig.reset_rescheduling { @@ -487,8 +702,10 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } if !state.going_away { - let (Some(runner_workflow_id), Some(runner_protocol_version)) = (state.runner_workflow_id, state.runner_protocol_version) else { - return Ok(Loop::Continue); + let (Some(runner_id), Some(runner_workflow_id), Some(runner_protocol_version)) = + (state.runner_id, state.runner_workflow_id, state.runner_protocol_version) + else { + continue; }; state.gc_timeout_ts = Some( @@ -503,13 +720,24 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> .await?; if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(runtime::InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id, + commands: vec![protocol::mk2::Command::CommandStopActor( + protocol::mk2::CommandStopActor { + generation: state.generation, + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }), + inner: protocol::Command::CommandStopActor( + protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }, + ), }) .to_workflow_id(runner_workflow_id) .send() @@ -519,15 +747,28 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> } Main::Destroy(_) => { // If allocated, send stop actor command - if let (Some(runner_workflow_id), Some(runner_protocol_version)) = (state.runner_workflow_id, state.runner_protocol_version) { + if let (Some(runner_id), Some(runner_workflow_id), Some(runner_protocol_version)) = + (state.runner_id, state.runner_workflow_id, state.runner_protocol_version) + { if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(runtime::InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id, + commands: vec![protocol::mk2::Command::CommandStopActor( + protocol::mk2::CommandStopActor { + generation: state.generation, + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }), + inner: protocol::Command::CommandStopActor( + protocol::CommandStopActor { + actor_id: input.actor_id.to_string(), + generation: state.generation, + }, + ), }) .to_workflow_id(runner_workflow_id) .send() @@ -540,12 +781,12 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> })); } } - - Ok(Loop::Continue) } - .boxed() - }, - ) + + Ok(Loop::Continue) + } + .boxed() + }) .await?; // At this point, the actor is not allocated so no cleanup related to alloc idx/desired slots needs to be @@ -566,7 +807,7 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> #[derive(Debug)] enum StoppedVariant { - Normal { code: protocol::StopCode }, + Normal { code: protocol::mk2::StopCode }, Lost { force_reschedule: bool }, } @@ -586,7 +827,7 @@ async fn handle_stopped( let force_reschedule = match &variant { StoppedVariant::Normal { code } => { // Reset retry count on successful exit - if let protocol::StopCode::Ok = code { + if let protocol::mk2::StopCode::Ok = code { state.reschedule_state = Default::default(); } @@ -598,7 +839,7 @@ async fn handle_stopped( // Clear stop gc timeout to prevent being marked as lost in the lifecycle loop state.gc_timeout_ts = None; state.stopping = false; - state.runner_id = None; + let old_runner_id = state.runner_id.take(); let old_runner_workflow_id = state.runner_workflow_id.take(); let old_runner_protocol_version = state.runner_protocol_version.take(); @@ -641,15 +882,26 @@ async fn handle_stopped( // command in case it ended up allocating if let ( StoppedVariant::Lost { .. }, + Some(old_runner_id), Some(old_runner_workflow_id), Some(old_runner_protocol_version), ) = ( &variant, + old_runner_id, old_runner_workflow_id, old_runner_protocol_version, ) { if protocol::is_mk2(old_runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(runtime::InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id: old_runner_id, + commands: vec![protocol::mk2::Command::CommandStopActor( + protocol::mk2::CommandStopActor { + generation: state.generation, + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { @@ -695,7 +947,7 @@ async fn handle_stopped( && matches!( variant, StoppedVariant::Normal { - code: protocol::StopCode::Ok + code: protocol::mk2::StopCode::Ok } ); @@ -787,7 +1039,8 @@ pub struct Event { #[signal("pegboard_actor_events")] pub struct Events { - pub inner: Vec, + pub runner_id: Id, + pub events: Vec, } #[signal("pegboard_actor_wake")] @@ -832,9 +1085,11 @@ join_signal!(PendingAllocation { }); join_signal!(Main { - Event(Event), + Event, + Events, Wake, Lost, GoingAway, Destroy, + // Comment to prevent invalid formatting }); diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 299e8da410..c64287886c 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -4,19 +4,39 @@ use futures_util::StreamExt; use futures_util::TryStreamExt; use gas::prelude::*; use rivet_metrics::KeyValue; -use rivet_runner_protocol::{self as protocol, PROTOCOL_MK1_VERSION}; +use rivet_runner_protocol::{ + self as protocol, PROTOCOL_MK1_VERSION, PROTOCOL_MK2_VERSION, versioned, +}; use rivet_types::{ actors::CrashPolicy, keys::namespace::runner_config::RunnerConfigVariant, runner_configs::RunnerConfigKind, }; +use std::collections::HashMap; use std::time::Instant; use universaldb::options::{ConflictRangeType, MutationType, StreamingMode}; use universaldb::utils::{FormalKey, IsolationLevel::*}; +use universalpubsub::PublishOpts; +use vbare::OwnedVersionedData; use crate::{keys, metrics}; use super::{Allocate, Destroy, Input, PendingAllocation, State, destroy}; +#[derive(Deserialize, Serialize)] +pub struct LifecycleRunnerState { + pub last_event_idx: i64, + pub last_event_ack_idx: i64, +} + +impl Default for LifecycleRunnerState { + fn default() -> Self { + LifecycleRunnerState { + last_event_idx: -1, + last_event_ack_idx: -1, + } + } +} + // TODO: Rewrite this as a series of nested structs/enums for better transparency of current state (likely // requires actor wf v2) #[derive(Deserialize, Serialize)] @@ -27,6 +47,8 @@ pub struct LifecycleState { pub runner_id: Option, pub runner_workflow_id: Option, pub runner_protocol_version: Option, + #[serde(default)] + pub runner_states: HashMap, pub sleeping: bool, #[serde(default)] @@ -58,6 +80,7 @@ impl LifecycleState { runner_id: Some(runner_id), runner_workflow_id: Some(runner_workflow_id), runner_protocol_version: Some(runner_protocol_version), + runner_states: HashMap::new(), sleeping: false, stopping: false, going_away: false, @@ -74,6 +97,7 @@ impl LifecycleState { runner_id: None, runner_workflow_id: None, runner_protocol_version: None, + runner_states: HashMap::new(), sleeping: true, stopping: false, going_away: false, @@ -537,7 +561,31 @@ pub async fn spawn_actor( .await?; if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id, + commands: vec![protocol::mk2::Command::CommandStartActor( + protocol::mk2::CommandStartActor { + generation, + config: protocol::mk2::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + // HACK: We should not use dynamic timestamp here, but we don't validate if signal data + // changes (like activity inputs) so this is fine for now. + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { @@ -604,7 +652,29 @@ pub async fn spawn_actor( .await?; if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id: sig.runner_id, + commands: vec![protocol::mk2::Command::CommandStartActor( + protocol::mk2::CommandStartActor { + generation, + config: protocol::mk2::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStartActor( @@ -692,7 +762,29 @@ pub async fn spawn_actor( .await?; if protocol::is_mk2(runner_protocol_version) { - // TODO: Send message to tunnel + ctx.activity(InsertAndSendCommandsInput { + actor_id: input.actor_id, + runner_id: sig.runner_id, + commands: vec![protocol::mk2::Command::CommandStartActor( + protocol::mk2::CommandStartActor { + generation, + config: protocol::mk2::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }, + )], + }) + .await?; } else { ctx.signal(crate::workflows::runner::Command { inner: protocol::Command::CommandStartActor( @@ -950,3 +1042,156 @@ fn reschedule_backoff( ) -> util::backoff::Backoff { util::backoff::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count) } + +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct InsertLastEventIdxInput { + pub actor_id: Id, + pub runner_id: Id, + pub last_event_idx: i64, + pub seen_runners: Vec, +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct InsertLastEventIdxOutput { + pub remove_runners: Vec, +} + +#[activity(InsertLastEventIdx)] +pub async fn insert_last_event_idx( + ctx: &ActivityCtx, + input: &InsertLastEventIdxInput, +) -> Result { + ctx.udb()? + .run(|tx| async move { + tx.write( + &keys::runner::ActorLastEventIdxKey::new(input.runner_id, input.actor_id), + input.last_event_idx, + )?; + + // Diff the list of seen runners with the list of active runners so we know which we can clean up + // state + let remove_runners = futures_util::stream::iter(input.seen_runners.clone()) + .map(|runner_id| { + let tx = tx.clone(); + async move { + if tx + .exists(&keys::runner::StopTsKey::new(runner_id), Snapshot) + .await? + { + anyhow::Ok(Some(runner_id)) + } else { + Ok(None) + } + } + }) + .buffer_unordered(1024) + .try_filter_map(|x| std::future::ready(Ok(x))) + .try_collect::>() + .await?; + + Ok(InsertLastEventIdxOutput { remove_runners }) + }) + .await +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct InsertAndSendCommandsInput { + pub actor_id: Id, + pub runner_id: Id, + pub commands: Vec, +} + +#[activity(InsertAndSendCommands)] +pub async fn insert_and_send_commands( + ctx: &ActivityCtx, + input: &InsertAndSendCommandsInput, +) -> Result<()> { + let mut state = ctx.state::()?; + + let runner_state = state.runner_states.entry(input.runner_id).or_default(); + let old_last_command_idx = runner_state.last_command_idx; + runner_state.last_command_idx += input.commands.len() as i64; + + // This does not have to be part of its own activity because the txn is idempotent + let last_command_idx = runner_state.last_command_idx; + ctx.udb()? + .run(|tx| async move { + tx.write( + &keys::runner::ActorLastCommandIdxKey::new(input.runner_id, input.actor_id), + last_command_idx, + )?; + + for (i, command) in input.commands.iter().enumerate() { + tx.write( + &keys::runner::ActorCommandKey::new( + input.runner_id, + input.actor_id, + old_last_command_idx + i as i64 + 1, + ), + match command { + protocol::mk2::Command::CommandStartActor(x) => { + protocol::mk2::ActorCommandKeyData::CommandStartActor(x.clone()) + } + protocol::mk2::Command::CommandStopActor(x) => { + protocol::mk2::ActorCommandKeyData::CommandStopActor(x.clone()) + } + }, + )?; + } + + Ok(()) + }) + .await?; + + let receiver_subject = + crate::pubsub_subjects::RunnerReceiverSubject::new(input.runner_id).to_string(); + + let message_serialized = + versioned::ToRunnerMk2::wrap_latest(protocol::mk2::ToRunner::ToClientCommands( + input + .commands + .iter() + .enumerate() + .map(|(i, command)| protocol::mk2::CommandWrapper { + checkpoint: protocol::mk2::ActorCheckpoint { + actor_id: input.actor_id.to_string(), + index: old_last_command_idx + i as i64 + 1, + }, + inner: command.clone(), + }) + .collect(), + )) + .serialize_with_embedded_version(PROTOCOL_MK2_VERSION)?; + + ctx.ups()? + .publish(&receiver_subject, &message_serialized, PublishOpts::one()) + .await?; + + Ok(()) +} + +#[derive(Debug, Serialize, Deserialize, Hash)] +pub struct SendMessagesToRunnerInput { + pub runner_id: Id, + pub messages: Vec, +} + +#[activity(SendMessagesToRunner)] +pub async fn send_messages_to_runner( + ctx: &ActivityCtx, + input: &SendMessagesToRunnerInput, +) -> Result<()> { + let receiver_subject = + crate::pubsub_subjects::RunnerReceiverSubject::new(input.runner_id).to_string(); + + for message in &input.messages { + let message_serialized = versioned::ToRunnerMk2::wrap_latest(message.clone()) + .serialize_with_embedded_version(PROTOCOL_MK2_VERSION)?; + + ctx.ups()? + .publish(&receiver_subject, &message_serialized, PublishOpts::one()) + .await?; + } + + Ok(()) +} diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs index c7ac61e7e7..2d1038449d 100644 --- a/engine/packages/pegboard/src/workflows/runner2.rs +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -119,8 +119,6 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() }) .await?; - // TODO: Periodically ack events - if state.draining || expired { return Ok(Loop::Break(())); } else { @@ -810,4 +808,5 @@ join_signal!(Main { Init, CheckQueue, Stop, + // Comment to prevent invalid formatting }); diff --git a/engine/packages/universaldb/src/driver/postgres/database.rs b/engine/packages/universaldb/src/driver/postgres/database.rs index 7cae89e26d..1ab832997d 100644 --- a/engine/packages/universaldb/src/driver/postgres/database.rs +++ b/engine/packages/universaldb/src/driver/postgres/database.rs @@ -63,12 +63,9 @@ impl PostgresDatabaseDriver { .await .context("failed to create btree_gist extension")?; - conn.execute( - "CREATE UNLOGGED SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1", - &[], - ) - .await - .context("failed to create global version sequence")?; + conn.execute("CREATE UNLOGGED SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1", &[]) + .await + .context("failed to create global version sequence")?; // Create the KV table if it doesn't exist conn.execute( diff --git a/engine/packages/universaldb/src/utils/keys.rs b/engine/packages/universaldb/src/utils/keys.rs index 2f3ceee6ac..6dfe7a099b 100644 --- a/engine/packages/universaldb/src/utils/keys.rs +++ b/engine/packages/universaldb/src/utils/keys.rs @@ -132,4 +132,7 @@ define_keys! { // 104 - RESERVED BY EE // 105 - RESERVED BY EE (106, PROTOCOL_VERSION, "protocol_version"), + (107, LAST_COMMAND_IDX, "last_command_idx"), + (108, LAST_EVENT_IDX, "last_event_idx"), + (109, COMMAND, "command"), } diff --git a/engine/sdks/rust/api-full/src/apis/actors_api.rs b/engine/sdks/rust/api-full/src/apis/actors_api.rs index b1906914f9..feb60b24a7 100644 --- a/engine/sdks/rust/api-full/src/apis/actors_api.rs +++ b/engine/sdks/rust/api-full/src/apis/actors_api.rs @@ -51,10 +51,14 @@ pub async fn create_actor( if !status.is_client_error() && !status.is_server_error() { let content = resp.text().await?; match content_type { - ContentType::Json => serde_json::from_str(&content).map_err(Error::from), - ContentType::Text => return Err(Error::from(serde_json::Error::custom("Received `text/plain` content type response that cannot be converted to `models::ActorsCreateResponse`"))), - ContentType::Unsupported(unknown_type) => return Err(Error::from(serde_json::Error::custom(format!("Received `{unknown_type}` content type response that cannot be converted to `models::ActorsCreateResponse`")))), - } + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => return Err(Error::from(serde_json::Error::custom("Received `text/plain` content type response that cannot be converted to `models::ActorsCreateResponse`"))), + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::ActorsCreateResponse`" + )))) + } + } } else { let content = resp.text().await?; let entity: Option = serde_json::from_str(&content).ok(); diff --git a/engine/sdks/rust/api-full/src/apis/ns_api.rs b/engine/sdks/rust/api-full/src/apis/ns_api.rs index 6232e204fe..e44967a724 100644 --- a/engine/sdks/rust/api-full/src/apis/ns_api.rs +++ b/engine/sdks/rust/api-full/src/apis/ns_api.rs @@ -51,10 +51,14 @@ pub async fn create_namespace( if !status.is_client_error() && !status.is_server_error() { let content = resp.text().await?; match content_type { - ContentType::Json => serde_json::from_str(&content).map_err(Error::from), - ContentType::Text => return Err(Error::from(serde_json::Error::custom("Received `text/plain` content type response that cannot be converted to `models::NamespacesCreateResponse`"))), - ContentType::Unsupported(unknown_type) => return Err(Error::from(serde_json::Error::custom(format!("Received `{unknown_type}` content type response that cannot be converted to `models::NamespacesCreateResponse`")))), - } + ContentType::Json => serde_json::from_str(&content).map_err(Error::from), + ContentType::Text => return Err(Error::from(serde_json::Error::custom("Received `text/plain` content type response that cannot be converted to `models::NamespacesCreateResponse`"))), + ContentType::Unsupported(unknown_type) => { + return Err(Error::from(serde_json::Error::custom(format!( + "Received `{unknown_type}` content type response that cannot be converted to `models::NamespacesCreateResponse`" + )))) + } + } } else { let content = resp.text().await?; let entity: Option = serde_json::from_str(&content).ok(); diff --git a/engine/sdks/rust/runner-protocol/src/versioned.rs b/engine/sdks/rust/runner-protocol/src/versioned.rs index ad1dcc9bbb..9747ee205b 100644 --- a/engine/sdks/rust/runner-protocol/src/versioned.rs +++ b/engine/sdks/rust/runner-protocol/src/versioned.rs @@ -1024,6 +1024,51 @@ impl ToServerlessServer { } } +pub enum ActorCommandKeyData { + // Only in v4 + V4(v4::ActorCommandKeyData), +} + +impl OwnedVersionedData for ActorCommandKeyData { + type Latest = v4::ActorCommandKeyData; + + fn wrap_latest(latest: v4::ActorCommandKeyData) -> Self { + ActorCommandKeyData::V4(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let ActorCommandKeyData::V4(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 4 => Ok(ActorCommandKeyData::V4(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + ActorCommandKeyData::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } + + fn deserialize_converters() -> Vec Result> { + // No changes between v1 and v4 + vec![Ok, Ok, Ok] + } + + fn serialize_converters() -> Vec Result> { + // No changes between v1 and v4 + vec![Ok, Ok, Ok] + } +} + // Helper conversion functions fn convert_to_client_tunnel_message_kind_v1_to_v2( kind: v1::ToClientTunnelMessageKind, diff --git a/engine/sdks/schemas/runner-protocol/v4.bare b/engine/sdks/schemas/runner-protocol/v4.bare index ff1a4bba50..a91d1dfd00 100644 --- a/engine/sdks/schemas/runner-protocol/v4.bare +++ b/engine/sdks/schemas/runner-protocol/v4.bare @@ -203,6 +203,12 @@ type CommandWrapper struct { inner: Command } +# We redeclare this so its top level +type ActorCommandKeyData union { + CommandStartActor | + CommandStopActor +} + # MARK: Tunnel # Message ID @@ -330,7 +336,6 @@ type ToServerInit struct { name: str version: u32 totalSlots: u32 - lastCommandCheckpoints: list prepopulateActorNames: optional> metadata: optional } @@ -338,7 +343,7 @@ type ToServerInit struct { type ToServerEvents list type ToServerAckCommands struct { - lastCommandIdx: i64 + lastCommandCheckpoints: list } type ToServerStopping void @@ -370,7 +375,6 @@ type ProtocolMetadata struct { type ToClientInit struct { runnerId: Id - lastEventCheckpoints: list metadata: ProtocolMetadata } diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index 7e619fa402..8a47f762d8 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -978,6 +978,62 @@ export function writeCommandWrapper(bc: bare.ByteCursor, x: CommandWrapper): voi writeCommand(bc, x.inner) } +/** + * We redeclare this so its top level + */ +export type ActorCommandKeyData = + | { readonly tag: "CommandStartActor"; readonly val: CommandStartActor } + | { readonly tag: "CommandStopActor"; readonly val: CommandStopActor } + +export function readActorCommandKeyData(bc: bare.ByteCursor): ActorCommandKeyData { + const offset = bc.offset + const tag = bare.readU8(bc) + switch (tag) { + case 0: + return { tag: "CommandStartActor", val: readCommandStartActor(bc) } + case 1: + return { tag: "CommandStopActor", val: readCommandStopActor(bc) } + default: { + bc.offset = offset + throw new bare.BareError(offset, "invalid tag") + } + } +} + +export function writeActorCommandKeyData(bc: bare.ByteCursor, x: ActorCommandKeyData): void { + switch (x.tag) { + case "CommandStartActor": { + bare.writeU8(bc, 0) + writeCommandStartActor(bc, x.val) + break + } + case "CommandStopActor": { + bare.writeU8(bc, 1) + writeCommandStopActor(bc, x.val) + break + } + } +} + +export function encodeActorCommandKeyData(x: ActorCommandKeyData, config?: Partial): Uint8Array { + const fullConfig = config != null ? bare.Config(config) : DEFAULT_CONFIG + const bc = new bare.ByteCursor( + new Uint8Array(fullConfig.initialBufferLength), + fullConfig, + ) + writeActorCommandKeyData(bc, x) + return new Uint8Array(bc.view.buffer, bc.view.byteOffset, bc.offset) +} + +export function decodeActorCommandKeyData(bytes: Uint8Array): ActorCommandKeyData { + const bc = new bare.ByteCursor(bytes, DEFAULT_CONFIG) + const result = readActorCommandKeyData(bc) + if (bc.offset < bc.view.byteLength) { + throw new bare.BareError(bc.offset, "remaining bytes") + } + return result +} + export type MessageId = { /** * Globally unique ID @@ -1460,26 +1516,7 @@ export function writeToClientPing(bc: bare.ByteCursor, x: ToClientPing): void { bare.writeI64(bc, x.ts) } -function read11(bc: bare.ByteCursor): readonly ActorCheckpoint[] { - const len = bare.readUintSafe(bc) - if (len === 0) { - return [] - } - const result = [readActorCheckpoint(bc)] - for (let i = 1; i < len; i++) { - result[i] = readActorCheckpoint(bc) - } - return result -} - -function write11(bc: bare.ByteCursor, x: readonly ActorCheckpoint[]): void { - bare.writeUintSafe(bc, x.length) - for (let i = 0; i < x.length; i++) { - writeActorCheckpoint(bc, x[i]) - } -} - -function read12(bc: bare.ByteCursor): ReadonlyMap { +function read11(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() for (let i = 0; i < len; i++) { @@ -1494,7 +1531,7 @@ function read12(bc: bare.ByteCursor): ReadonlyMap { return result } -function write12(bc: bare.ByteCursor, x: ReadonlyMap): void { +function write11(bc: bare.ByteCursor, x: ReadonlyMap): void { bare.writeUintSafe(bc, x.size) for (const kv of x) { bare.writeString(bc, kv[0]) @@ -1502,22 +1539,22 @@ function write12(bc: bare.ByteCursor, x: ReadonlyMap): void { } } -function read13(bc: bare.ByteCursor): ReadonlyMap | null { - return bare.readBool(bc) ? read12(bc) : null +function read12(bc: bare.ByteCursor): ReadonlyMap | null { + return bare.readBool(bc) ? read11(bc) : null } -function write13(bc: bare.ByteCursor, x: ReadonlyMap | null): void { +function write12(bc: bare.ByteCursor, x: ReadonlyMap | null): void { bare.writeBool(bc, x != null) if (x != null) { - write12(bc, x) + write11(bc, x) } } -function read14(bc: bare.ByteCursor): Json | null { +function read13(bc: bare.ByteCursor): Json | null { return bare.readBool(bc) ? readJson(bc) : null } -function write14(bc: bare.ByteCursor, x: Json | null): void { +function write13(bc: bare.ByteCursor, x: Json | null): void { bare.writeBool(bc, x != null) if (x != null) { writeJson(bc, x) @@ -1531,7 +1568,6 @@ export type ToServerInit = { readonly name: string readonly version: u32 readonly totalSlots: u32 - readonly lastCommandCheckpoints: readonly ActorCheckpoint[] readonly prepopulateActorNames: ReadonlyMap | null readonly metadata: Json | null } @@ -1541,9 +1577,8 @@ export function readToServerInit(bc: bare.ByteCursor): ToServerInit { name: bare.readString(bc), version: bare.readU32(bc), totalSlots: bare.readU32(bc), - lastCommandCheckpoints: read11(bc), - prepopulateActorNames: read13(bc), - metadata: read14(bc), + prepopulateActorNames: read12(bc), + metadata: read13(bc), } } @@ -1551,9 +1586,8 @@ export function writeToServerInit(bc: bare.ByteCursor, x: ToServerInit): void { bare.writeString(bc, x.name) bare.writeU32(bc, x.version) bare.writeU32(bc, x.totalSlots) - write11(bc, x.lastCommandCheckpoints) - write13(bc, x.prepopulateActorNames) - write14(bc, x.metadata) + write12(bc, x.prepopulateActorNames) + write13(bc, x.metadata) } export type ToServerEvents = readonly EventWrapper[] @@ -1577,18 +1611,37 @@ export function writeToServerEvents(bc: bare.ByteCursor, x: ToServerEvents): voi } } +function read14(bc: bare.ByteCursor): readonly ActorCheckpoint[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [readActorCheckpoint(bc)] + for (let i = 1; i < len; i++) { + result[i] = readActorCheckpoint(bc) + } + return result +} + +function write14(bc: bare.ByteCursor, x: readonly ActorCheckpoint[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + writeActorCheckpoint(bc, x[i]) + } +} + export type ToServerAckCommands = { - readonly lastCommandIdx: i64 + readonly lastCommandCheckpoints: readonly ActorCheckpoint[] } export function readToServerAckCommands(bc: bare.ByteCursor): ToServerAckCommands { return { - lastCommandIdx: bare.readI64(bc), + lastCommandCheckpoints: read14(bc), } } export function writeToServerAckCommands(bc: bare.ByteCursor, x: ToServerAckCommands): void { - bare.writeI64(bc, x.lastCommandIdx) + write14(bc, x.lastCommandCheckpoints) } export type ToServerStopping = null @@ -1738,21 +1791,18 @@ export function writeProtocolMetadata(bc: bare.ByteCursor, x: ProtocolMetadata): export type ToClientInit = { readonly runnerId: Id - readonly lastEventCheckpoints: readonly ActorCheckpoint[] readonly metadata: ProtocolMetadata } export function readToClientInit(bc: bare.ByteCursor): ToClientInit { return { runnerId: readId(bc), - lastEventCheckpoints: read11(bc), metadata: readProtocolMetadata(bc), } } export function writeToClientInit(bc: bare.ByteCursor, x: ToClientInit): void { writeId(bc, x.runnerId) - write11(bc, x.lastEventCheckpoints) writeProtocolMetadata(bc, x.metadata) } @@ -1783,12 +1833,12 @@ export type ToClientAckEvents = { export function readToClientAckEvents(bc: bare.ByteCursor): ToClientAckEvents { return { - lastEventCheckpoints: read11(bc), + lastEventCheckpoints: read14(bc), } } export function writeToClientAckEvents(bc: bare.ByteCursor, x: ToClientAckEvents): void { - write11(bc, x.lastEventCheckpoints) + write14(bc, x.lastEventCheckpoints) } export type ToClientKvResponse = {