From c088d429db52475fb19a3076193da9b7d8d1c826 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Tue, 18 Nov 2025 15:20:24 -0800 Subject: [PATCH] fix(pb): simplify runner wf --- .../packages/guard-core/src/custom_serve.rs | 2 +- .../packages/guard-core/src/proxy_service.rs | 29 +- .../src/actor_event_demuxer.rs | 127 ++++ engine/packages/pegboard-runner/src/conn.rs | 59 +- engine/packages/pegboard-runner/src/lib.rs | 4 + .../packages/pegboard-runner/src/ping_task.rs | 42 +- .../pegboard-runner/src/tunnel_to_ws_task.rs | 307 +++++++--- .../pegboard-runner/src/ws_to_tunnel_task.rs | 551 ++++++++++++++++-- .../packages/pegboard-serverless/src/lib.rs | 4 +- engine/packages/pegboard/src/lib.rs | 2 +- .../pegboard/src/workflows/actor/destroy.rs | 4 + .../pegboard/src/workflows/actor/mod.rs | 110 ++-- .../pegboard/src/workflows/actor/runtime.rs | 112 ++-- .../packages/pegboard/src/workflows/runner.rs | 17 +- .../pegboard/src/workflows/runner2.rs | 487 +++------------- engine/sdks/rust/data/src/converted.rs | 10 +- engine/sdks/rust/data/src/lib.rs | 2 +- engine/sdks/rust/data/src/versioned/mod.rs | 19 +- engine/sdks/rust/runner-protocol/src/lib.rs | 8 +- .../rust/runner-protocol/src/versioned.rs | 136 ++++- ...egboard.namespace.runner_alloc_idx.v2.bare | 8 + engine/sdks/schemas/runner-protocol/v4.bare | 431 ++++++++++++++ .../typescript/runner-protocol/src/index.ts | 217 ++++--- 23 files changed, 1837 insertions(+), 851 deletions(-) create mode 100644 engine/packages/pegboard-runner/src/actor_event_demuxer.rs create mode 100644 engine/sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v2.bare create mode 100644 engine/sdks/schemas/runner-protocol/v4.bare diff --git a/engine/packages/guard-core/src/custom_serve.rs b/engine/packages/guard-core/src/custom_serve.rs index 1dd684c723..5bc1dcbcf1 100644 --- a/engine/packages/guard-core/src/custom_serve.rs +++ b/engine/packages/guard-core/src/custom_serve.rs @@ -3,8 +3,8 @@ use async_trait::async_trait; use bytes::Bytes; use 
http_body_util::Full; use hyper::{Request, Response}; -use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use pegboard::tunnel::id::RequestId; +use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use crate::WebSocketHandle; use crate::proxy_service::ResponseBody; diff --git a/engine/packages/guard-core/src/proxy_service.rs b/engine/packages/guard-core/src/proxy_service.rs index 8caa024a2e..9c0f880620 100644 --- a/engine/packages/guard-core/src/proxy_service.rs +++ b/engine/packages/guard-core/src/proxy_service.rs @@ -612,19 +612,20 @@ impl ProxyState { let cache_key = (actor_id, ip_addr); // Get existing counter or create a new one - let counter_arc = - if let Some(existing_counter) = self.in_flight_counters.get(&cache_key).await { - existing_counter - } else { - let new_counter = Arc::new(Mutex::new(InFlightCounter::new( - middleware_config.max_in_flight.amount, - ))); - self.in_flight_counters - .insert(cache_key, new_counter.clone()) - .await; - metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]); - new_counter - }; + let counter_arc = if let Some(existing_counter) = + self.in_flight_counters.get(&cache_key).await + { + existing_counter + } else { + let new_counter = Arc::new(Mutex::new(InFlightCounter::new( + middleware_config.max_in_flight.amount, + ))); + self.in_flight_counters + .insert(cache_key, new_counter.clone()) + .await; + metrics::IN_FLIGHT_COUNTER_COUNT.record(self.in_flight_counters.entry_count(), &[]); + new_counter + }; // Try to acquire from the counter let acquired = { @@ -638,7 +639,7 @@ impl ProxyState { } // Generate unique request ID - let request_id = Some(self.generate_unique_request_id().await?); + let request_id = Some(self.generate_unique_request_id().await?); Ok(request_id) } diff --git a/engine/packages/pegboard-runner/src/actor_event_demuxer.rs b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs new file mode 100644 index 0000000000..22216d832e --- /dev/null +++ 
b/engine/packages/pegboard-runner/src/actor_event_demuxer.rs @@ -0,0 +1,127 @@ +use std::collections::HashMap; +use std::time::{Duration, Instant}; + +use anyhow::Result; +use gas::prelude::*; +use rivet_runner_protocol as protocol; +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +const GC_INTERVAL: Duration = Duration::from_secs(30); +const MAX_LAST_SEEN: Duration = Duration::from_secs(30); + +struct Channel { + tx: mpsc::UnboundedSender, + handle: JoinHandle<()>, + last_seen: Instant, +} + +pub struct ActorEventDemuxer { + ctx: StandaloneCtx, + channels: HashMap, + last_gc: Instant, +} + +impl ActorEventDemuxer { + pub fn new(ctx: StandaloneCtx) -> Self { + Self { + ctx, + channels: HashMap::new(), + last_gc: Instant::now(), + } + } + + /// Process an event by routing it to the appropriate actor's queue + #[tracing::instrument(skip_all)] + pub fn ingest(&mut self, actor_id: Id, event: protocol::Event) { + if let Some(channel) = self.channels.get(&actor_id) { + let _ = channel.tx.send(event); + } else { + let (tx, mut rx) = mpsc::unbounded_channel(); + + let ctx = self.ctx.clone(); + let handle = tokio::spawn(async move { + loop { + let mut buffer = Vec::new(); + + // Batch process events + if rx.recv_many(&mut buffer, 1024).await == 0 { + break; + } + + if let Err(err) = dispatch_events(&ctx, actor_id, buffer).await { + tracing::error!(?err, "actor event processor failed"); + break; + } + } + }); + + self.channels.insert( + actor_id, + Channel { + tx, + handle, + last_seen: Instant::now(), + }, + ); + } + + // Run gc periodically + if self.last_gc.elapsed() > GC_INTERVAL { + self.last_gc = Instant::now(); + + self.channels.retain(|_, channel| { + let keep = channel.last_seen.elapsed() < MAX_LAST_SEEN; + + if !keep { + // TODO: Verify aborting is safe here + channel.handle.abort(); + } + + keep + }); + } + } + + /// Shutdown all tasks and wait for them to complete + #[tracing::instrument(skip_all)] + pub async fn shutdown(self) { + 
tracing::debug!(channels=?self.channels.len(), "shutting down actor demuxer"); + + // Drop all senders + let handles = self + .channels + .into_iter() + .map(|(_, channel)| channel.handle) + .collect::>(); + + // Await remaining tasks + for handle in handles { + let _ = handle.await; + } + + tracing::debug!("actor demuxer shut down"); + } +} + +async fn dispatch_events( + ctx: &StandaloneCtx, + actor_id: Id, + events: Vec, +) -> Result<()> { + let res = ctx + .signal(pegboard::workflows::actor::Events { inner: events }) + .tag("actor_id", actor_id) + .graceful_not_found() + .send() + .await + .with_context(|| format!("failed to forward signal to actor workflow: {}", actor_id))?; + if res.is_none() { + tracing::warn!( + ?actor_id, + "failed to send signal to actor workflow, likely already stopped" + ); + } + + Ok(()) +} diff --git a/engine/packages/pegboard-runner/src/conn.rs b/engine/packages/pegboard-runner/src/conn.rs index 9715cdd1d5..13d7c16c73 100644 --- a/engine/packages/pegboard-runner/src/conn.rs +++ b/engine/packages/pegboard-runner/src/conn.rs @@ -64,7 +64,7 @@ pub async fn init_conn( } }; - let packet = versioned::ToServer::deserialize(&buf, protocol_version) + let init_packet = versioned::ToServer::deserialize(&buf, protocol_version) .map_err(|err| WsError::InvalidPacket(err.to_string()).build()) .context("failed to deserialize initial packet from client")?; @@ -74,7 +74,7 @@ pub async fn init_conn( version, total_slots, .. 
- }) = &packet + }) = &init_packet { // Look up existing runner by key let existing_runner = ctx @@ -127,14 +127,15 @@ pub async fn init_conn( }; // Spawn a new runner workflow if one doesn't already exist - let workflow_id = ctx - .workflow(pegboard::workflows::runner2::Input { + let workflow_id = if protocol::is_new(protocol_version) { + ctx.workflow(pegboard::workflows::runner2::Input { runner_id, namespace_id: namespace.namespace_id, name: name.clone(), key: runner_key.clone(), version: version.clone(), total_slots: *total_slots, + protocol_version, }) .tag("runner_id", runner_id) .unique() @@ -145,25 +146,49 @@ pub async fn init_conn( "failed to dispatch runner workflow for runner: {}", runner_id ) - })?; + })? + } else { + ctx.workflow(pegboard::workflows::runner::Input { + runner_id, + namespace_id: namespace.namespace_id, + name: name.clone(), + key: runner_key.clone(), + version: version.clone(), + total_slots: *total_slots, + }) + .tag("runner_id", runner_id) + .unique() + .dispatch() + .await + .with_context(|| { + format!( + "failed to dispatch runner workflow for runner: {}", + runner_id + ) + })? 
+ }; (name.clone(), runner_id, workflow_id) } else { - tracing::debug!(?packet, "invalid initial packet"); + tracing::debug!(?init_packet, "invalid initial packet"); return Err(WsError::InvalidInitialPacket("must be `ToServer::Init`").build()); }; - // Forward to runner wf - ctx.signal(pegboard::workflows::runner2::Forward { inner: packet }) - .to_workflow_id(workflow_id) - .send() - .await - .with_context(|| { - format!( - "failed to forward initial packet to workflow: {}", - workflow_id - ) - })?; + if protocol::is_new(protocol_version) { + ctx.signal(Init); + } else { + // Forward to runner wf + ctx.signal(pegboard::workflows::runner::Forward { inner: init_packet }) + .to_workflow_id(workflow_id) + .send() + .await + .with_context(|| { + format!( + "failed to forward initial packet to workflow: {}", + workflow_id + ) + })?; + } (runner_name, runner_id, workflow_id) } else { diff --git a/engine/packages/pegboard-runner/src/lib.rs b/engine/packages/pegboard-runner/src/lib.rs index 0632f217de..3fbd70d14e 100644 --- a/engine/packages/pegboard-runner/src/lib.rs +++ b/engine/packages/pegboard-runner/src/lib.rs @@ -15,6 +15,7 @@ use tokio::sync::watch; use tokio_tungstenite::tungstenite::protocol::frame::CloseFrame; use universalpubsub::PublishOpts; +mod actor_event_demuxer; mod conn; mod errors; mod ping_task; @@ -136,12 +137,14 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { let (tunnel_to_ws_abort_tx, tunnel_to_ws_abort_rx) = watch::channel(()); let (ws_to_tunnel_abort_tx, ws_to_tunnel_abort_rx) = watch::channel(()); let (ping_abort_tx, ping_abort_rx) = watch::channel(()); + let (init_tx, init_rx) = watch::channel(()); let tunnel_to_ws = tokio::spawn(tunnel_to_ws_task::task( self.ctx.clone(), conn.clone(), sub, eviction_sub, + init_rx, tunnel_to_ws_abort_rx, )); @@ -150,6 +153,7 @@ impl CustomServeTrait for PegboardRunnerWsCustomServe { conn.clone(), ws_handle.recv(), eviction_sub2, + init_tx, ws_to_tunnel_abort_rx, )); diff --git 
a/engine/packages/pegboard-runner/src/ping_task.rs b/engine/packages/pegboard-runner/src/ping_task.rs index 369c03b0c9..e29c1ce8d6 100644 --- a/engine/packages/pegboard-runner/src/ping_task.rs +++ b/engine/packages/pegboard-runner/src/ping_task.rs @@ -21,15 +21,32 @@ pub async fn task( } update_runner_ping(&ctx, &conn).await?; + + // Send ping to runner + let ping_msg = versioned::ToClient::wrap_latest(protocol::ToClient::ToClientPing( + protocol::ToClientPing { + ts: util::timestamp::now(), + }, + )); + let ping_msg_serialized = ping_msg.serialize(conn.protocol_version)?; + conn.ws_handle + .send(Message::Binary(ping_msg_serialized.into())) + .await?; } } async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> { - let Some(wf) = ctx - .workflow::(conn.workflow_id) - .get() - .await? - else { + let wf = if protocol::is_mk2(conn.protocol_version) { + ctx.workflow::(conn.workflow_id) + .get() + .await? + } else { + ctx.workflow::(conn.workflow_id) + .get() + .await? + }; + + let Some(wf) = wf else { tracing::error!(?conn.runner_id, "workflow does not exist"); return Ok(()); }; @@ -55,10 +72,17 @@ async fn update_runner_ping(ctx: &StandaloneCtx, conn: &Conn) -> Result<()> { if let RunnerEligibility::ReEligible = notif.eligibility { tracing::debug!(runner_id=?notif.runner_id, "runner has become eligible again"); - ctx.signal(pegboard::workflows::runner2::CheckQueue {}) - .to_workflow_id(notif.workflow_id) - .send() - .await?; + if protocol::is_mk2(conn.protocol_version) { + ctx.signal(pegboard::workflows::runner2::CheckQueue {}) + .to_workflow_id(notif.workflow_id) + .send() + .await?; + } else { + ctx.signal(pegboard::workflows::runner::CheckQueue {}) + .to_workflow_id(notif.workflow_id) + .send() + .await?; + } } } diff --git a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs index 857bc40a71..6c2f73f913 100644 --- a/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs +++ 
b/engine/packages/pegboard-runner/src/tunnel_to_ws_task.rs @@ -14,20 +14,15 @@ use crate::{LifecycleResult, conn::Conn, errors}; pub async fn task( ctx: StandaloneCtx, conn: Arc, - mut sub: Subscriber, + mut tunnel_sub: Subscriber, mut eviction_sub: Subscriber, + mut init_rx: watch::Receiver<()>, mut tunnel_to_ws_abort_rx: watch::Receiver<()>, ) -> Result { - loop { - let ups_msg = tokio::select! { - res = sub.next() => { - if let NextOutput::Message(ups_msg) = res.context("pubsub_to_client_task sub failed")? { - ups_msg - } else { - tracing::debug!("tunnel sub closed"); - bail!("tunnel sub closed"); - } - } + if protocol::is_mk2(conn.protocol) { + // Must first receive init from adjacent task before processing messages + tokio::select! { + _ = init_rx.changed() => {} _ = eviction_sub.next() => { tracing::debug!("runner evicted"); return Err(errors::WsError::Eviction.build()); @@ -38,99 +33,219 @@ pub async fn task( } }; - tracing::debug!( - payload_len = ups_msg.payload.len(), - "received message from pubsub, forwarding to WebSocket" - ); - - // Parse message - let msg = match versioned::ToRunner::deserialize_with_embedded_version(&ups_msg.payload) { - Result::Ok(x) => x, - Err(err) => { - tracing::error!(?err, "failed to parse tunnel message"); - continue; + loop { + match recv_msg().await? { + Ok(msg) => handle_message_mk2(msg).await?, + Err(lifecycle_res) => return Ok(lifecycle_res), } - }; + } + } else { + loop { + match recv_msg().await? 
{ + Ok(msg) => handle_message_mk1(msg).await?, + Err(lifecycle_res) => return Ok(lifecycle_res), + } + } + } +} - // Convert to ToClient types - let to_client_msg = match msg { - protocol::ToRunner::ToRunnerPing(ping) => { - // Publish pong to UPS - let gateway_reply_to = GatewayReceiverSubject::new(ping.gateway_id).to_string(); - let msg_serialized = versioned::ToGateway::wrap_latest( - protocol::ToGateway::ToGatewayPong(protocol::ToGatewayPong { - request_id: ping.request_id, - ts: ping.ts, - }), - ) - .serialize_with_embedded_version(protocol::PROTOCOL_VERSION) - .context("failed to serialize pong message for gateway")?; - ctx.ups() - .context("failed to get UPS instance for tunnel message")? - .publish(&gateway_reply_to, &msg_serialized, PublishOpts::one()) - .await - .with_context(|| { - format!( - "failed to publish tunnel message to gateway reply topic: {}", - gateway_reply_to - ) - })?; - - // Not sent to client - continue; +async fn recv_msg() -> Result> { + let tunnel_msg = tokio::select! { + res = tunnel_sub.next() => { + if let NextOutput::Message(tunnel_msg) = res.context("pubsub_to_client_task sub failed")? 
{ + tunnel_msg + } else { + tracing::debug!("tunnel sub closed"); + bail!("tunnel sub closed"); } - protocol::ToRunner::ToClientInit(x) => protocol::ToClient::ToClientInit(x), - protocol::ToRunner::ToClientClose => return Err(errors::WsError::Eviction.build()), - // Dynamically populate hibernating request ids - protocol::ToRunner::ToClientCommands(mut command_wrappers) => { - for command_wrapper in &mut command_wrappers { - if let protocol::Command::CommandStartActor(protocol::CommandStartActor { + } + _ = eviction_sub.next() => { + tracing::debug!("runner evicted"); + return Err(errors::WsError::Eviction.build()); + } + _ = tunnel_to_ws_abort_rx.changed() => { + tracing::debug!("task aborted"); + return Ok(Err(LifecycleResult::Aborted)); + } + }; + + tracing::debug!( + payload_len = tunnel_msg.payload.len(), + "received message from pubsub, forwarding to WebSocket" + ); + + Ok(Ok(tunnel_msg)) +} + +async fn handle_message_mk2() -> Result<()> { + // Parse message + let msg = match versioned::ToRunner2::deserialize_with_embedded_version(&tunnel_msg.payload) { + Result::Ok(x) => x, + Err(err) => { + tracing::error!(?err, "failed to parse tunnel message"); + return Ok(()); + } + }; + + // Convert to ToClient types + let to_client_msg = match msg { + protocol::mk2::ToRunner::ToRunnerPing(ping) => { + // Publish pong to UPS + let gateway_reply_to = GatewayReceiverSubject::new(ping.gateway_id).to_string(); + let msg_serialized = versioned::ToGateway::wrap_latest( + protocol::ToGateway::ToGatewayPong(protocol::ToGatewayPong { + request_id: ping.request_id, + ts: ping.ts, + }), + ) + .serialize_with_embedded_version(protocol::PROTOCOL_VERSION) + .context("failed to serialize pong message for gateway")?; + ctx.ups() + .context("failed to get UPS instance for tunnel message")? 
+ .publish(&gateway_reply_to, &msg_serialized, PublishOpts::one()) + .await + .with_context(|| { + format!( + "failed to publish tunnel message to gateway reply topic: {}", + gateway_reply_to + ) + })?; + + // Not sent to client + return Ok(()); + } + protocol::mk2::ToRunner::ToRunnerClose => return Err(errors::WsError::Eviction.build()), + protocol::mk2::ToRunner::ToClientCommands(mut command_wrappers) => { + for command_wrapper in &mut command_wrappers { + if let protocol::mk2::Command::CommandStartActor( + protocol::mk2::CommandStartActor { actor_id, hibernating_requests, .. - }) = &mut command_wrapper.inner - { - let ids = ctx - .op(pegboard::ops::actor::hibernating_request::list::Input { - actor_id: Id::parse(actor_id)?, - }) - .await?; - - *hibernating_requests = ids - .into_iter() - .map(|x| protocol::HibernatingRequest { - gateway_id: x.gateway_id, - request_id: x.request_id, - }) - .collect(); - } - } + }, + ) = &mut command_wrapper.inner + { + let ids = ctx + .op(pegboard::ops::actor::hibernating_request::list::Input { + actor_id: Id::parse(actor_id)?, + }) + .await?; - // NOTE: `command_wrappers` is mutated in this match arm, it is not the same as the - // ToRunner data - protocol::ToClient::ToClientCommands(command_wrappers) - } - protocol::ToRunner::ToClientAckEvents(x) => protocol::ToClient::ToClientAckEvents(x), - protocol::ToRunner::ToClientKvResponse(x) => protocol::ToClient::ToClientKvResponse(x), - protocol::ToRunner::ToClientTunnelMessage(x) => { - protocol::ToClient::ToClientTunnelMessage(x) + // Dynamically populate hibernating request ids + *hibernating_requests = ids + .into_iter() + .map(|x| protocol::mk2::HibernatingRequest { + gateway_id: x.gateway_id, + request_id: x.request_id, + }) + .collect(); + } } - }; - // Forward raw message to WebSocket - let serialized_msg = match versioned::ToClient::wrap_latest(to_client_msg) - .serialize(conn.protocol_version) - { - Result::Ok(x) => x, - Err(err) => { - tracing::error!(?err, "failed to 
serialize tunnel message"); - continue; + // NOTE: `command_wrappers` is mutated in this match arm, it is not the same as the + // ToRunner data + protocol::mk2::ToClient::ToClientCommands(command_wrappers) + } + protocol::mk2::ToRunner::ToClientTunnelMessage(x) => { + protocol::mk2::ToClient::ToClientTunnelMessage(x) + } + }; + + // Forward raw message to WebSocket + let serialized_msg = + versioned::ToClientMk2::wrap_latest(to_client_msg).serialize(conn.protocol_version)?; + let ws_msg = WsMessage::Binary(serialized_msg.into()); + conn.ws_handle + .send(ws_msg) + .await + .context("failed to send message to WebSocket")?; + + Ok(()) +} + +async fn handle_message_mk1() -> Result<()> { + // Parse message + let msg = match versioned::ToRunner::deserialize_with_embedded_version(&tunnel_msg.payload) { + Result::Ok(x) => x, + Err(err) => { + tracing::error!(?err, "failed to parse tunnel message"); + return Ok(()); + } + }; + + // Convert to ToClient types + let to_client_msg = match msg { + protocol::ToRunner::ToRunnerPing(ping) => { + // Publish pong to UPS + let gateway_reply_to = GatewayReceiverSubject::new(ping.gateway_id).to_string(); + let msg_serialized = versioned::ToGateway::wrap_latest( + protocol::ToGateway::ToGatewayPong(protocol::ToGatewayPong { + request_id: ping.request_id, + ts: ping.ts, + }), + ) + .serialize_with_embedded_version(protocol::PROTOCOL_VERSION) + .context("failed to serialize pong message for gateway")?; + ctx.ups() + .context("failed to get UPS instance for tunnel message")? 
+ .publish(&gateway_reply_to, &msg_serialized, PublishOpts::one()) + .await + .with_context(|| { + format!( + "failed to publish tunnel message to gateway reply topic: {}", + gateway_reply_to + ) + })?; + + // Not sent to client + continue; + } + protocol::ToRunner::ToClientInit(x) => protocol::ToClient::ToClientInit(x), + protocol::ToRunner::ToClientClose => return Err(errors::WsError::Eviction.build()), + // Dynamically populate hibernating request ids + protocol::ToRunner::ToClientCommands(mut command_wrappers) => { + for command_wrapper in &mut command_wrappers { + if let protocol::Command::CommandStartActor(protocol::CommandStartActor { + actor_id, + hibernating_requests, + .. + }) = &mut command_wrapper.inner + { + let ids = ctx + .op(pegboard::ops::actor::hibernating_request::list::Input { + actor_id: Id::parse(actor_id)?, + }) + .await?; + + *hibernating_requests = ids + .into_iter() + .map(|x| protocol::HibernatingRequest { + gateway_id: x.gateway_id, + request_id: x.request_id, + }) + .collect(); + } } - }; - let ws_msg = WsMessage::Binary(serialized_msg.into()); - conn.ws_handle - .send(ws_msg) - .await - .context("failed to send message to WebSocket")? 
- } + + // NOTE: `command_wrappers` is mutated in this match arm, it is not the same as the + // ToRunner data + protocol::ToClient::ToClientCommands(command_wrappers) + } + protocol::ToRunner::ToClientAckEvents(x) => protocol::ToClient::ToClientAckEvents(x), + protocol::ToRunner::ToClientKvResponse(x) => protocol::ToClient::ToClientKvResponse(x), + protocol::ToRunner::ToClientTunnelMessage(x) => { + protocol::ToClient::ToClientTunnelMessage(x) + } + }; + + // Forward raw message to WebSocket + let serialized_msg = + versioned::ToClient::wrap_latest(to_client_msg).serialize(conn.protocol_version)?; + let ws_msg = WsMessage::Binary(serialized_msg.into()); + conn.ws_handle + .send(ws_msg) + .await + .context("failed to send message to WebSocket")?; + + Ok(()) } diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 2e4fc5e7cc..ed0068a0b4 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -6,6 +6,7 @@ use hyper_tungstenite::tungstenite::Message as WsMessage; use hyper_tungstenite::tungstenite::Message; use pegboard::pubsub_subjects::GatewayReceiverSubject; use pegboard::tunnel::id as tunnel_id; +use pegboard::utils::event_actor_id; use pegboard_actor_kv as kv; use rivet_guard_core::websocket_handle::WebSocketReceiver; use rivet_runner_protocol::{self as protocol, PROTOCOL_VERSION, versioned}; @@ -15,79 +16,465 @@ use universalpubsub::PublishOpts; use universalpubsub::Subscriber; use vbare::OwnedVersionedData; -use crate::{LifecycleResult, conn::Conn, errors}; +use crate::{LifecycleResult, actor_event_demuxer::ActorEventDemuxer, conn::Conn, errors}; #[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))] pub async fn task( + ctx: StandaloneCtx, + conn: Arc, + ws_rx: Arc>, + eviction_sub2: Subscriber, + init_tx: 
watch::Sender<()>, + ws_to_tunnel_abort_rx: watch::Receiver<()>, +) -> Result { + let mut event_demuxer = ActorEventDemuxer::new(ctx.clone()); + + let res = task_inner( + ctx, + conn, + ws_rx, + eviction_sub2, + init_tx, + ws_to_tunnel_abort_rx, + &mut event_demuxer, + ) + .await; + + // Must shutdown demuxer to allow for all in-flight events to finish + event_demuxer.shutdown().await; + + res +} + +#[tracing::instrument(skip_all, fields(runner_id=?conn.runner_id, workflow_id=?conn.workflow_id, protocol_version=%conn.protocol_version))] +pub async fn task_inner( ctx: StandaloneCtx, conn: Arc, ws_rx: Arc>, mut eviction_sub2: Subscriber, + init_tx: watch::Sender<()>, mut ws_to_tunnel_abort_rx: watch::Receiver<()>, + event_demuxer: &mut ActorEventDemuxer, ) -> Result { let mut ws_rx = ws_rx.lock().await; - loop { - let msg = tokio::select! { - res = ws_rx.try_next() => { - if let Some(msg) = res? { - msg - } else { - tracing::debug!("websocket closed"); - return Ok(LifecycleResult::Closed); - } + if protocol::is_mk2(conn.protocol_version) { + loop { + match recv_msg().await? { + Ok(Some(msg)) => handle_message_mk2(msg).await?, + Ok(None) => {} + Err(lifecycle_res) => return Ok(lifecycle_res), } - _ = eviction_sub2.next() => { - tracing::debug!("runner evicted"); - return Err(errors::WsError::Eviction.build()); + } + } else { + loop { + match recv_msg().await? { + Ok(Some(msg)) => handle_message_mk1(msg).await?, + Ok(None) => {} + Err(lifecycle_res) => return Ok(lifecycle_res), } - _ = ws_to_tunnel_abort_rx.changed() => { - tracing::debug!("task aborted"); - return Ok(LifecycleResult::Aborted); + } + } +} + +async fn recv_msg() -> Result, LifecycleResult>> { + let msg = tokio::select! { + res = ws_rx.try_next() => { + if let Some(msg) = res? 
{ + msg + } else { + tracing::debug!("websocket closed"); + return Ok(Err(LifecycleResult::Closed)); } - }; + } + _ = eviction_sub2.next() => { + tracing::debug!("runner evicted"); + return Err(errors::WsError::Eviction.build()); + } + _ = ws_to_tunnel_abort_rx.changed() => { + tracing::debug!("task aborted"); + return Ok(Err(LifecycleResult::Aborted)); + } + }; + + match msg { + WsMessage::Binary(data) => { + tracing::trace!( + data_len = data.len(), + "received binary message from WebSocket" + ); + + Ok(Some(data)) + } + WsMessage::Close(_) => { + tracing::debug!("websocket closed"); + return Ok(Err(LifecycleResult::Closed)); + } + _ => { + // Ignore other message types + Ok(None) + } + } +} + +#[tracing::instrument(skip_all)] +async fn handle_message_mk2( + ctx: &StandaloneCtx, + conn: &Arc, + init_tx: &watch::Sender<()>, + event_demuxer: &mut ActorEventDemuxer, + msg: (), +) -> Result<()> { + // Parse message + let msg = match versioned::ToServerMk2::deserialize(&data, conn.protocol_version) { + Ok(x) => x, + Err(err) => { + tracing::warn!(?err, data_len = data.len(), "failed to deserialize message"); + return Ok(()); + } + }; + + match msg { + protocol::ToServer::ToServerPong(pong) => { + let now = util::timestamp::now(); + let rtt = now.saturating_sub(pong.ts); + + let rtt = if let Ok(rtt) = u32::try_from(rtt) { + rtt + } else { + tracing::debug!("ping ts in the future, ignoring"); + u32::MAX + }; + + conn.last_rtt.store(rtt, Ordering::Relaxed); + } + // Process KV request + protocol::ToServer::ToServerKvRequest(req) => { + let actor_id = match Id::parse(&req.actor_id) { + Ok(actor_id) => actor_id, + Err(err) => { + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + message: err.to_string(), + }, + ), + }), + ); - match msg { - WsMessage::Binary(data) => { - tracing::trace!( - 
data_len = data.len(), - "received binary message from WebSocket" + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV error response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV error response to client")?; + + return Ok(()); + } + }; + + let actors_res = ctx + .op(pegboard::ops::actor::get_runner::Input { + actor_ids: vec![actor_id], + }) + .await + .with_context(|| format!("failed to get runner for actor: {}", actor_id))?; + let actor_belongs = actors_res + .actors + .first() + .map(|x| x.runner_id == conn.runner_id) + .unwrap_or_default(); + + // Verify actor belongs to this runner + if !actor_belongs { + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + message: "given actor does not belong to runner".to_string(), + }, + ), + }), ); - // Parse message - let msg = match versioned::ToServer::deserialize(&data, conn.protocol_version) { - Ok(x) => x, - Err(err) => { - tracing::warn!( - ?err, - data_len = data.len(), - "failed to deserialize message" - ); - continue; - } - }; - - handle_message(&ctx, &conn, msg) + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV actor validation error")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) .await - .context("failed to handle WebSocket message")?; + .context("failed to send KV actor validation error to client")?; + + return Ok(()); } - WsMessage::Close(_) => { - tracing::debug!("websocket closed"); - return Ok(LifecycleResult::Closed); + + // TODO: Add queue and bg thread for processing kv ops + // Run kv operation + match req.data { + protocol::KvRequestData::KvGetRequest(body) => { + let res = kv::get(&*ctx.udb()?, actor_id, body.keys).await; + + 
let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: match res { + Ok((keys, values, metadata)) => { + protocol::KvResponseData::KvGetResponse( + protocol::KvGetResponse { + keys, + values, + metadata, + }, + ) + } + Err(err) => protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + // TODO: Don't return actual error? + message: err.to_string(), + }, + ), + }, + }), + ); + + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV get response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV get response to client")?; + } + protocol::KvRequestData::KvListRequest(body) => { + let res = kv::list( + &*ctx.udb()?, + actor_id, + body.query, + body.reverse.unwrap_or_default(), + body.limit + .map(TryInto::try_into) + .transpose() + .context("KV list limit value overflow")?, + ) + .await; + + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: match res { + Ok((keys, values, metadata)) => { + protocol::KvResponseData::KvListResponse( + protocol::KvListResponse { + keys, + values, + metadata, + }, + ) + } + Err(err) => protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + // TODO: Don't return actual error? 
+ message: err.to_string(), + }, + ), + }, + }), + ); + + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV list response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV list response to client")?; + } + protocol::KvRequestData::KvPutRequest(body) => { + let res = kv::put(&*ctx.udb()?, actor_id, body.keys, body.values).await; + + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: match res { + Ok(()) => protocol::KvResponseData::KvPutResponse, + Err(err) => { + protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + // TODO: Don't return actual error? + message: err.to_string(), + }, + ) + } + }, + }), + ); + + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV put response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV put response to client")?; + } + protocol::KvRequestData::KvDeleteRequest(body) => { + let res = kv::delete(&*ctx.udb()?, actor_id, body.keys).await; + + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: match res { + Ok(()) => protocol::KvResponseData::KvDeleteResponse, + Err(err) => protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + // TODO: Don't return actual error? 
+ message: err.to_string(), + }, + ), + }, + }), + ); + + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV delete response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV delete response to client")?; + } + protocol::KvRequestData::KvDropRequest => { + let res = kv::delete_all(&*ctx.udb()?, actor_id).await; + + let res_msg = versioned::ToClient::wrap_latest( + protocol::ToClient::ToClientKvResponse(protocol::ToClientKvResponse { + request_id: req.request_id, + data: match res { + Ok(()) => protocol::KvResponseData::KvDropResponse, + Err(err) => protocol::KvResponseData::KvErrorResponse( + protocol::KvErrorResponse { + // TODO: Don't return actual error? + message: err.to_string(), + }, + ), + }, + }), + ); + + let res_msg_serialized = res_msg + .serialize(conn.protocol_version) + .context("failed to serialize KV drop response")?; + conn.ws_handle + .send(Message::Binary(res_msg_serialized.into())) + .await + .context("failed to send KV drop response to client")?; + } + } + } + protocol::ToServer::ToServerTunnelMessage(tunnel_msg) => { + handle_tunnel_message(&ctx, &conn, tunnel_msg) + .await + .context("failed to handle tunnel message")?; + } + protocol::ToServer::ToServerInit(_) => { + // We send the signal first because we don't want to continue if this fails + ctx.signal(pegboard::workflows::runner2::Init {}) + .to_workflow_id(conn.workflow_id) + .send() + .await + .with_context(|| { + format!( + "failed to send signal to runner workflow: {}", + conn.workflow_id + ) + })?; + + let init_data = ctx + .activity(ProcessInitInput { + runner_id: input.runner_id, + namespace_id: input.namespace_id, + last_command_idx: last_command_idx.unwrap_or(-1), + prepopulate_actor_names, + metadata, + }) + .await?; + + // Send init packet + let msg = versioned::ToClient::wrap_latest(protocol::ToRunner::ToClientInit( + protocol::ToClientInit { + runner_id: 
input.runner_id.to_string(),
+				last_event_idx: init_data.last_event_idx,
+				metadata: protocol::ProtocolMetadata {
+					runner_lost_threshold: runner_lost_threshold,
+				},
+			},
+		));
+		let msg_serialized = msg
+			.serialize(conn.protocol_version)
+			.context("failed to serialize init packet")?;
+		conn.ws_handle
+			.send(Message::Binary(msg_serialized.into()))
+			.await
+			.context("failed to send init packet to client")?;
+
+		// Send missed commands
+		if !init_data.missed_commands.is_empty() {
+			let msg = versioned::ToClient::wrap_latest(protocol::ToRunner::ToClientCommands(
+				init_data.missed_commands,
+			));
+			let msg_serialized = msg
+				.serialize(conn.protocol_version)
+				.context("failed to serialize missed commands")?;
+			conn.ws_handle
+				.send(Message::Binary(msg_serialized.into()))
+				.await
+				.context("failed to send missed commands to client")?;
		}
