From 5b5bed9e4bdd77f8298261a743f0cc1904442973 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 14:15:39 -0800 Subject: [PATCH 1/2] Allow wf activations to do multiple "jobs" We are seriously running out of words for "do a thing" --- build.rs | 7 ++--- protos/local/core_interface.proto | 15 ++++++--- src/lib.rs | 26 ++++++---------- src/machines/mod.rs | 7 ++--- src/machines/test_help/workflow_driver.rs | 6 ++-- src/machines/timer_state_machine.rs | 2 +- src/machines/workflow_machines.rs | 38 ++++++++++++++--------- src/protos/mod.rs | 10 ++++++ 8 files changed, 62 insertions(+), 49 deletions(-) diff --git a/build.rs b/build.rs index cb6500556..11f4dc705 100644 --- a/build.rs +++ b/build.rs @@ -15,12 +15,9 @@ fn main() -> Result<(), Box> { "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") + .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivation.attributes", - "#[derive(::derive_more::From)]", - ) - .type_attribute( - "coresdk.WorkflowTask.attributes", + "coresdk.WFActivationJob.attributes", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index ab31048d0..82af4ecdb 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -38,19 +38,24 @@ message Task { } // An instruction to the lang sdk to run some workflow code, whether for the first time or from -// a cached state +// a cached state. message WFActivation { - // Time the activation was requested + // Time the activation(s) were requested google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; // The id of the currently active run of the workflow string run_id = 2; + // The things to do upon activating the workflow + repeated WFActivationJob jobs = 3; +} + +message WFActivationJob { oneof attributes { // TODO could literally be attributes from events? -- maybe we don't need our own types // Begin a workflow for the first time - StartWorkflowTaskAttributes start_workflow = 3; - // A timer has fired, allowing whatever was waiting on it - TimerFiredTaskAttributes unblock_timer = 4; + StartWorkflowTaskAttributes start_workflow = 1; + // A timer has fired, allowing whatever was waiting on it (if anything) to proceed + TimerFiredTaskAttributes timer_fired = 2; } } diff --git a/src/lib.rs b/src/lib.rs index ce918d93b..7a760a37d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -332,7 +332,7 @@ mod test { use crate::{ machines::test_help::TestHistoryBuilder, protos::{ - coresdk::{task, wf_activation, WfActivation}, + coresdk::{wf_activation_job, WfActivationJob}, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, @@ -417,14 +417,10 @@ mod test { let res = dbg!(core.poll_task().unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::StartWorkflow(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + }] ); assert!(core.workflow_machines.get(run_id).is_some()); @@ -443,14 +439,10 @@ mod test { let res = dbg!(core.poll_task().unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::UnblockTimer(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + }] ); let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 48d2ef3e7..0f62f33cc 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -39,9 +39,8 @@ pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; use crate::{ machines::workflow_machines::WorkflowTrigger, - protos::coresdk::wf_activation, protos::{ - coresdk::{self, command::Variant}, + coresdk::{self, command::Variant, wf_activation_job}, temporal::api::{ command::v1::{ command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes, @@ -93,10 +92,10 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send { ) -> Result<(), anyhow::Error>; } -/// Allows observers to listen to newly generated outgoing activations. Used for testing, where +/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where /// some activations must be handled before outgoing commands are issued to avoid deadlocking. pub(crate) trait ActivationListener { - fn on_activation(&mut self, _activation: &wf_activation::Attributes) {} + fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {} } /// The struct for [WFCommand::AddCommand] diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 4cfcd45b9..92aa306e1 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -2,7 +2,7 @@ use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation::Attributes, TimerFiredTaskAttributes}, + coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes}, temporal::api::{ command::v1::StartTimerCommandAttributes, history::v1::{ @@ -58,8 +58,8 @@ where } impl ActivationListener for TestWorkflowDriver { - fn on_activation(&mut self, activation: &Attributes) { - if let Attributes::UnblockTimer(TimerFiredTaskAttributes { timer_id }) = activation { + fn on_activation_job(&mut self, activation: &Attributes) { + if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 6d60687fa..b217efdb3 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -6,7 +6,7 @@ use crate::{ AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter, }, protos::{ - coresdk::{wf_activation, HistoryEventId, TimerFiredTaskAttributes, WfActivation}, + coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 9f3d4cad5..814ec5372 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,5 @@ use crate::machines::ActivationListener; +use crate::protos::coresdk::WfActivationJob; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -6,7 +7,7 @@ use crate::{ ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, + coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation}, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -62,8 +63,8 @@ pub(crate) struct WorkflowMachines { /// Old note: It is a queue as commands can be added (due to marker based commands) while /// iterating over already added commands. current_wf_task_commands: VecDeque, - /// Outgoing activations that need to be sent to the lang sdk - outgoing_wf_activations: VecDeque, + /// Outgoing activation jobs that need to be sent to the lang sdk + outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -73,7 +74,7 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::From)] #[must_use] pub(super) enum WorkflowTrigger { - PushWFActivation(#[from(forward)] wf_activation::Attributes), + PushWFJob(#[from(forward)] wf_activation_job::Attributes), TriggerWFTaskStarted { task_started_event_id: i64, time: SystemTime, @@ -114,7 +115,7 @@ impl WorkflowMachines { machines_by_id: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - outgoing_wf_activations: Default::default(), + outgoing_wf_activation_jobs: Default::default(), } } @@ -287,7 +288,7 @@ impl WorkflowMachines { { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow - self.outgoing_wf_activations.push_back( + self.outgoing_wf_activation_jobs.push_back( StartWorkflowTaskAttributes { // TODO: This needs to be set during init workflow_type: "".to_string(), @@ -344,14 +345,23 @@ impl WorkflowMachines { /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock /// timer, etc. pub(crate) fn get_wf_activation(&mut self) -> Option { - self.outgoing_wf_activations - .pop_front() - .map(|attrs| WfActivation { - // todo wat ? + if self.outgoing_wf_activation_jobs.is_empty() { + None + } else { + let jobs = self + .outgoing_wf_activation_jobs + .drain(..) + .map(|x| WfActivationJob { + attributes: Some(x.into()), + }) + .collect(); + Some(WfActivation { + // todo How should this timestamp be set? timestamp: None, run_id: self.run_id.clone(), - attributes: attrs.into(), + jobs, }) + } } /// Given an event id (possibly zero) of the last successfully executed workflow task and an @@ -394,9 +404,9 @@ impl WorkflowMachines { event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers); for trigger in triggers { match trigger { - WorkflowTrigger::PushWFActivation(a) => { - self.drive_me.on_activation(&a); - self.outgoing_wf_activations.push_back(a); + WorkflowTrigger::PushWFJob(a) => { + self.drive_me.on_activation_job(&a); + self.outgoing_wf_activation_jobs.push_back(a); } WorkflowTrigger::TriggerWFTaskStarted { task_started_event_id, diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 1592e9bf9..f8e665a36 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -23,6 +23,16 @@ pub mod coresdk { variant: Some(t.into()), } } + + /// Returns any contained jobs if this task was a wf activation and it had some + #[cfg(test)] + pub fn get_wf_jobs(&self) -> Vec { + if let Some(task::Variant::Workflow(a)) = &self.variant { + a.jobs.clone() + } else { + vec![] + } + } } impl From> for WfActivationSuccess { From 95782f61e989b51484a6812960b49a31fee394ae Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 14:28:30 -0800 Subject: [PATCH 2/2] Nicer conversion, set timestamp appropriately --- src/machines/workflow_machines.rs | 7 ++----- src/protos/mod.rs | 9 +++++++++ 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 814ec5372..1315e0df1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -351,13 +351,10 @@ impl WorkflowMachines { let jobs = self .outgoing_wf_activation_jobs .drain(..) - .map(|x| WfActivationJob { - attributes: Some(x.into()), - }) + .map(Into::into) .collect(); Some(WfActivation { - // todo How should this timestamp be set? - timestamp: None, + timestamp: self.current_wf_time.map(Into::into), run_id: self.run_id.clone(), jobs, }) diff --git a/src/protos/mod.rs b/src/protos/mod.rs index f8e665a36..47232bd12 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -12,6 +12,7 @@ pub mod coresdk { use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; use crate::protos::coresdk::complete_task_req::Completion; + use crate::protos::coresdk::wf_activation_job::Attributes; use command::Variant; pub type HistoryEventId = i64; @@ -35,6 +36,14 @@ pub mod coresdk { } } + impl From for WfActivationJob { + fn from(a: Attributes) -> Self { + WfActivationJob { + attributes: Some(a), + } + } + } + impl From> for WfActivationSuccess { fn from(v: Vec) -> Self { WfActivationSuccess {