diff --git a/lib/api-helper/build/Cargo.toml b/lib/api-helper/build/Cargo.toml index ffd52b8a8c..193b1b393b 100644 --- a/lib/api-helper/build/Cargo.toml +++ b/lib/api-helper/build/Cargo.toml @@ -13,6 +13,7 @@ macros = [] api-helper-macros = { path = "../macros" } async-trait = "0.1" chirp-client = { path = "../../chirp/client" } +chirp-workflow = { path = "../../chirp-workflow/core" } chrono = "0.4" formatted-error = { path = "../../formatted-error" } futures-util = "0.3" diff --git a/lib/api-helper/build/src/ctx.rs b/lib/api-helper/build/src/ctx.rs index ff83612e08..629f7b73d4 100644 --- a/lib/api-helper/build/src/ctx.rs +++ b/lib/api-helper/build/src/ctx.rs @@ -1,5 +1,6 @@ use std::net::IpAddr; +use chirp_workflow::ctx::ApiCtx; use rivet_operation::OperationContext; use types::rivet::backend; use url::Url; @@ -8,7 +9,7 @@ use crate::auth; pub struct Ctx { pub(crate) auth: A, - pub(crate) op_ctx: OperationContext<()>, + pub(crate) internal_ctx: ApiCtx, pub(crate) user_agent: Option, pub(crate) origin: Option, pub(crate) remote_address: Option, @@ -22,19 +23,19 @@ impl Ctx { } pub fn op_ctx(&self) -> &OperationContext<()> { - &self.op_ctx + self.internal_ctx.op_ctx() } pub fn chirp(&self) -> &chirp_client::Client { - self.op_ctx.chirp() + self.op_ctx().chirp() } pub fn cache(&self) -> rivet_cache::RequestConfig { - self.op_ctx.cache() + self.op_ctx().cache() } pub fn cache_handle(&self) -> rivet_cache::Cache { - self.op_ctx.cache_handle() + self.op_ctx().cache_handle() } pub fn client_info(&self) -> backend::net::ClientInfo { @@ -69,9 +70,9 @@ impl Ctx { } impl std::ops::Deref for Ctx { - type Target = OperationContext<()>; + type Target = ApiCtx; fn deref(&self) -> &Self::Target { - &self.op_ctx + &self.internal_ctx } } diff --git a/lib/api-helper/build/src/macro_util.rs b/lib/api-helper/build/src/macro_util.rs index 563710c245..4e000213e8 100644 --- a/lib/api-helper/build/src/macro_util.rs +++ b/lib/api-helper/build/src/macro_util.rs @@ -11,6 +11,7 @@ use rivet_operation::prelude::util; use serde::de::DeserializeOwned; use url::Url; use uuid::Uuid; +use chirp_workflow::ctx::ApiCtx; use crate::{ auth::{self, AuthRateLimitCtx}, @@ -308,12 +309,12 @@ pub async fn __with_ctx( // Create connections let req_id = Uuid::new_v4(); let ts = rivet_util::timestamp::now(); - let svc_name = rivet_util::env::chirp_service_name().to_string(); + let svc_name = rivet_util::env::chirp_service_name(); let client = shared_client.wrap( req_id, ray_id, vec![chirp_client::TraceEntry { - context_name: svc_name.clone(), + context_name: svc_name.to_string(), req_id: Some(req_id.into()), ts, run_context: match rivet_util::env::run_context() { @@ -323,16 +324,8 @@ pub async fn __with_ctx( }], ); let conn = rivet_connection::Connection::new(client, pools.clone(), cache.clone()); - let op_ctx = rivet_operation::OperationContext::new( - svc_name, - std::time::Duration::from_secs(60), - conn, - req_id, - ray_id, - ts, - ts, - (), - ); + let db = chirp_workflow::compat::db_from_pools(&pools).await?; + let internal_ctx = ApiCtx::new(db, conn, req_id, ray_id, ts, svc_name); // Create auth let rate_limit_ctx = AuthRateLimitCtx { @@ -349,7 +342,7 @@ pub async fn __with_ctx( Ok(Ctx { auth, - op_ctx, + internal_ctx, user_agent, origin, remote_address, diff --git a/lib/chirp-workflow/core/src/activity.rs b/lib/chirp-workflow/core/src/activity.rs index 9ca262d729..1ab8a0b730 100644 --- a/lib/chirp-workflow/core/src/activity.rs +++ b/lib/chirp-workflow/core/src/activity.rs @@ -15,7 +15,7 @@ pub trait Activity { const MAX_RETRIES: u32; const TIMEOUT: std::time::Duration; - async fn run(ctx: &mut ActivityCtx, input: &Self::Input) -> GlobalResult; + async fn run(ctx: &ActivityCtx, input: &Self::Input) -> GlobalResult; } pub trait ActivityInput: Serialize + DeserializeOwned + Debug + Hash + Send { diff --git a/lib/chirp-workflow/core/src/compat.rs b/lib/chirp-workflow/core/src/compat.rs index ba23beedd7..1b313739c7 100644 --- a/lib/chirp-workflow/core/src/compat.rs +++ b/lib/chirp-workflow/core/src/compat.rs @@ -7,8 +7,8 @@ use serde::Serialize; use uuid::Uuid; use crate::{ - DatabaseHandle, DatabasePostgres, Operation, OperationCtx, OperationInput, Signal, Workflow, - WorkflowError, WorkflowInput, + ctx::api::WORKFLOW_TIMEOUT, DatabaseHandle, DatabasePostgres, Operation, OperationCtx, + OperationInput, Signal, Workflow, WorkflowError, WorkflowInput, }; pub async fn dispatch_workflow( @@ -35,7 +35,7 @@ where .map_err(WorkflowError::SerializeWorkflowOutput) .map_err(GlobalError::raw)?; - db(ctx) + db_from_ctx(ctx) .await? .dispatch_workflow(ctx.ray_id(), id, &name, input_val) .await @@ -46,6 +46,8 @@ where Ok(id) } +/// Wait for a given workflow to complete. +/// **IMPORTANT:** Has no timeout. pub async fn wait_for_workflow( ctx: &rivet_operation::OperationContext, workflow_id: Uuid, @@ -58,7 +60,7 @@ pub async fn wait_for_workflow( interval.tick().await; // Check if state finished - let workflow = db(ctx) + let workflow = db_from_ctx(ctx) .await? .get_workflow(workflow_id) .await @@ -71,6 +73,7 @@ pub async fn wait_for_workflow( } } +/// Dispatch a new workflow and wait for it to complete. Has a 60s timeout. pub async fn workflow( ctx: &rivet_operation::OperationContext, input: I, @@ -81,8 +84,12 @@ where B: Debug + Clone, { let workflow_id = dispatch_workflow(ctx, input).await?; - let output = wait_for_workflow::(ctx, workflow_id).await?; - Ok(output) + + tokio::time::timeout( + WORKFLOW_TIMEOUT, + wait_for_workflow::(ctx, workflow_id), + ) + .await? } pub async fn signal( @@ -103,7 +110,7 @@ pub async fn signal( .map_err(WorkflowError::SerializeSignalBody) .map_err(GlobalError::raw)?; - db(ctx) + db_from_ctx(ctx) .await? .publish_signal(ctx.ray_id(), workflow_id, signal_id, I::NAME, input_val) .await @@ -122,7 +129,7 @@ where B: Debug + Clone, { let mut ctx = OperationCtx::new( - db(ctx).await?, + db_from_ctx(ctx).await?, ctx.conn(), ctx.ray_id(), ctx.req_ts(), @@ -137,10 +144,17 @@ where } // Get crdb pool as a trait object -async fn db( +async fn db_from_ctx( ctx: &rivet_operation::OperationContext, ) -> GlobalResult { let crdb = ctx.crdb().await?; Ok(DatabasePostgres::from_pool(crdb)) } + +// Get crdb pool as a trait object +pub async fn db_from_pools(pools: &rivet_pools::Pools) -> GlobalResult { + let crdb = pools.crdb()?; + + Ok(DatabasePostgres::from_pool(crdb)) +} diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs b/lib/chirp-workflow/core/src/ctx/activity.rs index a69164a1fe..1aef06e648 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -75,7 +75,9 @@ impl ActivityCtx { .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) } +} +impl ActivityCtx { pub fn name(&self) -> &str { self.name } diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs new file mode 100644 index 0000000000..6580ce1f93 --- /dev/null +++ b/lib/chirp-workflow/core/src/ctx/api.rs @@ -0,0 +1,268 @@ +use std::time::Duration; + +use global_error::{GlobalError, GlobalResult}; +use rivet_pools::prelude::*; +use serde::Serialize; +use uuid::Uuid; + +use crate::{ + ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, Signal, Workflow, WorkflowError, + WorkflowInput, +}; + +pub const WORKFLOW_TIMEOUT: Duration = Duration::from_secs(60); + +pub struct ApiCtx { + ray_id: Uuid, + name: &'static str, + ts: i64, + + db: DatabaseHandle, + + conn: rivet_connection::Connection, + + // Backwards compatibility + op_ctx: rivet_operation::OperationContext<()>, +} + +impl ApiCtx { + pub fn new( + db: DatabaseHandle, + conn: rivet_connection::Connection, + req_id: Uuid, + ray_id: Uuid, + ts: i64, + name: &'static str, + ) -> Self { + let mut op_ctx = rivet_operation::OperationContext::new( + name.to_string(), + std::time::Duration::from_secs(60), + conn.clone(), + req_id, + ray_id, + ts, + ts, + (), + ); + op_ctx.from_workflow = true; + + ApiCtx { + ray_id, + name, + ts, + db, + conn, + op_ctx, + } + } +} + +impl ApiCtx { + pub async fn dispatch_workflow(&self, input: I) -> GlobalResult + where + I: WorkflowInput, + ::Workflow: Workflow, + { + let name = I::Workflow::NAME; + + tracing::debug!(%name, ?input, "dispatching workflow"); + + let id = Uuid::new_v4(); + + // Serialize input + let input_val = serde_json::to_value(input) + .map_err(WorkflowError::SerializeWorkflowOutput) + .map_err(GlobalError::raw)?; + + self.db + .dispatch_workflow(self.ray_id, id, &name, input_val) + .await + .map_err(GlobalError::raw)?; + + tracing::info!(%name, ?id, "workflow dispatched"); + + Ok(id) + } + + /// Wait for a given workflow to complete. + /// **IMPORTANT:** Has no timeout. + pub async fn wait_for_workflow( + &self, + workflow_id: Uuid, + ) -> GlobalResult { + tracing::info!(name=W::NAME, id=?workflow_id, "waiting for workflow"); + + let period = Duration::from_millis(50); + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + + // Check if state finished + let workflow = self + .db + .get_workflow(workflow_id) + .await + .map_err(GlobalError::raw)? + .ok_or(WorkflowError::WorkflowNotFound) + .map_err(GlobalError::raw)?; + if let Some(output) = workflow.parse_output::().map_err(GlobalError::raw)? { + return Ok(output); + } + } + } + + /// Dispatch a new workflow and wait for it to complete. Has a 60s timeout. + pub async fn workflow( + &self, + input: I, + ) -> GlobalResult<<::Workflow as Workflow>::Output> + where + I: WorkflowInput, + ::Workflow: Workflow, + { + let workflow_id = self.dispatch_workflow(input).await?; + + tokio::time::timeout( + WORKFLOW_TIMEOUT, + self.wait_for_workflow::(workflow_id), + ) + .await? + } + + pub async fn signal( + &self, + workflow_id: Uuid, + input: T, + ) -> GlobalResult { + tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal"); + + let signal_id = Uuid::new_v4(); + + // Serialize input + let input_val = serde_json::to_value(input) + .map_err(WorkflowError::SerializeSignalBody) + .map_err(GlobalError::raw)?; + + self.db + .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, input_val) + .await + .map_err(GlobalError::raw)?; + + Ok(signal_id) + } + + pub async fn op( + &mut self, + input: I, + ) -> GlobalResult<<::Operation as Operation>::Output> + where + I: OperationInput, + ::Operation: Operation, + { + let mut ctx = OperationCtx::new( + self.db.clone(), + &self.conn, + self.ray_id, + self.op_ctx.req_ts(), + false, + I::Operation::NAME, + ); + + I::Operation::run(&mut ctx, &input) + .await + .map_err(WorkflowError::OperationFailure) + .map_err(GlobalError::raw) + } +} + +impl ApiCtx { + pub fn name(&self) -> &str { + self.name + } + + // pub fn timeout(&self) -> Duration { + // self.timeout + // } + + pub fn req_id(&self) -> Uuid { + self.op_ctx.req_id() + } + + pub fn ray_id(&self) -> Uuid { + self.ray_id + } + + /// Timestamp at which the request started. + pub fn ts(&self) -> i64 { + self.ts + } + + /// Timestamp at which the request was published. + pub fn req_ts(&self) -> i64 { + self.op_ctx.req_ts() + } + + /// Time between when the timestamp was processed and when it was published. + pub fn req_dt(&self) -> i64 { + self.ts.saturating_sub(self.op_ctx.req_ts()) + } + + // pub fn perf(&self) -> &chirp_perf::PerfCtx { + // self.conn.perf() + // } + + pub fn trace(&self) -> &[chirp_client::TraceEntry] { + self.conn.trace() + } + + pub fn test(&self) -> bool { + self.trace() + .iter() + .any(|x| x.run_context == chirp_client::RunContext::Test as i32) + } + + pub fn chirp(&self) -> &chirp_client::Client { + self.conn.chirp() + } + + pub fn cache(&self) -> rivet_cache::RequestConfig { + self.conn.cache() + } + + pub fn cache_handle(&self) -> rivet_cache::Cache { + self.conn.cache_handle() + } + + pub async fn crdb(&self) -> Result { + self.conn.crdb().await + } + + pub async fn redis_cache(&self) -> Result { + self.conn.redis_cache().await + } + + pub async fn redis_cdn(&self) -> Result { + self.conn.redis_cdn().await + } + + pub async fn redis_job(&self) -> Result { + self.conn.redis_job().await + } + + pub async fn redis_mm(&self) -> Result { + self.conn.redis_mm().await + } + + pub async fn redis_user_presence(&self) -> Result { + self.conn.redis_user_presence().await + } + + pub async fn clickhouse(&self) -> GlobalResult { + self.conn.clickhouse().await + } + + // Backwards compatibility + pub fn op_ctx(&self) -> &rivet_operation::OperationContext<()> { + &self.op_ctx + } +} diff --git a/lib/chirp-workflow/core/src/ctx/mod.rs b/lib/chirp-workflow/core/src/ctx/mod.rs index 6e1445f00b..8d75c427df 100644 --- a/lib/chirp-workflow/core/src/ctx/mod.rs +++ b/lib/chirp-workflow/core/src/ctx/mod.rs @@ -1,10 +1,12 @@ mod activity; +pub(crate) mod api; mod operation; mod test; mod workflow; pub use activity::ActivityCtx; +pub use api::ApiCtx; pub use operation::OperationCtx; pub use test::TestCtx; pub use workflow::WorkflowCtx; -// TODO: StandaloneCtx, ApiCtx +// TODO: StandaloneCtx diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index 8b46bb96ac..e0cb6d04dc 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -75,7 +75,9 @@ impl OperationCtx { .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) } +} +impl OperationCtx { pub fn name(&self) -> &str { self.name } diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index 74511577ae..351b92af9e 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -126,12 +126,12 @@ impl TestCtx { Ok(output) } - pub async fn signal( + pub async fn signal( &self, workflow_id: Uuid, - input: I, + input: T, ) -> GlobalResult { - tracing::debug!(name=%I::NAME, %workflow_id, "dispatching signal"); + tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal"); let signal_id = Uuid::new_v4(); @@ -141,7 +141,7 @@ impl TestCtx { .map_err(GlobalError::raw)?; self.db - .publish_signal(self.ray_id, workflow_id, signal_id, I::NAME, input_val) + .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, input_val) .await .map_err(GlobalError::raw)?; diff --git a/lib/chirp-workflow/core/src/ctx/workflow.rs b/lib/chirp-workflow/core/src/ctx/workflow.rs index a0a1184f3c..3925bea07d 100644 --- a/lib/chirp-workflow/core/src/ctx/workflow.rs +++ b/lib/chirp-workflow/core/src/ctx/workflow.rs @@ -89,7 +89,8 @@ impl WorkflowCtx { }) } - /// Creates a new workflow run with one more depth in the location. + /// Creates a new workflow run with one more depth in the location. Meant to be implemented and not used + /// directly in workflows. pub fn branch(&mut self) -> Self { let branch = WorkflowCtx { workflow_id: self.workflow_id, @@ -119,7 +120,8 @@ impl WorkflowCtx { branch } - /// Like `branch` but it does not add another layer of depth. + /// Like `branch` but it does not add another layer of depth. Meant to be implemented and not used + /// directly in workflows. pub fn step(&mut self) -> Self { let branch = self.clone(); @@ -239,12 +241,12 @@ impl WorkflowCtx { } /// Run then handle the result of an activity. - pub async fn run_activity( + async fn run_activity( &mut self, input: &A::Input, activity_id: &ActivityId, ) -> WorkflowResult { - let mut ctx = ActivityCtx::new( + let ctx = ActivityCtx::new( self.db.clone(), &self.conn, self.create_ts, @@ -252,7 +254,7 @@ impl WorkflowCtx { A::NAME, ); - let res = tokio::time::timeout(A::TIMEOUT, A::run(&mut ctx, input)) + let res = tokio::time::timeout(A::TIMEOUT, A::run(&ctx, input)) .await .map_err(|_| WorkflowError::ActivityTimeout); @@ -321,7 +323,7 @@ impl WorkflowCtx { // Fetch new pending signal let signal = self .db - .pull_latest_signal( + .pull_next_signal( self.workflow_id, signal_names, self.full_location().as_ref(), @@ -407,7 +409,8 @@ impl WorkflowCtx { Ok(id) } - /// Wait for another workflow's response. + /// Wait for another workflow's response. If no response was found after polling the database, this + /// workflow will go to sleep until the sub workflow completes. pub async fn wait_for_workflow( &self, sub_workflow_id: Uuid, @@ -461,7 +464,7 @@ impl WorkflowCtx { // TODO(RVT-3755): If a sub workflow is dispatched, then the worker is updated to include the sub // worker in the registry, this will diverge in history because it will try to run the sub worker - // in process during the replay + // in-process during the replay // If the workflow isn't in the current registry, dispatch the workflow instead let sub_workflow_id = self.dispatch_workflow(input).await?; let output = self @@ -597,22 +600,21 @@ impl WorkflowCtx { workflow_id: Uuid, body: T, ) -> GlobalResult { - let id = Uuid::new_v4(); + tracing::debug!(name=%T::NAME, %workflow_id, "dispatching signal"); + + let signal_id = Uuid::new_v4(); + + // Serialize input + let input_val = serde_json::to_value(&body) + .map_err(WorkflowError::SerializeSignalBody) + .map_err(GlobalError::raw)?; self.db - .publish_signal( - self.ray_id, - workflow_id, - id, - T::NAME, - serde_json::to_value(&body) - .map_err(WorkflowError::SerializeSignalBody) - .map_err(GlobalError::raw)?, - ) + .publish_signal(self.ray_id, workflow_id, signal_id, T::NAME, input_val) .await .map_err(GlobalError::raw)?; - Ok(id) + Ok(signal_id) } /// Listens for a signal for a short time before setting the workflow to sleep. Once the signal is diff --git a/lib/chirp-workflow/core/src/db/mod.rs b/lib/chirp-workflow/core/src/db/mod.rs index 64898c9c36..16b37c1f7e 100644 --- a/lib/chirp-workflow/core/src/db/mod.rs +++ b/lib/chirp-workflow/core/src/db/mod.rs @@ -59,7 +59,7 @@ pub trait Database: Send { signal_name: &str, body: serde_json::Value, ) -> WorkflowResult<()>; - async fn pull_latest_signal( + async fn pull_next_signal( &self, workflow_id: Uuid, filter: &[&str], diff --git a/lib/chirp-workflow/core/src/db/postgres.rs b/lib/chirp-workflow/core/src/db/postgres.rs index d6fee0d57f..3c5345f756 100644 --- a/lib/chirp-workflow/core/src/db/postgres.rs +++ b/lib/chirp-workflow/core/src/db/postgres.rs @@ -389,7 +389,7 @@ impl Database for DatabasePostgres { Ok(()) } - async fn pull_latest_signal( + async fn pull_next_signal( &self, workflow_id: Uuid, filter: &[&str], @@ -399,7 +399,7 @@ impl Database for DatabasePostgres { let signal = sqlx::query_as::<_, SignalRow>(indoc!( " WITH - latest_signal AS ( + next_signal AS ( DELETE FROM db_workflow.signals WHERE workflow_id = $1 AND @@ -411,7 +411,7 @@ impl Database for DatabasePostgres { clear_wake AS ( UPDATE db_workflow.workflows AS w SET wake_signals = ARRAY[] - FROM db_workflow.latest_signal AS s + FROM next_signal AS s WHERE w.workflow_id = s.workflow_id RETURNING 1 ), @@ -420,10 +420,10 @@ impl Database for DatabasePostgres { workflow_id, location, signal_id, signal_name, body ) SELECT workflow_id, $3 AS location, signal_id, signal_name, body - FROM db_workflow.latest_signal + FROM next_signal RETURNING 1 ) - SELECT * FROM latest_signal + SELECT * FROM next_signal ", )) .bind(workflow_id) diff --git a/lib/chirp-workflow/core/src/executable.rs b/lib/chirp-workflow/core/src/executable.rs index 143b570a31..2790d5a1ee 100644 --- a/lib/chirp-workflow/core/src/executable.rs +++ b/lib/chirp-workflow/core/src/executable.rs @@ -16,6 +16,7 @@ pub trait Executable: Send { type AsyncResult<'a, T> = Pin> + Send + 'a>>; +// Closure executuable impl #[async_trait] impl Executable for F where diff --git a/lib/chirp-workflow/core/src/lib.rs b/lib/chirp-workflow/core/src/lib.rs index a024270364..e724e979bb 100644 --- a/lib/chirp-workflow/core/src/lib.rs +++ b/lib/chirp-workflow/core/src/lib.rs @@ -1,6 +1,6 @@ pub mod activity; pub mod compat; -mod ctx; +pub mod ctx; pub mod db; mod error; mod executable; diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index 477d3360b4..801f9acd7f 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -75,7 +75,7 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { Err(err) => return err.into_compile_error().into(), }; - let ctx_ty = syn::parse_str("&mut ActivityCtx").unwrap(); + let ctx_ty = syn::parse_str("&ActivityCtx").unwrap(); let TraitFnOutput { ctx_ident, input_ident, diff --git a/svc/Cargo.lock b/svc/Cargo.lock index c0ee8ba145..90a7ee52f6 100644 --- a/svc/Cargo.lock +++ b/svc/Cargo.lock @@ -401,6 +401,7 @@ dependencies = [ "api-helper-macros", "async-trait", "chirp-client", + "chirp-workflow", "chrono", "formatted-error", "futures-util", diff --git a/svc/pkg/foo/worker/src/workflows/test.rs b/svc/pkg/foo/worker/src/workflows/test.rs index 63b924ad38..480f99e3ad 100644 --- a/svc/pkg/foo/worker/src/workflows/test.rs +++ b/svc/pkg/foo/worker/src/workflows/test.rs @@ -5,16 +5,22 @@ pub struct TestInput { pub x: i64, } +type TestOutput = Result; + #[derive(Debug, Serialize, Deserialize)] -pub struct TestOutput { +pub struct TestOutputOk { pub y: usize, } +#[derive(Debug, Serialize, Deserialize)] +pub struct TestOutputErr { + pub z: usize, +} #[workflow(Test)] async fn test(ctx: &mut WorkflowCtx, input: &TestInput) -> GlobalResult { let a = ctx.activity(FooInput {}).await?; - Ok(TestOutput { y: a.ids.len() }) + Ok(Ok(TestOutputOk { y: a.ids.len() })) } #[derive(Debug, Serialize, Deserialize, Hash)] @@ -26,7 +32,7 @@ pub struct FooOutput { } #[activity(Foo)] -pub fn foo(ctx: &mut ActivityCtx, input: &FooInput) -> GlobalResult { +pub fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult { let ids = sql_fetch_all!( [ctx, (Uuid,)] "