-		_ => {
-			// Ignore other message types
+
+		// Inform adjacent task that we have processed and sent the init packet.
This will allow it to + // start accepting commands + let _ = init_tx.send(()); + } + // Forward to actor wf + protocol::ToServer::ToServerEvents(events) => { + for event in events { + event_demuxer.ingest(Id::parse(event_actor_id(&event.inner))?, event.inner); } } + protocol::ToServer::ToServerAckCommands(_) => { + ack_commands(&ctx).await?; + } + protocol::ToServer::ToServerStopping => { + ctx.signal(pegboard::workflows::runner2::Stop { + reset_actor_rescheduling: false, + }) + .to_workflow_id(conn.workflow_id) + .send() + .await + .with_context(|| { + format!("failed to forward signal to workflow: {}", conn.workflow_id) + })?; + } } + + Ok(()) } #[tracing::instrument(skip_all)] -async fn handle_message( - ctx: &StandaloneCtx, - conn: &Arc, - msg: protocol::ToServer, -) -> Result<()> { +async fn handle_message_mk1(ctx: &StandaloneCtx, conn: &Arc, msg: ()) -> Result<()> { + // Parse message + let msg = match versioned::ToServer::deserialize(&data, conn.protocol_version) { + Ok(x) => x, + Err(err) => { + tracing::warn!(?err, data_len = data.len(), "failed to deserialize message"); + return Ok(()); + } + }; + match msg { protocol::ToServer::ToServerPing(ping) => { let now = util::timestamp::now(); @@ -348,7 +735,7 @@ async fn handle_message( | protocol::ToServer::ToServerEvents(_) | protocol::ToServer::ToServerAckCommands(_) | protocol::ToServer::ToServerStopping => { - ctx.signal(pegboard::workflows::runner2::Forward { + ctx.signal(pegboard::workflows::runner::Forward { inner: protocol::ToServer::try_from(msg) .context("failed to convert message for workflow forwarding")?, }) @@ -360,12 +747,78 @@ async fn handle_message( })?; } } +} + +async fn ack_commands(ctx: &StandaloneCtx) -> Result<()> { + // ctx.udb()?.run(|| { + // let last_ack = ; + // let stream = .read_ranges_keyvalues({ + // limit: + // }); + // }).await?; +} + +#[tracing::instrument(skip_all)] +async fn handle_tunnel_message_mk2( + ctx: &StandaloneCtx, + conn: &Arc, + msg: 
protocol::mk2::ToServerTunnelMessage, +) -> Result<()> { + // Ignore DeprecatedTunnelAck messages (used only for backwards compatibility) + if matches!( + msg.message_kind, + protocol::mk2::ToServerTunnelMessageKind::DeprecatedTunnelAck + ) { + return Ok(()); + } + + // Send DeprecatedTunnelAck back to runner for older protocol versions + if protocol::mk2::compat::version_needs_tunnel_ack(conn.protocol_version) { + let ack_msg = versioned::ToClientMk2::wrap_latest( + protocol::mk2::ToClient::ToClientTunnelMessage(protocol::mk2::ToClientTunnelMessage { + message_id: msg.message_id, + message_kind: protocol::mk2::ToClientTunnelMessageKind::DeprecatedTunnelAck, + }), + ); + + let ack_serialized = ack_msg + .serialize(conn.protocol_version) + .context("failed to serialize DeprecatedTunnelAck response")?; + + conn.ws_handle + .send(hyper_tungstenite::tungstenite::Message::Binary( + ack_serialized.into(), + )) + .await + .context("failed to send DeprecatedTunnelAck to runner")?; + } + + // Parse message ID to extract gateway_id + let parts = + tunnel_id::parse_message_id(msg.message_id).context("failed to parse message id")?; + + // Publish message to UPS + let gateway_reply_to = GatewayReceiverSubject::new(parts.gateway_id).to_string(); + let msg_serialized = + versioned::ToGateway::wrap_latest(protocol::mk2::ToGateway::ToServerTunnelMessage(msg)) + .serialize_with_embedded_version(PROTOCOL_VERSION) + .context("failed to serialize tunnel message for gateway")?; + ctx.ups() + .context("failed to get UPS instance for tunnel message")? 
+ .publish(&gateway_reply_to, &msg_serialized, PublishOpts::one()) + .await + .with_context(|| { + format!( + "failed to publish tunnel message to gateway reply topic: {}", + gateway_reply_to + ) + })?; Ok(()) } #[tracing::instrument(skip_all)] -async fn handle_tunnel_message( +async fn handle_tunnel_message_mk1( ctx: &StandaloneCtx, conn: &Arc, msg: protocol::ToServerTunnelMessage, diff --git a/engine/packages/pegboard-serverless/src/lib.rs b/engine/packages/pegboard-serverless/src/lib.rs index 336b9586c2..87cfc9179f 100644 --- a/engine/packages/pegboard-serverless/src/lib.rs +++ b/engine/packages/pegboard-serverless/src/lib.rs @@ -519,8 +519,8 @@ async fn publish_to_client_stop(ctx: &StandaloneCtx, runner_id: Id) -> Result<() let receiver_subject = pegboard::pubsub_subjects::RunnerReceiverSubject::new(runner_id).to_string(); - let message_serialized = rivet_runner_protocol::versioned::ToClient::wrap_latest( - rivet_runner_protocol::ToClient::ToClientClose, + let message_serialized = rivet_runner_protocol::versioned::ToRunner::wrap_latest( + rivet_runner_protocol::ToRunner::ToRunnerClose, ) .serialize_with_embedded_version(rivet_runner_protocol::PROTOCOL_VERSION)?; diff --git a/engine/packages/pegboard/src/lib.rs b/engine/packages/pegboard/src/lib.rs index d6f02fdbc9..50d941a7fc 100644 --- a/engine/packages/pegboard/src/lib.rs +++ b/engine/packages/pegboard/src/lib.rs @@ -6,7 +6,7 @@ mod metrics; pub mod ops; pub mod pubsub_subjects; pub mod tunnel; -mod utils; +pub mod utils; pub mod workflows; pub fn registry() -> WorkflowResult { diff --git a/engine/packages/pegboard/src/workflows/actor/destroy.rs b/engine/packages/pegboard/src/workflows/actor/destroy.rs index 76ece1b0f2..f5d5afc7cf 100644 --- a/engine/packages/pegboard/src/workflows/actor/destroy.rs +++ b/engine/packages/pegboard/src/workflows/actor/destroy.rs @@ -201,12 +201,14 @@ pub(crate) async fn clear_slot( runner_remaining_slots, runner_total_slots, runner_last_ping_ts, + runner_protocol_version, ) = 
tokio::try_join!( tx.read(&runner_workflow_id_key, Serializable), tx.read(&runner_version_key, Serializable), tx.read(&runner_remaining_slots_key, Serializable), tx.read(&runner_total_slots_key, Serializable), tx.read(&runner_last_ping_ts_key, Serializable), + tx.read_opt(&runner_protocol_version_key, Serializable), )?; let old_runner_remaining_millislots = (runner_remaining_slots * 1000) / runner_total_slots; @@ -245,6 +247,8 @@ pub(crate) async fn clear_slot( workflow_id: runner_workflow_id, remaining_slots: new_runner_remaining_slots, total_slots: runner_total_slots, + // We default here because its not important for mk1 protocol runners + protocol_version: runner_protocol_version.unwrap_or(PROTOCOL_MK1_VERSION), }, )?; } diff --git a/engine/packages/pegboard/src/workflows/actor/mod.rs b/engine/packages/pegboard/src/workflows/actor/mod.rs index 8e92c9e1c9..8a43fa44b8 100644 --- a/engine/packages/pegboard/src/workflows/actor/mod.rs +++ b/engine/packages/pegboard/src/workflows/actor/mod.rs @@ -318,18 +318,19 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - // Send signal to stop actor now that we know it will be sleeping - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStopActor( - protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }, - ), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; + // TODO: Send message to tunnel + // // Send signal to stop actor now that we know it will be sleeping + // ctx.signal(crate::workflows::runner::Command { + // inner: protocol::Command::CommandStopActor( + // protocol::CommandStopActor { + // actor_id: input.actor_id.to_string(), + // generation: state.generation, + // }, + // ), + // }) + // .to_workflow_id(runner_workflow_id) + // .send() + // .await?; } } protocol::ActorIntent::ActorIntentStop => { @@ -348,17 +349,18 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: 
&Input) -> Result<()> }) .await?; - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStopActor( - protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }, - ), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; + // TODO: Send message to tunnel + // ctx.signal(crate::workflows::runner::Command { + // inner: protocol::Command::CommandStopActor( + // protocol::CommandStopActor { + // actor_id: input.actor_id.to_string(), + // generation: state.generation, + // }, + // ), + // }) + // .to_workflow_id(runner_workflow_id) + // .send() + // .await?; } } }, @@ -495,29 +497,31 @@ pub async fn pegboard_actor(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> }) .await?; - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; + // TODO: Send message to tunnel + // ctx.signal(crate::workflows::runner::Command { + // inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + // actor_id: input.actor_id.to_string(), + // generation: state.generation, + // }), + // }) + // .to_workflow_id(runner_workflow_id) + // .send() + // .await?; } } Main::Destroy(_) => { // If allocated, send stop actor command if let Some(runner_workflow_id) = state.runner_workflow_id { - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; + // TODO: Send message to tunnel + // ctx.signal(crate::workflows::runner::Command { + // inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + // actor_id: input.actor_id.to_string(), + // generation: state.generation, + // 
}), + // }) + // .to_workflow_id(runner_workflow_id) + // .send() + // .await?; } return Ok(Loop::Break(runtime::LifecycleResult { @@ -628,15 +632,16 @@ async fn handle_stopped( if let (StoppedVariant::Lost { .. }, Some(old_runner_workflow_id)) = (&variant, old_runner_workflow_id) { - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { - actor_id: input.actor_id.to_string(), - generation: state.generation, - }), - }) - .to_workflow_id(old_runner_workflow_id) - .send() - .await?; + // TODO: Send message to tunnel + // ctx.signal(crate::workflows::runner::Command { + // inner: protocol::Command::CommandStopActor(protocol::CommandStopActor { + // actor_id: input.actor_id.to_string(), + // generation: state.generation, + // }), + // }) + // .to_workflow_id(old_runner_workflow_id) + // .send() + // .await?; } // Reschedule no matter what @@ -748,6 +753,11 @@ pub struct Event { pub inner: protocol::Event, } +#[signal("pegboard_actor_events")] +pub struct Events { + pub inner: Vec, +} + #[signal("pegboard_actor_wake")] pub struct Wake {} diff --git a/engine/packages/pegboard/src/workflows/actor/runtime.rs b/engine/packages/pegboard/src/workflows/actor/runtime.rs index 7cec9252c2..c39f047b39 100644 --- a/engine/packages/pegboard/src/workflows/actor/runtime.rs +++ b/engine/packages/pegboard/src/workflows/actor/runtime.rs @@ -127,6 +127,7 @@ pub enum AllocateActorOutput { Allocated { runner_id: Id, runner_workflow_id: Id, + runner_protocol_version: Option, }, Pending { pending_allocation_ts: i64, @@ -291,6 +292,7 @@ async fn allocate_actor( workflow_id: old_runner_alloc_key_data.workflow_id, remaining_slots: new_remaining_slots, total_slots: old_runner_alloc_key_data.total_slots, + protocol_version: old_runner_alloc_key_data.protocol_version, }, )?; @@ -323,6 +325,8 @@ async fn allocate_actor( AllocateActorOutput::Allocated { runner_id: old_runner_alloc_key.runner_id, runner_workflow_id: 
old_runner_alloc_key_data.workflow_id, + runner_protocol_version: old_runner_alloc_key_data + .runner_protocol_version, }, )); } @@ -379,6 +383,7 @@ async fn allocate_actor( AllocateActorOutput::Allocated { runner_id, runner_workflow_id, + .. } => { state.sleep_ts = None; state.pending_allocation_ts = None; @@ -508,40 +513,46 @@ pub async fn spawn_actor( AllocateActorOutput::Allocated { runner_id, runner_workflow_id, + runner_protocol_version, } => { // Bump the autoscaler so it can scale up ctx.msg(rivet_types::msgs::pegboard::BumpServerlessAutoscaler {}) .send() .await?; - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { - actor_id: input.actor_id.to_string(), - generation, - config: protocol::ActorConfig { - name: input.name.clone(), - key: input.key.clone(), - // HACK: We should not use dynamic timestamp here, but we don't validate if signal data - // changes (like activity inputs) so this is fine for now. - create_ts: util::timestamp::now(), - input: input - .input - .as_ref() - .map(|x| BASE64_STANDARD.decode(x)) - .transpose()?, - }, - // Empty because request ids are ephemeral. This is intercepted by guard and - // populated before it reaches the runner - hibernating_requests: Vec::new(), - }), - }) - .to_workflow_id(runner_workflow_id) - .send() - .await?; + if protocol::is_new(runner_protocol_version) { + // TODO: Send message to tunnel + } else { + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { + actor_id: input.actor_id.to_string(), + generation, + config: protocol::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + // HACK: We should not use dynamic timestamp here, but we don't validate if signal data + // changes (like activity inputs) so this is fine for now. 
+ create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }), + }) + .to_workflow_id(runner_workflow_id) + .send() + .await?; + } Ok(SpawnActorOutput::Allocated { runner_id, runner_workflow_id, + runner_protocol_version, }) } AllocateActorOutput::Pending { @@ -563,32 +574,39 @@ pub async fn spawn_actor( }) .await?; - ctx.signal(crate::workflows::runner2::Command { - inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { - actor_id: input.actor_id.to_string(), - generation, - config: protocol::ActorConfig { - name: input.name.clone(), - key: input.key.clone(), - create_ts: util::timestamp::now(), - input: input - .input - .as_ref() - .map(|x| BASE64_STANDARD.decode(x)) - .transpose()?, - }, - // Empty because request ids are ephemeral. This is intercepted by guard and - // populated before it reaches the runner - hibernating_requests: Vec::new(), - }), - }) - .to_workflow_id(sig.runner_workflow_id) - .send() - .await?; + if protocol::is_new(sig.runner_protocol_version) { + // TODO: Send message to tunnel + } else { + ctx.signal(crate::workflows::runner::Command { + inner: protocol::Command::CommandStartActor( + protocol::CommandStartActor { + actor_id: input.actor_id.to_string(), + generation, + config: protocol::ActorConfig { + name: input.name.clone(), + key: input.key.clone(), + create_ts: util::timestamp::now(), + input: input + .input + .as_ref() + .map(|x| BASE64_STANDARD.decode(x)) + .transpose()?, + }, + // Empty because request ids are ephemeral. 
This is intercepted by guard and + // populated before it reaches the runner + hibernating_requests: Vec::new(), + }, + ), + }) + .to_workflow_id(sig.runner_workflow_id) + .send() + .await?; + } Ok(SpawnActorOutput::Allocated { runner_id: sig.runner_id, runner_workflow_id: sig.runner_workflow_id, + runner_protocol_version: sig.runner_protocol_version, }) } PendingAllocation::Destroy(_) => { @@ -683,11 +701,13 @@ pub async fn reschedule_actor( if let SpawnActorOutput::Allocated { runner_id, runner_workflow_id, + runner_protocol_version, } = &spawn_res { state.generation = next_generation; state.runner_id = Some(*runner_id); state.runner_workflow_id = Some(*runner_workflow_id); + state.runner_protocol_version = runner_protocol_version; // Reset gc timeout once allocated state.gc_timeout_ts = diff --git a/engine/packages/pegboard/src/workflows/runner.rs b/engine/packages/pegboard/src/workflows/runner.rs index 41739b4b26..b980f8b6b5 100644 --- a/engine/packages/pegboard/src/workflows/runner.rs +++ b/engine/packages/pegboard/src/workflows/runner.rs @@ -97,7 +97,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> // Send init packet ctx.activity(SendMessageToRunnerInput { runner_id: input.runner_id, - message: protocol::ToClient::ToClientInit(protocol::ToClientInit { + message: protocol::ToRunner::ToClientInit(protocol::ToClientInit { runner_id: input.runner_id.to_string(), last_event_idx: init_data.last_event_idx, metadata: protocol::ProtocolMetadata { @@ -111,7 +111,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> if !init_data.missed_commands.is_empty() { ctx.activity(SendMessageToRunnerInput { runner_id: input.runner_id, - message: protocol::ToClient::ToClientCommands( + message: protocol::ToRunner::ToClientCommands( init_data.missed_commands, ), }) @@ -198,7 +198,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> ctx.activity(SendMessageToRunnerInput { 
runner_id: input.runner_id, - message: protocol::ToClient::ToClientAckEvents( + message: protocol::ToRunner::ToClientAckEvents( protocol::ToClientAckEvents { last_event_idx: state.last_event_ack_idx, }, @@ -271,7 +271,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> // Forward ctx.activity(SendMessageToRunnerInput { runner_id: input.runner_id, - message: protocol::ToClient::ToClientCommands(vec![ + message: protocol::ToRunner::ToClientCommands(vec![ protocol::CommandWrapper { index, inner: command.inner, @@ -359,7 +359,7 @@ pub async fn pegboard_runner(ctx: &mut WorkflowCtx, input: &Input) -> Result<()> // Close websocket connection (its unlikely to be open) ctx.activity(SendMessageToRunnerInput { runner_id: input.runner_id, - message: protocol::ToClient::ToClientClose, + message: protocol::ToRunner::ToClientClose, }) .await?; @@ -619,6 +619,8 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> Result<()> { workflow_id: ctx.workflow_id(), remaining_slots, total_slots: input.total_slots, + // We default here because its not important for mk1 protocol runners + protocol_version: PROTOCOL_MK1_VERSION, }, )?; @@ -1064,6 +1066,7 @@ pub(crate) async fn allocate_pending_actors( workflow_id: old_runner_alloc_key_data.workflow_id, remaining_slots: new_remaining_slots, total_slots: old_runner_alloc_key_data.total_slots, + protocol_version: old_runner_alloc_key_data.protocol_version, }, )?; @@ -1120,7 +1123,7 @@ pub(crate) async fn allocate_pending_actors( #[derive(Debug, Serialize, Deserialize, Hash)] struct SendMessageToRunnerInput { runner_id: Id, - message: protocol::ToClient, + message: protocol::ToRunner, } #[activity(SendMessageToRunner)] @@ -1128,7 +1131,7 @@ async fn send_message_to_runner(ctx: &ActivityCtx, input: &SendMessageToRunnerIn let receiver_subject = crate::pubsub_subjects::RunnerReceiverSubject::new(input.runner_id).to_string(); - let message_serialized = 
versioned::ToClient::wrap_latest(input.message.clone()) + let message_serialized = versioned::ToRunner::wrap_latest(input.message.clone()) .serialize_with_embedded_version(PROTOCOL_VERSION)?; ctx.ups()? diff --git a/engine/packages/pegboard/src/workflows/runner2.rs b/engine/packages/pegboard/src/workflows/runner2.rs index bea5097432..375b1d1baf 100644 --- a/engine/packages/pegboard/src/workflows/runner2.rs +++ b/engine/packages/pegboard/src/workflows/runner2.rs @@ -23,17 +23,13 @@ pub struct Input { pub key: String, pub version: u32, pub total_slots: u32, + pub protocol_version: u16, } #[derive(Debug, Serialize, Deserialize)] struct State { namespace_id: Id, create_ts: i64, - - last_event_idx: i64, - last_command_idx: i64, - commands: Vec, - // events: Vec, } impl State { @@ -41,9 +37,6 @@ impl State { State { namespace_id, create_ts, - last_event_idx: -1, - last_command_idx: -1, - commands: Vec::new(), } } } @@ -72,258 +65,77 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() async move { let runner_lost_threshold = ctx.config().pegboard().runner_lost_threshold(); - // Batch fetch pending signals - let signals = ctx - .listen_n_with_timeout::
(runner_lost_threshold, 1024) - .await?; - - if signals.is_empty() { - let expired = ctx - .activity(CheckExpiredInput { - runner_id: input.runner_id, - }) - .await?; - - if state.draining || expired { - return Ok(Loop::Break(())); - } else { - return Ok(Loop::Continue); - } - } + match ctx + .listen_with_timeout::
(runner_lost_threshold) + .await? + { + Some(Main::Init(_)) => { + if !state.draining { + ctx.activity(InsertDbInput { + runner_id: input.runner_id, + namespace_id: input.namespace_id, + name: input.name.clone(), + key: input.key.clone(), + version: input.version, + total_slots: input.total_slots, + create_ts: ctx.create_ts(), + }) + .await?; - let mut init = None; - let mut events = Vec::new(); - let mut commands = Vec::new(); - let mut ack_last_command_idx = -1; - let mut check_queue = false; - - // Batch process signals - for signal in signals { - match signal { - Main::Forward(sig) => { - match sig.inner { - protocol::ToServer::ToServerInit(init_sig) => { - init = Some(init_sig); - check_queue = true; - } - protocol::ToServer::ToServerEvents(new_events) => { - // Ignore events that were already received - events.extend( - new_events - .into_iter() - .filter(|event| event.index > state.last_event_idx), - ); - } - protocol::ToServer::ToServerAckCommands( - protocol::ToServerAckCommands { last_command_idx }, - ) => { - ack_last_command_idx = ack_last_command_idx.max(last_command_idx); - } - protocol::ToServer::ToServerStopping => { - handle_stopping(ctx, &input, state, false).await?; - } - protocol::ToServer::ToServerPing(_) - | protocol::ToServer::ToServerKvRequest(_) - | protocol::ToServer::ToServerTunnelMessage(_) => { - bail!( - "received message that should not be sent to runner workflow: {:?}", - sig.inner - ) - } - } - } - Main::Command(command) => { - // If draining, ignore start actor command and inform the actor wf that it is lost - if let ( - protocol::Command::CommandStartActor(protocol::CommandStartActor { - actor_id, - generation, - .. 
- }), - true, - ) = (&command.inner, state.draining) - { - tracing::warn!( - ?actor_id, - "attempt to schedule actor to draining runner, reallocating" - ); - - let res = ctx - .signal(crate::workflows::actor::Lost { - generation: *generation, - // Because this is a race condition, we want the actor to reschedule - // regardless of its crash policy - force_reschedule: true, - reset_rescheduling: true, - }) + // Check for pending actors + let res = ctx + .activity(AllocatePendingActorsInput { + namespace_id: input.namespace_id, + name: input.name.clone(), + }) + .await?; + + // Dispatch pending allocs + for alloc in res.allocations { + ctx.signal(alloc.signal) .to_workflow::() - .tag("actor_id", actor_id) - .graceful_not_found() + .tag("actor_id", alloc.actor_id) .send() .await?; - if res.is_none() { - tracing::warn!( - ?actor_id, - "actor workflow not found, likely already stopped" - ); - } - } else { - commands.push(command.inner); } } - Main::CheckQueue(_) => { - check_queue = true; - } - Main::Stop(sig) => { - handle_stopping(ctx, &input, state, sig.reset_actor_rescheduling).await?; - } - } - } - - let mut messages = Vec::new(); - - // Process init packet - if let Some(protocol::ToServerInit { - last_command_idx, - prepopulate_actor_names, - metadata, - .. 
- }) = init - { - let init_data = ctx - .activity(ProcessInitInput { - runner_id: input.runner_id, - namespace_id: input.namespace_id, - last_command_idx: last_command_idx.unwrap_or(-1), - prepopulate_actor_names, - metadata, - }) - .await?; - - messages.push(protocol::ToClient::ToClientInit(protocol::ToClientInit { - runner_id: input.runner_id.to_string(), - last_event_idx: init_data.last_event_idx, - metadata: protocol::ProtocolMetadata { - runner_lost_threshold: runner_lost_threshold, - }, - })); - - // Send missed commands - if !init_data.missed_commands.is_empty() { - messages.push(protocol::ToClient::ToClientCommands( - init_data.missed_commands, - )); - } - - if !state.draining { - ctx.activity(InsertDbInput { - runner_id: input.runner_id, - namespace_id: input.namespace_id, - name: input.name.clone(), - key: input.key.clone(), - version: input.version, - total_slots: input.total_slots, - create_ts: ctx.create_ts(), - }) - .await?; } - } + Some(Main::CheckQueue(_)) => { + // Check for pending actors + let res = ctx + .activity(AllocatePendingActorsInput { + namespace_id: input.namespace_id, + name: input.name.clone(), + }) + .await?; - let last_event_idx = events.last().map(|event| event.index); - - // NOTE: This should not be parallelized because signals should be sent in order - // Forward events to actor workflows - for event in &events { - let actor_id = crate::utils::event_actor_id(&event.inner).to_string(); - let res = ctx - .signal(crate::workflows::actor::Event { - inner: event.inner.clone(), - }) - .to_workflow::() - .tag("actor_id", &actor_id) - .graceful_not_found() - .send() - .await?; - if res.is_none() { - tracing::warn!( - ?actor_id, - "actor workflow not found, likely already stopped" - ); + // Dispatch pending allocs + for alloc in res.allocations { + ctx.signal(alloc.signal) + .to_workflow::() + .tag("actor_id", alloc.actor_id) + .send() + .await?; + } } - } - - // If events is not empty, insert and ack - if let Some(last_event_idx) = 
last_event_idx { - ctx.activity(InsertEventsInput { events }).await?; - - state.last_event_idx = last_event_idx; - - // Ack events in batch - if last_event_idx - > state - .last_event_ack_idx - .saturating_add(EVENT_ACK_BATCH_SIZE) - { - state.last_event_ack_idx = last_event_idx; - - messages.push(protocol::ToClient::ToClientAckEvents( - protocol::ToClientAckEvents { - last_event_idx: state.last_event_ack_idx, - }, - )); + Some(Main::Stop(sig)) => { + handle_stopping(ctx, &input, state, sig.reset_actor_rescheduling).await?; } - } - - // Insert/ack commands - let batch_start_index = if ack_last_command_idx != -1 || !commands.is_empty() { - ctx.activity(InsertCommandsInput { - commands: commands.clone(), - ack_last_command_idx, - }) - .await? - } else { - None - }; - - // If commands were present during this loop iteration, forward to runner - if let Some(batch_start_index) = batch_start_index { - messages.push(protocol::ToClient::ToClientCommands( - commands - .into_iter() - .enumerate() - .map(|(index, cmd)| protocol::CommandWrapper { - index: batch_start_index + index as i64, - inner: cmd, + None => { + let expired = ctx + .activity(CheckExpiredInput { + runner_id: input.runner_id, }) - .collect(), - )); - } - - if check_queue { - // Check for pending actors - let res = ctx - .activity(AllocatePendingActorsInput { - namespace_id: input.namespace_id, - name: input.name.clone(), - }) - .await?; - - // Dispatch pending allocs - for alloc in res.allocations { - ctx.signal(alloc.signal) - .to_workflow::() - .tag("actor_id", alloc.actor_id) - .send() .await?; - } - } - if !messages.is_empty() { - ctx.activity(SendMessagesToRunnerInput { - runner_id: input.runner_id, - messages, - }) - .await?; + // TODO: Periodically ack events + + if state.draining || expired { + return Ok(Loop::Break(())); + } else { + return Ok(Loop::Continue); + } + } } Ok(Loop::Continue) @@ -370,7 +182,7 @@ pub async fn pegboard_runner2(ctx: &mut WorkflowCtx, input: &Input) -> Result<() // Close 
websocket connection (its unlikely to be open) ctx.activity(SendMessagesToRunnerInput { runner_id: input.runner_id, - messages: vec![protocol::ToClient::ToClientClose], + messages: vec![protocol::ToRunner::ToRunnerClose], }) .await?; @@ -424,17 +236,11 @@ async fn handle_stopping( #[derive(Debug, Serialize, Deserialize)] struct LifecycleState { draining: bool, - last_event_idx: i64, - last_event_ack_idx: i64, } impl LifecycleState { fn new() -> Self { - LifecycleState { - draining: false, - last_event_idx: -1, - last_event_ack_idx: -1, - } + LifecycleState { draining: false } } } @@ -447,14 +253,8 @@ struct InitInput { create_ts: i64, } -#[derive(Debug, Serialize, Deserialize)] -struct InitOutput { - /// Deprecated. - evict_workflow_id: Option, -} - #[activity(Init)] -async fn init(ctx: &ActivityCtx, input: &InitInput) -> Result { +async fn init(ctx: &ActivityCtx, input: &InitInput) -> Result<()> { let mut state = ctx.state::>()?; *state = Some(State::new(input.namespace_id, input.create_ts)); @@ -483,9 +283,7 @@ async fn init(ctx: &ActivityCtx, input: &InitInput) -> Result { .custom_instrument(tracing::info_span!("runner_init_tx")) .await?; - Ok(InitOutput { - evict_workflow_id: None, - }) + Ok(()) } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -494,6 +292,7 @@ struct InsertDbInput { namespace_id: Id, name: String, key: String, + protocol_version: u16, version: u32, total_slots: u32, create_ts: i64, @@ -630,6 +429,7 @@ async fn insert_db(ctx: &ActivityCtx, input: &InsertDbInput) -> Result<()> { workflow_id: ctx.workflow_id(), remaining_slots, total_slots: input.total_slots, + protocol_version: input.protocol_version, }, )?; @@ -718,152 +518,6 @@ async fn clear_db(ctx: &ActivityCtx, input: &ClearDbInput) -> Result<()> { Ok(()) } -#[derive(Debug, Serialize, Deserialize, Hash)] -struct ProcessInitInput { - runner_id: Id, - namespace_id: Id, - last_command_idx: i64, - prepopulate_actor_names: Option>, - metadata: Option, -} - -#[derive(Debug, Serialize, 
Deserialize)] -struct ProcessInitOutput { - last_event_idx: i64, - missed_commands: Vec, -} - -#[activity(ProcessInit)] -async fn process_init(ctx: &ActivityCtx, input: &ProcessInitInput) -> Result { - let state = ctx.state::()?; - - ctx.udb()? - .run(|tx| async move { - let tx = tx.with_subspace(keys::subspace()); - - // Populate actor names if provided - if let Some(actor_names) = &input.prepopulate_actor_names { - // Write each actor name into the namespace actor names list - for (name, data) in actor_names { - let metadata = - serde_json::from_str::>( - &data.metadata, - ) - .unwrap_or_default(); - - tx.write( - &keys::ns::ActorNameKey::new(input.namespace_id, name.clone()), - ActorNameKeyData { metadata }, - )?; - } - } - - if let Some(metadata) = &input.metadata { - let metadata = MetadataKeyData { - metadata: serde_json::from_str::>( - &metadata, - ) - .unwrap_or_default(), - }; - - let metadata_key = keys::runner::MetadataKey::new(input.runner_id); - - // Clear old metadata - tx.delete_key_subspace(&metadata_key); - - // Write metadata - for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() { - let chunk_key = metadata_key.chunk(i); - - tx.set(&tx.pack(&chunk_key), &chunk); - } - } - - Ok(()) - }) - .custom_instrument(tracing::info_span!("runner_process_init_tx")) - .await?; - - Ok(ProcessInitOutput { - last_event_idx: state.last_event_idx, - missed_commands: state - .commands - .iter() - .filter(|row| row.index > input.last_command_idx) - .map(|row| protocol::CommandWrapper { - index: row.index, - inner: row.command.clone(), - }) - .collect(), - }) -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -struct InsertEventsInput { - events: Vec, -} - -#[activity(InsertEvents)] -async fn insert_events(ctx: &ActivityCtx, input: &InsertEventsInput) -> Result<()> { - let last_event_idx = if let Some(last_event_wrapper) = input.events.last() { - last_event_wrapper.index - } else { - return Ok(()); - }; - - let mut state = ctx.state::()?; - - // 
TODO: Storing events is disabled for now, otherwise state will grow indefinitely. This is only used - // for debugging anyway - // state.events.extend(input.events.into_iter().enumerate().map(|(i, event)| EventRow { - // index: event.index, - // event: event.inner, - // ack_ts: util::timestamp::now(), - // })); - - state.last_event_idx = last_event_idx; - - Ok(()) -} - -#[derive(Debug, Serialize, Deserialize, Hash)] -struct InsertCommandsInput { - commands: Vec, - ack_last_command_idx: i64, -} - -#[activity(InsertCommands)] -async fn insert_commands(ctx: &ActivityCtx, input: &InsertCommandsInput) -> Result> { - let mut state = ctx.state::()?; - - if input.ack_last_command_idx != -1 { - state - .commands - .retain(|row| row.index > input.ack_last_command_idx); - } - - if input.commands.is_empty() { - return Ok(None); - } - - let old_last_command_idx = state.last_command_idx; - state.commands.extend( - input - .commands - .iter() - .enumerate() - .map(|(i, command)| CommandRow { - index: old_last_command_idx + i as i64 + 1, - command: command.clone(), - create_ts: util::timestamp::now(), - }), - ); - - state.last_command_idx += input.commands.len() as i64; - - Ok(Some(old_last_command_idx + 1)) -} - #[derive(Debug, Serialize, Deserialize, Hash)] struct FetchRemainingActorsInput { runner_id: Id, @@ -1067,6 +721,7 @@ pub(crate) async fn allocate_pending_actors( workflow_id: old_runner_alloc_key_data.workflow_id, remaining_slots: new_remaining_slots, total_slots: old_runner_alloc_key_data.total_slots, + protocol_version: old_runner_alloc_key_data.protocol_version, }, )?; @@ -1123,7 +778,7 @@ pub(crate) async fn allocate_pending_actors( #[derive(Debug, Serialize, Deserialize, Hash)] struct SendMessagesToRunnerInput { runner_id: Id, - messages: Vec, + messages: Vec, } #[activity(SendMessagesToRunner)] @@ -1135,7 +790,7 @@ async fn send_messages_to_runner( crate::pubsub_subjects::RunnerReceiverSubject::new(input.runner_id).to_string(); for message in &input.messages { - 
let message_serialized = versioned::ToClient::wrap_latest(message.clone()) + let message_serialized = versioned::ToRunner::wrap_latest(message.clone()) .serialize_with_embedded_version(PROTOCOL_VERSION)?; ctx.ups()? @@ -1154,18 +809,12 @@ pub struct Stop { pub reset_actor_rescheduling: bool, } -#[signal("pegboard_runner_command")] -pub struct Command { - pub inner: protocol::Command, -} - #[signal("pegboard_runner_forward")] pub struct Forward { pub inner: protocol::ToServer, } join_signal!(Main { - Command(Command), // Forwarded from the ws to this workflow Forward(Forward), CheckQueue, diff --git a/engine/sdks/rust/data/src/converted.rs b/engine/sdks/rust/data/src/converted.rs index 4c956ffda9..bfed1e8100 100644 --- a/engine/sdks/rust/data/src/converted.rs +++ b/engine/sdks/rust/data/src/converted.rs @@ -9,26 +9,28 @@ pub struct RunnerAllocIdxKeyData { pub total_slots: u32, } -impl TryFrom for RunnerAllocIdxKeyData { +impl TryFrom for RunnerAllocIdxKeyData { type Error = anyhow::Error; - fn try_from(value: pegboard_namespace_runner_alloc_idx_v1::Data) -> Result { + fn try_from(value: pegboard_namespace_runner_alloc_idx_v2::Data) -> Result { Ok(RunnerAllocIdxKeyData { workflow_id: Id::from_slice(&value.workflow_id)?, remaining_slots: value.remaining_slots, total_slots: value.total_slots, + protocol_version: value.protocol_version, }) } } -impl TryFrom for pegboard_namespace_runner_alloc_idx_v1::Data { +impl TryFrom for pegboard_namespace_runner_alloc_idx_v2::Data { type Error = anyhow::Error; fn try_from(value: RunnerAllocIdxKeyData) -> Result { - Ok(pegboard_namespace_runner_alloc_idx_v1::Data { + Ok(pegboard_namespace_runner_alloc_idx_v2::Data { workflow_id: value.workflow_id.as_bytes(), remaining_slots: value.remaining_slots, total_slots: value.total_slots, + protocol_version: value.protocol_version, }) } } diff --git a/engine/sdks/rust/data/src/lib.rs b/engine/sdks/rust/data/src/lib.rs index becd383e30..5f2690d4ae 100644 --- a/engine/sdks/rust/data/src/lib.rs 
+++ b/engine/sdks/rust/data/src/lib.rs @@ -5,7 +5,7 @@ pub mod versioned; pub const PEGBOARD_RUNNER_ADDRESS_VERSION: u16 = 1; pub const PEGBOARD_RUNNER_METADATA_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_BY_KEY_VERSION: u16 = 1; -pub const PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION: u16 = 1; +pub const PEGBOARD_NAMESPACE_RUNNER_ALLOC_IDX_VERSION: u16 = 2; pub const PEGBOARD_NAMESPACE_RUNNER_CONFIG_VERSION: u16 = 2; pub const PEGBOARD_NAMESPACE_RUNNER_BY_KEY_VERSION: u16 = 1; pub const PEGBOARD_NAMESPACE_ACTOR_NAME_VERSION: u16 = 1; diff --git a/engine/sdks/rust/data/src/versioned/mod.rs b/engine/sdks/rust/data/src/versioned/mod.rs index f6814cad32..b70d52f0f8 100644 --- a/engine/sdks/rust/data/src/versioned/mod.rs +++ b/engine/sdks/rust/data/src/versioned/mod.rs @@ -9,18 +9,19 @@ pub use namespace_runner_config::*; pub enum RunnerAllocIdxKeyData { V1(pegboard_namespace_runner_alloc_idx_v1::Data), + V2(pegboard_namespace_runner_alloc_idx_v2::Data), } impl OwnedVersionedData for RunnerAllocIdxKeyData { - type Latest = pegboard_namespace_runner_alloc_idx_v1::Data; + type Latest = pegboard_namespace_runner_alloc_idx_v2::Data; - fn wrap_latest(latest: pegboard_namespace_runner_alloc_idx_v1::Data) -> Self { - RunnerAllocIdxKeyData::V1(latest) + fn wrap_latest(latest: pegboard_namespace_runner_alloc_idx_v2::Data) -> Self { + RunnerAllocIdxKeyData::V2(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let RunnerAllocIdxKeyData::V1(data) = self { + if let RunnerAllocIdxKeyData::V2(data) = self { Ok(data) } else { bail!("version not latest"); @@ -30,6 +31,7 @@ impl OwnedVersionedData for RunnerAllocIdxKeyData { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { 1 => Ok(RunnerAllocIdxKeyData::V1(serde_bare::from_slice(payload)?)), + 2 => Ok(RunnerAllocIdxKeyData::V2(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } @@ -37,8 +39,17 @@ impl OwnedVersionedData for 
RunnerAllocIdxKeyData { fn serialize_version(self, _version: u16) -> Result> { match self { RunnerAllocIdxKeyData::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), + RunnerAllocIdxKeyData::V2(data) => serde_bare::to_vec(&data).map_err(Into::into), } } + + fn deserialize_converters() -> Vec Result> { + vec![Self::v1_to_v2] + } + + fn serialize_converters() -> Vec Result> { + vec![Self::v2_to_v1] + } } pub enum MetadataKeyData { diff --git a/engine/sdks/rust/runner-protocol/src/lib.rs b/engine/sdks/rust/runner-protocol/src/lib.rs index 2c0151c639..b71d991358 100644 --- a/engine/sdks/rust/runner-protocol/src/lib.rs +++ b/engine/sdks/rust/runner-protocol/src/lib.rs @@ -4,5 +4,11 @@ pub mod versioned; // Re-export latest pub use generated::v3::*; +pub use generated::v4 as mk2; -pub const PROTOCOL_VERSION: u16 = 3; +pub const PROTOCOL_MK1_VERSION: u16 = 3; +pub const PROTOCOL_MK2_VERSION: u16 = 4; + +pub fn is_mk2(protocol_version: u16) -> bool { + protocol_version > PROTOCOL_MK1_VERSION +} diff --git a/engine/sdks/rust/runner-protocol/src/versioned.rs b/engine/sdks/rust/runner-protocol/src/versioned.rs index aa4dfae195..ae7483809a 100644 --- a/engine/sdks/rust/runner-protocol/src/versioned.rs +++ b/engine/sdks/rust/runner-protocol/src/versioned.rs @@ -1,7 +1,109 @@ use anyhow::{Ok, Result, bail}; use vbare::OwnedVersionedData; -use crate::generated::{v1, v2, v3}; +use crate::generated::{v1, v2, v3, v4}; + +pub enum ToClientMk2 { + V4(v4::ToClient), +} + +impl OwnedVersionedData for ToClientMk2 { + type Latest = v4::ToClient; + + fn wrap_latest(latest: v4::ToClient) -> Self { + ToClientMk2::V4(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let ToClientMk2::V4(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 4 => Ok(ToClientMk2::V4(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: 
{version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + ToClientMk2::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } +} + +pub enum ToServerMk2 { + V4(v4::ToServer), +} + +impl OwnedVersionedData for ToServerMk2 { + type Latest = v4::ToServer; + + fn wrap_latest(latest: v4::ToServer) -> Self { + ToServerMk2::V4(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let ToServerMk2::V4(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 4 => Ok(ToServerMk2::V4(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + ToServerMk2::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } +} + +pub enum ToRunnerMk2 { + V4(v4::ToRunner), +} + +impl OwnedVersionedData for ToRunnerMk2 { + type Latest = v4::ToRunner; + + fn wrap_latest(latest: v4::ToRunner) -> Self { + ToRunnerMk2::V4(latest) + } + + fn unwrap_latest(self) -> Result { + #[allow(irrefutable_let_patterns)] + if let ToRunnerMk2::V4(data) = self { + Ok(data) + } else { + bail!("version not latest"); + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 4 => Ok(ToRunnerMk2::V4(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + ToRunnerMk2::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } +} pub enum ToClient { V1(v1::ToClient), @@ -697,20 +799,20 @@ impl OwnedVersionedData for ToRunner { } pub enum ToGateway { - // No change between v1 and v3 - V3(v3::ToGateway), + // No change between v1 and v4 + V4(v4::ToGateway), } impl OwnedVersionedData for ToGateway { - type Latest = v3::ToGateway; + type Latest = v4::ToGateway; - 
fn wrap_latest(latest: v3::ToGateway) -> Self { - ToGateway::V3(latest) + fn wrap_latest(latest: v4::ToGateway) -> Self { + ToGateway::V4(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let ToGateway::V3(data) = self { + if let ToGateway::V4(data) = self { Ok(data) } else { bail!("version not latest"); @@ -719,33 +821,33 @@ impl OwnedVersionedData for ToGateway { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { - 1 | 2 | 3 => Ok(ToGateway::V3(serde_bare::from_slice(payload)?)), + 1 | 2 | 4 => Ok(ToGateway::V4(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } fn serialize_version(self, _version: u16) -> Result> { match self { - ToGateway::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToGateway::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), } } } pub enum ToServerlessServer { - // No change between v1 and v3 - V3(v3::ToServerlessServer), + // No change between v1 and v4 + V4(v4::ToServerlessServer), } impl OwnedVersionedData for ToServerlessServer { - type Latest = v3::ToServerlessServer; + type Latest = v4::ToServerlessServer; - fn wrap_latest(latest: v3::ToServerlessServer) -> Self { - ToServerlessServer::V3(latest) + fn wrap_latest(latest: v4::ToServerlessServer) -> Self { + ToServerlessServer::V4(latest) } fn unwrap_latest(self) -> Result { #[allow(irrefutable_let_patterns)] - if let ToServerlessServer::V3(data) = self { + if let ToServerlessServer::V4(data) = self { Ok(data) } else { bail!("version not latest"); @@ -754,14 +856,14 @@ impl OwnedVersionedData for ToServerlessServer { fn deserialize_version(payload: &[u8], version: u16) -> Result { match version { - 1 | 2 | 3 => Ok(ToServerlessServer::V3(serde_bare::from_slice(payload)?)), + 1 | 2 | 3 | 4 => Ok(ToServerlessServer::V4(serde_bare::from_slice(payload)?)), _ => bail!("invalid version: {version}"), } } fn serialize_version(self, _version: u16) -> Result> { match self { - 
ToServerlessServer::V3(data) => serde_bare::to_vec(&data).map_err(Into::into), + ToServerlessServer::V4(data) => serde_bare::to_vec(&data).map_err(Into::into), } } } diff --git a/engine/sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v2.bare b/engine/sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v2.bare new file mode 100644 index 0000000000..388c6e8b49 --- /dev/null +++ b/engine/sdks/schemas/data/pegboard.namespace.runner_alloc_idx.v2.bare @@ -0,0 +1,8 @@ +type Id data + +type Data struct { + workflow_id: Id + remaining_slots: u32 + total_slots: u32 + protocol_version: u16 +} diff --git a/engine/sdks/schemas/runner-protocol/v4.bare b/engine/sdks/schemas/runner-protocol/v4.bare new file mode 100644 index 0000000000..e363bcbe1f --- /dev/null +++ b/engine/sdks/schemas/runner-protocol/v4.bare @@ -0,0 +1,431 @@ +# Runner Protocol v1 + +# MARK: Core Primitives + +type Id str +type Json str + +type GatewayId data[4] +type RequestId data[4] +type MessageIndex u16 + +# MARK: KV + +# Basic types +type KvKey data +type KvValue data +type KvMetadata struct { + version: data + # TODO: Rename to update_ts + createTs: i64 +} + +# Query types +type KvListAllQuery void +type KvListRangeQuery struct { + start: KvKey + end: KvKey + exclusive: bool +} + +type KvListPrefixQuery struct { + key: KvKey +} + +type KvListQuery union { + KvListAllQuery | + KvListRangeQuery | + KvListPrefixQuery +} + +# Request types +type KvGetRequest struct { + keys: list +} + +type KvListRequest struct { + query: KvListQuery + reverse: optional + limit: optional +} + +type KvPutRequest struct { + keys: list + values: list +} + +type KvDeleteRequest struct { + keys: list +} + +type KvDropRequest void + +# Response types +type KvErrorResponse struct { + message: str +} + +type KvGetResponse struct { + keys: list + values: list + metadata: list +} + +type KvListResponse struct { + keys: list + values: list + metadata: list +} + +type KvPutResponse void +type KvDeleteResponse void +type 
KvDropResponse void + +# Request/Response unions +type KvRequestData union { + KvGetRequest | + KvListRequest | + KvPutRequest | + KvDeleteRequest | + KvDropRequest +} + +type KvResponseData union { + KvErrorResponse | + KvGetResponse | + KvListResponse | + KvPutResponse | + KvDeleteResponse | + KvDropResponse +} + +# MARK: Actor + +# Core +type StopCode enum { + OK + ERROR +} + +type ActorName struct { + metadata: Json +} + +type ActorConfig struct { + name: str + key: optional + createTs: i64 + input: optional +} + +type ActorCheckpoint struct { + actorId: Id + index: i64 +} + +# Intent +type ActorIntentSleep void + +type ActorIntentStop void + +type ActorIntent union { + ActorIntentSleep | + ActorIntentStop +} + +# State +type ActorStateRunning void + +type ActorStateStopped struct { + code: StopCode + message: optional +} + +type ActorState union { + ActorStateRunning | + ActorStateStopped +} + +# MARK: Events +type EventActorIntent struct { + actorId: Id + generation: u32 + intent: ActorIntent +} + +type EventActorStateUpdate struct { + actorId: Id + generation: u32 + state: ActorState +} + +type EventActorSetAlarm struct { + actorId: Id + generation: u32 + alarmTs: optional +} + +type Event union { + EventActorIntent | + EventActorStateUpdate | + EventActorSetAlarm +} + +type EventWrapper struct { + checkpoint: ActorCheckpoint + inner: Event +} + +# MARK: Commands + +type HibernatingRequest struct { + gatewayId: GatewayId + requestId: RequestId +} + +type CommandStartActor struct { + generation: u32 + config: ActorConfig + hibernatingRequests: list +} + +type CommandStopActor struct { + generation: u32 +} + +type Command union { + CommandStartActor | + CommandStopActor +} + +type CommandWrapper struct { + checkpoint: ActorCheckpoint + inner: Command +} + +# MARK: Tunnel + +# Message ID + +type MessageIdParts struct { + # Globally unique ID + gatewayId: GatewayId + # Unique ID to the gateway + requestId: RequestId + # Unique ID to the request + messageIndex: 
MessageIndex +} + +type MessageId data[12] + +# HTTP +type ToClientRequestStart struct { + actorId: Id + method: str + path: str + headers: map + body: optional + stream: bool +} + +type ToClientRequestChunk struct { + body: data + finish: bool +} + +type ToClientRequestAbort void + +type ToServerResponseStart struct { + status: u16 + headers: map + body: optional + stream: bool +} + +type ToServerResponseChunk struct { + body: data + finish: bool +} + +type ToServerResponseAbort void + +# WebSocket +type ToClientWebSocketOpen struct { + actorId: Id + path: str + headers: map +} + +type ToClientWebSocketMessage struct { + data: data + binary: bool +} + +type ToClientWebSocketClose struct { + code: optional + reason: optional +} + +type ToServerWebSocketOpen struct { + canHibernate: bool +} + +type ToServerWebSocketMessage struct { + data: data + binary: bool +} + +type ToServerWebSocketMessageAck struct { + index: MessageIndex +} + +type ToServerWebSocketClose struct { + code: optional + reason: optional + hibernate: bool +} + +# To Server +type ToServerTunnelMessageKind union { + # HTTP + ToServerResponseStart | + ToServerResponseChunk | + ToServerResponseAbort | + + # WebSocket + ToServerWebSocketOpen | + ToServerWebSocketMessage | + ToServerWebSocketMessageAck | + ToServerWebSocketClose +} + +type ToServerTunnelMessage struct { + messageId: MessageId + messageKind: ToServerTunnelMessageKind +} + +# To Client +type ToClientTunnelMessageKind union { + # HTTP + ToClientRequestStart | + ToClientRequestChunk | + ToClientRequestAbort | + + # WebSocket + ToClientWebSocketOpen | + ToClientWebSocketMessage | + ToClientWebSocketClose +} + +type ToClientTunnelMessage struct { + messageId: MessageId + messageKind: ToClientTunnelMessageKind +} + +# MARK: To Server +type ToServerInit struct { + name: str + version: u32 + totalSlots: u32 + lastCommandCheckpoints: list + prepopulateActorNames: optional> + metadata: optional +} + +type ToServerEvents list + +type 
ToServerAckCommands struct { + lastCommandIdx: i64 +} + +type ToServerStopping void + +type ToServerPong struct { + ts: i64 +} + +type ToServerKvRequest struct { + actorId: Id + requestId: u32 + data: KvRequestData +} + +type ToServer union { + ToServerInit | + ToServerEvents | + ToServerAckCommands | + ToServerStopping | + ToServerPong | + ToServerKvRequest | + ToServerTunnelMessage +} + +# MARK: To Client +type ProtocolMetadata struct { + runnerLostThreshold: i64 +} + +type ToClientInit struct { + runnerId: Id + lastEventCheckpoints: list + metadata: ProtocolMetadata +} + +type ToClientCommands list + +type ToClientAckEvents struct { + lastEventCheckpoints: list +} + +type ToClientKvResponse struct { + requestId: u32 + data: KvResponseData +} + +type ToClient union { + ToClientInit | + ToClientCommands | + ToClientAckEvents | + ToClientKvResponse | + ToClientTunnelMessage | + ToClientPing +} + +# MARK: To Runner +type ToRunnerPing struct { + gatewayId: GatewayId + requestId: RequestId + ts: i64 +} + +type ToRunnerClose void + +# We have to re-declare the entire union since BARE will not generate the +# ser/de for ToClient if it's not a top-level type +type ToRunner union { + ToRunnerPing | + ToRunnerClose | + ToClientCommands | + ToClientAckEvents | + ToClientTunnelMessage +} + +# MARK: To Gateway +type ToGatewayPong struct { + requestId: RequestId + ts: i64 +} + +type ToGateway union { + ToGatewayPong | + ToServerTunnelMessage +} + +# MARK: Serverless +type ToServerlessServerInit struct { + runnerId: Id +} + +type ToServerlessServer union { + ToServerlessServerInit +} diff --git a/engine/sdks/typescript/runner-protocol/src/index.ts b/engine/sdks/typescript/runner-protocol/src/index.ts index f2db492e44..7a60c3a4c7 100644 --- a/engine/sdks/typescript/runner-protocol/src/index.ts +++ b/engine/sdks/typescript/runner-protocol/src/index.ts @@ -613,6 +613,23 @@ export function writeActorConfig(bc: bare.ByteCursor, x: ActorConfig): void { write6(bc, x.input) } +export 
type ActorCheckpoint = { + readonly actorId: Id + readonly index: i64 +} + +export function readActorCheckpoint(bc: bare.ByteCursor): ActorCheckpoint { + return { + actorId: readId(bc), + index: bare.readI64(bc), + } +} + +export function writeActorCheckpoint(bc: bare.ByteCursor, x: ActorCheckpoint): void { + writeId(bc, x.actorId) + bare.writeI64(bc, x.index) +} + /** * Intent */ @@ -824,19 +841,19 @@ export function writeEvent(bc: bare.ByteCursor, x: Event): void { } export type EventWrapper = { - readonly index: i64 + readonly checkpoint: ActorCheckpoint readonly inner: Event } export function readEventWrapper(bc: bare.ByteCursor): EventWrapper { return { - index: bare.readI64(bc), + checkpoint: readActorCheckpoint(bc), inner: readEvent(bc), } } export function writeEventWrapper(bc: bare.ByteCursor, x: EventWrapper): void { - bare.writeI64(bc, x.index) + writeActorCheckpoint(bc, x.checkpoint) writeEvent(bc, x.inner) } @@ -877,7 +894,6 @@ function write8(bc: bare.ByteCursor, x: readonly HibernatingRequest[]): void { } export type CommandStartActor = { - readonly actorId: Id readonly generation: u32 readonly config: ActorConfig readonly hibernatingRequests: readonly HibernatingRequest[] @@ -885,7 +901,6 @@ export type CommandStartActor = { export function readCommandStartActor(bc: bare.ByteCursor): CommandStartActor { return { - actorId: readId(bc), generation: bare.readU32(bc), config: readActorConfig(bc), hibernatingRequests: read8(bc), @@ -893,26 +908,22 @@ export function readCommandStartActor(bc: bare.ByteCursor): CommandStartActor { } export function writeCommandStartActor(bc: bare.ByteCursor, x: CommandStartActor): void { - writeId(bc, x.actorId) bare.writeU32(bc, x.generation) writeActorConfig(bc, x.config) write8(bc, x.hibernatingRequests) } export type CommandStopActor = { - readonly actorId: Id readonly generation: u32 } export function readCommandStopActor(bc: bare.ByteCursor): CommandStopActor { return { - actorId: readId(bc), generation: 
bare.readU32(bc), } } export function writeCommandStopActor(bc: bare.ByteCursor, x: CommandStopActor): void { - writeId(bc, x.actorId) bare.writeU32(bc, x.generation) } @@ -951,19 +962,19 @@ export function writeCommand(bc: bare.ByteCursor, x: Command): void { } export type CommandWrapper = { - readonly index: i64 + readonly checkpoint: ActorCheckpoint readonly inner: Command } export function readCommandWrapper(bc: bare.ByteCursor): CommandWrapper { return { - index: bare.readI64(bc), + checkpoint: readActorCheckpoint(bc), inner: readCommand(bc), } } export function writeCommandWrapper(bc: bare.ByteCursor, x: CommandWrapper): void { - bare.writeI64(bc, x.index) + writeActorCheckpoint(bc, x.checkpoint) writeCommand(bc, x.inner) } @@ -1026,8 +1037,6 @@ export function writeMessageId(bc: bare.ByteCursor, x: MessageId): void { bare.writeFixedData(bc, x) } -export type DeprecatedTunnelAck = null - function read9(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() @@ -1281,7 +1290,6 @@ export function writeToServerWebSocketClose(bc: bare.ByteCursor, x: ToServerWebS * To Server */ export type ToServerTunnelMessageKind = - | { readonly tag: "DeprecatedTunnelAck"; readonly val: DeprecatedTunnelAck } /** * HTTP */ @@ -1301,20 +1309,18 @@ export function readToServerTunnelMessageKind(bc: bare.ByteCursor): ToServerTunn const tag = bare.readU8(bc) switch (tag) { case 0: - return { tag: "DeprecatedTunnelAck", val: null } - case 1: return { tag: "ToServerResponseStart", val: readToServerResponseStart(bc) } - case 2: + case 1: return { tag: "ToServerResponseChunk", val: readToServerResponseChunk(bc) } - case 3: + case 2: return { tag: "ToServerResponseAbort", val: null } - case 4: + case 3: return { tag: "ToServerWebSocketOpen", val: readToServerWebSocketOpen(bc) } - case 5: + case 4: return { tag: "ToServerWebSocketMessage", val: readToServerWebSocketMessage(bc) } - case 6: + case 5: return { tag: "ToServerWebSocketMessageAck", val: 
readToServerWebSocketMessageAck(bc) } - case 7: + case 6: return { tag: "ToServerWebSocketClose", val: readToServerWebSocketClose(bc) } default: { bc.offset = offset @@ -1325,41 +1331,37 @@ export function readToServerTunnelMessageKind(bc: bare.ByteCursor): ToServerTunn export function writeToServerTunnelMessageKind(bc: bare.ByteCursor, x: ToServerTunnelMessageKind): void { switch (x.tag) { - case "DeprecatedTunnelAck": { - bare.writeU8(bc, 0) - break - } case "ToServerResponseStart": { - bare.writeU8(bc, 1) + bare.writeU8(bc, 0) writeToServerResponseStart(bc, x.val) break } case "ToServerResponseChunk": { - bare.writeU8(bc, 2) + bare.writeU8(bc, 1) writeToServerResponseChunk(bc, x.val) break } case "ToServerResponseAbort": { - bare.writeU8(bc, 3) + bare.writeU8(bc, 2) break } case "ToServerWebSocketOpen": { - bare.writeU8(bc, 4) + bare.writeU8(bc, 3) writeToServerWebSocketOpen(bc, x.val) break } case "ToServerWebSocketMessage": { - bare.writeU8(bc, 5) + bare.writeU8(bc, 4) writeToServerWebSocketMessage(bc, x.val) break } case "ToServerWebSocketMessageAck": { - bare.writeU8(bc, 6) + bare.writeU8(bc, 5) writeToServerWebSocketMessageAck(bc, x.val) break } case "ToServerWebSocketClose": { - bare.writeU8(bc, 7) + bare.writeU8(bc, 6) writeToServerWebSocketClose(bc, x.val) break } @@ -1387,7 +1389,6 @@ export function writeToServerTunnelMessage(bc: bare.ByteCursor, x: ToServerTunne * To Client */ export type ToClientTunnelMessageKind = - | { readonly tag: "DeprecatedTunnelAck"; readonly val: DeprecatedTunnelAck } /** * HTTP */ @@ -1406,18 +1407,16 @@ export function readToClientTunnelMessageKind(bc: bare.ByteCursor): ToClientTunn const tag = bare.readU8(bc) switch (tag) { case 0: - return { tag: "DeprecatedTunnelAck", val: null } - case 1: return { tag: "ToClientRequestStart", val: readToClientRequestStart(bc) } - case 2: + case 1: return { tag: "ToClientRequestChunk", val: readToClientRequestChunk(bc) } - case 3: + case 2: return { tag: "ToClientRequestAbort", val: null 
} - case 4: + case 3: return { tag: "ToClientWebSocketOpen", val: readToClientWebSocketOpen(bc) } - case 5: + case 4: return { tag: "ToClientWebSocketMessage", val: readToClientWebSocketMessage(bc) } - case 6: + case 5: return { tag: "ToClientWebSocketClose", val: readToClientWebSocketClose(bc) } default: { bc.offset = offset @@ -1428,36 +1427,32 @@ export function readToClientTunnelMessageKind(bc: bare.ByteCursor): ToClientTunn export function writeToClientTunnelMessageKind(bc: bare.ByteCursor, x: ToClientTunnelMessageKind): void { switch (x.tag) { - case "DeprecatedTunnelAck": { - bare.writeU8(bc, 0) - break - } case "ToClientRequestStart": { - bare.writeU8(bc, 1) + bare.writeU8(bc, 0) writeToClientRequestStart(bc, x.val) break } case "ToClientRequestChunk": { - bare.writeU8(bc, 2) + bare.writeU8(bc, 1) writeToClientRequestChunk(bc, x.val) break } case "ToClientRequestAbort": { - bare.writeU8(bc, 3) + bare.writeU8(bc, 2) break } case "ToClientWebSocketOpen": { - bare.writeU8(bc, 4) + bare.writeU8(bc, 3) writeToClientWebSocketOpen(bc, x.val) break } case "ToClientWebSocketMessage": { - bare.writeU8(bc, 5) + bare.writeU8(bc, 4) writeToClientWebSocketMessage(bc, x.val) break } case "ToClientWebSocketClose": { - bare.writeU8(bc, 6) + bare.writeU8(bc, 5) writeToClientWebSocketClose(bc, x.val) break } @@ -1481,7 +1476,26 @@ export function writeToClientTunnelMessage(bc: bare.ByteCursor, x: ToClientTunne writeToClientTunnelMessageKind(bc, x.messageKind) } -function read11(bc: bare.ByteCursor): ReadonlyMap { +function read11(bc: bare.ByteCursor): readonly ActorCheckpoint[] { + const len = bare.readUintSafe(bc) + if (len === 0) { + return [] + } + const result = [readActorCheckpoint(bc)] + for (let i = 1; i < len; i++) { + result[i] = readActorCheckpoint(bc) + } + return result +} + +function write11(bc: bare.ByteCursor, x: readonly ActorCheckpoint[]): void { + bare.writeUintSafe(bc, x.length) + for (let i = 0; i < x.length; i++) { + writeActorCheckpoint(bc, x[i]) + } +} 
+ +function read12(bc: bare.ByteCursor): ReadonlyMap { const len = bare.readUintSafe(bc) const result = new Map() for (let i = 0; i < len; i++) { @@ -1496,7 +1510,7 @@ function read11(bc: bare.ByteCursor): ReadonlyMap { return result } -function write11(bc: bare.ByteCursor, x: ReadonlyMap): void { +function write12(bc: bare.ByteCursor, x: ReadonlyMap): void { bare.writeUintSafe(bc, x.size) for (const kv of x) { bare.writeString(bc, kv[0]) @@ -1504,22 +1518,22 @@ function write11(bc: bare.ByteCursor, x: ReadonlyMap): void { } } -function read12(bc: bare.ByteCursor): ReadonlyMap | null { - return bare.readBool(bc) ? read11(bc) : null +function read13(bc: bare.ByteCursor): ReadonlyMap | null { + return bare.readBool(bc) ? read12(bc) : null } -function write12(bc: bare.ByteCursor, x: ReadonlyMap | null): void { +function write13(bc: bare.ByteCursor, x: ReadonlyMap | null): void { bare.writeBool(bc, x != null) if (x != null) { - write11(bc, x) + write12(bc, x) } } -function read13(bc: bare.ByteCursor): Json | null { +function read14(bc: bare.ByteCursor): Json | null { return bare.readBool(bc) ? 
readJson(bc) : null } -function write13(bc: bare.ByteCursor, x: Json | null): void { +function write14(bc: bare.ByteCursor, x: Json | null): void { bare.writeBool(bc, x != null) if (x != null) { writeJson(bc, x) @@ -1533,7 +1547,7 @@ export type ToServerInit = { readonly name: string readonly version: u32 readonly totalSlots: u32 - readonly lastCommandIdx: i64 | null + readonly lastCommandCheckpoints: readonly ActorCheckpoint[] readonly prepopulateActorNames: ReadonlyMap | null readonly metadata: Json | null } @@ -1543,9 +1557,9 @@ export function readToServerInit(bc: bare.ByteCursor): ToServerInit { name: bare.readString(bc), version: bare.readU32(bc), totalSlots: bare.readU32(bc), - lastCommandIdx: read7(bc), - prepopulateActorNames: read12(bc), - metadata: read13(bc), + lastCommandCheckpoints: read11(bc), + prepopulateActorNames: read13(bc), + metadata: read14(bc), } } @@ -1553,9 +1567,9 @@ export function writeToServerInit(bc: bare.ByteCursor, x: ToServerInit): void { bare.writeString(bc, x.name) bare.writeU32(bc, x.version) bare.writeU32(bc, x.totalSlots) - write7(bc, x.lastCommandIdx) - write12(bc, x.prepopulateActorNames) - write13(bc, x.metadata) + write11(bc, x.lastCommandCheckpoints) + write13(bc, x.prepopulateActorNames) + write14(bc, x.metadata) } export type ToServerEvents = readonly EventWrapper[] @@ -1740,21 +1754,21 @@ export function writeProtocolMetadata(bc: bare.ByteCursor, x: ProtocolMetadata): export type ToClientInit = { readonly runnerId: Id - readonly lastEventIdx: i64 + readonly lastEventCheckpoints: readonly ActorCheckpoint[] readonly metadata: ProtocolMetadata } export function readToClientInit(bc: bare.ByteCursor): ToClientInit { return { runnerId: readId(bc), - lastEventIdx: bare.readI64(bc), + lastEventCheckpoints: read11(bc), metadata: readProtocolMetadata(bc), } } export function writeToClientInit(bc: bare.ByteCursor, x: ToClientInit): void { writeId(bc, x.runnerId) - bare.writeI64(bc, x.lastEventIdx) + write11(bc, 
x.lastEventCheckpoints) writeProtocolMetadata(bc, x.metadata) } @@ -1780,17 +1794,17 @@ export function writeToClientCommands(bc: bare.ByteCursor, x: ToClientCommands): } export type ToClientAckEvents = { - readonly lastEventIdx: i64 + readonly lastEventCheckpoints: readonly ActorCheckpoint[] } export function readToClientAckEvents(bc: bare.ByteCursor): ToClientAckEvents { return { - lastEventIdx: bare.readI64(bc), + lastEventCheckpoints: read11(bc), } } export function writeToClientAckEvents(bc: bare.ByteCursor, x: ToClientAckEvents): void { - bare.writeI64(bc, x.lastEventIdx) + write11(bc, x.lastEventCheckpoints) } export type ToClientKvResponse = { @@ -1810,11 +1824,8 @@ export function writeToClientKvResponse(bc: bare.ByteCursor, x: ToClientKvRespon writeKvResponseData(bc, x.data) } -export type ToClientClose = null - export type ToClient = | { readonly tag: "ToClientInit"; readonly val: ToClientInit } - | { readonly tag: "ToClientClose"; readonly val: ToClientClose } | { readonly tag: "ToClientCommands"; readonly val: ToClientCommands } | { readonly tag: "ToClientAckEvents"; readonly val: ToClientAckEvents } | { readonly tag: "ToClientKvResponse"; readonly val: ToClientKvResponse } @@ -1827,14 +1838,12 @@ export function readToClient(bc: bare.ByteCursor): ToClient { case 0: return { tag: "ToClientInit", val: readToClientInit(bc) } case 1: - return { tag: "ToClientClose", val: null } - case 2: return { tag: "ToClientCommands", val: readToClientCommands(bc) } - case 3: + case 2: return { tag: "ToClientAckEvents", val: readToClientAckEvents(bc) } - case 4: + case 3: return { tag: "ToClientKvResponse", val: readToClientKvResponse(bc) } - case 5: + case 4: return { tag: "ToClientTunnelMessage", val: readToClientTunnelMessage(bc) } default: { bc.offset = offset @@ -1850,27 +1859,23 @@ export function writeToClient(bc: bare.ByteCursor, x: ToClient): void { writeToClientInit(bc, x.val) break } - case "ToClientClose": { - bare.writeU8(bc, 1) - break - } case 
"ToClientCommands": { - bare.writeU8(bc, 2) + bare.writeU8(bc, 1) writeToClientCommands(bc, x.val) break } case "ToClientAckEvents": { - bare.writeU8(bc, 3) + bare.writeU8(bc, 2) writeToClientAckEvents(bc, x.val) break } case "ToClientKvResponse": { - bare.writeU8(bc, 4) + bare.writeU8(bc, 3) writeToClientKvResponse(bc, x.val) break } case "ToClientTunnelMessage": { - bare.writeU8(bc, 5) + bare.writeU8(bc, 4) writeToClientTunnelMessage(bc, x.val) break } @@ -1919,17 +1924,17 @@ export function writeToRunnerPing(bc: bare.ByteCursor, x: ToRunnerPing): void { bare.writeI64(bc, x.ts) } +export type ToRunnerClose = null + /** * We have to re-declare the entire union since BARE will not generate the * ser/de for ToClient if it's not a top-level type */ export type ToRunner = | { readonly tag: "ToRunnerPing"; readonly val: ToRunnerPing } - | { readonly tag: "ToClientInit"; readonly val: ToClientInit } - | { readonly tag: "ToClientClose"; readonly val: ToClientClose } + | { readonly tag: "ToRunnerClose"; readonly val: ToRunnerClose } | { readonly tag: "ToClientCommands"; readonly val: ToClientCommands } | { readonly tag: "ToClientAckEvents"; readonly val: ToClientAckEvents } - | { readonly tag: "ToClientKvResponse"; readonly val: ToClientKvResponse } | { readonly tag: "ToClientTunnelMessage"; readonly val: ToClientTunnelMessage } export function readToRunner(bc: bare.ByteCursor): ToRunner { @@ -1939,16 +1944,12 @@ export function readToRunner(bc: bare.ByteCursor): ToRunner { case 0: return { tag: "ToRunnerPing", val: readToRunnerPing(bc) } case 1: - return { tag: "ToClientInit", val: readToClientInit(bc) } + return { tag: "ToRunnerClose", val: null } case 2: - return { tag: "ToClientClose", val: null } - case 3: return { tag: "ToClientCommands", val: readToClientCommands(bc) } - case 4: + case 3: return { tag: "ToClientAckEvents", val: readToClientAckEvents(bc) } - case 5: - return { tag: "ToClientKvResponse", val: readToClientKvResponse(bc) } - case 6: + case 4: return { 
tag: "ToClientTunnelMessage", val: readToClientTunnelMessage(bc) } default: { bc.offset = offset @@ -1964,32 +1965,22 @@ export function writeToRunner(bc: bare.ByteCursor, x: ToRunner): void { writeToRunnerPing(bc, x.val) break } - case "ToClientInit": { + case "ToRunnerClose": { bare.writeU8(bc, 1) - writeToClientInit(bc, x.val) - break - } - case "ToClientClose": { - bare.writeU8(bc, 2) break } case "ToClientCommands": { - bare.writeU8(bc, 3) + bare.writeU8(bc, 2) writeToClientCommands(bc, x.val) break } case "ToClientAckEvents": { - bare.writeU8(bc, 4) + bare.writeU8(bc, 3) writeToClientAckEvents(bc, x.val) break } - case "ToClientKvResponse": { - bare.writeU8(bc, 5) - writeToClientKvResponse(bc, x.val) - break - } case "ToClientTunnelMessage": { - bare.writeU8(bc, 6) + bare.writeU8(bc, 4) writeToClientTunnelMessage(bc, x.val) break }