diff --git a/src/lib.rs b/src/lib.rs index 095a3a2c4..a4834ec71 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,6 +11,7 @@ mod protosext; pub use protosext::HistoryInfo; +use crate::pollers::ServerGatewayOptions; use crate::{ machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, protos::{ @@ -35,6 +36,7 @@ use std::{ convert::TryInto, sync::mpsc::{self, Receiver, SendError, Sender}, }; +use tokio::runtime::Runtime; use url::Url; pub type Result = std::result::Result; @@ -50,22 +52,35 @@ pub trait Core { } pub struct CoreInitOptions { - temporal_server: Url, - _queue_name: String, + target_url: Url, + namespace: String, + _task_queue: Vec, + identity: String, + binary_checksum: String, } -pub fn init(_opts: CoreInitOptions) -> Result> { +/// Initializes instance of the core sdk and establishes connection to the temporal server. +/// Creates tokio runtime that will be used for all client-server interactions. +pub fn init(opts: CoreInitOptions) -> Result { + let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; + let gateway_opts = ServerGatewayOptions { + namespace: opts.namespace, + identity: opts.identity, + binary_checksum: opts.binary_checksum, + }; // Initialize server client - let work_provider = unimplemented!(); + let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; - Ok(Box::new(CoreSDK { + Ok(CoreSDK { + runtime, work_provider, workflow_machines: Default::default(), workflow_task_tokens: Default::default(), - })) + }) } pub struct CoreSDK { + runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs work_provider: WP, /// Key is run id @@ -76,12 +91,14 @@ pub struct CoreSDK { impl Core for CoreSDK where - WP: WorkProvider, + WP: WorkflowTaskProvider, { #[instrument(skip(self))] fn poll_task(&self) -> Result { // This will block forever in the event there is no work from the server - let work = self.work_provider.get_work("TODO: Real task queue")?; + let work = self + .runtime + .block_on(self.work_provider.get_work("TODO: Real task queue"))?; let run_id = match &work.workflow_execution { Some(we) => { self.instantiate_workflow_if_needed(we); @@ -152,7 +169,7 @@ where impl CoreSDK where - WP: WorkProvider, + WP: WorkflowTaskProvider, { fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) { if self @@ -197,9 +214,10 @@ where /// Implementors can provide new work to the SDK. The connection to the server is the real /// implementor. #[cfg_attr(test, mockall::automock)] -pub trait WorkProvider { +#[async_trait::async_trait] +pub trait WorkflowTaskProvider { /// Fetch new work. Should block indefinitely if there is no work. - fn get_work(&self, task_queue: &str) -> Result; + async fn get_work(&self, task_queue: &str) -> Result; } /// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService] @@ -272,6 +290,12 @@ pub enum CoreError { UnderlyingHistError(#[from] HistoryInfoError), #[error("Task token had nothing associated with it: {0:?}")] NothingFoundForTaskToken(Vec), + #[error("Error calling the service: {0:?}")] + TonicError(#[from] tonic::Status), + #[error("Server connection error: {0:?}")] + TonicTransportError(#[from] tonic::transport::Error), + #[error("Failed to initialize tokio runtime: {0:?}")] + TokioInitError(std::io::Error), } #[cfg(test)] @@ -350,12 +374,14 @@ mod test { let responses = vec![first_response, second_response]; let mut tasks = VecDeque::from(responses); - let mut mock_provider = MockWorkProvider::new(); + let mut mock_provider = MockWorkflowTaskProvider::new(); mock_provider .expect_get_work() .returning(move |_| Ok(tasks.pop_front().unwrap())); + let runtime = Runtime::new().unwrap(); let core = CoreSDK { + runtime, work_provider: mock_provider, workflow_machines: DashMap::new(), workflow_task_tokens: DashMap::new(), diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index 06d49fcca..20a3522b7 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -6,15 +6,13 @@ use crate::{ }, protos::{ coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation}, - temporal::{ - api::{ - command::v1::StartTimerCommandAttributes, - enums::v1::{CommandType, EventType}, - history::v1::{history_event, HistoryEvent}, - common::v1::WorkflowExecution - } - } - } + temporal::api::{ + command::v1::StartTimerCommandAttributes, + common::v1::WorkflowExecution, + enums::v1::{CommandType, EventType}, + history::v1::{history_event, HistoryEvent}, + }, + }, }; use futures::Future; use rustfsm::StateMachine; diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index f7c886369..9cfbc03b4 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -1,9 +1,59 @@ -mod poll_task; -#[allow(unused)] -mod workflow_poll_task; +use crate::protos::temporal::api::enums::v1::TaskQueueKind; +use crate::protos::temporal::api::taskqueue::v1::TaskQueue; +use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient; +use crate::protos::temporal::api::workflowservice::v1::{ + PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse, +}; +use crate::Result; +use crate::WorkflowTaskProvider; +use url::Url; +#[derive(Clone)] +pub(crate) struct ServerGatewayOptions { + pub namespace: String, + pub identity: String, + pub binary_checksum: String, +} +impl ServerGatewayOptions { + pub(crate) async fn connect(&self, target_url: Url) -> Result { + let service = WorkflowServiceClient::connect(target_url.to_string()).await?; + Ok(ServerGateway { + service, + opts: self.clone(), + }) + } +} /// Provides -pub(crate) struct ServerWorkProvider { +pub(crate) struct ServerGateway { + service: WorkflowServiceClient, + opts: ServerGatewayOptions, +} -} \ No newline at end of file +impl ServerGateway { + async fn poll(&self, task_queue: &str) -> Result { + let request = tonic::Request::new(PollWorkflowTaskQueueRequest { + namespace: self.opts.namespace.to_string(), + task_queue: Some(TaskQueue { + name: task_queue.to_string(), + kind: TaskQueueKind::Unspecified as i32, + }), + identity: self.opts.identity.to_string(), + binary_checksum: self.opts.binary_checksum.to_string(), + }); + + Ok(self + .service + .clone() + .poll_workflow_task_queue(request) + .await? + .into_inner()) + } +} + +#[async_trait::async_trait] +impl WorkflowTaskProvider for ServerGateway { + async fn get_work(&self, task_queue: &str) -> Result { + self.poll(task_queue).await + } +} diff --git a/src/pollers/poll_task.rs b/src/pollers/poll_task.rs deleted file mode 100644 index 898acc81c..000000000 --- a/src/pollers/poll_task.rs +++ /dev/null @@ -1,6 +0,0 @@ -pub type Result = std::result::Result; - -#[async_trait::async_trait] -pub trait PollTask { - async fn poll(&mut self) -> Result; -} diff --git a/src/pollers/workflow_poll_task.rs b/src/pollers/workflow_poll_task.rs deleted file mode 100644 index 1745b03ac..000000000 --- a/src/pollers/workflow_poll_task.rs +++ /dev/null @@ -1,40 +0,0 @@ -use crate::{ - pollers::poll_task::PollTask, - pollers::poll_task::Result, - protos::temporal::api::{ - enums::v1::TaskQueueKind, taskqueue::v1::TaskQueue, - workflowservice::v1::workflow_service_client::WorkflowServiceClient, - workflowservice::v1::PollWorkflowTaskQueueRequest, - workflowservice::v1::PollWorkflowTaskQueueResponse, - }, -}; -use tonic::{codegen::Future, Response, Status}; - -struct WorkflowPollTask<'a> { - service: &'a mut WorkflowServiceClient, - namespace: String, - task_queue: String, - identity: String, - binary_checksum: String, -} - -#[async_trait::async_trait] -impl PollTask for WorkflowPollTask<'_> { - async fn poll(&mut self) -> Result { - let request = tonic::Request::new(PollWorkflowTaskQueueRequest { - namespace: self.namespace.clone(), - task_queue: Some(TaskQueue { - name: self.task_queue.clone(), - kind: TaskQueueKind::Unspecified as i32, - }), - identity: self.identity.clone(), - binary_checksum: self.binary_checksum.clone(), - }); - - Ok(self - .service - .poll_workflow_task_queue(request) - .await? - .into_inner()) - } -}