From 4809945646be7d2d9dd4f74b3349f63a46bc3f36 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 09:25:31 -0800 Subject: [PATCH 01/26] Use a local path to test patch --- Cargo.toml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 7c07e9756..00c215188 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,12 +20,15 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } tracing-opentelemetry = "0.10" tracing-subscriber = "0.2" url = "2.2" +[dependencies.tonic] +version = "0.4" +path = "../tonic/tonic" + [dependencies.rustfsm] path = "fsm" From f4de8456880863c8e44462271f1319dd7159ae8c Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 09:53:10 -0800 Subject: [PATCH 02/26] Adding integ test for pollers --- Cargo.toml | 12 +++++++++++- src/lib.rs | 33 +++++++++++++++++++++------------ src/pollers/mod.rs | 27 +++++++++++++++++++++++++-- tests/poller_test.rs | 15 +++++++++++++++ 4 files changed, 72 insertions(+), 15 deletions(-) create mode 100644 tests/poller_test.rs diff --git a/Cargo.toml b/Cargo.toml index 7c07e9756..57fb71bb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Spencer Judge ", "Vitaly Arbuzov "] edition = "2018" +[features] +default = [] +integ_tests = [] + [lib] # TODO: Feature flag opentelem stuff @@ -22,9 +26,10 @@ thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +tower = "0.4.4" [dependencies.rustfsm] path = "fsm" @@ -39,3 +44,8 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "poller_test" +path = "tests/poller_test.rs" +required-features = ["integ_tests"] \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 1fcf135b5..04efa3619 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -10,6 +10,7 @@ pub mod protos; mod protosext; pub use protosext::HistoryInfo; +pub use url::Url; use crate::{ machines::{ @@ -34,31 +35,31 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; pub type Result = std::result::Result; - +const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; // TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have // any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can // be accessed from many threads, but each workflow is pinned to one thread (possibly with many // sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should // be both, ideally. pub trait Core { - fn poll_task(&self) -> Result; + fn poll_task(&self, task_queue: &str) -> Result; fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; } pub struct CoreInitOptions { - target_url: Url, - namespace: String, - _task_queue: Vec, - identity: String, - binary_checksum: String, + pub target_url: Url, + pub namespace: String, + pub identity: String, + pub binary_checksum: String, } /// Initializes instance of the core sdk and establishes connection to the temporal server. @@ -69,6 +70,7 @@ pub fn init(opts: CoreInitOptions) -> Result { namespace: opts.namespace, identity: opts.identity, binary_checksum: opts.binary_checksum, + long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -81,6 +83,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +pub enum TaskQueue { + Workflow(String), + _Activity(String), +} + pub struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs @@ -96,11 +103,11 @@ where WP: WorkflowTaskProvider, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.work_provider.get_work(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -301,6 +308,8 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), #[error("Failed to initialize tokio runtime: {0:?}")] TokioInitError(std::io::Error), + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -391,7 +400,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res, @@ -417,7 +426,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 9cfbc03b4..6b5622f85 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,3 +1,10 @@ +use std::time::Duration; + +use tonic::metadata::MetadataValue; +use tonic::transport::Channel; +use tonic::{Request, Status}; +use url::Url; + use crate::protos::temporal::api::enums::v1::TaskQueueKind; use crate::protos::temporal::api::taskqueue::v1::TaskQueue; use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; @@ -6,24 +13,40 @@ use crate::protos::temporal::api::workflowservice::v1::{ }; use crate::Result; use crate::WorkflowTaskProvider; -use url::Url; #[derive(Clone)] pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub binary_checksum: String, + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + let channel = Channel::from_shared(target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + // TODO convert error + let metadata = req.metadata_mut(); + metadata.insert("grpc-timeout", "500m".parse().unwrap()); + metadata.insert("client-name", "core-sdk".parse().unwrap()); + println!("Intercepting request: {:?}", req); + Ok(req) +} + /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, diff --git a/tests/poller_test.rs b/tests/poller_test.rs new file mode 100644 index 000000000..bb7f957b0 --- /dev/null +++ b/tests/poller_test.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use temporal_sdk_core::{Core, CoreInitOptions, Url}; + +#[test] +fn empty_poll() { + let core = temporal_sdk_core::init(CoreInitOptions { + target_url: Url::try_from("http://localhost:7233").unwrap(), + namespace: "default".to_string(), + identity: "none".to_string(), + binary_checksum: "".to_string(), + }) + .unwrap(); + + dbg!(core.poll_task("test-tq").unwrap()); +} From 37cc9dfe48b6f0f25afc2ff72f9d644a582db1e8 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 10:06:43 -0800 Subject: [PATCH 03/26] Server accepts request, hits timeout, but we get lame error --- Cargo.toml | 4 +++- src/pollers/mod.rs | 2 +- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 618ca818a..7872147d3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,7 +32,9 @@ tower = "0.4.4" [dependencies.tonic] version = "0.4" -path = "../tonic/tonic" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 6b5622f85..6558bb282 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -41,7 +41,7 @@ impl ServerGatewayOptions { fn intercept(mut req: Request<()>) -> Result, Status> { // TODO convert error let metadata = req.metadata_mut(); - metadata.insert("grpc-timeout", "500m".parse().unwrap()); + metadata.insert("grpc-timeout", "50000m".parse().unwrap()); metadata.insert("client-name", "core-sdk".parse().unwrap()); println!("Intercepting request: {:?}", req); Ok(req) From 4feb1a722153c688214d186a44910231b1012f41 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 11:01:05 -0800 Subject: [PATCH 04/26] Avoid needing feature flag for integ testing --- .buildkite/pipeline.yml | 10 ++++++++++ Cargo.toml | 12 +++++------- README.md | 4 +++- src/pollers/mod.rs | 22 ++++++++++------------ tests/main.rs | 1 + 5 files changed, 29 insertions(+), 20 deletions(-) create mode 100644 tests/main.rs diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 92e370956..89d56488e 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -29,4 +29,14 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml - wait diff --git a/Cargo.toml b/Cargo.toml index 7872147d3..232e460ec 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,10 +4,6 @@ version = "0.1.0" authors = ["Spencer Judge ", "Vitaly Arbuzov "] edition = "2018" -[features] -default = [] -integ_tests = [] - [lib] # TODO: Feature flag opentelem stuff @@ -51,6 +47,8 @@ tonic-build = "0.4" members = [".", "fsm"] [[test]] -name = "poller_test" -path = "tests/poller_test.rs" -required-features = ["integ_tests"] \ No newline at end of file +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 6558bb282..7abe3709e 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,18 +1,16 @@ use std::time::Duration; -use tonic::metadata::MetadataValue; -use tonic::transport::Channel; -use tonic::{Request, Status}; -use url::Url; - -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + Result, WorkflowTaskProvider, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; +use url::Url; #[derive(Clone)] pub(crate) struct ServerGatewayOptions { diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..519643a53 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1 @@ +mod poller_test; From 21156834bfcc270cfedb4f515776f6504b4d66de Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 11:18:26 -0800 Subject: [PATCH 05/26] Actually make test selection work how I wanted it to --- tests/{ => integ_tests}/poller_test.rs | 0 tests/main.rs | 5 ++++- 2 files changed, 4 insertions(+), 1 deletion(-) rename tests/{ => integ_tests}/poller_test.rs (100%) diff --git a/tests/poller_test.rs b/tests/integ_tests/poller_test.rs similarity index 100% rename from tests/poller_test.rs rename to tests/integ_tests/poller_test.rs diff --git a/tests/main.rs b/tests/main.rs index 519643a53..976cf28e4 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1 +1,4 @@ -mod poller_test; +#[cfg(test)] +mod integ_tests { + mod poller_test; +} From cd957ceb3ac29c11f81ad8d54481d31f549cb27b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Feb 2021 21:45:12 -0800 Subject: [PATCH 06/26] Adding integration test for the poller and made task queue configurable for the poll task --- .buildkite/docker/Dockerfile | 2 + .buildkite/docker/docker-compose.yaml | 2 + .buildkite/pipeline.yml | 14 ++- Cargo.toml | 18 ++- README.md | 4 +- build.rs | 7 +- protos/local/core_interface.proto | 59 ++++++---- src/lib.rs | 132 +++++++++++++--------- src/machines/mod.rs | 7 +- src/machines/test_help/history_builder.rs | 66 +---------- src/machines/test_help/workflow_driver.rs | 6 +- src/machines/timer_state_machine.rs | 10 +- src/machines/workflow_machines.rs | 37 +++--- src/pollers/mod.rs | 41 +++++-- src/protos/mod.rs | 28 +++++ src/protosext/history_info.rs | 11 +- src/protosext/mod.rs | 2 +- tests/integ_tests/poller_test.rs | 15 +++ tests/main.rs | 4 + 19 files changed, 278 insertions(+), 187 deletions(-) create mode 100644 tests/integ_tests/poller_test.rs create mode 100644 tests/main.rs diff --git a/.buildkite/docker/Dockerfile b/.buildkite/docker/Dockerfile index 3acabbd57..f32421f6e 100644 --- a/.buildkite/docker/Dockerfile +++ b/.buildkite/docker/Dockerfile @@ -3,4 +3,6 @@ FROM rust:latest RUN rustup component add rustfmt && \ rustup component add clippy +RUN cargo install cargo-tarpaulin + WORKDIR /sdk-core diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 13d3cc132..0aca4f6e2 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -5,6 +5,8 @@ services: build: context: ../../ dockerfile: .buildkite/docker/Dockerfile + security_opt: + - seccomp:unconfined command: /bin/sh -c ".buildkite/docker/build.sh" environment: - "USER=unittest" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 92e370956..1aaecbc72 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -23,7 +23,19 @@ steps: agents: queue: "default" docker: "*" - command: "cargo test --workspace" + command: "cargo tarpaulin --out Html --workspace" + artifact_paths: + - "tarpaulin-report.html" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" timeout_in_minutes: 15 plugins: - docker-compose#v3.0.0: diff --git a/Cargo.toml b/Cargo.toml index 7c07e9756..e52162b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0" async-trait = "0.1" dashmap = "4.0" derive_more = "0.99" +displaydoc = "0.1" env_logger = "0.8" futures = "0.3" log = "0.4" @@ -20,11 +21,17 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +tower = "0.4.4" + +[dependencies.tonic] +version = "0.4" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" @@ -39,3 +46,10 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/build.rs b/build.rs index cb6500556..11f4dc705 100644 --- a/build.rs +++ b/build.rs @@ -15,12 +15,9 @@ fn main() -> Result<(), Box> { "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") + .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivation.attributes", - "#[derive(::derive_more::From)]", - ) - .type_attribute( - "coresdk.WorkflowTask.attributes", + "coresdk.WFActivationJob.attributes", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 920e1f008..82af4ecdb 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -16,24 +16,49 @@ import "temporal/api/failure/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; -service Core { - rpc PollTask (PollTaskReq) returns (Task) {} - rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {} -} - +// A request as given to [crate::Core::poll_task] message PollTaskReq { + // If true, poll for workflow tasks bool workflows = 1; + // If true, poll for activity tasks bool activities = 2; } +// An instruction to perform work from core->lang sdk message Task { + // A unique identifier for this task bytes task_token = 1; + // The type of task to be performed oneof variant { + // Wake up a workflow WFActivation workflow = 2; + // Run an activity ActivityTask activity = 3; } } +// An instruction to the lang sdk to run some workflow code, whether for the first time or from +// a cached state. +message WFActivation { + // Time the activation(s) were requested + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; + // The id of the currently active run of the workflow + string run_id = 2; + // The things to do upon activating the workflow + repeated WFActivationJob jobs = 3; +} + +message WFActivationJob { + oneof attributes { + // TODO could literally be attributes from events? -- maybe we don't need our own types + + // Begin a workflow for the first time + StartWorkflowTaskAttributes start_workflow = 1; + // A timer has fired, allowing whatever was waiting on it (if anything) to proceed + TimerFiredTaskAttributes timer_fired = 2; + } +} + message StartWorkflowTaskAttributes { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; @@ -43,35 +68,27 @@ message StartWorkflowTaskAttributes { temporal.api.common.v1.Payloads arguments = 3; // TODO: Do we need namespace here, or should that just be fetchable easily? - - // will be others - workflow exe started attrs, etc + // will be others - workflow exe started attrs, etc } -// maybe we just go back to timer fired to keep consistent -message UnblockTimerTaskAttributes { +message TimerFiredTaskAttributes { string timer_id = 1; } -message WFActivation { - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; - string run_id = 2; - oneof attributes { - // could literally be attributes from events -- maybe we don't need our own types - StartWorkflowTaskAttributes start_workflow = 3; - UnblockTimerTaskAttributes unblock_timer = 4; - } -} - message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; } +// Sent from lang side to core when calling [crate::Core::complete_task] message CompleteTaskReq { + // The id from the [Task] being completed bytes task_token = 1; oneof completion { + // Complete a workflow task WFActivationCompletion workflow = 2; + // Complete an activity task ActivityTaskCompletion activity = 3; } } @@ -94,6 +111,7 @@ message CoreCommand { // Reserved for specific commands } +// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next message Command { oneof variant { temporal.api.command.v1.Command api = 1; @@ -102,13 +120,16 @@ message Command { } message WFActivationSuccess { + // A list of commands to send back to the temporal server repeated Command commands = 1; + // Other bits from RespondWorkflowTaskCompletedRequest as needed } message WFActivationFailure { temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; temporal.api.failure.v1.Failure failure = 2; + // Other bits from RespondWorkflowTaskFailedRequest as needed } diff --git a/src/lib.rs b/src/lib.rs index 1fcf135b5..d297cc518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,15 +1,22 @@ +#![warn(missing_docs)] // error if there are missing docs + +//! This crate provides a basis for creating new Temporal SDKs without completely starting from +//! scratch + #[macro_use] extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +pub mod protos; + mod machines; mod pollers; -pub mod protos; mod protosext; pub use protosext::HistoryInfo; +pub use url::Url; use crate::{ machines::{ @@ -30,45 +37,65 @@ use crate::{ workflowservice::v1::PollWorkflowTaskQueueResponse, }, }, - protosext::HistoryInfoError, + protosext::{HistoryInfo, HistoryInfoError}, }; use anyhow::Error; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; +/// A result alias having [CoreError] as the error type pub type Result = std::result::Result; +const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; -// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have -// any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can -// be accessed from many threads, but each workflow is pinned to one thread (possibly with many -// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should -// be both, ideally. +/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is +/// expected that only one instance of an implementation will exist for the lifetime of the +/// worker(s) using it. pub trait Core { - fn poll_task(&self) -> Result; + /// Ask the core for some work, returning a [Task], which will eventually contain either a + /// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the + /// language SDK's responsibility to call the appropriate code with the provided inputs. + /// + /// TODO: Examples + fn poll_task(&self, task_queue: &str) -> Result; + + /// Tell the core that some work has been completed - whether as a result of running workflow + /// code or executing an activity. fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; } +/// Holds various configuration information required to call [init] pub struct CoreInitOptions { - target_url: Url, - namespace: String, - _task_queue: Vec, - identity: String, - binary_checksum: String, + /// The URL of the Temporal server to connect to + pub target_url: Url, + + /// The namespace on the server your worker will be using + pub namespace: String, + + /// A human-readable string that can identify your worker + /// + /// TODO: Probably belongs in future worker abstraction + pub identity: String, + + /// A string that should be unique to the exact worker code/binary being executed + pub worker_binary_id: String, } -/// Initializes instance of the core sdk and establishes connection to the temporal server. -/// Creates tokio runtime that will be used for all client-server interactions. +/// Initializes an instance of the core sdk and establishes a connection to the temporal server. +/// +/// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, - binary_checksum: opts.binary_checksum, + worker_binary_id: opts.worker_binary_id, + long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -81,7 +108,12 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } -pub struct CoreSDK { +pub enum TaskQueue { + Workflow(String), + _Activity(String), +} + +struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs work_provider: WP, @@ -96,11 +128,11 @@ where WP: WorkflowTaskProvider, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.work_provider.get_work(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -226,7 +258,7 @@ pub trait WorkflowTaskProvider { /// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering /// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService] #[derive(Debug)] -pub struct WorkflowBridge { +pub(crate) struct WorkflowBridge { // does wf id belong in here? started_attrs: Option, incoming_commands: Receiver>, @@ -276,31 +308,35 @@ impl DrivenWorkflow for WorkflowBridge { // Real bridge doesn't actually need to listen impl ActivationListener for WorkflowBridge {} -#[derive(thiserror::Error, Debug)] +/// The error type returned by interactions with [Core] +#[derive(thiserror::Error, Debug, displaydoc::Display)] #[allow(clippy::large_enum_variant)] +// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum CoreError { - #[error("Unknown service error")] + /// Unknown service error Unknown, - #[error("No tasks to perform for now")] + /// No tasks to perform for now NoWork, - #[error("Poll response from server was malformed: {0:?}")] + /// Poll response from server was malformed: {0:?} BadDataFromWorkProvider(PollWorkflowTaskQueueResponse), - #[error("Lang SDK sent us a malformed completion: {0:?}")] + /// Lang SDK sent us a malformed completion: {0:?} MalformedCompletion(CompleteTaskReq), - #[error("Error buffering commands")] + /// Error buffering commands CantSendCommands(#[from] SendError>), - #[error("Couldn't interpret command from ")] + /// Couldn't interpret command from UninterprableCommand(#[from] InconvertibleCommandError), - #[error("Underlying error in history processing")] + /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), - #[error("Task token had nothing associated with it: {0:?}")] + /// Task token had nothing associated with it: {0:?} NothingFoundForTaskToken(Vec), - #[error("Error calling the service: {0:?}")] + /// Error calling the service: {0:?} TonicError(#[from] tonic::Status), - #[error("Server connection error: {0:?}")] + /// Server connection error: {0:?} TonicTransportError(#[from] tonic::transport::Error), - #[error("Failed to initialize tokio runtime: {0:?}")] + /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -309,7 +345,7 @@ mod test { use crate::{ machines::test_help::TestHistoryBuilder, protos::{ - coresdk::{task, wf_activation, WfActivation}, + coresdk::{wf_activation_job, WfActivationJob}, temporal::api::{ command::v1::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, @@ -391,17 +427,13 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::StartWorkflow(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)), + }] ); assert!(core.workflow_machines.get(run_id).is_some()); @@ -417,17 +449,13 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( - res, - Task { - variant: Some(task::Variant::Workflow(WfActivation { - attributes: Some(wf_activation::Attributes::UnblockTimer(_)), - .. - })), - .. - } + res.get_wf_jobs().as_slice(), + [WfActivationJob { + attributes: Some(wf_activation_job::Attributes::TimerFired(_)), + }] ); let task_tok = res.task_token; core.complete_task(CompleteTaskReq::ok_from_api_attrs( diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 48d2ef3e7..0f62f33cc 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -39,9 +39,8 @@ pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; use crate::{ machines::workflow_machines::WorkflowTrigger, - protos::coresdk::wf_activation, protos::{ - coresdk::{self, command::Variant}, + coresdk::{self, command::Variant, wf_activation_job}, temporal::api::{ command::v1::{ command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes, @@ -93,10 +92,10 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send { ) -> Result<(), anyhow::Error>; } -/// Allows observers to listen to newly generated outgoing activations. Used for testing, where +/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where /// some activations must be handled before outgoing commands are issued to avoid deadlocking. pub(crate) trait ActivationListener { - fn on_activation(&mut self, _activation: &wf_activation::Attributes) {} + fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {} } /// The struct for [WFCommand::AddCommand] diff --git a/src/machines/test_help/history_builder.rs b/src/machines/test_help/history_builder.rs index fcb319276..2f4dac37f 100644 --- a/src/machines/test_help/history_builder.rs +++ b/src/machines/test_help/history_builder.rs @@ -143,70 +143,8 @@ impl TestHistoryBuilder { wf_machines: &mut WorkflowMachines, to_wf_task_num: Option, ) -> Result<()> { - let to_wf_task_num = to_wf_task_num.unwrap_or(usize::MAX); - let (_, events) = self - .events - .split_at(wf_machines.get_last_started_event_id() as usize); - let mut history = events.iter().peekable(); - - let hist_info = self.get_history_info(to_wf_task_num)?; - wf_machines.set_started_ids( - hist_info.previous_started_event_id, - hist_info.workflow_task_started_event_id, - ); - let mut started_id = hist_info.previous_started_event_id; - let mut num_seen_wf_tasks = if wf_machines.get_last_started_event_id() > 0 { - self.get_workflow_task_count(history.peek().map(|e| e.event_id - 1))? - } else { - 0 - }; - - while let Some(event) = history.next() { - let next_event = history.peek(); - - if event.event_type == EventType::WorkflowTaskStarted as i32 { - let next_is_completed = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskCompleted as i32 - }); - let next_is_failed_or_timeout = next_event.map_or(false, |ne| { - ne.event_type == EventType::WorkflowTaskFailed as i32 - || ne.event_type == EventType::WorkflowTaskTimedOut as i32 - }); - - if next_event.is_none() || next_is_completed { - started_id = event.event_id; - num_seen_wf_tasks += 1; - if num_seen_wf_tasks == to_wf_task_num || next_event.is_none() { - wf_machines.handle_event(event, false)?; - return Ok(()); - } - } else if next_event.is_some() && !next_is_failed_or_timeout { - bail!( - "Invalid history! Event {:?} should be WF task completed, \ - failed, or timed out.", - &event - ); - } - } - - wf_machines.handle_event(event, next_event.is_some())?; - - if next_event.is_none() { - if event.is_final_wf_execution_event() { - return Ok(()); - } - if started_id != event.event_id { - bail!( - "The last event in the history (id {}) isn't the last WF task \ - started (id {})", - event.event_id, - started_id - ); - } - unreachable!() - } - } - + let histinfo = HistoryInfo::new_from_events(&self.events, to_wf_task_num)?; + histinfo.apply_history_events(wf_machines)?; Ok(()) } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index d6dd8fe05..92aa306e1 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -2,7 +2,7 @@ use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes}, + coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes}, temporal::api::{ command::v1::StartTimerCommandAttributes, history::v1::{ @@ -58,8 +58,8 @@ where } impl ActivationListener for TestWorkflowDriver { - fn on_activation(&mut self, activation: &Attributes) { - if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation { + fn on_activation_job(&mut self, activation: &Attributes) { + if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e29006fb1..b217efdb3 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,14 +1,12 @@ #![allow(clippy::large_enum_variant)] -use crate::machines::workflow_machines::WorkflowTrigger; -use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation}; use crate::{ machines::{ - workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand, - CancellableCommand, WFCommand, WFMachinesAdapter, + workflow_machines::{WFMachinesError, WorkflowMachines, WorkflowTrigger}, + AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter, }, protos::{ - coresdk::HistoryEventId, + coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, @@ -224,7 +222,7 @@ impl WFMachinesAdapter for TimerMachine { ) -> Result, WFMachinesError> { match my_command { // Fire the completion - TimerMachineCommand::Complete(_event) => Ok(vec![UnblockTimerTaskAttributes { + TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 9f3d4cad5..1315e0df1 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,4 +1,5 @@ use crate::machines::ActivationListener; +use crate::protos::coresdk::WfActivationJob; use crate::{ machines::{ complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer, @@ -6,7 +7,7 @@ use crate::{ ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, + coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation}, temporal::api::{ command::v1::StartTimerCommandAttributes, common::v1::WorkflowExecution, @@ -62,8 +63,8 @@ pub(crate) struct WorkflowMachines { /// Old note: It is a queue as commands can be added (due to marker based commands) while /// iterating over already added commands. current_wf_task_commands: VecDeque, - /// Outgoing activations that need to be sent to the lang sdk - outgoing_wf_activations: VecDeque, + /// Outgoing activation jobs that need to be sent to the lang sdk + outgoing_wf_activation_jobs: VecDeque, /// The workflow that is being driven by this instance of the machines drive_me: Box, @@ -73,7 +74,7 @@ pub(crate) struct WorkflowMachines { #[derive(Debug, derive_more::From)] #[must_use] pub(super) enum WorkflowTrigger { - PushWFActivation(#[from(forward)] wf_activation::Attributes), + PushWFJob(#[from(forward)] wf_activation_job::Attributes), TriggerWFTaskStarted { task_started_event_id: i64, time: SystemTime, @@ -114,7 +115,7 @@ impl WorkflowMachines { machines_by_id: Default::default(), commands: Default::default(), current_wf_task_commands: Default::default(), - outgoing_wf_activations: Default::default(), + outgoing_wf_activation_jobs: Default::default(), } } @@ -287,7 +288,7 @@ impl WorkflowMachines { { self.run_id = attrs.original_execution_run_id.clone(); // We need to notify the lang sdk that it's time to kick off a workflow - self.outgoing_wf_activations.push_back( + self.outgoing_wf_activation_jobs.push_back( StartWorkflowTaskAttributes { // TODO: This needs to be set during init workflow_type: "".to_string(), @@ -344,14 +345,20 @@ impl WorkflowMachines { /// Returns the next activation that needs to be performed by the lang sdk. Things like unblock /// timer, etc. pub(crate) fn get_wf_activation(&mut self) -> Option { - self.outgoing_wf_activations - .pop_front() - .map(|attrs| WfActivation { - // todo wat ? - timestamp: None, + if self.outgoing_wf_activation_jobs.is_empty() { + None + } else { + let jobs = self + .outgoing_wf_activation_jobs + .drain(..) + .map(Into::into) + .collect(); + Some(WfActivation { + timestamp: self.current_wf_time.map(Into::into), run_id: self.run_id.clone(), - attributes: attrs.into(), + jobs, }) + } } /// Given an event id (possibly zero) of the last successfully executed workflow task and an @@ -394,9 +401,9 @@ impl WorkflowMachines { event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers); for trigger in triggers { match trigger { - WorkflowTrigger::PushWFActivation(a) => { - self.drive_me.on_activation(&a); - self.outgoing_wf_activations.push_back(a); + WorkflowTrigger::PushWFJob(a) => { + self.drive_me.on_activation_job(&a); + self.outgoing_wf_activation_jobs.push_back(a); } WorkflowTrigger::TriggerWFTaskStarted { task_started_event_id, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 9cfbc03b4..26a1c3c5f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,29 +1,50 @@ -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use std::time::Duration; + +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + Result, WorkflowTaskProvider, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, - pub binary_checksum: String, + pub worker_binary_id: String, + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + let channel = Channel::from_shared(target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + // TODO convert error + let metadata = req.metadata_mut(); + metadata.insert("grpc-timeout", "50000m".parse().unwrap()); + metadata.insert("client-name", "core-sdk".parse().unwrap()); + println!("Intercepting request: {:?}", req); + Ok(req) +} + /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, @@ -39,7 +60,7 @@ impl ServerGateway { kind: TaskQueueKind::Unspecified as i32, }), identity: self.opts.identity.to_string(), - binary_checksum: self.opts.binary_checksum.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), }); Ok(self diff --git a/src/protos/mod.rs b/src/protos/mod.rs index f7e1231e5..47232bd12 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -1,9 +1,18 @@ +//! Contains the protobuf definitions used as arguments to and return values from interactions with +//! [super::Core]. Language SDK authors can generate structs using the proto definitions that will match +//! the generated structs in this module. + #[allow(clippy::large_enum_variant)] +// I'd prefer not to do this, but there are some generated things that just don't need it. +#[allow(missing_docs)] pub mod coresdk { + //! Contains all protobufs relating to communication between core and lang-specific SDKs + include!("coresdk.rs"); use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; use crate::protos::coresdk::complete_task_req::Completion; + use crate::protos::coresdk::wf_activation_job::Attributes; use command::Variant; pub type HistoryEventId = i64; @@ -15,6 +24,24 @@ pub mod coresdk { variant: Some(t.into()), } } + + /// Returns any contained jobs if this task was a wf activation and it had some + #[cfg(test)] + pub fn get_wf_jobs(&self) -> Vec { + if let Some(task::Variant::Workflow(a)) = &self.variant { + a.jobs.clone() + } else { + vec![] + } + } + } + + impl From for WfActivationJob { + fn from(a: Attributes) -> Self { + WfActivationJob { + attributes: Some(a), + } + } } impl From> for WfActivationSuccess { @@ -50,6 +77,7 @@ pub mod coresdk { // No need to lint these #[allow(clippy::all)] +#[allow(missing_docs)] // This is disgusting, but unclear to me how to avoid it. TODO: Discuss w/ prost maintainer pub mod temporal { pub mod api { diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs index 23524fb55..4ef36537b 100644 --- a/src/protosext/history_info.rs +++ b/src/protosext/history_info.rs @@ -1,9 +1,11 @@ -use crate::machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::enums::v1::EventType; -use crate::protos::temporal::api::history::v1::{History, HistoryEvent}; +use crate::{ + machines::{WFMachinesError, WorkflowMachines}, + protos::temporal::api::enums::v1::EventType, + protos::temporal::api::history::v1::{History, HistoryEvent}, +}; #[derive(Clone, Debug, derive_more::Constructor, PartialEq)] -pub struct HistoryInfo { +pub(crate) struct HistoryInfo { pub previous_started_event_id: i64, pub workflow_task_started_event_id: i64, pub events: Vec, @@ -25,6 +27,7 @@ pub enum HistoryInfoError { #[error("Underlying error in workflow machine")] UnderlyingMachineError(#[from] WFMachinesError), } + impl HistoryInfo { /// Constructs a new instance, retaining only enough events to reach the provided workflow /// task number. If not provided, all events are retained. diff --git a/src/protosext/mod.rs b/src/protosext/mod.rs index f55576e31..91d15b0d8 100644 --- a/src/protosext/mod.rs +++ b/src/protosext/mod.rs @@ -1,3 +1,3 @@ mod history_info; -pub use history_info::HistoryInfo; +pub(crate) use history_info::HistoryInfo; pub(crate) use history_info::HistoryInfoError; diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs new file mode 100644 index 000000000..bb7f957b0 --- /dev/null +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use temporal_sdk_core::{Core, CoreInitOptions, Url}; + +#[test] +fn empty_poll() { + let core = temporal_sdk_core::init(CoreInitOptions { + target_url: Url::try_from("http://localhost:7233").unwrap(), + namespace: "default".to_string(), + identity: "none".to_string(), + binary_checksum: "".to_string(), + }) + .unwrap(); + + dbg!(core.poll_task("test-tq").unwrap()); +} diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..976cf28e4 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod integ_tests { + mod poller_test; +} From 7cef27d514c9517fd315bf9963e43fd559b30033 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 16:12:00 -0800 Subject: [PATCH 07/26] Fix build errors and add comments --- src/lib.rs | 33 ++++++++++++++++++-------------- tests/integ_tests/poller_test.rs | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d297cc518..0209c886b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,12 @@ //! This crate provides a basis for creating new Temporal SDKs without completely starting from //! scratch -#[macro_use] -extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +#[macro_use] +extern crate tracing; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - -pub use protosext::HistoryInfo; pub use url::Url; use crate::{ @@ -41,14 +34,20 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; -use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, + time::Duration, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; +pub mod protos; + +mod machines; +mod pollers; +mod protosext; + /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -108,8 +107,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +/// Type of task queue to poll. pub enum TaskQueue { + /// Workflow task Workflow(String), + /// Activity task _Activity(String), } @@ -335,13 +337,16 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), - #[error("Invalid URI: {0:?}")] + /// Invalid URI: {0:?} InvalidUri(#[from] InvalidUri), } #[cfg(test)] mod test { - use super::*; + use std::collections::VecDeque; + + use tracing::Level; + use crate::{ machines::test_help::TestHistoryBuilder, protos::{ @@ -355,8 +360,8 @@ mod test { }, }, }; - use std::collections::VecDeque; - use tracing::Level; + + use super::*; #[test] fn workflow_bridge() { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index bb7f957b0..fd8c37356 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,7 +7,7 @@ fn empty_poll() { target_url: Url::try_from("http://localhost:7233").unwrap(), namespace: "default".to_string(), identity: "none".to_string(), - binary_checksum: "".to_string(), + worker_binary_id: "".to_string(), }) .unwrap(); From 47aba709238ee93a90fa33e00e5dcb819cc9466f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 17:10:00 -0800 Subject: [PATCH 08/26] Create workflow in the integ test --- Cargo.toml | 2 +- src/lib.rs | 21 +++++++----- src/pollers/mod.rs | 10 +++--- tests/integ_tests/poller_test.rs | 56 +++++++++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e52162b97..13f9c1b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" -tower = "0.4.4" +rand = "0.8.3" [dependencies.tonic] version = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 0209c886b..ab96c6686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,18 @@ extern crate assert_matches; #[macro_use] extern crate tracing; +mod machines; +mod pollers; +pub mod protos; +mod protosext; + +pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, }, - pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -42,12 +47,6 @@ use std::{ use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -83,13 +82,19 @@ pub struct CoreInitOptions { /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Optional tokio runtime + pub runtime: Option, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// /// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { - let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let runtime = opts + .runtime + .map(Ok) + .unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 26a1c3c5f..badf0cfe2 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -13,7 +13,7 @@ use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] -pub(crate) struct ServerGatewayOptions { +pub struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, @@ -21,7 +21,7 @@ pub(crate) struct ServerGatewayOptions { } impl ServerGatewayOptions { - pub(crate) async fn connect(&self, target_url: Url) -> Result { + pub async fn connect(&self, target_url: Url) -> Result { let channel = Channel::from_shared(target_url.to_string())? .connect() .await?; @@ -46,9 +46,9 @@ fn intercept(mut req: Request<()>) -> Result, Status> { } /// Provides -pub(crate) struct ServerGateway { - service: WorkflowServiceClient, - opts: ServerGatewayOptions, +pub struct ServerGateway { + pub service: WorkflowServiceClient, + pub opts: ServerGatewayOptions, } impl ServerGateway { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index fd8c37356..7a6b4507a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,15 +1,63 @@ +use rand; +use rand::Rng; use std::convert::TryFrom; -use temporal_sdk_core::{Core, CoreInitOptions, Url}; +use std::time::Duration; +use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; +use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; +use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; + +const TASK_QUEUE: &str = "test-tq"; +const NAMESPACE: &str = "default"; + +const TARGET_URI: &'static str = "http://localhost:7233"; + +// TODO try to consolidate this into the SDK code so we don't need to create another runtime. +#[tokio::main] +async fn create_workflow() { + let mut rng = rand::thread_rng(); + let workflow_id: u8 = rng.gen(); + let request_id: u8 = rng.gen(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + }; + let mut gateway = gateway_opts + .connect(Url::try_from(TARGET_URI).unwrap()) + .await + .unwrap(); + gateway + .service + .start_workflow_execution(StartWorkflowExecutionRequest { + namespace: NAMESPACE.to_string(), + workflow_id: workflow_id.to_string(), + workflow_type: Some(WorkflowType { + name: "test-workflow".to_string(), + }), + task_queue: Some(TaskQueue { + name: TASK_QUEUE.to_string(), + kind: 0, + }), + request_id: request_id.to_string(), + ..Default::default() + }) + .await + .unwrap(); +} #[test] fn empty_poll() { + create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { - target_url: Url::try_from("http://localhost:7233").unwrap(), - namespace: "default".to_string(), + target_url: Url::try_from(TARGET_URI).unwrap(), + namespace: NAMESPACE.to_string(), identity: "none".to_string(), worker_binary_id: "".to_string(), + runtime: None, }) .unwrap(); - dbg!(core.poll_task("test-tq").unwrap()); + dbg!(core.poll_task(TASK_QUEUE).unwrap()); } From 0a92a79016f000e1924f14199e38c1e68de142f8 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 20:19:52 -0800 Subject: [PATCH 09/26] Add end to end integ test for timer workflow --- src/lib.rs | 48 ++++++++++++++++++++++--------- src/pollers/mod.rs | 36 ++++++++++++++++++----- tests/integ_tests/poller_test.rs | 49 ++++++++++++++++++++++++++------ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ab96c6686..31279670d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ #[cfg(test)] #[macro_use] -extern crate assert_matches; +pub extern crate assert_matches; #[macro_use] extern crate tracing; @@ -17,6 +17,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -106,7 +110,7 @@ pub fn init(opts: CoreInitOptions) -> Result { Ok(CoreSDK { runtime, - work_provider, + server_gateway: work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), }) @@ -123,7 +127,7 @@ pub enum TaskQueue { struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs - work_provider: WP, + server_gateway: WP, /// Key is run id workflow_machines: DashMap>)>, /// Maps task tokens to workflow run ids @@ -132,14 +136,14 @@ struct CoreSDK { impl Core for CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi, { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work(task_queue))?; + .block_on(self.server_gateway.poll(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -185,13 +189,19 @@ where status: Some(wfstatus), })), } => { - let wf_run_id = self + let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + .ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Successful(success) => { + self.push_lang_commands(&run_id, success)?; + if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { + let commands = machines.0.get_commands(); + self.server_gateway.complete(task_token, commands); + } + } Status::Failed(_) => {} } Ok(()) @@ -204,13 +214,12 @@ where } _ => Err(CoreError::MalformedCompletion(req)), } - // TODO: Get fsm commands and send them to server (get_commands) } } impl CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -256,9 +265,22 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait WorkflowTaskProvider { +pub trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. - async fn get_work(&self, task_queue: &str) -> Result; + async fn poll(&self, task_queue: &str) -> Result; +} + +/// Implementors can provide new work to the SDK. The connection to the server is the real +/// implementor. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait RespondWorkflowTaskCompletedApi { + /// Fetch new work. Should block indefinitely if there is no work. + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -432,7 +454,7 @@ mod test { let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - work_provider: mock_provider, + server_gateway: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index badf0cfe2..f8ecb808b 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,5 +1,9 @@ use std::time::Duration; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ protos::temporal::api::enums::v1::TaskQueueKind, protos::temporal::api::taskqueue::v1::TaskQueue, @@ -7,7 +11,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, }, - Result, WorkflowTaskProvider, + PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -51,9 +55,10 @@ pub struct ServerGateway { pub opts: ServerGatewayOptions, } -impl ServerGateway { +#[async_trait::async_trait] +impl PollWorkflowTaskQueueApi for ServerGateway { async fn poll(&self, task_queue: &str) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { name: task_queue.to_string(), @@ -61,7 +66,7 @@ impl ServerGateway { }), identity: self.opts.identity.to_string(), binary_checksum: self.opts.worker_binary_id.to_string(), - }); + }; Ok(self .service @@ -73,8 +78,25 @@ impl ServerGateway { } #[async_trait::async_trait] -impl WorkflowTaskProvider for ServerGateway { - async fn get_work(&self, task_queue: &str) -> Result { - self.poll(task_queue).await +impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result { + let request = RespondWorkflowTaskCompletedRequest { + task_token, + commands, + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), + namespace: self.opts.namespace.to_string(), + ..Default::default() + }; + Ok(self + .service + .clone() + .respond_workflow_task_completed(request) + .await? + .into_inner()) } } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 7a6b4507a..961639b4a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -2,7 +2,15 @@ use rand; use rand::Rng; use std::convert::TryFrom; use std::time::Duration; +use temporal_sdk_core::protos::coresdk::CompleteTaskReq; +use temporal_sdk_core::protos::temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +}; use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; +use temporal_sdk_core::protos::temporal::api::history::v1::{ + history_event, TimerFiredEventAttributes, +}; use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; @@ -14,10 +22,10 @@ const TARGET_URI: &'static str = "http://localhost:7233"; // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] -async fn create_workflow() { +async fn create_workflow() -> (String, String) { let mut rng = rand::thread_rng(); - let workflow_id: u8 = rng.gen(); - let request_id: u8 = rng.gen(); + let workflow_id: u32 = rng.gen(); + let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "none".to_string(), @@ -28,7 +36,7 @@ async fn create_workflow() { .connect(Url::try_from(TARGET_URI).unwrap()) .await .unwrap(); - gateway + let response = gateway .service .start_workflow_execution(StartWorkflowExecutionRequest { namespace: NAMESPACE.to_string(), @@ -45,11 +53,12 @@ async fn create_workflow() { }) .await .unwrap(); + (workflow_id.to_string(), response.into_inner().run_id) } #[test] -fn empty_poll() { - create_workflow(); +fn timer_workflow() { + let (workflow_id, run_id) = create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -58,6 +67,30 @@ fn empty_poll() { runtime: None, }) .unwrap(); - - dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let mut rng = rand::thread_rng(); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); } From c6e73888fe6e7fe67146fd9dd0f9c93bdd2b6005 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 21:11:58 -0800 Subject: [PATCH 10/26] fix mocks in tests --- src/lib.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 31279670d..857cf2540 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[macro_use] +extern crate mockall; mod machines; mod pollers; @@ -388,8 +390,19 @@ mod test { }, }; - use super::*; + mock! { + ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; + } + } + use super::*; #[test] fn workflow_bridge() { let s = span!(Level::DEBUG, "Test start"); @@ -398,6 +411,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer"; + let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -446,9 +460,9 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkflowTaskProvider::new(); + let mut mock_provider = MockServerGateway::new(); mock_provider - .expect_get_work() + .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); let runtime = Runtime::new().unwrap(); @@ -459,7 +473,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -481,7 +495,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), From 74261ee9441a16a09b5e705ae5d0b5066b6e9e0c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Tue, 2 Feb 2021 21:45:12 -0800 Subject: [PATCH 11/26] Adding integration test for the poller and made task queue configurable for the poll task --- .buildkite/pipeline.yml | 10 +++++++++ Cargo.toml | 17 +++++++++++++-- README.md | 4 +++- src/lib.rs | 25 +++++++++++++++------ src/pollers/mod.rs | 37 +++++++++++++++++++++++++------- tests/integ_tests/poller_test.rs | 15 +++++++++++++ tests/main.rs | 4 ++++ 7 files changed, 95 insertions(+), 17 deletions(-) create mode 100644 tests/integ_tests/poller_test.rs create mode 100644 tests/main.rs diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 415db733f..1aaecbc72 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -31,4 +31,14 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml - wait diff --git a/Cargo.toml b/Cargo.toml index 6eb064d9c..e52162b97 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,11 +21,17 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +tower = "0.4.4" + +[dependencies.tonic] +version = "0.4" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" @@ -40,3 +46,10 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/src/lib.rs b/src/lib.rs index 7a760a37d..d297cc518 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -15,6 +15,9 @@ mod machines; mod pollers; mod protosext; +pub use protosext::HistoryInfo; +pub use url::Url; + use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -38,15 +41,17 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; +use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; +const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; /// This trait is the primary way by which language specific SDKs interact with the core SDK. It is /// expected that only one instance of an implementation will exist for the lifetime of the @@ -57,7 +62,7 @@ pub trait Core { /// language SDK's responsibility to call the appropriate code with the provided inputs. /// /// TODO: Examples - fn poll_task(&self) -> Result; + fn poll_task(&self, task_queue: &str) -> Result; /// Tell the core that some work has been completed - whether as a result of running workflow /// code or executing an activity. @@ -90,6 +95,7 @@ pub fn init(opts: CoreInitOptions) -> Result { namespace: opts.namespace, identity: opts.identity, worker_binary_id: opts.worker_binary_id, + long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -102,6 +108,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +pub enum TaskQueue { + Workflow(String), + _Activity(String), +} + struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs @@ -117,11 +128,11 @@ where WP: WorkflowTaskProvider, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.work_provider.get_work(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -324,6 +335,8 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -414,7 +427,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -436,7 +449,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task("test-task-queue").unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d26b5d9d5..26a1c3c5f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,11 +1,15 @@ -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; -use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +use std::time::Duration; + +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + Result, WorkflowTaskProvider, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] @@ -13,17 +17,34 @@ pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + let channel = Channel::from_shared(target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + // TODO convert error + let metadata = req.metadata_mut(); + metadata.insert("grpc-timeout", "50000m".parse().unwrap()); + metadata.insert("client-name", "core-sdk".parse().unwrap()); + println!("Intercepting request: {:?}", req); + Ok(req) +} + /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs new file mode 100644 index 000000000..bb7f957b0 --- /dev/null +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1,15 @@ +use std::convert::TryFrom; +use temporal_sdk_core::{Core, CoreInitOptions, Url}; + +#[test] +fn empty_poll() { + let core = temporal_sdk_core::init(CoreInitOptions { + target_url: Url::try_from("http://localhost:7233").unwrap(), + namespace: "default".to_string(), + identity: "none".to_string(), + binary_checksum: "".to_string(), + }) + .unwrap(); + + dbg!(core.poll_task("test-tq").unwrap()); +} diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..976cf28e4 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod integ_tests { + mod poller_test; +} From 77fabbbc0fb6110944048f85d929900aa24ed908 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 16:12:00 -0800 Subject: [PATCH 12/26] Fix build errors and add comments --- src/lib.rs | 33 ++++++++++++++++++-------------- tests/integ_tests/poller_test.rs | 2 +- 2 files changed, 20 insertions(+), 15 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d297cc518..0209c886b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,19 +3,12 @@ //! This crate provides a basis for creating new Temporal SDKs without completely starting from //! scratch -#[macro_use] -extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +#[macro_use] +extern crate tracing; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - -pub use protosext::HistoryInfo; pub use url::Url; use crate::{ @@ -41,14 +34,20 @@ use crate::{ }; use anyhow::Error; use dashmap::DashMap; -use std::time::Duration; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, + time::Duration, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; +pub mod protos; + +mod machines; +mod pollers; +mod protosext; + /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -108,8 +107,11 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } +/// Type of task queue to poll. pub enum TaskQueue { + /// Workflow task Workflow(String), + /// Activity task _Activity(String), } @@ -335,13 +337,16 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), - #[error("Invalid URI: {0:?}")] + /// Invalid URI: {0:?} InvalidUri(#[from] InvalidUri), } #[cfg(test)] mod test { - use super::*; + use std::collections::VecDeque; + + use tracing::Level; + use crate::{ machines::test_help::TestHistoryBuilder, protos::{ @@ -355,8 +360,8 @@ mod test { }, }, }; - use std::collections::VecDeque; - use tracing::Level; + + use super::*; #[test] fn workflow_bridge() { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index bb7f957b0..fd8c37356 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,7 +7,7 @@ fn empty_poll() { target_url: Url::try_from("http://localhost:7233").unwrap(), namespace: "default".to_string(), identity: "none".to_string(), - binary_checksum: "".to_string(), + worker_binary_id: "".to_string(), }) .unwrap(); From b260f7d05992ae99cba042960c5563698d55f2ae Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 17:10:00 -0800 Subject: [PATCH 13/26] Create workflow in the integ test --- Cargo.toml | 2 +- src/lib.rs | 21 +++++++----- src/pollers/mod.rs | 10 +++--- tests/integ_tests/poller_test.rs | 56 +++++++++++++++++++++++++++++--- 4 files changed, 71 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e52162b97..13f9c1b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ tracing = { version = "0.1", features = ["log"] } tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" -tower = "0.4.4" +rand = "0.8.3" [dependencies.tonic] version = "0.4" diff --git a/src/lib.rs b/src/lib.rs index 0209c886b..ab96c6686 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,13 +9,18 @@ extern crate assert_matches; #[macro_use] extern crate tracing; +mod machines; +mod pollers; +pub mod protos; +mod protosext; + +pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, }, - pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -42,12 +47,6 @@ use std::{ use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; -pub mod protos; - -mod machines; -mod pollers; -mod protosext; - /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; @@ -83,13 +82,19 @@ pub struct CoreInitOptions { /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Optional tokio runtime + pub runtime: Option, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// /// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { - let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let runtime = opts + .runtime + .map(Ok) + .unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 26a1c3c5f..badf0cfe2 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -13,7 +13,7 @@ use tonic::{transport::Channel, Request, Status}; use url::Url; #[derive(Clone)] -pub(crate) struct ServerGatewayOptions { +pub struct ServerGatewayOptions { pub namespace: String, pub identity: String, pub worker_binary_id: String, @@ -21,7 +21,7 @@ pub(crate) struct ServerGatewayOptions { } impl ServerGatewayOptions { - pub(crate) async fn connect(&self, target_url: Url) -> Result { + pub async fn connect(&self, target_url: Url) -> Result { let channel = Channel::from_shared(target_url.to_string())? .connect() .await?; @@ -46,9 +46,9 @@ fn intercept(mut req: Request<()>) -> Result, Status> { } /// Provides -pub(crate) struct ServerGateway { - service: WorkflowServiceClient, - opts: ServerGatewayOptions, +pub struct ServerGateway { + pub service: WorkflowServiceClient, + pub opts: ServerGatewayOptions, } impl ServerGateway { diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index fd8c37356..7a6b4507a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,15 +1,63 @@ +use rand; +use rand::Rng; use std::convert::TryFrom; -use temporal_sdk_core::{Core, CoreInitOptions, Url}; +use std::time::Duration; +use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; +use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; +use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; + +const TASK_QUEUE: &str = "test-tq"; +const NAMESPACE: &str = "default"; + +const TARGET_URI: &'static str = "http://localhost:7233"; + +// TODO try to consolidate this into the SDK code so we don't need to create another runtime. +#[tokio::main] +async fn create_workflow() { + let mut rng = rand::thread_rng(); + let workflow_id: u8 = rng.gen(); + let request_id: u8 = rng.gen(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + }; + let mut gateway = gateway_opts + .connect(Url::try_from(TARGET_URI).unwrap()) + .await + .unwrap(); + gateway + .service + .start_workflow_execution(StartWorkflowExecutionRequest { + namespace: NAMESPACE.to_string(), + workflow_id: workflow_id.to_string(), + workflow_type: Some(WorkflowType { + name: "test-workflow".to_string(), + }), + task_queue: Some(TaskQueue { + name: TASK_QUEUE.to_string(), + kind: 0, + }), + request_id: request_id.to_string(), + ..Default::default() + }) + .await + .unwrap(); +} #[test] fn empty_poll() { + create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { - target_url: Url::try_from("http://localhost:7233").unwrap(), - namespace: "default".to_string(), + target_url: Url::try_from(TARGET_URI).unwrap(), + namespace: NAMESPACE.to_string(), identity: "none".to_string(), worker_binary_id: "".to_string(), + runtime: None, }) .unwrap(); - dbg!(core.poll_task("test-tq").unwrap()); + dbg!(core.poll_task(TASK_QUEUE).unwrap()); } From 8ef4abdc6d19ea0da760fe4a0c87193f49959764 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 20:19:52 -0800 Subject: [PATCH 14/26] Add end to end integ test for timer workflow --- src/lib.rs | 48 ++++++++++++++++++++++--------- src/pollers/mod.rs | 36 ++++++++++++++++++----- tests/integ_tests/poller_test.rs | 49 ++++++++++++++++++++++++++------ 3 files changed, 105 insertions(+), 28 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index ab96c6686..31279670d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,7 +5,7 @@ #[cfg(test)] #[macro_use] -extern crate assert_matches; +pub extern crate assert_matches; #[macro_use] extern crate tracing; @@ -17,6 +17,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -106,7 +110,7 @@ pub fn init(opts: CoreInitOptions) -> Result { Ok(CoreSDK { runtime, - work_provider, + server_gateway: work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), }) @@ -123,7 +127,7 @@ pub enum TaskQueue { struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs - work_provider: WP, + server_gateway: WP, /// Key is run id workflow_machines: DashMap>)>, /// Maps task tokens to workflow run ids @@ -132,14 +136,14 @@ struct CoreSDK { impl Core for CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi, { #[instrument(skip(self))] fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work(task_queue))?; + .block_on(self.server_gateway.poll(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -185,13 +189,19 @@ where status: Some(wfstatus), })), } => { - let wf_run_id = self + let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + .ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Successful(success) => { + self.push_lang_commands(&run_id, success)?; + if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { + let commands = machines.0.get_commands(); + self.server_gateway.complete(task_token, commands); + } + } Status::Failed(_) => {} } Ok(()) @@ -204,13 +214,12 @@ where } _ => Err(CoreError::MalformedCompletion(req)), } - // TODO: Get fsm commands and send them to server (get_commands) } } impl CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -256,9 +265,22 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait WorkflowTaskProvider { +pub trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. - async fn get_work(&self, task_queue: &str) -> Result; + async fn poll(&self, task_queue: &str) -> Result; +} + +/// Implementors can provide new work to the SDK. The connection to the server is the real +/// implementor. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub trait RespondWorkflowTaskCompletedApi { + /// Fetch new work. Should block indefinitely if there is no work. + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -432,7 +454,7 @@ mod test { let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - work_provider: mock_provider, + server_gateway: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index badf0cfe2..f8ecb808b 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,5 +1,9 @@ use std::time::Duration; +use crate::machines::ProtoCommand; +use crate::protos::temporal::api::workflowservice::v1::{ + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; use crate::{ protos::temporal::api::enums::v1::TaskQueueKind, protos::temporal::api::taskqueue::v1::TaskQueue, @@ -7,7 +11,7 @@ use crate::{ protos::temporal::api::workflowservice::v1::{ PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, }, - Result, WorkflowTaskProvider, + PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, Result, }; use tonic::{transport::Channel, Request, Status}; use url::Url; @@ -51,9 +55,10 @@ pub struct ServerGateway { pub opts: ServerGatewayOptions, } -impl ServerGateway { +#[async_trait::async_trait] +impl PollWorkflowTaskQueueApi for ServerGateway { async fn poll(&self, task_queue: &str) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { name: task_queue.to_string(), @@ -61,7 +66,7 @@ impl ServerGateway { }), identity: self.opts.identity.to_string(), binary_checksum: self.opts.worker_binary_id.to_string(), - }); + }; Ok(self .service @@ -73,8 +78,25 @@ impl ServerGateway { } #[async_trait::async_trait] -impl WorkflowTaskProvider for ServerGateway { - async fn get_work(&self, task_queue: &str) -> Result { - self.poll(task_queue).await +impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result { + let request = RespondWorkflowTaskCompletedRequest { + task_token, + commands, + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), + namespace: self.opts.namespace.to_string(), + ..Default::default() + }; + Ok(self + .service + .clone() + .respond_workflow_task_completed(request) + .await? + .into_inner()) } } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 7a6b4507a..961639b4a 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -2,7 +2,15 @@ use rand; use rand::Rng; use std::convert::TryFrom; use std::time::Duration; +use temporal_sdk_core::protos::coresdk::CompleteTaskReq; +use temporal_sdk_core::protos::temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +}; use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; +use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; +use temporal_sdk_core::protos::temporal::api::history::v1::{ + history_event, TimerFiredEventAttributes, +}; use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; @@ -14,10 +22,10 @@ const TARGET_URI: &'static str = "http://localhost:7233"; // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] -async fn create_workflow() { +async fn create_workflow() -> (String, String) { let mut rng = rand::thread_rng(); - let workflow_id: u8 = rng.gen(); - let request_id: u8 = rng.gen(); + let workflow_id: u32 = rng.gen(); + let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions { namespace: NAMESPACE.to_string(), identity: "none".to_string(), @@ -28,7 +36,7 @@ async fn create_workflow() { .connect(Url::try_from(TARGET_URI).unwrap()) .await .unwrap(); - gateway + let response = gateway .service .start_workflow_execution(StartWorkflowExecutionRequest { namespace: NAMESPACE.to_string(), @@ -45,11 +53,12 @@ async fn create_workflow() { }) .await .unwrap(); + (workflow_id.to_string(), response.into_inner().run_id) } #[test] -fn empty_poll() { - create_workflow(); +fn timer_workflow() { + let (workflow_id, run_id) = create_workflow(); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -58,6 +67,30 @@ fn empty_poll() { runtime: None, }) .unwrap(); - - dbg!(core.poll_task(TASK_QUEUE).unwrap()); + let mut rng = rand::thread_rng(); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + // TODO verify + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); } From 119410d79c0fca8012c3ec90532164eb4ad18d42 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Wed, 3 Feb 2021 21:11:58 -0800 Subject: [PATCH 15/26] fix mocks in tests --- src/lib.rs | 24 +++++++++++++++++++----- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 31279670d..857cf2540 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,8 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[macro_use] +extern crate mockall; mod machines; mod pollers; @@ -388,8 +390,19 @@ mod test { }, }; - use super::*; + mock! { + ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; + } + } + use super::*; #[test] fn workflow_bridge() { let s = span!(Level::DEBUG, "Test start"); @@ -398,6 +411,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer"; + let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -446,9 +460,9 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkflowTaskProvider::new(); + let mut mock_provider = MockServerGateway::new(); mock_provider - .expect_get_work() + .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); let runtime = Runtime::new().unwrap(); @@ -459,7 +473,7 @@ mod test { workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -481,7 +495,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task("test-task-queue").unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), From fa10dae26b76e98df8b626d038bb835b7fbfba1d Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 22:15:21 -0800 Subject: [PATCH 16/26] Fix mockall compile problem --- src/lib.rs | 12 ++++++------ tests/integ_tests/poller_test.rs | 30 +++++++++++++++--------------- 2 files changed, 21 insertions(+), 21 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 857cf2540..fec0d5a8c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[cfg(test)] #[macro_use] extern crate mockall; @@ -19,13 +20,10 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; -use crate::machines::ProtoCommand; -use crate::protos::temporal::api::workflowservice::v1::{ - RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, -}; use crate::{ machines::{ - ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, + ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand, + WorkflowMachines, }, protos::{ coresdk::{ @@ -38,7 +36,9 @@ use crate::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, - workflowservice::v1::PollWorkflowTaskQueueResponse, + workflowservice::{ + v1::PollWorkflowTaskQueueResponse, v1::RespondWorkflowTaskCompletedResponse, + }, }, }, protosext::{HistoryInfo, HistoryInfoError}, diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 961639b4a..1a870e552 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,19 +1,19 @@ -use rand; -use rand::Rng; -use std::convert::TryFrom; -use std::time::Duration; -use temporal_sdk_core::protos::coresdk::CompleteTaskReq; -use temporal_sdk_core::protos::temporal::api::command::v1::{ - CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, +use rand::{self, Rng}; +use std::{convert::TryFrom, time::Duration}; +use temporal_sdk_core::{ + protos::{ + coresdk::CompleteTaskReq, + temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + temporal::api::common::v1::WorkflowType, + temporal::api::enums::v1::EventType, + temporal::api::history::v1::{history_event, TimerFiredEventAttributes}, + temporal::api::taskqueue::v1::TaskQueue, + temporal::api::workflowservice::v1::StartWorkflowExecutionRequest, + }, + Core, CoreInitOptions, ServerGatewayOptions, Url, }; -use temporal_sdk_core::protos::temporal::api::common::v1::WorkflowType; -use temporal_sdk_core::protos::temporal::api::enums::v1::EventType; -use temporal_sdk_core::protos::temporal::api::history::v1::{ - history_event, TimerFiredEventAttributes, -}; -use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; -use temporal_sdk_core::protos::temporal::api::workflowservice::v1::StartWorkflowExecutionRequest; -use temporal_sdk_core::{Core, CoreInitOptions, ServerGatewayOptions, Url}; const TASK_QUEUE: &str = "test-tq"; const NAMESPACE: &str = "default"; From c0da81319f764270e247935719982cca6565c5a7 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 3 Feb 2021 22:20:35 -0800 Subject: [PATCH 17/26] Fix mockall compile problem, actually send completions using block_on --- src/lib.rs | 12 +++++------- tests/integ_tests/poller_test.rs | 2 -- 2 files changed, 5 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 38aaf8eda..3822b26e8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; +#[cfg(test)] #[macro_use] extern crate mockall; @@ -19,10 +20,6 @@ mod protosext; pub use pollers::{ServerGateway, ServerGatewayOptions}; pub use url::Url; -use crate::machines::ProtoCommand; -use crate::protos::temporal::api::workflowservice::v1::{ - RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, -}; use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand, @@ -39,8 +36,8 @@ use crate::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, - workflowservice::{ - v1::PollWorkflowTaskQueueResponse, v1::RespondWorkflowTaskCompletedResponse, + workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, }, }, }, @@ -204,7 +201,8 @@ where self.push_lang_commands(&run_id, success)?; if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { let commands = machines.0.get_commands(); - self.server_gateway.complete(task_token, commands); + self.runtime + .block_on(self.server_gateway.complete(task_token, commands))?; } } Status::Failed(_) => {} diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 1a870e552..903f2315f 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -7,8 +7,6 @@ use temporal_sdk_core::{ CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, }, temporal::api::common::v1::WorkflowType, - temporal::api::enums::v1::EventType, - temporal::api::history::v1::{history_event, TimerFiredEventAttributes}, temporal::api::taskqueue::v1::TaskQueue, temporal::api::workflowservice::v1::StartWorkflowExecutionRequest, }, From 9d6a4c639b5ba9e0727c1de755c5edd89a7fa60f Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 4 Feb 2021 11:28:11 -0800 Subject: [PATCH 18/26] add event loop invocation into push_lang_commands --- src/lib.rs | 5 +++++ src/machines/workflow_machines.rs | 2 +- tests/integ_tests/poller_test.rs | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 3822b26e8..385e0c067 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -260,6 +260,11 @@ where .unwrap() .1 .send(cmds)?; + self.workflow_machines + .get_mut(run_id) + .unwrap() + .0 + .event_loop(); Ok(()) } } diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1315e0df1..5b98767c9 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -381,7 +381,7 @@ impl WorkflowMachines { .expect("We have just ensured this is populated") } - fn event_loop(&mut self) -> Result<()> { + pub fn event_loop(&mut self) -> Result<()> { let results = self.drive_me.iterate_wf()?; self.handle_driven_results(results); diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 903f2315f..b4a35153b 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -56,7 +56,7 @@ async fn create_workflow() -> (String, String) { #[test] fn timer_workflow() { - let (workflow_id, run_id) = create_workflow(); + let (workflow_id, run_id) = dbg!(create_workflow()); let core = temporal_sdk_core::init(CoreInitOptions { target_url: Url::try_from(TARGET_URI).unwrap(), namespace: NAMESPACE.to_string(), @@ -72,6 +72,7 @@ fn timer_workflow() { core.complete_task(CompleteTaskReq::ok_from_api_attrs( StartTimerCommandAttributes { timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_secs(1).into()), ..Default::default() } .into(), From adbc67c9815f6c4d9b125006e7da294a3216489a Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 11:57:12 -0800 Subject: [PATCH 19/26] Cleanup linting errors and a handful of other issues --- src/lib.rs | 55 +++++++++++-------------------- src/machines/workflow_machines.rs | 1 + src/pollers/mod.rs | 24 +++++++++++--- tests/integ_tests/poller_test.rs | 26 +++++++-------- 4 files changed, 52 insertions(+), 54 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 385e0c067..753c158aa 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,14 +48,12 @@ use dashmap::DashMap; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, - time::Duration, }; use tokio::runtime::Runtime; use tonic::codegen::http::uri::InvalidUri; /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; -const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60; /// This trait is the primary way by which language specific SDKs interact with the core SDK. It is /// expected that only one instance of an implementation will exist for the lifetime of the @@ -75,40 +73,21 @@ pub trait Core { /// Holds various configuration information required to call [init] pub struct CoreInitOptions { - /// The URL of the Temporal server to connect to - pub target_url: Url, - - /// The namespace on the server your worker will be using - pub namespace: String, - - /// A human-readable string that can identify your worker - /// - /// TODO: Probably belongs in future worker abstraction - pub identity: String, - - /// A string that should be unique to the exact worker code/binary being executed - pub worker_binary_id: String, - - /// Optional tokio runtime - pub runtime: Option, + /// Options for the connection to the temporal server + pub gateway_opts: ServerGatewayOptions, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// -/// Note: Also creates tokio runtime that will be used for all client-server interactions. +/// Note: Also creates a tokio runtime that will be used for all client-server interactions. +/// +/// # Panics +/// * Will panic if called from within an async context, as it will construct a runtime and you +/// cannot construct a runtime from within a runtime. pub fn init(opts: CoreInitOptions) -> Result { - let runtime = opts - .runtime - .map(Ok) - .unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?; - let gateway_opts = ServerGatewayOptions { - namespace: opts.namespace, - identity: opts.identity, - worker_binary_id: opts.worker_binary_id, - long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT), - }; + let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; // Initialize server client - let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; + let work_provider = runtime.block_on(opts.gateway_opts.connect())?; Ok(CoreSDK { runtime, @@ -195,7 +174,7 @@ where .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?; + .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { Status::Successful(success) => { self.push_lang_commands(&run_id, success)?; @@ -249,6 +228,7 @@ where run_id: &str, success: WfActivationSuccess, ) -> Result<(), CoreError> { + // TODO: No unwraps // Convert to wf commands let cmds = success .commands @@ -264,7 +244,8 @@ where .get_mut(run_id) .unwrap() .0 - .event_loop(); + .event_loop() + .unwrap(); Ok(()) } } @@ -466,15 +447,19 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockServerGateway::new(); - mock_provider + let mut mock_gateway = MockServerGateway::new(); + mock_gateway .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); + // Response not really important here + mock_gateway + .expect_complete() + .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - server_gateway: mock_provider, + server_gateway: mock_gateway, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 5b98767c9..bd0b396b5 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -381,6 +381,7 @@ impl WorkflowMachines { .expect("We have just ensured this is populated") } + // TODO: Shouldn't be pub pub fn event_loop(&mut self) -> Result<()> { let results = self.drive_me.iterate_wf()?; self.handle_driven_results(results); diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index f8ecb808b..1077e641d 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -16,17 +16,31 @@ use crate::{ use tonic::{transport::Channel, Request, Status}; use url::Url; -#[derive(Clone)] +/// Options for the connection to the temporal server +#[derive(Clone, Debug)] pub struct ServerGatewayOptions { + /// The URL of the Temporal server to connect to + pub target_url: Url, + + /// What namespace will we operate under pub namespace: String, + + /// A human-readable string that can identify your worker + /// + /// TODO: Probably belongs in future worker abstraction pub identity: String, + + /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Timeout for long polls (polling of task queues) pub long_poll_timeout: Duration, } impl ServerGatewayOptions { - pub async fn connect(&self, target_url: Url) -> Result { - let channel = Channel::from_shared(target_url.to_string())? + /// Attempt to establish a connection to the Temporal server + pub async fn connect(&self) -> Result { + let channel = Channel::from_shared(self.target_url.to_string())? .connect() .await?; let service = WorkflowServiceClient::with_interceptor(channel, intercept); @@ -49,9 +63,11 @@ fn intercept(mut req: Request<()>) -> Result, Status> { Ok(req) } -/// Provides +/// Contains an instance of a client for interacting with the temporal server pub struct ServerGateway { + /// Client for interacting with workflow service pub service: WorkflowServiceClient, + /// Options gateway was initialized with pub opts: ServerGatewayOptions, } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index b4a35153b..ba9653920 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -20,8 +20,9 @@ const TARGET_URI: &'static str = "http://localhost:7233"; // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] -async fn create_workflow() -> (String, String) { +async fn create_workflow() -> (String, String, ServerGatewayOptions) { let mut rng = rand::thread_rng(); + let url = Url::try_from(TARGET_URI).unwrap(); let workflow_id: u32 = rng.gen(); let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions { @@ -29,11 +30,9 @@ async fn create_workflow() -> (String, String) { identity: "none".to_string(), worker_binary_id: "".to_string(), long_poll_timeout: Duration::from_secs(60), + target_url: url, }; - let mut gateway = gateway_opts - .connect(Url::try_from(TARGET_URI).unwrap()) - .await - .unwrap(); + let mut gateway = gateway_opts.connect().await.unwrap(); let response = gateway .service .start_workflow_execution(StartWorkflowExecutionRequest { @@ -51,20 +50,17 @@ async fn create_workflow() -> (String, String) { }) .await .unwrap(); - (workflow_id.to_string(), response.into_inner().run_id) + ( + workflow_id.to_string(), + response.into_inner().run_id, + gateway_opts, + ) } #[test] fn timer_workflow() { - let (workflow_id, run_id) = dbg!(create_workflow()); - let core = temporal_sdk_core::init(CoreInitOptions { - target_url: Url::try_from(TARGET_URI).unwrap(), - namespace: NAMESPACE.to_string(), - identity: "none".to_string(), - worker_binary_id: "".to_string(), - runtime: None, - }) - .unwrap(); + let (workflow_id, run_id, gateway_opts) = dbg!(create_workflow()); + let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let timer_id: String = rng.gen::().to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); From a88b96eb74d4e8ed4826d925606cf786ae43e90c Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 12:13:03 -0800 Subject: [PATCH 20/26] More docstring / todo / visibility cleanup --- src/lib.rs | 12 +++++++----- src/pollers/mod.rs | 12 ++++++++---- 2 files changed, 15 insertions(+), 9 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 753c158aa..d42a9b9ee 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -254,17 +254,19 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait PollWorkflowTaskQueueApi { +pub(crate) trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. async fn poll(&self, task_queue: &str) -> Result; } -/// Implementors can provide new work to the SDK. The connection to the server is the real -/// implementor. +/// Implementors can complete tasks as would've been issued by [Core::poll]. The real implementor +/// sends the completed tasks to the server. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait RespondWorkflowTaskCompletedApi { - /// Fetch new work. Should block indefinitely if there is no work. +pub(crate) trait RespondWorkflowTaskCompletedApi { + /// Complete a task by sending it to the server. `task_token` is the task token that would've + /// been received from [PollWorkflowTaskQueueApi::poll]. `commands` is a list of new commands + /// to send to the server, such as starting a timer. async fn complete( &self, task_token: Vec, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 1077e641d..0c3623167 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -55,11 +55,15 @@ impl ServerGatewayOptions { /// `Status` here will cancel the request and have that status returned to /// the caller. fn intercept(mut req: Request<()>) -> Result, Status> { - // TODO convert error let metadata = req.metadata_mut(); - metadata.insert("grpc-timeout", "50000m".parse().unwrap()); - metadata.insert("client-name", "core-sdk".parse().unwrap()); - println!("Intercepting request: {:?}", req); + metadata.insert( + "grpc-timeout", + "50000m".parse().expect("Static value is parsable"), + ); + metadata.insert( + "client-name", + "core-sdk".parse().expect("Static value is parsable"), + ); Ok(req) } From cbb4ab3cd1bd8a8c7788de68df7c3d1999af14c4 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 12:19:11 -0800 Subject: [PATCH 21/26] Address some of Max's feedback --- protos/local/core_interface.proto | 16 +++++++++++----- src/pollers/mod.rs | 1 + 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 82af4ecdb..7ab9ebf03 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -18,10 +18,16 @@ import "temporal/api/command/v1/message.proto"; // A request as given to [crate::Core::poll_task] message PollTaskReq { - // If true, poll for workflow tasks - bool workflows = 1; - // If true, poll for activity tasks - bool activities = 2; + // What type of task to poll for + enum TaskType { + // Poll for workflows + WORKFLOWS = 0; + // Poll for activities + ACTIVITIES = 1; + } + + // A list of task types to poll for + repeated TaskType types = 1; } // An instruction to perform work from core->lang sdk @@ -40,7 +46,7 @@ message Task { // An instruction to the lang sdk to run some workflow code, whether for the first time or from // a cached state. message WFActivation { - // Time the activation(s) were requested + // The current time as understood by the workflow, which is set by workflow task started events google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; // The id of the currently active run of the workflow string run_id = 2; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 0c3623167..d0839c3a9 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -56,6 +56,7 @@ impl ServerGatewayOptions { /// the caller. fn intercept(mut req: Request<()>) -> Result, Status> { let metadata = req.metadata_mut(); + // TODO: Only apply this to long poll requests metadata.insert( "grpc-timeout", "50000m".parse().expect("Static value is parsable"), From dec4a7845b88a5ea7ba0e0d52f0a60f3ce29c204 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 12:22:11 -0800 Subject: [PATCH 22/26] Bad merge meant we are running two sets of integ tests for some reason --- .buildkite/pipeline.yml | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index c3c3949d4..1aaecbc72 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -41,14 +41,4 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml - - label: "integ-test" - agents: - queue: "default" - docker: "*" - command: "cargo test --test integ_tests" - timeout_in_minutes: 15 - plugins: - - docker-compose#v3.0.0: - run: unit-test - config: .buildkite/docker/docker-compose.yaml - wait From e38099c1803fe33e5e87954b076b3579a9663e51 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 14:24:28 -0800 Subject: [PATCH 23/26] Clean up a bunch of results that didn't need to exist. Better name/doc for iterate_wf --- src/lib.rs | 37 ++++++++--------------- src/machines/mod.rs | 27 ++++++++--------- src/machines/test_help/workflow_driver.rs | 28 +++++------------ src/machines/workflow_machines.rs | 12 ++++---- tests/integ_tests/poller_test.rs | 2 -- 5 files changed, 38 insertions(+), 68 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index d42a9b9ee..e6b7c92d4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -43,7 +43,6 @@ use crate::{ }, protosext::{HistoryInfo, HistoryInfoError}, }; -use anyhow::Error; use dashmap::DashMap; use std::{ convert::TryInto, @@ -228,24 +227,18 @@ where run_id: &str, success: WfActivationSuccess, ) -> Result<(), CoreError> { - // TODO: No unwraps // Convert to wf commands let cmds = success .commands .into_iter() .map(|c| c.try_into().map_err(Into::into)) .collect::>>()?; - self.workflow_machines - .get_mut(run_id) - .unwrap() - .1 - .send(cmds)?; - self.workflow_machines - .get_mut(run_id) - .unwrap() - .0 - .event_loop() - .unwrap(); + if let Some(mut machine) = self.workflow_machines.get_mut(run_id) { + machine.1.send(cmds)?; + machine.0.event_loop(); + } else { + // TODO: Error + } Ok(()) } } @@ -300,27 +293,23 @@ impl WorkflowBridge { impl DrivenWorkflow for WorkflowBridge { #[instrument] - fn start( - &mut self, - attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, Error> { + fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec { self.started_attrs = Some(attribs); - Ok(vec![]) + vec![] } #[instrument] - fn iterate_wf(&mut self) -> Result, Error> { - Ok(self - .incoming_commands + fn fetch_workflow_iteration_output(&mut self) -> Vec { + self.incoming_commands .try_recv() - .unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang])) + .unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]) } - fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) -> Result<(), Error> { + fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) { unimplemented!() } - fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) -> Result<(), Error> { + fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) { unimplemented!() } } diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 0f62f33cc..6f54d0430 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -70,26 +70,23 @@ pub(crate) type ProtoCommand = Command; /// drive it, start it, signal it, cancel it, etc. pub(crate) trait DrivenWorkflow: ActivationListener + Send { /// Start the workflow - fn start( - &mut self, - attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, anyhow::Error>; + fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec; - /// Iterate the workflow. The workflow driver should execute workflow code until there is - /// nothing left to do. EX: Awaiting an activity/timer, workflow completion. - fn iterate_wf(&mut self) -> Result, anyhow::Error>; + /// Obtain any output from the workflow's recent execution(s). Because the lang sdk is + /// responsible for calling workflow code as a result of receiving tasks from + /// [crate::Core::poll_task], we cannot directly iterate it here. Thus implementations of this + /// trait are expected to either buffer output or otherwise produce it on demand when this + /// function is called. + /// + /// In the case of the real [WorkflowBridge] implementation, commands are simply pulled from + /// a buffer that the language side sinks into when it calls [crate::Core::complete_task] + fn fetch_workflow_iteration_output(&mut self) -> Vec; /// Signal the workflow - fn signal( - &mut self, - attribs: WorkflowExecutionSignaledEventAttributes, - ) -> Result<(), anyhow::Error>; + fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes); /// Cancel the workflow - fn cancel( - &mut self, - attribs: WorkflowExecutionCanceledEventAttributes, - ) -> Result<(), anyhow::Error>; + fn cancel(&mut self, attribs: WorkflowExecutionCanceledEventAttributes); } /// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 92aa306e1..56d9843ef 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,4 +1,3 @@ -use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ @@ -74,15 +73,12 @@ where Fut: Future, { #[instrument(skip(self))] - fn start( - &mut self, - _attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, anyhow::Error> { - Ok(vec![]) + fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) -> Vec { + vec![] } #[instrument(skip(self))] - fn iterate_wf(&mut self) -> Result, anyhow::Error> { + fn fetch_workflow_iteration_output(&mut self) -> Vec { let (sender, receiver) = CommandSender::new(self.cache.clone()); // Call the closure that produces the workflow future let wf_future = (self.wf_function)(sender); @@ -106,26 +102,16 @@ where } // Return only the last command, since that's what would've been yielded in a real wf - Ok(if let Some(c) = last_cmd { + if let Some(c) = last_cmd { vec![c] } else { vec![] - }) + } } - fn signal( - &mut self, - _attribs: WorkflowExecutionSignaledEventAttributes, - ) -> Result<(), anyhow::Error> { - Ok(()) - } + fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} - fn cancel( - &mut self, - _attribs: WorkflowExecutionCanceledEventAttributes, - ) -> Result<(), anyhow::Error> { - Ok(()) - } + fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} } #[derive(Debug, derive_more::From)] diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index bd0b396b5..8596ab175 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -217,7 +217,7 @@ impl WorkflowMachines { self.current_started_event_id = task_started_event_id; self.set_current_time(time); - self.event_loop()?; + self.event_loop(); Ok(()) } @@ -297,7 +297,7 @@ impl WorkflowMachines { } .into(), ); - let results = self.drive_me.start(attrs.clone())?; + let results = self.drive_me.start(attrs.clone()); self.handle_driven_results(results); } else { return Err(WFMachinesError::MalformedEvent( @@ -381,13 +381,13 @@ impl WorkflowMachines { .expect("We have just ensured this is populated") } - // TODO: Shouldn't be pub - pub fn event_loop(&mut self) -> Result<()> { - let results = self.drive_me.iterate_wf()?; + /// Runs the event loop, which consists of grabbing any pending outgoing commands from the + /// workflow, handling them, and preparing them to be sent off to the server. + pub(crate) fn event_loop(&mut self) { + let results = self.drive_me.fetch_workflow_iteration_output(); self.handle_driven_results(results); self.prepare_commands(); - Ok(()) } /// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index ba9653920..8e272e2d2 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -64,7 +64,6 @@ fn timer_workflow() { let mut rng = rand::thread_rng(); let timer_id: String = rng.gen::().to_string(); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); - // TODO verify core.complete_task(CompleteTaskReq::ok_from_api_attrs( StartTimerCommandAttributes { timer_id: timer_id.to_string(), @@ -77,7 +76,6 @@ fn timer_workflow() { .unwrap(); dbg!("sent completion w/ start timer"); let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); - // TODO verify core.complete_task(CompleteTaskReq::ok_from_api_attrs( CompleteWorkflowExecutionCommandAttributes { result: None }.into(), task.task_token, From ed1c946bfe9b70526b9e68682f75d71deae6ab5b Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Thu, 4 Feb 2021 14:50:29 -0800 Subject: [PATCH 24/26] A few other bits of idiomaticizing --- src/lib.rs | 24 ++++-------------------- src/pollers/mod.rs | 13 +++++++++++++ 2 files changed, 17 insertions(+), 20 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index e6b7c92d4..7e87cfcfb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,9 +8,6 @@ pub extern crate assert_matches; #[macro_use] extern crate tracing; -#[cfg(test)] -#[macro_use] -extern crate mockall; mod machines; mod pollers; @@ -350,12 +347,10 @@ pub enum CoreError { #[cfg(test)] mod test { - use std::collections::VecDeque; - - use tracing::Level; - + use super::*; use crate::{ machines::test_help::TestHistoryBuilder, + pollers::MockServerGateway, protos::{ coresdk::{wf_activation_job, WfActivationJob}, temporal::api::{ @@ -367,20 +362,9 @@ mod test { }, }, }; + use std::collections::VecDeque; + use tracing::Level; - mock! { - ServerGateway {} - #[async_trait::async_trait] - impl PollWorkflowTaskQueueApi for ServerGateway { - async fn poll(&self, task_queue: &str) -> Result; - } - #[async_trait::async_trait] - impl RespondWorkflowTaskCompletedApi for ServerGateway { - async fn complete(&self, task_token: Vec, commands: Vec) -> Result; - } - } - - use super::*; #[test] fn workflow_bridge() { let s = span!(Level::DEBUG, "Test start"); diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d0839c3a9..cf3c6125f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -121,3 +121,16 @@ impl RespondWorkflowTaskCompletedApi for ServerGateway { .into_inner()) } } + +#[cfg(test)] +mockall::mock! { + pub(crate) ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; + } +} From 4c75d174bae8df2c9832c38698c53e555d21a4b5 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 4 Feb 2021 16:04:31 -0800 Subject: [PATCH 25/26] Adding temporal service for integ tests --- .buildkite/docker/docker-compose.yaml | 50 +++++++++++++++++++++++++++ .buildkite/pipeline.yml | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 0aca4f6e2..3c4ef9c91 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -1,6 +1,44 @@ version: '3.5' services: + cassandra: + image: cassandra:3.11 + logging: + driver: none + ports: + - "9042:9042" + + statsd: + image: hopsoft/graphite-statsd + logging: + driver: none + ports: + - "8080:80" + - "2003:2003" + - "8125:8125" + - "8126:8126" + + temporal: + image: temporalio/auto-setup:1.6.3 + logging: + driver: none + ports: + - "7233:7233" + - "7234:7234" + - "7235:7235" + - "7239:7239" + - "6933:6933" + - "6934:6934" + - "6935:6935" + - "6939:6939" + environment: + - "CASSANDRA_SEEDS=cassandra" + - "STATSD_ENDPOINT=statsd:8125" + - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" + depends_on: + - cassandra + - statsd + unit-test: build: context: ../../ @@ -12,3 +50,15 @@ services: - "USER=unittest" volumes: - "../../:/sdk-core" + + integ-test: + build: + context: ../../ + dockerfile: .buildkite/docker/Dockerfile + command: /bin/sh -c ".buildkite/docker/build.sh" + environment: + - "USER=unittest" + depends_on: + - temporal + volumes: + - "../../:/sdk-core" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 1aaecbc72..4df301d43 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -39,6 +39,6 @@ steps: timeout_in_minutes: 15 plugins: - docker-compose#v3.0.0: - run: unit-test + run: integ-test config: .buildkite/docker/docker-compose.yaml - wait From 85ee66d149df5cc1cfdb4f0067329bce6e3d7310 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Thu, 4 Feb 2021 17:44:19 -0800 Subject: [PATCH 26/26] Specify endpoint for integ tests as docker doesn't see localhost --- .buildkite/docker/docker-compose.yaml | 1 + tests/integ_tests/poller_test.rs | 11 +++++++---- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 3c4ef9c91..002bd045e 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -58,6 +58,7 @@ services: command: /bin/sh -c ".buildkite/docker/build.sh" environment: - "USER=unittest" + - "TEMPORAL_SERVICE_ADDRESS=http://temporal:7233" depends_on: - temporal volumes: diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs index 8e272e2d2..cf0cd866c 100644 --- a/tests/integ_tests/poller_test.rs +++ b/tests/integ_tests/poller_test.rs @@ -1,5 +1,5 @@ use rand::{self, Rng}; -use std::{convert::TryFrom, time::Duration}; +use std::{convert::TryFrom, env, time::Duration}; use temporal_sdk_core::{ protos::{ coresdk::CompleteTaskReq, @@ -16,13 +16,16 @@ use temporal_sdk_core::{ const TASK_QUEUE: &str = "test-tq"; const NAMESPACE: &str = "default"; -const TARGET_URI: &'static str = "http://localhost:7233"; - // TODO try to consolidate this into the SDK code so we don't need to create another runtime. #[tokio::main] async fn create_workflow() -> (String, String, ServerGatewayOptions) { + let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { + Ok(addr) => addr, + Err(_) => "http://localhost:7233".to_owned(), + }; + let mut rng = rand::thread_rng(); - let url = Url::try_from(TARGET_URI).unwrap(); + let url = Url::try_from(&*temporal_server_address).unwrap(); let workflow_id: u32 = rng.gen(); let request_id: u32 = rng.gen(); let gateway_opts = ServerGatewayOptions {