diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 42ee0438f..862ecdadc 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -13,6 +13,7 @@ import "temporal/api/workflowservice/v1/request_response.proto"; import "temporal/api/taskqueue/v1/message.proto"; import "temporal/api/enums/v1/failed_cause.proto"; import "temporal/api/failure/v1/message.proto"; +import "temporal/api/history/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; import "temporal/api/query/v1/message.proto"; @@ -69,6 +70,8 @@ message WFActivationJob { QueryWorkflow query_workflow = 5; // A request to cancel the workflow was received. CancelWorkflow cancel_workflow = 6; + // A request to signal the workflow was received. + SignalWorkflow signal_workflow = 7; } } @@ -87,10 +90,6 @@ message StartWorkflow { // will be others - workflow exe started attrs, etc } -message CancelWorkflow { - // TODO: add attributes here -} - message FireTimer { string timer_id = 1; } @@ -107,6 +106,15 @@ message QueryWorkflow { temporal.api.query.v1.WorkflowQuery query = 1; } +message CancelWorkflow { + // TODO: add attributes here +} + +message SignalWorkflow { + // The signal information from the workflow's history + temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes signal = 1; +} + message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; diff --git a/src/lib.rs b/src/lib.rs index ed274b871..edcae0014 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -350,6 +350,10 @@ pub enum CoreError { InvalidUri(#[from] InvalidUri), /// State machines are missing for the workflow with run id {0}! MissingMachines(String), + /// There exists a pending command in this workflow's history which has not yet been handled. + /// When thrown from complete_task, it means you should poll for a new task, receive a new + /// task token, and complete that task. + UnhandledCommandWhenCompleting, } #[cfg(test)] @@ -766,4 +770,31 @@ mod test { )) .unwrap(); } + + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] + fn two_signals(hist_batches: &[usize]) { + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + + let mut t = canned_histories::two_signals("sig1", "sig2"); + let core = build_fake_core(wfid, run_id, &mut t, hist_batches); + + let res = core.poll_task(TASK_Q).unwrap(); + // Task is completed with no commands + core.complete_task(TaskCompletion::ok_from_api_attrs(vec![], res.task_token)) + .unwrap(); + + let res = core.poll_task(TASK_Q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] + ); + } } diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index e1f6874a9..68b594945 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,5 +1,7 @@ use super::Result; +use crate::protos::temporal::api::common::v1::{Payload, Payloads}; use crate::protos::temporal::api::failure::v1::Failure; +use crate::protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes; use crate::{ machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ @@ -128,6 +130,15 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into()); } + pub fn add_we_signaled(&mut self, signal_name: &str, payloads: Vec) { + let attrs = WorkflowExecutionSignaledEventAttributes { + signal_name: signal_name.to_string(), + input: Some(Payloads { payloads }), + ..Default::default() + }; + self.build_and_push_event(EventType::WorkflowExecutionSignaled, attrs.into()); + } + pub fn as_history(&self) -> History { History { events: self.events.clone(), diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index ce6760a00..443b66d06 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -79,6 +79,8 @@ where // to re-run the workflow again. // TODO: This would be better to solve by actually pausing the workflow properly rather // than doing the re-run the whole thing every time deal. + // TODO: Probably screws up signals edge case where signal rcvd after complete makes it to + // server if self.sent_final_execution { return vec![]; } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 9ce136888..22bcb484a 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -345,7 +345,14 @@ impl WorkflowMachines { self.machines_by_event_id.insert(event.event_id, key); } Some(EventType::WorkflowExecutionSignaled) => { - // TODO: Signal callbacks + if let Some(history_event::Attributes::WorkflowExecutionSignaledEventAttributes( + attrs, + )) = &event.attributes + { + self.drive_me.signal(attrs.clone()); + } else { + // err + } } Some(EventType::WorkflowExecutionCancelRequested) => { // TODO: Cancel callbacks diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d9c0238e8..e5080880c 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -19,7 +19,7 @@ use crate::{ PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, RespondWorkflowTaskFailedApi, StartWorkflowExecutionApi, }, - Result, + CoreError, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -140,12 +140,21 @@ impl RespondWorkflowTaskCompletedApi for ServerGateway { namespace: self.opts.namespace.to_string(), ..Default::default() }; - Ok(self + match self .service .clone() .respond_workflow_task_completed(request) - .await? - .into_inner()) + .await + { + Ok(pwtr) => Ok(pwtr.into_inner()), + Err(ts) => { + if ts.code() == tonic::Code::InvalidArgument && ts.message() == "UnhandledCommand" { + Err(CoreError::UnhandledCommandWhenCompleting) + } else { + Err(ts.into()) + } + } + } } } diff --git a/src/test_help/canned_histories.rs b/src/test_help/canned_histories.rs index 317ede059..9ac08c3ec 100644 --- a/src/test_help/canned_histories.rs +++ b/src/test_help/canned_histories.rs @@ -1,4 +1,5 @@ use crate::machines::test_help::TestHistoryBuilder; +use crate::protos::temporal::api::common::v1::Payload; use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause}; use crate::protos::temporal::api::failure::v1::Failure; use crate::protos::temporal::api::history::v1::{ @@ -150,3 +151,34 @@ pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBui t.add_workflow_task_scheduled_and_started(); t } + +/// First signal's payload is "hello " and second is "world" (no metadata for either) +/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED +/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED +/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED +/// 5: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED +/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED +/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED +pub fn two_signals(sig_1_id: &str, sig_2_id: &str) -> TestHistoryBuilder { + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_full_wf_task(); + t.add_we_signaled( + sig_1_id, + vec![Payload { + metadata: Default::default(), + data: b"hello ".to_vec(), + }], + ); + t.add_we_signaled( + sig_2_id, + vec![Payload { + metadata: Default::default(), + data: b"world".to_vec(), + }], + ); + t.add_workflow_task_scheduled_and_started(); + t +} diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs index 41a77182f..4decbee1e 100644 --- a/src/workflow/driven_workflow.rs +++ b/src/workflow/driven_workflow.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::SignalWorkflow; use crate::{ machines::WFCommand, protos::coresdk::wf_activation_job, @@ -53,7 +54,11 @@ impl DrivenWorkflow { } /// Signal the workflow - pub fn _signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} + pub fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes) { + self.send_job(wf_activation_job::Variant::SignalWorkflow(SignalWorkflow { + signal: Some(attribs), + })) + } /// Cancel the workflow pub fn _cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 514a8536d..28ddb0828 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -10,21 +10,25 @@ use std::{ }, time::Duration, }; -use temporal_sdk_core::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes; -use temporal_sdk_core::protos::temporal::api::enums::v1::WorkflowTaskFailedCause; -use temporal_sdk_core::protos::temporal::api::failure::v1::Failure; use temporal_sdk_core::{ protos::{ coresdk::{ wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob, }, - temporal::api::command::v1::{ - CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, - StartTimerCommandAttributes, + temporal::api::{ + command::v1::{ + CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, + FailWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + common::v1::WorkflowExecution, + enums::v1::WorkflowTaskFailedCause, + failure::v1::Failure, + workflowservice::v1::SignalWorkflowExecutionRequest, }, }, - Core, CoreInitOptions, ServerGatewayOptions, Url, + Core, CoreError, CoreInitOptions, ServerGatewayOptions, Url, }; +use tokio::runtime::Runtime; // TODO: These tests can get broken permanently if they break one time and the server is not // restarted, because pulling from the same task queue produces tasks for the previous failed @@ -55,19 +59,23 @@ async fn create_workflow( .run_id } -fn get_integ_core() -> impl Core { +fn get_integ_server_options() -> ServerGatewayOptions { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, Err(_) => "http://localhost:7233".to_owned(), }; let url = Url::try_from(&*temporal_server_address).unwrap(); - let gateway_opts = ServerGatewayOptions { + ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "integ_tester".to_string(), worker_binary_id: "".to_string(), long_poll_timeout: Duration::from_secs(60), target_url: url, - }; + } +} + +fn get_integ_core() -> impl Core { + let gateway_opts = get_integ_server_options(); let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); core } @@ -386,3 +394,145 @@ fn fail_workflow_execution() { )) .unwrap(); } + +#[test] +fn signal_workflow() { + let task_q = "signal_workflow"; + let core = get_integ_core(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + + let signal_id_1 = "signal1"; + let signal_id_2 = "signal2"; + let res = core.poll_task(task_q).unwrap(); + // Task is completed with no commands + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![], + res.task_token.clone(), + )) + .unwrap(); + + // Send the signals to the server + let rt = Runtime::new().unwrap(); + let mut client = rt.block_on(async { get_integ_server_options().connect().await.unwrap() }); + let wfe = WorkflowExecution { + workflow_id: workflow_id.to_string(), + run_id: res.get_run_id().unwrap().to_string(), + }; + rt.block_on(async { + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe.clone()), + signal_name: signal_id_1.to_string(), + ..Default::default() + }) + .await + .unwrap(); + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe), + signal_name: signal_id_2.to_string(), + ..Default::default() + }) + .await + .unwrap(); + }); + + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [ + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + }, + WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + } + ] + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap(); +} + +#[test] +fn signal_workflow_signal_not_handled_on_workflow_completion() { + let task_q = "signal_workflow_signal_not_handled_on_workflow_completion"; + let core = get_integ_core(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + create_workflow(&core, task_q, &workflow_id.to_string(), None); + + let signal_id_1 = "signal1"; + let res = core.poll_task(task_q).unwrap(); + // Task is completed with a timer + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: "sometimer".to_string(), + start_to_fire_timeout: Some(Duration::from_millis(10).into()), + ..Default::default() + } + .into()], + res.task_token.clone(), + )) + .unwrap(); + + // Poll before sending the signal - we should have the timer job + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::FireTimer(_)), + },] + ); + + // Send a signal to the server before we complete the workflow + let rt = Runtime::new().unwrap(); + let mut client = rt.block_on(async { get_integ_server_options().connect().await.unwrap() }); + let wfe = WorkflowExecution { + workflow_id: workflow_id.to_string(), + run_id: res.get_run_id().unwrap().to_string(), + }; + rt.block_on(async { + client + .service + .signal_workflow_execution(SignalWorkflowExecutionRequest { + namespace: "default".to_string(), + workflow_execution: Some(wfe.clone()), + signal_name: signal_id_1.to_string(), + ..Default::default() + }) + .await + .unwrap(); + }); + + // Send completion - not having seen a poll response with a signal in it yet + let unhandled = core + .complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap_err(); + assert_matches!(unhandled, CoreError::UnhandledCommandWhenCompleting); + + // We should get a new task with the signal + let res = core.poll_task(task_q).unwrap(); + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::SignalWorkflow(_)), + },] + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + res.task_token, + )) + .unwrap(); +}