From fe2c38ec19133b6613c404076039aaca74280634 Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Fri, 2 Aug 2024 03:29:54 +0000 Subject: [PATCH] fix(workflows): fix invalid event history graph (#996) ## Changes --- lib/chirp-workflow/core/src/ctx/workflow.rs | 13 +++- lib/chirp-workflow/core/src/util.rs | 86 +++++++++++---------- 2 files changed, 56 insertions(+), 43 deletions(-) diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 8778b871b0..5d9b4cc0e2 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -196,8 +196,6 @@ impl WorkflowCtx { } } Err(err) => { - tracing::warn!(name=%self.name, id=%self.workflow_id, ?err, "workflow error"); - // Retry the workflow if its recoverable let deadline = if err.is_recoverable() { Some(rivet_util::timestamp::now() + RETRY_TIMEOUT.as_millis() as i64) @@ -209,10 +207,16 @@ impl WorkflowCtx { // be retried when a signal is published let wake_signals = err.signals(); - // This sub workflow come from a `wait_for_workflow` call on a workflow that did not + // This sub workflow comes from a `wait_for_workflow` call on a workflow that did not // finish. This workflow will be retried when the sub workflow completes let wake_sub_workflow = err.sub_workflow(); + if deadline.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() { + tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping"); + } else { + tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error"); + } + let err_str = err.to_string(); let mut retries = 0; @@ -392,6 +396,7 @@ impl WorkflowCtx { tracing::debug!( name=%self.name, id=%self.workflow_id, + sub_workflow_name=%sub_workflow.name, sub_workflow_id=%sub_workflow.sub_workflow_id, "replaying workflow dispatch" ); @@ -620,7 +625,7 @@ impl WorkflowCtx { // Activity succeeded if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? { tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity"); - + output } // Activity failed, retry diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs index cacb91551c..ee8ccbe88c 100644 --- a/lib/chirp-workflow/core/src/util.rs +++ b/lib/chirp-workflow/core/src/util.rs @@ -54,8 +54,8 @@ pub async fn sleep_until_ts(ts: i64) { } } -/// Takes activity, signal, and sub workflow events (each with their own location) and combines them via enum -/// into a hashmap of the following structure: +/// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the +/// following structure: /// /// Given the location [1, 2, 3], 3 is the index and [1, 2] is the root location /// @@ -63,6 +63,7 @@ pub async fn sleep_until_ts(ts: i64) { /// [1, 2]: [ /// example signal event, /// example activity event, +/// example sub workflow event, /// example activity event (this is [1, 2, 3]) /// ], /// } @@ -74,6 +75,7 @@ pub fn combine_events( msg_send_events: Vec, sub_workflow_events: Vec, ) -> WorkflowResult> { + // Map workflow rows by workflow id let mut workflows_by_id = workflow_rows .into_iter() .map(|row| { @@ -89,8 +91,6 @@ pub fn combine_events( .expect("unreachable, workflow for event not found"); let (root_location, idx) = split_location(&event.location); - insert_placeholder(events_by_location, &event.location, idx); - events_by_location .entry(root_location) .or_default() @@ -103,8 +103,6 @@ pub fn combine_events( .expect("unreachable, workflow for event not found"); let (root_location, idx) = split_location(&event.location); - insert_placeholder(events_by_location, &event.location, idx); - events_by_location .entry(root_location) .or_default() @@ -117,8 +115,6 @@ pub fn combine_events( .expect("unreachable, workflow for event not found"); let (root_location, idx) = split_location(&event.location); - insert_placeholder(events_by_location, &event.location, idx); - events_by_location .entry(root_location) .or_default() @@ -131,8 +127,6 @@ pub fn combine_events( .expect("unreachable, workflow for event not found"); let (root_location, idx) = split_location(&event.location); - insert_placeholder(events_by_location, &event.location, idx); - events_by_location .entry(root_location) .or_default() @@ -145,8 +139,6 @@ pub fn combine_events( .expect("unreachable, workflow for event not found"); let (root_location, idx) = split_location(&event.location); - insert_placeholder(events_by_location, &event.location, idx); - events_by_location .entry(root_location) .or_default() @@ -156,11 +148,10 @@ pub fn combine_events( let workflows = workflows_by_id .into_values() .map(|(row, mut events_by_location)| { - // TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting into a - // hashmap. Could be improved by iterating over both lists simultaneously and sorting each item at a - // time before inserting - // Sort all of the events because we are inserting from two different lists. Both lists are already - // sorted themselves so this should be fairly cheap + // TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting + // into a hashmap + // Sort all of the events because we are inserting from two different lists. Both lists are + // already sorted themselves so this should be fairly cheap for (_, list) in events_by_location.iter_mut() { list.sort_by_key(|(idx, _)| *idx); } @@ -168,7 +159,24 @@ pub fn combine_events( // Remove idx from lists let event_history = events_by_location .into_iter() - .map(|(k, v)| (k, v.into_iter().map(|(_, v)| v).collect())) + .map(|(k, events)| { + let mut expected_idx = 0; + + // Check for missing indexes and insert a `Branch` placeholder event for each missing spot + let events = events + .into_iter() + .flat_map(|(idx, v)| { + let offset = (idx - expected_idx) as usize; + expected_idx = idx + 1; + + std::iter::repeat_with(|| Event::Branch) + .take(offset) + .chain(std::iter::once(v)) + }) + .collect(); + + (k, events) + }) .collect(); PulledWorkflow { @@ -197,27 +205,27 @@ fn split_location(location: &[i64]) -> (Location, i64) { ) } -// Insert placeholder record into parent location list (ex. for [4, 0] insert into the [] list at -// the 4th index) -fn insert_placeholder( - events_by_location: &mut HashMap>, - location: &[i64], - idx: i64, -) { - if idx == 0 && location.len() > 1 { - let parent_location = location - .iter() - .take(location.len().saturating_sub(2)) - .map(|x| *x as usize) - .collect::(); - let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap(); - - events_by_location - .entry(parent_location) - .or_default() - .push((parent_idx, Event::Branch)); - } -} +// // Insert placeholder record into parent location list (ex. for the location [4, 0], insert placeholder into +// // the [] list at the 4th index) +// fn insert_placeholder( +// events_by_location: &mut HashMap>, +// location: &[i64], +// idx: i64, +// ) { +// if idx == 0 && location.len() > 1 { +// let parent_location = location +// .iter() +// .take(location.len().saturating_sub(2)) +// .map(|x| *x as usize) +// .collect::(); +// let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap(); + +// events_by_location +// .entry(parent_location) +// .or_default() +// .push((parent_idx, Event::Branch)); +// } +// } pub fn inject_fault() -> GlobalResult<()> { if rand::thread_rng().gen_range(0..100) < FAULT_RATE {