Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 21 additions & 15 deletions lib/chirp-workflow/core/src/compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ where
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
}

let name = I::Workflow::NAME;
let id = Uuid::new_v4();
let workflow_name = I::Workflow::NAME;
let workflow_id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");
tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -46,13 +46,13 @@ where

db_from_ctx(ctx)
.await?
.dispatch_workflow(ctx.ray_id(), id, &name, None, input_val)
.dispatch_workflow(ctx.ray_id(), workflow_id, &workflow_name, None, input_val)
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(%workflow_name, ?workflow_id, "workflow dispatched");

Ok(id)
Ok(workflow_id)
}

pub async fn dispatch_tagged_workflow<I, B>(
Expand All @@ -69,10 +69,10 @@ where
bail!("cannot dispatch a workflow from an operation within a workflow execution. trigger it from the workflow's body.");
}

let name = I::Workflow::NAME;
let id = Uuid::new_v4();
let workflow_name = I::Workflow::NAME;
let workflow_id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");
tracing::info!(%workflow_name, %workflow_id, ?input, "dispatching tagged workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -81,13 +81,19 @@ where

db_from_ctx(ctx)
.await?
.dispatch_workflow(ctx.ray_id(), id, &name, Some(tags), input_val)
.dispatch_workflow(
ctx.ray_id(),
workflow_id,
&workflow_name,
Some(tags),
input_val,
)
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(%workflow_name, ?workflow_id, "workflow tagged dispatched");

Ok(id)
Ok(workflow_id)
}

/// Wait for a given workflow to complete.
Expand All @@ -96,7 +102,7 @@ pub async fn wait_for_workflow<W: Workflow, B: Debug + Clone>(
ctx: &rivet_operation::OperationContext<B>,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
tracing::info!(sub_workflow_name=W::NAME, sub_workflow_id=?workflow_id, "waiting for workflow");

tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
Expand Down Expand Up @@ -161,7 +167,7 @@ pub async fn signal<I: Signal + Serialize, B: Debug + Clone>(

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, %workflow_id, %signal_id, "dispatching signal");
tracing::info!(signal_name=%I::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -188,7 +194,7 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(

let signal_id = Uuid::new_v4();

tracing::info!(name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");
tracing::info!(signal_name=%I::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl ActivityCtx {
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
tracing::info!(?input, "operation call");
tracing::info!(activity_name=%self.name, ?input, "operation call");

let ctx = OperationCtx::new(
self.db.clone(),
Expand All @@ -87,7 +87,7 @@ impl ActivityCtx {
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw);

tracing::info!(?res, "operation response");
tracing::info!(activity_name=%self.name, ?res, "operation response");

res
}
Expand Down
14 changes: 7 additions & 7 deletions lib/chirp-workflow/core/src/ctx/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl ApiCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -91,7 +91,7 @@ impl ApiCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");

Ok(id)
}
Expand All @@ -108,7 +108,7 @@ impl ApiCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -120,7 +120,7 @@ impl ApiCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "tagged workflow dispatched");

Ok(id)
}
Expand All @@ -131,7 +131,7 @@ impl ApiCtx {
&self,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");

tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
Expand Down Expand Up @@ -188,7 +188,7 @@ impl ApiCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
tracing::info!(signal_name=%T::NAME, to_workflow_id=%workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -210,7 +210,7 @@ impl ApiCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ impl OperationCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -119,7 +119,7 @@ impl OperationCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
14 changes: 7 additions & 7 deletions lib/chirp-workflow/core/src/ctx/standalone.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl StandaloneCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -90,7 +90,7 @@ impl StandaloneCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");

Ok(id)
}
Expand All @@ -107,7 +107,7 @@ impl StandaloneCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -119,7 +119,7 @@ impl StandaloneCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");

Ok(id)
}
Expand All @@ -130,7 +130,7 @@ impl StandaloneCtx {
&self,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
tracing::info!(workflow_name=%W::NAME, id=?workflow_id, "waiting for workflow");

tokio::time::timeout(WORKFLOW_TIMEOUT, async move {
let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
Expand Down Expand Up @@ -187,7 +187,7 @@ impl StandaloneCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -209,7 +209,7 @@ impl StandaloneCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
14 changes: 7 additions & 7 deletions lib/chirp-workflow/core/src/ctx/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl TestCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -106,7 +106,7 @@ impl TestCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");

Ok(id)
}
Expand All @@ -123,7 +123,7 @@ impl TestCtx {
let name = I::Workflow::NAME;
let id = Uuid::new_v4();

tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");
tracing::info!(workflow_name=%name, workflow_id=%id, ?tags, ?input, "dispatching tagged workflow");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -135,7 +135,7 @@ impl TestCtx {
.await
.map_err(GlobalError::raw)?;

tracing::info!(%name, ?id, "workflow dispatched");
tracing::info!(workflow_name=%name, workflow_id=%id, "workflow dispatched");

Ok(id)
}
Expand All @@ -144,7 +144,7 @@ impl TestCtx {
&self,
workflow_id: Uuid,
) -> GlobalResult<W::Output> {
tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow");
tracing::info!(workflow_name=%W::NAME, %workflow_id, "waiting for workflow");

let mut interval = tokio::time::interval(SUB_WORKFLOW_RETRY);
loop {
Expand Down Expand Up @@ -198,7 +198,7 @@ impl TestCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");
tracing::info!(signal_name=%T::NAME, %workflow_id, %signal_id, "dispatching signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand All @@ -220,7 +220,7 @@ impl TestCtx {
) -> GlobalResult<Uuid> {
let signal_id = Uuid::new_v4();

tracing::info!(name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");
tracing::info!(signal_name=%T::NAME, ?tags, %signal_id, "dispatching tagged signal");

// Serialize input
let input_val = serde_json::to_value(input)
Expand Down
Loading