diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs
index eeaed89ae..2210f3e54 100644
--- a/core/src/worker/workflow/machines/workflow_machines.rs
+++ b/core/src/worker/workflow/machines/workflow_machines.rs
@@ -1482,7 +1482,7 @@ impl WorkflowMachines {
 
     fn get_machine_key(&self, id: CommandID) -> Result<MachineKey> {
         Ok(*self.id_to_machine.get(&id).ok_or_else(|| {
-            WFMachinesError::Fatal(format!("Missing associated machine for {id:?}"))
+            WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}"))
         })?)
     }
 
diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs
index 311b4960d..4b4f87010 100644
--- a/core/src/worker/workflow/managed_run.rs
+++ b/core/src/worker/workflow/managed_run.rs
@@ -373,6 +373,7 @@ impl ManagedRun {
         mut commands: Vec<WFCommand>,
         used_flags: Vec<u32>,
         resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
+        is_forced_failure: bool,
     ) -> Result<Option<ActivationOrAuto>> {
         let activation_was_only_eviction = self.activation_is_eviction();
         let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() {
@@ -446,6 +447,7 @@ impl ManagedRun {
             query_responses,
             used_flags,
             resp_chan,
+            is_forced_failure,
         };
 
        // Verify we can actually apply the next workflow task, which will happen as part of
@@ -617,6 +619,7 @@ impl ManagedRun {
             }],
             vec![],
             resp_chan,
+            true,
         )
         .unwrap_or_else(|e| {
             dbg_panic!("Got next page request when auto-failing workflow: {e:?}");
@@ -686,6 +689,7 @@ impl ManagedRun {
             query_responses: completion.query_responses,
             has_pending_query: completion.has_pending_query,
             activation_was_eviction: completion.activation_was_eviction,
+            is_forced_failure: completion.is_forced_failure,
         };
 
         self.wfm.machines.add_lang_used_flags(completion.used_flags);
@@ -708,7 +712,8 @@ impl ManagedRun {
             self.wfm.feed_history_from_new_page(update)?;
         }
         // Don't bother applying the next task if we're evicting at the end of this activation
-        if !completion.activation_was_eviction {
+        // or are otherwise broken.
+        if !completion.activation_was_eviction && !self.am_broken {
             self.wfm.apply_next_task_if_ready()?;
         }
         let new_local_acts = self.wfm.drain_queued_local_activities();
@@ -1083,7 +1088,7 @@ impl ManagedRun {
         // fulfilling a query. If the activation we sent was *only* an eviction, don't send that
         // either.
         let should_respond = !(machines_wft_response.has_pending_jobs
-            || machines_wft_response.replaying
+            || (machines_wft_response.replaying && !data.is_forced_failure)
             || is_query_playback
             || data.activation_was_eviction
             || machines_wft_response.have_seen_terminal_event);
@@ -1331,6 +1336,7 @@ struct CompletionDataForWFT {
     query_responses: Vec<QueryResult>,
     has_pending_query: bool,
     activation_was_eviction: bool,
+    is_forced_failure: bool,
 }
 
 /// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data
@@ -1405,13 +1411,11 @@ impl WorkflowManager {
         self.machines.ready_to_apply_next_wft()
     }
 
-    /// If there are no pending jobs for the workflow, apply the next workflow task and check
-    /// again if there are any jobs. Importantly, does not *drain* jobs.
-    ///
-    /// Returns true if there are jobs (before or after applying the next WFT).
-    fn apply_next_task_if_ready(&mut self) -> Result<bool> {
+    /// If there are no pending jobs for the workflow, apply the next workflow task and check
+    /// again if there are any jobs. Importantly, does not *drain* jobs.
+    fn apply_next_task_if_ready(&mut self) -> Result<()> {
         if self.machines.has_pending_jobs() {
-            return Ok(true);
+            return Ok(());
         }
         loop {
             let consumed_events = self.machines.apply_next_wft_from_history()?;
@@ -1423,7 +1427,7 @@ impl WorkflowManager {
                 break;
             }
         }
-        Ok(self.machines.has_pending_jobs())
+        Ok(())
     }
 
     /// Must be called when we're ready to respond to a WFT after handling catching up on replay
@@ -1473,6 +1477,7 @@ struct RunActivationCompletion {
     has_pending_query: bool,
     query_responses: Vec<QueryResult>,
     used_flags: Vec<u32>,
+    is_forced_failure: bool,
     /// Used to notify the worker when the completion is done processing and the completion can
     /// unblock. Must always be `Some` when initialized.
     resp_chan: Option<oneshot::Sender<ActivationCompleteResult>>,
diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs
index a1babfdfb..50afaa1c5 100644
--- a/core/src/worker/workflow/workflow_stream.rs
+++ b/core/src/worker/workflow/workflow_stream.rs
@@ -264,7 +264,12 @@ impl WFStream {
                 commands,
                 used_flags,
                 ..
-            } => match rh.successful_completion(commands, used_flags, complete.response_tx) {
+            } => match rh.successful_completion(
+                commands,
+                used_flags,
+                complete.response_tx,
+                false,
+            ) {
                 Ok(acts) => acts,
                 Err(npr) => {
                     self.runs_needing_fetching
diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs
index df173847c..dfbe441b6 100644
--- a/tests/integ_tests/workflow_tests.rs
+++ b/tests/integ_tests/workflow_tests.rs
@@ -23,8 +23,11 @@ use std::{
     sync::atomic::{AtomicUsize, Ordering},
     time::Duration,
 };
-use temporal_client::{WorkflowClientTrait, WorkflowOptions};
-use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult};
+use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions};
+use temporal_sdk::{
+    interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext,
+    WorkflowResult,
+};
 use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime};
 use temporal_sdk_core_api::errors::{PollWfError, WorkflowErrorType};
 use temporal_sdk_core_protos::{
@@ -46,9 +49,8 @@ use temporal_sdk_core_test_utils::{
     drain_pollers_and_shutdown, history_from_proto_binary, init_core_and_create_wf,
     init_core_replay_preloaded, schedule_activity_cmd, CoreWfStarter, WorkerTestHelpers,
 };
-use tokio::{join, time::sleep};
+use tokio::{join, sync::Notify, time::sleep};
 use uuid::Uuid;
-
 // TODO: We should get expected histories for these tests and confirm that the history at the end
 // matches.
@@ -769,3 +771,96 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to(
     );
     assert!(body.contains(&match_this));
 }
+
+#[tokio::test]
+async fn history_out_of_order_on_restart() {
+    let wf_name = "history_out_of_order_on_restart";
+    let mut starter = CoreWfStarter::new(wf_name);
+    starter
+        .worker_config
+        .workflow_failure_errors([WorkflowErrorType::Nondeterminism]);
+    let mut worker = starter.worker().await;
+    let mut starter2 = starter.clone_no_worker();
+    let mut worker2 = starter2.worker().await;
+
+    static HIT_SLEEP: Notify = Notify::const_new();
+
+    worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
+        ctx.local_activity(LocalActivityOptions {
+            activity_type: "echo".to_owned(),
+            input: "hi".as_json_payload().unwrap(),
+            start_to_close_timeout: Some(Duration::from_secs(5)),
+            ..Default::default()
+        })
+        .await;
+        ctx.activity(ActivityOptions {
+            activity_type: "echo".to_owned(),
+            input: "hi".as_json_payload().unwrap(),
+            start_to_close_timeout: Some(Duration::from_secs(5)),
+            ..Default::default()
+        })
+        .await;
+        // Interrupt this sleep on the first run
+        HIT_SLEEP.notify_one();
+        ctx.timer(Duration::from_secs(5)).await;
+        Ok(().into())
+    });
+    worker.register_activity("echo", echo);
+
+    worker2.register_wf(wf_name.to_owned(), |ctx: WfContext| async move {
+        ctx.local_activity(LocalActivityOptions {
+            activity_type: "echo".to_owned(),
+            input: "hi".as_json_payload().unwrap(),
+            start_to_close_timeout: Some(Duration::from_secs(5)),
+            ..Default::default()
+        })
+        .await;
+        // This timer is added after the workflow is restarted on the new worker
+        ctx.timer(Duration::from_secs(1)).await;
+        ctx.activity(ActivityOptions {
+            activity_type: "echo".to_owned(),
+            input: "hi".as_json_payload().unwrap(),
+            start_to_close_timeout: Some(Duration::from_secs(5)),
+            ..Default::default()
+        })
+        .await;
+        ctx.timer(Duration::from_secs(2)).await;
+        Ok(().into())
+    });
+    worker2.register_activity("echo", echo);
+    worker
+        .submit_wf(
+            wf_name.to_owned(),
+            wf_name.to_owned(),
+            vec![],
+            WorkflowOptions {
+                execution_timeout: Some(Duration::from_secs(20)),
+                ..Default::default()
+            },
+        )
+        .await
+        .unwrap();
+
+    let w1 = async {
+        worker.run_until_done().await.unwrap();
+    };
+    let w2 = async {
+        // Wait for the first worker to hit the sleep
+        HIT_SLEEP.notified().await;
+        starter.shutdown().await;
+        // Start the new worker
+        worker2.expect_workflow_completion(wf_name, None);
+        worker2.run_until_done().await.unwrap();
+    };
+    join!(w1, w2);
+    // The workflow should fail with the nondeterminism error
+    let handle = starter
+        .get_client()
+        .await
+        .get_untyped_workflow_handle(wf_name, "");
+    let res = handle
+        .get_workflow_result(Default::default())
+        .await
+        .unwrap();
+    assert_matches!(res, WorkflowExecutionResult::Failed(_));
+}
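
The crux of the patch is the `should_respond` predicate in `managed_run.rs`: before this change, any completion produced while `machines_wft_response.replaying` was true was suppressed, so a workflow that Core force-failed during replay (for example, after the nondeterminism error surfaced by the `get_machine_key` change) could never report its failure to the server. Below is a minimal, self-contained sketch of that gating logic. It is an illustration only, not code from the patch; the `ResponseGate` struct and the `main` driver are hypothetical stand-ins for the real inputs.

// Illustrative sketch only -- not part of the patch. `ResponseGate` is a
// hypothetical, simplified stand-in for the fields `ManagedRun` consults
// when deciding whether to send a WFT response.
struct ResponseGate {
    has_pending_jobs: bool,
    replaying: bool,
    is_forced_failure: bool,
    is_query_playback: bool,
    activation_was_eviction: bool,
    have_seen_terminal_event: bool,
}

impl ResponseGate {
    // Mirrors the patched predicate: replay normally suppresses responses,
    // but a forced failure must be reported even while replaying.
    fn should_respond(&self) -> bool {
        !(self.has_pending_jobs
            || (self.replaying && !self.is_forced_failure)
            || self.is_query_playback
            || self.activation_was_eviction
            || self.have_seen_terminal_event)
    }
}

fn main() {
    // Before the fix, `replaying: true` alone would have suppressed this
    // response, and the forced failure would never reach the server.
    let gate = ResponseGate {
        has_pending_jobs: false,
        replaying: true,
        is_forced_failure: true,
        is_query_playback: false,
        activation_was_eviction: false,
        have_seen_terminal_event: false,
    };
    assert!(gate.should_respond());
}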