Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ prost = "0.7"
prost-types = "0.7"
slotmap = "1.0"
thiserror = "1.0"
tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
tokio = { version = "1.1", features = ["rt", "rt-multi-thread", "parking_lot"] }
tracing = { version = "0.1", features = ["log"] }
tracing-opentelemetry = "0.11"
tracing-subscriber = "0.2"
Expand Down
2 changes: 2 additions & 0 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ message WFActivationJob {
// A timer has fired, allowing whatever was waiting on it (if anything) to proceed
FireTimer fire_timer = 2;
// A timer was canceled, and needs to be unblocked on the lang side.
// TODO: No reason for this to exist. Lang is always doing the cancel, so it can remove
// it itself.
CancelTimer cancel_timer = 3;
// Workflow was reset. The randomness seed must be updated.
UpdateRandomSeed update_random_seed = 4;
Expand Down
23 changes: 10 additions & 13 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use url::Url;

use crate::{
machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError},
pending_activations::{PendingActivation, PendingActivations},
protos::{
coresdk::{
Expand Down Expand Up @@ -202,10 +202,7 @@ where
.ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?;
match wfstatus {
Status::Successful(success) => {
self.push_lang_commands(&run_id, success)?;
let commands = self.access_wf_machine(&run_id, move |mgr| {
Ok(mgr.machines.get_commands())
})?;
let commands = self.push_lang_commands(&run_id, success)?;
// We only actually want to send commands back to the server if there are
// no more pending activations -- in other words the lang SDK has caught
// up on replay.
Expand Down Expand Up @@ -283,20 +280,20 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
}
}

/// Feed commands from the lang sdk into the appropriate workflow bridge
fn push_lang_commands(&self, run_id: &str, success: WfActivationSuccess) -> Result<()> {
/// Feed commands from the lang sdk into appropriate workflow manager which will iterate
/// the state machines and return commands ready to be sent to the server
fn push_lang_commands(
&self,
run_id: &str,
success: WfActivationSuccess,
) -> Result<Vec<ProtoCommand>> {
// Convert to wf commands
let cmds = success
.commands
.into_iter()
.map(|c| c.try_into().map_err(Into::into))
.collect::<Result<Vec<_>>>()?;
self.access_wf_machine(run_id, move |mgr| {
mgr.command_sink.send(cmds)?;
mgr.machines.iterate_machines()?;
Ok(())
})?;
Ok(())
self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds))
}

/// Blocks polling the server until it responds, or until the shutdown flag is set (aborting
Expand Down
246 changes: 246 additions & 0 deletions src/machines/test_help/async_workflow_driver.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,246 @@
use crate::{
machines::WFCommand,
protos::{
coresdk::{wf_activation_job, FireTimer},
temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
},
workflow::{ActivationListener, WorkflowFetcher},
};
use futures::channel::oneshot;
use parking_lot::{Condvar, Mutex};
use std::{
collections::HashMap,
future::Future,
sync::{
mpsc::{self, Receiver, Sender},
Arc,
},
time::Duration,
};
use tokio::{
runtime::Runtime,
task::{JoinError, JoinHandle},
};

pub struct TestWorkflowDriver {
join_handle: Option<JoinHandle<()>>,
commands_from_wf: Receiver<WFCommand>,
cache: Arc<TestWfDriverCache>,
runtime: Runtime,
}

#[derive(Debug)]
struct TestWfDriverCache {
blocking_condvar: Arc<(Mutex<BlockingCondInfo>, Condvar)>,
}

impl TestWfDriverCache {
/// Unblock a command by ID
fn unblock(&self, id: &str) {
let mut bc = self.blocking_condvar.0.lock();
if let Some(t) = bc.issued_commands.remove(id) {
t.unblocker.send(()).unwrap()
};
}

/// Cancel a timer by ID. Timers get some special handling here since they are always
/// removed from the "lang" side without needing a response from core.
fn cancel_timer(&self, id: &str) {
let mut bc = self.blocking_condvar.0.lock();
bc.issued_commands.remove(id);
}

/// Track a new command that the wf has sent down the command sink. The command starts in
/// [CommandStatus::Sent] and will be marked blocked once it is `await`ed
fn add_sent_cmd(&self, id: String) -> oneshot::Receiver<()> {
let (tx, rx) = oneshot::channel();
let mut bc = self.blocking_condvar.0.lock();
bc.issued_commands.insert(
id,
IssuedCommand {
unblocker: tx,
status: CommandStatus::Sent,
},
);
rx
}

/// Indicate that a command is being `await`ed
fn set_cmd_blocked(&self, id: &str) {
let mut bc = self.blocking_condvar.0.lock();
if let Some(cmd) = bc.issued_commands.get_mut(id) {
cmd.status = CommandStatus::Blocked;
}
// Wake up the fetcher thread, since we have blocked on a command and that would mean we've
// finished sending what we can.
self.blocking_condvar.1.notify_one();
}
}

/// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow
/// iteration is considered complete if the workflow exits, or the top level task (the main codepath
/// of the workflow) is blocked waiting on a command).
#[derive(Default, Debug)]
struct BlockingCondInfo {
/// Holds a mapping of timer id -> oneshot channel to resolve it
/// TODO: Upgrade w/ enum key once activities are in
issued_commands: HashMap<String, IssuedCommand>,
wf_is_done: bool,
}

impl BlockingCondInfo {
fn num_blocked_cmds(&self) -> usize {
self.issued_commands
.values()
.filter(|ic| ic.status == CommandStatus::Blocked)
.count()
}
}

#[derive(Debug)]
struct IssuedCommand {
unblocker: oneshot::Sender<()>,
status: CommandStatus,
}

#[derive(Debug, PartialEq)]
enum CommandStatus {
Sent,
Blocked,
}

pub struct CommandSender {
chan: Sender<WFCommand>,
twd_cache: Arc<TestWfDriverCache>,
}

impl CommandSender {
fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<WFCommand>) {
// We need to use a normal std channel since our receiving side is non-async
let (chan, rx) = mpsc::channel();
(Self { chan, twd_cache }, rx)
}

pub fn send(&self, c: WFCommand) {
self.chan.send(c).unwrap();
}

/// Request to create a timer
pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future {
let tid = a.timer_id.clone();
let c = WFCommand::AddTimer(a);
self.send(c);
let rx = self.twd_cache.add_sent_cmd(tid.clone());
let cache_clone = self.twd_cache.clone();
async move {
cache_clone.set_cmd_blocked(&tid);
rx.await
}
}

/// Cancel a timer
pub fn cancel_timer(&self, timer_id: &str) {
let c = WFCommand::CancelTimer(CancelTimerCommandAttributes {
timer_id: timer_id.to_owned(),
});
self.twd_cache.cancel_timer(timer_id);
self.send(c);
}
}

impl TestWorkflowDriver {
/// Create a new test workflow driver from a workflow "function" which is really a closure
/// that returns an async block.
///
/// Creates a tokio runtime to execute the workflow on.
pub fn new<F, Fut>(workflow_fn: F) -> Self
where
F: Fn(CommandSender) -> Fut,
Fut: Future<Output = ()> + Send + 'static,
{
let blocking_condvar = Arc::new((Default::default(), Default::default()));
let bc_clone = blocking_condvar.clone();
let twd_cache = Arc::new(TestWfDriverCache { blocking_condvar });
let (sender, receiver) = CommandSender::new(twd_cache.clone());

let wf_inner_fut = workflow_fn(sender);
let wf_future = async move {
wf_inner_fut.await;

let mut bc = bc_clone.0.lock();
bc.wf_is_done = true;
// Wake up the fetcher thread, since we have finished the workflow and that would mean
// we've finished sending what we can.
bc_clone.1.notify_one();
};
let runtime = Runtime::new().unwrap();
let join_handle = Some(runtime.spawn(wf_future));
Self {
join_handle,
commands_from_wf: receiver,
cache: twd_cache,
runtime,
}
}

/// If there are no commands, and the workflow isn't done, we need to wait for one of those
/// things to become true before we fetch commands. Otherwise, time out via panic.
///
/// Returns true if the workflow function exited
// TODO: When we try to deal with spawning concurrent tasks inside a workflow, we will
// somehow need to check that specifically the top-level task (the main wf function) is
// blocked waiting on a command. If the workflow spawns a concurrent task, and it blocks
// on a command before the main wf code path does, it will cause a spurious wakeup.
fn wait_until_wf_iteration_done(&mut self) -> bool {
let mut bc_lock = self.cache.blocking_condvar.0.lock();
while !bc_lock.wf_is_done && bc_lock.num_blocked_cmds() == 0 {
let timeout_res = self
.cache
.blocking_condvar
.1
.wait_for(&mut bc_lock, Duration::from_secs(1));
if timeout_res.timed_out() {
panic!("Workflow deadlocked (1 second)")
}
}
bc_lock.wf_is_done
}

/// Wait for the test workflow to exit
fn join(&mut self) -> Result<(), JoinError> {
if let Some(jh) = self.join_handle.take() {
self.runtime.block_on(jh)
} else {
Ok(())
}
}
}

impl WorkflowFetcher for TestWorkflowDriver {
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
let mut emit_these = vec![];

let wf_is_done = self.wait_until_wf_iteration_done();

for c in self.commands_from_wf.try_iter() {
emit_these.push(c);
}

if wf_is_done {
// TODO: Eventually upgrade to return workflow failures on panic
self.join().expect("Workflow completes without panic");
}

debug!(emit_these = ?emit_these, "Test wf driver emitting");

emit_these
}
}

impl ActivationListener for TestWorkflowDriver {
fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
self.cache.unblock(timer_id);
}
}
}
2 changes: 1 addition & 1 deletion src/machines/test_help/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ impl TestHistoryBuilder {
}

/// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow
/// tasks as the provided `to_wf_task_num` parameter..
/// tasks as the provided `to_wf_task_num` parameter.
///
/// # Panics
/// * Can panic if the passed in machines have been manipulated outside of this builder
Expand Down
4 changes: 2 additions & 2 deletions src/machines/test_help/mod.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
type Result<T, E = anyhow::Error> = std::result::Result<T, E>;

mod async_workflow_driver;
mod history_builder;
mod workflow_driver;

pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver};
pub(crate) use history_builder::TestHistoryBuilder;
pub(super) use workflow_driver::{CommandSender, TestWorkflowDriver};

use crate::workflow::WorkflowConcurrencyManager;
use crate::{
Expand Down
Loading