From 4f009a2bc562d04aff446e6fbcc035b27c119a2d Mon Sep 17 00:00:00 2001
From: MasterPtato
Date: Thu, 9 May 2024 21:23:45 +0000
Subject: [PATCH] feat: add ray ids to workflows, clean up types

---
 docs/libraries/workflow/GLOSSARY.md           |  38 +++--
 docs/libraries/workflow/OVERVIEW.md           | 100 ++++++++++++
 docs/libraries/workflow/WORKFLOW.md           |  98 -----------
 lib/chirp-workflow/core/Cargo.toml            |   2 +-
 lib/chirp-workflow/core/src/ctx/activity.rs   |  81 +++++-----
 lib/chirp-workflow/core/src/ctx/operation.rs  | 152 +++++++++++++++++-
 lib/chirp-workflow/core/src/ctx/test.rs       |  19 ++-
 lib/chirp-workflow/core/src/ctx/workflow.rs   |  31 ++--
 lib/chirp-workflow/core/src/db/mod.rs         |  46 ++++--
 lib/chirp-workflow/core/src/db/postgres.rs    |  49 ++++--
 lib/chirp-workflow/core/src/prelude.rs        |   1 +
 lib/chirp-workflow/core/src/registry.rs       |  17 +-
 lib/chirp-workflow/core/src/schema.rs         |   8 +-
 lib/chirp-workflow/core/src/signal.rs         |   8 +-
 lib/chirp-workflow/core/src/util.rs           |  37 +++++
 lib/chirp-workflow/core/src/worker.rs         |  56 +++++--
 lib/chirp-workflow/macros/src/lib.rs          |   4 +-
 lib/chirp/client/src/client.rs                |  11 --
 lib/chirp/worker/src/manager.rs               |  27 ++--
 lib/connection/src/lib.rs                     |  14 +-
 lib/operation/core/src/lib.rs                 |  32 ++--
 svc/pkg/foo/worker/src/workflows/test.rs      |   1 -
 .../migrations/20240430191643_init.up.sql     |  16 +-
 23 files changed, 551 insertions(+), 297 deletions(-)
 create mode 100644 docs/libraries/workflow/OVERVIEW.md
 delete mode 100644 docs/libraries/workflow/WORKFLOW.md

diff --git a/docs/libraries/workflow/GLOSSARY.md b/docs/libraries/workflow/GLOSSARY.md
index 1ef9cd8f5..d95151b86 100644
--- a/docs/libraries/workflow/GLOSSARY.md
+++ b/docs/libraries/workflow/GLOSSARY.md
@@ -1,36 +1,38 @@
-TODO
-
 # Glossary
 
 ## Worker
 
-A process that's running workflows.
+A process that queries for pending workflows with a specific filter. The filter is based on which workflows are registered in the given worker's registry.
+
+## Registry
 
-There are usually multiple workers running at the same time.
+A collection of registered workflows.
 
 ## Workflow
 
-A series of activies to be ran together.
+A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or sub workflow triggers.
 
-The code defining a workflow only specifies what activites to be ran. There is no complex logic (e.g. database queries) running within workflows.
+Workflows can be thought of as a list of tasks. The code defining a workflow only specifies what items should be run; there is no complex logic (e.g. database queries) running within the top level of the workflow.
 
-Workflow code can be reran multiple times to replay a workflow.
+Upon an activity failure, workflow code can be rerun without duplicate side effects because activities are cached and re-read after they succeed.
 
-## Workflow State
+## Activity
 
-Persistated data about a workflow.
+A block of code that can fail. This cannot trigger other workflows or activities, but it can call operations.
 
-## Workflow Run
+## Operation
 
-An instance of a node running a workflow. If re-running a workflow, it will be replaying events.
+A block of code that may or may not fail; used simply for tidiness.
 
 ## Workflow Event
 
 An action that gets executed in a workflow. An event can be a:
 
 - Activity
+- Received signal
+- Dispatched sub-workflow
 
-Events store the output from activities and are used to ensure activites are ran only once.
+Events store the output from activities and are used to ensure activities are run only once.
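+
+For illustration, the terms above might fit together roughly as in the following sketch. This is hypothetical example code, not the final API: the names (`GetUserInput`, `SendWelcomeEmailInput`) and the `#[operation]`/`#[activity]` macro syntax are made up, and signatures may differ.
+
+```rust
+// An operation: a plain, reusable block of code. It can be called from
+// activities, but it cannot trigger workflows or activities.
+#[operation]
+async fn get_user(ctx: &mut OperationCtx, input: &GetUserInput) -> GlobalResult<GetUserOutput> {
+	// e.g. a database query
+	Ok(GetUserOutput { /* ... */ })
+}
+
+// An activity: fallible, and recorded as an event once it succeeds, so a
+// workflow replay reuses its cached output instead of re-running it.
+#[activity]
+async fn send_welcome_email(ctx: &mut ActivityCtx, input: &SendWelcomeEmailInput) -> GlobalResult<()> {
+	let user = ctx.op(GetUserInput { user_id: input.user_id }).await?;
+	// ... perform the side effect exactly once per successful run ...
+	Ok(())
+}
+```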
 ## Workflow Event History
 
@@ -38,7 +40,7 @@ List of events that have executed in this workflow. These are used in replays to
 
 ## Workflow Replay
 
-After the first run of a workflow, all runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will be skipped in the replay and use the output from the previous run.
+After the first run of a workflow, subsequent runs will replay the activities and compare against the event history. If an activity has already been run successfully, the activity will not actually run any code and instead use the output from the previous run.
 
 ## Workflow Wake Condition
 
@@ -47,10 +49,6 @@ If a workflow is not currently running an activity, wake conditions define when
 The available conditions are:
 
 - **Immediately** Run immediately by the first available node
-- **Deadline** Run at a given timesetamp.
-
-## Activity
-
-A unit of code to run within a workflow.
-
-Activities can fail and will be retried accoriding to the retry policy of the workflow.
+- **Deadline** Run at a given timestamp.
+- **Signal** Run once any one of the listed signals is received.
+- **Sub workflow** Run once the given sub workflow is completed.
diff --git a/docs/libraries/workflow/OVERVIEW.md b/docs/libraries/workflow/OVERVIEW.md
new file mode 100644
index 000000000..993958375
--- /dev/null
+++ b/docs/libraries/workflow/OVERVIEW.md
@@ -0,0 +1,100 @@
+# Overview
+
+Workflows are designed to provide highly durable code executions for distributed systems. The main goal is to allow for writing easy-to-understand multi-step programs with effective error handling, retryability, and a rigid state.
+
+## Goals
+
+**Primary**
+
+- Performance
+- Quick iteration speed
+- Only depend on CockroachDB
+
+**Secondary**
+
+- Easy to monitor & manage via simple SQL queries
+- Easier to understand than messages
+- Rust-native
+  - Run in-process and as part of the binary to simplify architecture
+  - Leverage traits to reduce copies and needless ser/de
+  - Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatibility with protobuf**)
+- Lay foundations for OpenGB
+
+## Use cases
+
+- Billing cron jobs with batch
+- Creating servers
+- Email loops
+- Creating dynamic servers
+  - What about dynamic server lifecycle? Is this more of an actor? This is blending between state and other stuff.
+- Deploying CF workers
+
+## Questions
+
+- Concurrency
+- Nondeterministic patches: https://docs.temporal.io/dev-guide/typescript/versioning#patching
+- Do we plan to support side effects?
+
+## Relation to existing Chirp primitives
+
+### Messages
+
+Workflows replace the use case of messages for durable execution, which is almost all uses of messages.
+
+The biggest pain point with messages is the lack of a rigid state. Message executions always match the following outline (contrast this with the workflow sketch at the end of this document):
+
+1. Read whatever data is required
+2. Perform some action(s)
+3. Update data as needed
+4. Finish (possibly publish more messages) OR upon failure, start all over at #1
+
+The issue with this is that messages do not have any knowledge of messages that came before them, their own previous failed executions, or even other messages of the same system executing in parallel. Without thorough manually written sync checks and consistency validations (which are verbose and hard to follow), this type of execution often results in an overall broken state of whatever system the message is acting on (e.g.
matchmaking, server provisioning).
+
+**Once a broken state is reached, the retry system for messages _practically never_ successfully retries the message.**
+
+### Cross-package hooks
+
+We currently use messages for hooking into events from other workflows so we don't have to bake in support directly.
+
+This is potentially error prone since it makes control flow more opaque.
+
+TBD on if we keep this pattern.
+
+## Post-workflow message uses
+
+Messages should still be used, but much less frequently. They're helpful for:
+
+**Real-time Data Processing**
+
+- When you have a continuous flow of data that needs to be processed in real-time or near-real-time.
+- Examples include processing sensor data, social media feeds, financial market data, or clickstream data.
+- Stream processing frameworks like Apache Kafka, Apache Flink, or Apache Spark Streaming are well-suited for handling high-volume, real-time data streams.
+
+**Complex Event Processing (CEP)**
+
+- When you need to detect and respond to patterns, correlations, or anomalies in real-time data streams.
+- CEP involves analyzing and combining multiple event streams to identify meaningful patterns or trigger actions.
+- Stream processing frameworks provide capabilities for defining and matching complex event patterns in real-time.
+
+**Data Transformation and Enrichment**
+
+- When you need to transform, enrich, or aggregate data as it arrives in real-time.
+- This can involve tasks like data cleansing, normalization, joining with other data sources, or applying machine learning models.
+- Stream processing allows you to process and transform data on-the-fly, enabling real-time analytics and insights.
+
+**Continuous Data Integration**
+
+- When you need to continuously integrate and process data from multiple sources in real-time.
+- This can involve merging data streams, performing data synchronization, or updating downstream systems.
+- Stream processing frameworks provide connectors and integrations with various data sources and sinks.
+
+**Real-time Monitoring and Alerting**
+
+- When you need to monitor data streams in real-time and trigger alerts or notifications based on predefined conditions.
+- Stream processing allows you to define rules and thresholds to detect anomalies, errors, or critical events and send real-time alerts.
+
+**High-throughput, Low-latency Processing**
+
+- When you have a high volume of data that needs to be processed with low latency.
+- Stream processing frameworks are designed to handle high-throughput data streams and provide low-latency processing capabilities.
+- This is particularly useful in scenarios like fraud detection, real-time recommendations, or real-time bidding in advertising systems.
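+
+To tie the above together, here is a rough sketch of the message outline from "Relation to existing Chirp primitives" rewritten as a workflow. This is illustrative only — the names (`provision_server`, `AllocateIpInput`, `UpdateDnsInput`, `ServerReady`) and the exact macro/API surface are hypothetical:
+
+```rust
+#[workflow]
+async fn provision_server(ctx: &mut WorkflowCtx, input: &ProvisionServerInput) -> GlobalResult<()> {
+	// Steps 1-2: read data and perform an action. Each activity is a durable
+	// checkpoint: once it succeeds, its output is stored in the event history
+	// and a retry replays it instead of re-executing it.
+	let ip = ctx.activity(AllocateIpInput { server_id: input.server_id }).await?;
+
+	// Step 3: update data as needed; also checkpointed.
+	ctx.activity(UpdateDnsInput { ip }).await?;
+
+	// Instead of "finish OR start all over at #1", a failure resumes from the
+	// last checkpoint, and the workflow can suspend here until a signal arrives.
+	let _ready = ctx.listen::<ServerReady>().await?;
+
+	Ok(())
+}
+```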
diff --git a/docs/libraries/workflow/WORKFLOW.md b/docs/libraries/workflow/WORKFLOW.md deleted file mode 100644 index 9878ab8fa..000000000 --- a/docs/libraries/workflow/WORKFLOW.md +++ /dev/null @@ -1,98 +0,0 @@ -## Goals - -**Primary** - -- Performance -- Fast to write for -- Only depend on CockroachDB - -**Secondary** - -- Easy to monitor & manage via simple SQL queries -- Easier to understand than messages -- Rust-native - - Run in-process and as part of the binary to simplify architecture - - Leverage traits to reduce copies and needless ser/de - - Use native serde instead of Protobuf for simplicity (**this comes at the cost of verifiable backwards compatability with protobuf**) -- Lay foundations for OpenGB - -## Use cases - -- Billing cron jobs with batch -- Creating servers -- Email loops -- Creating dynamic servers - - What about dynamic server lifecycle? Is this more of an actor? This is blending between state and other stuff. -- Deploying CF workers - -## Questions - -- Concurrency -- Nondeterministic patches: https://docs.temporal.io/dev-guide/typescript/versioning#patching -- Do we plan to support side effects? - -## Relation to existing Chirp primitives - -### Messages - -Workflows replace the usecase of messages for durable execution, which is almost all uses of messages. - -Messages should still be used, but much less frequently. They're helpful for: - -**Real-time Data Processing** - -- When you have a continuous flow of data that needs to be processed in real-time or near-real-time. -- Examples include processing sensor data, social media feeds, financial market data, or clickstream data. -- Stream processing frameworks like Apache Kafka, Apache Flink, or Apache Spark Streaming are well-suited for handling high-volume, real-time data streams. - -**Complex Event Processing (CEP)** - -- When you need to detect and respond to patterns, correlations, or anomalies in real-time data streams. -- CEP involves analyzing and combining multiple event streams to identify meaningful patterns or trigger actions. -- Stream processing frameworks provide capabilities for defining and matching complex event patterns in real-time. - -**Data Transformation and Enrichment** - -- When you need to transform, enrich, or aggregate data as it arrives in real-time. -- This can involve tasks like data cleansing, normalization, joining with other data sources, or applying machine learning models. -- Stream processing allows you to process and transform data on-the-fly, enabling real-time analytics and insights. - -**Continuous Data Integration** - -- When you need to continuously integrate and process data from multiple sources in real-time. -- This can involve merging data streams, performing data synchronization, or updating downstream systems. -- Stream processing frameworks provide connectors and integrations with various data sources and sinks. - -**Real-time Monitoring and Alerting** - -- When you need to monitor data streams in real-time and trigger alerts or notifications based on predefined conditions. -- Stream processing allows you to define rules and thresholds to detect anomalies, errors, or critical events and send real-time alerts. - -**High-throughput, Low-latency Processing** - -- When you have a high volume of data that needs to be processed with low latency. -- Stream processing frameworks are designed to handle high-throughput data streams and provide low-latency processing capabilities. 
-- This is particularly useful in scenarios like fraud detection, real-time recommendations, or real-time bidding in advertising systems.
-
-### Cross-package hooks
-
-We currently use messages for hooking in to events from other workflows so we don't have to bake in support directly.
-
-This is potentially error prone since it makes control flow more opaque.
-
-TBD on if we keed this pattern.
-
-### Workflows & operations across packages
-
-**Child workflows**
-
-TODO
-
-**Operations**
-
-TODO
-
-## Temporal docs
-
-https://docs.temporal.io/encyclopedia/
-
diff --git a/lib/chirp-workflow/core/Cargo.toml b/lib/chirp-workflow/core/Cargo.toml
index 9298cd645..e4acfd4c8 100644
--- a/lib/chirp-workflow/core/Cargo.toml
+++ b/lib/chirp-workflow/core/Cargo.toml
@@ -25,7 +25,7 @@ rivet-runtime = { path = "../../runtime" }
 rivet-util = { path = "../../util/core" }
 serde = { version = "1.0.198", features = ["derive"] }
 serde_json = "1.0.116"
-sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "ipnetwork"] }
+sqlx = { version = "0.7.4", features = ["runtime-tokio", "postgres", "uuid", "json", "ipnetwork"] }
 thiserror = "1.0.59"
 tokio = { version = "1.37.0", features = ["full"] }
 tracing = "0.1.40"
diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs
index a504afc6d..cc5af986c 100644
--- a/lib/chirp-workflow/core/src/ctx/activity.rs
+++ b/lib/chirp-workflow/core/src/ctx/activity.rs
@@ -2,15 +2,17 @@ use global_error::{GlobalError, GlobalResult};
 use rivet_pools::prelude::*;
 use uuid::Uuid;
 
-use crate::{
-	ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError,
-};
+use crate::{ctx::OperationCtx, util, DatabaseHandle, Operation, OperationInput, WorkflowError};
 
 pub struct ActivityCtx {
-	db: DatabaseHandle,
-	conn: rivet_connection::Connection,
 	workflow_id: Uuid,
+	ray_id: Uuid,
 	name: &'static str,
+	ts: i64,
+
+	db: DatabaseHandle,
+
+	conn: rivet_connection::Connection,
 
 	// Backwards compatibility
 	op_ctx: rivet_operation::OperationContext<()>,
@@ -19,28 +21,22 @@ pub struct ActivityCtx {
 impl ActivityCtx {
 	pub fn new(
 		db: DatabaseHandle,
-		conn: rivet_connection::Connection,
+		conn: &rivet_connection::Connection,
 		workflow_id: Uuid,
+		workflow_create_ts: i64,
+		ray_id: Uuid,
 		name: &'static str,
 	) -> Self {
-		let op_ctx = rivet_operation::OperationContext::new(
-			name.to_string(),
-			std::time::Duration::from_secs(60),
-			conn.clone(),
-			workflow_id,
-			// TODO: ray_id
-			Uuid::new_v4(),
-			rivet_util::timestamp::now(),
-			// TODO: req_ts
-			rivet_util::timestamp::now(),
-			(),
-		);
+		let ts = rivet_util::timestamp::now();
+		let (conn, op_ctx) = util::wrap_conn(conn, ray_id, workflow_create_ts, name, ts);
 
 		ActivityCtx {
-			db,
-			conn,
 			workflow_id,
+			ray_id,
 			name,
+			ts,
+			db,
+			conn,
 			op_ctx,
 		}
 	}
@@ -55,7 +51,14 @@ impl ActivityCtx {
 		I: OperationInput,
 		<I as OperationInput>::Operation: Operation<Input = I>,
 	{
-		let mut ctx = OperationCtx::new(self.db.clone(), self.workflow_id);
+		let mut ctx = OperationCtx::new(
+			self.db.clone(),
+			&self.conn,
+			self.workflow_id,
+			self.ray_id,
+			self.op_ctx.req_ts(),
+			I::Operation::name(),
+		);
 
 		I::Operation::run(&mut ctx, &input)
 			.await
@@ -71,28 +74,28 @@ impl ActivityCtx {
 	// 	self.timeout
 	// }
 
-	// pub fn req_id(&self) -> Uuid {
-	// 	self.req_id
-	// }
+	pub fn req_id(&self) -> Uuid {
+		self.op_ctx.req_id()
+	}
 
-	// pub fn ray_id(&self) -> Uuid {
-	// 	self.ray_id
-	// }
+	pub fn ray_id(&self) -> Uuid {
+		self.ray_id
+	}
 
-	// /// Timestamp at which the request started.
-	// pub fn ts(&self) -> i64 {
-	// 	self.ts
-	// }
+	/// Timestamp at which the request started.
+	pub fn ts(&self) -> i64 {
+		self.ts
+	}
 
-	// /// Timestamp at which the request was published.
-	// pub fn req_ts(&self) -> i64 {
-	// 	self.req_ts
-	// }
+	/// Timestamp at which the request was published.
+	pub fn req_ts(&self) -> i64 {
+		self.op_ctx.req_ts()
+	}
 
-	// /// Time between when the timestamp was processed and when it was published.
-	// pub fn req_dt(&self) -> i64 {
-	// 	self.ts.saturating_sub(self.req_ts)
-	// }
+	/// Time between when the request was processed and when it was published.
+	pub fn req_dt(&self) -> i64 {
+		self.ts.saturating_sub(self.op_ctx.req_ts())
+	}
 
 	// pub fn perf(&self) -> &chirp_perf::PerfCtx {
 	// 	self.conn.perf()
diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs
index 7429ebd65..44551ccaa 100644
--- a/lib/chirp-workflow/core/src/ctx/operation.rs
+++ b/lib/chirp-workflow/core/src/ctx/operation.rs
@@ -1,18 +1,158 @@
+use global_error::{GlobalError, GlobalResult};
+use rivet_pools::prelude::*;
 use uuid::Uuid;
 
-use crate::DatabaseHandle;
+use crate::{util, DatabaseHandle, Operation, OperationInput, WorkflowError};
 
 pub struct OperationCtx {
-	pub db: DatabaseHandle,
-	pub workflow_id: Uuid,
+	workflow_id: Uuid,
+	ray_id: Uuid,
+	name: &'static str,
+	ts: i64,
+
+	db: DatabaseHandle,
+
+	conn: rivet_connection::Connection,
+
+	// Backwards compatibility
+	op_ctx: rivet_operation::OperationContext<()>,
 }
 
 impl OperationCtx {
-	pub fn new(db: DatabaseHandle, workflow_id: Uuid) -> Self {
-		OperationCtx { workflow_id, db }
+	pub fn new(
+		db: DatabaseHandle,
+		conn: &rivet_connection::Connection,
+		workflow_id: Uuid,
+		ray_id: Uuid,
+		req_ts: i64,
+		name: &'static str,
+	) -> Self {
+		let ts = rivet_util::timestamp::now();
+		let (conn, op_ctx) = util::wrap_conn(conn, ray_id, req_ts, name, ts);
+
+		OperationCtx {
+			workflow_id,
+			ray_id,
+			name,
+			ts,
+			db,
+			conn,
+			op_ctx,
+		}
 	}
 }
 
 impl OperationCtx {
-	// TODO:
+	pub async fn op<I>(
+		&mut self,
+		input: I,
+	) -> GlobalResult<<<I as OperationInput>::Operation as Operation>::Output>
+	where
+		I: OperationInput,
+		<I as OperationInput>::Operation: Operation<Input = I>,
+	{
+		let mut ctx = OperationCtx::new(
+			self.db.clone(),
+			&self.conn,
+			self.workflow_id,
+			self.ray_id,
+			self.op_ctx.req_ts(),
+			I::Operation::name(),
+		);
+
+		I::Operation::run(&mut ctx, &input)
+			.await
+			.map_err(WorkflowError::OperationFailure)
+			.map_err(GlobalError::raw)
+	}
+
+	pub fn name(&self) -> &str {
+		self.name
+	}
+
+	// pub fn timeout(&self) -> Duration {
+	// 	self.timeout
+	// }
+
+	pub fn req_id(&self) -> Uuid {
+		self.op_ctx.req_id()
+	}
+
+	pub fn ray_id(&self) -> Uuid {
+		self.ray_id
+	}
+
+	/// Timestamp at which the request started.
+	pub fn ts(&self) -> i64 {
+		self.ts
+	}
+
+	/// Timestamp at which the request was published.
+	pub fn req_ts(&self) -> i64 {
+		self.op_ctx.req_ts()
+	}
+
+	/// Time between when the request was processed and when it was published.
+	pub fn req_dt(&self) -> i64 {
+		self.ts.saturating_sub(self.op_ctx.req_ts())
+	}
+
+	// pub fn perf(&self) -> &chirp_perf::PerfCtx {
+	// 	self.conn.perf()
+	// }
+
+	pub fn trace(&self) -> &[chirp_client::TraceEntry] {
+		self.conn.trace()
+	}
+
+	pub fn test(&self) -> bool {
+		self.trace()
+			.iter()
+			.any(|x| x.run_context == chirp_client::RunContext::Test as i32)
+	}
+
+	pub fn chirp(&self) -> &chirp_client::Client {
+		self.conn.chirp()
+	}
+
+	pub fn cache(&self) -> rivet_cache::RequestConfig {
+		self.conn.cache()
+	}
+
+	pub fn cache_handle(&self) -> rivet_cache::Cache {
+		self.conn.cache_handle()
+	}
+
+	pub async fn crdb(&self) -> Result<CrdbPool, rivet_pools::Error> {
+		self.conn.crdb().await
+	}
+
+	pub async fn redis_cache(&self) -> Result<RedisPool, rivet_pools::Error> {
+		self.conn.redis_cache().await
+	}
+
+	pub async fn redis_cdn(&self) -> Result<RedisPool, rivet_pools::Error> {
+		self.conn.redis_cdn().await
+	}
+
+	pub async fn redis_job(&self) -> Result<RedisPool, rivet_pools::Error> {
+		self.conn.redis_job().await
+	}
+
+	pub async fn redis_mm(&self) -> Result<RedisPool, rivet_pools::Error> {
+		self.conn.redis_mm().await
+	}
+
+	pub async fn redis_user_presence(&self) -> Result<RedisPool, rivet_pools::Error> {
+		self.conn.redis_user_presence().await
+	}
+
+	pub async fn clickhouse(&self) -> GlobalResult<ClickHousePool> {
+		self.conn.clickhouse().await
+	}
+
+	// Backwards compatibility
+	pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> {
+		&self.op_ctx
+	}
 }
diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs
index a3ab6d06a..17607e472 100644
--- a/lib/chirp-workflow/core/src/ctx/test.rs
+++ b/lib/chirp-workflow/core/src/ctx/test.rs
@@ -5,14 +5,14 @@ use serde::Serialize;
 use tokio::time::Duration;
 use uuid::Uuid;
 
-use crate::{
-	DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput,
-};
+use crate::{DatabaseHandle, DatabasePostgres, Signal, Workflow, WorkflowError, WorkflowInput};
 
 pub type TestCtxHandle = Arc<TestCtx>;
 
 pub struct TestCtx {
 	name: String,
+	ray_id: Uuid,
+
 	pub db: DatabaseHandle,
 }
 
@@ -20,6 +20,7 @@ impl TestCtx {
 	pub fn new(db: DatabaseHandle) -> TestCtxHandle {
 		Arc::new(TestCtx {
 			name: "internal-test".to_string(),
+			ray_id: Uuid::new_v4(),
 			db,
 		})
 	}
@@ -37,6 +38,7 @@ impl TestCtx {
 
 		TestCtx {
 			name: service_name,
+			ray_id: Uuid::new_v4(),
 			db,
 		}
 	}
@@ -61,12 +63,12 @@ impl TestCtx {
 		let id = Uuid::new_v4();
 
 		// Serialize input
-		let input_str = serde_json::to_string(&input)
+		let input_val = serde_json::to_value(input)
 			.map_err(WorkflowError::SerializeWorkflowOutput)
 			.map_err(GlobalError::raw)?;
 
 		self.db
-			.dispatch_workflow(id, &name, &input_str)
+			.dispatch_workflow(self.ray_id, id, &name, input_val)
 			.await
 			.map_err(GlobalError::raw)?;
 
@@ -123,11 +125,12 @@ impl TestCtx {
 		let signal_id = Uuid::new_v4();
 
 		// Serialize input
-		let input_str =
-			serde_json::to_string(&input).map_err(WorkflowError::SerializeSignalBody).map_err(GlobalError::raw)?;
+		let input_val = serde_json::to_value(input)
+			.map_err(WorkflowError::SerializeSignalBody)
+			.map_err(GlobalError::raw)?;
 
 		self.db
-			.publish_signal(workflow_id, signal_id, I::name(), &input_str)
+			.publish_signal(self.ray_id, workflow_id, signal_id, I::name(), input_val)
 			.await
 			.map_err(GlobalError::raw)?;
 
diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs
index 37e812c0c..04405e235 100644
--- a/lib/chirp-workflow/core/src/ctx/workflow.rs
+++ b/lib/chirp-workflow/core/src/ctx/workflow.rs
@@ -24,6 +24,8 @@ pub struct WorkflowCtx {
 	pub workflow_id: Uuid,
 	/// Name of the workflow to run in the registry.
 	pub name: String,
+	create_ts: i64,
+	ray_id: Uuid,
 
 	registry: RegistryHandle,
 	db: DatabaseHandle,
@@ -38,7 +40,7 @@ pub struct WorkflowCtx {
 	/// The reason this type is a hashmap is to allow querying by location.
 	event_history: Arc<HashMap<Location, Vec<Event>>>,
 	/// Input data passed to this workflow.
-	pub(crate) input: Arc<String>,
+	pub(crate) input: Arc<serde_json::Value>,
 
 	root_location: Location,
 	location_idx: usize,
@@ -54,6 +56,9 @@ impl WorkflowCtx {
 		GlobalResult::Ok(WorkflowCtx {
 			workflow_id: workflow.id,
 			name: workflow.name,
+			create_ts: workflow.create_ts,
+
+			ray_id: workflow.ray_id,
 
 			registry,
 			db,
@@ -80,6 +85,8 @@ impl WorkflowCtx {
 		let branch = WorkflowCtx {
 			workflow_id: self.workflow_id,
 			name: self.name.clone(),
+			create_ts: self.create_ts,
+			ray_id: self.ray_id,
 
 			registry: self.registry.clone(),
 			db: self.db.clone(),
@@ -148,7 +155,7 @@ impl WorkflowCtx {
 				tracing::info!(id=%self.workflow_id, "workflow success");
 
 				// Write output
-				self.db.commit_workflow(self.workflow_id, &output).await?;
+				self.db.commit_workflow(self.workflow_id, output).await?;
 			}
 			Err(err) => {
 				tracing::warn!(id=%self.workflow_id, ?err, "workflow error");
@@ -192,8 +199,10 @@ impl WorkflowCtx {
 	) -> WorkflowResult<A::Output> {
 		let mut ctx = ActivityCtx::new(
 			self.db.clone(),
-			self.conn.clone(),
+			&self.conn,
 			self.workflow_id,
+			self.create_ts,
+			self.ray_id,
 			A::name(),
 		);
 
@@ -202,14 +211,14 @@ impl WorkflowCtx {
 				tracing::debug!("activity success");
 
 				// Write output
-				let output_str = serde_json::to_string(&output)
+				let output_val = serde_json::to_value(&output)
 					.map_err(WorkflowError::SerializeActivityOutput)?;
 				self.db
 					.commit_workflow_event(
 						self.workflow_id,
 						self.full_location().as_ref(),
 						activity_id,
-						Some(&output_str),
+						Some(output_val),
 					)
 					.await?;
 
@@ -295,17 +304,18 @@ impl WorkflowCtx {
 		let sub_workflow_id = Uuid::new_v4();
 
 		// Serialize input
-		let input_str = serde_json::to_string(&input)
+		let input_val = serde_json::to_value(input)
 			.map_err(WorkflowError::SerializeWorkflowOutput)
 			.map_err(GlobalError::raw)?;
 
 		self.db
 			.dispatch_sub_workflow(
+				self.ray_id,
 				self.workflow_id,
 				self.full_location().as_ref(),
 				sub_workflow_id,
 				&name,
-				&input_str,
+				input_val,
 			)
 			.await
 			.map_err(GlobalError::raw)?;
@@ -431,10 +441,11 @@ impl WorkflowCtx {
 
 		self.db
 			.publish_signal(
+				self.ray_id,
 				workflow_id,
 				id,
 				T::name(),
-				&serde_json::to_string(&body)
+				serde_json::to_value(&body)
 					.map_err(WorkflowError::SerializeSignalBody)
 					.map_err(GlobalError::raw)?,
 			)
@@ -456,7 +467,7 @@ impl WorkflowCtx {
 
 			tracing::debug!(id=%self.workflow_id, name=%signal.name, "replaying signal");
 
-			T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?
+			T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?
 		}
 		// Listen for new messages
 		else {
@@ -501,7 +512,7 @@ impl WorkflowCtx {
 				return Err(WorkflowError::HistoryDiverged).map_err(GlobalError::raw);
 			};
 
-			Some(T::parse(&signal.name, &signal.body).map_err(GlobalError::raw)?)
+			Some(T::parse(&signal.name, signal.body.clone()).map_err(GlobalError::raw)?)
 		}
 		// Listen for new message
 		else {
diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs
index 7db3752b3..91b443d95 100644
--- a/lib/chirp-workflow/core/src/db/mod.rs
+++ b/lib/chirp-workflow/core/src/db/mod.rs
@@ -11,12 +11,22 @@ pub type DatabaseHandle = Arc<dyn Database + Sync>;
 
 #[async_trait::async_trait]
 pub trait Database: Send {
-	async fn dispatch_workflow(&self, id: Uuid, name: &str, input: &str) -> WorkflowResult<()>;
+	async fn dispatch_workflow(
+		&self,
+		ray_id: Uuid,
+		id: Uuid,
+		name: &str,
+		input: serde_json::Value,
+	) -> WorkflowResult<()>;
 	async fn get_workflow(&self, id: Uuid) -> WorkflowResult<Option<WorkflowRow>>;
 	async fn pull_workflows(&self, filter: &[&str]) -> WorkflowResult<Vec<PulledWorkflow>>;
 
 	// When a workflow is completed
-	async fn commit_workflow(&self, workflow_id: Uuid, output: &str) -> WorkflowResult<()>;
+	async fn commit_workflow(
+		&self,
+		workflow_id: Uuid,
+		output: serde_json::Value,
+	) -> WorkflowResult<()>;
 	// When a workflow fails
 	async fn fail_workflow(
 		&self,
@@ -32,15 +42,16 @@ pub trait Database: Send {
 		workflow_id: Uuid,
 		location: &[usize],
 		activity_id: &ActivityId,
-		output: Option<&str>,
+		output: Option<serde_json::Value>,
 	) -> WorkflowResult<()>;
 
 	async fn publish_signal(
 		&self,
+		ray_id: Uuid,
 		workflow_id: Uuid,
 		id: Uuid,
 		name: &str,
-		body: &str,
+		body: serde_json::Value,
 	) -> WorkflowResult<()>;
 	async fn pull_latest_signal(
 		&self,
@@ -51,26 +62,26 @@ pub trait Database: Send {
 
 	async fn dispatch_sub_workflow(
 		&self,
+		ray_id: Uuid,
 		workflow_id: Uuid,
 		location: &[usize],
 		sub_workflow_id: Uuid,
 		name: &str,
-		input: &str,
+		input: serde_json::Value,
 	) -> WorkflowResult<()>;
 }
 
 #[derive(sqlx::FromRow)]
 pub struct WorkflowRow {
 	pub id: Uuid,
-	pub input: String,
-	pub output: Option<String>,
+	pub input: serde_json::Value,
+	pub output: Option<serde_json::Value>,
 }
 
 impl WorkflowRow {
-	pub fn parse_output<Output: serde::de::DeserializeOwned>(&self) -> WorkflowResult<Option<Output>> {
+	pub fn parse_output<Output: serde::de::DeserializeOwned>(self) -> WorkflowResult<Option<Output>> {
 		self.output
-			.as_deref()
-			.map(serde_json::from_str)
+			.map(serde_json::from_value)
 			.transpose()
 			.map_err(WorkflowError::DeserializeWorkflowOutput)
 	}
@@ -80,7 +91,9 @@ impl WorkflowRow {
 pub struct PulledWorkflowRow {
 	pub id: Uuid,
 	pub name: String,
-	pub input: String,
+	pub create_ts: i64,
+	pub ray_id: Uuid,
+	pub input: serde_json::Value,
 	pub wake_deadline_ts: Option<i64>,
 }
 
@@ -88,8 +101,11 @@ pub struct PulledWorkflowRow {
 pub struct PulledWorkflow {
 	pub id: Uuid,
 	pub name: String,
-	pub input: String,
+	pub create_ts: i64,
+	pub ray_id: Uuid,
+	pub input: serde_json::Value,
 	pub wake_deadline_ts: Option<i64>,
+
 	pub activity_events: Vec<ActivityEventRow>,
 	pub signal_events: Vec<SignalEventRow>,
 	pub sub_workflow_events: Vec<SubWorkflowEventRow>,
@@ -101,7 +117,7 @@ pub struct ActivityEventRow {
 	pub location: Vec<i64>,
 	pub name: String,
 	pub input_hash: Vec<u8>,
-	pub output: Option<String>,
+	pub output: Option<serde_json::Value>,
 }
 
 #[derive(sqlx::FromRow)]
@@ -109,7 +125,7 @@ pub struct SignalEventRow {
 	pub workflow_id: Uuid,
 	pub location: Vec<i64>,
 	pub name: String,
-	pub body: String,
+	pub body: serde_json::Value,
 }
 
 #[derive(sqlx::FromRow)]
 pub struct SubWorkflowEventRow {
 	pub workflow_id: Uuid,
 	pub location: Vec<i64>,
@@ -124,5 +140,5 @@ pub struct SubWorkflowEventRow {
 pub struct SignalRow {
 	pub id: Uuid,
 	pub name: String,
-	pub body: String,
+	pub body: serde_json::Value,
 }
diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs
index 87d4d1239..8427fad80 100644
--- a/lib/chirp-workflow/core/src/db/postgres.rs
+++ b/lib/chirp-workflow/core/src/db/postgres.rs
@@ -58,15 +58,25 @@ impl DatabasePostgres {
 
 #[async_trait::async_trait]
 impl Database for DatabasePostgres {
-	async fn dispatch_workflow(&self, id: Uuid, name: &str, input: &str) -> 
WorkflowResult<()> {
+	async fn dispatch_workflow(
+		&self,
+		ray_id: Uuid,
+		id: Uuid,
+		name: &str,
+		input: serde_json::Value,
+	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
-			INSERT INTO db_workflow.workflows (id, name, input, wake_immediate)
-			VALUES ($1, $2, $3, true)
+			INSERT INTO db_workflow.workflows (
+				id, name, create_ts, ray_id, input, wake_immediate
+			)
+			VALUES ($1, $2, $3, $4, $5, true)
 			",
 		))
 		.bind(id)
 		.bind(name)
+		.bind(rivet_util::timestamp::now())
+		.bind(ray_id)
 		.bind(input)
 		.execute(&mut *self.conn().await?)
 		.await
@@ -122,7 +132,7 @@ impl Database for DatabasePostgres {
 					output IS NOT NULL
 				)
 			)
-			RETURNING id, name, input, wake_deadline_ts
+			RETURNING id, name, create_ts, ray_id, input, wake_deadline_ts
 			",
 		))
 		.bind(NODE_ID)
@@ -141,6 +151,8 @@ impl Database for DatabasePostgres {
 				PulledWorkflow {
 					id: row.id,
 					name: row.name,
+					create_ts: row.create_ts,
+					ray_id: row.ray_id,
 					input: row.input,
 					wake_deadline_ts: row.wake_deadline_ts,
 					activity_events: Vec::new(),
@@ -228,7 +240,11 @@ impl Database for DatabasePostgres {
 		Ok(workflows_by_id.into_values().collect())
 	}
 
-	async fn commit_workflow(&self, workflow_id: Uuid, output: &str) -> WorkflowResult<()> {
+	async fn commit_workflow(
+		&self,
+		workflow_id: Uuid,
+		output: serde_json::Value,
+	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
 			UPDATE db_workflow.workflows
@@ -283,7 +299,7 @@ impl Database for DatabasePostgres {
 		workflow_id: Uuid,
 		location: &[usize],
 		activity_id: &ActivityId,
-		output: Option<&str>,
+		output: Option<serde_json::Value>,
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
@@ -355,21 +371,23 @@ impl Database for DatabasePostgres {
 
 	async fn publish_signal(
 		&self,
+		ray_id: Uuid,
 		workflow_id: Uuid,
 		id: Uuid,
 		name: &str,
-		body: &str,
+		body: serde_json::Value,
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
-			INSERT INTO db_workflow.signals (id, workflow_id, name, body, create_ts)
-			VALUES ($1, $2, $3, $4, $5)
+			INSERT INTO db_workflow.signals (id, workflow_id, name, body, create_ts, ray_id)
+			VALUES ($1, $2, $3, $4, $5, $6)
 			",
 		))
 		.bind(id)
 		.bind(workflow_id)
 		.bind(name)
 		.bind(body)
 		.bind(rivet_util::timestamp::now())
+		.bind(ray_id)
 		.execute(&mut *self.conn().await?)
 		.await
@@ -380,25 +398,28 @@ impl Database for DatabasePostgres {
 
 	async fn dispatch_sub_workflow(
 		&self,
+		ray_id: Uuid,
 		workflow_id: Uuid,
 		location: &[usize],
 		sub_workflow_id: Uuid,
 		name: &str,
-		input: &str,
+		input: serde_json::Value,
 	) -> WorkflowResult<()> {
 		sqlx::query(indoc!(
 			"
 			WITH workflow AS (
-				INSERT INTO db_workflow.workflows (id, name, input, wake_immediate)
-				VALUES ($5, $2, $3, true)
+				INSERT INTO db_workflow.workflows (
+					id, name, create_ts, ray_id, input, wake_immediate
+				)
+				VALUES ($7, $2, $3, $4, $5, true)
 				RETURNING 1
 			),
 			sub_workflow AS (
 				INSERT INTO db_workflow.workflow_sub_workflow_events(
 					workflow_id, location, sub_workflow_id
 				)
-				VALUES($1, $4, $5)
+				VALUES($1, $6, $7)
 				RETURNING 1
 			)
 			SELECT 1
 			",
 		))
 		.bind(workflow_id)
 		.bind(name)
+		.bind(rivet_util::timestamp::now())
+		.bind(ray_id)
 		.bind(input)
 		.bind(location.iter().map(|x| *x as i64).collect::<Vec<i64>>())
 		.bind(sub_workflow_id)
diff --git a/lib/chirp-workflow/core/src/prelude.rs b/lib/chirp-workflow/core/src/prelude.rs
index 75681e66f..25f4c9c86 100644
--- a/lib/chirp-workflow/core/src/prelude.rs
+++ b/lib/chirp-workflow/core/src/prelude.rs
@@ -18,6 +18,7 @@ pub use crate::{
 	ctx::*,
 	db,
 	error::{WorkflowError, WorkflowResult},
+	executable::closure,
 	executable::Executable,
 	operation::Operation,
 	registry::Registry,
diff --git a/lib/chirp-workflow/core/src/registry.rs b/lib/chirp-workflow/core/src/registry.rs
index 3eded152e..fff700fcb 100644
--- a/lib/chirp-workflow/core/src/registry.rs
+++ b/lib/chirp-workflow/core/src/registry.rs
@@ -35,7 +35,7 @@ impl Registry {
 			run: |ctx| {
 				async move {
 					// Deserialize input
-					let input = serde_json::from_str(&ctx.input)
+					let input = serde_json::from_value(ctx.input.as_ref().clone())
 						.map_err(WorkflowError::DeserializeWorkflowInput)?;
 
 					// Run workflow
@@ -58,10 +58,10 @@ impl Registry {
 					};
 
 					// Serialize output
-					let output_str = serde_json::to_string(&output)
+					let output_val = serde_json::to_value(output)
 						.map_err(WorkflowError::SerializeWorkflowOutput)?;
 
-					Ok(output_str)
+					Ok(output_val)
 				}
 				.boxed()
 			},
@@ -77,11 +77,8 @@ impl Registry {
 }
 
 pub struct RegistryWorkflow {
-	pub run: for<'a> fn(
-		&'a mut WorkflowCtx,
-	) -> Pin<Box<dyn Future<Output = WorkflowResult<String>> + Send + 'a>>,
+	pub run:
+		for<'a> fn(
+			&'a mut WorkflowCtx,
+		) -> Pin<Box<dyn Future<Output = WorkflowResult<serde_json::Value>> + Send + 'a>>,
 }
-
-// pub struct RegistryActivity {
-// 	pub run: fn(ActivityCtx, String) -> Pin<Box<dyn Future<Output = GlobalResult<String>>>>,
-// }
diff --git a/lib/chirp-workflow/core/src/schema.rs b/lib/chirp-workflow/core/src/schema.rs
index c2e76bb31..3fac2765f 100644
--- a/lib/chirp-workflow/core/src/schema.rs
+++ b/lib/chirp-workflow/core/src/schema.rs
@@ -25,14 +25,14 @@ pub struct ActivityEvent {
 	pub activity_id: ActivityId,
 
 	/// If activity succeeds, this will be some.
-	pub output: Option<String>,
+	pub output: Option<serde_json::Value>,
 }
 
 impl ActivityEvent {
 	pub fn get_output<O: serde::de::DeserializeOwned>(&self) -> WorkflowResult<Option<O>> {
 		self.output
-			.as_deref()
-			.map(serde_json::from_str)
+			.clone()
+			.map(serde_json::from_value)
 			.transpose()
 			.map_err(WorkflowError::DeserializeActivityOutput)
 	}
@@ -52,7 +52,7 @@ impl TryFrom<ActivityEventRow> for ActivityEvent {
 
 #[derive(Debug)]
 pub struct SignalEvent {
 	pub name: String,
-	pub body: String,
+	pub body: serde_json::Value,
 }
 
 impl TryFrom<SignalEventRow> for SignalEvent {
diff --git a/lib/chirp-workflow/core/src/signal.rs b/lib/chirp-workflow/core/src/signal.rs
index baf198719..e5fdb514e 100644
--- a/lib/chirp-workflow/core/src/signal.rs
+++ b/lib/chirp-workflow/core/src/signal.rs
@@ -9,7 +9,7 @@ pub trait Signal {
 #[async_trait]
 pub trait Listen: Sized {
 	async fn listen(ctx: &mut WorkflowCtx) -> WorkflowResult<Self>;
-	fn parse(name: &str, body: &str) -> WorkflowResult<Self>;
+	fn parse(name: &str, body: serde_json::Value) -> WorkflowResult<Self>;
 }
 
 /// Creates an enum that implements `Listen` and selects one of X signals.
@@ -24,15 +24,15 @@ macro_rules! join_signal {
 		impl Listen for $join {
 			async fn listen(ctx: &mut ::wf::WorkflowCtx) -> ::wf::WorkflowResult<Self> {
 				let row = ctx.listen_any(&[$($signals::name()),*]).await?;
-				Self::parse(&row.name, &row.body)
+				Self::parse(&row.name, row.body)
 			}
 
-			fn parse(name: &str, body: &str) -> ::wf::WorkflowResult<Self> {
+			fn parse(name: &str, body: serde_json::Value) -> ::wf::WorkflowResult<Self> {
 				$(
 					if name == $signals::name() {
 						Ok(
 							Self::$signals(
-								serde_json::from_str(body)
+								serde_json::from_value(body)
 									.map_err(WorkflowError::DeserializeActivityOutput)?
 							)
 						)
diff --git a/lib/chirp-workflow/core/src/util.rs b/lib/chirp-workflow/core/src/util.rs
index 4709129c6..3aee2922c 100644
--- a/lib/chirp-workflow/core/src/util.rs
+++ b/lib/chirp-workflow/core/src/util.rs
@@ -6,6 +6,7 @@ use std::{
 use global_error::{macros::*, GlobalResult};
 use rand::Rng;
 use tokio::time::{self, Duration};
+use uuid::Uuid;
 
 use crate::{schema::Event, ActivityEventRow, SignalEventRow, SubWorkflowEventRow, WorkflowResult};
 
@@ -97,3 +98,39 @@ pub fn inject_fault() -> GlobalResult<()> {
 
 	Ok(())
 }
+
+pub fn wrap_conn(
+	conn: &rivet_connection::Connection,
+	ray_id: Uuid,
+	req_ts: i64,
+	name: &str,
+	ts: i64,
+) -> (
+	rivet_connection::Connection,
+	rivet_operation::OperationContext<()>,
+) {
+	let req_id = Uuid::new_v4();
+	let trace_entry = chirp_client::TraceEntry {
+		context_name: name.to_string(),
+		req_id: Some(req_id.into()),
+		ts,
+		run_context: match rivet_util::env::run_context() {
+			rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
+			rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
+		} as i32,
+	};
+	let conn = conn.wrap(req_id, ray_id, trace_entry);
+	let mut op_ctx = rivet_operation::OperationContext::new(
+		name.to_string(),
+		std::time::Duration::from_secs(60),
+		conn.clone(),
+		req_id,
+		ray_id,
+		ts,
+		req_ts,
+		(),
+	);
+	op_ctx.from_workflow = true;
+
+	(conn, op_ctx)
+}
diff --git a/lib/chirp-workflow/core/src/worker.rs b/lib/chirp-workflow/core/src/worker.rs
index babd80244..eba6c8e9d 100644
--- a/lib/chirp-workflow/core/src/worker.rs
+++ b/lib/chirp-workflow/core/src/worker.rs
@@ -1,5 +1,7 @@
 use global_error::GlobalResult;
 use tokio::time::Duration;
+use tracing::Instrument;
+use uuid::Uuid;
 
 use crate::{util, DatabaseHandle, RegistryHandle, WorkflowCtx};
 
@@ -45,22 +47,56 @@ impl Worker {
 		// Query awake workflows
 		let workflows = self.db.pull_workflows(&registered_workflows).await?;
 		for workflow in workflows {
-			let client = 
shared_client.clone().wrap_new(&workflow.name);
-			let conn = rivet_connection::Connection::new(client, pools.clone(), cache.clone());
-
+			let conn = new_conn(
+				&shared_client,
+				pools,
+				cache,
+				workflow.id,
+				&workflow.name,
+				workflow.ray_id,
+			);
 			let wake_deadline_ts = workflow.wake_deadline_ts;
 			let ctx = WorkflowCtx::new(self.registry.clone(), self.db.clone(), conn, workflow)?;
 
-			tokio::task::spawn(async move {
-				// Sleep until deadline
-				if let Some(wake_deadline_ts) = wake_deadline_ts {
-					util::sleep_until_ts(wake_deadline_ts).await;
-				}
+			tokio::task::spawn(
+				async move {
+					// Sleep until deadline
+					if let Some(wake_deadline_ts) = wake_deadline_ts {
+						util::sleep_until_ts(wake_deadline_ts).await;
+					}
 
-				ctx.run_workflow().await;
-			});
+					ctx.run_workflow().await;
+				}
+				.in_current_span(),
+			);
 		}
 
 		Ok(())
 	}
 }
+
+fn new_conn(
+	shared_client: &chirp_client::SharedClientHandle,
+	pools: &rivet_pools::Pools,
+	cache: &rivet_cache::Cache,
+	workflow_id: Uuid,
+	name: &str,
+	ray_id: Uuid,
+) -> rivet_connection::Connection {
+	let req_id = workflow_id;
+	let client = shared_client.clone().wrap(
+		req_id,
+		ray_id,
+		vec![chirp_client::TraceEntry {
+			context_name: name.into(),
+			req_id: Some(req_id.into()),
+			ts: rivet_util::timestamp::now(),
+			run_context: match rivet_util::env::run_context() {
+				rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
+				rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
+			} as i32,
+		}],
+	);
+
+	rivet_connection::Connection::new(client, pools.clone(), cache.clone())
+}
diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs
index 29e7d2ace..62814fccd 100644
--- a/lib/chirp-workflow/macros/src/lib.rs
+++ b/lib/chirp-workflow/macros/src/lib.rs
@@ -200,8 +200,8 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {
-				Self::parse(&row.name, &row.body)
+				Self::parse(&row.name, row.body)
 			}
 
-			fn parse(_name: &str, body: &str) -> chirp_workflow::prelude::WorkflowResult<Self> {
-				serde_json::from_str(body).map_err(WorkflowError::DeserializeActivityOutput)
+			fn parse(_name: &str, body: serde_json::Value) -> chirp_workflow::prelude::WorkflowResult<Self> {
+				serde_json::from_value(body).map_err(WorkflowError::DeserializeActivityOutput)
 			}
 		}
 	};
diff --git a/lib/chirp/client/src/client.rs b/lib/chirp/client/src/client.rs
index 35188d541..86fc5269c 100644
--- a/lib/chirp/client/src/client.rs
+++ b/lib/chirp/client/src/client.rs
@@ -131,17 +131,6 @@ impl SharedClient {
 			ts,
 		)
 	}
-
-	pub fn wrap_with(
-		self: Arc<Self>,
-		parent_req_id: Uuid,
-		ray_id: Uuid,
-		ts: i64,
-		trace: Vec<TraceEntry>,
-		perf_ctx: chirp_perf::PerfCtxInner,
-	) -> Client {
-		Client::new(self, parent_req_id, ray_id, trace, Arc::new(perf_ctx), ts)
-	}
 }
 
 /// Used to communicate with other Chirp clients.
diff --git a/lib/chirp/worker/src/manager.rs b/lib/chirp/worker/src/manager.rs
index da2ef1e49..215162b5b 100644
--- a/lib/chirp/worker/src/manager.rs
+++ b/lib/chirp/worker/src/manager.rs
@@ -695,23 +695,16 @@ where
 
 		let worker_req = {
 			// Build client
-			let ts = rivet_util::timestamp::now();
-			let client = self.shared_client.clone().wrap_with(
-				req_id,
-				ray_id,
-				ts,
-				{
-					let mut x = trace.clone();
-					x.push(chirp::TraceEntry {
-						context_name: worker_name.clone(),
-						req_id: req_id_proto.clone(),
-						ts: rivet_util::timestamp::now(),
-						run_context: chirp::RunContext::Service as i32,
-					});
-					x
-				},
-				chirp_perf::PerfCtxInner::new(self.redis_cache.clone(), ts, req_id, ray_id),
-			);
+			let client = self.shared_client.clone().wrap(req_id, ray_id, {
+				let mut x = trace.clone();
+				x.push(chirp::TraceEntry {
+					context_name: worker_name.clone(),
+					req_id: req_id_proto.clone(),
+					ts: rivet_util::timestamp::now(),
+					run_context: chirp::RunContext::Service as i32,
+				});
+				x
+			});
 			let conn = Connection::new(client, self.pools.clone(), self.cache.clone());
 
 			let ts = req_debug
diff --git a/lib/connection/src/lib.rs b/lib/connection/src/lib.rs
index a5c0955c7..cc5f0425e 100644
--- a/lib/connection/src/lib.rs
+++ b/lib/connection/src/lib.rs
@@ -31,26 +31,20 @@ impl Connection {
 		parent_req_id: Uuid,
 		ray_id: Uuid,
 		trace_entry: chirp_client::TraceEntry,
-	) -> GlobalResult<Connection> {
-		// Not the same as the operation ctx's ts because this cannot be overridden by debug start ts
-		let ts = rivet_util::timestamp::now();
-		let redis_cache = self.pools.redis("ephemeral")?;
-
-		Ok(Connection::new(
-			(*self.client).clone().wrap_with(
+	) -> Connection {
+		Connection::new(
+			(*self.client).clone().wrap(
 				parent_req_id,
 				ray_id,
-				ts,
 				{
 					let mut x = self.client.trace().to_vec();
 					x.push(trace_entry);
 					x
 				},
-				chirp_perf::PerfCtxInner::new(redis_cache, ts, parent_req_id, ray_id),
 			),
 			self.pools.clone(),
 			self.cache.clone(),
-		))
+		)
 	}
 
 	pub fn chirp(&self) -> &chirp_client::Client {
diff --git a/lib/operation/core/src/lib.rs b/lib/operation/core/src/lib.rs
index 7f30e5a44..2f8fe1d46 100644
--- a/lib/operation/core/src/lib.rs
+++ b/lib/operation/core/src/lib.rs
@@ -40,6 +40,10 @@ where
 	ts: i64,
 	req_ts: i64,
 	body: B,
+
+	// Denotes whether this is from a workflow. Disables the ability to create workflows itself when true
+	// (workflows must be created at the workflow level context)
+	pub from_workflow: bool,
 }
 
 impl<B> OperationContext<B>
 where
@@ -65,6 +69,7 @@ where
 			ts,
 			req_ts,
 			body,
+			from_workflow: false,
 		}
 	}
 
@@ -85,7 +90,7 @@ where
 		// TODO: Throw dedicated "timed out" error here
 
 		// Process the request
-		let req_op_ctx = self.wrap::<O>(body)?;
+		let req_op_ctx = self.wrap::<O>(body);
 		let timeout_fut = tokio::time::timeout(O::TIMEOUT, O::handle(req_op_ctx).in_current_span());
 		let res = tokio::task::Builder::new()
 			.name("operation::handle")
@@ -129,28 +134,30 @@ where
 	}
 
 	/// Adds trace and correctly wraps `Connection` (and subsequently `chirp_client::Client`).
-	fn wrap<O: Operation>(&self, body: O::Request) -> GlobalResult<OperationContext<O::Request>> {
-		let ray_id = Uuid::new_v4();
+	fn wrap<O: Operation>(&self, body: O::Request) -> OperationContext<O::Request> {
+		let now = util::timestamp::now();
+		let req_id = Uuid::new_v4();
 		let trace_entry = chirp_client::TraceEntry {
-			context_name: self.name.clone(),
-			req_id: Some(self.req_id.into()),
-			ts: rivet_util::timestamp::now(),
+			context_name: O::NAME.to_string(),
+			req_id: Some(req_id.into()),
+			ts: now,
 			run_context: match rivet_util::env::run_context() {
 				rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
 				rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
 			} as i32,
 		};
 
-		Ok(OperationContext {
+		OperationContext {
 			name: O::NAME.to_string(),
 			timeout: O::TIMEOUT,
-			conn: self.conn.wrap(self.req_id, ray_id, trace_entry)?,
-			req_id: self.req_id,
-			ray_id,
-			ts: util::timestamp::now(),
+			conn: self.conn.wrap(req_id, self.ray_id, trace_entry),
+			req_id,
+			ray_id: self.ray_id,
+			ts: now,
 			req_ts: self.req_ts,
 			body,
-		})
+			from_workflow: self.from_workflow,
+		}
 	}
 
 	/// Clones everything but the body. This should always be used over `.clone()` unless you need to
@@ -165,6 +172,7 @@ where
 			ts: self.ts,
 			req_ts: self.req_ts,
 			body: (),
+			from_workflow: self.from_workflow,
 		}
 	}
 
diff --git a/svc/pkg/foo/worker/src/workflows/test.rs b/svc/pkg/foo/worker/src/workflows/test.rs
index 3d55d3618..63b924ad3 100644
--- a/svc/pkg/foo/worker/src/workflows/test.rs
+++ b/svc/pkg/foo/worker/src/workflows/test.rs
@@ -46,6 +46,5 @@ pub fn foo(ctx: &mut ActivityCtx, input: &FooInput) -> GlobalResult<FooOutput> {
 		.await?;
 	let user = unwrap!(user_get_res.users.first());
-
 	Ok(FooOutput { ids })
 }
diff --git a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql
index 9773a26d7..c66dd3591 100644
--- a/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql
+++ b/svc/pkg/workflow/db/workflow/migrations/20240430191643_init.up.sql
@@ -8,11 +8,14 @@ CREATE TABLE nodes (
 CREATE TABLE workflows (
 	id UUID PRIMARY KEY,
 	name TEXT NOT NULL,
+	create_ts INT NOT NULL,
+	ray_id UUID NOT NULL,
 	-- The node that's running this workflow
 	node_id UUID,
-	input TEXT NOT NULL,
+
+	input JSONB NOT NULL,
 	-- Null if incomplete
-	output TEXT,
+	output JSONB,
 
 	wake_immediate BOOLEAN NOT NULL DEFAULT false,
 	wake_deadline_ts INT,
@@ -32,7 +35,7 @@ CREATE TABLE workflow_activity_events (
 	-- CRDB can't store u64, so we have to store bytes
 	input_hash BYTES NOT NULL,
 	-- Null if incomplete
-	output TEXT,
+	output JSONB,
 
 	PRIMARY KEY (workflow_id, location)
 );
@@ -43,7 +46,7 @@ CREATE TABLE workflow_signal_events (
 	location INT[] NOT NULL,
 	id TEXT NOT NULL,
 	name TEXT NOT NULL,
-	body TEXT NOT NULL,
+	body JSONB NOT NULL,
 
 	PRIMARY KEY (workflow_id, location)
 );
@@ -64,9 +67,10 @@ CREATE TABLE signals (
 	-- exists
 	workflow_id UUID NOT NULL,
 	name TEXT NOT NULL,
-	body TEXT NOT NULL,
-	create_ts INT NOT NULL,
+	create_ts INT NOT NULL,
+	ray_id UUID NOT NULL,
+
+	body JSONB NOT NULL,
 
 	INDEX (workflow_id),
 	INDEX (name)