diff --git a/src/lib.rs b/src/lib.rs index 093995314..284c23417 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -333,10 +333,8 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; - use crate::machines::test_help::TestHistoryBuilder; - use crate::protos::temporal::api::enums::v1::EventType; use crate::{ - machines::test_help::{build_fake_core, FakeCore}, + machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, protos::{ coresdk::{ wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, @@ -346,6 +344,7 @@ mod test { CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, + temporal::api::enums::v1::EventType, }, test_help::canned_histories, }; diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index e81743a10..46c22b8bf 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -135,7 +135,7 @@ impl TestHistoryBuilder { to_wf_task_num: Option, ) -> Result<()> { let histinfo = HistoryInfo::new_from_events(&self.events, to_wf_task_num)?; - histinfo.apply_history_events(wf_machines)?; + wf_machines.apply_history_events(&histinfo)?; Ok(()) } diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 842326a4a..636697b90 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -43,7 +43,7 @@ pub(crate) fn build_fake_core( let responses: Vec<_> = response_batches .iter() .map(|to_task_num| { - let batch = t.get_history_info(*to_task_num).unwrap().events; + let batch = t.get_history_info(*to_task_num).unwrap().events().to_vec(); let task_token: [u8; 16] = thread_rng().gen(); PollWorkflowTaskQueueResponse { history: Some(History { events: batch }), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index fbeb15217..1223b6875 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -12,6 +12,7 @@ use crate::{ history::v1::{history_event, HistoryEvent}, }, }, + protosext::HistoryInfo, }; use slotmap::SlotMap; use std::{ @@ -426,6 +427,43 @@ impl WorkflowMachines { Ok(()) } + /// Apply events from history to this machines instance + pub(crate) fn apply_history_events(&mut self, history_info: &HistoryInfo) -> Result<()> { + let (_, events) = history_info + .events() + .split_at(self.get_last_started_event_id() as usize); + let mut history = events.iter().peekable(); + + self.set_started_ids( + history_info.previous_started_event_id, + history_info.workflow_task_started_event_id, + ); + + // HistoryInfo's constructor enforces some rules about the structure of history that + // could be enforced here, but needn't be because they have already been guaranteed by it. + // See the errors that can be returned from [HistoryInfo::new_from_events] for detail. + + while let Some(event) = history.next() { + let next_event = history.peek(); + + if event.event_type == EventType::WorkflowTaskStarted as i32 && next_event.is_none() { + self.handle_event(event, false)?; + return Ok(()); + } + + self.handle_event(event, next_event.is_some())?; + + if next_event.is_none() { + if event.is_final_wf_execution_event() { + return Ok(()); + } + unreachable!() + } + } + + Ok(()) + } + /// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action /// on the returned machine responses fn submachine_handle_event( diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs index 7d831af0d..657a75a21 100644 --- a/src/protosext/history_info.rs +++ b/src/protosext/history_info.rs @@ -1,14 +1,15 @@ use crate::{ - machines::WorkflowMachines, protos::temporal::api::enums::v1::EventType, protos::temporal::api::history::v1::{History, HistoryEvent}, }; -#[derive(Clone, Debug, derive_more::Constructor, PartialEq)] +#[derive(Clone, Debug, PartialEq)] pub(crate) struct HistoryInfo { pub previous_started_event_id: i64, pub workflow_task_started_event_id: i64, - pub events: Vec, + // This needs to stay private so the struct can't be instantiated outside of the constructor, + // which enforces some invariants regarding history structure that need to be upheld. + events: Vec, } type Result = std::result::Result; @@ -101,65 +102,8 @@ impl HistoryInfo { Self::new_from_events(&h.events, to_wf_task_num) } - /// Apply events from history to workflow machines. Remember that only the events that exist - /// in this instance will be applied, which is determined by `to_wf_task_num` passed into the - /// constructor. - pub(crate) fn apply_history_events(&self, wf_machines: &mut WorkflowMachines) -> Result<()> { - let (_, events) = self - .events - .split_at(wf_machines.get_last_started_event_id() as usize); - let mut history = events.iter().peekable(); - - wf_machines.set_started_ids( - self.previous_started_event_id, - self.workflow_task_started_event_id, - ); - let mut started_id = self.previous_started_event_id; - - while let Some(event) = history.next() { - let next_event = history.peek(); - - if event.event_type == EventType::WorkflowTaskStarted as i32 { - let next_is_completed = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskCompleted as i32 - }); - let next_is_failed_or_timeout = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskFailed as i32 - || ne.event_type == EventType::WorkflowTaskTimedOut as i32 - }); - - if next_event.is_none() || next_is_completed { - started_id = event.event_id; - if next_event.is_none() { - wf_machines - .handle_event(event, false) - .map_err(anyhow::Error::from)?; - return Ok(()); - } - } else if next_event.is_some() && !next_is_failed_or_timeout { - return Err(HistoryInfoError::FailedOrTimeout(event.clone())); - } - } - - wf_machines - .handle_event(event, next_event.is_some()) - .map_err(anyhow::Error::from)?; - - if next_event.is_none() { - if event.is_final_wf_execution_event() { - return Ok(()); - } - if started_id != event.event_id { - return Err(HistoryInfoError::UnexpectedEventId { - previous_started_event_id: started_id, - workflow_task_started_event_id: event.event_id, - }); - } - unreachable!() - } - } - - Ok(()) + pub(crate) fn events(&self) -> &[HistoryEvent] { + &self.events } } diff --git a/src/workflow/concurrency_manager.rs b/src/workflow/concurrency_manager.rs index c9a40f467..041afaacb 100644 --- a/src/workflow/concurrency_manager.rs +++ b/src/workflow/concurrency_manager.rs @@ -265,7 +265,7 @@ mod tests { "some_run_id", PollWorkflowTaskQueueResponse { history: Some(History { - events: t.get_history_info(1).unwrap().events, + events: t.get_history_info(1).unwrap().events().to_vec(), }), workflow_execution: Some(WorkflowExecution { workflow_id: "wid".to_string(), diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 40cc3fb55..52cacd017 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -109,16 +109,16 @@ pub(crate) struct NextWfActivation { impl WorkflowManager { /// Given history that was just obtained from the server, pipe it into this workflow's machines. /// - /// Should only be called when a workflow has caught up on replay. It will return a workflow - /// activation if one is needed, as well as a bool indicating if there are more workflow tasks - /// that need to be performed to replay the remaining history. + /// Should only be called when a workflow has caught up on replay (or is just beginning). It + /// will return a workflow activation if one is needed, as well as a bool indicating if there + /// are more workflow tasks that need to be performed to replay the remaining history. #[instrument(skip(self))] pub fn feed_history_from_server(&mut self, hist: History) -> Result { let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?; let task_ct = hist.get_workflow_task_count(None)?; self.last_history_task_count = task_ct; self.last_history_from_server = hist; - task_hist.apply_history_events(&mut self.machines)?; + self.machines.apply_history_events(&task_hist)?; let activation = self.machines.get_wf_activation(); let more_activations_needed = task_ct > self.current_wf_task_num; @@ -137,7 +137,7 @@ impl WorkflowManager { pub fn get_next_activation(&mut self) -> Result { let hist = &self.last_history_from_server; let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?; - task_hist.apply_history_events(&mut self.machines)?; + self.machines.apply_history_events(&task_hist)?; let activation = self.machines.get_wf_activation(); self.current_wf_task_num += 1; @@ -149,3 +149,29 @@ impl WorkflowManager { }) } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + protos::temporal::api::common::v1::WorkflowExecution, test_help::canned_histories, + }; + use rand::{thread_rng, Rng}; + + #[test] + fn full_history_application() { + let t = canned_histories::single_timer("fake_timer"); + let task_token: [u8; 16] = thread_rng().gen(); + let pwtqr = PollWorkflowTaskQueueResponse { + history: Some(t.as_history()), + workflow_execution: Some(WorkflowExecution { + workflow_id: "wfid".to_string(), + run_id: "runid".to_string(), + }), + task_token: task_token.to_vec(), + ..Default::default() + }; + let mut wfm = WorkflowManager::new(pwtqr).unwrap(); + wfm.get_next_activation().unwrap(); + } +}