Skip to content

Commit

Permalink
feat: add ray ids to workflows, clean up types
Browse files Browse the repository at this point in the history
  • Loading branch information
MasterPtato committed Jun 4, 2024
1 parent 56f9056 commit a923a66
Show file tree
Hide file tree
Showing 20 changed files with 507 additions and 256 deletions.
49 changes: 29 additions & 20 deletions docs/libraries/workflow/GLOSSARY.md
Original file line number Diff line number Diff line change
@@ -1,44 +1,57 @@
TODO

# Glossary

## Worker

A process that's running workflows.
A process that queries for pending workflows with a specific filter. Filter is based on which workflows are registered in the given worker's registry.
The queried workflows are run on the same machine as the worker but given their own thread.

## Registry

There are usually multiple workers running at the same time.
A collection of registered workflows. This is solely used for the worker to fetch workflows from the database.

## Workflow

A series of activies to be ran together.
A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or sub workflow triggers.

Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow.

Upon an activity failure, workflow code can be reran without duplicate side effects because activities are cached and re-read after they succeed.

## Activity

The code defining a workflow only specifies what activites to be ran. There is no complex logic (e.g. database queries) running within workflows.
A block of code that can fail. This cannot trigger other workflows or activities, but it can call operations.
Activities are retried by workflows when they fail or replayed when they succeed but a later part of the
workflow fails.

Workflow code can be reran multiple times to replay a workflow.
## Operation

## Workflow State
Effectively a native rust function. Can fail or not fail, used simply for tidiness (as you would with any other function).
Operations can only be called from activities, not from workflows.

Persistated data about a workflow.
Examples include:

## Workflow Run
- most `get` operations (`user-get`)
- any complex logic you'd want in it's own function (fetching some http data and parsing it)

An instance of a node running a workflow. If re-running a workflow, it will be replaying events.
Operations are not required; all of their functionality can be put into an activity instead.

## Workflow Event

An action that gets executed in a workflow. An event can be a:

- Activity
- Received signal
- Dispatched sub-workflow

Events store the output from activities and are used to ensure activites are ran only once.
Events store the output from activities and are used to ensure activities are ran only once.

## Workflow Event History

List of events that have executed in this workflow. These are used in replays to verify that the workflow has not changed to an invalid state.

## Workflow Replay

After the first run of a workflow, all runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will be skipped in the replay and use the output from the previous run.
After the first run of a workflow, subsequent runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will not actually run any code and instead use the output from the previous run.

## Workflow Wake Condition

Expand All @@ -47,10 +60,6 @@ If a workflow is not currently running an activity, wake conditions define when
The available conditions are:

- **Immediately** Run immediately by the first available node
- **Deadline** Run at a given timesetamp.

## Activity

A unit of code to run within a workflow.

Activities can fail and will be retried accoriding to the retry policy of the workflow.
- **Deadline** Run at a given timestamp.
- **Signal** Run once any one of the listed signals is received.
- **Sub workflow** Run once the given sub workflow is completed.
64 changes: 64 additions & 0 deletions docs/libraries/workflow/OVERVIEW.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Overview

Workflows are designed to provide highly durable code executions for distributed systems. The main goal is to allow for writing easy to understand multi-step programs with effective error handling, retryability, and a rigid state.

## Goals

**Primary**

- Performance
- Quick iteration speed
- Architectural simplicity (only depends on CockroachDB)

**Secondary**

- Easy to operate, managable via simple SQL queries
- Easier to write, understand, and maintain than event-driven architectures
- Rust-native
- Run in-process and as part of the binary to simplify architecture
- Leverage traits to reduce copies and needless ser/de
- ## Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatibility with protobuf**)

## Use cases

- Billing cron jobs with batch
- Creating servers
- Email loops
- Creating dynamic servers
- Automating Cloudflare APIs (Cloudflare workers, DNS, issuing SSL certs)

## Relation to existing Chirp primitives

### Messages

Workflows replace the use case of messages for durable execution, which is almost all uses of messages.

The biggest pain point with messages is the lack of a rigid state. Message executions always match the following outline:

1. Read whatever data is required
2. Perform some action(s)
3. Update data as needed
4. Finish (possibly publish more messages) OR upon failure, start all over at #1

The issue with this is that messages do not have any knowledge of messages that came before them, their own previous failed executions, or even other messages of the same system executing in parallel. Without thorough manually written sync checks and consistency validations (which are verbose and hard to follow), this type of execution often results in an overall broken state of whatever system the message is acting on (i.e. matchmaking, server provisioning).

