From 2508f58fa5513d5ffdb022881e483dd977d069a9 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 15 Feb 2021 14:04:53 -0800 Subject: [PATCH 01/15] Added run_id update on workflow reset --- src/machines/workflow_machines.rs | 6 ++++ src/machines/workflow_task_state_machine.rs | 40 ++++++++++++++++++--- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 0a34c6332..e9f13f1c1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -76,6 +76,9 @@ pub(super) enum WorkflowTrigger { task_started_event_id: i64, time: SystemTime, }, + UpdateRunIdOnWorkflowReset { + run_id: String, + }, } #[derive(thiserror::Error, Debug)] @@ -410,6 +413,9 @@ impl WorkflowMachines { } => { self.task_started(task_started_event_id, time); } + WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { + self.run_id = new_run_id; + } } } Ok(()) diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 5a2ea6698..9bcbdfe7e 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -14,6 +14,7 @@ use crate::{ use rustfsm::{fsm, TransitionResult}; use std::{convert::TryFrom, time::SystemTime}; use tracing::Level; +use crate::protos::temporal::api::history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes; fsm! { pub(super) name WorkflowTaskMachine; @@ -27,7 +28,7 @@ fsm! { Scheduled --(WorkflowTaskTimedOut) --> TimedOut; Started --(WorkflowTaskCompleted, on_workflow_task_completed) --> Completed; - Started --(WorkflowTaskFailed, on_workflow_task_failed) --> Failed; + Started --(WorkflowTaskFailed(WFTFailedDat), on_workflow_task_failed) --> Failed; Started --(WorkflowTaskTimedOut) --> TimedOut; } @@ -49,6 +50,9 @@ pub(super) enum WFTaskMachineCommand { task_started_event_id: i64, time: SystemTime, }, + RunIdOnWorkflowResetUpdate { + run_id: String, + }, } impl WFMachinesAdapter for WorkflowTaskMachine { @@ -78,6 +82,9 @@ impl WFMachinesAdapter for WorkflowTaskMachine { time, }]) } + WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id } => { + Ok(vec![WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id }]) + } } } } @@ -99,7 +106,23 @@ impl TryFrom for WorkflowTaskMachineEvents { }), Some(EventType::WorkflowTaskTimedOut) => Self::WorkflowTaskTimedOut, Some(EventType::WorkflowTaskCompleted) => Self::WorkflowTaskCompleted, - Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed, + Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed(WFTFailedDat { + // TODO(maxim): How to avoid clone of attributes. We need to borrow e for the + // MalformedEvent down there. But forcing clone just for that doesn't make sense. + new_run_id: e.attributes.clone().ok_or_else(|| { + WFMachinesError::MalformedEvent( + e, + "Workflow task failed missing attributes".to_string(), + ) + }).map(|attr| match attr { + WorkflowTaskFailedEventAttributes(a) => + match a.cause { + workflow_task_failed_cause_reset_workflow => Some(a.new_run_id), + _ => None + } + _ => None + })? + }), _ => return Err(WFMachinesError::UnexpectedEvent(e)), }) } @@ -134,6 +157,11 @@ pub(super) struct WFTStartedDat { current_time_millis: SystemTime, started_event_id: i64, } + +pub(super) struct WFTFailedDat { + new_run_id: Option, +} + impl Scheduled { pub(super) fn on_workflow_task_started( self, @@ -179,8 +207,12 @@ impl Started { }, ]) } - pub(super) fn on_workflow_task_failed(self) -> WorkflowTaskMachineTransition { - unimplemented!() + pub(super) fn on_workflow_task_failed(self, data: WFTFailedDat) -> WorkflowTaskMachineTransition { + let commands = match data.new_run_id { + Some(run_id) => vec![WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id }], + None => vec![], + }; + WorkflowTaskMachineTransition::commands::<_, Completed>(commands) } } From 9da2265ffb4839716501ae0fb6c94855145452ae Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 15 Feb 2021 15:15:42 -0800 Subject: [PATCH 02/15] Fixed WorkflowTaskFailedCause handling --- src/machines/workflow_task_state_machine.rs | 22 +++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 9bcbdfe7e..6f229d1ec 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -7,7 +7,7 @@ use crate::{ WFMachinesAdapter, }, protos::temporal::api::{ - enums::v1::{CommandType, EventType}, + enums::v1::{CommandType, EventType, WorkflowTaskFailedCause}, history::v1::HistoryEvent, }, }; @@ -112,15 +112,19 @@ impl TryFrom for WorkflowTaskMachineEvents { new_run_id: e.attributes.clone().ok_or_else(|| { WFMachinesError::MalformedEvent( e, - "Workflow task failed missing attributes".to_string(), + "Workflow task failed is missing attributes".to_string(), ) - }).map(|attr| match attr { - WorkflowTaskFailedEventAttributes(a) => - match a.cause { - workflow_task_failed_cause_reset_workflow => Some(a.new_run_id), - _ => None + }).map(|attr| { + match attr { + WorkflowTaskFailedEventAttributes(a) => { + let cause = WorkflowTaskFailedCause::from_i32(a.cause); + match cause { + Some(WorkflowTaskFailedCause::ResetWorkflow) => Some(a.new_run_id), + _ => None + } } - _ => None + _ => None + } })? }), _ => return Err(WFMachinesError::UnexpectedEvent(e)), @@ -218,11 +222,13 @@ impl Started { #[derive(Default, Clone)] pub(super) struct TimedOut {} + impl From for TimedOut { fn from(_: Scheduled) -> Self { Self::default() } } + impl From for TimedOut { fn from(_: Started) -> Self { Self::default() From 9d051b8335fd4bff2f47184634a302b7f7f55a1e Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 15 Feb 2021 18:12:06 -0800 Subject: [PATCH 03/15] cargo fmt --- src/lib.rs | 4 +-- src/machines/timer_state_machine.rs | 4 +-- src/machines/workflow_task_state_machine.rs | 31 ++++++++++++--------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d0379593e..cb306cd65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -200,9 +200,7 @@ where TaskCompletion { variant: Some(task_completion::Variant::Activity(_)), .. - } => { - unimplemented!() - } + } => unimplemented!(), _ => Err(CoreError::MalformedCompletion(req)), } } diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e77b1d7c3..13640f22a 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -227,9 +227,7 @@ impl WFMachinesAdapter for TimerMachine { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), - TimerMachineCommand::AddCommand(_) => { - unreachable!() - } + TimerMachineCommand::AddCommand(_) => unreachable!(), } } } diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 6f229d1ec..31b2086e5 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -1,6 +1,7 @@ #![allow(clippy::enum_variant_names)] use crate::machines::workflow_machines::WorkflowTrigger; +use crate::protos::temporal::api::history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes; use crate::{ machines::{ workflow_machines::{WFMachinesError, WorkflowMachines}, @@ -14,7 +15,6 @@ use crate::{ use rustfsm::{fsm, TransitionResult}; use std::{convert::TryFrom, time::SystemTime}; use tracing::Level; -use crate::protos::temporal::api::history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes; fsm! { pub(super) name WorkflowTaskMachine; @@ -109,23 +109,25 @@ impl TryFrom for WorkflowTaskMachineEvents { Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed(WFTFailedDat { // TODO(maxim): How to avoid clone of attributes. We need to borrow e for the // MalformedEvent down there. But forcing clone just for that doesn't make sense. - new_run_id: e.attributes.clone().ok_or_else(|| { - WFMachinesError::MalformedEvent( - e, - "Workflow task failed is missing attributes".to_string(), - ) - }).map(|attr| { - match attr { + new_run_id: e + .attributes + .clone() + .ok_or_else(|| { + WFMachinesError::MalformedEvent( + e, + "Workflow task failed is missing attributes".to_string(), + ) + }) + .map(|attr| match attr { WorkflowTaskFailedEventAttributes(a) => { let cause = WorkflowTaskFailedCause::from_i32(a.cause); match cause { Some(WorkflowTaskFailedCause::ResetWorkflow) => Some(a.new_run_id), - _ => None + _ => None, } } - _ => None - } - })? + _ => None, + })?, }), _ => return Err(WFMachinesError::UnexpectedEvent(e)), }) @@ -211,7 +213,10 @@ impl Started { }, ]) } - pub(super) fn on_workflow_task_failed(self, data: WFTFailedDat) -> WorkflowTaskMachineTransition { + pub(super) fn on_workflow_task_failed( + self, + data: WFTFailedDat, + ) -> WorkflowTaskMachineTransition { let commands = match data.new_run_id { Some(run_id) => vec![WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id }], None => vec![], From a91833022743ebbba64097affcf799b516e66ca4 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 15 Feb 2021 18:58:22 -0800 Subject: [PATCH 04/15] Added unit test of workflow reset --- src/machines/test_help/history_builder.rs | 13 ++- src/machines/timer_state_machine.rs | 108 ++++++++++++++++++++-- 2 files changed, 112 insertions(+), 9 deletions(-) diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 8a3230790..0d96bfdc9 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,5 +1,6 @@ use super::Result; -use crate::protos::temporal::api::history::v1::History; +use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; +use crate::protos::temporal::api::history::v1::{History, WorkflowTaskFailedEventAttributes}; use crate::{ machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ @@ -88,6 +89,16 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into()); } + pub fn add_workflow_task_failed(&mut self, cause: WorkflowTaskFailedCause, new_run_id: &str) { + let attrs = WorkflowTaskFailedEventAttributes { + scheduled_event_id: self.workflow_task_scheduled_event_id, + cause: cause.into(), + new_run_id: new_run_id.into(), + ..Default::default() + }; + self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into()); + } + pub fn as_history(&self) -> History { History { events: self.events.clone(), diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 13640f22a..69a78f2cf 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -149,6 +149,7 @@ impl Created { #[derive(Default, Clone)] pub(super) struct CancelTimerCommandCreated {} + impl CancelTimerCommandCreated { pub(super) fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition { TimerMachineTransition::ok( @@ -163,6 +164,7 @@ pub(super) struct CancelTimerCommandSent {} #[derive(Default, Clone)] pub(super) struct Canceled {} + impl From for Canceled { fn from(_: CancelTimerCommandSent) -> Self { Self::default() @@ -235,6 +237,7 @@ impl WFMachinesAdapter for TimerMachine { #[cfg(test)] mod test { use super::*; + use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, @@ -269,13 +272,8 @@ mod test { 8: EVENT_TYPE_WORKFLOW_TASK_STARTED We have two versions of this test, one which processes the history in two calls, - and one which replays all of it in one go. The former will run the event loop three - times total, and the latter two. - - There are two workflow tasks, so it seems we should only loop two times, but the reason - for the extra iteration in the incremental version is that we need to "wait" for the - timer to fire. In the all-in-one-go test, the timer is created and resolved in the same - task, hence no extra loop. + and one which replays all of it in one go. Both versions must produce the same number + of activations. */ let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let timer = StartTimerCommandAttributes { @@ -324,7 +322,6 @@ mod test { let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); - dbg!(state_machines.get_wf_activation()); assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, @@ -347,4 +344,99 @@ mod test { CommandType::CompleteWorkflowExecution as i32 ); } + + #[fixture] + fn workflow_reset_hist() -> (TestHistoryBuilder, WorkflowMachines) { + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_FIRED + 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + 9: EVENT_TYPE_WORKFLOW_TASK_FAILED + 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 11: EVENT_TYPE_WORKFLOW_TASK_STARTED + + We have two versions of this test, one which processes the history in two calls, + and one which replays all of it in one go. Both versions must produce the same number + of activations. + */ + let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { + let timer = StartTimerCommandAttributes { + timer_id: "Sometimer".to_string(), + start_to_fire_timeout: Some(Duration::from_secs(5).into()), + }; + command_sink.timer(timer).await; + + let complete = CompleteWorkflowExecutionCommandAttributes::default(); + command_sink.send(complete.into()); + }); + + let mut t = TestHistoryBuilder::default(); + let mut state_machines = + WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: "timer1".to_string(), + }), + ); + t.add_workflow_task_scheduled_and_started(); + t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, "runId2"); + + t.add_workflow_task_scheduled_and_started(); + + assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); + (t, state_machines) + } + + #[rstest] + fn test_reset_workflow_path_full(workflow_reset_hist: (TestHistoryBuilder, WorkflowMachines)) { + let s = span!(Level::DEBUG, "Test start", t = "full"); + let _enter = s.enter(); + + let (t, mut state_machines) = workflow_reset_hist; + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) + .unwrap(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); + assert_eq!("runId2", state_machines.run_id); + } + + #[rstest] + fn test_reset_workflow_inc(workflow_reset_hist: (TestHistoryBuilder, WorkflowMachines)) { + let s = span!(Level::DEBUG, "Test start", t = "inc"); + let _enter = s.enter(); + + let (t, mut state_machines) = workflow_reset_hist; + + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) + .unwrap(); + dbg!(&commands); + dbg!(state_machines.get_wf_activation()); + assert_eq!(commands.len(), 1); + assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) + .unwrap(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::CompleteWorkflowExecution as i32 + ); + assert_eq!("runId2", state_machines.run_id); + } } From bfbb753b3de290a3dc35c7c3f3fa29eed01d3b5b Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Mon, 15 Feb 2021 19:04:36 -0800 Subject: [PATCH 05/15] Added dbg to tests --- src/machines/timer_state_machine.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 69a78f2cf..5efe18c4f 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -237,7 +237,6 @@ impl WFMachinesAdapter for TimerMachine { #[cfg(test)] mod test { use super::*; - use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, @@ -247,6 +246,7 @@ mod test { }, protos::temporal::api::{ command::v1::CompleteWorkflowExecutionCommandAttributes, + enums::v1::WorkflowTaskFailedCause, history::v1::{ TimerFiredEventAttributes, WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, @@ -322,6 +322,7 @@ mod test { let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); + dbg!(state_machines.get_wf_activation()); assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, @@ -407,6 +408,7 @@ mod test { let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); + assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, @@ -427,11 +429,14 @@ mod test { .unwrap(); dbg!(&commands); dbg!(state_machines.get_wf_activation()); + assert_eq!(commands.len(), 1); assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); + dbg!(state_machines.get_wf_activation()); + assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, From aa77c9a44d3dddaa3e53bc31c4a02b46efbf5ced Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 16 Feb 2021 20:26:56 -0800 Subject: [PATCH 06/15] Avoid clone of attributes --- src/machines/workflow_task_state_machine.rs | 34 +++++++++++---------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 31b2086e5..c85cb1be4 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -106,29 +106,31 @@ impl TryFrom for WorkflowTaskMachineEvents { }), Some(EventType::WorkflowTaskTimedOut) => Self::WorkflowTaskTimedOut, Some(EventType::WorkflowTaskCompleted) => Self::WorkflowTaskCompleted, - Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed(WFTFailedDat { - // TODO(maxim): How to avoid clone of attributes. We need to borrow e for the - // MalformedEvent down there. But forcing clone just for that doesn't make sense. - new_run_id: e - .attributes - .clone() - .ok_or_else(|| { - WFMachinesError::MalformedEvent( - e, - "Workflow task failed is missing attributes".to_string(), - ) - }) - .map(|attr| match attr { + Some(EventType::WorkflowTaskFailed) => { + let attributes = e.attributes.as_ref().ok_or_else(|| { + WFMachinesError::MalformedEvent( + e.clone(), + "Workflow task failed is missing attributes".to_string(), + ) + })?; + + Self::WorkflowTaskFailed(WFTFailedDat { + // TODO(maxim): How to avoid clone of attributes. We need to borrow e for the + // MalformedEvent down there. But forcing clone just for that doesn't make sense. + new_run_id: match attributes { WorkflowTaskFailedEventAttributes(a) => { let cause = WorkflowTaskFailedCause::from_i32(a.cause); match cause { - Some(WorkflowTaskFailedCause::ResetWorkflow) => Some(a.new_run_id), + Some(WorkflowTaskFailedCause::ResetWorkflow) => { + Some(a.new_run_id.clone()) + } _ => None, } } _ => None, - })?, - }), + }, + }) + } _ => return Err(WFMachinesError::UnexpectedEvent(e)), }) } From abde13d37f912b34213a41fb90416727bcdf151d Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 16 Feb 2021 20:30:35 -0800 Subject: [PATCH 07/15] removed outdated comment --- src/machines/workflow_task_state_machine.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index c85cb1be4..89c73b84a 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -115,8 +115,6 @@ impl TryFrom for WorkflowTaskMachineEvents { })?; Self::WorkflowTaskFailed(WFTFailedDat { - // TODO(maxim): How to avoid clone of attributes. We need to borrow e for the - // MalformedEvent down there. But forcing clone just for that doesn't make sense. new_run_id: match attributes { WorkflowTaskFailedEventAttributes(a) => { let cause = WorkflowTaskFailedCause::from_i32(a.cause); From a7ac69bc8cdf3ad916506b25dea4cf788e829823 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 16 Feb 2021 21:31:00 -0800 Subject: [PATCH 08/15] Rewrite of WorkflowTaskFailed to avoid clones --- src/machines/workflow_task_state_machine.rs | 38 ++++++++++----------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 89c73b84a..29618a13a 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -107,27 +107,27 @@ impl TryFrom for WorkflowTaskMachineEvents { Some(EventType::WorkflowTaskTimedOut) => Self::WorkflowTaskTimedOut, Some(EventType::WorkflowTaskCompleted) => Self::WorkflowTaskCompleted, Some(EventType::WorkflowTaskFailed) => { - let attributes = e.attributes.as_ref().ok_or_else(|| { - WFMachinesError::MalformedEvent( - e.clone(), - "Workflow task failed is missing attributes".to_string(), - ) - })?; - - Self::WorkflowTaskFailed(WFTFailedDat { - new_run_id: match attributes { - WorkflowTaskFailedEventAttributes(a) => { - let cause = WorkflowTaskFailedCause::from_i32(a.cause); - match cause { - Some(WorkflowTaskFailedCause::ResetWorkflow) => { - Some(a.new_run_id.clone()) + if let Some(attributes) = e.attributes { + Self::WorkflowTaskFailed(WFTFailedDat { + new_run_id: match attributes { + WorkflowTaskFailedEventAttributes(a) => { + let cause = WorkflowTaskFailedCause::from_i32(a.cause); + match cause { + Some(WorkflowTaskFailedCause::ResetWorkflow) => { + Some(a.new_run_id) + } + _ => None, } - _ => None, } - } - _ => None, - }, - }) + _ => None, + }, + }) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Workflow task failed is missing attributes".to_string(), + )); + } } _ => return Err(WFMachinesError::UnexpectedEvent(e)), }) From b5d650e1eac74e06665d6e15dd61b27fc7df08bb Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 16 Feb 2021 22:17:44 -0800 Subject: [PATCH 09/15] Introduced randomness_seed --- src/lib.rs | 6 ++--- src/machines/timer_state_machine.rs | 26 ++++++++++++++------- src/machines/workflow_machines.rs | 18 ++++++++++++-- src/machines/workflow_task_state_machine.rs | 2 ++ 4 files changed, 39 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cb306cd65..a9bc47e04 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -343,7 +343,7 @@ mod test { #[test] fn timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; + let run_id = "58CE65AC-1C8C-413C-B525-6D3C5EAD62D7"; let timer_id = "fake_timer".to_string(); let task_queue = "test-task-queue"; @@ -411,7 +411,7 @@ mod test { #[test] fn parallel_timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; + let run_id = "9895E53E-D0A2-4E8A-A478-E6221ED2ACEB"; let timer_1_id = "timer1".to_string(); let timer_2_id = "timer2".to_string(); let task_queue = "test-task-queue"; @@ -512,7 +512,7 @@ mod test { let _enter = s.enter(); let wfid = "fake_wf_id"; - let run_id = "fake_run_id"; + let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; let timer_1_id = "timer1".to_string(); let task_queue = "test-task-queue"; diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 5efe18c4f..da46cada2 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -287,8 +287,11 @@ mod test { }); let mut t = TestHistoryBuilder::default(); - let mut state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "54619485-18AC-4970-9BF2-D24F875F7816".to_string(), + Box::new(twd), + ); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task(); @@ -346,6 +349,8 @@ mod test { ); } + const NEW_RUN_ID: &'static str = "86E39A5F-AE31-4626-BDFE-398EE072D156"; + #[fixture] fn workflow_reset_hist() -> (TestHistoryBuilder, WorkflowMachines) { /* @@ -377,8 +382,11 @@ mod test { }); let mut t = TestHistoryBuilder::default(); - let mut state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "54619485-18AC-4970-9BF2-D24F875F7816".to_string(), + Box::new(twd), + ); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task(); @@ -391,7 +399,7 @@ mod test { }), ); t.add_workflow_task_scheduled_and_started(); - t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, "runId2"); + t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, NEW_RUN_ID); t.add_workflow_task_scheduled_and_started(); @@ -405,6 +413,8 @@ mod test { let _enter = s.enter(); let (t, mut state_machines) = workflow_reset_hist; + let initial_seed = state_machines.randomness_seed; + assert!(initial_seed > 0); let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); @@ -414,7 +424,7 @@ mod test { commands[0].command_type, CommandType::CompleteWorkflowExecution as i32 ); - assert_eq!("runId2", state_machines.run_id); + assert_ne!(initial_seed, state_machines.randomness_seed); } #[rstest] @@ -423,7 +433,7 @@ mod test { let _enter = s.enter(); let (t, mut state_machines) = workflow_reset_hist; - + let initial_seed = state_machines.randomness_seed; let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) .unwrap(); @@ -442,6 +452,6 @@ mod test { commands[0].command_type, CommandType::CompleteWorkflowExecution as i32 ); - assert_eq!("runId2", state_machines.run_id); + assert_ne!(initial_seed, state_machines.randomness_seed); } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index e9f13f1c1..3b1820f34 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -26,6 +26,7 @@ use std::{ time::SystemTime, }; use tracing::Level; +use uuid::Uuid; type Result = std::result::Result; @@ -44,10 +45,12 @@ pub(crate) struct WorkflowMachines { replaying: bool, /// Workflow identifier pub workflow_id: String, - /// Identifies the current run and is used as a seed for faux-randomness. + /// Identifies the current run pub run_id: String, /// The current workflow time if it has been established current_wf_time: Option, + /// Seed for faux-randomness + pub randomness_seed: i64, /// A mapping for accessing all the machines, where the key is the id of the initiating event /// for that machine. @@ -104,6 +107,7 @@ impl WorkflowMachines { ) -> Self { Self { workflow_id, + randomness_seed: uuid_to_randomness_seed(&run_id), run_id, drive_me: driven_wf, // In an ideal world one could say ..Default::default() here and it'd still work. @@ -414,7 +418,7 @@ impl WorkflowMachines { self.task_started(task_started_event_id, time); } WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { - self.run_id = new_run_id; + self.randomness_seed = uuid_to_randomness_seed(&new_run_id); } } } @@ -447,3 +451,13 @@ impl WorkflowMachines { } } } + +fn uuid_to_randomness_seed(run_id: &str) -> i64 { + let uuid = Uuid::parse_str(run_id).unwrap(); + let fields = uuid.as_fields(); + let b0 = fields.0 as i64; + let b32 = fields.1 as i64 >> 32; + let b48 = fields.2 as i64 >> 48; + let seed: i64 = b0 + b32 + b48; + seed +} diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 29618a13a..408920881 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -13,8 +13,10 @@ use crate::{ }, }; use rustfsm::{fsm, TransitionResult}; +use std::panic::resume_unwind; use std::{convert::TryFrom, time::SystemTime}; use tracing::Level; +use uuid::Uuid; fsm! { pub(super) name WorkflowTaskMachine; From 001693bb08405db64a5a7c33c1bb90ebcac58815 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 17 Feb 2021 12:45:25 -0800 Subject: [PATCH 10/15] Added RandomSeedUpdated activation --- protos/local/core_interface.proto | 11 ++++++++++- src/machines/test_help/history_builder.rs | 7 +++++-- src/machines/timer_state_machine.rs | 5 ----- src/machines/workflow_machines.rs | 19 ++++++++++++++----- 4 files changed, 29 insertions(+), 13 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index f81dc63bd..4b7382c70 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -63,8 +63,10 @@ message WFActivationJob { StartWorkflowTaskAttributes start_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed TimerFiredTaskAttributes timer_fired = 2; + // Workflow was reset. The randomness seed has to be updated. + RandomSeedUpdatedAttributes random_seed_updated = 3; - QueryWorkflowJob query_workflow = 3; + QueryWorkflowJob query_workflow = 4; } } @@ -75,6 +77,9 @@ message StartWorkflowTaskAttributes { string workflow_id = 2; // Input to the workflow code temporal.api.common.v1.Payloads arguments = 3; + // The seed must be used to initialize the random generator used by SDK. + // RandomSeedUpdatedAttributes are used to deliver seed updates. + int64 randomness_seed = 4; // TODO: Do we need namespace here, or should that just be fetchable easily? // will be others - workflow exe started attrs, etc @@ -84,6 +89,10 @@ message TimerFiredTaskAttributes { string timer_id = 1; } +message RandomSeedUpdatedAttributes { + int64 randomness_seed = 1; +} + message QueryWorkflowJob { temporal.api.query.v1.WorkflowQuery query = 1; } diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 0d96bfdc9..92ff651f7 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -15,6 +15,7 @@ use crate::{ }; use anyhow::bail; use std::time::SystemTime; +use uuid::Uuid; #[derive(Default, Debug)] pub struct TestHistoryBuilder { @@ -153,9 +154,11 @@ impl TestHistoryBuilder { fn default_attribs(et: EventType) -> Result { Ok(match et { - EventType::WorkflowExecutionStarted => { - WorkflowExecutionStartedEventAttributes::default().into() + EventType::WorkflowExecutionStarted => WorkflowExecutionStartedEventAttributes { + original_execution_run_id: Uuid::new_v4().to_string(), + ..Default::default() } + .into(), EventType::WorkflowTaskScheduled => WorkflowTaskScheduledEventAttributes::default().into(), EventType::TimerStarted => TimerStartedEventAttributes::default().into(), _ => bail!("Don't know how to construct default attrs for {:?}", et), diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index da46cada2..8382b44d4 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -413,8 +413,6 @@ mod test { let _enter = s.enter(); let (t, mut state_machines) = workflow_reset_hist; - let initial_seed = state_machines.randomness_seed; - assert!(initial_seed > 0); let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); @@ -424,7 +422,6 @@ mod test { commands[0].command_type, CommandType::CompleteWorkflowExecution as i32 ); - assert_ne!(initial_seed, state_machines.randomness_seed); } #[rstest] @@ -433,7 +430,6 @@ mod test { let _enter = s.enter(); let (t, mut state_machines) = workflow_reset_hist; - let initial_seed = state_machines.randomness_seed; let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) .unwrap(); @@ -452,6 +448,5 @@ mod test { commands[0].command_type, CommandType::CompleteWorkflowExecution as i32 ); - assert_ne!(initial_seed, state_machines.randomness_seed); } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 3b1820f34..b6beebb63 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -6,7 +6,10 @@ use crate::{ }, protos::coresdk::WfActivationJob, protos::{ - coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation}, + coresdk::{ + wf_activation_job, wf_activation_job::Attributes::RandomSeedUpdated, + RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, WfActivation, + }, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -49,8 +52,6 @@ pub(crate) struct WorkflowMachines { pub run_id: String, /// The current workflow time if it has been established current_wf_time: Option, - /// Seed for faux-randomness - pub randomness_seed: i64, /// A mapping for accessing all the machines, where the key is the id of the initiating event /// for that machine. @@ -107,7 +108,6 @@ impl WorkflowMachines { ) -> Self { Self { workflow_id, - randomness_seed: uuid_to_randomness_seed(&run_id), run_id, drive_me: driven_wf, // In an ideal world one could say ..Default::default() here and it'd still work. @@ -299,6 +299,9 @@ impl WorkflowMachines { .unwrap_or_default(), workflow_id: self.workflow_id.clone(), arguments: attrs.input.clone(), + randomness_seed: uuid_to_randomness_seed( + &attrs.original_execution_run_id, + ), } .into(), ); @@ -418,7 +421,13 @@ impl WorkflowMachines { self.task_started(task_started_event_id, time); } WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { - self.randomness_seed = uuid_to_randomness_seed(&new_run_id); + self.outgoing_wf_activation_jobs.push_back( + wf_activation_job::Attributes::RandomSeedUpdated( + RandomSeedUpdatedAttributes { + randomness_seed: uuid_to_randomness_seed(&new_run_id), + }, + ), + ); } } } From 65e117322d6febcc01b35761b270e6b3a200b639 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 17 Feb 2021 14:49:29 -0800 Subject: [PATCH 11/15] Refactored reset unit tests --- src/lib.rs | 95 ++++++++++++++++++++++++++++- src/machines/timer_state_machine.rs | 43 ------------- src/machines/workflow_machines.rs | 2 +- 3 files changed, 94 insertions(+), 46 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a9bc47e04..930075a09 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -328,13 +328,14 @@ mod test { machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ coresdk::{ - wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, + wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, + TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, }, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, - enums::v1::EventType, + enums::v1::{EventType, WorkflowTaskFailedCause}, history::v1::{history_event, TimerFiredEventAttributes}, }, }, @@ -566,4 +567,94 @@ mod test { )) .unwrap(); } + + const NEW_RUN_ID: &'static str = "86E39A5F-AE31-4626-BDFE-398EE072D156"; + + #[test] + fn workflow_reset_whole_replay_test_across_wf_bridge() { + let s = span!(Level::DEBUG, "Test start", t = "bridge"); + let _enter = s.enter(); + + let wfid = "fake_wf_id"; + let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; + let timer_1_id = "timer1".to_string(); + let task_queue = "test-task-queue"; + + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_FIRED + 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + 9: EVENT_TYPE_WORKFLOW_TASK_FAILED + 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 11: EVENT_TYPE_WORKFLOW_TASK_STARTED + */ + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: timer_1_id.clone(), + }), + ); + t.add_workflow_task_scheduled_and_started(); + t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, NEW_RUN_ID); + + t.add_workflow_task_scheduled_and_started(); + + // NOTE! What makes this a replay test is the server only responds with *one* batch here. + // So, server is polled once, but lang->core interactions look just like non-replay test. + let core = build_fake_core(wfid, run_id, &mut t, &[2]); + + let res = core.poll_task(task_queue).unwrap(); + let randomness_seed_from_start: i64; + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow( + StartWorkflowTaskAttributes{randomness_seed, ..} + )), + }] => { + randomness_seed_from_start = *randomness_seed; + } + ); + assert!(core.workflow_machines.get(run_id).is_some()); + + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_1_id, + ..Default::default() + } + .into()], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(task_queue).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_),), + }, + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})), + }] => { + assert_ne!(randomness_seed_from_start, *randomness_seed) + } + ); + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task_tok, + )) + .unwrap(); + } } diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 8382b44d4..601eef669 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -406,47 +406,4 @@ mod test { assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); (t, state_machines) } - - #[rstest] - fn test_reset_workflow_path_full(workflow_reset_hist: (TestHistoryBuilder, WorkflowMachines)) { - let s = span!(Level::DEBUG, "Test start", t = "full"); - let _enter = s.enter(); - - let (t, mut state_machines) = workflow_reset_hist; - let commands = t - .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) - .unwrap(); - - assert_eq!(commands.len(), 1); - assert_eq!( - commands[0].command_type, - CommandType::CompleteWorkflowExecution as i32 - ); - } - - #[rstest] - fn test_reset_workflow_inc(workflow_reset_hist: (TestHistoryBuilder, WorkflowMachines)) { - let s = span!(Level::DEBUG, "Test start", t = "inc"); - let _enter = s.enter(); - - let (t, mut state_machines) = workflow_reset_hist; - let commands = t - .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) - .unwrap(); - dbg!(&commands); - dbg!(state_machines.get_wf_activation()); - - assert_eq!(commands.len(), 1); - assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); - let commands = t - .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) - .unwrap(); - dbg!(state_machines.get_wf_activation()); - - assert_eq!(commands.len(), 1); - assert_eq!( - commands[0].command_type, - CommandType::CompleteWorkflowExecution as i32 - ); - } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index b6beebb63..1f15305f1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -461,7 +461,7 @@ impl WorkflowMachines { } } -fn uuid_to_randomness_seed(run_id: &str) -> i64 { +pub(super) fn uuid_to_randomness_seed(run_id: &str) -> i64 { let uuid = Uuid::parse_str(run_id).unwrap(); let fields = uuid.as_fields(); let b0 = fields.0 as i64; From 741228a94b7f34171032e6c0d2ed1308dbb8059b Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 17 Feb 2021 19:01:26 -0800 Subject: [PATCH 12/15] Removed unwrap from Uuid.parse_str --- src/machines/workflow_machines.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1f15305f1..64d21b8d1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -301,7 +301,7 @@ impl WorkflowMachines { arguments: attrs.input.clone(), randomness_seed: uuid_to_randomness_seed( &attrs.original_execution_run_id, - ), + )?, } .into(), ); @@ -424,7 +424,7 @@ impl WorkflowMachines { self.outgoing_wf_activation_jobs.push_back( wf_activation_job::Attributes::RandomSeedUpdated( RandomSeedUpdatedAttributes { - randomness_seed: uuid_to_randomness_seed(&new_run_id), + randomness_seed: uuid_to_randomness_seed(&new_run_id)?, }, ), ); @@ -461,12 +461,12 @@ impl WorkflowMachines { } } -pub(super) fn uuid_to_randomness_seed(run_id: &str) -> i64 { - let uuid = Uuid::parse_str(run_id).unwrap(); +pub(super) fn uuid_to_randomness_seed(run_id: &str) -> Result { + let uuid = Uuid::parse_str(run_id).map_err(|e| WFMachinesError::Underlying(e.into()))?; let fields = uuid.as_fields(); let b0 = fields.0 as i64; let b32 = fields.1 as i64 >> 32; let b48 = fields.2 as i64 >> 48; let seed: i64 = b0 + b32 + b48; - seed + Ok(seed) } From 82a604f6074fb4c5d99ce1da0773f9c2b0ddae39 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 17 Feb 2021 22:07:57 -0800 Subject: [PATCH 13/15] cleanup and dead code removal --- src/lib.rs | 13 +++--- src/machines/timer_state_machine.rs | 70 ++--------------------------- src/machines/workflow_machines.rs | 2 +- 3 files changed, 11 insertions(+), 74 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 930075a09..5781aed6d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -344,7 +344,7 @@ mod test { #[test] fn timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; - let run_id = "58CE65AC-1C8C-413C-B525-6D3C5EAD62D7"; + let run_id = "fake_run_id"; let timer_id = "fake_timer".to_string(); let task_queue = "test-task-queue"; @@ -412,7 +412,7 @@ mod test { #[test] fn parallel_timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; - let run_id = "9895E53E-D0A2-4E8A-A478-E6221ED2ACEB"; + let run_id = "fake_run_id"; let timer_1_id = "timer1".to_string(); let timer_2_id = "timer2".to_string(); let task_queue = "test-task-queue"; @@ -513,7 +513,7 @@ mod test { let _enter = s.enter(); let wfid = "fake_wf_id"; - let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; + let run_id = "fake_run_id"; let timer_1_id = "timer1".to_string(); let task_queue = "test-task-queue"; @@ -568,15 +568,14 @@ mod test { .unwrap(); } - const NEW_RUN_ID: &'static str = "86E39A5F-AE31-4626-BDFE-398EE072D156"; - #[test] - fn workflow_reset_whole_replay_test_across_wf_bridge() { + fn workflow_update_random_seed_on_workflow_reset() { let s = span!(Level::DEBUG, "Test start", t = "bridge"); let _enter = s.enter(); let wfid = "fake_wf_id"; let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; + let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156"; let timer_1_id = "timer1".to_string(); let task_queue = "test-task-queue"; @@ -605,7 +604,7 @@ mod test { }), ); t.add_workflow_task_scheduled_and_started(); - t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, NEW_RUN_ID); + t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id); t.add_workflow_task_scheduled_and_started(); diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 601eef669..75e169dc1 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -246,7 +246,6 @@ mod test { }, protos::temporal::api::{ command::v1::CompleteWorkflowExecutionCommandAttributes, - enums::v1::WorkflowTaskFailedCause, history::v1::{ TimerFiredEventAttributes, WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, @@ -272,8 +271,8 @@ mod test { 8: EVENT_TYPE_WORKFLOW_TASK_STARTED We have two versions of this test, one which processes the history in two calls, - and one which replays all of it in one go. Both versions must produce the same number - of activations. + and one which replays all of it in one go. Both versions must produce the same + two activations. */ let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let timer = StartTimerCommandAttributes { @@ -287,11 +286,8 @@ mod test { }); let mut t = TestHistoryBuilder::default(); - let mut state_machines = WorkflowMachines::new( - "wfid".to_string(), - "54619485-18AC-4970-9BF2-D24F875F7816".to_string(), - Box::new(twd), - ); + let mut state_machines = + WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); t.add_by_type(EventType::WorkflowExecutionStarted); t.add_workflow_task(); @@ -348,62 +344,4 @@ mod test { CommandType::CompleteWorkflowExecution as i32 ); } - - const NEW_RUN_ID: &'static str = "86E39A5F-AE31-4626-BDFE-398EE072D156"; - - #[fixture] - fn workflow_reset_hist() -> (TestHistoryBuilder, WorkflowMachines) { - /* - 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED - 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED - 3: EVENT_TYPE_WORKFLOW_TASK_STARTED - 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED - 5: EVENT_TYPE_TIMER_STARTED - 6: EVENT_TYPE_TIMER_FIRED - 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED - 8: EVENT_TYPE_WORKFLOW_TASK_STARTED - 9: EVENT_TYPE_WORKFLOW_TASK_FAILED - 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED - 11: EVENT_TYPE_WORKFLOW_TASK_STARTED - - We have two versions of this test, one which processes the history in two calls, - and one which replays all of it in one go. Both versions must produce the same number - of activations. - */ - let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { - let timer = StartTimerCommandAttributes { - timer_id: "Sometimer".to_string(), - start_to_fire_timeout: Some(Duration::from_secs(5).into()), - }; - command_sink.timer(timer).await; - - let complete = CompleteWorkflowExecutionCommandAttributes::default(); - command_sink.send(complete.into()); - }); - - let mut t = TestHistoryBuilder::default(); - let mut state_machines = WorkflowMachines::new( - "wfid".to_string(), - "54619485-18AC-4970-9BF2-D24F875F7816".to_string(), - Box::new(twd), - ); - - t.add_by_type(EventType::WorkflowExecutionStarted); - t.add_workflow_task(); - let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); - t.add( - EventType::TimerFired, - history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { - started_event_id: timer_started_event_id, - timer_id: "timer1".to_string(), - }), - ); - t.add_workflow_task_scheduled_and_started(); - t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, NEW_RUN_ID); - - t.add_workflow_task_scheduled_and_started(); - - assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); - (t, state_machines) - } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 64d21b8d1..ea5337a86 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -461,7 +461,7 @@ impl WorkflowMachines { } } -pub(super) fn uuid_to_randomness_seed(run_id: &str) -> Result { +fn uuid_to_randomness_seed(run_id: &str) -> Result { let uuid = Uuid::parse_str(run_id).map_err(|e| WFMachinesError::Underlying(e.into()))?; let fields = uuid.as_fields(); let b0 = fields.0 as i64; From bfaf3b97b35383cafee4a16946753773bdb0c6ba Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Tue, 23 Feb 2021 13:12:40 -0800 Subject: [PATCH 14/15] Error mapping --- src/machines/workflow_machines.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 381c8b0c3..cb3680e0b 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::machines::workflow_machines::WFMachinesError::MalformedEvent; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -301,7 +302,16 @@ impl WorkflowMachines { arguments: attrs.input.clone(), randomness_seed: uuid_to_randomness_seed( &attrs.original_execution_run_id, - )?, + ) + .map_err(|e| { + MalformedEvent( + event.clone(), + format!( + "Bad uuid in run id: {}", + attrs.original_execution_run_id + ), + ) + })?, } .into(), ); @@ -424,7 +434,14 @@ impl WorkflowMachines { self.outgoing_wf_activation_jobs.push_back( wf_activation_job::Attributes::RandomSeedUpdated( RandomSeedUpdatedAttributes { - randomness_seed: uuid_to_randomness_seed(&new_run_id)?, + randomness_seed: uuid_to_randomness_seed(&new_run_id).map_err( + |e| { + MalformedEvent( + event.clone(), + format!("Bad uuid in run id: {}", new_run_id), + ) + }, + )?, }, ), ); @@ -461,8 +478,8 @@ impl WorkflowMachines { } } -fn uuid_to_randomness_seed(run_id: &str) -> Result { - let uuid = Uuid::parse_str(run_id).map_err(|e| WFMachinesError::Underlying(e.into()))?; +fn uuid_to_randomness_seed(run_id: &str) -> std::result::Result { + let uuid = Uuid::parse_str(run_id)?; let fields = uuid.as_fields(); let b0 = fields.0 as i64; let b32 = fields.1 as i64 >> 32; From 87546d51e41550b30865f0cd448b4ab86ee27167 Mon Sep 17 00:00:00 2001 From: Maxim Fateev Date: Wed, 24 Feb 2021 11:29:31 -0800 Subject: [PATCH 15/15] changed uuid_to_randomness_seed to str_to_randomness_seed --- protos/local/core_interface.proto | 4 ++-- src/lib.rs | 2 +- src/machines/workflow_machines.rs | 37 +++++++++---------------------- 3 files changed, 13 insertions(+), 30 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index e3937c589..14595a860 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -80,7 +80,7 @@ message StartWorkflowTaskAttributes { temporal.api.common.v1.Payloads arguments = 3; // The seed must be used to initialize the random generator used by SDK. // RandomSeedUpdatedAttributes are used to deliver seed updates. - int64 randomness_seed = 4; + uint64 randomness_seed = 4; // TODO: Do we need namespace here, or should that just be fetchable easily? // will be others - workflow exe started attrs, etc @@ -95,7 +95,7 @@ message TimerFiredTaskAttributes { } message RandomSeedUpdatedAttributes { - int64 randomness_seed = 1; + uint64 randomness_seed = 1; } message QueryWorkflowJob { diff --git a/src/lib.rs b/src/lib.rs index d7c4d12d2..b4214c477 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -589,7 +589,7 @@ mod test { let core = build_fake_core(wfid, run_id, &mut t, &[2]); let res = core.poll_task(task_queue).unwrap(); - let randomness_seed_from_start: i64; + let randomness_seed_from_start: u64; assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index cb3680e0b..87b2f20d3 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -24,7 +24,10 @@ use rustfsm::StateMachine; use std::{ borrow::BorrowMut, cell::RefCell, + collections::hash_map::DefaultHasher, collections::{HashMap, HashSet, VecDeque}, + hash::Hash, + hash::Hasher, ops::DerefMut, sync::{atomic::AtomicBool, Arc}, time::SystemTime, @@ -300,18 +303,9 @@ impl WorkflowMachines { .unwrap_or_default(), workflow_id: self.workflow_id.clone(), arguments: attrs.input.clone(), - randomness_seed: uuid_to_randomness_seed( + randomness_seed: str_to_randomness_seed( &attrs.original_execution_run_id, - ) - .map_err(|e| { - MalformedEvent( - event.clone(), - format!( - "Bad uuid in run id: {}", - attrs.original_execution_run_id - ), - ) - })?, + ), } .into(), ); @@ -434,14 +428,7 @@ impl WorkflowMachines { self.outgoing_wf_activation_jobs.push_back( wf_activation_job::Attributes::RandomSeedUpdated( RandomSeedUpdatedAttributes { - randomness_seed: uuid_to_randomness_seed(&new_run_id).map_err( - |e| { - MalformedEvent( - event.clone(), - format!("Bad uuid in run id: {}", new_run_id), - ) - }, - )?, + randomness_seed: str_to_randomness_seed(&new_run_id), }, ), ); @@ -478,12 +465,8 @@ impl WorkflowMachines { } } -fn uuid_to_randomness_seed(run_id: &str) -> std::result::Result { - let uuid = Uuid::parse_str(run_id)?; - let fields = uuid.as_fields(); - let b0 = fields.0 as i64; - let b32 = fields.1 as i64 >> 32; - let b48 = fields.2 as i64 >> 48; - let seed: i64 = b0 + b32 + b48; - Ok(seed) +fn str_to_randomness_seed(run_id: &str) -> u64 { + let mut s = DefaultHasher::new(); + run_id.hash(&mut s); + s.finish() }