Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ mod protosext;

pub use protosext::HistoryInfo;

use crate::pollers::ServerGatewayOptions;
use crate::{
machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines},
machines::{
ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines,
},
pollers::ServerGatewayOptions,
protos::{
coresdk::{
complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task,
Expand Down Expand Up @@ -271,6 +273,9 @@ impl DrivenWorkflow for WorkflowBridge {
}
}

// Real bridge doesn't actually need to listen
impl ActivationListener for WorkflowBridge {}

#[derive(thiserror::Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CoreError {
Expand Down Expand Up @@ -335,7 +340,6 @@ mod test {
history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
started_event_id: timer_started_event_id,
timer_id: "timer1".to_string(),
..Default::default()
}),
);
t.add_workflow_task_scheduled_and_started();
Expand Down Expand Up @@ -368,7 +372,7 @@ mod test {
history: Some(History {
events: events_second_batch,
}),
workflow_execution: wf.clone(),
workflow_execution: wf,
..Default::default()
};
let responses = vec![first_response, second_response];
Expand Down
6 changes: 3 additions & 3 deletions src/machines/complete_workflow_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::machines::workflow_machines::WorkflowTrigger;
use crate::{
machines::{
workflow_machines::WorkflowMachines, AddCommand, CancellableCommand, WFCommand,
Expand Down Expand Up @@ -123,11 +124,10 @@ impl From<CompleteWorkflowCommandCreated> for CompleteWorkflowCommandRecorded {
impl WFMachinesAdapter for CompleteWorkflowMachine {
fn adapt_response(
&self,
_wf_machines: &mut WorkflowMachines,
_event: &HistoryEvent,
_has_next_event: bool,
_my_command: CompleteWFCommand,
) -> Result<(), WFMachinesError> {
Ok(())
) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
Ok(vec![])
}
}
81 changes: 42 additions & 39 deletions src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,22 @@ pub(crate) mod test_help;

pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};

use crate::protos::coresdk;
use crate::protos::coresdk::command::Variant;
use crate::protos::temporal::api::command::v1::command::Attributes;
use crate::protos::temporal::api::{
command::v1::{
Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
enums::v1::CommandType,
history::v1::{
HistoryEvent, WorkflowExecutionCanceledEventAttributes,
WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes,
use crate::{
machines::workflow_machines::WorkflowTrigger,
protos::coresdk::wf_activation,
protos::{
coresdk::{self, command::Variant},
temporal::api::{
command::v1::{
command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes,
},
enums::v1::CommandType,
history::v1::{
HistoryEvent, WorkflowExecutionCanceledEventAttributes,
WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes,
},
},
},
};
use prost::alloc::fmt::Formatter;
Expand All @@ -57,16 +62,14 @@ use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
rc::Rc,
sync::{atomic::AtomicBool, Arc},
};
use tracing::Level;

// TODO: May need to be our SDKWFCommand type
pub(crate) type MachineCommand = Command;
pub(crate) type ProtoCommand = Command;

/// Implementors of this trait represent something that can (eventually) call into a workflow to
/// drive it, start it, signal it, cancel it, etc.
pub(crate) trait DrivenWorkflow: Send {
pub(crate) trait DrivenWorkflow: ActivationListener + Send {
/// Start the workflow
fn start(
&mut self,
Expand All @@ -90,6 +93,12 @@ pub(crate) trait DrivenWorkflow: Send {
) -> Result<(), anyhow::Error>;
}

/// Allows observers to listen to newly generated outgoing activations. Used for testing, where
/// some activations must be handled before outgoing commands are issued to avoid deadlocking.
pub(crate) trait ActivationListener {
fn on_activation(&mut self, _activation: &wf_activation::Attributes) {}
}

/// The struct for [WFCommand::AddCommand]
#[derive(Debug, derive_more::From)]
pub(crate) struct AddCommand {
Expand All @@ -103,7 +112,7 @@ pub(crate) struct AddCommand {
pub enum WFCommand {
/// Returned when we need to wait for the lang sdk to send us something
NoCommandsFromLang,
AddTimer(StartTimerCommandAttributes, Arc<AtomicBool>),
AddTimer(StartTimerCommandAttributes),
CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes),
}

Expand All @@ -122,9 +131,7 @@ impl TryFrom<coresdk::Command> for WFCommand {
attributes: Some(attrs),
..
}) => match attrs {
Attributes::StartTimerCommandAttributes(s) => {
Ok(WFCommand::AddTimer(s, Arc::new(AtomicBool::new(false))))
}
Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)),
Attributes::CompleteWorkflowExecutionCommandAttributes(c) => {
Ok(WFCommand::CompleteWorkflow(c))
}
Expand All @@ -144,16 +151,13 @@ trait TemporalStateMachine: CheckStateMachineInFinal {
fn name(&self) -> &str;
fn handle_command(&mut self, command_type: CommandType) -> Result<(), WFMachinesError>;

/// Tell the state machine to handle some event. The [WorkflowMachines] instance calling this
/// function passes a reference to itself in so that the [WFMachinesAdapter] trait can use
/// the machines' specific type information while also manipulating [WorkflowMachines]
/// Tell the state machine to handle some event. Returns a list of triggers that can be used
/// to update the overall state of the workflow. EX: To issue outgoing WF activations.
fn handle_event(
&mut self,
event: &HistoryEvent,
has_next_event: bool,
// TODO: Don't pass this all in, rather use responses.
wf_machines: &mut WorkflowMachines,
) -> Result<(), WFMachinesError>;
) -> Result<Vec<WorkflowTrigger>, WFMachinesError>;
}

impl<SM> TemporalStateMachine for SM
Expand Down Expand Up @@ -192,8 +196,7 @@ where
&mut self,
event: &HistoryEvent,
has_next_event: bool,
wf_machines: &mut WorkflowMachines,
) -> Result<(), WFMachinesError> {
) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
event!(
Level::DEBUG,
msg = "handling event",
Expand All @@ -204,10 +207,11 @@ where
match self.on_event_mut(converted_event) {
Ok(c) => {
event!(Level::DEBUG, msg = "Machine produced commands", ?c);
let mut triggers = vec![];
for cmd in c {
self.adapt_response(wf_machines, event, has_next_event, cmd)?;
triggers.extend(self.adapt_response(event, has_next_event, cmd)?);
}
Ok(())
Ok(triggers)
}
Err(MachineError::InvalidTransition) => {
Err(WFMachinesError::UnexpectedEvent(event.clone()))
Expand Down Expand Up @@ -238,17 +242,16 @@ where

/// This trait exists to bridge [StateMachine]s and the [WorkflowMachines] instance. It has access
/// to the machine's concrete types while hiding those details from [WorkflowMachines]
pub(super) trait WFMachinesAdapter: StateMachine {
/// Given a reference to [WorkflowMachines], the event being processed, and a command that this
/// [StateMachine] instance just produced, perform any handling that needs access to the
/// [WorkflowMachines] instance in response to that command.
trait WFMachinesAdapter: StateMachine {
/// Given a the event being processed, and a command that this [StateMachine] instance just
/// produced, perform any handling that needs inform the [WorkflowMachines] instance of some
/// action to be taken in response to that command.
fn adapt_response(
&self,
_wf_machines: &mut WorkflowMachines,
_event: &HistoryEvent,
_has_next_event: bool,
_my_command: Self::Command,
) -> Result<(), WFMachinesError>;
event: &HistoryEvent,
has_next_event: bool,
my_command: Self::Command,
) -> Result<Vec<WorkflowTrigger>, WFMachinesError>;
}

/// A command which can be cancelled, associated with some state machine that produced it
Expand All @@ -260,7 +263,7 @@ enum CancellableCommand {
Cancelled,
Active {
/// The inner protobuf command, if None, command has been cancelled
command: MachineCommand,
command: ProtoCommand,
machine: Rc<RefCell<dyn TemporalStateMachine>>,
},
}
Expand Down
6 changes: 3 additions & 3 deletions src/machines/test_help/history_builder.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::Result;
use crate::{
machines::{workflow_machines::WorkflowMachines, MachineCommand},
machines::{workflow_machines::WorkflowMachines, ProtoCommand},
protos::temporal::api::{
enums::v1::EventType,
history::v1::{
Expand Down Expand Up @@ -35,7 +35,7 @@ impl TestHistoryBuilder {
pub fn add_by_type(&mut self, event_type: EventType) {
let attribs =
default_attribs(event_type).expect("Couldn't make default attributes in test builder");
self.build_and_push_event(event_type.clone(), attribs);
self.build_and_push_event(event_type, attribs);
}

/// Adds an event, returning the ID that was assigned to it
Expand Down Expand Up @@ -133,7 +133,7 @@ impl TestHistoryBuilder {
&self,
wf_machines: &mut WorkflowMachines,
to_wf_task_num: Option<usize>,
) -> Result<Vec<MachineCommand>> {
) -> Result<Vec<ProtoCommand>> {
self.handle_workflow_task(wf_machines, to_wf_task_num)?;
Ok(wf_machines.get_commands())
}
Expand Down
77 changes: 41 additions & 36 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
@@ -1,45 +1,42 @@
use super::Result;
use crate::{
machines::{DrivenWorkflow, WFCommand},
protos::temporal::api::{
command::v1::StartTimerCommandAttributes,
history::v1::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes,
WorkflowExecutionStartedEventAttributes,
machines::{ActivationListener, DrivenWorkflow, WFCommand},
protos::{
coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes},
temporal::api::{
command::v1::StartTimerCommandAttributes,
history::v1::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes,
WorkflowExecutionStartedEventAttributes,
},
},
},
};
use dashmap::DashMap;
use futures::Future;
use std::{
collections::{hash_map, HashMap},
sync::{
atomic::{AtomicBool, Ordering},
mpsc::{self, Receiver, Sender},
Arc, RwLock,
},
use std::sync::{
mpsc::{self, Receiver, Sender},
Arc,
};
use tracing::Level;

type TimerMap = HashMap<String, Arc<AtomicBool>>;

/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control
/// over when commands are returned than a normal workflow would.
///
/// It replaces "TestEnitityTestListenerBase" in java which is pretty hard to follow.
#[derive(Debug)]
pub(in crate::machines) struct TestWorkflowDriver<F> {
wf_function: F,
cache: Arc<TestWfDriverCache>,
}

/// Holds status for timers so we don't recreate them by accident. Key is timer id, value is
/// true if it is complete.
///
/// I don't love that this is pretty duplicative of workflow machines, nor the nasty sync
/// involved. Would be good to eliminate.
#[derive(Default, Debug)]
pub(in crate::machines) struct TestWfDriverCache {
/// Holds a mapping of timer id -> is finished
///
/// It can and should be eliminated by not recreating CommandSender on every iteration, which
/// means keeping the workflow suspended in a future somewhere. I tried this and it was hard,
/// but ultimately it's how real workflows will need to work.
timers: Arc<RwLock<TimerMap>>,
unblocked_timers: DashMap<String, bool>,
}

impl<F, Fut> TestWorkflowDriver<F>
Expand All @@ -55,7 +52,18 @@ where
pub(in crate::machines) fn new(workflow_fn: F) -> Self {
Self {
wf_function: workflow_fn,
timers: Arc::new(RwLock::new(HashMap::default())),
cache: Default::default(),
}
}
}

impl<F> ActivationListener for TestWorkflowDriver<F> {
fn on_activation(&mut self, activation: &Attributes) {
if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation {
Arc::get_mut(&mut self.cache)
.unwrap()
.unblocked_timers
.insert(timer_id.clone(), true);
}
}
}
Expand All @@ -75,7 +83,7 @@ where

#[instrument(skip(self))]
fn iterate_wf(&mut self) -> Result<Vec<WFCommand>, anyhow::Error> {
let (sender, receiver) = CommandSender::new(self.timers.clone());
let (sender, receiver) = CommandSender::new(self.cache.clone());
// Call the closure that produces the workflow future
let wf_future = (self.wf_function)(sender);

Expand Down Expand Up @@ -131,26 +139,23 @@ pub enum TestWFCommand {

pub struct CommandSender {
chan: Sender<TestWFCommand>,
timer_map: Arc<RwLock<TimerMap>>,
twd_cache: Arc<TestWfDriverCache>,
}

impl<'a> CommandSender {
fn new(timer_map: Arc<RwLock<TimerMap>>) -> (Self, Receiver<TestWFCommand>) {
impl CommandSender {
fn new(twd_cache: Arc<TestWfDriverCache>) -> (Self, Receiver<TestWFCommand>) {
let (chan, rx) = mpsc::channel();
(Self { chan, timer_map }, rx)
(Self { chan, twd_cache }, rx)
}

/// Request to create a timer, returning future which resolves when the timer completes
pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future + '_ {
let finished = match self.timer_map.write().unwrap().entry(a.timer_id.clone()) {
hash_map::Entry::Occupied(existing) => existing.get().load(Ordering::SeqCst),
hash_map::Entry::Vacant(v) => {
let atomic = Arc::new(AtomicBool::new(false));

let c = WFCommand::AddTimer(a, atomic.clone());
// In theory we should send this in both branches
let finished = match self.twd_cache.unblocked_timers.entry(a.timer_id.clone()) {
dashmap::mapref::entry::Entry::Occupied(existing) => *existing.get(),
dashmap::mapref::entry::Entry::Vacant(v) => {
let c = WFCommand::AddTimer(a);
self.chan.send(c.into()).unwrap();
v.insert(atomic);
v.insert(false);
false
}
};
Expand Down
Loading