From 476a6f27bfeeee622240a3afacd49b7032a1405f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 23 Mar 2021 22:17:45 -0700 Subject: [PATCH 1/3] Adding state machine tests for activity --- src/machines/activity_state_machine.rs | 76 +++++++++++++++++++ .../test_help/async_workflow_driver.rs | 30 +++++++- 2 files changed, 104 insertions(+), 2 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 1bc7eb76b..db27e2126 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -368,3 +368,79 @@ pub(super) struct TimedOut {} #[derive(Default, Clone)] pub(super) struct Canceled {} + +#[cfg(test)] +mod test { + use super::*; + use crate::{ + machines::{ + test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}, + workflow_machines::WorkflowMachines, + }, + protos::coresdk::workflow_commands::CompleteWorkflowExecution, + test_help::canned_histories, + }; + use rstest::{fixture, rstest}; + use std::time::Duration; + + #[fixture] + fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { + let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { + let activity = ScheduleActivity { + activity_id: "activity-id-1".to_string(), + ..Default::default() + }; + command_sink.activity(activity).await; + + let complete = CompleteWorkflowExecution::default(); + command_sink.send(complete.into()); + }); + + let t = canned_histories::single_activity("activity-id-1"); + let state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); + + assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); + (t, state_machines) + } + + #[rstest] + fn activity_happy_path_inc(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let (t, mut state_machines) = activity_happy_hist; + + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) + .unwrap(); + state_machines.get_wf_activation(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::ScheduleActivityTask as i32 + ); + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) + .unwrap(); + state_machines.get_wf_activation(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); + } + + #[rstest] + fn activity_happy_path_full(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let (t, mut state_machines) = activity_happy_hist; + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, None) + .unwrap(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); + } +} diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 6ad4cfd37..ecc936802 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,3 +1,6 @@ +use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; +use crate::protos::coresdk::workflow_activation::ResolveActivity; +use crate::protos::coresdk::workflow_commands::ScheduleActivity; use crate::{ machines::{workflow_machines::CommandID, WFCommand}, protos::coresdk::{ @@ -21,6 +24,7 @@ use tokio::{ runtime::Runtime, task::{JoinError, JoinHandle}, }; +use CommandID::Activity; use CommandID::Timer; pub struct TestWorkflowDriver { @@ -138,6 +142,19 @@ impl CommandSender { } } + /// Request to run an activity + pub fn activity(&mut self, a: ScheduleActivity) -> impl Future { + let aid = a.activity_id.clone(); + let c = WFCommand::AddActivity(a); + self.send(c); + let rx = self.twd_cache.add_sent_cmd(Activity(aid.clone())); + let cache_clone = self.twd_cache.clone(); + async move { + cache_clone.set_cmd_blocked(Activity(aid)); + rx.await + } + } + /// Cancel a timer pub fn cancel_timer(&self, timer_id: &str) { let c = WFCommand::CancelTimer(CancelTimer { @@ -239,8 +256,17 @@ impl WorkflowFetcher for TestWorkflowDriver { impl ActivationListener for TestWorkflowDriver { fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { - if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { - self.cache.unblock(Timer(timer_id.to_owned())); + match activation { + Variant::FireTimer(FireTimer { timer_id }) => { + self.cache.unblock(Timer(timer_id.to_owned())); + } + Variant::ResolveActivity(ResolveActivity { + activity_id, + result: _result, + }) => { + self.cache.unblock(Activity(activity_id.to_owned())); + } + _ => {} } } } From a4bab83cfb38cb764f54585e59b08bd87de18185 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 24 Mar 2021 09:52:05 -0700 Subject: [PATCH 2/3] Update command --- src/machines/workflow_machines.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index ca29d88f2..dbb99448f 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -58,8 +58,7 @@ pub(crate) struct WorkflowMachines { /// for that machine. machines_by_event_id: HashMap, - /// Maps timer ids as created by workflow authors to their associated machines - /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum. + /// Maps command ids as created by workflow authors to their associated machines. id_to_machine: HashMap, /// Queued commands which have been produced by machines and await processing / being sent to From f02dd02060e9f608ea990277d8df8e2821e9f8c9 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 24 Mar 2021 11:27:50 -0700 Subject: [PATCH 3/3] factor out method --- .../test_help/async_workflow_driver.rs | 20 +++++++------------ src/machines/workflow_machines.rs | 2 +- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index ecc936802..7f7784ed6 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -131,26 +131,20 @@ impl CommandSender { /// Request to create a timer pub fn timer(&mut self, a: StartTimer) -> impl Future { - let tid = a.timer_id.clone(); - let c = WFCommand::AddTimer(a); - self.send(c); - let rx = self.twd_cache.add_sent_cmd(Timer(tid.clone())); - let cache_clone = self.twd_cache.clone(); - async move { - cache_clone.set_cmd_blocked(Timer(tid)); - rx.await - } + self.send_blocking_cmd(Timer(a.timer_id.clone()), WFCommand::AddTimer(a)) } /// Request to run an activity pub fn activity(&mut self, a: ScheduleActivity) -> impl Future { - let aid = a.activity_id.clone(); - let c = WFCommand::AddActivity(a); + self.send_blocking_cmd(Activity(a.activity_id.clone()), WFCommand::AddActivity(a)) + } + + fn send_blocking_cmd(&mut self, id: CommandID, c: WFCommand) -> impl Future { self.send(c); - let rx = self.twd_cache.add_sent_cmd(Activity(aid.clone())); + let rx = self.twd_cache.add_sent_cmd(id.clone()); let cache_clone = self.twd_cache.clone(); async move { - cache_clone.set_cmd_blocked(Activity(aid)); + cache_clone.set_cmd_blocked(id); rx.await } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index dbb99448f..05344181f 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -75,7 +75,7 @@ pub(crate) struct WorkflowMachines { drive_me: DrivenWorkflow, } -#[derive(Debug, PartialEq, Eq, Hash)] +#[derive(Debug, PartialEq, Eq, Hash, Clone)] pub enum CommandID { Timer(String), Activity(String),