From 31fccdfb9c63d9e17a35e524ea52c2a8eced3f69 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 25 Feb 2021 11:24:05 -0800 Subject: [PATCH 1/3] Rename wf activation job "attribues" to "variants" and shorten names --- build.rs | 2 +- protos/local/core_interface.proto | 27 ++++++++++---------- src/lib.rs | 30 +++++++++++------------ src/machines/mod.rs | 2 +- src/machines/test_help/workflow_driver.rs | 7 +++--- src/machines/timer_state_machine.rs | 4 +-- src/machines/workflow_machines.rs | 20 ++++++--------- src/protos/mod.rs | 9 +++---- tests/integ_tests/simple_wf_tests.rs | 4 +-- 9 files changed, 49 insertions(+), 56 deletions(-) diff --git a/build.rs b/build.rs index 11f4dc705..474f91870 100644 --- a/build.rs +++ b/build.rs @@ -17,7 +17,7 @@ fn main() -> Result<(), Box> { .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivationJob.attributes", + "coresdk.WFActivationJob.variant", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 14595a860..0daf4da03 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -56,22 +56,21 @@ message WFActivation { } message WFActivationJob { - oneof attributes { - // TODO could literally be attributes from events? -- maybe we don't need our own types - + oneof variant { // Begin a workflow for the first time - StartWorkflowTaskAttributes start_workflow = 1; + StartWorkflow start_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed - TimerFiredTaskAttributes timer_fired = 2; + FireTimer fire_timer = 2; // Workflow was reset. The randomness seed has to be updated. - RandomSeedUpdatedAttributes random_seed_updated = 3; - - QueryWorkflowJob query_workflow = 4; - CancelWorkflowTaskAttributes cancel_workflow = 5; + UpdateRandomSeed update_random_seed = 3; + // A request to query the workflow was received. + QueryWorkflow query_workflow = 4; + // A request to cancel the workflow was received. + CancelWorkflow cancel_workflow = 5; } } -message StartWorkflowTaskAttributes { +message StartWorkflow { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; // The workflow id used on the temporal server @@ -86,19 +85,19 @@ message StartWorkflowTaskAttributes { // will be others - workflow exe started attrs, etc } -message CancelWorkflowTaskAttributes { +message CancelWorkflow { // TODO: add attributes here } -message TimerFiredTaskAttributes { +message FireTimer { string timer_id = 1; } -message RandomSeedUpdatedAttributes { +message UpdateRandomSeed { uint64 randomness_seed = 1; } -message QueryWorkflowJob { +message QueryWorkflow { temporal.api.query.v1.WorkflowQuery query = 1; } diff --git a/src/lib.rs b/src/lib.rs index fbb334386..f78e67b4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -330,8 +330,8 @@ mod test { machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ coresdk::{ - wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, - TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, + wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, + WfActivationJob, }, temporal::api::{ command::v1::{ @@ -381,7 +381,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(run_id)); @@ -401,7 +401,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + variant: Some(wf_activation_job::Variant::FireTimer(_)), }] ); let task_tok = res.task_token; @@ -460,7 +460,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(run_id)); @@ -488,13 +488,13 @@ mod test { res.get_wf_jobs().as_slice(), [ WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t1_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t1_id } )), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t2_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t2_id } )), } ] => { @@ -537,7 +537,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(run_id)); @@ -557,7 +557,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + variant: Some(wf_activation_job::Variant::FireTimer(_)), }] ); let task_tok = res.task_token; @@ -634,8 +634,8 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow( - StartWorkflowTaskAttributes{randomness_seed, ..} + variant: Some(wf_activation_job::Variant::StartWorkflow( + StartWorkflow{randomness_seed, ..} )), }] => { randomness_seed_from_start = *randomness_seed; @@ -658,10 +658,10 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_),), + variant: Some(wf_activation_job::Variant::FireTimer(_),), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})), + variant: Some(wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed{randomness_seed})), }] => { assert_ne!(randomness_seed_from_start, *randomness_seed) } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 35473e12c..ca1447d7b 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -90,7 +90,7 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send { /// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where /// some activations must be handled before outgoing commands are issued to avoid deadlocking. pub(crate) trait ActivationListener { - fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {} + fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {} } /// The struct for [WFCommand::AddCommand] diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 670408595..30ee9bc71 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,7 +1,8 @@ +use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes}, + coresdk::FireTimer, temporal::api::{ command::v1::StartTimerCommandAttributes, history::v1::{ @@ -57,8 +58,8 @@ where } impl ActivationListener for TestWorkflowDriver { - fn on_activation_job(&mut self, activation: &Attributes) { - if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation { + fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { + if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 75e169dc1..3acb9a6bb 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -6,7 +6,7 @@ use crate::{ AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter, }, protos::{ - coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation}, + coresdk::{FireTimer, HistoryEventId, WfActivation}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, @@ -225,7 +225,7 @@ impl WFMachinesAdapter for TimerMachine { ) -> Result, WFMachinesError> { match my_command { // Fire the completion - TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes { + TimerMachineCommand::Complete(_event) => Ok(vec![FireTimer { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 87b2f20d3..af31482c9 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,5 @@ use crate::machines::workflow_machines::WFMachinesError::MalformedEvent; +use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -7,10 +8,7 @@ use crate::{ }, protos::coresdk::WfActivationJob, protos::{ - coresdk::{ - wf_activation_job, wf_activation_job::Attributes::RandomSeedUpdated, - RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, WfActivation, - }, + coresdk::{StartWorkflow, UpdateRandomSeed, WfActivation}, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -69,7 +67,7 @@ pub(crate) struct WorkflowMachines { /// iterating over already added commands. current_wf_task_commands: VecDeque, /// Outgoing activation jobs that need to be sent to the lang sdk - outgoing_wf_activation_jobs: VecDeque, + outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -79,7 +77,7 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::From)] #[must_use] pub(super) enum WorkflowTrigger { - PushWFJob(#[from(forward)] wf_activation_job::Attributes), + PushWFJob(#[from(forward)] wf_activation_job::Variant), TriggerWFTaskStarted { task_started_event_id: i64, time: SystemTime, @@ -295,7 +293,7 @@ impl WorkflowMachines { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow self.outgoing_wf_activation_jobs.push_back( - StartWorkflowTaskAttributes { + StartWorkflow { workflow_type: attrs .workflow_type .as_ref() @@ -426,11 +424,9 @@ impl WorkflowMachines { } WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { self.outgoing_wf_activation_jobs.push_back( - wf_activation_job::Attributes::RandomSeedUpdated( - RandomSeedUpdatedAttributes { - randomness_seed: str_to_randomness_seed(&new_run_id), - }, - ), + wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed { + randomness_seed: str_to_randomness_seed(&new_run_id), + }), ); } } diff --git a/src/protos/mod.rs b/src/protos/mod.rs index b0c223057..3cf274981 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -11,7 +11,6 @@ pub mod coresdk { include!("coresdk.rs"); use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; - use crate::protos::coresdk::wf_activation_job::Attributes; use command::Variant; pub type HistoryEventId = i64; @@ -34,11 +33,9 @@ pub mod coresdk { } } - impl From for WfActivationJob { - fn from(a: Attributes) -> Self { - WfActivationJob { - attributes: Some(a), - } + impl From for WfActivationJob { + fn from(a: wf_activation_job::Variant) -> Self { + WfActivationJob { variant: Some(a) } } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 95109e3ae..fd5ba17dc 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -114,12 +114,12 @@ fn parallel_timer_workflow() { task.get_wf_jobs().as_slice(), [ WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( + attributes: Some(wf_activation_job::Variant::FireTimer( TimerFiredTaskAttributes { timer_id: t1_id } )), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( + attributes: Some(wf_activation_job::Variant::FireTimer( TimerFiredTaskAttributes { timer_id: t2_id } )), } From c111358eacbac6891880950bcef8595249d16ff9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 25 Feb 2021 13:57:43 -0800 Subject: [PATCH 2/3] Fix integ tests --- tests/integ_tests/simple_wf_tests.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index fd5ba17dc..d8388c605 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -3,7 +3,7 @@ use rand::{self, Rng}; use std::{convert::TryFrom, env, time::Duration}; use temporal_sdk_core::{ protos::{ - coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob}, + coresdk::{wf_activation_job, FireTimer, TaskCompletion, WfActivationJob}, temporal::api::command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, @@ -114,13 +114,13 @@ fn parallel_timer_workflow() { task.get_wf_jobs().as_slice(), [ WfActivationJob { - attributes: Some(wf_activation_job::Variant::FireTimer( - TimerFiredTaskAttributes { timer_id: t1_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t1_id } )), }, WfActivationJob { - attributes: Some(wf_activation_job::Variant::FireTimer( - TimerFiredTaskAttributes { timer_id: t2_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t2_id } )), } ] => { From 3a244a6020f5f3654b05b900f892d8349b040d86 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 1 Mar 2021 09:33:57 -0800 Subject: [PATCH 3/3] Fix merge problems --- protos/local/core_interface.proto | 6 +++--- src/lib.rs | 6 ++---- src/machines/test_help/workflow_driver.rs | 5 ++--- src/machines/timer_state_machine.rs | 4 ++-- src/machines/workflow_machines.rs | 1 - 5 files changed, 9 insertions(+), 13 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 5bdc8b692..42ee0438f 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -61,8 +61,8 @@ message WFActivationJob { StartWorkflow start_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed FireTimer fire_timer = 2; - // A timer was canceled - TimerCanceledTaskAttributes timer_canceled = 3; + // A timer was canceled, and needs to be unblocked on the lang side. + CancelTimer cancel_timer = 3; // Workflow was reset. The randomness seed must be updated. UpdateRandomSeed update_random_seed = 4; // A request to query the workflow was received. @@ -95,7 +95,7 @@ message FireTimer { string timer_id = 1; } -message TimerCanceledTaskAttributes { +message CancelTimer { string timer_id = 1; } diff --git a/src/lib.rs b/src/lib.rs index 662715569..093995314 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -359,8 +359,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_timer("fake_timer"); - let core = build_fake_core(wfid, RUN_ID, &mut t, hist_batches); - core + build_fake_core(wfid, RUN_ID, &mut t, hist_batches) } #[rstest(core, @@ -616,7 +615,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); @@ -630,7 +629,6 @@ mod test { .into(), CancelTimerCommandAttributes { timer_id: cancel_timer_id.to_string(), - ..Default::default() } .into(), CompleteWorkflowExecutionCommandAttributes { result: None }.into(), diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 15c56d2cd..801eb4969 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,10 +1,9 @@ -use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::FireTimer, + coresdk::{wf_activation_job, FireTimer}, temporal::api::{ - command::v1::StartTimerCommandAttributes, + command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes}, history::v1::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 411ab4aed..b6cfef40b 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -6,7 +6,7 @@ use crate::{ Cancellable, NewMachineWithCommand, WFMachinesAdapter, }, protos::{ - coresdk::{FireTimer, HistoryEventId, WfActivation}, + coresdk::{CancelTimer, FireTimer, HistoryEventId}, temporal::api::{ command::v1::{CancelTimerCommandAttributes, Command, StartTimerCommandAttributes}, enums::v1::{CommandType, EventType}, @@ -233,7 +233,7 @@ impl WFMachinesAdapter for TimerMachine { timer_id: self.shared_state.attrs.timer_id.clone(), } .into()], - TimerMachineCommand::Canceled => vec![TimerCanceledTaskAttributes { + TimerMachineCommand::Canceled => vec![CancelTimer { timer_id: self.shared_state.attrs.timer_id.clone(), } .into()], diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1a798ed71..fbeb15217 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,3 @@ -use crate::machines::workflow_machines::WFMachinesError::MalformedEvent; use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{