From 4bdebda964babd55a437558a9bc1c5f2428abe04 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 30 Mar 2021 01:00:38 -0700 Subject: [PATCH 01/19] WIP activity cancellation --- protos/local/workflow_commands.proto | 14 ++ src/machines/activity_state_machine.rs | 228 +++++++++++++++++-------- src/machines/workflow_machines.rs | 45 +++-- src/protos/mod.rs | 38 +++++ 4 files changed, 245 insertions(+), 80 deletions(-) diff --git a/protos/local/workflow_commands.proto b/protos/local/workflow_commands.proto index 1d65d1839..7a9f0226c 100644 --- a/protos/local/workflow_commands.proto +++ b/protos/local/workflow_commands.proto @@ -70,10 +70,24 @@ message ScheduleActivity { /// configuration. Retries are happening up to schedule_to_close_timeout. To disable retries set /// retry_policy.maximum_attempts to 1. common.RetryPolicy retry_policy = 11; + /// Defines behaviour of the underlying workflow when activity cancellation has been requested. + ActivityCancellationType cancellation_type = 12; +} + +enum ActivityCancellationType { + /// Initiate a cancellation request and immediately report cancellation to the workflow. + TRY_CANCEL = 0; + /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a + /// cancellation notification. This can block the cancellation for a long time if activity doesn't + /// heartbeat or chooses to ignore the cancellation request. + WAIT_CANCELLATION_COMPLETED = 1; + /// Do not request cancellation of the activity and immediately report cancellation to the workflow + ABANDON = 2; } message RequestCancelActivity { string activity_id = 1; + int64 scheduled_event_id = 2; } message QueryResult { diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 431f4e53c..04ed49df9 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,7 +1,11 @@ #![allow(clippy::large_enum_variant)] +use crate::protos::coresdk::workflow_commands::{ActivityCancellationType, RequestCancelActivity}; use crate::protos::coresdk::PayloadsExt; -use crate::protos::temporal::api::failure::v1::Failure; +use crate::protos::temporal::api::common::v1::ActivityType; +use crate::protos::temporal::api::enums::v1::RetryState; +use crate::protos::temporal::api::enums::v1::RetryState::CancelRequested; +use crate::protos::temporal::api::failure::v1::{failure, ActivityFailureInfo, Failure}; use crate::protos::temporal::api::history::v1::ActivityTaskCanceledEventAttributes; use crate::{ machines::{ @@ -41,17 +45,17 @@ fsm! { ScheduleCommandCreated --(CommandScheduleActivityTask) --> ScheduleCommandCreated; ScheduleCommandCreated - --(ActivityTaskScheduled, on_activity_task_scheduled) --> ScheduledEventRecorded; - ScheduleCommandCreated --(Cancel, on_canceled) --> Canceled; + --(ActivityTaskScheduled(i64), shared on_activity_task_scheduled) --> ScheduledEventRecorded; + ScheduleCommandCreated --(Cancel, shared on_canceled) --> Canceled; - ScheduledEventRecorded --(ActivityTaskStarted, on_task_started) --> Started; + ScheduledEventRecorded --(ActivityTaskStarted(i64), shared on_task_started) --> Started; ScheduledEventRecorded --(ActivityTaskTimedOut, on_task_timed_out) --> TimedOut; - ScheduledEventRecorded --(Cancel, on_canceled) --> ScheduledActivityCancelCommandCreated; + ScheduledEventRecorded --(Cancel, shared on_canceled) --> ScheduledActivityCancelCommandCreated; Started --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; Started --(ActivityTaskFailed(ActivityTaskFailedEventAttributes), on_activity_task_failed) --> Failed; Started --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; - Started --(Cancel, on_canceled) --> StartedActivityCancelCommandCreated; + Started --(Cancel, shared on_canceled) --> StartedActivityCancelCommandCreated; ScheduledActivityCancelCommandCreated --(CommandRequestCancelActivityTask, @@ -60,9 +64,9 @@ fsm! { --(ActivityTaskCancelRequested) --> ScheduledActivityCancelEventRecorded; ScheduledActivityCancelEventRecorded - --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; + --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), shared on_activity_task_canceled) --> Canceled; ScheduledActivityCancelEventRecorded - --(ActivityTaskStarted) --> StartedActivityCancelEventRecorded; + --(ActivityTaskStarted(i64)) --> StartedActivityCancelEventRecorded; ScheduledActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; @@ -70,7 +74,7 @@ fsm! { --(CommandRequestCancelActivityTask) --> StartedActivityCancelCommandCreated; StartedActivityCancelCommandCreated --(ActivityTaskCancelRequested, - on_activity_task_cancel_requested) --> StartedActivityCancelEventRecorded; + shared on_activity_task_cancel_requested) --> StartedActivityCancelEventRecorded; StartedActivityCancelEventRecorded --(ActivityTaskFailed(ActivityTaskFailedEventAttributes), on_activity_task_failed) --> Failed; StartedActivityCancelEventRecorded @@ -78,7 +82,7 @@ fsm! { StartedActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; StartedActivityCancelEventRecorded - --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; + --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), on_activity_task_canceled) --> Canceled; } #[derive(Debug, derive_more::Display)] @@ -88,27 +92,9 @@ pub(super) enum ActivityMachineCommand { #[display(fmt = "Fail")] Fail(Option), #[display(fmt = "Cancel")] - Cancel(Option), -} - -#[derive(Debug, Clone, derive_more::Display)] -pub(super) enum ActivityCancellationType { - /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a - /// cancellation notification. This can block the cancellation for a long time if activity doesn't - /// heartbeat or chooses to ignore the cancellation request. - WaitCancellationCompleted, - - /// Initiate a cancellation request and immediately report cancellation to the workflow. - TryCancel, - - /// Do not request cancellation of the activity and immediately report cancellation to the workflow - Abandon, -} - -impl Default for ActivityCancellationType { - fn default() -> Self { - ActivityCancellationType::TryCancel - } + Cancel, + #[display(fmt = "RequestCancellation")] + RequestCancellation(Command), } /// Creates a new activity state machine and a command to schedule it on the server. @@ -126,8 +112,9 @@ impl ActivityMachine { let mut s = Self { state: Created {}.into(), shared_state: SharedState { + cancellation_type: attribs.cancellation_type.try_into().unwrap(), attrs: attribs, - cancellation_type: ActivityCancellationType::TryCancel, + ..Default::default() }, }; s.on_event_mut(ActivityMachineEvents::Schedule) @@ -145,8 +132,8 @@ impl TryFrom for ActivityMachineEvents { fn try_from(e: HistoryEvent) -> Result { Ok(match EventType::from_i32(e.event_type) { - Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled, - Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted, + Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled(e.event_id), + Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted(e.event_id), Some(EventType::ActivityTaskCompleted) => { if let Some(history_event::Attributes::ActivityTaskCompletedEventAttributes( attrs, @@ -174,7 +161,18 @@ impl TryFrom for ActivityMachineEvents { } Some(EventType::ActivityTaskTimedOut) => Self::ActivityTaskTimedOut, Some(EventType::ActivityTaskCancelRequested) => Self::ActivityTaskCancelRequested, - Some(EventType::ActivityTaskCanceled) => Self::ActivityTaskCanceled, + Some(EventType::ActivityTaskCanceled) => { + if let Some(history_event::Attributes::ActivityTaskCanceledEventAttributes(attrs)) = + e.attributes + { + Self::ActivityTaskCanceled(attrs) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Activity cancelation attributes were unset".to_string(), + )); + } + } _ => { return Err(WFMachinesError::UnexpectedEvent( e, @@ -185,18 +183,6 @@ impl TryFrom for ActivityMachineEvents { } } -impl TryFrom for ActivityMachineEvents { - type Error = (); - - fn try_from(c: CommandType) -> Result { - Ok(match c { - CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, - CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, - _ => return Err(()), - }) - } -} - impl WFMachinesAdapter for ActivityMachine { fn adapt_response( &self, @@ -227,25 +213,49 @@ impl WFMachinesAdapter for ActivityMachine { }), } .into()], - ActivityMachineCommand::Cancel(_) => unimplemented!(), + ActivityMachineCommand::Cancel => unimplemented!(), + ActivityMachineCommand::RequestCancellation(_) => unimplemented!(), + }) + } +} + +impl TryFrom for ActivityMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, + CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, + _ => return Err(()), }) } } impl Cancellable for ActivityMachine { fn cancel(&mut self) -> Result> { - unimplemented!() + Ok( + match self.on_event_mut(ActivityMachineEvents::Cancel)?.pop() { + Some(ActivityMachineCommand::RequestCancellation(cmd)) => { + MachineResponse::IssueNewCommand(cmd) + } + Some(ActivityMachineCommand::Cancel) => MachineResponse::NoOp, + x => panic!("Invalid cancel event response {:?}", x), + }, + ) } fn was_cancelled_before_sent_to_server(&self) -> bool { - false // TODO return cancellation flag from the shared state + self.shared_state().cancelled_before_sent } } #[derive(Default, Clone)] pub(super) struct SharedState { + scheduled_event_id: i64, + started_event_id: i64, attrs: ScheduleActivity, cancellation_type: ActivityCancellationType, + cancelled_before_sent: bool, } #[derive(Default, Clone)] @@ -262,14 +272,27 @@ impl Created { pub(super) struct ScheduleCommandCreated {} impl ScheduleCommandCreated { - pub(super) fn on_activity_task_scheduled(self) -> ActivityMachineTransition { - // set initial command event id - // this.initialCommandEventId = currentEvent.getEventId(); + pub(super) fn on_activity_task_scheduled( + self, + mut dat: SharedState, + scheduled_event_id: i64, + ) -> ActivityMachineTransition { + dat.scheduled_event_id = scheduled_event_id; ActivityMachineTransition::default::() } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { - // cancelCommandNotifyCanceled - ActivityMachineTransition::default::() + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + let canceled_state = SharedState { + cancelled_before_sent: true, + ..dat + }; + match dat.cancellation_type { + ActivityCancellationType::Abandon => ActivityMachineTransition::ok_shared( + vec![ActivityMachineCommand::Cancel], + Canceled::default(), + canceled_state, + ), + _ => Common::notify_canceled(canceled_state, Canceled::default().into()), + } } } @@ -277,17 +300,71 @@ impl ScheduleCommandCreated { pub(super) struct ScheduledEventRecorded {} impl ScheduledEventRecorded { - pub(super) fn on_task_started(self) -> ActivityMachineTransition { - // setStartedCommandEventId + pub(super) fn on_task_started( + self, + mut dat: SharedState, + started_event_id: i64, + ) -> ActivityMachineTransition { + dat.started_event_id = started_event_id; ActivityMachineTransition::default::() } pub(super) fn on_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + Common::create_request_cancel_activity_task_command(dat) + } +} + +struct Common {} +impl Common { + fn create_request_cancel_activity_task_command( + dat: SharedState, + ) -> TransitionResult { // createRequestCancelActivityTaskCommand - ActivityMachineTransition::default::() + let cmd = Command { + command_type: CommandType::RequestCancelActivityTask as i32, + attributes: Some( + RequestCancelActivity { + scheduled_event_id: dat.scheduled_event_id, + activity_id: dat.attrs.activity_id, + } + .into(), + ), + }; + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::RequestCancellation(cmd)], + ScheduledActivityCancelCommandCreated::default(), + ) + } + + fn notify_canceled( + dat: SharedState, + next_state: ActivityMachineState, + ) -> TransitionResult { + ActivityMachineTransition::ok_shared( + vec![ActivityMachineCommand::Fail(Some(Failure { + message: "Activity canceled".to_string(), + source: "CoreSDK".to_string(), + stack_trace: "".to_string(), + cause: None, + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + ActivityFailureInfo { + scheduled_event_id: dat.scheduled_event_id, + started_event_id: dat.started_event_id, + identity: "workflow".to_string(), + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + activity_id: dat.attrs.activity_id.to_string(), + retry_state: RetryState::Unspecified as i32, + }, + )), + }))], + next_state, + dat, + ) } } @@ -319,9 +396,8 @@ impl Started { // notify_timed_out ActivityMachineTransition::default::() } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { - // createRequestCancelActivityTaskCommand - ActivityMachineTransition::default::() + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + Common::create_request_cancel_activity_task_command(dat) } } @@ -339,10 +415,15 @@ impl ScheduledActivityCancelCommandCreated { pub(super) struct ScheduledActivityCancelEventRecorded {} impl ScheduledActivityCancelEventRecorded { - pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_canceled( + self, + dat: SharedState, + _attrs: ActivityTaskCanceledEventAttributes, + ) -> ActivityMachineTransition { // notify_canceled - ActivityMachineTransition::default::() + Common::notify_canceled(dat, Canceled::default().into()) } + pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out ActivityMachineTransition::default::() @@ -359,9 +440,17 @@ impl From for ScheduledActivityCancelEven pub(super) struct StartedActivityCancelCommandCreated {} impl StartedActivityCancelCommandCreated { - pub(super) fn on_activity_task_cancel_requested(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_cancel_requested( + self, + dat: SharedState, + ) -> ActivityMachineTransition { // notifyCanceledIfTryCancel - ActivityMachineTransition::default::() + match dat.cancellation_type { + ActivityCancellationType::TryCancel => { + Common::notify_canceled(dat, StartedActivityCancelEventRecorded::default().into()) + } + _ => ActivityMachineTransition::default::(), + } } } @@ -393,7 +482,10 @@ impl StartedActivityCancelEventRecorded { // notify_timed_out ActivityMachineTransition::default::() } - pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_canceled( + self, + _attrs: ActivityTaskCanceledEventAttributes, + ) -> ActivityMachineTransition { // notifyCancellationFromEvent ActivityMachineTransition::default::() } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index fd48894d6..7aac9fb8f 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -547,21 +547,14 @@ impl WorkflowMachines { self.current_wf_task_commands.push_back(timer); } WFCommand::CancelTimer(attrs) => { - let mkey = *self - .id_to_machine - .get(&CommandID::Timer(attrs.timer_id.to_owned())) - .ok_or_else(|| { - WFMachinesError::MissingAssociatedMachine(format!( - "Missing associated machine for cancelling timer {}", - &attrs.timer_id - )) - })?; - let res = self.machine_mut(mkey).cancel()?; + let m_key = + self.get_machine_key(&CommandID::Timer(attrs.timer_id.to_owned()))?; + let res = self.machine_mut(m_key).cancel()?; match res { MachineResponse::IssueNewCommand(c) => { self.current_wf_task_commands.push_back(CommandAndMachine { command: c, - machine: mkey, + machine: m_key, }) } MachineResponse::NoOp => {} @@ -580,7 +573,26 @@ impl WorkflowMachines { .insert(CommandID::Activity(aid), activity.machine); self.current_wf_task_commands.push_back(activity); } - WFCommand::RequestCancelActivity(_) => unimplemented!(), + WFCommand::RequestCancelActivity(attrs) => { + let m_key = + self.get_machine_key(&CommandID::Activity(attrs.activity_id.to_owned()))?; + let res = self.machine_mut(m_key).cancel()?; + match res { + MachineResponse::IssueNewCommand(c) => { + self.current_wf_task_commands.push_back(CommandAndMachine { + command: c, + machine: m_key, + }) + } + MachineResponse::NoOp => {} + v => { + return Err(WFMachinesError::UnexpectedMachineResponse( + v, + "When cancelling activity", + )); + } + } + } WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); @@ -595,6 +607,15 @@ impl WorkflowMachines { Ok(()) } + fn get_machine_key(&mut self, id: &CommandID) -> Result { + Ok(*self.id_to_machine.get(id).ok_or_else(|| { + WFMachinesError::MissingAssociatedMachine(format!( + "Missing associated machine for cancelling timer {:?}", + id + )) + })?) + } + /// Transfer commands from `current_wf_task_commands` to `commands`, so they may be sent off /// to the server. While doing so, [TemporalStateMachine::handle_command] is called on the /// machine associated with the command. diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 3d92a1406..88d55b3fa 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -274,12 +274,14 @@ pub mod temporal { pub mod v1 { include!("temporal.api.command.v1.rs"); + use crate::protos::coresdk::workflow_commands::ActivityCancellationType; use crate::protos::{ coresdk::{workflow_commands, PayloadsExt}, temporal::api::common::v1::ActivityType, temporal::api::enums::v1::CommandType, }; use command::Attributes; + use std::convert::TryFrom; use std::fmt::{Display, Formatter}; impl From for Command { @@ -339,6 +341,42 @@ pub mod temporal { } } + impl From for command::Attributes { + fn from(c: workflow_commands::RequestCancelActivity) -> Self { + Self::RequestCancelActivityTaskCommandAttributes( + RequestCancelActivityTaskCommandAttributes { + scheduled_event_id: c.scheduled_event_id, + }, + ) + } + } + + #[derive(derive_more::Display, Debug)] + pub enum CancellationTypeConversionError { + InvalidCancellationType, + } + + impl TryFrom for ActivityCancellationType { + type Error = CancellationTypeConversionError; + + fn try_from(c: i32) -> Result { + match c { + x if x == ActivityCancellationType::TryCancel as i32 => { + Ok(ActivityCancellationType::TryCancel) + } + x if x + == ActivityCancellationType::WaitCancellationCompleted as i32 => + { + Ok(ActivityCancellationType::WaitCancellationCompleted) + } + x if x == ActivityCancellationType::Abandon as i32 => { + Ok(ActivityCancellationType::Abandon) + } + _ => Err(CancellationTypeConversionError::InvalidCancellationType), + } + } + } + impl From for command::Attributes { fn from(s: workflow_commands::ScheduleActivity) -> Self { Self::ScheduleActivityTaskCommandAttributes( From 752e7cf3e87440e8efbdc31b9eec4957f7b3a059 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 30 Mar 2021 01:22:19 -0700 Subject: [PATCH 02/19] Add notify_canceled_from_event --- src/machines/activity_state_machine.rs | 34 +++++++++++++++++++++++--- 1 file changed, 31 insertions(+), 3 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 04ed49df9..3a7096e98 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -82,7 +82,7 @@ fsm! { StartedActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; StartedActivityCancelEventRecorded - --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), on_activity_task_canceled) --> Canceled; + --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), shared on_activity_task_canceled) --> Canceled; } #[derive(Debug, derive_more::Display)] @@ -366,6 +366,33 @@ impl Common { dat, ) } + + fn notify_canceled_from_event( + dat: SharedState, + attrs: ActivityTaskCanceledEventAttributes, + ) -> TransitionResult { + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::Fail(Some(Failure { + message: "Activity canceled".to_string(), + source: "CoreSDK".to_string(), + stack_trace: "".to_string(), + cause: None, + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + ActivityFailureInfo { + scheduled_event_id: attrs.scheduled_event_id, + started_event_id: attrs.started_event_id, + identity: "".to_string(), // TODO ? + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + activity_id: dat.attrs.activity_id.to_string(), + retry_state: RetryState::Unspecified as i32, + }, + )), + }))], + Canceled::default(), + ) + } } #[derive(Default, Clone)] @@ -484,10 +511,11 @@ impl StartedActivityCancelEventRecorded { } pub(super) fn on_activity_task_canceled( self, - _attrs: ActivityTaskCanceledEventAttributes, + dat: SharedState, + attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { // notifyCancellationFromEvent - ActivityMachineTransition::default::() + Common::notify_canceled_from_event(dat, attrs) } } From 4a197a1f32b9e2c18ec186967a22305ce1ac1a35 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 30 Mar 2021 21:05:46 -0700 Subject: [PATCH 03/19] Address PR feedback --- src/machines/activity_state_machine.rs | 170 ++++++++++++------------- src/machines/workflow_machines.rs | 4 +- src/protos/mod.rs | 28 ---- 3 files changed, 87 insertions(+), 115 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 3a7096e98..db8dc966e 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -112,7 +112,8 @@ impl ActivityMachine { let mut s = Self { state: Created {}.into(), shared_state: SharedState { - cancellation_type: attribs.cancellation_type.try_into().unwrap(), + cancellation_type: ActivityCancellationType::from_i32(attribs.cancellation_type) + .unwrap(), attrs: attribs, ..Default::default() }, @@ -238,7 +239,7 @@ impl Cancellable for ActivityMachine { Some(ActivityMachineCommand::RequestCancellation(cmd)) => { MachineResponse::IssueNewCommand(cmd) } - Some(ActivityMachineCommand::Cancel) => MachineResponse::NoOp, + Some(ActivityMachineCommand::Cancel) => unimplemented!(), x => panic!("Invalid cancel event response {:?}", x), }, ) @@ -291,7 +292,7 @@ impl ScheduleCommandCreated { Canceled::default(), canceled_state, ), - _ => Common::notify_canceled(canceled_state, Canceled::default().into()), + _ => notify_canceled(canceled_state, Canceled::default().into()), } } } @@ -313,85 +314,7 @@ impl ScheduledEventRecorded { ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { - Common::create_request_cancel_activity_task_command(dat) - } -} - -struct Common {} -impl Common { - fn create_request_cancel_activity_task_command( - dat: SharedState, - ) -> TransitionResult { - // createRequestCancelActivityTaskCommand - let cmd = Command { - command_type: CommandType::RequestCancelActivityTask as i32, - attributes: Some( - RequestCancelActivity { - scheduled_event_id: dat.scheduled_event_id, - activity_id: dat.attrs.activity_id, - } - .into(), - ), - }; - ActivityMachineTransition::ok( - vec![ActivityMachineCommand::RequestCancellation(cmd)], - ScheduledActivityCancelCommandCreated::default(), - ) - } - - fn notify_canceled( - dat: SharedState, - next_state: ActivityMachineState, - ) -> TransitionResult { - ActivityMachineTransition::ok_shared( - vec![ActivityMachineCommand::Fail(Some(Failure { - message: "Activity canceled".to_string(), - source: "CoreSDK".to_string(), - stack_trace: "".to_string(), - cause: None, - failure_info: Some(failure::FailureInfo::ActivityFailureInfo( - ActivityFailureInfo { - scheduled_event_id: dat.scheduled_event_id, - started_event_id: dat.started_event_id, - identity: "workflow".to_string(), - activity_type: Some(ActivityType { - name: dat.attrs.activity_type.to_string(), - }), - activity_id: dat.attrs.activity_id.to_string(), - retry_state: RetryState::Unspecified as i32, - }, - )), - }))], - next_state, - dat, - ) - } - - fn notify_canceled_from_event( - dat: SharedState, - attrs: ActivityTaskCanceledEventAttributes, - ) -> TransitionResult { - ActivityMachineTransition::ok( - vec![ActivityMachineCommand::Fail(Some(Failure { - message: "Activity canceled".to_string(), - source: "CoreSDK".to_string(), - stack_trace: "".to_string(), - cause: None, - failure_info: Some(failure::FailureInfo::ActivityFailureInfo( - ActivityFailureInfo { - scheduled_event_id: attrs.scheduled_event_id, - started_event_id: attrs.started_event_id, - identity: "".to_string(), // TODO ? - activity_type: Some(ActivityType { - name: dat.attrs.activity_type.to_string(), - }), - activity_id: dat.attrs.activity_id.to_string(), - retry_state: RetryState::Unspecified as i32, - }, - )), - }))], - Canceled::default(), - ) + create_request_cancel_activity_task_command(dat) } } @@ -424,7 +347,7 @@ impl Started { ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { - Common::create_request_cancel_activity_task_command(dat) + create_request_cancel_activity_task_command(dat) } } @@ -448,7 +371,7 @@ impl ScheduledActivityCancelEventRecorded { _attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { // notify_canceled - Common::notify_canceled(dat, Canceled::default().into()) + notify_canceled(dat, Canceled::default().into()) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { @@ -474,7 +397,7 @@ impl StartedActivityCancelCommandCreated { // notifyCanceledIfTryCancel match dat.cancellation_type { ActivityCancellationType::TryCancel => { - Common::notify_canceled(dat, StartedActivityCancelEventRecorded::default().into()) + notify_canceled(dat, StartedActivityCancelEventRecorded::default().into()) } _ => ActivityMachineTransition::default::(), } @@ -515,7 +438,7 @@ impl StartedActivityCancelEventRecorded { attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { // notifyCancellationFromEvent - Common::notify_canceled_from_event(dat, attrs) + notify_canceled_from_event(dat, attrs) } } @@ -537,6 +460,81 @@ pub(super) struct TimedOut {} #[derive(Default, Clone)] pub(super) struct Canceled {} +fn create_request_cancel_activity_task_command( + dat: SharedState, +) -> TransitionResult { + // createRequestCancelActivityTaskCommand + let cmd = Command { + command_type: CommandType::RequestCancelActivityTask as i32, + attributes: Some( + RequestCancelActivity { + scheduled_event_id: dat.scheduled_event_id, + activity_id: dat.attrs.activity_id, + } + .into(), + ), + }; + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::RequestCancellation(cmd)], + ScheduledActivityCancelCommandCreated::default(), + ) +} + +fn notify_canceled( + dat: SharedState, + next_state: ActivityMachineState, +) -> TransitionResult { + ActivityMachineTransition::ok_shared( + vec![ActivityMachineCommand::Fail(Some(Failure { + message: "Activity canceled".to_string(), + source: "CoreSDK".to_string(), + stack_trace: "".to_string(), + cause: None, + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + ActivityFailureInfo { + scheduled_event_id: dat.scheduled_event_id, + started_event_id: dat.started_event_id, + identity: "workflow".to_string(), + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + activity_id: dat.attrs.activity_id.to_string(), + retry_state: RetryState::Unspecified as i32, + }, + )), + }))], + next_state, + dat, + ) +} + +fn notify_canceled_from_event( + dat: SharedState, + attrs: ActivityTaskCanceledEventAttributes, +) -> TransitionResult { + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::Fail(Some(Failure { + message: "Activity canceled".to_string(), + source: "CoreSDK".to_string(), + stack_trace: "".to_string(), + cause: None, + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + ActivityFailureInfo { + scheduled_event_id: attrs.scheduled_event_id, + started_event_id: attrs.started_event_id, + identity: "".to_string(), // TODO ? + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + activity_id: dat.attrs.activity_id.to_string(), + retry_state: RetryState::Unspecified as i32, + }, + )), + }))], + Canceled::default(), + ) +} + #[cfg(test)] mod test { use super::*; diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 7aac9fb8f..94a489659 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -139,6 +139,8 @@ pub enum WFMachinesError { #[error("Machine encountered an invalid transition: {0}")] InvalidTransition(&'static str), + #[error("Invalid cancelation type: {0}")] + InvalidCancelationType(i32), } impl WorkflowMachines { @@ -610,7 +612,7 @@ impl WorkflowMachines { fn get_machine_key(&mut self, id: &CommandID) -> Result { Ok(*self.id_to_machine.get(id).ok_or_else(|| { WFMachinesError::MissingAssociatedMachine(format!( - "Missing associated machine for cancelling timer {:?}", + "Missing associated machine for {:?}", id )) })?) diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 88d55b3fa..f7a0508f4 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -274,14 +274,12 @@ pub mod temporal { pub mod v1 { include!("temporal.api.command.v1.rs"); - use crate::protos::coresdk::workflow_commands::ActivityCancellationType; use crate::protos::{ coresdk::{workflow_commands, PayloadsExt}, temporal::api::common::v1::ActivityType, temporal::api::enums::v1::CommandType, }; use command::Attributes; - use std::convert::TryFrom; use std::fmt::{Display, Formatter}; impl From for Command { @@ -351,32 +349,6 @@ pub mod temporal { } } - #[derive(derive_more::Display, Debug)] - pub enum CancellationTypeConversionError { - InvalidCancellationType, - } - - impl TryFrom for ActivityCancellationType { - type Error = CancellationTypeConversionError; - - fn try_from(c: i32) -> Result { - match c { - x if x == ActivityCancellationType::TryCancel as i32 => { - Ok(ActivityCancellationType::TryCancel) - } - x if x - == ActivityCancellationType::WaitCancellationCompleted as i32 => - { - Ok(ActivityCancellationType::WaitCancellationCompleted) - } - x if x == ActivityCancellationType::Abandon as i32 => { - Ok(ActivityCancellationType::Abandon) - } - _ => Err(CancellationTypeConversionError::InvalidCancellationType), - } - } - } - impl From for command::Attributes { fn from(s: workflow_commands::ScheduleActivity) -> Self { Self::ScheduleActivityTaskCommandAttributes( From 9c8db147002e2009b47277c01c092b8d37eac293 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 30 Mar 2021 22:15:43 -0700 Subject: [PATCH 04/19] Add an end to end integ test for simple activity cancellation scenario --- protos/local/workflow_commands.proto | 6 -- src/machines/activity_state_machine.rs | 24 +++++-- tests/integ_tests/simple_wf_tests.rs | 99 ++++++++++++++++++++++++-- 3 files changed, 111 insertions(+), 18 deletions(-) diff --git a/protos/local/workflow_commands.proto b/protos/local/workflow_commands.proto index 7a9f0226c..2de86c7c3 100644 --- a/protos/local/workflow_commands.proto +++ b/protos/local/workflow_commands.proto @@ -101,12 +101,6 @@ message QuerySuccess { common.Payload response = 1; } -/// Request cancellation of an activity from a workflow -message RequestActivityCancellation { - string activity_id = 1; - string reason = 2; -} - /// Issued when the workflow completes successfully message CompleteWorkflowExecution { common.Payload result = 1; diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index db8dc966e..551230bb5 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -275,11 +275,17 @@ pub(super) struct ScheduleCommandCreated {} impl ScheduleCommandCreated { pub(super) fn on_activity_task_scheduled( self, - mut dat: SharedState, + dat: SharedState, scheduled_event_id: i64, ) -> ActivityMachineTransition { - dat.scheduled_event_id = scheduled_event_id; - ActivityMachineTransition::default::() + ActivityMachineTransition::ok_shared( + vec![], + ScheduledEventRecorded::default(), + SharedState { + scheduled_event_id, + ..dat + }, + ) } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { let canceled_state = SharedState { @@ -303,11 +309,17 @@ pub(super) struct ScheduledEventRecorded {} impl ScheduledEventRecorded { pub(super) fn on_task_started( self, - mut dat: SharedState, + dat: SharedState, started_event_id: i64, ) -> ActivityMachineTransition { - dat.started_event_id = started_event_id; - ActivityMachineTransition::default::() + ActivityMachineTransition::ok_shared( + vec![], + Started::default(), + SharedState { + started_event_id, + ..dat + }, + ) } pub(super) fn on_task_timed_out(self) -> ActivityMachineTransition { // notify_timed_out diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index d67a79712..8562dbea3 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -14,8 +14,8 @@ use temporal_sdk_core::{ WfActivationJob, }, workflow_commands::{ - CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, ScheduleActivity, - StartTimer, + CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, + ScheduleActivity, StartTimer, }, workflow_completion::WfActivationCompletion, }, @@ -119,7 +119,7 @@ async fn activity_workflow() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters @@ -177,7 +177,7 @@ async fn activity_non_retryable_failure() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters @@ -241,7 +241,7 @@ async fn activity_retry() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity 1st time @@ -313,7 +313,7 @@ async fn activity_retry() { .unwrap() } -fn activity_completion_req( +fn schedule_activity_cmd( task_q: &str, activity_id: &str, task: WfActivation, @@ -334,6 +334,93 @@ fn activity_completion_req( task.task_token, ) } +fn schedule_activity_and_timer_cmds( + task_q: &str, + activity_id: &str, + timer_id: &str, + task: WfActivation, +) -> WfActivationCompletion { + WfActivationCompletion::ok_from_cmds( + vec![ + ScheduleActivity { + activity_id: activity_id.to_string(), + activity_type: "test_activity".to_string(), + namespace: NAMESPACE.to_owned(), + task_queue: task_q.to_owned(), + schedule_to_start_timeout: Some(Duration::from_secs(30).into()), + start_to_close_timeout: Some(Duration::from_secs(30).into()), + schedule_to_close_timeout: Some(Duration::from_secs(60).into()), + heartbeat_timeout: Some(Duration::from_secs(60).into()), + ..Default::default() + } + .into(), + StartTimer { + timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_millis(50).into()), + } + .into(), + ], + task.task_token, + ) +} + +#[tokio::test] +async fn activity_cancellation() { + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_failed_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + task_q, + &activity_id, + &timer_id, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to complete + // it in this test as activity is getting cancelled. + let task = dbg!(core.poll_activity_task(task_q).await.unwrap()); + assert_matches!( + task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![ + RequestCancelActivity { + activity_id, + ..Default::default() + } + .into(), + CompleteWorkflowExecution { result: None }.into(), + ], + task.task_token, + )) + .await + .unwrap() +} #[tokio::test] async fn parallel_timer_workflow() { From 1dcfe12c742d2ebddc739eb56fda53e771b43399 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Tue, 30 Mar 2021 22:49:50 -0700 Subject: [PATCH 05/19] Address lint error --- src/machines/activity_state_machine.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 551230bb5..a0a038b9b 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -538,7 +538,7 @@ fn notify_canceled_from_event( activity_type: Some(ActivityType { name: dat.attrs.activity_type.to_string(), }), - activity_id: dat.attrs.activity_id.to_string(), + activity_id: dat.attrs.activity_id, retry_state: RetryState::Unspecified as i32, }, )), From 7bb69065b2ff8aa0f1ffa371d017a5159ca2bc88 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 31 Mar 2021 01:22:29 -0700 Subject: [PATCH 06/19] Add missing cancelation handler, cleanup some unused code and improve integ test --- src/machines/activity_state_machine.rs | 30 +++++++++++---------- tests/integ_tests/simple_wf_tests.rs | 36 ++++++++++++++++++-------- 2 files changed, 42 insertions(+), 24 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index a0a038b9b..5edee0087 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -59,7 +59,7 @@ fsm! { ScheduledActivityCancelCommandCreated --(CommandRequestCancelActivityTask, - on_command_request_cancel_activity_task) --> ScheduledActivityCancelCommandCreated; + shared on_command_request_cancel_activity_task) --> ScheduledActivityCancelCommandCreated; ScheduledActivityCancelCommandCreated --(ActivityTaskCancelRequested) --> ScheduledActivityCancelEventRecorded; @@ -91,8 +91,6 @@ pub(super) enum ActivityMachineCommand { Complete(Option), #[display(fmt = "Fail")] Fail(Option), - #[display(fmt = "Cancel")] - Cancel, #[display(fmt = "RequestCancellation")] RequestCancellation(Command), } @@ -214,8 +212,9 @@ impl WFMachinesAdapter for ActivityMachine { }), } .into()], - ActivityMachineCommand::Cancel => unimplemented!(), - ActivityMachineCommand::RequestCancellation(_) => unimplemented!(), + ActivityMachineCommand::RequestCancellation(c) => { + vec![MachineResponse::IssueNewCommand(c)] + } }) } } @@ -239,7 +238,6 @@ impl Cancellable for ActivityMachine { Some(ActivityMachineCommand::RequestCancellation(cmd)) => { MachineResponse::IssueNewCommand(cmd) } - Some(ActivityMachineCommand::Cancel) => unimplemented!(), x => panic!("Invalid cancel event response {:?}", x), }, ) @@ -293,11 +291,9 @@ impl ScheduleCommandCreated { ..dat }; match dat.cancellation_type { - ActivityCancellationType::Abandon => ActivityMachineTransition::ok_shared( - vec![ActivityMachineCommand::Cancel], - Canceled::default(), - canceled_state, - ), + ActivityCancellationType::Abandon => { + ActivityMachineTransition::ok_shared(vec![], Canceled::default(), canceled_state) + } _ => notify_canceled(canceled_state, Canceled::default().into()), } } @@ -367,9 +363,17 @@ impl Started { pub(super) struct ScheduledActivityCancelCommandCreated {} impl ScheduledActivityCancelCommandCreated { - pub(super) fn on_command_request_cancel_activity_task(self) -> ActivityMachineTransition { + pub(super) fn on_command_request_cancel_activity_task( + self, + dat: SharedState, + ) -> ActivityMachineTransition { // notifyCanceledIfTryCancel - ActivityMachineTransition::default::() + match dat.cancellation_type { + ActivityCancellationType::TryCancel => { + notify_canceled(dat, ScheduledActivityCancelCommandCreated::default().into()) + } + _ => ActivityMachineTransition::default::(), + } } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 8562dbea3..5f71e75c5 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -386,9 +386,9 @@ async fn activity_cancellation() { .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to complete // it in this test as activity is getting cancelled. - let task = dbg!(core.poll_activity_task(task_q).await.unwrap()); + let activity_task = dbg!(core.poll_activity_task(task_q).await.unwrap()); assert_matches!( - task.variant, + activity_task.variant, Some(act_task::Variant::Start(start_activity)) => { assert_eq!(start_activity.activity_type, "test_activity".to_string()) } @@ -408,18 +408,32 @@ async fn activity_cancellation() { } ); core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( - vec![ - RequestCancelActivity { - activity_id, - ..Default::default() - } - .into(), - CompleteWorkflowExecution { result: None }.into(), - ], + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], task.task_token, )) .await - .unwrap() + .unwrap(); + core.complete_activity_task(ActivityTaskCompletion { + task_token: activity_task.task_token, + result: Some(ActivityResult { + status: Some(activity_result::activity_result::Status::Canceled( + activity_result::Cancelation { details: None }, + )), + }), + }) + .await + .unwrap(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); } #[tokio::test] From 99b996c86f3fe6bccc148c1efa388b2ba2f7cd5b Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 31 Mar 2021 19:28:57 -0700 Subject: [PATCH 07/19] Use correct transition states --- src/machines/activity_state_machine.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 5edee0087..9d5e949b1 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -322,7 +322,10 @@ impl ScheduledEventRecorded { ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { - create_request_cancel_activity_task_command(dat) + create_request_cancel_activity_task_command( + dat, + ScheduledActivityCancelCommandCreated::default().into(), + ) } } @@ -355,7 +358,10 @@ impl Started { ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { - create_request_cancel_activity_task_command(dat) + create_request_cancel_activity_task_command( + dat, + StartedActivityCancelCommandCreated::default().into(), + ) } } @@ -478,6 +484,7 @@ pub(super) struct Canceled {} fn create_request_cancel_activity_task_command( dat: SharedState, + next_state: ActivityMachineState, ) -> TransitionResult { // createRequestCancelActivityTaskCommand let cmd = Command { @@ -492,7 +499,7 @@ fn create_request_cancel_activity_task_command( }; ActivityMachineTransition::ok( vec![ActivityMachineCommand::RequestCancellation(cmd)], - ScheduledActivityCancelCommandCreated::default(), + next_state, ) } From f6476980ff6fd2308d5bc80f6f0bce7ce87a4e51 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 31 Mar 2021 23:49:49 -0700 Subject: [PATCH 08/19] Add a test case for immediate cancellation and resolve activity job propagation --- src/machines/activity_state_machine.rs | 94 ++++++++++++++++--- .../test_help/async_workflow_driver.rs | 19 +++- src/machines/workflow_machines.rs | 3 + 3 files changed, 102 insertions(+), 14 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 9d5e949b1..5496f2b32 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,5 +1,6 @@ #![allow(clippy::large_enum_variant)] +use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; use crate::protos::coresdk::workflow_commands::{ActivityCancellationType, RequestCancelActivity}; use crate::protos::coresdk::PayloadsExt; use crate::protos::temporal::api::common::v1::ActivityType; @@ -17,7 +18,9 @@ use crate::{ coresdk::{ activity_result::{self as ar, activity_result, ActivityResult}, activity_task, - workflow_activation::ResolveActivity, + workflow_activation::{ + wf_activation_job, ResolveActivity, StartWorkflow, WfActivationJob, + }, }, temporal::api::{ command::v1::Command, @@ -233,14 +236,27 @@ impl TryFrom for ActivityMachineEvents { impl Cancellable for ActivityMachine { fn cancel(&mut self) -> Result> { - Ok( - match self.on_event_mut(ActivityMachineEvents::Cancel)?.pop() { - Some(ActivityMachineCommand::RequestCancellation(cmd)) => { - MachineResponse::IssueNewCommand(cmd) - } - x => panic!("Invalid cancel event response {:?}", x), - }, - ) + let mut vec = self.on_event_mut(ActivityMachineEvents::Cancel)?; + // TODO it could be possible to have multiple events, e.g. cancellation command and immediate activation, we should handle it. + if vec.len() != 1 { + unimplemented!() + } + Ok(match vec.pop() { + Some(ActivityMachineCommand::RequestCancellation(cmd)) => { + MachineResponse::IssueNewCommand(cmd) + } + Some(ActivityMachineCommand::Fail(failure)) => { + MachineResponse::PushWFJob(Variant::ResolveActivity(ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + })) + } + x => panic!("Invalid cancel event response {:?}", x), + }) } fn was_cancelled_before_sent_to_server(&self) -> bool { @@ -561,6 +577,7 @@ fn notify_canceled_from_event( #[cfg(test)] mod test { use super::*; + use crate::protos::coresdk::workflow_activation::WfActivation; use crate::{ machines::{ test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}, @@ -574,7 +591,7 @@ mod test { #[fixture] fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { - let twd = activity_workflow_driver(); + let twd = activity_workflow_driver("activity-id-1"); let t = canned_histories::single_activity("activity-id-1"); let state_machines = WorkflowMachines::new( "wfid".to_string(), @@ -588,7 +605,7 @@ mod test { #[fixture] fn activity_failure_hist() -> (TestHistoryBuilder, WorkflowMachines) { - let twd = activity_workflow_driver(); + let twd = activity_workflow_driver("activity-id-1"); let t = canned_histories::single_failed_activity("activity-id-1"); let state_machines = WorkflowMachines::new( "wfid".to_string(), @@ -600,10 +617,10 @@ mod test { (t, state_machines) } - fn activity_workflow_driver() -> TestWorkflowDriver { + fn activity_workflow_driver(activity_id: &'static str) -> TestWorkflowDriver { TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let activity = ScheduleActivity { - activity_id: "activity-id-1".to_string(), + activity_id: activity_id.to_string(), ..Default::default() }; command_sink.activity(activity).await; @@ -657,4 +674,55 @@ mod test { CommandType::CompleteWorkflowExecution as i32 ); } + + #[test] + fn immediate_activity_cancelation() { + let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move { + let cancel_activity_future = cmd_sink.activity(ScheduleActivity { + activity_id: "activity-id-1".to_string(), + ..Default::default() + }); + // Immediately cancel the activity + cmd_sink.cancel_activity("activity-id-1"); + cancel_activity_future.await; + + let complete = CompleteWorkflowExecution::default(); + cmd_sink.send(complete.into()); + }); + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_workflow_execution_completed(); + + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); + + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, None) + .unwrap(); + assert_eq!(commands.len(), 0); + let activation = state_machines.get_wf_activation().unwrap(); + assert_matches!( + activation.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity( + ResolveActivity { + activity_id, + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(_)) + }) + } + )), + }, + ] + ) + } } diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 7f7784ed6..7f6347c46 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,6 +1,6 @@ use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; use crate::protos::coresdk::workflow_activation::ResolveActivity; -use crate::protos::coresdk::workflow_commands::ScheduleActivity; +use crate::protos::coresdk::workflow_commands::{RequestCancelActivity, ScheduleActivity}; use crate::{ machines::{workflow_machines::CommandID, WFCommand}, protos::coresdk::{ @@ -55,6 +55,13 @@ impl TestWfDriverCache { bc.issued_commands.remove(&CommandID::Timer(id.to_owned())); } + /// Cancel activity by ID. + fn cancel_activity(&self, id: &str) { + let mut bc = self.blocking_condvar.0.lock(); + bc.issued_commands + .remove(&CommandID::Activity(id.to_owned())); + } + /// Track a new command that the wf has sent down the command sink. The command starts in /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed fn add_sent_cmd(&self, id: CommandID) -> oneshot::Receiver<()> { @@ -157,6 +164,16 @@ impl CommandSender { self.twd_cache.cancel_timer(timer_id); self.send(c); } + + /// Cancel activity + pub fn cancel_activity(&self, activity_id: &str) { + let c = WFCommand::RequestCancelActivity(RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + }); + self.twd_cache.cancel_activity(activity_id); + self.send(c); + } } impl TestWorkflowDriver { diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 94a489659..e70405918 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -586,6 +586,9 @@ impl WorkflowMachines { machine: m_key, }) } + MachineResponse::PushWFJob(j) => { + self.drive_me.send_job(j); + } MachineResponse::NoOp => {} v => { return Err(WFMachinesError::UnexpectedMachineResponse( From cb4fb15326240490493a4bf89d16f6769037dafe Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 1 Apr 2021 09:35:37 -0700 Subject: [PATCH 09/19] Allow returning vec of responses from Cancellable::cancel Also removes stupid NoOp response --- src/machines/activity_state_machine.rs | 48 ++++++++++---------- src/machines/mod.rs | 6 +-- src/machines/timer_state_machine.rs | 8 ++-- src/machines/workflow_machines.rs | 63 +++++++++++++------------- tests/integ_tests/simple_wf_tests.rs | 4 +- 5 files changed, 66 insertions(+), 63 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 5496f2b32..96692edde 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -157,7 +157,7 @@ impl TryFrom for ActivityMachineEvents { } else { return Err(WFMachinesError::MalformedEvent( e, - "Activity completion attributes were unset".to_string(), + "Activity failure attributes were unset".to_string(), )); } } @@ -171,7 +171,7 @@ impl TryFrom for ActivityMachineEvents { } else { return Err(WFMachinesError::MalformedEvent( e, - "Activity cancelation attributes were unset".to_string(), + "Activity cancellation attributes were unset".to_string(), )); } } @@ -235,28 +235,28 @@ impl TryFrom for ActivityMachineEvents { } impl Cancellable for ActivityMachine { - fn cancel(&mut self) -> Result> { - let mut vec = self.on_event_mut(ActivityMachineEvents::Cancel)?; - // TODO it could be possible to have multiple events, e.g. cancellation command and immediate activation, we should handle it. - if vec.len() != 1 { - unimplemented!() - } - Ok(match vec.pop() { - Some(ActivityMachineCommand::RequestCancellation(cmd)) => { - MachineResponse::IssueNewCommand(cmd) - } - Some(ActivityMachineCommand::Fail(failure)) => { - MachineResponse::PushWFJob(Variant::ResolveActivity(ResolveActivity { - activity_id: self.shared_state.attrs.activity_id.clone(), - result: Some(ActivityResult { - status: Some(activity_result::Status::Failed(ar::Failure { - failure: failure.map(Into::into), - })), - }), - })) - } - x => panic!("Invalid cancel event response {:?}", x), - }) + fn cancel(&mut self) -> Result, MachineError> { + let vec = self.on_event_mut(ActivityMachineEvents::Cancel)?; + let res = vec + .into_iter() + .map(|amc| match amc { + ActivityMachineCommand::RequestCancellation(cmd) => { + MachineResponse::IssueNewCommand(cmd) + } + ActivityMachineCommand::Fail(failure) => { + MachineResponse::PushWFJob(Variant::ResolveActivity(ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + })) + } + x => panic!("Invalid cancel event response {:?}", x), + }) + .collect(); + Ok(res) } fn was_cancelled_before_sent_to_server(&self) -> bool { diff --git a/src/machines/mod.rs b/src/machines/mod.rs index ade531996..0880fb9d1 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -108,7 +108,7 @@ trait TemporalStateMachine: CheckStateMachineInFinal + Send { ) -> Result, WFMachinesError>; /// Attempt to cancel the command associated with this state machine, if it is cancellable - fn cancel(&mut self) -> Result; + fn cancel(&mut self) -> Result, WFMachinesError>; /// Should return true if the command was cancelled before we sent it to the server. Always /// returns false for non-cancellable machines @@ -187,7 +187,7 @@ where } } - fn cancel(&mut self) -> Result { + fn cancel(&mut self) -> Result, WFMachinesError> { let res = self.cancel(); res.map_err(|e| match e { MachineError::InvalidTransition => { @@ -238,7 +238,7 @@ trait Cancellable: StateMachine { /// # Panics /// * If the machine is not cancellable. It's a logic error on our part to call it on such /// machines. - fn cancel(&mut self) -> Result> { + fn cancel(&mut self) -> Result, MachineError> { // It's a logic error on our part if this is ever called on a machine that can't actually // be cancelled panic!("Machine {} cannot be cancelled", self.name()) diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index f12c1bec7..a51cd4a29 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -245,10 +245,12 @@ impl WFMachinesAdapter for TimerMachine { } impl Cancellable for TimerMachine { - fn cancel(&mut self) -> Result> { + fn cancel(&mut self) -> Result, MachineError> { Ok(match self.on_event_mut(TimerMachineEvents::Cancel)?.pop() { - Some(TimerMachineCommand::IssueCancelCmd(cmd)) => MachineResponse::IssueNewCommand(cmd), - Some(TimerMachineCommand::Canceled) => MachineResponse::NoOp, + Some(TimerMachineCommand::IssueCancelCmd(cmd)) => { + vec![MachineResponse::IssueNewCommand(cmd)] + } + Some(TimerMachineCommand::Canceled) => vec![], x => panic!("Invalid cancel event response {:?}", x), }) } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index e70405918..f4b875edb 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -108,7 +108,6 @@ pub enum MachineResponse { UpdateRunIdOnWorkflowReset { run_id: String, }, - NoOp, } #[derive(thiserror::Error, Debug)] @@ -529,7 +528,6 @@ impl WorkflowMachines { }, )); } - MachineResponse::NoOp => (), MachineResponse::IssueNewCommand(_) => { panic!("Issue new command machine response not expected here") } @@ -552,19 +550,21 @@ impl WorkflowMachines { let m_key = self.get_machine_key(&CommandID::Timer(attrs.timer_id.to_owned()))?; let res = self.machine_mut(m_key).cancel()?; - match res { - MachineResponse::IssueNewCommand(c) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: c, - machine: m_key, - }) - } - MachineResponse::NoOp => {} - v => { - return Err(WFMachinesError::UnexpectedMachineResponse( - v, - "When cancelling timer", - )); + // TODO: Dedupe with req cancel activity + for r in res { + match r { + MachineResponse::IssueNewCommand(c) => { + self.current_wf_task_commands.push_back(CommandAndMachine { + command: c, + machine: m_key, + }) + } + v => { + return Err(WFMachinesError::UnexpectedMachineResponse( + v, + "When cancelling timer", + )); + } } } } @@ -579,22 +579,23 @@ impl WorkflowMachines { let m_key = self.get_machine_key(&CommandID::Activity(attrs.activity_id.to_owned()))?; let res = self.machine_mut(m_key).cancel()?; - match res { - MachineResponse::IssueNewCommand(c) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: c, - machine: m_key, - }) - } - MachineResponse::PushWFJob(j) => { - self.drive_me.send_job(j); - } - MachineResponse::NoOp => {} - v => { - return Err(WFMachinesError::UnexpectedMachineResponse( - v, - "When cancelling activity", - )); + for r in res { + match r { + MachineResponse::IssueNewCommand(c) => { + self.current_wf_task_commands.push_back(CommandAndMachine { + command: c, + machine: m_key, + }) + } + MachineResponse::PushWFJob(j) => { + self.drive_me.send_job(j); + } + v => { + return Err(WFMachinesError::UnexpectedMachineResponse( + v, + "When cancelling activity", + )); + } } } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 5f71e75c5..147a46d50 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -384,8 +384,8 @@ async fn activity_cancellation() { )) .await .unwrap(); - // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to complete - // it in this test as activity is getting cancelled. + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is try-cancelled. let activity_task = dbg!(core.poll_activity_task(task_q).await.unwrap()); assert_matches!( activity_task.variant, From 6a9dd855d0121fcde919b1efae33a0ecc9b6ee70 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 1 Apr 2021 11:53:48 -0700 Subject: [PATCH 10/19] push notes to share with Vitaly --- src/lib.rs | 6 +++ src/machines/activity_state_machine.rs | 65 ++++++++++++++++++-------- src/machines/workflow_machines.rs | 3 ++ tests/integ_tests/simple_wf_tests.rs | 28 +++++------ 4 files changed, 70 insertions(+), 32 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0575f5d81..c3a87b7cf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -391,6 +391,12 @@ impl CoreSDK { // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught // up on replay. + // TODO: This may no longer be true. In the activity abandon/trycancel case, we need a new + // activation to resolve the activity, but we also need to pump this command out. Can + // probably deal with this by having `push_lang_commands` return (from deep below) a + // signal that a new PA should be enqueued *after* sending out commands, but then it + // somehow needs to be replay aware (maybe by checking there weren't already any PAs). + // That could be hard. Alternatively, perhaps we just buffer the returned commands if !self.pending_activations.has_pending(&run_id) { self.server_gateway .complete_workflow_task(task_token, commands) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 96692edde..ad824424b 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -127,6 +127,25 @@ impl ActivityMachine { }; (s, cmd) } + + fn machine_responses_from_cancel_request(&self, cancel_cmd: Command) -> Vec { + let mut r = vec![MachineResponse::IssueNewCommand(cancel_cmd)]; + if self.shared_state.cancellation_type + != ActivityCancellationType::WaitCancellationCompleted + { + r.push(MachineResponse::PushWFJob(Variant::ResolveActivity( + ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Canceled(ar::Cancelation { + details: None, + })), + }), + }, + ))) + } + r + } } impl TryFrom for ActivityMachineEvents { @@ -206,17 +225,21 @@ impl WFMachinesAdapter for ActivityMachine { } .into()] } - ActivityMachineCommand::Fail(failure) => vec![ResolveActivity { - activity_id: self.shared_state.attrs.activity_id.clone(), - result: Some(ActivityResult { - status: Some(activity_result::Status::Failed(ar::Failure { - failure: failure.map(Into::into), - })), - }), + ActivityMachineCommand::Fail(failure) => { + warn!("I'm a doin' a failure!"); + vec![ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + } + .into()] } - .into()], ActivityMachineCommand::RequestCancellation(c) => { - vec![MachineResponse::IssueNewCommand(c)] + warn!("I'm a requesting a cancel!"); + self.machine_responses_from_cancel_request(c) } }) } @@ -239,19 +262,23 @@ impl Cancellable for ActivityMachine { let vec = self.on_event_mut(ActivityMachineEvents::Cancel)?; let res = vec .into_iter() - .map(|amc| match amc { + .flat_map(|amc| match amc { ActivityMachineCommand::RequestCancellation(cmd) => { - MachineResponse::IssueNewCommand(cmd) + warn!("Request cancel from cancel"); + self.machine_responses_from_cancel_request(cmd) } ActivityMachineCommand::Fail(failure) => { - MachineResponse::PushWFJob(Variant::ResolveActivity(ResolveActivity { - activity_id: self.shared_state.attrs.activity_id.clone(), - result: Some(ActivityResult { - status: Some(activity_result::Status::Failed(ar::Failure { - failure: failure.map(Into::into), - })), - }), - })) + warn!("fail from cancel"); + vec![MachineResponse::PushWFJob(Variant::ResolveActivity( + ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + }, + ))] } x => panic!("Invalid cancel event response {:?}", x), }) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index f4b875edb..27a14cef0 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -447,6 +447,8 @@ impl WorkflowMachines { let results = self.drive_me.fetch_workflow_iteration_output(); self.handle_driven_results(results)?; self.prepare_commands()?; + // TODO: Handling driven results might also mean that we need to push a new pending + // activation Ok(()) } @@ -579,6 +581,7 @@ impl WorkflowMachines { let m_key = self.get_machine_key(&CommandID::Activity(attrs.activity_id.to_owned()))?; let res = self.machine_mut(m_key).cancel()?; + debug!(machine_responses = ?res, "Req cancel activity responses"); for r in res { match r { MachineResponse::IssueNewCommand(c) => { diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 147a46d50..a6701f42a 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -19,8 +19,8 @@ use temporal_sdk_core::{ }, workflow_completion::WfActivationCompletion, }, - CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, ServerGatewayOptions, - Url, + tracing_init, CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, + ServerGatewayOptions, Url, }; // TODO: These tests can get broken permanently if they break one time and the server is not @@ -366,6 +366,8 @@ fn schedule_activity_and_timer_cmds( #[tokio::test] async fn activity_cancellation() { + tracing_init(); + let mut rng = rand::thread_rng(); let task_q_salt: u32 = rng.gen(); let task_q = &format!("activity_failed_workflow_{}", task_q_salt.to_string()); @@ -386,7 +388,7 @@ async fn activity_cancellation() { .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to // complete it in this test as activity is try-cancelled. - let activity_task = dbg!(core.poll_activity_task(task_q).await.unwrap()); + let activity_task = core.poll_activity_task(task_q).await.unwrap(); assert_matches!( activity_task.variant, Some(act_task::Variant::Start(start_activity)) => { @@ -417,16 +419,16 @@ async fn activity_cancellation() { )) .await .unwrap(); - core.complete_activity_task(ActivityTaskCompletion { - task_token: activity_task.task_token, - result: Some(ActivityResult { - status: Some(activity_result::activity_result::Status::Canceled( - activity_result::Cancelation { details: None }, - )), - }), - }) - .await - .unwrap(); + // core.complete_activity_task(ActivityTaskCompletion { + // task_token: activity_task.task_token, + // result: Some(ActivityResult { + // status: Some(activity_result::activity_result::Status::Canceled( + // activity_result::Cancelation { details: None }, + // )), + // }), + // }) + // .await + // .unwrap(); let task = core.poll_workflow_task(task_q).await.unwrap(); core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( vec![CompleteWorkflowExecution { result: None }.into()], From d689d0e9cd6764b6b4f592e7006a32c176880c51 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 1 Apr 2021 23:12:00 -0700 Subject: [PATCH 11/19] Add cancellation failure cause for failures --- src/machines/activity_state_machine.rs | 38 ++++++++++++++++---------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index ad824424b..512fc2121 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -6,7 +6,10 @@ use crate::protos::coresdk::PayloadsExt; use crate::protos::temporal::api::common::v1::ActivityType; use crate::protos::temporal::api::enums::v1::RetryState; use crate::protos::temporal::api::enums::v1::RetryState::CancelRequested; -use crate::protos::temporal::api::failure::v1::{failure, ActivityFailureInfo, Failure}; +use crate::protos::temporal::api::failure::v1::failure::FailureInfo; +use crate::protos::temporal::api::failure::v1::{ + failure, ActivityFailureInfo, CanceledFailureInfo, Failure, +}; use crate::protos::temporal::api::history::v1::ActivityTaskCanceledEventAttributes; use crate::{ machines::{ @@ -550,24 +553,31 @@ fn notify_canceled( dat: SharedState, next_state: ActivityMachineState, ) -> TransitionResult { + let cancelled_failure = Failure { + source: "CoreSDK".to_string(), + failure_info: Some(FailureInfo::CanceledFailureInfo(CanceledFailureInfo { + details: None, + })), + ..Default::default() + }; + let activity_failure_info = ActivityFailureInfo { + activity_id: dat.attrs.activity_id.to_string(), + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + scheduled_event_id: dat.scheduled_event_id, + started_event_id: dat.started_event_id, + identity: "workflow".to_string(), + retry_state: RetryState::Unspecified as i32, + }; ActivityMachineTransition::ok_shared( vec![ActivityMachineCommand::Fail(Some(Failure { message: "Activity canceled".to_string(), - source: "CoreSDK".to_string(), - stack_trace: "".to_string(), - cause: None, + cause: Some(Box::new(cancelled_failure)), failure_info: Some(failure::FailureInfo::ActivityFailureInfo( - ActivityFailureInfo { - scheduled_event_id: dat.scheduled_event_id, - started_event_id: dat.started_event_id, - identity: "workflow".to_string(), - activity_type: Some(ActivityType { - name: dat.attrs.activity_type.to_string(), - }), - activity_id: dat.attrs.activity_id.to_string(), - retry_state: RetryState::Unspecified as i32, - }, + activity_failure_info, )), + ..Default::default() }))], next_state, dat, From 94b8a3c539e74183831123f8c8b2f86119f91583 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Fri, 2 Apr 2021 00:26:55 -0700 Subject: [PATCH 12/19] Add lib test for activity cancellation and refactor out notify_canceled_from_event --- src/lib.rs | 41 +++++++++++++++++ src/machines/activity_state_machine.rs | 63 ++++++++++---------------- src/test_help/canned_histories.rs | 51 ++++++++++++++++++++- 3 files changed, 116 insertions(+), 39 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 02f34266d..5a3416e4a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -522,6 +522,7 @@ impl CoreSDK { #[cfg(test)] mod test { use super::*; + use crate::protos::coresdk::workflow_commands::RequestCancelActivity; use crate::{ machines::test_help::{ build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore, @@ -735,6 +736,46 @@ mod test { .await; } + #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] + #[tokio::test] + async fn activity_cancel_test_across_wf_bridge(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + #[rstest(single_timer_setup(&[1]))] #[tokio::test] async fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) { diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 512fc2121..fd7c9d9c5 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -340,7 +340,7 @@ impl ScheduleCommandCreated { ActivityCancellationType::Abandon => { ActivityMachineTransition::ok_shared(vec![], Canceled::default(), canceled_state) } - _ => notify_canceled(canceled_state, Canceled::default().into()), + _ => notify_canceled(canceled_state, None, Canceled::default().into()), } } } @@ -421,9 +421,11 @@ impl ScheduledActivityCancelCommandCreated { ) -> ActivityMachineTransition { // notifyCanceledIfTryCancel match dat.cancellation_type { - ActivityCancellationType::TryCancel => { - notify_canceled(dat, ScheduledActivityCancelCommandCreated::default().into()) - } + ActivityCancellationType::TryCancel => notify_canceled( + dat, + None, + ScheduledActivityCancelCommandCreated::default().into(), + ), _ => ActivityMachineTransition::default::(), } } @@ -439,7 +441,7 @@ impl ScheduledActivityCancelEventRecorded { _attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { // notify_canceled - notify_canceled(dat, Canceled::default().into()) + notify_canceled(dat, None, Canceled::default().into()) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { @@ -464,9 +466,11 @@ impl StartedActivityCancelCommandCreated { ) -> ActivityMachineTransition { // notifyCanceledIfTryCancel match dat.cancellation_type { - ActivityCancellationType::TryCancel => { - notify_canceled(dat, StartedActivityCancelEventRecorded::default().into()) - } + ActivityCancellationType::TryCancel => notify_canceled( + dat, + None, + StartedActivityCancelEventRecorded::default().into(), + ), _ => ActivityMachineTransition::default::(), } } @@ -506,7 +510,12 @@ impl StartedActivityCancelEventRecorded { attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { // notifyCancellationFromEvent - notify_canceled_from_event(dat, attrs) + match dat.cancellation_type { + ActivityCancellationType::WaitCancellationCompleted => { + notify_canceled(dat, Some(attrs), Canceled::default().into()) + } + _ => ActivityMachineTransition::ok(vec![], Canceled::default()), + } } } @@ -551,8 +560,13 @@ fn create_request_cancel_activity_task_command( fn notify_canceled( dat: SharedState, + canceled_event: Option, next_state: ActivityMachineState, ) -> TransitionResult { + let (scheduled_event_id, started_event_id) = match canceled_event { + None => (dat.scheduled_event_id, dat.started_event_id), + Some(e) => (e.scheduled_event_id, e.scheduled_event_id), + }; let cancelled_failure = Failure { source: "CoreSDK".to_string(), failure_info: Some(FailureInfo::CanceledFailureInfo(CanceledFailureInfo { @@ -565,8 +579,8 @@ fn notify_canceled( activity_type: Some(ActivityType { name: dat.attrs.activity_type.to_string(), }), - scheduled_event_id: dat.scheduled_event_id, - started_event_id: dat.started_event_id, + scheduled_event_id, + started_event_id, identity: "workflow".to_string(), retry_state: RetryState::Unspecified as i32, }; @@ -584,33 +598,6 @@ fn notify_canceled( ) } -fn notify_canceled_from_event( - dat: SharedState, - attrs: ActivityTaskCanceledEventAttributes, -) -> TransitionResult { - ActivityMachineTransition::ok( - vec![ActivityMachineCommand::Fail(Some(Failure { - message: "Activity canceled".to_string(), - source: "CoreSDK".to_string(), - stack_trace: "".to_string(), - cause: None, - failure_info: Some(failure::FailureInfo::ActivityFailureInfo( - ActivityFailureInfo { - scheduled_event_id: attrs.scheduled_event_id, - started_event_id: attrs.started_event_id, - identity: "".to_string(), // TODO ? - activity_type: Some(ActivityType { - name: dat.attrs.activity_type.to_string(), - }), - activity_id: dat.attrs.activity_id, - retry_state: RetryState::Unspecified as i32, - }, - )), - }))], - Canceled::default(), - ) -} - #[cfg(test)] mod test { use super::*; diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 5d01649d6..88bea963f 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -3,7 +3,8 @@ use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ - history_event, ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, + history_event, ActivityTaskCancelRequestedEventAttributes, + ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskScheduledEventAttributes, ActivityTaskStartedEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes, }; @@ -253,6 +254,54 @@ pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder { t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED +/// 11: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 12: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCancelRequested, + history_event::Attributes::ActivityTaskCancelRequestedEventAttributes( + ActivityTaskCancelRequestedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ); + t.add_workflow_task_scheduled_and_started(); + t +} + /// First signal's payload is "hello " and second is "world" (no metadata for either) /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED From 3e55a8f645f08b7c29a16cb925cd142d9f59cba2 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Fri, 2 Apr 2021 23:15:30 -0700 Subject: [PATCH 13/19] Allow pushing lang commands immediately and add tests for TryCancel and WaitCancellationCompleted --- src/lib.rs | 76 ++++++++++++++++++++++--- src/machines/activity_state_machine.rs | 2 + src/machines/workflow_machines.rs | 22 +++++--- src/test_help/canned_histories.rs | 78 +++++++++++++++++++++++++- src/workflow/mod.rs | 15 ++++- 5 files changed, 172 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5a3416e4a..25030a5de 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,9 +30,10 @@ pub use core_tracing::tracing_init; pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; +use crate::workflow::PushCommandsResult; use crate::{ errors::{ShutdownErr, WorkflowUpdateError}, - machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand}, + machines::{EmptyWorkflowCommandErr, WFCommand}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ @@ -403,7 +404,13 @@ impl CoreSDK { .to_owned(), completion: None, })?; - let commands = self.push_lang_commands(&run_id, cmds)?; + let push_result = self.push_lang_commands(&run_id, cmds)?; + if push_result.has_new_lang_jobs { + self.pending_activations.push(PendingActivation { + run_id: run_id.to_string(), + task_token: task_token.clone(), + }); + } // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught // up on replay. @@ -415,7 +422,7 @@ impl CoreSDK { // That could be hard. Alternatively, perhaps we just buffer the returned commands if !self.pending_activations.has_pending(&run_id) { self.server_gateway - .complete_workflow_task(task_token, commands) + .complete_workflow_task(task_token, push_result.server_commands) .await .map_err(|ts| { if ts.code() == tonic::Code::InvalidArgument @@ -490,7 +497,7 @@ impl CoreSDK { &self, run_id: &str, cmds: Vec, - ) -> Result, WorkflowUpdateError> { + ) -> Result { self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds)) } @@ -522,7 +529,9 @@ impl CoreSDK { #[cfg(test)] mod test { use super::*; - use crate::protos::coresdk::workflow_commands::RequestCancelActivity; + use crate::protos::coresdk::workflow_commands::{ + ActivityCancellationType, RequestCancelActivity, + }; use crate::{ machines::test_help::{ build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore, @@ -736,9 +745,9 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] - async fn activity_cancel_test_across_wf_bridge(hist_batches: &[usize]) { + async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let activity_id = "fake_activity"; let signal_id = "signal"; @@ -755,6 +764,53 @@ mod test { &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), vec![ScheduleActivity { activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::TryCancel as i32, + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + // Activity is getting resolved right away as we are in the TryCancel mode. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + + #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))] + #[tokio::test] + async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity_with_activity_task_cancel( + activity_id, + signal_id, + ); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::WaitCancellationCompleted + as i32, ..Default::default() } .into()], @@ -767,6 +823,12 @@ mod test { } .into()], ), + // Making sure that activity is not resolved until it's cancelled. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![], + ), + // Now ActivityTaskCanceled has been processed and activity can be resolved. gen_assert_and_reply( &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), vec![CompleteWorkflowExecution { result: None }.into()], diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index fd7c9d9c5..57e142cb6 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -558,6 +558,8 @@ fn create_request_cancel_activity_task_command( ) } +/// Notifies lang side that activity has been cancelled by sending a failure with cancelled failure as a cause. +/// Optional cancelled_event, if passed, is used to supply event IDs. fn notify_canceled( dat: SharedState, canceled_event: Option, diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 27a14cef0..6dd06e4f4 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -443,13 +443,15 @@ impl WorkflowMachines { /// Iterate the state machines, which consists of grabbing any pending outgoing commands from /// the workflow, handling them, and preparing them to be sent off to the server. - pub(crate) fn iterate_machines(&mut self) -> Result<()> { + pub(crate) fn iterate_machines(&mut self) -> Result { let results = self.drive_me.fetch_workflow_iteration_output(); - self.handle_driven_results(results)?; + let jobs = self.handle_driven_results(results)?; + let has_new_lang_jobs = !jobs.is_empty(); + for job in jobs.into_iter() { + self.drive_me.send_job(job); + } self.prepare_commands()?; - // TODO: Handling driven results might also mean that we need to push a new pending - // activation - Ok(()) + Ok(has_new_lang_jobs) } /// Apply events from history to this machines instance @@ -538,7 +540,11 @@ impl WorkflowMachines { Ok(()) } - fn handle_driven_results(&mut self, results: Vec) -> Result<()> { + fn handle_driven_results( + &mut self, + results: Vec, + ) -> Result> { + let mut jobs = vec![]; for cmd in results { match cmd { WFCommand::AddTimer(attrs) => { @@ -591,7 +597,7 @@ impl WorkflowMachines { }) } MachineResponse::PushWFJob(j) => { - self.drive_me.send_job(j); + jobs.push(j); } v => { return Err(WFMachinesError::UnexpectedMachineResponse( @@ -613,7 +619,7 @@ impl WorkflowMachines { WFCommand::NoCommandsFromLang => (), } } - Ok(()) + Ok(jobs) } fn get_machine_key(&mut self, id: &CommandID) -> Result { diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 88bea963f..74df53f31 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -3,7 +3,7 @@ use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ - history_event, ActivityTaskCancelRequestedEventAttributes, + history_event, ActivityTaskCancelRequestedEventAttributes, ActivityTaskCanceledEventAttributes, ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskScheduledEventAttributes, ActivityTaskStartedEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes, @@ -264,8 +264,8 @@ pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder { /// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED /// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED /// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED -/// 11: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED -/// 12: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 12: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 13: EVENT_TYPE_WORKFLOW_TASK_STARTED pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -302,6 +302,78 @@ pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHist t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED +/// 11: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 12: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 13: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 14: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 15: EVENT_TYPE_ACTIVITY_TASK_CANCELED +/// 16: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 17: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn cancel_scheduled_activity_with_activity_task_cancel( + activity_id: &str, + signal_id: &str, +) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCancelRequested, + history_event::Attributes::ActivityTaskCancelRequestedEventAttributes( + ActivityTaskCancelRequestedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCanceled, + history_event::Attributes::ActivityTaskCanceledEventAttributes( + ActivityTaskCanceledEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ); + t.add_workflow_task_scheduled_and_started(); + t +} /// First signal's payload is "hello " and second is "world" (no metadata for either) /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index a729cb32c..e6af921c1 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -100,6 +100,12 @@ impl NextWfActivation { } } +#[derive(Debug)] +pub(crate) struct PushCommandsResult { + pub server_commands: Vec, + pub has_new_lang_jobs: bool, +} + impl WorkflowManager { /// Given history that was just obtained from the server, pipe it into this workflow's machines. /// @@ -147,9 +153,12 @@ impl WorkflowManager { /// Feed the workflow machines new commands issued by the executing workflow code, iterate the /// workflow machines, and spit out the commands which are ready to be sent off to the server - pub fn push_commands(&mut self, cmds: Vec) -> Result> { + pub fn push_commands(&mut self, cmds: Vec) -> Result { self.command_sink.send(cmds)?; - self.machines.iterate_machines()?; - Ok(self.machines.get_commands()) + let has_new_lang_jobs = self.machines.iterate_machines()?; + Ok(PushCommandsResult { + server_commands: self.machines.get_commands(), + has_new_lang_jobs, + }) } } From 92e75bc3c6dd702883b8b66bbf331b8ae636b500 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Fri, 2 Apr 2021 23:21:40 -0700 Subject: [PATCH 14/19] Add integ test for WaitCancellationCompleted cancellation --- tests/integ_tests/simple_wf_tests.rs | 89 ++++++++++++++++++++++++---- 1 file changed, 79 insertions(+), 10 deletions(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index a6701f42a..f77944d35 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -3,6 +3,7 @@ use crossbeam::channel::{unbounded, RecvTimeoutError}; use futures::{channel::mpsc::UnboundedReceiver, future, Future, SinkExt, StreamExt}; use rand::{self, Rng}; use std::{collections::HashMap, convert::TryFrom, env, sync::Arc, time::Duration}; +use temporal_sdk_core::protos::coresdk::workflow_commands::ActivityCancellationType; use temporal_sdk_core::protos::coresdk::ActivityTaskCompletion; use temporal_sdk_core::{ protos::coresdk::{ @@ -338,6 +339,7 @@ fn schedule_activity_and_timer_cmds( task_q: &str, activity_id: &str, timer_id: &str, + cancellation_type: ActivityCancellationType, task: WfActivation, ) -> WfActivationCompletion { WfActivationCompletion::ok_from_cmds( @@ -351,6 +353,7 @@ fn schedule_activity_and_timer_cmds( start_to_close_timeout: Some(Duration::from_secs(30).into()), schedule_to_close_timeout: Some(Duration::from_secs(60).into()), heartbeat_timeout: Some(Duration::from_secs(60).into()), + cancellation_type: cancellation_type as i32, ..Default::default() } .into(), @@ -382,6 +385,7 @@ async fn activity_cancellation() { task_q, &activity_id, &timer_id, + ActivityCancellationType::TryCancel, task, )) .await @@ -419,16 +423,81 @@ async fn activity_cancellation() { )) .await .unwrap(); - // core.complete_activity_task(ActivityTaskCompletion { - // task_token: activity_task.task_token, - // result: Some(ActivityResult { - // status: Some(activity_result::activity_result::Status::Canceled( - // activity_result::Cancelation { details: None }, - // )), - // }), - // }) - // .await - // .unwrap(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); +} + +#[tokio::test] +async fn activity_cancellation_abandon() { + tracing_init(); + + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_failed_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + task_q, + &activity_id, + &timer_id, + ActivityCancellationType::WaitCancellationCompleted, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is try-cancelled. + let activity_task = core.poll_activity_task(task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + core.complete_activity_task(ActivityTaskCompletion { + task_token: activity_task.task_token, + result: Some(ActivityResult { + status: Some(activity_result::activity_result::Status::Canceled( + activity_result::Cancelation { details: None }, + )), + }), + }) + .await + .unwrap(); let task = core.poll_workflow_task(task_q).await.unwrap(); core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( vec![CompleteWorkflowExecution { result: None }.into()], From 745fb27c169022ed68c411ecb173513da5ade460 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 5 Apr 2021 18:51:29 -0700 Subject: [PATCH 15/19] Activity cancellation with abandon type + address code review comments --- src/lib.rs | 74 +++++++++++++++------ src/machines/activity_state_machine.rs | 90 +++++++++++++++----------- src/machines/test_help/mod.rs | 10 ++- src/machines/workflow_machines.rs | 7 ++ src/test_help/canned_histories.rs | 42 +++++++++++- src/workflow/mod.rs | 3 +- tests/integ_tests/simple_wf_tests.rs | 75 +++++++++++++++++++-- 7 files changed, 238 insertions(+), 63 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 25030a5de..8da563362 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -414,12 +414,6 @@ impl CoreSDK { // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught // up on replay. - // TODO: This may no longer be true. In the activity abandon/trycancel case, we need a new - // activation to resolve the activity, but we also need to pump this command out. Can - // probably deal with this by having `push_lang_commands` return (from deep below) a - // signal that a new PA should be enqueued *after* sending out commands, but then it - // somehow needs to be replay aware (maybe by checking there weren't already any PAs). - // That could be hard. Alternatively, perhaps we just buffer the returned commands if !self.pending_activations.has_pending(&run_id) { self.server_gateway .complete_workflow_task(task_token, push_result.server_commands) @@ -564,7 +558,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_timer("fake_timer"); - build_fake_core(wfid, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches, None) } #[fixture(hist_batches = &[])] @@ -572,7 +566,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_activity("fake_activity"); - build_fake_core(wfid, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches, None) } #[fixture(hist_batches = &[])] @@ -580,7 +574,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_failed_activity("fake_activity"); - build_fake_core(wfid, &mut t, hist_batches) + build_fake_core(wfid, &mut t, hist_batches, None) } #[rstest] @@ -650,7 +644,7 @@ mod test { let timer_2_id = "timer2"; let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, @@ -708,7 +702,7 @@ mod test { let cancel_timer_id = "cancel_timer"; let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, @@ -745,7 +739,7 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] #[tokio::test] async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) { let wfid = "fake_wf_id"; @@ -753,7 +747,7 @@ mod test { let signal_id = "signal"; let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, Some(1)); poll_and_reply( &core, @@ -787,6 +781,48 @@ mod test { .await; } + #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] + #[tokio::test] + async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id); + let core = build_fake_core(wfid, &mut t, hist_batches, Some(1)); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::Abandon as i32, + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + // Activity is getting resolved right away as we are in the Abandon mode. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))] #[tokio::test] async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) { @@ -798,7 +834,7 @@ mod test { activity_id, signal_id, ); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, @@ -862,7 +898,7 @@ mod test { let randomness_seed_from_start = AtomicU64::new(0); let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id); - let core = build_fake_core(wfid, &mut t, &[2]); + let core = build_fake_core(wfid, &mut t, &[2], None); poll_and_reply( &core, @@ -924,7 +960,7 @@ mod test { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, @@ -959,7 +995,7 @@ mod test { let timer_id = "timer"; let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id); - let mut core = build_fake_core(wfid, &mut t, batches); + let mut core = build_fake_core(wfid, &mut t, batches, None); // Need to create an expectation that we will call a failure completion Arc::get_mut(&mut core.server_gateway) .unwrap() @@ -1006,7 +1042,7 @@ mod test { let timer_id = "timer1"; let mut t = canned_histories::single_timer(timer_id); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, @@ -1042,7 +1078,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::two_signals("sig1", "sig2"); - let core = build_fake_core(wfid, &mut t, hist_batches); + let core = build_fake_core(wfid, &mut t, hist_batches, None); poll_and_reply( &core, diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 57e142cb6..6b8ca02c2 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -57,11 +57,13 @@ fsm! { ScheduledEventRecorded --(ActivityTaskStarted(i64), shared on_task_started) --> Started; ScheduledEventRecorded --(ActivityTaskTimedOut, on_task_timed_out) --> TimedOut; ScheduledEventRecorded --(Cancel, shared on_canceled) --> ScheduledActivityCancelCommandCreated; + ScheduledEventRecorded --(Abandon, shared on_abandoned) --> Canceled; Started --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; Started --(ActivityTaskFailed(ActivityTaskFailedEventAttributes), on_activity_task_failed) --> Failed; Started --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; Started --(Cancel, shared on_canceled) --> StartedActivityCancelCommandCreated; + Started --(Abandon, shared on_abandoned) --> Canceled; ScheduledActivityCancelCommandCreated --(CommandRequestCancelActivityTask, @@ -229,7 +231,6 @@ impl WFMachinesAdapter for ActivityMachine { .into()] } ActivityMachineCommand::Fail(failure) => { - warn!("I'm a doin' a failure!"); vec![ResolveActivity { activity_id: self.shared_state.attrs.activity_id.clone(), result: Some(ActivityResult { @@ -241,7 +242,6 @@ impl WFMachinesAdapter for ActivityMachine { .into()] } ActivityMachineCommand::RequestCancellation(c) => { - warn!("I'm a requesting a cancel!"); self.machine_responses_from_cancel_request(c) } }) @@ -262,16 +262,18 @@ impl TryFrom for ActivityMachineEvents { impl Cancellable for ActivityMachine { fn cancel(&mut self) -> Result, MachineError> { - let vec = self.on_event_mut(ActivityMachineEvents::Cancel)?; + let event = match self.shared_state.cancellation_type { + ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon, + _ => ActivityMachineEvents::Cancel, + }; + let vec = self.on_event_mut(event)?; let res = vec .into_iter() .flat_map(|amc| match amc { ActivityMachineCommand::RequestCancellation(cmd) => { - warn!("Request cancel from cancel"); self.machine_responses_from_cancel_request(cmd) } ActivityMachineCommand::Fail(failure) => { - warn!("fail from cancel"); vec![MachineResponse::PushWFJob(Variant::ResolveActivity( ResolveActivity { activity_id: self.shared_state.attrs.activity_id.clone(), @@ -340,7 +342,7 @@ impl ScheduleCommandCreated { ActivityCancellationType::Abandon => { ActivityMachineTransition::ok_shared(vec![], Canceled::default(), canceled_state) } - _ => notify_canceled(canceled_state, None, Canceled::default().into()), + _ => notify_lang_activity_cancelled(canceled_state, None, Canceled::default().into()), } } } @@ -364,7 +366,6 @@ impl ScheduledEventRecorded { ) } pub(super) fn on_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { @@ -373,6 +374,9 @@ impl ScheduledEventRecorded { ScheduledActivityCancelCommandCreated::default().into(), ) } + pub(super) fn on_abandoned(self, dat: SharedState) -> ActivityMachineTransition { + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) + } } #[derive(Default, Clone)] @@ -383,7 +387,6 @@ impl Started { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { - // notify_completed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -393,14 +396,12 @@ impl Started { self, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { - // notify_failed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(attrs.failure)], Completed::default(), ) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { @@ -409,6 +410,9 @@ impl Started { StartedActivityCancelCommandCreated::default().into(), ) } + pub(super) fn on_abandoned(self, dat: SharedState) -> ActivityMachineTransition { + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) + } } #[derive(Default, Clone)] @@ -419,9 +423,8 @@ impl ScheduledActivityCancelCommandCreated { self, dat: SharedState, ) -> ActivityMachineTransition { - // notifyCanceledIfTryCancel match dat.cancellation_type { - ActivityCancellationType::TryCancel => notify_canceled( + ActivityCancellationType::TryCancel => notify_lang_activity_cancelled( dat, None, ScheduledActivityCancelCommandCreated::default().into(), @@ -440,12 +443,10 @@ impl ScheduledActivityCancelEventRecorded { dat: SharedState, _attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { - // notify_canceled - notify_canceled(dat, None, Canceled::default().into()) + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } } @@ -464,9 +465,8 @@ impl StartedActivityCancelCommandCreated { self, dat: SharedState, ) -> ActivityMachineTransition { - // notifyCanceledIfTryCancel match dat.cancellation_type { - ActivityCancellationType::TryCancel => notify_canceled( + ActivityCancellationType::TryCancel => notify_lang_activity_cancelled( dat, None, StartedActivityCancelEventRecorded::default().into(), @@ -484,7 +484,6 @@ impl StartedActivityCancelEventRecorded { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { - // notify_completed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -494,14 +493,12 @@ impl StartedActivityCancelEventRecorded { self, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { - // notify_failed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(attrs.failure)], Completed::default(), ) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } pub(super) fn on_activity_task_canceled( @@ -509,10 +506,9 @@ impl StartedActivityCancelEventRecorded { dat: SharedState, attrs: ActivityTaskCanceledEventAttributes, ) -> ActivityMachineTransition { - // notifyCancellationFromEvent match dat.cancellation_type { ActivityCancellationType::WaitCancellationCompleted => { - notify_canceled(dat, Some(attrs), Canceled::default().into()) + notify_lang_activity_cancelled(dat, Some(attrs), Canceled::default().into()) } _ => ActivityMachineTransition::ok(vec![], Canceled::default()), } @@ -525,6 +521,18 @@ impl From for StartedActivityCancelEventRe } } +impl From for Canceled { + fn from(_: ScheduledEventRecorded) -> Self { + Self::default() + } +} + +impl From for Canceled { + fn from(_: Started) -> Self { + Self::default() + } +} + #[derive(Default, Clone)] pub(super) struct Completed {} @@ -541,7 +549,6 @@ fn create_request_cancel_activity_task_command( dat: SharedState, next_state: ActivityMachineState, ) -> TransitionResult { - // createRequestCancelActivityTaskCommand let cmd = Command { command_type: CommandType::RequestCancelActivityTask as i32, attributes: Some( @@ -560,7 +567,8 @@ fn create_request_cancel_activity_task_command( /// Notifies lang side that activity has been cancelled by sending a failure with cancelled failure as a cause. /// Optional cancelled_event, if passed, is used to supply event IDs. -fn notify_canceled( +/// State machine will transition into the `next_state` provided as a parameter. +fn notify_lang_activity_cancelled( dat: SharedState, canceled_event: Option, next_state: ActivityMachineState, @@ -569,6 +577,20 @@ fn notify_canceled( None => (dat.scheduled_event_id, dat.started_event_id), Some(e) => (e.scheduled_event_id, e.scheduled_event_id), }; + ActivityMachineTransition::ok_shared( + vec![ActivityMachineCommand::Fail(Some( + new_cancellation_failure(&dat, scheduled_event_id, started_event_id), + ))], + next_state, + dat, + ) +} + +fn new_cancellation_failure( + dat: &SharedState, + scheduled_event_id: i64, + started_event_id: i64, +) -> Failure { let cancelled_failure = Failure { source: "CoreSDK".to_string(), failure_info: Some(FailureInfo::CanceledFailureInfo(CanceledFailureInfo { @@ -586,18 +608,14 @@ fn notify_canceled( identity: "workflow".to_string(), retry_state: RetryState::Unspecified as i32, }; - ActivityMachineTransition::ok_shared( - vec![ActivityMachineCommand::Fail(Some(Failure { - message: "Activity canceled".to_string(), - cause: Some(Box::new(cancelled_failure)), - failure_info: Some(failure::FailureInfo::ActivityFailureInfo( - activity_failure_info, - )), - ..Default::default() - }))], - next_state, - dat, - ) + Failure { + message: "Activity canceled".to_string(), + cause: Some(Box::new(cancelled_failure)), + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + activity_failure_info, + )), + ..Default::default() + } } #[cfg(test)] diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 49939da51..a46c3c501 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -24,6 +24,7 @@ use crate::{ Core, CoreSDK, }; use rand::{thread_rng, Rng}; +use std::cmp; use std::collections::VecDeque; pub(crate) type FakeCore = CoreSDK; @@ -34,10 +35,14 @@ pub(crate) type FakeCore = CoreSDK; /// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. /// For each number in the input list, a fake response will be prepared which includes history /// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info]. +/// Optional `lang_job_batch_count` defines the number of direct lang activation batches produced by the underlying +/// state machine. Each lang job batch would save us a poll to the server as it is going to be queued in the +/// pending activations. pub(crate) fn build_fake_core( wf_id: &str, t: &mut TestHistoryBuilder, response_batches: &[usize], + lang_job_batch_count: Option, ) -> FakeCore { let run_id = t.get_orig_run_id(); let wf = Some(WorkflowExecution { @@ -63,7 +68,10 @@ pub(crate) fn build_fake_core( let mut mock_gateway = MockServerGatewayApis::new(); mock_gateway .expect_poll_workflow_task() - .times(response_batches.len()) + .times(cmp::max( + 1, // At least one poll from the server is always needed. + response_batches.len() - lang_job_batch_count.unwrap_or(0), + )) .returning(move |_| Ok(tasks.pop_front().unwrap())); // Response not really important here mock_gateway diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 6dd06e4f4..5891414f5 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -443,6 +443,8 @@ impl WorkflowMachines { /// Iterate the state machines, which consists of grabbing any pending outgoing commands from /// the workflow, handling them, and preparing them to be sent off to the server. + /// Returns a boolean flag which indicates whether or not new activations were produced by the state + /// machine. If true, pending activation should be created by the caller making jobs available to the lang side. pub(crate) fn iterate_machines(&mut self) -> Result { let results = self.drive_me.fetch_workflow_iteration_output(); let jobs = self.handle_driven_results(results)?; @@ -540,6 +542,11 @@ impl WorkflowMachines { Ok(()) } + /// Handles results of the workflow activation, delegating work to the appropriate state machine. + /// Returns a list of workflow jobs that should be queued in the pending activation for the next poll. + /// This list will be populated only if state machine produced lang activations as part of command processing. + /// For example some types of activity cancellation need to immediately unblock lang side without + /// having it to poll for an actual workflow task from the server. fn handle_driven_results( &mut self, results: Vec, diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 74df53f31..322587453 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -264,8 +264,8 @@ pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder { /// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED /// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED /// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED -/// 12: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED -/// 13: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 11: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 12: EVENT_TYPE_WORKFLOW_TASK_STARTED pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -302,6 +302,44 @@ pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHist t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 11: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn cancel_scheduled_activity_abandon(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add_workflow_task_scheduled_and_started(); + t +} + /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED /// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index e6af921c1..b7516f93c 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -152,7 +152,8 @@ impl WorkflowManager { } /// Feed the workflow machines new commands issued by the executing workflow code, iterate the - /// workflow machines, and spit out the commands which are ready to be sent off to the server + /// workflow machines, and spit out the commands which are ready to be sent off to the server, as + /// well as a possible indication that there are new jobs that must be sent to lang. pub fn push_commands(&mut self, cmds: Vec) -> Result { self.command_sink.send(cmds)?; let has_new_lang_jobs = self.machines.iterate_machines()?; diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index f77944d35..e32fe7476 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -368,12 +368,12 @@ fn schedule_activity_and_timer_cmds( } #[tokio::test] -async fn activity_cancellation() { +async fn activity_cancellation_try_cancel() { tracing_init(); let mut rng = rand::thread_rng(); let task_q_salt: u32 = rng.gen(); - let task_q = &format!("activity_failed_workflow_{}", task_q_salt.to_string()); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); let core = get_integ_core().await; let workflow_id: u32 = rng.gen(); create_workflow(&core, task_q, &workflow_id.to_string(), None).await; @@ -433,12 +433,12 @@ async fn activity_cancellation() { } #[tokio::test] -async fn activity_cancellation_abandon() { +async fn activity_cancellation_wait_cancellation_completed() { tracing_init(); let mut rng = rand::thread_rng(); let task_q_salt: u32 = rng.gen(); - let task_q = &format!("activity_failed_workflow_{}", task_q_salt.to_string()); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); let core = get_integ_core().await; let workflow_id: u32 = rng.gen(); create_workflow(&core, task_q, &workflow_id.to_string(), None).await; @@ -507,6 +507,73 @@ async fn activity_cancellation_abandon() { .unwrap(); } +#[tokio::test] +async fn activity_cancellation_abandon() { + tracing_init(); + + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + task_q, + &activity_id, + &timer_id, + ActivityCancellationType::Abandon, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is try-cancelled. + let activity_task = core.poll_activity_task(task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + // Poll workflow task expecting that activation has been created by the state machine + // immediately after the cancellation request. + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); +} + #[tokio::test] async fn parallel_timer_workflow() { let task_q = "parallel_timer_workflow"; From d16fe496b7c00170dd7e0fb5a36fed5638d1cd9f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 5 Apr 2021 20:58:19 -0700 Subject: [PATCH 16/19] Extract common logic for timer and activity cancellation in workflow_machines --- src/machines/workflow_machines.rs | 84 +++++++++++++------------------ 1 file changed, 36 insertions(+), 48 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 5891414f5..f6b3f215e 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; use crate::protos::coresdk::PayloadsToPayloadError; use crate::{ core_tracing::VecDisplayer, @@ -130,7 +131,7 @@ pub enum WFMachinesError { #[error("No command was scheduled for event {0:?}")] NoCommandScheduledForEvent(HistoryEvent), #[error("Machine response {0:?} was not expected: {1}")] - UnexpectedMachineResponse(MachineResponse, &'static str), + UnexpectedMachineResponse(MachineResponse, String), #[error("Command was missing its associated machine: {0}")] MissingAssociatedMachine(String), #[error("There was {0} when we expected exactly one payload while applying event: {1:?}")] @@ -561,28 +562,10 @@ impl WorkflowMachines { .insert(CommandID::Timer(tid), timer.machine); self.current_wf_task_commands.push_back(timer); } - WFCommand::CancelTimer(attrs) => { - let m_key = - self.get_machine_key(&CommandID::Timer(attrs.timer_id.to_owned()))?; - let res = self.machine_mut(m_key).cancel()?; - // TODO: Dedupe with req cancel activity - for r in res { - match r { - MachineResponse::IssueNewCommand(c) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: c, - machine: m_key, - }) - } - v => { - return Err(WFMachinesError::UnexpectedMachineResponse( - v, - "When cancelling timer", - )); - } - } - } - } + WFCommand::CancelTimer(attrs) => self.process_cancellation( + &CommandID::Timer(attrs.timer_id.to_owned()), + &mut jobs, + )?, WFCommand::AddActivity(attrs) => { let aid = attrs.activity_id.clone(); let activity = self.add_new_machine(new_activity(attrs)); @@ -590,31 +573,10 @@ impl WorkflowMachines { .insert(CommandID::Activity(aid), activity.machine); self.current_wf_task_commands.push_back(activity); } - WFCommand::RequestCancelActivity(attrs) => { - let m_key = - self.get_machine_key(&CommandID::Activity(attrs.activity_id.to_owned()))?; - let res = self.machine_mut(m_key).cancel()?; - debug!(machine_responses = ?res, "Req cancel activity responses"); - for r in res { - match r { - MachineResponse::IssueNewCommand(c) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: c, - machine: m_key, - }) - } - MachineResponse::PushWFJob(j) => { - jobs.push(j); - } - v => { - return Err(WFMachinesError::UnexpectedMachineResponse( - v, - "When cancelling activity", - )); - } - } - } - } + WFCommand::RequestCancelActivity(attrs) => self.process_cancellation( + &CommandID::Activity(attrs.activity_id.to_owned()), + &mut jobs, + )?, WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); @@ -629,6 +591,32 @@ impl WorkflowMachines { Ok(jobs) } + fn process_cancellation(&mut self, id: &CommandID, jobs: &mut Vec) -> Result<()> { + let m_key = self.get_machine_key(id)?; + let res = self.machine_mut(m_key).cancel()?; + debug!(machine_responses = ?res, format!("Req cancel {:?} responses", id)); + for r in res { + match r { + MachineResponse::IssueNewCommand(c) => { + self.current_wf_task_commands.push_back(CommandAndMachine { + command: c, + machine: m_key, + }) + } + MachineResponse::PushWFJob(j) => { + jobs.push(j); + } + v => { + return Err(WFMachinesError::UnexpectedMachineResponse( + v, + format!("When cancelling {:?}", id), + )); + } + } + } + Ok(()) + } + fn get_machine_key(&mut self, id: &CommandID) -> Result { Ok(*self.id_to_machine.get(id).ok_or_else(|| { WFMachinesError::MissingAssociatedMachine(format!( From 07d04c82d7b1eff6e8fa96d3139abae9a2b88022 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 5 Apr 2021 21:34:07 -0700 Subject: [PATCH 17/19] remove format --- src/machines/workflow_machines.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index f6b3f215e..b8ef6870f 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -594,7 +594,7 @@ impl WorkflowMachines { fn process_cancellation(&mut self, id: &CommandID, jobs: &mut Vec) -> Result<()> { let m_key = self.get_machine_key(id)?; let res = self.machine_mut(m_key).cancel()?; - debug!(machine_responses = ?res, format!("Req cancel {:?} responses", id)); + debug!(machine_responses = ?res, "Req cancel responses"); for r in res { match r { MachineResponse::IssueNewCommand(c) => { From 4674fbf71b898de4c75657ea09f7e81356d84548 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 6 Apr 2021 09:25:30 -0700 Subject: [PATCH 18/19] Fix histories / expectations and eliminate new job batch count param --- src/lib.rs | 34 ++++++++++++++++--------------- src/machines/test_help/mod.rs | 19 ++++++----------- src/test_help/canned_histories.rs | 8 ++------ 3 files changed, 26 insertions(+), 35 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 8da563362..01f15b520 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -558,7 +558,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_timer("fake_timer"); - build_fake_core(wfid, &mut t, hist_batches, None) + build_fake_core(wfid, &mut t, hist_batches) } #[fixture(hist_batches = &[])] @@ -566,7 +566,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_activity("fake_activity"); - build_fake_core(wfid, &mut t, hist_batches, None) + build_fake_core(wfid, &mut t, hist_batches) } #[fixture(hist_batches = &[])] @@ -574,7 +574,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::single_failed_activity("fake_activity"); - build_fake_core(wfid, &mut t, hist_batches, None) + build_fake_core(wfid, &mut t, hist_batches) } #[rstest] @@ -644,7 +644,7 @@ mod test { let timer_2_id = "timer2"; let mut t = canned_histories::parallel_timer(timer_1_id, timer_2_id); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -694,7 +694,7 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))] #[tokio::test] async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; @@ -702,7 +702,7 @@ mod test { let cancel_timer_id = "cancel_timer"; let mut t = canned_histories::cancel_timer(timer_id, cancel_timer_id); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -739,7 +739,9 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] + // TODO: History doesn't go all the way through execution completed -- which is expected in + // real life anyway, but testing that might still be desirable + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) { let wfid = "fake_wf_id"; @@ -747,7 +749,7 @@ mod test { let signal_id = "signal"; let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id); - let core = build_fake_core(wfid, &mut t, hist_batches, Some(1)); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -781,7 +783,7 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2, 3]), case::replay(&[3]))] + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] #[tokio::test] async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) { let wfid = "fake_wf_id"; @@ -789,7 +791,7 @@ mod test { let signal_id = "signal"; let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id); - let core = build_fake_core(wfid, &mut t, hist_batches, Some(1)); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -834,7 +836,7 @@ mod test { activity_id, signal_id, ); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -898,7 +900,7 @@ mod test { let randomness_seed_from_start = AtomicU64::new(0); let mut t = canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, new_run_id); - let core = build_fake_core(wfid, &mut t, &[2], None); + let core = build_fake_core(wfid, &mut t, &[2]); poll_and_reply( &core, @@ -960,7 +962,7 @@ mod test { t.add_full_wf_task(); t.add_workflow_execution_completed(); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -995,7 +997,7 @@ mod test { let timer_id = "timer"; let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id); - let mut core = build_fake_core(wfid, &mut t, batches, None); + let mut core = build_fake_core(wfid, &mut t, batches); // Need to create an expectation that we will call a failure completion Arc::get_mut(&mut core.server_gateway) .unwrap() @@ -1042,7 +1044,7 @@ mod test { let timer_id = "timer1"; let mut t = canned_histories::single_timer(timer_id); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, @@ -1078,7 +1080,7 @@ mod test { let wfid = "fake_wf_id"; let mut t = canned_histories::two_signals("sig1", "sig2"); - let core = build_fake_core(wfid, &mut t, hist_batches, None); + let core = build_fake_core(wfid, &mut t, hist_batches); poll_and_reply( &core, diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index a46c3c501..eee2ebcfe 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -6,11 +6,11 @@ mod history_builder; pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver}; pub(crate) use history_builder::TestHistoryBuilder; -use crate::protos::coresdk::common::UserCodeFailure; use crate::{ pollers::MockServerGatewayApis, protos::{ coresdk::{ + common::UserCodeFailure, workflow_activation::WfActivation, workflow_commands::workflow_command, workflow_completion::{self, wf_activation_completion, WfActivationCompletion}, @@ -24,7 +24,6 @@ use crate::{ Core, CoreSDK, }; use rand::{thread_rng, Rng}; -use std::cmp; use std::collections::VecDeque; pub(crate) type FakeCore = CoreSDK; @@ -32,17 +31,13 @@ pub(crate) type FakeCore = CoreSDK; /// Given identifiers for a workflow/run, and a test history builder, construct an instance of /// the core SDK with a mock server gateway that will produce the responses as appropriate. /// -/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. -/// For each number in the input list, a fake response will be prepared which includes history -/// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info]. -/// Optional `lang_job_batch_count` defines the number of direct lang activation batches produced by the underlying -/// state machine. Each lang job batch would save us a poll to the server as it is going to be queued in the -/// pending activations. +/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. For +/// each number in the input list, a fake response will be prepared which includes history up to the +/// workflow task with that number, as in [TestHistoryBuilder::get_history_info]. pub(crate) fn build_fake_core( wf_id: &str, t: &mut TestHistoryBuilder, response_batches: &[usize], - lang_job_batch_count: Option, ) -> FakeCore { let run_id = t.get_orig_run_id(); let wf = Some(WorkflowExecution { @@ -68,10 +63,7 @@ pub(crate) fn build_fake_core( let mut mock_gateway = MockServerGatewayApis::new(); mock_gateway .expect_poll_workflow_task() - .times(cmp::max( - 1, // At least one poll from the server is always needed. - response_batches.len() - lang_job_batch_count.unwrap_or(0), - )) + .times(response_batches.len()) .returning(move |_| Ok(tasks.pop_front().unwrap())); // Response not really important here mock_gateway @@ -107,6 +99,7 @@ pub(crate) async fn poll_and_reply<'a>( run_id = res.run_id; } + dbg!(&reply); core.complete_workflow_task(WfActivationCompletion::from_status( task_tok, reply.clone(), diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 322587453..ed3e3c30d 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -264,8 +264,7 @@ pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder { /// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED /// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED /// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED -/// 11: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED -/// 12: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 11: EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -298,7 +297,7 @@ pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHist }, ), ); - t.add_workflow_task_scheduled_and_started(); + t.add_workflow_execution_completed(); t } @@ -311,8 +310,6 @@ pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHist /// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED /// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED /// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED -/// 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED -/// 11: EVENT_TYPE_WORKFLOW_TASK_STARTED pub fn cancel_scheduled_activity_abandon(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -336,7 +333,6 @@ pub fn cancel_scheduled_activity_abandon(activity_id: &str, signal_id: &str) -> }], ); t.add_full_wf_task(); - t.add_workflow_task_scheduled_and_started(); t } From c83fba23a631e381f94cfe8bdd1c1927321d105b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 6 Apr 2021 09:27:12 -0700 Subject: [PATCH 19/19] Fix comments I made --- src/machines/workflow_machines.rs | 2 +- tests/integ_tests/simple_wf_tests.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index b8ef6870f..6482ba983 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -594,7 +594,7 @@ impl WorkflowMachines { fn process_cancellation(&mut self, id: &CommandID, jobs: &mut Vec) -> Result<()> { let m_key = self.get_machine_key(id)?; let res = self.machine_mut(m_key).cancel()?; - debug!(machine_responses = ?res, "Req cancel responses"); + debug!(machine_responses = ?res, cmd_id = ?id, "Req cancel responses"); for r in res { match r { MachineResponse::IssueNewCommand(c) => { diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index e32fe7476..b642afe7a 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -456,7 +456,7 @@ async fn activity_cancellation_wait_cancellation_completed() { .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to - // complete it in this test as activity is try-cancelled. + // complete it in this test as activity is wait-cancelled. let activity_task = core.poll_activity_task(task_q).await.unwrap(); assert_matches!( activity_task.variant, @@ -531,7 +531,7 @@ async fn activity_cancellation_abandon() { .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to - // complete it in this test as activity is try-cancelled. + // complete it in this test as activity is abandoned. let activity_task = core.poll_activity_task(task_q).await.unwrap(); assert_matches!( activity_task.variant,