**Once a broken state is reached, the retry system for messages _practically never_ successfully retries the message.**

### Cross-package hooks

We currently use messages for hooking in to events from other workflows so we don't have to bake in support directly.

This is potentially error prone since it makes control flow more opaque.

We will use sub workflows instead.

## Post-workflow message uses

Messages should still be used, but much less frequently. They're helpful for:

- Real-time Data Processing
- Complex Event Processing (CEP)
- Data Transformation and Enrichment
- Continuous Data Integration
- Real-time Monitoring and Alerting
- High-throughput, Low-latency Processing
98 changes: 0 additions & 98 deletions docs/libraries/workflow/WORKFLOW.md

This file was deleted.

2 changes: 1 addition & 1 deletion lib/chirp-workflow/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ rivet-runtime = { path = "../../runtime" }
rivet-util = { path = "../../util/core" }
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "ipnetwork"] }
sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "json", "ipnetwork"] }
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
Expand Down
81 changes: 42 additions & 39 deletions lib/chirp-workflow/core/src/ctx/activity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@ use global_error::{GlobalError, GlobalResult};
use rivet_pools::prelude::*;
use uuid::Uuid;

use crate::{
ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError,
};
use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct ActivityCtx {
db: DatabaseHandle,
conn: rivet_connection::Connection,
workflow_id: Uuid,
ray_id: Uuid,
name: &'static str,
ts: i64,

db: DatabaseHandle,

conn: rivet_connection::Connection,

// Backwards compatibility
op_ctx: rivet_operation::OperationContext<()>,
Expand All @@ -19,28 +21,22 @@ pub struct ActivityCtx {
impl ActivityCtx {
pub fn new(
db: DatabaseHandle,
conn: rivet_connection::Connection,
conn: &rivet_connection::Connection,
workflow_id: Uuid,
workflow_create_ts: i64,
ray_id: Uuid,
name: &'static str,
) -> Self {
let op_ctx = rivet_operation::OperationContext::new(
name.to_string(),
std::time::Duration::from_secs(60),
conn.clone(),
workflow_id,
// TODO: ray_id
Uuid::new_v4(),
rivet_util::timestamp::now(),
// TODO: req_ts
rivet_util::timestamp::now(),
(),
);
let ts = rivet_util::timestamp::now();
let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, name, ts);

ActivityCtx {
db,
conn,
workflow_id,
ray_id,
name,
ts,
db,
conn,
op_ctx,
}
}
Expand All @@ -55,7 +51,14 @@ impl ActivityCtx {
I: OperationInput,
<I as OperationInput>::Operation: Operation<Input = I>,
{
let mut ctx = OperationCtx::new(self.db.clone(), self.workflow_id);
let mut ctx = OperationCtx::new(
self.db.clone(),
&self.conn,
self.workflow_id,
self.ray_id,
self.op_ctx.req_ts(),
I::Operation::name(),
);

I::Operation::run(&mut ctx, &input)
.await
Expand All @@ -71,28 +74,28 @@ impl ActivityCtx {
// self.timeout
// }

// pub fn req_id(&self) -> Uuid {
// self.req_id
// }
pub fn req_id(&self) -> Uuid {
self.op_ctx.req_id()
}

// pub fn ray_id(&self) -> Uuid {
// self.ray_id
// }
pub fn ray_id(&self) -> Uuid {
self.ray_id
}

// /// Timestamp at which the request started.
// pub fn ts(&self) -> i64 {
// self.ts
// }
/// Timestamp at which the request started.
pub fn ts(&self) -> i64 {
self.ts
}

// /// Timestamp at which the request was published.
// pub fn req_ts(&self) -> i64 {
// self.req_ts
// }
/// Timestamp at which the request was published.
pub fn req_ts(&self) -> i64 {
self.op_ctx.req_ts()
}

// /// Time between when the timestamp was processed and when it was published.
// pub fn req_dt(&self) -> i64 {
// self.ts.saturating_sub(self.req_ts)
// }
/// Time between when the timestamp was processed and when it was published.
pub fn req_dt(&self) -> i64 {
self.ts.saturating_sub(self.op_ctx.req_ts())
}

// pub fn perf(&self) -> &chirp_perf::PerfCtx {
// self.conn.perf()
Expand Down
Loading

0 comments on commit a923a66

Please sign in to comment.