Commit 3e0c528

fix(workflows): fix listening traits

MasterPtato committed Jul 13, 2024
1 parent 13e1332 commit 3e0c528
Showing 19 changed files with 365 additions and 119 deletions.
90 changes: 90 additions & 0 deletions docs/libraries/workflow/GOTCHAS.md
@@ -18,3 +18,93 @@ This will be the current timestamp on the first execution of the activity and wo
## Randomly generated content

Randomly generated content like UUIDs should be generated inside activities so the values stay consistent across
replays of the history.
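
For instance, a UUID generated directly in the workflow body would be regenerated on every replay, while one
produced inside an activity is recorded in history and replayed with the same value. A minimal sketch (the
`GenerateIdInput` activity here is hypothetical, not part of the library):

```rust
// Don't: generated in the workflow body, so the value changes on every replay.
let unstable_id = Uuid::new_v4();

// Do: generate the UUID inside an activity so it is recorded in history and
// replayed consistently. `GenerateIdInput` and its activity are hypothetical.
let stable_id = ctx.activity(GenerateIdInput {}).await?;
```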

## Stale data

When fetching data for use in a workflow, you will most often put the fetch in an activity for retryability. However,
depending on how much later the data from that activity is used, it may have become stale. Add another
activity to re-fetch the data wherever you need more up-to-date information.
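
As a rough sketch (`GetServerInput` and `MySignal` below are hypothetical names), re-fetch the data in a second
activity after any long gap, such as waiting on a signal:

```rust
// Fetch the data in an activity so the fetch itself is retryable.
let server = ctx.activity(GetServerInput { server_id }).await?;

// A long gap can pass here, e.g. waiting for a signal.
let _signal = ctx.listen::<MySignal>().await?;

// The data fetched above may now be stale; re-fetch it in another activity
// before acting on it.
let server = ctx.activity(GetServerInput { server_id }).await?;
```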

## `WorkflowCtx::spawn`

`WorkflowCtx::spawn` allows you to run workflow steps in a different thread and returns a join handle. Be
**very careful** when using it, because it is the developer's responsibility to make sure its result is handled
correctly. If a spawned thread errors but its result is not handled, the main thread may continue as though no
error occurred. This results in a corrupt workflow state and a divergent history.
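
As a minimal sketch of the safe pattern (the exact `spawn` signature is assumed here; treat the closure and
handle types as illustrative rather than the real API), always join the handle and propagate its error:

```rust
// Hypothetical sketch -- the closure and handle shapes are assumptions.
let handle = ctx.spawn(|ctx| async move {
	ctx.activity(MyActivityInput { action }).await?;
	GlobalResult::Ok(())
});

// ... other workflow steps on the main thread ...

// Always join the handle and surface its error. Ignoring it lets the main
// thread continue past a failure, corrupting state and diverging history.
// (Assumes both the join error and inner error convert into the workflow's error type.)
handle.await??;
```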

Also see [Consistency with concurrency](#consistency-with-concurrency).

## Consistency with concurrency

When you need to run multiple workflow events (like activities or signals) in parallel, take care to ensure
that the state of the context stays consistent between replays.

Take this example, which tries to run multiple activities concurrently:

```rust
let iter = actions.into_iter().map(|action| {
	// All futures share clones of the same ctx, so they share the same location
	let ctx = ctx.clone();

	async move {
		ctx.activity(MyActivityInput {
			action,
		}).await?;

		GlobalResult::Ok(())
	}
	.boxed()
});

futures_util::stream::iter(iter)
	.buffer_unordered(16)
	.try_collect::<Vec<_>>()
	.await?;
```

This will error because of the `ctx.clone()`: each activity ends up with the same internal location, since none of
the cloned contexts know about each other\*.

Instead, preemptively increment the location with `ctx.step()`:

```rust
let iter = actions.into_iter().map(|action| {
	// `step()` preemptively increments the ctx's internal location for this future
	let ctx = ctx.step();

	async move {
		ctx.activity(MyActivityInput {
			action,
		}).await?;

		GlobalResult::Ok(())
	}
	.boxed()
});

futures_util::stream::iter(iter)
	.buffer_unordered(16)
	.try_collect::<Vec<_>>()
	.await?;
```

If you plan on running more than one workflow step in each future, use a branch instead:

```rust
let iter = actions.into_iter().map(|action| {
	// `branch()` creates a new layer in the internal location for this future
	let ctx = ctx.branch();

	async move {
		ctx.activity(MyActivityInput {
			action,
		}).await?;

		GlobalResult::Ok(())
	}
	.boxed()
});

futures_util::stream::iter(iter)
	.buffer_unordered(16)
	.try_collect::<Vec<_>>()
	.await?;
```

Note that the first example would also work with a branch instead of a clone, but that's a bit overkill, as a branch
creates a new layer in the internal location.

> **\*** Even if they did know about each other via atomics, there is no guarantee of consistency from
> `buffer_unordered`. Preemptively incrementing the location ensures consistency regardless of the order or
> completion time of the futures.
10 changes: 8 additions & 2 deletions lib/bolt/core/src/tasks/test.rs
@@ -574,7 +574,6 @@ struct SshKey {
id: u64,
}

// TODO: This only deletes linodes and firewalls, the ssh key still remains
async fn cleanup_servers(ctx: &ProjectContext) -> Result<()> {
if ctx.ns().rivet.provisioning.is_none() {
return Ok(());
@@ -584,6 +583,7 @@ async fn cleanup_servers(ctx: &ProjectContext) -> Result<()> {
rivet_term::status::progress("Cleaning up servers", "");

let ns = ctx.ns_id();
let ns_full = format!("rivet-{ns}");

// Create client
let api_token = ctx.read_secret(&["linode", "token"]).await?;
@@ -642,7 +642,13 @@ async fn cleanup_servers(ctx: &ProjectContext) -> Result<()> {
.data
.into_iter()
// Only delete test objects from this namespace
.filter(|object| object.data.tags.iter().any(|tag| tag == ns))
.filter(|object| {
object
.data
.tags
.iter()
.any(|tag| tag == ns || tag == &ns_full)
})
.map(|object| {
let client = client.clone();
let obj_type = object._type;
21 changes: 13 additions & 8 deletions lib/chirp-workflow/core/src/compat.rs
@@ -31,11 +31,10 @@
}

let name = I::Workflow::NAME;

tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
@@ -67,11 +66,10 @@
}

let name = I::Workflow::NAME;

tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
@@ -202,6 +200,7 @@ pub async fn tagged_signal<I: Signal + Serialize, B: Debug + Clone>(
Ok(signal_id)
}

#[tracing::instrument(err, skip_all, fields(operation = I::Operation::NAME))]
pub async fn op<I, B>(
ctx: &rivet_operation::OperationContext<B>,
input: I,
@@ -211,6 +210,8 @@ where
<I as OperationInput>::Operation: Operation<Input = I>,
B: Debug + Clone,
{
tracing::info!(?input, "operation call");

let ctx = OperationCtx::new(
db_from_ctx(ctx).await?,
ctx.conn(),
@@ -220,10 +221,14 @@
I::Operation::NAME,
);

I::Operation::run(&ctx, &input)
let res = I::Operation::run(&ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
.map_err(GlobalError::raw);

tracing::info!(?res, "operation response");

res
}

pub async fn subscribe<M, B>(
11 changes: 9 additions & 2 deletions lib/chirp-workflow/core/src/ctx/activity.rs
@@ -56,6 +56,7 @@ impl ActivityCtx {
}

impl ActivityCtx {
#[tracing::instrument(err, skip_all, fields(operation = I::Operation::NAME))]
pub async fn op<I>(
&self,
input: I,
@@ -64,6 +65,8 @@
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
tracing::info!(?input, "operation call");

let ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
@@ -73,11 +76,15 @@
I::Operation::NAME,
);

tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
let res = tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input))
.await
.map_err(|_| WorkflowError::OperationTimeout)?
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
.map_err(GlobalError::raw);

tracing::info!(?res, "operation response");

res
}

pub async fn update_workflow_tags(&self, tags: &serde_json::Value) -> GlobalResult<()> {
21 changes: 13 additions & 8 deletions lib/chirp-workflow/core/src/ctx/api.rs
@@ -73,11 +73,10 @@ impl ApiCtx {
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let name = I::Workflow::NAME;

tracing::info!(%name, ?input, "dispatching workflow");

let id = Uuid::new_v4();

tracing::info!(%name, %id, ?input, "dispatching workflow");

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
@@ -103,11 +102,10 @@
<I as WorkflowInput>::Workflow: Workflow<Input = I>,
{
let name = I::Workflow::NAME;

tracing::info!(%name, ?tags, ?input, "dispatching tagged workflow");

let id = Uuid::new_v4();

tracing::info!(%name, %id, ?tags, ?input, "dispatching tagged workflow");

// Serialize input
let input_val = serde_json::to_value(input)
.map_err(WorkflowError::SerializeWorkflowOutput)
@@ -223,6 +221,7 @@ impl ApiCtx {
Ok(signal_id)
}

#[tracing::instrument(err, skip_all, fields(operation = I::Operation::NAME))]
pub async fn op<I>(
&self,
input: I,
@@ -231,6 +230,8 @@
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
tracing::info!(?input, "operation call");

let ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
@@ -240,10 +241,14 @@
I::Operation::NAME,
);

I::Operation::run(&ctx, &input)
let res = I::Operation::run(&ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
.map_err(GlobalError::raw);

tracing::info!(?res, "operation response");

res
}

pub async fn subscribe<M>(
44 changes: 44 additions & 0 deletions lib/chirp-workflow/core/src/ctx/listen.rs
@@ -0,0 +1,44 @@
use crate::{
	ctx::WorkflowCtx,
	db::SignalRow,
	error::{WorkflowError, WorkflowResult},
};

/// Indirection struct to prevent invalid implementations of listen traits.
pub struct ListenCtx<'a> {
	ctx: &'a mut WorkflowCtx,
}

impl<'a> ListenCtx<'a> {
	pub(crate) fn new(ctx: &'a mut WorkflowCtx) -> Self {
		ListenCtx { ctx }
	}

	/// Checks for a signal to this workflow with any of the given signal names.
	pub async fn listen_any(&self, signal_names: &[&'static str]) -> WorkflowResult<SignalRow> {
		// Fetch new pending signal
		let signal = self
			.ctx
			.db
			.pull_next_signal(
				self.ctx.workflow_id(),
				signal_names,
				self.ctx.full_location().as_ref(),
			)
			.await?;

		let Some(signal) = signal else {
			return Err(WorkflowError::NoSignalFound(Box::from(signal_names)));
		};

		tracing::info!(
			workflow_name=%self.ctx.name(),
			workflow_id=%self.ctx.workflow_id(),
			signal_id=%signal.signal_id,
			signal_name=%signal.signal_name,
			"signal received",
		);

		Ok(signal)
	}
}
2 changes: 2 additions & 0 deletions lib/chirp-workflow/core/src/ctx/mod.rs
@@ -1,12 +1,14 @@
mod activity;
pub(crate) mod api;
mod listen;
pub mod message;
mod operation;
mod standalone;
mod test;
pub(crate) mod workflow;
pub use activity::ActivityCtx;
pub use api::ApiCtx;
pub use listen::ListenCtx;
pub use message::MessageCtx;
pub use operation::OperationCtx;
pub use standalone::StandaloneCtx;
11 changes: 9 additions & 2 deletions lib/chirp-workflow/core/src/ctx/operation.rs
@@ -55,6 +55,7 @@ impl OperationCtx {
}

impl OperationCtx {
#[tracing::instrument(err, skip_all, fields(operation = I::Operation::NAME))]
pub async fn op<I>(
&self,
input: I,
Expand All @@ -63,6 +64,8 @@ impl OperationCtx {
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
tracing::info!(?input, "operation call");

let ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
Expand All @@ -72,10 +75,14 @@ impl OperationCtx {
I::Operation::NAME,
);

I::Operation::run(&ctx, &input)
let res = I::Operation::run(&ctx, &input)
.await
.map_err(WorkflowError::OperationFailure)
.map_err(GlobalError::raw)
.map_err(GlobalError::raw);

tracing::info!(?res, "operation response");

res
}

pub async fn signal<T: Signal + Serialize>(