From a78439edfefce422439a2581ba10405d8842cec8 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Feb 2021 16:35:31 -0800 Subject: [PATCH 01/16] Creating new UT, want to dedupe next --- src/lib.rs | 152 +++++++++++++++++- src/protos/mod.rs | 6 +- .../{poller_test.rs => simple_wf_tests.rs} | 44 +++++ tests/main.rs | 2 +- 4 files changed, 194 insertions(+), 10 deletions(-) rename tests/integ_tests/{poller_test.rs => simple_wf_tests.rs} (58%) diff --git a/src/lib.rs b/src/lib.rs index 8a5aaa36e..4f3a5a658 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -285,6 +285,7 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; + use crate::protos::coresdk::TimerFiredTaskAttributes; use crate::{ machines::test_help::TestHistoryBuilder, pollers::MockServerGateway, @@ -304,7 +305,7 @@ mod test { use tracing::Level; #[test] - fn workflow_bridge() { + fn timer_test_accross_wf_bridge() { let s = span!(Level::DEBUG, "Test start"); let _enter = s.enter(); @@ -389,15 +390,14 @@ mod test { let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( - StartTimerCommandAttributes { + vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), ..Default::default() } - .into(), + .into()], task_tok, )) .unwrap(); - dbg!("sent completion w/ start timer"); let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo @@ -409,10 +409,150 @@ mod test { ); let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task_tok, + )) + .unwrap(); + } + + #[test] + fn parallel_timer_test_accross_wf_bridge() { + let s = span!(Level::DEBUG, "Test start"); + let _enter = s.enter(); + + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + let timer_1_id = "timer1".to_string(); + let timer_2_id = "timer2".to_string(); + let task_queue = "test-task-queue"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + let timer_2_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: timer_1_id.clone(), + }), + ); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_2_started_event_id, + timer_id: timer_2_id.clone(), + }), + ); + t.add_workflow_task_scheduled_and_started(); + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + --- + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_STARTED + 7: EVENT_TYPE_TIMER_FIRED + 8: EVENT_TYPE_TIMER_FIRED + 9: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 10: EVENT_TYPE_WORKFLOW_TASK_STARTED + --- + */ + let events_first_batch = t.get_history_info(1).unwrap().events; + let wf = Some(WorkflowExecution { + workflow_id: wfid.to_string(), + run_id: run_id.to_string(), + }); + let first_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_first_batch, + }), + workflow_execution: wf.clone(), + ..Default::default() + }; + let events_second_batch = t.get_history_info(2).unwrap().events; + let second_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_second_batch, + }), + workflow_execution: wf, + ..Default::default() + }; + let responses = vec![first_response, second_response]; + + let mut tasks = VecDeque::from(responses); + let mut mock_gateway = MockServerGateway::new(); + mock_gateway + .expect_poll_workflow_task() + .returning(move |_| Ok(tasks.pop_front().unwrap())); + // Response not really important here + mock_gateway + .expect_complete_workflow_task() + .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); + + let runtime = Runtime::new().unwrap(); + let core = CoreSDK { + runtime, + server_gateway: Arc::new(mock_gateway), + workflow_machines: DashMap::new(), + workflow_task_tokens: DashMap::new(), + }; + + let res = core.poll_task(task_queue).unwrap(); + // TODO: uggo + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + }] + ); + assert!(core.workflow_machines.get(run_id).is_some()); + + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + vec![ + StartTimerCommandAttributes { + timer_id: timer_1_id.clone(), + ..Default::default() + } + .into(), + StartTimerCommandAttributes { + timer_id: timer_2_id.clone(), + ..Default::default() + } + .into(), + ], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(task_queue).unwrap(); + // TODO: uggo + assert_matches!( + res.get_wf_jobs().as_slice(), + [ + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired( + TimerFiredTaskAttributes { timer_id: t1_id } + )), + }, + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired( + TimerFiredTaskAttributes { timer_id: t2_id } + )), + } + ] => { + assert_eq!(t1_id, &timer_1_id); + assert_eq!(t2_id, &timer_2_id); + } + ); + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task_tok, )) .unwrap(); - dbg!("sent workflow done"); } } diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 47232bd12..57650f312 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -60,11 +60,11 @@ pub mod coresdk { impl CompleteTaskReq { /// Build a successful completion from some api command attributes and a task token pub fn ok_from_api_attrs( - cmd: api_command::command::Attributes, + cmds: Vec, task_token: Vec, ) -> Self { - let cmd: ApiCommand = cmd.into(); - let success: WfActivationSuccess = vec![cmd].into(); + let cmds: Vec = cmds.into_iter().map(Into::into).collect(); + let success: WfActivationSuccess = cmds.into(); CompleteTaskReq { task_token, completion: Some(Completion::Workflow(WfActivationCompletion { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/simple_wf_tests.rs similarity index 58% rename from tests/integ_tests/poller_test.rs rename to tests/integ_tests/simple_wf_tests.rs index d0d5ef1a4..e89033859 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -66,3 +66,47 @@ fn timer_workflow() { run_id ); } + +#[test] +fn parallel_timer_workflow() { + let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { + Ok(addr) => addr, + Err(_) => "http://localhost:7233".to_owned(), + }; + let url = Url::try_from(&*temporal_server_address).unwrap(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + target_url: url, + }; + let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + let mut rng = rand::thread_rng(); + let workflow_id: u32 = rng.gen(); + let run_id = dbg!(create_workflow(&core, &workflow_id.to_string())); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_secs(1).into()), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); +} diff --git a/tests/main.rs b/tests/main.rs index 976cf28e4..887c013d1 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1,4 +1,4 @@ #[cfg(test)] mod integ_tests { - mod poller_test; + mod simple_wf_tests; } From 79495e63149de603c585d7433ed7f01bdfd66477 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Feb 2021 16:53:39 -0800 Subject: [PATCH 02/16] Dedupe core response setup --- src/lib.rs | 104 ++++------------------------ src/machines/test_help/mod.rs | 62 +++++++++++++++++ src/machines/timer_state_machine.rs | 1 - 3 files changed, 74 insertions(+), 93 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 4f3a5a658..6b112b0ed 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -285,23 +285,19 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; - use crate::protos::coresdk::TimerFiredTaskAttributes; use crate::{ - machines::test_help::TestHistoryBuilder, - pollers::MockServerGateway, + machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ - coresdk::{wf_activation_job, WfActivationJob}, + coresdk::{wf_activation_job, TimerFiredTaskAttributes, WfActivationJob}, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, enums::v1::EventType, - history::v1::{history_event, History, TimerFiredEventAttributes}, - workflowservice::v1::RespondWorkflowTaskCompletedResponse, + history::v1::{history_event, TimerFiredEventAttributes}, }, }, }; - use std::collections::VecDeque; use tracing::Level; #[test] @@ -311,7 +307,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; - let timer_id = "fake_timer"; + let timer_id = "fake_timer".to_string(); let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); @@ -322,7 +318,7 @@ mod test { EventType::TimerFired, history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { started_event_id: timer_started_event_id, - timer_id: "timer1".to_string(), + timer_id: timer_id.clone(), }), ); t.add_workflow_task_scheduled_and_started(); @@ -338,47 +334,9 @@ mod test { 8: EVENT_TYPE_WORKFLOW_TASK_STARTED --- */ - let events_first_batch = t.get_history_info(1).unwrap().events; - let wf = Some(WorkflowExecution { - workflow_id: wfid.to_string(), - run_id: run_id.to_string(), - }); - let first_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_first_batch, - }), - workflow_execution: wf.clone(), - ..Default::default() - }; - let events_second_batch = t.get_history_info(2).unwrap().events; - let second_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_second_batch, - }), - workflow_execution: wf, - ..Default::default() - }; - let responses = vec![first_response, second_response]; - - let mut tasks = VecDeque::from(responses); - let mut mock_gateway = MockServerGateway::new(); - mock_gateway - .expect_poll_workflow_task() - .returning(move |_| Ok(tasks.pop_front().unwrap())); - // Response not really important here - mock_gateway - .expect_complete_workflow_task() - .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); - - let runtime = Runtime::new().unwrap(); - let core = CoreSDK { - runtime, - server_gateway: Arc::new(mock_gateway), - workflow_machines: DashMap::new(), - workflow_task_tokens: DashMap::new(), - }; + let core = build_fake_core(wfid, run_id, &mut t); - let res = dbg!(core.poll_task(task_queue).unwrap()); + let res = core.poll_task(task_queue).unwrap(); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -391,7 +349,7 @@ mod test { let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( vec![StartTimerCommandAttributes { - timer_id: timer_id.to_string(), + timer_id, ..Default::default() } .into()], @@ -399,7 +357,7 @@ mod test { )) .unwrap(); - let res = dbg!(core.poll_task(task_queue).unwrap()); + let res = core.poll_task(task_queue).unwrap(); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -460,45 +418,7 @@ mod test { 10: EVENT_TYPE_WORKFLOW_TASK_STARTED --- */ - let events_first_batch = t.get_history_info(1).unwrap().events; - let wf = Some(WorkflowExecution { - workflow_id: wfid.to_string(), - run_id: run_id.to_string(), - }); - let first_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_first_batch, - }), - workflow_execution: wf.clone(), - ..Default::default() - }; - let events_second_batch = t.get_history_info(2).unwrap().events; - let second_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_second_batch, - }), - workflow_execution: wf, - ..Default::default() - }; - let responses = vec![first_response, second_response]; - - let mut tasks = VecDeque::from(responses); - let mut mock_gateway = MockServerGateway::new(); - mock_gateway - .expect_poll_workflow_task() - .returning(move |_| Ok(tasks.pop_front().unwrap())); - // Response not really important here - mock_gateway - .expect_complete_workflow_task() - .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); - - let runtime = Runtime::new().unwrap(); - let core = CoreSDK { - runtime, - server_gateway: Arc::new(mock_gateway), - workflow_machines: DashMap::new(), - workflow_task_tokens: DashMap::new(), - }; + let core = build_fake_core(wfid, run_id, &mut t); let res = core.poll_task(task_queue).unwrap(); // TODO: uggo @@ -544,8 +464,8 @@ mod test { )), } ] => { - assert_eq!(t1_id, &timer_1_id); - assert_eq!(t2_id, &timer_2_id); + assert_eq!(t1_id, &timer_1_id); + assert_eq!(t2_id, &timer_2_id); } ); let task_tok = res.task_token; diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index b7ab7509b..e1c0233eb 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -5,3 +5,65 @@ mod workflow_driver; pub(crate) use history_builder::TestHistoryBuilder; pub(super) use workflow_driver::{CommandSender, TestWFCommand, TestWorkflowDriver}; + +use crate::{ + pollers::MockServerGateway, + protos::temporal::api::common::v1::WorkflowExecution, + protos::temporal::api::history::v1::History, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + }, + CoreSDK, +}; +use dashmap::DashMap; +use std::{collections::VecDeque, sync::Arc}; +use tokio::runtime::Runtime; + +/// Given identifiers for a workflow/run, and a test history builder, construct an instance of +/// the core SDK with a mock server gateway that will produce the responses as appropriate. +pub(crate) fn build_fake_core( + wfid: &str, + run_id: &str, + t: &mut TestHistoryBuilder, +) -> CoreSDK { + let events_first_batch = t.get_history_info(1).unwrap().events; + let wf = Some(WorkflowExecution { + workflow_id: wfid.to_string(), + run_id: run_id.to_string(), + }); + let first_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_first_batch, + }), + workflow_execution: wf.clone(), + ..Default::default() + }; + let events_second_batch = t.get_history_info(2).unwrap().events; + let second_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_second_batch, + }), + workflow_execution: wf, + ..Default::default() + }; + let responses = vec![first_response, second_response]; + + let mut tasks = VecDeque::from(responses); + let mut mock_gateway = MockServerGateway::new(); + mock_gateway + .expect_poll_workflow_task() + .returning(move |_| Ok(tasks.pop_front().unwrap())); + // Response not really important here + mock_gateway + .expect_complete_workflow_task() + .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); + + let runtime = Runtime::new().unwrap(); + let core = CoreSDK { + runtime, + server_gateway: Arc::new(mock_gateway), + workflow_machines: DashMap::new(), + workflow_task_tokens: DashMap::new(), + }; + core +} diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 79c932a0b..c315043e7 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -343,7 +343,6 @@ mod test { let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(2)) .unwrap(); - dbg!(&commands); assert_eq!(commands.len(), 1); assert_eq!( commands[0].command_type, From 070f863d1ae672294efacf64ca45b2e383ff1d87 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Mon, 8 Feb 2021 17:12:19 -0800 Subject: [PATCH 03/16] Response batch control --- src/lib.rs | 15 +++----- src/machines/test_help/history_builder.rs | 3 +- src/machines/test_help/mod.rs | 43 +++++++++++------------ 3 files changed, 27 insertions(+), 34 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 6b112b0ed..3d344b100 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -298,13 +298,9 @@ mod test { }, }, }; - use tracing::Level; #[test] - fn timer_test_accross_wf_bridge() { - let s = span!(Level::DEBUG, "Test start"); - let _enter = s.enter(); - + fn timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer".to_string(); @@ -334,7 +330,7 @@ mod test { 8: EVENT_TYPE_WORKFLOW_TASK_STARTED --- */ - let core = build_fake_core(wfid, run_id, &mut t); + let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]); let res = core.poll_task(task_queue).unwrap(); // TODO: uggo @@ -374,10 +370,7 @@ mod test { } #[test] - fn parallel_timer_test_accross_wf_bridge() { - let s = span!(Level::DEBUG, "Test start"); - let _enter = s.enter(); - + fn parallel_timer_test_across_wf_bridge() { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_1_id = "timer1".to_string(); @@ -418,7 +411,7 @@ mod test { 10: EVENT_TYPE_WORKFLOW_TASK_STARTED --- */ - let core = build_fake_core(wfid, run_id, &mut t); + let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]); let res = core.poll_task(task_queue).unwrap(); // TODO: uggo diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 2f4dac37f..b9fc0a3dc 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -148,7 +148,8 @@ impl TestHistoryBuilder { Ok(()) } - /// Iterates over the events in this builder to return a [HistoryInfo] of the n-th workflow task. + /// Iterates over the events in this builder to return a [HistoryInfo] including events up to + /// the provided `to_wf_task_num` pub(crate) fn get_history_info( &self, to_wf_task_num: usize, diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index e1c0233eb..ba216ab7e 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -21,32 +21,32 @@ use tokio::runtime::Runtime; /// Given identifiers for a workflow/run, and a test history builder, construct an instance of /// the core SDK with a mock server gateway that will produce the responses as appropriate. +/// +/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned. +/// For each number in the input list, a fake response will be prepared which includes history +/// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info]. pub(crate) fn build_fake_core( - wfid: &str, + wf_id: &str, run_id: &str, t: &mut TestHistoryBuilder, + response_batches: &[usize], ) -> CoreSDK { - let events_first_batch = t.get_history_info(1).unwrap().events; let wf = Some(WorkflowExecution { - workflow_id: wfid.to_string(), + workflow_id: wf_id.to_string(), run_id: run_id.to_string(), }); - let first_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_first_batch, - }), - workflow_execution: wf.clone(), - ..Default::default() - }; - let events_second_batch = t.get_history_info(2).unwrap().events; - let second_response = PollWorkflowTaskQueueResponse { - history: Some(History { - events: events_second_batch, - }), - workflow_execution: wf, - ..Default::default() - }; - let responses = vec![first_response, second_response]; + + let responses: Vec<_> = response_batches + .iter() + .map(|to_task_num| { + let batch = t.get_history_info(*to_task_num).unwrap().events; + PollWorkflowTaskQueueResponse { + history: Some(History { events: batch }), + workflow_execution: wf.clone(), + ..Default::default() + } + }) + .collect(); let mut tasks = VecDeque::from(responses); let mut mock_gateway = MockServerGateway::new(); @@ -59,11 +59,10 @@ pub(crate) fn build_fake_core( .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); let runtime = Runtime::new().unwrap(); - let core = CoreSDK { + CoreSDK { runtime, server_gateway: Arc::new(mock_gateway), workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), - }; - core + } } From ab685cc22543693da3a0c2ddc710e5bed59bd0e3 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 9 Feb 2021 13:21:06 -0800 Subject: [PATCH 04/16] Saving here while waiting for some feedback --- Cargo.toml | 4 +- src/lib.rs | 54 +++++++++++++++++++++++ src/machines/test_help/workflow_driver.rs | 6 +-- src/machines/timer_state_machine.rs | 10 +++++ src/machines/workflow_machines.rs | 1 + tests/integ_tests/simple_wf_tests.rs | 1 + 6 files changed, 72 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 63703200b..c0b880f38 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,16 +17,18 @@ env_logger = "0.8" futures = "0.3" log = "0.4" opentelemetry-jaeger = "0.10" +opentelemetry = "0.11.2" prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.11" +tracing-opentelemetry = "0.10" tracing-subscriber = "0.2" url = "2.2" rand = "0.8.3" uuid = { version = "0.8.2", features = ["v4"] } + [dependencies.tonic] version = "0.4" #path = "../tonic/tonic" diff --git a/src/lib.rs b/src/lib.rs index 3d344b100..be5fd900a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -298,6 +298,8 @@ mod test { }, }, }; + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; #[test] fn timer_test_across_wf_bridge() { @@ -468,4 +470,56 @@ mod test { )) .unwrap(); } + + #[test] + fn single_timer_whole_replay_test_across_wf_bridge() { + let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() + .with_service_name("report_example") + .install() + .unwrap(); + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + tracing_subscriber::registry() + .with(opentelemetry) + .try_init() + .unwrap(); + + let s = span!(Level::DEBUG, "Test start", t = "bridge"); + let _enter = s.enter(); + + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + let timer_1_id = "timer1".to_string(); + let task_queue = "test-task-queue"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: timer_1_id.clone(), + }), + ); + t.add_workflow_task_scheduled_and_started(); + let core = build_fake_core(wfid, run_id, &mut t, &[2]); + + let res = core.poll_task(task_queue).unwrap(); + // TODO: Not the right expectation -- is timer fired? + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + }] + ); + assert!(core.workflow_machines.get(run_id).is_some()); + + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task_tok, + )) + .unwrap(); + } } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 56d9843ef..670408595 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -72,12 +72,11 @@ where F: Fn(CommandSender) -> Fut + Send + Sync, Fut: Future, { - #[instrument(skip(self))] fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) -> Vec { + event!(Level::DEBUG, msg = "Test WF driver start called"); vec![] } - #[instrument(skip(self))] fn fetch_workflow_iteration_output(&mut self) -> Vec { let (sender, receiver) = CommandSender::new(self.cache.clone()); // Call the closure that produces the workflow future @@ -88,7 +87,6 @@ where rt.block_on(wf_future); let cmds = receiver.into_iter(); - event!(Level::DEBUG, msg = "Test wf driver emitting", ?cmds); let mut last_cmd = None; for cmd in cmds { @@ -101,6 +99,8 @@ where } } + event!(Level::DEBUG, msg = "Test wf driver emitting", ?last_cmd); + // Return only the last command, since that's what would've been yielded in a real wf if let Some(c) = last_cmd { vec![c] diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index c315043e7..2c9950be8 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -336,6 +336,16 @@ mod test { #[rstest] fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { + let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() + .with_service_name("report_example") + .install() + .unwrap(); + let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); + tracing_subscriber::registry() + .with(opentelemetry) + .try_init() + .unwrap(); + let s = span!(Level::DEBUG, "Test start", t = "full"); let _enter = s.enter(); diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index a6a87548d..cb0efef0e 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -439,6 +439,7 @@ impl WorkflowMachines { fn prepare_commands(&mut self) { while let Some(c) = self.current_wf_task_commands.pop_front() { + dbg!(&c); // TODO - some special case stuff that can maybe be managed differently? // handleCommand should be called even on canceled ones to support mutableSideEffect // command.handleCommand(command.getCommandType()); diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index e89033859..9375ac203 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -67,6 +67,7 @@ fn timer_workflow() { ); } +// TODO: Actually make this different #[test] fn parallel_timer_workflow() { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { From 72327b9f89b3dc3a937cfb43c175aa9be43c83cd Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 10 Feb 2021 10:05:06 -0800 Subject: [PATCH 05/16] Replay test works. Just need to do a bit of cleanup --- Cargo.toml | 1 + src/lib.rs | 75 ++++++++++++++++------- src/machines/test_help/history_builder.rs | 38 ++---------- src/machines/test_help/mod.rs | 2 + src/machines/timer_state_machine.rs | 12 +--- src/protos/mod.rs | 43 +++++++++++++ src/workflow/mod.rs | 59 +++++++++++++++++- tests/integ_tests/simple_wf_tests.rs | 2 +- 8 files changed, 162 insertions(+), 70 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c0b880f38..fc54cecd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,6 +10,7 @@ edition = "2018" [dependencies] anyhow = "1.0" async-trait = "0.1" +crossbeam = "0.8" dashmap = "4.0" derive_more = "0.99" displaydoc = "0.1" diff --git a/src/lib.rs b/src/lib.rs index be5fd900a..51de5b948 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -30,9 +30,10 @@ use crate::{ common::v1::WorkflowExecution, workflowservice::v1::PollWorkflowTaskQueueResponse, }, }, - protosext::{HistoryInfo, HistoryInfoError}, + protosext::HistoryInfoError, workflow::{WfManagerProtected, WorkflowManager}, }; +use crossbeam::queue::SegQueue; use dashmap::DashMap; use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc}; use tokio::runtime::Runtime; @@ -85,6 +86,7 @@ pub fn init(opts: CoreInitOptions) -> Result { server_gateway: Arc::new(work_provider), workflow_machines: Default::default(), workflow_task_tokens: Default::default(), + pending_activations: Default::default(), }) } @@ -107,6 +109,10 @@ where workflow_machines: DashMap, /// Maps task tokens to workflow run ids workflow_task_tokens: DashMap, String>, + + /// Workflows that are currently under replay will queue their run ID here, indicating that + /// there are more workflow tasks / activations to be performed. + pending_activations: SegQueue, } impl Core for CoreSDK @@ -115,6 +121,22 @@ where { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { + // We must first check if there are pending workflow tasks for workflows that are currently + // replaying + if let Some(run_id) = self.pending_activations.pop() { + dbg!(&run_id); + let (activation, more_tasks) = + self.access_machine(&run_id, |mgr| mgr.get_next_activation())?; + if more_tasks { + self.pending_activations.push(run_id); + } + return Ok(Task { + // TODO: Set this properly + task_token: vec![], + variant: activation.map(Into::into), + }); + } + // This will block forever in the event there is no work from the server let work = self .runtime @@ -134,7 +156,7 @@ where event!( Level::DEBUG, - msg = "Received workflow task", + msg = "Received workflow task from server", ?work.task_token ); @@ -142,14 +164,13 @@ where self.workflow_task_tokens .insert(work.task_token.clone(), run_id.clone()); - // We pass none since we want to apply all the history we just got. - // Will need to change a bit once we impl caching. - let hist_info = HistoryInfo::new_from_history(&history, None)?; - let activation = self.access_machine(&run_id, |mgr| { - let machines = &mut mgr.machines; - hist_info.apply_history_events(machines)?; - Ok(machines.get_wf_activation()) - })?; + let (activation, more_tasks) = + self.access_machine(&run_id, |mgr| mgr.feed_history_from_server(history))?; + + if more_tasks { + dbg!("More tasks!"); + self.pending_activations.push(run_id); + } Ok(Task { task_token: work.task_token, @@ -298,8 +319,6 @@ mod test { }, }, }; - use tracing_subscriber::layer::SubscriberExt; - use tracing_subscriber::util::SubscriberInitExt; #[test] fn timer_test_across_wf_bridge() { @@ -355,6 +374,7 @@ mod test { )) .unwrap(); + dbg!("Second poll"); let res = core.poll_task(task_queue).unwrap(); // TODO: uggo assert_matches!( @@ -473,16 +493,6 @@ mod test { #[test] fn single_timer_whole_replay_test_across_wf_bridge() { - let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() - .with_service_name("report_example") - .install() - .unwrap(); - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); - tracing_subscriber::registry() - .with(opentelemetry) - .try_init() - .unwrap(); - let s = span!(Level::DEBUG, "Test start", t = "bridge"); let _enter = s.enter(); @@ -503,6 +513,8 @@ mod test { }), ); t.add_workflow_task_scheduled_and_started(); + // NOTE! What makes this a replay test is the server only responds with *one* batch here. + // So, server is polled once, but lang->core interactions look just like non-replay test. let core = build_fake_core(wfid, run_id, &mut t, &[2]); let res = core.poll_task(task_queue).unwrap(); @@ -515,6 +527,25 @@ mod test { ); assert!(core.workflow_machines.get(run_id).is_some()); + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: timer_1_id, + ..Default::default() + } + .into()], + task_tok, + )) + .unwrap(); + + let res = core.poll_task(task_queue).unwrap(); + // TODO: uggo + assert_matches!( + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + }] + ); let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index b9fc0a3dc..8a3230790 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -1,4 +1,5 @@ use super::Result; +use crate::protos::temporal::api::history::v1::History; use crate::{ machines::{workflow_machines::WorkflowMachines, ProtoCommand}, protos::temporal::api::{ @@ -87,41 +88,10 @@ impl TestHistoryBuilder { self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into()); } - /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed by - /// WFTaskCompleted, adding one to the count for every match. It will additionally count - /// a WFTaskStarted at the end of the event list. - /// - /// If `up_to_event_id` is provided, the count will be returned as soon as processing advances - /// past that id. - pub fn get_workflow_task_count(&self, up_to_event_id: Option) -> Result { - let mut last_wf_started_id = 0; - let mut count = 0; - let mut history = self.events.iter().peekable(); - while let Some(event) = history.next() { - let next_event = history.peek(); - if let Some(upto) = up_to_event_id { - if event.event_id > upto { - return Ok(count); - } - } - let next_is_completed = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskCompleted as i32 - }); - if event.event_type == EventType::WorkflowTaskStarted as i32 - && (next_event.is_none() || next_is_completed) - { - last_wf_started_id = event.event_id; - count += 1; - } - - if next_event.is_none() { - if last_wf_started_id != event.event_id { - bail!("Last item in history wasn't WorkflowTaskStarted") - } - return Ok(count); - } + pub fn as_history(&self) -> History { + History { + events: self.events.clone(), } - Ok(count) } /// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index ba216ab7e..7f18520d0 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -52,6 +52,7 @@ pub(crate) fn build_fake_core( let mut mock_gateway = MockServerGateway::new(); mock_gateway .expect_poll_workflow_task() + .times(response_batches.len()) .returning(move |_| Ok(tasks.pop_front().unwrap())); // Response not really important here mock_gateway @@ -64,5 +65,6 @@ pub(crate) fn build_fake_core( server_gateway: Arc::new(mock_gateway), workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), + pending_activations: Default::default(), } } diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 2c9950be8..e77b1d7c3 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -305,7 +305,7 @@ mod test { }), ); t.add_workflow_task_scheduled_and_started(); - assert_eq!(2, t.get_workflow_task_count(None).unwrap()); + assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap()); (t, state_machines) } @@ -336,16 +336,6 @@ mod test { #[rstest] fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { - let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() - .with_service_name("report_example") - .install() - .unwrap(); - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); - tracing_subscriber::registry() - .with(opentelemetry) - .try_init() - .unwrap(); - let s = span!(Level::DEBUG, "Test start", t = "full"); let _enter = s.enter(); diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 57650f312..f4c71801e 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -129,11 +129,54 @@ pub mod temporal { use crate::protos::temporal::api::{ enums::v1::EventType, history::v1::history_event::Attributes, }; + use crate::protosext::HistoryInfoError; use prost::alloc::fmt::Formatter; use std::fmt::Display; include!("temporal.api.history.v1.rs"); + impl History { + /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed + /// by WFTaskCompleted, adding one to the count for every match. It will + /// additionally count a WFTaskStarted at the end of the event list. + /// + /// If `up_to_event_id` is provided, the count will be returned as soon as + /// processing advances past that id. + pub fn get_workflow_task_count( + &self, + up_to_event_id: Option, + ) -> Result { + let mut last_wf_started_id = 0; + let mut count = 0; + let mut history = self.events.iter().peekable(); + while let Some(event) = history.next() { + let next_event = history.peek(); + if let Some(upto) = up_to_event_id { + if event.event_id > upto { + return Ok(count); + } + } + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + if event.event_type == EventType::WorkflowTaskStarted as i32 + && (next_event.is_none() || next_is_completed) + { + last_wf_started_id = event.event_id; + count += 1; + } + + if next_event.is_none() { + if last_wf_started_id != event.event_id { + return Err(HistoryInfoError::HistoryEndsUnexpectedly); + } + return Ok(count); + } + } + Ok(count) + } + } + impl HistoryEvent { /// Returns true if this is an event created to mirror a command pub fn is_command_event(&self) -> bool { diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 08c894f3b..da699c69d 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -2,6 +2,9 @@ mod bridge; pub(crate) use bridge::WorkflowBridge; +use crate::protos::coresdk::WfActivation; +use crate::protos::temporal::api::history::v1::History; +use crate::protosext::HistoryInfo; use crate::{ machines::{ProtoCommand, WFCommand, WorkflowMachines}, protos::temporal::api::workflowservice::v1::StartWorkflowExecutionResponse, @@ -56,7 +59,8 @@ pub trait StartWorkflowExecutionApi { ) -> Result; } -/// Manages concurrent access to an instance of a [WorkflowMachines], which is not thread-safe. +/// Manages concurrent access to an instance of a [WorkflowMachines], which is not thread-safe, +/// as well as other data associated with that specific workflow run. pub(crate) struct WorkflowManager { data: Arc>, } @@ -64,6 +68,13 @@ pub(crate) struct WorkflowManager { pub(crate) struct WfManagerProtected { pub machines: WorkflowMachines, pub command_sink: Sender>, + /// The last recorded history we received from the server for this workflow run. This must be + /// kept because the lang side polls & completes for every workflow task, but we do not need + /// to poll the server that often during replay. + pub last_history_from_server: Option, + pub last_history_task_count: Option, + /// The current workflow task number this run is on. Starts at one and monotonically increases. + pub current_wf_task_num: usize, } impl WorkflowManager { @@ -74,6 +85,9 @@ impl WorkflowManager { let protected = WfManagerProtected { machines: state_machines, command_sink: cmd_sink, + last_history_from_server: None, + last_history_task_count: None, + current_wf_task_num: 1, }; Self { data: Arc::new(Mutex::new(protected)), @@ -91,13 +105,54 @@ impl WorkflowManager { } } +impl WfManagerProtected { + /// Given history that was just obtained from the server, pipe it into this workflow's machines. + /// + /// Should only be called when a workflow has caught up on replay. It will return a workflow + /// activation if one is needed, as well as a bool indicating if there are more workflow tasks + /// that need to be performed to replay the remaining history. + pub fn feed_history_from_server( + &mut self, + hist: History, + ) -> Result<(Option, bool)> { + let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?; + let task_ct = hist.get_workflow_task_count(None)?; + self.last_history_task_count = Some(task_ct); + self.last_history_from_server = Some(hist); + task_hist.apply_history_events(&mut self.machines)?; + let activation = self.machines.get_wf_activation(); + let more_activations_needed = task_ct > self.current_wf_task_num; + + self.current_wf_task_num += 1; + Ok((activation, more_activations_needed)) + } + + pub fn get_next_activation(&mut self) -> Result<(Option, bool)> { + self.current_wf_task_num += 1; + // TODO: Proper errors + let hist = self + .last_history_from_server + .as_ref() + .ok_or_else(|| CoreError::Unknown)?; + let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?; + task_hist.apply_history_events(&mut self.machines)?; + let activation = self.machines.get_wf_activation(); + let more_activations_needed = self.current_wf_task_num + <= self + .last_history_task_count + .ok_or_else(|| CoreError::Unknown)?; + + Ok((activation, more_activations_needed)) + } +} + #[cfg(test)] mod tests { use super::*; - // Enforce thread-safeness of wf manager fn enforcer(_: W) {} + // Enforce thread-safeness of wf manager #[test] fn is_threadsafe() { enforcer(WorkflowManager::new(&Default::default())); diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 9375ac203..4c1616665 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -67,7 +67,7 @@ fn timer_workflow() { ); } -// TODO: Actually make this different +// TODO: Actually make this different from serial #[test] fn parallel_timer_workflow() { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { From a29489ec77a8b2e0e5d9dd86bed782d720fb5900 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 10 Feb 2021 10:41:13 -0800 Subject: [PATCH 06/16] Fix task token stuff --- src/lib.rs | 28 +++++++++++++++++----------- src/machines/test_help/mod.rs | 3 +++ src/machines/workflow_machines.rs | 1 - src/workflow/mod.rs | 24 ++++++++++++++++-------- 4 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 51de5b948..2785427a3 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -112,7 +112,13 @@ where /// Workflows that are currently under replay will queue their run ID here, indicating that /// there are more workflow tasks / activations to be performed. - pending_activations: SegQueue, + pending_activations: SegQueue, +} + +#[derive(Debug)] +struct PendingActivation { + run_id: String, + task_token: Vec, } impl Core for CoreSDK @@ -123,16 +129,16 @@ where fn poll_task(&self, task_queue: &str) -> Result { // We must first check if there are pending workflow tasks for workflows that are currently // replaying - if let Some(run_id) = self.pending_activations.pop() { - dbg!(&run_id); + if let Some(pa) = self.pending_activations.pop() { + event!(Level::DEBUG, msg = "Applying pending activations", ?pa); let (activation, more_tasks) = - self.access_machine(&run_id, |mgr| mgr.get_next_activation())?; + self.access_machine(&pa.run_id, |mgr| mgr.get_next_activation())?; + let task_token = pa.task_token.clone(); if more_tasks { - self.pending_activations.push(run_id); + self.pending_activations.push(pa); } return Ok(Task { - // TODO: Set this properly - task_token: vec![], + task_token, variant: activation.map(Into::into), }); } @@ -168,8 +174,10 @@ where self.access_machine(&run_id, |mgr| mgr.feed_history_from_server(history))?; if more_tasks { - dbg!("More tasks!"); - self.pending_activations.push(run_id); + self.pending_activations.push(PendingActivation { + run_id, + task_token: work.task_token.clone(), + }); } Ok(Task { @@ -374,7 +382,6 @@ mod test { )) .unwrap(); - dbg!("Second poll"); let res = core.poll_task(task_queue).unwrap(); // TODO: uggo assert_matches!( @@ -518,7 +525,6 @@ mod test { let core = build_fake_core(wfid, run_id, &mut t, &[2]); let res = core.poll_task(task_queue).unwrap(); - // TODO: Not the right expectation -- is timer fired? assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 7f18520d0..9c4a73df8 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -16,6 +16,7 @@ use crate::{ CoreSDK, }; use dashmap::DashMap; +use rand::{thread_rng, Rng}; use std::{collections::VecDeque, sync::Arc}; use tokio::runtime::Runtime; @@ -40,9 +41,11 @@ pub(crate) fn build_fake_core( .iter() .map(|to_task_num| { let batch = t.get_history_info(*to_task_num).unwrap().events; + let task_token: [u8; 16] = thread_rng().gen(); PollWorkflowTaskQueueResponse { history: Some(History { events: batch }), workflow_execution: wf.clone(), + task_token: task_token.to_vec(), ..Default::default() } }) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index cb0efef0e..a6a87548d 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -439,7 +439,6 @@ impl WorkflowMachines { fn prepare_commands(&mut self) { while let Some(c) = self.current_wf_task_commands.pop_front() { - dbg!(&c); // TODO - some special case stuff that can maybe be managed differently? // handleCommand should be called even on canceled ones to support mutableSideEffect // command.handleCommand(command.getCommandType()); diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index da699c69d..8b638045c 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -2,24 +2,27 @@ mod bridge; pub(crate) use bridge::WorkflowBridge; -use crate::protos::coresdk::WfActivation; -use crate::protos::temporal::api::history::v1::History; -use crate::protosext::HistoryInfo; use crate::{ machines::{ProtoCommand, WFCommand, WorkflowMachines}, - protos::temporal::api::workflowservice::v1::StartWorkflowExecutionResponse, - protos::temporal::api::{ - common::v1::WorkflowExecution, - workflowservice::v1::{ - PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + protos::{ + coresdk::WfActivation, + temporal::api::{ + common::v1::WorkflowExecution, + history::v1::History, + workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + StartWorkflowExecutionResponse, + }, }, }, + protosext::HistoryInfo, CoreError, Result, }; use std::{ ops::DerefMut, sync::{mpsc::Sender, Arc, Mutex}, }; +use tracing::Level; /// Implementors can provide new workflow tasks to the SDK. The connection to the server is the real /// implementor. @@ -111,6 +114,7 @@ impl WfManagerProtected { /// Should only be called when a workflow has caught up on replay. It will return a workflow /// activation if one is needed, as well as a bool indicating if there are more workflow tasks /// that need to be performed to replay the remaining history. + #[instrument(skip(self))] pub fn feed_history_from_server( &mut self, hist: History, @@ -123,6 +127,10 @@ impl WfManagerProtected { let activation = self.machines.get_wf_activation(); let more_activations_needed = task_ct > self.current_wf_task_num; + if more_activations_needed { + event!(Level::DEBUG, msg = "More activations needed"); + } + self.current_wf_task_num += 1; Ok((activation, more_activations_needed)) } From 4bf1703030e5d192d723e459665ab1c67b86d13b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 10 Feb 2021 17:16:57 -0800 Subject: [PATCH 07/16] Some refactoring to eliminate invalid states in workflow representation --- src/lib.rs | 102 +++++++++++++++------------ src/machines/workflow_machines.rs | 4 +- src/workflow/mod.rs | 73 +++++++++++-------- tests/integ_tests/simple_wf_tests.rs | 12 ++-- 4 files changed, 108 insertions(+), 83 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 2785427a3..4c5f02f8f 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,15 +26,13 @@ use crate::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, WfActivationCompletion, WfActivationSuccess, }, - temporal::api::{ - common::v1::WorkflowExecution, workflowservice::v1::PollWorkflowTaskQueueResponse, - }, + temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse, }, protosext::HistoryInfoError, - workflow::{WfManagerProtected, WorkflowManager}, + workflow::{NextWfActivation, WfManagerProtected, WorkflowManager}, }; use crossbeam::queue::SegQueue; -use dashmap::DashMap; +use dashmap::{mapref::entry::Entry, DashMap}; use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc}; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; @@ -128,18 +126,18 @@ where #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { // We must first check if there are pending workflow tasks for workflows that are currently - // replaying + // replaying, and issue those tasks before bothering the server. if let Some(pa) = self.pending_activations.pop() { event!(Level::DEBUG, msg = "Applying pending activations", ?pa); - let (activation, more_tasks) = + let next_activation = self.access_machine(&pa.run_id, |mgr| mgr.get_next_activation())?; let task_token = pa.task_token.clone(); - if more_tasks { + if next_activation.more_activations_needed { self.pending_activations.push(pa); } return Ok(Task { task_token, - variant: activation.map(Into::into), + variant: next_activation.activation.map(Into::into), }); } @@ -147,42 +145,25 @@ where let work = self .runtime .block_on(self.server_gateway.poll_workflow_task(task_queue))?; - let run_id = match &work.workflow_execution { - Some(we) => { - self.instantiate_workflow_if_needed(we); - we.run_id.clone() - } - None => return Err(CoreError::BadDataFromWorkProvider(work)), - }; - let history = if let Some(hist) = work.history { - hist - } else { - return Err(CoreError::BadDataFromWorkProvider(work)); - }; - + let task_token = work.task_token.clone(); event!( Level::DEBUG, msg = "Received workflow task from server", - ?work.task_token + ?task_token ); - // Correlate task token w/ run ID - self.workflow_task_tokens - .insert(work.task_token.clone(), run_id.clone()); + let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; - let (activation, more_tasks) = - self.access_machine(&run_id, |mgr| mgr.feed_history_from_server(history))?; - - if more_tasks { + if next_activation.more_activations_needed { self.pending_activations.push(PendingActivation { run_id, - task_token: work.task_token.clone(), + task_token: task_token.clone(), }); } Ok(Task { - task_token: work.task_token, - variant: activation.map(Into::into), + task_token, + variant: next_activation.activation.map(Into::into), }) } @@ -232,17 +213,50 @@ where } impl CoreSDK { - fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { - if self - .workflow_machines - .contains_key(&workflow_execution.run_id) + /// Will create a new workflow manager if needed for the workflow task, if not, it will + /// feed the existing manager the updated history we received from the server. + /// + /// Also updates [CoreSDK::workflow_task_tokens] and validates the + /// [PollWorkflowTaskQueueResponse] + /// + /// Returns the next workflow activation and the workflow's run id + fn instantiate_or_update_workflow( + &self, + poll_wf_resp: PollWorkflowTaskQueueResponse, + ) -> Result<(NextWfActivation, String)> { + if let PollWorkflowTaskQueueResponse { + task_token, + workflow_execution: Some(workflow_execution), + .. + } = &poll_wf_resp { - return; + let run_id = workflow_execution.run_id.clone(); + // Correlate task token w/ run ID + self.workflow_task_tokens + .insert(task_token.clone(), run_id.clone()); + + match self.workflow_machines.entry(run_id.clone()) { + Entry::Occupied(mut existing) => { + if let Some(history) = poll_wf_resp.history { + let activation = existing + .get_mut() + .lock()? + .feed_history_from_server(history)?; + Ok((activation, run_id)) + } else { + Err(CoreError::BadDataFromWorkProvider(poll_wf_resp)) + } + } + Entry::Vacant(vacant) => { + let wfm = WorkflowManager::new(poll_wf_resp)?; + let activation = wfm.lock()?.get_next_activation()?; + vacant.insert(wfm); + Ok((activation, run_id)) + } + } + } else { + Err(CoreError::BadDataFromWorkProvider(poll_wf_resp)) } - self.workflow_machines.insert( - workflow_execution.run_id.clone(), - WorkflowManager::new(workflow_execution), - ); } /// Feed commands from the lang sdk into the appropriate workflow bridge @@ -281,8 +295,6 @@ impl CoreSDK { #[allow(clippy::large_enum_variant)] // NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum CoreError { - /// Unknown service error - Unknown, /// No tasks to perform for now NoWork, /// Poll response from server was malformed: {0:?} @@ -292,7 +304,7 @@ pub enum CoreError { /// Error buffering commands CantSendCommands(#[from] SendError>), /// Couldn't interpret command from - UninterprableCommand(#[from] InconvertibleCommandError), + UninterpretableCommand(#[from] InconvertibleCommandError), /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), /// Task token had nothing associated with it: {0:?} diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index a6a87548d..abad68299 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -43,9 +43,9 @@ pub(crate) struct WorkflowMachines { /// True if the workflow is replaying from history replaying: bool, /// Workflow identifier - workflow_id: String, + pub workflow_id: String, /// Identifies the current run and is used as a seed for faux-randomness. - run_id: String, + pub run_id: String, /// The current workflow time if it has been established current_wf_time: Option, diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 8b638045c..8b03c5588 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -7,7 +7,6 @@ use crate::{ protos::{ coresdk::WfActivation, temporal::api::{ - common::v1::WorkflowExecution, history::v1::History, workflowservice::v1::{ PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, @@ -74,27 +73,38 @@ pub(crate) struct WfManagerProtected { /// The last recorded history we received from the server for this workflow run. This must be /// kept because the lang side polls & completes for every workflow task, but we do not need /// to poll the server that often during replay. - pub last_history_from_server: Option, - pub last_history_task_count: Option, + pub last_history_from_server: History, + pub last_history_task_count: usize, /// The current workflow task number this run is on. Starts at one and monotonically increases. pub current_wf_task_num: usize, } impl WorkflowManager { - pub fn new(we: &WorkflowExecution) -> Self { + /// Create a new workflow manager from a server workflow task queue response. + pub fn new(poll_resp: PollWorkflowTaskQueueResponse) -> Result { + let (history, we) = if let PollWorkflowTaskQueueResponse { + workflow_execution: Some(we), + history: Some(hist), + .. + } = poll_resp + { + (hist, we) + } else { + return Err(CoreError::BadDataFromWorkProvider(poll_resp.clone())); + }; + let (wfb, cmd_sink) = WorkflowBridge::new(); - let state_machines = - WorkflowMachines::new(we.workflow_id.clone(), we.run_id.clone(), Box::new(wfb)); + let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb)); let protected = WfManagerProtected { machines: state_machines, command_sink: cmd_sink, - last_history_from_server: None, - last_history_task_count: None, + last_history_task_count: history.get_workflow_task_count(None)?, + last_history_from_server: history, current_wf_task_num: 1, }; - Self { + Ok(Self { data: Arc::new(Mutex::new(protected)), - } + }) } pub fn lock(&self) -> Result + '_> { @@ -108,6 +118,11 @@ impl WorkflowManager { } } +pub(crate) struct NextWfActivation { + pub activation: Option, + pub more_activations_needed: bool, +} + impl WfManagerProtected { /// Given history that was just obtained from the server, pipe it into this workflow's machines. /// @@ -115,14 +130,11 @@ impl WfManagerProtected { /// activation if one is needed, as well as a bool indicating if there are more workflow tasks /// that need to be performed to replay the remaining history. #[instrument(skip(self))] - pub fn feed_history_from_server( - &mut self, - hist: History, - ) -> Result<(Option, bool)> { + pub fn feed_history_from_server(&mut self, hist: History) -> Result { let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?; let task_ct = hist.get_workflow_task_count(None)?; - self.last_history_task_count = Some(task_ct); - self.last_history_from_server = Some(hist); + self.last_history_task_count = task_ct; + self.last_history_from_server = hist; task_hist.apply_history_events(&mut self.machines)?; let activation = self.machines.get_wf_activation(); let more_activations_needed = task_ct > self.current_wf_task_num; @@ -132,25 +144,26 @@ impl WfManagerProtected { } self.current_wf_task_num += 1; - Ok((activation, more_activations_needed)) + + Ok(NextWfActivation { + activation, + more_activations_needed, + }) } - pub fn get_next_activation(&mut self) -> Result<(Option, bool)> { - self.current_wf_task_num += 1; - // TODO: Proper errors - let hist = self - .last_history_from_server - .as_ref() - .ok_or_else(|| CoreError::Unknown)?; + pub fn get_next_activation(&mut self) -> Result { + let hist = &self.last_history_from_server; let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?; task_hist.apply_history_events(&mut self.machines)?; let activation = self.machines.get_wf_activation(); - let more_activations_needed = self.current_wf_task_num - <= self - .last_history_task_count - .ok_or_else(|| CoreError::Unknown)?; - Ok((activation, more_activations_needed)) + self.current_wf_task_num += 1; + let more_activations_needed = self.current_wf_task_num <= self.last_history_task_count; + + Ok(NextWfActivation { + activation, + more_activations_needed, + }) } } @@ -163,6 +176,6 @@ mod tests { // Enforce thread-safeness of wf manager #[test] fn is_threadsafe() { - enforcer(WorkflowManager::new(&Default::default())); + enforcer(WorkflowManager::new(Default::default())); } } diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 4c1616665..9e3ca7291 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -44,19 +44,19 @@ fn timer_workflow() { let timer_id: String = rng.gen::().to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( - StartTimerCommandAttributes { + vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), start_to_fire_timeout: Some(Duration::from_secs(1).into()), ..Default::default() } - .into(), + .into()], task.task_token, )) .unwrap(); dbg!("sent completion w/ start timer"); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) .unwrap(); @@ -89,19 +89,19 @@ fn parallel_timer_workflow() { let timer_id: String = rng.gen::().to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( - StartTimerCommandAttributes { + vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), start_to_fire_timeout: Some(Duration::from_secs(1).into()), ..Default::default() } - .into(), + .into()], task.task_token, )) .unwrap(); dbg!("sent completion w/ start timer"); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) .unwrap(); From fed36fb3410d4324a83c79011563ddf5a5ca7661 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 13:31:29 -0800 Subject: [PATCH 08/16] Make parallel timer integ test actually test two timers --- src/protos/mod.rs | 1 - tests/integ_tests/simple_wf_tests.rs | 65 ++++++++++++++++++---------- 2 files changed, 41 insertions(+), 25 deletions(-) diff --git a/src/protos/mod.rs b/src/protos/mod.rs index f4c71801e..6f93e48ad 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -26,7 +26,6 @@ pub mod coresdk { } /// Returns any contained jobs if this task was a wf activation and it had some - #[cfg(test)] pub fn get_wf_jobs(&self) -> Vec { if let Some(task::Variant::Workflow(a)) = &self.variant { a.jobs.clone() diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 9e3ca7291..4ab4246c7 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -1,8 +1,9 @@ +use assert_matches::assert_matches; use rand::{self, Rng}; use std::{convert::TryFrom, env, time::Duration}; use temporal_sdk_core::{ protos::{ - coresdk::CompleteTaskReq, + coresdk::{wf_activation_job, CompleteTaskReq, TimerFiredTaskAttributes, WfActivationJob}, temporal::api::command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, @@ -40,9 +41,9 @@ fn timer_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - let run_id = dbg!(create_workflow(&core, &workflow_id.to_string())); + dbg!(create_workflow(&core, &workflow_id.to_string())); let timer_id: String = rng.gen::().to_string(); - let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let task = core.poll_task(TASK_QUEUE).unwrap(); core.complete_task(CompleteTaskReq::ok_from_api_attrs( vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), @@ -53,21 +54,14 @@ fn timer_workflow() { task.task_token, )) .unwrap(); - dbg!("sent completion w/ start timer"); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) .unwrap(); - dbg!( - "sent workflow done, completed workflow", - workflow_id, - run_id - ); } -// TODO: Actually make this different from serial #[test] fn parallel_timer_workflow() { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { @@ -86,28 +80,51 @@ fn parallel_timer_workflow() { let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); let run_id = dbg!(create_workflow(&core, &workflow_id.to_string())); - let timer_id: String = rng.gen::().to_string(); + let timer_id = "timer 1".to_string(); + let timer_2_id = "timer 2".to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); core.complete_task(CompleteTaskReq::ok_from_api_attrs( - vec![StartTimerCommandAttributes { - timer_id: timer_id.to_string(), - start_to_fire_timeout: Some(Duration::from_secs(1).into()), - ..Default::default() - } - .into()], + vec![ + StartTimerCommandAttributes { + timer_id: timer_id.clone(), + start_to_fire_timeout: Some(Duration::from_millis(100).into()), + ..Default::default() + } + .into(), + StartTimerCommandAttributes { + timer_id: timer_2_id.clone(), + start_to_fire_timeout: Some(Duration::from_millis(200).into()), + ..Default::default() + } + .into(), + ], task.task_token, )) .unwrap(); - dbg!("sent completion w/ start timer"); - let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // Wait long enough for both timers to complete + std::thread::sleep(Duration::from_millis(400)); + let task = core.poll_task(TASK_QUEUE).unwrap(); + assert_matches!( + task.get_wf_jobs().as_slice(), + [ + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired( + TimerFiredTaskAttributes { timer_id: t1_id } + )), + }, + WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired( + TimerFiredTaskAttributes { timer_id: t2_id } + )), + } + ] => { + assert_eq!(t1_id, &timer_id); + assert_eq!(t2_id, &timer_2_id); + } + ); core.complete_task(CompleteTaskReq::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) .unwrap(); - dbg!( - "sent workflow done, completed workflow", - workflow_id, - run_id - ); } From 121977afe35ea6bc34a88f732c4d96c1601bfc7b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 13:43:11 -0800 Subject: [PATCH 09/16] Clippy lints --- src/lib.rs | 12 +++++++----- src/machines/mod.rs | 23 ++++++++++------------- src/machines/workflow_machines.rs | 9 ++------- src/pollers/mod.rs | 1 + tests/integ_tests/poller_test.rs | 1 + 5 files changed, 21 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e0b76f065..e5e31e940 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -329,7 +329,9 @@ mod test { use crate::{ machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ - coresdk::{wf_activation_job, TimerFiredTaskAttributes, WfActivationJob}, + coresdk::{ + wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob, + }, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, @@ -384,7 +386,7 @@ mod test { assert!(core.workflow_machines.get(run_id).is_some()); let task_tok = res.task_token; - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![StartTimerCommandAttributes { timer_id, ..Default::default() @@ -403,7 +405,7 @@ mod test { }] ); let task_tok = res.task_token; - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task_tok, )) @@ -503,7 +505,7 @@ mod test { } ); let task_tok = res.task_token; - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task_tok, )) @@ -546,7 +548,7 @@ mod test { assert!(core.workflow_machines.get(run_id).is_some()); let task_tok = res.task_token; - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![StartTimerCommandAttributes { timer_id: timer_1_id, ..Default::default() diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 0c9b261e2..57f771939 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -120,20 +120,17 @@ impl TryFrom for WFCommand { fn try_from(c: coresdk::Command) -> Result { // TODO: Return error without cloning match c.variant.clone() { - Some(a) => match a { - Variant::Api(Command { - attributes: Some(attrs), - .. - }) => match attrs { - Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), - Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { - Ok(WFCommand::CompleteWorkflow(c)) - } - _ => unimplemented!(), - }, - _ => Err(c.into()), + Some(Variant::Api(Command { + attributes: Some(attrs), + .. + })) => match attrs { + Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), + Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { + Ok(WFCommand::CompleteWorkflow(c)) + } + _ => unimplemented!(), }, - None => Err(c.into()), + _ => Err(c.into()), } } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index abad68299..0a34c6332 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -185,11 +185,7 @@ impl WorkflowMachines { /// Called when we want to run the event loop because a workflow task started event has /// triggered - pub(super) fn task_started( - &mut self, - task_started_event_id: i64, - time: SystemTime, - ) -> Result<()> { + pub(super) fn task_started(&mut self, task_started_event_id: i64, time: SystemTime) { let s = span!(Level::DEBUG, "Task started trigger"); let _enter = s.enter(); @@ -215,7 +211,6 @@ impl WorkflowMachines { self.current_started_event_id = task_started_event_id; self.set_current_time(time); self.event_loop(); - Ok(()) } /// A command event is an event which is generated from a command emitted by a past decision. @@ -413,7 +408,7 @@ impl WorkflowMachines { task_started_event_id, time, } => { - self.task_started(task_started_event_id, time)?; + self.task_started(task_started_event_id, time); } } } diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index a03ff8281..f0a46b5e6 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -60,6 +60,7 @@ impl ServerGatewayOptions { /// This function will get called on each outbound request. Returning a /// `Status` here will cancel the request and have that status returned to /// the caller. +#[allow(clippy::unnecessary_wraps)] // Clippy lies because we need to pass to `with_interceptor` fn intercept(mut req: Request<()>) -> Result, Status> { let metadata = req.metadata_mut(); // TODO: Only apply this to long poll requests diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index e69de29bb..8b1378917 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1 @@ + From c03b30486efd93163d626f5116995f4e4f12fb0a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 14:01:41 -0800 Subject: [PATCH 10/16] Fix a todo --- src/machines/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 57f771939..35473e12c 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -118,8 +118,7 @@ impl TryFrom for WFCommand { type Error = InconvertibleCommandError; fn try_from(c: coresdk::Command) -> Result { - // TODO: Return error without cloning - match c.variant.clone() { + match c.variant { Some(Variant::Api(Command { attributes: Some(attrs), .. From 7300cd6936e94c1b447a7fdd968d2ced553d5641 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 14:03:19 -0800 Subject: [PATCH 11/16] Clean up uggo todos, it's clear we want to reduce test verbosity --- src/lib.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e5e31e940..d0379593e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -376,7 +376,6 @@ mod test { let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]); let res = core.poll_task(task_queue).unwrap(); - // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { @@ -397,7 +396,6 @@ mod test { .unwrap(); let res = core.poll_task(task_queue).unwrap(); - // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { @@ -457,7 +455,6 @@ mod test { let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]); let res = core.poll_task(task_queue).unwrap(); - // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { @@ -485,7 +482,6 @@ mod test { .unwrap(); let res = core.poll_task(task_queue).unwrap(); - // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), [ @@ -559,7 +555,6 @@ mod test { .unwrap(); let res = core.poll_task(task_queue).unwrap(); - // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), [WfActivationJob { From f722f3e58eb80729237cabd1643a14a1263b1685 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 14:28:43 -0800 Subject: [PATCH 12/16] Fix merge problem in integ tests --- tests/integ_tests/simple_wf_tests.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 4ab4246c7..4aac503dc 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -3,7 +3,7 @@ use rand::{self, Rng}; use std::{convert::TryFrom, env, time::Duration}; use temporal_sdk_core::{ protos::{ - coresdk::{wf_activation_job, CompleteTaskReq, TimerFiredTaskAttributes, WfActivationJob}, + coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob}, temporal::api::command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, @@ -44,7 +44,7 @@ fn timer_workflow() { dbg!(create_workflow(&core, &workflow_id.to_string())); let timer_id: String = rng.gen::().to_string(); let task = core.poll_task(TASK_QUEUE).unwrap(); - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), start_to_fire_timeout: Some(Duration::from_secs(1).into()), @@ -55,7 +55,7 @@ fn timer_workflow() { )) .unwrap(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) @@ -79,11 +79,11 @@ fn parallel_timer_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - let run_id = dbg!(create_workflow(&core, &workflow_id.to_string())); + dbg!(create_workflow(&core, &workflow_id.to_string())); let timer_id = "timer 1".to_string(); let timer_2_id = "timer 2".to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ StartTimerCommandAttributes { timer_id: timer_id.clone(), @@ -122,7 +122,7 @@ fn parallel_timer_workflow() { assert_eq!(t2_id, &timer_2_id); } ); - core.complete_task(CompleteTaskReq::ok_from_api_attrs( + core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, )) From d59c394c9cab47750047828862b1f8e911bbf3c9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 14:43:28 -0800 Subject: [PATCH 13/16] Use separate task queues to enable running integ tests in parallel --- tests/integ_tests/simple_wf_tests.rs | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 4aac503dc..e7d9f3f47 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -11,14 +11,17 @@ use temporal_sdk_core::{ Core, CoreInitOptions, ServerGatewayOptions, Url, }; -const TASK_QUEUE: &str = "test-tq"; +// TODO: These tests can get broken permanently if they break one time and the server is not +// restarted, because pulling from the same task queue produces tasks for the previous failed +// workflows. Fix that. + const NAMESPACE: &str = "default"; #[tokio::main] -async fn create_workflow(core: &dyn Core, workflow_id: &str) -> String { +async fn create_workflow(core: &dyn Core, task_q: &str, workflow_id: &str) -> String { core.server_gateway() .unwrap() - .start_workflow(NAMESPACE, TASK_QUEUE, workflow_id, "test-workflow") + .start_workflow(NAMESPACE, task_q, workflow_id, "test-workflow") .await .unwrap() .run_id @@ -26,6 +29,7 @@ async fn create_workflow(core: &dyn Core, workflow_id: &str) -> String { #[test] fn timer_workflow() { + let task_q = "timer_workflow"; let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, Err(_) => "http://localhost:7233".to_owned(), @@ -41,9 +45,9 @@ fn timer_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - dbg!(create_workflow(&core, &workflow_id.to_string())); + dbg!(create_workflow(&core, task_q, &workflow_id.to_string())); let timer_id: String = rng.gen::().to_string(); - let task = core.poll_task(TASK_QUEUE).unwrap(); + let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![StartTimerCommandAttributes { timer_id: timer_id.to_string(), @@ -54,7 +58,7 @@ fn timer_workflow() { task.task_token, )) .unwrap(); - let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let task = dbg!(core.poll_task(task_q).unwrap()); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], task.task_token, @@ -64,6 +68,7 @@ fn timer_workflow() { #[test] fn parallel_timer_workflow() { + let task_q = "parallel_timer_workflow"; let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, Err(_) => "http://localhost:7233".to_owned(), @@ -79,21 +84,21 @@ fn parallel_timer_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - dbg!(create_workflow(&core, &workflow_id.to_string())); + dbg!(create_workflow(&core, task_q, &workflow_id.to_string())); let timer_id = "timer 1".to_string(); let timer_2_id = "timer 2".to_string(); - let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let task = dbg!(core.poll_task(task_q).unwrap()); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ StartTimerCommandAttributes { timer_id: timer_id.clone(), - start_to_fire_timeout: Some(Duration::from_millis(100).into()), + start_to_fire_timeout: Some(Duration::from_millis(50).into()), ..Default::default() } .into(), StartTimerCommandAttributes { timer_id: timer_2_id.clone(), - start_to_fire_timeout: Some(Duration::from_millis(200).into()), + start_to_fire_timeout: Some(Duration::from_millis(100).into()), ..Default::default() } .into(), @@ -103,7 +108,7 @@ fn parallel_timer_workflow() { .unwrap(); // Wait long enough for both timers to complete std::thread::sleep(Duration::from_millis(400)); - let task = core.poll_task(TASK_QUEUE).unwrap(); + let task = core.poll_task(task_q).unwrap(); assert_matches!( task.get_wf_jobs().as_slice(), [ From 71f42fc00d7d23866028e8b07b93cc5a2ad1e071 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 15:23:03 -0800 Subject: [PATCH 14/16] Add shutdown API and flag --- src/lib.rs | 26 ++++++++++++++++++++++++-- src/machines/test_help/mod.rs | 2 ++ 2 files changed, 26 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d0379593e..a95eaf5dc 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ use crate::{ }; use crossbeam::queue::SegQueue; use dashmap::{mapref::entry::Entry, DashMap}; +use std::sync::atomic::{AtomicBool, Ordering}; use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc}; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; @@ -58,6 +59,13 @@ pub trait Core: Send + Sync { /// Returns an instance of ServerGateway. fn server_gateway(&self) -> Result>; + + /// Eventually closes the connection to the server and ceases all polling. [Core::poll_task] + /// should be called until it returns [CoreError::ConnectionClosed] to ensure that any workflows + /// which are still undergoing replay have an opportunity to finish. This means that the lang + /// sdk will need to call [Core::complete_task] for those workflows until they are done. At + /// that point, the connection will be closed and/or the lang SDK can end the process. + fn shutdown(&self) -> Result<()>; } /// Holds various configuration information required to call [init] @@ -85,6 +93,7 @@ pub fn init(opts: CoreInitOptions) -> Result { workflow_machines: Default::default(), workflow_task_tokens: Default::default(), pending_activations: Default::default(), + shutdown_requested: AtomicBool::new(false), }) } @@ -111,6 +120,9 @@ where /// Workflows that are currently under replay will queue their run ID here, indicating that /// there are more workflow tasks / activations to be performed. pending_activations: SegQueue, + + /// Has shutdown been called? + shutdown_requested: AtomicBool, } #[derive(Debug)] @@ -141,6 +153,10 @@ where }); } + if self.shutdown_requested.load(Ordering::SeqCst) { + return Err(CoreError::ShuttingDown); + } + // This will block forever in the event there is no work from the server let work = self .runtime @@ -210,6 +226,11 @@ where fn server_gateway(&self) -> Result> { Ok(self.server_gateway.clone()) } + + fn shutdown(&self) -> Result<(), CoreError> { + self.shutdown_requested.store(true, Ordering::SeqCst); + Ok(()) + } } impl CoreSDK { @@ -295,8 +316,9 @@ impl CoreSDK { #[allow(clippy::large_enum_variant)] // NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum CoreError { - /// No tasks to perform for now - NoWork, + /// [Core::shutdown] was called, and there are no more replay tasks to be handled. You must + /// call [Core::complete_task] for any remaining tasks, and then may exit. + ShuttingDown, /// Poll response from server was malformed: {0:?} BadDataFromWorkProvider(PollWorkflowTaskQueueResponse), /// Lang SDK sent us a malformed completion: {0:?} diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 9c4a73df8..737a2f2e8 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -17,6 +17,7 @@ use crate::{ }; use dashmap::DashMap; use rand::{thread_rng, Rng}; +use std::sync::atomic::AtomicBool; use std::{collections::VecDeque, sync::Arc}; use tokio::runtime::Runtime; @@ -69,5 +70,6 @@ pub(crate) fn build_fake_core( workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), pending_activations: Default::default(), + shutdown_requested: AtomicBool::new(false), } } From 6523a22aa0c605ce4c9a2ac201da74afd0fa0086 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 11 Feb 2021 15:56:01 -0800 Subject: [PATCH 15/16] Add test, update docs. Closing connection is up to lang. --- src/lib.rs | 46 ++++++++++++++++++++++++++--------- src/machines/test_help/mod.rs | 4 ++- 2 files changed, 37 insertions(+), 13 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a95eaf5dc..d3c07242b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,8 +33,14 @@ use crate::{ }; use crossbeam::queue::SegQueue; use dashmap::{mapref::entry::Entry, DashMap}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc}; +use std::{ + convert::TryInto, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::SendError, + Arc, + }, +}; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; use tracing::Level; @@ -60,11 +66,11 @@ pub trait Core: Send + Sync { /// Returns an instance of ServerGateway. fn server_gateway(&self) -> Result>; - /// Eventually closes the connection to the server and ceases all polling. [Core::poll_task] - /// should be called until it returns [CoreError::ConnectionClosed] to ensure that any workflows - /// which are still undergoing replay have an opportunity to finish. This means that the lang - /// sdk will need to call [Core::complete_task] for those workflows until they are done. At - /// that point, the connection will be closed and/or the lang SDK can end the process. + /// Eventually ceases all polling of the server. [Core::poll_task] should be called until it + /// returns [CoreError::ShuttingDown] to ensure that any workflows which are still undergoing + /// replay have an opportunity to finish. This means that the lang sdk will need to call + /// [Core::complete_task] for those workflows until they are done. At that point, the lang + /// SDK can end the process, or drop the [Core] instance, which will close the connection. fn shutdown(&self) -> Result<()>; } @@ -348,6 +354,7 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; + use crate::machines::test_help::FakeCore; use crate::{ machines::test_help::{build_fake_core, TestHistoryBuilder}, protos::{ @@ -363,6 +370,7 @@ mod test { }, }, }; + use rstest::{fixture, rstest}; #[test] fn timer_test_across_wf_bridge() { @@ -530,11 +538,8 @@ mod test { .unwrap(); } - #[test] - fn single_timer_whole_replay_test_across_wf_bridge() { - let s = span!(Level::DEBUG, "Test start", t = "bridge"); - let _enter = s.enter(); - + #[fixture] + fn single_timer_whole_replay() -> FakeCore { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_1_id = "timer1".to_string(); @@ -589,5 +594,22 @@ mod test { task_tok, )) .unwrap(); + core + } + + #[rstest] + fn single_timer_whole_replay_test_across_wf_bridge(_single_timer_whole_replay: FakeCore) { + // Nothing to do here -- whole real test is in fixture. Rstest properly handles leading `_` + } + + #[rstest] + fn after_shutdown_server_is_not_polled(single_timer_whole_replay: FakeCore) { + single_timer_whole_replay.shutdown().unwrap(); + assert_matches!( + single_timer_whole_replay + .poll_task("irrelevant") + .unwrap_err(), + CoreError::ShuttingDown + ); } } diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 737a2f2e8..4ab974e7d 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -21,6 +21,8 @@ use std::sync::atomic::AtomicBool; use std::{collections::VecDeque, sync::Arc}; use tokio::runtime::Runtime; +pub(crate) type FakeCore = CoreSDK; + /// Given identifiers for a workflow/run, and a test history builder, construct an instance of /// the core SDK with a mock server gateway that will produce the responses as appropriate. /// @@ -32,7 +34,7 @@ pub(crate) fn build_fake_core( run_id: &str, t: &mut TestHistoryBuilder, response_batches: &[usize], -) -> CoreSDK { +) -> FakeCore { let wf = Some(WorkflowExecution { workflow_id: wf_id.to_string(), run_id: run_id.to_string(), From 282751e625d24fc78700da4975696b7bb82628e9 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 23 Feb 2021 22:11:59 -0800 Subject: [PATCH 16/16] Fix merge --- src/lib.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 2856327c8..53f1838ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -36,7 +36,11 @@ use dashmap::DashMap; use std::{ convert::TryInto, fmt::Debug, - sync::{mpsc::SendError, Arc}, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc::SendError, + Arc, + }, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri;