diff --git a/Cargo.toml b/Cargo.toml index 4835310f5..f3bcacd8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,19 +6,32 @@ edition = "2018" [lib] +# TODO: Feature flag opentelem stuff [dependencies] +anyhow = "1.0" async-trait = "0.1" derive_more = "0.99" -prost = "0.6" -prost-types = "0.6" +env_logger = "0.8" +futures = "0.3" +log = "0.4" +opentelemetry-jaeger = "0.10" +prost = "0.7" +prost-types = "0.7" thiserror = "1.0" -tonic = "0.3" +tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } +tonic = "0.4" +tracing = { version = "0.1", features = ["log"] } +tracing-opentelemetry = "0.10" +tracing-subscriber = "0.2" [dependencies.rustfsm] path = "fsm" +[dev-dependencies] +rstest = "0.6" + [build-dependencies] -tonic-build = "0.3" +tonic-build = "0.4" [workspace] members = [".", "fsm"] diff --git a/README.md b/README.md index fff1f3818..cc352b7ba 100644 --- a/README.md +++ b/README.md @@ -26,3 +26,12 @@ To format all code run: We are using [clippy](https://github.com/rust-lang/rust-clippy) for linting. You can run it using: `cargo clippy --all -- -D warnings` + +## Style Guidelines + +### Error handling +Any error which is returned from a public interface should be well-typed, and we use +[thiserror](https://github.com/dtolnay/thiserror) for that purpose. + +Errors returned from things only used in testing are free to use +[anyhow](https://github.com/dtolnay/anyhow) for less verbosity. diff --git a/build.rs b/build.rs index f51c6abe5..282ce147a 100644 --- a/build.rs +++ b/build.rs @@ -5,6 +5,15 @@ fn main() -> Result<(), Box> { .build_server(false) .build_client(true) .out_dir("src/protos") + // Make conversions easier for some types + .type_attribute( + "temporal.api.history.v1.HistoryEvent.attributes", + "#[derive(::derive_more::From)]", + ) + .type_attribute( + "temporal.api.command.v1.Command.attributes", + "#[derive(::derive_more::From)]", + ) .compile( &[ "protos/local/core_interface.proto", diff --git a/fsm/src/lib.rs b/fsm/src/lib.rs index 9f1f43fa8..a41113ac5 100644 --- a/fsm/src/lib.rs +++ b/fsm/src/lib.rs @@ -1,2 +1,2 @@ pub use state_machine_procmacro::fsm; -pub use state_machine_trait::{StateMachine, TransitionResult}; +pub use state_machine_trait::{MachineError, StateMachine, TransitionResult}; diff --git a/fsm/state_machine_procmacro/src/lib.rs b/fsm/state_machine_procmacro/src/lib.rs index 7dc3284b8..6a5056bb5 100644 --- a/fsm/state_machine_procmacro/src/lib.rs +++ b/fsm/state_machine_procmacro/src/lib.rs @@ -9,7 +9,7 @@ use syn::{ parse_macro_input, punctuated::Punctuated, spanned::Spanned, - Error, Fields, Ident, Token, Type, Variant, + Error, Fields, Ident, Token, Type, Variant, Visibility, }; /// Parses a DSL for defining finite state machines, and produces code implementing the @@ -176,6 +176,7 @@ mod kw { } struct StateMachineDefinition { + visibility: Visibility, name: Ident, shared_state_type: Option, command_type: Ident, @@ -183,10 +184,19 @@ struct StateMachineDefinition { transitions: HashSet, } +impl StateMachineDefinition { + fn is_final_state(&self, state: &Ident) -> bool { + // If no transitions go from this state, it's a final state. + self.transitions.iter().find(|t| t.from == *state).is_none() + } +} + impl Parse for StateMachineDefinition { // TODO: Pub keyword fn parse(input: ParseStream) -> Result { - // First parse the state machine name, command type, and error type + // Parse visibility if present + let visibility = input.parse()?; + // parse the state machine name, command type, and error type let (name, command_type, error_type, shared_state_type) = parse_machine_types(&input).map_err(|mut e| { e.combine(Error::new( e.span(), @@ -200,6 +210,7 @@ impl Parse for StateMachineDefinition { input.parse_terminated(Transition::parse)?; let transitions = transitions.into_iter().collect(); Ok(Self { + visibility, name, shared_state_type, transitions, @@ -316,6 +327,7 @@ impl Parse for Transition { impl StateMachineDefinition { fn codegen(&self) -> TokenStream { + let visibility = self.visibility.clone(); // First extract all of the states into a set, and build the enum's insides let states: HashSet<_> = self .transitions @@ -328,6 +340,7 @@ impl StateMachineDefinition { } }); let name = &self.name; + let name_str = &self.name.to_string(); let state_enum_name = Ident::new(&format!("{}State", name), name.span()); // If user has not defined any shared state, use the unit type. let shared_state_type = self @@ -336,24 +349,41 @@ impl StateMachineDefinition { .unwrap_or_else(|| syn::parse_str("()").unwrap()); let machine_struct = quote! { #[derive(Clone)] - pub struct #name { + #visibility struct #name { state: #state_enum_name, shared_state: #shared_state_type } }; let states_enum = quote! { #[derive(::derive_more::From, Clone)] - pub enum #state_enum_name { + #visibility enum #state_enum_name { #(#state_variants),* } }; + let state_is_final_match_arms = states.iter().map(|s| { + let val = if self.is_final_state(s) { + quote! { true } + } else { + quote! { false } + }; + quote! { #state_enum_name::#s(_) => #val } + }); + let states_enum_impl = quote! { + impl #state_enum_name { + fn is_final(&self) -> bool { + match self { + #(#state_is_final_match_arms),* + } + } + } + }; // Build the events enum let events: HashSet = self.transitions.iter().map(|t| t.event.clone()).collect(); let events_enum_name = Ident::new(&format!("{}Events", name), name.span()); let events: Vec<_> = events.into_iter().collect(); let events_enum = quote! { - pub enum #events_enum_name { + #visibility enum #events_enum_name { #(#events),* } }; @@ -451,6 +481,10 @@ impl StateMachineDefinition { type Event = #events_enum_name; type Command = #cmd_type; + fn name(&self) -> &str { + #name_str + } + fn on_event(self, event: #events_enum_name) -> ::rustfsm::TransitionResult { match self.state { @@ -469,6 +503,10 @@ impl StateMachineDefinition { &self.shared_state } + fn on_final_state(&self) -> bool { + self.state.is_final() + } + fn from_parts(shared: Self::SharedState, state: Self::State) -> Self { Self { shared_state: shared, state } } @@ -484,6 +522,7 @@ impl StateMachineDefinition { #transition_type_alias #machine_struct #states_enum + #states_enum_impl #events_enum #trait_impl }; diff --git a/fsm/state_machine_trait/src/lib.rs b/fsm/state_machine_trait/src/lib.rs index a205ce3d1..364f7e5c9 100644 --- a/fsm/state_machine_trait/src/lib.rs +++ b/fsm/state_machine_trait/src/lib.rs @@ -56,11 +56,18 @@ pub trait StateMachine: Sized { } } + fn name(&self) -> &str; + + /// Returns the current state of the machine fn state(&self) -> &Self::State; fn set_state(&mut self, new_state: Self::State); + /// Returns the current shared state of the machine fn shared_state(&self) -> &Self::SharedState; + /// Returns true if the machine's current state is a final one + fn on_final_state(&self) -> bool; + /// Given the shared data and new state, create a new instance. fn from_parts(shared: Self::SharedState, state: Self::State) -> Self; } @@ -160,6 +167,18 @@ where } } + /// Produce a transition with commands relying on [Default] for the destination state's value + pub fn commands(commands: CI) -> Self + where + CI: IntoIterator, + DestState: Into + Default, + { + Self::OkNoShare { + commands: commands.into_iter().collect(), + new_state: DestState::default().into(), + } + } + /// Produce a transition with no commands relying on [Default] for the destination state's /// value pub fn default() -> Self diff --git a/src/lib.rs b/src/lib.rs index cd0c9ea44..4267ccf6e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,3 +1,6 @@ +#[macro_use] +extern crate tracing; + mod machines; mod pollers; pub mod protos; diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 66ac7aabd..23efd857d 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -3,7 +3,7 @@ use rustfsm::{fsm, TransitionResult}; // Schedule / cancel are "explicit events" (imperative rather than past events?) fsm! { - name ActivityMachine; command ActivityCommand; error ActivityMachineError; + pub(super) name ActivityMachine; command ActivityCommand; error ActivityMachineError; Created --(Schedule, on_schedule)--> ScheduleCommandCreated; @@ -50,94 +50,94 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum ActivityMachineError {} +pub(super) enum ActivityMachineError {} -pub enum ActivityCommand {} +pub(super) enum ActivityCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> ActivityMachineTransition { + pub(super) fn on_schedule(self) -> ActivityMachineTransition { // would add command here ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct ScheduleCommandCreated {} +pub(super) struct ScheduleCommandCreated {} impl ScheduleCommandCreated { - pub fn on_activity_task_scheduled(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_scheduled(self) -> ActivityMachineTransition { // set initial command event id // this.initialCommandEventId = currentEvent.getEventId(); ActivityMachineTransition::default::() } - pub fn on_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_canceled(self) -> ActivityMachineTransition { // cancelCommandNotifyCanceled ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct ScheduledEventRecorded {} +pub(super) struct ScheduledEventRecorded {} impl ScheduledEventRecorded { - pub fn on_task_started(self) -> ActivityMachineTransition { + pub(super) fn on_task_started(self) -> ActivityMachineTransition { // setStartedCommandEventId ActivityMachineTransition::default::() } - pub fn on_task_timed_out(self) -> ActivityMachineTransition { + pub(super) fn on_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() } - pub fn on_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_canceled(self) -> ActivityMachineTransition { // createRequestCancelActivityTaskCommand ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct Started {} +pub(super) struct Started {} impl Started { - pub fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { // notify_completed ActivityMachineTransition::default::() } - pub fn on_activity_task_failed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed ActivityMachineTransition::default::() } - pub fn on_activity_task_timed_out(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() } - pub fn on_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_canceled(self) -> ActivityMachineTransition { // createRequestCancelActivityTaskCommand ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct ScheduledActivityCancelCommandCreated {} +pub(super) struct ScheduledActivityCancelCommandCreated {} impl ScheduledActivityCancelCommandCreated { - pub fn on_command_request_cancel_activity_task(self) -> ActivityMachineTransition { + pub(super) fn on_command_request_cancel_activity_task(self) -> ActivityMachineTransition { // notifyCanceledIfTryCancel ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct ScheduledActivityCancelEventRecorded {} +pub(super) struct ScheduledActivityCancelEventRecorded {} impl ScheduledActivityCancelEventRecorded { - pub fn on_activity_task_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { // notify_canceled ActivityMachineTransition::default::() } - pub fn on_activity_task_timed_out(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() } @@ -150,32 +150,32 @@ impl From for ScheduledActivityCancelEven } #[derive(Default, Clone)] -pub struct StartedActivityCancelCommandCreated {} +pub(super) struct StartedActivityCancelCommandCreated {} impl StartedActivityCancelCommandCreated { - pub fn on_activity_task_cancel_requested(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_cancel_requested(self) -> ActivityMachineTransition { // notifyCanceledIfTryCancel ActivityMachineTransition::default::() } } #[derive(Default, Clone)] -pub struct StartedActivityCancelEventRecorded {} +pub(super) struct StartedActivityCancelEventRecorded {} impl StartedActivityCancelEventRecorded { - pub fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { // notify_completed ActivityMachineTransition::default::() } - pub fn on_activity_task_failed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed ActivityMachineTransition::default::() } - pub fn on_activity_task_timed_out(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() } - pub fn on_activity_task_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { // notifyCancellationFromEvent ActivityMachineTransition::default::() } @@ -188,16 +188,16 @@ impl From for StartedActivityCancelEventRe } #[derive(Default, Clone)] -pub struct Completed {} +pub(super) struct Completed {} #[derive(Default, Clone)] -pub struct Failed {} +pub(super) struct Failed {} #[derive(Default, Clone)] -pub struct TimedOut {} +pub(super) struct TimedOut {} #[derive(Default, Clone)] -pub struct Canceled {} +pub(super) struct Canceled {} #[cfg(test)] mod activity_machine_tests { diff --git a/src/machines/cancel_external_state_machine.rs b/src/machines/cancel_external_state_machine.rs index 45cfc48f6..8c12ccfae 100644 --- a/src/machines/cancel_external_state_machine.rs +++ b/src/machines/cancel_external_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name CancelExternalMachine; command CancelExternalCommand; error CancelExternalMachineError; + pub(super) name CancelExternalMachine; command CancelExternalCommand; error CancelExternalMachineError; Created --(Schedule, on_schedule) --> RequestCancelExternalCommandCreated; @@ -13,27 +13,27 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum CancelExternalMachineError {} +pub(super) enum CancelExternalMachineError {} -pub enum CancelExternalCommand {} +pub(super) enum CancelExternalCommand {} #[derive(Default, Clone)] -pub struct CancelRequested {} +pub(super) struct CancelRequested {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> CancelExternalMachineTransition { + pub(super) fn on_schedule(self) -> CancelExternalMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct RequestCancelExternalCommandCreated {} +pub(super) struct RequestCancelExternalCommandCreated {} impl RequestCancelExternalCommandCreated { - pub fn on_request_cancel_external_workflow_execution_initiated( + pub(super) fn on_request_cancel_external_workflow_execution_initiated( self, ) -> CancelExternalMachineTransition { unimplemented!() @@ -41,15 +41,15 @@ impl RequestCancelExternalCommandCreated { } #[derive(Default, Clone)] -pub struct RequestCancelExternalCommandRecorded {} +pub(super) struct RequestCancelExternalCommandRecorded {} impl RequestCancelExternalCommandRecorded { - pub fn on_external_workflow_execution_cancel_requested( + pub(super) fn on_external_workflow_execution_cancel_requested( self, ) -> CancelExternalMachineTransition { unimplemented!() } - pub fn on_request_cancel_external_workflow_execution_failed( + pub(super) fn on_request_cancel_external_workflow_execution_failed( self, ) -> CancelExternalMachineTransition { unimplemented!() @@ -57,4 +57,4 @@ impl RequestCancelExternalCommandRecorded { } #[derive(Default, Clone)] -pub struct RequestCancelFailed {} +pub(super) struct RequestCancelFailed {} diff --git a/src/machines/cancel_workflow_state_machine.rs b/src/machines/cancel_workflow_state_machine.rs index e60175615..d28a90429 100644 --- a/src/machines/cancel_workflow_state_machine.rs +++ b/src/machines/cancel_workflow_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name CancelWorkflowMachine; command CancelWorkflowCommand; error CancelWorkflowMachineError; + pub(super) name CancelWorkflowMachine; command CancelWorkflowCommand; error CancelWorkflowMachineError; CancelWorkflowCommandCreated --(CommandCancelWorkflowExecution) --> CancelWorkflowCommandCreated; CancelWorkflowCommandCreated --(WorkflowExecutionCanceled, on_workflow_execution_canceled) --> CancelWorkflowCommandRecorded; @@ -10,27 +10,27 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum CancelWorkflowMachineError {} +pub(super) enum CancelWorkflowMachineError {} -pub enum CancelWorkflowCommand {} +pub(super) enum CancelWorkflowCommand {} #[derive(Default, Clone)] -pub struct CancelWorkflowCommandCreated {} +pub(super) struct CancelWorkflowCommandCreated {} impl CancelWorkflowCommandCreated { - pub fn on_workflow_execution_canceled(self) -> CancelWorkflowMachineTransition { + pub(super) fn on_workflow_execution_canceled(self) -> CancelWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct CancelWorkflowCommandRecorded {} +pub(super) struct CancelWorkflowCommandRecorded {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> CancelWorkflowMachineTransition { + pub(super) fn on_schedule(self) -> CancelWorkflowMachineTransition { unimplemented!() } } diff --git a/src/machines/child_workflow_state_machine.rs b/src/machines/child_workflow_state_machine.rs index fc5e30ad6..026820e96 100644 --- a/src/machines/child_workflow_state_machine.rs +++ b/src/machines/child_workflow_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name ChildWorkflowMachine; command ChildWorkflowCommand; error ChildWorkflowMachineError; + pub(super) name ChildWorkflowMachine; command ChildWorkflowCommand; error ChildWorkflowMachineError; Created --(Schedule, on_schedule) --> StartCommandCreated; @@ -20,78 +20,80 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum ChildWorkflowMachineError {} +pub(super) enum ChildWorkflowMachineError {} -pub enum ChildWorkflowCommand {} +pub(super) enum ChildWorkflowCommand {} #[derive(Default, Clone)] -pub struct Canceled {} +pub(super) struct Canceled {} #[derive(Default, Clone)] -pub struct Completed {} +pub(super) struct Completed {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_schedule(self) -> ChildWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Failed {} +pub(super) struct Failed {} #[derive(Default, Clone)] -pub struct StartCommandCreated {} +pub(super) struct StartCommandCreated {} impl StartCommandCreated { - pub fn on_start_child_workflow_execution_initiated(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_start_child_workflow_execution_initiated( + self, + ) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_cancel(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_cancel(self) -> ChildWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct StartEventRecorded {} +pub(super) struct StartEventRecorded {} impl StartEventRecorded { - pub fn on_child_workflow_execution_started(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_started(self) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_start_child_workflow_execution_failed(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_start_child_workflow_execution_failed(self) -> ChildWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct StartFailed {} +pub(super) struct StartFailed {} #[derive(Default, Clone)] -pub struct Started {} +pub(super) struct Started {} impl Started { - pub fn on_child_workflow_execution_completed(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_completed(self) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_child_workflow_execution_failed(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_failed(self) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_child_workflow_execution_timed_out(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_timed_out(self) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_child_workflow_execution_canceled(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_canceled(self) -> ChildWorkflowMachineTransition { unimplemented!() } - pub fn on_child_workflow_execution_terminated(self) -> ChildWorkflowMachineTransition { + pub(super) fn on_child_workflow_execution_terminated(self) -> ChildWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Terminated {} +pub(super) struct Terminated {} #[derive(Default, Clone)] -pub struct TimedOut {} +pub(super) struct TimedOut {} diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs index d1cbe455a..e6f50f51b 100644 --- a/src/machines/complete_workflow_state_machine.rs +++ b/src/machines/complete_workflow_state_machine.rs @@ -1,36 +1,133 @@ -use rustfsm::{fsm, TransitionResult}; +use crate::{ + machines::{ + workflow_machines::WorkflowMachines, AddCommand, CancellableCommand, WFCommand, + WFMachinesAdapter, WFMachinesError, + }, + protos::temporal::api::{ + command::v1::{Command, CompleteWorkflowExecutionCommandAttributes}, + enums::v1::{CommandType, EventType}, + history::v1::HistoryEvent, + }, +}; +use rustfsm::{fsm, StateMachine, TransitionResult}; +use std::cell::RefCell; +use std::{convert::TryFrom, rc::Rc}; fsm! { - name CompleteWorkflowMachine; command CompleteWorkflowCommand; error CompleteWorkflowMachineError; + pub(super) + name CompleteWorkflowMachine; + command CompleteWFCommand; + error WFMachinesError; + shared_state CompleteWorkflowExecutionCommandAttributes; - CompleteWorkflowCommandCreated --(CommandCompleteWorkflowExecution) --> CompleteWorkflowCommandCreated; - CompleteWorkflowCommandCreated --(WorkflowExecutionCompleted, on_workflow_execution_completed) --> CompleteWorkflowCommandRecorded; + Created --(Schedule, shared on_schedule) --> CompleteWorkflowCommandCreated; - Created --(Schedule, on_schedule) --> CompleteWorkflowCommandCreated; + CompleteWorkflowCommandCreated --(CommandCompleteWorkflowExecution) + --> CompleteWorkflowCommandCreated; + CompleteWorkflowCommandCreated --(WorkflowExecutionCompleted) + --> CompleteWorkflowCommandRecorded; } -#[derive(thiserror::Error, Debug)] -pub enum CompleteWorkflowMachineError {} +#[derive(Debug)] +pub(super) enum CompleteWFCommand { + AddCommand(AddCommand), +} + +/// Complete a workflow +pub(super) fn complete_workflow( + attribs: CompleteWorkflowExecutionCommandAttributes, +) -> CancellableCommand { + let (machine, add_cmd) = CompleteWorkflowMachine::new_scheduled(attribs); + CancellableCommand::Active { + command: add_cmd.command, + machine: Rc::new(RefCell::new(machine)), + } +} + +impl CompleteWorkflowMachine { + /// Create a new WF machine and schedule it + pub(crate) fn new_scheduled( + attribs: CompleteWorkflowExecutionCommandAttributes, + ) -> (Self, AddCommand) { + let mut s = Self { + state: Created {}.into(), + shared_state: attribs, + }; + let cmd = match s + .on_event_mut(CompleteWorkflowMachineEvents::Schedule) + .expect("Scheduling timers doesn't fail") + .pop() + { + Some(CompleteWFCommand::AddCommand(c)) => c, + _ => panic!("Timer on_schedule must produce command"), + }; + (s, cmd) + } +} + +impl TryFrom for CompleteWorkflowMachineEvents { + type Error = (); + + fn try_from(e: HistoryEvent) -> Result { + Ok(match EventType::from_i32(e.event_type) { + Some(EventType::WorkflowExecutionCompleted) => Self::WorkflowExecutionCompleted, + _ => return Err(()), + }) + } +} + +impl TryFrom for CompleteWorkflowMachineEvents { + type Error = (); -pub enum CompleteWorkflowCommand {} + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::CompleteWorkflowExecution => Self::CommandCompleteWorkflowExecution, + _ => return Err(()), + }) + } +} #[derive(Default, Clone)] -pub struct CompleteWorkflowCommandCreated {} +pub(super) struct Created {} -impl CompleteWorkflowCommandCreated { - pub fn on_workflow_execution_completed(self) -> CompleteWorkflowMachineTransition { - unimplemented!() +impl Created { + pub(super) fn on_schedule( + self, + dat: CompleteWorkflowExecutionCommandAttributes, + ) -> CompleteWorkflowMachineTransition { + let cmd = Command { + command_type: CommandType::CompleteWorkflowExecution as i32, + attributes: Some(dat.into()), + }; + TransitionResult::commands::<_, CompleteWorkflowCommandCreated>(vec![ + CompleteWFCommand::AddCommand(cmd.into()), + ]) } } +#[derive(thiserror::Error, Debug)] +pub(super) enum CompleteWorkflowMachineError {} + #[derive(Default, Clone)] -pub struct CompleteWorkflowCommandRecorded {} +pub(super) struct CompleteWorkflowCommandCreated {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct CompleteWorkflowCommandRecorded {} -impl Created { - pub fn on_schedule(self) -> CompleteWorkflowMachineTransition { - unimplemented!() +impl From for CompleteWorkflowCommandRecorded { + fn from(_: CompleteWorkflowCommandCreated) -> Self { + Default::default() + } +} + +impl WFMachinesAdapter for CompleteWorkflowMachine { + fn adapt_response( + &self, + _wf_machines: &mut WorkflowMachines, + _event: &HistoryEvent, + _has_next_event: bool, + _my_command: CompleteWFCommand, + ) -> Result<(), WFMachinesError> { + Ok(()) } } diff --git a/src/machines/continue_as_new_workflow_state_machine.rs b/src/machines/continue_as_new_workflow_state_machine.rs index 4a45d15b2..ea23d8a57 100644 --- a/src/machines/continue_as_new_workflow_state_machine.rs +++ b/src/machines/continue_as_new_workflow_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name ContinueAsNewWorkflowMachine; command ContinueAsNewWorkflowCommand; error ContinueAsNewWorkflowMachineError; + pub(super) name ContinueAsNewWorkflowMachine; command ContinueAsNewWorkflowCommand; error ContinueAsNewWorkflowMachineError; ContinueAsNewWorkflowCommandCreated --(CommandContinueAsNewWorkflowExecution) --> ContinueAsNewWorkflowCommandCreated; ContinueAsNewWorkflowCommandCreated --(WorkflowExecutionContinuedAsNew, on_workflow_execution_continued_as_new) --> ContinueAsNewWorkflowCommandRecorded; @@ -10,27 +10,29 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum ContinueAsNewWorkflowMachineError {} +pub(super) enum ContinueAsNewWorkflowMachineError {} -pub enum ContinueAsNewWorkflowCommand {} +pub(super) enum ContinueAsNewWorkflowCommand {} #[derive(Default, Clone)] -pub struct ContinueAsNewWorkflowCommandCreated {} +pub(super) struct ContinueAsNewWorkflowCommandCreated {} impl ContinueAsNewWorkflowCommandCreated { - pub fn on_workflow_execution_continued_as_new(self) -> ContinueAsNewWorkflowMachineTransition { + pub(super) fn on_workflow_execution_continued_as_new( + self, + ) -> ContinueAsNewWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ContinueAsNewWorkflowCommandRecorded {} +pub(super) struct ContinueAsNewWorkflowCommandRecorded {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> ContinueAsNewWorkflowMachineTransition { + pub(super) fn on_schedule(self) -> ContinueAsNewWorkflowMachineTransition { unimplemented!() } } diff --git a/src/machines/fail_workflow_state_machine.rs b/src/machines/fail_workflow_state_machine.rs index 872d9cacd..194a88cb8 100644 --- a/src/machines/fail_workflow_state_machine.rs +++ b/src/machines/fail_workflow_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name FailWorkflowMachine; command FailWorkflowCommand; error FailWorkflowMachineError; + pub(super) name FailWorkflowMachine; command FailWorkflowCommand; error FailWorkflowMachineError; Created --(Schedule, on_schedule) --> FailWorkflowCommandCreated; @@ -10,27 +10,27 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum FailWorkflowMachineError {} +pub(super) enum FailWorkflowMachineError {} -pub enum FailWorkflowCommand {} +pub(super) enum FailWorkflowCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> FailWorkflowMachineTransition { + pub(super) fn on_schedule(self) -> FailWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct FailWorkflowCommandCreated {} +pub(super) struct FailWorkflowCommandCreated {} impl FailWorkflowCommandCreated { - pub fn on_workflow_execution_failed(self) -> FailWorkflowMachineTransition { + pub(super) fn on_workflow_execution_failed(self) -> FailWorkflowMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct FailWorkflowCommandRecorded {} +pub(super) struct FailWorkflowCommandRecorded {} diff --git a/src/machines/local_activity_state_machine.rs b/src/machines/local_activity_state_machine.rs index d275ab49f..738de64fd 100644 --- a/src/machines/local_activity_state_machine.rs +++ b/src/machines/local_activity_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name LocalActivityMachine; command LocalActivityCommand; error LocalActivityMachineError; + pub(super) name LocalActivityMachine; command LocalActivityCommand; error LocalActivityMachineError; Created --(CheckExecutionState, on_check_execution_state) --> Replaying; Created --(CheckExecutionState, on_check_execution_state) --> Executing; @@ -24,51 +24,51 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum LocalActivityMachineError {} +pub(super) enum LocalActivityMachineError {} -pub enum LocalActivityCommand {} +pub(super) enum LocalActivityCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_check_execution_state(self) -> LocalActivityMachineTransition { + pub(super) fn on_check_execution_state(self) -> LocalActivityMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Executing {} +pub(super) struct Executing {} impl Executing { - pub fn on_schedule(self) -> LocalActivityMachineTransition { + pub(super) fn on_schedule(self) -> LocalActivityMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreated {} +pub(super) struct MarkerCommandCreated {} impl MarkerCommandCreated { - pub fn on_command_record_marker(self) -> LocalActivityMachineTransition { + pub(super) fn on_command_record_marker(self) -> LocalActivityMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandRecorded {} +pub(super) struct MarkerCommandRecorded {} #[derive(Default, Clone)] -pub struct Replaying {} +pub(super) struct Replaying {} #[derive(Default, Clone)] -pub struct RequestPrepared {} +pub(super) struct RequestPrepared {} #[derive(Default, Clone)] -pub struct RequestSent {} +pub(super) struct RequestSent {} impl RequestSent { - pub fn on_handle_result(self) -> LocalActivityMachineTransition { + pub(super) fn on_handle_result(self) -> LocalActivityMachineTransition { unimplemented!() } } @@ -80,22 +80,22 @@ impl From for RequestSent { } #[derive(Default, Clone)] -pub struct ResultNotified {} +pub(super) struct ResultNotified {} impl ResultNotified { - pub fn on_marker_recorded(self) -> LocalActivityMachineTransition { + pub(super) fn on_marker_recorded(self) -> LocalActivityMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct WaitingMarkerEvent {} +pub(super) struct WaitingMarkerEvent {} impl WaitingMarkerEvent { - pub fn on_marker_recorded(self) -> LocalActivityMachineTransition { + pub(super) fn on_marker_recorded(self) -> LocalActivityMachineTransition { unimplemented!() } - pub fn on_non_replay_workflow_task_started(self) -> LocalActivityMachineTransition { + pub(super) fn on_non_replay_workflow_task_started(self) -> LocalActivityMachineTransition { unimplemented!() } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 18392bee1..bd2a2f6fd 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -1,4 +1,5 @@ -use crate::protos::temporal::api::command::v1::Command; +#[allow(unused)] +mod workflow_machines; #[allow(unused)] mod activity_state_machine; @@ -34,23 +35,210 @@ mod workflow_task_state_machine; #[cfg(test)] mod test_help; -/// A command which can be cancelled +use crate::{ + machines::workflow_machines::{WFMachinesError, WorkflowMachines}, + protos::temporal::api::{ + command::v1::{ + Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + enums::v1::CommandType, + history::v1::{ + HistoryEvent, WorkflowExecutionCanceledEventAttributes, + WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, + }, + }, +}; +use prost::alloc::fmt::Formatter; +use rustfsm::{MachineError, StateMachine}; +use std::{ + cell::RefCell, + convert::{TryFrom, TryInto}, + fmt::Debug, + rc::Rc, + sync::{atomic::AtomicBool, Arc}, +}; +use tracing::Level; + +// TODO: May need to be our SDKWFCommand type +type MachineCommand = Command; + +/// Implementors of this trait represent something that can (eventually) call into a workflow to +/// drive it, start it, signal it, cancel it, etc. +trait DrivenWorkflow { + /// Start the workflow + fn start( + &self, + attribs: WorkflowExecutionStartedEventAttributes, + ) -> Result, anyhow::Error>; + + /// Iterate the workflow. The workflow driver should execute workflow code until there is + /// nothing left to do. EX: Awaiting an activity/timer, workflow completion. + fn iterate_wf(&self) -> Result, anyhow::Error>; + + /// Signal the workflow + fn signal( + &self, + attribs: WorkflowExecutionSignaledEventAttributes, + ) -> Result<(), anyhow::Error>; + + /// Cancel the workflow + fn cancel( + &self, + attribs: WorkflowExecutionCanceledEventAttributes, + ) -> Result<(), anyhow::Error>; +} + +/// The struct for [WFCommand::AddCommand] +#[derive(Debug, derive_more::From)] +pub(crate) struct AddCommand { + /// The protobuf command + pub(crate) command: Command, +} + +/// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. +/// EX: Create a new timer, complete the workflow, etc. +#[derive(Debug, derive_more::From)] +pub enum WFCommand { + AddTimer(StartTimerCommandAttributes, Arc), + CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), +} + +/// Extends [rustfsm::StateMachine] with some functionality specific to the temporal SDK. +/// +/// Formerly known as `EntityStateMachine` in Java. +trait TemporalStateMachine: CheckStateMachineInFinal { + fn name(&self) -> &str; + fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError>; + + /// Tell the state machine to handle some event. The [WorkflowMachines] instance calling this + /// function passes a reference to itself in so that the [WFMachinesAdapter] trait can use + /// the machines' specific type information while also manipulating [WorkflowMachines] + fn handle_event( + &mut self, + event: &HistoryEvent, + has_next_event: bool, + wf_machines: &mut WorkflowMachines, + ) -> Result<(), WFMachinesError>; +} + +impl TemporalStateMachine for SM +where + SM: StateMachine + CheckStateMachineInFinal + WFMachinesAdapter + Clone, + ::Event: TryFrom, + ::Event: TryFrom, + ::Command: Debug, + ::Error: Into + 'static + Send + Sync, +{ + fn name(&self) -> &str { + ::name(self) + } + + fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError> { + event!( + Level::DEBUG, + msg = "handling command", + ?command_type, + machine_name = %self.name() + ); + if let Ok(converted_command) = command_type.try_into() { + match self.on_event_mut(converted_command) { + Ok(_c) => Ok(()), + Err(MachineError::InvalidTransition) => { + Err(WFMachinesError::UnexpectedCommand(command_type)) + } + Err(MachineError::Underlying(e)) => Err(e.into()), + } + } else { + Err(WFMachinesError::UnexpectedCommand(command_type)) + } + } + + fn handle_event( + &mut self, + event: &HistoryEvent, + has_next_event: bool, + wf_machines: &mut WorkflowMachines, + ) -> Result<(), WFMachinesError> { + event!( + Level::DEBUG, + msg = "handling event", + %event, + machine_name = %self.name() + ); + if let Ok(converted_event) = event.clone().try_into() { + match self.on_event_mut(converted_event) { + Ok(c) => { + event!(Level::DEBUG, msg = "Machine produced commands", ?c); + for cmd in c { + self.adapt_response(wf_machines, event, has_next_event, cmd)?; + } + Ok(()) + } + Err(MachineError::InvalidTransition) => { + Err(WFMachinesError::UnexpectedEvent(event.clone())) + } + Err(MachineError::Underlying(e)) => Err(e.into()), + } + } else { + Err(WFMachinesError::UnexpectedEvent(event.clone())) + } + } +} + +/// Exists purely to allow generic implementation of `is_final_state` for all [StateMachine] +/// implementors +trait CheckStateMachineInFinal { + /// Returns true if the state machine is in a final state + fn is_final_state(&self) -> bool; +} + +impl CheckStateMachineInFinal for SM +where + SM: StateMachine, +{ + fn is_final_state(&self) -> bool { + self.on_final_state() + } +} + +/// This trait exists to bridge [StateMachine]s and the [WorkflowMachines] instance. It has access +/// to the machine's concrete types while hiding those details from [WorkflowMachines] +pub(super) trait WFMachinesAdapter: StateMachine { + /// Given a reference to [WorkflowMachines], the event being processed, and a command that this + /// [StateMachine] instance just produced, perform any handling that needs access to the + /// [WorkflowMachines] instance in response to that command. + fn adapt_response( + &self, + _wf_machines: &mut WorkflowMachines, + _event: &HistoryEvent, + _has_next_event: bool, + _my_command: Self::Command, + ) -> Result<(), WFMachinesError>; +} + +/// A command which can be cancelled, associated with some state machine that produced it #[derive(Debug, Clone)] -pub struct CancellableCommand { - /// The inner protobuf command, if None, command has been cancelled - command: Option, +#[allow(clippy::large_enum_variant)] +enum CancellableCommand { + // TODO: You'll be used soon, friend. + #[allow(dead_code)] + Cancelled, + Active { + /// The inner protobuf command, if None, command has been cancelled + command: MachineCommand, + machine: Rc>, + }, } impl CancellableCommand { - pub(crate) fn cancel(&mut self) { - self.command = None; + #[allow(dead_code)] // TODO: Use + pub(super) fn cancel(&mut self) { + *self = CancellableCommand::Cancelled; } } -impl From for CancellableCommand { - fn from(command: Command) -> Self { - Self { - command: Some(command), - } +impl Debug for dyn TemporalStateMachine { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.write_str(self.name()) } } diff --git a/src/machines/mutable_side_effect_state_machine.rs b/src/machines/mutable_side_effect_state_machine.rs index c75d88f06..de635b3c1 100644 --- a/src/machines/mutable_side_effect_state_machine.rs +++ b/src/machines/mutable_side_effect_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name MutableSideEffectMachine; command MutableSideEffectCommand; error MutableSideEffectMachineError; + pub(super) name MutableSideEffectMachine; command MutableSideEffectCommand; error MutableSideEffectMachineError; Created --(CheckExecutionState, on_check_execution_state) --> Replaying; Created --(CheckExecutionState, on_check_execution_state) --> Executing; @@ -25,69 +25,69 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum MutableSideEffectMachineError {} +pub(super) enum MutableSideEffectMachineError {} -pub enum MutableSideEffectCommand {} +pub(super) enum MutableSideEffectCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_check_execution_state(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_check_execution_state(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Executing {} +pub(super) struct Executing {} impl Executing { - pub fn on_schedule(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_schedule(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreated {} +pub(super) struct MarkerCommandCreated {} impl MarkerCommandCreated { - pub fn on_command_record_marker(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_command_record_marker(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreatedReplaying {} +pub(super) struct MarkerCommandCreatedReplaying {} #[derive(Default, Clone)] -pub struct MarkerCommandRecorded {} +pub(super) struct MarkerCommandRecorded {} #[derive(Default, Clone)] -pub struct Replaying {} +pub(super) struct Replaying {} impl Replaying { - pub fn on_schedule(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_schedule(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ResultNotified {} +pub(super) struct ResultNotified {} impl ResultNotified { - pub fn on_marker_recorded(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_marker_recorded(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ResultNotifiedReplaying {} +pub(super) struct ResultNotifiedReplaying {} impl ResultNotifiedReplaying { - pub fn on_non_matching_event(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_non_matching_event(self) -> MutableSideEffectMachineTransition { unimplemented!() } - pub fn on_marker_recorded(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_marker_recorded(self) -> MutableSideEffectMachineTransition { unimplemented!() } } @@ -99,13 +99,13 @@ impl From for ResultNotifiedReplaying { } #[derive(Default, Clone)] -pub struct Skipped {} +pub(super) struct Skipped {} impl Skipped { - pub fn on_command_record_marker(self) -> MutableSideEffectMachineTransition { + pub(super) fn on_command_record_marker(self) -> MutableSideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct SkippedNotified {} +pub(super) struct SkippedNotified {} diff --git a/src/machines/side_effect_state_machine.rs b/src/machines/side_effect_state_machine.rs index 29aed6606..8860d1dc4 100644 --- a/src/machines/side_effect_state_machine.rs +++ b/src/machines/side_effect_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name SideEffectMachine; command SideEffectCommand; error SideEffectMachineError; + pub(super) name SideEffectMachine; command SideEffectCommand; error SideEffectMachineError; Created --(Schedule, on_schedule) --> MarkerCommandCreated; Created --(Schedule, on_schedule) --> MarkerCommandCreatedReplaying; @@ -16,48 +16,48 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum SideEffectMachineError {} +pub(super) enum SideEffectMachineError {} -pub enum SideEffectCommand {} +pub(super) enum SideEffectCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> SideEffectMachineTransition { + pub(super) fn on_schedule(self) -> SideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreated {} +pub(super) struct MarkerCommandCreated {} impl MarkerCommandCreated { - pub fn on_command_record_marker(self) -> SideEffectMachineTransition { + pub(super) fn on_command_record_marker(self) -> SideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreatedReplaying {} +pub(super) struct MarkerCommandCreatedReplaying {} #[derive(Default, Clone)] -pub struct MarkerCommandRecorded {} +pub(super) struct MarkerCommandRecorded {} #[derive(Default, Clone)] -pub struct ResultNotified {} +pub(super) struct ResultNotified {} impl ResultNotified { - pub fn on_marker_recorded(self) -> SideEffectMachineTransition { + pub(super) fn on_marker_recorded(self) -> SideEffectMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ResultNotifiedReplaying {} +pub(super) struct ResultNotifiedReplaying {} impl ResultNotifiedReplaying { - pub fn on_marker_recorded(self) -> SideEffectMachineTransition { + pub(super) fn on_marker_recorded(self) -> SideEffectMachineTransition { unimplemented!() } } diff --git a/src/machines/signal_external_state_machine.rs b/src/machines/signal_external_state_machine.rs index a50b57a01..88bf1f6cb 100644 --- a/src/machines/signal_external_state_machine.rs +++ b/src/machines/signal_external_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name SignalExternalMachine; command SignalExternalCommand; error SignalExternalMachineError; + pub(super) name SignalExternalMachine; command SignalExternalCommand; error SignalExternalMachineError; Created --(Schedule, on_schedule) --> SignalExternalCommandCreated; @@ -15,33 +15,33 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum SignalExternalMachineError {} +pub(super) enum SignalExternalMachineError {} -pub enum SignalExternalCommand {} +pub(super) enum SignalExternalCommand {} #[derive(Default, Clone)] -pub struct Canceled {} +pub(super) struct Canceled {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> SignalExternalMachineTransition { + pub(super) fn on_schedule(self) -> SignalExternalMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Failed {} +pub(super) struct Failed {} #[derive(Default, Clone)] -pub struct SignalExternalCommandCreated {} +pub(super) struct SignalExternalCommandCreated {} impl SignalExternalCommandCreated { - pub fn on_cancel(self) -> SignalExternalMachineTransition { + pub(super) fn on_cancel(self) -> SignalExternalMachineTransition { unimplemented!() } - pub fn on_signal_external_workflow_execution_initiated( + pub(super) fn on_signal_external_workflow_execution_initiated( self, ) -> SignalExternalMachineTransition { unimplemented!() @@ -49,16 +49,18 @@ impl SignalExternalCommandCreated { } #[derive(Default, Clone)] -pub struct SignalExternalCommandRecorded {} +pub(super) struct SignalExternalCommandRecorded {} impl SignalExternalCommandRecorded { - pub fn on_external_workflow_execution_signaled(self) -> SignalExternalMachineTransition { + pub(super) fn on_external_workflow_execution_signaled(self) -> SignalExternalMachineTransition { unimplemented!() } - pub fn on_signal_external_workflow_execution_failed(self) -> SignalExternalMachineTransition { + pub(super) fn on_signal_external_workflow_execution_failed( + self, + ) -> SignalExternalMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Signaled {} +pub(super) struct Signaled {} diff --git a/src/machines/test_help.rs b/src/machines/test_help.rs deleted file mode 100644 index f3970db65..000000000 --- a/src/machines/test_help.rs +++ /dev/null @@ -1,128 +0,0 @@ -use crate::protos::temporal::api::{ - enums::v1::EventType, - history::v1::{ - history_event::Attributes, HistoryEvent, WorkflowTaskCompletedEventAttributes, - WorkflowTaskStartedEventAttributes, - }, -}; -use std::time::SystemTime; - -#[derive(Default, Debug)] -pub(crate) struct TestHistoryBuilder { - events: Vec, - /// Is incremented every time a new event is added, and that *new* value is used as that event's - /// id - current_event_id: i64, - workflow_task_scheduled_event_id: i64, - previous_started_event_id: i64, -} - -impl TestHistoryBuilder { - /// Add an event by type with attributes. Bundles both into a [HistoryEvent] with an id that is - /// incremented on each call to add. - pub(crate) fn add(&mut self, event_type: EventType, attribs: Attributes) { - self.build_and_push_event(event_type, Some(attribs)); - } - - /// Adds an event to the history by type, without attributes - pub(crate) fn add_by_type(&mut self, event_type: EventType) { - self.build_and_push_event(event_type.clone(), None); - } - - /// Adds an event, returning the ID that was assigned to it - pub(crate) fn add_get_event_id( - &mut self, - event_type: EventType, - attrs: Option, - ) -> i64 { - self.build_and_push_event(event_type, attrs); - self.current_event_id - } - - /// Adds the following events: - /// ```text - /// EVENT_TYPE_WORKFLOW_TASK_SCHEDULED - /// EVENT_TYPE_WORKFLOW_TASK_STARTED - /// EVENT_TYPE_WORKFLOW_TASK_COMPLETED - /// ``` - pub(crate) fn add_workflow_task(&mut self) { - self.add_workflow_task_scheduled_and_started(); - self.add_workflow_task_completed(); - } - - pub(crate) fn add_workflow_task_scheduled_and_started(&mut self) { - self.add_workflow_task_scheduled(); - self.add_workflow_task_started(); - } - - pub(crate) fn add_workflow_task_scheduled(&mut self) { - // WFStarted always immediately follows WFScheduled - self.previous_started_event_id = self.workflow_task_scheduled_event_id + 1; - self.workflow_task_scheduled_event_id = - self.add_get_event_id(EventType::WorkflowTaskScheduled, None); - } - - pub(crate) fn add_workflow_task_started(&mut self) { - let attrs = WorkflowTaskStartedEventAttributes { - scheduled_event_id: self.workflow_task_scheduled_event_id, - ..Default::default() - }; - self.build_and_push_event( - EventType::WorkflowTaskStarted, - Some(Attributes::WorkflowTaskStartedEventAttributes(attrs)), - ); - } - - pub(crate) fn add_workflow_task_completed(&mut self) { - let attrs = WorkflowTaskCompletedEventAttributes { - scheduled_event_id: self.workflow_task_scheduled_event_id, - ..Default::default() - }; - self.build_and_push_event( - EventType::WorkflowTaskCompleted, - Some(Attributes::WorkflowTaskCompletedEventAttributes(attrs)), - ); - } - - /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed by - /// WFTaskCompleted, adding one to the count for every match. It will additionally count - /// a WFTaskStarted at the end of the event list. - pub(crate) fn get_workflow_task_count(&self) -> usize { - let mut last_wf_started_id = 0; - let mut count = 0; - for (i, event) in self.events.iter().enumerate() { - let at_last_item = i == self.events.len() - 1; - let next_item_is_wftc = self - .events - .get(i + 1) - .map(|e| e.event_type == EventType::WorkflowTaskCompleted as i32) - .unwrap_or(false); - if event.event_type == EventType::WorkflowTaskStarted as i32 - && (at_last_item || next_item_is_wftc) - { - last_wf_started_id = event.event_id; - count += 1; - } - if at_last_item { - // No more events - if last_wf_started_id != event.event_id { - panic!("Last item in history wasn't WorkflowTaskStarted") - } - return count; - } - } - count - } - - fn build_and_push_event(&mut self, event_type: EventType, attribs: Option) { - self.current_event_id += 1; - let evt = HistoryEvent { - event_type: event_type as i32, - event_id: self.current_event_id, - event_time: Some(SystemTime::now().into()), - attributes: attribs, - ..Default::default() - }; - self.events.push(evt); - } -} diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs new file mode 100644 index 000000000..19d1bf94b --- /dev/null +++ b/src/machines/test_help/history_builder.rs @@ -0,0 +1,297 @@ +use super::Result; +use crate::{ + machines::{workflow_machines::WorkflowMachines, MachineCommand}, + protos::temporal::api::{ + enums::v1::EventType, + history::v1::{ + history_event::Attributes, HistoryEvent, TimerStartedEventAttributes, + WorkflowExecutionStartedEventAttributes, WorkflowTaskCompletedEventAttributes, + WorkflowTaskScheduledEventAttributes, WorkflowTaskStartedEventAttributes, + }, + }, +}; +use anyhow::bail; +use std::time::SystemTime; + +#[derive(Default, Debug)] +pub struct TestHistoryBuilder { + events: Vec, + /// Is incremented every time a new event is added, and that *new* value is used as that event's + /// id + current_event_id: i64, + workflow_task_scheduled_event_id: i64, + previous_started_event_id: i64, +} + +impl TestHistoryBuilder { + /// Add an event by type with attributes. Bundles both into a [HistoryEvent] with an id that is + /// incremented on each call to add. + pub fn add(&mut self, event_type: EventType, attribs: Attributes) { + self.build_and_push_event(event_type, attribs); + } + + /// Adds an event to the history by type, with default attributes. + pub fn add_by_type(&mut self, event_type: EventType) { + let attribs = + default_attribs(event_type).expect("Couldn't make default attributes in test builder"); + self.build_and_push_event(event_type.clone(), attribs); + } + + /// Adds an event, returning the ID that was assigned to it + pub fn add_get_event_id(&mut self, event_type: EventType, attrs: Option) -> i64 { + if let Some(a) = attrs { + self.build_and_push_event(event_type, a); + } else { + self.add_by_type(event_type); + } + self.current_event_id + } + + /// Adds the following events: + /// ```text + /// EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + /// EVENT_TYPE_WORKFLOW_TASK_STARTED + /// EVENT_TYPE_WORKFLOW_TASK_COMPLETED + /// ``` + pub fn add_workflow_task(&mut self) { + self.add_workflow_task_scheduled_and_started(); + self.add_workflow_task_completed(); + } + + pub fn add_workflow_task_scheduled_and_started(&mut self) { + self.add_workflow_task_scheduled(); + self.add_workflow_task_started(); + } + + pub fn add_workflow_task_scheduled(&mut self) { + // WFStarted always immediately follows WFScheduled + self.previous_started_event_id = self.workflow_task_scheduled_event_id + 1; + self.workflow_task_scheduled_event_id = + self.add_get_event_id(EventType::WorkflowTaskScheduled, None); + } + + pub fn add_workflow_task_started(&mut self) { + let attrs = WorkflowTaskStartedEventAttributes { + scheduled_event_id: self.workflow_task_scheduled_event_id, + ..Default::default() + }; + self.build_and_push_event(EventType::WorkflowTaskStarted, attrs.into()); + } + + pub fn add_workflow_task_completed(&mut self) { + let attrs = WorkflowTaskCompletedEventAttributes { + scheduled_event_id: self.workflow_task_scheduled_event_id, + ..Default::default() + }; + self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into()); + } + + /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed by + /// WFTaskCompleted, adding one to the count for every match. It will additionally count + /// a WFTaskStarted at the end of the event list. + /// + /// If `up_to_event_id` is provided, the count will be returned as soon as processing advances + /// past that id. + pub fn get_workflow_task_count(&self, up_to_event_id: Option) -> Result { + let mut last_wf_started_id = 0; + let mut count = 0; + let mut history = self.events.iter().peekable(); + while let Some(event) = history.next() { + let next_event = history.peek(); + if let Some(upto) = up_to_event_id { + if event.event_id > upto { + return Ok(count); + } + } + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + if event.event_type == EventType::WorkflowTaskStarted as i32 + && (next_event.is_none() || next_is_completed) + { + last_wf_started_id = event.event_id; + count += 1; + } + + if next_event.is_none() { + if last_wf_started_id != event.event_id { + bail!("Last item in history wasn't WorkflowTaskStarted") + } + return Ok(count); + } + } + Ok(count) + } + + /// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow + /// tasks as the provided `to_wf_task_num` parameter.. + /// + /// # Panics + /// * Can panic if the passed in machines have been manipulated outside of this builder + pub(crate) fn handle_workflow_task_take_cmds( + &self, + wf_machines: &mut WorkflowMachines, + to_wf_task_num: Option, + ) -> Result> { + self.handle_workflow_task(wf_machines, to_wf_task_num)?; + Ok(wf_machines.get_commands()) + } + + fn handle_workflow_task( + &self, + wf_machines: &mut WorkflowMachines, + to_wf_task_num: Option, + ) -> Result<()> { + let to_wf_task_num = to_wf_task_num.unwrap_or(usize::MAX); + let (_, events) = self + .events + .split_at(wf_machines.get_last_started_event_id() as usize); + let mut history = events.iter().peekable(); + + let hist_info = self.get_history_info(to_wf_task_num)?; + wf_machines.set_started_ids( + hist_info.previous_started_event_id, + hist_info.workflow_task_started_event_id, + ); + let mut started_id = hist_info.previous_started_event_id; + let mut num_seen_wf_tasks = if wf_machines.get_last_started_event_id() > 0 { + self.get_workflow_task_count(history.peek().map(|e| e.event_id - 1))? + } else { + 0 + }; + + while let Some(event) = history.next() { + let next_event = history.peek(); + + if event.event_type == EventType::WorkflowTaskStarted as i32 { + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + let next_is_failed_or_timeout = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskFailed as i32 + || ne.event_type == EventType::WorkflowTaskTimedOut as i32 + }); + + if next_event.is_none() || next_is_completed { + started_id = event.event_id; + num_seen_wf_tasks += 1; + if num_seen_wf_tasks == to_wf_task_num || next_event.is_none() { + wf_machines.handle_event(event, false)?; + return Ok(()); + } + } else if next_event.is_some() && !next_is_failed_or_timeout { + bail!( + "Invalid history! Event {:?} should be WF task completed, \ + failed, or timed out.", + &event + ); + } + } + + wf_machines.handle_event(event, next_event.is_some())?; + + if next_event.is_none() { + if event.is_final_wf_execution_event() { + return Ok(()); + } + if started_id != event.event_id { + bail!( + "The last event in the history (id {}) isn't the last WF task \ + started (id {})", + event.event_id, + started_id + ); + } + unreachable!() + } + } + + Ok(()) + } + + /// Iterates over the events in this builder to return a [HistoryInfo] + pub(crate) fn get_history_info(&self, to_task_index: usize) -> Result { + let mut lastest_wf_started_id = 0; + let mut previous_wf_started_id = 0; + let mut count = 0; + let mut history = self.events.iter().peekable(); + + while let Some(event) = history.next() { + let next_event = history.peek(); + + if event.event_type == EventType::WorkflowTaskStarted as i32 { + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + let next_is_failed_or_timeout = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskFailed as i32 + || ne.event_type == EventType::WorkflowTaskTimedOut as i32 + }); + + if next_event.is_none() || next_is_completed { + previous_wf_started_id = lastest_wf_started_id; + lastest_wf_started_id = event.event_id; + if lastest_wf_started_id == previous_wf_started_id { + bail!("Latest wf started id and previous one are equal!") + } + count += 1; + if count == to_task_index || next_event.is_none() { + return Ok(HistoryInfo::new( + previous_wf_started_id, + lastest_wf_started_id, + )); + } + } else if next_event.is_some() && !next_is_failed_or_timeout { + bail!( + "Invalid history! Event {:?} should be WF task completed, \ + failed, or timed out.", + &event + ); + } + } + + if next_event.is_none() { + if event.is_final_wf_execution_event() { + return Ok(HistoryInfo::new( + previous_wf_started_id, + lastest_wf_started_id, + )); + } + // No more events + if lastest_wf_started_id != event.event_id { + bail!("Last item in history wasn't WorkflowTaskStarted") + } + } + } + unreachable!() + } + + fn build_and_push_event(&mut self, event_type: EventType, attribs: Attributes) { + self.current_event_id += 1; + let evt = HistoryEvent { + event_type: event_type as i32, + event_id: self.current_event_id, + event_time: Some(SystemTime::now().into()), + attributes: Some(attribs), + ..Default::default() + }; + self.events.push(evt); + } +} + +fn default_attribs(et: EventType) -> Result { + Ok(match et { + EventType::WorkflowExecutionStarted => { + WorkflowExecutionStartedEventAttributes::default().into() + } + EventType::WorkflowTaskScheduled => WorkflowTaskScheduledEventAttributes::default().into(), + EventType::TimerStarted => TimerStartedEventAttributes::default().into(), + _ => bail!("Don't know how to construct default attrs for {:?}", et), + }) +} + +#[derive(Clone, Debug, derive_more::Constructor, Eq, PartialEq, Hash)] +pub struct HistoryInfo { + pub previous_started_event_id: i64, + pub workflow_task_started_event_id: i64, +} diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs new file mode 100644 index 000000000..427f0876f --- /dev/null +++ b/src/machines/test_help/mod.rs @@ -0,0 +1,7 @@ +type Result = std::result::Result; + +mod history_builder; +mod workflow_driver; + +pub(super) use history_builder::TestHistoryBuilder; +pub(super) use workflow_driver::{CommandSender, TestWFCommand, TestWorkflowDriver}; diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs new file mode 100644 index 000000000..1db3dd5ef --- /dev/null +++ b/src/machines/test_help/workflow_driver.rs @@ -0,0 +1,166 @@ +use super::Result; +use crate::{ + machines::{DrivenWorkflow, WFCommand}, + protos::temporal::api::{ + command::v1::StartTimerCommandAttributes, + history::v1::{ + WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, + WorkflowExecutionStartedEventAttributes, + }, + }, +}; +use futures::Future; +use std::{ + collections::{hash_map, HashMap}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::{self, Receiver, Sender}, + Arc, RwLock, + }, +}; +use tracing::Level; + +type TimerMap = HashMap>; + +/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control +/// over when commands are returned than a normal workflow would. +/// +/// It replaces "TestEnitityTestListenerBase" in java which is pretty hard to follow. +#[derive(Debug)] +pub(in crate::machines) struct TestWorkflowDriver { + wf_function: F, + + /// Holds status for timers so we don't recreate them by accident. Key is timer id, value is + /// true if it is complete. + /// + /// I don't love that this is pretty duplicative of workflow machines, nor the nasty sync + /// involved. Would be good to eliminate. + /// + /// It can and should be eliminated by not recreating CommandSender on every iteration, which + /// means keeping the workflow suspended in a future somewhere. I tried this and it was hard, + /// but ultimately it's how real workflows will need to work. + timers: Arc>, +} + +impl TestWorkflowDriver +where + F: Fn(CommandSender) -> Fut, + Fut: Future, +{ + /// Create a new test workflow driver from a workflow "function" which is really a closure + /// that returns an async block. + /// + /// In an ideal world, the workflow fn would actually be a generator which can yield commands, + /// and we may well end up doing something like that later. + pub(in crate::machines) fn new(workflow_fn: F) -> Self { + Self { + wf_function: workflow_fn, + timers: Arc::new(RwLock::new(HashMap::default())), + } + } +} + +impl DrivenWorkflow for TestWorkflowDriver +where + F: Fn(CommandSender) -> Fut, + Fut: Future, +{ + #[instrument(skip(self))] + fn start( + &self, + _attribs: WorkflowExecutionStartedEventAttributes, + ) -> Result, anyhow::Error> { + Ok(vec![]) + } + + #[instrument(skip(self))] + fn iterate_wf(&self) -> Result, anyhow::Error> { + let (sender, receiver) = CommandSender::new(self.timers.clone()); + // Call the closure that produces the workflow future + let wf_future = (self.wf_function)(sender); + + // Create a tokio runtime to block on the future + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(wf_future); + + let cmds = receiver.into_iter(); + event!(Level::DEBUG, msg = "Test wf driver emitting", ?cmds); + + let mut last_cmd = None; + for cmd in cmds { + match cmd { + TestWFCommand::WFCommand(c) => last_cmd = Some(c), + TestWFCommand::Waiting => { + // Ignore further commands since we're waiting on something + break; + } + } + } + + // Return only the last command, since that's what would've been yielded in a real wf + Ok(if let Some(c) = last_cmd { + vec![c] + } else { + vec![] + }) + } + + fn signal( + &self, + _attribs: WorkflowExecutionSignaledEventAttributes, + ) -> Result<(), anyhow::Error> { + Ok(()) + } + + fn cancel( + &self, + _attribs: WorkflowExecutionCanceledEventAttributes, + ) -> Result<(), anyhow::Error> { + Ok(()) + } +} + +#[derive(Debug, derive_more::From)] +pub enum TestWFCommand { + WFCommand(WFCommand), + /// When a test workflow wants to await something like a timer or an activity, we will + /// ignore all commands produced after the wait, since they couldn't have actually happened + /// in a real workflow, since you'd be stuck waiting + Waiting, +} + +pub struct CommandSender { + chan: Sender, + timer_map: Arc>, +} + +impl<'a> CommandSender { + fn new(timer_map: Arc>) -> (Self, Receiver) { + let (chan, rx) = mpsc::channel(); + (Self { chan, timer_map }, rx) + } + + /// Request to create a timer, returning future which resolves when the timer completes + pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future + '_ { + let finished = match self.timer_map.write().unwrap().entry(a.timer_id.clone()) { + hash_map::Entry::Occupied(existing) => existing.get().load(Ordering::SeqCst), + hash_map::Entry::Vacant(v) => { + let atomic = Arc::new(AtomicBool::new(false)); + + let c = WFCommand::AddTimer(a, atomic.clone()); + // In theory we should send this in both branches + self.chan.send(c.into()).unwrap(); + v.insert(atomic); + false + } + }; + if !finished { + self.chan.send(TestWFCommand::Waiting).unwrap(); + } + futures::future::ready(()) + } + + pub fn send(&mut self, c: WFCommand) { + self.chan.send(c.into()).unwrap(); + } +} diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 5beeb6be9..e38d31f2d 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,29 +1,32 @@ #![allow(clippy::large_enum_variant)] use crate::{ - machines::CancellableCommand, + machines::{ + workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand, + CancellableCommand, WFCommand, WFMachinesAdapter, + }, protos::{ coresdk::HistoryEventId, temporal::api::{ - command::v1::{command::Attributes, Command, StartTimerCommandAttributes}, + command::v1::{ + command::Attributes, CancelTimerCommandAttributes, Command, + StartTimerCommandAttributes, + }, enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent, TimerCanceledEventAttributes}, }, }, }; -use rustfsm::{fsm, TransitionResult}; +use rustfsm::{fsm, StateMachine, TransitionResult}; +use std::{cell::RefCell, convert::TryFrom, rc::Rc, sync::atomic::Ordering}; +use tracing::Level; fsm! { - name TimerMachine; - command TimerCommand; - error TimerMachineError; + pub(super) name TimerMachine; + command TimerMachineCommand; + error WFMachinesError; shared_state SharedState; - CancelTimerCommandCreated --(Cancel) --> CancelTimerCommandCreated; - CancelTimerCommandCreated --(CommandCancelTimer, shared on_command_cancel_timer) --> CancelTimerCommandSent; - - CancelTimerCommandSent --(TimerCanceled) --> Canceled; - Created --(Schedule, shared on_schedule) --> StartCommandCreated; StartCommandCreated --(CommandStartTimer) --> StartCommandCreated; @@ -31,16 +34,87 @@ fsm! { StartCommandCreated --(Cancel, shared on_cancel) --> Canceled; StartCommandRecorded --(TimerFired(HistoryEvent), on_timer_fired) --> Fired; - StartCommandRecorded --(Cancel, on_cancel) --> CancelTimerCommandCreated; + StartCommandRecorded --(Cancel, shared on_cancel) --> CancelTimerCommandCreated; + + CancelTimerCommandCreated --(Cancel) --> CancelTimerCommandCreated; + CancelTimerCommandCreated + --(CommandCancelTimer, shared on_command_cancel_timer) --> CancelTimerCommandSent; + + CancelTimerCommandSent --(TimerCanceled) --> Canceled; +} + +#[derive(Debug)] +pub(super) enum TimerMachineCommand { + AddCommand(AddCommand), + Complete(HistoryEvent), +} + +/// Creates a new, scheduled, timer as a [CancellableCommand] +pub(super) fn new_timer(attribs: StartTimerCommandAttributes) -> CancellableCommand { + let (timer, add_cmd) = TimerMachine::new_scheduled(attribs); + CancellableCommand::Active { + command: add_cmd.command, + machine: Rc::new(RefCell::new(timer)), + } +} + +impl TimerMachine { + /// Create a new timer and immediately schedule it + pub(crate) fn new_scheduled(attribs: StartTimerCommandAttributes) -> (Self, AddCommand) { + let mut s = Self::new(attribs); + let cmd = match s + .on_event_mut(TimerMachineEvents::Schedule) + .expect("Scheduling timers doesn't fail") + .pop() + { + Some(TimerMachineCommand::AddCommand(c)) => c, + _ => panic!("Timer on_schedule must produce command"), + }; + (s, cmd) + } + + fn new(attribs: StartTimerCommandAttributes) -> Self { + Self { + state: Created {}.into(), + shared_state: SharedState { + timer_attributes: attribs, + }, + } + } +} + +impl TryFrom for TimerMachineEvents { + type Error = (); + + fn try_from(e: HistoryEvent) -> Result { + Ok(match EventType::from_i32(e.event_type) { + Some(EventType::TimerStarted) => Self::TimerStarted(e.event_id), + Some(EventType::TimerCanceled) => Self::TimerCanceled, + Some(EventType::TimerFired) => Self::TimerFired(e), + _ => return Err(()), + }) + } +} + +impl TryFrom for TimerMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::StartTimer => Self::CommandStartTimer, + CommandType::CancelTimer => Self::CommandCancelTimer, + _ => return Err(()), + }) + } } #[derive(Default, Clone)] -pub struct SharedState { +pub(super) struct SharedState { timer_attributes: StartTimerCommandAttributes, } impl SharedState { - fn into_timer_canceled_event_command(self) -> TimerCommand { + fn into_timer_canceled_event_command(self) -> TimerMachineCommand { let attrs = TimerCanceledEventAttributes { identity: "workflow".to_string(), timer_id: self.timer_attributes.timer_id, @@ -53,23 +127,30 @@ impl SharedState { )), ..Default::default() }; - TimerCommand::ProduceHistoryEvent(event) + TimerMachineCommand::Complete(event) } } -#[derive(thiserror::Error, Debug)] -pub enum TimerMachineError {} +#[derive(Default, Clone)] +pub(super) struct Created {} -pub enum TimerCommand { - StartTimer(CancellableCommand), - CancelTimer(/* TODO: Command attribs */), - ProduceHistoryEvent(HistoryEvent), +impl Created { + pub(super) fn on_schedule(self, dat: SharedState) -> TimerMachineTransition { + let cmd = Command { + command_type: CommandType::StartTimer as i32, + attributes: Some(dat.timer_attributes.into()), + }; + TimerMachineTransition::commands::<_, StartCommandCreated>(vec![ + TimerMachineCommand::AddCommand(cmd.into()), + ]) + } } #[derive(Default, Clone)] -pub struct CancelTimerCommandCreated {} +pub(super) struct CancelTimerCommandCreated {} impl CancelTimerCommandCreated { - pub fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition { + pub(super) fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition { + // TODO: Think we need to produce a completion command here TimerMachineTransition::ok( vec![dat.into_timer_canceled_event_command()], Canceled::default(), @@ -78,10 +159,10 @@ impl CancelTimerCommandCreated { } #[derive(Default, Clone)] -pub struct CancelTimerCommandSent {} +pub(super) struct CancelTimerCommandSent {} #[derive(Default, Clone)] -pub struct Canceled {} +pub(super) struct Canceled {} impl From for Canceled { fn from(_: CancelTimerCommandSent) -> Self { Self::default() @@ -89,42 +170,19 @@ impl From for Canceled { } #[derive(Default, Clone)] -pub struct Created {} - -impl Created { - pub fn on_schedule(self, dat: SharedState) -> TimerMachineTransition { - let cmd = Command { - command_type: CommandType::StartTimer as i32, - attributes: Some(Attributes::StartTimerCommandAttributes( - dat.timer_attributes, - )), - }; - TimerMachineTransition::ok( - vec![TimerCommand::StartTimer(cmd.clone().into())], - StartCommandCreated { - cancellable_command: CancellableCommand::from(cmd), - }, - ) - } -} +pub(super) struct Fired {} #[derive(Default, Clone)] -pub struct Fired {} - -#[derive(Clone)] -pub struct StartCommandCreated { - cancellable_command: CancellableCommand, -} +pub(super) struct StartCommandCreated {} impl StartCommandCreated { - pub fn on_timer_started(self, id: HistoryEventId) -> TimerMachineTransition { - // Java recorded an initial event ID, but it seemingly was never used. + pub(super) fn on_timer_started(self, _id: HistoryEventId) -> TimerMachineTransition { + // TODO: Java recorded an initial event ID, but it seemingly was never used. TimerMachineTransition::default::() } - pub fn on_cancel(mut self, dat: SharedState) -> TimerMachineTransition { + pub(super) fn on_cancel(mut self, dat: SharedState) -> TimerMachineTransition { // Cancel the initial command - which just sets a "canceled" flag in a wrapper of a - // proto command. TODO: Does this make any sense? - let _canceled_cmd = self.cancellable_command.cancel(); + // proto command. TODO: Does this make any sense? - no - propagate up TimerMachineTransition::ok( vec![dat.into_timer_canceled_event_command()], Canceled::default(), @@ -133,37 +191,82 @@ impl StartCommandCreated { } #[derive(Default, Clone)] -pub struct StartCommandRecorded {} +pub(super) struct StartCommandRecorded {} impl StartCommandRecorded { - pub fn on_timer_fired(self, event: HistoryEvent) -> TimerMachineTransition { - TimerMachineTransition::ok( - vec![TimerCommand::ProduceHistoryEvent(event)], - Fired::default(), - ) + pub(super) fn on_timer_fired(self, event: HistoryEvent) -> TimerMachineTransition { + TimerMachineTransition::ok(vec![TimerMachineCommand::Complete(event)], Fired::default()) } - pub fn on_cancel(self) -> TimerMachineTransition { + pub(super) fn on_cancel(self, dat: SharedState) -> TimerMachineTransition { + let cmd = Command { + command_type: CommandType::CancelTimer as i32, + attributes: Some( + CancelTimerCommandAttributes { + timer_id: dat.timer_attributes.timer_id, + } + .into(), + ), + }; TimerMachineTransition::ok( - vec![TimerCommand::CancelTimer()], + vec![TimerMachineCommand::AddCommand(cmd.into())], CancelTimerCommandCreated::default(), ) } } +impl WFMachinesAdapter for TimerMachine { + fn adapt_response( + &self, + wf_machines: &mut WorkflowMachines, + _event: &HistoryEvent, + _has_next_event: bool, + my_command: TimerMachineCommand, + ) -> Result<(), WFMachinesError> { + match my_command { + TimerMachineCommand::AddCommand(_) => { + unreachable!() + } + // Fire the completion + TimerMachineCommand::Complete(_event) => { + if let Some(a) = wf_machines + .timer_notifiers + .remove(&self.shared_state.timer_attributes.timer_id) + { + a.store(true, Ordering::SeqCst) + }; + } + } + Ok(()) + } +} + #[cfg(test)] mod test { + use super::*; + use crate::machines::test_help::CommandSender; use crate::{ - machines::test_help::TestHistoryBuilder, + machines::{ + complete_workflow_state_machine::complete_workflow, + test_help::{TestHistoryBuilder, TestWFCommand, TestWorkflowDriver}, + workflow_machines::WorkflowMachines, + DrivenWorkflow, WFCommand, + }, protos::temporal::api::{ - enums::v1::EventType, - history::{v1::history_event::Attributes, v1::TimerFiredEventAttributes}, + command::v1::CompleteWorkflowExecutionCommandAttributes, + history::v1::{ + TimerFiredEventAttributes, WorkflowExecutionCanceledEventAttributes, + WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, + }, }, }; + use futures::channel::mpsc::Sender; + use futures::{FutureExt, SinkExt}; + use rstest::{fixture, rstest}; + use std::{error::Error, time::Duration}; + use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - #[test] - fn test_fire_happy_path() { - // We don't actually have a way to author workflows in rust yet, but the workflow that would - // match up with this is just a wf with one timer in it that fires normally. + #[fixture] + fn fire_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { /* 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED @@ -173,20 +276,93 @@ mod test { 6: EVENT_TYPE_TIMER_FIRED 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + + We have two versions of this test, one which processes the history in two calls, + and one which replays all of it in one go. The former will run the event loop three + times total, and the latter two. + + There are two workflow tasks, so it seems we should only loop two times, but the reason + for the extra iteration in the incremental version is that we need to "wait" for the + timer to fire. In the all-in-one-go test, the timer is created and resolved in the same + task, hence no extra loop. */ + let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { + let timer = StartTimerCommandAttributes { + timer_id: "Sometimer".to_string(), + start_to_fire_timeout: Some(Duration::from_secs(5).into()), + ..Default::default() + }; + command_sink.timer(timer).await; + + let complete = CompleteWorkflowExecutionCommandAttributes::default(); + command_sink.send(complete.into()); + }); + let mut t = TestHistoryBuilder::default(); + let mut state_machines = WorkflowMachines::new(Box::new(twd)); + t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task(); let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); t.add( EventType::TimerFired, - Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { started_event_id: timer_started_event_id, timer_id: "timer1".to_string(), ..Default::default() }), ); t.add_workflow_task_scheduled_and_started(); - assert_eq!(2, t.get_workflow_task_count()); + assert_eq!(2, t.get_workflow_task_count(None).unwrap()); + (t, state_machines) + } + + #[rstest] + fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let s = span!(Level::DEBUG, "Test start", t = "inc"); + let _enter = s.enter(); + + let (t, mut state_machines) = fire_happy_hist; + + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) + .unwrap(); + assert_eq!(commands.len(), 1); + assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) + .unwrap(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); + } + + #[rstest] + fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() + .with_service_name("report_example") + .install() + .unwrap(); + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + tracing_subscriber::registry() + .with(opentelemetry) + .try_init() + .unwrap(); + + let s = span!(Level::DEBUG, "Test start", t = "full"); + let _enter = s.enter(); + + let (t, mut state_machines) = fire_happy_hist; + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) + .unwrap(); + dbg!(&commands); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); } } diff --git a/src/machines/upsert_search_attributes_state_machine.rs b/src/machines/upsert_search_attributes_state_machine.rs index 874689b6d..7424f1de3 100644 --- a/src/machines/upsert_search_attributes_state_machine.rs +++ b/src/machines/upsert_search_attributes_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name UpsertSearchAttributesMachine; command UpsertSearchAttributesCommand; error UpsertSearchAttributesMachineError; + pub(super) name UpsertSearchAttributesMachine; command UpsertSearchAttributesCommand; error UpsertSearchAttributesMachineError; Created --(Schedule, on_schedule) --> UpsertCommandCreated; @@ -10,27 +10,29 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum UpsertSearchAttributesMachineError {} +pub(super) enum UpsertSearchAttributesMachineError {} -pub enum UpsertSearchAttributesCommand {} +pub(super) enum UpsertSearchAttributesCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_schedule(self) -> UpsertSearchAttributesMachineTransition { + pub(super) fn on_schedule(self) -> UpsertSearchAttributesMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct UpsertCommandCreated {} +pub(super) struct UpsertCommandCreated {} impl UpsertCommandCreated { - pub fn on_upsert_workflow_search_attributes(self) -> UpsertSearchAttributesMachineTransition { + pub(super) fn on_upsert_workflow_search_attributes( + self, + ) -> UpsertSearchAttributesMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct UpsertCommandRecorded {} +pub(super) struct UpsertCommandRecorded {} diff --git a/src/machines/version_state_machine.rs b/src/machines/version_state_machine.rs index fc6846dbd..55cbb38d7 100644 --- a/src/machines/version_state_machine.rs +++ b/src/machines/version_state_machine.rs @@ -1,7 +1,7 @@ use rustfsm::{fsm, TransitionResult}; fsm! { - name VersionMachine; command VersionCommand; error VersionMachineError; + pub(super) name VersionMachine; command VersionCommand; error VersionMachineError; Created --(CheckExecutionState, on_check_execution_state) --> Replaying; Created --(CheckExecutionState, on_check_execution_state) --> Executing; @@ -25,69 +25,69 @@ fsm! { } #[derive(thiserror::Error, Debug)] -pub enum VersionMachineError {} +pub(super) enum VersionMachineError {} -pub enum VersionCommand {} +pub(super) enum VersionCommand {} #[derive(Default, Clone)] -pub struct Created {} +pub(super) struct Created {} impl Created { - pub fn on_check_execution_state(self) -> VersionMachineTransition { + pub(super) fn on_check_execution_state(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct Executing {} +pub(super) struct Executing {} impl Executing { - pub fn on_schedule(self) -> VersionMachineTransition { + pub(super) fn on_schedule(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreated {} +pub(super) struct MarkerCommandCreated {} impl MarkerCommandCreated { - pub fn on_command_record_marker(self) -> VersionMachineTransition { + pub(super) fn on_command_record_marker(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct MarkerCommandCreatedReplaying {} +pub(super) struct MarkerCommandCreatedReplaying {} #[derive(Default, Clone)] -pub struct MarkerCommandRecorded {} +pub(super) struct MarkerCommandRecorded {} #[derive(Default, Clone)] -pub struct Replaying {} +pub(super) struct Replaying {} impl Replaying { - pub fn on_schedule(self) -> VersionMachineTransition { + pub(super) fn on_schedule(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ResultNotified {} +pub(super) struct ResultNotified {} impl ResultNotified { - pub fn on_marker_recorded(self) -> VersionMachineTransition { + pub(super) fn on_marker_recorded(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct ResultNotifiedReplaying {} +pub(super) struct ResultNotifiedReplaying {} impl ResultNotifiedReplaying { - pub fn on_non_matching_event(self) -> VersionMachineTransition { + pub(super) fn on_non_matching_event(self) -> VersionMachineTransition { unimplemented!() } - pub fn on_marker_recorded(self) -> VersionMachineTransition { + pub(super) fn on_marker_recorded(self) -> VersionMachineTransition { unimplemented!() } } @@ -99,13 +99,13 @@ impl From for ResultNotifiedReplaying { } #[derive(Default, Clone)] -pub struct Skipped {} +pub(super) struct Skipped {} impl Skipped { - pub fn on_command_record_marker(self) -> VersionMachineTransition { + pub(super) fn on_command_record_marker(self) -> VersionMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct SkippedNotified {} +pub(super) struct SkippedNotified {} diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs new file mode 100644 index 000000000..ed9b50ac4 --- /dev/null +++ b/src/machines/workflow_machines.rs @@ -0,0 +1,364 @@ +use crate::{ + machines::{ + complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, + workflow_task_state_machine::WorkflowTaskMachine, CancellableCommand, DrivenWorkflow, + MachineCommand, TemporalStateMachine, WFCommand, + }, + protos::temporal::api::{ + command::v1::StartTimerCommandAttributes, + enums::v1::{CommandType, EventType}, + history::v1::{history_event, HistoryEvent}, + }, +}; +use futures::Future; +use rustfsm::StateMachine; +use std::{ + borrow::BorrowMut, + cell::RefCell, + collections::{HashMap, VecDeque}, + rc::Rc, + sync::{atomic::AtomicBool, Arc}, + time::SystemTime, +}; +use tracing::Level; + +type Result = std::result::Result; + +pub(crate) struct WorkflowMachines { + /// The event id of the last wf task started event in the history which is expected to be + /// [current_started_event_id] except during replay. + workflow_task_started_event_id: i64, + /// EventId of the last handled WorkflowTaskStarted event + current_started_event_id: i64, + /// The event id of the started event of the last successfully executed workflow task + previous_started_event_id: i64, + /// True if the workflow is replaying from history + replaying: bool, + /// Identifies the current run and is used as a seed for faux-randomness. + current_run_id: Option, + /// The current workflow time if it has been established + current_wf_time: Option, + + /// A mapping for accessing all the machines, where the key is the id of the initiating event + /// for that machine. + machines_by_id: HashMap>>, + + /// Queued commands which have been produced by machines and await processing + commands: VecDeque, + /// Commands generated by the currently processed workflow task. It is a queue as commands can + /// be added (due to marker based commands) while iterating over already added commands. + current_wf_task_commands: VecDeque, + + /// The workflow that is being driven by this instance of the machines + drive_me: Box, + + /// Holds atomics for completing timers. Key is the ID of the timer. + pub(super) timer_notifiers: HashMap>, +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum WFMachinesError { + #[error("Event {0:?} was not expected")] + UnexpectedEvent(HistoryEvent), + #[error("Event {0:?} was malformed: {1}")] + MalformedEvent(HistoryEvent, String), + #[error("Command type {0:?} was not expected")] + UnexpectedCommand(CommandType), + #[error("No command was scheduled for event {0:?}")] + NoCommandScheduledForEvent(HistoryEvent), + + // TODO: This may not be the best thing to do here, tbd. + #[error("Underlying error {0:?}")] + Underlying(#[from] anyhow::Error), +} + +impl WorkflowMachines { + pub(super) fn new(driven_wf: Box) -> Self { + Self { + drive_me: driven_wf, + // In an ideal world one could say ..Default::default() here and it'd still work. + workflow_task_started_event_id: 0, + current_started_event_id: 0, + previous_started_event_id: 0, + replaying: false, + current_run_id: None, + current_wf_time: None, + machines_by_id: Default::default(), + commands: Default::default(), + current_wf_task_commands: Default::default(), + timer_notifiers: Default::default(), + } + } + + /// Create a new timer for this workflow with the provided attributes and sender. The sender + /// is sent `true` when the timer completes. + /// + /// Returns the command and a future that will resolve when the timer completes + pub(super) fn new_timer( + &mut self, + attribs: StartTimerCommandAttributes, + completion_flag: Arc, + ) -> CancellableCommand { + let timer_id = attribs.timer_id.clone(); + let timer = new_timer(attribs); + self.timer_notifiers.insert(timer_id, completion_flag); + timer + } + + /// Returns the id of the last seen WorkflowTaskStarted event + pub(super) fn get_last_started_event_id(&self) -> i64 { + self.current_started_event_id + } + + /// Handle a single event from the workflow history. `has_next_event` should be false if `event` + /// is the last event in the history. + /// + /// TODO: Describe what actually happens in here + #[instrument(skip(self))] + pub(crate) fn handle_event( + &mut self, + event: &HistoryEvent, + has_next_event: bool, + ) -> Result<()> { + if event.is_command_event() { + self.handle_command_event(event)?; + return Ok(()); + } + let event_type = EventType::from_i32(event.event_type) + .ok_or_else(|| WFMachinesError::UnexpectedEvent(event.clone()))?; + + if self.replaying + && self.current_started_event_id >= self.previous_started_event_id + && event_type != EventType::WorkflowTaskCompleted + { + // Replay is finished + self.replaying = false; + } + + match event.get_initial_command_event_id() { + Some(initial_cmd_id) => { + if let Some(sm) = self.machines_by_id.get(&initial_cmd_id) { + (*sm.clone()) + .borrow_mut() + .handle_event(event, has_next_event, self)?; + } else { + event!( + Level::ERROR, + msg = "During event handling, this event had an initial command ID but \ + we could not find a matching state machine! Event: {:?}", + ?event + ); + } + + // Have to fetch machine again here to avoid borrowing self mutably twice + if let Some(sm) = self.machines_by_id.get_mut(&initial_cmd_id) { + if sm.borrow().is_final_state() { + self.machines_by_id.remove(&initial_cmd_id); + } + } + } + None => self.handle_non_stateful_event(event, has_next_event)?, + } + + Ok(()) + } + + /// Called when we want to run the event loop because a workflow task started event has + /// triggered + pub(super) fn task_started( + &mut self, + task_started_event_id: i64, + time: SystemTime, + ) -> Result<()> { + let s = span!(Level::DEBUG, "Task started trigger"); + let _enter = s.enter(); + + // TODO: Seems to only matter for version machine. Figure out then. + // // If some new commands are pending and there are no more command events. + // for (CancellableCommand cancellableCommand : commands) { + // if (cancellableCommand == null) { + // break; + // } + // cancellableCommand.handleWorkflowTaskStarted(); + // } + + // TODO: Local activity machines + // // Give local activities a chance to recreate their requests if they were lost due + // // to the last workflow task failure. The loss could happen only the last workflow task + // // was forcibly created by setting forceCreate on RespondWorkflowTaskCompletedRequest. + // if (nonProcessedWorkflowTask) { + // for (LocalActivityStateMachine value : localActivityMap.values()) { + // value.nonReplayWorkflowTaskStarted(); + // } + // } + + self.current_started_event_id = task_started_event_id; + self.set_current_time(time); + self.event_loop()?; + Ok(()) + } + + /// A command event is an event which is generated from a command emitted by a past decision. + /// Each command has a correspondent event. For example ScheduleActivityTaskCommand is recorded + /// to the history as ActivityTaskScheduledEvent. + /// + /// Command events always follow WorkflowTaskCompletedEvent. + /// + /// The handling consists of verifying that the next command in the commands queue is associated + /// with a state machine, which is then notified about the event and the command is removed from + /// the commands queue. + fn handle_command_event(&mut self, event: &HistoryEvent) -> Result<()> { + // TODO: Local activity handling stuff + // if (handleLocalActivityMarker(event)) { + // return; + // } + + let consumed_cmd = loop { + // handleVersionMarker can skip a marker event if the getVersion call was removed. + // In this case we don't want to consume a command. -- we will need to replace it back + // to the front when implementing, or something better + let maybe_command = self.commands.pop_front(); + let command = if let Some(c) = maybe_command { + c + } else { + return Err(WFMachinesError::NoCommandScheduledForEvent(event.clone())); + }; + + // Feed the machine the event + let mut break_later = false; + if let CancellableCommand::Active { mut machine, .. } = command.clone() { + let out_commands = (*machine).borrow_mut().handle_event(event, true, self)?; + // TODO: Handle invalid event errors + // * More special handling for version machine - see java + // * Command/machine supposed to have cancelled itself + + event!( + Level::DEBUG, + msg = "Machine produced commands", + ?out_commands + ); + break_later = true; + } + + if break_later { + break command; + } + }; + + // TODO: validate command + + if let CancellableCommand::Active { machine, .. } = consumed_cmd { + if !machine.borrow().is_final_state() { + self.machines_by_id.insert(event.event_id, machine); + } + } + + Ok(()) + } + + fn handle_non_stateful_event( + &mut self, + event: &HistoryEvent, + has_next_event: bool, + ) -> Result<()> { + match EventType::from_i32(event.event_type) { + Some(EventType::WorkflowExecutionStarted) => { + if let Some(history_event::Attributes::WorkflowExecutionStartedEventAttributes( + attrs, + )) = &event.attributes + { + self.current_run_id = Some(attrs.original_execution_run_id.clone()); + let results = self.drive_me.start(attrs.clone())?; + self.handle_driven_results(results); + } else { + return Err(WFMachinesError::MalformedEvent( + event.clone(), + "WorkflowExecutionStarted event did not have appropriate attributes" + .to_string(), + )); + } + } + Some(EventType::WorkflowTaskScheduled) => { + let mut wf_task_sm = WorkflowTaskMachine::new(self.workflow_task_started_event_id); + wf_task_sm.handle_event(event, has_next_event, self)?; + self.machines_by_id + .insert(event.event_id, Rc::new(RefCell::new(wf_task_sm))); + } + Some(EventType::WorkflowExecutionSignaled) => { + // TODO: Signal callbacks + } + Some(EventType::WorkflowExecutionCancelRequested) => { + // TODO: Cancel callbacks + } + _ => return Err(WFMachinesError::UnexpectedEvent(event.clone())), + } + Ok(()) + } + + /// Fetches commands ready for processing from the state machines + pub(crate) fn get_commands(&mut self) -> Vec { + self.commands + .iter() + .filter_map(|c| { + if let CancellableCommand::Active { command, .. } = c { + Some(command.clone()) + } else { + None + } + }) + .collect() + } + + /// Given an event id (possibly zero) of the last successfully executed workflow task and an + /// id of the last event, sets the ids internally and appropriately sets the replaying flag. + pub(super) fn set_started_ids( + &mut self, + previous_started_event_id: i64, + workflow_task_started_event_id: i64, + ) { + self.previous_started_event_id = previous_started_event_id; + self.workflow_task_started_event_id = workflow_task_started_event_id; + self.replaying = previous_started_event_id > 0; + } + + fn set_current_time(&mut self, time: SystemTime) -> SystemTime { + if self.current_wf_time.map(|t| t < time).unwrap_or(true) { + self.current_wf_time = Some(time); + } + self.current_wf_time + .expect("We have just ensured this is populated") + } + + fn event_loop(&mut self) -> Result<()> { + let results = self.drive_me.iterate_wf()?; + self.handle_driven_results(results); + + self.prepare_commands(); + Ok(()) + } + + fn handle_driven_results(&mut self, results: Vec) { + for cmd in results { + // I don't love how boilerplatey this is + match cmd { + WFCommand::AddTimer(attrs, completion_flag) => { + let timer = self.new_timer(attrs, completion_flag); + self.current_wf_task_commands.push_back(timer); + } + WFCommand::CompleteWorkflow(attrs) => { + self.current_wf_task_commands + .push_back(complete_workflow(attrs)); + } + } + } + } + + fn prepare_commands(&mut self) { + while let Some(c) = self.current_wf_task_commands.pop_front() { + // TODO - some special case stuff that can maybe be managed differently? + // handleCommand should be called even on canceled ones to support mutableSideEffect + // command.handleCommand(command.getCommandType()); + self.commands.push_back(c); + } + } +} diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 15a0a8292..333a74796 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -1,64 +1,196 @@ +#![allow(clippy::enum_variant_names)] + +use crate::{ + machines::{ + workflow_machines::{WFMachinesError, WorkflowMachines}, + WFMachinesAdapter, + }, + protos::temporal::api::{ + enums::v1::{CommandType, EventType}, + history::v1::HistoryEvent, + }, +}; use rustfsm::{fsm, TransitionResult}; +use std::{convert::TryFrom, time::SystemTime}; +use tracing::Level; fsm! { - name WorkflowTaskMachine; command WorkflowTaskCommand; error WorkflowTaskMachineError; + pub(super) name WorkflowTaskMachine; + command WFTaskMachineCommand; + error WFMachinesError; + shared_state SharedState; - Created --(WorkflowTaskScheduled, on_workflow_task_scheduled) --> Scheduled; + Created --(WorkflowTaskScheduled) --> Scheduled; - Scheduled --(WorkflowTaskStarted, on_workflow_task_started) --> Started; - Scheduled --(WorkflowTaskTimedOut, on_workflow_task_timed_out) --> TimedOut; + Scheduled --(WorkflowTaskStarted(WFTStartedDat), shared on_workflow_task_started) --> Started; + Scheduled --(WorkflowTaskTimedOut) --> TimedOut; Started --(WorkflowTaskCompleted, on_workflow_task_completed) --> Completed; Started --(WorkflowTaskFailed, on_workflow_task_failed) --> Failed; - Started --(WorkflowTaskTimedOut, on_workflow_task_timed_out) --> TimedOut; + Started --(WorkflowTaskTimedOut) --> TimedOut; } -#[derive(thiserror::Error, Debug)] -pub enum WorkflowTaskMachineError {} +impl WorkflowTaskMachine { + pub(super) fn new(wf_task_started_event_id: i64) -> Self { + Self { + state: Created {}.into(), + shared_state: SharedState { + wf_task_started_event_id, + }, + } + } +} -pub enum WorkflowTaskCommand {} +#[derive(Debug)] +pub(super) enum WFTaskMachineCommand { + /// Issued to (possibly) trigger the event loop + WFTaskStartedTrigger { + task_started_event_id: i64, + time: SystemTime, + }, +} -#[derive(Default, Clone)] -pub struct Completed {} +impl WFMachinesAdapter for WorkflowTaskMachine { + fn adapt_response( + &self, + wf_machines: &mut WorkflowMachines, + event: &HistoryEvent, + has_next_event: bool, + my_command: WFTaskMachineCommand, + ) -> Result<(), WFMachinesError> { + match my_command { + WFTaskMachineCommand::WFTaskStartedTrigger { + task_started_event_id, + time, + } => { + let event_type = EventType::from_i32(event.event_type) + .ok_or_else(|| WFMachinesError::UnexpectedEvent(event.clone()))?; + let cur_event_past_or_at_start = event.event_id >= task_started_event_id; + if event_type == EventType::WorkflowTaskStarted + && (!cur_event_past_or_at_start || has_next_event) + { + // Last event in history is a task started event, so we don't + // want to iterate. + return Ok(()); + } + wf_machines.task_started(task_started_event_id, time)?; + } + } + Ok(()) + } +} -#[derive(Default, Clone)] -pub struct Created {} +impl TryFrom for WorkflowTaskMachineEvents { + type Error = WFMachinesError; -impl Created { - pub fn on_workflow_task_scheduled(self) -> WorkflowTaskMachineTransition { - unimplemented!() + fn try_from(e: HistoryEvent) -> Result { + Ok(match EventType::from_i32(e.event_type) { + Some(EventType::WorkflowTaskScheduled) => Self::WorkflowTaskScheduled, + Some(EventType::WorkflowTaskStarted) => Self::WorkflowTaskStarted(WFTStartedDat { + started_event_id: e.event_id, + current_time_millis: e.event_time.clone().map(|ts| ts.into()).ok_or_else(|| { + WFMachinesError::MalformedEvent( + e, + "Workflow task started event must contain timestamp".to_string(), + ) + })?, + }), + Some(EventType::WorkflowTaskTimedOut) => Self::WorkflowTaskTimedOut, + Some(EventType::WorkflowTaskCompleted) => Self::WorkflowTaskCompleted, + Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed, + _ => return Err(WFMachinesError::UnexpectedEvent(e)), + }) } } +impl TryFrom for WorkflowTaskMachineEvents { + type Error = (); + + fn try_from(_: CommandType) -> Result { + Err(()) + } +} + +#[derive(Debug, Clone)] +pub(super) struct SharedState { + wf_task_started_event_id: i64, +} + +#[derive(Default, Clone)] +pub(super) struct Completed {} + #[derive(Default, Clone)] -pub struct Failed {} +pub(super) struct Created {} #[derive(Default, Clone)] -pub struct Scheduled {} +pub(super) struct Failed {} +#[derive(Default, Clone)] +pub(super) struct Scheduled {} + +pub(super) struct WFTStartedDat { + current_time_millis: SystemTime, + started_event_id: i64, +} impl Scheduled { - pub fn on_workflow_task_started(self) -> WorkflowTaskMachineTransition { - unimplemented!() + pub(super) fn on_workflow_task_started( + self, + shared: SharedState, + WFTStartedDat { + current_time_millis, + started_event_id, + }: WFTStartedDat, + ) -> WorkflowTaskMachineTransition { + WorkflowTaskMachineTransition::ok( + vec![WFTaskMachineCommand::WFTaskStartedTrigger { + task_started_event_id: shared.wf_task_started_event_id, + time: current_time_millis, + }], + Started { + current_time_millis, + started_event_id, + }, + ) } - pub fn on_workflow_task_timed_out(self) -> WorkflowTaskMachineTransition { - unimplemented!() +} + +impl From for Scheduled { + fn from(_: Created) -> Self { + Self::default() } } -#[derive(Default, Clone)] -pub struct Started {} +#[derive(Clone)] +pub(super) struct Started { + /// Started event's timestamp + current_time_millis: SystemTime, + /// Started event's id + started_event_id: i64, +} impl Started { - pub fn on_workflow_task_completed(self) -> WorkflowTaskMachineTransition { - unimplemented!() - } - pub fn on_workflow_task_failed(self) -> WorkflowTaskMachineTransition { - unimplemented!() + pub(super) fn on_workflow_task_completed(self) -> WorkflowTaskMachineTransition { + WorkflowTaskMachineTransition::commands::<_, Completed>(vec![ + WFTaskMachineCommand::WFTaskStartedTrigger { + task_started_event_id: self.started_event_id, + time: self.current_time_millis, + }, + ]) } - pub fn on_workflow_task_timed_out(self) -> WorkflowTaskMachineTransition { + pub(super) fn on_workflow_task_failed(self) -> WorkflowTaskMachineTransition { unimplemented!() } } #[derive(Default, Clone)] -pub struct TimedOut {} +pub(super) struct TimedOut {} +impl From for TimedOut { + fn from(_: Scheduled) -> Self { + Self::default() + } +} +impl From for TimedOut { + fn from(_: Started) -> Self { + Self::default() + } +} diff --git a/src/protos/mod.rs b/src/protos/mod.rs index d52a71031..0d7816532 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -36,39 +36,144 @@ pub mod temporal { } pub mod history { pub mod v1 { + use crate::protos::temporal::api::{ + enums::v1::EventType, history::v1::history_event::Attributes, + }; + use prost::alloc::fmt::Formatter; + use std::fmt::Display; + include!("temporal.api.history.v1.rs"); + + impl HistoryEvent { + /// Returns true if this is an event created to mirror a command + pub fn is_command_event(&self) -> bool { + if let Some(et) = EventType::from_i32(self.event_type) { + match et { + EventType::ActivityTaskScheduled + | EventType::ActivityTaskCancelRequested + | EventType::MarkerRecorded + | EventType::RequestCancelExternalWorkflowExecutionInitiated + | EventType::SignalExternalWorkflowExecutionInitiated + | EventType::StartChildWorkflowExecutionInitiated + | EventType::TimerCanceled + | EventType::TimerStarted + | EventType::UpsertWorkflowSearchAttributes + | EventType::WorkflowExecutionCanceled + | EventType::WorkflowExecutionCompleted + | EventType::WorkflowExecutionContinuedAsNew + | EventType::WorkflowExecutionFailed => true, + _ => false, + } + } else { + debug!( + "Could not determine type of event with enum index {}", + self.event_type + ); + false + } + } + + /// Returns the command's initiating event id, if present. This is the id of the + /// event which "started" the command. Usually, the "scheduled" event for the + /// command. + pub fn get_initial_command_event_id(&self) -> Option { + self.attributes.as_ref().and_then(|a| { + // Fun! Not really any way to make this better w/o incompatibly changing + // protos. + match a { + Attributes::ActivityTaskStartedEventAttributes(a) => + Some(a.scheduled_event_id), + Attributes::ActivityTaskCompletedEventAttributes(a) => + Some(a.scheduled_event_id), + Attributes::ActivityTaskFailedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::ActivityTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::ActivityTaskCancelRequestedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::ActivityTaskCanceledEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::TimerFiredEventAttributes(a) => Some(a.started_event_id), + Attributes::TimerCanceledEventAttributes(a) => Some(a.started_event_id), + Attributes::RequestCancelExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ExternalWorkflowExecutionCancelRequestedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::StartChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionStartedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionCompletedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionCanceledEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionTimedOutEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ChildWorkflowExecutionTerminatedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::SignalExternalWorkflowExecutionFailedEventAttributes(a) => Some(a.initiated_event_id), + Attributes::ExternalWorkflowExecutionSignaledEventAttributes(a) => Some(a.initiated_event_id), + Attributes::WorkflowTaskStartedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::WorkflowTaskCompletedEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::WorkflowTaskTimedOutEventAttributes(a) => Some(a.scheduled_event_id), + Attributes::WorkflowTaskFailedEventAttributes(a) => Some(a.scheduled_event_id), + _ => None + } + }) + } + + /// Returns true if the event is one which would end a workflow + pub fn is_final_wf_execution_event(&self) -> bool { + match EventType::from_i32(self.event_type) { + Some(EventType::WorkflowExecutionCompleted) => true, + Some(EventType::WorkflowExecutionCanceled) => true, + Some(EventType::WorkflowExecutionFailed) => true, + Some(EventType::WorkflowExecutionTimedOut) => true, + Some(EventType::WorkflowExecutionContinuedAsNew) => true, + Some(EventType::WorkflowExecutionTerminated) => true, + _ => false, + } + } + } + + impl Display for HistoryEvent { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "HistoryEvent(id: {}, {:?})", + self.event_id, + EventType::from_i32(self.event_type) + ) + } + } } } + pub mod namespace { pub mod v1 { include!("temporal.api.namespace.v1.rs"); } } + pub mod query { pub mod v1 { include!("temporal.api.query.v1.rs"); } } + pub mod replication { pub mod v1 { include!("temporal.api.replication.v1.rs"); } } + pub mod taskqueue { pub mod v1 { include!("temporal.api.taskqueue.v1.rs"); } } + pub mod version { pub mod v1 { include!("temporal.api.version.v1.rs"); } } + pub mod workflow { pub mod v1 { include!("temporal.api.workflow.v1.rs"); } } + pub mod workflowservice { pub mod v1 { include!("temporal.api.workflowservice.v1.rs");