diff --git a/build.rs b/build.rs index 11f4dc705..474f91870 100644 --- a/build.rs +++ b/build.rs @@ -17,7 +17,7 @@ fn main() -> Result<(), Box> { .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivationJob.attributes", + "coresdk.WFActivationJob.variant", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 8121dcd70..42ee0438f 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -56,24 +56,23 @@ message WFActivation { } message WFActivationJob { - oneof attributes { - // TODO could literally be attributes from events? -- maybe we don't need our own types - + oneof variant { // Begin a workflow for the first time - StartWorkflowTaskAttributes start_workflow = 1; + StartWorkflow start_workflow = 1; // A timer has fired, allowing whatever was waiting on it (if anything) to proceed - TimerFiredTaskAttributes timer_fired = 2; - // A timer was canceled - TimerCanceledTaskAttributes timer_canceled = 3; + FireTimer fire_timer = 2; + // A timer was canceled, and needs to be unblocked on the lang side. + CancelTimer cancel_timer = 3; // Workflow was reset. The randomness seed must be updated. - RandomSeedUpdatedAttributes random_seed_updated = 4; - - QueryWorkflowJob query_workflow = 5; - CancelWorkflowTaskAttributes cancel_workflow = 6; + UpdateRandomSeed update_random_seed = 4; + // A request to query the workflow was received. + QueryWorkflow query_workflow = 5; + // A request to cancel the workflow was received. + CancelWorkflow cancel_workflow = 6; } } -message StartWorkflowTaskAttributes { +message StartWorkflow { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; // The workflow id used on the temporal server @@ -88,23 +87,23 @@ message StartWorkflowTaskAttributes { // will be others - workflow exe started attrs, etc } -message CancelWorkflowTaskAttributes { +message CancelWorkflow { // TODO: add attributes here } -message TimerFiredTaskAttributes { +message FireTimer { string timer_id = 1; } -message TimerCanceledTaskAttributes { +message CancelTimer { string timer_id = 1; } -message RandomSeedUpdatedAttributes { +message UpdateRandomSeed { uint64 randomness_seed = 1; } -message QueryWorkflowJob { +message QueryWorkflow { temporal.api.query.v1.WorkflowQuery query = 1; } diff --git a/src/lib.rs b/src/lib.rs index 09405e935..093995314 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -339,8 +339,8 @@ mod test { machines::test_help::{build_fake_core, FakeCore}, protos::{ coresdk::{ - wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, - TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, + wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, + WfActivationJob, }, temporal::api::command::v1::{ CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, @@ -359,8 +359,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_timer("fake_timer"); - let core = build_fake_core(wfid, RUN_ID, &mut t, hist_batches); - core + build_fake_core(wfid, RUN_ID, &mut t, hist_batches) } #[rstest(core, @@ -372,7 +371,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(RUN_ID)); @@ -392,7 +391,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + variant: Some(wf_activation_job::Variant::FireTimer(_)), }] ); let task_tok = res.task_token; @@ -418,7 +417,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(run_id)); @@ -446,13 +445,13 @@ mod test { res.get_wf_jobs().as_slice(), [ WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t1_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t1_id } )), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t2_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t2_id } )), } ] => { @@ -483,7 +482,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); assert!(core.workflow_machines.exists(run_id)); @@ -510,7 +509,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + variant: Some(wf_activation_job::Variant::FireTimer(_)), }] ); let task_tok = res.task_token; @@ -558,8 +557,8 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow( - StartWorkflowTaskAttributes{randomness_seed, ..} + variant: Some(wf_activation_job::Variant::StartWorkflow( + StartWorkflow{randomness_seed, ..} )), }] => { randomness_seed_from_start = *randomness_seed; @@ -582,10 +581,10 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired(_),), + variant: Some(wf_activation_job::Variant::FireTimer(_),), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})), + variant: Some(wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed{randomness_seed})), }] => { assert_ne!(randomness_seed_from_start, *randomness_seed) } @@ -616,7 +615,7 @@ mod test { assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { - attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), }] ); @@ -630,7 +629,6 @@ mod test { .into(), CancelTimerCommandAttributes { timer_id: cancel_timer_id.to_string(), - ..Default::default() } .into(), CompleteWorkflowExecutionCommandAttributes { result: None }.into(), diff --git a/src/machines/mod.rs b/src/machines/mod.rs index c3ad06212..2985baaac 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -87,7 +87,7 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send { /// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where /// some activations must be handled before outgoing commands are issued to avoid deadlocking. pub(crate) trait ActivationListener { - fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {} + fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {} } /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index d6edc8506..801eb4969 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,10 +1,9 @@ -use crate::protos::temporal::api::command::v1::CancelTimerCommandAttributes; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes}, + coresdk::{wf_activation_job, FireTimer}, temporal::api::{ - command::v1::StartTimerCommandAttributes, + command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes}, history::v1::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, @@ -66,8 +65,8 @@ where } impl ActivationListener for TestWorkflowDriver { - fn on_activation_job(&mut self, activation: &Attributes) { - if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation { + fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { + if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 94b4c4415..b6cfef40b 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -6,7 +6,7 @@ use crate::{ Cancellable, NewMachineWithCommand, WFMachinesAdapter, }, protos::{ - coresdk::{HistoryEventId, TimerCanceledTaskAttributes, TimerFiredTaskAttributes}, + coresdk::{CancelTimer, FireTimer, HistoryEventId}, temporal::api::{ command::v1::{CancelTimerCommandAttributes, Command, StartTimerCommandAttributes}, enums::v1::{CommandType, EventType}, @@ -229,11 +229,11 @@ impl WFMachinesAdapter for TimerMachine { ) -> Result, WFMachinesError> { Ok(match my_command { // Fire the completion - TimerMachineCommand::Complete => vec![TimerFiredTaskAttributes { + TimerMachineCommand::Complete => vec![FireTimer { timer_id: self.shared_state.attrs.timer_id.clone(), } .into()], - TimerMachineCommand::Canceled => vec![TimerCanceledTaskAttributes { + TimerMachineCommand::Canceled => vec![CancelTimer { timer_id: self.shared_state.attrs.timer_id.clone(), } .into()], diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 362a7745d..fbeb15217 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -5,10 +6,7 @@ use crate::{ ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{ - wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, - WfActivation, - }, + coresdk::{StartWorkflow, UpdateRandomSeed, WfActivation}, temporal::api::{ enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent}, @@ -66,7 +64,7 @@ pub(crate) struct WorkflowMachines { /// iterating over already added commands. current_wf_task_commands: VecDeque, /// Outgoing activation jobs that need to be sent to the lang sdk - outgoing_wf_activation_jobs: VecDeque, + outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -84,7 +82,7 @@ struct CommandAndMachine { #[must_use] #[allow(clippy::large_enum_variant)] pub enum MachineResponse { - PushWFJob(#[from(forward)] wf_activation_job::Attributes), + PushWFJob(#[from(forward)] wf_activation_job::Variant), IssueNewCommand(ProtoCommand), TriggerWFTaskStarted { task_started_event_id: i64, @@ -318,7 +316,7 @@ impl WorkflowMachines { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow self.outgoing_wf_activation_jobs.push_back( - StartWorkflowTaskAttributes { + StartWorkflow { workflow_type: attrs .workflow_type .as_ref() @@ -465,11 +463,9 @@ impl WorkflowMachines { } MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => { self.outgoing_wf_activation_jobs.push_back( - wf_activation_job::Attributes::RandomSeedUpdated( - RandomSeedUpdatedAttributes { - randomness_seed: str_to_randomness_seed(&new_run_id), - }, - ), + wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed { + randomness_seed: str_to_randomness_seed(&new_run_id), + }), ); } MachineResponse::NoOp => (), diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 0893a5e9c..777c187d9 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -11,7 +11,6 @@ pub mod coresdk { include!("coresdk.rs"); use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; - use crate::protos::coresdk::wf_activation_job::Attributes; use command::Variant; pub type HistoryEventId = i64; @@ -34,11 +33,9 @@ pub mod coresdk { } } - impl From for WfActivationJob { - fn from(a: Attributes) -> Self { - WfActivationJob { - attributes: Some(a), - } + impl From for WfActivationJob { + fn from(a: wf_activation_job::Variant) -> Self { + WfActivationJob { variant: Some(a) } } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index a48731261..ce2d1d7cc 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -4,7 +4,7 @@ use std::{convert::TryFrom, env, time::Duration}; use temporal_sdk_core::protos::temporal::api::command::v1::CancelTimerCommandAttributes; use temporal_sdk_core::{ protos::{ - coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob}, + coresdk::{wf_activation_job, FireTimer, TaskCompletion, WfActivationJob}, temporal::api::command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, @@ -118,13 +118,13 @@ fn parallel_timer_workflow() { task.get_wf_jobs().as_slice(), [ WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t1_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t1_id } )), }, WfActivationJob { - attributes: Some(wf_activation_job::Attributes::TimerFired( - TimerFiredTaskAttributes { timer_id: t2_id } + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t2_id } )), } ] => {