Commit a743b64

feat: add workflows
1 parent f3e2823 commit a743b64

52 files changed: +3798 -46 lines changed

Lines changed: 56 additions & 0 deletions
@@ -0,0 +1,56 @@
TODO

# Glossary

## Worker

A process that runs workflows.

There are usually multiple workers running at the same time.

## Workflow

A series of activities to be run together.

The code defining a workflow only specifies which activities to run. There is no complex logic (e.g. database queries) running within workflows.

Workflow code can be rerun multiple times to replay a workflow.

## Workflow State

Persisted data about a workflow.

## Workflow Run

An instance of a node running a workflow. If re-running a workflow, it will be replaying events.

## Workflow Event

An action that gets executed in a workflow. An event can be a:

- Activity

Events store the output from activities and are used to ensure activities are run only once.
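
To make this concrete, a persisted event might look roughly like the following sketch; the struct and its field names are hypothetical, inferred from this description rather than taken from the commit.

```rust
use uuid::Uuid;

// Hypothetical shape of a persisted workflow event (illustrative only)
struct WorkflowEvent {
	workflow_id: Uuid,         // the workflow run this event belongs to
	index: i64,                // position within the event history
	activity_name: String,     // the activity that produced this event
	output: serde_json::Value, // stored output, reused on replay
}
```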
## Workflow Event History

List of events that have executed in this workflow. These are used in replays to verify that the workflow has not changed to an invalid state.

## Workflow Replay

After the first run of a workflow, all runs will replay the activities and compare against the event history. If an activity has already run successfully, it will be skipped in the replay and its output from the previous run will be used.
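
As a rough illustration of this mechanism, a hedged sketch; the `EventHistory` type and the `run_or_replay` helper are hypothetical, not the crate's actual API:

```rust
use anyhow::{bail, Result};

// Hypothetical replay helper: consult the event history before executing an
// activity, reusing stored output when the activity already ran successfully.
// `Activity` and `ActivityCtx` are the traits/types this commit introduces.
async fn run_or_replay<A: Activity>(
	history: &mut EventHistory, // hypothetical history cursor
	ctx: &mut ActivityCtx,
	input: &A::Input,
) -> Result<A::Output> {
	match history.next_event() {
		// Activity already ran: skip execution and reuse the stored output
		Some(event) if event.activity_name == A::name() => {
			Ok(serde_json::from_value(event.output.clone())?)
		}
		// History diverged: the workflow changed to an invalid state
		Some(event) => bail!(
			"history mismatch: expected {}, found {}",
			A::name(),
			event.activity_name
		),
		// No event recorded yet: run the activity and record its output
		None => {
			let output = A::run(ctx, input).await?;
			history.record(A::name(), serde_json::to_value(&output)?);
			Ok(output)
		}
	}
}
```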
## Workflow Wake Condition

If a workflow is not currently running an activity, wake conditions define when the workflow should be run again.

The available conditions are:

- **Immediately**: Run immediately on the first available node.
- **Deadline**: Run at a given timestamp.
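
One way to picture these conditions is as a small enum; this is a hypothetical sketch, not the crate's actual type:

```rust
// Hypothetical representation of wake conditions (illustrative only)
pub enum WakeCondition {
	// Run immediately on the first available node
	Immediately,
	// Run at the given timestamp (milliseconds since epoch, as an assumption)
	Deadline { deadline_ts: i64 },
}
```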
## Activity

A unit of code to run within a workflow.

Activities can fail and will be retried according to the retry policy of the workflow.
Lines changed: 98 additions & 0 deletions
@@ -0,0 +1,98 @@
## Goals

**Primary**

- Performance
- Fast to write for
- Only depend on CockroachDB

**Secondary**

- Easy to monitor & manage via simple SQL queries (see the sketch after this list)
- Easier to understand than messages
- Rust-native
  - Run in-process and as part of the binary to simplify architecture
  - Leverage traits to reduce copies and needless ser/de
  - Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatibility with protobuf**)
- Lay foundations for OpenGB
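
As a sketch of the monitoring goal above: since all state lives in CockroachDB, a plain SQL query can answer operational questions. The `workflows` table and its columns here are hypothetical; the commit does not define the schema.

```rust
use sqlx::PgPool;

// Hedged sketch: count workflow runs that errored and have produced no
// output yet, i.e. runs presumably waiting on a retry.
async fn count_stuck_workflows(pool: &PgPool) -> anyhow::Result<i64> {
	let (count,): (i64,) = sqlx::query_as(
		"SELECT count(*) FROM workflows WHERE error IS NOT NULL AND output IS NULL",
	)
	.fetch_one(pool)
	.await?;

	Ok(count)
}
```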
## Use cases

- Billing cron jobs with batch
- Creating servers
- Email loops
- Creating dynamic servers
  - What about dynamic server lifecycle? Is this more of an actor? This is blending between state and other stuff.
- Deploying CF workers
## Questions

- Concurrency
- Nondeterministic patches: https://docs.temporal.io/dev-guide/typescript/versioning#patching
- Do we plan to support side effects?
## Relation to existing Chirp primitives

### Messages

Workflows replace the use of messages for durable execution, which covers almost all current uses of messages.

Messages should still be used, but much less frequently. They're helpful for:

**Real-time Data Processing**

- When you have a continuous flow of data that needs to be processed in real-time or near-real-time.
- Examples include processing sensor data, social media feeds, financial market data, or clickstream data.
- Stream processing frameworks like Apache Kafka, Apache Flink, or Apache Spark Streaming are well-suited for handling high-volume, real-time data streams.

**Complex Event Processing (CEP)**

- When you need to detect and respond to patterns, correlations, or anomalies in real-time data streams.
- CEP involves analyzing and combining multiple event streams to identify meaningful patterns or trigger actions.
- Stream processing frameworks provide capabilities for defining and matching complex event patterns in real-time.

**Data Transformation and Enrichment**

- When you need to transform, enrich, or aggregate data as it arrives in real-time.
- This can involve tasks like data cleansing, normalization, joining with other data sources, or applying machine learning models.
- Stream processing allows you to process and transform data on-the-fly, enabling real-time analytics and insights.

**Continuous Data Integration**

- When you need to continuously integrate and process data from multiple sources in real-time.
- This can involve merging data streams, performing data synchronization, or updating downstream systems.
- Stream processing frameworks provide connectors and integrations with various data sources and sinks.

**Real-time Monitoring and Alerting**

- When you need to monitor data streams in real-time and trigger alerts or notifications based on predefined conditions.
- Stream processing allows you to define rules and thresholds to detect anomalies, errors, or critical events and send real-time alerts.

**High-throughput, Low-latency Processing**

- When you have a high volume of data that needs to be processed with low latency.
- Stream processing frameworks are designed to handle high-throughput data streams and provide low-latency processing capabilities.
- This is particularly useful in scenarios like fraud detection, real-time recommendations, or real-time bidding in advertising systems.

### Cross-package hooks

We currently use messages for hooking into events from other workflows so we don't have to bake in support directly.

This is potentially error-prone since it makes control flow more opaque.

TBD on whether we keep this pattern.

### Workflows & operations across packages

**Child workflows**

TODO

**Operations**

TODO

## Temporal docs

https://docs.temporal.io/encyclopedia/

lib/api-helper/build/src/error.rs

Lines changed: 16 additions & 0 deletions
@@ -33,6 +33,22 @@ pub fn handle_rejection(
 		GlobalError::Internal { .. } => {
 			tracing::error!(?err, "internal error response");
 
+			// Replace internal errors with global errors
+			if std::env::var("RIVET_API_ERROR_VERBOSE")
+				.ok()
+				.map_or(false, |x| x == "1")
+			{
+				err_code!(ERROR, error = err.to_string())
+			} else {
+				err_code!(
+					ERROR,
+					error = format!("An internal error has occurred (ray_id {}).", ray_id)
+				)
+			}
+		}
+		GlobalError::Raw(err) => {
+			tracing::error!(?err, "internal error response");
+
 			// Replace internal errors with global errors
 			if std::env::var("RIVET_API_ERROR_VERBOSE")
 				.ok()
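
In effect, setting `RIVET_API_ERROR_VERBOSE=1` now surfaces the raw error text for both `Internal` and `Raw` errors, while the default response only exposes the generic message with the `ray_id`.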

lib/bolt/config/src/service.rs

Lines changed: 1 addition & 0 deletions
@@ -98,6 +98,7 @@ pub enum ServiceKind {
 	#[serde(rename = "operation")]
 	Operation {},
 
+	// TODO: Rename to worker
 	#[serde(rename = "consumer")]
 	Consumer {
 		#[serde(default)]

lib/bolt/core/src/context/service.rs

Lines changed: 13 additions & 3 deletions
@@ -287,6 +287,7 @@ impl ServiceContextData {
 
 	pub fn is_monolith_worker(&self) -> bool {
 		self.config().service.name == "monolith-worker"
+			|| self.config().service.name == "monolith-workflow-worker"
 	}
 
 	pub fn depends_on_nomad_api(&self) -> bool {
@@ -349,7 +350,9 @@
 	}
 
 	pub fn depends_on_infra(&self) -> bool {
-		self.name() == "cluster-worker" || self.name() == "monolith-worker"
+		self.name() == "cluster-worker"
+			|| self.name() == "monolith-worker"
+			|| self.name() == "monolith-workflow-worker"
 	}
 
 	pub fn depends_on_cluster_config(&self) -> bool {
@@ -1102,11 +1105,18 @@
 		let password = project_ctx.read_secret(&["crdb", "password"]).await?;
 		let sslmode = "verify-ca";
 
-		let uri = format!(
+		let url = format!(
 			"postgres://{}:{}@{crdb_host}/postgres?sslmode={sslmode}",
 			username, password,
 		);
-		env.insert("CRDB_URL".into(), uri);
+		env.insert("CRDB_URL".into(), url);
+
+		// TODO:
+		let workflow_url = format!(
+			"postgres://{}:{}@{crdb_host}/db_workflow?sslmode={sslmode}",
+			username, password,
+		);
+		env.insert("CRDB_WORKFLOW_URL".into(), workflow_url);
 	}
 
 	// Redis
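
For context, a worker consuming the new `CRDB_WORKFLOW_URL` variable might connect like this hedged sketch; this consumer code is not part of the commit, and the pool settings are illustrative:

```rust
use sqlx::postgres::PgPoolOptions;

// Hypothetical consumer of CRDB_WORKFLOW_URL: build a Postgres pool pointed
// at the db_workflow database.
async fn connect_workflow_db() -> anyhow::Result<sqlx::PgPool> {
	let url = std::env::var("CRDB_WORKFLOW_URL")?;
	let pool = PgPoolOptions::new()
		.max_connections(8) // illustrative pool size
		.connect(&url)
		.await?;
	Ok(pool)
}
```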

lib/chirp-workflow/Cargo.toml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
[workspace]
members = [
	"core",
	"macros"
]

lib/chirp-workflow/core/Cargo.toml

Lines changed: 34 additions & 0 deletions
@@ -0,0 +1,34 @@
[package]
name = "chirp-workflow"
version = "0.1.0"
authors = ["Rivet Gaming, LLC <developer@rivet.gg>"]
edition = "2021"
license = "Apache-2.0"

[dependencies]
anyhow = "1.0.82"
async-trait = "0.1.80"
chirp-client = { path = "../../chirp/client" }
chirp-workflow-macros = { path = "../macros" }
formatted-error = { path = "../../formatted-error" }
futures-util = "0.3"
global-error = { path = "../../global-error" }
indoc = "2.0.5"
prost = "0.12.4"
prost-types = "0.12.4"
rand = "0.8.5"
rivet-cache = { path = "../../cache/build" }
rivet-connection = { path = "../../connection" }
rivet-metrics = { path = "../../metrics" }
rivet-operation = { path = "../../operation/core" }
rivet-pools = { path = "../../pools" }
rivet-runtime = { path = "../../runtime" }
rivet-util = { path = "../../util/core" }
serde = { version = "1.0.198", features = ["derive"] }
serde_json = "1.0.116"
sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "ipnetwork"] }
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["full"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
uuid = { version = "1.8.0", features = ["v4", "serde"] }
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
use std::{fmt::Debug, hash::Hash};

use anyhow::*;
use async_trait::async_trait;
use serde::{de::DeserializeOwned, Serialize};

use crate::ActivityCtx;

#[async_trait]
pub trait Activity {
	type Input: ActivityInput;
	type Output: Serialize + DeserializeOwned + Debug + Send;

	fn name() -> &'static str;

	// TODO: Is there any reason for input to be a reference?
	async fn run(ctx: &mut ActivityCtx, input: &Self::Input) -> Result<Self::Output>;
}

pub trait ActivityInput: Serialize + DeserializeOwned + Debug + Hash + Send {
	type Activity: Activity;
}
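
To show how these traits fit together, a hedged example implementation; the `SendEmail` activity, its input type, and the body are hypothetical, assuming the `Activity`, `ActivityInput`, and `ActivityCtx` items from the file above are in scope:

```rust
use anyhow::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use uuid::Uuid;

// Hypothetical activity input; derives satisfy the ActivityInput bounds above
#[derive(Serialize, Deserialize, Debug, Hash)]
struct SendEmailInput {
	user_id: Uuid,
}

impl ActivityInput for SendEmailInput {
	type Activity = SendEmail;
}

struct SendEmail;

#[async_trait]
impl Activity for SendEmail {
	type Input = SendEmailInput;
	type Output = ();

	fn name() -> &'static str {
		"send_email"
	}

	async fn run(_ctx: &mut ActivityCtx, input: &Self::Input) -> Result<Self::Output> {
		// Side effects (e.g. calling an email provider) would live here; the
		// event history records the output so replays skip this work.
		tracing::info!(user_id = ?input.user_id, "sending email (hypothetical)");
		Ok(())
	}
}
```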
