Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions lib/chirp-workflow/core/src/builder/common/message.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use std::fmt::Display;

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;

use crate::{builder::BuilderError, ctx::MessageCtx, message::Message};

pub struct MessageBuilder<'a, M: Message> {
msg_ctx: &'a MessageCtx,
body: M,
tags: serde_json::Map<String, serde_json::Value>,
wait: bool,
error: Option<GlobalError>,
}

impl<'a, M: Message> MessageBuilder<'a, M> {
pub(crate) fn new(msg_ctx: &'a MessageCtx, body: M) -> Self {
MessageBuilder {
msg_ctx,
body,
tags: serde_json::Map::new(),
wait: false,
error: None,
}
}

pub fn tags(mut self, tags: serde_json::Value) -> Self {
if self.error.is_some() {
return self;
}

match tags {
serde_json::Value::Object(map) => {
self.tags.extend(map);
}
_ => self.error = Some(BuilderError::TagsNotMap.into()),
}

self
}

pub fn tag(mut self, k: impl Display, v: impl Serialize) -> Self {
if self.error.is_some() {
return self;
}

match serde_json::to_value(&v) {
Ok(v) => {
self.tags.insert(k.to_string(), v);
}
Err(err) => self.error = Some(err.into()),
}

self
}

pub async fn wait(mut self) -> Self {
if self.error.is_some() {
return self;
}

self.wait = true;

self
}

pub async fn send(self) -> GlobalResult<()> {
if let Some(err) = self.error {
return Err(err);
}

tracing::info!(msg_name=%M::NAME, tags=?self.tags, "dispatching message");

let tags = serde_json::Value::Object(self.tags);

if self.wait {
self.msg_ctx.message_wait(tags, self.body).await?;
} else {
self.msg_ctx.message(tags, self.body).await?;
}

Ok(())
}
}
5 changes: 5 additions & 0 deletions lib/chirp-workflow/core/src/builder/common/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! This module contains builders used by all ctx's besides the workflow ctx.

pub mod message;
pub mod signal;
pub mod workflow;
111 changes: 111 additions & 0 deletions lib/chirp-workflow/core/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
use std::fmt::Display;

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
use uuid::Uuid;

use crate::{builder::BuilderError, db::DatabaseHandle, error::WorkflowError, signal::Signal};

pub struct SignalBuilder<T: Signal + Serialize> {
db: DatabaseHandle,
ray_id: Uuid,
body: T,
to_workflow_id: Option<Uuid>,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
}

impl<T: Signal + Serialize> SignalBuilder<T> {
pub(crate) fn new(db: DatabaseHandle, ray_id: Uuid, body: T) -> Self {
SignalBuilder {
db,
ray_id,
body,
to_workflow_id: None,
tags: serde_json::Map::new(),
error: None,
}
}

pub fn to_workflow(mut self, workflow_id: Uuid) -> Self {
if self.error.is_some() {
return self;
}

self.to_workflow_id = Some(workflow_id);

self
}

pub fn tags(mut self, tags: serde_json::Value) -> Self {
if self.error.is_some() {
return self;
}

match tags {
serde_json::Value::Object(map) => {
self.tags.extend(map);
}
_ => self.error = Some(BuilderError::TagsNotMap.into()),
}

self
}

pub fn tag(mut self, k: impl Display, v: impl Serialize) -> Self {
if self.error.is_some() {
return self;
}

match serde_json::to_value(&v) {
Ok(v) => {
self.tags.insert(k.to_string(), v);
}
Err(err) => self.error = Some(err.into()),
}

self
}

pub async fn send(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
}

let signal_id = Uuid::new_v4();

// Serialize input
let input_val = serde_json::to_value(&self.body)
.map_err(WorkflowError::SerializeSignalBody)
.map_err(GlobalError::raw)?;

match (self.to_workflow_id, self.tags.is_empty()) {
(Some(workflow_id), true) => {
tracing::info!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");

self.db
.publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, input_val)
.await
.map_err(GlobalError::raw)?;
}
(None, false) => {
tracing::info!(signal_name=%T::NAME, tags=?self.tags, %signal_id, "dispatching tagged signal");

self.db
.publish_tagged_signal(
self.ray_id,
&serde_json::Value::Object(self.tags),
signal_id,
T::NAME,
input_val,
)
.await
.map_err(GlobalError::raw)?;
}
(Some(_), false) => return Err(BuilderError::WorkflowIdAndTags.into()),
(None, true) => return Err(BuilderError::NoWorkflowIdOrTags.into()),
}

