diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index e2a0bf38b..797fba8b5 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -157,7 +157,7 @@ message CancelActivity { message ActivityTask { string activity_id = 1; - oneof job { + oneof variant { // Start activity execution. StartActivity start = 2; // Attempt to cancel activity execution. @@ -185,12 +185,12 @@ message WFActivationCompletion { } } -/// Used to report activity completion to core and to resolve the activity in a workflow activtion +/// Used to report activity completion to core and to resolve the activity in a workflow activation message ActivityResult { oneof status { ActivityTaskSuccess completed = 1; - ActivityTaskCancelation canceled = 2; - ActivityTaskFailure failed = 3; + ActivityTaskFailure failed = 2; + ActivityTaskCancelation canceled = 3; } } @@ -231,6 +231,7 @@ message WFActivationFailure { /// Used in ActivityResult to report cancellation message ActivityTaskCancelation { + temporal.api.common.v1.Payloads details = 1; } /// Used in ActivityResult to report successful completion diff --git a/src/lib.rs b/src/lib.rs index 04b549fe7..6eeddd7ce 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,15 +22,20 @@ mod workflow; mod test_help; pub use core_tracing::tracing_init; -pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions}; +pub use pollers::{ + PollTaskRequest, PollTaskResponse, ServerGateway, ServerGatewayApis, ServerGatewayOptions, +}; pub use url::Url; +use crate::protos::coresdk::{ + activity_result, task, ActivityTaskCancelation, ActivityTaskFailure, ActivityTaskSuccess, +}; use crate::{ machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ - task_completion, wf_activation_completion::Status, Task, TaskCompletion, + task_completion, wf_activation_completion, ActivityResult, Task, 
TaskCompletion, WfActivationCompletion, WfActivationSuccess, }, temporal::api::{ @@ -62,17 +67,24 @@ pub type Result = std::result::Result; /// expected that only one instance of an implementation will exist for the lifetime of the /// worker(s) using it. pub trait Core: Send + Sync { - /// Ask the core for some work, returning a [Task], which will eventually contain either a - /// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the - /// language SDK's responsibility to call the appropriate code with the provided inputs. + /// Ask the core for some work, returning a [Task], which will contain a [protos::coresdk::WfActivation]. + /// It is then the language SDK's responsibility to call the appropriate code with the provided inputs. /// /// TODO: Examples + /// TODO: rename to poll_workflow_task and change result type to WfActivation fn poll_task(&self, task_queue: &str) -> Result; + /// Ask the core for some work, returning a [protos::coresdk::Task], which will contain a [protos::coresdk::ActivityTask]. + /// It is then the language SDK's responsibility to call the completion API. + fn poll_activity_task(&self, task_queue: &str) -> Result; + /// Tell the core that some work has been completed - whether as a result of running workflow /// code or executing an activity. fn complete_task(&self, req: TaskCompletion) -> Result<()>; + /// Tell the core that activity has completed. This will result in core calling the server and completing activity synchronously. + fn complete_activity_task(&self, req: TaskCompletion) -> Result<()>; + /// Returns an instance of ServerGateway. 
fn server_gateway(&self) -> Result>; @@ -158,8 +170,8 @@ where // This will block forever (unless interrupted by shutdown) in the event there is no work // from the server - match self.poll_server(task_queue) { - Ok(work) => { + match self.poll_server(PollTaskRequest::Workflow(task_queue.to_owned())) { + Ok(PollTaskResponse::WorkflowTask(work)) => { let task_token = work.task_token.clone(); debug!( task_token = %fmt_task_token(&task_token), @@ -180,8 +192,29 @@ where variant: next_activation.activation.map(Into::into), }) } + // Drain pending activations in case of shutdown. Err(CoreError::ShuttingDown) => self.poll_task(task_queue), Err(e) => Err(e), + Ok(PollTaskResponse::ActivityTask(_)) => Err(CoreError::UnexpectedResult), + } + } + + #[instrument(skip(self))] + fn poll_activity_task(&self, task_queue: &str) -> Result { + if self.shutdown_requested.load(Ordering::SeqCst) { + return Err(CoreError::ShuttingDown); + } + + match self.poll_server(PollTaskRequest::Activity(task_queue.to_owned())) { + Ok(PollTaskResponse::ActivityTask(work)) => { + let task_token = work.task_token.clone(); + Ok(Task { + task_token, + variant: Some(task::Variant::Activity(work.into())), + }) + } + Err(e) => Err(e), + Ok(PollTaskResponse::WorkflowTask(_)) => Err(CoreError::UnexpectedResult), } } @@ -201,7 +234,7 @@ where .map(|x| x.value().clone()) .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => { + wf_activation_completion::Status::Successful(success) => { let commands = self.push_lang_commands(&run_id, success)?; // We only actually want to send commands back to the server if there are // no more pending activations -- in other words the lang SDK has caught @@ -213,7 +246,7 @@ where )?; } } - Status::Failed(failure) => { + wf_activation_completion::Status::Failed(failure) => { // Blow up any cached data associated with the workflow self.evict_run(&run_id); @@ -229,10 +262,41 @@ where } Ok(()) } + _ => 
Err(CoreError::MalformedCompletion(req)), + } + } + + #[instrument(skip(self))] + fn complete_activity_task(&self, req: TaskCompletion) -> Result<()> { + match req { TaskCompletion { - variant: Some(task_completion::Variant::Activity(_)), - .. - } => unimplemented!(), + task_token, + variant: + Some(task_completion::Variant::Activity(ActivityResult { + status: Some(status), + })), + } => { + match status { + activity_result::Status::Completed(ActivityTaskSuccess { result }) => { + self.runtime.block_on( + self.server_gateway + .complete_activity_task(task_token, result), + )?; + } + activity_result::Status::Failed(ActivityTaskFailure { failure }) => { + self.runtime.block_on( + self.server_gateway.fail_activity_task(task_token, failure), + )?; + } + activity_result::Status::Canceled(ActivityTaskCancelation { details }) => { + self.runtime.block_on( + self.server_gateway + .cancel_activity_task(task_token, details), + )?; + } + } + Ok(()) + } _ => Err(CoreError::MalformedCompletion(req)), } } @@ -298,7 +362,7 @@ impl CoreSDK { /// Blocks polling the server until it responds, or until the shutdown flag is set (aborting /// the poll) - fn poll_server(&self, task_queue: &str) -> Result { + fn poll_server(&self, req: PollTaskRequest) -> Result { self.runtime.block_on(async { let shutdownfut = async { loop { @@ -308,14 +372,12 @@ impl CoreSDK { tokio::time::sleep(Duration::from_millis(100)).await; } }; - let pollfut = self - .server_gateway - .poll_workflow_task(task_queue.to_owned()); + let poll_result_future = self.server_gateway.poll_task(req); tokio::select! { _ = shutdownfut => { Err(CoreError::ShuttingDown) } - r = pollfut => r + r = poll_result_future => r } }) } @@ -377,12 +439,17 @@ pub enum CoreError { /// When thrown from complete_task, it means you should poll for a new task, receive a new /// task token, and complete that task. UnhandledCommandWhenCompleting, + /// Indicates that underlying function returned Ok, but result type was incorrect. 
+ /// This is likely a result of a bug and should never happen. + UnexpectedResult, } #[cfg(test)] mod test { use super::*; - use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; + use crate::protos::temporal::api::command::v1::{ + FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes, + }; use crate::{ machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, protos::{ @@ -415,6 +482,14 @@ mod test { build_fake_core(wfid, RUN_ID, &mut t, hist_batches) } + #[fixture(hist_batches = &[])] + fn single_activity_setup(hist_batches: &[usize]) -> FakeCore { + let wfid = "fake_wf_id"; + + let mut t = canned_histories::single_activity("fake_activity"); + build_fake_core(wfid, RUN_ID, &mut t, hist_batches) + } + #[rstest(core, case::incremental(single_timer_setup(&[1, 2])), case::replay(single_timer_setup(&[2])) @@ -455,7 +530,47 @@ mod test { .unwrap(); } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(core, + case::incremental(single_activity_setup(&[1, 2])), + case::replay(single_activity_setup(&[2])) + )] + fn single_activity_completion(core: FakeCore) { + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow(_)), + }] + ); + assert!(core.workflow_machines.exists(RUN_ID)); + + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![ScheduleActivityTaskCommandAttributes { + activity_id: "fake_activity".to_string(), + ..Default::default() + } + .into()], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity(_)), + }] + ); + let task_tok = res.task_token; + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { 
result: None }.into()], + task_tok, + )) + .unwrap(); + } + + #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))] fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 23efd857d..4b62328e9 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,9 +1,25 @@ -use rustfsm::{fsm, TransitionResult}; +use crate::machines::workflow_machines::MachineResponse; +use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; +use crate::protos::coresdk::{ + activity_result, activity_task, ActivityResult, ActivityTaskSuccess, CancelActivity, + ResolveActivity, StartActivity, +}; +use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; +use crate::protos::temporal::api::common::v1::Payloads; +use crate::protos::temporal::api::enums::v1::{CommandType, EventType}; +use crate::protos::temporal::api::history::v1::{ + history_event, ActivityTaskCompletedEventAttributes, HistoryEvent, +}; +use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; +use std::convert::TryFrom; // Schedule / cancel are "explicit events" (imperative rather than past events?) fsm! { - pub(super) name ActivityMachine; command ActivityCommand; error ActivityMachineError; + pub(super) name ActivityMachine; + command ActivityMachineCommand; + error WFMachinesError; + shared_state SharedState; Created --(Schedule, on_schedule)--> ScheduleCommandCreated; @@ -16,7 +32,7 @@ fsm! 
{ ScheduledEventRecorded --(ActivityTaskTimedOut, on_task_timed_out) --> TimedOut; ScheduledEventRecorded --(Cancel, on_canceled) --> ScheduledActivityCancelCommandCreated; - Started --(ActivityTaskCompleted, on_activity_task_completed) --> Completed; + Started --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; Started --(ActivityTaskFailed, on_activity_task_failed) --> Failed; Started --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; Started --(Cancel, on_canceled) --> StartedActivityCancelCommandCreated; @@ -42,17 +58,152 @@ fsm! { StartedActivityCancelEventRecorded --(ActivityTaskFailed, on_activity_task_failed) --> Failed; StartedActivityCancelEventRecorded - --(ActivityTaskCompleted, on_activity_task_completed) --> Completed; + --(ActivityTaskCompleted(ActivityTaskCompletedEventAttributes), on_activity_task_completed) --> Completed; StartedActivityCancelEventRecorded --(ActivityTaskTimedOut, on_activity_task_timed_out) --> TimedOut; StartedActivityCancelEventRecorded --(ActivityTaskCanceled, on_activity_task_canceled) --> Canceled; } -#[derive(thiserror::Error, Debug)] -pub(super) enum ActivityMachineError {} +#[derive(Debug, derive_more::Display)] +pub(super) enum ActivityMachineCommand { + #[display(fmt = "Complete")] + Complete(Option), +} + +#[derive(Debug, Clone, derive_more::Display)] +pub(super) enum ActivityCancellationType { + /// Wait for activity cancellation completion. Note that activity must heartbeat to receive a + /// cancellation notification. This can block the cancellation for a long time if activity doesn't + /// heartbeat or chooses to ignore the cancellation request. + WaitCancellationCompleted, + + /// Initiate a cancellation request and immediately report cancellation to the workflow. 
+ TryCancel, + + /// Do not request cancellation of the activity and immediately report cancellation to the workflow + Abandon, +} + +impl Default for ActivityCancellationType { + fn default() -> Self { + ActivityCancellationType::TryCancel + } +} + +/// Creates a new activity state machine and a command to schedule it on the server. +pub(super) fn new_activity( + attribs: ScheduleActivityTaskCommandAttributes, +) -> NewMachineWithCommand { + let (activity, add_cmd) = ActivityMachine::new_scheduled(attribs); + NewMachineWithCommand { + command: add_cmd, + machine: activity, + } +} + +impl ActivityMachine { + /// Create a new activity and immediately schedule it. + pub(crate) fn new_scheduled(attribs: ScheduleActivityTaskCommandAttributes) -> (Self, Command) { + let mut s = Self { + state: Created {}.into(), + shared_state: SharedState { + attrs: attribs, + cancellation_type: ActivityCancellationType::TryCancel, + }, + }; + s.on_event_mut(ActivityMachineEvents::Schedule) + .expect("Scheduling activities doesn't fail"); + let cmd = Command { + command_type: CommandType::ScheduleActivityTask as i32, + attributes: Some(s.shared_state().attrs.clone().into()), + }; + (s, cmd) + } +} + +impl TryFrom for ActivityMachineEvents { + type Error = WFMachinesError; + + fn try_from(e: HistoryEvent) -> Result { + Ok(match EventType::from_i32(e.event_type) { + Some(EventType::ActivityTaskScheduled) => Self::ActivityTaskScheduled, + Some(EventType::ActivityTaskStarted) => Self::ActivityTaskStarted, + Some(EventType::ActivityTaskCompleted) => { + if let Some(history_event::Attributes::ActivityTaskCompletedEventAttributes( + attrs, + )) = e.attributes + { + Self::ActivityTaskCompleted(attrs) + } else { + return Err(WFMachinesError::MalformedEvent( + e, + "Activity completion attributes were unset".to_string(), + )); + } + } + Some(EventType::ActivityTaskFailed) => Self::ActivityTaskFailed, + Some(EventType::ActivityTaskTimedOut) => Self::ActivityTaskTimedOut, + 
Some(EventType::ActivityTaskCancelRequested) => Self::ActivityTaskCancelRequested, + Some(EventType::ActivityTaskCanceled) => Self::ActivityTaskCanceled, + _ => { + return Err(WFMachinesError::UnexpectedEvent( + e, + "Activity machine does not handle this event", + )) + } + }) + } +} -pub(super) enum ActivityCommand {} +impl TryFrom for ActivityMachineEvents { + type Error = (); + + fn try_from(c: CommandType) -> Result { + Ok(match c { + CommandType::ScheduleActivityTask => Self::CommandScheduleActivityTask, + CommandType::RequestCancelActivityTask => Self::CommandRequestCancelActivityTask, + _ => return Err(()), + }) + } +} + +impl WFMachinesAdapter for ActivityMachine { + fn adapt_response( + &self, + event: &HistoryEvent, + has_next_event: bool, + my_command: ActivityMachineCommand, + ) -> Result, WFMachinesError> { + Ok(match my_command { + ActivityMachineCommand::Complete(result) => vec![ResolveActivity { + activity_id: self.shared_state.attrs.activity_id.clone(), + result: Some(ActivityResult { + status: Some(activity_result::Status::Completed(ActivityTaskSuccess { + result, + })), + }), + } + .into()], + }) + } +} + +impl Cancellable for ActivityMachine { + fn cancel(&mut self) -> Result> { + unimplemented!() + } + + fn was_cancelled_before_sent_to_server(&self) -> bool { + false // TODO return cancellation flag from the shared state + } +} + +#[derive(Default, Clone)] +pub(super) struct SharedState { + attrs: ScheduleActivityTaskCommandAttributes, + cancellation_type: ActivityCancellationType, +} #[derive(Default, Clone)] pub(super) struct Created {} @@ -101,9 +252,15 @@ impl ScheduledEventRecorded { pub(super) struct Started {} impl Started { - pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed( + self, + attrs: ActivityTaskCompletedEventAttributes, + ) -> ActivityMachineTransition { // notify_completed - ActivityMachineTransition::default::() + ActivityMachineTransition::ok( + 
vec![ActivityMachineCommand::Complete(attrs.result)], + Completed::default(), + ) } pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed @@ -163,9 +320,15 @@ impl StartedActivityCancelCommandCreated { pub(super) struct StartedActivityCancelEventRecorded {} impl StartedActivityCancelEventRecorded { - pub(super) fn on_activity_task_completed(self) -> ActivityMachineTransition { + pub(super) fn on_activity_task_completed( + self, + attrs: ActivityTaskCompletedEventAttributes, + ) -> ActivityMachineTransition { // notify_completed - ActivityMachineTransition::default::() + ActivityMachineTransition::ok( + vec![ActivityMachineCommand::Complete(attrs.result)], + Completed::default(), + ) } pub(super) fn on_activity_task_failed(self) -> ActivityMachineTransition { // notify_failed @@ -198,9 +361,3 @@ pub(super) struct TimedOut {} #[derive(Default, Clone)] pub(super) struct Canceled {} - -#[cfg(test)] -mod activity_machine_tests { - #[test] - fn test() {} -} diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 9ef95a3c4..86df95148 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -33,7 +33,10 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; +use crate::protos::temporal::api::command::v1::{ + FailWorkflowExecutionCommandAttributes, RequestCancelActivityTaskCommandAttributes, + ScheduleActivityTaskCommandAttributes, +}; use crate::{ core_tracing::VecDisplayer, machines::workflow_machines::MachineResponse, @@ -61,9 +64,12 @@ pub(crate) type ProtoCommand = Command; /// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next. /// EX: Create a new timer, complete the workflow, etc. 
#[derive(Debug, derive_more::From)] +#[allow(clippy::large_enum_variant)] pub enum WFCommand { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, + AddActivity(ScheduleActivityTaskCommandAttributes), + RequestCancelActivity(RequestCancelActivityTaskCommandAttributes), AddTimer(StartTimerCommandAttributes), CancelTimer(CancelTimerCommandAttributes), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), @@ -85,6 +91,12 @@ impl TryFrom for WFCommand { })) => match attrs { Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), Attributes::CancelTimerCommandAttributes(s) => Ok(WFCommand::CancelTimer(s)), + Attributes::ScheduleActivityTaskCommandAttributes(s) => { + Ok(WFCommand::AddActivity(s)) + } + Attributes::RequestCancelActivityTaskCommandAttributes(s) => { + Ok(WFCommand::RequestCancelActivity(s)) + } Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { Ok(WFCommand::CompleteWorkflow(c)) } diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 55bdade46..1b126530c 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,3 +1,4 @@ +use crate::machines::workflow_machines::CommandID; use crate::{ machines::WFCommand, protos::{ @@ -21,6 +22,7 @@ use tokio::{ runtime::Runtime, task::{JoinError, JoinHandle}, }; +use CommandID::Timer; pub struct TestWorkflowDriver { join_handle: Option>, @@ -36,9 +38,9 @@ struct TestWfDriverCache { impl TestWfDriverCache { /// Unblock a command by ID - fn unblock(&self, id: &str) { + fn unblock(&self, id: CommandID) { let mut bc = self.blocking_condvar.0.lock(); - if let Some(t) = bc.issued_commands.remove(id) { + if let Some(t) = bc.issued_commands.remove(&id) { t.unblocker.send(()).unwrap() }; } @@ -47,12 +49,12 @@ impl TestWfDriverCache { /// removed from the "lang" side without needing a response from core. 
fn cancel_timer(&self, id: &str) { let mut bc = self.blocking_condvar.0.lock(); - bc.issued_commands.remove(id); + bc.issued_commands.remove(&CommandID::Timer(id.to_owned())); } /// Track a new command that the wf has sent down the command sink. The command starts in /// [CommandStatus::Sent] and will be marked blocked once it is `await`ed - fn add_sent_cmd(&self, id: String) -> oneshot::Receiver<()> { + fn add_sent_cmd(&self, id: CommandID) -> oneshot::Receiver<()> { let (tx, rx) = oneshot::channel(); let mut bc = self.blocking_condvar.0.lock(); bc.issued_commands.insert( @@ -66,9 +68,9 @@ impl TestWfDriverCache { } /// Indicate that a command is being `await`ed - fn set_cmd_blocked(&self, id: &str) { + fn set_cmd_blocked(&self, id: CommandID) { let mut bc = self.blocking_condvar.0.lock(); - if let Some(cmd) = bc.issued_commands.get_mut(id) { + if let Some(cmd) = bc.issued_commands.get_mut(&id) { cmd.status = CommandStatus::Blocked; } // Wake up the fetcher thread, since we have blocked on a command and that would mean we've @@ -83,8 +85,7 @@ impl TestWfDriverCache { #[derive(Default, Debug)] struct BlockingCondInfo { /// Holds a mapping of timer id -> oneshot channel to resolve it - /// TODO: Upgrade w/ enum key once activities are in - issued_commands: HashMap, + issued_commands: HashMap, wf_is_done: bool, } @@ -130,10 +131,10 @@ impl CommandSender { let tid = a.timer_id.clone(); let c = WFCommand::AddTimer(a); self.send(c); - let rx = self.twd_cache.add_sent_cmd(tid.clone()); + let rx = self.twd_cache.add_sent_cmd(Timer(tid.clone())); let cache_clone = self.twd_cache.clone(); async move { - cache_clone.set_cmd_blocked(&tid); + cache_clone.set_cmd_blocked(Timer(tid)); rx.await } } @@ -240,7 +241,7 @@ impl WorkflowFetcher for TestWorkflowDriver { impl ActivationListener for TestWorkflowDriver { fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) { if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation { - 
self.cache.unblock(timer_id); + self.cache.unblock(Timer(timer_id.to_owned())); } } } diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 011da2805..2f9e75108 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -14,7 +14,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, }, - CoreSDK, ServerGatewayApis, + CoreSDK, PollTaskResponse, ServerGatewayApis, }; use rand::{thread_rng, Rng}; use std::sync::atomic::AtomicBool; @@ -57,9 +57,9 @@ pub(crate) fn build_fake_core( let mut tasks = VecDeque::from(responses); let mut mock_gateway = MockServerGatewayApis::new(); mock_gateway - .expect_poll_workflow_task() + .expect_poll_task() .times(response_batches.len()) - .returning(move |_| Ok(tasks.pop_front().unwrap())); + .returning(move |_| Ok(PollTaskResponse::WorkflowTask(tasks.pop_front().unwrap()))); // Response not really important here mock_gateway .expect_complete_workflow_task() diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 47131db3f..b25c285a4 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::machines::activity_state_machine::new_activity; use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, @@ -55,7 +56,7 @@ pub(crate) struct WorkflowMachines { /// Maps timer ids as created by workflow authors to their associated machines /// TODO: Make this apply to *all* cancellable things, once we've added more. Key can be enum. - timer_id_to_machine: HashMap, + id_to_machine: HashMap, /// Queued commands which have been produced by machines and await processing / being sent to /// the server. 
@@ -71,6 +72,12 @@ pub(crate) struct WorkflowMachines { drive_me: DrivenWorkflow, } +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum CommandID { + Timer(String), + Activity(String), +} + slotmap::new_key_type! { struct MachineKey; } #[derive(Debug, derive_more::Display)] #[display(fmt = "Cmd&Machine({})", "command")] @@ -86,6 +93,7 @@ struct CommandAndMachine { pub enum MachineResponse { #[display(fmt = "PushWFJob")] PushWFJob(#[from(forward)] wf_activation_job::Variant), + IssueNewCommand(ProtoCommand), #[display(fmt = "TriggerWFTaskStarted")] TriggerWFTaskStarted { @@ -141,7 +149,7 @@ impl WorkflowMachines { current_wf_time: None, all_machines: Default::default(), machines_by_event_id: Default::default(), - timer_id_to_machine: Default::default(), + id_to_machine: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), } @@ -156,7 +164,7 @@ impl WorkflowMachines { /// is the last event in the history. /// /// TODO: Describe what actually happens in here - #[instrument(level = "debug", skip(self), fields(run_id = %self.run_id))] + #[instrument(level = "debug", skip(self), fields(run_id = % self.run_id))] pub(crate) fn handle_event( &mut self, event: &HistoryEvent, @@ -364,7 +372,7 @@ impl WorkflowMachines { return Err(WFMachinesError::UnexpectedEvent( event.clone(), "The event is non a non-stateful event, but we tried to handle it as one", - )) + )); } } Ok(()) @@ -524,13 +532,14 @@ impl WorkflowMachines { WFCommand::AddTimer(attrs) => { let tid = attrs.timer_id.clone(); let timer = self.add_new_machine(new_timer(attrs)); - self.timer_id_to_machine.insert(tid, timer.machine); + self.id_to_machine + .insert(CommandID::Timer(tid), timer.machine); self.current_wf_task_commands.push_back(timer); } WFCommand::CancelTimer(attrs) => { let mkey = *self - .timer_id_to_machine - .get(&attrs.timer_id) + .id_to_machine + .get(&CommandID::Timer(attrs.timer_id.to_owned())) .ok_or_else(|| { 
WFMachinesError::MissingAssociatedMachine(format!( "Missing associated machine for cancelling timer {}", @@ -550,10 +559,18 @@ impl WorkflowMachines { return Err(WFMachinesError::UnexpectedMachineResponse( v, "When cancelling timer", - )) + )); } } } + WFCommand::AddActivity(attrs) => { + let aid = attrs.activity_id.clone(); + let activity = self.add_new_machine(new_activity(attrs)); + self.id_to_machine + .insert(CommandID::Activity(aid), activity.machine); + self.current_wf_task_commands.push_back(activity); + } + WFCommand::RequestCancelActivity(_) => unimplemented!(), WFCommand::CompleteWorkflow(attrs) => { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 6563033bc..f057ffdc5 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -2,6 +2,10 @@ use std::time::Duration; use crate::protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; use crate::protos::temporal::api::workflowservice::v1::{ + PollActivityTaskQueueRequest, PollActivityTaskQueueResponse, + RespondActivityTaskCanceledRequest, RespondActivityTaskCanceledResponse, + RespondActivityTaskCompletedRequest, RespondActivityTaskCompletedResponse, + RespondActivityTaskFailedRequest, RespondActivityTaskFailedResponse, SignalWorkflowExecutionRequest, SignalWorkflowExecutionResponse, }; use crate::{ @@ -100,11 +104,16 @@ pub trait ServerGatewayApis { ) -> Result; /// Fetch new work. Should block indefinitely if there is no work. - async fn poll_workflow_task(&self, task_queue: String) - -> Result; + async fn poll_task(&self, req: PollTaskRequest) -> Result; + + /// Fetch new work. Should block indefinitely if there is no work. + async fn poll_workflow_task(&self, task_queue: String) -> Result; + + /// Fetch new work. Should block indefinitely if there is no work. + async fn poll_activity_task(&self, task_queue: String) -> Result; /// Complete a task by sending it to the server. 
`task_token` is the task token that would've - /// been received from [PollWorkflowTaskQueueApi::poll]. `commands` is a list of new commands + /// been received from [poll_workflow_task_queue] API. `commands` is a list of new commands /// to send to the server, such as starting a timer. async fn complete_workflow_task( &self, @@ -112,6 +121,33 @@ pub trait ServerGatewayApis { commands: Vec, ) -> Result; + /// Complete activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `result` is a blob that contains + /// activity response. + async fn complete_activity_task( + &self, + task_token: Vec, + result: Option, + ) -> Result; + + /// Cancel activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `details` is a blob that provides arbitrary + /// user defined cancellation info. + async fn cancel_activity_task( + &self, + task_token: Vec, + details: Option, + ) -> Result; + + /// Fail activity task by sending response to the server. `task_token` contains activity identifier that + /// would've been received from [poll_activity_task_queue] API. `failure` provides failure details, such + /// as message, cause and stack trace. + async fn fail_activity_task( + &self, + task_token: Vec, + failure: Option, + ) -> Result; + /// Fail task by sending the failure to the server. `task_token` is the task token that would've /// been received from [PollWorkflowTaskQueueApi::poll]. async fn fail_workflow_task( @@ -131,6 +167,22 @@ pub trait ServerGatewayApis { ) -> Result; } +/// Contains poll task request. String parameter defines a task queue to be polled. +pub enum PollTaskRequest { + /// Instructs core to poll for workflow task. + Workflow(String), + /// Instructs core to poll for activity task. + Activity(String), +} + +/// Contains poll task response. 
+pub enum PollTaskResponse { + /// Poll response for workflow task. + WorkflowTask(PollWorkflowTaskQueueResponse), + /// Poll response for activity task. + ActivityTask(PollActivityTaskQueueResponse), +} + #[async_trait::async_trait] impl ServerGatewayApis for ServerGateway { async fn start_workflow( @@ -162,10 +214,18 @@ impl ServerGatewayApis for ServerGateway { .into_inner()) } - async fn poll_workflow_task( - &self, - task_queue: String, - ) -> Result { + async fn poll_task(&self, req: PollTaskRequest) -> Result { + match req { + PollTaskRequest::Workflow(task_queue) => { + Ok(Self::poll_workflow_task(self, task_queue).await?) + } + PollTaskRequest::Activity(task_queue) => { + Ok(Self::poll_activity_task(self, task_queue).await?) + } + } + } + + async fn poll_workflow_task(&self, task_queue: String) -> Result { let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.clone(), task_queue: Some(TaskQueue { @@ -176,12 +236,33 @@ impl ServerGatewayApis for ServerGateway { binary_checksum: self.opts.worker_binary_id.clone(), }; - Ok(self - .service - .clone() - .poll_workflow_task_queue(request) - .await? - .into_inner()) + Ok(PollTaskResponse::WorkflowTask( + self.service + .clone() + .poll_workflow_task_queue(request) + .await? + .into_inner(), + )) + } + + async fn poll_activity_task(&self, task_queue: String) -> Result { + let request = PollActivityTaskQueueRequest { + namespace: self.opts.namespace.clone(), + task_queue: Some(TaskQueue { + name: task_queue, + kind: TaskQueueKind::Normal as i32, + }), + identity: self.opts.identity.clone(), + task_queue_metadata: None, + }; + + Ok(PollTaskResponse::ActivityTask( + self.service + .clone() + .poll_activity_task_queue(request) + .await? + .into_inner(), + )) } async fn complete_workflow_task( @@ -260,4 +341,58 @@ impl ServerGatewayApis for ServerGateway { .await? 
.into_inner()) } + + async fn complete_activity_task( + &self, + task_token: Vec, + result: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_completed(RespondActivityTaskCompletedRequest { + task_token, + result, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? + .into_inner()) + } + + async fn cancel_activity_task( + &self, + task_token: Vec, + details: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_canceled(RespondActivityTaskCanceledRequest { + task_token, + details, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? + .into_inner()) + } + + async fn fail_activity_task( + &self, + task_token: Vec, + failure: Option, + ) -> Result { + Ok(self + .service + .clone() + .respond_activity_task_failed(RespondActivityTaskFailedRequest { + task_token, + failure, + identity: self.opts.identity.clone(), + namespace: self.opts.namespace.clone(), + }) + .await? 
+ .into_inner()) + } } diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 87d82b5a9..223b1dbf9 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -13,6 +13,7 @@ pub mod coresdk { use super::temporal::api::command::v1::Command as ApiCommand; use super::temporal::api::enums::v1::WorkflowTaskFailedCause; use super::temporal::api::failure::v1::Failure; + use crate::protos::temporal::api::common::v1::Payloads; use command::Variant; pub type HistoryEventId = i64; @@ -33,6 +34,14 @@ pub mod coresdk { vec![] } } + /// Returns the activity task variant if this task was an activity task and it had one + pub fn get_activity_variant(&self) -> Option { + if let Some(task::Variant::Activity(a)) = &self.variant { + a.variant.clone() + } else { + None + } + } /// Returns the workflow run id if the task was a workflow pub fn get_run_id(&self) -> Option<&str> { @@ -79,6 +88,17 @@ pub mod coresdk { } } + pub fn ok_activity(result: Option, task_token: Vec) -> Self { + TaskCompletion { + task_token, + variant: Some(task_completion::Variant::Activity(ActivityResult { + status: Some(activity_result::Status::Completed(ActivityTaskSuccess { + result, + })), + })), + } + } + pub fn fail(task_token: Vec, cause: WorkflowTaskFailedCause, failure: Failure) -> Self { Self { task_token, @@ -104,7 +124,9 @@ pub mod temporal { pub mod command { pub mod v1 { include!("temporal.api.command.v1.rs"); + use crate::protos::coresdk::{activity_task, ActivityTask, StartActivity}; use crate::protos::temporal::api::enums::v1::CommandType; + use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; use command::Attributes; use std::fmt::{Display, Formatter}; @@ -127,11 +149,44 @@ pub mod temporal { command_type: CommandType::FailWorkflowExecution as i32, attributes: Some(a), }, + a @ Attributes::ScheduleActivityTaskCommandAttributes(_) => Self { + command_type: CommandType::ScheduleActivityTask as i32, + attributes: Some(a), + }, + a @ 
Attributes::RequestCancelActivityTaskCommandAttributes(_) => Self { + command_type: CommandType::RequestCancelActivityTask as i32, + attributes: Some(a), + }, _ => unimplemented!(), } } } + impl From for ActivityTask { + fn from(r: PollActivityTaskQueueResponse) -> Self { + ActivityTask { + activity_id: r.activity_id, + variant: Some(activity_task::Variant::Start(StartActivity { + workflow_namespace: r.workflow_namespace, + workflow_type: r.workflow_type, + workflow_execution: r.workflow_execution, + activity_type: r.activity_type, + header: r.header, + input: r.input, + heartbeat_details: r.heartbeat_details, + scheduled_time: r.scheduled_time, + current_attempt_scheduled_time: r.current_attempt_scheduled_time, + started_time: r.started_time, + attempt: r.attempt, + schedule_to_close_timeout: r.schedule_to_close_timeout, + start_to_close_timeout: r.start_to_close_timeout, + heartbeat_timeout: r.heartbeat_timeout, + retry_policy: r.retry_policy, + })), + } + } + } + impl Display for Command { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let ct = CommandType::from_i32(self.command_type) diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 9ac08c3ec..a2cbdf5a8 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -3,7 +3,8 @@ use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ - history_event, TimerCanceledEventAttributes, TimerFiredEventAttributes, + history_event, ActivityTaskCompletedEventAttributes, ActivityTaskScheduledEventAttributes, + ActivityTaskStartedEventAttributes, TimerCanceledEventAttributes, TimerFiredEventAttributes, }; /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED @@ -152,6 +153,56 @@ pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBui t } +/// 1: 
EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_ACTIVITY_TASK_SCHEDULED +/// 6: EVENT_TYPE_ACTIVITY_TASK_STARTED +/// 7: EVENT_TYPE_ACTIVITY_TASK_COMPLETED +/// 8: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 9: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn single_activity(activity_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + let scheduled_event_id = t.add_get_event_id( + EventType::ActivityTaskScheduled, + Some( + history_event::Attributes::ActivityTaskScheduledEventAttributes( + ActivityTaskScheduledEventAttributes { + activity_id: activity_id.to_string(), + ..Default::default() + }, + ), + ), + ); + let started_event_id = t.add_get_event_id( + EventType::ActivityTaskStarted, + Some( + history_event::Attributes::ActivityTaskStartedEventAttributes( + ActivityTaskStartedEventAttributes { + scheduled_event_id, + ..Default::default() + }, + ), + ), + ); + t.add( + EventType::ActivityTaskCompleted, + history_event::Attributes::ActivityTaskCompletedEventAttributes( + ActivityTaskCompletedEventAttributes { + scheduled_event_id, + started_event_id, + // todo add the result payload + ..Default::default() + }, + ), + ); + t.add_workflow_task_scheduled_and_started(); + t +} + /// First signal's payload is "hello " and second is "world" (no metadata for either) /// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED /// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 36ded4eca..7ea3726cf 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -11,10 +11,14 @@ use std::{ }, time::Duration, }; +use temporal_sdk_core::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; +use 
temporal_sdk_core::protos::temporal::api::common::v1::{ActivityType, Payload, Payloads}; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::{ protos::{ coresdk::{ - wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob, + activity_result, activity_task, wf_activation_job, ActivityResult, ActivityTaskSuccess, + FireTimer, ResolveActivity, StartWorkflow, Task, TaskCompletion, WfActivationJob, }, temporal::api::{ command::v1::{ @@ -111,6 +115,79 @@ fn timer_workflow() { .unwrap(); } +#[test] +fn activity_workflow() { + let mut rng = rand::thread_rng(); + let task_q_salt: u32 = rng.gen(); + let task_q = &format!("activity_workflow_{}", task_q_salt.to_string()); + let core = get_integ_core(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + let activity_id: String = rng.gen::().to_string(); + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![ScheduleActivityTaskCommandAttributes { + activity_id: activity_id.to_string(), + activity_type: Some(ActivityType { + name: "test_activity".to_string(), + }), + namespace: NAMESPACE.to_owned(), + task_queue: Some(TaskQueue { + name: task_q.to_owned(), + kind: 1, + }), + schedule_to_start_timeout: Some(Duration::from_secs(30).into()), + start_to_close_timeout: Some(Duration::from_secs(30).into()), + schedule_to_close_timeout: Some(Duration::from_secs(60).into()), + heartbeat_timeout: Some(Duration::from_secs(60).into()), + ..Default::default() + } + .into()], + task.task_token, + )) + .unwrap(); + let task = dbg!(core.poll_activity_task(task_q).unwrap()); + assert_matches!( + task.get_activity_variant(), + Some(activity_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, Some(ActivityType { + name: "test_activity".to_string(), + })) + } + ); + let response_payloads = vec![Payload { + metadata: Default::default(), + 
data: b"hello ".to_vec(), + }]; + core.complete_activity_task(TaskCompletion::ok_activity( + Some(Payloads { + payloads: response_payloads.clone(), + }), + task.task_token, + )) + .unwrap(); + let task = core.poll_task(task_q).unwrap(); + assert_matches!( + task.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::ResolveActivity( + ResolveActivity {activity_id: a_id, result: Some(ActivityResult{ + status: Some(activity_result::Status::Completed(ActivityTaskSuccess{result: Some(r)}))})} + )), + }, + ] => { + assert_eq!(a_id, &activity_id); + assert_eq!(r, &Payloads{ payloads: response_payloads.clone()}); + } + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task.task_token, + )) + .unwrap() +} + #[test] fn parallel_timer_workflow() { let task_q = "parallel_timer_workflow"; @@ -504,7 +581,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { res.get_wf_jobs().as_slice(), [WfActivationJob { variant: Some(wf_activation_job::Variant::FireTimer(_)), - },] + }] ); let task_token = res.task_token.clone(); @@ -535,7 +612,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { res.get_wf_jobs().as_slice(), [WfActivationJob { variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), - },] + }] ); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],