From 0764ed8ca1d1f6123285832a76d0d4dd6a1b17a9 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Tue, 2 Mar 2021 18:44:08 -0800
Subject: [PATCH 01/11] Make completing activations with failures work

---
 src/lib.rs                                | 140 ++++++++++++++++++++--
 src/machines/test_help/history_builder.rs |  21 +++-
 src/pollers/mod.rs                        |  57 ++++++++-
 src/protos/mod.rs                         |  16 +++
 src/test_help/canned_histories.rs         |  44 +++++--
 src/workflow/concurrency_manager.rs       |   8 +-
 src/workflow/mod.rs                       |  60 +++++-----
 tests/integ_tests/poller_test.rs          |   1 -
 tests/integ_tests/simple_wf_tests.rs      |  87 ++++++++++----
 9 files changed, 342 insertions(+), 92 deletions(-)
 delete mode 100644 tests/integ_tests/poller_test.rs

diff --git a/src/lib.rs b/src/lib.rs
index d872da5fb..b9a3af349 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -24,6 +24,7 @@ pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
 pub use url::Url;
 
 use crate::machines::WFMachinesError;
+use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
 use crate::{
     machines::{InconvertibleCommandError, WFCommand},
     protos::{
@@ -129,14 +130,46 @@ where
     /// Maps task tokens to workflow run ids
     workflow_task_tokens: DashMap<Vec<u8>, String>,
 
-    /// Workflows that are currently under replay will queue their run ID here, indicating that
-    /// there are more workflow tasks / activations to be performed.
-    pending_activations: SegQueue<PendingActivation>,
+    /// Workflows that are currently under replay will queue here, indicating that there are more
+    /// workflow tasks / activations to be performed.
+    pending_activations: PendingActivations,
 
     /// Has shutdown been called?
     shutdown_requested: AtomicBool,
 }
 
+#[derive(Default)]
+struct PendingActivations {
+    queue: SegQueue<PendingActivation>,
+    count_by_id: DashMap<String, usize>,
+}
+
+impl PendingActivations {
+    pub fn push(&self, v: PendingActivation) {
+        *self
+            .count_by_id
+            .entry(v.run_id.clone())
+            .or_insert_with(|| 0)
+            .value_mut() += 1;
+        self.queue.push(v);
+    }
+
+    pub fn pop(&self) -> Option<PendingActivation> {
+        let rme = self.queue.pop();
+        if let Some(pa) = &rme {
+            if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) {
+                *c.value_mut() -= 1
+            }
+            self.count_by_id.remove_if(&pa.run_id, |_, v| v <= &0);
+        }
+        rme
+    }
+
+    pub fn has_pending(&self, run_id: &str) -> bool {
+        self.count_by_id.contains_key(run_id)
+    }
+}
+
 #[derive(Debug)]
 struct PendingActivation {
     run_id: String,
@@ -217,12 +250,29 @@ where
                     let commands = self
                         .workflow_machines
                         .access(&run_id, |mgr| Ok(mgr.machines.get_commands()))?;
+                    // We only actually want to send commands back to the server if there are
+                    // no more pending activations -- in other words the lang SDK has caught
+                    // up on replay.
+ if !self.pending_activations.has_pending(&run_id) { + self.runtime.block_on( + self.server_gateway + .complete_workflow_task(task_token, commands), + )?; + } + } + Status::Failed(failure) => { + // Blow up any cached data associated with the workflow + self.evict_run(&run_id); + self.runtime.block_on( - self.server_gateway - .complete_workflow_task(task_token, commands), + self.server_gateway.fail_workflow_task( + task_token, + WorkflowTaskFailedCause::from_i32(failure.cause) + .unwrap_or(WorkflowTaskFailedCause::Unspecified), + failure.failure, + ), )?; } - Status::Failed(_) => {} } Ok(()) } @@ -292,6 +342,11 @@ impl CoreSDK { })?; Ok(()) } + + /// Remove a workflow run from the cache entirely + fn evict_run(&self, run_id: &str) { + self.workflow_machines.evict(run_id); + } } /// The error type returned by interactions with [Core] @@ -338,11 +393,15 @@ mod test { wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, WfActivationJob, }, - temporal::api::command::v1::{ - CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, - StartTimerCommandAttributes, + temporal::api::{ + command::v1::{ + CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, + StartTimerCommandAttributes, + }, + enums::v1::{EventType, WorkflowTaskFailedCause}, + failure::v1::Failure, + workflowservice::v1::RespondWorkflowTaskFailedResponse, }, - temporal::api::enums::v1::EventType, }, test_help::canned_histories, }; @@ -546,7 +605,8 @@ mod test { let timer_1_id = "timer1"; let task_queue = "test-task-queue"; - let mut t = canned_histories::workflow_fails_after_timer(timer_1_id, original_run_id); + let mut t = + canned_histories::workflow_fails_with_reset_after_timer(timer_1_id, original_run_id); let core = build_fake_core(wfid, run_id, &mut t, &[2]); let res = core.poll_task(task_queue).unwrap(); @@ -637,4 +697,62 @@ mod test { core.poll_task(task_queue).unwrap(); } } + + #[test] + fn complete_activation_with_failure() { + crate::core_tracing::tracing_init(); + let wfid = "fake_wf_id"; + let timer_id = "timer"; + + let mut t = canned_histories::workflow_fails_with_failure_after_timer(timer_id); + let mut core = build_fake_core(wfid, RUN_ID, &mut t, &[2, 3]); + // Need to create an expectation that we will call a failure completion + Arc::get_mut(&mut core.server_gateway) + .unwrap() + .expect_fail_workflow_task() + .times(1) + .returning(|_, _, _| Ok(RespondWorkflowTaskFailedResponse {})); + + let res = core.poll_task(TASK_Q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into()], + res.task_token, + )) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + core.complete_task(TaskCompletion::fail( + res.task_token, + WorkflowTaskFailedCause::BadBinary, + Failure { + message: "oh noooooooo".to_string(), + ..Default::default() + }, + )) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + // TODO: assertions + // Need to re-issue the start timer command (we are replaying) + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into()], + res.task_token, + )) + .unwrap(); + // Now we may complete the workflow + let res = core.poll_task(TASK_Q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap(); + } } 
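
[Editor's note] The `PendingActivations` type introduced in this patch pairs a lock-free FIFO with a per-run-id counter so `has_pending` is an O(1) map lookup rather than a queue scan. Below is a minimal standalone sketch of the same pattern, assuming crossbeam 0.8 (whose `SegQueue::pop` returns `Option`) and the `dashmap` crate; names mirror the patch, everything SDK-specific is omitted:

```rust
use crossbeam::queue::SegQueue;
use dashmap::DashMap;

#[derive(Debug)]
struct PendingActivation {
    run_id: String,
    task_token: Vec<u8>,
}

#[derive(Default)]
struct PendingActivations {
    // FIFO of activations still to be handed out.
    queue: SegQueue<PendingActivation>,
    // How many queued activations each run currently has.
    count_by_id: DashMap<String, usize>,
}

impl PendingActivations {
    fn push(&self, v: PendingActivation) {
        *self
            .count_by_id
            .entry(v.run_id.clone())
            .or_insert(0)
            .value_mut() += 1;
        self.queue.push(v);
    }

    fn pop(&self) -> Option<PendingActivation> {
        let popped = self.queue.pop();
        if let Some(pa) = &popped {
            // Decrement, then drop the entry once it reaches zero so that
            // has_pending() flips back to false for fully-drained runs.
            if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) {
                *c.value_mut() -= 1;
            }
            self.count_by_id.remove_if(&pa.run_id, |_, v| *v == 0);
        }
        popped
    }

    fn has_pending(&self, run_id: &str) -> bool {
        self.count_by_id.contains_key(run_id)
    }
}

fn main() {
    let pas = PendingActivations::default();
    pas.push(PendingActivation { run_id: "run1".into(), task_token: vec![1] });
    assert!(pas.has_pending("run1"));
    assert!(pas.pop().is_some());
    assert!(!pas.has_pending("run1"));
}
```

Note that `remove_if` re-checks the count under the map's shard lock, so a concurrent `push` landing between the decrement and the removal keeps the entry alive rather than stranding it.
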
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 46c22b8bf..e1f6874a9 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -1,4 +1,5 @@
 use super::Result;
+use crate::protos::temporal::api::failure::v1::Failure;
 use crate::{
     machines::{workflow_machines::WorkflowMachines, ProtoCommand},
     protos::temporal::api::{
@@ -99,7 +100,25 @@ impl TestHistoryBuilder {
         self.build_and_push_event(EventType::WorkflowExecutionCompleted, attrs.into());
     }
 
-    pub fn add_workflow_task_failed(&mut self, cause: WorkflowTaskFailedCause, new_run_id: &str) {
+    pub fn add_workflow_task_failed_with_failure(
+        &mut self,
+        cause: WorkflowTaskFailedCause,
+        failure: Failure,
+    ) {
+        let attrs = WorkflowTaskFailedEventAttributes {
+            scheduled_event_id: self.workflow_task_scheduled_event_id,
+            cause: cause.into(),
+            failure: Some(failure),
+            ..Default::default()
+        };
+        self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into());
+    }
+
+    pub fn add_workflow_task_failed_new_id(
+        &mut self,
+        cause: WorkflowTaskFailedCause,
+        new_run_id: &str,
+    ) {
         let attrs = WorkflowTaskFailedEventAttributes {
             scheduled_event_id: self.workflow_task_scheduled_event_id,
             cause: cause.into(),
diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs
index f0a46b5e6..d9c0238e8 100644
--- a/src/pollers/mod.rs
+++ b/src/pollers/mod.rs
@@ -4,17 +4,20 @@ use crate::{
     machines::ProtoCommand,
     protos::temporal::api::{
         common::v1::WorkflowType,
-        enums::v1::TaskQueueKind,
+        enums::v1::{TaskQueueKind, WorkflowTaskFailedCause},
+        failure::v1::Failure,
         taskqueue::v1::TaskQueue,
         workflowservice::v1::{
             workflow_service_client::WorkflowServiceClient, PollWorkflowTaskQueueRequest,
             PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedRequest,
-            RespondWorkflowTaskCompletedResponse,
+            RespondWorkflowTaskCompletedResponse, RespondWorkflowTaskFailedRequest,
+            RespondWorkflowTaskFailedResponse, StartWorkflowExecutionRequest,
+            StartWorkflowExecutionResponse,
         },
-        workflowservice::v1::{StartWorkflowExecutionRequest, StartWorkflowExecutionResponse},
     },
     workflow::{
-        PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, StartWorkflowExecutionApi,
+        PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, RespondWorkflowTaskFailedApi,
+        StartWorkflowExecutionApi,
     },
     Result,
 };
@@ -85,12 +88,18 @@ pub struct ServerGateway {
 
 /// This trait provides ways to call the temporal server itself
 pub trait ServerGatewayApis:
-    PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi + StartWorkflowExecutionApi
+    PollWorkflowTaskQueueApi
+    + RespondWorkflowTaskCompletedApi
+    + StartWorkflowExecutionApi
+    + RespondWorkflowTaskFailedApi
 {
 }
 
impl<T> ServerGatewayApis for T where
-    T: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi + StartWorkflowExecutionApi
+    T: PollWorkflowTaskQueueApi
+        + RespondWorkflowTaskCompletedApi
+        + StartWorkflowExecutionApi
+        + RespondWorkflowTaskFailedApi
 {
 }
 
@@ -140,6 +149,31 @@ impl RespondWorkflowTaskCompletedApi for ServerGateway {
     }
 }
 
+#[async_trait::async_trait]
+impl RespondWorkflowTaskFailedApi for ServerGateway {
+    async fn fail_workflow_task(
+        &self,
+        task_token: Vec<u8>,
+        cause: WorkflowTaskFailedCause,
+        failure: Option<Failure>,
+    ) -> Result<RespondWorkflowTaskFailedResponse> {
+        let request = RespondWorkflowTaskFailedRequest {
+            task_token,
+            cause: cause as i32,
+            failure,
+            identity: self.opts.identity.to_string(),
+            binary_checksum: self.opts.worker_binary_id.to_string(),
+            namespace: self.opts.namespace.to_string(),
+        };
+        Ok(self
+            .service
+            .clone()
+            .respond_workflow_task_failed(request)
+            .await?
+            .into_inner())
+    }
+}
+
 #[async_trait::async_trait]
 impl StartWorkflowExecutionApi for ServerGateway {
     async fn start_workflow(
@@ -183,6 +217,17 @@ mockall::mock! {
     impl RespondWorkflowTaskCompletedApi for ServerGateway {
         async fn complete_workflow_task(&self, task_token: Vec<u8>, commands: Vec<ProtoCommand>) -> Result<RespondWorkflowTaskCompletedResponse>;
     }
+
+    #[async_trait::async_trait]
+    impl RespondWorkflowTaskFailedApi for ServerGateway {
+        async fn fail_workflow_task(
+            &self,
+            task_token: Vec<u8>,
+            cause: WorkflowTaskFailedCause,
+            failure: Option<Failure>,
+        ) -> Result<RespondWorkflowTaskFailedResponse>;
+    }
+
     #[async_trait::async_trait]
     impl StartWorkflowExecutionApi for ServerGateway {
         async fn start_workflow(
diff --git a/src/protos/mod.rs b/src/protos/mod.rs
index ed8995319..567c8ea72 100644
--- a/src/protos/mod.rs
+++ b/src/protos/mod.rs
@@ -11,6 +11,8 @@ pub mod coresdk {
     include!("coresdk.rs");
     use super::temporal::api::command::v1 as api_command;
     use super::temporal::api::command::v1::Command as ApiCommand;
+    use super::temporal::api::enums::v1::WorkflowTaskFailedCause;
+    use super::temporal::api::failure::v1::Failure;
     use command::Variant;
 
     pub type HistoryEventId = i64;
@@ -76,6 +78,20 @@ pub mod coresdk {
                 })),
             }
         }
+
+        pub fn fail(task_token: Vec<u8>, cause: WorkflowTaskFailedCause, failure: Failure) -> Self {
+            Self {
+                task_token,
+                variant: Some(task_completion::Variant::Workflow(WfActivationCompletion {
+                    status: Some(wf_activation_completion::Status::Failed(
+                        WfActivationFailure {
+                            cause: cause as i32,
+                            failure: Some(failure),
+                        },
+                    )),
+                })),
+            }
+        }
     }
 }
diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs
index b2b43341f..317ede059 100644
--- a/src/test_help/canned_histories.rs
+++ b/src/test_help/canned_histories.rs
@@ -1,5 +1,6 @@
 use crate::machines::test_help::TestHistoryBuilder;
 use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause};
+use crate::protos::temporal::api::failure::v1::Failure;
 use crate::protos::temporal::api::history::v1::{
     history_event, TimerCanceledEventAttributes, TimerFiredEventAttributes,
 };
@@ -114,20 +115,37 @@ pub fn parallel_timer(timer1: &str, timer2: &str) -> TestHistoryBuilder {
 /// 9: EVENT_TYPE_WORKFLOW_TASK_FAILED
 /// 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
 /// 11: EVENT_TYPE_WORKFLOW_TASK_STARTED
-pub fn workflow_fails_after_timer(timer_id: &str, original_run_id: &str) -> TestHistoryBuilder {
-    let mut t = TestHistoryBuilder::default();
-    t.add_by_type(EventType::WorkflowExecutionStarted);
-    t.add_full_wf_task();
-    let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
-    t.add(
-        EventType::TimerFired,
-        history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
-            started_event_id: timer_started_event_id,
-            timer_id: timer_id.to_string(),
-        }),
-    );
+pub fn workflow_fails_with_reset_after_timer(
+    timer_id: &str,
+    original_run_id: &str,
+) -> TestHistoryBuilder {
+    let mut t = single_timer(timer_id);
+    t.add_workflow_task_failed_new_id(WorkflowTaskFailedCause::ResetWorkflow, original_run_id);
 
-    t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id);
     t.add_workflow_task_scheduled_and_started();
     t
+}
+
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_TIMER_STARTED
+/// 6: EVENT_TYPE_TIMER_FIRED
+/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 9: EVENT_TYPE_WORKFLOW_TASK_FAILED
+/// 10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 11: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBuilder {
+    let mut t = single_timer(timer_id);
+    t.add_workflow_task_failed_with_failure(
+        WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure,
+        Failure {
+            message: "boom".to_string(),
+            ..Default::default()
+        },
+    );
     t.add_workflow_task_scheduled_and_started();
     t
diff --git a/src/workflow/concurrency_manager.rs b/src/workflow/concurrency_manager.rs
index a356fac83..ac01572c9 100644
--- a/src/workflow/concurrency_manager.rs
+++ b/src/workflow/concurrency_manager.rs
@@ -19,9 +19,6 @@ use tracing::Level;
 /// managed by this struct. We could make this generic for any collection of things which need
 /// to live on one thread, if desired.
 pub(crate) struct WorkflowConcurrencyManager {
-    // TODO: We need to remove things from here at some point, but that wasn't implemented
-    // in core SDK yet either - once we're ready to remove things, they can be removed from this
-    // map and the wfm thread will drop the machines.
     machines: DashMap,
     wf_thread: Mutex>>,
     machine_creator: Sender,
@@ -130,6 +127,11 @@ impl WorkflowConcurrencyManager {
             .expect("Workflow manager thread should shut down cleanly");
     }
 
+    /// Remove the workflow with the provided run id from management
+    pub fn evict(&self, run_id: &str) {
+        self.machines.remove(run_id);
+    }
+
     /// The implementation of the dedicated thread workflow managers live on
     fn workflow_thread(create_rcv: Receiver, shutdown_rx: Receiver) {
         let mut machine_rcvs: Vec<(MachineMutationReceiver, WorkflowManager)> = vec![];
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 52cacd017..d6741dff6 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -9,10 +9,12 @@ use crate::{
     protos::{
         coresdk::WfActivation,
         temporal::api::{
+            enums::v1::WorkflowTaskFailedCause,
+            failure::v1::Failure,
             history::v1::History,
             workflowservice::v1::{
                 PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
-                StartWorkflowExecutionResponse,
+                RespondWorkflowTaskFailedResponse, StartWorkflowExecutionResponse,
             },
         },
     },
@@ -31,8 +33,8 @@ pub trait PollWorkflowTaskQueueApi {
     async fn poll_workflow_task(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
 }
 
-/// Implementors can complete tasks as would've been issued by [Core::poll]. The real implementor
-/// sends the completed tasks to the server.
+/// Implementors can complete tasks issued by [Core::poll]. The real implementor sends the completed
+/// tasks to the server.
 #[cfg_attr(test, mockall::automock)]
 #[async_trait::async_trait]
 pub trait RespondWorkflowTaskCompletedApi {
@@ -46,6 +48,21 @@ pub trait RespondWorkflowTaskCompletedApi {
     ) -> Result<RespondWorkflowTaskCompletedResponse>;
 }
 
+/// Implementors can fail workflow tasks issued by [Core::poll]. The real implementor sends the
+/// failed tasks to the server.
+#[cfg_attr(test, mockall::automock)]
+#[async_trait::async_trait]
+pub trait RespondWorkflowTaskFailedApi {
+    /// Fail task by sending the failure to the server. `task_token` is the task token that would've
+    /// been received from [PollWorkflowTaskQueueApi::poll].
+    async fn fail_workflow_task(
+        &self,
+        task_token: Vec<u8>,
+        cause: WorkflowTaskFailedCause,
+        failure: Option<Failure>,
+    ) -> Result<RespondWorkflowTaskFailedResponse>;
+}
+
 /// Implementors should send StartWorkflowExecutionRequest to the server and pass the response back.
#[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] @@ -90,13 +107,17 @@ impl WorkflowManager { let (wfb, cmd_sink) = WorkflowBridge::new(); let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb)); - Ok(Self { + // TODO: Combine stuff + let task_hist = HistoryInfo::new_from_history(&history, Some(1))?; + let mut retme = Self { machines: state_machines, command_sink: cmd_sink, last_history_task_count: history.get_workflow_task_count(None)?, last_history_from_server: history, current_wf_task_num: 1, - }) + }; + retme.machines.apply_history_events(&task_hist)?; + Ok(retme) } } @@ -142,6 +163,9 @@ impl WorkflowManager { self.current_wf_task_num += 1; let more_activations_needed = self.current_wf_task_num <= self.last_history_task_count; + if more_activations_needed { + event!(Level::DEBUG, msg = "More activations needed"); + } Ok(NextWfActivation { activation, @@ -149,29 +173,3 @@ impl WorkflowManager { }) } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::{ - protos::temporal::api::common::v1::WorkflowExecution, test_help::canned_histories, - }; - use rand::{thread_rng, Rng}; - - #[test] - fn full_history_application() { - let t = canned_histories::single_timer("fake_timer"); - let task_token: [u8; 16] = thread_rng().gen(); - let pwtqr = PollWorkflowTaskQueueResponse { - history: Some(t.as_history()), - workflow_execution: Some(WorkflowExecution { - workflow_id: "wfid".to_string(), - run_id: "runid".to_string(), - }), - task_token: task_token.to_vec(), - ..Default::default() - }; - let mut wfm = WorkflowManager::new(pwtqr).unwrap(); - wfm.get_next_activation().unwrap(); - } -} diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs deleted file mode 100644 index 8b1378917..000000000 --- a/tests/integ_tests/poller_test.rs +++ /dev/null @@ -1 +0,0 @@ - diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 6de090128..e0f1b7a5f 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -10,6 +10,8 @@ use std::{ }, time::Duration, }; +use temporal_sdk_core::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; +use temporal_sdk_core::protos::temporal::api::failure::v1::Failure; use temporal_sdk_core::{ protos::{ coresdk::{ @@ -156,19 +158,7 @@ fn parallel_timer_workflow() { #[test] fn timer_cancel_workflow() { let task_q = "timer_cancel_workflow"; - let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { - Ok(addr) => addr, - Err(_) => "http://localhost:7233".to_owned(), - }; - let url = Url::try_from(&*temporal_server_address).unwrap(); - let gateway_opts = ServerGatewayOptions { - namespace: NAMESPACE.to_string(), - identity: "none".to_string(), - worker_binary_id: "".to_string(), - long_poll_timeout: Duration::from_secs(60), - target_url: url, - }; - let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); dbg!(create_workflow( @@ -215,19 +205,7 @@ fn timer_cancel_workflow() { #[test] fn timer_immediate_cancel_workflow() { let task_q = "timer_cancel_workflow"; - let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { - Ok(addr) => addr, - Err(_) => "http://localhost:7233".to_owned(), - }; - let url = Url::try_from(&*temporal_server_address).unwrap(); - let gateway_opts = ServerGatewayOptions { - namespace: NAMESPACE.to_string(), - identity: "none".to_string(), - worker_binary_id: 
"".to_string(), - long_poll_timeout: Duration::from_secs(60), - target_url: url, - }; - let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); create_workflow(&core, task_q, &workflow_id.to_string(), None); @@ -317,3 +295,60 @@ fn parallel_workflows_same_queue() { handles.into_iter().for_each(|h| h.join().unwrap()); } + +#[test] +fn fail_wf_task() { + let task_q = "fail_wf_task"; + let core = get_integ_core(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + + // Start with a timer + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: "best-timer".to_string(), + start_to_fire_timeout: Some(Duration::from_millis(200).into()), + ..Default::default() + } + .into()], + task.task_token, + )) + .unwrap(); + + // Allow timer to fire + std::thread::sleep(Duration::from_millis(500)); + + // Then break for whatever reason + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::fail( + task.task_token, + WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure, + Failure { + message: "I did an oopsie".to_string(), + ..Default::default() + }, + )) + .unwrap(); + + // The server will want to retry the task. This time we finish the workflow -- but we need + // to poll a couple of times as there will be more than one required workflow activation. + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: "best-timer".to_string(), + start_to_fire_timeout: Some(Duration::from_millis(200).into()), + ..Default::default() + } + .into()], + task.task_token, + )) + .unwrap(); + let task = core.poll_task(task_q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task.task_token, + )) + .unwrap(); +} From 5013ac644893aa9b41103b1b9e75c5c203849297 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Mar 2021 21:05:49 -0800 Subject: [PATCH 02/11] Add assertions in UT and docstring for PendingActivities --- src/lib.rs | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index b9a3af349..f5ce3e48f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,16 +23,16 @@ mod test_help; pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; -use crate::machines::WFMachinesError; -use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; use crate::{ - machines::{InconvertibleCommandError, WFCommand}, + machines::{InconvertibleCommandError, WFCommand, WFMachinesError}, protos::{ coresdk::{ task_completion, wf_activation_completion::Status, Task, TaskCompletion, WfActivationCompletion, WfActivationSuccess, }, - temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse, + temporal::api::{ + enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse, + }, }, protosext::HistoryInfoError, workflow::{NextWfActivation, WorkflowConcurrencyManager}, @@ -138,6 +138,8 @@ where shutdown_requested: AtomicBool, } +/// Tracks pending activations using an internal queue, while also allowing fast lookup of any +/// pending activations by run ID #[derive(Default)] struct PendingActivations 
{
     queue: SegQueue<PendingActivation>,
     count_by_id: DashMap<String, usize>,
 }
@@ -736,7 +738,12 @@ mod test {
         .unwrap();
 
         let res = core.poll_task(TASK_Q).unwrap();
-        // TODO: assertions
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
+            }]
+        );
         // Need to re-issue the start timer command (we are replaying)
         core.complete_task(TaskCompletion::ok_from_api_attrs(
             vec![StartTimerCommandAttributes {
@@ -749,6 +756,12 @@ mod test {
         .unwrap();
         // Now we may complete the workflow
         let res = core.poll_task(TASK_Q).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                variant: Some(wf_activation_job::Variant::FireTimer(_)),
+            }]
+        );
         core.complete_task(TaskCompletion::ok_from_api_attrs(
             vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
             res.task_token,

From 29eabdb757e5d1f3b8594ef6cbac1b32bac7405b Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Tue, 2 Mar 2021 21:41:06 -0800
Subject: [PATCH 03/11] Give PendingActivations its own module

---
 src/lib.rs                 | 51 ++------------------------------------
 src/pending_activations.rs | 42 +++++++++++++++++++++++++++++++
 2 files changed, 44 insertions(+), 49 deletions(-)
 create mode 100644 src/pending_activations.rs

diff --git a/src/lib.rs b/src/lib.rs
index f5ce3e48f..3a64f136d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,6 +13,7 @@ pub mod protos;
 
 pub(crate) mod core_tracing;
 mod machines;
+mod pending_activations;
 mod pollers;
 mod protosext;
 mod workflow;
@@ -25,6 +26,7 @@ pub use url::Url;
 
 use crate::{
     machines::{InconvertibleCommandError, WFCommand, WFMachinesError},
+    pending_activations::{PendingActivation, PendingActivations},
     protos::{
         coresdk::{
             task_completion, wf_activation_completion::Status, Task, TaskCompletion,
             WfActivationCompletion, WfActivationSuccess,
         },
         temporal::api::{
             enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse,
         },
     },
     protosext::HistoryInfoError,
     workflow::{NextWfActivation, WorkflowConcurrencyManager},
 };
-use crossbeam::queue::SegQueue;
 use dashmap::DashMap;
 use std::{
     convert::TryInto,
@@ -110,14 +111,6 @@ pub fn init(opts: CoreInitOptions) -> Result {
     })
 }
 
-/// Type of task queue to poll.
-pub enum TaskQueue {
-    /// Workflow task
-    Workflow(String),
-    /// Activity task
-    _Activity(String),
-}
-
 struct CoreSDK<WP>
 where
     WP: ServerGatewayApis + 'static,
@@ -138,46 +131,6 @@ where
     shutdown_requested: AtomicBool,
 }
 
-/// Tracks pending activations using an internal queue, while also allowing fast lookup of any
-/// pending activations by run ID
-#[derive(Default)]
-struct PendingActivations {
-    queue: SegQueue<PendingActivation>,
-    count_by_id: DashMap<String, usize>,
-}
-
-impl PendingActivations {
-    pub fn push(&self, v: PendingActivation) {
-        *self
-            .count_by_id
-            .entry(v.run_id.clone())
-            .or_insert_with(|| 0)
-            .value_mut() += 1;
-        self.queue.push(v);
-    }
-
-    pub fn pop(&self) -> Option<PendingActivation> {
-        let rme = self.queue.pop();
-        if let Some(pa) = &rme {
-            if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) {
-                *c.value_mut() -= 1
-            }
-            self.count_by_id.remove_if(&pa.run_id, |_, v| v <= &0);
-        }
-        rme
-    }
-
-    pub fn has_pending(&self, run_id: &str) -> bool {
-        self.count_by_id.contains_key(run_id)
-    }
-}
-
-#[derive(Debug)]
-struct PendingActivation {
-    run_id: String,
-    task_token: Vec<u8>,
-}
-
 impl<WP> Core for CoreSDK<WP>
 where
     WP: ServerGatewayApis + Send + Sync,
diff --git a/src/pending_activations.rs b/src/pending_activations.rs
new file mode 100644
index 000000000..51109cce7
--- /dev/null
+++ b/src/pending_activations.rs
@@ -0,0 +1,42 @@
+use crossbeam::queue::SegQueue;
+use dashmap::DashMap;
+
+/// Tracks pending activations using an internal queue, while also allowing fast lookup of any
+/// pending activations by run ID
+#[derive(Default)]
+pub struct PendingActivations {
+    queue: SegQueue<PendingActivation>,
+    count_by_id: DashMap<String, usize>,
+}
+
+impl PendingActivations {
+    pub fn push(&self, v: PendingActivation) {
+        *self
+            .count_by_id
+            .entry(v.run_id.clone())
+            .or_insert_with(|| 0)
+            .value_mut() += 1;
+        self.queue.push(v);
+    }
+
+    pub fn pop(&self) -> Option<PendingActivation> {
+        let rme = self.queue.pop();
+        if let Some(pa) = &rme {
+            if let Some(mut c) = self.count_by_id.get_mut(&pa.run_id) {
+                *c.value_mut() -= 1
+            }
+            self.count_by_id.remove_if(&pa.run_id, |_, v| v <= &0);
+        }
+        rme
+    }
+
+    pub fn has_pending(&self, run_id: &str) -> bool {
+        self.count_by_id.contains_key(run_id)
+    }
+}
+
+#[derive(Debug)]
+pub struct PendingActivation {
+    pub run_id: String,
+    pub task_token: Vec<u8>,
+}

From 8476d01491959bd25c8f05e3341ede15dbd2380e Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Wed, 3 Mar 2021 10:16:32 -0800
Subject: [PATCH 04/11] Add support for FailWFExecution

---
 src/lib.rs                                  |  47 ++++++-
 .../complete_workflow_state_machine.rs      |   4 +-
 src/machines/fail_workflow_state_machine.rs | 123 ++++++++++++++++--
 src/machines/mod.rs                         |   6 +-
 src/machines/workflow_machines.rs           |   8 +-
 src/protos/mod.rs                           |   4 +
 tests/integ_tests/simple_wf_tests.rs        |  34 +++++
 7 files changed, 202 insertions(+), 24 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 3a64f136d..91b5f2daa 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -341,6 +341,7 @@ pub enum CoreError {
 #[cfg(test)]
 mod test {
     use super::*;
+    use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes;
     use crate::{
         machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
         protos::{
@@ -365,7 +366,7 @@ mod test {
     const TASK_Q: &str = "test-task-queue";
     const RUN_ID: &str = "fake_run_id";
 
-    #[fixture(hist_batches=&[])]
+    #[fixture(hist_batches = & [])]
     fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";
 
@@ -374,8 +375,8 @@ mod test {
     }
 
     #[rstest(core,
case::incremental(single_timer_setup(& [1, 2])), + case::replay(single_timer_setup(& [2])) )] fn single_timer_test_across_wf_bridge(core: FakeCore) { let res = core.poll_task(TASK_Q).unwrap(); @@ -413,7 +414,7 @@ mod test { .unwrap(); } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))] fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; @@ -478,7 +479,7 @@ mod test { .unwrap(); } - #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))] fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; @@ -537,7 +538,7 @@ mod test { .unwrap(); } - #[rstest(single_timer_setup(&[1]))] + #[rstest(single_timer_setup(& [1]))] fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) { let res = single_timer_setup.poll_task(TASK_Q).unwrap(); assert_eq!(res.get_wf_jobs().len(), 1); @@ -721,4 +722,38 @@ mod test { )) .unwrap(); } + + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + fn simple_timer_fail_wf_execution(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + let timer_id = "timer1"; + + let mut t = canned_histories::single_timer(timer_id); + let core = build_fake_core(wfid, run_id, &mut t, hist_batches); + + let res = core.poll_task(TASK_Q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into()], + res.task_token, + )) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![FailWorkflowExecutionCommandAttributes { + failure: Some(Failure { + message: "I'm ded".to_string(), + ..Default::default() + }), + } + .into()], + res.task_token, + )) + .unwrap(); + } } diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs index 9b31dc5a7..72b76d9bf 100644 --- a/src/machines/complete_workflow_state_machine.rs +++ b/src/machines/complete_workflow_state_machine.rs @@ -54,11 +54,11 @@ impl CompleteWorkflowMachine { }; let cmd = match s .on_event_mut(CompleteWorkflowMachineEvents::Schedule) - .expect("Scheduling timers doesn't fail") + .expect("Scheduling complete wf machines doesn't fail") .pop() { Some(CompleteWFCommand::AddCommand(c)) => c, - _ => panic!("Timer on_schedule must produce command"), + _ => panic!("complete wf machine on_schedule must produce command"), }; (s, cmd) } diff --git a/src/machines/fail_workflow_state_machine.rs b/src/machines/fail_workflow_state_machine.rs index 194a88cb8..db3751989 100644 --- a/src/machines/fail_workflow_state_machine.rs +++ b/src/machines/fail_workflow_state_machine.rs @@ -1,36 +1,131 @@ -use rustfsm::{fsm, TransitionResult}; +use crate::{ + machines::{ + workflow_machines::MachineResponse, Cancellable, NewMachineWithCommand, ProtoCommand, + WFMachinesAdapter, WFMachinesError, + }, + protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes, + protos::temporal::api::enums::v1::CommandType, + protos::temporal::api::enums::v1::EventType, + protos::temporal::api::history::v1::HistoryEvent, +}; +use rustfsm::{fsm, StateMachine, TransitionResult}; +use std::convert::TryFrom; fsm! 
{
-    pub(super) name FailWorkflowMachine; command FailWorkflowCommand; error FailWorkflowMachineError;
+    pub(super) name FailWorkflowMachine;
+    command FailWFCommand;
+    error WFMachinesError;
+    shared_state FailWorkflowExecutionCommandAttributes;
 
-    Created --(Schedule, on_schedule) --> FailWorkflowCommandCreated;
+    Created --(Schedule, shared on_schedule) --> FailWorkflowCommandCreated;
 
     FailWorkflowCommandCreated --(CommandFailWorkflowExecution) --> FailWorkflowCommandCreated;
 
-    FailWorkflowCommandCreated --(WorkflowExecutionFailed, on_workflow_execution_failed) --> FailWorkflowCommandRecorded;
+    FailWorkflowCommandCreated --(WorkflowExecutionFailed) --> FailWorkflowCommandRecorded;
 }
 
-#[derive(thiserror::Error, Debug)]
-pub(super) enum FailWorkflowMachineError {}
+#[derive(Debug)]
+pub(super) enum FailWFCommand {
+    AddCommand(ProtoCommand),
+}
+
+/// Fail a workflow
+pub(super) fn fail_workflow(
+    attribs: FailWorkflowExecutionCommandAttributes,
+) -> NewMachineWithCommand<FailWorkflowMachine> {
+    let (machine, add_cmd) = FailWorkflowMachine::new_scheduled(attribs);
+    NewMachineWithCommand {
+        command: add_cmd,
+        machine,
+    }
+}
 
-pub(super) enum FailWorkflowCommand {}
+impl FailWorkflowMachine {
+    /// Create a new WF machine and schedule it
+    pub(crate) fn new_scheduled(
+        attribs: FailWorkflowExecutionCommandAttributes,
+    ) -> (Self, ProtoCommand) {
+        let mut s = Self {
+            state: Created {}.into(),
+            shared_state: attribs,
+        };
+        let cmd = match s
+            .on_event_mut(FailWorkflowMachineEvents::Schedule)
+            .expect("Scheduling fail wf machines doesn't fail")
+            .pop()
+        {
+            Some(FailWFCommand::AddCommand(c)) => c,
+            _ => panic!("Fail wf machine on_schedule must produce command"),
+        };
+        (s, cmd)
+    }
+}
 
 #[derive(Default, Clone)]
 pub(super) struct Created {}
 
 impl Created {
-    pub(super) fn on_schedule(self) -> FailWorkflowMachineTransition {
-        unimplemented!()
+    pub(super) fn on_schedule(
+        self,
+        dat: FailWorkflowExecutionCommandAttributes,
+    ) -> FailWorkflowMachineTransition {
+        let cmd = ProtoCommand {
+            command_type: CommandType::FailWorkflowExecution as i32,
+            attributes: Some(dat.into()),
+        };
+        TransitionResult::commands::<_, FailWorkflowCommandCreated>(vec![
+            FailWFCommand::AddCommand(cmd),
+        ])
     }
 }
 
 #[derive(Default, Clone)]
 pub(super) struct FailWorkflowCommandCreated {}
 
-impl FailWorkflowCommandCreated {
-    pub(super) fn on_workflow_execution_failed(self) -> FailWorkflowMachineTransition {
-        unimplemented!()
+#[derive(Default, Clone)]
+pub(super) struct FailWorkflowCommandRecorded {}
+
+impl From<FailWorkflowCommandCreated> for FailWorkflowCommandRecorded {
+    fn from(_: FailWorkflowCommandCreated) -> Self {
+        Self::default()
     }
 }
 
-#[derive(Default, Clone)]
-pub(super) struct FailWorkflowCommandRecorded {}
+impl TryFrom<HistoryEvent> for FailWorkflowMachineEvents {
+    type Error = WFMachinesError;
+
+    fn try_from(e: HistoryEvent) -> Result<Self, Self::Error> {
+        Ok(match EventType::from_i32(e.event_type) {
+            Some(EventType::WorkflowExecutionFailed) => Self::WorkflowExecutionFailed,
+            _ => {
+                return Err(WFMachinesError::UnexpectedEvent(
+                    e,
+                    "Fail workflow machine does not handle this event",
+                ))
+            }
+        })
+    }
+}
+
+impl TryFrom<CommandType> for FailWorkflowMachineEvents {
+    type Error = ();
+
+    fn try_from(c: CommandType) -> Result<Self, Self::Error> {
+        Ok(match c {
+            CommandType::FailWorkflowExecution => Self::CommandFailWorkflowExecution,
+            _ => return Err(()),
+        })
+    }
+}
+
+impl WFMachinesAdapter for FailWorkflowMachine {
+    fn adapt_response(
+        &self,
+        _event: &HistoryEvent,
+        _has_next_event: bool,
+        _my_command: FailWFCommand,
+    ) -> Result<Vec<MachineResponse>, WFMachinesError> {
+        Ok(vec![])
+    }
+}
+
+impl
Cancellable for FailWorkflowMachine {} diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 2985baaac..00746bd26 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -12,7 +12,6 @@ mod child_workflow_state_machine; mod complete_workflow_state_machine; #[allow(unused)] mod continue_as_new_workflow_state_machine; -#[allow(unused)] mod fail_workflow_state_machine; #[allow(unused)] mod local_activity_state_machine; @@ -34,6 +33,7 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; +use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; use crate::{ machines::workflow_machines::MachineResponse, protos::{ @@ -99,6 +99,7 @@ pub enum WFCommand { AddTimer(StartTimerCommandAttributes), CancelTimer(CancelTimerCommandAttributes), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), + FailWorkflow(FailWorkflowExecutionCommandAttributes), } #[derive(thiserror::Error, Debug, derive_more::From)] @@ -119,6 +120,9 @@ impl TryFrom for WFCommand { Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { Ok(WFCommand::CompleteWorkflow(c)) } + Attributes::FailWorkflowExecutionCommandAttributes(s) => { + Ok(WFCommand::FailWorkflow(s)) + } _ => unimplemented!(), }, _ => Err(c.into()), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1223b6875..39372e112 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::machines::fail_workflow_state_machine::fail_workflow; use crate::protos::coresdk::wf_activation_job; use crate::{ machines::{ @@ -59,7 +60,7 @@ pub(crate) struct WorkflowMachines { /// the server. commands: VecDeque, /// Commands generated by the currently processing workflow task, which will eventually be - /// transferred to `commands` + /// transferred to `commands` (and hence eventually sent to the server) /// /// Old note: It is a queue as commands can be added (due to marker based commands) while /// iterating over already added commands. @@ -555,6 +556,11 @@ impl WorkflowMachines { let cwfm = self.add_new_machine(complete_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); } + WFCommand::FailWorkflow(attrs) => { + // TODO: Do we blow up cache here? I don't think so? 
+                let cwfm = self.add_new_machine(fail_workflow(attrs));
+                self.current_wf_task_commands.push_back(cwfm);
+            }
             WFCommand::NoCommandsFromLang => (),
         }
     }
diff --git a/src/protos/mod.rs b/src/protos/mod.rs
index 567c8ea72..7810b80da 100644
--- a/src/protos/mod.rs
+++ b/src/protos/mod.rs
@@ -122,6 +122,10 @@ pub mod temporal {
                         command_type: CommandType::CompleteWorkflowExecution as i32,
                         attributes: Some(a),
                     },
+                    a @ Attributes::FailWorkflowExecutionCommandAttributes(_) => Self {
+                        command_type: CommandType::FailWorkflowExecution as i32,
+                        attributes: Some(a),
+                    },
                     _ => unimplemented!(),
                 }
             }
diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs
index e0f1b7a5f..514a8536d 100644
--- a/tests/integ_tests/simple_wf_tests.rs
+++ b/tests/integ_tests/simple_wf_tests.rs
@@ -10,6 +10,7 @@ use std::{
     },
     time::Duration,
 };
+use temporal_sdk_core::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes;
 use temporal_sdk_core::protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
 use temporal_sdk_core::protos::temporal::api::failure::v1::Failure;
@@ -352,3 +353,36 @@ fn fail_wf_task() {
     ))
     .unwrap();
 }
+
+#[test]
+fn fail_workflow_execution() {
+    let task_q = "fail_workflow_execution";
+    let core = get_integ_core();
+    let mut rng = rand::thread_rng();
+    let workflow_id: u32 = rng.gen();
+    create_workflow(&core, task_q, &workflow_id.to_string(), None);
+    let timer_id: String = rng.gen::<u32>().to_string();
+    let task = core.poll_task(task_q).unwrap();
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![StartTimerCommandAttributes {
+            timer_id: timer_id.to_string(),
+            start_to_fire_timeout: Some(Duration::from_secs(1).into()),
+            ..Default::default()
+        }
+        .into()],
+        task.task_token,
+    ))
+    .unwrap();
+    let task = core.poll_task(task_q).unwrap();
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![FailWorkflowExecutionCommandAttributes {
+            failure: Some(Failure {
+                message: "I'm ded".to_string(),
+                ..Default::default()
+            }),
+        }
+        .into()],
+        task.task_token,
+    ))
+    .unwrap();
+}

From 11b5592582f79a02a41d9c73ba16839c7de15258 Mon Sep 17 00:00:00 2001
From: Spencer Judge
Date: Wed, 3 Mar 2021 10:19:14 -0800
Subject: [PATCH 05/11] Fix weird accidental formatting change

---
 src/lib.rs | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/src/lib.rs b/src/lib.rs
index 91b5f2daa..ed54455bb 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -366,7 +366,7 @@ mod test {
     const TASK_Q: &str = "test-task-queue";
     const RUN_ID: &str = "fake_run_id";
 
-    #[fixture(hist_batches = & [])]
+    #[fixture(hist_batches = &[])]
     fn single_timer_setup(hist_batches: &[usize]) -> FakeCore {
         let wfid = "fake_wf_id";
 
@@ -375,8 +375,8 @@ mod test {
     }
 
     #[rstest(core,
-        case::incremental(single_timer_setup(& [1, 2])),
-        case::replay(single_timer_setup(& [2]))
+        case::incremental(single_timer_setup(&[1, 2])),
+        case::replay(single_timer_setup(&[2]))
     )]
     fn single_timer_test_across_wf_bridge(core: FakeCore) {
         let res = core.poll_task(TASK_Q).unwrap();
@@ -414,7 +414,7 @@ mod test {
         .unwrap();
     }
 
-    #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))]
+    #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
     fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
         let wfid = "fake_wf_id";
         let run_id = "fake_run_id";
@@ -479,7 +479,7 @@ mod test {
         .unwrap();
     }
 
-    #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))]
+    #[rstest(hist_batches, case::incremental(&[1, 2]),
case::replay(&[2]))] fn timer_cancel_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; @@ -538,7 +538,7 @@ mod test { .unwrap(); } - #[rstest(single_timer_setup(& [1]))] + #[rstest(single_timer_setup(&[1]))] fn after_shutdown_server_is_not_polled(single_timer_setup: FakeCore) { let res = single_timer_setup.poll_task(TASK_Q).unwrap(); assert_eq!(res.get_wf_jobs().len(), 1); From 2a0b38a70dd31bde5a5d9d615617f7d6a9af8700 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Mar 2021 11:24:02 -0800 Subject: [PATCH 06/11] Add debug output for machines error --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index ed54455bb..4ad506fb5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -322,7 +322,7 @@ pub enum CoreError { UninterpretableCommand(#[from] InconvertibleCommandError), /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), - /// Underlying error in state machines + /// Underlying error in state machines: {0:?} UnderlyingMachinesError(#[from] WFMachinesError), /// Task token had nothing associated with it: {0:?} NothingFoundForTaskToken(Vec), From 76fec2510e6cb162aa941e11ee2502c5c8fc8b86 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Mar 2021 16:58:02 -0800 Subject: [PATCH 07/11] Eliminate TODOs and fixup unneeded extra history application --- src/machines/workflow_machines.rs | 1 - src/workflow/mod.rs | 8 ++------ 2 files changed, 2 insertions(+), 7 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 6cb550126..c3d2d1de0 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -559,7 +559,6 @@ impl WorkflowMachines { self.current_wf_task_commands.push_back(cwfm); } WFCommand::FailWorkflow(attrs) => { - // TODO: Do we blow up cache here? I don't think so? 
let cwfm = self.add_new_machine(fail_workflow(attrs)); self.current_wf_task_commands.push_back(cwfm); } diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 1436ce4ee..ec79c08df 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -106,17 +106,13 @@ impl WorkflowManager { let (wfb, cmd_sink) = WorkflowBridge::new(); let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb)); - // TODO: Combine stuff - let task_hist = HistoryInfo::new_from_history(&history, Some(1))?; - let mut retme = Self { + Ok(Self { machines: state_machines, command_sink: cmd_sink, last_history_task_count: history.get_workflow_task_count(None)?, last_history_from_server: history, current_wf_task_num: 1, - }; - retme.machines.apply_history_events(&task_hist)?; - Ok(retme) + }) } } From 4084264069fabf3c2fb8a75127491b9cc4ba9a51 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Fri, 5 Mar 2021 13:22:33 -0800 Subject: [PATCH 08/11] Implement signal fn as pushing send signal job --- protos/local/core_interface.proto | 16 ++++++++++++---- src/workflow/driven_workflow.rs | 7 ++++++- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 42ee0438f..862ecdadc 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -13,6 +13,7 @@ import "temporal/api/workflowservice/v1/request_response.proto"; import "temporal/api/taskqueue/v1/message.proto"; import "temporal/api/enums/v1/failed_cause.proto"; import "temporal/api/failure/v1/message.proto"; +import "temporal/api/history/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; import "temporal/api/query/v1/message.proto"; @@ -69,6 +70,8 @@ message WFActivationJob { QueryWorkflow query_workflow = 5; // A request to cancel the workflow was received. CancelWorkflow cancel_workflow = 6; + // A request to signal the workflow was received. 
+ SignalWorkflow signal_workflow = 7; } } @@ -87,10 +90,6 @@ message StartWorkflow { // will be others - workflow exe started attrs, etc } -message CancelWorkflow { - // TODO: add attributes here -} - message FireTimer { string timer_id = 1; } @@ -107,6 +106,15 @@ message QueryWorkflow { temporal.api.query.v1.WorkflowQuery query = 1; } +message CancelWorkflow { + // TODO: add attributes here +} + +message SignalWorkflow { + // The signal information from the workflow's history + temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes signal = 1; +} + message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs index 41a77182f..4decbee1e 100644 --- a/src/workflow/driven_workflow.rs +++ b/src/workflow/driven_workflow.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::SignalWorkflow; use crate::{ machines::WFCommand, protos::coresdk::wf_activation_job, @@ -53,7 +54,11 @@ impl DrivenWorkflow { } /// Signal the workflow - pub fn _signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} + pub fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes) { + self.send_job(wf_activation_job::Variant::SignalWorkflow(SignalWorkflow { + signal: Some(attribs), + })) + } /// Cancel the workflow pub fn _cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} From ab81d982cdddc11ebf811fc12183a2da62171668 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Mar 2021 13:38:46 -0800 Subject: [PATCH 09/11] Tests working --- src/lib.rs | 27 +++++++ src/machines/test_help/history_builder.rs | 11 +++ src/machines/test_help/workflow_driver.rs | 2 + src/test_help/canned_histories.rs | 32 ++++++++ tests/integ_tests/simple_wf_tests.rs | 93 ++++++++++++++++++++--- 5 files changed, 156 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ed274b871..4c9b62d5c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -766,4 +766,31 @@ mod test { )) .unwrap(); } + + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + fn two_signals(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + + let mut t = canned_histories::two_signals("sig1", "sig2"); + let core = build_fake_core(wfid, run_id, &mut t, hist_batches); + + let res = core.poll_task(TASK_Q).unwrap(); + // Task is completed with no commands + core.complete_task(TaskCompletion::ok_from_api_attrs(vec![], res.task_token)) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] + ); + } } diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index e1f6874a9..68b594945 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,5 +1,7 @@ use super::Result; +use crate::protos::temporal::api::common::v1::{Payload, Payloads}; use crate::protos::temporal::api::failure::v1::Failure; +use crate::protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes; use crate::{ machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ @@ -128,6 +130,15 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskFailed, 
attrs.into());
     }
 
+    pub fn add_we_signaled(&mut self, signal_name: &str, payloads: Vec<Payload>) {
+        let attrs = WorkflowExecutionSignaledEventAttributes {
+            signal_name: signal_name.to_string(),
+            input: Some(Payloads { payloads }),
+            ..Default::default()
+        };
+        self.build_and_push_event(EventType::WorkflowExecutionSignaled, attrs.into());
+    }
+
     pub fn as_history(&self) -> History {
         History {
             events: self.events.clone(),
diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs
index ce6760a00..443b66d06 100644
--- a/src/machines/test_help/workflow_driver.rs
+++ b/src/machines/test_help/workflow_driver.rs
@@ -79,6 +79,8 @@ where
         // to re-run the workflow again.
         // TODO: This would be better to solve by actually pausing the workflow properly rather
         //   than doing the re-run the whole thing every time deal.
+        // TODO: Probably screws up signals edge case where signal rcvd after complete makes it to
+        //   server
         if self.sent_final_execution {
             return vec![];
         }
diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs
index 317ede059..9ac08c3ec 100644
--- a/src/test_help/canned_histories.rs
+++ b/src/test_help/canned_histories.rs
@@ -1,4 +1,5 @@
 use crate::machines::test_help::TestHistoryBuilder;
+use crate::protos::temporal::api::common::v1::Payload;
 use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause};
 use crate::protos::temporal::api::failure::v1::Failure;
 use crate::protos::temporal::api::history::v1::{
     history_event, TimerCanceledEventAttributes, TimerFiredEventAttributes,
 };
@@ -150,3 +151,34 @@ pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBui
     t.add_workflow_task_scheduled_and_started();
     t
 }
+
+/// First signal's payload is "hello " and second is "world" (no metadata for either)
+/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+/// 5: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
+/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
+pub fn two_signals(sig_1_id: &str, sig_2_id: &str) -> TestHistoryBuilder {
+    let mut t = TestHistoryBuilder::default();
+    t.add_by_type(EventType::WorkflowExecutionStarted);
+    t.add_full_wf_task();
+    t.add_we_signaled(
+        sig_1_id,
+        vec![Payload {
+            metadata: Default::default(),
+            data: b"hello ".to_vec(),
+        }],
+    );
+    t.add_we_signaled(
+        sig_2_id,
+        vec![Payload {
+            metadata: Default::default(),
+            data: b"world".to_vec(),
+        }],
+    );
+    t.add_workflow_task_scheduled_and_started();
+    t
+}
diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs
index 514a8536d..12023a9ff 100644
--- a/tests/integ_tests/simple_wf_tests.rs
+++ b/tests/integ_tests/simple_wf_tests.rs
@@ -10,21 +10,25 @@ use std::{
     },
     time::Duration,
 };
-use temporal_sdk_core::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes;
-use temporal_sdk_core::protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
-use temporal_sdk_core::protos::temporal::api::failure::v1::Failure;
 use temporal_sdk_core::{
     protos::{
         coresdk::{
             wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob,
         },
         temporal::api::{
             command::v1::{
                 CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
                 FailWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes, + }, + common::v1::WorkflowExecution, + enums::v1::WorkflowTaskFailedCause, + failure::v1::Failure, + workflowservice::v1::SignalWorkflowExecutionRequest, }, }, Core, CoreInitOptions, ServerGatewayOptions, Url, }; +use tokio::runtime::Runtime; // TODO: These tests can get broken permanently if they break one time and the server is not // restarted, because pulling from the same task queue produces tasks for the previous failed @@ -55,19 +59,23 @@ async fn create_workflow( .run_id } -fn get_integ_core() -> impl Core { +fn get_integ_server_options() -> ServerGatewayOptions { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, Err(_) => "http://localhost:7233".to_owned(), }; let url = Url::try_from(&*temporal_server_address).unwrap(); - let gateway_opts = ServerGatewayOptions { + ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "integ_tester".to_string(), worker_binary_id: "".to_string(), long_poll_timeout: Duration::from_secs(60), target_url: url, - }; + } +} + +fn get_integ_core() -> impl Core { + let gateway_opts = get_integ_server_options(); let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); core } @@ -386,3 +394,70 @@ fn fail_workflow_execution() { )) .unwrap(); } + +#[test] +fn signal_workflow() { + let task_q = "fail_workflow_execution"; + let core = get_integ_core(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + + let signal_id_1 = "signal1"; + let signal_id_2 = "signal2"; + let res = core.poll_task(task_q).unwrap(); + // Task is completed with no commands + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![], + res.task_token.clone(), + )) + .unwrap(); + + // Send the signals to the server + let rt = Runtime::new().unwrap(); + let mut client = rt.block_on(async { get_integ_server_options().connect().await.unwrap() }); + let wfe = WorkflowExecution { + workflow_id: workflow_id.to_string(), + run_id: res.get_run_id().unwrap().to_string(), + }; + rt.block_on(async { + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe.clone()), + signal_name: signal_id_1.to_string(), + ..Default::default() + }) + .await + .unwrap(); + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe), + signal_name: signal_id_2.to_string(), + ..Default::default() + }) + .await + .unwrap(); + }); + + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap(); +} From b0b9324680949bb4b88154ca3bb1063e47c0b98f Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Mar 2021 14:00:04 -0800 Subject: [PATCH 10/11] Need unique task queue name for integ test --- tests/integ_tests/simple_wf_tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 12023a9ff..082de2094 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ 
b/tests/integ_tests/simple_wf_tests.rs @@ -397,7 +397,7 @@ fn fail_workflow_execution() { #[test] fn signal_workflow() { - let task_q = "fail_workflow_execution"; + let task_q = "signal_workflow"; let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); From 47358eaf3d887c4947b4e8d6637d13d26527b1b1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Mar 2021 15:05:47 -0800 Subject: [PATCH 11/11] Write integ test verifying unhandled signal history --- src/lib.rs | 4 ++ src/pollers/mod.rs | 17 ++++-- tests/integ_tests/simple_wf_tests.rs | 77 +++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4c9b62d5c..edcae0014 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,6 +350,10 @@ pub enum CoreError { InvalidUri(#[from] InvalidUri), /// State machines are missing for the workflow with run id {0}! MissingMachines(String), + /// There exists a pending command in this workflow's history which has not yet been handled. + /// When thrown from complete_task, it means you should poll for a new task, receive a new + /// task token, and complete that task. + UnhandledCommandWhenCompleting, } #[cfg(test)] diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d9c0238e8..e5080880c 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -19,7 +19,7 @@ use crate::{ PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, RespondWorkflowTaskFailedApi, StartWorkflowExecutionApi, }, - Result, + CoreError, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -140,12 +140,21 @@ impl RespondWorkflowTaskCompletedApi for ServerGateway { namespace: self.opts.namespace.to_string(), ..Default::default() }; - Ok(self + match self .service .clone() .respond_workflow_task_completed(request) - .await? 
- .into_inner()) + .await + { + Ok(pwtr) => Ok(pwtr.into_inner()), + Err(ts) => { + if ts.code() == tonic::Code::InvalidArgument && ts.message() == "UnhandledCommand" { + Err(CoreError::UnhandledCommandWhenCompleting) + } else { + Err(ts.into()) + } + } + } } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 082de2094..28ddb0828 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -26,7 +26,7 @@ use temporal_sdk_core::{ workflowservice::v1::SignalWorkflowExecutionRequest, }, }, - Core, CoreInitOptions, ServerGatewayOptions, Url, + Core, CoreError, CoreInitOptions, ServerGatewayOptions, Url, }; use tokio::runtime::Runtime; @@ -461,3 +461,78 @@ fn signal_workflow() { )) .unwrap(); } + +#[test] +fn signal_workflow_signal_not_handled_on_workflow_completion() { + let task_q = "signal_workflow_signal_not_handled_on_workflow_completion"; + let core = get_integ_core(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + + let signal_id_1 = "signal1"; + let res = core.poll_task(task_q).unwrap(); + // Task is completed with a timer + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: "sometimer".to_string(), + start_to_fire_timeout: Some(Duration::from_millis(10).into()), + ..Default::default() + } + .into()], + res.task_token.clone(), + )) + .unwrap(); + + // Poll before sending the signal - we should have the timer job + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer(_)), + },] + ); + + // Send a signal to the server before we complete the workflow + let rt = Runtime::new().unwrap(); + let mut client = rt.block_on(async { get_integ_server_options().connect().await.unwrap() }); + let wfe = WorkflowExecution { + workflow_id: workflow_id.to_string(), + run_id: res.get_run_id().unwrap().to_string(), + }; + rt.block_on(async { + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe.clone()), + signal_name: signal_id_1.to_string(), + ..Default::default() + }) + .await + .unwrap(); + }); + + // Send completion - not having seen a poll response with a signal in it yet + let unhandled = core + .complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap_err(); + assert_matches!(unhandled, CoreError::UnhandledCommandWhenCompleting); + + // We should get a new task with the signal + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + },] + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap(); +}
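
[Editor's note] Patch 11 translates the server's `InvalidArgument`/`"UnhandledCommand"` rejection into `CoreError::UnhandledCommandWhenCompleting`, and the final integ test above shows the intended caller behavior: when a completion loses a race against a new event such as a signal, poll again and complete against the fresh task token. A hedged sketch of that retry contract follows, written against the `Core` trait as it is used in these tests (it assumes `poll_task`/`complete_task` return the crate's `Result` as the tests suggest, and hard-codes a complete-workflow command where a real caller would supply whatever commands the lang side produced):

```rust
use temporal_sdk_core::{
    protos::coresdk::TaskCompletion,
    protos::temporal::api::command::v1::CompleteWorkflowExecutionCommandAttributes,
    Core, CoreError,
};

/// Polls one task and tries to complete the workflow; on an unhandled-command
/// rejection, polls once more to receive the racing event (e.g. the signal)
/// and retries the completion with the fresh task token.
fn complete_with_signal_retry(core: &impl Core, task_q: &str) -> Result<(), CoreError> {
    // Helper to build the (illustrative) completion for a given task token.
    let completion = |token| {
        TaskCompletion::ok_from_api_attrs(
            vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
            token,
        )
    };
    let task = core.poll_task(task_q)?;
    match core.complete_task(completion(task.task_token)) {
        Err(CoreError::UnhandledCommandWhenCompleting) => {
            // The rejected completion was discarded server-side; the next poll
            // returns a task containing the event we have not yet handled.
            let task = core.poll_task(task_q)?;
            core.complete_task(completion(task.task_token))
        }
        other => other,
    }
}
```

A single retry suffices in the test scenario above, but a production caller would presumably loop, since another event can race each subsequent completion as well.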