feat(clusters): convert clusters to new workflow system
MasterPtato committed Jul 9, 2024
1 parent 64b4054 commit 8655e83
Showing 233 changed files with 3,336 additions and 3,141 deletions.
5 changes: 5 additions & 0 deletions docs/libraries/workflow/DESIGN.md
@@ -0,0 +1,5 @@
# Design

## Hierarchy

TODO
36 changes: 34 additions & 2 deletions docs/libraries/workflow/GLOSSARY.md
@@ -15,8 +15,9 @@ A collection of registered workflows. This is solely used for the worker to fetc
A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or
sub workflow triggers.

Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should
be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow.
Workflows can be thought of as an outline or a list of tasks. The code defining a workflow only specifies
what items should be run; there is no complex logic (e.g. database queries) running within the top level of
the workflow.

Upon an activity failure, workflow code can be rerun without duplicate side effects because activities are
cached and re-read after they succeed.
@@ -27,6 +28,11 @@ A block of code that can fail. This cannot trigger other workflows or activities
Activities are retried by workflows when they fail or replayed when they succeed but a later part of the
workflow fails.

When choosing between a workflow and an activity (see the sketch after this list):

- Choose a workflow when there are multiple steps that need to be individually retried upon failure.
- Choose an activity when there is only one chunk of retryable code that needs to be executed.
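
As a rough illustration, consider a server-creation flow. Everything below is a hypothetical sketch: the
`#[workflow]`/`#[activity]` attributes and the `ctx.activity(...)` call are assumptions about the API shape,
not its exact definition.

```rust
// Hypothetical sketch, not the exact chirp-workflow API.
use uuid::Uuid;

#[derive(Debug, serde::Serialize, serde::Deserialize, Hash)]
pub struct ProvisionServerInput {
    pub datacenter_id: Uuid,
}

#[derive(Debug, serde::Serialize, serde::Deserialize, Hash)]
pub struct InstallSoftwareInput {
    pub server_id: Uuid,
}

#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct ServerCreateInput {
    pub datacenter_id: Uuid,
}

// One chunk of retryable, side-effecting code -> an activity.
// Its output is cached once it succeeds, so replays skip re-running it.
#[activity(ProvisionServer)]
async fn provision_server(ctx: &ActivityCtx, input: &ProvisionServerInput) -> GlobalResult<Uuid> {
    todo!("talk to the cloud provider / database here")
}

// Multiple steps that each need their own retry -> a workflow that only
// sequences activities, with no complex logic at its top level.
#[workflow]
async fn server_create(ctx: &mut WorkflowCtx, input: &ServerCreateInput) -> GlobalResult<()> {
    let server_id = ctx
        .activity(ProvisionServerInput {
            datacenter_id: input.datacenter_id,
        })
        .await?;
    ctx.activity(InstallSoftwareInput { server_id }).await?;
    Ok(())
}
```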

## Operation

Effectively a native Rust function. May or may not fail. Used for widely used operations like fetching a
@@ -51,6 +57,10 @@ this signal for it to be picked up, otherwise it will stay in the database indef
workflow. Signals do not have a response; another signal must be sent back from the workflow and listened to
by the sender.
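
As a sketch, a signal is declared with the `#[signal]` attribute macro, which takes the signal's registered
name as a string literal (the name and fields below are hypothetical):

```rust
use uuid::Uuid;

// Hypothetical signal; the attribute registers the name and generates the
// serde derives plus a Listen implementation for the struct.
#[signal("cluster_server_provision")]
pub struct ServerProvision {
    pub server_id: Uuid,
    pub datacenter_id: Uuid,
}
```

A workflow context can then send it with `ctx.signal(...)` and a workflow can receive it through the
generated `Listen` implementation.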

### Differences from messages

Signals are effectively just messages that can only be consumed by workflows.

## Tagged Signal

Same as a signal except it is sent with a JSON blob as its "tags" instead of to a specific workflow. Any
@@ -65,6 +75,28 @@ See [the signals document](./SIGNALS.md).
A "one of" for signal listening. Allows for listening to multiple signals at once and receiving the first one
that gets sent.
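
For example, using the `join_signal!` macro from `chirp-workflow` to build the enum (the signal names are
hypothetical, and it is assumed the generated enum implements `Listen` like individual signals do):

```rust
// Assumes Create and Destroy are existing #[signal] structs, and that the
// generated enum implements Listen the same way individual signals do.
join_signal!(CreateOrDestroy, [Create, Destroy]);

// Inside a workflow body (sketch): receive whichever signal arrives first.
match CreateOrDestroy::listen(ctx).await? {
    CreateOrDestroy::Create(sig) => { /* handle the create signal */ }
    CreateOrDestroy::Destroy(sig) => { /* handle the destroy signal */ }
}
```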

## Message

A payload that can be sent out of a workflow. Includes a JSON blob for tags which can be subscribed to with a
subscription.

### Differences from signals

Messages are effectively just signals that can only be consumed by the non-workflow ecosystem.

## Subscription

An entity that waits for messages whose tags exactly match its own (not a superset or subset). Upon
receiving a message, the message is returned and the developer can choose to continue listening for more
messages.

## Tail

Reads the last message without waiting. If none exists (all previous messages expired), `None` is returned.

## Tail w/ Anchor

Reads the earliest message after the given anchor timestamp or waits for one to be published if none exist.

## Workflow Event

An action that gets executed in a workflow. An event can be a:
15 changes: 15 additions & 0 deletions docs/libraries/workflow/GOTCHAS.md
@@ -0,0 +1,15 @@
# Gotchas

## Timestamps

Use timestamps with care when passing them between activity inputs/outputs. Because activity inputs need to be
consistent for replays, use `util::timestamp::now()` only within activities and not workflow bodies.

If you need a timestamp in a workflow body, use `ctx.create_ts()`, the creation timestamp of the workflow.
Using `ctx.ts()` is also inconsistent because it marks the start of the current workflow run (which differs
between replays).

If you need a consistent current timestamp, create a new activity that just returns `util::timestamp::now()`.
This will be the current timestamp on the first execution of the activity and won't change on replay.
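
A minimal sketch of that pattern, assuming an `#[activity]` attribute macro analogous to the
`#[signal]`/`#[message]` macros (the exact activity definition API is an assumption here):

```rust
// Hypothetical activity whose only job is to capture "now" exactly once.
#[derive(Debug, serde::Serialize, serde::Deserialize, Hash)]
pub struct GetNowInput {}

#[activity(GetNow)]
async fn get_now(_ctx: &ActivityCtx, _input: &GetNowInput) -> GlobalResult<i64> {
    // Runs once; the result is cached, so replays reuse the stored value.
    Ok(util::timestamp::now())
}

// In the workflow body (sketch):
// let now = ctx.activity(GetNowInput {}).await?;
```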

> **When an activity's input doesn't produce the same hash as the first time it was executed (i.e. its input
> changed), the entire workflow will error with "History Diverged" and will not restart.**
7 changes: 0 additions & 7 deletions docs/libraries/workflow/SIGNALS.md

This file was deleted.

29 changes: 29 additions & 0 deletions docs/libraries/workflow/SIGNALS_AND_MESSAGES.md
@@ -0,0 +1,29 @@
# Signals

## Tagged signals

Tagged signals are consumed on a first-come, first-served basis because a single signal being consumed by
more than one workflow is not a supported design pattern. To work around this, have a single workflow
consume the signal and then publish multiple signals from that workflow.

# Choosing Between Signals and Messages

> **Note**: the non-workflow ecosystem refers to the API layer, standalone services, operations, and old
> workers.

## Signal

- Sending data from the non-workflow ecosystem to the workflow ecosystem
- Sending data from the workflow ecosystem to somewhere else in the workflow ecosystem

## Message

- Sending data from the workflow ecosystem to the non-workflow ecosystem

## Both Signals and Messages

Sometimes you may need to listen for a particular event in both the workflow ecosystem and the non-workflow
ecosystem. In this case you can publish both a signal and a message (you can derive `signal` and `message`
on the same struct to make this easier), as sketched below.
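
A sketch of that dual derive (the event name and fields are hypothetical); both attribute macros take the
same registered name, and each detects the other so the serde derives are not emitted twice:

```rust
use uuid::Uuid;

// Receivable as a signal by workflows and as a message by subscriptions
// in the non-workflow ecosystem.
#[signal("cluster_server_destroyed")]
#[message("cluster_server_destroyed")]
pub struct ServerDestroyed {
    pub server_id: Uuid,
}
```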

Both messages and signals are meant to be payloads with a specific recipient. They are not meant to be
published without an intended target (i.e. broadcast for any listener to consume).
6 changes: 0 additions & 6 deletions lib/bolt/core/src/tasks/gen.rs
@@ -162,12 +162,6 @@ async fn generate_root(path: &Path) {
}
}
}

// Utils lib
let util_path = pkg.path().join("util");
if fs::metadata(&util_path).await.is_ok() {
set_license(&util_path.join("Cargo.toml")).await;
}
}
}

4 changes: 4 additions & 0 deletions lib/chirp-workflow/core/src/ctx/activity.rs
@@ -122,6 +122,10 @@ impl ActivityCtx {
self.name
}

pub fn workflow_id(&self) -> Uuid {
self.workflow_id
}

pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}
1 change: 1 addition & 0 deletions lib/chirp-workflow/core/src/ctx/operation.rs
@@ -4,6 +4,7 @@ use uuid::Uuid;

use crate::{DatabaseHandle, Operation, OperationInput, WorkflowError};

#[derive(Clone)]
pub struct OperationCtx {
ray_id: Uuid,
name: &'static str,
29 changes: 25 additions & 4 deletions lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -9,7 +9,8 @@ use crate::{
activity::ActivityId,
event::Event,
util::{self, Location},
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Executable, Listen, PulledWorkflow,
executable::{closure, Executable, AsyncResult},
Activity, ActivityCtx, ActivityInput, DatabaseHandle, Listen, PulledWorkflow,
RegistryHandle, Signal, SignalRow, Workflow, WorkflowError, WorkflowInput, WorkflowResult,
};

@@ -28,12 +29,13 @@ const DB_ACTION_RETRY: Duration = Duration::from_millis(150);
// Maximum number of db action retries
const MAX_DB_ACTION_RETRIES: usize = 5;

// TODO: Use generics to store input instead of a string
// TODO: Use generics to store input instead of a json value
// NOTE: Cloneable because of inner Arcs
#[derive(Clone)]
pub struct WorkflowCtx {
pub workflow_id: Uuid,
workflow_id: Uuid,
/// Name of the workflow to run in the registry.
pub name: String,
name: String,
create_ts: i64,
ts: i64,
ray_id: Uuid,
Expand Down Expand Up @@ -663,6 +665,17 @@ impl WorkflowCtx {
exec.execute(self).await
}

/// Spawns a new task to execute workflow steps in.
pub fn spawn<F, T: Send + 'static>(&mut self, f: F) -> tokio::task::JoinHandle<GlobalResult<T>>
where
F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send + 'static
{
let mut ctx = self.clone();
tokio::task::spawn(async move {
closure(f).execute(&mut ctx).await
})
}

/// Sends a signal.
pub async fn signal<T: Signal + Serialize>(
&mut self,
@@ -789,6 +802,14 @@ impl WorkflowCtx {
}

impl WorkflowCtx {
pub fn name(&self) -> &str {
&self.name
}

pub fn workflow_id(&self) -> Uuid {
self.workflow_id
}

/// Timestamp at which this workflow run started.
pub fn ts(&self) -> i64 {
self.ts
6 changes: 3 additions & 3 deletions lib/chirp-workflow/core/src/executable.rs
@@ -14,9 +14,9 @@ pub trait Executable: Send {
async fn execute(self, ctx: &mut WorkflowCtx) -> GlobalResult<Self::Output>;
}

type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Send + 'a>>;
pub type AsyncResult<'a, T> = Pin<Box<dyn Future<Output = GlobalResult<T>> + Send + 'a>>;

// Closure executuable impl
// Closure executable impl
#[async_trait]
impl<F, T> Executable for F
where
@@ -76,7 +76,7 @@ struct TupleHelper<T: Executable> {

// Must wrap all closures being used as executables in this function due to
// https://github.com/rust-lang/rust/issues/70263
pub fn closure<F, T>(f: F) -> F
pub fn closure<F, T: Send>(f: F) -> F
where
F: for<'a> FnOnce(&'a mut WorkflowCtx) -> AsyncResult<'a, T> + Send,
{
4 changes: 2 additions & 2 deletions lib/chirp-workflow/core/src/signal.rs
@@ -44,14 +44,14 @@ pub trait Listen: Sized {
/// ````
#[macro_export]
macro_rules! join_signal {
(pub $join:ident, [$($signals:ident),*]) => {
(pub $join:ident, [$($signals:ident),* $(,)?]) => {
pub enum $join {
$($signals($signals)),*
}

join_signal!(@ $join, [$($signals),*]);
};
($join:ident, [$($signals:ident),*]) => {
($join:ident, [$($signals:ident),* $(,)?]) => {
enum $join {
$($signals($signals)),*
}
37 changes: 21 additions & 16 deletions lib/chirp-workflow/macros/src/lib.rs
@@ -284,18 +284,26 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {

let struct_ident = &item_struct.ident;

// If also a message, don't derive serde traits
let also_message = item_struct.attrs.iter().filter_map(|attr| attr.path().segments.last()).any(|seg| seg.ident == "message");
let serde_derive = if also_message {
quote! {}
} else {
quote!{ #[derive(serde::Serialize, serde::Deserialize)] }
};

let expanded = quote! {
#[derive(serde::Serialize, serde::Deserialize)]
#serde_derive
#item_struct

impl Signal for #struct_ident {
const NAME: &'static str = #name;
}

#[async_trait::async_trait]
impl Listen for #struct_ident {
impl Listen for #struct_ident {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[Self::NAME]).await?;
let row = ctx.listen_any(&[<Self as Signal>::NAME]).await?;
Self::parse(&row.signal_name, row.body)
}

@@ -313,6 +321,14 @@ pub fn message(attr: TokenStream, item: TokenStream) -> TokenStream {
let name = parse_macro_input!(attr as LitStr);
let item_struct = parse_macro_input!(item as ItemStruct);

// If also a signal, don't derive serde traits
let also_signal = item_struct.attrs.iter().filter_map(|attr| attr.path().segments.last()).any(|seg| seg.ident == "signal");
let serde_derive = if also_signal {
quote! {}
} else {
quote!{ #[derive(serde::Serialize, serde::Deserialize)] }
};

let config = match parse_msg_config(&item_struct.attrs) {
Ok(x) => x,
Err(err) => return err.into_compile_error().into(),
@@ -322,25 +338,14 @@
let tail_ttl = config.tail_ttl;

let expanded = quote! {
#[derive(Debug, serde::Serialize, serde::Deserialize)]
#serde_derive
#[derive(Debug)]
#item_struct

impl Message for #struct_ident {
const NAME: &'static str = #name;
const TAIL_TTL: std::time::Duration = std::time::Duration::from_secs(#tail_ttl);
}

#[async_trait::async_trait]
impl Listen for #struct_ident {
async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> {
let row = ctx.listen_any(&[Self::NAME]).await?;
Self::parse(&row.signal_name, row.body)
}

fn parse(_name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
serde_json::from_value(body).map_err(WorkflowError::DeserializeActivityOutput)
}
}
};

TokenStream::from(expanded)
5 changes: 5 additions & 0 deletions proto/backend/cluster.proto
@@ -94,3 +94,8 @@ message ServerFilter {
bool filter_public_ips = 9;
repeated string public_ips = 10;
}

// Helper proto for writing to SQL
message Pools {
repeated rivet.backend.cluster.Pool pools = 1;
}