From 0a0d377a7bc2cfda92848beae454b596cdd4bc2b Mon Sep 17 00:00:00 2001 From: MasterPtato <23087326+MasterPtato@users.noreply.github.com> Date: Tue, 18 Jun 2024 19:19:03 +0000 Subject: [PATCH] feat(workflows): add operations service type (#898) ## Changes --- docs/libraries/workflow/ERRORS.md | 61 ++++++++++++++++++- docs/libraries/workflow/GLOSSARY.md | 46 ++++++++------ lib/bolt/config/src/service.rs | 5 ++ lib/bolt/core/src/context/project.rs | 22 ++++++- lib/bolt/core/src/context/service.rs | 3 + lib/bolt/core/src/dep/k8s/gen.rs | 1 + lib/bolt/core/src/tasks/gen.rs | 47 +++++++++----- lib/bolt/core/src/tasks/template.rs | 17 ++++++ lib/chirp-workflow/core/src/compat.rs | 4 +- lib/chirp-workflow/core/src/ctx/activity.rs | 5 +- lib/chirp-workflow/core/src/ctx/api.rs | 6 +- lib/chirp-workflow/core/src/ctx/operation.rs | 6 +- lib/chirp-workflow/core/src/ctx/test.rs | 6 +- lib/chirp-workflow/core/src/operation.rs | 2 +- lib/chirp-workflow/macros/src/lib.rs | 54 +++++++++++----- svc/pkg/foo/worker/src/lib.rs | 2 +- svc/pkg/foo/worker/src/workflows/mod.rs | 1 - svc/pkg/foo/worker/src/workflows/test.rs | 8 +-- svc/pkg/foo/worker/tests/test.rs | 2 +- .../standalone/workflow-worker/src/lib.rs | 10 +-- 20 files changed, 227 insertions(+), 81 deletions(-) diff --git a/docs/libraries/workflow/ERRORS.md b/docs/libraries/workflow/ERRORS.md index bf6d6b6da..f7b26b73c 100644 --- a/docs/libraries/workflow/ERRORS.md +++ b/docs/libraries/workflow/ERRORS.md @@ -9,5 +9,62 @@ because they will never succeed (the state is consistent up the point of error). Sub workflow errors cannot be caught because it's up to the workflow to handle its own errors gracefully. -We return OK responses from workflows for failure cases we explicitly handle (e.g. linode server provision -cleaning itself up) +We return OK responses from workflows for failure cases that we will explicitly handle (e.g. linode server +provision cleaning itself up). 
See +[Errors that are meant to be propagated up](#errors-that-are-meant-to-be-propagated-up). + +## Propagation + +There are 3 classes of errors in workflows: + +1. Errors that can't be retried +2. Errors that can be retried +3. Errors that are meant to be propagated up + +### Errors that can't be retried + +Certain errors cannot be retried by the workflow system. These are usually problems with the internal +mechanisms of the workflow system itself. + +### Errors that can be retried + +All user errors thrown in an activity will cause a workflow retry. While this is good for errors meant to be +retried, it causes unnecessary retries for errors that you know can't be recovered from (like assertions). We +don't currently have a way to mitigate this besides propagating the errors manually (see below) or letting the +useless retries happen. + +### Errors that are meant to be propagated up + +To propagate an error, you must manually serialize it in the activity/workflow output. The workflow itself +will succeed, but the output data will have the error you want to propagate up. + +You can use nested `Result`'s for this: + +```rust +#[derive(...)] +struct MyActivityInput { } + +type MyActivityOutput = Result; + +#[derive(...)] +struct MyActivityOutputOk { + foo: String, +} + +#[derive(...)] +struct MyActivityOutputErr { + bar: u32, +} + +fn activity(input: MyActivityInput) -> GlobalResult { + if ... { + return Ok(Err(MyActivityOutputErr { + bar: 404, + })); + } + + Ok(Ok(MyActivityOutputOk { + foo: "all good".to_string(), + })) +} +``` diff --git a/docs/libraries/workflow/GLOSSARY.md b/docs/libraries/workflow/GLOSSARY.md index 20eaf8498..46c734538 100644 --- a/docs/libraries/workflow/GLOSSARY.md +++ b/docs/libraries/workflow/GLOSSARY.md @@ -2,8 +2,9 @@ ## Worker -A process that queries for pending workflows with a specific filter. Filter is based on which workflows are registered in the given worker's registry. 
-The queried workflows are run on the same machine as the worker but given their own thread. +A process that queries for pending workflows with a specific filter. Filter is based on which workflows are +registered in the given worker's registry. The queried workflows are run on the same machine as the worker but +given their own thread. ## Registry @@ -11,11 +12,14 @@ A collection of registered workflows. This is solely used for the worker to fetc ## Workflow -A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or sub workflow triggers. +A series of fallible executions of code (also known as activities), signal listeners, signal transmitters, or +sub workflow triggers. -Workflows can be though of as a list of tasks. The code defining a workflow only specifies what items should be ran; There is no complex logic (e.g. database queries) running within the top level of the workflow. +Workflows can be thought of as a list of tasks. The code defining a workflow only specifies what items should +be run; there is no complex logic (e.g. database queries) running within the top level of the workflow. -Upon an activity failure, workflow code can be reran without duplicate side effects because activities are cached and re-read after they succeed. +Upon an activity failure, workflow code can be rerun without duplicate side effects because activities are +cached and re-read after they succeed. ## Activity @@ -25,13 +29,13 @@ workflow fails. ## Operation -Effectively a native rust function. Can fail or not fail, used simply for tidiness (as you would with any other function). -Operations can only be called from activities, not from workflows. +Effectively a native rust function. Can fail or not fail. Used for common operations like fetching a +user. Operations cannot be called from workflows. 
Examples include: -- most `get` operations (`user-get`) -- any complex logic you'd want in it's own function (fetching some http data and parsing it) +- most `get` operations (`user-get`) +- any complex logic you'd want in its own function (fetching some http data and parsing it) Operations are not required; all of their functionality can be put into an activity instead. @@ -39,27 +43,31 @@ An action that gets executed in a workflow. An event can be a: -- Activity -- Received signal -- Dispatched sub-workflow +- Activity +- Received signal +- Dispatched sub-workflow Events store the output from activities and are used to ensure activities are ran only once. ## Workflow Event History -List of events that have executed in this workflow. These are used in replays to verify that the workflow has not changed to an invalid state. +List of events that have executed in this workflow. These are used in replays to verify that the workflow has +not changed to an invalid state. ## Workflow Replay -After the first run of a workflow, subsequent runs will replay the activities and compare against the event history. If an activity has already been ran successfully, the activity will not actually run any code and instead use the output from the previous run. +After the first run of a workflow, subsequent runs will replay the activities and compare against the event +history. If an activity has already been run successfully, the activity will not actually run any code and +instead use the output from the previous run. ## Workflow Wake Condition -If a workflow is not currently running an activity, wake conditions define when the workflow should be ran again. +If a workflow is not currently running an activity, wake conditions define when the workflow should be run +again. 
The available conditions are: -- **Immediately** Run immediately by the first available node -- **Deadline** Run at a given timestamp. -- **Signal** Run once any one of the listed signals is received. -- **Sub workflow** Run once the given sub workflow is completed. +- **Immediately** Run immediately by the first available node +- **Deadline** Run at a given timestamp. +- **Signal** Run once any one of the listed signals is received. +- **Sub workflow** Run once the given sub workflow is completed. diff --git a/lib/bolt/config/src/service.rs b/lib/bolt/config/src/service.rs index 8bfb9aab6..5c62ec219 100644 --- a/lib/bolt/config/src/service.rs +++ b/lib/bolt/config/src/service.rs @@ -99,6 +99,9 @@ pub enum ServiceKind { #[serde(rename = "operation")] Operation {}, + #[serde(rename = "operations")] + Operations {}, + // TODO: Rename to worker #[serde(rename = "consumer")] Consumer { @@ -359,6 +362,7 @@ impl ServiceKind { ServiceKind::Static { .. } => "static", ServiceKind::Database { .. } => "database", ServiceKind::Cache { .. } => "cache", + ServiceKind::Operations { .. } => "operations", } } @@ -371,6 +375,7 @@ impl ServiceKind { | ServiceKind::Api { .. } => ComponentClass::Executable, ServiceKind::Operation { .. } + | ServiceKind::Operations { .. } | ServiceKind::Consumer { .. } | ServiceKind::ApiRoutes { .. } => ComponentClass::NonExecutable, ServiceKind::Database { .. 
} => ComponentClass::Database, diff --git a/lib/bolt/core/src/context/project.rs b/lib/bolt/core/src/context/project.rs index 80d8ea879..eaf4e282c 100644 --- a/lib/bolt/core/src/context/project.rs +++ b/lib/bolt/core/src/context/project.rs @@ -410,7 +410,27 @@ impl ProjectContextData { .await; // Read ops - Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("ops")).await; + // Check if service config exists + if fs::metadata(pkg.path().join("ops").join("Service.toml")) + .await + .is_ok() + { + // Load the ops directory as a single service + let svc_ctx = context::service::ServiceContextData::from_path( + Weak::new(), + svc_ctxs_map, + &workspace_path, + &pkg.path().join("ops"), + ) + .await + .unwrap(); + + svc_ctxs_map.insert(svc_ctx.name(), svc_ctx.clone()); + } else { + // Load all individual ops + Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("ops")) + .await; + } // Read dbs Self::load_services_dir(svc_ctxs_map, &workspace_path, pkg.path().join("db")).await; diff --git a/lib/bolt/core/src/context/service.rs b/lib/bolt/core/src/context/service.rs index 779430d96..a5439f225 100644 --- a/lib/bolt/core/src/context/service.rs +++ b/lib/bolt/core/src/context/service.rs @@ -512,6 +512,7 @@ impl ServiceContextData { dep.config().kind, ServiceKind::Database { .. } | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } + | ServiceKind::Operations { .. } | ServiceKind::Consumer { .. } ) } else if matches!(self.config().kind, ServiceKind::Api { .. }) { @@ -519,6 +520,7 @@ impl ServiceContextData { dep.config().kind, ServiceKind::Database { .. } | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } + | ServiceKind::Operations { .. } | ServiceKind::ApiRoutes { .. } ) } else { @@ -526,6 +528,7 @@ impl ServiceContextData { dep.config().kind, ServiceKind::Database { .. } | ServiceKind::Cache { .. } | ServiceKind::Operation { .. } + | ServiceKind::Operations { .. 
} ) }; diff --git a/lib/bolt/core/src/dep/k8s/gen.rs b/lib/bolt/core/src/dep/k8s/gen.rs index ae7501160..bcae9ae70 100644 --- a/lib/bolt/core/src/dep/k8s/gen.rs +++ b/lib/bolt/core/src/dep/k8s/gen.rs @@ -139,6 +139,7 @@ pub async fn gen_svc(exec_ctx: &ExecServiceContext) -> Vec { ServiceKind::Oneshot { .. } => SpecType::Job, ServiceKind::Periodic { .. } => SpecType::CronJob, ServiceKind::Operation { .. } + | ServiceKind::Operations { .. } | ServiceKind::Database { .. } | ServiceKind::Cache { .. } | ServiceKind::ApiRoutes { .. } => { diff --git a/lib/bolt/core/src/tasks/gen.rs b/lib/bolt/core/src/tasks/gen.rs index cf17ec311..90916b94a 100644 --- a/lib/bolt/core/src/tasks/gen.rs +++ b/lib/bolt/core/src/tasks/gen.rs @@ -129,23 +129,40 @@ async fn generate_root(path: &Path) { } } } - // Iterate through `ops` folder - let ops_path = pkg.path().join("ops"); - if fs::metadata(&ops_path).await.is_ok() { - let mut dir = fs::read_dir(ops_path).await.unwrap(); - while let Some(entry) = dir.next_entry().await.unwrap() { - if entry.metadata().await.unwrap().is_dir() { - workspace_members.push(format!( - r#""pkg/{pkg}/ops/{entry}""#, - pkg = pkg.file_name().into_string().unwrap(), - entry = entry.file_name().into_string().unwrap() - )); - // Remove services' Cargo.lock files in favor of the shared svc - // Cargo.toml - let _ = fs::remove_file(entry.path().join("Cargo.lock")).await; + // Check if service config exists + if fs::metadata(pkg.path().join("ops").join("Service.toml")) + .await + .is_ok() + { + workspace_members.push(format!( + r#""pkg/{pkg}/ops""#, + pkg = pkg.file_name().into_string().unwrap(), + )); - set_license(&entry.path().join("Cargo.toml")).await; + let _ = fs::remove_file(pkg.path().join("ops").join("Cargo.lock")).await; + + set_license(&pkg.path().join("ops").join("Cargo.toml")).await; + } + // Iterate through `ops` folder + else { + let ops_path = pkg.path().join("ops"); + if fs::metadata(&ops_path).await.is_ok() { + let mut dir = 
fs::read_dir(ops_path).await.unwrap(); + while let Some(entry) = dir.next_entry().await.unwrap() { + if entry.metadata().await.unwrap().is_dir() { + workspace_members.push(format!( + r#""pkg/{pkg}/ops/{entry}""#, + pkg = pkg.file_name().into_string().unwrap(), + entry = entry.file_name().into_string().unwrap() + )); + + // Remove services' Cargo.lock files in favor of the shared svc + // Cargo.toml + let _ = fs::remove_file(entry.path().join("Cargo.lock")).await; + + set_license(&entry.path().join("Cargo.toml")).await; + } } } } diff --git a/lib/bolt/core/src/tasks/template.rs b/lib/bolt/core/src/tasks/template.rs index b2676ff03..f456f88d6 100644 --- a/lib/bolt/core/src/tasks/template.rs +++ b/lib/bolt/core/src/tasks/template.rs @@ -75,6 +75,23 @@ pub async fn generate(ctx: &mut ProjectContext, opts: TemplateOpts) -> Result<() ); } + // Check for new operations service type + if matches!(template_type, TemplateType::Operation) + && fs::metadata( + base_path + .join("svc") + .join("pkg") + .join(&pkg_name) + .join("Service.toml"), + ) + .await + .is_ok() + { + bail!( + "Creating operations in new `operations` service type ({pkg_name}/ops) not yet supported.", + ); + } + // Touch types lib to force it to rebuild generated proto when making a new package if create_pkg { let lib_file = base_path diff --git a/lib/chirp-workflow/core/src/compat.rs b/lib/chirp-workflow/core/src/compat.rs index 1b313739c..f413cddc8 100644 --- a/lib/chirp-workflow/core/src/compat.rs +++ b/lib/chirp-workflow/core/src/compat.rs @@ -128,7 +128,7 @@ where ::Operation: Operation, B: Debug + Clone, { - let mut ctx = OperationCtx::new( + let ctx = OperationCtx::new( db_from_ctx(ctx).await?, ctx.conn(), ctx.ray_id(), @@ -137,7 +137,7 @@ where I::Operation::NAME, ); - I::Operation::run(&mut ctx, &input) + I::Operation::run(&ctx, &input) .await .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) diff --git a/lib/chirp-workflow/core/src/ctx/activity.rs 
b/lib/chirp-workflow/core/src/ctx/activity.rs index 1aef06e64..c925bb7ee 100644 --- a/lib/chirp-workflow/core/src/ctx/activity.rs +++ b/lib/chirp-workflow/core/src/ctx/activity.rs @@ -4,6 +4,7 @@ use uuid::Uuid; use crate::{ctx::OperationCtx, DatabaseHandle, Operation, OperationInput, WorkflowError}; +#[derive(Clone)] pub struct ActivityCtx { ray_id: Uuid, name: &'static str, @@ -60,7 +61,7 @@ impl ActivityCtx { I: OperationInput, ::Operation: Operation, { - let mut ctx = OperationCtx::new( + let ctx = OperationCtx::new( self.db.clone(), &self.conn, self.ray_id, @@ -69,7 +70,7 @@ impl ActivityCtx { I::Operation::NAME, ); - tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&mut ctx, &input)) + tokio::time::timeout(I::Operation::TIMEOUT, I::Operation::run(&ctx, &input)) .await .map_err(|_| WorkflowError::OperationTimeout)? .map_err(WorkflowError::OperationFailure) diff --git a/lib/chirp-workflow/core/src/ctx/api.rs b/lib/chirp-workflow/core/src/ctx/api.rs index 6580ce1f9..cb99d82d9 100644 --- a/lib/chirp-workflow/core/src/ctx/api.rs +++ b/lib/chirp-workflow/core/src/ctx/api.rs @@ -152,14 +152,14 @@ impl ApiCtx { } pub async fn op( - &mut self, + &self, input: I, ) -> GlobalResult<<::Operation as Operation>::Output> where I: OperationInput, ::Operation: Operation, { - let mut ctx = OperationCtx::new( + let ctx = OperationCtx::new( self.db.clone(), &self.conn, self.ray_id, @@ -168,7 +168,7 @@ impl ApiCtx { I::Operation::NAME, ); - I::Operation::run(&mut ctx, &input) + I::Operation::run(&ctx, &input) .await .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) diff --git a/lib/chirp-workflow/core/src/ctx/operation.rs b/lib/chirp-workflow/core/src/ctx/operation.rs index e0cb6d04d..702eba23b 100644 --- a/lib/chirp-workflow/core/src/ctx/operation.rs +++ b/lib/chirp-workflow/core/src/ctx/operation.rs @@ -54,14 +54,14 @@ impl OperationCtx { impl OperationCtx { pub async fn op( - &mut self, + &self, input: I, ) -> GlobalResult<<::Operation as 
Operation>::Output> where I: OperationInput, ::Operation: Operation, { - let mut ctx = OperationCtx::new( + let ctx = OperationCtx::new( self.db.clone(), &self.conn, self.ray_id, @@ -70,7 +70,7 @@ impl OperationCtx { I::Operation::NAME, ); - I::Operation::run(&mut ctx, &input) + I::Operation::run(&ctx, &input) .await .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) diff --git a/lib/chirp-workflow/core/src/ctx/test.rs b/lib/chirp-workflow/core/src/ctx/test.rs index 351b92af9..eec38775f 100644 --- a/lib/chirp-workflow/core/src/ctx/test.rs +++ b/lib/chirp-workflow/core/src/ctx/test.rs @@ -149,14 +149,14 @@ impl TestCtx { } pub async fn op( - &mut self, + &self, input: I, ) -> GlobalResult<<::Operation as Operation>::Output> where I: OperationInput, ::Operation: Operation, { - let mut ctx = OperationCtx::new( + let ctx = OperationCtx::new( self.db.clone(), self.conn .as_ref() @@ -167,7 +167,7 @@ impl TestCtx { I::Operation::NAME, ); - I::Operation::run(&mut ctx, &input) + I::Operation::run(&ctx, &input) .await .map_err(WorkflowError::OperationFailure) .map_err(GlobalError::raw) diff --git a/lib/chirp-workflow/core/src/operation.rs b/lib/chirp-workflow/core/src/operation.rs index 4eb1cdc49..17a447e8b 100644 --- a/lib/chirp-workflow/core/src/operation.rs +++ b/lib/chirp-workflow/core/src/operation.rs @@ -11,7 +11,7 @@ pub trait Operation { const NAME: &'static str; const TIMEOUT: std::time::Duration; - async fn run(ctx: &mut OperationCtx, input: &Self::Input) -> GlobalResult; + async fn run(ctx: &OperationCtx, input: &Self::Input) -> GlobalResult; } pub trait OperationInput: Send { diff --git a/lib/chirp-workflow/macros/src/lib.rs b/lib/chirp-workflow/macros/src/lib.rs index 801f9acd7..c07dd5908 100644 --- a/lib/chirp-workflow/macros/src/lib.rs +++ b/lib/chirp-workflow/macros/src/lib.rs @@ -1,6 +1,7 @@ use proc_macro::TokenStream; use quote::{quote, ToTokens}; use syn::{ + parse::{Parse, ParseStream}, parse_macro_input, spanned::Spanned, 
GenericArgument, Ident, ItemFn, ItemStruct, LitStr, PathArguments, ReturnType, Type, }; @@ -21,7 +22,7 @@ impl Default for Config { #[proc_macro_attribute] pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream { - let name = parse_macro_input!(attr as Ident).to_string(); + let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Workflow".to_string()); let item_fn = parse_macro_input!(item as ItemFn); if let Err(err) = parse_empty_config(&item_fn.attrs) { @@ -39,9 +40,10 @@ pub fn workflow(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); let fn_body = item_fn.block; + let vis = item_fn.vis; let expanded = quote! { - pub struct #struct_ident; + #vis struct #struct_ident; impl chirp_workflow::workflow::WorkflowInput for #input_type { type Workflow = #struct_ident; @@ -86,12 +88,13 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); let fn_body = item_fn.block; + let vis = item_fn.vis; let max_retries = config.max_retries; - let timeout = config.timeout * 1000; + let timeout = config.timeout; let expanded = quote! 
{ - pub struct #struct_ident; + #vis struct #struct_ident; impl chirp_workflow::activity::ActivityInput for #input_type { type Activity = #struct_ident; @@ -116,7 +119,7 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { const NAME: &'static str = #fn_name; const MAX_RETRIES: u32 = #max_retries; - const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(#timeout); + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(#timeout); async fn run(#ctx_ident: #ctx_ty, #input_ident: &Self::Input) -> GlobalResult { #fn_body @@ -131,7 +134,7 @@ pub fn activity(attr: TokenStream, item: TokenStream) -> TokenStream { #[proc_macro_attribute] pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { - let name = parse_macro_input!(attr as Ident).to_string(); + let name = parse_macro_input!(attr as OptionalIdent).ident.map(|x| x.to_string()).unwrap_or_else(|| "Operation".to_string()); let item_fn = parse_macro_input!(item as ItemFn); let config = match parse_config(&item_fn.attrs) { @@ -139,7 +142,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { Err(err) => return err.into_compile_error().into(), }; - let ctx_ty = syn::parse_str("&mut OperationCtx").unwrap(); + let ctx_ty = syn::parse_str("&OperationCtx").unwrap(); let TraitFnOutput { ctx_ident, input_ident, @@ -150,13 +153,14 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { let struct_ident = Ident::new(&name, proc_macro2::Span::call_site()); let fn_name = item_fn.sig.ident.to_string(); let fn_body = item_fn.block; + let vis = item_fn.vis; - let timeout = config.timeout * 1000; + let timeout = config.timeout; let expanded = quote! 
{ - pub struct #struct_ident; + #vis struct #struct_ident; - impl chirp_workflow::workflow::OperationInput for #input_type { + impl chirp_workflow::operation::OperationInput for #input_type { type Operation = #struct_ident; } @@ -166,7 +170,7 @@ pub fn operation(attr: TokenStream, item: TokenStream) -> TokenStream { type Output = #output_type; const NAME: &'static str = #fn_name; - const TIMEOUT: std::time::Duration = std::time::Duration::from_millis(#timeout); + const TIMEOUT: std::time::Duration = std::time::Duration::from_secs(#timeout); async fn run(#ctx_ident: #ctx_ty, #input_ident: &Self::Input) -> GlobalResult { #fn_body @@ -183,10 +187,15 @@ struct TraitFnOutput { ctx_ident: syn::Ident, input_ident: syn::Ident, input_type: Box, - output_type: syn::Ident, + output_type: syn::Type, } fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) -> TraitFnOutput { + // Check if is async + if item_fn.sig.asyncness.is_none() { + panic!("the async keyword is missing from the function declaration"); + } + let mut arg_names = vec![]; let mut arg_types = vec![]; @@ -209,7 +218,7 @@ fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) - panic!( "{} function must have exactly two parameters: ctx: {:?} and input: YourInputType", trait_name, - ctx_ty.to_token_stream().to_string() + ctx_ty.to_token_stream().to_string(), ); } @@ -226,9 +235,9 @@ fn parse_trait_fn(ctx_ty: &syn::Type, trait_name: &str, item_fn: &syn::ItemFn) - if segment.ident == "GlobalResult" { match &segment.arguments { PathArguments::AngleBracketed(args) => { - if let Some(GenericArgument::Type(Type::Path(path))) = args.args.first() + if let Some(GenericArgument::Type(ty)) = args.args.first() { - path.path.segments.last().unwrap().ident.clone() + ty.clone() } else { panic!("Unsupported Result type"); } @@ -394,3 +403,18 @@ fn parse_empty_config(attrs: &[syn::Attribute]) -> syn::Result<()> { Ok(()) } + +struct OptionalIdent { + ident: Option, +} + +impl Parse 
for OptionalIdent { + fn parse(input: ParseStream) -> syn::Result { + if input.is_empty() { + Ok(OptionalIdent { ident: None }) + } else { + let ident: Ident = input.parse()?; + Ok(OptionalIdent { ident: Some(ident) }) + } + } +} diff --git a/svc/pkg/foo/worker/src/lib.rs b/svc/pkg/foo/worker/src/lib.rs index 06fcc19da..70a876322 100644 --- a/svc/pkg/foo/worker/src/lib.rs +++ b/svc/pkg/foo/worker/src/lib.rs @@ -4,7 +4,7 @@ pub mod workflows; pub fn registry() -> Registry { let mut registry = Registry::new(); - registry.register_workflow::(); + registry.register_workflow::(); registry } diff --git a/svc/pkg/foo/worker/src/workflows/mod.rs b/svc/pkg/foo/worker/src/workflows/mod.rs index ddf32b235..7b788c207 100644 --- a/svc/pkg/foo/worker/src/workflows/mod.rs +++ b/svc/pkg/foo/worker/src/workflows/mod.rs @@ -1,2 +1 @@ pub mod test; -pub use test::*; diff --git a/svc/pkg/foo/worker/src/workflows/test.rs b/svc/pkg/foo/worker/src/workflows/test.rs index 480f99e3a..6039859a2 100644 --- a/svc/pkg/foo/worker/src/workflows/test.rs +++ b/svc/pkg/foo/worker/src/workflows/test.rs @@ -17,22 +17,22 @@ pub struct TestOutputErr { } #[workflow(Test)] -async fn test(ctx: &mut WorkflowCtx, input: &TestInput) -> GlobalResult { +pub async fn test(ctx: &mut WorkflowCtx, input: &TestInput) -> GlobalResult { let a = ctx.activity(FooInput {}).await?; Ok(Ok(TestOutputOk { y: a.ids.len() })) } #[derive(Debug, Serialize, Deserialize, Hash)] -pub struct FooInput {} +struct FooInput {} #[derive(Debug, Serialize, Deserialize, Hash)] -pub struct FooOutput { +struct FooOutput { ids: Vec, } #[activity(Foo)] -pub fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult { +async fn foo(ctx: &ActivityCtx, input: &FooInput) -> GlobalResult { let ids = sql_fetch_all!( [ctx, (Uuid,)] " diff --git a/svc/pkg/foo/worker/tests/test.rs b/svc/pkg/foo/worker/tests/test.rs index 0921cd273..3da6bd331 100644 --- a/svc/pkg/foo/worker/tests/test.rs +++ b/svc/pkg/foo/worker/tests/test.rs @@ -3,7 +3,7 @@ use 
chirp_workflow::prelude::*; #[workflow_test] async fn empty(ctx: TestCtx) { let res = ctx - .workflow(foo_worker::workflows::TestInput { x: 12 }) + .workflow(foo_worker::workflows::test::TestInput { x: 12 }) .await .unwrap(); diff --git a/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs b/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs index 83c5fbc61..e4dda933d 100644 --- a/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs +++ b/svc/pkg/monolith/standalone/workflow-worker/src/lib.rs @@ -8,12 +8,6 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> { let worker = Worker::new(reg.handle(), db.clone()); // Start worker - match worker.start(pools).await { - Ok(_) => { - bail!("worker exited unexpectedly") - } - Err(err) => { - return Err(err); - } - } + worker.start(pools).await?; + bail!("worker exited unexpectedly"); }