From 6884bccbbd6dcf12055540e237f5e5675c05224d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Mar 2021 11:23:37 -0800 Subject: [PATCH 1/6] Shutdown interrupts poll initial impl --- src/lib.rs | 42 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d872da5fb..5f309f0b0 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -38,6 +38,7 @@ use crate::{ }; use crossbeam::queue::SegQueue; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, fmt::Debug, @@ -171,9 +172,8 @@ where } // This will block forever in the event there is no work from the server - let work = self - .runtime - .block_on(self.server_gateway.poll_workflow_task(task_queue))?; + // TODO: Trap shutting down and recurse or goto top of fn + let work = self.poll_server(task_queue)?; let task_token = work.task_token.clone(); event!( Level::DEBUG, @@ -292,6 +292,28 @@ impl CoreSDK { })?; Ok(()) } + + /// Blocks polling the server until it responds, or until the shutdown flag is set (aborting + /// the poll) + fn poll_server(&self, task_queue: &str) -> Result { + self.runtime.block_on(async { + let shutdownfut = async { + loop { + if self.shutdown_requested.load(Ordering::Relaxed) { + break; + } + tokio::time::sleep(Duration::from_millis(100)).await; + } + }; + let pollfut = self.server_gateway.poll_workflow_task(task_queue); + tokio::select! { + _ = shutdownfut => { + Err(CoreError::ShuttingDown) + } + r = pollfut => r + } + }) + } } /// The error type returned by interactions with [Core] @@ -312,7 +334,7 @@ pub enum CoreError { UninterpretableCommand(#[from] InconvertibleCommandError), /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), - /// Underlying error in state machines + /// Underlying error in state machines: {0:?} UnderlyingMachinesError(#[from] WFMachinesError), /// Task token had nothing associated with it: {0:?} NothingFoundForTaskToken(Vec), @@ -535,6 +557,18 @@ mod test { ); } + #[rstest(single_timer_setup(&[1]))] + fn shutdown_aborts_actively_blocked_poll(single_timer_setup: FakeCore) { + let res = single_timer_setup.poll_task(TASK_Q).unwrap(); + assert_eq!(res.get_wf_jobs().len(), 1); + + single_timer_setup.shutdown().unwrap(); + assert_matches!( + single_timer_setup.poll_task(TASK_Q).unwrap_err(), + CoreError::ShuttingDown + ); + } + #[test] fn workflow_update_random_seed_on_workflow_reset() { let s = span!(Level::DEBUG, "Test start", t = "bridge"); From 386ed977e0b5969995f4c7e5e5c2849842a71609 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Mar 2021 14:56:41 -0800 Subject: [PATCH 2/6] Use integ test to test this. Unit test too ugly to make work. --- src/lib.rs | 27 +++++++-------------------- src/machines/test_help/mod.rs | 11 +++++++++-- tests/integ_tests/simple_wf_tests.rs | 19 ++++++++++++++++++- 3 files changed, 34 insertions(+), 23 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5f309f0b0..f0152f7f6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,9 +23,8 @@ mod test_help; pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; -use crate::machines::WFMachinesError; use crate::{ - machines::{InconvertibleCommandError, WFCommand}, + machines::{InconvertibleCommandError, WFCommand, WFMachinesError}, protos::{ coresdk::{ task_completion, wf_activation_completion::Status, Task, TaskCompletion, @@ -118,10 +117,7 @@ pub enum TaskQueue { _Activity(String), } -struct CoreSDK -where - WP: ServerGatewayApis + 'static, -{ +struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs server_gateway: Arc, @@ -146,7 +142,7 @@ struct PendingActivation { impl Core for CoreSDK where - WP: ServerGatewayApis + Send + Sync, + WP: ServerGatewayApis + Send + Sync + 'static, { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { @@ -354,7 +350,9 @@ pub enum CoreError { mod test { use super::*; use crate::{ - machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, + machines::test_help::{ + build_fake_core, fake_core_from_mock, pending_mock_poller, FakeCore, TestHistoryBuilder, + }, protos::{ coresdk::{ wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, @@ -368,6 +366,7 @@ mod test { }, test_help::canned_histories, }; + use futures::FutureExt; use rstest::{fixture, rstest}; const TASK_Q: &str = "test-task-queue"; @@ -557,18 +556,6 @@ mod test { ); } - #[rstest(single_timer_setup(&[1]))] - fn shutdown_aborts_actively_blocked_poll(single_timer_setup: FakeCore) { - let res = single_timer_setup.poll_task(TASK_Q).unwrap(); - assert_eq!(res.get_wf_jobs().len(), 1); - - single_timer_setup.shutdown().unwrap(); - assert_matches!( - single_timer_setup.poll_task(TASK_Q).unwrap_err(), - CoreError::ShuttingDown - ); - } - #[test] fn workflow_update_random_seed_on_workflow_reset() { let s = span!(Level::DEBUG, "Test start", t = "bridge"); diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 636697b90..5b0804b86 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -6,7 +6,7 @@ mod workflow_driver; pub(crate) use history_builder::TestHistoryBuilder; pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver}; -use crate::workflow::WorkflowConcurrencyManager; +use crate::workflow::{PollWorkflowTaskQueueApi, WorkflowConcurrencyManager}; use crate::{ pollers::MockServerGateway, protos::temporal::api::common::v1::WorkflowExecution, @@ -14,7 +14,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, }, - CoreSDK, + CoreSDK, ServerGatewayApis, }; use rand::{thread_rng, Rng}; use std::sync::atomic::AtomicBool; @@ -65,6 +65,13 @@ pub(crate) fn build_fake_core( .expect_complete_workflow_task() .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); + fake_core_from_mock(mock_gateway) +} + +pub(crate) fn fake_core_from_mock(mock_gateway: MT) -> CoreSDK +where + MT: ServerGatewayApis, +{ let runtime = Runtime::new().unwrap(); CoreSDK { runtime, diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 6de090128..ce61c1fc4 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -20,7 +20,7 @@ use temporal_sdk_core::{ StartTimerCommandAttributes, }, }, - Core, CoreInitOptions, ServerGatewayOptions, Url, + Core, CoreError, CoreInitOptions, ServerGatewayOptions, Url, }; // TODO: These tests can get broken permanently if they break one time and the server is not @@ -317,3 +317,20 @@ fn parallel_workflows_same_queue() { handles.into_iter().for_each(|h| h.join().unwrap()); } + +// Ideally this would be a unit test, but returning a pending future with mockall bloats the mock +// code a bunch and just isn't worth it. Do it when https://github.com/asomers/mockall/issues/189 is +// fixed. +#[test] +fn shutdown_aborts_actively_blocked_poll() { + let task_q = "shutdown_aborts_actively_blocked_poll"; + let core = Arc::new(get_integ_core()); + // Begin the poll, and request shutdown from another thread after a small period of time. + let tcore = core.clone(); + let handle = std::thread::spawn(move || { + std::thread::sleep(Duration::from_millis(100)); + tcore.shutdown().unwrap(); + }); + assert_matches!(core.poll_task(task_q).unwrap_err(), CoreError::ShuttingDown); + handle.join().unwrap(); +} From a307747effe057006e868da0598192a831730952 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Mar 2021 15:57:36 -0800 Subject: [PATCH 3/6] Fix clippy / test compile problem --- src/lib.rs | 5 +---- src/machines/test_help/mod.rs | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f0152f7f6..333fe111a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,9 +350,7 @@ pub enum CoreError { mod test { use super::*; use crate::{ - machines::test_help::{ - build_fake_core, fake_core_from_mock, pending_mock_poller, FakeCore, TestHistoryBuilder, - }, + machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, protos::{ coresdk::{ wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, @@ -366,7 +364,6 @@ mod test { }, test_help::canned_histories, }; - use futures::FutureExt; use rstest::{fixture, rstest}; const TASK_Q: &str = "test-task-queue"; diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 5b0804b86..f865268a2 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -6,7 +6,7 @@ mod workflow_driver; pub(crate) use history_builder::TestHistoryBuilder; pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver}; -use crate::workflow::{PollWorkflowTaskQueueApi, WorkflowConcurrencyManager}; +use crate::workflow::WorkflowConcurrencyManager; use crate::{ pollers::MockServerGateway, protos::temporal::api::common::v1::WorkflowExecution, From 50ec53cf8ba6a6e8878cc45b86bc68207b72d3ff Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Mar 2021 13:55:14 -0800 Subject: [PATCH 4/6] Fix double-shutdown panic --- src/workflow/concurrency_manager.rs | 7 +++++-- tests/integ_tests/simple_wf_tests.rs | 3 +++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/workflow/concurrency_manager.rs b/src/workflow/concurrency_manager.rs index a356fac83..187305cf3 100644 --- a/src/workflow/concurrency_manager.rs +++ b/src/workflow/concurrency_manager.rs @@ -121,9 +121,12 @@ impl WorkflowConcurrencyManager { /// # Panics /// If the workflow machine thread panicked pub fn shutdown(&self) { + let mut wf_thread = self.wf_thread.lock(); + if wf_thread.is_none() { + return; + } let _ = self.shutdown_chan.send(true); - self.wf_thread - .lock() + wf_thread .take() .unwrap() .join() diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index ce61c1fc4..273f330fa 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -333,4 +333,7 @@ fn shutdown_aborts_actively_blocked_poll() { }); assert_matches!(core.poll_task(task_q).unwrap_err(), CoreError::ShuttingDown); handle.join().unwrap(); + // Ensure double-shutdown doesn't explode + core.shutdown().unwrap(); + assert_matches!(core.poll_task(task_q).unwrap_err(), CoreError::ShuttingDown); } From 268359c040009d14428bf4a1749c5b8a67fd7b2a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Mar 2021 16:44:48 -0800 Subject: [PATCH 5/6] Ensure we drain remaining pending tasks if shutdown interrupts long poll --- src/lib.rs | 45 +++++++++++++++++++++++++-------------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ca441c435..8cf541516 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -178,28 +178,33 @@ where return Err(CoreError::ShuttingDown); } - // This will block forever in the event there is no work from the server - // TODO: Trap shutting down and recurse or goto top of fn - let work = self.poll_server(task_queue)?; - let task_token = work.task_token.clone(); - debug!( - task_token = %fmt_task_token(&task_token), - "Received workflow task from server" - ); - - let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; + // This will block forever (unless interrupted by shutdown) in the event there is no work + // from the server + match self.poll_server(task_queue) { + Ok(work) => { + let task_token = work.task_token.clone(); + debug!( + task_token = %fmt_task_token(&task_token), + "Received workflow task from server" + ); + + let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; + + if next_activation.more_activations_needed { + self.pending_activations.push(PendingActivation { + run_id, + task_token: task_token.clone(), + }); + } - if next_activation.more_activations_needed { - self.pending_activations.push(PendingActivation { - run_id, - task_token: task_token.clone(), - }); + Ok(Task { + task_token, + variant: next_activation.activation.map(Into::into), + }) + } + Err(CoreError::ShuttingDown) => self.poll_task(task_queue), + Err(e) => Err(e), } - - Ok(Task { - task_token, - variant: next_activation.activation.map(Into::into), - }) } #[instrument(skip(self))] From 816e4c9b8e459fa95b66014220875dbecda9883d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Mar 2021 16:38:46 -0800 Subject: [PATCH 6/6] Fix overlapping task queue names that somehow never trampled before --- tests/integ_tests/simple_wf_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 296d259fe..a5dd3347c 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -213,7 +213,7 @@ fn timer_cancel_workflow() { #[test] fn timer_immediate_cancel_workflow() { - let task_q = "timer_cancel_workflow"; + let task_q = "timer_immediate_cancel_workflow"; let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen();