From cd957ceb3ac29c11f81ad8d54481d31f549cb27b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Feb 2021 21:45:12 -0800 Subject: [PATCH 01/13] Adding integration test for the poller and made task queue configurable for the poll task --- .buildkite/docker/Dockerfile | 2 + .buildkite/docker/docker-compose.yaml | 2 + .buildkite/pipeline.yml | 14 ++- Cargo.toml | 18 ++- README.md | 4 +- build.rs | 7 +- protos/local/core_interface.proto | 59 ++++++---- src/lib.rs | 132 +++++++++++++--------- src/machines/mod.rs | 7 +- src/machines/test_help/history_builder.rs | 66 +---------- src/machines/test_help/workflow_driver.rs | 6 +- src/machines/timer_state_machine.rs | 10 +- src/machines/workflow_machines.rs | 37 +++--- src/pollers/mod.rs | 41 +++++-- src/protos/mod.rs | 28 +++++ src/protosext/history_info.rs | 11 +- src/protosext/mod.rs | 2 +- tests/integ_tests/poller_test.rs | 15 +++ tests/main.rs | 4 + 19 files changed, 278 insertions(+), 187 deletions(-) create mode 100644 tests/integ_tests/poller_test.rs create mode 100644 tests/main.rs diff --git a/.buildkite/docker/Dockerfile b/.buildkite/docker/Dockerfile index 3acabbd57..f32421f6e 100644 --- a/.buildkite/docker/Dockerfile +++ b/.buildkite/docker/Dockerfile @@ -3,4 +3,6 @@ FROM rust:latest RUN rustup component add rustfmt && \ rustup component add clippy +RUN cargo install cargo-tarpaulin + WORKDIR /sdk-core diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 13d3cc132..0aca4f6e2 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -5,6 +5,8 @@ services: build: context: ../../ dockerfile: .buildkite/docker/Dockerfile + security_opt: + - seccomp:unconfined command: /bin/sh -c ".buildkite/docker/build.sh" environment: - "USER=unittest" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 92e370956..1aaecbc72 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -23,7 +23,19 @@ steps: agents: queue: "default" docker: "*" - command: "cargo test --workspace" + command: "cargo tarpaulin --out Html --workspace" + artifact_paths: + - "tarpaulin-report.html" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" timeout_in_minutes: 15 plugins: - docker-compose#v3.0.0: diff --git a/Cargo.toml b/Cargo.toml index 7c07e9756..e52162b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0" async-trait = "0.1" dashmap = "4.0" derive_more = "0.99" +displaydoc = "0.1" env_logger = "0.8" futures = "0.3" log = "0.4" @@ -20,11 +21,17 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +tower = "0.4.4" + +[dependencies.tonic] +version = "0.4" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" @@ -39,3 +46,10 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/build.rs b/build.rs index cb6500556..11f4dc705 100644 --- a/build.rs +++ b/build.rs @@ -15,12 +15,9 @@ fn main() -> Result<(), Box> { "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") + .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivation.attributes", - "#[derive(::derive_more::From)]", - ) - .type_attribute( - "coresdk.WorkflowTask.attributes", + "coresdk.WFActivationJob.attributes", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 920e1f008..82af4ecdb 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -16,24 +16,49 @@ import "temporal/api/failure/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; -service Core { - rpc PollTask (PollTaskReq) returns (Task) {} - rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {} -} - +// A request as given to [crate::Core::poll_task] message PollTaskReq { + // If true, poll for workflow tasks bool workflows = 1; + // If true, poll for activity tasks bool activities = 2; } +// An instruction to perform work from core->lang sdk message Task { + // A unique identifier for this task bytes task_token = 1; + // The type of task to be performed oneof variant { + // Wake up a workflow WFActivation workflow = 2; + // Run an activity ActivityTask activity = 3; } } +// An instruction to the lang sdk to run some workflow code, whether for the first time or from +// a cached state. +message WFActivation { + // Time the activation(s) were requested + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; + // The id of the currently active run of the workflow + string run_id = 2; + // The things to do upon activating the workflow + repeated WFActivationJob jobs = 3; +} + +message WFActivationJob { + oneof attributes { + // TODO could literally be attributes from events? -- maybe we don't need our own types + + // Begin a workflow for the first time + StartWorkflowTaskAttributes start_workflow = 1; + // A timer has fired, allowing whatever was waiting on it (if anything) to proceed + TimerFiredTaskAttributes timer_fired = 2; + } +} + message StartWorkflowTaskAttributes { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; @@ -43,35 +68,27 @@ message StartWorkflowTaskAttributes { temporal.api.common.v1.Payloads arguments = 3; // TODO: Do we need namespace here, or should that just be fetchable easily? - - // will be others - workflow exe started attrs, etc + // will be others - workflow exe started attrs, etc } -// maybe we just go back to timer fired to keep consistent -message UnblockTimerTaskAttributes { +message TimerFiredTaskAttributes { string timer_id = 1; } -message WFActivation { - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; - string run_id = 2; - oneof attributes { - // could literally be attributes from events -- maybe we don't need our own types - StartWorkflowTaskAttributes start_workflow = 3; - UnblockTimerTaskAttributes unblock_timer = 4; - } -} - message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; } +// Sent from lang side to core when calling [crate::Core::complete_task] message CompleteTaskReq { + // The id from the [Task] being completed bytes task_token = 1; oneof completion { + // Complete a workflow task WFActivationCompletion workflow = 2; + // Complete an activity task ActivityTaskCompletion activity = 3; } } @@ -94,6 +111,7 @@ message CoreCommand { // Reserved for specific commands } +// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next message Command { oneof variant { temporal.api.command.v1.Command api = 1; @@ -102,13 +120,16 @@ message Command { } message WFActivationSuccess { + // A list of commands to send back to the temporal server repeated Command commands = 1; + // Other bits from RespondWorkflowTaskCompletedRequest as needed } message WFActivationFailure { temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; temporal.api.failure.v1.Failure failure = 2; + // Other bits from RespondWorkflowTaskFailedRequest as needed } diff --git a/src/lib.rs b/src/lib.rs index 1fcf135b5..d297cc518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,22 @@ +#![warn(missing_docs)] // error if there are missing docs + +//! This crate provides a basis for creating new Temporal SDKs without completely starting from +//! scratch + #[macro_use] extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +pub mod protos; + mod machines; mod pollers; -pub mod protos; mod protosext; pub use protosext::HistoryInfo; +pub use url::Url; use crate::{ machines::{ @@ -30,45 +37,65 @@ use crate::{ workflowservice::v1::PollWorkflowTaskQueueResponse, }, }, - protosext::HistoryInfoError, + protosext::{HistoryInfo, HistoryInfoError}, }; use anyhow::Error; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; +/// A result alias having [CoreError] as the error type pub type Result = std::result::Result; +const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; -// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have -// any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can -// be accessed from many threads, but each workflow is pinned to one thread (possibly with many -// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should -// be both, ideally. +/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is +/// expected that only one instance of an implementation will exist for the lifetime of the +/// worker(s) using it. pub trait Core { - fn poll_task(&self) -> Result; + /// Ask the core for some work, returning a [Task], which will eventually contain either a + /// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the + /// language SDK's responsibility to call the appropriate code with the provided inputs. + /// + /// TODO: Examples + fn poll_task(&self, task_queue: &str) -> Result; + + /// Tell the core that some work has been completed - whether as a result of running workflow + /// code or executing an activity. fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; } +/// Holds various configuration information required to call [init] pub struct CoreInitOptions { - target_url: Url, - namespace: String, - _task_queue: Vec, - identity: String, - binary_checksum: String, + /// The URL of the Temporal server to connect to + pub target_url: Url, + + /// The namespace on the server your worker will be using + pub namespace: String, + + /// A human-readable string that can identify your worker + /// + /// TODO: Probably belongs in future worker abstraction + pub identity: String, + + /// A string that should be unique to the exact worker code/binary being executed + pub worker_binary_id: String, } -/// Initializes instance of the core sdk and establishes connection to the temporal server. -/// Creates tokio runtime that will be used for all client-server interactions. +/// Initializes an instance of the core sdk and establishes a connection to the temporal server. +/// +/// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, - binary_checksum: opts.binary_checksum, + worker_binary_id: opts.worker_binary_id, + long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -81,7 +108,12 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } -pub struct CoreSDK { +pub enum TaskQueue { + Workflow(String), + _Activity(String), +} + +struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs work_provider: WP, @@ -96,11 +128,11 @@ where WP: WorkflowTaskProvider, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.work_provider.get_work(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -226,7 +258,7 @@ pub trait WorkflowTaskProvider { /// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering /// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService] #[derive(Debug)] -pub struct WorkflowBridge { +pub(crate) struct WorkflowBridge { // does wf id belong in here? started_attrs: Option, incoming_commands: Receiver>, @@ -276,31 +308,35 @@ impl DrivenWorkflow for WorkflowBridge { // Real bridge doesn't actually need to listen impl ActivationListener for WorkflowBridge {} -#[derive(thiserror::Error, Debug)] +/// The error type returned by interactions with [Core] +#[derive(thiserror::Error, Debug, displaydoc::Display)] #[allow(clippy::large_enum_variant)] +// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum CoreError { - #[error("Unknown service error")] + /// Unknown service error Unknown, - #[error("No tasks to perform for now")] + /// No tasks to perform for now NoWork, - #[error("Poll response from server was malformed: {0:?}")] + /// Poll response from server was malformed: {0:?} BadDataFromWorkProvider(PollWorkflowTaskQueueResponse), - #[error("Lang SDK sent us a malformed completion: {0:?}")] + /// Lang SDK sent us a malformed completion: {0:?} MalformedCompletion(CompleteTaskReq), - #[error("Error buffering commands")] + /// Error buffering commands CantSendCommands(#[from] SendError>), - #[error("Couldn't interpret command from ")] + /// Couldn't interpret command from UninterprableCommand(#[from] InconvertibleCommandError), - #[error("Underlying error in history processing")] + /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), - #[error("Task token had nothing associated with it: {0:?}")] + /// Task token had nothing associated with it: {0:?} NothingFoundForTaskToken(Vec), - #[error("Error calling the service: {0:?}")] + /// Error calling the service: {0:?} TonicError(#[from] tonic::Status), - #[error("Server connection error: {0:?}")] + /// Server connection error: {0:?} TonicTransportError(#[from] tonic::transport::Error), - #[error("Failed to initialize tokio runtime: {0:?}")] + /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -309,7 +345,7 @@ mod test { use crate::{ machines::test_help::TestHistoryBuilder, protos::{ - coresdk::{task, wf_activation, WfActivation}, + coresdk::{wf_activation_job, WfActivationJob}, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, @@ -391,17 +427,13 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::StartWorkflow(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + }] ); assert!(core.workflow_machines.get(run_id).is_some()); @@ -417,17 +449,13 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::UnblockTimer(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + }] ); let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 48d2ef3e7..0f62f33cc 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -39,9 +39,8 @@ pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; use crate::{ machines::workflow_machines::WorkflowTrigger, - protos::coresdk::wf_activation, protos::{ - coresdk::{self, command::Variant}, + coresdk::{self, command::Variant, wf_activation_job}, temporal::api::{ command::v1::{ command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes, @@ -93,10 +92,10 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send { ) -> Result<(), anyhow::Error>; } -/// Allows observers to listen to newly generated outgoing activations. Used for testing, where +/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where /// some activations must be handled before outgoing commands are issued to avoid deadlocking. pub(crate) trait ActivationListener { - fn on_activation(&mut self, _activation: &wf_activation::Attributes) {} + fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {} } /// The struct for [WFCommand::AddCommand] diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index fcb319276..2f4dac37f 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -143,70 +143,8 @@ impl TestHistoryBuilder { wf_machines: &mut WorkflowMachines, to_wf_task_num: Option, ) -> Result<()> { - let to_wf_task_num = to_wf_task_num.unwrap_or(usize::MAX); - let (_, events) = self - .events - .split_at(wf_machines.get_last_started_event_id() as usize); - let mut history = events.iter().peekable(); - - let hist_info = self.get_history_info(to_wf_task_num)?; - wf_machines.set_started_ids( - hist_info.previous_started_event_id, - hist_info.workflow_task_started_event_id, - ); - let mut started_id = hist_info.previous_started_event_id; - let mut num_seen_wf_tasks = if wf_machines.get_last_started_event_id() > 0 { - self.get_workflow_task_count(history.peek().map(|e| e.event_id - 1))? - } else { - 0 - }; - - while let Some(event) = history.next() { - let next_event = history.peek(); - - if event.event_type == EventType::WorkflowTaskStarted as i32 { - let next_is_completed = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskCompleted as i32 - }); - let next_is_failed_or_timeout = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskFailed as i32 - || ne.event_type == EventType::WorkflowTaskTimedOut as i32 - }); - - if next_event.is_none() || next_is_completed { - started_id = event.event_id; - num_seen_wf_tasks += 1; - if num_seen_wf_tasks == to_wf_task_num || next_event.is_none() { - wf_machines.handle_event(event, false)?; - return Ok(()); - } - } else if next_event.is_some() && !next_is_failed_or_timeout { - bail!( - "Invalid history! Event {:?} should be WF task completed, \ - failed, or timed out.", - &event - ); - } - } - - wf_machines.handle_event(event, next_event.is_some())?; - - if next_event.is_none() { - if event.is_final_wf_execution_event() { - return Ok(()); - } - if started_id != event.event_id { - bail!( - "The last event in the history (id {}) isn't the last WF task \ - started (id {})", - event.event_id, - started_id - ); - } - unreachable!() - } - } - + let histinfo = HistoryInfo::new_from_events(&self.events, to_wf_task_num)?; + histinfo.apply_history_events(wf_machines)?; Ok(()) } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index d6dd8fe05..92aa306e1 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -2,7 +2,7 @@ use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes}, + coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes}, temporal::api::{ command::v1::StartTimerCommandAttributes, history::v1::{ @@ -58,8 +58,8 @@ where } impl ActivationListener for TestWorkflowDriver { - fn on_activation(&mut self, activation: &Attributes) { - if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation { + fn on_activation_job(&mut self, activation: &Attributes) { + if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e29006fb1..b217efdb3 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,14 +1,12 @@ #![allow(clippy::large_enum_variant)] -use crate::machines::workflow_machines::WorkflowTrigger; -use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation}; use crate::{ machines::{ - workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand, - CancellableCommand, WFCommand, WFMachinesAdapter, + workflow_machines::{WFMachinesError, WorkflowMachines, WorkflowTrigger}, + AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter, }, protos::{ - coresdk::HistoryEventId, + coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, @@ -224,7 +222,7 @@ impl WFMachinesAdapter for TimerMachine { ) -> Result, WFMachinesError> { match my_command { // Fire the completion - TimerMachineCommand::Complete(_event) => Ok(vec![UnblockTimerTaskAttributes { + TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 9f3d4cad5..1315e0df1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,5 @@ use crate::machines::ActivationListener; +use crate::protos::coresdk::WfActivationJob; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -6,7 +7,7 @@ use crate::{ ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, + coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation}, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -62,8 +63,8 @@ pub(crate) struct WorkflowMachines { /// Old note: It is a queue as commands can be added (due to marker based commands) while /// iterating over already added commands. current_wf_task_commands: VecDeque, - /// Outgoing activations that need to be sent to the lang sdk - outgoing_wf_activations: VecDeque, + /// Outgoing activation jobs that need to be sent to the lang sdk + outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -73,7 +74,7 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::From)] #[must_use] pub(super) enum WorkflowTrigger { - PushWFActivation(#[from(forward)] wf_activation::Attributes), + PushWFJob(#[from(forward)] wf_activation_job::Attributes), TriggerWFTaskStarted { task_started_event_id: i64, time: SystemTime, @@ -114,7 +115,7 @@ impl WorkflowMachines { machines_by_id: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - outgoing_wf_activations: Default::default(), + outgoing_wf_activation_jobs: Default::default(), } } @@ -287,7 +288,7 @@ impl WorkflowMachines { { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow - self.outgoing_wf_activations.push_back( + self.outgoing_wf_activation_jobs.push_back( StartWorkflowTaskAttributes { // TODO: This needs to be set during init workflow_type: "".to_string(), @@ -344,14 +345,20 @@ impl WorkflowMachines { /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock /// timer, etc. pub(crate) fn get_wf_activation(&mut self) -> Option { - self.outgoing_wf_activations - .pop_front() - .map(|attrs| WfActivation { - // todo wat ? - timestamp: None, + if self.outgoing_wf_activation_jobs.is_empty() { + None + } else { + let jobs = self + .outgoing_wf_activation_jobs + .drain(..) + .map(Into::into) + .collect(); + Some(WfActivation { + timestamp: self.current_wf_time.map(Into::into), run_id: self.run_id.clone(), - attributes: attrs.into(), + jobs, }) + } } /// Given an event id (possibly zero) of the last successfully executed workflow task and an @@ -394,9 +401,9 @@ impl WorkflowMachines { event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers); for trigger in triggers { match trigger { - WorkflowTrigger::PushWFActivation(a) => { - self.drive_me.on_activation(&a); - self.outgoing_wf_activations.push_back(a); + WorkflowTrigger::PushWFJob(a) => { + self.drive_me.on_activation_job(&a); + self.outgoing_wf_activation_jobs.push_back(a); } WorkflowTrigger::TriggerWFTaskStarted { task_started_event_id, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 9cfbc03b4..26a1c3c5f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,29 +1,50 @@ -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use std::time::Duration; + +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + Result, WorkflowTaskProvider, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, - pub binary_checksum: String, + pub worker_binary_id: String, + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + let channel = Channel::from_shared(target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + // TODO convert error + let metadata = req.metadata_mut(); + metadata.insert("grpc-timeout", "50000m".parse().unwrap()); + metadata.insert("client-name", "core-sdk".parse().unwrap()); + println!("Intercepting request: {:?}", req); + Ok(req) +} + /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, @@ -39,7 +60,7 @@ impl ServerGateway { kind: TaskQueueKind::Unspecified as i32, }), identity: self.opts.identity.to_string(), - binary_checksum: self.opts.binary_checksum.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), }); Ok(self diff --git a/src/protos/mod.rs b/src/protos/mod.rs index f7e1231e5..47232bd12 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -1,9 +1,18 @@ +//! Contains the protobuf definitions used as arguments to and return values from interactions with +//! [super::Core]. Language SDK authors can generate structs using the proto definitions that will match +//! the generated structs in this module. + #[allow(clippy::large_enum_variant)] +// I'd prefer not to do this, but there are some generated things that just don't need it. +#[allow(missing_docs)] pub mod coresdk { + //! Contains all protobufs relating to communication between core and lang-specific SDKs + include!("coresdk.rs"); use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; use crate::protos::coresdk::complete_task_req::Completion; + use crate::protos::coresdk::wf_activation_job::Attributes; use command::Variant; pub type HistoryEventId = i64; @@ -15,6 +24,24 @@ pub mod coresdk { variant: Some(t.into()), } } + + /// Returns any contained jobs if this task was a wf activation and it had some + #[cfg(test)] + pub fn get_wf_jobs(&self) -> Vec { + if let Some(task::Variant::Workflow(a)) = &self.variant { + a.jobs.clone() + } else { + vec![] + } + } + } + + impl From for WfActivationJob { + fn from(a: Attributes) -> Self { + WfActivationJob { + attributes: Some(a), + } + } } impl From> for WfActivationSuccess { @@ -50,6 +77,7 @@ pub mod coresdk { // No need to lint these #[allow(clippy::all)] +#[allow(missing_docs)] // This is disgusting, but unclear to me how to avoid it. TODO: Discuss w/ prost maintainer pub mod temporal { pub mod api { diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs index 23524fb55..4ef36537b 100644 --- a/src/protosext/history_info.rs +++ b/src/protosext/history_info.rs @@ -1,9 +1,11 @@ -use crate::machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::enums::v1::EventType; -use crate::protos::temporal::api::history::v1::{History, HistoryEvent}; +use crate::{ + machines::{WFMachinesError, WorkflowMachines}, + protos::temporal::api::enums::v1::EventType, + protos::temporal::api::history::v1::{History, HistoryEvent}, +}; #[derive(Clone, Debug, derive_more::Constructor, PartialEq)] -pub struct HistoryInfo { +pub(crate) struct HistoryInfo { pub previous_started_event_id: i64, pub workflow_task_started_event_id: i64, pub events: Vec, @@ -25,6 +27,7 @@ pub enum HistoryInfoError { #[error("Underlying error in workflow machine")] UnderlyingMachineError(#[from] WFMachinesError), } + impl HistoryInfo { /// Constructs a new instance, retaining only enough events to reach the provided workflow /// task number. If not provided, all events are retained. diff --git a/src/protosext/mod.rs b/src/protosext/mod.rs index f55576e31..91d15b0d8 100644 --- a/src/protosext/mod.rs +++ b/src/protosext/mod.rs @@ -1,3 +1,3 @@ mod history_info; -pub use history_info::HistoryInfo; +pub(crate) use history_info::HistoryInfo; pub(crate) use history_info::HistoryInfoError; diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs new file mode 100644 index 000000000..bb7f957b0 --- /dev/null +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use temporal_sdk_core::{Core, CoreInitOptions, Url}; + +#[test] +fn empty_poll() { + let core = temporal_sdk_core::init(CoreInitOptions { + target_url: Url::try_from("http://localhost:7233").unwrap(), + namespace: "default".to_string(), + identity: "none".to_string(), + binary_checksum: "".to_string(), + }) + .unwrap(); + + dbg!(core.poll_task("test-tq").unwrap()); +} diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..976cf28e4 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod integ_tests { + mod poller_test; +} From 7cef27d514c9517fd315bf9963e43fd559b30033 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 16:12:00 -0800 Subject: [PATCH 02/13] Fix build errors and add comments --- src/lib.rs | 33 ++++++++++++++++++-------------- tests/integ_tests/poller_test.rs | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d297cc518..0209c886b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,12 @@ //! This crate provides a basis for creating new Temporal SDKs without completely starting from //! scratch -#[macro_use] -extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +#[macro_use] +extern crate tracing; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - -pub use protosext::HistoryInfo; pub use url::Url; use crate::{ @@ -41,14 +34,20 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; -use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, + time::Duration, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; +pub mod protos; + +mod machines; +mod pollers; +mod protosext; + /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -108,8 +107,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +/// Type of task queue to poll. pub enum TaskQueue { + /// Workflow task Workflow(String), + /// Activity task _Activity(String), } @@ -335,13 +337,16 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), - #[error("Invalid URI: {0:?}")] + /// Invalid URI: {0:?} InvalidUri(#[from] InvalidUri), } #[cfg(test)] mod test { - use super::*; + use std::collections::VecDeque; + + use tracing::Level; + use crate::{ machines::test_help::TestHistoryBuilder, protos::{ @@ -355,8 +360,8 @@ mod test { }, }, }; - use std::collections::VecDeque; - use tracing::Level; + + use super::*; #[test] fn workflow_bridge() { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index bb7f957b0..fd8c37356 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,7 +7,7 @@ fn empty_poll() { target_url: Url::try_from("http://localhost:7233").unwrap(), namespace: "default".to_string(), identity: "none".to_string(), - binary_checksum: "".to_string(), + worker_binary_id: "".to_string(), }) .unwrap(); From 47aba709238ee93a90fa33e00e5dcb819cc9466f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 17:10:00 -0800 Subject: [PATCH 03/13] Create workflow in the integ test --- Cargo.toml | 2 +- src/lib.rs | 21 +++++++----- src/pollers/mod.rs | 10 +++--- tests/integ_tests/poller_test.rs | 56 +++++++++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e52162b97..13f9c1b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" -tower = "0.4.4" +rand = "0.8.3" [dependencies.tonic] version = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 0209c886b..ab96c6686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,18 @@ extern crate assert_matches; #[macro_use] extern crate tracing; +mod machines; +mod pollers; +pub mod protos; +mod protosext; + +pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, }, - pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -42,12 +47,6 @@ use std::{ use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -83,13 +82,19 @@ pub struct CoreInitOptions { /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Optional tokio runtime + pub runtime: Option, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// /// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { - let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let runtime = opts + .runtime + .map(Ok) + .unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 26a1c3c5f..badf0cfe2 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -13,7 +13,7 @@ use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] -pub(crate) struct ServerGatewayOptions { +pub struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, @@ -21,7 +21,7 @@ pub(crate) struct ServerGatewayOptions { } impl ServerGatewayOptions { - pub(crate) async fn connect(&self, target_url: Url) -> Result { + pub async fn connect(&self, target_url: Url) -> Result { let channel = Channel::from_shared(target_url.to_string())? .connect() .await?; @@ -46,9 +46,9 @@ fn intercept(mut req: Request<()>) -> Result, Status> { } /// Provides -pub(crate) struct ServerGateway { - service: WorkflowServiceClient, - opts: ServerGatewayOptions, +pub struct ServerGateway { + pub service: WorkflowServiceClient, + pub opts: ServerGatewayOptions, } impl ServerGateway { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index fd8c37356..7a6b4507a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,15 +1,63 @@ +use rand; +use rand::Rng; use std::convert::TryFrom; -use temporal_sdk_core::{Core, CoreInitOptions, Url}; +use std::time::Duration; +use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; +use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; +use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; + +const TASK_QUEUE: &str = "test-tq"; +const NAMESPACE: &str = "default"; + +const TARGET_URI: &'static str = "http://localhost:7233"; + +// TODO try to consolidate this into the SDK code so we don't need to create another runtime. +#[tokio::main] +async fn create_workflow() { + let mut rng = rand::thread_rng(); + let workflow_id: u8 = rng.gen(); + let request_id: u8 = rng.gen(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + }; + let mut gateway = gateway_opts + .connect(Url::try_from(TARGET_URI).unwrap()) + .await + .unwrap(); + gateway + .service + .start_workflow_execution(StartWorkflowExecutionRequest { + namespace: NAMESPACE.to_string(), + workflow_id: workflow_id.to_string(), + workflow_type: Some(WorkflowType { + name: "test-workflow".to_string(), + }), + task_queue: Some(TaskQueue { + name: TASK_QUEUE.to_string(), + kind: 0, + }), + request_id: request_id.to_string(), + ..Default::default() + }) + .await + .unwrap(); +} #[test] fn empty_poll() { + create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { - target_url: Url::try_from("http://localhost:7233").unwrap(), - namespace: "default".to_string(), + target_url: Url::try_from(TARGET_URI).unwrap(), + namespace: NAMESPACE.to_string(), identity: "none".to_string(), worker_binary_id: "".to_string(), + runtime: None, }) .unwrap(); - dbg!(core.poll_task("test-tq").unwrap()); + dbg!(core.poll_task(TASK_QUEUE).unwrap()); } From 0a92a79016f000e1924f14199e38c1e68de142f8 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 20:19:52 -0800 Subject: [PATCH 04/13] Add end to end integ test for timer workflow --- src/lib.rs | 48 ++++++++++++++++++++++--------- src/pollers/mod.rs | 36 ++++++++++++++++++----- tests/integ_tests/poller_test.rs | 49 ++++++++++++++++++++++++++------ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ab96c6686..31279670d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ #[cfg(test)] #[macro_use] -extern crate assert_matches; +pub extern crate assert_matches; #[macro_use] extern crate tracing; @@ -17,6 +17,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -106,7 +110,7 @@ pub fn init(opts: CoreInitOptions) -> Result { Ok(CoreSDK { runtime, - work_provider, + server_gateway: work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), }) @@ -123,7 +127,7 @@ pub enum TaskQueue { struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs - work_provider: WP, + server_gateway: WP, /// Key is run id workflow_machines: DashMap>)>, /// Maps task tokens to workflow run ids @@ -132,14 +136,14 @@ struct CoreSDK { impl Core for CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi, { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work(task_queue))?; + .block_on(self.server_gateway.poll(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -185,13 +189,19 @@ where status: Some(wfstatus), })), } => { - let wf_run_id = self + let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + .ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Successful(success) => { + self.push_lang_commands(&run_id, success)?; + if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { + let commands = machines.0.get_commands(); + self.server_gateway.complete(task_token, commands); + } + } Status::Failed(_) => {} } Ok(()) @@ -204,13 +214,12 @@ where } _ => Err(CoreError::MalformedCompletion(req)), } - // TODO: Get fsm commands and send them to server (get_commands) } } impl CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -256,9 +265,22 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait WorkflowTaskProvider { +pub trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. - async fn get_work(&self, task_queue: &str) -> Result; + async fn poll(&self, task_queue: &str) -> Result; +} + +/// Implementors can provide new work to the SDK. The connection to the server is the real +/// implementor. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait RespondWorkflowTaskCompletedApi { + /// Fetch new work. Should block indefinitely if there is no work. + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -432,7 +454,7 @@ mod test { let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - work_provider: mock_provider, + server_gateway: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index badf0cfe2..f8ecb808b 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,5 +1,9 @@ use std::time::Duration; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ protos::temporal::api::enums::v1::TaskQueueKind, protos::temporal::api::taskqueue::v1::TaskQueue, @@ -7,7 +11,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, }, - Result, WorkflowTaskProvider, + PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -51,9 +55,10 @@ pub struct ServerGateway { pub opts: ServerGatewayOptions, } -impl ServerGateway { +#[async_trait::async_trait] +impl PollWorkflowTaskQueueApi for ServerGateway { async fn poll(&self, task_queue: &str) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { name: task_queue.to_string(), @@ -61,7 +66,7 @@ impl ServerGateway { }), identity: self.opts.identity.to_string(), binary_checksum: self.opts.worker_binary_id.to_string(), - }); + }; Ok(self .service @@ -73,8 +78,25 @@ impl ServerGateway { } #[async_trait::async_trait] -impl WorkflowTaskProvider for ServerGateway { - async fn get_work(&self, task_queue: &str) -> Result { - self.poll(task_queue).await +impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result { + let request = RespondWorkflowTaskCompletedRequest { + task_token, + commands, + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), + namespace: self.opts.namespace.to_string(), + ..Default::default() + }; + Ok(self + .service + .clone() + .respond_workflow_task_completed(request) + .await? + .into_inner()) } } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 7a6b4507a..961639b4a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -2,7 +2,15 @@ use rand; use rand::Rng; use std::convert::TryFrom; use std::time::Duration; +use temporal_sdk_core::protos::coresdk::CompleteTaskReq; +use temporal_sdk_core::protos::temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +}; use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; +use temporal_sdk_core::protos::temporal::api::history::v1::{ + history_event, TimerFiredEventAttributes, +}; use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; @@ -14,10 +22,10 @@ const TARGET_URI: &'static str = "http://localhost:7233"; // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] -async fn create_workflow() { +async fn create_workflow() -> (String, String) { let mut rng = rand::thread_rng(); - let workflow_id: u8 = rng.gen(); - let request_id: u8 = rng.gen(); + let workflow_id: u32 = rng.gen(); + let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "none".to_string(), @@ -28,7 +36,7 @@ async fn create_workflow() { .connect(Url::try_from(TARGET_URI).unwrap()) .await .unwrap(); - gateway + let response = gateway .service .start_workflow_execution(StartWorkflowExecutionRequest { namespace: NAMESPACE.to_string(), @@ -45,11 +53,12 @@ async fn create_workflow() { }) .await .unwrap(); + (workflow_id.to_string(), response.into_inner().run_id) } #[test] -fn empty_poll() { - create_workflow(); +fn timer_workflow() { + let (workflow_id, run_id) = create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -58,6 +67,30 @@ fn empty_poll() { runtime: None, }) .unwrap(); - - dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let mut rng = rand::thread_rng(); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); } From c6e73888fe6e7fe67146fd9dd0f9c93bdd2b6005 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 21:11:58 -0800 Subject: [PATCH 05/13] fix mocks in tests --- src/lib.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 31279670d..857cf2540 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[macro_use] +extern crate mockall; mod machines; mod pollers; @@ -388,8 +390,19 @@ mod test { }, }; - use super::*; + mock! { + ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; + } + } + use super::*; #[test] fn workflow_bridge() { let s = span!(Level::DEBUG, "Test start"); @@ -398,6 +411,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer"; + let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -446,9 +460,9 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkflowTaskProvider::new(); + let mut mock_provider = MockServerGateway::new(); mock_provider - .expect_get_work() + .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); let runtime = Runtime::new().unwrap(); @@ -459,7 +473,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -481,7 +495,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), From 74261ee9441a16a09b5e705ae5d0b5066b6e9e0c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Feb 2021 21:45:12 -0800 Subject: [PATCH 06/13] Adding integration test for the poller and made task queue configurable for the poll task --- .buildkite/pipeline.yml | 10 +++++++++ Cargo.toml | 17 +++++++++++++-- README.md | 4 +++- src/lib.rs | 25 +++++++++++++++------ src/pollers/mod.rs | 37 +++++++++++++++++++++++++------- tests/integ_tests/poller_test.rs | 15 +++++++++++++ tests/main.rs | 4 ++++ 7 files changed, 95 insertions(+), 17 deletions(-) create mode 100644 tests/integ_tests/poller_test.rs create mode 100644 tests/main.rs diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 415db733f..1aaecbc72 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -31,4 +31,14 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml - wait diff --git a/Cargo.toml b/Cargo.toml index 6eb064d9c..e52162b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,11 +21,17 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +tower = "0.4.4" + +[dependencies.tonic] +version = "0.4" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" @@ -40,3 +46,10 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/src/lib.rs b/src/lib.rs index 7a760a37d..d297cc518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,9 @@ mod machines; mod pollers; mod protosext; +pub use protosext::HistoryInfo; +pub use url::Url; + use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -38,15 +41,17 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; +const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; /// This trait is the primary way by which language specific SDKs interact with the core SDK. It is /// expected that only one instance of an implementation will exist for the lifetime of the @@ -57,7 +62,7 @@ pub trait Core { /// language SDK's responsibility to call the appropriate code with the provided inputs. /// /// TODO: Examples - fn poll_task(&self) -> Result; + fn poll_task(&self, task_queue: &str) -> Result; /// Tell the core that some work has been completed - whether as a result of running workflow /// code or executing an activity. @@ -90,6 +95,7 @@ pub fn init(opts: CoreInitOptions) -> Result { namespace: opts.namespace, identity: opts.identity, worker_binary_id: opts.worker_binary_id, + long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -102,6 +108,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +pub enum TaskQueue { + Workflow(String), + _Activity(String), +} + struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs @@ -117,11 +128,11 @@ where WP: WorkflowTaskProvider, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.work_provider.get_work(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -324,6 +335,8 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -414,7 +427,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -436,7 +449,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d26b5d9d5..26a1c3c5f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,11 +1,15 @@ -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use std::time::Duration; + +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + Result, WorkflowTaskProvider, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] @@ -13,17 +17,34 @@ pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + let channel = Channel::from_shared(target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + // TODO convert error + let metadata = req.metadata_mut(); + metadata.insert("grpc-timeout", "50000m".parse().unwrap()); + metadata.insert("client-name", "core-sdk".parse().unwrap()); + println!("Intercepting request: {:?}", req); + Ok(req) +} + /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs new file mode 100644 index 000000000..bb7f957b0 --- /dev/null +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use temporal_sdk_core::{Core, CoreInitOptions, Url}; + +#[test] +fn empty_poll() { + let core = temporal_sdk_core::init(CoreInitOptions { + target_url: Url::try_from("http://localhost:7233").unwrap(), + namespace: "default".to_string(), + identity: "none".to_string(), + binary_checksum: "".to_string(), + }) + .unwrap(); + + dbg!(core.poll_task("test-tq").unwrap()); +} diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..976cf28e4 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod integ_tests { + mod poller_test; +} From 77fabbbc0fb6110944048f85d929900aa24ed908 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 16:12:00 -0800 Subject: [PATCH 07/13] Fix build errors and add comments --- src/lib.rs | 33 ++++++++++++++++++-------------- tests/integ_tests/poller_test.rs | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d297cc518..0209c886b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,12 @@ //! This crate provides a basis for creating new Temporal SDKs without completely starting from //! scratch -#[macro_use] -extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +#[macro_use] +extern crate tracing; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - -pub use protosext::HistoryInfo; pub use url::Url; use crate::{ @@ -41,14 +34,20 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; -use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, + time::Duration, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; +pub mod protos; + +mod machines; +mod pollers; +mod protosext; + /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -108,8 +107,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +/// Type of task queue to poll. pub enum TaskQueue { + /// Workflow task Workflow(String), + /// Activity task _Activity(String), } @@ -335,13 +337,16 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), - #[error("Invalid URI: {0:?}")] + /// Invalid URI: {0:?} InvalidUri(#[from] InvalidUri), } #[cfg(test)] mod test { - use super::*; + use std::collections::VecDeque; + + use tracing::Level; + use crate::{ machines::test_help::TestHistoryBuilder, protos::{ @@ -355,8 +360,8 @@ mod test { }, }, }; - use std::collections::VecDeque; - use tracing::Level; + + use super::*; #[test] fn workflow_bridge() { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index bb7f957b0..fd8c37356 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,7 +7,7 @@ fn empty_poll() { target_url: Url::try_from("http://localhost:7233").unwrap(), namespace: "default".to_string(), identity: "none".to_string(), - binary_checksum: "".to_string(), + worker_binary_id: "".to_string(), }) .unwrap(); From b260f7d05992ae99cba042960c5563698d55f2ae Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 17:10:00 -0800 Subject: [PATCH 08/13] Create workflow in the integ test --- Cargo.toml | 2 +- src/lib.rs | 21 +++++++----- src/pollers/mod.rs | 10 +++--- tests/integ_tests/poller_test.rs | 56 +++++++++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e52162b97..13f9c1b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" -tower = "0.4.4" +rand = "0.8.3" [dependencies.tonic] version = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 0209c886b..ab96c6686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,18 @@ extern crate assert_matches; #[macro_use] extern crate tracing; +mod machines; +mod pollers; +pub mod protos; +mod protosext; + +pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, }, - pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -42,12 +47,6 @@ use std::{ use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -83,13 +82,19 @@ pub struct CoreInitOptions { /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Optional tokio runtime + pub runtime: Option, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// /// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { - let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let runtime = opts + .runtime + .map(Ok) + .unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 26a1c3c5f..badf0cfe2 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -13,7 +13,7 @@ use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] -pub(crate) struct ServerGatewayOptions { +pub struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, @@ -21,7 +21,7 @@ pub(crate) struct ServerGatewayOptions { } impl ServerGatewayOptions { - pub(crate) async fn connect(&self, target_url: Url) -> Result { + pub async fn connect(&self, target_url: Url) -> Result { let channel = Channel::from_shared(target_url.to_string())? .connect() .await?; @@ -46,9 +46,9 @@ fn intercept(mut req: Request<()>) -> Result, Status> { } /// Provides -pub(crate) struct ServerGateway { - service: WorkflowServiceClient, - opts: ServerGatewayOptions, +pub struct ServerGateway { + pub service: WorkflowServiceClient, + pub opts: ServerGatewayOptions, } impl ServerGateway { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index fd8c37356..7a6b4507a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,15 +1,63 @@ +use rand; +use rand::Rng; use std::convert::TryFrom; -use temporal_sdk_core::{Core, CoreInitOptions, Url}; +use std::time::Duration; +use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; +use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; +use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; + +const TASK_QUEUE: &str = "test-tq"; +const NAMESPACE: &str = "default"; + +const TARGET_URI: &'static str = "http://localhost:7233"; + +// TODO try to consolidate this into the SDK code so we don't need to create another runtime. +#[tokio::main] +async fn create_workflow() { + let mut rng = rand::thread_rng(); + let workflow_id: u8 = rng.gen(); + let request_id: u8 = rng.gen(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + }; + let mut gateway = gateway_opts + .connect(Url::try_from(TARGET_URI).unwrap()) + .await + .unwrap(); + gateway + .service + .start_workflow_execution(StartWorkflowExecutionRequest { + namespace: NAMESPACE.to_string(), + workflow_id: workflow_id.to_string(), + workflow_type: Some(WorkflowType { + name: "test-workflow".to_string(), + }), + task_queue: Some(TaskQueue { + name: TASK_QUEUE.to_string(), + kind: 0, + }), + request_id: request_id.to_string(), + ..Default::default() + }) + .await + .unwrap(); +} #[test] fn empty_poll() { + create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { - target_url: Url::try_from("http://localhost:7233").unwrap(), - namespace: "default".to_string(), + target_url: Url::try_from(TARGET_URI).unwrap(), + namespace: NAMESPACE.to_string(), identity: "none".to_string(), worker_binary_id: "".to_string(), + runtime: None, }) .unwrap(); - dbg!(core.poll_task("test-tq").unwrap()); + dbg!(core.poll_task(TASK_QUEUE).unwrap()); } From 8ef4abdc6d19ea0da760fe4a0c87193f49959764 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 20:19:52 -0800 Subject: [PATCH 09/13] Add end to end integ test for timer workflow --- src/lib.rs | 48 ++++++++++++++++++++++--------- src/pollers/mod.rs | 36 ++++++++++++++++++----- tests/integ_tests/poller_test.rs | 49 ++++++++++++++++++++++++++------ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ab96c6686..31279670d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ #[cfg(test)] #[macro_use] -extern crate assert_matches; +pub extern crate assert_matches; #[macro_use] extern crate tracing; @@ -17,6 +17,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -106,7 +110,7 @@ pub fn init(opts: CoreInitOptions) -> Result { Ok(CoreSDK { runtime, - work_provider, + server_gateway: work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), }) @@ -123,7 +127,7 @@ pub enum TaskQueue { struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs - work_provider: WP, + server_gateway: WP, /// Key is run id workflow_machines: DashMap>)>, /// Maps task tokens to workflow run ids @@ -132,14 +136,14 @@ struct CoreSDK { impl Core for CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi, { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work(task_queue))?; + .block_on(self.server_gateway.poll(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -185,13 +189,19 @@ where status: Some(wfstatus), })), } => { - let wf_run_id = self + let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + .ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Successful(success) => { + self.push_lang_commands(&run_id, success)?; + if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { + let commands = machines.0.get_commands(); + self.server_gateway.complete(task_token, commands); + } + } Status::Failed(_) => {} } Ok(()) @@ -204,13 +214,12 @@ where } _ => Err(CoreError::MalformedCompletion(req)), } - // TODO: Get fsm commands and send them to server (get_commands) } } impl CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -256,9 +265,22 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait WorkflowTaskProvider { +pub trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. - async fn get_work(&self, task_queue: &str) -> Result; + async fn poll(&self, task_queue: &str) -> Result; +} + +/// Implementors can provide new work to the SDK. The connection to the server is the real +/// implementor. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait RespondWorkflowTaskCompletedApi { + /// Fetch new work. Should block indefinitely if there is no work. + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -432,7 +454,7 @@ mod test { let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - work_provider: mock_provider, + server_gateway: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index badf0cfe2..f8ecb808b 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,5 +1,9 @@ use std::time::Duration; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ protos::temporal::api::enums::v1::TaskQueueKind, protos::temporal::api::taskqueue::v1::TaskQueue, @@ -7,7 +11,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, }, - Result, WorkflowTaskProvider, + PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -51,9 +55,10 @@ pub struct ServerGateway { pub opts: ServerGatewayOptions, } -impl ServerGateway { +#[async_trait::async_trait] +impl PollWorkflowTaskQueueApi for ServerGateway { async fn poll(&self, task_queue: &str) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { name: task_queue.to_string(), @@ -61,7 +66,7 @@ impl ServerGateway { }), identity: self.opts.identity.to_string(), binary_checksum: self.opts.worker_binary_id.to_string(), - }); + }; Ok(self .service @@ -73,8 +78,25 @@ impl ServerGateway { } #[async_trait::async_trait] -impl WorkflowTaskProvider for ServerGateway { - async fn get_work(&self, task_queue: &str) -> Result { - self.poll(task_queue).await +impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result { + let request = RespondWorkflowTaskCompletedRequest { + task_token, + commands, + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), + namespace: self.opts.namespace.to_string(), + ..Default::default() + }; + Ok(self + .service + .clone() + .respond_workflow_task_completed(request) + .await? + .into_inner()) } } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 7a6b4507a..961639b4a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -2,7 +2,15 @@ use rand; use rand::Rng; use std::convert::TryFrom; use std::time::Duration; +use temporal_sdk_core::protos::coresdk::CompleteTaskReq; +use temporal_sdk_core::protos::temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +}; use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; +use temporal_sdk_core::protos::temporal::api::history::v1::{ + history_event, TimerFiredEventAttributes, +}; use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; @@ -14,10 +22,10 @@ const TARGET_URI: &'static str = "http://localhost:7233"; // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] -async fn create_workflow() { +async fn create_workflow() -> (String, String) { let mut rng = rand::thread_rng(); - let workflow_id: u8 = rng.gen(); - let request_id: u8 = rng.gen(); + let workflow_id: u32 = rng.gen(); + let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "none".to_string(), @@ -28,7 +36,7 @@ async fn create_workflow() { .connect(Url::try_from(TARGET_URI).unwrap()) .await .unwrap(); - gateway + let response = gateway .service .start_workflow_execution(StartWorkflowExecutionRequest { namespace: NAMESPACE.to_string(), @@ -45,11 +53,12 @@ async fn create_workflow() { }) .await .unwrap(); + (workflow_id.to_string(), response.into_inner().run_id) } #[test] -fn empty_poll() { - create_workflow(); +fn timer_workflow() { + let (workflow_id, run_id) = create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -58,6 +67,30 @@ fn empty_poll() { runtime: None, }) .unwrap(); - - dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let mut rng = rand::thread_rng(); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); } From 119410d79c0fca8012c3ec90532164eb4ad18d42 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 21:11:58 -0800 Subject: [PATCH 10/13] fix mocks in tests --- src/lib.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 31279670d..857cf2540 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[macro_use] +extern crate mockall; mod machines; mod pollers; @@ -388,8 +390,19 @@ mod test { }, }; - use super::*; + mock! { + ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; + } + } + use super::*; #[test] fn workflow_bridge() { let s = span!(Level::DEBUG, "Test start"); @@ -398,6 +411,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer"; + let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -446,9 +460,9 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkflowTaskProvider::new(); + let mut mock_provider = MockServerGateway::new(); mock_provider - .expect_get_work() + .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); let runtime = Runtime::new().unwrap(); @@ -459,7 +473,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -481,7 +495,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), From fa10dae26b76e98df8b626d038bb835b7fbfba1d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 22:15:21 -0800 Subject: [PATCH 11/13] Fix mockall compile problem --- src/lib.rs | 12 ++++++------ tests/integ_tests/poller_test.rs | 30 +++++++++++++++--------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 857cf2540..fec0d5a8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[cfg(test)] #[macro_use] extern crate mockall; @@ -19,13 +20,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; -use crate::machines::ProtoCommand; -use crate::protos::temporal::api::workflowservice::v1::{ - RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, -}; use crate::{ machines::{ - ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, + ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand, + WorkflowMachines, }, protos::{ coresdk::{ @@ -38,7 +36,9 @@ use crate::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, - workflowservice::v1::PollWorkflowTaskQueueResponse, + workflowservice::{ + v1::PollWorkflowTaskQueueResponse, v1::RespondWorkflowTaskCompletedResponse, + }, }, }, protosext::{HistoryInfo, HistoryInfoError}, diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 961639b4a..1a870e552 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,19 +1,19 @@ -use rand; -use rand::Rng; -use std::convert::TryFrom; -use std::time::Duration; -use temporal_sdk_core::protos::coresdk::CompleteTaskReq; -use temporal_sdk_core::protos::temporal::api::command::v1::{ - CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +use rand::{self, Rng}; +use std::{convert::TryFrom, time::Duration}; +use temporal_sdk_core::{ + protos::{ + coresdk::CompleteTaskReq, + temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + temporal::api::common::v1::WorkflowType, + temporal::api::enums::v1::EventType, + temporal::api::history::v1::{history_event, TimerFiredEventAttributes}, + temporal::api::taskqueue::v1::TaskQueue, + temporal::api::workflowservice::v1::StartWorkflowExecutionRequest, + }, + Core, CoreInitOptions, ServerGatewayOptions, Url, }; -use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; -use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; -use temporal_sdk_core::protos::temporal::api::history::v1::{ - history_event, TimerFiredEventAttributes, -}; -use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; -use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; -use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; const TASK_QUEUE: &str = "test-tq"; const NAMESPACE: &str = "default"; From c0da81319f764270e247935719982cca6565c5a7 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 22:20:35 -0800 Subject: [PATCH 12/13] Fix mockall compile problem, actually send completions using block_on --- src/lib.rs | 12 +++++------- tests/integ_tests/poller_test.rs | 2 -- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 38aaf8eda..3822b26e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[cfg(test)] #[macro_use] extern crate mockall; @@ -19,10 +20,6 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; -use crate::machines::ProtoCommand; -use crate::protos::temporal::api::workflowservice::v1::{ - RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, -}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand, @@ -39,8 +36,8 @@ use crate::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, - workflowservice::{ - v1::PollWorkflowTaskQueueResponse, v1::RespondWorkflowTaskCompletedResponse, + workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, }, }, }, @@ -204,7 +201,8 @@ where self.push_lang_commands(&run_id, success)?; if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { let commands = machines.0.get_commands(); - self.server_gateway.complete(task_token, commands); + self.runtime + .block_on(self.server_gateway.complete(task_token, commands))?; } } Status::Failed(_) => {} diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 1a870e552..903f2315f 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,8 +7,6 @@ use temporal_sdk_core::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, temporal::api::common::v1::WorkflowType, - temporal::api::enums::v1::EventType, - temporal::api::history::v1::{history_event, TimerFiredEventAttributes}, temporal::api::taskqueue::v1::TaskQueue, temporal::api::workflowservice::v1::StartWorkflowExecutionRequest, }, From 9d6a4c639b5ba9e0727c1de755c5edd89a7fa60f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 4 Feb 2021 11:28:11 -0800 Subject: [PATCH 13/13] add event loop invocation into push_lang_commands --- src/lib.rs | 5 +++++ src/machines/workflow_machines.rs | 2 +- tests/integ_tests/poller_test.rs | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3822b26e8..385e0c067 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -260,6 +260,11 @@ where .unwrap() .1 .send(cmds)?; + self.workflow_machines + .get_mut(run_id) + .unwrap() + .0 + .event_loop(); Ok(()) } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1315e0df1..5b98767c9 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -381,7 +381,7 @@ impl WorkflowMachines { .expect("We have just ensured this is populated") } - fn event_loop(&mut self) -> Result<()> { + pub fn event_loop(&mut self) -> Result<()> { let results = self.drive_me.iterate_wf()?; self.handle_driven_results(results); diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 903f2315f..b4a35153b 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -56,7 +56,7 @@ async fn create_workflow() -> (String, String) { #[test] fn timer_workflow() { - let (workflow_id, run_id) = create_workflow(); + let (workflow_id, run_id) = dbg!(create_workflow()); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -72,6 +72,7 @@ fn timer_workflow() { core.complete_task(CompleteTaskReq::ok_from_api_attrs( StartTimerCommandAttributes { timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_secs(1).into()), ..Default::default() } .into(),