Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions lib/chirp-workflow/core/src/builder/common/message.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt::Display;

use global_error::{GlobalError, GlobalResult};
use global_error::GlobalResult;
use serde::Serialize;

use crate::{builder::BuilderError, ctx::MessageCtx, message::Message};
Expand All @@ -10,7 +10,7 @@ pub struct MessageBuilder<'a, M: Message> {
body: M,
tags: serde_json::Map<String, serde_json::Value>,
wait: bool,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<'a, M: Message> MessageBuilder<'a, M> {
Expand Down Expand Up @@ -66,7 +66,7 @@ impl<'a, M: Message> MessageBuilder<'a, M> {

pub async fn send(self) -> GlobalResult<()> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

tracing::info!(msg_name=%M::NAME, tags=?self.tags, "dispatching message");
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct SignalBuilder<T: Signal + Serialize> {
body: T,
to_workflow_id: Option<Uuid>,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<T: Signal + Serialize> SignalBuilder<T> {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> {

pub async fn send(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let signal_id = Uuid::new_v4();
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/builder/common/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct WorkflowBuilder<I: WorkflowInput> {
ray_id: Uuid,
input: I,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<I: WorkflowInput> WorkflowBuilder<I>
Expand Down Expand Up @@ -66,7 +66,7 @@ where

pub async fn dispatch(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let workflow_name = I::Workflow::NAME;
Expand Down
5 changes: 5 additions & 0 deletions lib/chirp-workflow/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,9 @@ pub(crate) enum BuilderError {
NoWorkflowIdOrTags,
#[error("cannot dispatch a workflow/signal from an operation within a workflow execution. trigger it from the workflow's body")]
CannotDispatchFromOpInWorkflow,
#[error("using tags on a sub workflow ({0}) with `.output()` is not supported")]
TagsOnSubWorkflowOutputNotSupported(&'static str),

#[error("serde: {0}")]
Serde(#[from] serde_json::Error),
}
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/builder/workflow/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pub struct MessageBuilder<'a, M: Message> {
body: M,
tags: serde_json::Map<String, serde_json::Value>,
wait: bool,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<'a, M: Message> MessageBuilder<'a, M> {
Expand Down Expand Up @@ -68,7 +68,7 @@ impl<'a, M: Message> MessageBuilder<'a, M> {

pub async fn send(self) -> GlobalResult<()> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let event = self.ctx.current_history_event();
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ pub struct SignalBuilder<'a, T: Signal + Serialize> {
body: T,
to_workflow_id: Option<Uuid>,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
Expand Down Expand Up @@ -69,7 +69,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {

pub async fn send(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let event = self.ctx.current_history_event();
Expand Down
167 changes: 115 additions & 52 deletions lib/chirp-workflow/core/src/builder/workflow/sub_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use uuid::Uuid;
use crate::{
builder::BuilderError,
ctx::WorkflowCtx,
error::WorkflowError,
error::{WorkflowError, WorkflowResult},
event::Event,
workflow::{Workflow, WorkflowInput},
};

pub struct SubWorkflowBuilder<'a, I: WorkflowInput> {
ctx: &'a mut WorkflowCtx,
input: I,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
error: Option<BuilderError>,
}

impl<'a, I: WorkflowInput> SubWorkflowBuilder<'a, I>
Expand Down Expand Up @@ -63,67 +64,32 @@ where

pub async fn dispatch(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let sub_workflow_name = I::Workflow::NAME;
let sub_workflow_id = Uuid::new_v4();

let no_tags = self.tags.is_empty();
let tags = serde_json::Value::Object(self.tags);
let tags = if no_tags { None } else { Some(&tags) };

tracing::info!(
name=%self.ctx.name(),
id=%self.ctx.workflow_id(),
%sub_workflow_name,
%sub_workflow_id,
?tags,
input=?self.input,
"dispatching sub workflow"
);

// Serialize input
let input_val = serde_json::to_value(&self.input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;
let tags = if self.tags.is_empty() {
None
} else {
Some(serde_json::Value::Object(self.tags))
};

self.ctx
.db()
.dispatch_sub_workflow(
self.ctx.ray_id(),
self.ctx.workflow_id(),
self.ctx.full_location().as_ref(),
sub_workflow_id,
&sub_workflow_name,
tags,
input_val,
self.ctx.loop_location(),
)
Self::dispatch_workflow_inner(self.ctx, self.input, tags)
.await
.map_err(GlobalError::raw)?;

tracing::info!(
name=%self.ctx.name(),
id=%self.ctx.workflow_id(),
%sub_workflow_name,
?sub_workflow_id,
"sub workflow dispatched"
);

Ok(sub_workflow_id)
.map_err(GlobalError::raw)
}

pub async fn output(
self,
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output> {
if let Some(err) = self.error {
return Err(err);
return Err(err.into());
}

let no_tags = self.tags.is_empty();
let tags = serde_json::Value::Object(self.tags);
let tags = if no_tags { None } else { Some(&tags) };
if !self.tags.is_empty() {
return Err(
BuilderError::TagsOnSubWorkflowOutputNotSupported(I::Workflow::NAME).into(),
);
}

// Lookup workflow
let Ok(workflow) = self.ctx.registry().get_workflow(I::Workflow::NAME) else {
Expand All @@ -134,11 +100,19 @@ where
"sub workflow not found in current registry",
);

let tags = if self.tags.is_empty() {
None
} else {
Some(serde_json::Value::Object(self.tags))
};

// TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub
// worker in the registry, this will diverge in history because it will try to run the sub worker
// in-process during the replay
// If the workflow isn't in the current registry, dispatch the workflow instead
let sub_workflow_id = self.ctx.dispatch_workflow_inner(tags, self.input).await?;
let sub_workflow_id = Self::dispatch_workflow_inner(self.ctx, self.input, tags)
.await
.map_err(GlobalError::raw)?;
let output = self
.ctx
.wait_for_workflow::<I::Workflow>(sub_workflow_id)
Expand Down Expand Up @@ -168,4 +142,93 @@ where

Ok(output)
}

async fn dispatch_workflow_inner(
ctx: &mut WorkflowCtx,
input: I,
tags: Option<serde_json::Value>,
) -> WorkflowResult<Uuid>
where
I: WorkflowInput,
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let event = ctx.current_history_event();

// Signal received before
let id = if let Some(event) = event {
// Validate history is consistent
let Event::SubWorkflow(sub_workflow) = event else {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event} at {}, found sub workflow {}",
ctx.loc(),
I::Workflow::NAME
)));
};

if sub_workflow.name != I::Workflow::NAME {
return Err(WorkflowError::HistoryDiverged(format!(
"expected {event} at {}, found sub_workflow {}",
ctx.loc(),
I::Workflow::NAME
)));
}

tracing::debug!(
name=%ctx.name(),
id=%ctx.workflow_id(),
sub_workflow_name=%sub_workflow.name,
sub_workflow_id=%sub_workflow.sub_workflow_id,
"replaying workflow dispatch"
);

sub_workflow.sub_workflow_id
}
// Dispatch new workflow
else {
let sub_workflow_name = I::Workflow::NAME;
let sub_workflow_id = Uuid::new_v4();

tracing::info!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
%sub_workflow_id,
?tags,
?input,
"dispatching sub workflow"
);

// Serialize input
let input_val =
serde_json::to_value(input).map_err(WorkflowError::SerializeWorkflowOutput)?;

ctx.db()
.dispatch_sub_workflow(
ctx.ray_id(),
ctx.workflow_id(),
ctx.full_location().as_ref(),
sub_workflow_id,
&sub_workflow_name,
tags.as_ref(),
input_val,
ctx.loop_location(),
)
.await?;

tracing::info!(
name=%ctx.name(),
id=%ctx.workflow_id(),
%sub_workflow_name,
?sub_workflow_id,
"sub workflow dispatched"
);

sub_workflow_id
};

// Move to next event
ctx.inc_location();

Ok(id)
}
}
Loading