Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,15 @@ impl WorkflowCtx {
}

async fn run_workflow_inner(&mut self) -> WorkflowResult<()> {
tracing::info!(id=%self.workflow_id, "running workflow");
tracing::info!(name=%self.name, id=%self.workflow_id, "running workflow");

// Lookup workflow
let workflow = self.registry.get_workflow(&self.name)?;

// Run workflow
match (workflow.run)(self).await {
Ok(output) => {
tracing::info!(id=%self.workflow_id, "workflow success");
tracing::info!(name=%self.name, id=%self.workflow_id, "workflow success");

let mut retries = 0;
let mut interval = tokio::time::interval(DB_ACTION_RETRY);
Expand All @@ -185,7 +185,7 @@ impl WorkflowCtx {
}
}
Err(err) => {
tracing::warn!(id=%self.workflow_id, ?err, "workflow error");
tracing::warn!(name=%self.name, id=%self.workflow_id, ?err, "workflow error");

// Retry the workflow if its recoverable
let deadline = if err.is_recoverable() {
Expand Down Expand Up @@ -335,9 +335,10 @@ impl WorkflowCtx {
};

tracing::info!(
workflow_name=%self.name,
workflow_id=%self.workflow_id,
signal_id=%signal.signal_id,
name=%signal.signal_name,
signal_name=%signal.signal_name,
"signal received",
);

Expand Down Expand Up @@ -366,6 +367,7 @@ impl WorkflowCtx {
}

tracing::debug!(
name=%self.name,
id=%self.workflow_id,
sub_workflow_id=%sub_workflow.sub_workflow_id,
"replaying workflow dispatch"
Expand Down Expand Up @@ -457,8 +459,9 @@ impl WorkflowCtx {
// Lookup workflow
let Ok(workflow) = self.registry.get_workflow(I::Workflow::NAME) else {
tracing::warn!(
name=%self.name,
id=%self.workflow_id,
name=%I::Workflow::NAME,
sub_workflow_name=%I::Workflow::NAME,
"sub workflow not found in current registry",
);

Expand All @@ -474,7 +477,7 @@ impl WorkflowCtx {
return Ok(output);
};

tracing::info!(id=%self.workflow_id, name=%I::Workflow::NAME, "running sub workflow");
tracing::info!(name=%self.name, id=%self.workflow_id, sub_workflow_name=%I::Workflow::NAME, "running sub workflow");

// Create a new branched workflow context for the sub workflow
let mut ctx = WorkflowCtx {
Expand Down Expand Up @@ -629,13 +632,13 @@ impl WorkflowCtx {
return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
};

tracing::debug!(id=%self.workflow_id, name=%signal.name, "replaying signal");
tracing::debug!(name=%self.name, id=%self.workflow_id, signal_name=%signal.name, "replaying signal");

T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?
}
// Listen for new messages
else {
tracing::debug!(id=%self.workflow_id, "listening for signal");
tracing::debug!(name=%self.name, id=%self.workflow_id, "listening for signal");

let mut retries = 0;
let mut interval = tokio::time::interval(SIGNAL_RETRY);
Expand Down Expand Up @@ -669,7 +672,7 @@ impl WorkflowCtx {

// Signal received before
let signal = if let Some(event) = event {
tracing::debug!(id=%self.workflow_id, "replaying signal");
tracing::debug!(name=%self.name, id=%self.workflow_id, "replaying signal");

// Validate history is consistent
let Event::Signal(signal) = event else {
Expand Down