diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index ceabca17b..14595a860 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -63,9 +63,11 @@ message WFActivationJob { StartWorkflowTaskAttributes start_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed TimerFiredTaskAttributes timer_fired = 2; + // Workflow was reset. The randomness seed has to be updated. + RandomSeedUpdatedAttributes random_seed_updated = 3; - QueryWorkflowJob query_workflow = 3; - CancelWorkflowTaskAttributes cancel_workflow = 4; + QueryWorkflowJob query_workflow = 4; + CancelWorkflowTaskAttributes cancel_workflow = 5; } } @@ -76,6 +78,9 @@ message StartWorkflowTaskAttributes { string workflow_id = 2; // Input to the workflow code temporal.api.common.v1.Payloads arguments = 3; + // The seed must be used to initialize the random generator used by SDK. + // RandomSeedUpdatedAttributes are used to deliver seed updates. + uint64 randomness_seed = 4; // TODO: Do we need namespace here, or should that just be fetchable easily? // will be others - workflow exe started attrs, etc @@ -89,6 +94,10 @@ message TimerFiredTaskAttributes { string timer_id = 1; } +message RandomSeedUpdatedAttributes { + uint64 randomness_seed = 1; +} + message QueryWorkflowJob { temporal.api.query.v1.WorkflowQuery query = 1; } diff --git a/src/lib.rs b/src/lib.rs index 53f1838ea..fbb334386 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -224,9 +224,7 @@ where TaskCompletion { variant: Some(task_completion::Variant::Activity(_)), .. - } => { - unimplemented!() - } + } => unimplemented!(), _ => Err(CoreError::MalformedCompletion(req)), } } @@ -332,13 +330,14 @@ mod test { machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ coresdk::{ - wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, + wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, + TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, }, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, - enums::v1::EventType, + enums::v1::{EventType, WorkflowTaskFailedCause}, history::v1::{history_event, TimerFiredEventAttributes}, }, }, @@ -585,4 +584,93 @@ mod test { CoreError::ShuttingDown ); } + + #[test] + fn workflow_update_random_seed_on_workflow_reset() { + let s = span!(Level::DEBUG, "Test start", t = "bridge"); + let _enter = s.enter(); + + let wfid = "fake_wf_id"; + let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B"; + let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156"; + let timer_1_id = "timer1".to_string(); + let task_queue = "test-task-queue"; + + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_FIRED + 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + 9: EVENT_TYPE_WORKFLOW_TASK_FAILED + 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 11: EVENT_TYPE_WORKFLOW_TASK_STARTED + */ + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: timer_1_id.clone(), + }), + ); + t.add_workflow_task_scheduled_and_started(); + t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id); + + t.add_workflow_task_scheduled_and_started(); + + // NOTE! What makes this a replay test is the server only responds with *one* batch here. + // So, server is polled once, but lang->core interactions look just like non-replay test. + let core = build_fake_core(wfid, run_id, &mut t, &[2]); + + let res = core.poll_task(task_queue).unwrap(); + let randomness_seed_from_start: u64; + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow( + StartWorkflowTaskAttributes{randomness_seed, ..} + )), + }] => { + randomness_seed_from_start = *randomness_seed; + } + ); + assert!(core.workflow_machines.exists(run_id)); + + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_1_id, + ..Default::default() + } + .into()], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(task_queue).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_),), + }, + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})), + }] => { + assert_ne!(randomness_seed_from_start, *randomness_seed) + } + ); + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task_tok, + )) + .unwrap(); + } } diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 8a3230790..92ff651f7 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,5 +1,6 @@ use super::Result; -use crate::protos::temporal::api::history::v1::History; +use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; +use crate::protos::temporal::api::history::v1::{History, WorkflowTaskFailedEventAttributes}; use crate::{ machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ @@ -14,6 +15,7 @@ use crate::{ }; use anyhow::bail; use std::time::SystemTime; +use uuid::Uuid; #[derive(Default, Debug)] pub struct TestHistoryBuilder { @@ -88,6 +90,16 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into()); } + pub fn add_workflow_task_failed(&mut self, cause: WorkflowTaskFailedCause, new_run_id: &str) { + let attrs = WorkflowTaskFailedEventAttributes { + scheduled_event_id: self.workflow_task_scheduled_event_id, + cause: cause.into(), + new_run_id: new_run_id.into(), + ..Default::default() + }; + self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into()); + } + pub fn as_history(&self) -> History { History { events: self.events.clone(), @@ -142,9 +154,11 @@ impl TestHistoryBuilder { fn default_attribs(et: EventType) -> Result { Ok(match et { - EventType::WorkflowExecutionStarted => { - WorkflowExecutionStartedEventAttributes::default().into() + EventType::WorkflowExecutionStarted => WorkflowExecutionStartedEventAttributes { + original_execution_run_id: Uuid::new_v4().to_string(), + ..Default::default() } + .into(), EventType::WorkflowTaskScheduled => WorkflowTaskScheduledEventAttributes::default().into(), EventType::TimerStarted => TimerStartedEventAttributes::default().into(), _ => bail!("Don't know how to construct default attrs for {:?}", et), diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e77b1d7c3..75e169dc1 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -149,6 +149,7 @@ impl Created { #[derive(Default, Clone)] pub(super) struct CancelTimerCommandCreated {} + impl CancelTimerCommandCreated { pub(super) fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition { TimerMachineTransition::ok( @@ -163,6 +164,7 @@ pub(super) struct CancelTimerCommandSent {} #[derive(Default, Clone)] pub(super) struct Canceled {} + impl From for Canceled { fn from(_: CancelTimerCommandSent) -> Self { Self::default() @@ -227,9 +229,7 @@ impl WFMachinesAdapter for TimerMachine { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), - TimerMachineCommand::AddCommand(_) => { - unreachable!() - } + TimerMachineCommand::AddCommand(_) => unreachable!(), } } } @@ -271,13 +271,8 @@ mod test { 8: EVENT_TYPE_WORKFLOW_TASK_STARTED We have two versions of this test, one which processes the history in two calls, - and one which replays all of it in one go. The former will run the event loop three - times total, and the latter two. - - There are two workflow tasks, so it seems we should only loop two times, but the reason - for the extra iteration in the incremental version is that we need to "wait" for the - timer to fire. In the all-in-one-go test, the timer is created and resolved in the same - task, hence no extra loop. + and one which replays all of it in one go. Both versions must produce the same + two activations. */ let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let timer = StartTimerCommandAttributes { diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 2905934f3..87b2f20d3 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::machines::workflow_machines::WFMachinesError::MalformedEvent; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -6,7 +7,10 @@ use crate::{ }, protos::coresdk::WfActivationJob, protos::{ - coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation}, + coresdk::{ + wf_activation_job, wf_activation_job::Attributes::RandomSeedUpdated, + RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, WfActivation, + }, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -20,12 +24,16 @@ use rustfsm::StateMachine; use std::{ borrow::BorrowMut, cell::RefCell, + collections::hash_map::DefaultHasher, collections::{HashMap, HashSet, VecDeque}, + hash::Hash, + hash::Hasher, ops::DerefMut, sync::{atomic::AtomicBool, Arc}, time::SystemTime, }; use tracing::Level; +use uuid::Uuid; type Result = std::result::Result; @@ -44,7 +52,7 @@ pub(crate) struct WorkflowMachines { replaying: bool, /// Workflow identifier pub workflow_id: String, - /// Identifies the current run and is used as a seed for faux-randomness. + /// Identifies the current run pub run_id: String, /// The current workflow time if it has been established current_wf_time: Option, @@ -76,6 +84,9 @@ pub(super) enum WorkflowTrigger { task_started_event_id: i64, time: SystemTime, }, + UpdateRunIdOnWorkflowReset { + run_id: String, + }, } #[derive(thiserror::Error, Debug)] @@ -292,6 +303,9 @@ impl WorkflowMachines { .unwrap_or_default(), workflow_id: self.workflow_id.clone(), arguments: attrs.input.clone(), + randomness_seed: str_to_randomness_seed( + &attrs.original_execution_run_id, + ), } .into(), ); @@ -410,6 +424,15 @@ impl WorkflowMachines { } => { self.task_started(task_started_event_id, time); } + WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { + self.outgoing_wf_activation_jobs.push_back( + wf_activation_job::Attributes::RandomSeedUpdated( + RandomSeedUpdatedAttributes { + randomness_seed: str_to_randomness_seed(&new_run_id), + }, + ), + ); + } } } Ok(()) @@ -441,3 +464,9 @@ impl WorkflowMachines { } } } + +fn str_to_randomness_seed(run_id: &str) -> u64 { + let mut s = DefaultHasher::new(); + run_id.hash(&mut s); + s.finish() +} diff --git a/src/machines/workflow_task_state_machine.rs b/src/machines/workflow_task_state_machine.rs index 5a2ea6698..408920881 100644 --- a/src/machines/workflow_task_state_machine.rs +++ b/src/machines/workflow_task_state_machine.rs @@ -1,19 +1,22 @@ #![allow(clippy::enum_variant_names)] use crate::machines::workflow_machines::WorkflowTrigger; +use crate::protos::temporal::api::history::v1::history_event::Attributes::WorkflowTaskFailedEventAttributes; use crate::{ machines::{ workflow_machines::{WFMachinesError, WorkflowMachines}, WFMachinesAdapter, }, protos::temporal::api::{ - enums::v1::{CommandType, EventType}, + enums::v1::{CommandType, EventType, WorkflowTaskFailedCause}, history::v1::HistoryEvent, }, }; use rustfsm::{fsm, TransitionResult}; +use std::panic::resume_unwind; use std::{convert::TryFrom, time::SystemTime}; use tracing::Level; +use uuid::Uuid; fsm! { pub(super) name WorkflowTaskMachine; @@ -27,7 +30,7 @@ fsm! { Scheduled --(WorkflowTaskTimedOut) --> TimedOut; Started --(WorkflowTaskCompleted, on_workflow_task_completed) --> Completed; - Started --(WorkflowTaskFailed, on_workflow_task_failed) --> Failed; + Started --(WorkflowTaskFailed(WFTFailedDat), on_workflow_task_failed) --> Failed; Started --(WorkflowTaskTimedOut) --> TimedOut; } @@ -49,6 +52,9 @@ pub(super) enum WFTaskMachineCommand { task_started_event_id: i64, time: SystemTime, }, + RunIdOnWorkflowResetUpdate { + run_id: String, + }, } impl WFMachinesAdapter for WorkflowTaskMachine { @@ -78,6 +84,9 @@ impl WFMachinesAdapter for WorkflowTaskMachine { time, }]) } + WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id } => { + Ok(vec![WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id }]) + } } } } @@ -99,7 +108,29 @@ impl TryFrom for WorkflowTaskMachineEvents { }), Some(EventType::WorkflowTaskTimedOut) => Self::WorkflowTaskTimedOut, Some(EventType::WorkflowTaskCompleted) => Self::WorkflowTaskCompleted, - Some(EventType::WorkflowTaskFailed) => Self::WorkflowTaskFailed, + Some(EventType::WorkflowTaskFailed) => { + if let Some(attributes) = e.attributes { + Self::WorkflowTaskFailed(WFTFailedDat { + new_run_id: match attributes { + WorkflowTaskFailedEventAttributes(a) => { + let cause = WorkflowTaskFailedCause::from_i32(a.cause); + match cause { + Some(WorkflowTaskFailedCause::ResetWorkflow) => { + Some(a.new_run_id) + } + _ => None, + } + } + _ => None, + }, + }) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Workflow task failed is missing attributes".to_string(), + )); + } + } _ => return Err(WFMachinesError::UnexpectedEvent(e)), }) } @@ -134,6 +165,11 @@ pub(super) struct WFTStartedDat { current_time_millis: SystemTime, started_event_id: i64, } + +pub(super) struct WFTFailedDat { + new_run_id: Option, +} + impl Scheduled { pub(super) fn on_workflow_task_started( self, @@ -179,18 +215,27 @@ impl Started { }, ]) } - pub(super) fn on_workflow_task_failed(self) -> WorkflowTaskMachineTransition { - unimplemented!() + pub(super) fn on_workflow_task_failed( + self, + data: WFTFailedDat, + ) -> WorkflowTaskMachineTransition { + let commands = match data.new_run_id { + Some(run_id) => vec![WFTaskMachineCommand::RunIdOnWorkflowResetUpdate { run_id }], + None => vec![], + }; + WorkflowTaskMachineTransition::commands::<_, Completed>(commands) } } #[derive(Default, Clone)] pub(super) struct TimedOut {} + impl From for TimedOut { fn from(_: Scheduled) -> Self { Self::default() } } + impl From for TimedOut { fn from(_: Started) -> Self { Self::default()