diff --git a/Cargo.toml b/Cargo.toml
index 196233379..a4543f56b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -26,7 +26,7 @@ prost = "0.7"
 prost-types = "0.7"
 slotmap = "1.0"
 thiserror = "1.0"
-tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
+tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot"] }
 tracing = { version = "0.1", features = ["log"] }
 tracing-opentelemetry = "0.11"
 tracing-subscriber = "0.2"
diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto
index 0880ad626..e2a0bf38b 100644
--- a/protos/local/core_interface.proto
+++ b/protos/local/core_interface.proto
@@ -64,6 +64,8 @@ message WFActivationJob {
         // A timer has fired, allowing whatever was waiting on it (if anything) to proceed
         FireTimer fire_timer = 2;
         // A timer was canceled, and needs to be unblocked on the lang side.
+        // TODO: No reason for this to exist. Lang is always doing the cancel, so it can remove
+        //   it itself.
         CancelTimer cancel_timer = 3;
         // Workflow was reset. The randomness seed must be updated.
         UpdateRandomSeed update_random_seed = 4;
diff --git a/src/lib.rs b/src/lib.rs
index 49a752131..04b549fe7 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,7 +26,7 @@ pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;
 
 use crate::{
-    machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
+    machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError},
     pending_activations::{PendingActivation, PendingActivations},
     protos::{
         coresdk::{
@@ -202,10 +202,7 @@ where
             .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?;
         match wfstatus {
             Status::Successful(success) => {
-                self.push_lang_commands(&run_id, success)?;
-                let commands = self.access_wf_machine(&run_id, move |mgr| {
-                    Ok(mgr.machines.get_commands())
-                })?;
+                let commands = self.push_lang_commands(&run_id, success)?;
                 // We only actually want to send commands back to the server if there are
                 // no more pending activations -- in other words the lang SDK has caught
                 // up on replay.
@@ -283,20 +280,20 @@ impl CoreSDK {
         }
     }
 
-    /// Feed commands from the lang sdk into the appropriate workflow bridge
-    fn push_lang_commands(&self, run_id: &str, success: WfActivationSuccess) -> Result<()> {
+    /// Feed commands from the lang sdk into the appropriate workflow manager, which will iterate
+    /// the state machines and return commands ready to be sent to the server
+    fn push_lang_commands(
+        &self,
+        run_id: &str,
+        success: WfActivationSuccess,
+    ) -> Result<Vec<ProtoCommand>> {
         // Convert to wf commands
         let cmds = success
             .commands
             .into_iter()
             .map(|c| c.try_into().map_err(Into::into))
             .collect::<Result<Vec<_>>>()?;
-        self.access_wf_machine(run_id, move |mgr| {
-            mgr.command_sink.send(cmds)?;
-            mgr.machines.iterate_machines()?;
-            Ok(())
-        })?;
-        Ok(())
+        self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
     }
 
     /// Blocks polling the server until it responds, or until the shutdown flag is set (aborting
diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs
new file mode 100644
index 000000000..55bdade46
--- /dev/null
+++ b/src/machines/test_help/async_workflow_driver.rs
@@ -0,0 +1,246 @@
+use crate::{
+    machines::WFCommand,
+    protos::{
+        coresdk::{wf_activation_job, FireTimer},
+        temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
+    },
+    workflow::{ActivationListener, WorkflowFetcher},
+};
+use futures::channel::oneshot;
+use parking_lot::{Condvar, Mutex};
+use std::{
+    collections::HashMap,
+    future::Future,
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::{
+    runtime::Runtime,
+    task::{JoinError, JoinHandle},
+};
+
+pub struct TestWorkflowDriver {
+    join_handle: Option<JoinHandle<()>>,
+    commands_from_wf: Receiver<WFCommand>,
+    cache: Arc<TestWfDriverCache>,
+    runtime: Runtime,
+}
+
+#[derive(Debug)]
+struct TestWfDriverCache {
+    blocking_condvar: Arc<(Mutex<BlockingCondInfo>, Condvar)>,
+}
+
+impl TestWfDriverCache {
+    /// Unblock a command by ID
+    fn unblock(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        if let Some(t) = bc.issued_commands.remove(id) {
+            t.unblocker.send(()).unwrap()
+        };
+    }
+
+    /// Cancel a timer by ID. Timers get some special handling here since they are always
+    /// removed from the "lang" side without needing a response from core.
+    fn cancel_timer(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        bc.issued_commands.remove(id);
+    }
+
+    /// Track a new command that the wf has sent down the command sink. The command starts in
+    /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed
+    fn add_sent_cmd(&self, id: String) -> oneshot::Receiver<()> {
+        let (tx, rx) = oneshot::channel();
+        let mut bc = self.blocking_condvar.0.lock();
+        bc.issued_commands.insert(
+            id,
+            IssuedCommand {
+                unblocker: tx,
+                status: CommandStatus::Sent,
+            },
+        );
+        rx
+    }
+
+    /// Indicate that a command is being `await`ed
+    fn set_cmd_blocked(&self, id: &str) {
+        let mut bc = self.blocking_condvar.0.lock();
+        if let Some(cmd) = bc.issued_commands.get_mut(id) {
+            cmd.status = CommandStatus::Blocked;
+        }
+        // Wake up the fetcher thread, since we have blocked on a command and that would mean
+        // we've finished sending what we can.
+        self.blocking_condvar.1.notify_one();
+    }
+}
+
+/// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow
+/// iteration is considered complete if the workflow exits, or the top level task (the main
+/// codepath of the workflow) is blocked waiting on a command.
+#[derive(Default, Debug)]
+struct BlockingCondInfo {
+    /// Holds a mapping of timer id -> oneshot channel to resolve it
+    /// TODO: Upgrade w/ enum key once activities are in
+    issued_commands: HashMap<String, IssuedCommand>,
+    wf_is_done: bool,
+}
+
+impl BlockingCondInfo {
+    fn num_blocked_cmds(&self) -> usize {
+        self.issued_commands
+            .values()
+            .filter(|ic| ic.status == CommandStatus::Blocked)
+            .count()
+    }
+}
+
+#[derive(Debug)]
+struct IssuedCommand {
+    unblocker: oneshot::Sender<()>,
+    status: CommandStatus,
+}
+
+#[derive(Debug, PartialEq)]
+enum CommandStatus {
+    Sent,
+    Blocked,
+}
+
+pub struct CommandSender {
+    chan: Sender<WFCommand>,
+    twd_cache: Arc<TestWfDriverCache>,
+}
+
+impl CommandSender {
+    fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<WFCommand>) {
+        // We need to use a normal std channel since our receiving side is non-async
+        let (chan, rx) = mpsc::channel();
+        (Self { chan, twd_cache }, rx)
+    }
+
+    pub fn send(&self, c: WFCommand) {
+        self.chan.send(c).unwrap();
+    }
+
+    /// Request to create a timer
+    pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future {
+        let tid = a.timer_id.clone();
+        let c = WFCommand::AddTimer(a);
+        self.send(c);
+        let rx = self.twd_cache.add_sent_cmd(tid.clone());
+        let cache_clone = self.twd_cache.clone();
+        async move {
+            cache_clone.set_cmd_blocked(&tid);
+            rx.await
+        }
+    }
+
+    /// Cancel a timer
+    pub fn cancel_timer(&self, timer_id: &str) {
+        let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
+            timer_id: timer_id.to_owned(),
+        });
+        self.twd_cache.cancel_timer(timer_id);
+        self.send(c);
+    }
+}
+
+impl TestWorkflowDriver {
+    /// Create a new test workflow driver from a workflow "function" which is really a closure
+    /// that returns an async block.
+    ///
+    /// Creates a tokio runtime to execute the workflow on.
+    pub fn new<F, Fut>(workflow_fn: F) -> Self
+    where
+        F: Fn(CommandSender) -> Fut,
+        Fut: Future + Send + 'static,
+    {
+        let blocking_condvar = Arc::new((Default::default(), Default::default()));
+        let bc_clone = blocking_condvar.clone();
+        let twd_cache = Arc::new(TestWfDriverCache { blocking_condvar });
+        let (sender, receiver) = CommandSender::new(twd_cache.clone());
+
+        let wf_inner_fut = workflow_fn(sender);
+        let wf_future = async move {
+            wf_inner_fut.await;
+
+            let mut bc = bc_clone.0.lock();
+            bc.wf_is_done = true;
+            // Wake up the fetcher thread, since we have finished the workflow and that would
+            // mean we've finished sending what we can.
+            bc_clone.1.notify_one();
+        };
+        let runtime = Runtime::new().unwrap();
+        let join_handle = Some(runtime.spawn(wf_future));
+        Self {
+            join_handle,
+            commands_from_wf: receiver,
+            cache: twd_cache,
+            runtime,
+        }
+    }
+
+    /// If there are no commands, and the workflow isn't done, we need to wait for one of those
+    /// things to become true before we fetch commands. Otherwise, time out via panic.
+    ///
+    /// Returns true if the workflow function exited
+    // TODO: When we try to deal with spawning concurrent tasks inside a workflow, we will
+    //   somehow need to check that specifically the top-level task (the main wf function) is
+    //   blocked waiting on a command. If the workflow spawns a concurrent task, and it blocks
+    //   on a command before the main wf code path does, it will cause a spurious wakeup.
+    fn wait_until_wf_iteration_done(&mut self) -> bool {
+        let mut bc_lock = self.cache.blocking_condvar.0.lock();
+        while !bc_lock.wf_is_done && bc_lock.num_blocked_cmds() == 0 {
+            let timeout_res = self
+                .cache
+                .blocking_condvar
+                .1
+                .wait_for(&mut bc_lock, Duration::from_secs(1));
+            if timeout_res.timed_out() {
+                panic!("Workflow deadlocked (1 second)")
+            }
+        }
+        bc_lock.wf_is_done
+    }
+
+    /// Wait for the test workflow to exit
+    fn join(&mut self) -> Result<(), JoinError> {
+        if let Some(jh) = self.join_handle.take() {
+            self.runtime.block_on(jh)
+        } else {
+            Ok(())
+        }
+    }
+}
+
+impl WorkflowFetcher for TestWorkflowDriver {
+    fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
+        let mut emit_these = vec![];
+
+        let wf_is_done = self.wait_until_wf_iteration_done();
+
+        for c in self.commands_from_wf.try_iter() {
+            emit_these.push(c);
+        }
+
+        if wf_is_done {
+            // TODO: Eventually upgrade to return workflow failures on panic
+            self.join().expect("Workflow completes without panic");
+        }
+
+        debug!(emit_these = ?emit_these, "Test wf driver emitting");
+
+        emit_these
+    }
+}
+
+impl ActivationListener for TestWorkflowDriver {
+    fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
+        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
+            self.cache.unblock(timer_id);
+        }
+    }
+}
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 68b594945..2a1c8820e 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -146,7 +146,7 @@ impl TestHistoryBuilder {
     }
 
     /// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow
-    /// tasks as the provided `to_wf_task_num` parameter..
+    /// tasks as the provided `to_wf_task_num` parameter.
     ///
     /// # Panics
     /// * Can panic if the passed in machines have been manipulated outside of this builder
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index dc5aaec8d..011da2805 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -1,10 +1,10 @@
 type Result = std::result::Result;
 
+mod async_workflow_driver;
 mod history_builder;
-mod workflow_driver;
 
+pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver};
 pub(crate) use history_builder::TestHistoryBuilder;
-pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver};
 
 use crate::workflow::WorkflowConcurrencyManager;
 use crate::{
diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs
deleted file mode 100644
index 443b66d06..000000000
--- a/src/machines/test_help/workflow_driver.rs
+++ /dev/null
@@ -1,171 +0,0 @@
-use crate::{
-    machines::WFCommand,
-    protos::{
-        coresdk::{wf_activation_job, FireTimer},
-        temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
-    },
-    workflow::{ActivationListener, WorkflowFetcher},
-};
-use dashmap::DashMap;
-use futures::Future;
-use std::sync::{
-    mpsc::{self, Receiver, Sender},
-    Arc,
-};
-
-/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control
-/// over when commands are returned than a normal workflow would.
-///
-/// It replaces "TestEntityTestListenerBase" in java which is pretty hard to follow.
-///
-/// It is important to understand that this driver doesn't work like a real workflow in the sense
-/// that nothing in it ever blocks, or ever should block. Every workflow task will run through the
-/// *entire* workflow, but any commands given to the sink after a `Waiting` command are simply
-/// ignored, allowing you to simulate blocking without ever actually blocking.
-pub(in crate::machines) struct TestWorkflowDriver<F> {
-    wf_function: F,
-    cache: Arc<TestWfDriverCache>,
-    /// Set to true if a workflow execution completed/failed/cancelled/etc has been issued
-    sent_final_execution: bool,
-}
-
-#[derive(Default, Debug)]
-pub(in crate::machines) struct TestWfDriverCache {
-    /// Holds a mapping of timer id -> is finished
-    ///
-    /// It can and should be eliminated by not recreating CommandSender on every iteration, which
-    /// means keeping the workflow suspended in a future somewhere. I tried this and it was hard,
-    /// but ultimately it's how real workflows will need to work.
-    unblocked_timers: DashMap<String, bool>,
-}
-
-impl<F, Fut> TestWorkflowDriver<F>
-where
-    F: Fn(CommandSender) -> Fut,
-    Fut: Future,
-{
-    /// Create a new test workflow driver from a workflow "function" which is really a closure
-    /// that returns an async block.
-    ///
-    /// In an ideal world, the workflow fn would actually be a generator which can yield commands,
-    /// and we may well end up doing something like that later.
-    pub(in crate::machines) fn new(workflow_fn: F) -> Self {
-        Self {
-            wf_function: workflow_fn,
-            cache: Default::default(),
-            sent_final_execution: false,
-        }
-    }
-}
-
-impl<F> ActivationListener for TestWorkflowDriver<F> {
-    fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
-        if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
-            Arc::get_mut(&mut self.cache)
-                .unwrap()
-                .unblocked_timers
-                .insert(timer_id.clone(), true);
-        }
-    }
-}
-
-impl<F, Fut> WorkflowFetcher for TestWorkflowDriver<F>
-where
-    F: Fn(CommandSender) -> Fut + Send + Sync,
-    Fut: Future,
-{
-    fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
-        // If we have already sent the command to complete the workflow, we don't want
-        // to re-run the workflow again.
-        // TODO: This would be better to solve by actually pausing the workflow properly rather
-        //   than doing the re-run the whole thing every time deal.
-        // TODO: Probably screws up signals edge case where signal rcvd after complete makes it to
-        //   server
-        if self.sent_final_execution {
-            return vec![];
-        }
-
-        let (sender, receiver) = CommandSender::new(self.cache.clone());
-        // Call the closure that produces the workflow future
-        let wf_future = (self.wf_function)(sender);
-
-        // TODO: This is pointless right now -- either actually use async and suspend on awaits
-        //   or just remove it.
-        // Create a tokio runtime to block on the future
-        let rt = tokio::runtime::Runtime::new().unwrap();
-        rt.block_on(wf_future);
-
-        let cmds = receiver.into_iter();
-
-        let mut emit_these = vec![];
-        for cmd in cmds {
-            match cmd {
-                TestWFCommand::WFCommand(c) => {
-                    if let WFCommand::CompleteWorkflow(_) = &c {
-                        self.sent_final_execution = true;
-                    }
-                    emit_these.push(c);
-                }
-                TestWFCommand::Waiting => {
-                    // Ignore further commands since we're waiting on something
-                    break;
-                }
-            }
-        }
-
-        debug!(emit_these = ?emit_these, "Test wf driver emitting");
-
-        emit_these
-    }
-}
-
-#[derive(Debug, derive_more::From)]
-pub enum TestWFCommand {
-    WFCommand(WFCommand),
-    /// When a test workflow wants to await something like a timer or an activity, we will
-    /// ignore all commands produced after the wait, since they couldn't have actually happened
-    /// in a real workflow, since you'd be stuck waiting
-    Waiting,
-}
-
-pub struct CommandSender {
-    chan: Sender<TestWFCommand>,
-    twd_cache: Arc<TestWfDriverCache>,
-}
-
-impl CommandSender {
-    fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<TestWFCommand>) {
-        let (chan, rx) = mpsc::channel();
-        (Self { chan, twd_cache }, rx)
-    }
-
-    /// Request to create a timer. Returns true if the timer has fired, false if it hasn't yet.
-    ///
-    /// If `do_wait` is true, issue a waiting command if the timer is not finished.
-    pub fn timer(&mut self, a: StartTimerCommandAttributes, do_wait: bool) -> bool {
-        let finished = match self.twd_cache.unblocked_timers.entry(a.timer_id.clone()) {
-            dashmap::mapref::entry::Entry::Occupied(existing) => *existing.get(),
-            dashmap::mapref::entry::Entry::Vacant(v) => {
-                let c = WFCommand::AddTimer(a);
-                self.chan.send(c.into()).unwrap();
-                v.insert(false);
-                false
-            }
-        };
-        if !finished && do_wait {
-            self.chan.send(TestWFCommand::Waiting).unwrap();
-        }
-        finished
-    }
-
-    pub fn cancel_timer(&mut self, timer_id: &str) {
-        let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
-            timer_id: timer_id.to_string(),
-        });
-        self.chan.send(c.into()).unwrap();
-    }
-
-    pub fn send(&mut self, c: WFCommand) {
-        self.chan.send(c.into()).unwrap();
-    }
-}
diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs
index cdbd6bfef..56c8696f3 100644
--- a/src/machines/timer_state_machine.rs
+++ b/src/machines/timer_state_machine.rs
@@ -269,7 +269,6 @@ mod test {
     };
     use rstest::{fixture, rstest};
     use std::time::Duration;
-    use tracing::Level;
 
     #[fixture]
    fn fire_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) {
@@ -289,7 +288,7 @@ mod test {
             timer_id: "timer1".to_string(),
             start_to_fire_timeout: Some(Duration::from_secs(5).into()),
         };
-        command_sink.timer(timer, true);
+        command_sink.timer(timer).await;
 
         let complete = CompleteWorkflowExecutionCommandAttributes::default();
         command_sink.send(complete.into());
@@ -308,9 +307,6 @@ mod test {
 
     #[rstest]
     fn test_fire_happy_path_inc(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "happy_inc");
-        let _enter = s.enter();
-
         let (t, mut state_machines) = fire_happy_hist;
 
         let commands = t
@@ -332,9 +328,6 @@ mod test {
 
     #[rstest]
     fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
-        let s = span!(Level::DEBUG, "Test start", t = "happy_full");
-        let _enter = s.enter();
-
         let (t, mut state_machines) = fire_happy_hist;
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, None)
@@ -353,7 +346,7 @@ mod test {
             timer_id: "realid".to_string(),
             start_to_fire_timeout: Some(Duration::from_secs(5).into()),
         };
-        command_sink.timer(timer, true);
+        command_sink.timer(timer).await;
     });
 
     let t = canned_histories::single_timer("badid");
@@ -373,22 +366,19 @@ mod test {
     #[fixture]
     fn cancellation_setup() -> (TestHistoryBuilder, WorkflowMachines) {
         let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
-            let _cancel_this = cmd_sink.timer(
-                StartTimerCommandAttributes {
-                    timer_id: "cancel_timer".to_string(),
-                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
-                },
-                false,
-            );
-            cmd_sink.timer(
-                StartTimerCommandAttributes {
+            let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes {
+                timer_id: "cancel_timer".to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+            });
+            cmd_sink
+                .timer(StartTimerCommandAttributes {
                     timer_id: "wait_timer".to_string(),
                     start_to_fire_timeout: Some(Duration::from_secs(5).into()),
-                },
-                true,
-            );
+                })
+                .await;
             // Cancel the first timer after having waited on the second
             cmd_sink.cancel_timer("cancel_timer");
+            cancel_timer_fut.await;
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             cmd_sink.send(complete.into());
@@ -441,15 +431,13 @@ mod test {
     #[test]
     fn cancel_before_sent_to_server() {
        let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
-            cmd_sink.timer(
-                StartTimerCommandAttributes {
-                    timer_id: "cancel_timer".to_string(),
-                    start_to_fire_timeout: Some(Duration::from_secs(500).into()),
-                },
-                false,
-            );
+            let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes {
+                timer_id: "cancel_timer".to_string(),
+                start_to_fire_timeout: Some(Duration::from_secs(500).into()),
+            });
             // Immediately cancel the timer
             cmd_sink.cancel_timer("cancel_timer");
+            cancel_timer_fut.await;
 
             let complete = CompleteWorkflowExecutionCommandAttributes::default();
             cmd_sink.send(complete.into());
diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs
index 22bcb484a..47131db3f 100644
--- a/src/machines/workflow_machines.rs
+++ b/src/machines/workflow_machines.rs
@@ -237,6 +237,9 @@ impl WorkflowMachines {
         self.current_started_event_id = task_started_event_id;
         self.set_current_time(time);
 
+        // TODO: Ideally this would actually be called every time a command is pushed from a
+        //   workflow that isn't going across the public api, but as it stands that isn't really
+        //   doable, so this has to exist here for the test workflow driver
         self.iterate_machines()?;
         Ok(())
     }
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 712e01a47..67413bbc3 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -7,7 +7,7 @@ pub(crate) use concurrency_manager::WorkflowConcurrencyManager;
 pub(crate) use driven_workflow::{ActivationListener, DrivenWorkflow, WorkflowFetcher};
 
 use crate::{
-    machines::{WFCommand, WorkflowMachines},
+    machines::{ProtoCommand, WFCommand, WorkflowMachines},
     protos::{
         coresdk::WfActivation,
         temporal::api::{history::v1::History, workflowservice::v1::PollWorkflowTaskQueueResponse},
@@ -21,7 +21,7 @@ use std::sync::mpsc::Sender;
 /// associated with that specific workflow run.
 pub(crate) struct WorkflowManager {
     pub machines: WorkflowMachines,
-    pub command_sink: Sender<Vec<WFCommand>>,
+    command_sink: Sender<Vec<WFCommand>>,
     /// The last recorded history we received from the server for this workflow run. This must be
     /// kept because the lang side polls & completes for every workflow task, but we do not need
     /// to poll the server that often during replay.
@@ -107,4 +107,12 @@ impl WorkflowManager {
             more_activations_needed,
         })
     }
+
+    /// Feed the workflow machines new commands issued by the executing workflow code, iterate the
+    /// workflow machines, and spit out the commands which are ready to be sent off to the server
+    pub fn push_commands(&mut self, cmds: Vec<WFCommand>) -> Result<Vec<ProtoCommand>> {
+        self.command_sink.send(cmds)?;
+        self.machines.iterate_machines()?;
+        Ok(self.machines.get_commands())
+    }
 }
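
For review context, a minimal sketch (not part of the patch) of what a test workflow looks like against the new driver, assembled from the `fire_happy_hist` fixture above: `CommandSender::timer` now returns a future, so blocking is expressed with an ordinary `.await` instead of the old `(attributes, do_wait)` call plus `Waiting` sentinel.

    // Sketch only: imports and the surrounding fixture are as in timer_state_machine.rs.
    let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move {
        // Sending the timer registers it in TestWfDriverCache; awaiting it marks the
        // command Blocked, which tells fetch_workflow_iteration_output that this
        // iteration of the workflow has gone as far as it can.
        cmd_sink
            .timer(StartTimerCommandAttributes {
                timer_id: "timer1".to_string(),
                start_to_fire_timeout: Some(Duration::from_secs(5).into()),
            })
            .await;

        // When the timer fires, ActivationListener::on_activation_job unblocks the
        // oneshot above and the workflow runs to completion.
        let complete = CompleteWorkflowExecutionCommandAttributes::default();
        cmd_sink.send(complete.into());
    });

The fixtures in `timer_state_machine.rs` then hand `twd` to the machinery under test the same way they did before.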
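The driver's new blocking detection hinges on parking_lot's `Mutex`/`Condvar` pair held in `TestWfDriverCache`. Below is a standalone sketch of that handshake (hypothetical names, not code from this patch) for reviewers less familiar with the crate: the workflow task flips state under the mutex and notifies, while the fetcher side waits with a timeout and panics on deadlock, mirroring `wait_until_wf_iteration_done`.

    // Standalone sketch of the condvar handshake TestWorkflowDriver relies on.
    use parking_lot::{Condvar, Mutex};
    use std::{sync::Arc, thread, time::Duration};

    fn main() {
        let pair = Arc::new((Mutex::new(false), Condvar::new()));
        let notifier = pair.clone();

        // Stands in for the spawned workflow future finishing an iteration
        // (either wf_is_done flipping or a command becoming Blocked).
        thread::spawn(move || {
            *notifier.0.lock() = true;
            notifier.1.notify_one();
        });

        // Stands in for fetch_workflow_iteration_output waiting on the workflow.
        let mut done = pair.0.lock();
        while !*done {
            let res = pair.1.wait_for(&mut done, Duration::from_secs(1));
            if res.timed_out() {
                panic!("Workflow deadlocked (1 second)");
            }
        }
        println!("iteration done");
    }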