diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 0aca4f6e2..002bd045e 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -1,6 +1,44 @@ version: '3.5' services: + cassandra: + image: cassandra:3.11 + logging: + driver: none + ports: + - "9042:9042" + + statsd: + image: hopsoft/graphite-statsd + logging: + driver: none + ports: + - "8080:80" + - "2003:2003" + - "8125:8125" + - "8126:8126" + + temporal: + image: temporalio/auto-setup:1.6.3 + logging: + driver: none + ports: + - "7233:7233" + - "7234:7234" + - "7235:7235" + - "7239:7239" + - "6933:6933" + - "6934:6934" + - "6935:6935" + - "6939:6939" + environment: + - "CASSANDRA_SEEDS=cassandra" + - "STATSD_ENDPOINT=statsd:8125" + - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" + depends_on: + - cassandra + - statsd + unit-test: build: context: ../../ @@ -12,3 +50,16 @@ services: - "USER=unittest" volumes: - "../../:/sdk-core" + + integ-test: + build: + context: ../../ + dockerfile: .buildkite/docker/Dockerfile + command: /bin/sh -c ".buildkite/docker/build.sh" + environment: + - "USER=unittest" + - "TEMPORAL_SERVICE_ADDRESS=http://temporal:7233" + depends_on: + - temporal + volumes: + - "../../:/sdk-core" diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 415db733f..4df301d43 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -31,4 +31,14 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml + - label: "integ-test" + agents: + queue: "default" + docker: "*" + command: "cargo test --test integ_tests" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: integ-test + config: .buildkite/docker/docker-compose.yaml - wait diff --git a/Cargo.toml b/Cargo.toml index 6eb064d9c..13f9c1b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,11 +21,17 @@ prost = "0.7" prost-types = "0.7" thiserror = "1.0" tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] } -tonic = "0.4" tracing = { version = "0.1", features = ["log"] } -tracing-opentelemetry = "0.10" +tracing-opentelemetry = "0.11" tracing-subscriber = "0.2" url = "2.2" +rand = "0.8.3" + +[dependencies.tonic] +version = "0.4" +#path = "../tonic/tonic" +# Using our fork for now which fixes grpc-timeout header getting stripped +git = "https://github.com/temporalio/tonic" [dependencies.rustfsm] path = "fsm" @@ -40,3 +46,10 @@ tonic-build = "0.4" [workspace] members = [".", "fsm"] + +[[test]] +name = "integ_tests" +path = "tests/main.rs" +# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with +# `cargo test --test integ_tests` +test = false \ No newline at end of file diff --git a/README.md b/README.md index cc352b7ba..d4f1667b4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,6 @@ -Core SDK that can be used as a base for all other SDKs. +[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master) + +Core SDK that can be used as a base for all other Temporal SDKs. # Getting started This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 282b94866..7d997214f 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -19,10 +19,16 @@ import "temporal/api/query/v1/message.proto"; // A request as given to [crate::Core::poll_task] message PollTaskReq { - // If true, poll for workflow tasks - bool workflows = 1; - // If true, poll for activity tasks - bool activities = 2; + // What type of task to poll for + enum TaskType { + // Poll for workflows + WORKFLOWS = 0; + // Poll for activities + ACTIVITIES = 1; + } + + // A list of task types to poll for + repeated TaskType types = 1; } // An instruction to perform work from core->lang sdk @@ -41,7 +47,7 @@ message Task { // An instruction to the lang sdk to run some workflow code, whether for the first time or from // a cached state. message WFActivation { - // Time the activation(s) were requested + // The current time as understood by the workflow, which is set by workflow task started events google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; // The id of the currently active run of the workflow string run_id = 2; diff --git a/src/lib.rs b/src/lib.rs index 7a760a37d..7e87cfcfb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,23 +3,25 @@ //! This crate provides a basis for creating new Temporal SDKs without completely starting from //! scratch -#[macro_use] -extern crate tracing; #[cfg(test)] #[macro_use] -extern crate assert_matches; - -pub mod protos; +pub extern crate assert_matches; +#[macro_use] +extern crate tracing; mod machines; mod pollers; +pub mod protos; mod protosext; +pub use pollers::{ServerGateway, ServerGatewayOptions}; +pub use url::Url; + use crate::{ machines::{ - ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, + ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand, + WorkflowMachines, }, - pollers::ServerGatewayOptions, protos::{ coresdk::{ complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task, @@ -31,19 +33,20 @@ use crate::{ WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, WorkflowExecutionStartedEventAttributes, }, - workflowservice::v1::PollWorkflowTaskQueueResponse, + workflowservice::v1::{ + PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse, + }, }, }, protosext::{HistoryInfo, HistoryInfoError}, }; -use anyhow::Error; use dashmap::DashMap; use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; use tokio::runtime::Runtime; -use url::Url; +use tonic::codegen::http::uri::InvalidUri; /// A result alias having [CoreError] as the error type pub type Result = std::result::Result; @@ -57,7 +60,7 @@ pub trait Core { /// language SDK's responsibility to call the appropriate code with the provided inputs. /// /// TODO: Examples - fn poll_task(&self) -> Result; + fn poll_task(&self, task_queue: &str) -> Result; /// Tell the core that some work has been completed - whether as a result of running workflow /// code or executing an activity. @@ -66,46 +69,42 @@ pub trait Core { /// Holds various configuration information required to call [init] pub struct CoreInitOptions { - /// The URL of the Temporal server to connect to - pub target_url: Url, - - /// The namespace on the server your worker will be using - pub namespace: String, - - /// A human-readable string that can identify your worker - /// - /// TODO: Probably belongs in future worker abstraction - pub identity: String, - - /// A string that should be unique to the exact worker code/binary being executed - pub worker_binary_id: String, + /// Options for the connection to the temporal server + pub gateway_opts: ServerGatewayOptions, } /// Initializes an instance of the core sdk and establishes a connection to the temporal server. /// -/// Note: Also creates tokio runtime that will be used for all client-server interactions. +/// Note: Also creates a tokio runtime that will be used for all client-server interactions. +/// +/// # Panics +/// * Will panic if called from within an async context, as it will construct a runtime and you +/// cannot construct a runtime from within a runtime. pub fn init(opts: CoreInitOptions) -> Result { let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; - let gateway_opts = ServerGatewayOptions { - namespace: opts.namespace, - identity: opts.identity, - worker_binary_id: opts.worker_binary_id, - }; // Initialize server client - let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; + let work_provider = runtime.block_on(opts.gateway_opts.connect())?; Ok(CoreSDK { runtime, - work_provider, + server_gateway: work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), }) } +/// Type of task queue to poll. +pub enum TaskQueue { + /// Workflow task + Workflow(String), + /// Activity task + _Activity(String), +} + struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs - work_provider: WP, + server_gateway: WP, /// Key is run id workflow_machines: DashMap>)>, /// Maps task tokens to workflow run ids @@ -114,14 +113,14 @@ struct CoreSDK { impl Core for CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi, { #[instrument(skip(self))] - fn poll_task(&self) -> Result { + fn poll_task(&self, task_queue: &str) -> Result { // This will block forever in the event there is no work from the server let work = self .runtime - .block_on(self.work_provider.get_work("TODO: Real task queue"))?; + .block_on(self.server_gateway.poll(task_queue))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -167,13 +166,20 @@ where status: Some(wfstatus), })), } => { - let wf_run_id = self + let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or(CoreError::NothingFoundForTaskToken(task_token))?; + .ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?; match wfstatus { - Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?, + Status::Successful(success) => { + self.push_lang_commands(&run_id, success)?; + if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) { + let commands = machines.0.get_commands(); + self.runtime + .block_on(self.server_gateway.complete(task_token, commands))?; + } + } Status::Failed(_) => {} } Ok(()) @@ -186,13 +192,12 @@ where } _ => Err(CoreError::MalformedCompletion(req)), } - // TODO: Get fsm commands and send them to server (get_commands) } } impl CoreSDK where - WP: WorkflowTaskProvider, + WP: PollWorkflowTaskQueueApi, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -225,11 +230,12 @@ where .into_iter() .map(|c| c.try_into().map_err(Into::into)) .collect::>>()?; - self.workflow_machines - .get_mut(run_id) - .unwrap() - .1 - .send(cmds)?; + if let Some(mut machine) = self.workflow_machines.get_mut(run_id) { + machine.1.send(cmds)?; + machine.0.event_loop(); + } else { + // TODO: Error + } Ok(()) } } @@ -238,9 +244,24 @@ where /// implementor. #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] -pub trait WorkflowTaskProvider { +pub(crate) trait PollWorkflowTaskQueueApi { /// Fetch new work. Should block indefinitely if there is no work. - async fn get_work(&self, task_queue: &str) -> Result; + async fn poll(&self, task_queue: &str) -> Result; +} + +/// Implementors can complete tasks as would've been issued by [Core::poll]. The real implementor +/// sends the completed tasks to the server. +#[cfg_attr(test, mockall::automock)] +#[async_trait::async_trait] +pub(crate) trait RespondWorkflowTaskCompletedApi { + /// Complete a task by sending it to the server. `task_token` is the task token that would've + /// been received from [PollWorkflowTaskQueueApi::poll]. `commands` is a list of new commands + /// to send to the server, such as starting a timer. + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -269,27 +290,23 @@ impl WorkflowBridge { impl DrivenWorkflow for WorkflowBridge { #[instrument] - fn start( - &mut self, - attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, Error> { + fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec { self.started_attrs = Some(attribs); - Ok(vec![]) + vec![] } #[instrument] - fn iterate_wf(&mut self) -> Result, Error> { - Ok(self - .incoming_commands + fn fetch_workflow_iteration_output(&mut self) -> Vec { + self.incoming_commands .try_recv() - .unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang])) + .unwrap_or_else(|_| vec![WFCommand::NoCommandsFromLang]) } - fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) -> Result<(), Error> { + fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) { unimplemented!() } - fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) -> Result<(), Error> { + fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) { unimplemented!() } } @@ -324,6 +341,8 @@ pub enum CoreError { TonicTransportError(#[from] tonic::transport::Error), /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), + /// Invalid URI: {0:?} + InvalidUri(#[from] InvalidUri), } #[cfg(test)] @@ -331,6 +350,7 @@ mod test { use super::*; use crate::{ machines::test_help::TestHistoryBuilder, + pollers::MockServerGateway, protos::{ coresdk::{wf_activation_job, WfActivationJob}, temporal::api::{ @@ -353,6 +373,7 @@ mod test { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; let timer_id = "fake_timer"; + let task_queue = "test-task-queue"; let mut t = TestHistoryBuilder::default(); t.add_by_type(EventType::WorkflowExecutionStarted); @@ -401,20 +422,24 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkflowTaskProvider::new(); - mock_provider - .expect_get_work() + let mut mock_gateway = MockServerGateway::new(); + mock_gateway + .expect_poll() .returning(move |_| Ok(tasks.pop_front().unwrap())); + // Response not really important here + mock_gateway + .expect_complete() + .returning(|_, _| Ok(RespondWorkflowTaskCompletedResponse::default())); let runtime = Runtime::new().unwrap(); let core = CoreSDK { runtime, - work_provider: mock_provider, + server_gateway: mock_gateway, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), }; - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), @@ -436,7 +461,7 @@ mod test { .unwrap(); dbg!("sent completion w/ start timer"); - let res = dbg!(core.poll_task().unwrap()); + let res = dbg!(core.poll_task(task_queue).unwrap()); // TODO: uggo assert_matches!( res.get_wf_jobs().as_slice(), diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 0f62f33cc..6f54d0430 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -70,26 +70,23 @@ pub(crate) type ProtoCommand = Command; /// drive it, start it, signal it, cancel it, etc. pub(crate) trait DrivenWorkflow: ActivationListener + Send { /// Start the workflow - fn start( - &mut self, - attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, anyhow::Error>; + fn start(&mut self, attribs: WorkflowExecutionStartedEventAttributes) -> Vec; - /// Iterate the workflow. The workflow driver should execute workflow code until there is - /// nothing left to do. EX: Awaiting an activity/timer, workflow completion. - fn iterate_wf(&mut self) -> Result, anyhow::Error>; + /// Obtain any output from the workflow's recent execution(s). Because the lang sdk is + /// responsible for calling workflow code as a result of receiving tasks from + /// [crate::Core::poll_task], we cannot directly iterate it here. Thus implementations of this + /// trait are expected to either buffer output or otherwise produce it on demand when this + /// function is called. + /// + /// In the case of the real [WorkflowBridge] implementation, commands are simply pulled from + /// a buffer that the language side sinks into when it calls [crate::Core::complete_task] + fn fetch_workflow_iteration_output(&mut self) -> Vec; /// Signal the workflow - fn signal( - &mut self, - attribs: WorkflowExecutionSignaledEventAttributes, - ) -> Result<(), anyhow::Error>; + fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes); /// Cancel the workflow - fn cancel( - &mut self, - attribs: WorkflowExecutionCanceledEventAttributes, - ) -> Result<(), anyhow::Error>; + fn cancel(&mut self, attribs: WorkflowExecutionCanceledEventAttributes); } /// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 92aa306e1..56d9843ef 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -1,4 +1,3 @@ -use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ @@ -74,15 +73,12 @@ where Fut: Future, { #[instrument(skip(self))] - fn start( - &mut self, - _attribs: WorkflowExecutionStartedEventAttributes, - ) -> Result, anyhow::Error> { - Ok(vec![]) + fn start(&mut self, _attribs: WorkflowExecutionStartedEventAttributes) -> Vec { + vec![] } #[instrument(skip(self))] - fn iterate_wf(&mut self) -> Result, anyhow::Error> { + fn fetch_workflow_iteration_output(&mut self) -> Vec { let (sender, receiver) = CommandSender::new(self.cache.clone()); // Call the closure that produces the workflow future let wf_future = (self.wf_function)(sender); @@ -106,26 +102,16 @@ where } // Return only the last command, since that's what would've been yielded in a real wf - Ok(if let Some(c) = last_cmd { + if let Some(c) = last_cmd { vec![c] } else { vec![] - }) + } } - fn signal( - &mut self, - _attribs: WorkflowExecutionSignaledEventAttributes, - ) -> Result<(), anyhow::Error> { - Ok(()) - } + fn signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {} - fn cancel( - &mut self, - _attribs: WorkflowExecutionCanceledEventAttributes, - ) -> Result<(), anyhow::Error> { - Ok(()) - } + fn cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {} } #[derive(Debug, derive_more::From)] diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 1315e0df1..8596ab175 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -217,7 +217,7 @@ impl WorkflowMachines { self.current_started_event_id = task_started_event_id; self.set_current_time(time); - self.event_loop()?; + self.event_loop(); Ok(()) } @@ -297,7 +297,7 @@ impl WorkflowMachines { } .into(), ); - let results = self.drive_me.start(attrs.clone())?; + let results = self.drive_me.start(attrs.clone()); self.handle_driven_results(results); } else { return Err(WFMachinesError::MalformedEvent( @@ -381,12 +381,13 @@ impl WorkflowMachines { .expect("We have just ensured this is populated") } - fn event_loop(&mut self) -> Result<()> { - let results = self.drive_me.iterate_wf()?; + /// Runs the event loop, which consists of grabbing any pending outgoing commands from the + /// workflow, handling them, and preparing them to be sent off to the server. + pub(crate) fn event_loop(&mut self) { + let results = self.drive_me.fetch_workflow_iteration_output(); self.handle_driven_results(results); self.prepare_commands(); - Ok(()) } /// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index d26b5d9d5..cf3c6125f 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,38 +1,85 @@ -use crate::protos::temporal::api::enums::v1::TaskQueueKind; -use crate::protos::temporal::api::taskqueue::v1::TaskQueue; -use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; +use std::time::Duration; + +use crate::machines::ProtoCommand; use crate::protos::temporal::api::workflowservice::v1::{ - PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + RespondWorkflowTaskCompletedRequest, RespondWorkflowTaskCompletedResponse, +}; +use crate::{ + protos::temporal::api::enums::v1::TaskQueueKind, + protos::temporal::api::taskqueue::v1::TaskQueue, + protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient, + protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, + }, + PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, Result, }; -use crate::Result; -use crate::WorkflowTaskProvider; +use tonic::{transport::Channel, Request, Status}; use url::Url; -#[derive(Clone)] -pub(crate) struct ServerGatewayOptions { +/// Options for the connection to the temporal server +#[derive(Clone, Debug)] +pub struct ServerGatewayOptions { + /// The URL of the Temporal server to connect to + pub target_url: Url, + + /// What namespace will we operate under pub namespace: String, + + /// A human-readable string that can identify your worker + /// + /// TODO: Probably belongs in future worker abstraction pub identity: String, + + /// A string that should be unique to the exact worker code/binary being executed pub worker_binary_id: String, + + /// Timeout for long polls (polling of task queues) + pub long_poll_timeout: Duration, } impl ServerGatewayOptions { - pub(crate) async fn connect(&self, target_url: Url) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + /// Attempt to establish a connection to the Temporal server + pub async fn connect(&self) -> Result { + let channel = Channel::from_shared(self.target_url.to_string())? + .connect() + .await?; + let service = WorkflowServiceClient::with_interceptor(channel, intercept); Ok(ServerGateway { service, opts: self.clone(), }) } } -/// Provides -pub(crate) struct ServerGateway { - service: WorkflowServiceClient, - opts: ServerGatewayOptions, + +/// This function will get called on each outbound request. Returning a +/// `Status` here will cancel the request and have that status returned to +/// the caller. +fn intercept(mut req: Request<()>) -> Result, Status> { + let metadata = req.metadata_mut(); + // TODO: Only apply this to long poll requests + metadata.insert( + "grpc-timeout", + "50000m".parse().expect("Static value is parsable"), + ); + metadata.insert( + "client-name", + "core-sdk".parse().expect("Static value is parsable"), + ); + Ok(req) +} + +/// Contains an instance of a client for interacting with the temporal server +pub struct ServerGateway { + /// Client for interacting with workflow service + pub service: WorkflowServiceClient, + /// Options gateway was initialized with + pub opts: ServerGatewayOptions, } -impl ServerGateway { +#[async_trait::async_trait] +impl PollWorkflowTaskQueueApi for ServerGateway { async fn poll(&self, task_queue: &str) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + let request = PollWorkflowTaskQueueRequest { namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { name: task_queue.to_string(), @@ -40,7 +87,7 @@ impl ServerGateway { }), identity: self.opts.identity.to_string(), binary_checksum: self.opts.worker_binary_id.to_string(), - }); + }; Ok(self .service @@ -52,8 +99,38 @@ impl ServerGateway { } #[async_trait::async_trait] -impl WorkflowTaskProvider for ServerGateway { - async fn get_work(&self, task_queue: &str) -> Result { - self.poll(task_queue).await +impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete( + &self, + task_token: Vec, + commands: Vec, + ) -> Result { + let request = RespondWorkflowTaskCompletedRequest { + task_token, + commands, + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), + namespace: self.opts.namespace.to_string(), + ..Default::default() + }; + Ok(self + .service + .clone() + .respond_workflow_task_completed(request) + .await? + .into_inner()) + } +} + +#[cfg(test)] +mockall::mock! { + pub(crate) ServerGateway {} + #[async_trait::async_trait] + impl PollWorkflowTaskQueueApi for ServerGateway { + async fn poll(&self, task_queue: &str) -> Result; + } + #[async_trait::async_trait] + impl RespondWorkflowTaskCompletedApi for ServerGateway { + async fn complete(&self, task_token: Vec, commands: Vec) -> Result; } } diff --git a/tests/integ_tests/poller_test.rs b/tests/integ_tests/poller_test.rs new file mode 100644 index 000000000..cf0cd866c --- /dev/null +++ b/tests/integ_tests/poller_test.rs @@ -0,0 +1,92 @@ +use rand::{self, Rng}; +use std::{convert::TryFrom, env, time::Duration}; +use temporal_sdk_core::{ + protos::{ + coresdk::CompleteTaskReq, + temporal::api::command::v1::{ + CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + }, + temporal::api::common::v1::WorkflowType, + temporal::api::taskqueue::v1::TaskQueue, + temporal::api::workflowservice::v1::StartWorkflowExecutionRequest, + }, + Core, CoreInitOptions, ServerGatewayOptions, Url, +}; + +const TASK_QUEUE: &str = "test-tq"; +const NAMESPACE: &str = "default"; + +// TODO try to consolidate this into the SDK code so we don't need to create another runtime. +#[tokio::main] +async fn create_workflow() -> (String, String, ServerGatewayOptions) { + let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { + Ok(addr) => addr, + Err(_) => "http://localhost:7233".to_owned(), + }; + + let mut rng = rand::thread_rng(); + let url = Url::try_from(&*temporal_server_address).unwrap(); + let workflow_id: u32 = rng.gen(); + let request_id: u32 = rng.gen(); + let gateway_opts = ServerGatewayOptions { + namespace: NAMESPACE.to_string(), + identity: "none".to_string(), + worker_binary_id: "".to_string(), + long_poll_timeout: Duration::from_secs(60), + target_url: url, + }; + let mut gateway = gateway_opts.connect().await.unwrap(); + let response = gateway + .service + .start_workflow_execution(StartWorkflowExecutionRequest { + namespace: NAMESPACE.to_string(), + workflow_id: workflow_id.to_string(), + workflow_type: Some(WorkflowType { + name: "test-workflow".to_string(), + }), + task_queue: Some(TaskQueue { + name: TASK_QUEUE.to_string(), + kind: 0, + }), + request_id: request_id.to_string(), + ..Default::default() + }) + .await + .unwrap(); + ( + workflow_id.to_string(), + response.into_inner().run_id, + gateway_opts, + ) +} + +#[test] +fn timer_workflow() { + let (workflow_id, run_id, gateway_opts) = dbg!(create_workflow()); + let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + let mut rng = rand::thread_rng(); + let timer_id: String = rng.gen::().to_string(); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + StartTimerCommandAttributes { + timer_id: timer_id.to_string(), + start_to_fire_timeout: Some(Duration::from_secs(1).into()), + ..Default::default() + } + .into(), + task.task_token, + )) + .unwrap(); + dbg!("sent completion w/ start timer"); + let task = dbg!(core.poll_task(TASK_QUEUE).unwrap()); + core.complete_task(CompleteTaskReq::ok_from_api_attrs( + CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + task.task_token, + )) + .unwrap(); + dbg!( + "sent workflow done, completed workflow", + workflow_id, + run_id + ); +} diff --git a/tests/main.rs b/tests/main.rs new file mode 100644 index 000000000..976cf28e4 --- /dev/null +++ b/tests/main.rs @@ -0,0 +1,4 @@ +#[cfg(test)] +mod integ_tests { + mod poller_test; +}