diff --git a/.gitignore b/.gitignore
index e8609ab8c..07b4d6d81 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,4 @@ Cargo.lock
 # Ignore generated protobuf files
 src/protos/*.rs
 !src/protos/mod.rs
+/tarpaulin-report.html
diff --git a/Cargo.toml b/Cargo.toml
index da3a7d3cd..5de8931a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,14 +18,16 @@ displaydoc = "0.1"
 env_logger = "0.8"
 futures = "0.3"
 log = "0.4"
-opentelemetry-jaeger = "0.10"
-opentelemetry = "0.11.2"
+once_cell = "1.5"
+opentelemetry-jaeger = "0.11"
+opentelemetry = "0.12"
 prost = "0.7"
 prost-types = "0.7"
+slotmap = "1.0"
 thiserror = "1.0"
 tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
 tracing = { version = "0.1", features = ["log"] }
-tracing-opentelemetry = "0.10"
+tracing-opentelemetry = "0.11"
 tracing-subscriber = "0.2"
 url = "2.2"
 rand = "0.8.3"
diff --git a/fsm/Cargo.toml b/fsm/Cargo.toml
index 0431c4a7a..8ab58c673 100644
--- a/fsm/Cargo.toml
+++ b/fsm/Cargo.toml
@@ -6,7 +6,5 @@ edition = "2018"
 license-file = "LICENSE.txt"
 
 [dependencies]
-thiserror = "1.0"
-derive_more = "0.99"
 state_machine_procmacro = { path = "state_machine_procmacro" }
 state_machine_trait = { path = "state_machine_trait" }
diff --git a/fsm/state_machine_procmacro/src/lib.rs b/fsm/state_machine_procmacro/src/lib.rs
index 12a9a9d6c..6a19af7ea 100644
--- a/fsm/state_machine_procmacro/src/lib.rs
+++ b/fsm/state_machine_procmacro/src/lib.rs
@@ -329,7 +329,9 @@ impl StateMachineDefinition {
             .flat_map(|t| vec![t.from.clone(), t.to.clone()])
             .collect();
         let state_variants = states.iter().map(|s| {
+            let statestr = s.to_string();
             quote! {
+                #[display(fmt=#statestr)]
                 #s(#s)
             }
         });
@@ -349,7 +351,7 @@ impl StateMachineDefinition {
             }
         };
         let states_enum = quote! {
-            #[derive(::derive_more::From, Clone)]
+            #[derive(::derive_more::From, Clone, ::derive_more::Display)]
             #visibility enum #state_enum_name {
                 #(#state_variants),*
             }
diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto
index 14595a860..8121dcd70 100644
--- a/protos/local/core_interface.proto
+++ b/protos/local/core_interface.proto
@@ -63,11 +63,13 @@ message WFActivationJob {
         StartWorkflowTaskAttributes start_workflow = 1;
         // A timer has fired, allowing whatever was waiting on it (if anything) to proceed
         TimerFiredTaskAttributes timer_fired = 2;
-        // Workflow was reset. The randomness seed has to be updated.
-        RandomSeedUpdatedAttributes random_seed_updated = 3;
+        // A timer was canceled
+        TimerCanceledTaskAttributes timer_canceled = 3;
+        // Workflow was reset. The randomness seed must be updated.
+        RandomSeedUpdatedAttributes random_seed_updated = 4;
-        QueryWorkflowJob query_workflow = 4;
-        CancelWorkflowTaskAttributes cancel_workflow = 5;
+        QueryWorkflowJob query_workflow = 5;
+        CancelWorkflowTaskAttributes cancel_workflow = 6;
     }
 }
@@ -94,6 +96,10 @@ message TimerFiredTaskAttributes {
     string timer_id = 1;
 }
 
+message TimerCanceledTaskAttributes {
+    string timer_id = 1;
+}
+
 message RandomSeedUpdatedAttributes {
     uint64 randomness_seed = 1;
 }
diff --git a/src/core_tracing.rs b/src/core_tracing.rs
new file mode 100644
index 000000000..86d66a5a0
--- /dev/null
+++ b/src/core_tracing.rs
@@ -0,0 +1,24 @@
+use once_cell::sync::OnceCell;
+use opentelemetry_jaeger::Uninstall;
+use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+
+static TRACING_INIT: OnceCell<Uninstall> = OnceCell::new();
+const TRACING_ENABLE_ENV_VAR: &str = "TEMPORAL_CORE_TRACING";
+
+pub(crate) fn tracing_init() {
+    let _ = env_logger::try_init();
+    if std::env::var(TRACING_ENABLE_ENV_VAR).is_ok() {
+        TRACING_INIT.get_or_init(|| {
+            let (tracer, uninstall) = opentelemetry_jaeger::new_pipeline()
+                .with_service_name("coresdk")
+                .install()
+                .unwrap();
+            let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer);
+            tracing_subscriber::registry()
+                .with(opentelemetry)
+                .try_init()
+                .unwrap();
+            uninstall
+        });
+    }
+}
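The new `core_tracing` module gates telemetry behind an env var and leans on `once_cell` to make initialization idempotent while keeping the Jaeger pipeline's `Uninstall` guard alive for the life of the process. A minimal sketch of that guard pattern, assuming only the `once_cell` crate (`Guard` is a stand-in for `opentelemetry_jaeger::Uninstall`):

```rust
use once_cell::sync::OnceCell;

// Stand-in for the pipeline's uninstall guard; dropping it would tear tracing down.
struct Guard;

static INIT: OnceCell<Guard> = OnceCell::new();

fn init_once() {
    // Runs the closure at most once per process; later calls are no-ops, and the
    // returned guard lives in the static until process exit.
    INIT.get_or_init(|| {
        println!("expensive setup runs exactly once");
        Guard
    });
}

fn main() {
    init_once();
    init_once(); // second call does not re-run setup
}
```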
diff --git a/src/lib.rs b/src/lib.rs
index fbb334386..09405e935 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,14 +11,19 @@ extern crate tracing;
 
 pub mod protos;
 
+pub(crate) mod core_tracing;
 mod machines;
 mod pollers;
 mod protosext;
 mod workflow;
 
+#[cfg(test)]
+mod test_help;
+
 pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;
 
+use crate::machines::WFMachinesError;
 use crate::{
     machines::{InconvertibleCommandError, WFCommand},
     protos::{
@@ -89,7 +94,7 @@ pub struct CoreInitOptions {
 /// * Will panic if called from within an async context, as it will construct a runtime and you
 ///   cannot construct a runtime from within a runtime.
 pub fn init(opts: CoreInitOptions) -> Result<impl Core, CoreError> {
-    let _ = env_logger::try_init();
+    core_tracing::tracing_init();
     let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
     // Initialize server client
     let work_provider = runtime.block_on(opts.gateway_opts.connect())?;
@@ -235,6 +240,7 @@ where
 
     fn shutdown(&self) -> Result<(), CoreError> {
         self.shutdown_requested.store(true, Ordering::SeqCst);
+        self.workflow_machines.shutdown();
         Ok(())
     }
 }
@@ -281,7 +287,7 @@ impl<WP> CoreSDK<WP> {
             .collect::<Result<Vec<_>>>()?;
         self.workflow_machines.access(run_id, |mgr| {
             mgr.command_sink.send(cmds)?;
-            mgr.machines.iterate_machines();
+            mgr.machines.iterate_machines()?;
             Ok(())
         })?;
         Ok(())
@@ -306,6 +312,8 @@ pub enum CoreError {
     UninterpretableCommand(#[from] InconvertibleCommandError),
     /// Underlying error in history processing
     UnderlyingHistError(#[from] HistoryInfoError),
+    /// Underlying error in state machines
+    UnderlyingMachinesError(#[from] WFMachinesError),
     /// Task token had nothing associated with it: {0:?}
     NothingFoundForTaskToken(Vec<u8>),
     /// Error calling the service: {0:?}
@@ -325,71 +333,54 @@
 #[cfg(test)]
 mod test {
     use super::*;
-    use crate::machines::test_help::FakeCore;
+    use crate::machines::test_help::TestHistoryBuilder;
+    use crate::protos::temporal::api::enums::v1::EventType;
     use crate::{
-        machines::test_help::{build_fake_core, TestHistoryBuilder},
+        machines::test_help::{build_fake_core, FakeCore},
         protos::{
             coresdk::{
                 wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes,
                 TaskCompletion, TimerFiredTaskAttributes, WfActivationJob,
             },
-            temporal::api::{
-                command::v1::{
-                    CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
-                },
-                enums::v1::{EventType, WorkflowTaskFailedCause},
-                history::v1::{history_event, TimerFiredEventAttributes},
+            temporal::api::command::v1::{
+                CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
+                StartTimerCommandAttributes,
             },
         },
+        test_help::canned_histories,
     };
     use rstest::{fixture, rstest};
 
-    #[test]
-    fn single_timer_test_across_wf_bridge() {
+    const TASK_Q: &str = "test-task-queue";
+    const RUN_ID: &str = "fake_run_id";
+
+    #[fixture(hist_batches=&[])]
+    fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";
-        let run_id = "fake_run_id";
-        let timer_id = "fake_timer".to_string();
-        let task_queue = "test-task-queue";
-
-        let mut t = TestHistoryBuilder::default();
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: timer_id.clone(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
-        /*
-           1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
-           2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           3: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           ---
-           4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
-           5: EVENT_TYPE_TIMER_STARTED
-           6: EVENT_TYPE_TIMER_FIRED
-           7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           8: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           ---
-        */
-        let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]);
+        let mut t = canned_histories::single_timer("fake_timer");
+        let core = build_fake_core(wfid, RUN_ID, &mut t, hist_batches);
+        core
+    }
 
-        let res = core.poll_task(task_queue).unwrap();
+    #[rstest(core,
+        case::incremental(single_timer_setup(&[1, 2])),
+        case::replay(single_timer_setup(&[2]))
+    )]
+    fn single_timer_test_across_wf_bridge(core: FakeCore) {
+        let res = core.poll_task(TASK_Q).unwrap();
         assert_matches!(
             res.get_wf_jobs().as_slice(),
             [WfActivationJob {
                 attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
             }]
         );
-        assert!(core.workflow_machines.exists(run_id));
+        assert!(core.workflow_machines.exists(RUN_ID));
 
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
             vec![StartTimerCommandAttributes {
-                timer_id,
+                timer_id: "fake_timer".to_string(),
                 ..Default::default()
             }
             .into()],
@@ -397,7 +388,7 @@
         ))
         .unwrap();
 
-        let res = core.poll_task(task_queue).unwrap();
+        let res = core.poll_task(TASK_Q).unwrap();
         assert_matches!(
             res.get_wf_jobs().as_slice(),
             [WfActivationJob {
@@ -412,49 +403,16 @@
             .unwrap();
     }
 
-    #[test]
-    fn parallel_timer_test_across_wf_bridge() {
+    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
+    fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
         let run_id = "fake_run_id";
-        let timer_1_id = "timer1".to_string();
-        let timer_2_id = "timer2".to_string();
+        let timer_1_id = "timer1";
+        let timer_2_id = "timer2";
         let task_queue = "test-task-queue";
 
-        let mut t = TestHistoryBuilder::default();
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        let timer_2_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: timer_1_id.clone(),
-            }),
-        );
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_2_started_event_id,
-                timer_id: timer_2_id.clone(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
-        /*
-           1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
-           2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           3: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           ---
-           4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
-           5: EVENT_TYPE_TIMER_STARTED
-           6: EVENT_TYPE_TIMER_STARTED
-           7: EVENT_TYPE_TIMER_FIRED
-           8: EVENT_TYPE_TIMER_FIRED
-           9: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           10: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           ---
-        */
-        let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]);
+        let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id);
+        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
 
         let res = core.poll_task(task_queue).unwrap();
         assert_matches!(
@@ -469,12 +427,12 @@
         core.complete_task(TaskCompletion::ok_from_api_attrs(
             vec![
                 StartTimerCommandAttributes {
-                    timer_id: timer_1_id.clone(),
+                    timer_id: timer_1_id.to_string(),
                     ..Default::default()
                 }
                 .into(),
                 StartTimerCommandAttributes {
-                    timer_id: timer_2_id.clone(),
+                    timer_id: timer_2_id.to_string(),
                     ..Default::default()
                 }
                 .into(),
             ],
@@ -510,28 +468,16 @@
             .unwrap();
     }
 
-    #[fixture]
-    fn single_timer_whole_replay() -> FakeCore {
+    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
+    fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
         let run_id = "fake_run_id";
-        let timer_1_id = "timer1".to_string();
+        let timer_id = "wait_timer";
+        let cancel_timer_id = "cancel_timer";
         let task_queue = "test-task-queue";
 
-        let mut t = TestHistoryBuilder::default();
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: timer_1_id.clone(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
-        // NOTE! What makes this a replay test is the server only responds with *one* batch here.
-        // So, server is polled once, but lang->core interactions look just like non-replay test.
-        let core = build_fake_core(wfid, run_id, &mut t, &[2]);
+        let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id);
+        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
 
         let res = core.poll_task(task_queue).unwrap();
         assert_matches!(
@@ -544,11 +490,18 @@
 
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
-            vec![StartTimerCommandAttributes {
-                timer_id: timer_1_id,
-                ..Default::default()
-            }
-            .into()],
+            vec![
+                StartTimerCommandAttributes {
+                    timer_id: cancel_timer_id.to_string(),
+                    ..Default::default()
+                }
+                .into(),
+                StartTimerCommandAttributes {
+                    timer_id: timer_id.to_string(),
+                    ..Default::default()
+                }
+                .into(),
+            ],
             task_tok,
         ))
         .unwrap();
@@ -562,25 +515,26 @@
         );
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
-            vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
+            vec![
+                CancelTimerCommandAttributes {
+                    timer_id: cancel_timer_id.to_string(),
+                }
+                .into(),
+                CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
+            ],
             task_tok,
         ))
         .unwrap();
-        core
     }
 
-    #[rstest]
-    fn single_timer_whole_replay_test_across_wf_bridge(_single_timer_whole_replay: FakeCore) {
-        // Nothing to do here -- whole real test is in fixture. Rstest properly handles leading `_`
-    }
+    #[rstest(single_timer_setup(&[1]))]
+    fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) {
+        let res = single_timer_setup.poll_task(TASK_Q).unwrap();
+        assert_eq!(res.get_wf_jobs().len(), 1);
 
-    #[rstest]
-    fn after_shutdown_server_is_not_polled(single_timer_whole_replay: FakeCore) {
-        single_timer_whole_replay.shutdown().unwrap();
+        single_timer_setup.shutdown().unwrap();
         assert_matches!(
-            single_timer_whole_replay
-                .poll_task("irrelevant")
-                .unwrap_err(),
+            single_timer_setup.poll_task(TASK_Q).unwrap_err(),
             CoreError::ShuttingDown
         );
     }
@@ -593,40 +547,10 @@
         let wfid = "fake_wf_id";
         let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B";
         let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
-        let timer_1_id = "timer1".to_string();
+        let timer_1_id = "timer1";
         let task_queue = "test-task-queue";
 
-        /*
-           1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
-           2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           3: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
-           5: EVENT_TYPE_TIMER_STARTED
-           6: EVENT_TYPE_TIMER_FIRED
-           7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           8: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           9: EVENT_TYPE_WORKFLOW_TASK_FAILED
-           10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           11: EVENT_TYPE_WORKFLOW_TASK_STARTED
-        */
-        let mut t = TestHistoryBuilder::default();
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: timer_1_id.clone(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
-        t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id);
-
-        t.add_workflow_task_scheduled_and_started();
-
-        // NOTE! What makes this a replay test is the server only responds with *one* batch here.
-        // So, server is polled once, but lang->core interactions look just like non-replay test.
+        let mut t = canned_histories::workflow_fails_after_timer(timer_1_id, original_run_id);
         let core = build_fake_core(wfid, run_id, &mut t, &[2]);
 
         let res = core.poll_task(task_queue).unwrap();
@@ -646,7 +570,7 @@
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
             vec![StartTimerCommandAttributes {
-                timer_id: timer_1_id,
+                timer_id: timer_1_id.to_string(),
                 ..Default::default()
             }
             .into()],
@@ -673,4 +597,49 @@
         ))
         .unwrap();
     }
+
+    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
+    fn cancel_timer_before_sent_wf_bridge(hist_batches: &[usize]) {
+        let wfid = "fake_wf_id";
+        let run_id = "fake_run_id";
+        let cancel_timer_id = "cancel_timer";
+        let task_queue = "test-task-queue";
+
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_full_wf_task();
+        t.add_workflow_execution_completed();
+
+        let core = build_fake_core(wfid, run_id, &mut t, hist_batches);
+
+        let res = core.poll_task(task_queue).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
+            }]
+        );
+
+        let task_tok = res.task_token;
+        core.complete_task(TaskCompletion::ok_from_api_attrs(
+            vec![
+                StartTimerCommandAttributes {
+                    timer_id: cancel_timer_id.to_string(),
+                    ..Default::default()
+                }
+                .into(),
+                CancelTimerCommandAttributes {
+                    timer_id: cancel_timer_id.to_string(),
+                    ..Default::default()
+                }
+                .into(),
+                CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
+            ],
+            task_tok,
+        ))
+        .unwrap();
+
+        if hist_batches.len() > 1 {
+            core.poll_task(task_queue).unwrap();
+        }
+    }
 }
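The reworked tests above run every scenario twice: once with history delivered incrementally across two polls, and once with the whole history replayed in a single batch. A sketch of how this kind of parameterization expands, assuming the rstest crate and using the same attribute syntax as the tests above (the body is a stand-in):

```rust
use rstest::rstest;

// Stand-in for the build_fake_core + poll/complete interactions in the real tests.
fn run_scenario(hist_batches: &[usize]) {
    assert!(!hist_batches.is_empty());
}

// Each case becomes its own generated #[test] (scenario::case_1_incremental,
// scenario::case_2_replay), so both delivery shapes are always exercised.
#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
fn scenario(hist_batches: &[usize]) {
    run_scenario(hist_batches);
}
```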
diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs
index 6bc3266d2..9b31dc5a7 100644
--- a/src/machines/complete_workflow_state_machine.rs
+++ b/src/machines/complete_workflow_state_machine.rs
@@ -1,8 +1,7 @@
-use crate::machines::workflow_machines::WorkflowTrigger;
 use crate::{
     machines::{
-        workflow_machines::WorkflowMachines, AddCommand, CancellableCommand, WFCommand,
-        WFMachinesAdapter, WFMachinesError,
+        workflow_machines::MachineResponse, Cancellable, NewMachineWithCommand, WFMachinesAdapter,
+        WFMachinesError,
     },
     protos::temporal::api::{
         command::v1::{Command, CompleteWorkflowExecutionCommandAttributes},
@@ -11,8 +10,7 @@ use crate::{
     },
 };
 use rustfsm::{fsm, StateMachine, TransitionResult};
-use std::cell::RefCell;
-use std::{convert::TryFrom, rc::Rc};
+use std::convert::TryFrom;
 
 fsm! {
     pub(super)
@@ -31,17 +29,17 @@ fsm! {
 
 #[derive(Debug)]
 pub(super) enum CompleteWFCommand {
-    AddCommand(AddCommand),
+    AddCommand(Command),
 }
 
 /// Complete a workflow
 pub(super) fn complete_workflow(
     attribs: CompleteWorkflowExecutionCommandAttributes,
-) -> CancellableCommand {
+) -> NewMachineWithCommand<CompleteWorkflowMachine> {
     let (machine, add_cmd) = CompleteWorkflowMachine::new_scheduled(attribs);
-    CancellableCommand::Active {
-        command: add_cmd.command,
-        machine: Box::new(machine),
+    NewMachineWithCommand {
+        command: add_cmd,
+        machine,
     }
 }
 
@@ -49,7 +47,7 @@ impl CompleteWorkflowMachine {
     /// Create a new WF machine and schedule it
     pub(crate) fn new_scheduled(
         attribs: CompleteWorkflowExecutionCommandAttributes,
-    ) -> (Self, AddCommand) {
+    ) -> (Self, Command) {
         let mut s = Self {
             state: Created {}.into(),
             shared_state: attribs,
@@ -67,12 +65,17 @@ impl CompleteWorkflowMachine {
 }
 
 impl TryFrom<HistoryEvent> for CompleteWorkflowMachineEvents {
-    type Error = ();
+    type Error = WFMachinesError;
 
     fn try_from(e: HistoryEvent) -> Result<Self, Self::Error> {
         Ok(match EventType::from_i32(e.event_type) {
             Some(EventType::WorkflowExecutionCompleted) => Self::WorkflowExecutionCompleted,
-            _ => return Err(()),
+            _ => {
+                return Err(WFMachinesError::UnexpectedEvent(
+                    e,
+                    "Complete workflow machine does not handle this event",
+                ))
+            }
         })
     }
 }
@@ -101,7 +104,7 @@ impl Created {
             attributes: Some(dat.into()),
         };
         TransitionResult::commands::<_, CompleteWorkflowCommandCreated>(vec![
-            CompleteWFCommand::AddCommand(cmd.into()),
+            CompleteWFCommand::AddCommand(cmd),
         ])
     }
 }
@@ -127,7 +130,9 @@ impl WFMachinesAdapter for CompleteWorkflowMachine {
         _event: &HistoryEvent,
         _has_next_event: bool,
         _my_command: CompleteWFCommand,
-    ) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
+    ) -> Result<Vec<MachineResponse>, WFMachinesError> {
         Ok(vec![])
     }
 }
+
+impl Cancellable for CompleteWorkflowMachine {}
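Swapping the conversion error from `()` to `WFMachinesError` means a failed event conversion now carries the offending event and a reason, and the caller can propagate it with `?` instead of flattening everything into a generic "unexpected event". A self-contained sketch of the pattern, with stand-in types:

```rust
use std::convert::TryFrom;

#[derive(Debug)]
struct Event(u32);

#[derive(Debug)]
enum MachineError {
    // Keeps the event itself plus a static reason for diagnostics.
    UnexpectedEvent(Event, &'static str),
}

enum MachineEvents {
    Completed,
}

impl TryFrom<Event> for MachineEvents {
    type Error = MachineError;

    fn try_from(e: Event) -> Result<Self, Self::Error> {
        match e.0 {
            1 => Ok(MachineEvents::Completed),
            // A typed error preserves context that a bare `()` would discard.
            _ => Err(MachineError::UnexpectedEvent(
                e,
                "machine does not handle this event",
            )),
        }
    }
}

fn main() {
    assert!(MachineEvents::try_from(Event(2)).is_err());
}
```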
diff --git a/src/machines/mod.rs b/src/machines/mod.rs
index 35473e12c..c3ad06212 100644
--- a/src/machines/mod.rs
+++ b/src/machines/mod.rs
@@ -1,6 +1,6 @@
-#[allow(unused)]
 mod workflow_machines;
 
+// TODO: Move all these inside a submachines module
 #[allow(unused)]
 mod activity_state_machine;
 #[allow(unused)]
@@ -9,7 +9,6 @@ mod cancel_external_state_machine;
 mod cancel_workflow_state_machine;
 #[allow(unused)]
 mod child_workflow_state_machine;
-#[allow(unused)]
 mod complete_workflow_state_machine;
 #[allow(unused)]
 mod continue_as_new_workflow_state_machine;
@@ -23,13 +22,11 @@ mod mutable_side_effect_state_machine;
 mod side_effect_state_machine;
 #[allow(unused)]
 mod signal_external_state_machine;
-#[allow(unused)]
 mod timer_state_machine;
 #[allow(unused)]
 mod upsert_search_attributes_state_machine;
 #[allow(unused)]
 mod version_state_machine;
-#[allow(unused)]
 mod workflow_task_state_machine;
 
 #[cfg(test)]
@@ -38,13 +35,13 @@ pub(crate) mod test_help;
 
 pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};
 
 use crate::{
-    machines::workflow_machines::WorkflowTrigger,
+    machines::workflow_machines::MachineResponse,
     protos::{
         coresdk::{self, command::Variant, wf_activation_job},
         temporal::api::{
             command::v1::{
-                command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes,
-                StartTimerCommandAttributes,
+                command::Attributes, CancelTimerCommandAttributes, Command,
+                CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
             },
             enums::v1::CommandType,
             history::v1::{
@@ -58,7 +55,7 @@
 use prost::alloc::fmt::Formatter;
 use rustfsm::{MachineError, StateMachine};
 use std::{
     convert::{TryFrom, TryInto},
-    fmt::Debug,
+    fmt::{Debug, Display},
 };
 use tracing::Level;
 
@@ -68,7 +65,7 @@ pub(crate) type ProtoCommand = Command;
 /// drive it, start it, signal it, cancel it, etc.
 pub(crate) trait DrivenWorkflow: ActivationListener + Send {
     /// Start the workflow
-    fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec<WFCommand>;
+    fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes);
 
     /// Obtain any output from the workflow's recent execution(s). Because the lang sdk is
     /// responsible for calling workflow code as a result of receiving tasks from
@@ -93,13 +90,6 @@ pub(crate) trait ActivationListener {
     fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {}
 }
 
-/// The struct for [WFCommand::AddCommand]
-#[derive(Debug, derive_more::From)]
-pub(crate) struct AddCommand {
-    /// The protobuf command
-    pub(crate) command: Command,
-}
-
 /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next.
 /// EX: Create a new timer, complete the workflow, etc.
 #[derive(Debug, derive_more::From)]
@@ -107,6 +97,7 @@ pub enum WFCommand {
     /// Returned when we need to wait for the lang sdk to send us something
     NoCommandsFromLang,
     AddTimer(StartTimerCommandAttributes),
+    CancelTimer(CancelTimerCommandAttributes),
     CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes),
 }
 
@@ -124,6 +115,7 @@ impl TryFrom<Command> for WFCommand {
                 ..
             })) => match attrs {
                 Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)),
+                Attributes::CancelTimerCommandAttributes(s) => Ok(WFCommand::CancelTimer(s)),
                 Attributes::CompleteWorkflowExecutionCommandAttributes(c) => {
                     Ok(WFCommand::CompleteWorkflow(c))
                 }
@@ -141,21 +133,30 @@ trait TemporalStateMachine: CheckStateMachineInFinal + Send {
     fn name(&self) -> &str;
     fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError>;
 
-    /// Tell the state machine to handle some event. Returns a list of triggers that can be used
+    /// Tell the state machine to handle some event. Returns a list of responses that can be used
     /// to update the overall state of the workflow. EX: To issue outgoing WF activations.
     fn handle_event(
         &mut self,
         event: &HistoryEvent,
         has_next_event: bool,
-    ) -> Result<Vec<WorkflowTrigger>, WFMachinesError>;
+    ) -> Result<Vec<MachineResponse>, WFMachinesError>;
+
+    /// Attempt to cancel the command associated with this state machine, if it is cancellable
+    fn cancel(&mut self) -> Result<MachineResponse, WFMachinesError>;
+
Always + /// returns false for non-cancellable machines + fn was_cancelled_before_sent_to_server(&self) -> bool; } impl TemporalStateMachine for SM where - SM: StateMachine + CheckStateMachineInFinal + WFMachinesAdapter + Clone + Send, + SM: StateMachine + CheckStateMachineInFinal + WFMachinesAdapter + Cancellable + Clone + Send, ::Event: TryFrom, ::Event: TryFrom, + WFMachinesError: From<<::Event as TryFrom>::Error>, ::Command: Debug, + ::State: Display, ::Error: Into + 'static + Send + Sync, { fn name(&self) -> &str { @@ -167,7 +168,8 @@ where Level::DEBUG, msg = "handling command", ?command_type, - machine_name = %self.name() + machine_name = %self.name(), + state = %self.state() ); if let Ok(converted_command) = command_type.try_into() { match self.on_event_mut(converted_command) { @@ -186,32 +188,53 @@ where &mut self, event: &HistoryEvent, has_next_event: bool, - ) -> Result, WFMachinesError> { + ) -> Result, WFMachinesError> { event!( Level::DEBUG, msg = "handling event", %event, - machine_name = %self.name() + machine_name = %self.name(), + state = %self.state() ); - if let Ok(converted_event) = event.clone().try_into() { - match self.on_event_mut(converted_event) { - Ok(c) => { - event!(Level::DEBUG, msg = "Machine produced commands", ?c); - let mut triggers = vec![]; - for cmd in c { - triggers.extend(self.adapt_response(event, has_next_event, cmd)?); - } - Ok(triggers) + let converted_event = event.clone().try_into()?; + match self.on_event_mut(converted_event) { + Ok(c) => { + if !c.is_empty() { + event!(Level::DEBUG, msg = "Machine produced commands", ?c, state = %self.state()); } - Err(MachineError::InvalidTransition) => { - Err(WFMachinesError::UnexpectedEvent(event.clone())) + let mut machine_responses = vec![]; + for cmd in c { + machine_responses.extend(self.adapt_response(event, has_next_event, cmd)?); } - Err(MachineError::Underlying(e)) => Err(e.into()), + Ok(machine_responses) } - } else { - Err(WFMachinesError::UnexpectedEvent(event.clone())) + Err(MachineError::InvalidTransition) => { + Err(WFMachinesError::InvalidTransitionDuringEvent( + event.clone(), + format!( + "{} in state {} says the transition is invalid", + self.name(), + self.state() + ), + )) + } + Err(MachineError::Underlying(e)) => Err(e.into()), } } + + fn cancel(&mut self) -> Result { + let res = self.cancel(); + res.map_err(|e| match e { + MachineError::InvalidTransition => { + WFMachinesError::InvalidTransition("while attempting to cancel") + } + MachineError::Underlying(e) => e.into(), + }) + } + + fn was_cancelled_before_sent_to_server(&self) -> bool { + self.was_cancelled_before_sent_to_server() + } } /// Exists purely to allow generic implementation of `is_final_state` for all [StateMachine] @@ -241,30 +264,33 @@ trait WFMachinesAdapter: StateMachine { event: &HistoryEvent, has_next_event: bool, my_command: Self::Command, - ) -> Result, WFMachinesError>; + ) -> Result, WFMachinesError>; } -/// A command which can be cancelled, associated with the state machine that produced it -#[derive(Debug)] -#[allow(clippy::large_enum_variant)] -enum CancellableCommand { - // TODO: You'll be used soon, friend. - #[allow(dead_code)] - Cancelled, - Active { - /// The inner protobuf command, if None, command has been cancelled - command: ProtoCommand, - machine: Box, - }, -} +trait Cancellable: StateMachine { + /// Cancel the machine / the command represented by the machine. + /// + /// # Panics + /// * If the machine is not cancellable. It's a logic error on our part to call it on such + /// machines. 
+    fn cancel(&mut self) -> Result<MachineResponse, MachineError<Self::Error>> {
+        // It's a logic error on our part if this is ever called on a machine that can't actually
+        // be cancelled
+        panic!("Machine {} cannot be cancelled", self.name())
+    }
 
-impl CancellableCommand {
-    #[allow(dead_code)] // TODO: Use
-    pub(super) fn cancel(&mut self) {
-        *self = CancellableCommand::Cancelled;
+    /// Should return true if the command was cancelled before we sent it to the server
+    fn was_cancelled_before_sent_to_server(&self) -> bool {
+        false
     }
 }
 
+#[derive(Debug)]
+struct NewMachineWithCommand<T: TemporalStateMachine> {
+    command: ProtoCommand,
+    machine: T,
+}
+
 impl Debug for dyn TemporalStateMachine {
     fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
         f.write_str(self.name())
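The widened `where` clause above is what lets every concrete state machine be erased into a `Box<dyn TemporalStateMachine>`: any `SM` meeting the bounds picks up the object-safe trait via the blanket impl, and the new `State: Display` bound is what lets the current state name appear in log events. A compilable sketch of that erasure pattern, with stand-in traits:

```rust
use std::fmt::Display;

// Stand-in for rustfsm's StateMachine trait.
trait StateMachineLike {
    type State: Display;
    fn state(&self) -> &Self::State;
}

// Object-safe facade; the associated types are hidden behind it.
trait ErasedMachine {
    fn describe(&self) -> String;
}

// Blanket impl: any concrete machine whose state is Display-able
// automatically becomes usable as an ErasedMachine trait object.
impl<SM> ErasedMachine for SM
where
    SM: StateMachineLike,
{
    fn describe(&self) -> String {
        format!("in state {}", self.state())
    }
}

struct Timer;
impl StateMachineLike for Timer {
    type State = &'static str;
    fn state(&self) -> &Self::State {
        &"StartCommandCreated"
    }
}

fn main() {
    // Heterogeneous machines can now live in one collection.
    let machines: Vec<Box<dyn ErasedMachine>> = vec![Box::new(Timer)];
    assert_eq!(machines[0].describe(), "in state StartCommandCreated");
}
```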
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 92ff651f7..e81743a10 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -1,13 +1,12 @@
 use super::Result;
-use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
-use crate::protos::temporal::api::history::v1::{History, WorkflowTaskFailedEventAttributes};
 use crate::{
     machines::{workflow_machines::WorkflowMachines, ProtoCommand},
     protos::temporal::api::{
-        enums::v1::EventType,
+        enums::v1::{EventType, WorkflowTaskFailedCause},
         history::v1::{
-            history_event::Attributes, HistoryEvent, TimerStartedEventAttributes,
-            WorkflowExecutionStartedEventAttributes, WorkflowTaskCompletedEventAttributes,
+            history_event::Attributes, History, HistoryEvent, TimerStartedEventAttributes,
+            WorkflowExecutionCompletedEventAttributes, WorkflowExecutionStartedEventAttributes,
+            WorkflowTaskCompletedEventAttributes, WorkflowTaskFailedEventAttributes,
             WorkflowTaskScheduledEventAttributes, WorkflowTaskStartedEventAttributes,
         },
     },
@@ -25,6 +24,7 @@ pub struct TestHistoryBuilder {
     current_event_id: i64,
     workflow_task_scheduled_event_id: i64,
     previous_started_event_id: i64,
+    previous_task_completed_id: i64,
 }
 
 impl TestHistoryBuilder {
@@ -57,7 +57,7 @@ impl TestHistoryBuilder {
     ///   EVENT_TYPE_WORKFLOW_TASK_STARTED
     ///   EVENT_TYPE_WORKFLOW_TASK_COMPLETED
     /// ```
-    pub fn add_workflow_task(&mut self) {
+    pub fn add_full_wf_task(&mut self) {
         self.add_workflow_task_scheduled_and_started();
         self.add_workflow_task_completed();
     }
@@ -87,7 +87,16 @@
             scheduled_event_id: self.workflow_task_scheduled_event_id,
             ..Default::default()
         };
-        self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into());
+        let id = self.add_get_event_id(EventType::WorkflowTaskCompleted, Some(attrs.into()));
+        self.previous_task_completed_id = id;
+    }
+
+    pub fn add_workflow_execution_completed(&mut self) {
+        let attrs = WorkflowExecutionCompletedEventAttributes {
+            workflow_task_completed_event_id: self.previous_task_completed_id,
+            ..Default::default()
+        };
+        self.build_and_push_event(EventType::WorkflowExecutionCompleted, attrs.into());
     }
 
     pub fn add_workflow_task_failed(&mut self, cause: WorkflowTaskFailedCause, new_run_id: &str) {
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index 9522ae745..842326a4a 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -4,7 +4,7 @@ mod history_builder;
 mod workflow_driver;
 
 pub(crate) use history_builder::TestHistoryBuilder;
-pub(super) use workflow_driver::{CommandSender, TestWFCommand, TestWorkflowDriver};
+pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver};
 
 use crate::workflow::WorkflowConcurrencyManager;
 use crate::{
diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs
index 670408595..d6edc8506 100644
--- a/src/machines/test_help/workflow_driver.rs
+++ b/src/machines/test_help/workflow_driver.rs
@@ -1,3 +1,4 @@
+use crate::protos::temporal::api::command::v1::CancelTimerCommandAttributes;
 use crate::{
     machines::{ActivationListener, DrivenWorkflow, WFCommand},
     protos::{
@@ -23,9 +24,16 @@ use tracing::Level;
 /// over when commands are returned than a normal workflow would.
 ///
 /// It replaces "TestEntityTestListenerBase" in java which is pretty hard to follow.
+///
+/// It is important to understand that this driver doesn't work like a real workflow in the sense
+/// that nothing in it ever blocks, or ever should block. Every workflow task will run through the
+/// *entire* workflow, but any commands given to the sink after a `Waiting` command are simply
+/// ignored, allowing you to simulate blocking without ever actually blocking.
 pub(in crate::machines) struct TestWorkflowDriver<F> {
     wf_function: F,
     cache: Arc<TestWfDriverCache>,
+    /// Set to true if a workflow execution completed/failed/cancelled/etc has been issued
+    sent_final_execution: bool,
 }
 
 #[derive(Default, Debug)]
@@ -52,6 +60,7 @@ where
         Self {
             wf_function: workflow_fn,
             cache: Default::default(),
+            sent_final_execution: false,
         }
     }
 }
@@ -72,26 +81,40 @@ where
     F: Fn(CommandSender) -> Fut + Send + Sync,
     Fut: Future<Output = ()>,
 {
-    fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) -> Vec<WFCommand> {
+    fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) {
         event!(Level::DEBUG, msg = "Test WF driver start called");
-        vec![]
     }
 
     fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
+        // If we have already sent the command to complete the workflow, we don't want
+        // to re-run the workflow again.
+        // TODO: This would be better to solve by actually pausing the workflow properly rather
+        //   than doing the re-run-the-whole-thing-every-time deal.
+        if self.sent_final_execution {
+            return vec![];
+        }
+
         let (sender, receiver) = CommandSender::new(self.cache.clone());
         // Call the closure that produces the workflow future
         let wf_future = (self.wf_function)(sender);
 
+        // TODO: This is pointless right now -- either actually use async and suspend on awaits
+        //   or just remove it.
         // Create a tokio runtime to block on the future
         let rt = tokio::runtime::Runtime::new().unwrap();
         rt.block_on(wf_future);
 
         let cmds = receiver.into_iter();
 
-        let mut last_cmd = None;
+        let mut emit_these = vec![];
         for cmd in cmds {
             match cmd {
-                TestWFCommand::WFCommand(c) => last_cmd = Some(c),
+                TestWFCommand::WFCommand(c) => {
+                    if let WFCommand::CompleteWorkflow(_) = &c {
+                        self.sent_final_execution = true;
+                    }
+                    emit_these.push(c);
+                }
                 TestWFCommand::Waiting => {
                     // Ignore further commands since we're waiting on something
                     break;
@@ -99,14 +122,9 @@ where
             }
         }
 
-        event!(Level::DEBUG, msg = "Test wf driver emitting", ?last_cmd);
+        event!(Level::DEBUG, msg = "Test wf driver emitting", ?emit_these);
 
-        // Return only the last command, since that's what would've been yielded in a real wf
-        if let Some(c) = last_cmd {
-            vec![c]
-        } else {
-            vec![]
-        }
+        emit_these
     }
 
     fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {}
@@ -134,8 +152,10 @@ impl CommandSender {
         (Self { chan, twd_cache }, rx)
     }
 
-    /// Request to create a timer, returning future which resolves when the timer completes
-    pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future<Output = ()> + '_ {
+    /// Request to create a timer. Returns true if the timer has fired, false if it hasn't yet.
+    ///
+    /// If `do_wait` is true, issue a waiting command if the timer is not finished.
+    pub fn timer(&mut self, a: StartTimerCommandAttributes, do_wait: bool) -> bool {
         let finished = match self.twd_cache.unblocked_timers.entry(a.timer_id.clone()) {
             dashmap::mapref::entry::Entry::Occupied(existing) => *existing.get(),
             dashmap::mapref::entry::Entry::Vacant(v) => {
@@ -145,10 +165,17 @@
                 false
             }
         };
-        if !finished {
+        if !finished && do_wait {
             self.chan.send(TestWFCommand::Waiting).unwrap();
         }
-        futures::future::ready(())
+        finished
+    }
+
+    pub fn cancel_timer(&mut self, timer_id: &str) {
+        let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
+            timer_id: timer_id.to_string(),
+        });
+        self.chan.send(c.into()).unwrap();
     }
 
     pub fn send(&mut self, c: WFCommand) {
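The doc comment above is the key to the whole test driver: the workflow closure is re-executed from the top on every iteration, and a `Waiting` marker truncates whatever follows it, so "blocking" on a timer never actually blocks. A stand-alone sketch of that strategy, with stand-in command types:

```rust
#[derive(Debug, PartialEq)]
enum Cmd {
    StartTimer(&'static str),
    CompleteWorkflow,
}

enum Emitted {
    Cmd(Cmd),
    Waiting,
}

// The "workflow" runs fully every time; a not-yet-fired timer emits Waiting,
// and everything after that marker is dropped by the harness.
fn workflow(timer_fired: bool, out: &mut Vec<Emitted>) {
    out.push(Emitted::Cmd(Cmd::StartTimer("t1")));
    if !timer_fired {
        out.push(Emitted::Waiting);
    }
    out.push(Emitted::Cmd(Cmd::CompleteWorkflow));
}

fn drain(timer_fired: bool) -> Vec<Cmd> {
    let mut raw = Vec::new();
    workflow(timer_fired, &mut raw);
    let mut cmds = Vec::new();
    for e in raw {
        match e {
            Emitted::Cmd(c) => cmds.push(c),
            Emitted::Waiting => break, // simulate blocking without blocking
        }
    }
    cmds
}

fn main() {
    assert_eq!(drain(false), vec![Cmd::StartTimer("t1")]);
    assert_eq!(
        drain(true),
        vec![Cmd::StartTimer("t1"), Cmd::CompleteWorkflow]
    );
}
```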
diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs
index 75e169dc1..94b4c4415 100644
--- a/src/machines/timer_state_machine.rs
+++ b/src/machines/timer_state_machine.rs
@@ -2,25 +2,20 @@
 use crate::{
     machines::{
-        workflow_machines::{WFMachinesError, WorkflowMachines, WorkflowTrigger},
-        AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter,
+        workflow_machines::{MachineResponse, WFMachinesError},
+        Cancellable, NewMachineWithCommand, WFMachinesAdapter,
     },
     protos::{
-        coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation},
+        coresdk::{HistoryEventId, TimerCanceledTaskAttributes, TimerFiredTaskAttributes},
         temporal::api::{
-            command::v1::{
-                command::Attributes, CancelTimerCommandAttributes, Command,
-                StartTimerCommandAttributes,
-            },
+            command::v1::{CancelTimerCommandAttributes, Command, StartTimerCommandAttributes},
             enums::v1::{CommandType, EventType},
-            history::v1::{history_event, HistoryEvent, TimerCanceledEventAttributes},
+            history::v1::{history_event, HistoryEvent, TimerFiredEventAttributes},
         },
     },
 };
-use rustfsm::{fsm, StateMachine, TransitionResult};
-use std::sync::Arc;
-use std::{cell::RefCell, convert::TryFrom, rc::Rc, sync::atomic::Ordering};
-use tracing::Level;
+use rustfsm::{fsm, MachineError, StateMachine, TransitionResult};
+use std::convert::TryFrom;
 
 fsm! {
     pub(super) name TimerMachine;
@@ -28,48 +23,55 @@ fsm! {
     error WFMachinesError;
     shared_state SharedState;
 
-    Created --(Schedule, shared on_schedule) --> StartCommandCreated;
+    Created --(Schedule, on_schedule) --> StartCommandCreated;
 
     StartCommandCreated --(CommandStartTimer) --> StartCommandCreated;
     StartCommandCreated --(TimerStarted(HistoryEventId), on_timer_started) --> StartCommandRecorded;
     StartCommandCreated --(Cancel, shared on_cancel) --> Canceled;
 
-    StartCommandRecorded --(TimerFired(HistoryEvent), on_timer_fired) --> Fired;
+    StartCommandRecorded --(TimerFired(TimerFiredEventAttributes), shared on_timer_fired) --> Fired;
     StartCommandRecorded --(Cancel, shared on_cancel) --> CancelTimerCommandCreated;
 
     CancelTimerCommandCreated --(Cancel) --> CancelTimerCommandCreated;
     CancelTimerCommandCreated
-        --(CommandCancelTimer, shared on_command_cancel_timer) --> CancelTimerCommandSent;
+        --(CommandCancelTimer, on_command_cancel_timer) --> CancelTimerCommandSent;
 
     CancelTimerCommandSent --(TimerCanceled) --> Canceled;
 }
 
 #[derive(Debug)]
 pub(super) enum TimerMachineCommand {
-    AddCommand(AddCommand),
-    Complete(HistoryEvent),
+    Complete,
+    Canceled,
+    IssueCancelCmd(Command),
+}
+
+#[derive(Default, Clone)]
+pub(super) struct SharedState {
+    attrs: StartTimerCommandAttributes,
+    cancelled_before_sent: bool,
 }
 
-/// Creates a new, scheduled, timer as a [CancellableCommand]
-pub(super) fn new_timer(attribs: StartTimerCommandAttributes) -> CancellableCommand {
+/// Creates a new, scheduled, timer as a [NewMachineWithCommand]
+pub(super) fn new_timer(
+    attribs: StartTimerCommandAttributes,
+) -> NewMachineWithCommand<TimerMachine> {
     let (timer, add_cmd) = TimerMachine::new_scheduled(attribs);
-    CancellableCommand::Active {
-        command: add_cmd.command,
-        machine: Box::new(timer),
+    NewMachineWithCommand {
+        command: add_cmd,
+        machine: timer,
     }
 }
 
 impl TimerMachine {
     /// Create a new timer and immediately schedule it
-    pub(crate) fn new_scheduled(attribs: StartTimerCommandAttributes) -> (Self, AddCommand) {
+    pub(crate) fn new_scheduled(attribs: StartTimerCommandAttributes) -> (Self, Command) {
         let mut s = Self::new(attribs);
-        let cmd = match s
-            .on_event_mut(TimerMachineEvents::Schedule)
-            .expect("Scheduling timers doesn't fail")
-            .pop()
-        {
-            Some(TimerMachineCommand::AddCommand(c)) => c,
-            _ => panic!("Timer on_schedule must produce command"),
+        s.on_event_mut(TimerMachineEvents::Schedule)
+            .expect("Scheduling timers doesn't fail");
+        let cmd = Command {
+            command_type: CommandType::StartTimer as i32,
+            attributes: Some(s.shared_state().attrs.clone().into()),
         };
         (s, cmd)
     }
@@ -78,21 +80,38 @@ impl TimerMachine {
         Self {
             state: Created {}.into(),
             shared_state: SharedState {
-                timer_attributes: attribs,
+                attrs: attribs,
+                cancelled_before_sent: false,
             },
         }
     }
 }
 
 impl TryFrom<HistoryEvent> for TimerMachineEvents {
-    type Error = ();
+    type Error = WFMachinesError;
 
     fn try_from(e: HistoryEvent) -> Result<Self, Self::Error> {
         Ok(match EventType::from_i32(e.event_type) {
             Some(EventType::TimerStarted) => Self::TimerStarted(e.event_id),
             Some(EventType::TimerCanceled) => Self::TimerCanceled,
-            Some(EventType::TimerFired) => Self::TimerFired(e),
-            _ => return Err(()),
+            Some(EventType::TimerFired) => {
+                if let Some(history_event::Attributes::TimerFiredEventAttributes(attrs)) =
+                    e.attributes
+                {
+                    Self::TimerFired(attrs)
+                } else {
+                    return Err(WFMachinesError::MalformedEvent(
+                        e,
+                        "Timer fired attribs were unset".to_string(),
+                    ));
+                }
+            }
+            _ => {
+                return Err(WFMachinesError::UnexpectedEvent(
+                    e,
+                    "Timer machine does not handle this event",
+                ))
+            }
         })
     }
 }
@@ -109,41 +128,12 @@ impl TryFrom<CommandType> for TimerMachineEvents {
     }
 }
 
-#[derive(Default, Clone)]
-pub(super) struct SharedState {
-    timer_attributes: StartTimerCommandAttributes,
-}
-
-impl SharedState {
-    fn into_timer_canceled_event_command(self) -> TimerMachineCommand {
-        let attrs = TimerCanceledEventAttributes {
-            identity: "workflow".to_string(),
-            timer_id: self.timer_attributes.timer_id,
-            ..Default::default()
-        };
-        let event = HistoryEvent {
-            event_type: EventType::TimerCanceled as i32,
-            attributes: Some(history_event::Attributes::TimerCanceledEventAttributes(
-                attrs,
-            )),
-            ..Default::default()
-        };
-        TimerMachineCommand::Complete(event)
-    }
-}
-
 #[derive(Default, Clone)]
 pub(super) struct Created {}
 
 impl Created {
-    pub(super) fn on_schedule(self, dat: SharedState) -> TimerMachineTransition {
-        let cmd = Command {
-            command_type: CommandType::StartTimer as i32,
-            attributes: Some(dat.timer_attributes.into()),
-        };
-        TimerMachineTransition::commands::<_, StartCommandCreated>(vec![
-            TimerMachineCommand::AddCommand(cmd.into()),
-        ])
+    pub(super) fn on_schedule(self) -> TimerMachineTransition {
+        TimerMachineTransition::default::<StartCommandCreated>()
     }
 }
 
@@ -151,10 +141,10 @@ impl Created {
 pub(super) struct CancelTimerCommandCreated {}
 
 impl CancelTimerCommandCreated {
-    pub(super) fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition {
+    pub(super) fn on_command_cancel_timer(self) -> TimerMachineTransition {
         TimerMachineTransition::ok(
-            vec![dat.into_timer_canceled_event_command()],
-            Canceled::default(),
+            vec![TimerMachineCommand::Canceled],
+            CancelTimerCommandSent::default(),
         )
     }
 }
@@ -182,12 +172,14 @@ impl StartCommandCreated {
         // TODO: Java recorded an initial event ID, but it seemingly was never used.
         TimerMachineTransition::default::<StartCommandRecorded>()
     }
-    pub(super) fn on_cancel(mut self, dat: SharedState) -> TimerMachineTransition {
-        // Cancel the initial command - which just sets a "canceled" flag in a wrapper of a
-        // proto command. TODO: Does this make any sense? - no - propagate up
-        TimerMachineTransition::ok(
-            vec![dat.into_timer_canceled_event_command()],
+    pub(super) fn on_cancel(self, dat: SharedState) -> TimerMachineTransition {
+        TimerMachineTransition::ok_shared(
+            vec![TimerMachineCommand::Canceled],
             Canceled::default(),
+            SharedState {
+                cancelled_before_sent: true,
+                ..dat
+            },
         )
     }
 }
@@ -196,21 +188,33 @@
 pub(super) struct StartCommandRecorded {}
 
 impl StartCommandRecorded {
-    pub(super) fn on_timer_fired(self, event: HistoryEvent) -> TimerMachineTransition {
-        TimerMachineTransition::ok(vec![TimerMachineCommand::Complete(event)], Fired::default())
+    pub(super) fn on_timer_fired(
+        self,
+        dat: SharedState,
+        attrs: TimerFiredEventAttributes,
+    ) -> TimerMachineTransition {
+        if dat.attrs.timer_id != attrs.timer_id {
+            TimerMachineTransition::Err(WFMachinesError::MalformedEventDetail(format!(
+                "Timer fired event did not have expected timer id {}!",
+                dat.attrs.timer_id
+            )))
+        } else {
+            TimerMachineTransition::ok(vec![TimerMachineCommand::Complete], Fired::default())
+        }
     }
+
     pub(super) fn on_cancel(self, dat: SharedState) -> TimerMachineTransition {
         let cmd = Command {
             command_type: CommandType::CancelTimer as i32,
             attributes: Some(
                 CancelTimerCommandAttributes {
-                    timer_id: dat.timer_attributes.timer_id,
+                    timer_id: dat.attrs.timer_id,
                 }
                 .into(),
             ),
         };
         TimerMachineTransition::ok(
-            vec![TimerMachineCommand::AddCommand(cmd.into())],
+            vec![TimerMachineCommand::IssueCancelCmd(cmd)],
             CancelTimerCommandCreated::default(),
         )
     }
 }
@@ -222,15 +226,33 @@ impl WFMachinesAdapter for TimerMachine {
         _event: &HistoryEvent,
         _has_next_event: bool,
         my_command: TimerMachineCommand,
-    ) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
-        match my_command {
+    ) -> Result<Vec<MachineResponse>, WFMachinesError> {
+        Ok(match my_command {
             // Fire the completion
-            TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes {
-                timer_id: self.shared_state.timer_attributes.timer_id.clone(),
+            TimerMachineCommand::Complete => vec![TimerFiredTaskAttributes {
+                timer_id: self.shared_state.attrs.timer_id.clone(),
             }
-            .into()]),
-            TimerMachineCommand::AddCommand(_) => unreachable!(),
-        }
+            .into()],
+            TimerMachineCommand::Canceled => vec![TimerCanceledTaskAttributes {
+                timer_id: self.shared_state.attrs.timer_id.clone(),
+            }
+            .into()],
+            TimerMachineCommand::IssueCancelCmd(c) => vec![MachineResponse::IssueNewCommand(c)],
+        })
    }
}

+impl Cancellable for TimerMachine {
+    fn cancel(&mut self) -> Result<MachineResponse, MachineError<Self::Error>> {
+        Ok(match self.on_event_mut(TimerMachineEvents::Cancel)?.pop() {
+            Some(TimerMachineCommand::IssueCancelCmd(cmd)) => MachineResponse::IssueNewCommand(cmd),
+            Some(TimerMachineCommand::Canceled) => MachineResponse::NoOp,
+            x => panic!("Invalid cancel event response {:?}", x),
+        })
+    }
+
+    fn was_cancelled_before_sent_to_server(&self) -> bool {
+        self.shared_state().cancelled_before_sent
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
     use crate::{
         machines::{
-            complete_workflow_state_machine::complete_workflow,
-            test_help::{CommandSender, TestHistoryBuilder, TestWFCommand, TestWorkflowDriver},
+            test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver},
             workflow_machines::WorkflowMachines,
-            DrivenWorkflow, WFCommand,
-        },
-        protos::temporal::api::{
-            command::v1::CompleteWorkflowExecutionCommandAttributes,
-            history::v1::{
-                TimerFiredEventAttributes, WorkflowExecutionCanceledEventAttributes,
-                WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes,
-            },
-        },
+        },
+        protos::temporal::api::command::v1::CompleteWorkflowExecutionCommandAttributes,
+        test_help::canned_histories,
     };
-    use futures::{channel::mpsc::Sender, FutureExt, SinkExt};
     use rstest::{fixture, rstest};
-    use std::sync::Arc;
-    use std::{error::Error, time::Duration};
-    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
+    use std::time::Duration;
+    use tracing::Level;
 
     #[fixture]
     fn fire_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) {
+        crate::core_tracing::tracing_init();
         /*
-           1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
-           2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           3: EVENT_TYPE_WORKFLOW_TASK_STARTED
-           4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
-           5: EVENT_TYPE_TIMER_STARTED
-           6: EVENT_TYPE_TIMER_FIRED
-           7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-           8: EVENT_TYPE_WORKFLOW_TASK_STARTED
-
-           We have two versions of this test, one which processes the history in two calls,
-           and one which replays all of it in one go. Both versions must produce the same
-           two activations.
+           We have two versions of this test, one which processes the history in two calls, and one
+           which replays all of it in one go. Both versions must produce the same two activations.
+           However, the former will iterate the machines three times and the latter will iterate
+           them twice.
+
+           There are two workflow tasks, so it seems we should iterate two times, but the reason
+           for the extra iteration in the incremental version is that we need to "wait" for the
+           timer to fire. In the all-in-one-go test, the timer is created and resolved in the same
+           task, hence no extra loop.
         */
         let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
             let timer = StartTimerCommandAttributes {
-                timer_id: "Sometimer".to_string(),
+                timer_id: "timer1".to_string(),
                 start_to_fire_timeout: Some(Duration::from_secs(5).into()),
             };
-            command_sink.timer(timer).await;
+            command_sink.timer(timer, true);
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             command_sink.send(complete.into());
         });
 
-        let mut t = TestHistoryBuilder::default();
-        let mut state_machines =
+        let t = canned_histories::single_timer("timer1");
+        let state_machines =
             WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
 
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: "timer1".to_string(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
         assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap());
         (t, state_machines)
     }
 
     #[rstest]
     fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "inc");
+        let s = span!(Level::DEBUG, "Test start", t = "happy_inc");
         let _enter = s.enter();
 
         let (t, mut state_machines) = fire_happy_hist;
 
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, Some(1))
             .unwrap();
-        dbg!(&commands);
-        dbg!(state_machines.get_wf_activation());
+        state_machines.get_wf_activation();
         assert_eq!(commands.len(), 1);
         assert_eq!(commands[0].command_type, CommandType::StartTimer as i32);
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, Some(2))
             .unwrap();
-        dbg!(state_machines.get_wf_activation());
+        state_machines.get_wf_activation();
         assert_eq!(commands.len(), 1);
         assert_eq!(
             commands[0].command_type,
             CommandType::CompleteWorkflowExecution as i32
         );
     }
 
     #[rstest]
     fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "full");
+        let s = span!(Level::DEBUG, "Test start", t = "happy_full");
         let _enter = s.enter();
         let (t, mut state_machines) = fire_happy_hist;
 
         let commands = t
-            .handle_workflow_task_take_cmds(&mut state_machines, Some(2))
+            .handle_workflow_task_take_cmds(&mut state_machines, None)
             .unwrap();
         assert_eq!(commands.len(), 1);
         assert_eq!(
             commands[0].command_type,
             CommandType::CompleteWorkflowExecution as i32
         );
     }
+
+    #[test]
+    fn mismatched_timer_ids_errors() {
+        let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
+            let timer = StartTimerCommandAttributes {
+                timer_id: "realid".to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(5).into()),
+            };
+            command_sink.timer(timer, true);
+        });
+
+        let t = canned_histories::single_timer("badid");
+        let mut state_machines =
+            WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
+
+        assert!(t
+            .handle_workflow_task_take_cmds(&mut state_machines, None)
+            .unwrap_err()
+            .to_string()
+            .contains("Timer fired event did not have expected timer id realid!"))
+    }
+
+    #[fixture]
+    fn cancellation_setup() -> (TestHistoryBuilder, WorkflowMachines) {
+        crate::core_tracing::tracing_init();
+
+        let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
+            let _cancel_this = cmd_sink.timer(
+                StartTimerCommandAttributes {
+                    timer_id: "cancel_timer".to_string(),
+                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+                },
+                false,
+            );
+            cmd_sink.timer(
+                StartTimerCommandAttributes {
+                    timer_id: "wait_timer".to_string(),
+                    start_to_fire_timeout: Some(Duration::from_secs(5).into()),
+                },
+                true,
+            );
+            // Cancel the first timer after having waited on the second
+            cmd_sink.cancel_timer("cancel_timer");
+
+            let complete = CompleteWorkflowExecutionCommandAttributes::default();
+            cmd_sink.send(complete.into());
+        });
+
+        let t = canned_histories::cancel_timer("wait_timer", "cancel_timer");
+        let state_machines =
+            WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
+        (t, state_machines)
+    }
+
+    #[rstest]
+    fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+        let s = span!(Level::DEBUG, "Test start", t = "cancel_inc");
+        let _enter = s.enter();
+
+        let (t, mut state_machines) = cancellation_setup;
+        let commands = t
+            .handle_workflow_task_take_cmds(&mut state_machines, Some(1))
+            .unwrap();
+        assert_eq!(commands.len(), 2);
+        assert_eq!(commands[0].command_type, CommandType::StartTimer as i32);
+        assert_eq!(commands[1].command_type, CommandType::StartTimer as i32);
+        let commands = t
+            .handle_workflow_task_take_cmds(&mut state_machines, Some(2))
+            .unwrap();
+        assert_eq!(commands.len(), 2);
+        assert_eq!(commands[0].command_type, CommandType::CancelTimer as i32);
+        assert_eq!(
+            commands[1].command_type,
+            CommandType::CompleteWorkflowExecution as i32
+        );
+        let commands = t
+            .handle_workflow_task_take_cmds(&mut state_machines, None)
+            .unwrap();
+        // There should be no commands - the wf completed at the same time the timer was cancelled
+        assert_eq!(commands.len(), 0);
+    }
+
+    #[rstest]
+    fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+        let s = span!(Level::DEBUG, "Test start", t = "cancel_full");
+        let _enter = s.enter();
+
+        let (t, mut state_machines) = cancellation_setup;
+        let commands = t
+            .handle_workflow_task_take_cmds(&mut state_machines, None)
+            .unwrap();
+        // There should be no commands - the wf completed at the same time the timer was cancelled
+        assert_eq!(commands.len(), 0);
+    }
+
+    #[test]
+    fn cancel_before_sent_to_server() {
+        crate::core_tracing::tracing_init();
+        let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
+            cmd_sink.timer(
+                StartTimerCommandAttributes {
+                    timer_id: "cancel_timer".to_string(),
+                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+                },
+                false,
+            );
+            // Immediately cancel the timer
+            cmd_sink.cancel_timer("cancel_timer");
+
+            let complete = CompleteWorkflowExecutionCommandAttributes::default();
+            cmd_sink.send(complete.into());
+        });
+
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_full_wf_task();
+        t.add_workflow_execution_completed();
+
+        let mut state_machines =
+            WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
+
+        let commands = t
+            .handle_workflow_task_take_cmds(&mut state_machines, None)
+            .unwrap();
+        assert_eq!(commands.len(), 0);
+    }
 }
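The `cancel_before_sent_to_server` test exercises the new `cancelled_before_sent` flag: a timer started and cancelled within the same workflow task never produces a server-bound command at all. A minimal sketch of that bookkeeping, with stand-in types:

```rust
#[derive(Debug, PartialEq)]
enum State {
    StartCommandCreated,
    Canceled,
}

struct TimerMachine {
    state: State,
    cancelled_before_sent: bool,
}

impl TimerMachine {
    fn new_scheduled() -> Self {
        TimerMachine {
            state: State::StartCommandCreated,
            cancelled_before_sent: false,
        }
    }

    // Cancelling before the StartTimer command was ever sent just flips a flag;
    // no CancelTimer command needs to go to the server at all.
    fn cancel(&mut self) {
        if self.state == State::StartCommandCreated {
            self.cancelled_before_sent = true;
        }
        self.state = State::Canceled;
    }
}

fn main() {
    let mut m = TimerMachine::new_scheduled();
    m.cancel();
    // The pending StartTimer command would be filtered out before the server sees it.
    assert!(m.cancelled_before_sent);
}
```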
diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs
index 87b2f20d3..362a7745d 100644
--- a/src/machines/workflow_machines.rs
+++ b/src/machines/workflow_machines.rs
@@ -1,39 +1,28 @@
-use crate::machines::workflow_machines::WFMachinesError::MalformedEvent;
 use crate::{
     machines::{
         complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer,
-        workflow_task_state_machine::WorkflowTaskMachine, ActivationListener, CancellableCommand,
-        DrivenWorkflow, ProtoCommand, TemporalStateMachine, WFCommand,
+        workflow_task_state_machine::WorkflowTaskMachine, DrivenWorkflow, NewMachineWithCommand,
+        ProtoCommand, TemporalStateMachine, WFCommand,
     },
-    protos::coresdk::WfActivationJob,
     protos::{
         coresdk::{
-            wf_activation_job, wf_activation_job::Attributes::RandomSeedUpdated,
-            RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, WfActivation,
+            wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes,
+            WfActivation,
         },
         temporal::api::{
-            command::v1::StartTimerCommandAttributes,
-            common::v1::WorkflowExecution,
             enums::v1::{CommandType, EventType},
             history::v1::{history_event, HistoryEvent},
         },
     },
 };
-use futures::Future;
-use rustfsm::StateMachine;
+use slotmap::SlotMap;
 use std::{
-    borrow::BorrowMut,
-    cell::RefCell,
-    collections::hash_map::DefaultHasher,
-    collections::{HashMap, HashSet, VecDeque},
-    hash::Hash,
-    hash::Hasher,
-    ops::DerefMut,
-    sync::{atomic::AtomicBool, Arc},
+    borrow::{Borrow, BorrowMut},
+    collections::{hash_map::DefaultHasher, HashMap, VecDeque},
+    hash::{Hash, Hasher},
     time::SystemTime,
 };
 use tracing::Level;
-use uuid::Uuid;
 
 type Result<T, E = WFMachinesError> = std::result::Result<T, E>;
 
@@ -57,17 +46,25 @@ pub(crate) struct WorkflowMachines {
     /// The current workflow time if it has been established
     current_wf_time: Option<SystemTime>,
 
+    all_machines: SlotMap<MachineKey, Box<dyn TemporalStateMachine>>,
+
     /// A mapping for accessing all the machines, where the key is the id of the initiating event
     /// for that machine.
-    machines_by_id: HashMap<i64, Box<dyn TemporalStateMachine>>,
+    machines_by_event_id: HashMap<i64, MachineKey>,
+
+    /// Maps timer ids as created by workflow authors to their associated machines
+    /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum.
+    timer_id_to_machine: HashMap<String, MachineKey>,
 
-    /// Queued commands which have been produced by machines and await processing
-    commands: VecDeque<CancellableCommand>,
-    /// Commands generated by the currently processing workflow task.
+    /// Queued commands which have been produced by machines and await processing / being sent to
+    /// the server.
+    commands: VecDeque<CommandAndMachine>,
+    /// Commands generated by the currently processing workflow task, which will eventually be
+    /// transferred to `commands`
     ///
     /// Old note: It is a queue as commands can be added (due to marker based commands) while
     /// iterating over already added commands.
-    current_wf_task_commands: VecDeque<CancellableCommand>,
+    current_wf_task_commands: VecDeque<CommandAndMachine>,
 
     /// Outgoing activation jobs that need to be sent to the lang sdk
     outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Attributes>,
@@ -75,11 +72,20 @@
     drive_me: Box<dyn DrivenWorkflow>,
 }
 
+slotmap::new_key_type! { struct MachineKey; }
+#[derive(Debug)]
+struct CommandAndMachine {
+    command: ProtoCommand,
+    machine: MachineKey,
+}
+
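Moving ownership of all machines into a single `SlotMap` lets the secondary lookups (`machines_by_event_id`, `timer_id_to_machine`) hold cheap `Copy` keys instead of boxed trait objects, sidestepping the remove/reinsert borrow gymnastics the old `HashMap<i64, Box<dyn TemporalStateMachine>>` required. A small sketch of the layout, assuming the `slotmap` crate:

```rust
use slotmap::SlotMap;
use std::collections::HashMap;

slotmap::new_key_type! { struct MachineKey; }

#[derive(Default)]
struct Machines {
    // Stand-in value type; the real map holds Box<dyn TemporalStateMachine>.
    all: SlotMap<MachineKey, String>,
    by_event_id: HashMap<i64, MachineKey>,
    by_timer_id: HashMap<String, MachineKey>,
}

fn main() {
    let mut m = Machines::default();
    let key = m.all.insert("timer machine".to_string());
    // Multiple cheap indexes can point at the same owned machine.
    m.by_event_id.insert(5, key);
    m.by_timer_id.insert("timer1".to_string(), key);
    assert_eq!(m.all[m.by_event_id[&5]], "timer machine");
}
```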
-    /// Create a new timer for this workflow with the provided attributes and sender. The sender
-    /// is sent `true` when the timer completes.
-    ///
-    /// Returns the command and a future that will resolve when the timer completes
-    pub(super) fn new_timer(&mut self, attribs: StartTimerCommandAttributes) -> CancellableCommand {
-        new_timer(attribs)
-    }
-
     /// Returns the id of the last seen WorkflowTaskStarted event
     pub(crate) fn get_last_started_event_id(&self) -> i64 {
         self.current_started_event_id
@@ -154,8 +168,9 @@ impl WorkflowMachines {
             self.handle_command_event(event)?;
             return Ok(());
         }
-        let event_type = EventType::from_i32(event.event_type)
-            .ok_or_else(|| WFMachinesError::UnexpectedEvent(event.clone()))?;
+        let event_type = EventType::from_i32(event.event_type).ok_or_else(|| {
+            WFMachinesError::UnexpectedEvent(event.clone(), "The event type is unknown")
+        })?;

         if self.replaying
             && self.current_started_event_id >= self.previous_started_event_id
@@ -169,9 +184,9 @@ impl WorkflowMachines {
             Some(initial_cmd_id) => {
                 // We remove the machine while it handles events, then return it, to avoid
                 // borrowing from ourselves mutably.
-                let mut maybe_machine = self.machines_by_id.remove(&initial_cmd_id);
-                if let Some(mut sm) = maybe_machine.as_mut() {
-                    self.submachine_handle_event((*sm).borrow_mut(), event, has_next_event)?;
+                let maybe_machine = self.machines_by_event_id.remove(&initial_cmd_id);
+                if let Some(sm) = maybe_machine {
+                    self.submachine_handle_event(sm, event, has_next_event)?;
                 } else {
                     event!(
                         Level::ERROR,
@@ -183,8 +198,8 @@ impl WorkflowMachines {

                 // Restore machine if not in its final state
                 if let Some(sm) = maybe_machine {
-                    if !sm.is_final_state() {
-                        self.machines_by_id.insert(initial_cmd_id, sm);
+                    if !self.machine(sm).is_final_state() {
+                        self.machines_by_event_id.insert(initial_cmd_id, sm);
                     }
                 }
             }
@@ -196,7 +211,11 @@ impl WorkflowMachines {

     /// Called when we want to run the event loop because a workflow task started event has
     /// triggered
-    pub(super) fn task_started(&mut self, task_started_event_id: i64, time: SystemTime) {
+    pub(super) fn task_started(
+        &mut self,
+        task_started_event_id: i64,
+        time: SystemTime,
+    ) -> Result<()> {
         let s = span!(Level::DEBUG, "Task started trigger");
         let _enter = s.enter();

@@ -221,12 +240,13 @@ impl WorkflowMachines {
         self.current_started_event_id = task_started_event_id;
         self.set_current_time(time);

-        self.iterate_machines();
+        self.iterate_machines()?;
+        Ok(())
     }

-    /// A command event is an event which is generated from a command emitted by a past decision.
-    /// Each command has a correspondent event. For example ScheduleActivityTaskCommand is recorded
-    /// to the history as ActivityTaskScheduledEvent.
+    /// A command event is an event which is generated from a command emitted as a result of
+    /// performing a workflow task. Each command has a corresponding event. For example
+    /// ScheduleActivityTaskCommand is recorded to the history as ActivityTaskScheduledEvent.
     ///
     /// Command events always follow WorkflowTaskCompletedEvent.
     ///
@@ -238,13 +258,14 @@ impl WorkflowMachines {
         // if (handleLocalActivityMarker(event)) {
         //     return;
         // }
+        event!(Level::DEBUG, msg = "handling command event", current_commands = ?self.commands);
         let consumed_cmd = loop {
             // handleVersionMarker can skip a marker event if the getVersion call was removed.
             // In this case we don't want to consume a command -- we will need to replace it back
             // to the front when implementing, or something better
             let maybe_command = self.commands.pop_front();
-            let mut command = if let Some(c) = maybe_command {
+            let command = if let Some(c) = maybe_command {
                 c
             } else {
                 return Err(WFMachinesError::NoCommandScheduledForEvent(event.clone()));
             };

             // Feed the machine the event
             let mut break_later = false;
-            if let CancellableCommand::Active {
-                ref mut machine, ..
-            } = &mut command
-            {
-                self.submachine_handle_event((*machine).borrow_mut(), event, true)?;
+            let canceled_before_sent = self
+                .machine(command.machine)
+                .was_cancelled_before_sent_to_server();
+
+            if !canceled_before_sent {
+                self.submachine_handle_event(command.machine, event, true)?;
+            }

-                // TODO: Handle invalid event errors
-                //  * More special handling for version machine - see java
-                //  * Command/machine supposed to have cancelled itself
+            // TODO:
+            //  * More special handling for version machine - see java
+            //  * Commands cancelled this iteration are allowed to not match the event?

+            if !canceled_before_sent {
                 break_later = true;
             }
@@ -272,10 +296,9 @@ impl WorkflowMachines {

         // TODO: validate command

-        if let CancellableCommand::Active { machine, .. } = consumed_cmd {
-            if !machine.is_final_state() {
-                self.machines_by_id.insert(event.event_id, machine);
-            }
+        if !self.machine(consumed_cmd.machine).is_final_state() {
+            self.machines_by_event_id
+                .insert(event.event_id, consumed_cmd.machine);
         }

         Ok(())
     }
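Because every command that actually reaches the server later comes back as its corresponding history event, `handle_command_event` can pop the oldest queued command and feed the event to that command's machine. A small illustration of the command-to-event correspondence, using only enum variants visible in this diff (the helper `initial_event_for` is hypothetical, not part of the codebase):

use temporal_sdk_core::protos::temporal::api::enums::v1::{CommandType, EventType};

// Hypothetical helper mapping a command to the history event type it is
// expected to produce; only pairs appearing in this diff are filled in
fn initial_event_for(cmd: CommandType) -> Option<EventType> {
    match cmd {
        CommandType::StartTimer => Some(EventType::TimerStarted),
        CommandType::CancelTimer => Some(EventType::TimerCanceled),
        CommandType::CompleteWorkflowExecution => Some(EventType::WorkflowExecutionCompleted),
        _ => None,
    }
}

fn main() {
    assert_eq!(
        initial_event_for(CommandType::StartTimer),
        Some(EventType::TimerStarted)
    );
}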
@@ -309,8 +332,7 @@ impl WorkflowMachines {
                     }
                     .into(),
                 );
-                let results = self.drive_me.start(attrs.clone());
-                self.handle_driven_results(results);
+                self.drive_me.start(attrs.clone());
             } else {
                 return Err(WFMachinesError::MalformedEvent(
                     event.clone(),
@@ -320,14 +342,10 @@ impl WorkflowMachines {
                 }
             }
             Some(EventType::WorkflowTaskScheduled) => {
-                let mut wf_task_sm = WorkflowTaskMachine::new(self.workflow_task_started_event_id);
-                self.submachine_handle_event(
-                    &mut wf_task_sm as &mut dyn TemporalStateMachine,
-                    event,
-                    has_next_event,
-                )?;
-                self.machines_by_id
-                    .insert(event.event_id, Box::new(wf_task_sm));
+                let wf_task_sm = WorkflowTaskMachine::new(self.workflow_task_started_event_id);
+                let key = self.all_machines.insert(Box::new(wf_task_sm));
+                self.submachine_handle_event(key, event, has_next_event)?;
+                self.machines_by_event_id.insert(event.event_id, key);
             }
             Some(EventType::WorkflowExecutionSignaled) => {
                 // TODO: Signal callbacks
             }
             Some(EventType::WorkflowExecutionCancelRequested) => {
                 // TODO: Cancel callbacks
             }
-            _ => return Err(WFMachinesError::UnexpectedEvent(event.clone())),
+            _ => {
+                return Err(WFMachinesError::UnexpectedEvent(
+                    event.clone(),
+                    "The event is not a non-stateful event, but we tried to handle it as one",
+                ))
+            }
         }
         Ok(())
     }

-    /// Fetches commands ready for processing from the state machines
+    /// Fetches commands which are ready for processing from the state machines, generally to be
+    /// sent off to the server. They are not removed from the internal queue; that happens when
+    /// corresponding history events from the server are being handled.
     pub(crate) fn get_commands(&mut self) -> Vec<ProtoCommand> {
         self.commands
             .iter()
             .filter_map(|c| {
-                if let CancellableCommand::Active { command, .. } = c {
-                    Some(command.clone())
+                if !self.machine(c.machine).is_final_state() {
+                    Some(c.command.clone())
                 } else {
                     None
                 }
             })
@@ -355,7 +380,8 @@ impl WorkflowMachines {
     }

     /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock
-    /// timer, etc.
+    /// timer, etc. This does *not* cause any advancement of the state machines; it merely drains
+    /// from the outgoing queue of activation jobs.
     pub(crate) fn get_wf_activation(&mut self) -> Option<WfActivation> {
         if self.outgoing_wf_activation_jobs.is_empty() {
             None
         } else {
@@ -395,36 +421,49 @@ impl WorkflowMachines {
     /// Iterate the state machines, which consists of grabbing any pending outgoing commands from
     /// the workflow, handling them, and preparing them to be sent off to the server.
-    pub(crate) fn iterate_machines(&mut self) {
+    pub(crate) fn iterate_machines(&mut self) -> Result<()> {
         let results = self.drive_me.fetch_workflow_iteration_output();
-        self.handle_driven_results(results);
-
-        self.prepare_commands();
+        self.handle_driven_results(results)?;
+        self.prepare_commands()?;
+        Ok(())
     }

     /// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action
-    /// on the returned triggers
-    fn submachine_handle_event<TSM: BorrowMut<dyn TemporalStateMachine>>(
+    /// on the returned machine responses
+    fn submachine_handle_event(
         &mut self,
-        mut sm: TSM,
+        sm: MachineKey,
         event: &HistoryEvent,
         has_next_event: bool,
     ) -> Result<()> {
-        let triggers = sm.handle_event(event, has_next_event)?;
-        event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers);
-        for trigger in triggers {
-            match trigger {
-                WorkflowTrigger::PushWFJob(a) => {
+        let sm = self.all_machines.get_mut(sm).expect("Machine must exist");
+        let machine_responses = sm.handle_event(event, has_next_event).map_err(|e| {
+            if let WFMachinesError::MalformedEventDetail(s) = e {
+                WFMachinesError::MalformedEvent(event.clone(), s)
+            } else {
+                e
+            }
+        })?;
+        if !machine_responses.is_empty() {
+            event!(
+                Level::DEBUG,
+                msg = "Machine produced responses",
+                ?machine_responses
+            );
+        }
+        for response in machine_responses {
+            match response {
+                MachineResponse::PushWFJob(a) => {
                     self.drive_me.on_activation_job(&a);
                     self.outgoing_wf_activation_jobs.push_back(a);
                 }
-                WorkflowTrigger::TriggerWFTaskStarted {
+                MachineResponse::TriggerWFTaskStarted {
                     task_started_event_id,
                     time,
                 } => {
-                    self.task_started(task_started_event_id, time);
+                    self.task_started(task_started_event_id, time)?;
                 }
-                WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
+                MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
                     self.outgoing_wf_activation_jobs.push_back(
                         wf_activation_job::Attributes::RandomSeedUpdated(
                             RandomSeedUpdatedAttributes {
@@ -433,35 +472,106 @@ impl WorkflowMachines {
                         ),
                     );
                 }
+                MachineResponse::NoOp => (),
+                MachineResponse::IssueNewCommand(_) => {
+                    panic!("Issue new command machine response not expected here")
+                }
             }
         }
         Ok(())
     }

-    fn handle_driven_results(&mut self, results: Vec<WFCommand>) {
+    fn handle_driven_results(&mut self, results: Vec<WFCommand>) -> Result<()> {
         for cmd in results {
-            // I don't love how boilerplatey this is
             match cmd {
                 WFCommand::AddTimer(attrs) => {
-                    let timer = self.new_timer(attrs);
+                    let tid = attrs.timer_id.clone();
+                    let timer = self.add_new_machine(new_timer(attrs));
+                    self.timer_id_to_machine.insert(tid, timer.machine);
                     self.current_wf_task_commands.push_back(timer);
                 }
+                WFCommand::CancelTimer(attrs) => {
+                    let mkey = *self
+                        .timer_id_to_machine
+                        .get(&attrs.timer_id)
+                        .ok_or_else(|| {
+                            WFMachinesError::MissingAssociatedMachine(format!(
+                                "Missing associated machine for cancelling timer {}",
+                                &attrs.timer_id
+                            ))
+                        })?;
+                    let res = self.machine_mut(mkey).cancel()?;
+                    match res {
+                        MachineResponse::IssueNewCommand(c) => {
+                            self.current_wf_task_commands.push_back(CommandAndMachine {
+                                command: c,
+                                machine: mkey,
+                            })
+                        }
+                        MachineResponse::NoOp => {}
+                        v => {
+                            return Err(WFMachinesError::UnexpectedMachineResponse(
+                                v,
+                                "When cancelling timer",
+                            ))
+                        }
+                    }
+                }
                 WFCommand::CompleteWorkflow(attrs) => {
-                    self.current_wf_task_commands
-                        .push_back(complete_workflow(attrs));
+                    let cwfm = self.add_new_machine(complete_workflow(attrs));
+                    self.current_wf_task_commands.push_back(cwfm);
                 }
                 WFCommand::NoCommandsFromLang => (),
             }
         }
+        Ok(())
     }

-    fn prepare_commands(&mut self) {
+    /// Transfer commands from `current_wf_task_commands` to `commands`, so they may be sent off
+    /// to the server. While doing so, [TemporalStateMachine::handle_command] is called on the
+    /// machine associated with the command.
+    #[instrument(level = "debug", skip(self))]
+    fn prepare_commands(&mut self) -> Result<()> {
+        event!(Level::DEBUG, msg = "start prepare_commands",
+               cur_wf_task_cmds = ?self.current_wf_task_commands);
         while let Some(c) = self.current_wf_task_commands.pop_front() {
-            // TODO - some special case stuff that can maybe be managed differently?
-            // handleCommand should be called even on canceled ones to support mutableSideEffect
-            // command.handleCommand(command.getCommandType());
+            let cmd_type = CommandType::from_i32(c.command.command_type)
+                .ok_or(WFMachinesError::UnknownCommandType(c.command.command_type))?;
+            if !self
+                .machine(c.machine)
+                .was_cancelled_before_sent_to_server()
+            {
+                self.machine_mut(c.machine).handle_command(cmd_type)?;
+            }
             self.commands.push_back(c);
         }
+        event!(Level::DEBUG, msg = "end prepare_commands", commands = ?self.commands);
+        Ok(())
+    }
+
+    fn add_new_machine<T: TemporalStateMachine + 'static>(
+        &mut self,
+        machine: NewMachineWithCommand<T>,
+    ) -> CommandAndMachine {
+        let k = self.all_machines.insert(Box::new(machine.machine));
+        CommandAndMachine {
+            command: machine.command,
+            machine: k,
+        }
+    }
+
+    fn machine(&self, m: MachineKey) -> &dyn TemporalStateMachine {
+        self.all_machines
+            .get(m)
+            .expect("Machine must exist")
+            .borrow()
+    }
+
+    fn machine_mut(&mut self, m: MachineKey) -> &mut (dyn TemporalStateMachine + 'static) {
+        self.all_machines
+            .get_mut(m)
+            .expect("Machine must exist")
+            .borrow_mut()
     }
 }
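The `was_cancelled_before_sent_to_server` checks above (in `prepare_commands` and `handle_command_event`) encode one rule: if lang cancels a command within the same workflow task that produced it, the command never goes to the server, so no history event should ever be matched against it. A reduced, std-only sketch of the idea (the names mirror this diff, but this is not the real trait from src/machines):

trait Cancellable {
    // Default mirrors the diff: most machines can never be in this state
    fn was_cancelled_before_sent_to_server(&self) -> bool {
        false
    }
}

struct TimerMachine {
    cancelled_before_sent: bool,
}

impl Cancellable for TimerMachine {
    fn was_cancelled_before_sent_to_server(&self) -> bool {
        self.cancelled_before_sent
    }
}

fn main() {
    let timer = TimerMachine { cancelled_before_sent: true };
    // prepare_commands skips handle_command for such machines, and
    // handle_command_event will not expect a matching history event
    assert!(timer.was_cancelled_before_sent_to_server());
}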
diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs
index 408920881..1fad9a517 100644
--- a/src/machines/workflow_task_state_machine.rs
+++ b/src/machines/workflow_task_state_machine.rs
@@ -1,22 +1,17 @@
 #![allow(clippy::enum_variant_names)]

-use crate::machines::workflow_machines::WorkflowTrigger;
+use crate::machines::workflow_machines::MachineResponse;
+use crate::machines::Cancellable;
 use crate::protos::temporal::api::history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes;
 use crate::{
-    machines::{
-        workflow_machines::{WFMachinesError, WorkflowMachines},
-        WFMachinesAdapter,
-    },
+    machines::{workflow_machines::WFMachinesError, WFMachinesAdapter},
     protos::temporal::api::{
         enums::v1::{CommandType, EventType, WorkflowTaskFailedCause},
         history::v1::HistoryEvent,
     },
 };
 use rustfsm::{fsm, TransitionResult};
-use std::panic::resume_unwind;
 use std::{convert::TryFrom, time::SystemTime};
-use tracing::Level;
-use uuid::Uuid;

 fsm! {
     pub(super) name WorkflowTaskMachine;
@@ -63,29 +58,31 @@ impl WFMachinesAdapter for WorkflowTaskMachine {
         event: &HistoryEvent,
         has_next_event: bool,
         my_command: WFTaskMachineCommand,
-    ) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
+    ) -> Result<Vec<MachineResponse>, WFMachinesError> {
         match my_command {
             WFTaskMachineCommand::WFTaskStartedTrigger {
                 task_started_event_id,
                 time,
             } => {
-                let event_type = EventType::from_i32(event.event_type)
-                    .ok_or_else(|| WFMachinesError::UnexpectedEvent(event.clone()))?;
+                let event_type = EventType::from_i32(event.event_type).ok_or_else(|| {
+                    WFMachinesError::UnexpectedEvent(
+                        event.clone(),
+                        "WfTask machine could not interpret event type",
+                    )
+                })?;
                 let cur_event_past_or_at_start = event.event_id >= task_started_event_id;
                 if event_type == EventType::WorkflowTaskStarted
                     && (!cur_event_past_or_at_start || has_next_event)
                 {
-                    // Last event in history is a task started event, so we don't
-                    // want to iterate.
                     return Ok(vec![]);
                 }
-                Ok(vec![WorkflowTrigger::TriggerWFTaskStarted {
+                Ok(vec![MachineResponse::TriggerWFTaskStarted {
                     task_started_event_id,
                     time,
                 }])
             }
             WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id } => {
-                Ok(vec![WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id }])
+                Ok(vec![MachineResponse::UpdateRunIdOnWorkflowReset { run_id }])
             }
         }
     }
@@ -131,7 +128,12 @@ impl TryFrom<HistoryEvent> for WorkflowTaskMachineEvents {
                 ));
             }
         }
-            _ => return Err(WFMachinesError::UnexpectedEvent(e)),
+            _ => {
+                return Err(WFMachinesError::UnexpectedEvent(
+                    e,
+                    "Event does not apply to a wf task machine",
+                ))
+            }
         })
     }
 }
@@ -144,6 +146,8 @@
     }
 }

+impl Cancellable for WorkflowTaskMachine {}
+
 #[derive(Debug, Clone)]
 pub(super) struct SharedState {
     wf_task_started_event_id: i64,
diff --git a/src/protos/mod.rs b/src/protos/mod.rs
index b0c223057..0893a5e9c 100644
--- a/src/protos/mod.rs
+++ b/src/protos/mod.rs
@@ -92,6 +92,10 @@ pub mod temporal {
                     command_type: CommandType::StartTimer as i32,
                     attributes: Some(a),
                 },
+                a @ Attributes::CancelTimerCommandAttributes(_) => Self {
+                    command_type: CommandType::CancelTimer as i32,
+                    attributes: Some(a),
+                },
                 a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self {
                     command_type: CommandType::CompleteWorkflowExecution as i32,
                     attributes: Some(a),
                 },
@@ -140,7 +144,7 @@ pub mod temporal {
             ///
             /// If `up_to_event_id` is provided, the count will be returned as soon as
             /// processing advances past that id.
-            pub fn get_workflow_task_count(
+            pub(crate) fn get_workflow_task_count(
                 &self,
                 up_to_event_id: Option<i64>,
             ) -> Result<usize> {
@@ -149,14 +153,22 @@ pub mod temporal {
                 let mut history = self.events.iter().peekable();
                 while let Some(event) = history.next() {
                     let next_event = history.peek();
+
+                    if event.is_final_wf_execution_event() {
+                        // If the workflow is complete, we're done.
+                        return Ok(count);
+                    }
+
                     if let Some(upto) = up_to_event_id {
                         if event.event_id > upto {
                             return Ok(count);
                         }
                     }
+
                     let next_is_completed = next_event.map_or(false, |ne| {
                         ne.event_type == EventType::WorkflowTaskCompleted as i32
                     });
+
                     if event.event_type == EventType::WorkflowTaskStarted as i32
                         && (next_event.is_none() || next_is_completed)
                     {
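With the new `CancelTimerCommandAttributes` arm, timer-cancellation attributes convert into a fully-typed server command the same way start/complete attributes already did. A sketch of what that buys callers, assuming the impl shown above is `From<command::Attributes> for Command` (paths follow this crate's generated protos):

use temporal_sdk_core::protos::temporal::api::command::v1::{
    command::Attributes, CancelTimerCommandAttributes, Command,
};
use temporal_sdk_core::protos::temporal::api::enums::v1::CommandType;

fn main() {
    let attrs = Attributes::CancelTimerCommandAttributes(CancelTimerCommandAttributes {
        timer_id: "cancel_timer".to_string(),
    });
    // The From impl picks the right CommandType for the wrapped attributes
    let cmd: Command = attrs.into();
    assert_eq!(cmd.command_type, CommandType::CancelTimer as i32);
}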
diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs
index 4ef36537b..7d831af0d 100644
--- a/src/protosext/history_info.rs
+++ b/src/protosext/history_info.rs
@@ -1,5 +1,5 @@
 use crate::{
-    machines::{WFMachinesError, WorkflowMachines},
+    machines::WorkflowMachines,
     protos::temporal::api::enums::v1::EventType,
     protos::temporal::api::history::v1::{History, HistoryEvent},
 };
@@ -14,6 +14,7 @@ pub(crate) struct HistoryInfo {
 type Result<T> = std::result::Result<T, HistoryInfoError>;

 #[derive(thiserror::Error, Debug)]
+#[allow(clippy::large_enum_variant)]
 pub enum HistoryInfoError {
     #[error("Latest wf started id and previous one are equal! {previous_started_event_id:?}")]
     UnexpectedEventId {
@@ -24,8 +25,10 @@ pub enum HistoryInfoError {
     FailedOrTimeout(HistoryEvent),
     #[error("Last item in history wasn't WorkflowTaskStarted")]
     HistoryEndsUnexpectedly,
-    #[error("Underlying error in workflow machine")]
-    UnderlyingMachineError(#[from] WFMachinesError),
+
+    // We erase the underlying error type here to keep from leaking it into public
+    #[error("Underlying error in workflow machine: {0:?}")]
+    UnderlyingMachineError(#[from] anyhow::Error),
 }

 impl HistoryInfo {
@@ -128,7 +131,9 @@ impl HistoryInfo {
             if next_event.is_none() || next_is_completed {
                 started_id = event.event_id;
                 if next_event.is_none() {
-                    wf_machines.handle_event(event, false)?;
+                    wf_machines
+                        .handle_event(event, false)
+                        .map_err(anyhow::Error::from)?;
                     return Ok(());
                 }
             } else if next_event.is_some() && !next_is_failed_or_timeout {
@@ -136,7 +141,9 @@ impl HistoryInfo {
             }
         }

-        wf_machines.handle_event(event, next_event.is_some())?;
+        wf_machines
+            .handle_event(event, next_event.is_some())
+            .map_err(anyhow::Error::from)?;

         if next_event.is_none() {
             if event.is_final_wf_execution_event() {
@@ -158,37 +165,12 @@ impl HistoryInfo {
 #[cfg(test)]
 mod tests {
-    use super::*;
-    use crate::{
-        machines::test_help::TestHistoryBuilder,
-        protos::temporal::api::history::v1::{history_event, TimerFiredEventAttributes},
-    };
+    use crate::test_help::canned_histories;

     #[test]
     fn history_info_constructs_properly() {
-        /*
-            1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
-            2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-            3: EVENT_TYPE_WORKFLOW_TASK_STARTED
-            4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
-            5: EVENT_TYPE_TIMER_STARTED
-            6: EVENT_TYPE_TIMER_FIRED
-            7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
-            8: EVENT_TYPE_WORKFLOW_TASK_STARTED
-        */
-        let mut t = TestHistoryBuilder::default();
-
-        t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
-        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-        t.add(
-            EventType::TimerFired,
-            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-                started_event_id: timer_started_event_id,
-                timer_id: "timer1".to_string(),
-            }),
-        );
-        t.add_workflow_task_scheduled_and_started();
+        let t = canned_histories::single_timer("timer1");
+
         let history_info = t.get_history_info(1).unwrap();
         assert_eq!(3, history_info.events.len());
         let history_info = t.get_history_info(2).unwrap();
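Wrapping the machines error in `anyhow::Error` here is a deliberate type-erasure trick: the public `HistoryInfoError` no longer names the private `WFMachinesError` anywhere in its API. A minimal illustration of the same pattern (`PrivateError` and `PublicError` are invented names; thiserror and anyhow are the crates used in this diff):

// A private error type we don't want to expose in the public API
#[derive(thiserror::Error, Debug)]
enum PrivateError {
    #[error("inner failure")]
    Inner,
}

#[derive(thiserror::Error, Debug)]
pub enum PublicError {
    #[error("Underlying error: {0:?}")]
    Underlying(#[from] anyhow::Error),
}

fn fallible() -> Result<(), PrivateError> {
    Err(PrivateError::Inner)
}

fn public_api() -> Result<(), PublicError> {
    // map_err(anyhow::Error::from) erases PrivateError before `?` converts
    // the anyhow::Error into PublicError::Underlying
    fallible().map_err(anyhow::Error::from)?;
    Ok(())
}

fn main() {
    assert!(public_api().is_err());
}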
diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs
new file mode 100644
index 000000000..b2b43341f
--- /dev/null
+++ b/src/test_help/canned_histories.rs
@@ -0,0 +1,134 @@
+use crate::machines::test_help::TestHistoryBuilder;
+use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause};
+use crate::protos::temporal::api::history::v1::{
+    history_event, TimerCanceledEventAttributes, TimerFiredEventAttributes,
+};
+
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_TIMER_STARTED
+/// 6: EVENT_TYPE_TIMER_FIRED
+/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn single_timer(timer_id: &str) -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+    t.add(
+        EventType::TimerFired,
+        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+            started_event_id: timer_started_event_id,
+            timer_id: timer_id.to_string(),
+        }),
+    );
+    t.add_workflow_task_scheduled_and_started();
+    t
+}
+
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_TIMER_STARTED (cancel)
+/// 6: EVENT_TYPE_TIMER_STARTED (wait)
+/// 7: EVENT_TYPE_TIMER_FIRED (wait)
+/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 10: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 11: EVENT_TYPE_TIMER_CANCELED (cancel)
+/// 12: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED
+pub fn cancel_timer(wait_timer_id: &str, cancel_timer_id: &str) -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    let cancel_timer_started_id = t.add_get_event_id(EventType::TimerStarted, None);
+    let wait_timer_started_id = t.add_get_event_id(EventType::TimerStarted, None);
+    t.add(
+        EventType::TimerFired,
+        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+            started_event_id: wait_timer_started_id,
+            timer_id: wait_timer_id.to_string(),
+        }),
+    );
+    // 8
+    t.add_full_wf_task();
+    // 11
+    t.add(
+        EventType::TimerCanceled,
+        history_event::Attributes::TimerCanceledEventAttributes(TimerCanceledEventAttributes {
+            started_event_id: cancel_timer_started_id,
+            timer_id: cancel_timer_id.to_string(),
+            ..Default::default()
+        }),
+    );
+    // 12
+    t.add_workflow_execution_completed();
+    t
+}
+
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_TIMER_STARTED
+/// 6: EVENT_TYPE_TIMER_STARTED
+/// 7: EVENT_TYPE_TIMER_FIRED
+/// 8: EVENT_TYPE_TIMER_FIRED
+/// 9: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 10: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn parallel_timer(timer1: &str, timer2: &str) -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+    let timer_2_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+    t.add(
+        EventType::TimerFired,
+        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+            started_event_id: timer_started_event_id,
+            timer_id: timer1.to_string(),
+        }),
+    );
+    t.add(
+        EventType::TimerFired,
+        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+            started_event_id: timer_2_started_event_id,
+            timer_id: timer2.to_string(),
+        }),
+    );
+    t.add_workflow_task_scheduled_and_started();
+    t
+}
+
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_TIMER_STARTED
+/// 6: EVENT_TYPE_TIMER_FIRED
+/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 9: EVENT_TYPE_WORKFLOW_TASK_FAILED
+/// 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 11: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn workflow_fails_after_timer(timer_id: &str, original_run_id: &str) -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+    t.add(
+        EventType::TimerFired,
+        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+            started_event_id: timer_started_event_id,
+            timer_id: timer_id.to_string(),
+        }),
+    );
+    t.add_workflow_task_scheduled_and_started();
+    t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id);
+
+    t.add_workflow_task_scheduled_and_started();
+    t
+}
diff --git a/src/test_help/mod.rs b/src/test_help/mod.rs
new file mode 100644
index 000000000..e0fd84f78
--- /dev/null
+++ b/src/test_help/mod.rs
@@ -0,0 +1 @@
+pub mod canned_histories;
diff --git a/src/workflow/bridge.rs b/src/workflow/bridge.rs
index c4b8bc699..cc6f0895a 100644
--- a/src/workflow/bridge.rs
+++ b/src/workflow/bridge.rs
@@ -32,10 +32,9 @@ impl WorkflowBridge {
 }

 impl DrivenWorkflow for WorkflowBridge {
-    fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec<WFCommand> {
+    fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) {
         event!(Level::DEBUG, msg = "Workflow bridge start called", ?attribs);
         self.started_attrs = Some(attribs);
-        vec![]
     }

     fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
diff --git a/src/workflow/concurrency_manager.rs b/src/workflow/concurrency_manager.rs
index 2e8a241bf..c9a40f467 100644
--- a/src/workflow/concurrency_manager.rs
+++ b/src/workflow/concurrency_manager.rs
@@ -10,6 +10,7 @@ use crossbeam::channel::{bounded, unbounded, Receiver, Select, Sender, TryRecvError};
 use dashmap::DashMap;
 use std::{
     fmt::Debug,
+    sync::Mutex,
     thread::{self, JoinHandle},
 };
 use tracing::Level;
@@ -22,7 +23,7 @@ pub(crate) struct WorkflowConcurrencyManager {
     // in core SDK yet either - once we're ready to remove things, they can be removed from this
     // map and the wfm thread will drop the machines.
     machines: DashMap,
-    wf_thread: JoinHandle<()>,
+    wf_thread: Mutex<Option<JoinHandle<()>>>,
     machine_creator: Sender,
     shutdown_chan: Sender<bool>,
 }
@@ -51,7 +52,7 @@ impl WorkflowConcurrencyManager {

         Self {
             machines: Default::default(),
-            wf_thread,
+            wf_thread: Mutex::new(Some(wf_thread)),
             machine_creator,
             shutdown_chan,
         }
@@ -119,10 +120,13 @@ impl WorkflowConcurrencyManager {
     ///
     /// # Panics
     /// If the workflow machine thread panicked
-    #[allow(unused)] // TODO: Will be used when other shutdown PR is merged
-    pub fn shutdown(self) {
+    pub fn shutdown(&self) {
         let _ = self.shutdown_chan.send(true);
         self.wf_thread
+            .lock()
+            .expect("Workflow manager thread mutex must be lockable")
+            .take()
+            .unwrap()
             .join()
             .expect("Workflow manager thread should shut down cleanly");
     }
@@ -254,7 +258,7 @@ mod tests {
         let mgr = WorkflowConcurrencyManager::new();
         let mut t = TestHistoryBuilder::default();
         t.add_by_type(EventType::WorkflowExecutionStarted);
-        t.add_workflow_task();
+        t.add_full_wf_task();

         let activation = mgr
             .create_or_update(
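Swapping `wf_thread: JoinHandle<()>` for `Mutex<Option<JoinHandle<()>>>` is what lets `shutdown` take `&self` instead of consuming the manager: `take()` moves the handle out from behind the lock exactly once, and a second call would panic on the `unwrap`, as in the diff. A std-only sketch of the pattern (`Manager` is an illustrative name):

use std::sync::Mutex;
use std::thread::{self, JoinHandle};

struct Manager {
    thread: Mutex<Option<JoinHandle<()>>>,
}

impl Manager {
    fn new() -> Self {
        let handle = thread::spawn(|| { /* worker loop would live here */ });
        Self {
            thread: Mutex::new(Some(handle)),
        }
    }

    // Takes &self, so the manager can be shut down through a shared reference
    fn shutdown(&self) {
        self.thread
            .lock()
            .expect("mutex must be lockable")
            .take()
            .expect("shutdown must only be called once")
            .join()
            .expect("thread should shut down cleanly");
    }
}

fn main() {
    let m = Manager::new();
    m.shutdown();
}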
diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs
index 95109e3ae..a48731261 100644
--- a/tests/integ_tests/simple_wf_tests.rs
+++ b/tests/integ_tests/simple_wf_tests.rs
@@ -1,6 +1,7 @@
 use assert_matches::assert_matches;
 use rand::{self, Rng};
 use std::{convert::TryFrom, env, time::Duration};
+use temporal_sdk_core::protos::temporal::api::command::v1::CancelTimerCommandAttributes;
 use temporal_sdk_core::{
     protos::{
         coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob},
@@ -15,6 +16,9 @@
 // restarted, because pulling from the same task queue produces tasks for the previous failed
 // workflows. Fix that.

+// TODO: We should also get expected histories for these tests and confirm that the history
+// at the end matches.
+
 const NAMESPACE: &str = "default";

 #[tokio::main]
@@ -134,3 +138,97 @@ fn parallel_timer_workflow() {
     ))
     .unwrap();
 }
+
+#[test]
+fn timer_cancel_workflow() {
+    let task_q = "timer_cancel_workflow";
+    let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
+        Ok(addr) => addr,
+        Err(_) => "http://localhost:7233".to_owned(),
+    };
+    let url = Url::try_from(&*temporal_server_address).unwrap();
+    let gateway_opts = ServerGatewayOptions {
+        namespace: NAMESPACE.to_string(),
+        identity: "none".to_string(),
+        worker_binary_id: "".to_string(),
+        long_poll_timeout: Duration::from_secs(60),
+        target_url: url,
+    };
+    let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
+    let mut rng = rand::thread_rng();
+    let workflow_id: u32 = rng.gen();
+    dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
+    let timer_id = "wait_timer";
+    let cancel_timer_id = "cancel_timer";
+    let task = core.poll_task(task_q).unwrap();
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![
+            StartTimerCommandAttributes {
+                timer_id: timer_id.to_string(),
+                start_to_fire_timeout: Some(Duration::from_millis(50).into()),
+                ..Default::default()
+            }
+            .into(),
+            StartTimerCommandAttributes {
+                timer_id: cancel_timer_id.to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(10).into()),
+                ..Default::default()
+            }
+            .into(),
+        ],
+        task.task_token,
+    ))
+    .unwrap();
+    let task = dbg!(core.poll_task(task_q).unwrap());
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![
+            CancelTimerCommandAttributes {
+                timer_id: cancel_timer_id.to_string(),
+            }
+            .into(),
+            CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
+        ],
+        task.task_token,
+    ))
+    .unwrap();
+}
+
+#[test]
+fn timer_immediate_cancel_workflow() {
+    // Use a dedicated task queue; sharing one with timer_cancel_workflow would hit the
+    // task-queue-reuse problem called out in the TODO at the top of this file
+    let task_q = "timer_immediate_cancel_workflow";
+    let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
+        Ok(addr) => addr,
+        Err(_) => "http://localhost:7233".to_owned(),
+    };
+    let url = Url::try_from(&*temporal_server_address).unwrap();
+    let gateway_opts = ServerGatewayOptions {
+        namespace: NAMESPACE.to_string(),
+        identity: "none".to_string(),
+        worker_binary_id: "".to_string(),
+        long_poll_timeout: Duration::from_secs(60),
+        target_url: url,
+    };
+    let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
+    let mut rng = rand::thread_rng();
+    let workflow_id: u32 = rng.gen();
+    create_workflow(&core, task_q, &workflow_id.to_string());
+    let cancel_timer_id = "cancel_timer";
+    let task = core.poll_task(task_q).unwrap();
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![
+            StartTimerCommandAttributes {
+                timer_id: cancel_timer_id.to_string(),
+                ..Default::default()
+            }
+            .into(),
+            CancelTimerCommandAttributes {
+                timer_id: cancel_timer_id.to_string(),
+                ..Default::default()
+            }
+            .into(),
+            CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
+        ],
+        task.task_token,
+    ))
+    .unwrap();
+}
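Both tests above repeat identical gateway bootstrapping. A hypothetical consolidation: `get_integ_core` is not part of this diff, and `init` returning an `impl Core` (with `Core` exported by the crate) is an assumption based on how the tests use the returned value via `poll_task`/`complete_task`:

use std::{env, time::Duration};
use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url};

const NAMESPACE: &str = "default";

// Hypothetical helper collapsing the duplicated setup in the tests above
fn get_integ_core() -> impl Core {
    let temporal_server_address = env::var("TEMPORAL_SERVICE_ADDRESS")
        .unwrap_or_else(|_| "http://localhost:7233".to_owned());
    let url = Url::parse(&temporal_server_address).unwrap();
    temporal_sdk_core::init(CoreInitOptions {
        gateway_opts: ServerGatewayOptions {
            namespace: NAMESPACE.to_string(),
            identity: "integ_tester".to_string(),
            worker_binary_id: "".to_string(),
            long_poll_timeout: Duration::from_secs(60),
            target_url: url,
        },
    })
    .unwrap()
}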