diff --git a/Cargo.toml b/Cargo.toml index f3bcacd8f..c6245c47e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "sdk-core" +name = "temporal-sdk-core" version = "0.1.0" authors = ["Spencer Judge ", "Vitaly Arbuzov "] edition = "2018" @@ -10,6 +10,7 @@ edition = "2018" [dependencies] anyhow = "1.0" async-trait = "0.1" +dashmap = "4.0" derive_more = "0.99" env_logger = "0.8" futures = "0.3" @@ -28,6 +29,8 @@ tracing-subscriber = "0.2" path = "fsm" [dev-dependencies] +assert_matches = "1.4" +mockall = "0.9" rstest = "0.6" [build-dependencies] diff --git a/build.rs b/build.rs index 282ce147a..cb6500556 100644 --- a/build.rs +++ b/build.rs @@ -14,6 +14,16 @@ fn main() -> Result<(), Box> { "temporal.api.command.v1.Command.attributes", "#[derive(::derive_more::From)]", ) + .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") + .type_attribute( + "coresdk.WFActivation.attributes", + "#[derive(::derive_more::From)]", + ) + .type_attribute( + "coresdk.WorkflowTask.attributes", + "#[derive(::derive_more::From)]", + ) + .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") .compile( &[ "protos/local/core_interface.proto", diff --git a/fsm/state_machine_procmacro/tests/progress.rs b/fsm/state_machine_procmacro/tests/progress.rs index 5a3e4a4fc..0621c0d7d 100644 --- a/fsm/state_machine_procmacro/tests/progress.rs +++ b/fsm/state_machine_procmacro/tests/progress.rs @@ -1,49 +1,8 @@ extern crate state_machine_trait as rustfsm; -use state_machine_trait::TransitionResult; -use std::convert::Infallible; - #[test] fn tests() { let t = trybuild::TestCases::new(); t.pass("tests/trybuild/*_pass.rs"); t.compile_fail("tests/trybuild/*_fail.rs"); } - -// Kept here to inspect manual expansion -state_machine_procmacro::fsm! { - name SimpleMachine; command SimpleMachineCommand; error Infallible; - - One --(A(String), foo)--> Two; - One --(B)--> Two; - Two --(B)--> One; - Two --(C, baz)--> One -} - -#[derive(Default, Clone)] -pub struct One {} -impl One { - fn foo(self, _: String) -> SimpleMachineTransition { - TransitionResult::default::() - } -} -impl From for One { - fn from(_: Two) -> Self { - One {} - } -} - -#[derive(Default, Clone)] -pub struct Two {} -impl Two { - fn baz(self) -> SimpleMachineTransition { - TransitionResult::default::() - } -} -impl From for Two { - fn from(_: One) -> Self { - Two {} - } -} - -pub enum SimpleMachineCommand {} diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index eca909199..0019c2696 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -6,91 +6,113 @@ package coresdk; // the include paths. You can make it work by going to the "Protobuf Support" settings section // and adding the "api_upstream" subdir as an include path. 
+import "google/protobuf/timestamp.proto"; +import "google/protobuf/empty.proto"; +import "dependencies/gogoproto/gogo.proto"; import "temporal/api/workflowservice/v1/request_response.proto"; import "temporal/api/taskqueue/v1/message.proto"; import "temporal/api/enums/v1/failed_cause.proto"; import "temporal/api/failure/v1/message.proto"; import "temporal/api/common/v1/message.proto"; +import "temporal/api/command/v1/message.proto"; -// TODO: SDK prefix in front of everything is maybe pointless given it's all within this package - -service CoreSDKService { - rpc PollSDKTask (PollSDKTaskReq) returns (PollSDKTaskResp) {} - rpc CompleteSDKTask (CompleteSDKTaskReq) returns (CompleteSDKTaskResp) {} +service Core { + rpc PollTask (PollTaskReq) returns (Task) {} + rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {} } -message PollSDKTaskReq { - // Maybe? Not sure it makes sense to support multiple workers in the same core sdk instance - repeated temporal.api.taskqueue.v1.TaskQueue task_queues = 1; +message PollTaskReq { + bool workflows = 1; + bool activities = 2; } -message PollSDKTaskResp { +message Task { bytes task_token = 1; - oneof task { - SDKWFTask wf_task = 2; - SDKActivityTask activity_task = 3; + oneof variant { + WFActivation workflow = 2; + ActivityTask activity = 3; } } -message SDKWFTask { - // Original task from temporal service - temporal.api.workflowservice.v1.PollWorkflowTaskQueueResponse original = 1; +message StartWorkflowTaskAttributes { + string namespace = 1; + string workflow_id = 3; + string name = 4; + temporal.api.common.v1.Payloads arguments = 5; + // will be others - workflow exe started attrs, etc +} + +// maybe we just go back to timer fired to keep consistent +message UnblockTimerTaskAttributes { + string timer_id = 1; } -message SDKActivityTask { +message WFActivation { + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; + string run_id = 2; + oneof attributes { + // could literally be attributes from events -- maybe we don't need our own types + StartWorkflowTaskAttributes start_workflow = 3; + UnblockTimerTaskAttributes unblock_timer = 4; + } +} + +message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; } -message CompleteSDKTaskReq { +message CompleteTaskReq { bytes task_token = 1; oneof completion { - SDKWFTaskCompletion workflow = 2; - SDKActivityTaskCompletion activity = 3; + WFActivationCompletion workflow = 2; + ActivityTaskCompletion activity = 3; } } -message CompleteSDKTaskResp {} - -message SDKWFTaskCompletion { +message WFActivationCompletion { oneof status { - SDKWFTaskSuccess successful = 1; - SDKWFTaskFailure failed = 2; + WFActivationSuccess successful = 1; + WFActivationFailure failed = 2; } } -message SDKActivityTaskCompletion { +message ActivityTaskCompletion { oneof status { - SDKActivityTaskSuccess successful = 1; - SDKActivityTaskFailure failed = 2; + ActivityTaskSuccess successful = 1; + ActivityTaskFailure failed = 2; } } -message SDKWFTaskSuccess { - repeated SDKWFCommand commands = 1; +message CoreCommand { + // Reserved for specific commands +} + +message Command { + oneof variant { + temporal.api.command.v1.Command api = 1; + CoreCommand core = 2; + } +} + +message WFActivationSuccess { + repeated Command commands = 1; // Other bits from RespondWorkflowTaskCompletedRequest as needed } -message SDKWFTaskFailure { +message WFActivationFailure { temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; 
temporal.api.failure.v1.Failure failure = 2; // Other bits from RespondWorkflowTaskFailedRequest as needed } -message SDKActivityTaskSuccess { +message ActivityTaskSuccess { temporal.api.common.v1.Payloads result = 1; // Other bits from RespondActivityTaskCompletedRequest as needed } -message SDKActivityTaskFailure { + +message ActivityTaskFailure { temporal.api.failure.v1.Failure failure = 1; // Other bits from RespondActivityTaskFailedRequest as needed } - -message SDKWFCommand { - oneof command { - // Commands go here. Should we reuse and add on top of the originals? - // https://github.com/temporalio/api/blob/master/temporal/api/command/v1/message.proto#L174 - bool nothing = 1; - } -} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 4267ccf6e..1c3519274 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,28 +1,393 @@ #[macro_use] extern crate tracing; +#[cfg(test)] +#[macro_use] +extern crate assert_matches; mod machines; mod pollers; pub mod protos; +mod protosext; + +pub use protosext::HistoryInfo; + +use crate::{ + machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, + protos::{ + coresdk::{ + complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, + WfActivationCompletion, WfActivationSuccess, + }, + temporal::api::{ + common::v1::WorkflowExecution, + history::v1::{ + WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, + WorkflowExecutionStartedEventAttributes, + }, + workflowservice::v1::PollWorkflowTaskQueueResponse, + }, + }, + protosext::HistoryInfoError, +}; +use anyhow::Error; +use dashmap::DashMap; +use std::{ + convert::TryInto, + sync::mpsc::{self, Receiver, SendError, Sender}, +}; + +pub type Result<T, E = CoreError> = std::result::Result<T, E>; -use protos::coresdk::{CompleteSdkTaskReq, CompleteSdkTaskResp, PollSdkTaskReq, PollSdkTaskResp}; +// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have +// any given WorkflowMachines instance accessed on more than one thread. Ideally this trait can +// be accessed from many threads, but each workflow is pinned to one thread (possibly with many +// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should +// be both, ideally. +pub trait Core { + fn poll_task(&self) -> Result<Task>; + fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; +} -type Result<T, E = SDKServiceError> = std::result::Result<T, E>; +pub struct CoreInitOptions { + _queue_name: String, +} -// TODO: Should probably enforce Send + Sync -#[async_trait::async_trait] -pub trait CoreSDKService { - async fn poll_sdk_task(&self, req: PollSdkTaskReq) -> Result<PollSdkTaskResp>; - async fn complete_sdk_task(&self, req: CompleteSdkTaskReq) -> Result<CompleteSdkTaskResp>; +pub fn init(_opts: CoreInitOptions) -> Result<Box<dyn Core>> { + Err(CoreError::Unknown {}) +} + +pub struct CoreSDK<WP> { + /// Provides work in the form of responses the server would send from polling task Qs + work_provider: WP, + /// Key is run id + workflow_machines: DashMap<String, (WorkflowMachines, Sender<Vec<WFCommand>>)>, + /// Maps task tokens to workflow run ids + workflow_task_tokens: DashMap<Vec<u8>, String>, +} + +impl<WP> Core for CoreSDK<WP> +where + WP: WorkProvider, +{ + #[instrument(skip(self))] + fn poll_task(&self) -> Result<Task> { + // This will block forever in the event there is no work from the server + let work = self.work_provider.get_work("TODO: Real task queue")?; + let run_id = match &work.workflow_execution { + Some(WorkflowExecution { run_id, ..
}) => { + self.instantiate_workflow_if_needed(run_id.to_string()); + run_id + } + // TODO: Appropriate error + None => return Err(CoreError::Unknown), + }; + let history = if let Some(hist) = work.history { + hist + } else { + return Err(CoreError::BadDataFromWorkProvider(work)); + }; + + // Correlate task token w/ run ID + self.workflow_task_tokens + .insert(work.task_token.clone(), run_id.clone()); + + // We pass none since we want to apply all the history we just got. + // Will need to change a bit once we impl caching. + let hist_info = HistoryInfo::new_from_history(&history, None)?; + let activation = if let Some(mut machines) = self.workflow_machines.get_mut(run_id) { + hist_info.apply_history_events(&mut machines.value_mut().0)?; + machines.0.get_wf_activation() + } else { + //err + unimplemented!() + }; + + Ok(Task { + task_token: work.task_token, + variant: activation.map(Into::into), + }) + } + + #[instrument(skip(self))] + fn complete_task(&self, req: CompleteTaskReq) -> Result<(), CoreError> { + match req { + CompleteTaskReq { + task_token, + completion: + Some(Completion::Workflow(WfActivationCompletion { + status: Some(wfstatus), + })), + } => { + let wf_run_id = self + .workflow_task_tokens + .get(&task_token) + .map(|x| x.value().clone()) + .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + match wfstatus { + Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Failed(_) => {} + } + Ok(()) + } + CompleteTaskReq { + completion: Some(Completion::Activity(_)), + .. + } => { + unimplemented!() + } + _ => Err(CoreError::MalformedCompletion(req)), + } + // TODO: Get fsm commands and send them to server + } +} + +impl CoreSDK +where + WP: WorkProvider, +{ + fn instantiate_workflow_if_needed(&self, run_id: String) { + if self.workflow_machines.contains_key(&run_id) { + return; + } + let (wfb, cmd_sink) = WorkflowBridge::new(); + let state_machines = WorkflowMachines::new(Box::new(wfb)); + self.workflow_machines + .insert(run_id, (state_machines, cmd_sink)); + } + + /// Feed commands from the lang sdk into the appropriate workflow bridge + fn push_lang_commands( + &self, + run_id: &str, + success: WfActivationSuccess, + ) -> Result<(), CoreError> { + // Convert to wf commands + let cmds = success + .commands + .into_iter() + .map(|c| c.try_into().map_err(Into::into)) + .collect::>>()?; + self.workflow_machines + .get_mut(run_id) + .unwrap() + .1 + .send(cmds)?; + Ok(()) + } +} + +/// Implementors can provide new work to the SDK. The connection to the server is the real +/// implementor. +#[cfg_attr(test, mockall::automock)] +pub trait WorkProvider { + /// Fetch new work. Should block indefinitely if there is no work. + fn get_work(&self, task_queue: &str) -> Result; +} + +/// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] +/// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering +/// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService] +#[derive(Debug)] +pub struct WorkflowBridge { + // does wf id belong in here? + started_attrs: Option, + incoming_commands: Receiver>, +} + +impl WorkflowBridge { + /// Create a new bridge, returning it and the sink used to send commands to it. 
+ pub(crate) fn new() -> (Self, Sender>) { + let (tx, rx) = mpsc::channel(); + ( + Self { + started_attrs: None, + incoming_commands: rx, + }, + tx, + ) + } +} + +impl DrivenWorkflow for WorkflowBridge { + #[instrument] + fn start( + &mut self, + attribs: WorkflowExecutionStartedEventAttributes, + ) -> Result, Error> { + self.started_attrs = Some(attribs); + // TODO: Need to actually tell the workflow to... start, that's what outgoing was for lol + Ok(vec![]) + } + + #[instrument] + fn iterate_wf(&mut self) -> Result, Error> { + Ok(self + .incoming_commands + .try_recv() + .unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang])) + } + + fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) -> Result<(), Error> { + unimplemented!() + } + + fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) -> Result<(), Error> { + unimplemented!() + } } #[derive(thiserror::Error, Debug)] -pub enum SDKServiceError { - // tbd +#[allow(clippy::large_enum_variant)] +pub enum CoreError { + #[error("Unknown service error")] + Unknown, + #[error("No tasks to perform for now")] + NoWork, + #[error("Poll response from server was malformed: {0:?}")] + BadDataFromWorkProvider(PollWorkflowTaskQueueResponse), + #[error("Lang SDK sent us a malformed completion: {0:?}")] + MalformedCompletion(CompleteTaskReq), + #[error("Error buffering commands")] + CantSendCommands(#[from] SendError>), + #[error("Couldn't interpret command from ")] + UninterprableCommand(#[from] InconvertibleCommandError), + #[error("Underlying error in history processing")] + UnderlyingHistError(#[from] HistoryInfoError), + #[error("Task token had nothing associated with it: {0:?}")] + NothingFoundForTaskToken(Vec), } #[cfg(test)] mod test { + use super::*; + use crate::{ + machines::test_help::TestHistoryBuilder, + protos::{ + coresdk::{task, wf_activation, WfActivation}, + temporal::api::{ + command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + enums::v1::EventType, + history::v1::{history_event, History, TimerFiredEventAttributes}, + }, + }, + }; + use std::collections::VecDeque; + use tracing::Level; + #[test] - fn foo() {} + fn workflow_bridge() { + let s = span!(Level::DEBUG, "Test start"); + let _enter = s.enter(); + + let wfid = "fake_wf_id"; + let run_id = "fake_run_id"; + let timer_id = "fake_timer"; + + let mut t = TestHistoryBuilder::default(); + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: "timer1".to_string(), + ..Default::default() + }), + ); + t.add_workflow_task_scheduled_and_started(); + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + --- + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_FIRED + 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + --- + */ + let events_first_batch = t.get_history_info(1).unwrap().events; + let wf = Some(WorkflowExecution { + workflow_id: wfid.to_string(), + run_id: run_id.to_string(), + }); + let first_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_first_batch, + }), + workflow_execution: wf.clone(), + ..Default::default() + }; + let events_second_batch = 
t.get_history_info(2).unwrap().events; + let second_response = PollWorkflowTaskQueueResponse { + history: Some(History { + events: events_second_batch, + }), + workflow_execution: wf.clone(), + ..Default::default() + }; + let responses = vec![first_response, second_response]; + + let mut tasks = VecDeque::from(responses); + let mut mock_provider = MockWorkProvider::new(); + mock_provider + .expect_get_work() + .returning(move |_| Ok(tasks.pop_front().unwrap())); + + let core = CoreSDK { + work_provider: mock_provider, + workflow_machines: DashMap::new(), + workflow_task_tokens: DashMap::new(), + }; + + let res = dbg!(core.poll_task().unwrap()); + // TODO: uggo + assert_matches!( + res, + Task { + variant: Some(task::Variant::Workflow(WfActivation { + attributes: Some(wf_activation::Attributes::StartWorkflow(_)), + .. + })), + .. + } + ); + assert!(core.workflow_machines.get(run_id).is_some()); + + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + task_tok, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + + let res = dbg!(core.poll_task().unwrap()); + // TODO: uggo + assert_matches!( + res, + Task { + variant: Some(task::Variant::Workflow(WfActivation { + attributes: Some(wf_activation::Attributes::UnblockTimer(_)), + .. + })), + .. + } + ); + let task_tok = res.task_token; + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task_tok, + )) + .unwrap(); + dbg!("sent workflow done"); + } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index bd2a2f6fd..1022a1761 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -33,19 +33,21 @@ mod version_state_machine; mod workflow_task_state_machine; #[cfg(test)] -mod test_help; - -use crate::{ - machines::workflow_machines::{WFMachinesError, WorkflowMachines}, - protos::temporal::api::{ - command::v1::{ - Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, - }, - enums::v1::CommandType, - history::v1::{ - HistoryEvent, WorkflowExecutionCanceledEventAttributes, - WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, - }, +pub(crate) mod test_help; + +pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; + +use crate::protos::coresdk; +use crate::protos::coresdk::command::Variant; +use crate::protos::temporal::api::command::v1::command::Attributes; +use crate::protos::temporal::api::{ + command::v1::{ + Command, CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + enums::v1::CommandType, + history::v1::{ + HistoryEvent, WorkflowExecutionCanceledEventAttributes, + WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, }; use prost::alloc::fmt::Formatter; @@ -60,30 +62,30 @@ use std::{ use tracing::Level; // TODO: May need to be our SDKWFCommand type -type MachineCommand = Command; +pub(crate) type MachineCommand = Command; /// Implementors of this trait represent something that can (eventually) call into a workflow to /// drive it, start it, signal it, cancel it, etc. -trait DrivenWorkflow { +pub(crate) trait DrivenWorkflow: Send { /// Start the workflow fn start( - &self, + &mut self, attribs: WorkflowExecutionStartedEventAttributes, ) -> Result, anyhow::Error>; /// Iterate the workflow. The workflow driver should execute workflow code until there is /// nothing left to do. 
EX: Awaiting an activity/timer, workflow completion. - fn iterate_wf(&self) -> Result, anyhow::Error>; + fn iterate_wf(&mut self) -> Result, anyhow::Error>; /// Signal the workflow fn signal( - &self, + &mut self, attribs: WorkflowExecutionSignaledEventAttributes, ) -> Result<(), anyhow::Error>; /// Cancel the workflow fn cancel( - &self, + &mut self, attribs: WorkflowExecutionCanceledEventAttributes, ) -> Result<(), anyhow::Error>; } @@ -99,10 +101,42 @@ pub(crate) struct AddCommand { /// EX: Create a new timer, complete the workflow, etc. #[derive(Debug, derive_more::From)] pub enum WFCommand { + /// Returned when we need to wait for the lang sdk to send us something + NoCommandsFromLang, AddTimer(StartTimerCommandAttributes, Arc), CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), } +#[derive(thiserror::Error, Debug, derive_more::From)] +#[error("Couldn't convert command")] +pub struct InconvertibleCommandError(pub coresdk::Command); + +impl TryFrom for WFCommand { + type Error = InconvertibleCommandError; + + fn try_from(c: coresdk::Command) -> Result { + // TODO: Return error without cloning + match c.variant.clone() { + Some(a) => match a { + Variant::Api(Command { + attributes: Some(attrs), + .. + }) => match attrs { + Attributes::StartTimerCommandAttributes(s) => { + Ok(WFCommand::AddTimer(s, Arc::new(AtomicBool::new(false)))) + } + Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { + Ok(WFCommand::CompleteWorkflow(c)) + } + _ => unimplemented!(), + }, + _ => Err(c.into()), + }, + None => Err(c.into()), + } + } +} + /// Extends [rustfsm::StateMachine] with some functionality specific to the temporal SDK. /// /// Formerly known as `EntityStateMachine` in Java. diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index 19d1bf94b..523ac41a6 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -9,6 +9,7 @@ use crate::{ WorkflowTaskScheduledEventAttributes, WorkflowTaskStartedEventAttributes, }, }, + protosext::{HistoryInfo, HistoryInfoError}, }; use anyhow::bail; use std::time::SystemTime; @@ -209,61 +210,12 @@ impl TestHistoryBuilder { Ok(()) } - /// Iterates over the events in this builder to return a [HistoryInfo] - pub(crate) fn get_history_info(&self, to_task_index: usize) -> Result { - let mut lastest_wf_started_id = 0; - let mut previous_wf_started_id = 0; - let mut count = 0; - let mut history = self.events.iter().peekable(); - - while let Some(event) = history.next() { - let next_event = history.peek(); - - if event.event_type == EventType::WorkflowTaskStarted as i32 { - let next_is_completed = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskCompleted as i32 - }); - let next_is_failed_or_timeout = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskFailed as i32 - || ne.event_type == EventType::WorkflowTaskTimedOut as i32 - }); - - if next_event.is_none() || next_is_completed { - previous_wf_started_id = lastest_wf_started_id; - lastest_wf_started_id = event.event_id; - if lastest_wf_started_id == previous_wf_started_id { - bail!("Latest wf started id and previous one are equal!") - } - count += 1; - if count == to_task_index || next_event.is_none() { - return Ok(HistoryInfo::new( - previous_wf_started_id, - lastest_wf_started_id, - )); - } - } else if next_event.is_some() && !next_is_failed_or_timeout { - bail!( - "Invalid history! 
Event {:?} should be WF task completed, \ - failed, or timed out.", - &event - ); - } - } - - if next_event.is_none() { - if event.is_final_wf_execution_event() { - return Ok(HistoryInfo::new( - previous_wf_started_id, - lastest_wf_started_id, - )); - } - // No more events - if lastest_wf_started_id != event.event_id { - bail!("Last item in history wasn't WorkflowTaskStarted") - } - } - } - unreachable!() + /// Iterates over the events in this builder to return a [HistoryInfo] of the n-th workflow task. + pub(crate) fn get_history_info( + &self, + to_wf_task_num: usize, + ) -> Result { + HistoryInfo::new_from_events(&self.events, Some(to_wf_task_num)) } fn build_and_push_event(&mut self, event_type: EventType, attribs: Attributes) { @@ -289,9 +241,3 @@ fn default_attribs(et: EventType) -> Result { _ => bail!("Don't know how to construct default attrs for {:?}", et), }) } - -#[derive(Clone, Debug, derive_more::Constructor, Eq, PartialEq, Hash)] -pub struct HistoryInfo { - pub previous_started_event_id: i64, - pub workflow_task_started_event_id: i64, -} diff --git a/src/machines/test_help/mod.rs b/src/machines/test_help/mod.rs index 427f0876f..b7ab7509b 100644 --- a/src/machines/test_help/mod.rs +++ b/src/machines/test_help/mod.rs @@ -3,5 +3,5 @@ type Result = std::result::Result; mod history_builder; mod workflow_driver; -pub(super) use history_builder::TestHistoryBuilder; +pub(crate) use history_builder::TestHistoryBuilder; pub(super) use workflow_driver::{CommandSender, TestWFCommand, TestWorkflowDriver}; diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 1db3dd5ef..aa89b8562 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -62,19 +62,19 @@ where impl DrivenWorkflow for TestWorkflowDriver where - F: Fn(CommandSender) -> Fut, + F: Fn(CommandSender) -> Fut + Send + Sync, Fut: Future, { #[instrument(skip(self))] fn start( - &self, + &mut self, _attribs: WorkflowExecutionStartedEventAttributes, ) -> Result, anyhow::Error> { Ok(vec![]) } #[instrument(skip(self))] - fn iterate_wf(&self) -> Result, anyhow::Error> { + fn iterate_wf(&mut self) -> Result, anyhow::Error> { let (sender, receiver) = CommandSender::new(self.timers.clone()); // Call the closure that produces the workflow future let wf_future = (self.wf_function)(sender); @@ -106,14 +106,14 @@ where } fn signal( - &self, + &mut self, _attribs: WorkflowExecutionSignaledEventAttributes, ) -> Result<(), anyhow::Error> { Ok(()) } fn cancel( - &self, + &mut self, _attribs: WorkflowExecutionCanceledEventAttributes, ) -> Result<(), anyhow::Error> { Ok(()) diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e38d31f2d..ffa73c862 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,5 +1,6 @@ #![allow(clippy::large_enum_variant)] +use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation}; use crate::{ machines::{ workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand, @@ -228,12 +229,19 @@ impl WFMachinesAdapter for TimerMachine { } // Fire the completion TimerMachineCommand::Complete(_event) => { + // TODO: Remove atomic bool nonsense -- kept for now to keep test here passing if let Some(a) = wf_machines .timer_notifiers .remove(&self.shared_state.timer_attributes.timer_id) { a.store(true, Ordering::SeqCst) }; + wf_machines.outgoing_wf_actications.push_back( + UnblockTimerTaskAttributes { + 
timer_id: self.shared_state.timer_attributes.timer_id.clone(), + } + .into(), + ); } } Ok(()) @@ -327,6 +335,7 @@ mod test { let commands = t .handle_workflow_task_take_cmds(&mut state_machines, Some(1)) .unwrap(); + dbg!(&commands); assert_eq!(commands.len(), 1); assert_eq!(commands[0].command_type, CommandType::StartTimer as i32); let commands = t @@ -341,16 +350,6 @@ mod test { #[rstest] fn test_fire_happy_path_full(fire_happy_hist: (TestHistoryBuilder, WorkflowMachines)) { - let (tracer, _uninstall) = opentelemetry_jaeger::new_pipeline() - .with_service_name("report_example") - .install() - .unwrap(); - let opentelemetry = tracing_opentelemetry::layer().with_tracer(tracer); - tracing_subscriber::registry() - .with(opentelemetry) - .try_init() - .unwrap(); - let s = span!(Level::DEBUG, "Test start", t = "full"); let _enter = s.enter(); diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index ed9b50ac4..81aaf12e6 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,3 +1,4 @@ +use crate::protos::coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -45,9 +46,13 @@ pub(crate) struct WorkflowMachines { /// Queued commands which have been produced by machines and await processing commands: VecDeque, - /// Commands generated by the currently processed workflow task. It is a queue as commands can - /// be added (due to marker based commands) while iterating over already added commands. + /// Commands generated by the currently processing workflow task. + /// + /// Old note: It is a queue as commands can be added (due to marker based commands) while + /// iterating over already added commands. current_wf_task_commands: VecDeque, + /// Outgoing activations that need to be sent to the lang sdk + pub(super) outgoing_wf_actications: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -57,7 +62,7 @@ pub(crate) struct WorkflowMachines { } #[derive(thiserror::Error, Debug)] -pub(crate) enum WFMachinesError { +pub enum WFMachinesError { #[error("Event {0:?} was not expected")] UnexpectedEvent(HistoryEvent), #[error("Event {0:?} was malformed: {1}")] @@ -73,7 +78,7 @@ pub(crate) enum WFMachinesError { } impl WorkflowMachines { - pub(super) fn new(driven_wf: Box) -> Self { + pub(crate) fn new(driven_wf: Box) -> Self { Self { drive_me: driven_wf, // In an ideal world one could say ..Default::default() here and it'd still work. 
@@ -87,6 +92,7 @@ impl WorkflowMachines { commands: Default::default(), current_wf_task_commands: Default::default(), timer_notifiers: Default::default(), + outgoing_wf_actications: Default::default(), } } @@ -106,7 +112,7 @@ impl WorkflowMachines { } /// Returns the id of the last seen WorkflowTaskStarted event - pub(super) fn get_last_started_event_id(&self) -> i64 { + pub(crate) fn get_last_started_event_id(&self) -> i64 { self.current_started_event_id } @@ -268,6 +274,16 @@ impl WorkflowMachines { )) = &event.attributes { self.current_run_id = Some(attrs.original_execution_run_id.clone()); + // TODO: Actual values -- not entirely sure this is the right spot + self.outgoing_wf_actications.push_back( + StartWorkflowTaskAttributes { + namespace: "".to_string(), + workflow_id: "".to_string(), + name: "".to_string(), + arguments: None, + } + .into(), + ); let results = self.drive_me.start(attrs.clone())?; self.handle_driven_results(results); } else { @@ -309,9 +325,23 @@ impl WorkflowMachines { .collect() } + /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock + /// timer, etc. + pub(crate) fn get_wf_activation(&mut self) -> Option { + self.outgoing_wf_actications + .pop_front() + .map(|attrs| WfActivation { + // todo wat ? + timestamp: None, + // TODO: fix unwrap + run_id: self.current_run_id.clone().unwrap(), + attributes: attrs.into(), + }) + } + /// Given an event id (possibly zero) of the last successfully executed workflow task and an /// id of the last event, sets the ids internally and appropriately sets the replaying flag. - pub(super) fn set_started_ids( + pub(crate) fn set_started_ids( &mut self, previous_started_event_id: i64, workflow_task_started_event_id: i64, @@ -349,6 +379,7 @@ impl WorkflowMachines { self.current_wf_task_commands .push_back(complete_workflow(attrs)); } + WFCommand::NoCommandsFromLang => (), } } } diff --git a/src/pollers/workflow_poll_task.rs b/src/pollers/workflow_poll_task.rs index 8a8495ba6..1745b03ac 100644 --- a/src/pollers/workflow_poll_task.rs +++ b/src/pollers/workflow_poll_task.rs @@ -1,14 +1,14 @@ -use tonic::codegen::Future; -use tonic::{Response, Status}; - -use crate::pollers::poll_task::{PollTask, Result}; -use crate::protos::temporal::api::enums::v1 as enums; -use crate::protos::temporal::api::taskqueue::v1 as tq; -use crate::protos::temporal::api::workflowservice::v1 as temporal; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use crate::{ + pollers::poll_task::PollTask, + pollers::poll_task::Result, + protos::temporal::api::{ + enums::v1::TaskQueueKind, taskqueue::v1::TaskQueue, + workflowservice::v1::workflow_service_client::WorkflowServiceClient, + workflowservice::v1::PollWorkflowTaskQueueRequest, + workflowservice::v1::PollWorkflowTaskQueueResponse, + }, }; -use temporal::workflow_service_client::WorkflowServiceClient; +use tonic::{codegen::Future, Response, Status}; struct WorkflowPollTask<'a> { service: &'a mut WorkflowServiceClient, @@ -19,13 +19,13 @@ struct WorkflowPollTask<'a> { } #[async_trait::async_trait] -impl PollTask for WorkflowPollTask<'_> { +impl PollTask for WorkflowPollTask<'_> { async fn poll(&mut self) -> Result { - let request = tonic::Request::new(temporal::PollWorkflowTaskQueueRequest { + let request = tonic::Request::new(PollWorkflowTaskQueueRequest { namespace: self.namespace.clone(), - task_queue: Some(tq::TaskQueue { + task_queue: Some(TaskQueue { name: self.task_queue.clone(), - kind: 
enums::TaskQueueKind::Unspecified as i32, + kind: TaskQueueKind::Unspecified as i32, }), identity: self.identity.clone(), binary_checksum: self.binary_checksum.clone(), diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 0d7816532..f7e1231e5 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -1,7 +1,51 @@ +#[allow(clippy::large_enum_variant)] pub mod coresdk { include!("coresdk.rs"); + use super::temporal::api::command::v1 as api_command; + use super::temporal::api::command::v1::Command as ApiCommand; + use crate::protos::coresdk::complete_task_req::Completion; + use command::Variant; pub type HistoryEventId = i64; + + impl Task { + pub fn from_wf_task(task_token: Vec, t: WfActivation) -> Self { + Task { + task_token, + variant: Some(t.into()), + } + } + } + + impl From> for WfActivationSuccess { + fn from(v: Vec) -> Self { + WfActivationSuccess { + commands: v + .into_iter() + .map(|cmd| Command { + variant: Some(Variant::Api(cmd)), + }) + .collect(), + } + } + } + + impl CompleteTaskReq { + /// Build a successful completion from some api command attributes and a task token + pub fn ok_from_api_attrs( + cmd: api_command::command::Attributes, + task_token: Vec, + ) -> Self { + let cmd: ApiCommand = cmd.into(); + let success: WfActivationSuccess = vec![cmd].into(); + CompleteTaskReq { + task_token, + completion: Some(Completion::Workflow(WfActivationCompletion { + status: Some(wf_activation_completion::Status::Successful(success)), + })), + } + } + } } // No need to lint these @@ -12,6 +56,24 @@ pub mod temporal { pub mod command { pub mod v1 { include!("temporal.api.command.v1.rs"); + use crate::protos::temporal::api::enums::v1::CommandType; + use command::Attributes; + + impl From for Command { + fn from(c: command::Attributes) -> Self { + match c { + a @ Attributes::StartTimerCommandAttributes(_) => Self { + command_type: CommandType::StartTimer as i32, + attributes: Some(a), + }, + a @ Attributes::CompleteWorkflowExecutionCommandAttributes(_) => Self { + command_type: CommandType::CompleteWorkflowExecution as i32, + attributes: Some(a), + }, + _ => unimplemented!(), + } + } + } } } pub mod enums { diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs new file mode 100644 index 000000000..9db67a641 --- /dev/null +++ b/src/protosext/history_info.rs @@ -0,0 +1,195 @@ +use crate::machines::{WFMachinesError, WorkflowMachines}; +use crate::protos::temporal::api::enums::v1::EventType; +use crate::protos::temporal::api::history::v1::{History, HistoryEvent}; + +#[derive(Clone, Debug, derive_more::Constructor, PartialEq)] +pub struct HistoryInfo { + pub previous_started_event_id: i64, + pub workflow_task_started_event_id: i64, + pub events: Vec, +} + +type Result = std::result::Result; + +#[derive(thiserror::Error, Debug)] +pub enum HistoryInfoError { + #[error("Latest wf started id and previous one are equal! ${previous_started_event_id:?}")] + UnexpectedEventId { + previous_started_event_id: i64, + workflow_task_started_event_id: i64, + }, + #[error("Invalid history! Event {0:?} should be WF task completed, failed, or timed out")] + FailedOrTimeout(HistoryEvent), + #[error("Last item in history wasn't WorkflowTaskStarted")] + HistoryEndsUnexpectedly, + #[error("Underlying error in workflow machine")] + UnderlyingMachineError(#[from] WFMachinesError), +} +impl HistoryInfo { + /// Constructs a new instance, retaining only enough events to reach the provided workflow + /// task number. If not provided, all events are retained. 
+ pub(crate) fn new_from_events( + events: &[HistoryEvent], + to_wf_task_num: Option<usize>, + ) -> Result<Self> { + let to_wf_task_num = to_wf_task_num.unwrap_or(usize::MAX); + let mut workflow_task_started_event_id = 0; + let mut previous_started_event_id = 0; + let mut count = 0; + let mut history = events.iter().peekable(); + let mut events = vec![]; + + while let Some(event) = history.next() { + events.push(event.clone()); + let next_event = history.peek(); + + if event.event_type == EventType::WorkflowTaskStarted as i32 { + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + let next_is_failed_or_timeout = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskFailed as i32 + || ne.event_type == EventType::WorkflowTaskTimedOut as i32 + }); + + if next_event.is_none() || next_is_completed { + previous_started_event_id = workflow_task_started_event_id; + workflow_task_started_event_id = event.event_id; + if workflow_task_started_event_id == previous_started_event_id { + return Err(HistoryInfoError::UnexpectedEventId { + previous_started_event_id, + workflow_task_started_event_id, + }); + } + count += 1; + if count == to_wf_task_num || next_event.is_none() { + return Ok(Self { + previous_started_event_id, + workflow_task_started_event_id, + events, + }); + } + } else if next_event.is_some() && !next_is_failed_or_timeout { + return Err(HistoryInfoError::FailedOrTimeout(event.clone())); + } + } + + if next_event.is_none() { + if event.is_final_wf_execution_event() { + return Ok(Self { + previous_started_event_id, + workflow_task_started_event_id, + events, + }); + } + // No more events + if workflow_task_started_event_id != event.event_id { + return Err(HistoryInfoError::HistoryEndsUnexpectedly); + } + } + } + unreachable!() + } + + pub(crate) fn new_from_history(h: &History, to_wf_task_num: Option<usize>) -> Result<Self> { + Self::new_from_events(&h.events, to_wf_task_num) + } + + /// Apply events from history to workflow machines. Remember that only the events that exist + /// in this instance will be applied, which is determined by `to_wf_task_num` passed into the + /// constructor.
+ pub(crate) fn apply_history_events(&self, wf_machines: &mut WorkflowMachines) -> Result<()> { + let (_, events) = self + .events + .split_at(wf_machines.get_last_started_event_id() as usize); + let mut history = events.iter().peekable(); + + wf_machines.set_started_ids( + self.previous_started_event_id, + self.workflow_task_started_event_id, + ); + let mut started_id = self.previous_started_event_id; + + while let Some(event) = history.next() { + let next_event = history.peek(); + + if event.event_type == EventType::WorkflowTaskStarted as i32 { + let next_is_completed = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskCompleted as i32 + }); + let next_is_failed_or_timeout = next_event.map_or(false, |ne| { + ne.event_type == EventType::WorkflowTaskFailed as i32 + || ne.event_type == EventType::WorkflowTaskTimedOut as i32 + }); + + if next_event.is_none() || next_is_completed { + started_id = event.event_id; + if next_event.is_none() { + wf_machines.handle_event(event, false)?; + return Ok(()); + } + } else if next_event.is_some() && !next_is_failed_or_timeout { + return Err(HistoryInfoError::FailedOrTimeout(event.clone())); + } + } + + wf_machines.handle_event(event, next_event.is_some())?; + + if next_event.is_none() { + if event.is_final_wf_execution_event() { + return Ok(()); + } + if started_id != event.event_id { + return Err(HistoryInfoError::UnexpectedEventId { + previous_started_event_id: started_id, + workflow_task_started_event_id: event.event_id, + }); + } + unreachable!() + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{ + machines::test_help::TestHistoryBuilder, + protos::temporal::api::history::v1::{history_event, TimerFiredEventAttributes}, + }; + + #[test] + fn history_info_constructs_properly() { + /* + 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED + 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 3: EVENT_TYPE_WORKFLOW_TASK_STARTED + 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED + 5: EVENT_TYPE_TIMER_STARTED + 6: EVENT_TYPE_TIMER_FIRED + 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED + 8: EVENT_TYPE_WORKFLOW_TASK_STARTED + */ + let mut t = TestHistoryBuilder::default(); + + t.add_by_type(EventType::WorkflowExecutionStarted); + t.add_workflow_task(); + let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None); + t.add( + EventType::TimerFired, + history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes { + started_event_id: timer_started_event_id, + timer_id: "timer1".to_string(), + ..Default::default() + }), + ); + t.add_workflow_task_scheduled_and_started(); + let history_info = t.get_history_info(1).unwrap(); + assert_eq!(3, history_info.events.len()); + let history_info = t.get_history_info(2).unwrap(); + assert_eq!(8, history_info.events.len()); + } +} diff --git a/src/protosext/mod.rs b/src/protosext/mod.rs new file mode 100644 index 000000000..f55576e31 --- /dev/null +++ b/src/protosext/mod.rs @@ -0,0 +1,3 @@ +mod history_info; +pub use history_info::HistoryInfo; +pub(crate) use history_info::HistoryInfoError;
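Reviewer note (not part of the diff above): a minimal sketch of how a lang-SDK bridge might drive the new `Core` trait, mirroring the `workflow_bridge` test in src/lib.rs but written from the lang side's point of view. It assumes the caller already holds a working `Core` implementation (the `init` stub in this change still returns `Err(CoreError::Unknown)`), and the timer id is made up; the workflow "logic" is hard-coded to start one timer and then complete.

use temporal_sdk_core::protos::coresdk::{task, wf_activation, CompleteTaskReq};
use temporal_sdk_core::protos::temporal::api::command::v1::{
    CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
};
use temporal_sdk_core::{Core, CoreError};

// Poll for workflow activations and answer each one with a command.
fn drive_one_workflow(core: &impl Core) -> Result<(), CoreError> {
    loop {
        let task = core.poll_task()?;
        // Activity tasks aren't wired up in this change; skip anything that
        // isn't a workflow activation.
        let activation = match task.variant {
            Some(task::Variant::Workflow(wf)) => wf,
            _ => continue,
        };
        match activation.attributes {
            Some(wf_activation::Attributes::StartWorkflow(_)) => {
                // "Run" the workflow: it immediately asks for a single timer.
                core.complete_task(CompleteTaskReq::ok_from_api_attrs(
                    StartTimerCommandAttributes {
                        timer_id: "timer-1".to_string(),
                        ..Default::default()
                    }
                    .into(),
                    task.task_token,
                ))?;
            }
            Some(wf_activation::Attributes::UnblockTimer(_)) => {
                // The timer fired; report workflow completion and stop polling.
                core.complete_task(CompleteTaskReq::ok_from_api_attrs(
                    CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
                    task.task_token,
                ))?;
                return Ok(());
            }
            None => continue,
        }
    }
}

The return type spells out `CoreError` rather than the crate's `Result` alias so the error flow is explicit; either spelling works since the alias defaults its error type to `CoreError`.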
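Reviewer note (not part of the diff above): the new `WorkProvider` trait is what the real server connection will eventually implement, and what the `workflow_bridge` test mocks via `mockall::automock`. Where a full mock is overkill, a hand-rolled provider that drains a canned queue of poll responses is enough; the type and field names below are made up for illustration.

use std::collections::VecDeque;
use std::sync::Mutex;
use temporal_sdk_core::protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse;
use temporal_sdk_core::{CoreError, Result, WorkProvider};

// Hands out pre-built poll responses in order; errors with NoWork when empty.
struct CannedWorkProvider {
    responses: Mutex<VecDeque<PollWorkflowTaskQueueResponse>>,
}

impl WorkProvider for CannedWorkProvider {
    fn get_work(&self, _task_queue: &str) -> Result<PollWorkflowTaskQueueResponse> {
        // A real provider would block on a long poll against the server; this
        // one just pops the next canned response.
        self.responses
            .lock()
            .unwrap()
            .pop_front()
            .ok_or(CoreError::NoWork)
    }
}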