From 3d36694d7484bb6278423c45239e659cd2106650 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Mon, 15 Mar 2021 16:53:11 -0700
Subject: [PATCH 1/3] This works, but I kinda hate it. Eliminating the timeout
 would be good.

---
 Cargo.toml                                 |   2 +-
 .../test_help/async_workflow_driver.rs     | 162 +++++++++++++++++
 src/machines/test_help/mod.rs              |   4 +-
 src/machines/test_help/workflow_driver.rs  | 171 ------------------
 src/machines/timer_state_machine.rs        |  67 ++++---
 5 files changed, 196 insertions(+), 210 deletions(-)
 create mode 100644 src/machines/test_help/async_workflow_driver.rs
 delete mode 100644 src/machines/test_help/workflow_driver.rs

diff --git a/Cargo.toml b/Cargo.toml
index 196233379..a4543f56b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ prost = "0.7"
 prost-types = "0.7"
 slotmap = "1.0"
 thiserror = "1.0"
-tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
+tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot"] }
 tracing = { version = "0.1", features = ["log"] }
 tracing-opentelemetry = "0.11"
 tracing-subscriber = "0.2"
diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs
new file mode 100644
index 000000000..924817450
--- /dev/null
+++ b/src/machines/test_help/async_workflow_driver.rs
@@ -0,0 +1,162 @@
+use crate::{
+    machines::WFCommand,
+    protos::{
+        coresdk::{wf_activation_job, CancelTimer, FireTimer},
+        temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
+    },
+    workflow::{ActivationListener, WorkflowFetcher},
+};
+use dashmap::DashMap;
+use futures::channel::oneshot;
+use std::{
+    future::Future,
+    sync::{
+        atomic::{AtomicBool, Ordering},
+        mpsc::{self, Receiver, RecvTimeoutError, Sender},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::task::{JoinError, JoinHandle};
+
+pub struct TestWorkflowDriver {
+    join_handle: JoinHandle<()>,
+    commands_from_wf: Receiver<WFCommand>,
+    cache: Arc<TestWfDriverCache>,
+    is_done: Arc<AtomicBool>,
+}
+
+#[derive(Default, Debug)]
+pub struct TestWfDriverCache {
+    /// Holds a mapping of timer id -> oneshot channel to resolve it
+    timer_futures: DashMap<String, oneshot::Sender<()>>,
+}
+
+pub struct CommandSender {
+    chan: Sender<WFCommand>,
+    twd_cache: Arc<TestWfDriverCache>,
+}
+
+impl CommandSender {
+    fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<WFCommand>) {
+        // We need to use a normal std channel since our receiving side is non-async
+        let (chan, rx) = mpsc::channel();
+        (Self { chan, twd_cache }, rx)
+    }
+
+    pub fn send(&self, c: WFCommand) {
+        self.chan.send(c).unwrap();
+    }
+
+    /// Request to create a timer
+    pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future {
+        let (tx, rx) = oneshot::channel();
+        self.twd_cache.timer_futures.insert(a.timer_id.clone(), tx);
+        let c = WFCommand::AddTimer(a);
+        dbg!("Send add timer");
+        self.send(c.into());
+        rx
+    }
+
+    /// Cancel a timer
+    pub fn cancel_timer(&self, timer_id: &str) {
+        let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
+            timer_id: timer_id.to_string(),
+        });
+        // Timer cancellation immediately unblocks awaiting a timer
+        self.twd_cache
+            .timer_futures
+            .remove(timer_id)
+            .map(|t| t.1.send(()).unwrap());
+        self.send(c.into());
+    }
+}
+
+impl TestWorkflowDriver {
+    /// Create a new test workflow driver from a workflow "function" which is really a closure
+    /// that returns an async block.
+    ///
+    /// This function, though not async itself, must be called from an async context as it depends
+    /// on the tokio runtime to spawn the workflow future.
+    pub fn new<F, Fut>(workflow_fn: F) -> Self
+    where
+        F: Fn(CommandSender) -> Fut,
+        Fut: Future<Output = ()> + Send + 'static,
+    {
+        let twd_cache = Arc::new(TestWfDriverCache::default());
+        let (sender, receiver) = CommandSender::new(twd_cache.clone());
+        let send_half_drop_on_wf_exit = sender.chan.clone();
+        let is_done = Arc::new(AtomicBool::new(false));
+        let is_done_in_fut = is_done.clone();
+        let wf_inner_fut = workflow_fn(sender);
+        let wf_future = async move {
+            wf_inner_fut.await;
+            is_done_in_fut.store(true, Ordering::SeqCst);
+            drop(send_half_drop_on_wf_exit);
+            dbg!("Exiting wf future");
+        };
+        let join_handle = tokio::spawn(wf_future);
+        Self {
+            join_handle,
+            commands_from_wf: receiver,
+            is_done,
+            cache: twd_cache,
+        }
+    }
+
+    // TODO: Needs to be returned separately from new
+    /// Wait for the test workflow to exit
+    pub async fn join(self) -> Result<(), JoinError> {
+        self.join_handle.await
+    }
+}
+
+impl WorkflowFetcher for TestWorkflowDriver {
+    fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
+        let mut emit_these = vec![];
+
+        // TODO: We really want this to be like a condvar, where we block here until either
+        // the wf is done or we are blocking on a command, doing deadlock if neither happens after
+        // a timeout.
+        // let is_blocking_on_cmds = self.cache.blocked_commands.load(Ordering::SeqCst) > 0;
+
+        // If the workflow is blocking on one or more commands, we will perform a blocking wait
+        // for it to send us new commands, with a timeout. If we are not blocking on any commands,
+        // and we hit the timeout, the workflow is spinning or deadlocked.
+
+        // futures::executor::block_on(self.cache.cmd_notifier.notified());
+        loop {
+            match self
+                .commands_from_wf
+                .recv_timeout(Duration::from_millis(10))
+            {
+                Ok(c) => emit_these.push(c),
+                Err(RecvTimeoutError::Timeout) => {
+                    dbg!("Timeout");
+                    break;
+                }
+                Err(RecvTimeoutError::Disconnected) => {
+                    if self.is_done.load(Ordering::SeqCst) {
+                        break;
+                    }
+                    unreachable!("Workflow done flag must be set properly")
+                }
+            }
+        }
+
+        debug!(emit_these = ?emit_these, "Test wf driver emitting");
+
+        emit_these
+    }
+}
+impl ActivationListener for TestWorkflowDriver {
+    fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
+        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id })
+        | wf_activation_job::Variant::CancelTimer(CancelTimer { timer_id }) = activation
+        {
+            if let Some(tx) = self.cache.timer_futures.remove(timer_id) {
+                tx.1.send(()).unwrap()
+            }
+        }
+    }
+}
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index dc5aaec8d..011da2805 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -1,10 +1,10 @@
 type Result = std::result::Result;
 
+mod async_workflow_driver;
 mod history_builder;
-mod workflow_driver;
 
+pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver};
 pub(crate) use history_builder::TestHistoryBuilder;
-pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver};
 
 use crate::workflow::WorkflowConcurrencyManager;
 use crate::{
diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs
deleted file mode 100644
index 443b66d06..000000000
--- a/src/machines/test_help/workflow_driver.rs
+++ /dev/null
@@ -1,171 +0,0 @@
-use crate::{
-    machines::WFCommand,
-    protos::{
-        coresdk::{wf_activation_job, FireTimer},
-        temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
-    },
-    workflow::{ActivationListener, WorkflowFetcher},
-};
-use dashmap::DashMap;
-use futures::Future;
-use std::sync::{
-    mpsc::{self, Receiver, Sender},
-    Arc,
-};
-
-/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control
-/// over when commands are returned than a normal workflow would.
-///
-/// It replaces "TestEntityTestListenerBase" in java which is pretty hard to follow.
-///
-/// It is important to understand that this driver doesn't work like a real workflow in the sense
-/// that nothing in it ever blocks, or ever should block. Every workflow task will run through the
-/// *entire* workflow, but any commands given to the sink after a `Waiting` command are simply
-/// ignored, allowing you to simulate blocking without ever actually blocking.
-pub(in crate::machines) struct TestWorkflowDriver<F> {
-    wf_function: F,
-    cache: Arc<TestWfDriverCache>,
-    /// Set to true if a workflow execution completed/failed/cancelled/etc has been issued
-    sent_final_execution: bool,
-}
-
-#[derive(Default, Debug)]
-pub(in crate::machines) struct TestWfDriverCache {
-    /// Holds a mapping of timer id -> is finished
-    ///
-    /// It can and should be eliminated by not recreating CommandSender on every iteration, which
-    /// means keeping the workflow suspended in a future somewhere. I tried this and it was hard,
-    /// but ultimately it's how real workflows will need to work.
-    unblocked_timers: DashMap<String, bool>,
-}
-
-impl<F, Fut> TestWorkflowDriver<F>
-where
-    F: Fn(CommandSender) -> Fut,
-    Fut: Future,
-{
-    /// Create a new test workflow driver from a workflow "function" which is really a closure
-    /// that returns an async block.
-    ///
-    /// In an ideal world, the workflow fn would actually be a generator which can yield commands,
-    /// and we may well end up doing something like that later.
-    pub(in crate::machines) fn new(workflow_fn: F) -> Self {
-        Self {
-            wf_function: workflow_fn,
-            cache: Default::default(),
-            sent_final_execution: false,
-        }
-    }
-}
-
-impl<F> ActivationListener for TestWorkflowDriver<F> {
-    fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
-        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
-            Arc::get_mut(&mut self.cache)
-                .unwrap()
-                .unblocked_timers
-                .insert(timer_id.clone(), true);
-        }
-    }
-}
-
-impl<F, Fut> WorkflowFetcher for TestWorkflowDriver<F>
-where
-    F: Fn(CommandSender) -> Fut + Send + Sync,
-    Fut: Future,
-{
-    fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
-        // If we have already sent the command to complete the workflow, we don't want
-        // to re-run the workflow again.
-        // TODO: This would be better to solve by actually pausing the workflow properly rather
-        // than doing the re-run the whole thing every time deal.
-        // TODO: Probably screws up signals edge case where signal rcvd after complete makes it to
-        // server
-        if self.sent_final_execution {
-            return vec![];
-        }
-
-        let (sender, receiver) = CommandSender::new(self.cache.clone());
-        // Call the closure that produces the workflow future
-        let wf_future = (self.wf_function)(sender);
-
-        // TODO: This is pointless right now -- either actually use async and suspend on awaits
-        // or just remove it.
-        // Create a tokio runtime to block on the future
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        rt.block_on(wf_future);
-
-        let cmds = receiver.into_iter();
-
-        let mut emit_these = vec![];
-        for cmd in cmds {
-            match cmd {
-                TestWFCommand::WFCommand(c) => {
-                    if let WFCommand::CompleteWorkflow(_) = &c {
-                        self.sent_final_execution = true;
-                    }
-                    emit_these.push(c);
-                }
-                TestWFCommand::Waiting => {
-                    // Ignore further commands since we're waiting on something
-                    break;
-                }
-            }
-        }
-
-        debug!(emit_these = ?emit_these, "Test wf driver emitting");
-
-        emit_these
-    }
-}
-
-#[derive(Debug, derive_more::From)]
-pub enum TestWFCommand {
-    WFCommand(WFCommand),
-    /// When a test workflow wants to await something like a timer or an activity, we will
-    /// ignore all commands produced after the wait, since they couldn't have actually happened
-    /// in a real workflow, since you'd be stuck waiting
-    Waiting,
-}
-
-pub struct CommandSender {
-    chan: Sender<TestWFCommand>,
-    twd_cache: Arc<TestWfDriverCache>,
-}
-
-impl CommandSender {
-    fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<TestWFCommand>) {
-        let (chan, rx) = mpsc::channel();
-        (Self { chan, twd_cache }, rx)
-    }
-
-    /// Request to create a timer. Returns true if the timer has fired, false if it hasn't yet.
-    ///
-    /// If `do_wait` is true, issue a waiting command if the timer is not finished.
-    pub fn timer(&mut self, a: StartTimerCommandAttributes, do_wait: bool) -> bool {
-        let finished = match self.twd_cache.unblocked_timers.entry(a.timer_id.clone()) {
-            dashmap::mapref::entry::Entry::Occupied(existing) => *existing.get(),
-            dashmap::mapref::entry::Entry::Vacant(v) => {
-                let c = WFCommand::AddTimer(a);
-                self.chan.send(c.into()).unwrap();
-                v.insert(false);
-                false
-            }
-        };
-        if !finished && do_wait {
-            self.chan.send(TestWFCommand::Waiting).unwrap();
-        }
-        finished
-    }
-
-    pub fn cancel_timer(&mut self, timer_id: &str) {
-        let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
-            timer_id: timer_id.to_string(),
-        });
-        self.chan.send(c.into()).unwrap();
-    }
-
-    pub fn send(&mut self, c: WFCommand) {
-        self.chan.send(c.into()).unwrap();
-    }
-}
diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs
index cdbd6bfef..8e4c56272 100644
--- a/src/machines/timer_state_machine.rs
+++ b/src/machines/timer_state_machine.rs
@@ -269,7 +269,6 @@ mod test {
     };
     use rstest::{fixture, rstest};
     use std::time::Duration;
-    use tracing::Level;
 
     #[fixture]
     fn fire_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) {
@@ -289,7 +288,9 @@
                 timer_id: "timer1".to_string(),
                 start_to_fire_timeout: Some(Duration::from_secs(5).into()),
             };
-            command_sink.timer(timer, true);
+            dbg!("BLORP");
+            command_sink.timer(timer).await;
+            dbg!("DORP");
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             command_sink.send(complete.into());
@@ -307,10 +308,9 @@
     }
 
     #[rstest]
-    fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "happy_inc");
-        let _enter = s.enter();
-
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
+        // tracing_init();
         let (t, mut state_machines) = fire_happy_hist;
 
         let commands = t
@@ -331,10 +331,8 @@
     }
 
     #[rstest]
-    fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "happy_full");
-        let _enter = s.enter();
-
+    #[tokio::test(flavor = "multi_thread")]
+    async fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = fire_happy_hist;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, None)
             .unwrap();
@@ -346,14 +344,14 @@
         );
     }
 
-    #[test]
-    fn mismatched_timer_ids_errors() {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn mismatched_timer_ids_errors() {
         let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
             let timer = StartTimerCommandAttributes {
                 timer_id: "realid".to_string(),
                 start_to_fire_timeout: Some(Duration::from_secs(5).into()),
             };
-            command_sink.timer(timer, true);
+            command_sink.timer(timer).await;
         });
 
         let t = canned_histories::single_timer("badid");
@@ -373,22 +371,19 @@
     #[fixture]
     fn cancellation_setup() -> (TestHistoryBuilder, WorkflowMachines) {
         let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
-            let _cancel_this = cmd_sink.timer(
-                StartTimerCommandAttributes {
-                    timer_id: "cancel_timer".to_string(),
-                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
-                },
-                false,
-            );
-            cmd_sink.timer(
-                StartTimerCommandAttributes {
+            let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes {
+                timer_id: "cancel_timer".to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+            });
+            cmd_sink
+                .timer(StartTimerCommandAttributes {
                     timer_id: "wait_timer".to_string(),
                     start_to_fire_timeout: Some(Duration::from_secs(5).into()),
-                },
-                true,
-            );
+                })
+                .await;
             // Cancel the first timer after having waited on the second
             cmd_sink.cancel_timer("cancel_timer");
+            cancel_timer_fut.await;
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             cmd_sink.send(complete.into());
@@ -404,7 +399,8 @@
     }
 
     #[rstest]
-    fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = cancellation_setup;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, Some(1))
             .unwrap();
@@ -429,7 +425,8 @@
     }
 
     #[rstest]
-    fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = cancellation_setup;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, None)
             .unwrap();
 
         assert_eq!(commands.len(), 0);
     }
 
-    #[test]
-    fn cancel_before_sent_to_server() {
+    #[tokio::test(flavor = "multi_thread")]
+    async fn cancel_before_sent_to_server() {
         let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
-            cmd_sink.timer(
-                StartTimerCommandAttributes {
-                    timer_id: "cancel_timer".to_string(),
-                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
-                },
-                false,
-            );
+            let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes {
+                timer_id: "cancel_timer".to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+            });
             // Immediately cancel the timer
             cmd_sink.cancel_timer("cancel_timer");
+            cancel_timer_fut.await;
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             cmd_sink.send(complete.into());

From 6433a9d4a687ea4d1f5e3e5a447c90f3f1c715c6 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Tue, 16 Mar 2021 10:44:52 -0700
Subject: [PATCH 2/3] A much improved version that uses a condvar to
 wake up the fetcher

This is pretty complex, but ultimately is probably almost exactly what will
be happening in a real Rust workflow.
---
 protos/local/core_interface.proto          |   2 +
 src/lib.rs                                 |  23 +-
 .../test_help/async_workflow_driver.rs     | 217 ++++++++++++------
 src/machines/test_help/history_builder.rs  |   2 +-
 src/machines/timer_state_machine.rs        |  23 +-
 src/machines/workflow_machines.rs          |   3 +
 src/workflow/mod.rs                        |  12 +-
 7 files changed, 185 insertions(+), 97 deletions(-)

diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto
index 0880ad626..e2a0bf38b 100644
--- a/protos/local/core_interface.proto
+++ b/protos/local/core_interface.proto
@@ -64,6 +64,8 @@ message WFActivationJob {
         // A timer has fired, allowing whatever was waiting on it (if anything) to proceed
         FireTimer fire_timer = 2;
         // A timer was canceled, and needs to be unblocked on the lang side.
+        // TODO: No reason for this to exist. Lang is always doing the cancel, so it can remove
+        //  it itself.
        CancelTimer cancel_timer = 3;
         // Workflow was reset. The randomness seed must be updated.
         UpdateRandomSeed update_random_seed = 4;
diff --git a/src/lib.rs b/src/lib.rs
index 49a752131..04b549fe7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,7 @@ pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;
 
 use crate::{
-    machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
+    machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError},
     pending_activations::{PendingActivation, PendingActivations},
     protos::{
         coresdk::{
@@ -202,10 +202,7 @@
             .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?;
         match wfstatus {
             Status::Successful(success) => {
-                self.push_lang_commands(&run_id, success)?;
-                let commands = self.access_wf_machine(&run_id, move |mgr| {
-                    Ok(mgr.machines.get_commands())
-                })?;
+                let commands = self.push_lang_commands(&run_id, success)?;
                 // We only actually want to send commands back to the server if there are
                 // no more pending activations -- in other words the lang SDK has caught
                 // up on replay.
@@ -283,20 +280,20 @@ impl CoreSDK {
         }
     }
 
-    /// Feed commands from the lang sdk into the appropriate workflow bridge
-    fn push_lang_commands(&self, run_id: &str, success: WfActivationSuccess) -> Result<()> {
+    /// Feed commands from the lang sdk into appropriate workflow manager which will iterate
+    /// the state machines and return commands ready to be sent to the server
+    fn push_lang_commands(
+        &self,
+        run_id: &str,
+        success: WfActivationSuccess,
+    ) -> Result<Vec<ProtoCommand>> {
         // Convert to wf commands
         let cmds = success
             .commands
             .into_iter()
             .map(|c| c.try_into().map_err(Into::into))
             .collect::<Result<Vec<_>>>()?;
-        self.access_wf_machine(run_id, move |mgr| {
-            mgr.command_sink.send(cmds)?;
-            mgr.machines.iterate_machines()?;
-            Ok(())
-        })?;
-        Ok(())
+        self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
     }
 
     /// Blocks polling the server until it responds, or until the shutdown flag is set (aborting
diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs
index 924817450..8aa4cf796 100644
--- a/src/machines/test_help/async_workflow_driver.rs
+++ b/src/machines/test_help/async_workflow_driver.rs
@@ -1,35 +1,112 @@
 use crate::{
     machines::WFCommand,
     protos::{
-        coresdk::{wf_activation_job, CancelTimer, FireTimer},
+        coresdk::{wf_activation_job, FireTimer},
         temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
     },
     workflow::{ActivationListener, WorkflowFetcher},
 };
-use dashmap::DashMap;
 use futures::channel::oneshot;
+use parking_lot::{Condvar, Mutex};
 use std::{
+    collections::HashMap,
     future::Future,
     sync::{
-        atomic::{AtomicBool, Ordering},
-        mpsc::{self, Receiver, RecvTimeoutError, Sender},
+        mpsc::{self, Receiver, Sender},
         Arc,
     },
     time::Duration,
 };
-use tokio::task::{JoinError, JoinHandle};
+use tokio::{
+    runtime::Runtime,
+    task::{JoinError, JoinHandle},
+};
 
 pub struct TestWorkflowDriver {
-    join_handle: JoinHandle<()>,
+    join_handle: Option<JoinHandle<()>>,
     commands_from_wf: Receiver<WFCommand>,
     cache: Arc<TestWfDriverCache>,
-    is_done: Arc<AtomicBool>,
+    runtime: Runtime,
+}
+
+#[derive(Debug)]
+struct TestWfDriverCache {
+    blocking_condvar: Arc<(Mutex<BlockingCondInfo>, Condvar)>,
+}
+
+impl TestWfDriverCache {
+    /// Unblock a command by ID
+    fn unblock(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        if let Some(t) = bc.issued_commands.remove(id) {
+            t.unblocker.send(()).unwrap()
+        };
+    }
+
+    /// Cancel a timer by ID. Timers get some special handling here since they are always
+    /// removed from the "lang" side without needing a response from core.
+    fn cancel_timer(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        bc.issued_commands.remove(id);
+    }
+
+    /// Track a new command that the wf has sent down the command sink. The command starts in
+    /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed
+    fn add_sent_cmd(&self, id: String) -> oneshot::Receiver<()> {
+        let (tx, rx) = oneshot::channel();
+        let mut bc = self.blocking_condvar.0.lock();
+        bc.issued_commands.insert(
+            id,
+            IssuedCommand {
+                unblocker: tx,
+                status: CommandStatus::Sent,
+            },
+        );
+        rx
+    }
+
+    /// Indicate that a command is being `await`ed
+    fn set_cmd_blocked(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        if let Some(cmd) = bc.issued_commands.get_mut(id) {
+            cmd.status = CommandStatus::Blocked;
+        }
+        // Wake up the fetcher thread, since we have blocked on a command and that would mean we've
+        // finished sending what we can.
+        self.blocking_condvar.1.notify_one();
+    }
+}
 
+/// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow
+/// iteration is considered complete if the workflow exits, or the top level task (the main codepath
+/// of the workflow) is blocked waiting on a command.
 #[derive(Default, Debug)]
-pub struct TestWfDriverCache {
+struct BlockingCondInfo {
     /// Holds a mapping of timer id -> oneshot channel to resolve it
-    timer_futures: DashMap<String, oneshot::Sender<()>>,
+    /// TODO: Upgrade w/ enum key once activities are in
+    issued_commands: HashMap<String, IssuedCommand>,
+    wf_is_done: bool,
+}
+
+impl BlockingCondInfo {
+    fn num_blocked_cmds(&self) -> usize {
+        self.issued_commands
+            .values()
+            .filter(|ic| ic.status == CommandStatus::Blocked)
+            .count()
+    }
+}
+
+#[derive(Debug)]
+struct IssuedCommand {
+    unblocker: oneshot::Sender<()>,
+    status: CommandStatus,
+}
+
+#[derive(Debug, PartialEq)]
+enum CommandStatus {
+    Sent,
+    Blocked,
+}
 
 pub struct CommandSender {
@@ -50,25 +127,24 @@ impl CommandSender {
 
     /// Request to create a timer
     pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future {
-        let (tx, rx) = oneshot::channel();
-        self.twd_cache.timer_futures.insert(a.timer_id.clone(), tx);
+        let tid = a.timer_id.clone();
         let c = WFCommand::AddTimer(a);
-        dbg!("Send add timer");
-        self.send(c.into());
-        rx
+        self.send(c);
+        let rx = self.twd_cache.add_sent_cmd(tid.clone());
+        let cache_clone = self.twd_cache.clone();
+        async move {
+            cache_clone.set_cmd_blocked(&tid);
+            rx.await
+        }
     }
 
     /// Cancel a timer
     pub fn cancel_timer(&self, timer_id: &str) {
         let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
-            timer_id: timer_id.to_string(),
+            timer_id: timer_id.to_owned(),
         });
-        // Timer cancellation immediately unblocks awaiting a timer
-        self.twd_cache
-            .timer_futures
-            .remove(timer_id)
-            .map(|t| t.1.send(()).unwrap());
-        self.send(c.into());
+        self.twd_cache.cancel_timer(timer_id);
+        self.send(c);
     }
 }
@@ -83,31 +159,61 @@
         F: Fn(CommandSender) -> Fut,
         Fut: Future<Output = ()> + Send + 'static,
     {
-        let twd_cache = Arc::new(TestWfDriverCache::default());
+        let blocking_condvar = Arc::new((Default::default(), Default::default()));
+        let bc_clone = blocking_condvar.clone();
+        let twd_cache = Arc::new(TestWfDriverCache { blocking_condvar });
         let (sender, receiver) = CommandSender::new(twd_cache.clone());
-        let send_half_drop_on_wf_exit = sender.chan.clone();
-        let is_done = Arc::new(AtomicBool::new(false));
-        let is_done_in_fut = is_done.clone();
+
         let wf_inner_fut = workflow_fn(sender);
         let wf_future = async move {
             wf_inner_fut.await;
-            is_done_in_fut.store(true, Ordering::SeqCst);
-            drop(send_half_drop_on_wf_exit);
-            dbg!("Exiting wf future");
+
+            let mut bc = bc_clone.0.lock();
+            bc.wf_is_done = true;
+            // Wake up the fetcher thread, since we have finished the workflow and that would mean
+            // we've finished sending what we can.
+            bc_clone.1.notify_one();
         };
-        let join_handle = tokio::spawn(wf_future);
+        let runtime = Runtime::new().unwrap();
+        let join_handle = Some(runtime.spawn(wf_future));
         Self {
             join_handle,
             commands_from_wf: receiver,
-            is_done,
             cache: twd_cache,
+            runtime,
+        }
+    }
+
+    /// If there are no commands, and the workflow isn't done, we need to wait for one of those
+    /// things to become true before we fetch commands. Otherwise, time out via panic.
+    ///
+    /// Returns true if the workflow function exited
+    // TODO: When we try to deal with spawning concurrent tasks inside a workflow, we will
+    //   somehow need to check that specifically the top-level task (the main wf function) is
+    //   blocked waiting on a command. If the workflow spawns a concurrent task, and it blocks
+    //   on a command before the main wf code path does, it will cause a spurious wakeup.
+    fn wait_until_wf_iteration_done(&mut self) -> bool {
+        let mut bc_lock = self.cache.blocking_condvar.0.lock();
+        while !bc_lock.wf_is_done && bc_lock.num_blocked_cmds() == 0 {
+            let timeout_res = self
+                .cache
+                .blocking_condvar
+                .1
+                .wait_for(&mut bc_lock, Duration::from_secs(1));
+            if timeout_res.timed_out() {
+                panic!("Workflow deadlocked (1 second)")
+            }
         }
+        bc_lock.wf_is_done
     }
 
-    // TODO: Needs to be returned separately from new
     /// Wait for the test workflow to exit
-    pub async fn join(self) -> Result<(), JoinError> {
-        self.join_handle.await
+    fn join(&mut self) -> Result<(), JoinError> {
+        if let Some(jh) = self.join_handle.take() {
+            self.runtime.block_on(jh)
+        } else {
+            Ok(())
+        }
     }
 }
@@ -115,33 +221,15 @@ impl WorkflowFetcher for TestWorkflowDriver {
     fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
         let mut emit_these = vec![];
 
-        // TODO: We really want this to be like a condvar, where we block here until either
-        // the wf is done or we are blocking on a command, doing deadlock if neither happens after
-        // a timeout.
-        // let is_blocking_on_cmds = self.cache.blocked_commands.load(Ordering::SeqCst) > 0;
-
-        // If the workflow is blocking on one or more commands, we will perform a blocking wait
-        // for it to send us new commands, with a timeout. If we are not blocking on any commands,
-        // and we hit the timeout, the workflow is spinning or deadlocked.
-
-        // futures::executor::block_on(self.cache.cmd_notifier.notified());
-        loop {
-            match self
-                .commands_from_wf
-                .recv_timeout(Duration::from_millis(10))
-            {
-                Ok(c) => emit_these.push(c),
-                Err(RecvTimeoutError::Timeout) => {
-                    dbg!("Timeout");
-                    break;
-                }
-                Err(RecvTimeoutError::Disconnected) => {
-                    if self.is_done.load(Ordering::SeqCst) {
-                        break;
-                    }
-                    unreachable!("Workflow done flag must be set properly")
-                }
-            }
+        let wf_is_done = self.wait_until_wf_iteration_done();
+
+        for c in self.commands_from_wf.try_iter() {
+            emit_these.push(c);
+        }
+
+        if wf_is_done {
+            // TODO: Eventually upgrade to return workflow failures on panic
+            self.join().expect("Workflow completes without panic");
         }
 
         debug!(emit_these = ?emit_these, "Test wf driver emitting");
@@ -149,14 +237,11 @@
         emit_these
     }
 }
+
 impl ActivationListener for TestWorkflowDriver {
     fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
-        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id })
-        | wf_activation_job::Variant::CancelTimer(CancelTimer { timer_id }) = activation
-        {
-            if let Some(tx) = self.cache.timer_futures.remove(timer_id) {
-                tx.1.send(()).unwrap()
-            }
+        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
+            self.cache.unblock(timer_id);
         }
     }
 }
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 68b594945..2a1c8820e 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -146,7 +146,7 @@ impl TestHistoryBuilder {
     }
 
     /// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow
-    /// tasks as the provided `to_wf_task_num` parameter..
+    /// tasks as the provided `to_wf_task_num` parameter.
     ///
     /// # Panics
     /// * Can panic if the passed in machines have been manipulated outside of this builder
diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs
index 8e4c56272..56c8696f3 100644
--- a/src/machines/timer_state_machine.rs
+++ b/src/machines/timer_state_machine.rs
@@ -288,9 +288,7 @@
                 timer_id: "timer1".to_string(),
                 start_to_fire_timeout: Some(Duration::from_secs(5).into()),
             };
-            dbg!("BLORP");
             command_sink.timer(timer).await;
-            dbg!("DORP");
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             command_sink.send(complete.into());
@@ -308,9 +306,7 @@
     }
 
     #[rstest]
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        // tracing_init();
+    fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = fire_happy_hist;
 
         let commands = t
@@ -331,8 +327,7 @@
     }
 
     #[rstest]
-    #[tokio::test(flavor = "multi_thread")]
-    async fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
+    fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = fire_happy_hist;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, None)
             .unwrap();
@@ -344,8 +339,8 @@
         );
     }
 
-    #[tokio::test(flavor = "multi_thread")]
-    async fn mismatched_timer_ids_errors() {
+    #[test]
+    fn mismatched_timer_ids_errors() {
         let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
             let timer = StartTimerCommandAttributes {
                 timer_id: "realid".to_string(),
@@ -399,8 +394,7 @@
     }
 
     #[rstest]
-    #[tokio::test(flavor = "multi_thread")]
-    async fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+    fn incremental_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = cancellation_setup;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, Some(1))
             .unwrap();
@@ -425,8 +419,7 @@
     }
 
     #[rstest]
-    #[tokio::test(flavor = "multi_thread")]
-    async fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
+    fn full_cancellation(cancellation_setup: (TestHistoryBuilder, WorkflowMachines)) {
         let (t, mut state_machines) = cancellation_setup;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, None)
             .unwrap();
 
         assert_eq!(commands.len(), 0);
     }
 
-    #[tokio::test(flavor = "multi_thread")]
-    async fn cancel_before_sent_to_server() {
+    #[test]
+    fn cancel_before_sent_to_server() {
         let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
             let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes {
                 timer_id: "cancel_timer".to_string(),
diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs
index 22bcb484a..47131db3f 100644
--- a/src/machines/workflow_machines.rs
+++ b/src/machines/workflow_machines.rs
@@ -237,6 +237,9 @@ impl WorkflowMachines {
         self.current_started_event_id = task_started_event_id;
         self.set_current_time(time);
 
+        // TODO: Ideally this would actually be called every time a command is pushed from a
+        //   workflow that isn't going across the public api, but as it stands that isn't really
+        //   doable, so this has to exist here for test workflow driver
         self.iterate_machines()?;
         Ok(())
     }
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 712e01a47..281bce289 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -7,7 +7,7 @@ pub(crate) use concurrency_manager::WorkflowConcurrencyManager;
 pub(crate) use driven_workflow::{ActivationListener, DrivenWorkflow, WorkflowFetcher};
 
 use crate::{
-    machines::{WFCommand, WorkflowMachines},
+    machines::{ProtoCommand, WFCommand, WorkflowMachines},
     protos::{
         coresdk::WfActivation,
         temporal::api::{history::v1::History, workflowservice::v1::PollWorkflowTaskQueueResponse},
@@ -21,7 +21,7 @@ use std::sync::mpsc::Sender;
 /// associated with that specific workflow run.
 pub(crate) struct WorkflowManager {
     pub machines: WorkflowMachines,
-    pub command_sink: Sender<Vec<WFCommand>>,
+    command_sink: Sender<Vec<WFCommand>>,
     /// The last recorded history we received from the server for this workflow run. This must be
     /// kept because the lang side polls & completes for every workflow task, but we do not need
     /// to poll the server that often during replay.
@@ -107,4 +107,12 @@ impl WorkflowManager {
             more_activations_needed,
         })
     }
+
+    // Feed the workflow machines new commands issued by the executing workflow code, iterate
+    // the workflow machines, and spit out the commands which are ready to be sent off to the server
+    pub fn push_commands(&mut self, cmds: Vec<WFCommand>) -> Result<Vec<ProtoCommand>> {
+        self.command_sink.send(cmds)?;
+        self.machines.iterate_machines()?;
+        Ok(self.machines.get_commands())
+    }
 }

From de7706173d67e8f76867fbff902c5fc24bed2409 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Wed, 17 Mar 2021 10:29:51 -0700
Subject: [PATCH 3/3] Cleanup some docstrings

---
 src/machines/test_help/async_workflow_driver.rs | 3 +--
 src/workflow/mod.rs                             | 4 ++--
 2 files changed, 3 insertions(+), 4 deletions(-)

diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs
index 8aa4cf796..55bdade46 100644
--- a/src/machines/test_help/async_workflow_driver.rs
+++ b/src/machines/test_help/async_workflow_driver.rs
@@ -152,8 +152,7 @@ impl TestWorkflowDriver {
     /// Create a new test workflow driver from a workflow "function" which is really a closure
     /// that returns an async block.
     ///
-    /// This function, though not async itself, must be called from an async context as it depends
-    /// on the tokio runtime to spawn the workflow future.
+    /// Creates a tokio runtime to execute the workflow on.
     pub fn new<F, Fut>(workflow_fn: F) -> Self
     where
         F: Fn(CommandSender) -> Fut,
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 281bce289..67413bbc3 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -108,8 +108,8 @@ impl WorkflowManager {
         })
     }
 
-    // Feed the workflow machines new commands issued by the executing workflow code, iterate
-    // the workflow machines, and spit out the commands which are ready to be sent off to the server
+    /// Feed the workflow machines new commands issued by the executing workflow code, iterate the
+    /// workflow machines, and spit out the commands which are ready to be sent off to the server
     pub fn push_commands(&mut self, cmds: Vec<WFCommand>) -> Result<Vec<ProtoCommand>> {
         self.command_sink.send(cmds)?;
         self.machines.iterate_machines()?;
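
Aside on the condvar pattern in patch 2: the fetcher side is synchronous while the workflow runs as a future on a private tokio runtime, so the two sides rendezvous through a parking_lot Mutex/Condvar pair around a std mpsc channel -- the producer sends, marks itself blocked (or done) under the lock, and notifies; the consumer waits on the condvar with a deadlock timeout, then drains the channel with try_iter. The following is a minimal standalone sketch of that handshake only; the Gate type, plain threads in place of the runtime, and integer "commands" are illustrative stand-ins, not SDK code.

use parking_lot::{Condvar, Mutex};
use std::{
    sync::{mpsc, Arc},
    thread,
    time::Duration,
};

#[derive(Default)]
struct Gate {
    // (workflow_is_done, blocked_command_count), guarded by the mutex;
    // the condvar signals any change to either field.
    state: Mutex<(bool, usize)>,
    cond: Condvar,
}

fn main() {
    let gate = Arc::new(Gate::default());
    let (tx, rx) = mpsc::channel::<u32>();

    // "Workflow" side: send a command, then report itself blocked on it and
    // wake the fetcher, since nothing more will be sent this iteration.
    let g = gate.clone();
    thread::spawn(move || {
        tx.send(1).unwrap();
        let mut st = g.state.lock();
        st.1 += 1; // now blocked awaiting the command's resolution
        g.cond.notify_one();
    });

    // "Fetcher" side: wait until the workflow is either done or blocked,
    // panicking if neither happens within the deadlock timeout.
    let mut st = gate.state.lock();
    while !st.0 && st.1 == 0 {
        if gate.cond.wait_for(&mut st, Duration::from_secs(1)).timed_out() {
            panic!("deadlocked");
        }
    }
    drop(st); // release the lock before draining

    // Safe to drain now: everything sent before the notify is in the channel.
    let cmds: Vec<u32> = rx.try_iter().collect();
    assert_eq!(cmds, vec![1]);
}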