diff --git a/.gitignore b/.gitignore index 07b4d6d81..6457cabee 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ Cargo.lock src/protos/*.rs !src/protos/mod.rs /tarpaulin-report.html +/.cargo/ diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 02b6cf888..3e7e28fe1 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -38,17 +38,14 @@ use crate::{ core_tracing::VecDisplayer, machines::workflow_machines::MachineResponse, protos::{ - coresdk::{self, command::Variant, wf_activation_job}, + coresdk::{self, command::Variant}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, enums::v1::CommandType, - history::v1::{ - HistoryEvent, WorkflowExecutionCanceledEventAttributes, - WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, - }, + history::v1::HistoryEvent, }, }, }; @@ -61,35 +58,6 @@ use std::{ pub(crate) type ProtoCommand = Command; -/// Implementors of this trait represent something that can (eventually) call into a workflow to -/// drive it, start it, signal it, cancel it, etc. -pub(crate) trait DrivenWorkflow: ActivationListener + Send { - /// Start the workflow - fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes); - - /// Obtain any output from the workflow's recent execution(s). Because the lang sdk is - /// responsible for calling workflow code as a result of receiving tasks from - /// [crate::Core::poll_task], we cannot directly iterate it here. Thus implementations of this - /// trait are expected to either buffer output or otherwise produce it on demand when this - /// function is called. - /// - /// In the case of the real [WorkflowBridge] implementation, commands are simply pulled from - /// a buffer that the language side sinks into when it calls [crate::Core::complete_task] - fn fetch_workflow_iteration_output(&mut self) -> Vec; - - /// Signal the workflow - fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes); - - /// Cancel the workflow - fn cancel(&mut self, attribs: WorkflowExecutionCanceledEventAttributes); -} - -/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where -/// some activations must be handled before outgoing commands are issued to avoid deadlocking. -pub(crate) trait ActivationListener { - fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {} -} - /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. /// EX: Create a new timer, complete the workflow, etc. #[derive(Debug, derive_more::From)] diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index b29510ab5..ce6760a00 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,15 +1,10 @@ use crate::{ - machines::{ActivationListener, DrivenWorkflow, WFCommand}, + machines::WFCommand, protos::{ coresdk::{wf_activation_job, FireTimer}, - temporal::api::{ - command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes}, - history::v1::{ - WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, - WorkflowExecutionStartedEventAttributes, - }, - }, + temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes}, }, + workflow::{ActivationListener, WorkflowFetcher}, }; use dashmap::DashMap; use futures::Future; @@ -74,15 +69,11 @@ impl ActivationListener for TestWorkflowDriver { } } -impl DrivenWorkflow for TestWorkflowDriver +impl WorkflowFetcher for TestWorkflowDriver where F: Fn(CommandSender) -> Fut + Send + Sync, Fut: Future, { - fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) { - debug!("Test WF driver start called"); - } - fn fetch_workflow_iteration_output(&mut self) -> Vec { // If we have already sent the command to complete the workflow, we don't want // to re-run the workflow again. @@ -124,10 +115,6 @@ where emit_these } - - fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} - - fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} } #[derive(Debug, derive_more::From)] diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 8450f469c..ff7d49e50 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -297,8 +297,11 @@ mod test { }); let t = canned_histories::single_timer("timer1"); - let state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); (t, state_machines) @@ -355,8 +358,11 @@ mod test { }); let t = canned_histories::single_timer("badid"); - let mut state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); assert!(t .handle_workflow_task_take_cmds(&mut state_machines, None) @@ -392,8 +398,11 @@ mod test { }); let t = canned_histories::cancel_timer("wait_timer", "cancel_timer"); - let state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); (t, state_machines) } @@ -461,8 +470,11 @@ mod test { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let mut state_machines = - WorkflowMachines::new("wfid".to_string(), "runid".to_string(), Box::new(twd)); + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); let commands = t .handle_workflow_task_take_cmds(&mut state_machines, None) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 3182799c4..c40bfa1cd 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,9 +1,10 @@ +use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, - workflow_task_state_machine::WorkflowTaskMachine, DrivenWorkflow, NewMachineWithCommand, - ProtoCommand, TemporalStateMachine, WFCommand, + workflow_task_state_machine::WorkflowTaskMachine, NewMachineWithCommand, ProtoCommand, + TemporalStateMachine, WFCommand, }, protos::{ coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation}, @@ -64,11 +65,9 @@ pub(crate) struct WorkflowMachines { /// Old note: It is a queue as commands can be added (due to marker based commands) while /// iterating over already added commands. current_wf_task_commands: VecDeque, - /// Outgoing activation jobs that need to be sent to the lang sdk - outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines - drive_me: Box, + drive_me: DrivenWorkflow, } slotmap::new_key_type! { struct MachineKey; } @@ -128,11 +127,7 @@ pub enum WFMachinesError { } impl WorkflowMachines { - pub(crate) fn new( - workflow_id: String, - run_id: String, - driven_wf: Box, - ) -> Self { + pub(crate) fn new(workflow_id: String, run_id: String, driven_wf: DrivenWorkflow) -> Self { Self { workflow_id, run_id, @@ -148,7 +143,6 @@ impl WorkflowMachines { timer_id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - outgoing_wf_activation_jobs: Default::default(), } } @@ -319,7 +313,7 @@ impl WorkflowMachines { { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow - self.outgoing_wf_activation_jobs.push_back( + self.drive_me.send_job( StartWorkflow { workflow_type: attrs .workflow_type @@ -385,14 +379,10 @@ impl WorkflowMachines { /// timer, etc. This does *not* cause any advancement of the state machines, it merely drains /// from the outgoing queue of activation jobs. pub(crate) fn get_wf_activation(&mut self) -> Option { - if self.outgoing_wf_activation_jobs.is_empty() { + let jobs = self.drive_me.drain_jobs(); + if jobs.is_empty() { None } else { - let jobs = self - .outgoing_wf_activation_jobs - .drain(..) - .map(Into::into) - .collect(); Some(WfActivation { timestamp: self.current_wf_time.map(Into::into), run_id: self.run_id.clone(), @@ -490,8 +480,7 @@ impl WorkflowMachines { for response in machine_responses { match response { MachineResponse::PushWFJob(a) => { - self.drive_me.on_activation_job(&a); - self.outgoing_wf_activation_jobs.push_back(a); + self.drive_me.send_job(a); } MachineResponse::TriggerWFTaskStarted { task_started_event_id, @@ -502,11 +491,12 @@ impl WorkflowMachines { MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { // TODO: Should this also update self.run_id? Should we track orig/current // separately? - self.outgoing_wf_activation_jobs.push_back( - wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed { - randomness_seed: str_to_randomness_seed(&new_run_id), - }), - ); + self.drive_me + .send_job(wf_activation_job::Variant::UpdateRandomSeed( + UpdateRandomSeed { + randomness_seed: str_to_randomness_seed(&new_run_id), + }, + )); } MachineResponse::NoOp => (), MachineResponse::IssueNewCommand(_) => { diff --git a/src/workflow/bridge.rs b/src/workflow/bridge.rs index c81470186..58fdcba81 100644 --- a/src/workflow/bridge.rs +++ b/src/workflow/bridge.rs @@ -1,8 +1,6 @@ use crate::{ - machines::{ActivationListener, DrivenWorkflow, WFCommand}, - protos::temporal::api::history::v1::WorkflowExecutionCanceledEventAttributes, - protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes, - protos::temporal::api::history::v1::WorkflowExecutionStartedEventAttributes, + machines::WFCommand, + workflow::{ActivationListener, WorkflowFetcher}, }; use std::sync::mpsc::{self, Receiver, Sender}; @@ -11,8 +9,6 @@ use std::sync::mpsc::{self, Receiver, Sender}; /// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService] #[derive(Debug)] pub(crate) struct WorkflowBridge { - // does wf id belong in here? - started_attrs: Option, incoming_commands: Receiver>, } @@ -22,7 +18,6 @@ impl WorkflowBridge { let (tx, rx) = mpsc::channel(); ( Self { - started_attrs: None, incoming_commands: rx, }, tx, @@ -30,12 +25,7 @@ impl WorkflowBridge { } } -impl DrivenWorkflow for WorkflowBridge { - fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) { - debug!(attribs = ?attribs, "wf bridge start"); - self.started_attrs = Some(attribs); - } - +impl WorkflowFetcher for WorkflowBridge { fn fetch_workflow_iteration_output(&mut self) -> Vec { let in_cmds = self.incoming_commands.try_recv(); @@ -43,14 +33,6 @@ impl DrivenWorkflow for WorkflowBridge { debug!(in_cmds = ?in_cmds, "wf bridge iteration fetch"); in_cmds } - - fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) { - unimplemented!() - } - - fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) { - unimplemented!() - } } // Real bridge doesn't actually need to listen diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs new file mode 100644 index 000000000..41a77182f --- /dev/null +++ b/src/workflow/driven_workflow.rs @@ -0,0 +1,95 @@ +use crate::{ + machines::WFCommand, + protos::coresdk::wf_activation_job, + protos::coresdk::WfActivationJob, + protos::temporal::api::history::v1::{ + WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, + WorkflowExecutionStartedEventAttributes, + }, +}; +use std::collections::VecDeque; + +/// Abstracts away the concept of an actual workflow implementation, handling sending it new +/// jobs and fetching output from it. +pub struct DrivenWorkflow { + started_attrs: Option, + fetcher: Box, + /// Outgoing activation jobs that need to be sent to the lang sdk + outgoing_wf_activation_jobs: VecDeque, +} + +impl From> for DrivenWorkflow +where + WF: ExternalWorkflow + 'static, +{ + fn from(wf: Box) -> Self { + Self { + started_attrs: None, + fetcher: wf, + outgoing_wf_activation_jobs: Default::default(), + } + } +} + +impl DrivenWorkflow { + /// Start the workflow + pub fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) { + debug!(run_id = %attribs.original_execution_run_id, "Driven WF start"); + self.started_attrs = Some(attribs) + } + + /// Enqueue a new job to be sent to the driven workflow + pub fn send_job(&mut self, job: wf_activation_job::Variant) { + self.fetcher.on_activation_job(&job); + self.outgoing_wf_activation_jobs.push_back(job); + } + + /// Drain all pending jobs, so that they may be sent to the driven workflow + pub fn drain_jobs(&mut self) -> Vec { + self.outgoing_wf_activation_jobs + .drain(..) + .map(Into::into) + .collect() + } + + /// Signal the workflow + pub fn _signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} + + /// Cancel the workflow + pub fn _cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} +} + +impl WorkflowFetcher for DrivenWorkflow { + fn fetch_workflow_iteration_output(&mut self) -> Vec { + self.fetcher.fetch_workflow_iteration_output() + } +} + +impl ActivationListener for DrivenWorkflow { + fn on_activation_job(&mut self, a: &wf_activation_job::Variant) { + self.fetcher.on_activation_job(a) + } +} + +pub trait ExternalWorkflow: WorkflowFetcher + ActivationListener {} +impl ExternalWorkflow for T where T: WorkflowFetcher + ActivationListener {} + +/// Implementors of this trait represent a way to fetch output from executing/iterating some +/// workflow code (or a mocked workflow). +pub trait WorkflowFetcher: Send { + /// Obtain any output from the workflow's recent execution(s). Because the lang sdk is + /// responsible for calling workflow code as a result of receiving tasks from + /// [crate::Core::poll_task], we cannot directly iterate it here. Thus implementations of this + /// trait are expected to either buffer output or otherwise produce it on demand when this + /// function is called. + /// + /// In the case of the real [WorkflowBridge] implementation, commands are simply pulled from + /// a buffer that the language side sinks into when it calls [crate::Core::complete_task] + fn fetch_workflow_iteration_output(&mut self) -> Vec; +} + +/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where +/// some activations must be handled before outgoing commands are issued to avoid deadlocking. +pub trait ActivationListener { + fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {} +} diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 45ce89737..aa7eb491e 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -1,8 +1,10 @@ mod bridge; mod concurrency_manager; +mod driven_workflow; pub(crate) use bridge::WorkflowBridge; pub(crate) use concurrency_manager::WorkflowConcurrencyManager; +pub(crate) use driven_workflow::{ActivationListener, DrivenWorkflow, WorkflowFetcher}; use crate::{ machines::{ProtoCommand, WFCommand, WorkflowMachines}, @@ -88,7 +90,7 @@ impl WorkflowManager { }; let (wfb, cmd_sink) = WorkflowBridge::new(); - let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb)); + let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb).into()); Ok(Self { machines: state_machines, command_sink: cmd_sink,