Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ Cargo.lock
src/protos/*.rs
!src/protos/mod.rs
/tarpaulin-report.html
/.cargo/
36 changes: 2 additions & 34 deletions src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,14 @@ use crate::{
core_tracing::VecDisplayer,
machines::workflow_machines::MachineResponse,
protos::{
coresdk::{self, command::Variant, wf_activation_job},
coresdk::{self, command::Variant},
temporal::api::{
command::v1::{
command::Attributes, CancelTimerCommandAttributes, Command,
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
enums::v1::CommandType,
history::v1::{
HistoryEvent, WorkflowExecutionCanceledEventAttributes,
WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes,
},
history::v1::HistoryEvent,
},
},
};
Expand All @@ -61,35 +58,6 @@ use std::{

pub(crate) type ProtoCommand = Command;

/// Implementors of this trait represent something that can (eventually) call into a workflow to
/// drive it, start it, signal it, cancel it, etc.
pub(crate) trait DrivenWorkflow: ActivationListener + Send {
/// Start the workflow
fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes);

/// Obtain any output from the workflow's recent execution(s). Because the lang sdk is
/// responsible for calling workflow code as a result of receiving tasks from
/// [crate::Core::poll_task], we cannot directly iterate it here. Thus implementations of this
/// trait are expected to either buffer output or otherwise produce it on demand when this
/// function is called.
///
/// In the case of the real [WorkflowBridge] implementation, commands are simply pulled from
/// a buffer that the language side sinks into when it calls [crate::Core::complete_task]
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand>;

/// Signal the workflow
fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes);

/// Cancel the workflow
fn cancel(&mut self, attribs: WorkflowExecutionCanceledEventAttributes);
}

/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where
/// some activations must be handled before outgoing commands are issued to avoid deadlocking.
pub(crate) trait ActivationListener {
fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {}
}

/// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next.
/// EX: Create a new timer, complete the workflow, etc.
#[derive(Debug, derive_more::From)]
Expand Down
21 changes: 4 additions & 17 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use crate::{
machines::{ActivationListener, DrivenWorkflow, WFCommand},
machines::WFCommand,
protos::{
coresdk::{wf_activation_job, FireTimer},
temporal::api::{
command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
history::v1::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes,
WorkflowExecutionStartedEventAttributes,
},
},
temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
},
workflow::{ActivationListener, WorkflowFetcher},
};
use dashmap::DashMap;
use futures::Future;
Expand Down Expand Up @@ -74,15 +69,11 @@ impl<F> ActivationListener for TestWorkflowDriver<F> {
}
}

impl<F, Fut> DrivenWorkflow for TestWorkflowDriver<F>
impl<F, Fut> WorkflowFetcher for TestWorkflowDriver<F>
where
F: Fn(CommandSender) -> Fut + Send + Sync,
Fut: Future<Output = ()>,
{
fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) {
debug!("Test WF driver start called");
}

fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
// If we have already sent the command to complete the workflow, we don't want
// to re-run the workflow again.
Expand Down Expand Up @@ -124,10 +115,6 @@ where

emit_these
}

fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {}

fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {}
}

#[derive(Debug, derive_more::From)]
Expand Down
28 changes: 20 additions & 8 deletions src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,11 @@ mod test {
});

let t = canned_histories::single_timer("timer1");
let state_machines =
WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
let state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);

assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap());
(t, state_machines)
Expand Down Expand Up @@ -355,8 +358,11 @@ mod test {
});

let t = canned_histories::single_timer("badid");
let mut state_machines =
WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
let mut state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);

assert!(t
.handle_workflow_task_take_cmds(&mut state_machines, None)
Expand Down Expand Up @@ -392,8 +398,11 @@ mod test {
});

let t = canned_histories::cancel_timer("wait_timer", "cancel_timer");
let state_machines =
WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
let state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);
(t, state_machines)
}

Expand Down Expand Up @@ -461,8 +470,11 @@ mod test {
t.add_full_wf_task();
t.add_workflow_execution_completed();

let mut state_machines =
WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd));
let mut state_machines = WorkflowMachines::new(
"wfid".to_string(),
"runid".to_string(),
Box::new(twd).into(),
);

let commands = t
.handle_workflow_task_take_cmds(&mut state_machines, None)
Expand Down
40 changes: 15 additions & 25 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use crate::workflow::{DrivenWorkflow, WorkflowFetcher};
use crate::{
core_tracing::VecDisplayer,
machines::{
complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer,
workflow_task_state_machine::WorkflowTaskMachine, DrivenWorkflow, NewMachineWithCommand,
ProtoCommand, TemporalStateMachine, WFCommand,
workflow_task_state_machine::WorkflowTaskMachine, NewMachineWithCommand, ProtoCommand,
TemporalStateMachine, WFCommand,
},
protos::{
coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation},
Expand Down Expand Up @@ -64,11 +65,9 @@ pub(crate) struct WorkflowMachines {
/// Old note: It is a queue as commands can be added (due to marker based commands) while
/// iterating over already added commands.
current_wf_task_commands: VecDeque<CommandAndMachine>,
/// Outgoing activation jobs that need to be sent to the lang sdk
outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Variant>,

/// The workflow that is being driven by this instance of the machines
drive_me: Box<dyn DrivenWorkflow + 'static>,
drive_me: DrivenWorkflow,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

}

slotmap::new_key_type! { struct MachineKey; }
Expand Down Expand Up @@ -128,11 +127,7 @@ pub enum WFMachinesError {
}

impl WorkflowMachines {
pub(crate) fn new(
workflow_id: String,
run_id: String,
driven_wf: Box<dyn DrivenWorkflow>,
) -> Self {
pub(crate) fn new(workflow_id: String, run_id: String, driven_wf: DrivenWorkflow) -> Self {
Self {
workflow_id,
run_id,
Expand All @@ -148,7 +143,6 @@ impl WorkflowMachines {
timer_id_to_machine: Default::default(),
commands: Default::default(),
current_wf_task_commands: Default::default(),
outgoing_wf_activation_jobs: Default::default(),
}
}

Expand Down Expand Up @@ -319,7 +313,7 @@ impl WorkflowMachines {
{
self.run_id = attrs.original_execution_run_id.clone();
// We need to notify the lang sdk that it's time to kick off a workflow
self.outgoing_wf_activation_jobs.push_back(
self.drive_me.send_job(
StartWorkflow {
workflow_type: attrs
.workflow_type
Expand Down Expand Up @@ -385,14 +379,10 @@ impl WorkflowMachines {
/// timer, etc. This does *not* cause any advancement of the state machines, it merely drains
/// from the outgoing queue of activation jobs.
pub(crate) fn get_wf_activation(&mut self) -> Option<WfActivation> {
if self.outgoing_wf_activation_jobs.is_empty() {
let jobs = self.drive_me.drain_jobs();
if jobs.is_empty() {
None
} else {
let jobs = self
.outgoing_wf_activation_jobs
.drain(..)
.map(Into::into)
.collect();
Some(WfActivation {
timestamp: self.current_wf_time.map(Into::into),
run_id: self.run_id.clone(),
Expand Down Expand Up @@ -490,8 +480,7 @@ impl WorkflowMachines {
for response in machine_responses {
match response {
MachineResponse::PushWFJob(a) => {
self.drive_me.on_activation_job(&a);
self.outgoing_wf_activation_jobs.push_back(a);
self.drive_me.send_job(a);
}
MachineResponse::TriggerWFTaskStarted {
task_started_event_id,
Expand All @@ -502,11 +491,12 @@ impl WorkflowMachines {
MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
// TODO: Should this also update self.run_id? Should we track orig/current
// separately?
self.outgoing_wf_activation_jobs.push_back(
wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed {
randomness_seed: str_to_randomness_seed(&new_run_id),
}),
);
self.drive_me
.send_job(wf_activation_job::Variant::UpdateRandomSeed(
UpdateRandomSeed {
randomness_seed: str_to_randomness_seed(&new_run_id),
},
));
}
MachineResponse::NoOp => (),
MachineResponse::IssueNewCommand(_) => {
Expand Down
24 changes: 3 additions & 21 deletions src/workflow/bridge.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use crate::{
machines::{ActivationListener, DrivenWorkflow, WFCommand},
protos::temporal::api::history::v1::WorkflowExecutionCanceledEventAttributes,
protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes,
protos::temporal::api::history::v1::WorkflowExecutionStartedEventAttributes,
machines::WFCommand,
workflow::{ActivationListener, WorkflowFetcher},
};
use std::sync::mpsc::{self, Receiver, Sender};

Expand All @@ -11,8 +9,6 @@ use std::sync::mpsc::{self, Receiver, Sender};
/// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService]
#[derive(Debug)]
pub(crate) struct WorkflowBridge {
// does wf id belong in here?
started_attrs: Option<WorkflowExecutionStartedEventAttributes>,
incoming_commands: Receiver<Vec<WFCommand>>,
}

Expand All @@ -22,35 +18,21 @@ impl WorkflowBridge {
let (tx, rx) = mpsc::channel();
(
Self {
started_attrs: None,
incoming_commands: rx,
},
tx,
)
}
}

impl DrivenWorkflow for WorkflowBridge {
fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) {
debug!(attribs = ?attribs, "wf bridge start");
self.started_attrs = Some(attribs);
}

impl WorkflowFetcher for WorkflowBridge {
fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
let in_cmds = self.incoming_commands.try_recv();

let in_cmds = in_cmds.unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]);
debug!(in_cmds = ?in_cmds, "wf bridge iteration fetch");
in_cmds
}

fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {
unimplemented!()
}

fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {
unimplemented!()
}
}

// Real bridge doesn't actually need to listen
Expand Down
Loading