From a923a668558ddb10e13a4439455da41961e1bdee Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 9 May 2024 21:23:45 +0000 Subject: [PATCH] feat: add ray ids to workflows, clean up types --- docs/libraries/workflow/GLOSSARY.md | 49 +++--- docs/libraries/workflow/OVERVIEW.md | 64 ++++++++ docs/libraries/workflow/WORKFLOW.md | 98 ----------- lib/chirp-workflow/core/Cargo.toml | 2 +- lib/chirp-workflow/core/src/ctx/activity.rs | 81 +++++----- lib/chirp-workflow/core/src/ctx/operation.rs | 152 +++++++++++++++++- lib/chirp-workflow/core/src/ctx/test.rs | 19 ++- lib/chirp-workflow/core/src/ctx/workflow.rs | 41 +++-- lib/chirp-workflow/core/src/db/mod.rs | 48 ++++-- lib/chirp-workflow/core/src/db/postgres.rs | 46 ++++-- lib/chirp-workflow/core/src/prelude.rs | 1 + lib/chirp-workflow/core/src/registry.rs | 13 +- lib/chirp-workflow/core/src/schema.rs | 8 +- lib/chirp-workflow/core/src/signal.rs | 10 +- lib/chirp-workflow/core/src/util.rs | 37 +++++ lib/chirp-workflow/core/src/worker.rs | 56 +++++-- lib/chirp-workflow/macros/src/lib.rs | 4 +- lib/chirp/worker/src/manager.rs | 5 +- lib/operation/core/src/lib.rs | 12 +- .../migrations/20240430191643_init.up.sql | 17 +- 20 files changed, 507 insertions(+), 256 deletions(-) create mode 100644 docs/libraries/workflow/OVERVIEW.md delete mode 100644 docs/libraries/workflow/WORKFLOW.md

diff --git a/docs/libraries/workflow/GLOSSARY.md b/docs/libraries/workflow/GLOSSARY.md index 1ef9cd8f5..20eaf8498 100644 --- a/docs/libraries/workflow/GLOSSARY.md +++ b/docs/libraries/workflow/GLOSSARY.md @@ -1,36 +1,49 @@ -TODO - # Glossary

## Worker

-A process that's running workflows. +A process that queries for pending workflows with a specific filter. The filter is based on which workflows are registered in the given worker's registry. +The queried workflows are run on the same machine as the worker, but each is given its own thread. + +## Registry

-There are usually multiple workers running at the same time. +A collection of registered workflows. This is solely used by the worker to fetch workflows from the database.

## Workflow

-A series of activies to be ran together. +A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or sub workflow triggers. + +Workflows can be thought of as a list of tasks. The code defining a workflow only specifies what items should be run; there is no complex logic (e.g. database queries) running within the top level of the workflow. + +Upon an activity failure, workflow code can be re-run without duplicate side effects because activities are cached and re-read after they succeed. + +## Activity

-The code defining a workflow only specifies what activites to be ran. There is no complex logic (e.g. database queries) running within workflows. +A block of code that can fail. This cannot trigger other workflows or activities, but it can call operations. +Activities are retried by workflows when they fail, or replayed when they succeed but a later part of the +workflow fails.

-Workflow code can be reran multiple times to replay a workflow. +## Operation

-## Workflow State +Effectively a native Rust function. It can be fallible or infallible and is used simply for tidiness (as you would use any other function). +Operations can only be called from activities, not from workflows.

-Persistated data about a workflow. +Examples include:

-## Workflow Run +- most `get` operations (`user-get`) +- any complex logic you'd want in its own function (fetching some http data and parsing it)

-An instance of a node running a workflow.
If re-running a workflow, it will be replaying events. +Operations are not required; all of their functionality can be put into an activity instead.

## Workflow Event

An action that gets executed in a workflow. An event can be a:

- Activity +- Received signal +- Dispatched sub-workflow

-Events store the output from activities and are used to ensure activites are ran only once. +Events store the output from activities and are used to ensure activities are run only once.

## Workflow Event History

@@ -38,7 +51,7 @@ List of events that have executed in this workflow. These are used in replays to

## Workflow Replay

-After the first run of a workflow, all runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will be skipped in the replay and use the output from the previous run. +After the first run of a workflow, subsequent runs will replay the activities and compare against the event history. If an activity has already been run successfully, it will not actually run any code and will instead use the output from the previous run.

## Workflow Wake Condition

@@ -47,10 +60,6 @@ If a workflow is not currently running an activity, wake conditions define when

The available conditions are:

- **Immediately** Run immediately by the first available node -- **Deadline** Run at a given timesetamp. - -## Activity - -A unit of code to run within a workflow. - -Activities can fail and will be retried accoriding to the retry policy of the workflow. +- **Deadline** Run at a given timestamp. +- **Signal** Run once any one of the listed signals is received. +- **Sub workflow** Run once the given sub workflow is completed.

diff --git a/docs/libraries/workflow/OVERVIEW.md b/docs/libraries/workflow/OVERVIEW.md new file mode 100644 index 000000000..e363d8f79 --- /dev/null +++ b/docs/libraries/workflow/OVERVIEW.md @@ -0,0 +1,64 @@ +# Overview + +Workflows are designed to provide highly durable code executions for distributed systems. The main goal is to allow for writing easy-to-understand multi-step programs with effective error handling, retryability, and a rigid state. + +## Goals + +**Primary** + +- Performance +- Quick iteration speed +- Architectural simplicity (only depends on CockroachDB) + +**Secondary** + +- Easy to operate, manageable via simple SQL queries +- Easier to write, understand, and maintain than event-driven architectures +- Rust-native + - Run in-process and as part of the binary to simplify architecture + - Leverage traits to reduce copies and needless ser/de + - Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatibility with protobuf**) + +## Use cases + +- Billing cron jobs with batch +- Creating servers +- Email loops +- Creating dynamic servers +- Automating Cloudflare APIs (Cloudflare workers, DNS, issuing SSL certs) + +## Relation to existing Chirp primitives + +### Messages + +Workflows replace the use case of messages for durable execution, which is almost all uses of messages. + +The biggest pain point with messages is the lack of a rigid state. Message executions always match the following outline: + +1. Read whatever data is required +2. Perform some action(s) +3. Update data as needed +4. Finish (possibly publish more messages) OR, upon failure, start all over at step 1
+ +The issue with this is that messages do not have any knowledge of messages that came before them, their own previous failed executions, or even other messages of the same system executing in parallel. Without thorough manually written sync checks and consistency validations (which are verbose and hard to follow), this type of execution often results in an overall broken state of whatever system the message is acting on (e.g. matchmaking, server provisioning). + +**Once a broken state is reached, the retry system for messages _practically never_ successfully retries the message.** + +### Cross-package hooks + +We currently use messages for hooking into events from other workflows so we don't have to bake in support directly. + +This is potentially error-prone since it makes control flow more opaque. + +We will use sub workflows instead. + +## Post-workflow message uses + +Messages should still be used, but much less frequently. They're helpful for: + +- Real-time Data Processing +- Complex Event Processing (CEP) +- Data Transformation and Enrichment +- Continuous Data Integration +- Real-time Monitoring and Alerting +- High-throughput, Low-latency Processing

diff --git a/docs/libraries/workflow/WORKFLOW.md b/docs/libraries/workflow/WORKFLOW.md deleted file mode 100644 index 9878ab8fa..000000000 --- a/docs/libraries/workflow/WORKFLOW.md +++ /dev/null @@ -1,98 +0,0 @@ -## Goals - -**Primary** - -- Performance -- Fast to write for -- Only depend on CockroachDB - -**Secondary** - -- Easy to monitor & manage via simple SQL queries -- Easier to understand than messages -- Rust-native - - Run in-process and as part of the binary to simplify architecture - - Leverage traits to reduce copies and needless ser/de - - Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatability with protobuf**) -- Lay foundations for OpenGB - -## Use cases - -- Billing cron jobs with batch -- Creating servers -- Email loops -- Creating dynamic servers - - What about dynamic server lifecycle? Is this more of an actor? This is blending between state and other stuff. -- Deploying CF workers - -## Questions - -- Concurrency -- Nondeterministic patches: https://docs.temporal.io/dev-guide/typescript/versioning#patching -- Do we plan to support side effects? - -## Relation to existing Chirp primitives - -### Messages - -Workflows replace the usecase of messages for durable execution, which is almost all uses of messages. - -Messages should still be used, but much less frequently. They're helpful for: - -**Real-time Data Processing** - -- When you have a continuous flow of data that needs to be processed in real-time or near-real-time. -- Examples include processing sensor data, social media feeds, financial market data, or clickstream data. -- Stream processing frameworks like Apache Kafka, Apache Flink, or Apache Spark Streaming are well-suited for handling high-volume, real-time data streams. - -**Complex Event Processing (CEP)** - -- When you need to detect and respond to patterns, correlations, or anomalies in real-time data streams. -- CEP involves analyzing and combining multiple event streams to identify meaningful patterns or trigger actions. -- Stream processing frameworks provide capabilities for defining and matching complex event patterns in real-time. - -**Data Transformation and Enrichment** - -- When you need to transform, enrich, or aggregate data as it arrives in real-time.
-- This can involve tasks like data cleansing, normalization, joining with other data sources, or applying machine learning models. -- Stream processing allows you to process and transform data on-the-fly, enabling real-time analytics and insights. - -**Continuous Data Integration** - -- When you need to continuously integrate and process data from multiple sources in real-time. -- This can involve merging data streams, performing data synchronization, or updating downstream systems. -- Stream processing frameworks provide connectors and integrations with various data sources and sinks. - -**Real-time Monitoring and Alerting** - -- When you need to monitor data streams in real-time and trigger alerts or notifications based on predefined conditions. -- Stream processing allows you to define rules and thresholds to detect anomalies, errors, or critical events and send real-time alerts. - -**High-throughput, Low-latency Processing** - -- When you have a high volume of data that needs to be processed with low latency. -- Stream processing frameworks are designed to handle high-throughput data streams and provide low-latency processing capabilities. -- This is particularly useful in scenarios like fraud detection, real-time recommendations, or real-time bidding in advertising systems. - -### Cross-package hooks - -We currently use messages for hooking in to events from other workflows so we don't have to bake in support directly. - -This is potentially error prone since it makes control flow more opaque. - -TBD on if we keed this pattern. - -### Workflows & operations across packages - -**Child workflows** - -TODO - -**Operations** - -TODO - -## Temporal docs - -https://docs.temporal.io/encyclopedia/ - diff --git a/lib/chirp-workflow/core/Cargo.toml b/lib/chirp-workflow/core/Cargo.toml index 9298cd645..e4acfd4c8 100644 --- a/lib/chirp-workflow/core/Cargo.toml +++ b/lib/chirp-workflow/core/Cargo.toml @@ -25,7 +25,7 @@ rivet-runtime = { path = "../../runtime" } rivet-util = { path = "../../util/core" } serde = { version = "1.0.198", features = ["derive"] } serde_json = "1.0.116" -sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "ipnetwork"] } +sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "json", "ipnetwork"] } thiserror = "1.0.59" tokio = { version = "1.37.0", features = ["full"] } tracing = "0.1.40" diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index a504afc6d..cc5af986c 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -2,15 +2,17 @@ use global_error::{GlobalError, GlobalResult}; use rivet_pools::prelude::*; use uuid::Uuid; -use crate::{ - ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError, -}; +use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError}; pub struct ActivityCtx { - db: DatabaseHandle, - conn: rivet_connection::Connection, workflow_id: Uuid, + ray_id: Uuid, name: &'static str, + ts: i64, + + db: DatabaseHandle, + + conn: rivet_connection::Connection, // Backwards compatibility op_ctx: rivet_operation::OperationContext<()>, @@ -19,28 +21,22 @@ pub struct ActivityCtx { impl ActivityCtx { pub fn new( db: DatabaseHandle, - conn: rivet_connection::Connection, + conn: &rivet_connection::Connection, workflow_id: Uuid, + workflow_create_ts: i64, + ray_id: Uuid, name: &'static str, ) -> Self { - let op_ctx = rivet_operation::OperationContext::new( - 
name.to_string(), - std::time::Duration::from_secs(60), - conn.clone(), - workflow_id, - // TODO: ray_id - Uuid::new_v4(), - rivet_util::timestamp::now(), - // TODO: req_ts - rivet_util::timestamp::now(), - (), - ); + let ts = rivet_util::timestamp::now(); + let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, name, ts);

ActivityCtx { - db, - conn, workflow_id, + ray_id, name, + ts, + db, + conn, op_ctx, } }

@@ -55,7 +51,14 @@ impl ActivityCtx { I: OperationInput, <I as OperationInput>::Operation: Operation<Input = I>, { - let mut ctx = OperationCtx::new(self.db.clone(), self.workflow_id); + let mut ctx = OperationCtx::new( + self.db.clone(), + &self.conn, + self.workflow_id, + self.ray_id, + self.op_ctx.req_ts(), + I::Operation::name(), + );

I::Operation::run(&mut ctx, &input) .await @@ -71,28 +74,28 @@ impl ActivityCtx { // self.timeout // }

- // pub fn req_id(&self) -> Uuid { - // self.req_id - // } + pub fn req_id(&self) -> Uuid { + self.op_ctx.req_id() + }

- // pub fn ray_id(&self) -> Uuid { - // self.ray_id - // } + pub fn ray_id(&self) -> Uuid { + self.ray_id + }

- // /// Timestamp at which the request started. - // pub fn ts(&self) -> i64 { - // self.ts - // } + /// Timestamp at which the request started. + pub fn ts(&self) -> i64 { + self.ts + }

- // /// Timestamp at which the request was published. - // pub fn req_ts(&self) -> i64 { - // self.req_ts - // } + /// Timestamp at which the request was published. + pub fn req_ts(&self) -> i64 { + self.op_ctx.req_ts() + }

- // /// Time between when the timestamp was processed and when it was published. - // pub fn req_dt(&self) -> i64 { - // self.ts.saturating_sub(self.req_ts) - // } + /// Time between when the timestamp was processed and when it was published. + pub fn req_dt(&self) -> i64 { + self.ts.saturating_sub(self.op_ctx.req_ts()) + }

// pub fn perf(&self) -> &chirp_perf::PerfCtx { // self.conn.perf()

diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index 7429ebd65..44551ccaa 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -1,18 +1,158 @@ +use global_error::{GlobalError, GlobalResult}; +use rivet_pools::prelude::*; use uuid::Uuid;

-use crate::DatabaseHandle; +use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};

pub struct OperationCtx { - pub db: DatabaseHandle, - pub workflow_id: Uuid, + workflow_id: Uuid, + ray_id: Uuid, + name: &'static str, + ts: i64, + + db: DatabaseHandle, + + conn: rivet_connection::Connection, + + // Backwards compatibility + op_ctx: rivet_operation::OperationContext<()>, }

impl OperationCtx { - pub fn new(db: DatabaseHandle, workflow_id: Uuid) -> Self { - OperationCtx { workflow_id, db } + pub fn new( + db: DatabaseHandle, + conn: &rivet_connection::Connection, + workflow_id: Uuid, + ray_id: Uuid, + req_ts: i64, + name: &'static str, + ) -> Self { + let ts = rivet_util::timestamp::now(); + let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, name, ts); + + OperationCtx { + workflow_id, + ray_id, + name, + ts, + db, + conn, + op_ctx, + } + } }

impl OperationCtx { - // TODO: + pub async fn op<I>( + &mut self, + input: I, + ) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output> + where + I: OperationInput, + <I as OperationInput>::Operation: Operation<Input = I>, + { + let mut ctx = OperationCtx::new( + self.db.clone(), + &self.conn, + self.workflow_id, + self.ray_id, + self.op_ctx.req_ts(), + I::Operation::name(), + ); + + I::Operation::run(&mut ctx, &input) + .await +
.map_err(WorkflowError::OperationFailure) + .map_err(GlobalError::raw) + }

+ pub fn name(&self) -> &str { + self.name + } + + // pub fn timeout(&self) -> Duration { + // self.timeout + // } + + pub fn req_id(&self) -> Uuid { + self.op_ctx.req_id() + } + + pub fn ray_id(&self) -> Uuid { + self.ray_id + } + + /// Timestamp at which the request started. + pub fn ts(&self) -> i64 { + self.ts + } + + /// Timestamp at which the request was published. + pub fn req_ts(&self) -> i64 { + self.op_ctx.req_ts() + } + + /// Time between when the timestamp was processed and when it was published. + pub fn req_dt(&self) -> i64 { + self.ts.saturating_sub(self.op_ctx.req_ts()) + } + + // pub fn perf(&self) -> &chirp_perf::PerfCtx { + // self.conn.perf() + // } + + pub fn trace(&self) -> &[chirp_client::TraceEntry] { + self.conn.trace() + } + + pub fn test(&self) -> bool { + self.trace() + .iter() + .any(|x| x.run_context == chirp_client::RunContext::Test as i32) + } + + pub fn chirp(&self) -> &chirp_client::Client { + self.conn.chirp() + } + + pub fn cache(&self) -> rivet_cache::RequestConfig { + self.conn.cache() + } + + pub fn cache_handle(&self) -> rivet_cache::Cache { + self.conn.cache_handle() + } + + pub async fn crdb(&self) -> Result<CrdbPool, rivet_pools::Error> { + self.conn.crdb().await + } + + pub async fn redis_cache(&self) -> Result<RedisPool, rivet_pools::Error> { + self.conn.redis_cache().await + } + + pub async fn redis_cdn(&self) -> Result<RedisPool, rivet_pools::Error> { + self.conn.redis_cdn().await + } + + pub async fn redis_job(&self) -> Result<RedisPool, rivet_pools::Error> { + self.conn.redis_job().await + } + + pub async fn redis_mm(&self) -> Result<RedisPool, rivet_pools::Error> { + self.conn.redis_mm().await + } + + pub async fn redis_user_presence(&self) -> Result<RedisPool, rivet_pools::Error> { + self.conn.redis_user_presence().await + } + + pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> { + self.conn.clickhouse().await + } + + // Backwards compatibility + pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> { + &self.op_ctx + } }

diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index a3ab6d06a..17607e472 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -5,14 +5,14 @@ use serde::Serialize; use tokio::time::Duration; use uuid::Uuid;

-use crate::{ - DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput, -}; +use crate::{DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput};

pub type TestCtxHandle = Arc<TestCtx>;

pub struct TestCtx { name: String, + ray_id: Uuid, + pub db: DatabaseHandle, }

@@ -20,6 +20,7 @@ impl TestCtx { pub fn new(db: DatabaseHandle) -> TestCtxHandle { Arc::new(TestCtx { name: "internal-test".to_string(), + ray_id: Uuid::new_v4(), db, }) }

@@ -37,6 +38,7 @@

TestCtx { name: service_name, + ray_id: Uuid::new_v4(), db, } }

@@ -61,12 +63,12 @@ let id = Uuid::new_v4();

// Serialize input - let input_str = serde_json::to_string(&input) + let input_val = serde_json::to_value(input) .map_err(WorkflowError::SerializeWorkflowOutput) .map_err(GlobalError::raw)?;

self.db - .dispatch_workflow(id, &name, &input_str) + .dispatch_workflow(self.ray_id, id, &name, input_val) .await .map_err(GlobalError::raw)?;

@@ -123,11 +125,12 @@ let signal_id = Uuid::new_v4();

// Serialize input - let input_str = - serde_json::to_string(&input).map_err(WorkflowError::SerializeSignalBody).map_err(GlobalError::raw)?; + let input_val = serde_json::to_value(input) + .map_err(WorkflowError::SerializeSignalBody) + .map_err(GlobalError::raw)?;

self.db - .publish_signal(workflow_id,
signal_id, I::name(), &input_str) + .publish_signal(self.ray_id, workflow_id, signal_id, I::name(), input_val) .await .map_err(GlobalError::raw)?;

diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index 93fa15b53..7d3e1b934 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -33,6 +33,8 @@ pub struct WorkflowCtx { pub workflow_id: Uuid, /// Name of the workflow to run in the registry. pub name: String, + create_ts: i64, + ray_id: Uuid,

registry: RegistryHandle, db: DatabaseHandle,

@@ -47,7 +49,7 @@ pub struct WorkflowCtx { /// The reason this type is a hashmap is to allow querying by location. event_history: Arc<HashMap<Location, Vec<Event>>>, /// Input data passed to this workflow. - pub(crate) input: Arc<String>, + pub(crate) input: Arc<serde_json::Value>,

root_location: Location, location_idx: usize,

@@ -63,6 +65,9 @@ impl WorkflowCtx { GlobalResult::Ok(WorkflowCtx { workflow_id: workflow.workflow_id, name: workflow.workflow_name, + create_ts: workflow.create_ts, + + ray_id: workflow.ray_id,

registry, db,

@@ -89,6 +94,8 @@ let branch = WorkflowCtx { workflow_id: self.workflow_id, name: self.name.clone(), + create_ts: self.create_ts, + ray_id: self.ray_id,

registry: self.registry.clone(), db: self.db.clone(),

@@ -237,8 +244,10 @@ ) -> WorkflowResult<A::Output> { let mut ctx = ActivityCtx::new( self.db.clone(), - self.conn.clone(), + &self.conn, self.workflow_id, + self.create_ts, + self.ray_id, A::name(), );

@@ -247,17 +256,17 @@ tracing::debug!("activity success");

// Write output - let input_str = - serde_json::to_string(input).map_err(WorkflowError::SerializeActivityInput)?; - let output_str = serde_json::to_string(&output) + let input_val = + serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?; + let output_val = serde_json::to_value(&output) .map_err(WorkflowError::SerializeActivityOutput)?; self.db .commit_workflow_activity_event( self.workflow_id, self.full_location().as_ref(), activity_id, - &input_str, - Some(&output_str), + input_val, + Some(output_val), ) .await?;

@@ -267,14 +276,14 @@ tracing::debug!(?err, "activity error");

// Write empty output (failed state) - let input_str = - serde_json::to_string(input).map_err(WorkflowError::SerializeActivityInput)?; + let input_val = + serde_json::to_value(input).map_err(WorkflowError::SerializeActivityInput)?; self.db .commit_workflow_activity_event( self.workflow_id, self.full_location().as_ref(), activity_id, - &input_str, + input_val, None, ) .await?;

@@ -350,17 +359,18 @@ let sub_workflow_id = Uuid::new_v4();

// Serialize input - let input_str = serde_json::to_string(&input) + let input_val = serde_json::to_value(input) .map_err(WorkflowError::SerializeWorkflowOutput) .map_err(GlobalError::raw)?;

self.db .dispatch_sub_workflow( + self.ray_id, self.workflow_id, self.full_location().as_ref(), sub_workflow_id, &name, - &input_str, + input_val, ) .await .map_err(GlobalError::raw)?;

@@ -490,10 +500,11 @@

self.db .publish_signal( + self.ray_id, workflow_id, id, T::name(), - &serde_json::to_string(&body) + serde_json::to_value(&body) .map_err(WorkflowError::SerializeSignalBody) .map_err(GlobalError::raw)?, )

@@ -517,7 +528,7 @@

tracing::debug!(id=%self.workflow_id, name=%signal.name, "replaying signal");

- T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?
+ T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)? } // Listen for new messages else {

@@ -562,7 +573,7 @@ return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw); };

- Some(T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?) + Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?) } // Listen for new message else {

diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index d2b624d81..102877cf9 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -11,12 +11,22 @@ pub type DatabaseHandle = Arc<dyn Database + Sync>;

#[async_trait::async_trait] pub trait Database: Send { - async fn dispatch_workflow(&self, id: Uuid, name: &str, input: &str) -> WorkflowResult<()>; + async fn dispatch_workflow( + &self, + ray_id: Uuid, + id: Uuid, + name: &str, + input: serde_json::Value, + ) -> WorkflowResult<()>; async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>; async fn pull_workflows(&self, filter: &[&str]) -> WorkflowResult<Vec<PulledWorkflow>>;

// When a workflow is completed - async fn commit_workflow(&self, workflow_id: Uuid, output: &str) -> WorkflowResult<()>; + async fn commit_workflow( + &self, + workflow_id: Uuid, + output: &serde_json::Value, + ) -> WorkflowResult<()>; // When a workflow fails async fn fail_workflow( &self,

@@ -32,16 +42,17 @@ workflow_id: Uuid, location: &[usize], activity_id: &ActivityId, - input: &str, - output: Option<&str>, + input: serde_json::Value, + output: Option<serde_json::Value>, ) -> WorkflowResult<()>;

async fn publish_signal( &self, + ray_id: Uuid, workflow_id: Uuid, signal_id: Uuid, signal_name: &str, - body: &str, + body: serde_json::Value, ) -> WorkflowResult<()>; async fn pull_latest_signal( &self,

@@ -52,26 +63,26 @@

async fn dispatch_sub_workflow( &self, + ray_id: Uuid, workflow_id: Uuid, location: &[usize], sub_workflow_id: Uuid, sub_workflow_name: &str, - input: &str, + input: serde_json::Value, ) -> WorkflowResult<()>; }

#[derive(sqlx::FromRow)] pub struct WorkflowRow { pub workflow_id: Uuid, - pub input: String, - pub output: Option<String>, + pub input: serde_json::Value, + pub output: Option<serde_json::Value>, }

impl WorkflowRow { - pub fn parse_output<O: DeserializeOwned>(&self) -> WorkflowResult<Option<O>> { + pub fn parse_output<O: DeserializeOwned>(self) -> WorkflowResult<Option<O>> { self.output - .as_deref() - .map(serde_json::from_str) + .map(serde_json::from_value) .transpose() .map_err(WorkflowError::DeserializeWorkflowOutput) } }

#[derive(sqlx::FromRow)] pub struct PulledWorkflowRow { pub workflow_id: Uuid, pub workflow_name: String, + pub create_ts: i64, + pub ray_id: Uuid, - pub input: String, + pub input: serde_json::Value, pub wake_deadline_ts: Option<i64>, }

#[derive(sqlx::FromRow)] pub struct PulledWorkflow { pub workflow_id: Uuid, pub workflow_name: String, + pub create_ts: i64, + pub ray_id: Uuid, - pub input: String, + pub input: serde_json::Value, pub wake_deadline_ts: Option<i64>, + pub activity_events: Vec<ActivityEventRow>, pub signal_events: Vec<SignalEventRow>, pub sub_workflow_events: Vec<SubWorkflowEventRow>, }

#[derive(sqlx::FromRow)] pub struct ActivityEventRow { pub workflow_id: Uuid, pub location: Vec<i64>, pub activity_name: String, pub input_hash: Vec<u8>, - pub output: Option<String>, + pub output: Option<serde_json::Value>, }

#[derive(sqlx::FromRow)] pub struct SignalEventRow { pub workflow_id: Uuid, pub location: Vec<i64>, pub signal_name: String, - pub body: String, + pub body: serde_json::Value, }

#[derive(sqlx::FromRow)] pub struct SubWorkflowEventRow { pub workflow_id: Uuid, pub location: Vec<i64>, @@ -125,5 +141,5 @@ pub struct SignalRow {
pub signal_id: Uuid, pub signal_name: String, - pub body: String, + pub body: serde_json::Value, }

diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index 6861caa6b..64c05af7f 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -60,18 +60,23 @@ impl DatabasePostgres {

impl Database for DatabasePostgres { async fn dispatch_workflow( &self, + ray_id: Uuid, workflow_id: Uuid, workflow_name: &str, - input: &str, + input: serde_json::Value, ) -> WorkflowResult<()> { sqlx::query(indoc!( " - INSERT INTO db_workflow.workflows (workflow_id, workflow_name, input, wake_immediate) - VALUES ($1, $2, $3, true) + INSERT INTO db_workflow.workflows ( + workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate + ) + VALUES ($1, $2, $3, $4, $5, true) ", )) .bind(workflow_id) .bind(workflow_name) + .bind(rivet_util::timestamp::now()) + .bind(ray_id) .bind(input) .execute(&mut *self.conn().await?) .await

@@ -127,7 +132,7 @@ output IS NOT NULL ) ) - RETURNING workflow_id, workflow_name, input, wake_deadline_ts + RETURNING workflow_id, workflow_name, create_ts, ray_id, input, wake_deadline_ts ", )) .bind(NODE_ID)

@@ -146,6 +151,8 @@ PulledWorkflow { workflow_id: row.workflow_id, workflow_name: row.workflow_name, + create_ts: row.create_ts, + ray_id: row.ray_id, input: row.input, wake_deadline_ts: row.wake_deadline_ts, activity_events: Vec::new(),

@@ -233,7 +240,11 @@ Ok(workflows_by_id.into_values().collect()) }

- async fn commit_workflow(&self, workflow_id: Uuid, output: &str) -> WorkflowResult<()> { + async fn commit_workflow( + &self, + workflow_id: Uuid, + output: &serde_json::Value, + ) -> WorkflowResult<()> { sqlx::query(indoc!( " UPDATE db_workflow.workflows

@@ -288,8 +299,8 @@ workflow_id: Uuid, location: &[usize], activity_id: &ActivityId, - input: &str, - output: Option<&str>, + input: serde_json::Value, + output: Option<serde_json::Value>, ) -> WorkflowResult<()> { sqlx::query(indoc!( "

@@ -361,21 +372,23 @@

async fn publish_signal( &self, + ray_id: Uuid, workflow_id: Uuid, signal_id: Uuid, signal_name: &str, - body: &str, + body: serde_json::Value, ) -> WorkflowResult<()> { sqlx::query(indoc!( " - INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, create_ts) - VALUES ($1, $2, $3, $4, $5) + INSERT INTO db_workflow.signals (signal_id, workflow_id, signal_name, body, create_ts, ray_id) + VALUES ($1, $2, $3, $4, $5, $6) ", )) .bind(signal_id) .bind(workflow_id) .bind(signal_name) .bind(body) .bind(rivet_util::timestamp::now()) + .bind(ray_id) .execute(&mut *self.conn().await?)
.await

@@ -386,25 +399,28 @@

async fn dispatch_sub_workflow( &self, + ray_id: Uuid, workflow_id: Uuid, location: &[usize], sub_workflow_id: Uuid, sub_workflow_name: &str, - input: &str, + input: serde_json::Value, ) -> WorkflowResult<()> { sqlx::query(indoc!( " WITH workflow AS ( - INSERT INTO db_workflow.workflows (workflow_id, workflow_name, input, wake_immediate) - VALUES ($5, $2, $3, true) + INSERT INTO db_workflow.workflows ( + workflow_id, workflow_name, create_ts, ray_id, input, wake_immediate + ) + VALUES ($7, $2, $3, $4, $5, true) RETURNING 1 ), sub_workflow AS ( INSERT INTO db_workflow.workflow_sub_workflow_events( workflow_id, location, sub_workflow_id ) - VALUES($1, $4, $5) + VALUES($1, $6, $7) RETURNING 1 ) SELECT 1 ", )) .bind(workflow_id) .bind(sub_workflow_name) + .bind(rivet_util::timestamp::now()) + .bind(ray_id) .bind(input) .bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>()) .bind(sub_workflow_id)

diff --git a/lib/chirp-workflow/core/src/prelude.rs b/lib/chirp-workflow/core/src/prelude.rs index 75681e66f..25f4c9c86 100644 --- a/lib/chirp-workflow/core/src/prelude.rs +++ b/lib/chirp-workflow/core/src/prelude.rs @@ -18,6 +18,7 @@ pub use crate::{ ctx::*, db, error::{WorkflowError, WorkflowResult}, + executable::closure, executable::Executable, operation::Operation, registry::Registry,

diff --git a/lib/chirp-workflow/core/src/registry.rs b/lib/chirp-workflow/core/src/registry.rs index 2daf06dbc..60750e40d 100644 --- a/lib/chirp-workflow/core/src/registry.rs +++ b/lib/chirp-workflow/core/src/registry.rs @@ -36,7 +36,7 @@ impl Registry { run: |ctx| { async move { // Deserialize input - let input = serde_json::from_str(&ctx.input) + let input = serde_json::from_value(ctx.input.as_ref().clone()) .map_err(WorkflowError::DeserializeWorkflowInput)?;

// Run workflow

@@ -59,10 +59,10 @@ };

// Serialize output - let output_str = serde_json::to_string(&output) + let output_val = serde_json::to_value(output) .map_err(WorkflowError::SerializeWorkflowOutput)?;

- Ok(output_str) + Ok(output_val) } .boxed() },

@@ -78,7 +78,8 @@ }

pub struct RegistryWorkflow { - pub run: for<'a> fn( - &'a mut WorkflowCtx, - ) -> Pin<Box<dyn Future<Output = WorkflowResult<String>> + Send + 'a>>, + pub run: + for<'a> fn( + &'a mut WorkflowCtx, + ) -> Pin<Box<dyn Future<Output = WorkflowResult<serde_json::Value>> + Send + 'a>>, }

diff --git a/lib/chirp-workflow/core/src/schema.rs b/lib/chirp-workflow/core/src/schema.rs index 7be489e65..94356eb99 100644 --- a/lib/chirp-workflow/core/src/schema.rs +++ b/lib/chirp-workflow/core/src/schema.rs @@ -25,14 +25,14 @@ pub struct ActivityEvent { pub activity_id: ActivityId,

/// If activity succeeds, this will be some.
- pub output: Option<String>, + pub output: Option<serde_json::Value>, }

impl ActivityEvent { pub fn get_output<O: DeserializeOwned>(&self) -> WorkflowResult<Option<O>> { self.output - .as_deref() - .map(serde_json::from_str) + .clone() + .map(serde_json::from_value) .transpose() .map_err(WorkflowError::DeserializeActivityOutput) } }

@@ -52,7 +52,7 @@ impl TryFrom<ActivityEventRow> for ActivityEvent {

#[derive(Debug)] pub struct SignalEvent { pub name: String, - pub body: String, + pub body: serde_json::Value, }

impl TryFrom<SignalEventRow> for SignalEvent {

diff --git a/lib/chirp-workflow/core/src/signal.rs b/lib/chirp-workflow/core/src/signal.rs index a4ffcbb78..25e64b3eb 100644 --- a/lib/chirp-workflow/core/src/signal.rs +++ b/lib/chirp-workflow/core/src/signal.rs @@ -11,7 +11,7 @@ pub trait Signal {

#[async_trait] pub trait Listen: Sized { async fn listen(ctx: &mut WorkflowCtx) -> WorkflowResult<Self>; - fn parse(name: &str, body: &str) -> WorkflowResult<Self>; + fn parse(name: &str, body: serde_json::Value) -> WorkflowResult<Self>; }

/// Creates an enum that implements `Listen` and selects one of X signals.

@@ -51,17 +51,17 @@ macro_rules! join_signal {

#[::async_trait::async_trait] impl Listen for $join { - async fn listen(ctx: &mut ::wf::WorkflowCtx) -> ::wf::WorkflowResult<Self> { + async fn listen(ctx: &mut chirp_workflow::prelude::WorkflowCtx) -> chirp_workflow::prelude::WorkflowResult<Self> { let row = ctx.listen_any(&[$($signals::name()),*]).await?;

- Self::parse(&row.name, &row.body) + Self::parse(&row.name, row.body) }

- fn parse(name: &str, body: &str) -> ::wf::WorkflowResult<Self> { + fn parse(name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> { $( if name == $signals::name() { Ok( Self::$signals( - serde_json::from_str(body) + serde_json::from_value(body) .map_err(WorkflowError::DeserializeActivityOutput)? ) )

diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs index 4db453824..697512406 100644 --- a/lib/chirp-workflow/core/src/util.rs +++ b/lib/chirp-workflow/core/src/util.rs @@ -6,6 +6,7 @@ use std::{ use global_error::{macros::*, GlobalResult}; use rand::Rng; use tokio::time::{self, Duration}; +use uuid::Uuid;

use crate::{schema::Event, ActivityEventRow, SignalEventRow, SubWorkflowEventRow, WorkflowResult};

@@ -110,3 +111,39 @@ pub fn inject_fault() -> GlobalResult<()> {

Ok(()) } + +pub fn wrap_conn( + conn: &rivet_connection::Connection, + ray_id: Uuid, + req_ts: i64, + name: &str, + ts: i64, +) -> ( + rivet_connection::Connection, + rivet_operation::OperationContext<()>, +) { + let req_id = Uuid::new_v4(); + let trace_entry = chirp_client::TraceEntry { + context_name: name.to_string(), + req_id: Some(req_id.into()), + ts, + run_context: match rivet_util::env::run_context() { + rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, + rivet_util::env::RunContext::Test => chirp_client::RunContext::Test, + } as i32, + }; + let conn = conn.wrap(req_id, ray_id, trace_entry); + let mut op_ctx = rivet_operation::OperationContext::new( + name.to_string(), + std::time::Duration::from_secs(60), + conn.clone(), + req_id, + ray_id, + ts, + req_ts, + (), + ); + op_ctx.from_workflow = true; + + (conn, op_ctx) +}

diff --git a/lib/chirp-workflow/core/src/worker.rs b/lib/chirp-workflow/core/src/worker.rs index 333b188e7..ef363f877 100644 --- a/lib/chirp-workflow/core/src/worker.rs +++ b/lib/chirp-workflow/core/src/worker.rs @@ -1,5 +1,7 @@ use global_error::GlobalResult; use tokio::time::Duration; +use tracing::Instrument; +use uuid::Uuid;

use crate::{util, DatabaseHandle, RegistryHandle, WorkflowCtx};

@@ -49,22 +51,56 @@ impl
Worker { // Query awake workflows let workflows = self.db.pull_workflows(&registered_workflows).await?;

for workflow in workflows { - let client = shared_client.clone().wrap_new(&workflow.workflow_name); - let conn = rivet_connection::Connection::new(client, pools.clone(), cache.clone()); - + let conn = new_conn( + &shared_client, + pools, + cache, + workflow.workflow_id, + &workflow.workflow_name, + workflow.ray_id, + ); let wake_deadline_ts = workflow.wake_deadline_ts; let ctx = WorkflowCtx::new(self.registry.clone(), self.db.clone(), conn, workflow)?;

- tokio::task::spawn(async move { - // Sleep until deadline - if let Some(wake_deadline_ts) = wake_deadline_ts { - util::sleep_until_ts(wake_deadline_ts).await; - } + tokio::task::spawn( + async move { + // Sleep until deadline + if let Some(wake_deadline_ts) = wake_deadline_ts { + util::sleep_until_ts(wake_deadline_ts).await; + }

- ctx.run_workflow().await; - }); + ctx.run_workflow().await; + } + .in_current_span(), + ); }

Ok(()) } }

+fn new_conn( + shared_client: &chirp_client::SharedClientHandle, + pools: &rivet_pools::Pools, + cache: &rivet_cache::Cache, + workflow_id: Uuid, + name: &str, + ray_id: Uuid, +) -> rivet_connection::Connection { + let req_id = workflow_id; + let client = shared_client.clone().wrap( + req_id, + ray_id, + vec![chirp_client::TraceEntry { + context_name: name.into(), + req_id: Some(req_id.into()), + ts: rivet_util::timestamp::now(), + run_context: match rivet_util::env::run_context() { + rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, + rivet_util::env::RunContext::Test => chirp_client::RunContext::Test, + } as i32, + }], + ); + + rivet_connection::Connection::new(client, pools.clone(), cache.clone()) +}

diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index d643213ce..8005f25de 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -201,8 +201,8 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream { Self::parse(&row.name, &row.body) }

- fn parse(_name: &str, body: &str) -> chirp_workflow::prelude::WorkflowResult<Self> { - serde_json::from_str(body).map_err(WorkflowError::DeserializeActivityOutput) + fn parse(_name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> { + serde_json::from_value(body).map_err(WorkflowError::DeserializeActivityOutput) } } };

diff --git a/lib/chirp/worker/src/manager.rs b/lib/chirp/worker/src/manager.rs index 730afac63..225ad7c59 100644 --- a/lib/chirp/worker/src/manager.rs +++ b/lib/chirp/worker/src/manager.rs @@ -48,7 +48,6 @@ where // Cloned copies of the pools that we've asserted exist. nats: NatsPool, redis_chirp: RedisPool, - redis_cache: RedisPool, }

impl<W> Debug for Manager<W> where

@@ -76,7 +75,6 @@ where let nats = pools.nats()?; let redis_chirp = pools.redis_chirp()?; - let redis_cache = pools.redis_cache()?;

let manager = Arc::new(Manager { config: Arc::new(config),

@@ -87,7 +85,6 @@ where nats, redis_chirp, - redis_cache, });

Ok(manager)

@@ -701,7 +698,7 @@ where x.push(chirp::TraceEntry { context_name: worker_name.clone(), req_id: req_id_proto.clone(), - ts: rivet_util::timestamp::now(), + ts, run_context: chirp::RunContext::Service as i32, }); x

diff --git a/lib/operation/core/src/lib.rs b/lib/operation/core/src/lib.rs index d2706d9e1..2f8fe1d46 100644 --- a/lib/operation/core/src/lib.rs +++ b/lib/operation/core/src/lib.rs @@ -40,6 +40,10 @@ where ts: i64, req_ts: i64, body: B, + + // Denotes whether this is from a workflow.
Disables the ability to create workflows from this context when true + // (workflows must be created at the workflow-level context) + pub from_workflow: bool, }

impl<B> OperationContext<B> where

@@ -65,6 +69,7 @@ where ts, req_ts, body, + from_workflow: false, } }

@@ -130,11 +135,12 @@ where

/// Adds trace and correctly wraps `Connection` (and subsequently `chirp_client::Client`). fn wrap<O: Operation>(&self, body: O::Request) -> OperationContext<O::Request> { + let now = util::timestamp::now(); let req_id = Uuid::new_v4(); let trace_entry = chirp_client::TraceEntry { context_name: O::NAME.to_string(), req_id: Some(req_id.into()), - ts: rivet_util::timestamp::now(), + ts: now, run_context: match rivet_util::env::run_context() { rivet_util::env::RunContext::Service => chirp_client::RunContext::Service, rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,

@@ -147,9 +153,10 @@ where conn: self.conn.wrap(req_id, self.ray_id, trace_entry), req_id, ray_id: self.ray_id, - ts: util::timestamp::now(), + ts: now, req_ts: self.req_ts, body, + from_workflow: self.from_workflow, } }

@@ -165,6 +172,7 @@ where ts: self.ts, req_ts: self.req_ts, body: (), + from_workflow: self.from_workflow, } }

diff --git a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql index 031982071..452d78dfb 100644 --- a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql +++ b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql @@ -8,11 +8,14 @@ CREATE TABLE nodes ( CREATE TABLE workflows ( workflow_id UUID PRIMARY KEY, workflow_name TEXT NOT NULL, + create_ts INT NOT NULL, + ray_id UUID NOT NULL, -- The node that's running this workflow node_id UUID, - input TEXT NOT NULL, + + input JSONB NOT NULL, -- Null if incomplete - output TEXT, + output JSONB,

wake_immediate BOOLEAN NOT NULL DEFAULT false, wake_deadline_ts INT,

@@ -35,9 +38,9 @@ CREATE TABLE workflow_activity_events ( activity_name TEXT NOT NULL, -- CRDB can't store u64, so we have to store bytes input_hash BYTES NOT NULL, - input TEXT NOT NULL, + input JSONB NOT NULL, -- Null if incomplete - output TEXT, + output JSONB,

PRIMARY KEY (workflow_id, location) );

@@ -48,7 +51,7 @@ CREATE TABLE workflow_signal_events ( location INT[] NOT NULL, signal_id TEXT NOT NULL, signal_name TEXT NOT NULL, - body TEXT NOT NULL, + body JSONB NOT NULL,

PRIMARY KEY (workflow_id, location) );

@@ -69,9 +72,11 @@ CREATE TABLE signals ( -- exists workflow_id UUID NOT NULL, signal_name TEXT NOT NULL, - body TEXT NOT NULL, create_ts INT NOT NULL, + ray_id UUID NOT NULL, + + body JSONB NOT NULL,

INDEX (workflow_id), INDEX (signal_name)
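Reviewer note: OVERVIEW.md above lists "easy to operate, manageable via simple SQL queries" as a goal, and this patch adds the `create_ts` and `ray_id` columns that make that practical. As a quick smoke test after applying the migration, a query like the following (illustrative only; it assumes nothing beyond the columns created above) lists the most recently created incomplete workflows together with the ray ID that dispatched them, which is what ties a stuck workflow back to its originating request trace:

    -- Ten most recently created workflows that have not yet produced an output,
    -- with the ray ID and the node (if any) currently running them.
    SELECT workflow_id, workflow_name, ray_id, node_id, create_ts
    FROM db_workflow.workflows
    WHERE output IS NULL
    ORDER BY create_ts DESC
    LIMIT 10;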