Skip to content

Commit

Permalink
feat(clusters): convert clusters to new workflow system
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jul 6, 2024
1 parent 64b4054 commit 834ff6a
Show file tree
Hide file tree
Showing 227 changed files with 2,141 additions and 2,373 deletions.
31 changes: 29 additions & 2 deletions docs/libraries/workflow/GLOSSARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ A collection of registered workflows. This is solely used for the worker to fetc
A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or
sub workflow triggers.

Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should
be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow.
Workflows can be though of as an outline or a list of tasks. The code defining a workflow only specifies what
items should be ran; There is no complex logic (e.g. database queries) running within the top level of the
workflow.

Upon an activity failure, workflow code can be reran without duplicate side effects because activities are
cached and re-read after they succeed.
Expand Down Expand Up @@ -51,6 +52,10 @@ this signal for it to be picked up, otherwise it will stay in the database indef
workflow. Signals do not have a response; another signal must be sent back from the workflow and listened to
by the sender.

### Differences between message

Signals are effectively just messages that can only be consumed by workflows.

## Tagged Signal

Same as a signal except it is sent with a JSON blob as its "tags" instead of to a specific workflow. Any
Expand All @@ -65,6 +70,28 @@ See [the signals document](./SIGNALS.md).
A "one of" for signal listening. Allows for listening to multiple signals at once and receiving the first one
that gets sent.

## Message

A payload that can be sent out of a workflow. Includes a JSON blob for tags which can be subscribed to with a
subscription.

### Differences between signal

Messages are effectively just signals that can be only consumed by non workflows.

## Subscription

An entity that waits for messages with the same (not a superset/subset) tags as itself. Upon receiving a
message, the message will be returned and the developer can choose to continue to listen for more messages.

## Tail

Reads the last message without waiting. If none exists (all previous messages expired), `None` is returned.

## Tail w/ Anchor

Reads the earliest message after the given anchor timestamp or waits for one to be published if none exist.

## Workflow Event

An action that gets executed in a workflow. An event can be a:
Expand Down
7 changes: 0 additions & 7 deletions docs/libraries/workflow/SIGNALS.md

This file was deleted.

26 changes: 26 additions & 0 deletions docs/libraries/workflow/SIGNALS_AND_MESSAGES.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# Signals

## Tagged signals

Tagged signals are consumed on a first-come-first-serve basis because a single signal being consumed by more
than one workflow is not a supported design pattern. To work around this, consume the signal by a workflow
then publish multiple signals from that workflow.

# Choosing Between Signals and Messages

> **Note**: non-workflow ecosystem is API layer, standalone, operations, old workers
## Signal

- Sending data from the non-workflow ecosystem to the workflow ecosystem
- Sending data from the workflow ecosystem to somewhere else in the workflow ecosystem

## Message

- Sending data from the workflow ecosystem to the non-workflow ecosystem

## Both Signals and Messages

Sometimes you may need to listen for a particular event in the workflow system and the non-workflow ecosystem.
In this case you can publish both a signal and a message (you can derive `signal` and `message` on the same
struct to make this easier).
6 changes: 0 additions & 6 deletions lib/bolt/core/src/tasks/gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,6 @@ async fn generate_root(path: &Path) {
}
}
}

// Utils lib
let util_path = pkg.path().join("util");
if fs::metadata(&util_path).await.is_ok() {
set_license(&util_path.join("Cargo.toml")).await;
}
}
}

Expand Down
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/ctx/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use uuid::Uuid;

use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError};

