diff --git a/protos/local/workflow_commands.proto b/protos/local/workflow_commands.proto index 1d65d1839..2de86c7c3 100644 --- a/protos/local/workflow_commands.proto +++ b/protos/local/workflow_commands.proto @@ -70,10 +70,24 @@ message ScheduleActivity { /// configuration. Retries are happening up to schedule_to_close_timeout. To disable retries set /// retry_policy.maximum_attempts to 1. common.RetryPolicy retry_policy = 11; + /// Defines behaviour of the underlying workflow when activity cancellation has been requested. + ActivityCancellationType cancellation_type = 12; +} + +enum ActivityCancellationType { + /// Initiate a cancellation request and immediately report cancellation to the workflow. + TRY_CANCEL = 0; + /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a + /// cancellation notification. This can block the cancellation for a long time if activity doesn't + /// heartbeat or chooses to ignore the cancellation request. + WAIT_CANCELLATION_COMPLETED = 1; + /// Do not request cancellation of the activity and immediately report cancellation to the workflow + ABANDON = 2; } message RequestCancelActivity { string activity_id = 1; + int64 scheduled_event_id = 2; } message QueryResult { @@ -87,12 +101,6 @@ message QuerySuccess { common.Payload response = 1; } -/// Request cancellation of an activity from a workflow -message RequestActivityCancellation { - string activity_id = 1; - string reason = 2; -} - /// Issued when the workflow completes successfully message CompleteWorkflowExecution { common.Payload result = 1; diff --git a/src/lib.rs b/src/lib.rs index 11e976bfe..01f15b520 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,9 +30,10 @@ pub use core_tracing::tracing_init; pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; +use crate::workflow::PushCommandsResult; use crate::{ errors::{ShutdownErr, WorkflowUpdateError}, - machines::{EmptyWorkflowCommandErr, 
ProtoCommand, WFCommand}, + machines::{EmptyWorkflowCommandErr, WFCommand}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ @@ -403,13 +404,19 @@ impl CoreSDK { .to_owned(), completion: None, })?; - let commands = self.push_lang_commands(&run_id, cmds)?; + let push_result = self.push_lang_commands(&run_id, cmds)?; + if push_result.has_new_lang_jobs { + self.pending_activations.push(PendingActivation { + run_id: run_id.to_string(), + task_token: task_token.clone(), + }); + } // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught // up on replay. if !self.pending_activations.has_pending(&run_id) { self.server_gateway - .complete_workflow_task(task_token, commands) + .complete_workflow_task(task_token, push_result.server_commands) .await .map_err(|ts| { if ts.code() == tonic::Code::InvalidArgument @@ -484,7 +491,7 @@ impl CoreSDK { &self, run_id: &str, cmds: Vec, - ) -> Result, WorkflowUpdateError> { + ) -> Result { self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds)) } @@ -516,6 +523,9 @@ impl CoreSDK { #[cfg(test)] mod test { use super::*; + use crate::protos::coresdk::workflow_commands::{ + ActivityCancellationType, RequestCancelActivity, + }; use crate::{ machines::test_help::{ build_fake_core, gen_assert_and_fail, gen_assert_and_reply, poll_and_reply, FakeCore, @@ -684,7 +694,7 @@ mod test { .await; } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(hist_batches, case::incremental(&[1, 3]), case::replay(&[3]))] #[tokio::test] async fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; @@ -729,6 +739,143 @@ mod test { .await; } + // TODO: History doesn't go all the way through execution completed -- which is expected in + // real life anyway, but testing that might still be desirable + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] 
+ #[tokio::test] + async fn scheduled_activity_cancellation_try_cancel(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity(activity_id, signal_id); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::TryCancel as i32, + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + // Activity is getting resolved right away as we are in the TryCancel mode. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[tokio::test] + async fn scheduled_activity_cancellation_abandon(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity_abandon(activity_id, signal_id); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::Abandon as i32, + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + 
..Default::default() + } + .into()], + ), + // Activity is getting resolved right away as we are in the Abandon mode. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + + #[rstest(hist_batches, case::incremental(&[1, 2, 3, 4]), case::replay(&[4]))] + #[tokio::test] + async fn scheduled_activity_cancellation_wait_for_cancellation(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let activity_id = "fake_activity"; + let signal_id = "signal"; + + let mut t = canned_histories::cancel_scheduled_activity_with_activity_task_cancel( + activity_id, + signal_id, + ); + let core = build_fake_core(wfid, &mut t, hist_batches); + + poll_and_reply( + &core, + TASK_Q, + false, + &[ + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::StartWorkflow(_)), + vec![ScheduleActivity { + activity_id: activity_id.to_string(), + cancellation_type: ActivityCancellationType::WaitCancellationCompleted + as i32, + ..Default::default() + } + .into()], + ), + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + } + .into()], + ), + // Making sure that activity is not resolved until it's cancelled. + gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::SignalWorkflow(_)), + vec![], + ), + // Now ActivityTaskCanceled has been processed and activity can be resolved. 
+ gen_assert_and_reply( + &job_assert!(wf_activation_job::Variant::ResolveActivity(_)), + vec![CompleteWorkflowExecution { result: None }.into()], + ), + ], + ) + .await; + } + #[rstest(single_timer_setup(&[1]))] #[tokio::test] async fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) { diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 431f4e53c..6b8ca02c2 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,7 +1,15 @@ #![allow(clippy::large_enum_variant)] +use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; +use crate::protos::coresdk::workflow_commands::{ActivityCancellationType, RequestCancelActivity}; use crate::protos::coresdk::PayloadsExt; -use crate::protos::temporal::api::failure::v1::Failure; +use crate::protos::temporal::api::common::v1::ActivityType; +use crate::protos::temporal::api::enums::v1::RetryState; +use crate::protos::temporal::api::enums::v1::RetryState::CancelRequested; +use crate::protos::temporal::api::failure::v1::failure::FailureInfo; +use crate::protos::temporal::api::failure::v1::{ + failure, ActivityFailureInfo, CanceledFailureInfo, Failure, +}; use crate::protos::temporal::api::history::v1::ActivityTaskCanceledEventAttributes; use crate::{ machines::{ @@ -13,7 +21,9 @@ use crate::{ coresdk::{ activity_result::{self as ar, activity_result, ActivityResult}, activity_task, - workflow_activation::ResolveActivity, + workflow_activation::{ + wf_activation_job, ResolveActivity, StartWorkflow, WfActivationJob, + }, }, temporal::api::{ command::v1::Command, @@ -41,28 +51,30 @@ fsm! 
{ ScheduleCommandCreated --(CommandScheduleActivityTask) --> ScheduleCommandCreated; ScheduleCommandCreated - --(ActivityTaskScheduled, on_activity_task_scheduled) --> ScheduledEventRecorded; - ScheduleCommandCreated --(Cancel, on_canceled) --> Canceled; + --(ActivityTaskScheduled(i64), shared on_activity_task_scheduled) --> ScheduledEventRecorded; + ScheduleCommandCreated --(Cancel, shared on_canceled) --> Canceled; - ScheduledEventRecorded --(ActivityTaskStarted, on_task_started) --> Started; + ScheduledEventRecorded --(ActivityTaskStarted(i64), shared on_task_started) --> Started; ScheduledEventRecorded --(ActivityTaskTimedOut, on_task_timed_out) --> TimedOut; - ScheduledEventRecorded --(Cancel, on_canceled) --> ScheduledActivityCancelCommandCreated; + ScheduledEventRecorded --(Cancel, shared on_canceled) --> ScheduledActivityCancelCommandCreated; + ScheduledEventRecorded --(Abandon, shared on_abandoned) --> Canceled; Started --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; Started --(ActivityTaskFailed(ActivityTaskFailedEventAttributes), on_activity_task_failed) --> Failed; Started --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; - Started --(Cancel, on_canceled) --> StartedActivityCancelCommandCreated; + Started --(Cancel, shared on_canceled) --> StartedActivityCancelCommandCreated; + Started --(Abandon, shared on_abandoned) --> Canceled; ScheduledActivityCancelCommandCreated --(CommandRequestCancelActivityTask, - on_command_request_cancel_activity_task) --> ScheduledActivityCancelCommandCreated; + shared on_command_request_cancel_activity_task) --> ScheduledActivityCancelCommandCreated; ScheduledActivityCancelCommandCreated --(ActivityTaskCancelRequested) --> ScheduledActivityCancelEventRecorded; ScheduledActivityCancelEventRecorded - --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; + --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), shared 
on_activity_task_canceled) --> Canceled; ScheduledActivityCancelEventRecorded - --(ActivityTaskStarted) --> StartedActivityCancelEventRecorded; + --(ActivityTaskStarted(i64)) --> StartedActivityCancelEventRecorded; ScheduledActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; @@ -70,7 +82,7 @@ fsm! { --(CommandRequestCancelActivityTask) --> StartedActivityCancelCommandCreated; StartedActivityCancelCommandCreated --(ActivityTaskCancelRequested, - on_activity_task_cancel_requested) --> StartedActivityCancelEventRecorded; + shared on_activity_task_cancel_requested) --> StartedActivityCancelEventRecorded; StartedActivityCancelEventRecorded --(ActivityTaskFailed(ActivityTaskFailedEventAttributes), on_activity_task_failed) --> Failed; StartedActivityCancelEventRecorded @@ -78,7 +90,7 @@ fsm! { StartedActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; StartedActivityCancelEventRecorded - --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; + --(ActivityTaskCanceled(ActivityTaskCanceledEventAttributes), shared on_activity_task_canceled) --> Canceled; } #[derive(Debug, derive_more::Display)] @@ -87,28 +99,8 @@ pub(super) enum ActivityMachineCommand { Complete(Option), #[display(fmt = "Fail")] Fail(Option), - #[display(fmt = "Cancel")] - Cancel(Option), -} - -#[derive(Debug, Clone, derive_more::Display)] -pub(super) enum ActivityCancellationType { - /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a - /// cancellation notification. This can block the cancellation for a long time if activity doesn't - /// heartbeat or chooses to ignore the cancellation request. - WaitCancellationCompleted, - - /// Initiate a cancellation request and immediately report cancellation to the workflow. 
- TryCancel, - - /// Do not request cancellation of the activity and immediately report cancellation to the workflow - Abandon, -} - -impl Default for ActivityCancellationType { - fn default() -> Self { - ActivityCancellationType::TryCancel - } + #[display(fmt = "RequestCancellation")] + RequestCancellation(Command), } /// Creates a new activity state machine and a command to schedule it on the server. @@ -126,8 +118,10 @@ impl ActivityMachine { let mut s = Self { state: Created {}.into(), shared_state: SharedState { + cancellation_type: ActivityCancellationType::from_i32(attribs.cancellation_type) + .unwrap(), attrs: attribs, - cancellation_type: ActivityCancellationType::TryCancel, + ..Default::default() }, }; s.on_event_mut(ActivityMachineEvents::Schedule) @@ -138,6 +132,25 @@ impl ActivityMachine { }; (s, cmd) } + + fn machine_responses_from_cancel_request(&self, cancel_cmd: Command) -> Vec { + let mut r = vec![MachineResponse::IssueNewCommand(cancel_cmd)]; + if self.shared_state.cancellation_type + != ActivityCancellationType::WaitCancellationCompleted + { + r.push(MachineResponse::PushWFJob(Variant::ResolveActivity( + ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Canceled(ar::Cancelation { + details: None, + })), + }), + }, + ))) + } + r + } } impl TryFrom for ActivityMachineEvents { @@ -145,8 +158,8 @@ impl TryFrom for ActivityMachineEvents { fn try_from(e: HistoryEvent) -> Result { Ok(match EventType::from_i32(e.event_type) { - Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled, - Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted, + Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled(e.event_id), + Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted(e.event_id), Some(EventType::ActivityTaskCompleted) => { if let Some(history_event::Attributes::ActivityTaskCompletedEventAttributes( attrs, 
@@ -168,13 +181,24 @@ impl TryFrom for ActivityMachineEvents { } else { return Err(WFMachinesError::MalformedEvent( e, - "Activity completion attributes were unset".to_string(), + "Activity failure attributes were unset".to_string(), )); } } Some(EventType::ActivityTaskTimedOut) => Self::ActivityTaskTimedOut, Some(EventType::ActivityTaskCancelRequested) => Self::ActivityTaskCancelRequested, - Some(EventType::ActivityTaskCanceled) => Self::ActivityTaskCanceled, + Some(EventType::ActivityTaskCanceled) => { + if let Some(history_event::Attributes::ActivityTaskCanceledEventAttributes(attrs)) = + e.attributes + { + Self::ActivityTaskCanceled(attrs) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Activity cancellation attributes were unset".to_string(), + )); + } + } _ => { return Err(WFMachinesError::UnexpectedEvent( e, @@ -185,18 +209,6 @@ impl TryFrom for ActivityMachineEvents { } } -impl TryFrom for ActivityMachineEvents { - type Error = (); - - fn try_from(c: CommandType) -> Result { - Ok(match c { - CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, - CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, - _ => return Err(()), - }) - } -} - impl WFMachinesAdapter for ActivityMachine { fn adapt_response( &self, @@ -218,34 +230,79 @@ impl WFMachinesAdapter for ActivityMachine { } .into()] } - ActivityMachineCommand::Fail(failure) => vec![ResolveActivity { - activity_id: self.shared_state.attrs.activity_id.clone(), - result: Some(ActivityResult { - status: Some(activity_result::Status::Failed(ar::Failure { - failure: failure.map(Into::into), - })), - }), + ActivityMachineCommand::Fail(failure) => { + vec![ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + } + .into()] + } + ActivityMachineCommand::RequestCancellation(c) => { + 
self.machine_responses_from_cancel_request(c) } - .into()], - ActivityMachineCommand::Cancel(_) => unimplemented!(), + }) + } +} + +impl TryFrom for ActivityMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, + CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, + _ => return Err(()), }) } } impl Cancellable for ActivityMachine { - fn cancel(&mut self) -> Result> { - unimplemented!() + fn cancel(&mut self) -> Result, MachineError> { + let event = match self.shared_state.cancellation_type { + ActivityCancellationType::Abandon => ActivityMachineEvents::Abandon, + _ => ActivityMachineEvents::Cancel, + }; + let vec = self.on_event_mut(event)?; + let res = vec + .into_iter() + .flat_map(|amc| match amc { + ActivityMachineCommand::RequestCancellation(cmd) => { + self.machine_responses_from_cancel_request(cmd) + } + ActivityMachineCommand::Fail(failure) => { + vec![MachineResponse::PushWFJob(Variant::ResolveActivity( + ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(ar::Failure { + failure: failure.map(Into::into), + })), + }), + }, + ))] + } + x => panic!("Invalid cancel event response {:?}", x), + }) + .collect(); + Ok(res) } fn was_cancelled_before_sent_to_server(&self) -> bool { - false // TODO return cancellation flag from the shared state + self.shared_state().cancelled_before_sent } } #[derive(Default, Clone)] pub(super) struct SharedState { + scheduled_event_id: i64, + started_event_id: i64, attrs: ScheduleActivity, cancellation_type: ActivityCancellationType, + cancelled_before_sent: bool, } #[derive(Default, Clone)] @@ -262,14 +319,31 @@ impl Created { pub(super) struct ScheduleCommandCreated {} impl ScheduleCommandCreated { - pub(super) fn on_activity_task_scheduled(self) -> ActivityMachineTransition { - // set 
initial command event id - // this.initialCommandEventId = currentEvent.getEventId(); - ActivityMachineTransition::default::() + pub(super) fn on_activity_task_scheduled( + self, + dat: SharedState, + scheduled_event_id: i64, + ) -> ActivityMachineTransition { + ActivityMachineTransition::ok_shared( + vec![], + ScheduledEventRecorded::default(), + SharedState { + scheduled_event_id, + ..dat + }, + ) } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { - // cancelCommandNotifyCanceled - ActivityMachineTransition::default::() + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + let canceled_state = SharedState { + cancelled_before_sent: true, + ..dat + }; + match dat.cancellation_type { + ActivityCancellationType::Abandon => { + ActivityMachineTransition::ok_shared(vec![], Canceled::default(), canceled_state) + } + _ => notify_lang_activity_cancelled(canceled_state, None, Canceled::default().into()), + } } } @@ -277,17 +351,31 @@ impl ScheduleCommandCreated { pub(super) struct ScheduledEventRecorded {} impl ScheduledEventRecorded { - pub(super) fn on_task_started(self) -> ActivityMachineTransition { - // setStartedCommandEventId - ActivityMachineTransition::default::() + pub(super) fn on_task_started( + self, + dat: SharedState, + started_event_id: i64, + ) -> ActivityMachineTransition { + ActivityMachineTransition::ok_shared( + vec![], + Started::default(), + SharedState { + started_event_id, + ..dat + }, + ) } pub(super) fn on_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { - // createRequestCancelActivityTaskCommand - ActivityMachineTransition::default::() + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + create_request_cancel_activity_task_command( + dat, + ScheduledActivityCancelCommandCreated::default().into(), + ) + } + pub(super) fn 
on_abandoned(self, dat: SharedState) -> ActivityMachineTransition { + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) } } @@ -299,7 +387,6 @@ impl Started { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { - // notify_completed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -309,19 +396,22 @@ impl Started { self, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { - // notify_failed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(attrs.failure)], Completed::default(), ) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } - pub(super) fn on_canceled(self) -> ActivityMachineTransition { - // createRequestCancelActivityTaskCommand - ActivityMachineTransition::default::() + pub(super) fn on_canceled(self, dat: SharedState) -> ActivityMachineTransition { + create_request_cancel_activity_task_command( + dat, + StartedActivityCancelCommandCreated::default().into(), + ) + } + pub(super) fn on_abandoned(self, dat: SharedState) -> ActivityMachineTransition { + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) } } @@ -329,9 +419,18 @@ impl Started { pub(super) struct ScheduledActivityCancelCommandCreated {} impl ScheduledActivityCancelCommandCreated { - pub(super) fn on_command_request_cancel_activity_task(self) -> ActivityMachineTransition { - // notifyCanceledIfTryCancel - ActivityMachineTransition::default::() + pub(super) fn on_command_request_cancel_activity_task( + self, + dat: SharedState, + ) -> ActivityMachineTransition { + match dat.cancellation_type { + ActivityCancellationType::TryCancel => notify_lang_activity_cancelled( + dat, + None, + ScheduledActivityCancelCommandCreated::default().into(), + ), + _ => ActivityMachineTransition::default::(), + } } } @@ -339,12 +438,15 @@ impl 
ScheduledActivityCancelCommandCreated { pub(super) struct ScheduledActivityCancelEventRecorded {} impl ScheduledActivityCancelEventRecorded { - pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { - // notify_canceled - ActivityMachineTransition::default::() + pub(super) fn on_activity_task_canceled( + self, + dat: SharedState, + _attrs: ActivityTaskCanceledEventAttributes, + ) -> ActivityMachineTransition { + notify_lang_activity_cancelled(dat, None, Canceled::default().into()) } + pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out ActivityMachineTransition::default::() } } @@ -359,9 +461,18 @@ impl From for ScheduledActivityCancelEven pub(super) struct StartedActivityCancelCommandCreated {} impl StartedActivityCancelCommandCreated { - pub(super) fn on_activity_task_cancel_requested(self) -> ActivityMachineTransition { - // notifyCanceledIfTryCancel - ActivityMachineTransition::default::() + pub(super) fn on_activity_task_cancel_requested( + self, + dat: SharedState, + ) -> ActivityMachineTransition { + match dat.cancellation_type { + ActivityCancellationType::TryCancel => notify_lang_activity_cancelled( + dat, + None, + StartedActivityCancelEventRecorded::default().into(), + ), + _ => ActivityMachineTransition::default::(), + } } } @@ -373,7 +484,6 @@ impl StartedActivityCancelEventRecorded { self, attrs: ActivityTaskCompletedEventAttributes, ) -> ActivityMachineTransition { - // notify_completed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Complete(attrs.result)], Completed::default(), @@ -383,19 +493,25 @@ impl StartedActivityCancelEventRecorded { self, attrs: ActivityTaskFailedEventAttributes, ) -> ActivityMachineTransition { - // notify_failed ActivityMachineTransition::ok( vec![ActivityMachineCommand::Fail(attrs.failure)], Completed::default(), ) } pub(super) fn on_activity_task_timed_out(self) -> ActivityMachineTransition { - // notify_timed_out 
ActivityMachineTransition::default::() } - pub(super) fn on_activity_task_canceled(self) -> ActivityMachineTransition { - // notifyCancellationFromEvent - ActivityMachineTransition::default::() + pub(super) fn on_activity_task_canceled( + self, + dat: SharedState, + attrs: ActivityTaskCanceledEventAttributes, + ) -> ActivityMachineTransition { + match dat.cancellation_type { + ActivityCancellationType::WaitCancellationCompleted => { + notify_lang_activity_cancelled(dat, Some(attrs), Canceled::default().into()) + } + _ => ActivityMachineTransition::ok(vec![], Canceled::default()), + } } } @@ -405,6 +521,18 @@ impl From for StartedActivityCancelEventRe } } +impl From for Canceled { + fn from(_: ScheduledEventRecorded) -> Self { + Self::default() + } +} + +impl From for Canceled { + fn from(_: Started) -> Self { + Self::default() + } +} + #[derive(Default, Clone)] pub(super) struct Completed {} @@ -417,9 +545,83 @@ pub(super) struct TimedOut {} #[derive(Default, Clone)] pub(super) struct Canceled {} +fn create_request_cancel_activity_task_command( + dat: SharedState, + next_state: ActivityMachineState, +) -> TransitionResult { + let cmd = Command { + command_type: CommandType::RequestCancelActivityTask as i32, + attributes: Some( + RequestCancelActivity { + scheduled_event_id: dat.scheduled_event_id, + activity_id: dat.attrs.activity_id, + } + .into(), + ), + }; + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::RequestCancellation(cmd)], + next_state, + ) +} + +/// Notifies lang side that activity has been cancelled by sending a failure with cancelled failure as a cause. +/// Optional cancelled_event, if passed, is used to supply event IDs. +/// State machine will transition into the `next_state` provided as a parameter. 
+fn notify_lang_activity_cancelled( + dat: SharedState, + canceled_event: Option, + next_state: ActivityMachineState, +) -> TransitionResult { + let (scheduled_event_id, started_event_id) = match canceled_event { + None => (dat.scheduled_event_id, dat.started_event_id), + Some(e) => (e.scheduled_event_id, e.started_event_id), + }; + ActivityMachineTransition::ok_shared( + vec![ActivityMachineCommand::Fail(Some( + new_cancellation_failure(&dat, scheduled_event_id, started_event_id), + ))], + next_state, + dat, + ) +} + +fn new_cancellation_failure( + dat: &SharedState, + scheduled_event_id: i64, + started_event_id: i64, +) -> Failure { + let cancelled_failure = Failure { + source: "CoreSDK".to_string(), + failure_info: Some(FailureInfo::CanceledFailureInfo(CanceledFailureInfo { + details: None, + })), + ..Default::default() + }; + let activity_failure_info = ActivityFailureInfo { + activity_id: dat.attrs.activity_id.to_string(), + activity_type: Some(ActivityType { + name: dat.attrs.activity_type.to_string(), + }), + scheduled_event_id, + started_event_id, + identity: "workflow".to_string(), + retry_state: RetryState::Unspecified as i32, + }; + Failure { + message: "Activity canceled".to_string(), + cause: Some(Box::new(cancelled_failure)), + failure_info: Some(failure::FailureInfo::ActivityFailureInfo( + activity_failure_info, + )), + ..Default::default() + } +} + #[cfg(test)] mod test { use super::*; + use crate::protos::coresdk::workflow_activation::WfActivation; use crate::{ machines::{ test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}, @@ -433,7 +635,7 @@ mod test { #[fixture] fn activity_happy_hist() -> (TestHistoryBuilder, WorkflowMachines) { - let twd = activity_workflow_driver(); + let twd = activity_workflow_driver("activity-id-1"); let t = canned_histories::single_activity("activity-id-1"); let state_machines = WorkflowMachines::new( "wfid".to_string(), @@ -447,7 +649,7 @@ mod test { #[fixture] fn activity_failure_hist() -> 
(TestHistoryBuilder, WorkflowMachines) { - let twd = activity_workflow_driver(); + let twd = activity_workflow_driver("activity-id-1"); let t = canned_histories::single_failed_activity("activity-id-1"); let state_machines = WorkflowMachines::new( "wfid".to_string(), @@ -459,10 +661,10 @@ mod test { (t, state_machines) } - fn activity_workflow_driver() -> TestWorkflowDriver { + fn activity_workflow_driver(activity_id: &'static str) -> TestWorkflowDriver { TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { let activity = ScheduleActivity { - activity_id: "activity-id-1".to_string(), + activity_id: activity_id.to_string(), ..Default::default() }; command_sink.activity(activity).await; @@ -516,4 +718,55 @@ mod test { CommandType::CompleteWorkflowExecution as i32 ); } + + #[test] + fn immediate_activity_cancelation() { + let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move { + let cancel_activity_future = cmd_sink.activity(ScheduleActivity { + activity_id: "activity-id-1".to_string(), + ..Default::default() + }); + // Immediately cancel the activity + cmd_sink.cancel_activity("activity-id-1"); + cancel_activity_future.await; + + let complete = CompleteWorkflowExecution::default(); + cmd_sink.send(complete.into()); + }); + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_workflow_execution_completed(); + + let mut state_machines = WorkflowMachines::new( + "wfid".to_string(), + "runid".to_string(), + Box::new(twd).into(), + ); + + let commands = t + .handle_workflow_task_take_cmds(&mut state_machines, None) + .unwrap(); + assert_eq!(commands.len(), 0); + let activation = state_machines.get_wf_activation().unwrap(); + assert_matches!( + activation.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity( + ResolveActivity 
{ + activity_id, + result: Some(ActivityResult { + status: Some(activity_result::Status::Failed(_)) + }) + } + )), + }, + ] + ) + } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index bdcdf173d..7a7620557 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -109,7 +109,7 @@ trait TemporalStateMachine: CheckStateMachineInFinal + Send { ) -> Result, WFMachinesError>; /// Attempt to cancel the command associated with this state machine, if it is cancellable - fn cancel(&mut self) -> Result; + fn cancel(&mut self) -> Result, WFMachinesError>; /// Should return true if the command was cancelled before we sent it to the server. Always /// returns false for non-cancellable machines @@ -188,7 +188,7 @@ where } } - fn cancel(&mut self) -> Result { + fn cancel(&mut self) -> Result, WFMachinesError> { let res = self.cancel(); res.map_err(|e| match e { MachineError::InvalidTransition => { @@ -239,7 +239,7 @@ trait Cancellable: StateMachine { /// # Panics /// * If the machine is not cancellable. It's a logic error on our part to call it on such /// machines. 
- fn cancel(&mut self) -> Result> { + fn cancel(&mut self) -> Result, MachineError> { // It's a logic error on our part if this is ever called on a machine that can't actually // be cancelled panic!("Machine {} cannot be cancelled", self.name()) diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 7f7784ed6..7f6347c46 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,6 +1,6 @@ use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; use crate::protos::coresdk::workflow_activation::ResolveActivity; -use crate::protos::coresdk::workflow_commands::ScheduleActivity; +use crate::protos::coresdk::workflow_commands::{RequestCancelActivity, ScheduleActivity}; use crate::{ machines::{workflow_machines::CommandID, WFCommand}, protos::coresdk::{ @@ -55,6 +55,13 @@ impl TestWfDriverCache { bc.issued_commands.remove(&CommandID::Timer(id.to_owned())); } + /// Cancel activity by ID. + fn cancel_activity(&self, id: &str) { + let mut bc = self.blocking_condvar.0.lock(); + bc.issued_commands + .remove(&CommandID::Activity(id.to_owned())); + } + /// Track a new command that the wf has sent down the command sink. 
The command starts in /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed fn add_sent_cmd(&self, id: CommandID) -> oneshot::Receiver<()> { @@ -157,6 +164,16 @@ impl CommandSender { self.twd_cache.cancel_timer(timer_id); self.send(c); } + + /// Cancel activity + pub fn cancel_activity(&self, activity_id: &str) { + let c = WFCommand::RequestCancelActivity(RequestCancelActivity { + activity_id: activity_id.to_string(), + ..Default::default() + }); + self.twd_cache.cancel_activity(activity_id); + self.send(c); + } } impl TestWorkflowDriver { diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 49939da51..eee2ebcfe 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -6,11 +6,11 @@ mod history_builder; pub(super) use async_workflow_driver::{CommandSender, TestWorkflowDriver}; pub(crate) use history_builder::TestHistoryBuilder; -use crate::protos::coresdk::common::UserCodeFailure; use crate::{ pollers::MockServerGatewayApis, protos::{ coresdk::{ + common::UserCodeFailure, workflow_activation::WfActivation, workflow_commands::workflow_command, workflow_completion::{self, wf_activation_completion, WfActivationCompletion}, @@ -31,9 +31,9 @@ pub(crate) type FakeCore = CoreSDK; /// Given identifiers for a workflow/run, and a test history builder, construct an instance of /// the core SDK with a mock server gateway that will produce the responses as appropriate. /// -/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. -/// For each number in the input list, a fake response will be prepared which includes history -/// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info]. +/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. 
For +/// each number in the input list, a fake response will be prepared which includes history up to the +/// workflow task with that number, as in [TestHistoryBuilder::get_history_info]. pub(crate) fn build_fake_core( wf_id: &str, t: &mut TestHistoryBuilder, @@ -99,6 +99,7 @@ pub(crate) async fn poll_and_reply<'a>( run_id = res.run_id; } + dbg!(&reply); core.complete_workflow_task(WfActivationCompletion::from_status( task_tok, reply.clone(), diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index f12c1bec7..a51cd4a29 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -245,10 +245,12 @@ impl WFMachinesAdapter for TimerMachine { } impl Cancellable for TimerMachine { - fn cancel(&mut self) -> Result> { + fn cancel(&mut self) -> Result, MachineError> { Ok(match self.on_event_mut(TimerMachineEvents::Cancel)?.pop() { - Some(TimerMachineCommand::IssueCancelCmd(cmd)) => MachineResponse::IssueNewCommand(cmd), - Some(TimerMachineCommand::Canceled) => MachineResponse::NoOp, + Some(TimerMachineCommand::IssueCancelCmd(cmd)) => { + vec![MachineResponse::IssueNewCommand(cmd)] + } + Some(TimerMachineCommand::Canceled) => vec![], x => panic!("Invalid cancel event response {:?}", x), }) } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index fd48894d6..6482ba983 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::workflow_activation::wf_activation_job::Variant; use crate::protos::coresdk::PayloadsToPayloadError; use crate::{ core_tracing::VecDisplayer, @@ -108,7 +109,6 @@ pub enum MachineResponse { UpdateRunIdOnWorkflowReset { run_id: String, }, - NoOp, } #[derive(thiserror::Error, Debug)] @@ -131,7 +131,7 @@ pub enum WFMachinesError { #[error("No command was scheduled for event {0:?}")] NoCommandScheduledForEvent(HistoryEvent), #[error("Machine response {0:?} was not expected: {1}")] - 
UnexpectedMachineResponse(MachineResponse, &'static str), + UnexpectedMachineResponse(MachineResponse, String), #[error("Command was missing its associated machine: {0}")] MissingAssociatedMachine(String), #[error("There was {0} when we expected exactly one payload while applying event: {1:?}")] @@ -139,6 +139,8 @@ pub enum WFMachinesError { #[error("Machine encountered an invalid transition: {0}")] InvalidTransition(&'static str), + #[error("Invalid cancelation type: {0}")] + InvalidCancelationType(i32), } impl WorkflowMachines { @@ -442,11 +444,17 @@ impl WorkflowMachines { /// Iterate the state machines, which consists of grabbing any pending outgoing commands from /// the workflow, handling them, and preparing them to be sent off to the server. - pub(crate) fn iterate_machines(&mut self) -> Result<()> { + /// Returns a boolean flag which indicates whether or not new activations were produced by the state + /// machine. If true, pending activation should be created by the caller making jobs available to the lang side. + pub(crate) fn iterate_machines(&mut self) -> Result { let results = self.drive_me.fetch_workflow_iteration_output(); - self.handle_driven_results(results)?; + let jobs = self.handle_driven_results(results)?; + let has_new_lang_jobs = !jobs.is_empty(); + for job in jobs.into_iter() { + self.drive_me.send_job(job); + } self.prepare_commands()?; - Ok(()) + Ok(has_new_lang_jobs) } /// Apply events from history to this machines instance @@ -527,7 +535,6 @@ impl WorkflowMachines { }, )); } - MachineResponse::NoOp => (), MachineResponse::IssueNewCommand(_) => { panic!("Issue new command machine response not expected here") } @@ -536,7 +543,16 @@ impl WorkflowMachines { Ok(()) } - fn handle_driven_results(&mut self, results: Vec) -> Result<()> { + /// Handles results of the workflow activation, delegating work to the appropriate state machine. + /// Returns a list of workflow jobs that should be queued in the pending activation for the next poll. 
+ /// This list will be populated only if state machine produced lang activations as part of command processing. + /// For example some types of activity cancellation need to immediately unblock lang side without + /// having it to poll for an actual workflow task from the server. + fn handle_driven_results( + &mut self, + results: Vec, + ) -> Result> { + let mut jobs = vec![]; for cmd in results { match cmd { WFCommand::AddTimer(attrs) => { @@ -546,33 +562,10 @@ impl WorkflowMachines { .insert(CommandID::Timer(tid), timer.machine); self.current_wf_task_commands.push_back(timer); } - WFCommand::CancelTimer(attrs) => { - let mkey = *self - .id_to_machine - .get(&CommandID::Timer(attrs.timer_id.to_owned())) - .ok_or_else(|| { - WFMachinesError::MissingAssociatedMachine(format!( - "Missing associated machine for cancelling timer {}", - &attrs.timer_id - )) - })?; - let res = self.machine_mut(mkey).cancel()?; - match res { - MachineResponse::IssueNewCommand(c) => { - self.current_wf_task_commands.push_back(CommandAndMachine { - command: c, - machine: mkey, - }) - } - MachineResponse::NoOp => {} - v => { - return Err(WFMachinesError::UnexpectedMachineResponse( - v, - "When cancelling timer", - )); - } - } - } + WFCommand::CancelTimer(attrs) => self.process_cancellation( + &CommandID::Timer(attrs.timer_id.to_owned()), + &mut jobs, + )?, WFCommand::AddActivity(attrs) => { let aid = attrs.activity_id.clone(); let activity = self.add_new_machine(new_activity(attrs)); @@ -580,7 +573,10 @@ impl WorkflowMachines { .insert(CommandID::Activity(aid), activity.machine); self.current_wf_task_commands.push_back(activity); } - WFCommand::RequestCancelActivity(_) => unimplemented!(), + WFCommand::RequestCancelActivity(attrs) => self.process_cancellation( + &CommandID::Activity(attrs.activity_id.to_owned()), + &mut jobs, + )?, WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); @@ -592,9 
+588,44 @@ impl WorkflowMachines { WFCommand::NoCommandsFromLang => (), } } + Ok(jobs) + } + + fn process_cancellation(&mut self, id: &CommandID, jobs: &mut Vec) -> Result<()> { + let m_key = self.get_machine_key(id)?; + let res = self.machine_mut(m_key).cancel()?; + debug!(machine_responses = ?res, cmd_id = ?id, "Req cancel responses"); + for r in res { + match r { + MachineResponse::IssueNewCommand(c) => { + self.current_wf_task_commands.push_back(CommandAndMachine { + command: c, + machine: m_key, + }) + } + MachineResponse::PushWFJob(j) => { + jobs.push(j); + } + v => { + return Err(WFMachinesError::UnexpectedMachineResponse( + v, + format!("When cancelling {:?}", id), + )); + } + } + } Ok(()) } + fn get_machine_key(&mut self, id: &CommandID) -> Result { + Ok(*self.id_to_machine.get(id).ok_or_else(|| { + WFMachinesError::MissingAssociatedMachine(format!( + "Missing associated machine for {:?}", + id + )) + })?) + } + /// Transfer commands from `current_wf_task_commands` to `commands`, so they may be sent off /// to the server. While doing so, [TemporalStateMachine::handle_command] is called on the /// machine associated with the command. 
diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 90a96d777..8856f764a 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -355,6 +355,16 @@ pub mod temporal { } } + impl From for command::Attributes { + fn from(c: workflow_commands::RequestCancelActivity) -> Self { + Self::RequestCancelActivityTaskCommandAttributes( + RequestCancelActivityTaskCommandAttributes { + scheduled_event_id: c.scheduled_event_id, + }, + ) + } + } + impl From for command::Attributes { fn from(s: workflow_commands::ScheduleActivity) -> Self { Self::ScheduleActivityTaskCommandAttributes( diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 5d01649d6..ed3e3c30d 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -3,7 +3,8 @@ use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ - history_event, ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, + history_event, ActivityTaskCancelRequestedEventAttributes, ActivityTaskCanceledEventAttributes, + ActivityTaskCompletedEventAttributes, ActivityTaskFailedEventAttributes, ActivityTaskScheduledEventAttributes, ActivityTaskStartedEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes, }; @@ -253,6 +254,160 @@ pub fn single_failed_activity(activity_id: &str) -> TestHistoryBuilder { t } +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED +/// 11: 
EVENT_TYPE_WORKFLOW_EXECUTION_COMPLETED +pub fn cancel_scheduled_activity(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCancelRequested, + history_event::Attributes::ActivityTaskCancelRequestedEventAttributes( + ActivityTaskCancelRequestedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ); + t.add_workflow_execution_completed(); + t +} + +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +pub fn cancel_scheduled_activity_abandon(activity_id: &str, signal_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t +} + +/// 1: 
EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 9: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 10: EVENT_TYPE_ACTIVITY_TASK_CANCEL_REQUESTED +/// 11: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 12: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 13: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 14: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 15: EVENT_TYPE_ACTIVITY_TASK_CANCELED +/// 16: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 17: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn cancel_scheduled_activity_with_activity_task_cancel( + activity_id: &str, + signal_id: &str, +) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCancelRequested, + history_event::Attributes::ActivityTaskCancelRequestedEventAttributes( + ActivityTaskCancelRequestedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ); + t.add_we_signaled( + signal_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_full_wf_task(); + t.add( + EventType::ActivityTaskCanceled, + history_event::Attributes::ActivityTaskCanceledEventAttributes( + ActivityTaskCanceledEventAttributes { + scheduled_event_id, + ..Default::default() + }, + 
), + ); + t.add_workflow_task_scheduled_and_started(); + t +} /// First signal's payload is "hello " and second is "world" (no metadata for either) /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index a729cb32c..b7516f93c 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -100,6 +100,12 @@ impl NextWfActivation { } } +#[derive(Debug)] +pub(crate) struct PushCommandsResult { + pub server_commands: Vec, + pub has_new_lang_jobs: bool, +} + impl WorkflowManager { /// Given history that was just obtained from the server, pipe it into this workflow's machines. /// @@ -146,10 +152,14 @@ impl WorkflowManager { } /// Feed the workflow machines new commands issued by the executing workflow code, iterate the - /// workflow machines, and spit out the commands which are ready to be sent off to the server - pub fn push_commands(&mut self, cmds: Vec) -> Result> { + /// workflow machines, and spit out the commands which are ready to be sent off to the server, as + /// well as a possible indication that there are new jobs that must be sent to lang. 
+ pub fn push_commands(&mut self, cmds: Vec) -> Result { self.command_sink.send(cmds)?; - self.machines.iterate_machines()?; - Ok(self.machines.get_commands()) + let has_new_lang_jobs = self.machines.iterate_machines()?; + Ok(PushCommandsResult { + server_commands: self.machines.get_commands(), + has_new_lang_jobs, + }) } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index d67a79712..b642afe7a 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -3,6 +3,7 @@ use crossbeam::channel::{unbounded, RecvTimeoutError}; use futures::{channel::mpsc::UnboundedReceiver, future, Future, SinkExt, StreamExt}; use rand::{self, Rng}; use std::{collections::HashMap, convert::TryFrom, env, sync::Arc, time::Duration}; +use temporal_sdk_core::protos::coresdk::workflow_commands::ActivityCancellationType; use temporal_sdk_core::protos::coresdk::ActivityTaskCompletion; use temporal_sdk_core::{ protos::coresdk::{ @@ -14,13 +15,13 @@ use temporal_sdk_core::{ WfActivationJob, }, workflow_commands::{ - CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, ScheduleActivity, - StartTimer, + CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, RequestCancelActivity, + ScheduleActivity, StartTimer, }, workflow_completion::WfActivationCompletion, }, - CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, ServerGatewayOptions, - Url, + tracing_init, CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, + ServerGatewayOptions, Url, }; // TODO: These tests can get broken permanently if they break one time and the server is not @@ -119,7 +120,7 @@ async fn activity_workflow() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + 
core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters @@ -177,7 +178,7 @@ async fn activity_non_retryable_failure() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity and verify that it's been scheduled with correct parameters @@ -241,7 +242,7 @@ async fn activity_retry() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_workflow_task(task_q).await.unwrap(); // Complete workflow task and schedule activity - core.complete_workflow_task(activity_completion_req(task_q, &activity_id, task)) + core.complete_workflow_task(schedule_activity_cmd(task_q, &activity_id, task)) .await .unwrap(); // Poll activity 1st time @@ -313,7 +314,7 @@ async fn activity_retry() { .unwrap() } -fn activity_completion_req( +fn schedule_activity_cmd( task_q: &str, activity_id: &str, task: WfActivation, @@ -334,6 +335,244 @@ fn activity_completion_req( task.task_token, ) } +fn schedule_activity_and_timer_cmds( + task_q: &str, + activity_id: &str, + timer_id: &str, + cancellation_type: ActivityCancellationType, + task: WfActivation, +) -> WfActivationCompletion { + WfActivationCompletion::ok_from_cmds( + vec![ + ScheduleActivity { + activity_id: activity_id.to_string(), + activity_type: "test_activity".to_string(), + namespace: NAMESPACE.to_owned(), + task_queue: task_q.to_owned(), + schedule_to_start_timeout: Some(Duration::from_secs(30).into()), + start_to_close_timeout: Some(Duration::from_secs(30).into()), + schedule_to_close_timeout: Some(Duration::from_secs(60).into()), + heartbeat_timeout: Some(Duration::from_secs(60).into()), + 
cancellation_type: cancellation_type as i32, + ..Default::default() + } + .into(), + StartTimer { + timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_millis(50).into()), + } + .into(), + ], + task.task_token, + ) +} + +#[tokio::test] +async fn activity_cancellation_try_cancel() { + tracing_init(); + + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + task_q, + &activity_id, + &timer_id, + ActivityCancellationType::TryCancel, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is try-cancelled. + let activity_task = core.poll_activity_task(task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. 
+ let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); +} + +#[tokio::test] +async fn activity_cancellation_wait_cancellation_completed() { + tracing_init(); + + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( + task_q, + &activity_id, + &timer_id, + ActivityCancellationType::WaitCancellationCompleted, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is wait-cancelled. 
+ let activity_task = core.poll_activity_task(task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + core.complete_activity_task(ActivityTaskCompletion { + task_token: activity_task.task_token, + result: Some(ActivityResult { + status: Some(activity_result::activity_result::Status::Canceled( + activity_result::Cancelation { details: None }, + )), + }), + }) + .await + .unwrap(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); +} + +#[tokio::test] +async fn activity_cancellation_abandon() { + tracing_init(); + + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_cancelled_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core().await; + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None).await; + let activity_id: String = rng.gen::().to_string(); + let timer_id: String = rng.gen::().to_string(); + let task = core.poll_workflow_task(task_q).await.unwrap(); + // Complete workflow task and schedule activity and a timer that fires immediately + core.complete_workflow_task(schedule_activity_and_timer_cmds( 
+ task_q, + &activity_id, + &timer_id, + ActivityCancellationType::Abandon, + task, + )) + .await + .unwrap(); + // Poll activity and verify that it's been scheduled with correct parameters, we don't expect to + // complete it in this test as activity is abandoned. + let activity_task = core.poll_activity_task(task_q).await.unwrap(); + assert_matches!( + activity_task.variant, + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) + } + ); + // Poll workflow task and verify that activity has failed. + let task = core.poll_workflow_task(task_q).await.unwrap(); + assert_matches!( + task.jobs.as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer( + FireTimer { timer_id: t_id } + )), + }, + ] => { + assert_eq!(t_id, &timer_id); + } + ); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![RequestCancelActivity { + activity_id, + ..Default::default() + } + .into()], + task.task_token, + )) + .await + .unwrap(); + // Poll workflow task expecting that activation has been created by the state machine + // immediately after the cancellation request. + let task = core.poll_workflow_task(task_q).await.unwrap(); + core.complete_workflow_task(WfActivationCompletion::ok_from_cmds( + vec![CompleteWorkflowExecution { result: None }.into()], + task.task_token, + )) + .await + .unwrap(); +} #[tokio::test] async fn parallel_timer_workflow() {