diff --git a/.buildkite/docker/Dockerfile b/.buildkite/docker/Dockerfile
index f32421f6e..4fe468ca8 100644
--- a/.buildkite/docker/Dockerfile
+++ b/.buildkite/docker/Dockerfile
@@ -1,4 +1,4 @@
-FROM rust:latest
+FROM rust:1.50
 
 RUN rustup component add rustfmt && \
     rustup component add clippy
diff --git a/Cargo.toml b/Cargo.toml
index e959ecac0..75dd0de6b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,7 @@ license-file = "LICENSE.txt"
 [dependencies]
 anyhow = "1.0"
 async-trait = "0.1"
+crossbeam = "0.8"
 dashmap = "4.0"
 derive_more = "0.99"
 displaydoc = "0.1"
@@ -19,16 +20,18 @@ env_logger = "0.8"
 futures = "0.3"
 log = "0.4"
 opentelemetry-jaeger = "0.10"
+opentelemetry = "0.11.2"
 prost = "0.7"
 prost-types = "0.7"
 thiserror = "1.0"
 tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
 tracing = { version = "0.1", features = ["log"] }
-tracing-opentelemetry = "0.11"
+tracing-opentelemetry = "0.10"
 tracing-subscriber = "0.2"
 url = "2.2"
 rand = "0.8.3"
 uuid = { version = "0.8.2", features = ["v4"] }
+
 [dependencies.tonic]
 version = "0.4"
 #path = "../tonic/tonic"
diff --git a/README.md b/README.md
index d4f1667b4..5857ed4c2 100644
--- a/README.md
+++ b/README.md
@@ -3,6 +3,9 @@
 Core SDK that can be used as a base for all other Temporal SDKs.
 
 # Getting started
+
+See the [Architecture](ARCHITECTURE.md) doc for some high-level information.
+
 This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a
 submodule -- when checking out the repo for the first time make sure you've run
 `git submodule update --init --recursive`.
 
 TODO: Makefile.
diff --git a/src/lib.rs b/src/lib.rs
index 98f1eff74..d0379593e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -26,14 +26,13 @@ use crate::{
         task_completion, wf_activation_completion::Status, Task, TaskCompletion,
         WfActivationCompletion, WfActivationSuccess,
     },
-        temporal::api::{
-            common::v1::WorkflowExecution, workflowservice::v1::PollWorkflowTaskQueueResponse,
-        },
+        temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
     },
-    protosext::{HistoryInfo, HistoryInfoError},
-    workflow::{WfManagerProtected, WorkflowManager},
+    protosext::HistoryInfoError,
+    workflow::{NextWfActivation, WfManagerProtected, WorkflowManager},
 };
-use dashmap::DashMap;
+use crossbeam::queue::SegQueue;
+use dashmap::{mapref::entry::Entry, DashMap};
 use std::{convert::TryInto, sync::mpsc::SendError, sync::Arc};
 use tokio::runtime::Runtime;
 use tonic::codegen::http::uri::InvalidUri;
@@ -85,6 +84,7 @@ pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
         server_gateway: Arc::new(work_provider),
         workflow_machines: Default::default(),
         workflow_task_tokens: Default::default(),
+        pending_activations: Default::default(),
     })
 }
@@ -107,6 +107,16 @@ where
     workflow_machines: DashMap<String, WorkflowManager>,
     /// Maps task tokens to workflow run ids
     workflow_task_tokens: DashMap<Vec<u8>, String>,
+
+    /// Workflows that are currently under replay will queue their run ID here, indicating that
+    /// there are more workflow tasks / activations to be performed.
+    pending_activations: SegQueue<PendingActivation>,
+}
+
+#[derive(Debug)]
+struct PendingActivation {
+    run_id: String,
+    task_token: Vec<u8>,
 }
 
 impl<WP> Core for CoreSDK<WP>
 where
     WP: ServerGatewayApis + Send + Sync + 'static,
 {
     #[instrument(skip(self))]
     fn poll_task(&self, task_queue: &str) -> Result<Task> {
+        // We must first check if there are pending workflow tasks for workflows that are currently
+        // replaying, and issue those tasks before bothering the server.
+        if let Some(pa) = self.pending_activations.pop() {
+            event!(Level::DEBUG, msg = "Applying pending activation", ?pa);
+            let next_activation =
+                self.access_machine(&pa.run_id, |mgr| mgr.get_next_activation())?;
+            let task_token = pa.task_token.clone();
+            if next_activation.more_activations_needed {
+                self.pending_activations.push(pa);
+            }
+            return Ok(Task {
+                task_token,
+                variant: next_activation.activation.map(Into::into),
+            });
+        }
+
         // This will block forever in the event there is no work from the server
         let work = self
             .runtime
             .block_on(self.server_gateway.poll_workflow_task(task_queue))?;
-        let run_id = match &work.workflow_execution {
-            Some(we) => {
-                self.instantiate_workflow_if_needed(we);
-                we.run_id.clone()
-            }
-            None => return Err(CoreError::BadDataFromWorkProvider(work)),
-        };
-        let history = if let Some(hist) = work.history {
-            hist
-        } else {
-            return Err(CoreError::BadDataFromWorkProvider(work));
-        };
-
+        let task_token = work.task_token.clone();
         event!(
             Level::DEBUG,
-            msg = "Received workflow task",
-            ?work.task_token
+            msg = "Received workflow task from server",
+            ?task_token
         );
-        // Correlate task token w/ run ID
-        self.workflow_task_tokens
-            .insert(work.task_token.clone(), run_id.clone());
-
-        // We pass none since we want to apply all the history we just got.
-        // Will need to change a bit once we impl caching.
-        let hist_info = HistoryInfo::new_from_history(&history, None)?;
-        let activation = self.access_machine(&run_id, |mgr| {
-            let machines = &mut mgr.machines;
-            hist_info.apply_history_events(machines)?;
-            Ok(machines.get_wf_activation())
-        })?;
+        let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?;
+
+        if next_activation.more_activations_needed {
+            self.pending_activations.push(PendingActivation {
+                run_id,
+                task_token: task_token.clone(),
+            });
+        }
 
         Ok(Task {
-            task_token: work.task_token,
-            variant: activation.map(Into::into),
+            task_token,
+            variant: next_activation.activation.map(Into::into),
         })
     }
@@ -203,17 +213,50 @@ where
 }
 
 impl<WP: ServerGatewayApis> CoreSDK<WP> {
-    fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) {
-        if self
-            .workflow_machines
-            .contains_key(&workflow_execution.run_id)
+    /// Will create a new workflow manager if needed for the workflow task; if not, it will
+    /// feed the existing manager the updated history we received from the server.
+    ///
+    /// Also updates [CoreSDK::workflow_task_tokens] and validates the
+    /// [PollWorkflowTaskQueueResponse]
+    ///
+    /// Returns the next workflow activation and the workflow's run id
+    fn instantiate_or_update_workflow(
+        &self,
+        poll_wf_resp: PollWorkflowTaskQueueResponse,
+    ) -> Result<(NextWfActivation, String)> {
+        if let PollWorkflowTaskQueueResponse {
+            task_token,
+            workflow_execution: Some(workflow_execution),
+            ..
+        } = &poll_wf_resp
         {
-            return;
+            let run_id = workflow_execution.run_id.clone();
+            // Correlate task token w/ run ID
+            self.workflow_task_tokens
+                .insert(task_token.clone(), run_id.clone());
+
+            match self.workflow_machines.entry(run_id.clone()) {
+                Entry::Occupied(mut existing) => {
+                    if let Some(history) = poll_wf_resp.history {
+                        let activation = existing
+                            .get_mut()
+                            .lock()?
+                            .feed_history_from_server(history)?;
+                        Ok((activation, run_id))
+                    } else {
+                        Err(CoreError::BadDataFromWorkProvider(poll_wf_resp))
+                    }
+                }
+                Entry::Vacant(vacant) => {
+                    let wfm = WorkflowManager::new(poll_wf_resp)?;
+                    let activation = wfm.lock()?.get_next_activation()?;
+                    vacant.insert(wfm);
+                    Ok((activation, run_id))
+                }
+            }
+        } else {
+            Err(CoreError::BadDataFromWorkProvider(poll_wf_resp))
         }
-        self.workflow_machines.insert(
-            workflow_execution.run_id.clone(),
-            WorkflowManager::new(workflow_execution),
-        );
     }
 
     /// Feed commands from the lang sdk into the appropriate workflow bridge
@@ -252,8 +295,6 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
 #[allow(clippy::large_enum_variant)]
 // NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc
 pub enum CoreError {
-    /// Unknown service error
-    Unknown,
     /// No tasks to perform for now
     NoWork,
     /// Poll response from server was malformed: {0:?}
@@ -263,7 +304,7 @@ pub enum CoreError {
     /// Error buffering commands
     CantSendCommands(#[from] SendError<Vec<WFCommand>>),
     /// Couldn't interpret command from <lang>
-    UninterprableCommand(#[from] InconvertibleCommandError),
+    UninterpretableCommand(#[from] InconvertibleCommandError),
     /// Underlying error in history processing
     UnderlyingHistError(#[from] HistoryInfoError),
     /// Task token had nothing associated with it: {0:?}
@@ -286,31 +327,26 @@ pub enum CoreError {
 mod test {
     use super::*;
     use crate::{
-        machines::test_help::TestHistoryBuilder,
-        pollers::MockServerGateway,
+        machines::test_help::{build_fake_core, TestHistoryBuilder},
         protos::{
-            coresdk::{wf_activation_job, WfActivationJob},
+            coresdk::{
+                wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob,
+            },
             temporal::api::{
                 command::v1::{
                     CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
                 },
                 enums::v1::EventType,
-                history::v1::{history_event, History, TimerFiredEventAttributes},
-                workflowservice::v1::RespondWorkflowTaskCompletedResponse,
+                history::v1::{history_event, TimerFiredEventAttributes},
            },
        },
    };
-    use std::collections::VecDeque;
-    use tracing::Level;
 
     #[test]
-    fn workflow_bridge() {
-        let s = span!(Level::DEBUG, "Test start");
-        let _enter = s.enter();
-
+    fn timer_test_across_wf_bridge() {
         let wfid = "fake_wf_id";
         let run_id = "fake_run_id";
-        let timer_id = "fake_timer";
+        let timer_id = "fake_timer".to_string();
         let task_queue = "test-task-queue";
 
         let mut t = TestHistoryBuilder::default();
@@ -321,7 +357,7 @@ mod test {
             EventType::TimerFired,
             history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
                 started_event_id: timer_started_event_id,
-                timer_id: "timer1".to_string(),
+                timer_id: timer_id.clone(),
             }),
         );
         t.add_workflow_task_scheduled_and_started();
@@ -337,48 +373,168 @@ mod test {
         8: EVENT_TYPE_WORKFLOW_TASK_STARTED
         ---
         */
-        let events_first_batch = t.get_history_info(1).unwrap().events;
-        let wf = Some(WorkflowExecution {
-            workflow_id: wfid.to_string(),
-            run_id: run_id.to_string(),
-        });
-        let first_response = PollWorkflowTaskQueueResponse {
-            history: Some(History {
-                events: events_first_batch,
+        let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]);
+
+        let res = core.poll_task(task_queue).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
+            }]
+        );
+        assert!(core.workflow_machines.get(run_id).is_some());
+
+        let task_tok = res.task_token;
+        core.complete_task(TaskCompletion::ok_from_api_attrs(
+            vec![StartTimerCommandAttributes {
+                timer_id,
+                ..Default::default()
+            }
+            .into()],
+            task_tok,
+        ))
+        .unwrap();
+
+        let res = core.poll_task(task_queue).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::TimerFired(_)),
+            }]
+        );
+        let task_tok = res.task_token;
+        core.complete_task(TaskCompletion::ok_from_api_attrs(
+            vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
+            task_tok,
+        ))
+        .unwrap();
+    }
+
+    #[test]
+    fn parallel_timer_test_across_wf_bridge() {
+        let wfid = "fake_wf_id";
+        let run_id = "fake_run_id";
+        let timer_1_id = "timer1".to_string();
+        let timer_2_id = "timer2".to_string();
+        let task_queue = "test-task-queue";
+
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_workflow_task();
+        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+        let timer_2_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+        t.add(
+            EventType::TimerFired,
+            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+                started_event_id: timer_started_event_id,
+                timer_id: timer_1_id.clone(),
             }),
-            workflow_execution: wf.clone(),
-            ..Default::default()
-        };
-        let events_second_batch = t.get_history_info(2).unwrap().events;
-        let second_response = PollWorkflowTaskQueueResponse {
-            history: Some(History {
-                events: events_second_batch,
+        );
+        t.add(
+            EventType::TimerFired,
+            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+                started_event_id: timer_2_started_event_id,
+                timer_id: timer_2_id.clone(),
+            }),
+        );
+        t.add_workflow_task_scheduled_and_started();
+        /*
+        1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
+        2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        3: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        ---
+        4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
+        5: EVENT_TYPE_TIMER_STARTED
+        6: EVENT_TYPE_TIMER_STARTED
+        7: EVENT_TYPE_TIMER_FIRED
+        8: EVENT_TYPE_TIMER_FIRED
+        9: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
+        10: EVENT_TYPE_WORKFLOW_TASK_STARTED
+        ---
+        */
+        let core = build_fake_core(wfid, run_id, &mut t, &[1, 2]);
+
+        let res = core.poll_task(task_queue).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
+            }]
+        );
+        assert!(core.workflow_machines.get(run_id).is_some());
+
+        let task_tok = res.task_token;
+        core.complete_task(TaskCompletion::ok_from_api_attrs(
+            vec![
+                StartTimerCommandAttributes {
+                    timer_id: timer_1_id.clone(),
+                    ..Default::default()
+                }
+                .into(),
+                StartTimerCommandAttributes {
+                    timer_id: timer_2_id.clone(),
+                    ..Default::default()
+                }
+                .into(),
+            ],
+            task_tok,
+        ))
+        .unwrap();
+
+        let res = core.poll_task(task_queue).unwrap();
+        assert_matches!(
+            res.get_wf_jobs().as_slice(),
+            [
+                WfActivationJob {
+                    attributes: Some(wf_activation_job::Attributes::TimerFired(
+                        TimerFiredTaskAttributes { timer_id: t1_id }
+                    )),
+                },
+                WfActivationJob {
+                    attributes: Some(wf_activation_job::Attributes::TimerFired(
+                        TimerFiredTaskAttributes { timer_id: t2_id }
+                    )),
+                }
+            ] => {
+                assert_eq!(t1_id, &timer_1_id);
+                assert_eq!(t2_id, &timer_2_id);
+            }
+        );
+        let task_tok = res.task_token;
+        core.complete_task(TaskCompletion::ok_from_api_attrs(
+            vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
+            task_tok,
+        ))
+        .unwrap();
+    }
+
+    #[test]
+    fn single_timer_whole_replay_test_across_wf_bridge() {
+        let s = span!(Level::DEBUG, "Test start", t = "bridge");
+        let _enter = s.enter();
+
+        let wfid = "fake_wf_id";
+        let run_id = "fake_run_id";
+        let timer_1_id = "timer1".to_string();
+        let task_queue = "test-task-queue";
+
+        let mut t = TestHistoryBuilder::default();
+        t.add_by_type(EventType::WorkflowExecutionStarted);
+        t.add_workflow_task();
+        let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
+        t.add(
+            EventType::TimerFired,
+            history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
+                started_event_id: timer_started_event_id,
+                timer_id: timer_1_id.clone(),
             }),
-            workflow_execution: wf,
-            ..Default::default()
-        };
-        let responses = vec![first_response, second_response];
-
-        let mut tasks = VecDeque::from(responses);
-        let mut mock_gateway = MockServerGateway::new();
-        mock_gateway
-            .expect_poll_workflow_task()
-            .returning(move |_| Ok(tasks.pop_front().unwrap()));
-        // Response not really important here
-        mock_gateway
-            .expect_complete_workflow_task()
-            .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default()));
-
-        let runtime = Runtime::new().unwrap();
-        let core = CoreSDK {
-            runtime,
-            server_gateway: Arc::new(mock_gateway),
-            workflow_machines: DashMap::new(),
-            workflow_task_tokens: DashMap::new(),
-        };
-
-        let res = dbg!(core.poll_task(task_queue).unwrap());
-        // TODO: uggo
+        );
+        t.add_workflow_task_scheduled_and_started();
+        // NOTE! What makes this a replay test is the server only responds with *one* batch here.
+        // So the server is polled once, but the lang->core interactions look just like the
+        // non-replay test.
+        let core = build_fake_core(wfid, run_id, &mut t, &[2]);
+
+        let res = core.poll_task(task_queue).unwrap();
         assert_matches!(
             res.get_wf_jobs().as_slice(),
             [WfActivationJob {
                 attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
@@ -389,18 +545,16 @@ mod test {
 
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
-            StartTimerCommandAttributes {
-                timer_id: timer_id.to_string(),
+            vec![StartTimerCommandAttributes {
+                timer_id: timer_1_id,
                 ..Default::default()
             }
-            .into(),
+            .into()],
             task_tok,
         ))
         .unwrap();
-        dbg!("sent completion w/ start timer");
 
-        let res = dbg!(core.poll_task(task_queue).unwrap());
-        // TODO: uggo
+        let res = core.poll_task(task_queue).unwrap();
         assert_matches!(
             res.get_wf_jobs().as_slice(),
             [WfActivationJob {
                 attributes: Some(wf_activation_job::Attributes::TimerFired(_)),
             }]
         );
         let task_tok = res.task_token;
         core.complete_task(TaskCompletion::ok_from_api_attrs(
-            CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
+            vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
             task_tok,
         ))
         .unwrap();
-        dbg!("sent workflow done");
     }
 }
diff --git a/src/machines/mod.rs b/src/machines/mod.rs
index 0c9b261e2..35473e12c 100644
--- a/src/machines/mod.rs
+++ b/src/machines/mod.rs
@@ -118,22 +118,18 @@ impl TryFrom<coresdk::Command> for WFCommand {
     type Error = InconvertibleCommandError;
 
     fn try_from(c: coresdk::Command) -> Result<Self, Self::Error> {
-        // TODO: Return error without cloning
-        match c.variant.clone() {
-            Some(a) => match a {
-                Variant::Api(Command {
-                    attributes: Some(attrs),
-                    ..
-                }) => match attrs {
-                    Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)),
-                    Attributes::CompleteWorkflowExecutionCommandAttributes(c) => {
-                        Ok(WFCommand::CompleteWorkflow(c))
-                    }
-                    _ => unimplemented!(),
-                },
-                _ => Err(c.into()),
+        match c.variant {
+            Some(Variant::Api(Command {
+                attributes: Some(attrs),
+                ..
+            })) => match attrs {
+                Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)),
+                Attributes::CompleteWorkflowExecutionCommandAttributes(c) => {
+                    Ok(WFCommand::CompleteWorkflow(c))
+                }
+                _ => unimplemented!(),
             },
-            None => Err(c.into()),
+            _ => Err(c.into()),
         }
     }
 }
diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs
index 2f4dac37f..8a3230790 100644
--- a/src/machines/test_help/history_builder.rs
+++ b/src/machines/test_help/history_builder.rs
@@ -1,4 +1,5 @@
 use super::Result;
+use crate::protos::temporal::api::history::v1::History;
 use crate::{
     machines::{workflow_machines::WorkflowMachines, ProtoCommand},
     protos::temporal::api::{
@@ -87,41 +88,10 @@ impl TestHistoryBuilder {
         self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into());
     }
 
-    /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed by
-    /// WFTaskCompleted, adding one to the count for every match. It will additionally count
-    /// a WFTaskStarted at the end of the event list.
-    ///
-    /// If `up_to_event_id` is provided, the count will be returned as soon as processing advances
-    /// past that id.
-    pub fn get_workflow_task_count(&self, up_to_event_id: Option<i64>) -> Result<usize> {
-        let mut last_wf_started_id = 0;
-        let mut count = 0;
-        let mut history = self.events.iter().peekable();
-        while let Some(event) = history.next() {
-            let next_event = history.peek();
-            if let Some(upto) = up_to_event_id {
-                if event.event_id > upto {
-                    return Ok(count);
-                }
-            }
-            let next_is_completed = next_event.map_or(false, |ne| {
-                ne.event_type == EventType::WorkflowTaskCompleted as i32
-            });
-            if event.event_type == EventType::WorkflowTaskStarted as i32
-                && (next_event.is_none() || next_is_completed)
-            {
-                last_wf_started_id = event.event_id;
-                count += 1;
-            }
-
-            if next_event.is_none() {
-                if last_wf_started_id != event.event_id {
-                    bail!("Last item in history wasn't WorkflowTaskStarted")
-                }
-                return Ok(count);
-            }
+    pub fn as_history(&self) -> History {
+        History {
+            events: self.events.clone(),
         }
-        Ok(count)
     }
 
     /// Handle workflow task(s) using the provided [WorkflowMachines]. Will process as many workflow
@@ -148,7 +118,8 @@ impl TestHistoryBuilder {
         Ok(())
     }
 
-    /// Iterates over the events in this builder to return a [HistoryInfo] of the n-th workflow task.
+    /// Iterates over the events in this builder to return a [HistoryInfo] including events up to
+    /// the provided `to_wf_task_num`
     pub(crate) fn get_history_info(
         &self,
         to_wf_task_num: usize,
diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs
index b7ab7509b..9c4a73df8 100644
--- a/src/machines/test_help/mod.rs
+++ b/src/machines/test_help/mod.rs
@@ -5,3 +5,69 @@ mod workflow_driver;
 
 pub(crate) use history_builder::TestHistoryBuilder;
 pub(super) use workflow_driver::{CommandSender, TestWFCommand, TestWorkflowDriver};
+
+use crate::{
+    pollers::MockServerGateway,
+    protos::temporal::api::common::v1::WorkflowExecution,
+    protos::temporal::api::history::v1::History,
+    protos::temporal::api::workflowservice::v1::{
+        PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
+    },
+    CoreSDK,
+};
+use dashmap::DashMap;
+use rand::{thread_rng, Rng};
+use std::{collections::VecDeque, sync::Arc};
+use tokio::runtime::Runtime;
+
+/// Given identifiers for a workflow/run, and a test history builder, construct an instance of
+/// the core SDK with a mock server gateway that will produce the responses as appropriate.
+///
+/// `response_batches` is used to control the fake [PollWorkflowTaskQueueResponse]s returned.
+/// For each number in the input list, a fake response will be prepared which includes history
+/// up to the workflow task with that number, as in [TestHistoryBuilder::get_history_info].
+pub(crate) fn build_fake_core(
+    wf_id: &str,
+    run_id: &str,
+    t: &mut TestHistoryBuilder,
+    response_batches: &[usize],
+) -> CoreSDK<MockServerGateway> {
+    let wf = Some(WorkflowExecution {
+        workflow_id: wf_id.to_string(),
+        run_id: run_id.to_string(),
+    });
+
+    let responses: Vec<_> = response_batches
+        .iter()
+        .map(|to_task_num| {
+            let batch = t.get_history_info(*to_task_num).unwrap().events;
+            let task_token: [u8; 16] = thread_rng().gen();
+            PollWorkflowTaskQueueResponse {
+                history: Some(History { events: batch }),
+                workflow_execution: wf.clone(),
+                task_token: task_token.to_vec(),
+                ..Default::default()
+            }
+        })
+        .collect();
+
+    let mut tasks = VecDeque::from(responses);
+    let mut mock_gateway = MockServerGateway::new();
+    mock_gateway
+        .expect_poll_workflow_task()
+        .times(response_batches.len())
+        .returning(move |_| Ok(tasks.pop_front().unwrap()));
+    // Response not really important here
+    mock_gateway
+        .expect_complete_workflow_task()
+        .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default()));
+
+    let runtime = Runtime::new().unwrap();
+    CoreSDK {
+        runtime,
+        server_gateway: Arc::new(mock_gateway),
+        workflow_machines: DashMap::new(),
+        workflow_task_tokens: DashMap::new(),
+        pending_activations: Default::default(),
+    }
+}
diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs
index 56d9843ef..670408595 100644
--- a/src/machines/test_help/workflow_driver.rs
+++ b/src/machines/test_help/workflow_driver.rs
@@ -72,12 +72,11 @@ where
     F: Fn(CommandSender) -> Fut + Send + Sync,
     Fut: Future<Output = ()>,
 {
-    #[instrument(skip(self))]
     fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) -> Vec<WFCommand> {
+        event!(Level::DEBUG, msg = "Test WF driver start called");
         vec![]
     }
 
-    #[instrument(skip(self))]
     fn fetch_workflow_iteration_output(&mut self) -> Vec<WFCommand> {
         let (sender, receiver) = CommandSender::new(self.cache.clone());
         // Call the closure that produces the workflow future
@@ -88,7 +87,6 @@ where
         rt.block_on(wf_future);
 
         let cmds = receiver.into_iter();
-        event!(Level::DEBUG, msg = "Test wf driver emitting", ?cmds);
 
         let mut last_cmd = None;
         for cmd in cmds {
@@ -101,6 +99,8 @@ where
             }
         }
 
+        event!(Level::DEBUG, msg = "Test wf driver emitting", ?last_cmd);
+
         // Return only the last command, since that's what would've been yielded in a real wf
         if let Some(c) = last_cmd {
             vec![c]
diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs
index 79c932a0b..e77b1d7c3 100644
--- a/src/machines/timer_state_machine.rs
+++ b/src/machines/timer_state_machine.rs
@@ -305,7 +305,7 @@ mod test {
             }),
         );
         t.add_workflow_task_scheduled_and_started();
-        assert_eq!(2, t.get_workflow_task_count(None).unwrap());
+        assert_eq!(2, t.as_history().get_workflow_task_count(None).unwrap());
         (t, state_machines)
     }
 
@@ -343,7 +343,6 @@ mod test {
         let commands = t
             .handle_workflow_task_take_cmds(&mut state_machines, Some(2))
             .unwrap();
-        dbg!(&commands);
         assert_eq!(commands.len(), 1);
         assert_eq!(
             commands[0].command_type,
diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs
index a6a87548d..0a34c6332 100644
--- a/src/machines/workflow_machines.rs
+++ b/src/machines/workflow_machines.rs
@@ -43,9 +43,9 @@ pub(crate) struct WorkflowMachines {
     /// True if the workflow is replaying from history
     replaying: bool,
     /// Workflow identifier
-    workflow_id: String,
+    pub workflow_id: String,
     /// Identifies the current run and is used as a seed for faux-randomness.
-    run_id: String,
+    pub run_id: String,
     /// The current workflow time if it has been established
     current_wf_time: Option<SystemTime>,
@@ -185,11 +185,7 @@ impl WorkflowMachines {
 
     /// Called when we want to run the event loop because a workflow task started event has
     /// triggered
-    pub(super) fn task_started(
-        &mut self,
-        task_started_event_id: i64,
-        time: SystemTime,
-    ) -> Result<()> {
+    pub(super) fn task_started(&mut self, task_started_event_id: i64, time: SystemTime) {
         let s = span!(Level::DEBUG, "Task started trigger");
         let _enter = s.enter();
 
@@ -215,7 +211,6 @@ impl WorkflowMachines {
         self.current_started_event_id = task_started_event_id;
         self.set_current_time(time);
         self.event_loop();
-        Ok(())
     }
 
     /// A command event is an event which is generated from a command emitted by a past decision.
@@ -413,7 +408,7 @@ impl WorkflowMachines {
                     task_started_event_id,
                     time,
                 } => {
-                    self.task_started(task_started_event_id, time)?;
+                    self.task_started(task_started_event_id, time);
                 }
             }
         }
diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs
index a03ff8281..f0a46b5e6 100644
--- a/src/pollers/mod.rs
+++ b/src/pollers/mod.rs
@@ -60,6 +60,7 @@ impl ServerGatewayOptions {
 /// This function will get called on each outbound request. Returning a
 /// `Status` here will cancel the request and have that status returned to
 /// the caller.
+#[allow(clippy::unnecessary_wraps)] // Clippy lies because we need to pass to `with_interceptor`
 fn intercept(mut req: Request<()>) -> Result<Request<()>, Status> {
     let metadata = req.metadata_mut();
     // TODO: Only apply this to long poll requests
diff --git a/src/protos/mod.rs b/src/protos/mod.rs
index c7adee3c3..b0c223057 100644
--- a/src/protos/mod.rs
+++ b/src/protos/mod.rs
@@ -25,7 +25,6 @@ pub mod coresdk {
         }
 
         /// Returns any contained jobs if this task was a wf activation and it had some
-        #[cfg(test)]
        pub fn get_wf_jobs(&self) -> Vec<WfActivationJob> {
            if let Some(task::Variant::Workflow(a)) = &self.variant {
                a.jobs.clone()
@@ -59,11 +58,11 @@ pub mod coresdk {
     impl TaskCompletion {
         /// Build a successful completion from some api command attributes and a task token
         pub fn ok_from_api_attrs(
-            cmd: api_command::command::Attributes,
+            cmds: Vec<api_command::command::Attributes>,
             task_token: Vec<u8>,
         ) -> Self {
-            let cmd: ApiCommand = cmd.into();
-            let success: WfActivationSuccess = vec![cmd].into();
+            let cmds: Vec<ApiCommand> = cmds.into_iter().map(Into::into).collect();
+            let success: WfActivationSuccess = cmds.into();
             TaskCompletion {
                 task_token,
                 variant: Some(task_completion::Variant::Workflow(WfActivationCompletion {
@@ -128,11 +127,54 @@ pub mod temporal {
                 use crate::protos::temporal::api::{
                     enums::v1::EventType, history::v1::history_event::Attributes,
                 };
+                use crate::protosext::HistoryInfoError;
                 use prost::alloc::fmt::Formatter;
                 use std::fmt::Display;
 
                 include!("temporal.api.history.v1.rs");
 
+                impl History {
+                    /// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed
+                    /// by WFTaskCompleted, adding one to the count for every match. It will
+                    /// additionally count a WFTaskStarted at the end of the event list.
+                    ///
+                    /// If `up_to_event_id` is provided, the count will be returned as soon as
+                    /// processing advances past that id.
+                    pub fn get_workflow_task_count(
+                        &self,
+                        up_to_event_id: Option<i64>,
+                    ) -> Result<usize, HistoryInfoError> {
+                        let mut last_wf_started_id = 0;
+                        let mut count = 0;
+                        let mut history = self.events.iter().peekable();
+                        while let Some(event) = history.next() {
+                            let next_event = history.peek();
+                            if let Some(upto) = up_to_event_id {
+                                if event.event_id > upto {
+                                    return Ok(count);
+                                }
+                            }
+                            let next_is_completed = next_event.map_or(false, |ne| {
+                                ne.event_type == EventType::WorkflowTaskCompleted as i32
+                            });
+                            if event.event_type == EventType::WorkflowTaskStarted as i32
+                                && (next_event.is_none() || next_is_completed)
+                            {
+                                last_wf_started_id = event.event_id;
+                                count += 1;
+                            }
+
+                            if next_event.is_none() {
+                                if last_wf_started_id != event.event_id {
+                                    return Err(HistoryInfoError::HistoryEndsUnexpectedly);
+                                }
+                                return Ok(count);
+                            }
+                        }
+                        Ok(count)
+                    }
+                }
+
                 impl HistoryEvent {
                     /// Returns true if this is an event created to mirror a command
                     pub fn is_command_event(&self) -> bool {
diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs
index 08c894f3b..8b03c5588 100644
--- a/src/workflow/mod.rs
+++ b/src/workflow/mod.rs
@@ -4,19 +4,24 @@
 pub(crate) use bridge::WorkflowBridge;
 
 use crate::{
     machines::{ProtoCommand, WFCommand, WorkflowMachines},
-    protos::temporal::api::workflowservice::v1::StartWorkflowExecutionResponse,
-    protos::temporal::api::{
-        common::v1::WorkflowExecution,
-        workflowservice::v1::{
-            PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
+    protos::{
+        coresdk::WfActivation,
+        temporal::api::{
+            history::v1::History,
+            workflowservice::v1::{
+                PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
+                StartWorkflowExecutionResponse,
+            },
         },
     },
+    protosext::HistoryInfo,
     CoreError, Result,
 };
 use std::{
     ops::DerefMut,
     sync::{mpsc::Sender, Arc, Mutex},
 };
+use tracing::Level;
 
 /// Implementors can provide new workflow tasks to the SDK. The connection to the server is the real
 /// implementor.
@@ -56,7 +61,8 @@ pub trait StartWorkflowExecutionApi {
     ) -> Result<StartWorkflowExecutionResponse>;
 }
 
-/// Manages concurrent access to an instance of a [WorkflowMachines], which is not thread-safe.
+/// Manages concurrent access to an instance of a [WorkflowMachines], which is not thread-safe,
+/// as well as other data associated with that specific workflow run.
 pub(crate) struct WorkflowManager {
     data: Arc<Mutex<WfManagerProtected>>,
 }
@@ -64,20 +70,41 @@ pub(crate) struct WfManagerProtected {
     pub machines: WorkflowMachines,
     pub command_sink: Sender<Vec<WFCommand>>,
+    /// The last recorded history we received from the server for this workflow run. This must be
+    /// kept because the lang side polls & completes for every workflow task, but we do not need
+    /// to poll the server that often during replay.
+    pub last_history_from_server: History,
+    pub last_history_task_count: usize,
+    /// The current workflow task number this run is on. Starts at one and monotonically increases.
+    pub current_wf_task_num: usize,
 }
 
 impl WorkflowManager {
-    pub fn new(we: &WorkflowExecution) -> Self {
+    /// Create a new workflow manager from a server workflow task queue response.
+    pub fn new(poll_resp: PollWorkflowTaskQueueResponse) -> Result<Self> {
+        let (history, we) = if let PollWorkflowTaskQueueResponse {
+            workflow_execution: Some(we),
+            history: Some(hist),
+            ..
+        } = poll_resp
+        {
+            (hist, we)
+        } else {
+            return Err(CoreError::BadDataFromWorkProvider(poll_resp.clone()));
+        };
+
         let (wfb, cmd_sink) = WorkflowBridge::new();
-        let state_machines =
-            WorkflowMachines::new(we.workflow_id.clone(), we.run_id.clone(), Box::new(wfb));
+        let state_machines = WorkflowMachines::new(we.workflow_id, we.run_id, Box::new(wfb));
 
         let protected = WfManagerProtected {
             machines: state_machines,
             command_sink: cmd_sink,
+            last_history_task_count: history.get_workflow_task_count(None)?,
+            last_history_from_server: history,
+            current_wf_task_num: 1,
         };
-        Self {
+        Ok(Self {
             data: Arc::new(Mutex::new(protected)),
-        }
+        })
     }
 
     pub fn lock(&self) -> Result<impl DerefMut<Target = WfManagerProtected> + '_> {
@@ -91,15 +118,64 @@ impl WorkflowManager {
     }
 }
 
+pub(crate) struct NextWfActivation {
+    pub activation: Option<WfActivation>,
+    pub more_activations_needed: bool,
+}
+
+impl WfManagerProtected {
+    /// Given history that was just obtained from the server, pipe it into this workflow's machines.
+    ///
+    /// Should only be called when a workflow has caught up on replay. It will return a workflow
+    /// activation if one is needed, as well as a bool indicating if there are more workflow tasks
+    /// that need to be performed to replay the remaining history.
+    #[instrument(skip(self))]
+    pub fn feed_history_from_server(&mut self, hist: History) -> Result<NextWfActivation> {
+        let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?;
+        let task_ct = hist.get_workflow_task_count(None)?;
+        self.last_history_task_count = task_ct;
+        self.last_history_from_server = hist;
+        task_hist.apply_history_events(&mut self.machines)?;
+        let activation = self.machines.get_wf_activation();
+        let more_activations_needed = task_ct > self.current_wf_task_num;
+
+        if more_activations_needed {
+            event!(Level::DEBUG, msg = "More activations needed");
+        }
+
+        self.current_wf_task_num += 1;
+
+        Ok(NextWfActivation {
+            activation,
+            more_activations_needed,
+        })
+    }
+
+    pub fn get_next_activation(&mut self) -> Result<NextWfActivation> {
+        let hist = &self.last_history_from_server;
+        let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?;
+        task_hist.apply_history_events(&mut self.machines)?;
+        let activation = self.machines.get_wf_activation();
+
+        self.current_wf_task_num += 1;
+        let more_activations_needed = self.current_wf_task_num <= self.last_history_task_count;
+
+        Ok(NextWfActivation {
+            activation,
+            more_activations_needed,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
 
-    // Enforce thread-safeness of wf manager
     fn enforcer<W: Send + Sync>(_: W) {}
 
+    // Enforce thread-safeness of wf manager
     #[test]
     fn is_threadsafe() {
-        enforcer(WorkflowManager::new(&Default::default()));
+        enforcer(WorkflowManager::new(Default::default()));
     }
 }
diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs
index aac262e34..8b1378917 100644
--- a/tests/integ_tests/poller_test.rs
+++ b/tests/integ_tests/poller_test.rs
@@ -1,68 +1 @@
-use rand::{self, Rng};
-use std::{convert::TryFrom, env, time::Duration};
-use temporal_sdk_core::{
-    protos::{
-        coresdk::TaskCompletion,
-        temporal::api::command::v1::{
-            CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
-        },
-    },
-    Core, CoreInitOptions, ServerGatewayOptions, Url,
-};
-
-const TASK_QUEUE: &str = "test-tq";
-const NAMESPACE: &str = "default";
-
-#[tokio::main]
-async fn create_workflow(core: &dyn Core, workflow_id: &str) -> String {
-    core.server_gateway()
-        .unwrap()
-        .start_workflow(NAMESPACE, TASK_QUEUE, workflow_id, "test-workflow")
-        .await
-        .unwrap()
-        .run_id
-}
-
-#[test]
-fn timer_workflow() {
-    let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
-        Ok(addr) => addr,
-        Err(_) => "http://localhost:7233".to_owned(),
-    };
-    let url = Url::try_from(&*temporal_server_address).unwrap();
-    let gateway_opts = ServerGatewayOptions {
-        namespace: NAMESPACE.to_string(),
-        identity: "none".to_string(),
-        worker_binary_id: "".to_string(),
-        long_poll_timeout: Duration::from_secs(60),
-        target_url: url,
-    };
-    let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
-    let mut rng = rand::thread_rng();
-    let workflow_id: u32 = rng.gen();
-    let run_id = dbg!(create_workflow(&core, &workflow_id.to_string()));
-    let timer_id: String = rng.gen::<u32>().to_string();
-    let task = dbg!(core.poll_task(TASK_QUEUE).unwrap());
-    core.complete_task(TaskCompletion::ok_from_api_attrs(
-        StartTimerCommandAttributes {
-            timer_id: timer_id.to_string(),
-            start_to_fire_timeout: Some(Duration::from_secs(1).into()),
-            ..Default::default()
-        }
-        .into(),
-        task.task_token,
-    ))
-    .unwrap();
-    dbg!("sent completion w/ start timer");
-    let task = dbg!(core.poll_task(TASK_QUEUE).unwrap());
-    core.complete_task(TaskCompletion::ok_from_api_attrs(
-        CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
-        task.task_token,
-    ))
-    .unwrap();
-    dbg!(
-        "sent workflow done, completed workflow",
-        workflow_id,
-        run_id
-    );
-}
diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs
new file mode 100644
index 000000000..7218fb6c5
--- /dev/null
+++ b/tests/integ_tests/simple_wf_tests.rs
@@ -0,0 +1,135 @@
+use assert_matches::assert_matches;
+use rand::{self, Rng};
+use std::{convert::TryFrom, env, time::Duration};
+use temporal_sdk_core::{
+    protos::{
+        coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob},
+        temporal::api::command::v1::{
+            CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
+        },
+    },
+    Core, CoreInitOptions, ServerGatewayOptions, Url,
+};
+
+// TODO: These tests can get broken permanently if they break one time and the server is not
+// restarted, because pulling from the same task queue produces tasks for the previous failed
+// workflows. Fix that.
+
+const NAMESPACE: &str = "default";
+
+#[tokio::main]
+async fn create_workflow(core: &dyn Core, task_q: &str, workflow_id: &str) -> String {
+    core.server_gateway()
+        .unwrap()
+        .start_workflow(NAMESPACE, task_q, workflow_id, "test-workflow")
+        .await
+        .unwrap()
+        .run_id
+}
+
+#[test]
+fn timer_workflow() {
+    let task_q = "timer_workflow";
+    let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
+        Ok(addr) => addr,
+        Err(_) => "http://localhost:7233".to_owned(),
+    };
+    let url = Url::try_from(&*temporal_server_address).unwrap();
+    let gateway_opts = ServerGatewayOptions {
+        namespace: NAMESPACE.to_string(),
+        identity: "none".to_string(),
+        worker_binary_id: "".to_string(),
+        long_poll_timeout: Duration::from_secs(60),
+        target_url: url,
+    };
+    let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
+    let mut rng = rand::thread_rng();
+    let workflow_id: u32 = rng.gen();
+    dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
+    let timer_id: String = rng.gen::<u32>().to_string();
+    let task = core.poll_task(task_q).unwrap();
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![StartTimerCommandAttributes {
+            timer_id: timer_id.to_string(),
+            start_to_fire_timeout: Some(Duration::from_secs(1).into()),
+            ..Default::default()
+        }
+        .into()],
+        task.task_token,
+    ))
+    .unwrap();
+    let task = dbg!(core.poll_task(task_q).unwrap());
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
+        task.task_token,
+    ))
+    .unwrap();
+}
+
+#[test]
+fn parallel_timer_workflow() {
+    let task_q = "parallel_timer_workflow";
+    let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
+        Ok(addr) => addr,
+        Err(_) => "http://localhost:7233".to_owned(),
+    };
+    let url = Url::try_from(&*temporal_server_address).unwrap();
+    let gateway_opts = ServerGatewayOptions {
+        namespace: NAMESPACE.to_string(),
+        identity: "none".to_string(),
+        worker_binary_id: "".to_string(),
+        long_poll_timeout: Duration::from_secs(60),
+        target_url: url,
+    };
+    let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
+    let mut rng = rand::thread_rng();
+    let workflow_id: u32 = rng.gen();
+    dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
+    let timer_id = "timer 1".to_string();
+    let timer_2_id = "timer 2".to_string();
+    let task = dbg!(core.poll_task(task_q).unwrap());
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![
+            StartTimerCommandAttributes {
+                timer_id: timer_id.clone(),
+                start_to_fire_timeout: Some(Duration::from_millis(50).into()),
+                ..Default::default()
+            }
+            .into(),
+            StartTimerCommandAttributes {
+                timer_id: timer_2_id.clone(),
+                start_to_fire_timeout: Some(Duration::from_millis(100).into()),
+                ..Default::default()
+            }
+            .into(),
+        ],
+        task.task_token,
+    ))
+    .unwrap();
+    // Wait long enough for both timers to complete
+    std::thread::sleep(Duration::from_millis(1000));
+    let task = core.poll_task(task_q).unwrap();
+    assert_matches!(
+        task.get_wf_jobs().as_slice(),
+        [
+            WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::TimerFired(
+                    TimerFiredTaskAttributes { timer_id: t1_id }
+                )),
+            },
+            WfActivationJob {
+                attributes: Some(wf_activation_job::Attributes::TimerFired(
+                    TimerFiredTaskAttributes { timer_id: t2_id }
+                )),
+            }
+        ] => {
+            assert_eq!(t1_id, &timer_id);
+            assert_eq!(t2_id, &timer_2_id);
+        }
+    );
+    core.complete_task(TaskCompletion::ok_from_api_attrs(
+        vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
+        task.task_token,
+    ))
+    .unwrap();
+}
diff --git a/tests/main.rs b/tests/main.rs
index 976cf28e4..887c013d1 100644
--- a/tests/main.rs
+++ b/tests/main.rs
@@ -1,4 +1,4 @@
 #[cfg(test)]
 mod integ_tests {
-    mod poller_test;
+    mod simple_wf_tests;
 }
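
Note for reviewers: `crossbeam::queue::SegQueue` (crossbeam 0.8) is a lock-free unbounded queue whose `pop` returns `Option<T>`, which is what lets `poll_task` hand out queued replay work without taking any lock before falling back to the server. A minimal standalone sketch of that drain-first pattern; `FakeServer` and `next_task` are illustrative stand-ins, not part of this PR:

    use crossbeam::queue::SegQueue;

    #[derive(Debug)]
    struct PendingActivation {
        run_id: String,
    }

    /// Stand-in for the real server gateway; always hands back a fresh task.
    struct FakeServer;
    impl FakeServer {
        fn poll(&self) -> PendingActivation {
            PendingActivation { run_id: "run-from-server".into() }
        }
    }

    /// Mirrors poll_task's control flow: queued replay activations win, and
    /// the server is only consulted when the queue is empty.
    fn next_task(pending: &SegQueue<PendingActivation>, server: &FakeServer) -> PendingActivation {
        if let Some(pa) = pending.pop() {
            return pa;
        }
        server.poll()
    }

    fn main() {
        let pending = SegQueue::new();
        pending.push(PendingActivation { run_id: "replaying-run".into() });
        // First call drains the queue; second falls through to the server.
        assert_eq!(next_task(&pending, &FakeServer).run_id, "replaying-run");
        assert_eq!(next_task(&pending, &FakeServer).run_id, "run-from-server");
    }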
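
Likewise, the counting rule moved onto `History` (a WorkflowTaskStarted immediately followed by WorkflowTaskCompleted counts as one whole workflow task, and a WorkflowTaskStarted that ends history also counts) can be sanity-checked against the 8-event history used in the single-timer tests. A sketch over a simplified event model, not the generated proto types, and omitting the error path for histories that end unexpectedly:

    #[derive(Clone, Copy, PartialEq)]
    enum Ev {
        WfStarted,
        TaskScheduled,
        TaskStarted,
        TaskCompleted,
        TimerStarted,
        TimerFired,
    }

    /// Simplified get_workflow_task_count: same rule, plain enums.
    fn wf_task_count(events: &[Ev]) -> usize {
        let mut count = 0;
        let mut iter = events.iter().peekable();
        while let Some(ev) = iter.next() {
            let next = iter.peek();
            let next_is_completed = next.map_or(false, |ne| **ne == Ev::TaskCompleted);
            if *ev == Ev::TaskStarted && (next.is_none() || next_is_completed) {
                count += 1;
            }
        }
        count
    }

    fn main() {
        // Same shape as the 8-event history in timer_test_across_wf_bridge.
        let hist = [
            Ev::WfStarted, Ev::TaskScheduled, Ev::TaskStarted,  // task 1
            Ev::TaskCompleted, Ev::TimerStarted, Ev::TimerFired,
            Ev::TaskScheduled, Ev::TaskStarted,                 // task 2 (trailing started)
        ];
        assert_eq!(wf_task_count(&hist), 2);
    }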