Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ impl WorkflowCtx {
}
}
Err(err) => {
tracing::warn!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");

// Retry the workflow if its recoverable
let deadline = if err.is_recoverable() {
Some(rivet_util::timestamp::now() + RETRY_TIMEOUT.as_millis() as i64)
Expand All @@ -209,10 +207,16 @@ impl WorkflowCtx {
// be retried when a signal is published
let wake_signals = err.signals();

// This sub workflow come from a `wait_for_workflow` call on a workflow that did not
// This sub workflow comes from a `wait_for_workflow` call on a workflow that did not
// finish. This workflow will be retried when the sub workflow completes
let wake_sub_workflow = err.sub_workflow();

if deadline.is_some() || !wake_signals.is_empty() || wake_sub_workflow.is_some() {
tracing::info!(name=%self.name, id=%self.workflow_id, ?err, "workflow sleeping");
} else {
tracing::error!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");
}

let err_str = err.to_string();

let mut retries = 0;
Expand Down Expand Up @@ -392,6 +396,7 @@ impl WorkflowCtx {
tracing::debug!(
name=%self.name,
id=%self.workflow_id,
sub_workflow_name=%sub_workflow.name,
sub_workflow_id=%sub_workflow.sub_workflow_id,
"replaying workflow dispatch"
);
Expand Down Expand Up @@ -620,7 +625,7 @@ impl WorkflowCtx {
// Activity succeeded
if let Some(output) = activity.parse_output().map_err(GlobalError::raw)? {
tracing::debug!(id=%self.workflow_id, activity_name=%I::Activity::NAME, "replaying activity");

output
}
// Activity failed, retry
Expand Down
86 changes: 47 additions & 39 deletions lib/chirp-workflow/core/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,16 @@ pub async fn sleep_until_ts(ts: i64) {
}
}

/// Takes activity, signal, and sub workflow events (each with their own location) and combines them via enum
/// into a hashmap of the following structure:
/// Takes all workflow events (each with their own location) and combines them via enum into a hashmap of the
/// following structure:
///
/// Given the location [1, 2, 3], 3 is the index and [1, 2] is the root location
///
/// HashMap {
/// [1, 2]: [
/// example signal event,
/// example activity event,
/// example sub workflow event,
/// example activity event (this is [1, 2, 3])
/// ],
/// }
Expand All @@ -74,6 +75,7 @@ pub fn combine_events(
msg_send_events: Vec<MessageSendEventRow>,
sub_workflow_events: Vec<SubWorkflowEventRow>,
) -> WorkflowResult<Vec<PulledWorkflow>> {
// Map workflow rows by workflow id
let mut workflows_by_id = workflow_rows
.into_iter()
.map(|row| {
Expand All @@ -89,8 +91,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -103,8 +103,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -117,8 +115,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -131,8 +127,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -145,8 +139,6 @@ pub fn combine_events(
.expect("unreachable, workflow for event not found");
let (root_location, idx) = split_location(&event.location);

insert_placeholder(events_by_location, &event.location, idx);

events_by_location
.entry(root_location)
.or_default()
Expand All @@ -156,19 +148,35 @@ pub fn combine_events(
let workflows = workflows_by_id
.into_values()
.map(|(row, mut events_by_location)| {
// TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting into a
// hashmap. Could be improved by iterating over both lists simultaneously and sorting each item at a
// time before inserting
// Sort all of the events because we are inserting from two different lists. Both lists are already
// sorted themselves so this should be fairly cheap
// TODO(RVT-3754): This involves inserting, sorting, then recollecting into lists and recollecting
// into a hashmap
// Sort all of the events because we are inserting from two different lists. Both lists are
// already sorted themselves so this should be fairly cheap
for (_, list) in events_by_location.iter_mut() {
list.sort_by_key(|(idx, _)| *idx);
}

// Remove idx from lists
let event_history = events_by_location
.into_iter()
.map(|(k, v)| (k, v.into_iter().map(|(_, v)| v).collect()))
.map(|(k, events)| {
let mut expected_idx = 0;

// Check for missing indexes and insert a `Branch` placeholder event for each missing spot
let events = events
.into_iter()
.flat_map(|(idx, v)| {
let offset = (idx - expected_idx) as usize;
expected_idx = idx + 1;

std::iter::repeat_with(|| Event::Branch)
.take(offset)
.chain(std::iter::once(v))
})
.collect();

(k, events)
})
.collect();

PulledWorkflow {
Expand Down Expand Up @@ -197,27 +205,27 @@ fn split_location(location: &[i64]) -> (Location, i64) {
)
}

// Insert placeholder record into parent location list (ex. for [4, 0] insert into the [] list at
// the 4th index)
fn insert_placeholder(
events_by_location: &mut HashMap<Location, Vec<(i64, Event)>>,
location: &[i64],
idx: i64,
) {
if idx == 0 && location.len() > 1 {
let parent_location = location
.iter()
.take(location.len().saturating_sub(2))
.map(|x| *x as usize)
.collect::<Location>();
let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap();

events_by_location
.entry(parent_location)
.or_default()
.push((parent_idx, Event::Branch));
}
}
// // Insert placeholder record into parent location list (ex. for the location [4, 0], insert placeholder into
// // the [] list at the 4th index)
// fn insert_placeholder(
// events_by_location: &mut HashMap<Location, Vec<(i64, Event)>>,
// location: &[i64],
// idx: i64,
// ) {
// if idx == 0 && location.len() > 1 {
// let parent_location = location
// .iter()
// .take(location.len().saturating_sub(2))
// .map(|x| *x as usize)
// .collect::<Location>();
// let parent_idx = *location.get(location.len().saturating_sub(2)).unwrap();

// events_by_location
// .entry(parent_location)
// .or_default()
// .push((parent_idx, Event::Branch));
// }
// }

pub fn inject_fault() -> GlobalResult<()> {
if rand::thread_rng().gen_range(0..100) < FAULT_RATE {
Expand Down