#[derive(Clone)]
pub struct OperationCtx {
ray_id: Uuid,
name: &'static str,
Expand Down
17 changes: 15 additions & 2 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ use crate::{
activity::ActivityId,
event::Event,
util::{self, Location},
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Executable, Listen, PulledWorkflow,
executable::{closure, Executable, AsyncResult},
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Listen, PulledWorkflow,
RegistryHandle, Signal, SignalRow, Workflow, WorkflowError, WorkflowInput, WorkflowResult,
};

Expand All @@ -28,7 +29,8 @@ const DB_ACTION_RETRY: Duration = Duration::from_millis(150);
// Most db action retries
const MAX_DB_ACTION_RETRIES: usize = 5;

// TODO: Use generics to store input instead of a string
// TODO: Use generics to store input instead of a json value
// NOTE: Clonable because of inner arcs
#[derive(Clone)]
pub struct WorkflowCtx {
pub workflow_id: Uuid,
Expand Down Expand Up @@ -663,6 +665,17 @@ impl WorkflowCtx {
exec.execute(self).await
}

/// Spawns a new thread to execute workflow steps in.
pub fn spawn<F, T: Send + 'static>(&mut self, f: F) -> tokio::task::JoinHandle<GlobalResult<T>>
where
F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send + 'static
{
let mut ctx = self.clone();
tokio::task::spawn(async move {
closure(f).execute(&mut ctx).await
})
}

/// Sends a signal.
pub async fn signal<T: Signal + Serialize>(
&mut self,
Expand Down
6 changes: 3 additions & 3 deletions lib/chirp-workflow/core/src/executable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@ pub trait Executable: Send {
async fn execute(self, ctx: &mut WorkflowCtx) -> GlobalResult<Self::Output>;
}

type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Send + 'a>>;
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Send + 'a>>;

// Closure executuable impl
// Closure executable impl
#[async_trait]
impl<F, T> Executable for F
where
Expand Down Expand Up @@ -76,7 +76,7 @@ struct TupleHelper<T: Executable> {

// Must wrap all closured being used as executables in this function due to
// https://github.com/rust-lang/rust/issues/70263
pub fn closure<F, T>(f: F) -> F
pub fn closure<F, T: Send>(f: F) -> F
where
F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send,
{
Expand Down
35 changes: 20 additions & 15 deletions lib/chirp-workflow/macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,8 +284,16 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {

let struct_ident = &item_struct.ident;

// If also a message, don't derive serde traits
let also_message = item_struct.attrs.iter().filter_map(|attr| attr.path().segments.last()).any(|seg| seg.ident == "message");
let serde_derive = if also_message {
quote! {}
} else {
quote!{ #[derive(serde::Serialize, serde::Deserialize)] }
};

let expanded = quote! {
#[derive(serde::Serialize, serde::Deserialize)]
#serde_derive
#item_struct

impl Signal for #struct_ident {
Expand All @@ -295,7 +303,7 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {
#[async_trait::async_trait]
impl Listen for #struct_ident {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[Self::NAME]).await?;
let row = ctx.listen_any(&[<Self as Signal>::NAME]).await?;
Self::parse(&row.signal_name, row.body)
}

Expand All @@ -313,6 +321,14 @@ pub fn message(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as LitStr);
let item_struct = parse_macro_input!(item as ItemStruct);

// If also a signal, don't derive serde traits
let also_signal = item_struct.attrs.iter().filter_map(|attr| attr.path().segments.last()).any(|seg| seg.ident == "signal");
let serde_derive = if also_signal {
quote! {}
} else {
quote!{ #[derive(serde::Serialize, serde::Deserialize)] }
};

let config = match parse_msg_config(&item_struct.attrs) {
Ok(x) => x,
Err(err) => return err.into_compile_error().into(),
Expand All @@ -322,25 +338,14 @@ pub fn message(attr: TokenStream, item: TokenStream) -> TokenStream {
let tail_ttl = config.tail_ttl;

let expanded = quote! {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#serde_derive
#[derive(Debug)]
#item_struct

impl Message for #struct_ident {
const NAME: &'static str = #name;
const TAIL_TTL: std::time::Duration = std::time::Duration::from_secs(#tail_ttl);
}

#[async_trait::async_trait]
impl Listen for #struct_ident {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[Self::NAME]).await?;
Self::parse(&row.signal_name, row.body)
}

fn parse(_name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
serde_json::from_value(body).map_err(WorkflowError::DeserializeActivityOutput)
}
}
};

TokenStream::from(expanded)
Expand Down
5 changes: 5 additions & 0 deletions proto/backend/cluster.proto
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,8 @@ message ServerFilter {
bool filter_public_ips = 9;
repeated string public_ips = 10;
}

// Helper proto for writing to sql
message Pools {
repeated rivet.backend.cluster.Pool pools = 1;
}
Loading

0 comments on commit 834ff6a

Please sign in to comment.