Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 76 additions & 0 deletions src/machines/activity_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -368,3 +368,79 @@ pub(super) struct TimedOut {}

#[derive(Default, Clone)]
pub(super) struct Canceled {}

#[cfg(test)]
mod test {
use super::*;
use crate::{
machines::{
test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver},
workflow_machines::WorkflowMachines,
},
protos::coresdk::workflow_commands::CompleteWorkflowExecution,
test_help::canned_histories,
};
use rstest::{fixture, rstest};
use std::time::Duration;

#[fixture]
fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) {
let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
let activity = ScheduleActivity {
activity_id: "activity-id-1".to_string(),
..Default::default()
};
command_sink.activity(activity).await;

let complete = CompleteWorkflowExecution::default();
command_sink.send(complete.into());
});

let t = canned_histories::single_activity("activity-id-1");
let state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);

assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap());
(t, state_machines)
}

#[rstest]
fn activity_happy_path_inc(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = activity_happy_hist;

let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(1))
.unwrap();
state_machines.get_wf_activation();
assert_eq!(commands.len(), 1);
assert_eq!(
commands[0].command_type,
CommandType::ScheduleActivityTask as i32
);
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, Some(2))
.unwrap();
state_machines.get_wf_activation();
assert_eq!(commands.len(), 1);
assert_eq!(
commands[0].command_type,
CommandType::CompleteWorkflowExecution as i32
);
}

#[rstest]
fn activity_happy_path_full(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) {
let (t, mut state_machines) = activity_happy_hist;
let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
.unwrap();
assert_eq!(commands.len(), 1);
assert_eq!(
commands[0].command_type,
CommandType::CompleteWorkflowExecution as i32
);
}
}
32 changes: 26 additions & 6 deletions src/machines/test_help/async_workflow_driver.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant;
use crate::protos::coresdk::workflow_activation::ResolveActivity;
use crate::protos::coresdk::workflow_commands::ScheduleActivity;
use crate::{
machines::{workflow_machines::CommandID, WFCommand},
protos::coresdk::{
Expand All @@ -21,6 +24,7 @@ use tokio::{
runtime::Runtime,
task::{JoinError, JoinHandle},
};
use CommandID::Activity;
use CommandID::Timer;

pub struct TestWorkflowDriver {
Expand Down Expand Up @@ -127,13 +131,20 @@ impl CommandSender {

/// Request to create a timer
pub fn timer(&mut self, a: StartTimer) -> impl Future {
let tid = a.timer_id.clone();
let c = WFCommand::AddTimer(a);
self.send_blocking_cmd(Timer(a.timer_id.clone()), WFCommand::AddTimer(a))
}

/// Request to run an activity
pub fn activity(&mut self, a: ScheduleActivity) -> impl Future {
self.send_blocking_cmd(Activity(a.activity_id.clone()), WFCommand::AddActivity(a))
}

fn send_blocking_cmd(&mut self, id: CommandID, c: WFCommand) -> impl Future {
self.send(c);
let rx = self.twd_cache.add_sent_cmd(Timer(tid.clone()));
let rx = self.twd_cache.add_sent_cmd(id.clone());
let cache_clone = self.twd_cache.clone();
async move {
cache_clone.set_cmd_blocked(Timer(tid));
cache_clone.set_cmd_blocked(id);
rx.await
}
}
Expand Down Expand Up @@ -239,8 +250,17 @@ impl WorkflowFetcher for TestWorkflowDriver {

impl ActivationListener for TestWorkflowDriver {
fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
self.cache.unblock(Timer(timer_id.to_owned()));
match activation {
Variant::FireTimer(FireTimer { timer_id }) => {
self.cache.unblock(Timer(timer_id.to_owned()));
}
Variant::ResolveActivity(ResolveActivity {
activity_id,
result: _result,
}) => {
self.cache.unblock(Activity(activity_id.to_owned()));
}
_ => {}
}
}
}
5 changes: 2 additions & 3 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ pub(crate) struct WorkflowMachines {
/// for that machine.
machines_by_event_id: HashMap<i64, MachineKey>,

/// Maps timer ids as created by workflow authors to their associated machines
/// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum.
/// Maps command ids as created by workflow authors to their associated machines.
id_to_machine: HashMap<CommandID, MachineKey>,

/// Queued commands which have been produced by machines and await processing / being sent to
Expand All @@ -76,7 +75,7 @@ pub(crate) struct WorkflowMachines {
drive_me: DrivenWorkflow,
}

#[derive(Debug, PartialEq, Eq, Hash)]
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub enum CommandID {
Timer(String),
Activity(String),
Expand Down