diff --git a/engine/packages/engine/src/util/wf/mod.rs b/engine/packages/engine/src/util/wf/mod.rs index 1216ada18e..2c9e04c4c4 100644 --- a/engine/packages/engine/src/util/wf/mod.rs +++ b/engine/packages/engine/src/util/wf/mod.rs @@ -467,6 +467,35 @@ pub async fn print_history( } } } + EventData::Signals(data) => { + // Indent + print!("{}{c} ", " ".repeat(indent)); + + for ((signal_id, name), body) in + data.signal_ids.iter().zip(&data.names).zip(&data.bodies) + { + // Indent + print!("{}{c} - ", " ".repeat(indent)); + println!("{}", event_style.apply_to(name)); + + print!("{}{c} ", " ".repeat(indent)); + println!("id {}", style(signal_id).green()); + + if !exclude_json { + // Indent + print!("{}{c} ", " ".repeat(indent)); + + println!( + "body {}", + indent_string( + &colored_json(body)?, + format!("{}{c} ", " ".repeat(indent)), + true + ) + ); + } + } + } _ => {} } } @@ -543,7 +572,7 @@ pub fn event_style(event: &Event) -> Style { EventData::Removed(_) => Style::new().red(), EventData::VersionCheck => Style::new().red(), EventData::Branch => Style::new(), - EventData::Empty => Style::new(), + EventData::Signals(_) => Style::new().cyan(), } } @@ -595,7 +624,11 @@ pub fn print_event_name(event: &Event) { } EventData::VersionCheck => print!("{}", style.apply_to("version check").bold()), EventData::Branch => print!("{}", style.apply_to("branch").bold()), - EventData::Empty => print!("{}", style.apply_to("empty").bold()), + EventData::Signals(signal) => print!( + "{} {}", + style.apply_to("signal receive").bold(), + style.apply_to(&signal.names.len()) + ), } } diff --git a/engine/packages/gasoline-macros/src/lib.rs b/engine/packages/gasoline-macros/src/lib.rs index 5f05f1af41..9e98065fa1 100644 --- a/engine/packages/gasoline-macros/src/lib.rs +++ b/engine/packages/gasoline-macros/src/lib.rs @@ -372,9 +372,13 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream { #[async_trait::async_trait] impl gas::listen::Listen for #ident { - async fn listen(ctx: &mut gas::prelude::ListenCtx) -> gas::prelude::WorkflowResult { - let row = ctx.listen_any(&[::NAME]).await?; - Self::parse(&row.signal_name, &row.body) + async fn listen(ctx: &mut gas::prelude::ListenCtx, limit: usize) -> gas::prelude::WorkflowResult> { + ctx + .listen_any(&[::NAME], limit) + .await? + .into_iter() + .map(|signal| Self::parse(&signal.signal_name, &signal.body)) + .collect() } fn parse(_name: &str, body: &serde_json::value::RawValue) -> gas::prelude::WorkflowResult { diff --git a/engine/packages/gasoline/src/ctx/listen.rs b/engine/packages/gasoline/src/ctx/listen.rs index c72b7092bb..b00c9d598f 100644 --- a/engine/packages/gasoline/src/ctx/listen.rs +++ b/engine/packages/gasoline/src/ctx/listen.rs @@ -1,5 +1,5 @@ use rivet_metrics::KeyValue; -use std::{ops::Deref, time::Instant}; +use std::ops::Deref; use crate::{ ctx::WorkflowCtx, @@ -14,7 +14,7 @@ pub struct ListenCtx<'a> { ctx: &'a WorkflowCtx, location: &'a Location, // Used by certain db drivers to know when to update internal indexes for signal wake conditions - last_try: bool, + last_attempt: bool, // HACK: Prevent `ListenCtx::listen_any` from being called more than once used: bool, } @@ -24,14 +24,14 @@ impl<'a> ListenCtx<'a> { ListenCtx { ctx, location, - last_try: false, + last_attempt: false, used: false, } } - pub(crate) fn reset(&mut self, last_try: bool) { + pub(crate) fn reset(&mut self, last_attempt: bool) { self.used = false; - self.last_try = last_try; + self.last_attempt = last_attempt; } /// Checks for a signal to this workflow with any of the given signal names. @@ -40,75 +40,63 @@ impl<'a> ListenCtx<'a> { pub async fn listen_any( &mut self, signal_names: &[&'static str], - ) -> WorkflowResult { + limit: usize, + ) -> WorkflowResult> { if self.used { return Err(WorkflowError::ListenCtxUsed); } else { self.used = true; } - let start_instant = Instant::now(); - - // Fetch new pending signal - let signal = self + // Fetch new pending signals + let signals = self .ctx .db() - .pull_next_signal( + .pull_next_signals( self.ctx.workflow_id(), self.ctx.name(), signal_names, self.location, self.ctx.version(), self.ctx.loop_location(), - self.last_try, + limit, + self.last_attempt, ) .await?; - let dt = start_instant.elapsed().as_secs_f64(); - metrics::SIGNAL_PULL_DURATION.record( - dt, - &[ - KeyValue::new("workflow_name", self.ctx.name().to_string()), - KeyValue::new( - "signal_name", - signal - .as_ref() - .map(|signal| signal.signal_name.clone()) - .unwrap_or("".into()), - ), - ], - ); - - let Some(signal) = signal else { + if signals.is_empty() { return Err(WorkflowError::NoSignalFound(Box::from(signal_names))); - }; + } - let recv_lag = (rivet_util::timestamp::now() as f64 - signal.create_ts as f64) / 1000.; - crate::metrics::SIGNAL_RECV_LAG.record( - recv_lag, - &[ - KeyValue::new("workflow_name", self.ctx.name().to_string()), - KeyValue::new("signal_name", signal.signal_name.clone()), - ], - ); + let now = rivet_util::timestamp::now(); + for signal in &signals { + let recv_lag = (now as f64 - signal.create_ts as f64) / 1000.0; + metrics::SIGNAL_RECV_LAG.record( + recv_lag, + &[ + KeyValue::new("workflow_name", self.ctx.name().to_string()), + KeyValue::new("signal_name", signal.signal_name.clone()), + ], + ); + + if recv_lag > 3.0 { + // We print an error here so the trace of this workflow does not get dropped + tracing::error!( + ?recv_lag, + signal_id=%signal.signal_id, + signal_name=%signal.signal_name, + "long signal recv time", + ); + } - if recv_lag > 3.0 { - // We print an error here so the trace of this workflow does not get dropped - tracing::error!( - ?recv_lag, + tracing::debug!( signal_id=%signal.signal_id, signal_name=%signal.signal_name, - "long signal recv time", + "signal received", ); } - tracing::debug!( - signal_id=%signal.signal_id, - signal_name=%signal.signal_name, - "signal received", - ); - - Ok(signal) + Ok(signals) } } diff --git a/engine/packages/gasoline/src/ctx/versioned_workflow.rs b/engine/packages/gasoline/src/ctx/versioned_workflow.rs index 440093fc04..104871e4e5 100644 --- a/engine/packages/gasoline/src/ctx/versioned_workflow.rs +++ b/engine/packages/gasoline/src/ctx/versioned_workflow.rs @@ -9,7 +9,7 @@ use crate::{ builder::{WorkflowRepr, workflow as builder}, ctx::{WorkflowCtx, workflow::Loop}, executable::{AsyncResult, Executable}, - listen::{CustomListener, Listen}, + listen::Listen, message::Message, signal::Signal, utils::time::{DurationToMillis, TsToMillis}, @@ -125,17 +125,6 @@ impl<'a> VersionedWorkflowCtx<'a> { }) } - /// Execute a custom listener. - #[tracing::instrument(skip_all, fields(t=std::any::type_name::()))] - pub async fn custom_listener( - &mut self, - listener: &T, - ) -> Result<::Output> { - wrap!(self, "listen", { - self.inner.custom_listener(listener).in_current_span().await - }) - } - /// Creates a message builder. pub fn msg(&mut self, body: M) -> builder::message::MessageBuilder<'_, M> { builder::message::MessageBuilder::new(self.inner, self.version(), body) diff --git a/engine/packages/gasoline/src/ctx/workflow.rs b/engine/packages/gasoline/src/ctx/workflow.rs index 5c2c6be6c8..d2c567659e 100644 --- a/engine/packages/gasoline/src/ctx/workflow.rs +++ b/engine/packages/gasoline/src/ctx/workflow.rs @@ -4,7 +4,7 @@ use std::{ time::{Duration, Instant}, }; -use anyhow::Result; +use anyhow::{Context, Result}; use futures_util::StreamExt; use opentelemetry::trace::SpanContext; use rivet_util::Id; @@ -29,7 +29,7 @@ use crate::{ location::{Coordinate, Location}, removed::Removed, }, - listen::{CustomListener, Listen}, + listen::Listen, message::Message, metrics, registry::RegistryHandle, @@ -700,91 +700,40 @@ impl WorkflowCtx { /// received, the workflow will be woken up and continue. #[tracing::instrument(skip_all, fields(t=std::any::type_name::()))] pub async fn listen(&mut self) -> Result { - self.check_stop()?; - - let history_res = self.cursor.compare_signal(self.version)?; - let location = self.cursor.current_location_for(&history_res); - - // Signal received before - let signal = if let HistoryResult::Event(signal) = history_res { - tracing::debug!( - signal_name=%signal.name, - "replaying signal" - ); - - T::parse(&signal.name, &signal.body)? - } - // Listen for new signal - else { - tracing::debug!("listening for signal"); + let signals = self.listen_n::(1).in_current_span().await?; - let mut bump_sub = self - .db - .bump_sub(BumpSubSubject::SignalPublish { - to_workflow_id: self.workflow_id, - }) - .await?; - let mut retries = self.db.max_signal_poll_retries(); - let mut interval = tokio::time::interval(self.db.signal_poll_interval()); - interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - - // Skip first tick, we wait after the db call instead of before - interval.tick().await; - - let mut ctx = ListenCtx::new(self, &location); - - loop { - ctx.reset(retries == 0); - - match T::listen(&mut ctx).in_current_span().await { - Ok(res) => break res, - Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => { - if retries == 0 { - return Err(err.into()); - } - retries -= 1; - } - Err(err) => return Err(err.into()), - } - - // Poll and wait for a wake at the same time - tokio::select! { - _ = bump_sub.next() => {}, - _ = interval.tick() => {}, - res = self.wait_stop() => res?, - } - } - }; - - // Move to next event - self.cursor.update(&location); - - Ok(signal) + signals + .into_iter() + .next() + .context("must return at least 1 signal") } - /// Execute a custom listener. + /// Listens for a N signals for a short time before setting the workflow to sleep. Once signals are + /// received, the workflow will be woken up and continue. Never returns an empty vec. #[tracing::instrument(skip_all, fields(t=std::any::type_name::()))] - pub async fn custom_listener( - &mut self, - listener: &T, - ) -> Result<::Output> { + pub async fn listen_n(&mut self, limit: usize) -> Result> { self.check_stop()?; - let history_res = self.cursor.compare_signal(self.version)?; + let history_res = self.cursor.compare_signals(self.version)?; let location = self.cursor.current_location_for(&history_res); - // Signal received before - let signal = if let HistoryResult::Event(signal) = history_res { + // Signals received before + let signals = if let HistoryResult::Event(signals) = history_res { tracing::debug!( - signal_name=%signal.name, - "replaying signal", + count=%signals.names.len(), + "replaying signals" ); - T::parse(&signal.name, &signal.body)? + signals + .names + .iter() + .zip(&signals.bodies) + .map(|(name, body)| T::parse(name, &body)) + .collect::, _>>()? } - // Listen for new signal + // Listen for new signals else { - tracing::debug!("listening for signal"); + tracing::debug!("listening for signals"); let mut bump_sub = self .db @@ -804,7 +753,7 @@ impl WorkflowCtx { loop { ctx.reset(retries == 0); - match listener.listen(&mut ctx).in_current_span().await { + match T::listen(&mut ctx, limit).in_current_span().await { Ok(res) => break res, Err(err) if matches!(err, WorkflowError::NoSignalFound(_)) => { if retries == 0 { @@ -827,7 +776,7 @@ impl WorkflowCtx { // Move to next event self.cursor.update(&location); - Ok(signal) + Ok(signals) } /// Creates a message builder. @@ -1135,19 +1084,47 @@ impl WorkflowCtx { &mut self, duration: impl DurationToMillis, ) -> Result> { + let signals = self.listen_n_with_timeout(duration, 1).await?; + + Ok(signals.into_iter().next()) + } + + /// Listens for signals with a timeout. Returns an empty vec if the timeout is reached. + /// + /// Internally this is a sleep event and a signals event. + #[tracing::instrument(skip_all, fields(t=std::any::type_name::()))] + pub async fn listen_n_with_timeout( + &mut self, + duration: impl DurationToMillis, + limit: usize, + ) -> Result> { let time = (rivet_util::timestamp::now() as u64 + duration.to_millis()?) as i64; - self.listen_until(time).await + self.listen_n_until(time, limit).await + } + + /// Listens for a signal until the given timestamp. Returns `None` if the timestamp is reached. + /// + /// Internally this is a sleep event and a signals event. + #[tracing::instrument(skip_all, fields(t=std::any::type_name::(), duration))] + pub async fn listen_until(&mut self, time: impl TsToMillis) -> Result> { + let signals = self.listen_n_until(time, 1).await?; + + Ok(signals.into_iter().next()) } // TODO: Potential bad transaction: if the signal gets pulled and saved in history but an error occurs // before the sleep event state is set to "interrupted", the next time this workflow is run it will error // because it tries to pull a signal again - /// Listens for a signal until the given timestamp. Returns `None` if the timestamp is reached. + /// Listens for signals until the given timestamp. Returns an empty vec if the timestamp is reached. /// /// Internally this is a sleep event and a signal event. #[tracing::instrument(skip_all, fields(t=std::any::type_name::(), duration))] - pub async fn listen_until(&mut self, time: impl TsToMillis) -> Result> { + pub async fn listen_n_until( + &mut self, + time: impl TsToMillis, + limit: usize, + ) -> Result> { self.check_stop()?; let history_res = self.cursor.compare_sleep(self.version)?; @@ -1180,51 +1157,56 @@ impl WorkflowCtx { // Move to next event self.cursor.update(&sleep_location); - // Signal received before + // Signals received before if matches!(state, SleepState::Interrupted) { - let history_res = self.cursor.compare_signal(self.version)?; - let signal_location = self.cursor.current_location_for(&history_res); + let history_res = self.cursor.compare_signals(self.version)?; + let signals_location = self.cursor.current_location_for(&history_res); - if let HistoryResult::Event(signal) = history_res { + if let HistoryResult::Event(signals) = history_res { tracing::debug!( - signal_name=%signal.name, - "replaying signal", + count=?signals.names.len(), + "replaying signals", ); - let signal = T::parse(&signal.name, &signal.body)?; + let signals = signals + .names + .iter() + .zip(&signals.bodies) + .map(|(name, body)| T::parse(name, &body)) + .collect::, _>>()?; // Move to next event - self.cursor.update(&signal_location); + self.cursor.update(&signals_location); // Short circuit - return Ok(Some(signal)); + return Ok(signals); } else { return Err(WorkflowError::HistoryDiverged(format!( - "expected signal at {}, found nothing", - signal_location, + "expected signals at {}, found nothing", + signals_location, )) .into()); } } - // Location of the signal event (comes after the sleep event) - let signal_location = self.cursor.current_location_for(&history_res2); + // Location of the signals event (comes after the sleep event) + let signals_location = self.cursor.current_location_for(&history_res2); let duration = deadline_ts.saturating_sub(rivet_util::timestamp::now()); tracing::Span::current().record("duration", &duration); // Duration is now 0, timeout is over - let signal = if duration <= 0 { - // After timeout is over, check once for signal + let signals = if duration <= 0 { + // After timeout is over, check once for signals if matches!(state, SleepState::Normal) { - let mut ctx = ListenCtx::new(self, &signal_location); + let mut ctx = ListenCtx::new(self, &signals_location); - match T::listen(&mut ctx).in_current_span().await { - Ok(x) => Some(x), - Err(WorkflowError::NoSignalFound(_)) => None, + match T::listen(&mut ctx, limit).in_current_span().await { + Ok(x) => x, + Err(WorkflowError::NoSignalFound(_)) => Vec::new(), Err(err) => return Err(err.into()), } } else { - None + Vec::new() } } // Sleep in memory if duration is shorter than the worker tick @@ -1234,7 +1216,7 @@ impl WorkflowCtx { let res = tokio::time::timeout( Duration::from_millis(duration.try_into()?), (async { - tracing::debug!("listening for signal with timeout"); + tracing::debug!("listening for signals with timeout"); let mut bump_sub = self .db @@ -1248,12 +1230,12 @@ impl WorkflowCtx { // Skip first tick, we wait after the db call instead of before interval.tick().await; - let mut ctx = ListenCtx::new(self, &signal_location); + let mut ctx = ListenCtx::new(self, &signals_location); loop { ctx.reset(false); - match T::listen(&mut ctx).in_current_span().await { + match T::listen(&mut ctx, limit).in_current_span().await { // Retry Err(WorkflowError::NoSignalFound(_)) => {} x => return x, @@ -1272,17 +1254,17 @@ impl WorkflowCtx { .await; match res { - Ok(res) => Some(res?), + Ok(res) => res?, Err(_) => { - tracing::debug!("timed out listening for signal"); + tracing::debug!("timed out listening for signals"); - None + Vec::new() } } } // Workflow sleep for long durations else { - tracing::debug!("listening for signal with timeout"); + tracing::debug!("listening for signals with timeout"); let mut bump_sub = self .db @@ -1297,13 +1279,13 @@ impl WorkflowCtx { // Skip first tick, we wait after the db call instead of before interval.tick().await; - let mut ctx = ListenCtx::new(self, &signal_location); + let mut ctx = ListenCtx::new(self, &signals_location); loop { ctx.reset(retries == 0); - match T::listen(&mut ctx).in_current_span().await { - Ok(res) => break Some(res), + match T::listen(&mut ctx, limit).in_current_span().await { + Ok(res) => break res, Err(WorkflowError::NoSignalFound(signals)) => { if retries == 0 { return Err( @@ -1325,7 +1307,7 @@ impl WorkflowCtx { }; // Update sleep state - if signal.is_some() { + if !signals.is_empty() { self.db .update_workflow_sleep_event_state( self.workflow_id, @@ -1335,7 +1317,7 @@ impl WorkflowCtx { .await?; // Move to next event - self.cursor.update(&signal_location); + self.cursor.update(&signals_location); } else if matches!(state, SleepState::Normal) { self.db .update_workflow_sleep_event_state( @@ -1346,7 +1328,7 @@ impl WorkflowCtx { .await?; } - Ok(signal) + Ok(signals) } /// Represents a removed workflow step. diff --git a/engine/packages/gasoline/src/db/debug.rs b/engine/packages/gasoline/src/db/debug.rs index d6be9951ca..0dba696a81 100644 --- a/engine/packages/gasoline/src/db/debug.rs +++ b/engine/packages/gasoline/src/db/debug.rs @@ -91,10 +91,7 @@ pub enum EventData { Removed(RemovedEvent), VersionCheck, Branch, - - /// NOTE: Strictly used as a placeholder for backfilling. When using this, the coordinate of the `Event` - /// must still be valid. - Empty, + Signals(SignalsEvent), } impl std::fmt::Display for EventData { @@ -120,7 +117,13 @@ impl std::fmt::Display for EventData { } EventData::VersionCheck => write!(f, "version check"), EventData::Branch => write!(f, "branch"), - EventData::Empty => write!(f, "empty"), + EventData::Signals(signals) => { + let mut unique_names = signals.names.clone(); + unique_names.sort(); + unique_names.dedup(); + + write!(f, "signals {:?}", unique_names.join(", ")) + } } } } @@ -172,6 +175,13 @@ pub struct LoopEvent { pub iteration: usize, } +#[derive(Debug)] +pub struct SignalsEvent { + pub signal_ids: Vec, + pub names: Vec, + pub bodies: Vec, +} + #[derive(Debug, Clone)] pub struct ActivityError { pub error: String, diff --git a/engine/packages/gasoline/src/db/kv/debug.rs b/engine/packages/gasoline/src/db/kv/debug.rs index 624220077a..21c6fbe7d3 100644 --- a/engine/packages/gasoline/src/db/kv/debug.rs +++ b/engine/packages/gasoline/src/db/kv/debug.rs @@ -22,7 +22,7 @@ use crate::{ BumpSubSubject, debug::{ ActivityError, ActivityEvent, DatabaseDebug, Event, EventData, HistoryData, LoopEvent, - MessageSendEvent, SignalData, SignalEvent, SignalSendEvent, SignalState, + MessageSendEvent, SignalData, SignalEvent, SignalSendEvent, SignalState, SignalsEvent, SubWorkflowEvent, WorkflowData, WorkflowState, }, }, @@ -895,6 +895,44 @@ impl DatabaseDebug for DatabaseKv { let inner_event_type = key.deserialize(entry.value())?; current_event.inner_event_type = Some(inner_event_type); + } else if let Ok(key) = + self.subspace + .unpack::(entry.key()) + { + ensure!( + current_event.indexed_signal_ids.len() == key.index, + "corrupt history, index doesn't exist yet or is out of order" + ); + + let signal_id = key.deserialize(entry.value())?; + current_event + .indexed_signal_ids + .insert(key.index, signal_id); + } else if let Ok(key) = self + .subspace + .unpack::(entry.key()) + { + ensure!( + current_event.indexed_names.len() == key.index, + "corrupt history, index doesn't exist yet or is out of order" + ); + + let name = key.deserialize(entry.value())?; + current_event.indexed_names.insert(key.index, name); + } else if let Ok(key) = + self.subspace + .unpack::(entry.key()) + { + ensure!( + current_event.indexed_input_chunks.len() == key.index, + "corrupt history, index doesn't exist yet or is out of order" + ); + + if let Some(input_chunks) = + current_event.indexed_input_chunks.get_mut(key.index) + { + input_chunks.push(entry); + } } // We ignore keys we don't need (like tags) @@ -1190,6 +1228,10 @@ struct WorkflowHistoryEventBuilder { deadline_ts: Option, sleep_state: Option, inner_event_type: Option, + + indexed_signal_ids: Vec, + indexed_names: Vec, + indexed_input_chunks: Vec>, } impl WorkflowHistoryEventBuilder { @@ -1211,6 +1253,10 @@ impl WorkflowHistoryEventBuilder { deadline_ts: None, sleep_state: None, inner_event_type: None, + + indexed_signal_ids: Vec::new(), + indexed_names: Vec::new(), + indexed_input_chunks: Vec::new(), } } } @@ -1243,6 +1289,7 @@ impl TryFrom for Event { EventType::Branch => EventData::Branch, EventType::Removed => EventData::Removed(value.try_into()?), EventType::VersionCheck => EventData::VersionCheck, + EventType::Signals => EventData::Signals(value.try_into()?), }, }) } @@ -1491,3 +1538,42 @@ impl TryFrom for RemovedEvent { }) } } + +impl TryFrom for SignalsEvent { + type Error = WorkflowError; + + fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult { + Ok(SignalsEvent { + signal_ids: if value.indexed_signal_ids.is_empty() { + return Err(WorkflowError::MissingEventData("signal_id")); + } else { + value.indexed_signal_ids + }, + names: if value.indexed_names.is_empty() { + return Err(WorkflowError::MissingEventData("name")); + } else { + value.indexed_names + }, + bodies: if value.indexed_input_chunks.is_empty() { + return Err(WorkflowError::MissingEventData("input")); + } else { + value + .indexed_input_chunks + .into_iter() + .map(|input_chunks| { + // workflow_id not needed + let input_key = + keys::history::InputKey::new(Id::nil(), value.location.clone()); + let v = input_key + .combine(input_chunks) + .map_err(WorkflowError::DeserializeEventData)?; + + serde_json::from_str::(v.get()) + .map_err(Into::into) + .map_err(WorkflowError::DeserializeEventData) + }) + .collect::>()? + }, + }) + } +} diff --git a/engine/packages/gasoline/src/db/kv/keys/history.rs b/engine/packages/gasoline/src/db/kv/keys/history.rs index bf50b7d6e2..7650ad4d63 100644 --- a/engine/packages/gasoline/src/db/kv/keys/history.rs +++ b/engine/packages/gasoline/src/db/kv/keys/history.rs @@ -1167,6 +1167,272 @@ impl<'de> TupleUnpack<'de> for TagKey { } } +#[derive(Debug)] +pub struct IndexedSignalIdKey { + workflow_id: Id, + location: Location, + forgotten: bool, + pub index: usize, +} + +impl IndexedSignalIdKey { + pub fn new(workflow_id: Id, location: Location, index: usize) -> Self { + IndexedSignalIdKey { + workflow_id, + location, + forgotten: false, + index, + } + } +} + +impl FormalKey for IndexedSignalIdKey { + type Value = Id; + + fn deserialize(&self, raw: &[u8]) -> Result { + Ok(Id::from_slice(raw)?) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.as_bytes().to_vec()) + } +} + +impl TuplePack for IndexedSignalIdKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + pack_indexed_history_key( + self.workflow_id, + &self.location, + w, + tuple_depth, + self.forgotten, + SIGNAL, + self.index, + SIGNAL_ID, + ) + } +} + +impl<'de> TupleUnpack<'de> for IndexedSignalIdKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (workflow_id, location, forgotten, index)) = unpack_indexed_history_key( + input, + tuple_depth, + SIGNAL, + "SIGNAL", + SIGNAL_ID, + "SIGNAL_ID", + )?; + + Ok(( + input, + IndexedSignalIdKey { + workflow_id, + location, + forgotten, + index, + }, + )) + } +} + +#[derive(Debug)] +pub struct IndexedNameKey { + workflow_id: Id, + location: Location, + forgotten: bool, + pub index: usize, +} + +impl IndexedNameKey { + pub fn new(workflow_id: Id, location: Location, index: usize) -> Self { + IndexedNameKey { + workflow_id, + location, + forgotten: false, + index, + } + } +} + +impl FormalKey for IndexedNameKey { + type Value = String; + + fn deserialize(&self, raw: &[u8]) -> Result { + String::from_utf8(raw.to_vec()).map_err(Into::into) + } + + fn serialize(&self, value: Self::Value) -> Result> { + Ok(value.into_bytes()) + } +} + +impl TuplePack for IndexedNameKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + pack_indexed_history_key( + self.workflow_id, + &self.location, + w, + tuple_depth, + self.forgotten, + SIGNAL, + self.index, + NAME, + ) + } +} + +impl<'de> TupleUnpack<'de> for IndexedNameKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (workflow_id, location, forgotten, index)) = + unpack_indexed_history_key(input, tuple_depth, SIGNAL, "SIGNAL", NAME, "NAME")?; + + Ok(( + input, + IndexedNameKey { + workflow_id, + location, + forgotten, + index, + }, + )) + } +} + +#[derive(Debug)] +pub struct IndexedInputKey { + workflow_id: Id, + location: Location, + forgotten: bool, + pub index: usize, +} + +impl IndexedInputKey { + pub fn new(workflow_id: Id, location: Location, index: usize) -> Self { + IndexedInputKey { + workflow_id, + location, + forgotten: false, + index, + } + } + + pub fn split_ref(&self, value: &serde_json::value::RawValue) -> Result>> { + Ok(value + .get() + .as_bytes() + .chunks(universaldb::utils::CHUNK_SIZE) + .map(|x| x.to_vec()) + .collect()) + } +} + +impl FormalChunkedKey for IndexedInputKey { + type ChunkKey = IndexedInputChunkKey; + type Value = Box; + + fn chunk(&self, chunk: usize) -> Self::ChunkKey { + IndexedInputChunkKey { + workflow_id: self.workflow_id, + location: self.location.clone(), + forgotten: self.forgotten, + index: self.index, + chunk, + } + } + + fn combine(&self, chunks: Vec) -> Result { + serde_json::value::RawValue::from_string(String::from_utf8( + chunks + .iter() + .map(|x| x.value().iter().map(|x| *x)) + .flatten() + .collect(), + )?) + .context("failed to combine `InputKey`") + } + + fn split(&self, value: Self::Value) -> Result>> { + self.split_ref(value.as_ref()) + } +} + +impl TuplePack for IndexedInputKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + pack_indexed_history_key( + self.workflow_id, + &self.location, + w, + tuple_depth, + self.forgotten, + SIGNAL, + self.index, + INPUT, + ) + } +} + +pub struct IndexedInputChunkKey { + workflow_id: Id, + location: Location, + forgotten: bool, + pub index: usize, + chunk: usize, +} + +impl TuplePack for IndexedInputChunkKey { + fn pack( + &self, + w: &mut W, + tuple_depth: TupleDepth, + ) -> std::io::Result { + pack_indexed_history_key( + self.workflow_id, + &self.location, + w, + tuple_depth, + self.forgotten, + SIGNAL, + self.index, + INPUT, + )?; + + self.chunk.pack(w, tuple_depth) + } +} + +impl<'de> TupleUnpack<'de> for IndexedInputChunkKey { + fn unpack(input: &[u8], tuple_depth: TupleDepth) -> PackResult<(&[u8], Self)> { + let (input, (workflow_id, location, forgotten, index)) = + unpack_indexed_history_key(input, tuple_depth, SIGNAL, "SIGNAL", INPUT, "INPUT")?; + + let (input, chunk) = ::unpack(input, tuple_depth)?; + + Ok(( + input, + IndexedInputChunkKey { + workflow_id, + location, + forgotten, + index, + chunk, + }, + )) + } +} + fn pack_history_key( workflow_id: Id, location: &Location, @@ -1237,6 +1503,88 @@ fn unpack_history_key<'de>( )) } +fn pack_indexed_history_key( + workflow_id: Id, + location: &Location, + w: &mut W, + tuple_depth: TupleDepth, + forgotten: bool, + variant: usize, + index: usize, + variant2: usize, +) -> std::io::Result { + let mut offset = VersionstampOffset::None { size: 0 }; + + let t = ( + WORKFLOW, + DATA, + workflow_id, + HISTORY, + if forgotten { FORGOTTEN } else { ACTIVE }, + ); + offset += t.pack(w, tuple_depth)?; + + for coord in &**location { + offset += coord.pack(w, tuple_depth)?; + } + + let t = (DATA, variant, index, variant2); + offset += t.pack(w, tuple_depth)?; + + Ok(offset) +} + +fn unpack_indexed_history_key<'de>( + input: &'de [u8], + tuple_depth: TupleDepth, + variant: usize, + variant_str: &str, + variant2: usize, + variant2_str: &str, +) -> PackResult<(&'de [u8], (Id, Location, bool, usize))> { + let (mut input, (_, _, workflow_id, data, history_variant)) = + <(usize, usize, Id, usize, usize)>::unpack(input, tuple_depth)?; + if data != HISTORY { + return Err(PackError::Message("expected HISTORY data".into())); + } + + let mut coords = Vec::new(); + + loop { + let Ok((input2, coord)) = Coordinate::unpack(input, tuple_depth) else { + break; + }; + + coords.push(coord); + input = input2; + } + + let (input, (_, data, index, data2)) = + <(usize, usize, usize, usize)>::unpack(input, tuple_depth)?; + + if data != variant { + return Err(PackError::Message( + format!("expected {variant_str} data").into(), + )); + } + + if data2 != variant2 { + return Err(PackError::Message( + format!("expected {variant2_str} data").into(), + )); + } + + Ok(( + input, + ( + workflow_id, + Location::from_iter(coords), + history_variant == FORGOTTEN, + index, + ), + )) +} + pub mod insert { use anyhow::Result; use rivet_util::Id; @@ -1281,16 +1629,13 @@ pub mod insert { Ok(()) } - pub fn signal_event( + pub fn signals_event( subspace: &universaldb::tuple::Subspace, tx: &universaldb::RetryableTransaction, workflow_id: Id, location: &Location, version: usize, create_ts: i64, - signal_id: Id, - signal_name: &str, - body: &serde_json::value::RawValue, ) -> Result<()> { common( subspace, @@ -1300,21 +1645,32 @@ pub mod insert { EventType::Signal, version, create_ts, - )?; + ) + } - let signal_id_key = super::SignalIdKey::new(workflow_id, location.clone()); + pub fn signals_event_signal( + subspace: &universaldb::tuple::Subspace, + tx: &universaldb::RetryableTransaction, + workflow_id: Id, + location: &Location, + index: usize, + signal_id: Id, + signal_name: &str, + body: &serde_json::value::RawValue, + ) -> Result<()> { + let signal_id_key = super::IndexedSignalIdKey::new(workflow_id, location.clone(), index); tx.set( &subspace.pack(&signal_id_key), &signal_id_key.serialize(signal_id)?, ); - let signal_name_key = super::NameKey::new(workflow_id, location.clone()); + let signal_name_key = super::IndexedNameKey::new(workflow_id, location.clone(), index); tx.set( &subspace.pack(&signal_name_key), &signal_name_key.serialize(signal_name.to_string())?, ); - let signal_body_key = super::InputKey::new(workflow_id, location.clone()); + let signal_body_key = super::IndexedInputKey::new(workflow_id, location.clone(), index); // Write signal body for (i, chunk) in signal_body_key.split_ref(&body)?.into_iter().enumerate() { diff --git a/engine/packages/gasoline/src/db/kv/mod.rs b/engine/packages/gasoline/src/db/kv/mod.rs index 474e271100..6d3777c3be 100644 --- a/engine/packages/gasoline/src/db/kv/mod.rs +++ b/engine/packages/gasoline/src/db/kv/mod.rs @@ -8,7 +8,7 @@ use std::{ time::Instant, }; -use anyhow::{Context, Result}; +use anyhow::{Context, Result, ensure}; use futures_util::{StreamExt, TryStreamExt, stream::BoxStream}; use rivet_util::Id; use rivet_util::future::CustomInstrumentExt; @@ -31,7 +31,7 @@ use crate::{ history::{ event::{ ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, RemovedEvent, - SignalEvent, SignalSendEvent, SleepEvent, SleepState, SubWorkflowEvent, + SignalSendEvent, SignalsEvent, SleepEvent, SleepState, SubWorkflowEvent, }, location::Location, }, @@ -1477,6 +1477,35 @@ impl Database for DatabaseKv { current_event.inner_event_type = Some(inner_event_type); + } else if let Ok(key) = self + .subspace + .unpack::( + entry.key(), + ) { + ensure!( + current_event.indexed_names.len() == key.index, + "corrupt history, index doesn't exist yet or is out of order" + ); + + let name = key.deserialize(entry.value())?; + current_event.indexed_names.insert(key.index, name); + } else if let Ok(key) = + self.subspace + .unpack::( + entry.key(), + ) { + ensure!( + current_event.indexed_input_chunks.len() + == key.index, + "corrupt history, index doesn't exist yet or is out of order" + ); + + if let Some(input_chunks) = current_event + .indexed_input_chunks + .get_mut(key.index) + { + input_chunks.push(entry); + } } // We ignore keys we don't need (like tags) @@ -1883,7 +1912,7 @@ impl Database for DatabaseKv { } #[tracing::instrument(skip_all)] - async fn pull_next_signal( + async fn pull_next_signals( &self, workflow_id: Id, _workflow_name: &str, @@ -1891,176 +1920,178 @@ impl Database for DatabaseKv { location: &Location, version: usize, _loop_location: Option<&Location>, - last_try: bool, - ) -> WorkflowResult> { + limit: usize, + last_attempt: bool, + ) -> WorkflowResult> { let owned_filter = filter .into_iter() .map(|x| x.to_string()) .collect::>(); - // Fetch signal from UDB - let signal = - self.pools - .udb() - .map_err(WorkflowError::PoolsGeneric)? - .run(|tx| { - let owned_filter = owned_filter.clone(); - - async move { - let signal = { - // Create a stream for each signal name subspace - let streams = owned_filter - .iter() - .map(|signal_name| { - let pending_signal_subspace = self.subspace.subspace( - &keys::workflow::PendingSignalKey::subspace( - workflow_id, - signal_name.to_string(), - ), - ); + // Fetch signals from UDB + let signals = self + .pools + .udb() + .map_err(WorkflowError::PoolsGeneric)? + .run(|tx| { + let owned_filter = owned_filter.clone(); - tx.get_ranges_keyvalues( - universaldb::RangeOption { - mode: StreamingMode::WantAll, - limit: Some(1), - ..(&pending_signal_subspace).into() - }, - // NOTE: This is Serializable because any insert into this subspace - // should cause a conflict and retry of this txn - Serializable, - ) - }) - .collect::>(); - - // Fetch the next entry from all streams at the same time - let mut results = futures_util::future::try_join_all( - streams.into_iter().map(|mut stream| async move { - if let Some(entry) = stream.try_next().await? { - Result::<_>::Ok(Some(( - entry.key().to_vec(), - self.subspace - .unpack::( - &entry.key(), - )?, - ))) - } else { - Ok(None) - } - }), - ) - .instrument(tracing::trace_span!("map_signals")) - .await?; - - // Sort by ts - results.sort_by_key(|res| res.as_ref().map(|(_, key)| key.ts)); - - results.into_iter().flatten().next().map( - |(raw_key, pending_signal_key)| { - ( - raw_key, - pending_signal_key.signal_name, - pending_signal_key.ts, - pending_signal_key.signal_id, - ) + async move { + // Fetch signals from all streams at the same time + let signals = futures_util::stream::iter(owned_filter.clone()) + .map(|signal_name| { + let pending_signal_subspace = self.subspace.subspace( + &keys::workflow::PendingSignalKey::subspace( + workflow_id, + signal_name.to_string(), + ), + ); + + tx.get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::Exact, + limit: Some(limit), + ..(&pending_signal_subspace).into() }, + // NOTE: This is Serializable because any insert into this subspace + // should cause a conflict and retry of this txn + Serializable, ) - }; + }) + .flatten() + .map(|res| { + let entry = res?; - // Signal found - if let Some((raw_key, signal_name, ts, signal_id)) = signal { - let ack_ts_key = keys::signal::AckTsKey::new(signal_id); + anyhow::Ok( + self.subspace + .unpack::(&entry.key())?, + ) + }) + .try_collect::>() + .instrument(tracing::trace_span!("map_signals")) + .await?; - // Ack signal - tx.add_conflict_range( - &raw_key, - &end_of_key_range(&raw_key), - ConflictRangeType::Read, - )?; - tx.set( - &self.subspace.pack(&ack_ts_key), - &ack_ts_key.serialize(rivet_util::timestamp::now())?, - ); + if !signals.is_empty() { + let now = rivet_util::timestamp::now(); - update_metric( - &tx.with_subspace(self.subspace.clone()), - Some(keys::metric::GaugeMetric::SignalPending( - signal_name.to_string(), - )), - None, - ); + // Insert history event + keys::history::insert::signals_event( + &self.subspace, + &tx, + workflow_id, + &location, + version, + now, + )?; - // TODO: Split txn into two after acking here? + let mut signals = + futures_util::stream::iter(signals.into_iter().enumerate()) + .map(|(index, key)| { + let tx = tx.clone(); + async move { + let ack_ts_key = keys::signal::AckTsKey::new(key.signal_id); + let packed_key = tx.pack(&key); + + // Ack signal + tx.add_conflict_range( + &packed_key, + &end_of_key_range(&packed_key), + ConflictRangeType::Read, + )?; + tx.set( + &self.subspace.pack(&ack_ts_key), + &ack_ts_key.serialize(rivet_util::timestamp::now())?, + ); - // Clear pending signal key - tx.clear(&raw_key); + update_metric( + &tx.with_subspace(self.subspace.clone()), + Some(keys::metric::GaugeMetric::SignalPending( + key.signal_name.to_string(), + )), + None, + ); - // Read signal body - let body_key = keys::signal::BodyKey::new(signal_id); - let body_subspace = self.subspace.subspace(&body_key); + // TODO: Split txn into two after acking here? - let chunks = tx - .get_ranges_keyvalues( - universaldb::RangeOption { - mode: StreamingMode::WantAll, - ..(&body_subspace).into() - }, - Serializable, - ) + // Clear pending signal key + tx.clear(&packed_key); + + // Read signal body + let body_key = keys::signal::BodyKey::new(key.signal_id); + let body_subspace = self.subspace.subspace(&body_key); + + let chunks = tx + .get_ranges_keyvalues( + universaldb::RangeOption { + mode: StreamingMode::WantAll, + ..(&body_subspace).into() + }, + Serializable, + ) + .try_collect::>() + .await?; + + let body = body_key.combine(chunks)?; + + keys::history::insert::signals_event_signal( + &self.subspace, + &tx, + workflow_id, + &location, + index, + key.signal_id, + &key.signal_name, + &body, + )?; + + anyhow::Ok(SignalData { + signal_id: key.signal_id, + signal_name: key.signal_name, + create_ts: key.ts, + body, + }) + } + }) + .buffer_unordered(1024) .try_collect::>() .await?; - let body = body_key.combine(chunks)?; + // Sort by ts + signals.sort_by_key(|key| key.create_ts); - // Insert history event - keys::history::insert::signal_event( - &self.subspace, - &tx, + // Apply limit + Ok(signals.into_iter().take(limit).collect()) + } + // No signals found + else { + // Write signal wake index if no signal was received. Normally this is done in + // `commit_workflow` but without this code there would be a race condition if the + // signal is published between after this transaction and before `commit_workflow`. + // There is a possibility of `commit_workflow` NOT writing a signal secondary index + // after this in which case there might be an unnecessary wake condition inserted + // causing the workflow to wake up again, but this is not as big of an issue because + // workflow wakes should be idempotent if no events happen. + // It is important that this is only written on the last try to pull workflows + // (the workflow engine internally retries a few times) because it should only + // write signal wake indexes before going to sleep (with err `NoSignalFound`) and + // not during a retry. + if last_attempt { + self.write_signal_wake_idxs( workflow_id, - &location, - version, - rivet_util::timestamp::now(), - signal_id, - &signal_name, - &body, + &owned_filter.iter().map(|x| x.as_str()).collect::>(), + &tx, )?; - - Ok(Some(SignalData { - signal_id, - signal_name, - create_ts: ts, - body, - })) } - // No signal found - else { - // Write signal wake index if no signal was received. Normally this is done in - // `commit_workflow` but without this code there would be a race condition if the - // signal is published between after this transaction and before `commit_workflow`. - // There is a possibility of `commit_workflow` NOT writing a signal secondary index - // after this in which case there might be an unnecessary wake condition inserted - // causing the workflow to wake up again, but this is not as big of an issue because - // workflow wakes should be idempotent if no events happen. - // It is important that this is only written on the last try to pull workflows - // (the workflow engine internally retries a few times) because it should only - // write signal wake indexes before going to sleep (with err `NoSignalFound`) and - // not during a retry. - if last_try { - self.write_signal_wake_idxs( - workflow_id, - &owned_filter.iter().map(|x| x.as_str()).collect::>(), - &tx, - )?; - } - Ok(None) - } + Ok(Vec::new()) } - }) - .custom_instrument(tracing::info_span!("pull_next_signal_tx")) - .await - .map_err(WorkflowError::Udb)?; + } + }) + .custom_instrument(tracing::info_span!("pull_next_signals_tx")) + .await + .map_err(WorkflowError::Udb)?; - Ok(signal) + Ok(signals) } #[tracing::instrument(skip_all)] @@ -2818,6 +2849,9 @@ struct WorkflowHistoryEventBuilder { deadline_ts: Option, sleep_state: Option, inner_event_type: Option, + + indexed_names: Vec, + indexed_input_chunks: Vec>, } impl WorkflowHistoryEventBuilder { @@ -2837,6 +2871,9 @@ impl WorkflowHistoryEventBuilder { deadline_ts: None, sleep_state: None, inner_event_type: None, + + indexed_names: Vec::new(), + indexed_input_chunks: Vec::new(), } } } @@ -2860,7 +2897,21 @@ impl TryFrom for Event { .ok_or(WorkflowError::MissingEventData("version"))?, data: match event_type { EventType::Activity => EventData::Activity(value.try_into()?), - EventType::Signal => EventData::Signal(value.try_into()?), + // Deprecated, manually convert to newer type + EventType::Signal => { + EventData::Signals(SignalsEvent { + names: vec![value.name.ok_or(WorkflowError::MissingEventData("name"))?], + bodies: vec![if value.input_chunks.is_empty() { + return Err(WorkflowError::MissingEventData("input")); + } else { + // workflow_id not needed + let input_key = keys::history::InputKey::new(Id::nil(), value.location); + input_key + .combine(value.input_chunks) + .map_err(WorkflowError::DeserializeEventData)? + }], + }) + } EventType::SignalSend => EventData::SignalSend(value.try_into()?), EventType::MessageSend => EventData::MessageSend(value.try_into()?), EventType::SubWorkflow => EventData::SubWorkflow(value.try_into()?), @@ -2869,6 +2920,7 @@ impl TryFrom for Event { EventType::Branch => EventData::Branch, EventType::Removed => EventData::Removed(value.try_into()?), EventType::VersionCheck => EventData::VersionCheck, + EventType::Signals => EventData::Signals(value.try_into()?), }, }) } @@ -2901,27 +2953,6 @@ impl TryFrom for ActivityEvent { } } -impl TryFrom for SignalEvent { - type Error = WorkflowError; - - fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult { - Ok(SignalEvent { - name: value.name.ok_or(WorkflowError::MissingEventData("name"))?, - body: { - if value.input_chunks.is_empty() { - return Err(WorkflowError::MissingEventData("input")); - } else { - // workflow_id not needed - let input_key = keys::history::InputKey::new(Id::nil(), value.location); - input_key - .combine(value.input_chunks) - .map_err(WorkflowError::DeserializeEventData)? - } - }, - }) - } -} - impl TryFrom for SignalSendEvent { type Error = WorkflowError; @@ -3024,6 +3055,36 @@ impl TryFrom for RemovedEvent { } } +impl TryFrom for SignalsEvent { + type Error = WorkflowError; + + fn try_from(value: WorkflowHistoryEventBuilder) -> WorkflowResult { + Ok(SignalsEvent { + names: if value.indexed_names.is_empty() { + return Err(WorkflowError::MissingEventData("name")); + } else { + value.indexed_names + }, + bodies: if value.indexed_input_chunks.is_empty() { + return Err(WorkflowError::MissingEventData("input")); + } else { + value + .indexed_input_chunks + .into_iter() + .map(|input_chunks| { + // workflow_id not needed + let input_key = + keys::history::InputKey::new(Id::nil(), value.location.clone()); + input_key + .combine(input_chunks) + .map_err(WorkflowError::DeserializeEventData) + }) + .collect::>()? + }, + }) + } +} + fn value_to_str(v: &serde_json::Value) -> WorkflowResult { match v { serde_json::Value::String(s) => Ok(s.clone()), diff --git a/engine/packages/gasoline/src/db/mod.rs b/engine/packages/gasoline/src/db/mod.rs index a4e94020ca..69f28490d1 100644 --- a/engine/packages/gasoline/src/db/mod.rs +++ b/engine/packages/gasoline/src/db/mod.rs @@ -129,8 +129,8 @@ pub trait Database: Send { error: &str, ) -> WorkflowResult<()>; - /// Pulls the oldest signal with the given filter. - async fn pull_next_signal( + /// Pulls signals in order from oldest to newest with the given filter. + async fn pull_next_signals( &self, workflow_id: Id, workflow_name: &str, @@ -138,8 +138,9 @@ pub trait Database: Send { location: &Location, version: usize, loop_location: Option<&Location>, - last_try: bool, - ) -> WorkflowResult>; + limit: usize, + last_attempt: bool, + ) -> WorkflowResult>; /// Retrieves a workflow with the given ID. Can only be called from a workflow context. async fn get_sub_workflow( diff --git a/engine/packages/gasoline/src/history/cursor.rs b/engine/packages/gasoline/src/history/cursor.rs index 4c8a1b5c6f..a098c351be 100644 --- a/engine/packages/gasoline/src/history/cursor.rs +++ b/engine/packages/gasoline/src/history/cursor.rs @@ -3,8 +3,8 @@ use std::cmp::Ordering; use super::{ History, event::{ - ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, SignalEvent, - SignalSendEvent, SleepEvent, SubWorkflowEvent, + ActivityEvent, Event, EventData, EventType, LoopEvent, MessageSendEvent, SignalSendEvent, + SignalsEvent, SleepEvent, SubWorkflowEvent, }, location::{Coordinate, Location}, removed::Removed, @@ -119,18 +119,7 @@ impl Cursor { pub fn current_event(&self) -> Option<&Event> { if let Some(branch) = self.events.get(&self.root_location) { - let event = branch.get(self.iter_idx); - - // Empty events are considered `None` - if let Some(Event { - data: EventData::Empty, - .. - }) = &event - { - None - } else { - event - } + branch.get(self.iter_idx) } else { None } @@ -183,320 +172,282 @@ impl Cursor { Ok(()) } - /// Returns `Some` if the current event is being replayed. pub fn compare_activity( &self, version: usize, name: &str, ) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found activity {:?} v{}", - event.data, - event.version, - self.current_location(), - name, - version, - ))); - } + if version > event.version { + return Ok(HistoryResult::Insertion); + } - // Validate history is consistent - let EventData::Activity(activity) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found activity {:?}", - event.data, - self.current_location(), - name, - ))); - }; + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found activity {:?} v{}", + event.data, + event.version, + self.current_location(), + name, + version, + ))); + } - if &activity.name != name { - return Err(WorkflowError::HistoryDiverged(format!( - "expected activity {:?} at {}, found activity {:?}", - activity.name, - self.current_location(), - name, - ))); - } + // Validate history is consistent + let EventData::Activity(activity) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found activity {:?}", + event.data, + self.current_location(), + name, + ))); + }; - Ok(HistoryResult::Event(activity)) - } else { - Ok(HistoryResult::New) + if &activity.name != name { + return Err(WorkflowError::HistoryDiverged(format!( + "expected activity {:?} at {}, found activity {:?}", + activity.name, + self.current_location(), + name, + ))); } + + Ok(HistoryResult::Event(activity)) } - /// Returns `Some` if the current event is being replayed. pub fn compare_msg( &self, version: usize, msg_name: &str, ) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found message send {:?} v{}", - event.data, - event.version, - self.current_location(), - msg_name, - version, - ))); - } + if version > event.version { + return Ok(HistoryResult::Insertion); + } - // Validate history is consistent - let EventData::MessageSend(msg) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found message send {:?}", - event.data, - self.current_location(), - msg_name, - ))); - }; + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found message send {:?} v{}", + event.data, + event.version, + self.current_location(), + msg_name, + version, + ))); + } - if msg.name != msg_name { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found message send {:?}", - event.data, - self.current_location(), - msg_name, - ))); - } + // Validate history is consistent + let EventData::MessageSend(msg) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found message send {:?}", + event.data, + self.current_location(), + msg_name, + ))); + }; - Ok(HistoryResult::Event(msg)) - } else { - Ok(HistoryResult::New) + if msg.name != msg_name { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found message send {:?}", + event.data, + self.current_location(), + msg_name, + ))); } + + Ok(HistoryResult::Event(msg)) } - /// Returns `Some` if the current event is being replayed. pub fn compare_signal_send( &self, version: usize, signal_name: &str, ) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found signal send {:?} v{}", - event.data, - event.version, - self.current_location(), - signal_name, - version, - ))); - } + if version > event.version { + return Ok(HistoryResult::Insertion); + } - // Validate history is consistent - let EventData::SignalSend(signal) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found signal send {:?}", - event.data, - self.current_location(), - signal_name, - ))); - }; + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found signal send {:?} v{}", + event.data, + event.version, + self.current_location(), + signal_name, + version, + ))); + } - if signal.name != signal_name { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found signal send {:?}", - event.data, - self.current_location(), - signal_name, - ))); - } + // Validate history is consistent + let EventData::SignalSend(signal) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found signal send {:?}", + event.data, + self.current_location(), + signal_name, + ))); + }; - Ok(HistoryResult::Event(signal)) - } else { - Ok(HistoryResult::New) + if signal.name != signal_name { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found signal send {:?}", + event.data, + self.current_location(), + signal_name, + ))); } + + Ok(HistoryResult::Event(signal)) } - /// Returns `Some` if the current event is being replayed. pub fn compare_sub_workflow( &self, version: usize, sub_workflow_name: &str, ) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } - - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found sub workflow {:?} v{}", - event.data, - event.version, - self.current_location(), - sub_workflow_name, - version, - ))); - } - - // Validate history is consistent - let EventData::SubWorkflow(sub_workflow) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found sub workflow {:?}", - event.data, - self.current_location(), - sub_workflow_name, - ))); - }; - - if sub_workflow.name != sub_workflow_name { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found sub_workflow {:?}", - event.data, - self.current_location(), - sub_workflow_name, - ))); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - Ok(HistoryResult::Event(sub_workflow)) - } else { - Ok(HistoryResult::New) + if version > event.version { + return Ok(HistoryResult::Insertion); } - } - /// Returns `Some` if the current event is being replayed. - pub fn compare_signal(&self, version: usize) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } - - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found signal v{}", - event.data, - event.version, - self.current_location(), - version, - ))); - } + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found sub workflow {:?} v{}", + event.data, + event.version, + self.current_location(), + sub_workflow_name, + version, + ))); + } - // Validate history is consistent - let EventData::Signal(signal) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found signal", - event.data, - self.current_location(), - ))); - }; + // Validate history is consistent + let EventData::SubWorkflow(sub_workflow) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found sub workflow {:?}", + event.data, + self.current_location(), + sub_workflow_name, + ))); + }; - Ok(HistoryResult::Event(signal)) - } else { - Ok(HistoryResult::New) + if sub_workflow.name != sub_workflow_name { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found sub_workflow {:?}", + event.data, + self.current_location(), + sub_workflow_name, + ))); } + + Ok(HistoryResult::Event(sub_workflow)) } - /// Returns `Some` if the current event is being replayed. pub fn compare_loop(&self, version: usize) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } - - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found loop v{}", - event.data, - event.version, - self.current_location(), - version, - ))); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - // Validate history is consistent - let EventData::Loop(loop_event) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found loop", - event.data, - self.current_location(), - ))); - }; + if version > event.version { + return Ok(HistoryResult::Insertion); + } - Ok(HistoryResult::Event(loop_event)) - } else { - Ok(HistoryResult::New) + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found loop v{}", + event.data, + event.version, + self.current_location(), + version, + ))); } + + // Validate history is consistent + let EventData::Loop(loop_event) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found loop", + event.data, + self.current_location(), + ))); + }; + + Ok(HistoryResult::Event(loop_event)) } - /// Returns `Some` if the current event is being replayed. pub fn compare_sleep(&self, version: usize) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } - - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found sleep v{}", - event.data, - event.version, - self.current_location(), - version, - ))); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - // Validate history is consistent - let EventData::Sleep(sleep) = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found sleep", - event.data, - self.current_location(), - ))); - }; + if version > event.version { + return Ok(HistoryResult::Insertion); + } - Ok(HistoryResult::Event(sleep)) - } else { - Ok(HistoryResult::New) + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found sleep v{}", + event.data, + event.version, + self.current_location(), + version, + ))); } + + // Validate history is consistent + let EventData::Sleep(sleep) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found sleep", + event.data, + self.current_location(), + ))); + }; + + Ok(HistoryResult::Event(sleep)) } /// Returns `true` if the current event is being replayed. pub fn compare_branch(&self, version: usize) -> WorkflowResult> { - if let Some(event) = self.current_event() { - if version > event.version { - return Ok(HistoryResult::Insertion); - } - - if version < event.version { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} v{} at {}, found branch v{}", - event.data, - event.version, - self.current_location(), - version, - ))); - } + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; - // Validate history is consistent - let EventData::Branch = &event.data else { - return Err(WorkflowError::HistoryDiverged(format!( - "expected {} at {}, found branch", - event.data, - self.current_location(), - ))); - }; + if version > event.version { + return Ok(HistoryResult::Insertion); + } - Ok(HistoryResult::Event(())) - } else { - Ok(HistoryResult::New) + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found branch v{}", + event.data, + event.version, + self.current_location(), + version, + ))); } + + // Validate history is consistent + let EventData::Branch = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found branch", + event.data, + self.current_location(), + ))); + }; + + Ok(HistoryResult::Event(())) } /// Returns `true` if the current event is being replayed. @@ -526,84 +477,116 @@ impl Cursor { /// Returns `true` if the current event is being replayed. pub fn compare_removed(&self) -> WorkflowResult { - if let Some(event) = self.current_event() { - // Validate history is consistent - let valid = if let EventData::Removed(removed) = &event.data { - removed.event_type == T::event_type() && removed.name.as_deref() == T::name() - } else { - match T::event_type() { - EventType::Activity => { - if let EventData::Activity(activity) = &event.data { - T::name().expect("bad impl") == activity.name - } else { - false - } + let Some(event) = self.current_event() else { + return Ok(false); + }; + + // Validate history is consistent + let valid = if let EventData::Removed(removed) = &event.data { + removed.event_type == T::event_type() && removed.name.as_deref() == T::name() + } else { + match T::event_type() { + EventType::Activity => { + if let EventData::Activity(activity) = &event.data { + T::name().expect("bad impl") == activity.name + } else { + false } - EventType::SignalSend => { - if let EventData::SignalSend(signal) = &event.data { - T::name().expect("bad impl") == signal.name - } else { - false - } + } + EventType::Signal => matches!(event.data, EventData::Signals(_)), + EventType::SignalSend => { + if let EventData::SignalSend(signal) = &event.data { + T::name().expect("bad impl") == signal.name + } else { + false } - EventType::MessageSend => { - if let EventData::MessageSend(msg) = &event.data { - T::name().expect("bad impl") == msg.name - } else { - false - } + } + EventType::MessageSend => { + if let EventData::MessageSend(msg) = &event.data { + T::name().expect("bad impl") == msg.name + } else { + false } - EventType::Signal => matches!(event.data, EventData::Signal(_)), - EventType::Loop => matches!(event.data, EventData::Loop(_)), - EventType::Sleep => matches!(event.data, EventData::Sleep(_)), - EventType::SubWorkflow => { - if let EventData::SubWorkflow(sub_workflow) = &event.data { - T::name().expect("bad impl") == sub_workflow.name - } else { - false - } + } + EventType::Loop => matches!(event.data, EventData::Loop(_)), + EventType::Sleep => matches!(event.data, EventData::Sleep(_)), + EventType::SubWorkflow => { + if let EventData::SubWorkflow(sub_workflow) = &event.data { + T::name().expect("bad impl") == sub_workflow.name + } else { + false } - EventType::Branch => matches!(event.data, EventData::Branch), - _ => unreachable!("not implemented as a removable type"), } - }; - - if !valid { - let msg = if let Some(name) = T::name() { - format!( - "expected {} at {}, found removed {} {name:?}", - event.data, - self.current_location(), - T::event_type(), - ) - } else { - format!( - "expected {} at {}, found removed {}", - event.data, - self.current_location(), - T::event_type(), - ) - }; - - return Err(WorkflowError::HistoryDiverged(msg)); + EventType::Branch => matches!(event.data, EventData::Branch), + EventType::Signals => matches!(event.data, EventData::Signals(_)), + EventType::Removed => unreachable!(), + EventType::VersionCheck => unreachable!("not implemented as a removable type"), } + }; - Ok(true) - } else { - Ok(false) + if !valid { + let msg = if let Some(name) = T::name() { + format!( + "expected {} at {}, found removed {} {name:?}", + event.data, + self.current_location(), + T::event_type(), + ) + } else { + format!( + "expected {} at {}, found removed {}", + event.data, + self.current_location(), + T::event_type(), + ) + }; + + return Err(WorkflowError::HistoryDiverged(msg)); } + + Ok(true) } - /// Returns `Some` if the current event is being replayed. pub fn compare_version_check(&self) -> WorkflowResult> { - if let Some(event) = self.current_event() { - Ok(Some(( - matches!(event.data, EventData::VersionCheck), + let Some(event) = self.current_event() else { + return Ok(None); + }; + + Ok(Some(( + matches!(event.data, EventData::VersionCheck), + event.version, + ))) + } + + pub fn compare_signals(&self, version: usize) -> WorkflowResult> { + let Some(event) = self.current_event() else { + return Ok(HistoryResult::New); + }; + + if version > event.version { + return Ok(HistoryResult::Insertion); + } + + if version < event.version { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} v{} at {}, found signals v{}", + event.data, event.version, - ))) - } else { - Ok(None) + self.current_location(), + version, + ))); } + + // Validate history is consistent + let EventData::Signals(signals) = &event.data else { + return Err(WorkflowError::HistoryDiverged(format!( + "expected {} at {}, found signals", + event.data, + self.current_location(), + ))); + }; + + Ok(HistoryResult::Event(signals)) } } diff --git a/engine/packages/gasoline/src/history/event.rs b/engine/packages/gasoline/src/history/event.rs index 7d3f7300ef..09e919b54c 100644 --- a/engine/packages/gasoline/src/history/event.rs +++ b/engine/packages/gasoline/src/history/event.rs @@ -41,7 +41,6 @@ impl Deref for Event { #[derive(Debug)] pub enum EventData { Activity(ActivityEvent), - Signal(SignalEvent), SignalSend(SignalSendEvent), MessageSend(MessageSendEvent), SubWorkflow(SubWorkflowEvent), @@ -50,17 +49,13 @@ pub enum EventData { Removed(RemovedEvent), VersionCheck, Branch, - - /// NOTE: Strictly used as a placeholder for backfilling. When using this, the coordinate of the `Event` - /// must still be valid. - Empty, + Signals(SignalsEvent), } impl std::fmt::Display for EventData { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match &self { EventData::Activity(activity) => write!(f, "activity {:?}", activity.name), - EventData::Signal(signal) => write!(f, "signal {:?}", signal.name), EventData::SignalSend(signal_send) => write!(f, "signal send {:?}", signal_send.name), EventData::MessageSend(message_send) => { write!(f, "message send {:?}", message_send.name) @@ -79,7 +74,13 @@ impl std::fmt::Display for EventData { } EventData::VersionCheck => write!(f, "version check"), EventData::Branch => write!(f, "branch"), - EventData::Empty => write!(f, "empty"), + EventData::Signals(signals) => { + let mut unique_names = signals.names.clone(); + unique_names.sort(); + unique_names.dedup(); + + write!(f, "signals {:?}", unique_names.join(", ")) + } } } } @@ -87,6 +88,7 @@ impl std::fmt::Display for EventData { #[derive(Hash, Debug, Clone, Copy, PartialEq, Eq, FromRepr)] pub enum EventType { Activity = 0, + /// Deprecated. Signal = 1, SignalSend = 2, MessageSend = 3, @@ -96,6 +98,7 @@ pub enum EventType { Branch = 7, Removed = 8, VersionCheck = 9, + Signals = 10, } impl std::fmt::Display for EventType { @@ -111,6 +114,7 @@ impl std::fmt::Display for EventType { EventType::Removed => write!(f, "removed event"), EventType::VersionCheck => write!(f, "version check"), EventType::Branch => write!(f, "branch"), + EventType::Signals => write!(f, "signals"), } } } @@ -135,12 +139,6 @@ impl ActivityEvent { } } -#[derive(Debug)] -pub struct SignalEvent { - pub name: String, - pub body: Box, -} - #[derive(Debug)] pub struct SignalSendEvent { pub signal_id: Id, @@ -208,3 +206,9 @@ pub struct RemovedEvent { pub event_type: EventType, pub name: Option, } + +#[derive(Debug)] +pub struct SignalsEvent { + pub names: Vec, + pub bodies: Vec>, +} diff --git a/engine/packages/gasoline/src/listen.rs b/engine/packages/gasoline/src/listen.rs index 61000af545..7a7c6249de 100644 --- a/engine/packages/gasoline/src/listen.rs +++ b/engine/packages/gasoline/src/listen.rs @@ -3,22 +3,10 @@ use async_trait::async_trait; use crate::{ctx::ListenCtx, error::WorkflowResult}; /// A trait which allows listening for signals from the workflows database. This is used by -/// `WorkflowCtx::listen` and `WorkflowCtx::query_signal`. If you need a listener with state, use -/// `CustomListener`. +/// `WorkflowCtx::listen` and `WorkflowCtx::query_signal`. #[async_trait] pub trait Listen: Sized { /// This function may be polled by the `WorkflowCtx`. - async fn listen(ctx: &mut ListenCtx) -> WorkflowResult; + async fn listen(ctx: &mut ListenCtx, limit: usize) -> WorkflowResult>; fn parse(name: &str, body: &serde_json::value::RawValue) -> WorkflowResult; } - -/// A trait which allows listening for signals with a custom state. This is used by -/// `WorkflowCtx::custom_listener`. -#[async_trait] -pub trait CustomListener: Sized { - type Output; - - /// This function may be polled by the `WorkflowCtx`. - async fn listen(&self, ctx: &mut ListenCtx) -> WorkflowResult; - fn parse(name: &str, body: &serde_json::value::RawValue) -> WorkflowResult; -} diff --git a/engine/packages/gasoline/src/metrics.rs b/engine/packages/gasoline/src/metrics.rs index 5687c8d2a8..b4f05d8bb1 100644 --- a/engine/packages/gasoline/src/metrics.rs +++ b/engine/packages/gasoline/src/metrics.rs @@ -95,11 +95,6 @@ lazy_static::lazy_static! { .with_boundaries(BUCKETS.to_vec()) .build(); /// Expected attributes: "workflow_name", "signal_name" - pub static ref SIGNAL_PULL_DURATION: Histogram = METER.f64_histogram("rivet_gasoline_signal_pull_duration") - .with_description("Total duration to pull signals.") - .with_boundaries(BUCKETS.to_vec()) - .build(); - /// Expected attributes: "workflow_name", "signal_name" pub static ref SIGNAL_PUBLISHED: Counter = METER.u64_counter("rivet_gasoline_signal_published") .with_description("Total published signals.") .build(); diff --git a/engine/packages/gasoline/src/prelude.rs b/engine/packages/gasoline/src/prelude.rs index ffd4c77929..60cceeee82 100644 --- a/engine/packages/gasoline/src/prelude.rs +++ b/engine/packages/gasoline/src/prelude.rs @@ -17,7 +17,7 @@ pub use crate::{ error::{WorkflowError, WorkflowResult}, executable::Executable, history::removed::*, - listen::{CustomListener, Listen}, + listen::Listen, message::Message as MessageTrait, operation::Operation as OperationTrait, registry::Registry, diff --git a/engine/packages/gasoline/src/signal.rs b/engine/packages/gasoline/src/signal.rs index 215d66081a..13d7ba0be5 100644 --- a/engine/packages/gasoline/src/signal.rs +++ b/engine/packages/gasoline/src/signal.rs @@ -57,12 +57,15 @@ macro_rules! join_signal { #[async_trait::async_trait] impl Listen for $join { #[tracing::instrument(skip_all, fields(t=std::any::type_name::()))] - async fn listen(ctx: &mut gas::prelude::ListenCtx) -> gas::prelude::WorkflowResult { - let row = ctx.listen_any(&[ - $(<$just_types as gas::signal::Signal>::NAME),* - ]).await?; - - Self::parse(&row.signal_name, &row.body) + async fn listen(ctx: &mut gas::prelude::ListenCtx, limit: usize) -> gas::prelude::WorkflowResult> { + ctx + .listen_any(&[ + $(<$just_types as gas::signal::Signal>::NAME),* + ], limit) + .await? + .into_iter() + .map(|signal| Self::parse(&signal.signal_name, &signal.body)) + .collect() } fn parse(name: &str, body: &serde_json::value::RawValue) -> gas::prelude::WorkflowResult {