diff --git a/src/lib.rs b/src/lib.rs index a4834ec71..1fcf135b5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,9 +11,11 @@ mod protosext; pub use protosext::HistoryInfo; -use crate::pollers::ServerGatewayOptions; use crate::{ - machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, + machines::{ + ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, + }, + pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -271,6 +273,9 @@ impl DrivenWorkflow for WorkflowBridge { } } +// Real bridge doesn't actually need to listen +impl ActivationListener for WorkflowBridge {} + #[derive(thiserror::Error, Debug)] #[allow(clippy::large_enum_variant)] pub enum CoreError { @@ -335,7 +340,6 @@ mod test { history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { started_event_id: timer_started_event_id, timer_id: "timer1".to_string(), - ..Default::default() }), ); t.add_workflow_task_scheduled_and_started(); @@ -368,7 +372,7 @@ mod test { history: Some(History { events: events_second_batch, }), - workflow_execution: wf.clone(), + workflow_execution: wf, ..Default::default() }; let responses = vec![first_response, second_response]; diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs index e6f50f51b..522eeee5f 100644 --- a/src/machines/complete_workflow_state_machine.rs +++ b/src/machines/complete_workflow_state_machine.rs @@ -1,3 +1,4 @@ +use crate::machines::workflow_machines::WorkflowTrigger; use crate::{ machines::{ workflow_machines::WorkflowMachines, AddCommand, CancellableCommand, WFCommand, @@ -123,11 +124,10 @@ impl From for CompleteWorkflowCommandRecorded { impl WFMachinesAdapter for CompleteWorkflowMachine { fn adapt_response( &self, - _wf_machines: &mut WorkflowMachines, _event: &HistoryEvent, _has_next_event: bool, _my_command: CompleteWFCommand, - ) -> Result<(), WFMachinesError> { - Ok(()) + ) -> Result, WFMachinesError> { + Ok(vec![]) } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 7d78ca170..48d2ef3e7 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -37,17 +37,22 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::coresdk; -use crate::protos::coresdk::command::Variant; -use crate::protos::temporal::api::command::v1::command::Attributes; -use crate::protos::temporal::api::{ - command::v1::{ - Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, - }, - enums::v1::CommandType, - history::v1::{ - HistoryEvent, WorkflowExecutionCanceledEventAttributes, - WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, +use crate::{ + machines::workflow_machines::WorkflowTrigger, + protos::coresdk::wf_activation, + protos::{ + coresdk::{self, command::Variant}, + temporal::api::{ + command::v1::{ + command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes, + StartTimerCommandAttributes, + }, + enums::v1::CommandType, + history::v1::{ + HistoryEvent, WorkflowExecutionCanceledEventAttributes, + WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, + }, + }, }, }; use prost::alloc::fmt::Formatter; @@ -57,16 +62,14 @@ use std::{ convert::{TryFrom, TryInto}, fmt::Debug, rc::Rc, - sync::{atomic::AtomicBool, Arc}, }; use tracing::Level; -// TODO: May need to be our SDKWFCommand type -pub(crate) type MachineCommand = Command; +pub(crate) type ProtoCommand = Command; /// Implementors of this trait represent something that can (eventually) call into a workflow to /// drive it, start it, signal it, cancel it, etc. -pub(crate) trait DrivenWorkflow: Send { +pub(crate) trait DrivenWorkflow: ActivationListener + Send { /// Start the workflow fn start( &mut self, @@ -90,6 +93,12 @@ pub(crate) trait DrivenWorkflow: Send { ) -> Result<(), anyhow::Error>; } +/// Allows observers to listen to newly generated outgoing activations. Used for testing, where +/// some activations must be handled before outgoing commands are issued to avoid deadlocking. +pub(crate) trait ActivationListener { + fn on_activation(&mut self, _activation: &wf_activation::Attributes) {} +} + /// The struct for [WFCommand::AddCommand] #[derive(Debug, derive_more::From)] pub(crate) struct AddCommand { @@ -103,7 +112,7 @@ pub(crate) struct AddCommand { pub enum WFCommand { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, - AddTimer(StartTimerCommandAttributes, Arc), + AddTimer(StartTimerCommandAttributes), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), } @@ -122,9 +131,7 @@ impl TryFrom for WFCommand { attributes: Some(attrs), .. }) => match attrs { - Attributes::StartTimerCommandAttributes(s) => { - Ok(WFCommand::AddTimer(s, Arc::new(AtomicBool::new(false)))) - } + Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { Ok(WFCommand::CompleteWorkflow(c)) } @@ -144,16 +151,13 @@ trait TemporalStateMachine: CheckStateMachineInFinal { fn name(&self) -> &str; fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError>; - /// Tell the state machine to handle some event. The [WorkflowMachines] instance calling this - /// function passes a reference to itself in so that the [WFMachinesAdapter] trait can use - /// the machines' specific type information while also manipulating [WorkflowMachines] + /// Tell the state machine to handle some event. Returns a list of triggers that can be used + /// to update the overall state of the workflow. EX: To issue outgoing WF activations. fn handle_event( &mut self, event: &HistoryEvent, has_next_event: bool, - // TODO: Don't pass this all in, rather use responses. - wf_machines: &mut WorkflowMachines, - ) -> Result<(), WFMachinesError>; + ) -> Result, WFMachinesError>; } impl TemporalStateMachine for SM @@ -192,8 +196,7 @@ where &mut self, event: &HistoryEvent, has_next_event: bool, - wf_machines: &mut WorkflowMachines, - ) -> Result<(), WFMachinesError> { + ) -> Result, WFMachinesError> { event!( Level::DEBUG, msg = "handling event", @@ -204,10 +207,11 @@ where match self.on_event_mut(converted_event) { Ok(c) => { event!(Level::DEBUG, msg = "Machine produced commands", ?c); + let mut triggers = vec![]; for cmd in c { - self.adapt_response(wf_machines, event, has_next_event, cmd)?; + triggers.extend(self.adapt_response(event, has_next_event, cmd)?); } - Ok(()) + Ok(triggers) } Err(MachineError::InvalidTransition) => { Err(WFMachinesError::UnexpectedEvent(event.clone())) @@ -238,17 +242,16 @@ where /// This trait exists to bridge [StateMachine]s and the [WorkflowMachines] instance. It has access /// to the machine's concrete types while hiding those details from [WorkflowMachines] -pub(super) trait WFMachinesAdapter: StateMachine { - /// Given a reference to [WorkflowMachines], the event being processed, and a command that this - /// [StateMachine] instance just produced, perform any handling that needs access to the - /// [WorkflowMachines] instance in response to that command. +trait WFMachinesAdapter: StateMachine { + /// Given a the event being processed, and a command that this [StateMachine] instance just + /// produced, perform any handling that needs inform the [WorkflowMachines] instance of some + /// action to be taken in response to that command. fn adapt_response( &self, - _wf_machines: &mut WorkflowMachines, - _event: &HistoryEvent, - _has_next_event: bool, - _my_command: Self::Command, - ) -> Result<(), WFMachinesError>; + event: &HistoryEvent, + has_next_event: bool, + my_command: Self::Command, + ) -> Result, WFMachinesError>; } /// A command which can be cancelled, associated with some state machine that produced it @@ -260,7 +263,7 @@ enum CancellableCommand { Cancelled, Active { /// The inner protobuf command, if None, command has been cancelled - command: MachineCommand, + command: ProtoCommand, machine: Rc>, }, } diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 523ac41a6..fcb319276 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,6 +1,6 @@ use super::Result; use crate::{ - machines::{workflow_machines::WorkflowMachines, MachineCommand}, + machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ enums::v1::EventType, history::v1::{ @@ -35,7 +35,7 @@ impl TestHistoryBuilder { pub fn add_by_type(&mut self, event_type: EventType) { let attribs = default_attribs(event_type).expect("Couldn't make default attributes in test builder"); - self.build_and_push_event(event_type.clone(), attribs); + self.build_and_push_event(event_type, attribs); } /// Adds an event, returning the ID that was assigned to it @@ -133,7 +133,7 @@ impl TestHistoryBuilder { &self, wf_machines: &mut WorkflowMachines, to_wf_task_num: Option, - ) -> Result> { + ) -> Result> { self.handle_workflow_task(wf_machines, to_wf_task_num)?; Ok(wf_machines.get_commands()) } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index aa89b8562..d6dd8fe05 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,45 +1,42 @@ use super::Result; use crate::{ - machines::{DrivenWorkflow, WFCommand}, - protos::temporal::api::{ - command::v1::StartTimerCommandAttributes, - history::v1::{ - WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, - WorkflowExecutionStartedEventAttributes, + machines::{ActivationListener, DrivenWorkflow, WFCommand}, + protos::{ + coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes}, + temporal::api::{ + command::v1::StartTimerCommandAttributes, + history::v1::{ + WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, + WorkflowExecutionStartedEventAttributes, + }, }, }, }; +use dashmap::DashMap; use futures::Future; -use std::{ - collections::{hash_map, HashMap}, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc::{self, Receiver, Sender}, - Arc, RwLock, - }, +use std::sync::{ + mpsc::{self, Receiver, Sender}, + Arc, }; use tracing::Level; -type TimerMap = HashMap>; - /// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control /// over when commands are returned than a normal workflow would. /// /// It replaces "TestEnitityTestListenerBase" in java which is pretty hard to follow. -#[derive(Debug)] pub(in crate::machines) struct TestWorkflowDriver { wf_function: F, + cache: Arc, +} - /// Holds status for timers so we don't recreate them by accident. Key is timer id, value is - /// true if it is complete. - /// - /// I don't love that this is pretty duplicative of workflow machines, nor the nasty sync - /// involved. Would be good to eliminate. +#[derive(Default, Debug)] +pub(in crate::machines) struct TestWfDriverCache { + /// Holds a mapping of timer id -> is finished /// /// It can and should be eliminated by not recreating CommandSender on every iteration, which /// means keeping the workflow suspended in a future somewhere. I tried this and it was hard, /// but ultimately it's how real workflows will need to work. - timers: Arc>, + unblocked_timers: DashMap, } impl TestWorkflowDriver @@ -55,7 +52,18 @@ where pub(in crate::machines) fn new(workflow_fn: F) -> Self { Self { wf_function: workflow_fn, - timers: Arc::new(RwLock::new(HashMap::default())), + cache: Default::default(), + } + } +} + +impl ActivationListener for TestWorkflowDriver { + fn on_activation(&mut self, activation: &Attributes) { + if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation { + Arc::get_mut(&mut self.cache) + .unwrap() + .unblocked_timers + .insert(timer_id.clone(), true); } } } @@ -75,7 +83,7 @@ where #[instrument(skip(self))] fn iterate_wf(&mut self) -> Result, anyhow::Error> { - let (sender, receiver) = CommandSender::new(self.timers.clone()); + let (sender, receiver) = CommandSender::new(self.cache.clone()); // Call the closure that produces the workflow future let wf_future = (self.wf_function)(sender); @@ -131,26 +139,23 @@ pub enum TestWFCommand { pub struct CommandSender { chan: Sender, - timer_map: Arc>, + twd_cache: Arc, } -impl<'a> CommandSender { - fn new(timer_map: Arc>) -> (Self, Receiver) { +impl CommandSender { + fn new(twd_cache: Arc) -> (Self, Receiver) { let (chan, rx) = mpsc::channel(); - (Self { chan, timer_map }, rx) + (Self { chan, twd_cache }, rx) } /// Request to create a timer, returning future which resolves when the timer completes pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future + '_ { - let finished = match self.timer_map.write().unwrap().entry(a.timer_id.clone()) { - hash_map::Entry::Occupied(existing) => existing.get().load(Ordering::SeqCst), - hash_map::Entry::Vacant(v) => { - let atomic = Arc::new(AtomicBool::new(false)); - - let c = WFCommand::AddTimer(a, atomic.clone()); - // In theory we should send this in both branches + let finished = match self.twd_cache.unblocked_timers.entry(a.timer_id.clone()) { + dashmap::mapref::entry::Entry::Occupied(existing) => *existing.get(), + dashmap::mapref::entry::Entry::Vacant(v) => { + let c = WFCommand::AddTimer(a); self.chan.send(c.into()).unwrap(); - v.insert(atomic); + v.insert(false); false } }; diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 7afff44d3..e29006fb1 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,5 +1,6 @@ #![allow(clippy::large_enum_variant)] +use crate::machines::workflow_machines::WorkflowTrigger; use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation}; use crate::{ machines::{ @@ -217,33 +218,20 @@ impl StartCommandRecorded { impl WFMachinesAdapter for TimerMachine { fn adapt_response( &self, - wf_machines: &mut WorkflowMachines, _event: &HistoryEvent, _has_next_event: bool, my_command: TimerMachineCommand, - ) -> Result<(), WFMachinesError> { + ) -> Result, WFMachinesError> { match my_command { + // Fire the completion + TimerMachineCommand::Complete(_event) => Ok(vec![UnblockTimerTaskAttributes { + timer_id: self.shared_state.timer_attributes.timer_id.clone(), + } + .into()]), TimerMachineCommand::AddCommand(_) => { unreachable!() } - // Fire the completion - TimerMachineCommand::Complete(_event) => { - // TODO: Remove atomic bool nonsense -- kept for now to keep test here passing - if let Some(a) = wf_machines - .timer_notifiers - .remove(&self.shared_state.timer_attributes.timer_id) - { - a.store(true, Ordering::SeqCst) - }; - wf_machines.outgoing_wf_actications.push_back( - UnblockTimerTaskAttributes { - timer_id: self.shared_state.timer_attributes.timer_id.clone(), - } - .into(), - ); - } } - Ok(()) } } @@ -267,6 +255,7 @@ mod test { }; use futures::{channel::mpsc::Sender, FutureExt, SinkExt}; use rstest::{fixture, rstest}; + use std::sync::Arc; use std::{error::Error, time::Duration}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -295,7 +284,6 @@ mod test { let timer = StartTimerCommandAttributes { timer_id: "Sometimer".to_string(), start_to_fire_timeout: Some(Duration::from_secs(5).into()), - ..Default::default() }; command_sink.timer(timer).await; @@ -315,7 +303,6 @@ mod test { history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { started_event_id: timer_started_event_id, timer_id: "timer1".to_string(), - ..Default::default() }), ); t.add_workflow_task_scheduled_and_started(); @@ -334,11 +321,13 @@ mod test { .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) .unwrap(); dbg!(&commands); + dbg!(state_machines.get_wf_activation()); assert_eq!(commands.len(), 1); assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); + dbg!(state_machines.get_wf_activation()); assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 20a3522b7..9f3d4cad5 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,8 +1,9 @@ +use crate::machines::ActivationListener; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, workflow_task_state_machine::WorkflowTaskMachine, CancellableCommand, DrivenWorkflow, - MachineCommand, TemporalStateMachine, WFCommand, + ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, @@ -16,6 +17,8 @@ use crate::{ }; use futures::Future; use rustfsm::StateMachine; +use std::collections::HashSet; +use std::ops::DerefMut; use std::{ borrow::BorrowMut, cell::RefCell, @@ -60,13 +63,21 @@ pub(crate) struct WorkflowMachines { /// iterating over already added commands. current_wf_task_commands: VecDeque, /// Outgoing activations that need to be sent to the lang sdk - pub(super) outgoing_wf_actications: VecDeque, + outgoing_wf_activations: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, +} - /// Holds atomics for completing timers. Key is the ID of the timer. - pub(super) timer_notifiers: HashMap>, +/// Returned by [TemporalStateMachine]s when handling events +#[derive(Debug, derive_more::From)] +#[must_use] +pub(super) enum WorkflowTrigger { + PushWFActivation(#[from(forward)] wf_activation::Attributes), + TriggerWFTaskStarted { + task_started_event_id: i64, + time: SystemTime, + }, } #[derive(thiserror::Error, Debug)] @@ -103,8 +114,7 @@ impl WorkflowMachines { machines_by_id: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - timer_notifiers: Default::default(), - outgoing_wf_actications: Default::default(), + outgoing_wf_activations: Default::default(), } } @@ -112,15 +122,8 @@ impl WorkflowMachines { /// is sent `true` when the timer completes. /// /// Returns the command and a future that will resolve when the timer completes - pub(super) fn new_timer( - &mut self, - attribs: StartTimerCommandAttributes, - completion_flag: Arc, - ) -> CancellableCommand { - let timer_id = attribs.timer_id.clone(); - let timer = new_timer(attribs); - self.timer_notifiers.insert(timer_id, completion_flag); - timer + pub(super) fn new_timer(&mut self, attribs: StartTimerCommandAttributes) -> CancellableCommand { + new_timer(attribs) } /// Returns the id of the last seen WorkflowTaskStarted event @@ -156,9 +159,10 @@ impl WorkflowMachines { match event.get_initial_command_event_id() { Some(initial_cmd_id) => { if let Some(sm) = self.machines_by_id.get(&initial_cmd_id) { - (*sm.clone()) - .borrow_mut() - .handle_event(event, has_next_event, self)?; + // TODO: Don't like the rc stuff + let fsm_rc = sm.clone(); + let fsm_ref = (*fsm_rc).borrow_mut(); + self.submachine_handle_event(fsm_ref, event, has_next_event)?; } else { event!( Level::ERROR, @@ -245,16 +249,12 @@ impl WorkflowMachines { // Feed the machine the event let mut break_later = false; if let CancellableCommand::Active { mut machine, .. } = command.clone() { - let out_commands = (*machine).borrow_mut().handle_event(event, true, self)?; + self.submachine_handle_event((*machine).borrow_mut(), event, true)?; + // TODO: Handle invalid event errors // * More special handling for version machine - see java // * Command/machine supposed to have cancelled itself - event!( - Level::DEBUG, - msg = "Machine produced commands", - ?out_commands - ); break_later = true; } @@ -287,7 +287,7 @@ impl WorkflowMachines { { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow - self.outgoing_wf_actications.push_back( + self.outgoing_wf_activations.push_back( StartWorkflowTaskAttributes { // TODO: This needs to be set during init workflow_type: "".to_string(), @@ -308,7 +308,11 @@ impl WorkflowMachines { } Some(EventType::WorkflowTaskScheduled) => { let mut wf_task_sm = WorkflowTaskMachine::new(self.workflow_task_started_event_id); - wf_task_sm.handle_event(event, has_next_event, self)?; + self.submachine_handle_event( + &mut wf_task_sm as &mut dyn TemporalStateMachine, + event, + has_next_event, + )?; self.machines_by_id .insert(event.event_id, Rc::new(RefCell::new(wf_task_sm))); } @@ -324,7 +328,7 @@ impl WorkflowMachines { } /// Fetches commands ready for processing from the state machines - pub(crate) fn get_commands(&mut self) -> Vec { + pub(crate) fn get_commands(&mut self) -> Vec { self.commands .iter() .filter_map(|c| { @@ -340,7 +344,7 @@ impl WorkflowMachines { /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock /// timer, etc. pub(crate) fn get_wf_activation(&mut self) -> Option { - self.outgoing_wf_actications + self.outgoing_wf_activations .pop_front() .map(|attrs| WfActivation { // todo wat ? @@ -378,12 +382,39 @@ impl WorkflowMachines { Ok(()) } + /// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action + /// on the returned triggers + fn submachine_handle_event>( + &mut self, + mut sm: TSM, + event: &HistoryEvent, + has_next_event: bool, + ) -> Result<()> { + let triggers = sm.handle_event(event, has_next_event)?; + event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers); + for trigger in triggers { + match trigger { + WorkflowTrigger::PushWFActivation(a) => { + self.drive_me.on_activation(&a); + self.outgoing_wf_activations.push_back(a); + } + WorkflowTrigger::TriggerWFTaskStarted { + task_started_event_id, + time, + } => { + self.task_started(task_started_event_id, time)?; + } + } + } + Ok(()) + } + fn handle_driven_results(&mut self, results: Vec) { for cmd in results { // I don't love how boilerplatey this is match cmd { - WFCommand::AddTimer(attrs, completion_flag) => { - let timer = self.new_timer(attrs, completion_flag); + WFCommand::AddTimer(attrs) => { + let timer = self.new_timer(attrs); self.current_wf_task_commands.push_back(timer); } WFCommand::CompleteWorkflow(attrs) => { diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 333a74796..5a2ea6698 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -1,5 +1,6 @@ #![allow(clippy::enum_variant_names)] +use crate::machines::workflow_machines::WorkflowTrigger; use crate::{ machines::{ workflow_machines::{WFMachinesError, WorkflowMachines}, @@ -53,11 +54,10 @@ pub(super) enum WFTaskMachineCommand { impl WFMachinesAdapter for WorkflowTaskMachine { fn adapt_response( &self, - wf_machines: &mut WorkflowMachines, event: &HistoryEvent, has_next_event: bool, my_command: WFTaskMachineCommand, - ) -> Result<(), WFMachinesError> { + ) -> Result, WFMachinesError> { match my_command { WFTaskMachineCommand::WFTaskStartedTrigger { task_started_event_id, @@ -71,12 +71,14 @@ impl WFMachinesAdapter for WorkflowTaskMachine { { // Last event in history is a task started event, so we don't // want to iterate. - return Ok(()); + return Ok(vec![]); } - wf_machines.task_started(task_started_event_id, time)?; + Ok(vec![WorkflowTrigger::TriggerWFTaskStarted { + task_started_event_id, + time, + }]) } } - Ok(()) } } diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs index 9db67a641..23524fb55 100644 --- a/src/protosext/history_info.rs +++ b/src/protosext/history_info.rs @@ -183,7 +183,6 @@ mod tests { history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { started_event_id: timer_started_event_id, timer_id: "timer1".to_string(), - ..Default::default() }), ); t.add_workflow_task_scheduled_and_started();