Ok(signal_id)
}
}
108 changes: 108 additions & 0 deletions lib/chirp-workflow/core/src/builder/common/workflow.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
use std::fmt::Display;

use global_error::{GlobalError, GlobalResult};
use serde::Serialize;
use uuid::Uuid;

use crate::{
builder::BuilderError,
ctx::common,
db::DatabaseHandle,
error::WorkflowError,
workflow::{Workflow, WorkflowInput},
};

pub struct WorkflowBuilder<I: WorkflowInput> {
db: DatabaseHandle,
ray_id: Uuid,
input: I,
tags: serde_json::Map<String, serde_json::Value>,
error: Option<GlobalError>,
}

impl<I: WorkflowInput> WorkflowBuilder<I>
where
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
pub(crate) fn new(db: DatabaseHandle, ray_id: Uuid, input: I) -> Self {
WorkflowBuilder {
db,
ray_id,
input,
tags: serde_json::Map::new(),
error: None,
}
}

pub fn tags(mut self, tags: serde_json::Value) -> Self {
if self.error.is_some() {
return self;
}

match tags {
serde_json::Value::Object(map) => {
self.tags.extend(map);
}
_ => self.error = Some(BuilderError::TagsNotMap.into()),
}

self
}

pub fn tag(mut self, k: impl Display, v: impl Serialize) -> Self {
if self.error.is_some() {
return self;
}

match serde_json::to_value(&v) {
Ok(v) => {
self.tags.insert(k.to_string(), v);
}
Err(err) => self.error = Some(err.into()),
}

self
}

pub async fn dispatch(self) -> GlobalResult<Uuid> {
if let Some(err) = self.error {
return Err(err);
}

let workflow_name = I::Workflow::NAME;
let workflow_id = Uuid::new_v4();

let no_tags = self.tags.is_empty();
let tags = serde_json::Value::Object(self.tags);
let tags = if no_tags { None } else { Some(&tags) };

tracing::info!(
%workflow_name,
%workflow_id,
?tags,
input=?self.input,
"dispatching workflow"
);

// Serialize input
let input_val = serde_json::to_value(&self.input)
.map_err(WorkflowError::SerializeWorkflowOutput)
.map_err(GlobalError::raw)?;

self.db
.dispatch_workflow(self.ray_id, workflow_id, &workflow_name, tags, input_val)
.await
.map_err(GlobalError::raw)?;

Ok(workflow_id)
}

pub async fn output(
self,
) -> GlobalResult<<<I as WorkflowInput>::Workflow as Workflow>::Output> {
let db = self.db.clone();

let workflow_id = self.dispatch().await?;
common::wait_for_workflow::<I::Workflow>(&db, workflow_id).await
}
}
14 changes: 14 additions & 0 deletions lib/chirp-workflow/core/src/builder/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
pub mod common;
pub mod workflow;

#[derive(thiserror::Error, Debug)]
pub(crate) enum BuilderError {
#[error("tags must be a JSON map")]
TagsNotMap,
#[error("cannot call `to_workflow` and set tags on the same signal")]
WorkflowIdAndTags,
#[error("must call `to_workflow` or set tags on signal")]
NoWorkflowIdOrTags,
#[error("cannot dispatch a workflow/signal from an operation within a workflow execution. trigger it from the workflow's body")]
CannotDispatchFromOpInWorkflow,
}
Loading