From de0b9b06641c3f3a5848a93a9e9113e391eb393d Mon Sep 17 00:00:00 2001 From: Vitaly Date: Sun, 7 Mar 2021 18:38:11 -0800 Subject: [PATCH 01/22] Basic boilerplate and a failing test --- src/machines/activity_state_machine.rs | 80 +++++++++++++++++++++++++- src/test_help/canned_histories.rs | 52 ++++++++++++++++- 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 23efd857d..7d5c1b714 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,3 +1,4 @@ +use crate::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; use rustfsm::{fsm, TransitionResult}; // Schedule / cancel are "explicit events" (imperative rather than past events?) @@ -54,6 +55,36 @@ pub(super) enum ActivityMachineError {} pub(super) enum ActivityCommand {} +#[derive(Debug, Clone, derive_more::Display)] +pub(super) enum ActivityCancellationType { + /** + * Wait for activity cancellation completion. Note that activity must heartbeat to receive a + * cancellation notification. This can block the cancellation for a long time if activity doesn't + * heartbeat or chooses to ignore the cancellation request. + */ + WaitCancellationCompleted, + + /** Initiate a cancellation request and immediately report cancellation to the workflow. */ + TryCancel, + + /** + * Do not request cancellation of the activity and immediately report cancellation to the workflow + */ + Abandon, +} + +impl Default for ActivityCancellationType { + fn default() -> Self { + ActivityCancellationType::TryCancel + } +} + +#[derive(Default, Clone)] +pub(super) struct SharedState { + attrs: ScheduleActivityTaskCommandAttributes, + cancellation_type: ActivityCancellationType, +} + #[derive(Default, Clone)] pub(super) struct Created {} @@ -201,6 +232,51 @@ pub(super) struct Canceled {} #[cfg(test)] mod activity_machine_tests { - #[test] - fn test() {} + use crate::machines::test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}; + use crate::machines::WorkflowMachines; + use crate::protos::temporal::api::command::v1::CompleteWorkflowExecutionCommandAttributes; + use crate::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; + use crate::protos::temporal::api::enums::v1::CommandType; + use crate::test_help::canned_histories; + use rstest::{fixture, rstest}; + use tracing::Level; + + #[fixture] + fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { + let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { + let activity = ScheduleActivityTaskCommandAttributes { + ..Default::default() + }; + command_sink.activity(activity); + + let complete = CompleteWorkflowExecutionCommandAttributes::default(); + command_sink.send(complete.into()); + }); + + let t = canned_histories::single_activity("activity1"); + let state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); + + assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); + (t, state_machines) + } + + #[rstest] + fn test_activity_happy_path(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let s = span!(Level::DEBUG, "Test start", t = "activity_happy_path"); + let _enter = s.enter(); + let (t, mut state_machines) = activity_happy_hist; + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) + .unwrap(); + state_machines.get_wf_activation(); + assert_eq!(commands.len(), 1); + assert_eq!( + commands[0].command_type, + CommandType::ScheduleActivityTask as i32 + ); + } } diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 9ac08c3ec..ed8df16c1 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -3,7 +3,8 @@ use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ - history_event, TimerCanceledEventAttributes, TimerFiredEventAttributes, + history_event, ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes, + ActivityTaskStartedEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes, }; /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED @@ -152,6 +153,55 @@ pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBui t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_ACTIVITY_TASK_STARTED +/// 7: EVENT_TYPE_ACTIVITY_TASK_COMPLETED +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn single_activity(activity_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + let started_event_id = t.add_get_event_id( + EventType::ActivityTaskStarted, + Some( + history_event::Attributes::ActivityTaskStartedEventAttributes( + ActivityTaskStartedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ), + ); + t.add( + EventType::ActivityTaskCompleted, + history_event::Attributes::ActivityTaskCompletedEventAttributes( + ActivityTaskCompletedEventAttributes { + scheduled_event_id, + started_event_id, + ..Default::default() + }, + ), + ); + t.add_workflow_task_scheduled_and_started(); + t +} + /// First signal's payload is "hello " and second is "world" (no metadata for either) /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED From 2baa39e96f4a3377a3b0321fd61722e4b578207b Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 8 Mar 2021 11:58:59 -0800 Subject: [PATCH 02/22] recent progress --- src/machines/activity_state_machine.rs | 82 ++++++++++++++++++++++++-- src/machines/mod.rs | 5 +- src/machines/workflow_machines.rs | 10 ++++ 3 files changed, 90 insertions(+), 7 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 7d5c1b714..cbe98be01 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,10 +1,15 @@ -use crate::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; -use rustfsm::{fsm, TransitionResult}; +use crate::machines::workflow_machines::MachineResponse; +use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; +use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; +use crate::protos::temporal::api::enums::v1::CommandType; +use crate::protos::temporal::api::history::v1::HistoryEvent; +use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; +use std::convert::TryFrom; // Schedule / cancel are "explicit events" (imperative rather than past events?) fsm! { - pub(super) name ActivityMachine; command ActivityCommand; error ActivityMachineError; + pub(super) name ActivityMachine; command ActivityCommand; error WFMachinesError; Created --(Schedule, on_schedule)--> ScheduleCommandCreated; @@ -50,9 +55,6 @@ fsm! { --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; } -#[derive(thiserror::Error, Debug)] -pub(super) enum ActivityMachineError {} - pub(super) enum ActivityCommand {} #[derive(Debug, Clone, derive_more::Display)] @@ -79,6 +81,74 @@ impl Default for ActivityCancellationType { } } +/// Creates a new, scheduled, activity as a [CancellableCommand] +pub(super) fn new_activity( + attribs: ScheduleActivityTaskCommandAttributes, +) -> NewMachineWithCommand { + let (activity, add_cmd) = ActivityMachine::new_scheduled(attribs); + NewMachineWithCommand { + command: add_cmd, + machine: activity, + } +} + +impl ActivityMachine { + pub(crate) fn new_scheduled(attribs: ScheduleActivityTaskCommandAttributes) -> (Self, Command) { + let mut s = Self::new(attribs); + s.on_event_mut(ActivityMachine::Schedule) + .expect("Scheduling activities doesn't fail"); + let cmd = Command { + command_type: CommandType::ScheduleActivityTask as i32, + attributes: Some(s.shared_state().attrs.clone().into()), + }; + (s, cmd) + } +} + +impl TryFrom for ActivityMachine { + type Error = WFMachinesError; + + fn try_from(value: HistoryEvent) -> Result { + unimplemented!() + } +} + +impl TryFrom for ActivityMachine { + type Error = (); + + fn try_from(c: CommandType) -> Result { + unimplemented!() + } +} + +impl WFMachinesAdapter for ActivityMachine { + fn adapt_response( + &self, + event: &HistoryEvent, + has_next_event: bool, + my_command: ActivityMachineCommand, + ) -> Result, WFMachinesError> { + Ok(!vec![]) + } +} + +impl Cancellable for ActivityMachine { + fn cancel(&mut self) -> Result> { + unimplemented!() + } + + fn was_cancelled_before_sent_to_server(&self) -> bool { + unimplemented!() + } +} + +#[derive(Debug, derive_more::Display)] +pub(super) enum ActivityMachineCommand { + Complete, + Canceled, + IssueCancelCmd(Command), +} + #[derive(Default, Clone)] pub(super) struct SharedState { attrs: ScheduleActivityTaskCommandAttributes, diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 9ef95a3c4..f371c33ea 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -33,7 +33,9 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; +use crate::protos::temporal::api::command::v1::{ + FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes, +}; use crate::{ core_tracing::VecDisplayer, machines::workflow_machines::MachineResponse, @@ -64,6 +66,7 @@ pub(crate) type ProtoCommand = Command; pub enum WFCommand { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, + AddActivity(ScheduleActivityTaskCommandAttributes), AddTimer(StartTimerCommandAttributes), CancelTimer(CancelTimerCommandAttributes), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 47131db3f..a5b6dcbbc 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::machines::activity_state_machine::new_activity; use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, @@ -57,6 +58,9 @@ pub(crate) struct WorkflowMachines { /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum. timer_id_to_machine: HashMap, + /// TODO document + activity_id_to_machine: HashMap, + /// Queued commands which have been produced by machines and await processing / being sent to /// the server. commands: VecDeque, @@ -554,6 +558,12 @@ impl WorkflowMachines { } } } + WFCommand::AddActivity(attrs) => { + let aid = attrs.activity_id.clone(); + let activity = self.add_new_machine(new_activity(attrs)); + self.activity_id_to_machine.insert(aid, activity.machine); + self.current_wf_task_commands.push_back(activity); + } WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); From e7e3de2ef3d4430734b48808f734d06033515db6 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 8 Mar 2021 17:44:53 -0800 Subject: [PATCH 03/22] fix compilation issues and add proto messages --- src/machines/activity_state_machine.rs | 24 +++++++++++++++++------- src/machines/workflow_machines.rs | 1 + 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index cbe98be01..1b5ecde45 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -9,7 +9,10 @@ use std::convert::TryFrom; // Schedule / cancel are "explicit events" (imperative rather than past events?) fsm! { - pub(super) name ActivityMachine; command ActivityCommand; error WFMachinesError; + pub(super) name ActivityMachine; + command ActivityCommand; + error WFMachinesError; + shared_state SharedState; Created --(Schedule, on_schedule)--> ScheduleCommandCreated; @@ -55,6 +58,7 @@ fsm! { --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; } +#[derive(Debug, derive_more::Display)] pub(super) enum ActivityCommand {} #[derive(Debug, Clone, derive_more::Display)] @@ -94,8 +98,14 @@ pub(super) fn new_activity( impl ActivityMachine { pub(crate) fn new_scheduled(attribs: ScheduleActivityTaskCommandAttributes) -> (Self, Command) { - let mut s = Self::new(attribs); - s.on_event_mut(ActivityMachine::Schedule) + let mut s = Self { + state: Created {}.into(), + shared_state: SharedState { + attrs: attribs, + cancellation_type: ActivityCancellationType::TryCancel, + }, + }; + s.on_event_mut(ActivityMachineEvents::Schedule) .expect("Scheduling activities doesn't fail"); let cmd = Command { command_type: CommandType::ScheduleActivityTask as i32, @@ -105,7 +115,7 @@ impl ActivityMachine { } } -impl TryFrom for ActivityMachine { +impl TryFrom for ActivityMachineEvents { type Error = WFMachinesError; fn try_from(value: HistoryEvent) -> Result { @@ -113,7 +123,7 @@ impl TryFrom for ActivityMachine { } } -impl TryFrom for ActivityMachine { +impl TryFrom for ActivityMachineEvents { type Error = (); fn try_from(c: CommandType) -> Result { @@ -126,9 +136,9 @@ impl WFMachinesAdapter for ActivityMachine { &self, event: &HistoryEvent, has_next_event: bool, - my_command: ActivityMachineCommand, + my_command: ActivityCommand, ) -> Result, WFMachinesError> { - Ok(!vec![]) + Ok(match my_command {}) } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index a5b6dcbbc..59a4ffcee 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -146,6 +146,7 @@ impl WorkflowMachines { all_machines: Default::default(), machines_by_event_id: Default::default(), timer_id_to_machine: Default::default(), + activity_id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), } From 95084c4b1e50080e795e05f4bc4fe3b99a692851 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 8 Mar 2021 20:01:01 -0800 Subject: [PATCH 04/22] Convert ActivityCommand in WFMachinesAdapter --- src/machines/activity_state_machine.rs | 22 ++++++++++++++++++++-- src/machines/workflow_machines.rs | 8 ++++++++ 2 files changed, 28 insertions(+), 2 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 1b5ecde45..ddacd7a3e 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,5 +1,7 @@ use crate::machines::workflow_machines::MachineResponse; +use crate::machines::workflow_machines::MachineResponse::PushActivityJob; use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; +use crate::protos::coresdk::{activity_task, CancelActivity, StartActivity}; use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; use crate::protos::temporal::api::enums::v1::CommandType; use crate::protos::temporal::api::history::v1::HistoryEvent; @@ -59,7 +61,10 @@ fsm! { } #[derive(Debug, derive_more::Display)] -pub(super) enum ActivityCommand {} +pub(super) enum ActivityCommand { + StartActivity, + CancelActivity, +} #[derive(Debug, Clone, derive_more::Display)] pub(super) enum ActivityCancellationType { @@ -138,7 +143,20 @@ impl WFMachinesAdapter for ActivityMachine { has_next_event: bool, my_command: ActivityCommand, ) -> Result, WFMachinesError> { - Ok(match my_command {}) + Ok(match my_command { + ActivityCommand::StartActivity => { + vec![PushActivityJob(activity_task::Job::Start(StartActivity { + // TODO pass activity details + }))] + } + ActivityCommand::CancelActivity => { + vec![PushActivityJob(activity_task::Job::Cancel( + CancelActivity { + // TODO pass activity cancellation details + }, + ))] + } + }) } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 59a4ffcee..d7a920b52 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,5 @@ use crate::machines::activity_state_machine::new_activity; +use crate::protos::coresdk::activity_task; use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, @@ -90,6 +91,10 @@ struct CommandAndMachine { pub enum MachineResponse { #[display(fmt = "PushWFJob")] PushWFJob(#[from(forward)] wf_activation_job::Variant), + + #[display(fmt = "PushActivityJob")] + PushActivityJob(activity_task::Job), + IssueNewCommand(ProtoCommand), #[display(fmt = "TriggerWFTaskStarted")] TriggerWFTaskStarted { @@ -518,6 +523,9 @@ impl WorkflowMachines { MachineResponse::IssueNewCommand(_) => { panic!("Issue new command machine response not expected here") } + MachineResponse::PushActivityJob(a) => { + unimplemented!() + } } } Ok(()) From d066943b7e2eb37979830f10b48c8eaf27012c21 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 8 Mar 2021 20:36:15 -0800 Subject: [PATCH 05/22] Add CommandSender for activity in the workflow driver --- src/machines/activity_state_machine.rs | 3 ++- src/machines/test_help/workflow_driver.rs | 1 + 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 src/machines/test_help/workflow_driver.rs diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index ddacd7a3e..ccb348c34 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -343,9 +343,10 @@ mod activity_machine_tests { fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let activity = ScheduleActivityTaskCommandAttributes { + activity_id: "activity A".to_string(), ..Default::default() }; - command_sink.activity(activity); + command_sink.activity(activity, true); let complete = CompleteWorkflowExecutionCommandAttributes::default(); command_sink.send(complete.into()); diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/src/machines/test_help/workflow_driver.rs @@ -0,0 +1 @@ + From 8dec63a9c26bdd49db6822c9ddd36665fe1ebb1d Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 8 Mar 2021 21:00:58 -0800 Subject: [PATCH 06/22] Add TryFrom for CommandType and make the test pass --- src/machines/activity_state_machine.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index ccb348c34..84e346ebf 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -132,7 +132,11 @@ impl TryFrom for ActivityMachineEvents { type Error = (); fn try_from(c: CommandType) -> Result { - unimplemented!() + Ok(match c { + CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, + CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, + _ => return Err(()), + }) } } @@ -166,7 +170,7 @@ impl Cancellable for ActivityMachine { } fn was_cancelled_before_sent_to_server(&self) -> bool { - unimplemented!() + false // TODO return cancellation flag from the shared state } } From abdbb6b9cdd3b937338a5838419a0c4bc9d51a34 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 10 Mar 2021 22:52:53 -0800 Subject: [PATCH 07/22] Add core API for polling activities --- protos/local/core_interface.proto | 2 +- src/lib.rs | 76 ++++++++++++++++++-------- src/machines/activity_state_machine.rs | 28 ++++++++-- src/machines/test_help/mod.rs | 4 +- src/machines/workflow_machines.rs | 5 +- src/pollers/mod.rs | 71 +++++++++++++++++++----- src/workflow/driven_workflow.rs | 2 +- 7 files changed, 140 insertions(+), 48 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index e2a0bf38b..f54e8679d 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -157,7 +157,7 @@ message CancelActivity { message ActivityTask { string activity_id = 1; - oneof job { + oneof variant { // Start activity execution. StartActivity start = 2; // Attempt to cancel activity execution. diff --git a/src/lib.rs b/src/lib.rs index 04b549fe7..f777a5baf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,16 +22,20 @@ mod workflow; mod test_help; pub use core_tracing::tracing_init; -pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions}; +pub use pollers::{ + PollTaskRequest, PollTaskResponse, ServerGateway, ServerGatewayApis, ServerGatewayOptions, +}; pub use url::Url; +use crate::protos::coresdk::ActivityTask; +use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use crate::{ machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ - task_completion, wf_activation_completion::Status, Task, TaskCompletion, - WfActivationCompletion, WfActivationSuccess, + task_completion, wf_activation_completion::Status, ActivityResult, Task, + TaskCompletion, WfActivationCompletion, WfActivationSuccess, }, temporal::api::{ enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse, @@ -62,13 +66,17 @@ pub type Result = std::result::Result; /// expected that only one instance of an implementation will exist for the lifetime of the /// worker(s) using it. pub trait Core: Send + Sync { - /// Ask the core for some work, returning a [Task], which will eventually contain either a - /// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the - /// language SDK's responsibility to call the appropriate code with the provided inputs. + /// Ask the core for some work, returning a [Task], which will contain a [protos::coresdk::WfActivation]. + /// It is then the language SDK's responsibility to call the appropriate code with the provided inputs. /// /// TODO: Examples + /// TODO: rename to poll_workflow_task and change result type to WfActivation fn poll_task(&self, task_queue: &str) -> Result; + /// Ask the core for some work, returning a [protos::coresdk::Task], which will contain a [protos::coresdk::ActivityTask]. + /// It is then the language SDK's responsibility to call the completion API. + fn poll_activity_task(&self, task_queue: &str) -> Result; + /// Tell the core that some work has been completed - whether as a result of running workflow /// code or executing an activity. fn complete_task(&self, req: TaskCompletion) -> Result<()>; @@ -158,8 +166,8 @@ where // This will block forever (unless interrupted by shutdown) in the event there is no work // from the server - match self.poll_server(task_queue) { - Ok(work) => { + match self.poll_server(PollTaskRequest::Workflow(task_queue.to_owned())) { + Ok(PollTaskResponse::WorkflowTask(work)) => { let task_token = work.task_token.clone(); debug!( task_token = %fmt_task_token(&task_token), @@ -180,8 +188,29 @@ where variant: next_activation.activation.map(Into::into), }) } + // Drain pending activations in case of shutdown. Err(CoreError::ShuttingDown) => self.poll_task(task_queue), Err(e) => Err(e), + Ok(PollTaskResponse::ActivityTask(_)) => Err(CoreError::UnexpectedResult), + } + } + + #[instrument(skip(self))] + fn poll_activity_task(&self, task_queue: &str) -> Result { + if self.shutdown_requested.load(Ordering::SeqCst) { + return Err(CoreError::ShuttingDown); + } + + match self.poll_server(PollTaskRequest::Activity(task_queue.to_owned())) { + Ok(PollTaskResponse::ActivityTask(work)) => { + let task_token = work.task_token.clone(); + Ok(Task { + task_token, + variant: None, + }) + } + Err(e) => Err(e), + Ok(PollTaskResponse::WorkflowTask(_)) => Err(CoreError::UnexpectedResult), } } @@ -230,8 +259,11 @@ where Ok(()) } TaskCompletion { - variant: Some(task_completion::Variant::Activity(_)), - .. + task_token, + variant: + Some(task_completion::Variant::Activity(ActivityResult { + status: Some(activity_status), + })), } => unimplemented!(), _ => Err(CoreError::MalformedCompletion(req)), } @@ -298,7 +330,7 @@ impl CoreSDK { /// Blocks polling the server until it responds, or until the shutdown flag is set (aborting /// the poll) - fn poll_server(&self, task_queue: &str) -> Result { + fn poll_server(&self, req: PollTaskRequest) -> Result { self.runtime.block_on(async { let shutdownfut = async { loop { @@ -308,14 +340,12 @@ impl CoreSDK { tokio::time::sleep(Duration::from_millis(100)).await; } }; - let pollfut = self - .server_gateway - .poll_workflow_task(task_queue.to_owned()); + let poll_result_future = self.server_gateway.poll_task(req); tokio::select! { _ = shutdownfut => { Err(CoreError::ShuttingDown) } - r = pollfut => r + r = poll_result_future => r } }) } @@ -377,6 +407,9 @@ pub enum CoreError { /// When thrown from complete_task, it means you should poll for a new task, receive a new /// task token, and complete that task. UnhandledCommandWhenCompleting, + /// Indicates that underlying function returned Ok, but result type was incorrect. + /// This is likely a result of a bug and should never happen. + UnexpectedResult, } #[cfg(test)] @@ -810,14 +843,11 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); assert_matches!( res.get_wf_jobs().as_slice(), - [ - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }, - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - } - ] + [WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }] ); } } diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 84e346ebf..9f2220b84 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -3,7 +3,7 @@ use crate::machines::workflow_machines::MachineResponse::PushActivityJob; use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; use crate::protos::coresdk::{activity_task, CancelActivity, StartActivity}; use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; -use crate::protos::temporal::api::enums::v1::CommandType; +use crate::protos::temporal::api::enums::v1::{CommandType, EventType}; use crate::protos::temporal::api::history::v1::HistoryEvent; use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; use std::convert::TryFrom; @@ -123,8 +123,22 @@ impl ActivityMachine { impl TryFrom for ActivityMachineEvents { type Error = WFMachinesError; - fn try_from(value: HistoryEvent) -> Result { - unimplemented!() + fn try_from(e: HistoryEvent) -> Result { + Ok(match EventType::from_i32(e.event_type) { + Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled, + Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted, + Some(EventType::ActivityTaskCompleted) => Self::ActivityTaskCompleted, + Some(EventType::ActivityTaskFailed) => Self::ActivityTaskFailed, + Some(EventType::ActivityTaskTimedOut) => Self::ActivityTaskTimedOut, + Some(EventType::ActivityTaskCancelRequested) => Self::ActivityTaskCancelRequested, + Some(EventType::ActivityTaskCanceled) => Self::ActivityTaskCanceled, + _ => { + return Err(WFMachinesError::UnexpectedEvent( + e, + "Activity machine does not handle this event", + )) + } + }) } } @@ -149,12 +163,14 @@ impl WFMachinesAdapter for ActivityMachine { ) -> Result, WFMachinesError> { Ok(match my_command { ActivityCommand::StartActivity => { - vec![PushActivityJob(activity_task::Job::Start(StartActivity { + vec![PushActivityJob(activity_task::Variant::Start( + StartActivity { // TODO pass activity details - }))] + }, + ))] } ActivityCommand::CancelActivity => { - vec![PushActivityJob(activity_task::Job::Cancel( + vec![PushActivityJob(activity_task::Variant::Cancel( CancelActivity { // TODO pass activity cancellation details }, diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 011da2805..d0ba92241 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -14,7 +14,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, }, - CoreSDK, ServerGatewayApis, + CoreSDK, PollTaskResponse, ServerGatewayApis, }; use rand::{thread_rng, Rng}; use std::sync::atomic::AtomicBool; @@ -59,7 +59,7 @@ pub(crate) fn build_fake_core( mock_gateway .expect_poll_workflow_task() .times(response_batches.len()) - .returning(move |_| Ok(tasks.pop_front().unwrap())); + .returning(move |_| Ok(PollTaskResponse::WorkflowTask(tasks.pop_front().unwrap()))); // Response not really important here mock_gateway .expect_complete_workflow_task() diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index d7a920b52..83dfbb872 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -10,7 +10,7 @@ use crate::{ TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation}, + coresdk::{wf_activation_job, ActivityTask, StartWorkflow, UpdateRandomSeed, WfActivation}, temporal::api::{ enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent}, @@ -92,8 +92,9 @@ pub enum MachineResponse { #[display(fmt = "PushWFJob")] PushWFJob(#[from(forward)] wf_activation_job::Variant), + // TODO remove and use IssueNewCommand #[display(fmt = "PushActivityJob")] - PushActivityJob(activity_task::Job), + PushActivityJob(activity_task::Variant), IssueNewCommand(ProtoCommand), #[display(fmt = "TriggerWFTaskStarted")] diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 6563033bc..4b29fb470 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -2,7 +2,8 @@ use std::time::Duration; use crate::protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; use crate::protos::temporal::api::workflowservice::v1::{ - SignalWorkflowExecutionRequest, SignalWorkflowExecutionResponse, + PollActivityTaskQueueRequest, PollActivityTaskQueueResponse, SignalWorkflowExecutionRequest, + SignalWorkflowExecutionResponse, }; use crate::{ machines::ProtoCommand, @@ -100,8 +101,13 @@ pub trait ServerGatewayApis { ) -> Result; /// Fetch new work. Should block indefinitely if there is no work. - async fn poll_workflow_task(&self, task_queue: String) - -> Result; + async fn poll_task(&self, req: PollTaskRequest) -> Result; + + /// Fetch new work. Should block indefinitely if there is no work. + async fn poll_workflow_task(&self, task_queue: String) -> Result; + + /// Fetch new work. Should block indefinitely if there is no work. + async fn poll_activity_task(&self, task_queue: String) -> Result; /// Complete a task by sending it to the server. `task_token` is the task token that would've /// been received from [PollWorkflowTaskQueueApi::poll]. `commands` is a list of new commands @@ -131,6 +137,16 @@ pub trait ServerGatewayApis { ) -> Result; } +pub enum PollTaskRequest { + Workflow(String), + Activity(String), +} + +pub enum PollTaskResponse { + WorkflowTask(PollWorkflowTaskQueueResponse), + ActivityTask(PollActivityTaskQueueResponse), +} + #[async_trait::async_trait] impl ServerGatewayApis for ServerGateway { async fn start_workflow( @@ -162,10 +178,18 @@ impl ServerGatewayApis for ServerGateway { .into_inner()) } - async fn poll_workflow_task( - &self, - task_queue: String, - ) -> Result { + async fn poll_task(&self, req: PollTaskRequest) -> Result { + match req { + PollTaskRequest::Workflow(task_queue) => { + Ok(Self::poll_workflow_task(self, task_queue).await?) + } + PollTaskRequest::Activity(task_queue) => { + Ok(Self::poll_activity_task(self, task_queue).await?) + } + } + } + + async fn poll_workflow_task(&self, task_queue: String) -> Result { let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.clone(), task_queue: Some(TaskQueue { @@ -176,12 +200,33 @@ impl ServerGatewayApis for ServerGateway { binary_checksum: self.opts.worker_binary_id.clone(), }; - Ok(self - .service - .clone() - .poll_workflow_task_queue(request) - .await? - .into_inner()) + Ok(PollTaskResponse::WorkflowTask( + self.service + .clone() + .poll_workflow_task_queue(request) + .await? + .into_inner(), + )) + } + + async fn poll_activity_task(&self, task_queue: String) -> Result { + let request = PollActivityTaskQueueRequest { + namespace: self.opts.namespace.clone(), + task_queue: Some(TaskQueue { + name: task_queue, + kind: TaskQueueKind::Unspecified as i32, + }), + identity: self.opts.identity.clone(), + task_queue_metadata: None, + }; + + Ok(PollTaskResponse::ActivityTask( + self.service + .clone() + .poll_activity_task_queue(request) + .await? + .into_inner(), + )) } async fn complete_workflow_task( diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs index 4decbee1e..a541910a8 100644 --- a/src/workflow/driven_workflow.rs +++ b/src/workflow/driven_workflow.rs @@ -1,4 +1,4 @@ -use crate::protos::coresdk::SignalWorkflow; +use crate::protos::coresdk::{activity_task, SignalWorkflow}; use crate::{ machines::WFCommand, protos::coresdk::wf_activation_job, From d6171cfa556fc61b7c9a6a6c793c4454f094fa60 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Sun, 14 Mar 2021 23:07:54 -0700 Subject: [PATCH 08/22] Add complete activity task API to the core --- protos/local/core_interface.proto | 3 +- src/lib.rs | 48 +++++++++++++++++++++++-------- src/pollers/mod.rs | 29 +++++++++++++++++-- 3 files changed, 64 insertions(+), 16 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index f54e8679d..4d9e6ed83 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -189,8 +189,7 @@ message WFActivationCompletion { message ActivityResult { oneof status { ActivityTaskSuccess completed = 1; - ActivityTaskCancelation canceled = 2; - ActivityTaskFailure failed = 3; + ActivityTaskFailure failed = 2; } } diff --git a/src/lib.rs b/src/lib.rs index f777a5baf..2c8a9cf73 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,15 +27,15 @@ pub use pollers::{ }; pub use url::Url; -use crate::protos::coresdk::ActivityTask; +use crate::protos::coresdk::{activity_result, ActivityTask}; use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use crate::{ machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ - task_completion, wf_activation_completion::Status, ActivityResult, Task, - TaskCompletion, WfActivationCompletion, WfActivationSuccess, + task_completion, wf_activation_completion, ActivityResult, Task, TaskCompletion, + WfActivationCompletion, WfActivationSuccess, }, temporal::api::{ enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse, @@ -81,6 +81,9 @@ pub trait Core: Send + Sync { /// code or executing an activity. fn complete_task(&self, req: TaskCompletion) -> Result<()>; + /// Tell the core that activity has completed. This will result in core calling the server and completing activity synchronously. + fn complete_activity_task(&self, req: TaskCompletion) -> Result<()>; + /// Returns an instance of ServerGateway. fn server_gateway(&self) -> Result>; @@ -230,7 +233,7 @@ where .map(|x| x.value().clone()) .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => { + wf_activation_completion::Status::Successful(success) => { let commands = self.push_lang_commands(&run_id, success)?; // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught @@ -242,7 +245,7 @@ where )?; } } - Status::Failed(failure) => { + wf_activation_completion::Status::Failed(failure) => { // Blow up any cached data associated with the workflow self.evict_run(&run_id); @@ -258,13 +261,31 @@ where } Ok(()) } + _ => Err(CoreError::MalformedCompletion(req)), + } + } + + #[instrument(skip(self))] + fn complete_activity_task(&self, req: TaskCompletion) -> Result<()> { + match req { TaskCompletion { task_token, variant: Some(task_completion::Variant::Activity(ActivityResult { - status: Some(activity_status), + status: Some(status), })), - } => unimplemented!(), + } => { + match status { + activity_result::Status::Completed(success) => { + self.runtime.block_on( + self.server_gateway + .complete_activity_task(task_token, success.result), + )?; + } + activity_result::Status::Failed(_) => unimplemented!(), + } + Ok(()) + } _ => Err(CoreError::MalformedCompletion(req)), } } @@ -843,11 +864,14 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); assert_matches!( res.get_wf_jobs().as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }, WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }] + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] ); } } diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 4b29fb470..0efb1d25a 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -2,8 +2,9 @@ use std::time::Duration; use crate::protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; use crate::protos::temporal::api::workflowservice::v1::{ - PollActivityTaskQueueRequest, PollActivityTaskQueueResponse, SignalWorkflowExecutionRequest, - SignalWorkflowExecutionResponse, + PollActivityTaskQueueRequest, PollActivityTaskQueueResponse, + RespondActivityTaskCompletedRequest, RespondActivityTaskCompletedResponse, + SignalWorkflowExecutionRequest, SignalWorkflowExecutionResponse, }; use crate::{ machines::ProtoCommand, @@ -118,6 +119,12 @@ pub trait ServerGatewayApis { commands: Vec, ) -> Result; + async fn complete_activity_task( + &self, + task_token: Vec, + result: Option, + ) -> Result; + /// Fail task by sending the failure to the server. `task_token` is the task token that would've /// been received from [PollWorkflowTaskQueueApi::poll]. async fn fail_workflow_task( @@ -305,4 +312,22 @@ impl ServerGatewayApis for ServerGateway { .await? .into_inner()) } + + async fn complete_activity_task( + &self, + task_token: Vec, + result: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_completed(RespondActivityTaskCompletedRequest { + task_token, + result, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? + .into_inner()) + } } From e48d1c45d00c6bb745c42c5df7bfc4c09d07467b Mon Sep 17 00:00:00 2001 From: Vitaly Date: Sun, 14 Mar 2021 23:20:52 -0700 Subject: [PATCH 09/22] Return cancellation result back --- protos/local/core_interface.proto | 1 + 1 file changed, 1 insertion(+) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 4d9e6ed83..bbfa7b891 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -190,6 +190,7 @@ message ActivityResult { oneof status { ActivityTaskSuccess completed = 1; ActivityTaskFailure failed = 2; + ActivityTaskCancelation canceled = 3; } } From 565d692deb2924ae00a2db2d96fca80d351be25e Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 15 Mar 2021 01:20:01 -0700 Subject: [PATCH 10/22] State machine changes for activity completion --- protos/local/core_interface.proto | 2 +- src/machines/activity_state_machine.rs | 82 +++++++++++++++----------- src/machines/workflow_machines.rs | 7 --- 3 files changed, 49 insertions(+), 42 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index bbfa7b891..2fe4e0a0a 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -185,7 +185,7 @@ message WFActivationCompletion { } } -/// Used to report activity completion to core and to resolve the activity in a workflow activtion +/// Used to report activity completion to core and to resolve the activity in a workflow activation message ActivityResult { oneof status { ActivityTaskSuccess completed = 1; diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 9f2220b84..62100cf9a 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,10 +1,15 @@ use crate::machines::workflow_machines::MachineResponse; -use crate::machines::workflow_machines::MachineResponse::PushActivityJob; use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; -use crate::protos::coresdk::{activity_task, CancelActivity, StartActivity}; +use crate::protos::coresdk::{ + activity_result, activity_task, ActivityResult, ActivityTaskSuccess, CancelActivity, + ResolveActivity, StartActivity, +}; use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; +use crate::protos::temporal::api::common::v1::Payloads; use crate::protos::temporal::api::enums::v1::{CommandType, EventType}; -use crate::protos::temporal::api::history::v1::HistoryEvent; +use crate::protos::temporal::api::history::v1::{ + history_event, ActivityTaskCompletedEventAttributes, HistoryEvent, +}; use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; use std::convert::TryFrom; @@ -12,7 +17,7 @@ use std::convert::TryFrom; fsm! { pub(super) name ActivityMachine; - command ActivityCommand; + command ActivityMachineCommand; error WFMachinesError; shared_state SharedState; @@ -27,7 +32,7 @@ fsm! { ScheduledEventRecorded --(ActivityTaskTimedOut, on_task_timed_out) --> TimedOut; ScheduledEventRecorded --(Cancel, on_canceled) --> ScheduledActivityCancelCommandCreated; - Started --(ActivityTaskCompleted, on_activity_task_completed) --> Completed; + Started --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; Started --(ActivityTaskFailed, on_activity_task_failed) --> Failed; Started --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; Started --(Cancel, on_canceled) --> StartedActivityCancelCommandCreated; @@ -53,7 +58,7 @@ fsm! { StartedActivityCancelEventRecorded --(ActivityTaskFailed, on_activity_task_failed) --> Failed; StartedActivityCancelEventRecorded - --(ActivityTaskCompleted, on_activity_task_completed) --> Completed; + --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; StartedActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; StartedActivityCancelEventRecorded @@ -61,9 +66,9 @@ fsm! { } #[derive(Debug, derive_more::Display)] -pub(super) enum ActivityCommand { - StartActivity, - CancelActivity, +pub(super) enum ActivityMachineCommand { + #[display(fmt = "Complete")] + Complete(Option), } #[derive(Debug, Clone, derive_more::Display)] @@ -127,7 +132,19 @@ impl TryFrom for ActivityMachineEvents { Ok(match EventType::from_i32(e.event_type) { Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled, Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted, - Some(EventType::ActivityTaskCompleted) => Self::ActivityTaskCompleted, + Some(EventType::ActivityTaskCompleted) => { + if let Some(history_event::Attributes::ActivityTaskCompletedEventAttributes( + attrs, + )) = e.attributes + { + Self::ActivityTaskCompleted(attrs) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Activity completion attributes were unset".to_string(), + )); + } + } Some(EventType::ActivityTaskFailed) => Self::ActivityTaskFailed, Some(EventType::ActivityTaskTimedOut) => Self::ActivityTaskTimedOut, Some(EventType::ActivityTaskCancelRequested) => Self::ActivityTaskCancelRequested, @@ -159,23 +176,18 @@ impl WFMachinesAdapter for ActivityMachine { &self, event: &HistoryEvent, has_next_event: bool, - my_command: ActivityCommand, + my_command: ActivityMachineCommand, ) -> Result, WFMachinesError> { Ok(match my_command { - ActivityCommand::StartActivity => { - vec![PushActivityJob(activity_task::Variant::Start( - StartActivity { - // TODO pass activity details - }, - ))] - } - ActivityCommand::CancelActivity => { - vec![PushActivityJob(activity_task::Variant::Cancel( - CancelActivity { - // TODO pass activity cancellation details - }, - ))] + ActivityMachineCommand::Complete(result) => vec![ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Completed(ActivityTaskSuccess { + result, + })), + }), } + .into()], }) } } @@ -190,13 +202,6 @@ impl Cancellable for ActivityMachine { } } -#[derive(Debug, derive_more::Display)] -pub(super) enum ActivityMachineCommand { - Complete, - Canceled, - IssueCancelCmd(Command), -} - #[derive(Default, Clone)] pub(super) struct SharedState { attrs: ScheduleActivityTaskCommandAttributes, @@ -250,9 +255,15 @@ impl ScheduledEventRecorded { pub(super) struct Started {} impl Started { - pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed( + self, + attrs: ActivityTaskCompletedEventAttributes, + ) -> ActivityMachineTransition { // notify_completed - ActivityMachineTransition::default::() + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::Complete(attrs.result)], + Completed::default(), + ) } pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed @@ -312,7 +323,10 @@ impl StartedActivityCancelCommandCreated { pub(super) struct StartedActivityCancelEventRecorded {} impl StartedActivityCancelEventRecorded { - pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed( + self, + attrs: ActivityTaskCompletedEventAttributes, + ) -> ActivityMachineTransition { // notify_completed ActivityMachineTransition::default::() } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 83dfbb872..b7aef6e43 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -92,10 +92,6 @@ pub enum MachineResponse { #[display(fmt = "PushWFJob")] PushWFJob(#[from(forward)] wf_activation_job::Variant), - // TODO remove and use IssueNewCommand - #[display(fmt = "PushActivityJob")] - PushActivityJob(activity_task::Variant), - IssueNewCommand(ProtoCommand), #[display(fmt = "TriggerWFTaskStarted")] TriggerWFTaskStarted { @@ -524,9 +520,6 @@ impl WorkflowMachines { MachineResponse::IssueNewCommand(_) => { panic!("Issue new command machine response not expected here") } - MachineResponse::PushActivityJob(a) => { - unimplemented!() - } } } Ok(()) From df3d62475de96ab862a1d0c182f897aa5453516c Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 15 Mar 2021 01:20:19 -0700 Subject: [PATCH 11/22] update pattern --- src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 2c8a9cf73..2b4d15d05 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,6 +27,7 @@ pub use pollers::{ }; pub use url::Url; +use crate::protos::coresdk::activity_result::Status; use crate::protos::coresdk::{activity_result, ActivityTask}; use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use crate::{ @@ -282,7 +283,7 @@ where .complete_activity_task(task_token, success.result), )?; } - activity_result::Status::Failed(_) => unimplemented!(), + _ => unimplemented!(), } Ok(()) } From 62d9b3fffc2a15d9ca051a7d0d2211379ec05f0d Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 15 Mar 2021 01:38:26 -0700 Subject: [PATCH 12/22] Add handling for failure/cancelation in completion API --- protos/local/core_interface.proto | 1 + src/lib.rs | 21 ++++++++++--- src/pollers/mod.rs | 50 +++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 2fe4e0a0a..797fba8b5 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -231,6 +231,7 @@ message WFActivationFailure { /// Used in ActivityResult to report cancellation message ActivityTaskCancelation { + temporal.api.common.v1.Payloads details = 1; } /// Used in ActivityResult to report successful completion diff --git a/src/lib.rs b/src/lib.rs index 2b4d15d05..ff7f86be9 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -28,7 +28,10 @@ pub use pollers::{ pub use url::Url; use crate::protos::coresdk::activity_result::Status; -use crate::protos::coresdk::{activity_result, ActivityTask}; +use crate::protos::coresdk::{ + activity_result, ActivityTask, ActivityTaskCancelation, ActivityTaskFailure, + ActivityTaskSuccess, +}; use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use crate::{ machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, @@ -277,13 +280,23 @@ where })), } => { match status { - activity_result::Status::Completed(success) => { + activity_result::Status::Completed(ActivityTaskSuccess { result }) => { + self.runtime.block_on( + self.server_gateway + .complete_activity_task(task_token, result), + )?; + } + activity_result::Status::Failed(ActivityTaskFailure { failure }) => { + self.runtime.block_on( + self.server_gateway.fail_activity_task(task_token, failure), + )?; + } + activity_result::Status::Canceled(ActivityTaskCancelation { details }) => { self.runtime.block_on( self.server_gateway - .complete_activity_task(task_token, success.result), + .cancel_activity_task(task_token, details), )?; } - _ => unimplemented!(), } Ok(()) } diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 0efb1d25a..41cd99106 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -3,7 +3,9 @@ use std::time::Duration; use crate::protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; use crate::protos::temporal::api::workflowservice::v1::{ PollActivityTaskQueueRequest, PollActivityTaskQueueResponse, + RespondActivityTaskCanceledRequest, RespondActivityTaskCanceledResponse, RespondActivityTaskCompletedRequest, RespondActivityTaskCompletedResponse, + RespondActivityTaskFailedRequest, RespondActivityTaskFailedResponse, SignalWorkflowExecutionRequest, SignalWorkflowExecutionResponse, }; use crate::{ @@ -125,6 +127,18 @@ pub trait ServerGatewayApis { result: Option, ) -> Result; + async fn cancel_activity_task( + &self, + task_token: Vec, + details: Option, + ) -> Result; + + async fn fail_activity_task( + &self, + task_token: Vec, + failure: Option, + ) -> Result; + /// Fail task by sending the failure to the server. `task_token` is the task token that would've /// been received from [PollWorkflowTaskQueueApi::poll]. async fn fail_workflow_task( @@ -330,4 +344,40 @@ impl ServerGatewayApis for ServerGateway { .await? .into_inner()) } + + async fn cancel_activity_task( + &self, + task_token: Vec, + details: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_canceled(RespondActivityTaskCanceledRequest { + task_token, + details, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? + .into_inner()) + } + + async fn fail_activity_task( + &self, + task_token: Vec, + failure: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_failed(RespondActivityTaskFailedRequest { + task_token, + failure, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? + .into_inner()) + } } From 10546431c2c0d32e20001e32898d6a9232635bfd Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 17 Mar 2021 23:56:50 -0700 Subject: [PATCH 13/22] End to end flow with test --- src/lib.rs | 61 +++++++++++++++++++++++--- src/machines/activity_state_machine.rs | 5 ++- src/machines/mod.rs | 11 ++++- src/machines/test_help/mod.rs | 2 +- src/machines/workflow_machines.rs | 10 ++--- src/pollers/mod.rs | 17 ++++++- src/protos/mod.rs | 35 +++++++++++++++ src/test_help/canned_histories.rs | 1 + src/workflow/driven_workflow.rs | 2 +- 9 files changed, 127 insertions(+), 17 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ff7f86be9..cea2ce581 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,12 +27,9 @@ pub use pollers::{ }; pub use url::Url; -use crate::protos::coresdk::activity_result::Status; use crate::protos::coresdk::{ - activity_result, ActivityTask, ActivityTaskCancelation, ActivityTaskFailure, - ActivityTaskSuccess, + activity_result, task, ActivityTaskCancelation, ActivityTaskFailure, ActivityTaskSuccess, }; -use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use crate::{ machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, pending_activations::{PendingActivation, PendingActivations}, @@ -213,7 +210,7 @@ where let task_token = work.task_token.clone(); Ok(Task { task_token, - variant: None, + variant: Some(task::Variant::Activity(work.into())), }) } Err(e) => Err(e), @@ -450,7 +447,9 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; - use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; + use crate::protos::temporal::api::command::v1::{ + FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes, + }; use crate::{ machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, protos::{ @@ -483,6 +482,14 @@ mod test { build_fake_core(wfid, RUN_ID, &mut t, hist_batches) } + #[fixture(hist_batches = &[])] + fn single_activity_setup(hist_batches: &[usize]) -> FakeCore { + let wfid = "fake_wf_id"; + + let mut t = canned_histories::single_activity("fake_activity"); + build_fake_core(wfid, RUN_ID, &mut t, hist_batches) + } + #[rstest(core, case::incremental(single_timer_setup(&[1, 2])), case::replay(single_timer_setup(&[2])) @@ -523,7 +530,47 @@ mod test { .unwrap(); } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(core, + case::incremental(single_activity_setup(& [1, 2])), + case::replay(single_activity_setup(& [2])) + )] + fn single_activity_completion(core: FakeCore) { + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), + }] + ); + assert!(core.workflow_machines.exists(RUN_ID)); + + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![ScheduleActivityTaskCommandAttributes { + activity_id: "fake_activity".to_string(), + ..Default::default() + } + .into()], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity(_)), + }] + ); + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task_tok, + )) + .unwrap(); + } + + #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))] fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 62100cf9a..731035e11 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -328,7 +328,10 @@ impl StartedActivityCancelEventRecorded { attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { // notify_completed - ActivityMachineTransition::default::() + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::Complete(attrs.result)], + Completed::default(), + ) } pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed diff --git a/src/machines/mod.rs b/src/machines/mod.rs index f371c33ea..86df95148 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -34,7 +34,8 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; use crate::protos::temporal::api::command::v1::{ - FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes, + FailWorkflowExecutionCommandAttributes, RequestCancelActivityTaskCommandAttributes, + ScheduleActivityTaskCommandAttributes, }; use crate::{ core_tracing::VecDisplayer, @@ -63,10 +64,12 @@ pub(crate) type ProtoCommand = Command; /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. /// EX: Create a new timer, complete the workflow, etc. #[derive(Debug, derive_more::From)] +#[allow(clippy::large_enum_variant)] pub enum WFCommand { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, AddActivity(ScheduleActivityTaskCommandAttributes), + RequestCancelActivity(RequestCancelActivityTaskCommandAttributes), AddTimer(StartTimerCommandAttributes), CancelTimer(CancelTimerCommandAttributes), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), @@ -88,6 +91,12 @@ impl TryFrom for WFCommand { })) => match attrs { Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), Attributes::CancelTimerCommandAttributes(s) => Ok(WFCommand::CancelTimer(s)), + Attributes::ScheduleActivityTaskCommandAttributes(s) => { + Ok(WFCommand::AddActivity(s)) + } + Attributes::RequestCancelActivityTaskCommandAttributes(s) => { + Ok(WFCommand::RequestCancelActivity(s)) + } Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { Ok(WFCommand::CompleteWorkflow(c)) } diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index d0ba92241..2f9e75108 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -57,7 +57,7 @@ pub(crate) fn build_fake_core( let mut tasks = VecDeque::from(responses); let mut mock_gateway = MockServerGatewayApis::new(); mock_gateway - .expect_poll_workflow_task() + .expect_poll_task() .times(response_batches.len()) .returning(move |_| Ok(PollTaskResponse::WorkflowTask(tasks.pop_front().unwrap()))); // Response not really important here diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index b7aef6e43..0fece9690 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,5 +1,4 @@ use crate::machines::activity_state_machine::new_activity; -use crate::protos::coresdk::activity_task; use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, @@ -10,7 +9,7 @@ use crate::{ TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation_job, ActivityTask, StartWorkflow, UpdateRandomSeed, WfActivation}, + coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation}, temporal::api::{ enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent}, @@ -163,7 +162,7 @@ impl WorkflowMachines { /// is the last event in the history. /// /// TODO: Describe what actually happens in here - #[instrument(level = "debug", skip(self), fields(run_id = %self.run_id))] + #[instrument(level = "debug", skip(self), fields(run_id = % self.run_id))] pub(crate) fn handle_event( &mut self, event: &HistoryEvent, @@ -371,7 +370,7 @@ impl WorkflowMachines { return Err(WFMachinesError::UnexpectedEvent( event.clone(), "The event is non a non-stateful event, but we tried to handle it as one", - )) + )); } } Ok(()) @@ -557,7 +556,7 @@ impl WorkflowMachines { return Err(WFMachinesError::UnexpectedMachineResponse( v, "When cancelling timer", - )) + )); } } } @@ -567,6 +566,7 @@ impl WorkflowMachines { self.activity_id_to_machine.insert(aid, activity.machine); self.current_wf_task_commands.push_back(activity); } + WFCommand::RequestCancelActivity(_) => unimplemented!(), WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 41cd99106..e52eb7b07 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -113,7 +113,7 @@ pub trait ServerGatewayApis { async fn poll_activity_task(&self, task_queue: String) -> Result; /// Complete a task by sending it to the server. `task_token` is the task token that would've - /// been received from [PollWorkflowTaskQueueApi::poll]. `commands` is a list of new commands + /// been received from [poll_workflow_task_queue] API. `commands` is a list of new commands /// to send to the server, such as starting a timer. async fn complete_workflow_task( &self, @@ -121,18 +121,27 @@ pub trait ServerGatewayApis { commands: Vec, ) -> Result; + /// Complete activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `result` is a blob that contains + /// activity response. async fn complete_activity_task( &self, task_token: Vec, result: Option, ) -> Result; + /// Cancel activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `details` is a blob that provides arbitrary + /// user defined cancellation info. async fn cancel_activity_task( &self, task_token: Vec, details: Option, ) -> Result; + /// Fail activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `failure` provides failure details, such + /// as message, cause and stack trace. async fn fail_activity_task( &self, task_token: Vec, @@ -158,13 +167,19 @@ pub trait ServerGatewayApis { ) -> Result; } +/// Contains poll task request. String parameter defines a task queue to be polled. pub enum PollTaskRequest { + /// Instructs core to poll for workflow task. Workflow(String), + /// Instructs core to poll for activity task. Activity(String), } +/// Contains poll task response. pub enum PollTaskResponse { + /// Poll response for workflow task. WorkflowTask(PollWorkflowTaskQueueResponse), + /// Poll response for activity task. ActivityTask(PollActivityTaskQueueResponse), } diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 87d82b5a9..df206f310 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -104,7 +104,9 @@ pub mod temporal { pub mod command { pub mod v1 { include!("temporal.api.command.v1.rs"); + use crate::protos::coresdk::{activity_task, ActivityTask, StartActivity}; use crate::protos::temporal::api::enums::v1::CommandType; + use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use command::Attributes; use std::fmt::{Display, Formatter}; @@ -127,11 +129,44 @@ pub mod temporal { command_type: CommandType::FailWorkflowExecution as i32, attributes: Some(a), }, + a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self { + command_type: CommandType::ScheduleActivityTask as i32, + attributes: Some(a), + }, + a @ Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self { + command_type: CommandType::RequestCancelActivityTask as i32, + attributes: Some(a), + }, _ => unimplemented!(), } } } + impl From for ActivityTask { + fn from(r: PollActivityTaskQueueResponse) -> Self { + ActivityTask { + activity_id: r.activity_id, + variant: Some(activity_task::Variant::Start(StartActivity { + workflow_namespace: r.workflow_namespace, + workflow_type: r.workflow_type, + workflow_execution: r.workflow_execution, + activity_type: r.activity_type, + header: r.header, + input: r.input, + heartbeat_details: r.heartbeat_details, + scheduled_time: r.scheduled_time, + current_attempt_scheduled_time: r.current_attempt_scheduled_time, + started_time: r.started_time, + attempt: r.attempt, + schedule_to_close_timeout: r.schedule_to_close_timeout, + start_to_close_timeout: r.start_to_close_timeout, + heartbeat_timeout: r.heartbeat_timeout, + retry_policy: r.retry_policy, + })), + } + } + } + impl Display for Command { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let ct = CommandType::from_i32(self.command_type) diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index ed8df16c1..a2cbdf5a8 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -194,6 +194,7 @@ pub fn single_activity(activity_id: &str) -> TestHistoryBuilder { ActivityTaskCompletedEventAttributes { scheduled_event_id, started_event_id, + // todo add the result payload ..Default::default() }, ), diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs index a541910a8..4decbee1e 100644 --- a/src/workflow/driven_workflow.rs +++ b/src/workflow/driven_workflow.rs @@ -1,4 +1,4 @@ -use crate::protos::coresdk::{activity_task, SignalWorkflow}; +use crate::protos::coresdk::SignalWorkflow; use crate::{ machines::WFCommand, protos::coresdk::wf_activation_job, From f3bb00caf7dcb0e3937b5e7ead8f9477e49b5072 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 17 Mar 2021 23:57:42 -0700 Subject: [PATCH 14/22] fmt --- tests/integ_tests/simple_wf_tests.rs | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 36ded4eca..6bdc32630 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -460,14 +460,11 @@ fn signal_workflow() { let res = core.poll_task(task_q).unwrap(); assert_matches!( res.get_wf_jobs().as_slice(), - [ - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }, - WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - } - ] + [WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }] ); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], @@ -504,7 +501,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { res.get_wf_jobs().as_slice(), [WfActivationJob { variant: Some(wf_activation_job::Variant::FireTimer(_)), - },] + }] ); let task_token = res.task_token.clone(); @@ -535,7 +532,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { res.get_wf_jobs().as_slice(), [WfActivationJob { variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - },] + }] ); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], From b9616ddfbbf4b46f33e29e53b339f86404b6f8ee Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 16:52:39 -0700 Subject: [PATCH 15/22] Address some CR comments --- src/lib.rs | 4 ++-- src/machines/activity_state_machine.rs | 17 +++++++---------- src/machines/workflow_machines.rs | 21 +++++++++------------ tests/integ_tests/simple_wf_tests.rs | 13 ++++++++----- 4 files changed, 26 insertions(+), 29 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index cea2ce581..6eeddd7ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -531,8 +531,8 @@ mod test { } #[rstest(core, - case::incremental(single_activity_setup(& [1, 2])), - case::replay(single_activity_setup(& [2])) + case::incremental(single_activity_setup(&[1, 2])), + case::replay(single_activity_setup(&[2])) )] fn single_activity_completion(core: FakeCore) { let res = core.poll_task(TASK_Q).unwrap(); diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 731035e11..f17c17cc2 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -73,19 +73,15 @@ pub(super) enum ActivityMachineCommand { #[derive(Debug, Clone, derive_more::Display)] pub(super) enum ActivityCancellationType { - /** - * Wait for activity cancellation completion. Note that activity must heartbeat to receive a - * cancellation notification. This can block the cancellation for a long time if activity doesn't - * heartbeat or chooses to ignore the cancellation request. - */ + /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a + /// cancellation notification. This can block the cancellation for a long time if activity doesn't + /// heartbeat or chooses to ignore the cancellation request. WaitCancellationCompleted, - /** Initiate a cancellation request and immediately report cancellation to the workflow. */ + /// Initiate a cancellation request and immediately report cancellation to the workflow. TryCancel, - /** - * Do not request cancellation of the activity and immediately report cancellation to the workflow - */ + /// Do not request cancellation of the activity and immediately report cancellation to the workflow Abandon, } @@ -95,7 +91,7 @@ impl Default for ActivityCancellationType { } } -/// Creates a new, scheduled, activity as a [CancellableCommand] +/// Creates a new activity state machine and a command to schedule it on the server. pub(super) fn new_activity( attribs: ScheduleActivityTaskCommandAttributes, ) -> NewMachineWithCommand { @@ -107,6 +103,7 @@ pub(super) fn new_activity( } impl ActivityMachine { + /// Create a new activity and immediately schedule it. pub(crate) fn new_scheduled(attribs: ScheduleActivityTaskCommandAttributes) -> (Self, Command) { let mut s = Self { state: Created {}.into(), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 0fece9690..c757d3adb 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -56,7 +56,7 @@ pub(crate) struct WorkflowMachines { /// Maps timer ids as created by workflow authors to their associated machines /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum. - timer_id_to_machine: HashMap, + id_to_machine: HashMap, /// TODO document activity_id_to_machine: HashMap, @@ -146,7 +146,7 @@ impl WorkflowMachines { current_wf_time: None, all_machines: Default::default(), machines_by_event_id: Default::default(), - timer_id_to_machine: Default::default(), + id_to_machine: Default::default(), activity_id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), @@ -530,19 +530,16 @@ impl WorkflowMachines { WFCommand::AddTimer(attrs) => { let tid = attrs.timer_id.clone(); let timer = self.add_new_machine(new_timer(attrs)); - self.timer_id_to_machine.insert(tid, timer.machine); + self.id_to_machine.insert(tid, timer.machine); self.current_wf_task_commands.push_back(timer); } WFCommand::CancelTimer(attrs) => { - let mkey = *self - .timer_id_to_machine - .get(&attrs.timer_id) - .ok_or_else(|| { - WFMachinesError::MissingAssociatedMachine(format!( - "Missing associated machine for cancelling timer {}", - &attrs.timer_id - )) - })?; + let mkey = *self.id_to_machine.get(&attrs.timer_id).ok_or_else(|| { + WFMachinesError::MissingAssociatedMachine(format!( + "Missing associated machine for cancelling timer {}", + &attrs.timer_id + )) + })?; let res = self.machine_mut(mkey).cancel()?; match res { MachineResponse::IssueNewCommand(c) => { diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 6bdc32630..f2afb894f 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -460,11 +460,14 @@ fn signal_workflow() { let res = core.poll_task(task_q).unwrap(); assert_matches!( res.get_wf_jobs().as_slice(), - [WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }, WfActivationJob { - variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - }] + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] ); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], From bac0197e925b951f39746f34bed6241bf70bf9bb Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 17:00:43 -0700 Subject: [PATCH 16/22] Remove redundant test --- src/machines/activity_state_machine.rs | 52 -------------------------- 1 file changed, 52 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index f17c17cc2..4b62328e9 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -361,55 +361,3 @@ pub(super) struct TimedOut {} #[derive(Default, Clone)] pub(super) struct Canceled {} - -#[cfg(test)] -mod activity_machine_tests { - use crate::machines::test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}; - use crate::machines::WorkflowMachines; - use crate::protos::temporal::api::command::v1::CompleteWorkflowExecutionCommandAttributes; - use crate::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; - use crate::protos::temporal::api::enums::v1::CommandType; - use crate::test_help::canned_histories; - use rstest::{fixture, rstest}; - use tracing::Level; - - #[fixture] - fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { - let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { - let activity = ScheduleActivityTaskCommandAttributes { - activity_id: "activity A".to_string(), - ..Default::default() - }; - command_sink.activity(activity, true); - - let complete = CompleteWorkflowExecutionCommandAttributes::default(); - command_sink.send(complete.into()); - }); - - let t = canned_histories::single_activity("activity1"); - let state_machines = WorkflowMachines::new( - "wfid".to_string(), - "runid".to_string(), - Box::new(twd).into(), - ); - - assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); - (t, state_machines) - } - - #[rstest] - fn test_activity_happy_path(activity_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { - let s = span!(Level::DEBUG, "Test start", t = "activity_happy_path"); - let _enter = s.enter(); - let (t, mut state_machines) = activity_happy_hist; - let commands = t - .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) - .unwrap(); - state_machines.get_wf_activation(); - assert_eq!(commands.len(), 1); - assert_eq!( - commands[0].command_type, - CommandType::ScheduleActivityTask as i32 - ); - } -} From 891a18a28af12a551bb2ef699d9dcbfe07cea439 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 18:26:14 -0700 Subject: [PATCH 17/22] use command id enum as the issued_commands map key --- .../test_help/async_workflow_driver.rs | 32 +++++++++++-------- 1 file changed, 19 insertions(+), 13 deletions(-) diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 55bdade46..03f552ca6 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -21,6 +21,7 @@ use tokio::{ runtime::Runtime, task::{JoinError, JoinHandle}, }; +use CommandId::TimerId; pub struct TestWorkflowDriver { join_handle: Option>, @@ -36,23 +37,23 @@ struct TestWfDriverCache { impl TestWfDriverCache { /// Unblock a command by ID - fn unblock(&self, id: &str) { + fn unblock(&self, id: CommandId) { let mut bc = self.blocking_condvar.0.lock(); - if let Some(t) = bc.issued_commands.remove(id) { + if let Some(t) = bc.issued_commands.remove(&id) { t.unblocker.send(()).unwrap() }; } /// Cancel a timer by ID. Timers get some special handling here since they are always /// removed from the "lang" side without needing a response from core. - fn cancel_timer(&self, id: &str) { + fn cancel_timer(&self, id: CommandId) { let mut bc = self.blocking_condvar.0.lock(); - bc.issued_commands.remove(id); + bc.issued_commands.remove(&id); } /// Track a new command that the wf has sent down the command sink. The command starts in /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed - fn add_sent_cmd(&self, id: String) -> oneshot::Receiver<()> { + fn add_sent_cmd(&self, id: CommandId) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); let mut bc = self.blocking_condvar.0.lock(); bc.issued_commands.insert( @@ -66,9 +67,9 @@ impl TestWfDriverCache { } /// Indicate that a command is being `await`ed - fn set_cmd_blocked(&self, id: &str) { + fn set_cmd_blocked(&self, id: CommandId) { let mut bc = self.blocking_condvar.0.lock(); - if let Some(cmd) = bc.issued_commands.get_mut(id) { + if let Some(cmd) = bc.issued_commands.get_mut(&id) { cmd.status = CommandStatus::Blocked; } // Wake up the fetcher thread, since we have blocked on a command and that would mean we've @@ -77,14 +78,19 @@ impl TestWfDriverCache { } } +#[derive(Debug, PartialEq, Eq, Hash)] +enum CommandId { + TimerId(String), + ActivityId(String), +} + /// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow /// iteration is considered complete if the workflow exits, or the top level task (the main codepath /// of the workflow) is blocked waiting on a command). #[derive(Default, Debug)] struct BlockingCondInfo { /// Holds a mapping of timer id -> oneshot channel to resolve it - /// TODO: Upgrade w/ enum key once activities are in - issued_commands: HashMap, + issued_commands: HashMap, wf_is_done: bool, } @@ -130,10 +136,10 @@ impl CommandSender { let tid = a.timer_id.clone(); let c = WFCommand::AddTimer(a); self.send(c); - let rx = self.twd_cache.add_sent_cmd(tid.clone()); + let rx = self.twd_cache.add_sent_cmd(TimerId(tid.clone())); let cache_clone = self.twd_cache.clone(); async move { - cache_clone.set_cmd_blocked(&tid); + cache_clone.set_cmd_blocked(TimerId(tid)); rx.await } } @@ -143,7 +149,7 @@ impl CommandSender { let c = WFCommand::CancelTimer(CancelTimerCommandAttributes { timer_id: timer_id.to_owned(), }); - self.twd_cache.cancel_timer(timer_id); + self.twd_cache.cancel_timer(TimerId(timer_id.to_string())); self.send(c); } } @@ -240,7 +246,7 @@ impl WorkflowFetcher for TestWorkflowDriver { impl ActivationListener for TestWorkflowDriver { fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { - self.cache.unblock(timer_id); + self.cache.unblock(TimerId(timer_id.to_owned())); } } } From a6103c1169b3df082c567e725182ed4ef9f97489 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 18:30:30 -0700 Subject: [PATCH 18/22] rename enum params --- .../test_help/async_workflow_driver.rs | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 03f552ca6..596d83d94 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -21,7 +21,7 @@ use tokio::{ runtime::Runtime, task::{JoinError, JoinHandle}, }; -use CommandId::TimerId; +use CommandID::Timer; pub struct TestWorkflowDriver { join_handle: Option>, @@ -37,7 +37,7 @@ struct TestWfDriverCache { impl TestWfDriverCache { /// Unblock a command by ID - fn unblock(&self, id: CommandId) { + fn unblock(&self, id: CommandID) { let mut bc = self.blocking_condvar.0.lock(); if let Some(t) = bc.issued_commands.remove(&id) { t.unblocker.send(()).unwrap() @@ -46,14 +46,14 @@ impl TestWfDriverCache { /// Cancel a timer by ID. Timers get some special handling here since they are always /// removed from the "lang" side without needing a response from core. - fn cancel_timer(&self, id: CommandId) { + fn cancel_timer(&self, id: CommandID) { let mut bc = self.blocking_condvar.0.lock(); bc.issued_commands.remove(&id); } /// Track a new command that the wf has sent down the command sink. The command starts in /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed - fn add_sent_cmd(&self, id: CommandId) -> oneshot::Receiver<()> { + fn add_sent_cmd(&self, id: CommandID) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); let mut bc = self.blocking_condvar.0.lock(); bc.issued_commands.insert( @@ -67,7 +67,7 @@ impl TestWfDriverCache { } /// Indicate that a command is being `await`ed - fn set_cmd_blocked(&self, id: CommandId) { + fn set_cmd_blocked(&self, id: CommandID) { let mut bc = self.blocking_condvar.0.lock(); if let Some(cmd) = bc.issued_commands.get_mut(&id) { cmd.status = CommandStatus::Blocked; @@ -79,9 +79,9 @@ impl TestWfDriverCache { } #[derive(Debug, PartialEq, Eq, Hash)] -enum CommandId { - TimerId(String), - ActivityId(String), +enum CommandID { + Timer(String), + Activity(String), } /// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow @@ -90,7 +90,7 @@ enum CommandId { #[derive(Default, Debug)] struct BlockingCondInfo { /// Holds a mapping of timer id -> oneshot channel to resolve it - issued_commands: HashMap, + issued_commands: HashMap, wf_is_done: bool, } @@ -136,10 +136,10 @@ impl CommandSender { let tid = a.timer_id.clone(); let c = WFCommand::AddTimer(a); self.send(c); - let rx = self.twd_cache.add_sent_cmd(TimerId(tid.clone())); + let rx = self.twd_cache.add_sent_cmd(Timer(tid.clone())); let cache_clone = self.twd_cache.clone(); async move { - cache_clone.set_cmd_blocked(TimerId(tid)); + cache_clone.set_cmd_blocked(Timer(tid)); rx.await } } @@ -149,7 +149,7 @@ impl CommandSender { let c = WFCommand::CancelTimer(CancelTimerCommandAttributes { timer_id: timer_id.to_owned(), }); - self.twd_cache.cancel_timer(TimerId(timer_id.to_string())); + self.twd_cache.cancel_timer(Timer(timer_id.to_string())); self.send(c); } } @@ -246,7 +246,7 @@ impl WorkflowFetcher for TestWorkflowDriver { impl ActivationListener for TestWorkflowDriver { fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { - self.cache.unblock(TimerId(timer_id.to_owned())); + self.cache.unblock(Timer(timer_id.to_owned())); } } } From 81c73eda9b5857878b2a2d262118d426a331f113 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 20:12:05 -0700 Subject: [PATCH 19/22] Add end-to-end integration test --- src/pollers/mod.rs | 2 +- src/protos/mod.rs | 20 ++++++++ tests/integ_tests/simple_wf_tests.rs | 74 +++++++++++++++++++++++++++- 3 files changed, 94 insertions(+), 2 deletions(-) diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index e52eb7b07..f057ffdc5 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -250,7 +250,7 @@ impl ServerGatewayApis for ServerGateway { namespace: self.opts.namespace.clone(), task_queue: Some(TaskQueue { name: task_queue, - kind: TaskQueueKind::Unspecified as i32, + kind: TaskQueueKind::Normal as i32, }), identity: self.opts.identity.clone(), task_queue_metadata: None, diff --git a/src/protos/mod.rs b/src/protos/mod.rs index df206f310..223b1dbf9 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -13,6 +13,7 @@ pub mod coresdk { use super::temporal::api::command::v1::Command as ApiCommand; use super::temporal::api::enums::v1::WorkflowTaskFailedCause; use super::temporal::api::failure::v1::Failure; + use crate::protos::temporal::api::common::v1::Payloads; use command::Variant; pub type HistoryEventId = i64; @@ -33,6 +34,14 @@ pub mod coresdk { vec![] } } + /// Returns any contained jobs if this task was a wf activation and it had some + pub fn get_activity_variant(&self) -> Option { + if let Some(task::Variant::Activity(a)) = &self.variant { + a.variant.clone() + } else { + None + } + } /// Returns the workflow run id if the task was a workflow pub fn get_run_id(&self) -> Option<&str> { @@ -79,6 +88,17 @@ pub mod coresdk { } } + pub fn ok_activity(result: Option, task_token: Vec) -> Self { + TaskCompletion { + task_token, + variant: Some(task_completion::Variant::Activity(ActivityResult { + status: Some(activity_result::Status::Completed(ActivityTaskSuccess { + result, + })), + })), + } + } + pub fn fail(task_token: Vec, cause: WorkflowTaskFailedCause, failure: Failure) -> Self { Self { task_token, diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index f2afb894f..defb1a8d9 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -11,10 +11,14 @@ use std::{ }, time::Duration, }; +use temporal_sdk_core::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; +use temporal_sdk_core::protos::temporal::api::common::v1::{ActivityType, Payload, Payloads}; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::{ protos::{ coresdk::{ - wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob, + activity_result, activity_task, wf_activation_job, ActivityResult, ActivityTaskSuccess, + FireTimer, ResolveActivity, StartWorkflow, Task, TaskCompletion, WfActivationJob, }, temporal::api::{ command::v1::{ @@ -111,6 +115,74 @@ fn timer_workflow() { .unwrap(); } +#[test] +fn activity_workflow() { + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + let activity_id: String = rng.gen::().to_string(); + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![ScheduleActivityTaskCommandAttributes { + activity_id: activity_id.to_string(), + activity_type: Some(ActivityType { + name: "test_activity".to_string(), + }), + namespace: NAMESPACE.to_owned(), + task_queue: Some(TaskQueue { + name: task_q.to_owned(), + kind: 1, + }), + schedule_to_start_timeout: Some(Duration::from_secs(30).into()), + start_to_close_timeout: Some(Duration::from_secs(30).into()), + schedule_to_close_timeout: Some(Duration::from_secs(60).into()), + heartbeat_timeout: Some(Duration::from_secs(60).into()), + ..Default::default() + } + .into()], + task.task_token, + )) + .unwrap(); + let task = dbg!(core.poll_activity_task(task_q).unwrap()); + assert_matches!( + task.get_activity_variant(), + Some(activity_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, Some(ActivityType { + name: "test_activity".to_string(), + })) + } + ); + let response_payloads = vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }]; + core.complete_activity_task(TaskCompletion::ok_activity( + Some(Payloads { + payloads: response_payloads.clone(), + }), + task.task_token, + )) + .unwrap(); + let task = core.poll_task(task_q).unwrap(); + assert_matches!( + task.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity( + ResolveActivity {activity_id: a_id, result: Some(ActivityResult{ + status: Some(activity_result::Status::Completed(ActivityTaskSuccess{result: Some(r)}))})} + )), + }, + ] => { + assert_eq!(a_id, &activity_id); + assert_eq!(r, &Payloads{ payloads: response_payloads.clone()}); + } + ); +} + #[test] fn parallel_timer_workflow() { let task_q = "parallel_timer_workflow"; From 1fb36dccd52dc4fa3f36b3f968105e89387f2d8e Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 20:15:53 -0700 Subject: [PATCH 20/22] complete wf --- tests/integ_tests/simple_wf_tests.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index defb1a8d9..7ea3726cf 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -181,6 +181,11 @@ fn activity_workflow() { assert_eq!(r, &Payloads{ payloads: response_payloads.clone()}); } ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task.task_token, + )) + .unwrap() } #[test] From 64e1f900c4838231d93b266ad778b687fbae33cd Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 18 Mar 2021 23:03:54 -0700 Subject: [PATCH 21/22] make id to machine map keyed of command id --- .../test_help/async_workflow_driver.rs | 7 +--- src/machines/workflow_machines.rs | 33 +++++++++++-------- 2 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 596d83d94..c150fa7a6 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,3 +1,4 @@ +use crate::machines::workflow_machines::CommandID; use crate::{ machines::WFCommand, protos::{ @@ -78,12 +79,6 @@ impl TestWfDriverCache { } } -#[derive(Debug, PartialEq, Eq, Hash)] -enum CommandID { - Timer(String), - Activity(String), -} - /// Contains the info needed to know if workflow code is "done" being iterated or not. A workflow /// iteration is considered complete if the workflow exits, or the top level task (the main codepath /// of the workflow) is blocked waiting on a command). diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index c757d3adb..b25c285a4 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -56,10 +56,7 @@ pub(crate) struct WorkflowMachines { /// Maps timer ids as created by workflow authors to their associated machines /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum. - id_to_machine: HashMap, - - /// TODO document - activity_id_to_machine: HashMap, + id_to_machine: HashMap, /// Queued commands which have been produced by machines and await processing / being sent to /// the server. @@ -75,6 +72,12 @@ pub(crate) struct WorkflowMachines { drive_me: DrivenWorkflow, } +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum CommandID { + Timer(String), + Activity(String), +} + slotmap::new_key_type! { struct MachineKey; } #[derive(Debug, derive_more::Display)] #[display(fmt = "Cmd&Machine({})", "command")] @@ -147,7 +150,6 @@ impl WorkflowMachines { all_machines: Default::default(), machines_by_event_id: Default::default(), id_to_machine: Default::default(), - activity_id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), } @@ -530,16 +532,20 @@ impl WorkflowMachines { WFCommand::AddTimer(attrs) => { let tid = attrs.timer_id.clone(); let timer = self.add_new_machine(new_timer(attrs)); - self.id_to_machine.insert(tid, timer.machine); + self.id_to_machine + .insert(CommandID::Timer(tid), timer.machine); self.current_wf_task_commands.push_back(timer); } WFCommand::CancelTimer(attrs) => { - let mkey = *self.id_to_machine.get(&attrs.timer_id).ok_or_else(|| { - WFMachinesError::MissingAssociatedMachine(format!( - "Missing associated machine for cancelling timer {}", - &attrs.timer_id - )) - })?; + let mkey = *self + .id_to_machine + .get(&CommandID::Timer(attrs.timer_id.to_owned())) + .ok_or_else(|| { + WFMachinesError::MissingAssociatedMachine(format!( + "Missing associated machine for cancelling timer {}", + &attrs.timer_id + )) + })?; let res = self.machine_mut(mkey).cancel()?; match res { MachineResponse::IssueNewCommand(c) => { @@ -560,7 +566,8 @@ impl WorkflowMachines { WFCommand::AddActivity(attrs) => { let aid = attrs.activity_id.clone(); let activity = self.add_new_machine(new_activity(attrs)); - self.activity_id_to_machine.insert(aid, activity.machine); + self.id_to_machine + .insert(CommandID::Activity(aid), activity.machine); self.current_wf_task_commands.push_back(activity); } WFCommand::RequestCancelActivity(_) => unimplemented!(), From 263b60e6d8385929a0882fd7426ee4e36d608edf Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 22 Mar 2021 16:05:53 -0700 Subject: [PATCH 22/22] Change parameter type + remove unused file --- src/machines/test_help/async_workflow_driver.rs | 6 +++--- src/machines/test_help/workflow_driver.rs | 1 - 2 files changed, 3 insertions(+), 4 deletions(-) delete mode 100644 src/machines/test_help/workflow_driver.rs diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index c150fa7a6..1b126530c 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -47,9 +47,9 @@ impl TestWfDriverCache { /// Cancel a timer by ID. Timers get some special handling here since they are always /// removed from the "lang" side without needing a response from core. - fn cancel_timer(&self, id: CommandID) { + fn cancel_timer(&self, id: &str) { let mut bc = self.blocking_condvar.0.lock(); - bc.issued_commands.remove(&id); + bc.issued_commands.remove(&CommandID::Timer(id.to_owned())); } /// Track a new command that the wf has sent down the command sink. The command starts in @@ -144,7 +144,7 @@ impl CommandSender { let c = WFCommand::CancelTimer(CancelTimerCommandAttributes { timer_id: timer_id.to_owned(), }); - self.twd_cache.cancel_timer(Timer(timer_id.to_string())); + self.twd_cache.cancel_timer(timer_id); self.send(c); } } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs deleted file mode 100644 index 8b1378917..000000000 --- a/src/machines/test_help/workflow_driver.rs +++ /dev/null @@ -1 +0,0 @@ -