From 87246ebb2157edf2282721cd24de48ceb33ce9dc Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 19 Dec 2024 16:12:34 -0800 Subject: [PATCH 1/2] Adds repro test & corrects other error that should be nondeterminism --- .../workflow/machines/workflow_machines.rs | 2 +- tests/integ_tests/workflow_tests.rs | 94 ++++++++++++++++++- 2 files changed, 91 insertions(+), 5 deletions(-) diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index eeaed89ae..2210f3e54 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -1482,7 +1482,7 @@ impl WorkflowMachines { fn get_machine_key(&self, id: CommandID) -> Result { Ok(*self.id_to_machine.get(&id).ok_or_else(|| { - WFMachinesError::Fatal(format!("Missing associated machine for {id:?}")) + WFMachinesError::Nondeterminism(format!("Missing associated machine for {id:?}")) })?) } diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index df173847c..b73f1db74 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -20,11 +20,14 @@ use crate::integ_tests::{activity_functions::echo, metrics_tests}; use assert_matches::assert_matches; use std::{ collections::{HashMap, HashSet}, - sync::atomic::{AtomicUsize, Ordering}, + sync::atomic::{AtomicBool, AtomicUsize, Ordering}, time::Duration, }; -use temporal_client::{WorkflowClientTrait, WorkflowOptions}; -use temporal_sdk::{interceptors::WorkerInterceptor, ActivityOptions, WfContext, WorkflowResult}; +use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService}; +use temporal_sdk::{ + interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext, + WorkflowResult, +}; use temporal_sdk_core::{replay::HistoryForReplay, CoreRuntime}; use temporal_sdk_core_api::errors::{PollWfError, WorkflowErrorType}; use temporal_sdk_core_protos::{ @@ 
-48,7 +51,6 @@ use temporal_sdk_core_test_utils::{ }; use tokio::{join, time::sleep}; use uuid::Uuid; - // TODO: We should get expected histories for these tests and confirm that the history at the end // matches. @@ -769,3 +771,87 @@ async fn nondeterminism_errors_fail_workflow_when_configured_to( ); assert!(body.contains(&match_this)); } + +#[tokio::test] +async fn history_out_of_order_on_restart() { + let wf_name = "history_out_of_order_on_restart"; + let mut starter = CoreWfStarter::new(wf_name); + starter + .worker_config + .workflow_failure_errors([WorkflowErrorType::Nondeterminism]); + let mut worker = starter.worker().await; + let client = starter.get_client().await; + let mut starter2 = starter.clone_no_worker(); + let mut worker2 = starter2.worker().await; + + static HIT_SLEEP: AtomicBool = AtomicBool::new(false); + + worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + ctx.local_activity(LocalActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + ctx.activity(ActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + // Interrupt this sleep on first go + HIT_SLEEP.store(true, Ordering::Release); + ctx.timer(Duration::from_secs(5)).await; + Ok(().into()) + }); + worker.register_activity("echo", echo); + + worker2.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { + ctx.local_activity(LocalActivityOptions { + activity_type: "echo".to_owned(), + input: "hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + // Timer is added after restarting workflow + ctx.timer(Duration::from_secs(1)).await; + ctx.activity(ActivityOptions { + activity_type: "echo".to_owned(), + input: 
"hi".as_json_payload().unwrap(), + start_to_close_timeout: Some(Duration::from_secs(5)), + ..Default::default() + }) + .await; + ctx.timer(Duration::from_secs(5)).await; + Ok(().into()) + }); + worker2.register_activity("echo", echo); + let run_id = worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + + let w1 = async { + worker.run_until_done().await.unwrap(); + }; + let w2 = async { + // wait to hit sleep + while !HIT_SLEEP.load(Ordering::Acquire) { + tokio::time::sleep(Duration::from_millis(100)).await; + } + starter.shutdown().await; + // start new worker + worker2.expect_workflow_completion(wf_name, None); + worker2.run_until_done().await.unwrap(); + }; + join!(w1, w2); + // The workflow should complete because the nondeterminism error should fail the workflow +} From 683cd6bb28352a0ca21f5b1f5b9b1b8571285f02 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 19 Dec 2024 16:46:37 -0800 Subject: [PATCH 2/2] Fix --- core/src/worker/workflow/managed_run.rs | 23 ++++++++------ core/src/worker/workflow/workflow_stream.rs | 7 ++++- tests/integ_tests/workflow_tests.rs | 35 +++++++++++++-------- 3 files changed, 42 insertions(+), 23 deletions(-) diff --git a/core/src/worker/workflow/managed_run.rs b/core/src/worker/workflow/managed_run.rs index 311b4960d..4b4f87010 100644 --- a/core/src/worker/workflow/managed_run.rs +++ b/core/src/worker/workflow/managed_run.rs @@ -373,6 +373,7 @@ impl ManagedRun { mut commands: Vec, used_flags: Vec, resp_chan: Option>, + is_forced_failure: bool, ) -> Result> { let activation_was_only_eviction = self.activation_is_eviction(); let (task_token, has_pending_query, start_time) = if let Some(entry) = self.wft.as_ref() { @@ -446,6 +447,7 @@ impl ManagedRun { query_responses, used_flags, resp_chan, + is_forced_failure, }; // Verify we can actually apply the next workflow task, which will happen as part of @@ -617,6 +619,7 @@ impl ManagedRun { }], vec![], 
resp_chan, + true, ) .unwrap_or_else(|e| { dbg_panic!("Got next page request when auto-failing workflow: {e:?}"); @@ -686,6 +689,7 @@ impl ManagedRun { query_responses: completion.query_responses, has_pending_query: completion.has_pending_query, activation_was_eviction: completion.activation_was_eviction, + is_forced_failure: completion.is_forced_failure, }; self.wfm.machines.add_lang_used_flags(completion.used_flags); @@ -708,7 +712,8 @@ impl ManagedRun { self.wfm.feed_history_from_new_page(update)?; } // Don't bother applying the next task if we're evicting at the end of this activation - if !completion.activation_was_eviction { + // or are otherwise broken. + if !completion.activation_was_eviction && !self.am_broken { self.wfm.apply_next_task_if_ready()?; } let new_local_acts = self.wfm.drain_queued_local_activities(); @@ -1083,7 +1088,7 @@ impl ManagedRun { // fulfilling a query. If the activation we sent was *only* an eviction, don't send that // either. let should_respond = !(machines_wft_response.has_pending_jobs - || machines_wft_response.replaying + || (machines_wft_response.replaying && !data.is_forced_failure) || is_query_playback || data.activation_was_eviction || machines_wft_response.have_seen_terminal_event); @@ -1331,6 +1336,7 @@ struct CompletionDataForWFT { query_responses: Vec, has_pending_query: bool, activation_was_eviction: bool, + is_forced_failure: bool, } /// Manages an instance of a [WorkflowMachines], which is not thread-safe, as well as other data @@ -1405,13 +1411,11 @@ impl WorkflowManager { self.machines.ready_to_apply_next_wft() } - /// If there are no pending jobs for the workflow, apply the next workflow task and check - /// again if there are any jobs. Importantly, does not *drain* jobs. - /// - /// Returns true if there are jobs (before or after applying the next WFT). 
- fn apply_next_task_if_ready(&mut self) -> Result { + /// If there are no pending jobs for the workflow apply the next workflow task and check again + /// if there are any jobs. Importantly, does not *drain* jobs. + fn apply_next_task_if_ready(&mut self) -> Result<()> { if self.machines.has_pending_jobs() { - return Ok(true); + return Ok(()); } loop { let consumed_events = self.machines.apply_next_wft_from_history()?; @@ -1423,7 +1427,7 @@ impl WorkflowManager { break; } } - Ok(self.machines.has_pending_jobs()) + Ok(()) } /// Must be called when we're ready to respond to a WFT after handling catching up on replay @@ -1473,6 +1477,7 @@ struct RunActivationCompletion { has_pending_query: bool, query_responses: Vec, used_flags: Vec, + is_forced_failure: bool, /// Used to notify the worker when the completion is done processing and the completion can /// unblock. Must always be `Some` when initialized. resp_chan: Option>, diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index a1babfdfb..50afaa1c5 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -264,7 +264,12 @@ impl WFStream { commands, used_flags, .. 
- } => match rh.successful_completion(commands, used_flags, complete.response_tx) { + } => match rh.successful_completion( + commands, + used_flags, + complete.response_tx, + false, + ) { Ok(acts) => acts, Err(npr) => { self.runs_needing_fetching diff --git a/tests/integ_tests/workflow_tests.rs b/tests/integ_tests/workflow_tests.rs index b73f1db74..dfbe441b6 100644 --- a/tests/integ_tests/workflow_tests.rs +++ b/tests/integ_tests/workflow_tests.rs @@ -20,10 +20,10 @@ use crate::integ_tests::{activity_functions::echo, metrics_tests}; use assert_matches::assert_matches; use std::{ collections::{HashMap, HashSet}, - sync::atomic::{AtomicBool, AtomicUsize, Ordering}, + sync::atomic::{AtomicUsize, Ordering}, time::Duration, }; -use temporal_client::{WorkflowClientTrait, WorkflowOptions, WorkflowService}; +use temporal_client::{WfClientExt, WorkflowClientTrait, WorkflowExecutionResult, WorkflowOptions}; use temporal_sdk::{ interceptors::WorkerInterceptor, ActivityOptions, LocalActivityOptions, WfContext, WorkflowResult, @@ -49,7 +49,7 @@ use temporal_sdk_core_test_utils::{ drain_pollers_and_shutdown, history_from_proto_binary, init_core_and_create_wf, init_core_replay_preloaded, schedule_activity_cmd, CoreWfStarter, WorkerTestHelpers, }; -use tokio::{join, time::sleep}; +use tokio::{join, sync::Notify, time::sleep}; use uuid::Uuid; // TODO: We should get expected histories for these tests and confirm that the history at the end // matches. 
@@ -780,11 +780,10 @@ async fn history_out_of_order_on_restart() { .worker_config .workflow_failure_errors([WorkflowErrorType::Nondeterminism]); let mut worker = starter.worker().await; - let client = starter.get_client().await; let mut starter2 = starter.clone_no_worker(); let mut worker2 = starter2.worker().await; - static HIT_SLEEP: AtomicBool = AtomicBool::new(false); + static HIT_SLEEP: Notify = Notify::const_new(); worker.register_wf(wf_name.to_owned(), |ctx: WfContext| async move { ctx.local_activity(LocalActivityOptions { @@ -802,7 +801,7 @@ async fn history_out_of_order_on_restart() { }) .await; // Interrupt this sleep on first go - HIT_SLEEP.store(true, Ordering::Release); + HIT_SLEEP.notify_one(); ctx.timer(Duration::from_secs(5)).await; Ok(().into()) }); @@ -825,16 +824,19 @@ async fn history_out_of_order_on_restart() { ..Default::default() }) .await; - ctx.timer(Duration::from_secs(5)).await; + ctx.timer(Duration::from_secs(2)).await; Ok(().into()) }); worker2.register_activity("echo", echo); - let run_id = worker + worker .submit_wf( wf_name.to_owned(), wf_name.to_owned(), vec![], - WorkflowOptions::default(), + WorkflowOptions { + execution_timeout: Some(Duration::from_secs(20)), + ..Default::default() + }, ) .await .unwrap(); @@ -844,14 +846,21 @@ async fn history_out_of_order_on_restart() { }; let w2 = async { // wait to hit sleep - while !HIT_SLEEP.load(Ordering::Acquire) { - tokio::time::sleep(Duration::from_millis(100)).await; - } + HIT_SLEEP.notified().await; starter.shutdown().await; // start new worker worker2.expect_workflow_completion(wf_name, None); worker2.run_until_done().await.unwrap(); }; join!(w1, w2); - // The workflow should complete because the nondeterminism error should fail the workflow + // The workflow should fail with the nondeterminism error + let handle = starter + .get_client() + .await + .get_untyped_workflow_handle(wf_name, ""); + let res = handle + .get_workflow_result(Default::default()) + .await + .unwrap(); + 
assert_matches!(res, WorkflowExecutionResult::Failed(_)); }