From e69f6fc20c32b15c5746db17fdc97d05b23bb6fa Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 1 Feb 2021 16:10:10 -0800 Subject: [PATCH 1/5] Wire in poller task and add init method for establishing server connection --- src/lib.rs | 43 +++++++++++++++------- src/pollers/mod.rs | 60 +++++++++++++++++++++++++++---- src/pollers/poll_task.rs | 6 ---- src/pollers/workflow_poll_task.rs | 40 --------------------- 4 files changed, 85 insertions(+), 64 deletions(-) delete mode 100644 src/pollers/poll_task.rs delete mode 100644 src/pollers/workflow_poll_task.rs diff --git a/src/lib.rs b/src/lib.rs index 095a3a2c4..a95365b16 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod protosext; pub use protosext::HistoryInfo; +use crate::pollers::ServerGateway; use crate::{ machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, protos::{ @@ -35,6 +36,7 @@ use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; +use tokio::runtime::Runtime; use url::Url; pub type Result = std::result::Result; @@ -50,22 +52,28 @@ pub trait Core { } pub struct CoreInitOptions { - temporal_server: Url, - _queue_name: String, + target_url: Url, + task_queue: String, } -pub fn init(_opts: CoreInitOptions) -> Result> { +/// Initializes instance of the core sdk and establishes connection to the temporal server. +/// Creates tokio runtime that will be used for all client-server interactions. +pub fn init(opts: CoreInitOptions) -> Result { + let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; // Initialize server client - let work_provider = unimplemented!(); + let work_provider = + runtime.block_on(ServerGateway::connect(opts.target_url, opts.task_queue))?; - Ok(Box::new(CoreSDK { + Ok(CoreSDK { + runtime, work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), - })) + }) } pub struct CoreSDK { + runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs work_provider: WP, /// Key is run id @@ -76,12 +84,14 @@ pub struct CoreSDK { impl Core for CoreSDK where - WP: WorkProvider, + WP: WorkflowTaskProvider, { #[instrument(skip(self))] fn poll_task(&self) -> Result { // This will block forever in the event there is no work from the server - let work = self.work_provider.get_work("TODO: Real task queue")?; + let work = self + .runtime + .block_on(self.work_provider.get_work("TODO: Real task queue"))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -152,7 +162,7 @@ where impl CoreSDK where - WP: WorkProvider, + WP: WorkflowTaskProvider, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -197,9 +207,10 @@ where /// Implementors can provide new work to the SDK. The connection to the server is the real /// implementor. #[cfg_attr(test, mockall::automock)] -pub trait WorkProvider { +#[async_trait::async_trait] +pub trait WorkflowTaskProvider { /// Fetch new work. Should block indefinitely if there is no work. - fn get_work(&self, task_queue: &str) -> Result; + async fn get_work(&self, task_queue: &str) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -272,6 +283,12 @@ pub enum CoreError { UnderlyingHistError(#[from] HistoryInfoError), #[error("Task token had nothing associated with it: {0:?}")] NothingFoundForTaskToken(Vec), + #[error("Error calling the service: {0:?}")] + TonicError(#[from] tonic::Status), + #[error("Server connection error: {0:?}")] + TonicTransportError(#[from] tonic::transport::Error), + #[error("Failed to initialize tokio runtime: {0:?}")] + TokioInitError(std::io::Error), } #[cfg(test)] @@ -350,12 +367,14 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkProvider::new(); + let mut mock_provider = MockWorkflowTaskProvider::new(); mock_provider .expect_get_work() .returning(move |_| Ok(tasks.pop_front().unwrap())); + let runtime = Runtime::new().unwrap(); let core = CoreSDK { + runtime, work_provider: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index f7c886369..c15e0b008 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,9 +1,57 @@ -mod poll_task; -#[allow(unused)] -mod workflow_poll_task; - +use crate::protos::temporal::api::enums::v1::TaskQueueKind; +use crate::protos::temporal::api::taskqueue::v1::TaskQueue; +use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; +use crate::protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +}; +use crate::Result; +use crate::WorkflowTaskProvider; +use url::Url; /// Provides -pub(crate) struct ServerWorkProvider { +pub(crate) struct ServerGateway { + service: WorkflowServiceClient, + namespace: String, + task_queue: String, + identity: String, + binary_checksum: String, +} + +impl ServerGateway { + async fn poll(&self) -> Result { + let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + namespace: self.namespace.to_string(), + task_queue: Some(TaskQueue { + name: self.task_queue.to_string(), + kind: TaskQueueKind::Unspecified as i32, + }), + identity: self.identity.to_string(), + binary_checksum: self.binary_checksum.to_string(), + }); + + Ok(self + .service + .clone() + .poll_workflow_task_queue(request) + .await? + .into_inner()) + } + + pub(crate) async fn connect(target_url: Url, task_queue: String) -> Result { + let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + Ok(Self { + service, + namespace: "".to_string(), + task_queue, + identity: "".to_string(), + binary_checksum: "".to_string(), + }) + } +} -} \ No newline at end of file +#[async_trait::async_trait] +impl WorkflowTaskProvider for ServerGateway { + async fn get_work(&self, task_queue: &str) -> Result { + self.poll().await + } +} diff --git a/src/pollers/poll_task.rs b/src/pollers/poll_task.rs deleted file mode 100644 index 898acc81c..000000000 --- a/src/pollers/poll_task.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub type Result = std::result::Result; - -#[async_trait::async_trait] -pub trait PollTask { - async fn poll(&mut self) -> Result; -} diff --git a/src/pollers/workflow_poll_task.rs b/src/pollers/workflow_poll_task.rs deleted file mode 100644 index 1745b03ac..000000000 --- a/src/pollers/workflow_poll_task.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::{ - pollers::poll_task::PollTask, - pollers::poll_task::Result, - protos::temporal::api::{ - enums::v1::TaskQueueKind, taskqueue::v1::TaskQueue, - workflowservice::v1::workflow_service_client::WorkflowServiceClient, - workflowservice::v1::PollWorkflowTaskQueueRequest, - workflowservice::v1::PollWorkflowTaskQueueResponse, - }, -}; -use tonic::{codegen::Future, Response, Status}; - -struct WorkflowPollTask<'a> { - service: &'a mut WorkflowServiceClient, - namespace: String, - task_queue: String, - identity: String, - binary_checksum: String, -} - -#[async_trait::async_trait] -impl PollTask for WorkflowPollTask<'_> { - async fn poll(&mut self) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { - namespace: self.namespace.clone(), - task_queue: Some(TaskQueue { - name: self.task_queue.clone(), - kind: TaskQueueKind::Unspecified as i32, - }), - identity: self.identity.clone(), - binary_checksum: self.binary_checksum.clone(), - }); - - Ok(self - .service - .poll_workflow_task_queue(request) - .await? - .into_inner()) - } -} From 0abb70b4dd3902094fed7d3289fce4fae18ad983 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 1 Feb 2021 16:48:06 -0800 Subject: [PATCH 2/5] Enable server connection creation without task queue --- src/lib.rs | 15 +++++++++++---- src/pollers/mod.rs | 44 +++++++++++++++++++++++--------------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index a95365b16..42af01e28 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ mod protosext; pub use protosext::HistoryInfo; -use crate::pollers::ServerGateway; +use crate::pollers::{ServerGatewayOptions}; use crate::{ machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, protos::{ @@ -53,16 +53,23 @@ pub trait Core { pub struct CoreInitOptions { target_url: Url, - task_queue: String, + namespace: String, + task_queue: Vec, + identity: String, + binary_checksum: String, } /// Initializes instance of the core sdk and establishes connection to the temporal server. /// Creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let gateway_opts = ServerGatewayOptions { + namespace: opts.namespace, + identity: opts.identity, + binary_checksum: opts.binary_checksum, + }; // Initialize server client - let work_provider = - runtime.block_on(ServerGateway::connect(opts.target_url, opts.task_queue))?; + let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; Ok(CoreSDK { runtime, diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index c15e0b008..9cfbc03b4 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -8,25 +8,38 @@ use crate::Result; use crate::WorkflowTaskProvider; use url::Url; +#[derive(Clone)] +pub(crate) struct ServerGatewayOptions { + pub namespace: String, + pub identity: String, + pub binary_checksum: String, +} + +impl ServerGatewayOptions { + pub(crate) async fn connect(&self, target_url: Url) -> Result { + let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + Ok(ServerGateway { + service, + opts: self.clone(), + }) + } +} /// Provides pub(crate) struct ServerGateway { service: WorkflowServiceClient, - namespace: String, - task_queue: String, - identity: String, - binary_checksum: String, + opts: ServerGatewayOptions, } impl ServerGateway { - async fn poll(&self) -> Result { + async fn poll(&self, task_queue: &str) -> Result { let request = tonic::Request::new(PollWorkflowTaskQueueRequest { - namespace: self.namespace.to_string(), + namespace: self.opts.namespace.to_string(), task_queue: Some(TaskQueue { - name: self.task_queue.to_string(), + name: task_queue.to_string(), kind: TaskQueueKind::Unspecified as i32, }), - identity: self.identity.to_string(), - binary_checksum: self.binary_checksum.to_string(), + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.binary_checksum.to_string(), }); Ok(self @@ -36,22 +49,11 @@ impl ServerGateway { .await? .into_inner()) } - - pub(crate) async fn connect(target_url: Url, task_queue: String) -> Result { - let service = WorkflowServiceClient::connect(target_url.to_string()).await?; - Ok(Self { - service, - namespace: "".to_string(), - task_queue, - identity: "".to_string(), - binary_checksum: "".to_string(), - }) - } } #[async_trait::async_trait] impl WorkflowTaskProvider for ServerGateway { async fn get_work(&self, task_queue: &str) -> Result { - self.poll().await + self.poll(task_queue).await } } From ffb8cf0ca80dc1a5f3708dae03d1de51d2c43e16 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 1 Feb 2021 17:08:11 -0800 Subject: [PATCH 3/5] fmt --- src/machines/workflow_machines.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 06d49fcca..20a3522b7 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -6,15 +6,13 @@ use crate::{ }, protos::{ coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, - temporal::{ - api::{ - command::v1::StartTimerCommandAttributes, - enums::v1::{CommandType, EventType}, - history::v1::{history_event, HistoryEvent}, - common::v1::WorkflowExecution - } - } - } + temporal::api::{ + command::v1::StartTimerCommandAttributes, + common::v1::WorkflowExecution, + enums::v1::{CommandType, EventType}, + history::v1::{history_event, HistoryEvent}, + }, + }, }; use futures::Future; use rustfsm::StateMachine; From 2f024240f862d7c2f3defdba97eb4a0ef06651b6 Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 1 Feb 2021 17:08:54 -0800 Subject: [PATCH 4/5] fmt --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 42af01e28..1eeac963a 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,7 @@ mod protosext; pub use protosext::HistoryInfo; -use crate::pollers::{ServerGatewayOptions}; +use crate::pollers::ServerGatewayOptions; use crate::{ machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, protos::{ From f2ba9d4eade9edcfd4819ca13279c28009bbe7ae Mon Sep 17 00:00:00 2001 From: Vitaly Date: Mon, 1 Feb 2021 17:15:06 -0800 Subject: [PATCH 5/5] lint --- src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lib.rs b/src/lib.rs index 1eeac963a..a4834ec71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -54,7 +54,7 @@ pub trait Core { pub struct CoreInitOptions { target_url: Url, namespace: String, - task_queue: Vec, + _task_queue: Vec, identity: String, binary_checksum: String, }