Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ mod protosext;

pub use protosext::HistoryInfo;

use crate::pollers::ServerGatewayOptions;
use crate::{
machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines},
protos::{
Expand All @@ -35,6 +36,7 @@ use std::{
convert::TryInto,
sync::mpsc::{self, Receiver, SendError, Sender},
};
use tokio::runtime::Runtime;
use url::Url;

pub type Result<T, E = CoreError> = std::result::Result<T, E>;
Expand All @@ -50,22 +52,35 @@ pub trait Core {
}

pub struct CoreInitOptions {
temporal_server: Url,
_queue_name: String,
target_url: Url,
namespace: String,
_task_queue: Vec<String>,
identity: String,
binary_checksum: String,
}

pub fn init(_opts: CoreInitOptions) -> Result<Box<dyn Core>> {
/// Initializes instance of the core sdk and establishes connection to the temporal server.
/// Creates tokio runtime that will be used for all client-server interactions.
pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
let gateway_opts = ServerGatewayOptions {
namespace: opts.namespace,
identity: opts.identity,
binary_checksum: opts.binary_checksum,
};
// Initialize server client
let work_provider = unimplemented!();
let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?;

Ok(Box::new(CoreSDK {
Ok(CoreSDK {
runtime,
work_provider,
workflow_machines: Default::default(),
workflow_task_tokens: Default::default(),
}))
})
}

pub struct CoreSDK<WP> {
runtime: Runtime,
/// Provides work in the form of responses the server would send from polling task Qs
work_provider: WP,
/// Key is run id
Expand All @@ -76,12 +91,14 @@ pub struct CoreSDK<WP> {

impl<WP> Core for CoreSDK<WP>
where
WP: WorkProvider,
WP: WorkflowTaskProvider,
{
#[instrument(skip(self))]
fn poll_task(&self) -> Result<Task, CoreError> {
// This will block forever in the event there is no work from the server
let work = self.work_provider.get_work("TODO: Real task queue")?;
let work = self
.runtime
.block_on(self.work_provider.get_work("TODO: Real task queue"))?;
let run_id = match &work.workflow_execution {
Some(we) => {
self.instantiate_workflow_if_needed(we);
Expand Down Expand Up @@ -152,7 +169,7 @@ where

impl<WP> CoreSDK<WP>
where
WP: WorkProvider,
WP: WorkflowTaskProvider,
{
fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) {
if self
Expand Down Expand Up @@ -197,9 +214,10 @@ where
/// Implementors can provide new work to the SDK. The connection to the server is the real
/// implementor.
#[cfg_attr(test, mockall::automock)]
pub trait WorkProvider {
#[async_trait::async_trait]
pub trait WorkflowTaskProvider {
/// Fetch new work. Should block indefinitely if there is no work.
fn get_work(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
async fn get_work(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
}

/// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService]
Expand Down Expand Up @@ -272,6 +290,12 @@ pub enum CoreError {
UnderlyingHistError(#[from] HistoryInfoError),
#[error("Task token had nothing associated with it: {0:?}")]
NothingFoundForTaskToken(Vec<u8>),
#[error("Error calling the service: {0:?}")]
TonicError(#[from] tonic::Status),
#[error("Server connection error: {0:?}")]
TonicTransportError(#[from] tonic::transport::Error),
#[error("Failed to initialize tokio runtime: {0:?}")]
TokioInitError(std::io::Error),
}

#[cfg(test)]
Expand Down Expand Up @@ -350,12 +374,14 @@ mod test {
let responses = vec![first_response, second_response];

let mut tasks = VecDeque::from(responses);
let mut mock_provider = MockWorkProvider::new();
let mut mock_provider = MockWorkflowTaskProvider::new();
mock_provider
.expect_get_work()
.returning(move |_| Ok(tasks.pop_front().unwrap()));

let runtime = Runtime::new().unwrap();
let core = CoreSDK {
runtime,
work_provider: mock_provider,
workflow_machines: DashMap::new(),
workflow_task_tokens: DashMap::new(),
Expand Down
16 changes: 7 additions & 9 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@ use crate::{
},
protos::{
coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation},
temporal::{
api::{
command::v1::StartTimerCommandAttributes,
enums::v1::{CommandType, EventType},
history::v1::{history_event, HistoryEvent},
common::v1::WorkflowExecution
}
}
}
temporal::api::{
command::v1::StartTimerCommandAttributes,
common::v1::WorkflowExecution,
enums::v1::{CommandType, EventType},
history::v1::{history_event, HistoryEvent},
},
},
};
use futures::Future;
use rustfsm::StateMachine;
Expand Down
60 changes: 55 additions & 5 deletions src/pollers/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,59 @@
mod poll_task;
#[allow(unused)]
mod workflow_poll_task;
use crate::protos::temporal::api::enums::v1::TaskQueueKind;
use crate::protos::temporal::api::taskqueue::v1::TaskQueue;
use crate::protos::temporal::api::workflowservice::v1::workflow_service_client::WorkflowServiceClient;
use crate::protos::temporal::api::workflowservice::v1::{
PollWorkflowTaskQueueRequest, PollWorkflowTaskQueueResponse,
};
use crate::Result;
use crate::WorkflowTaskProvider;
use url::Url;

#[derive(Clone)]
pub(crate) struct ServerGatewayOptions {
pub namespace: String,
pub identity: String,
pub binary_checksum: String,
}

impl ServerGatewayOptions {
pub(crate) async fn connect(&self, target_url: Url) -> Result<ServerGateway> {
let service = WorkflowServiceClient::connect(target_url.to_string()).await?;
Ok(ServerGateway {
service,
opts: self.clone(),
})
}
}
/// Provides
pub(crate) struct ServerWorkProvider {
pub(crate) struct ServerGateway {
service: WorkflowServiceClient<tonic::transport::Channel>,
opts: ServerGatewayOptions,
}

}
impl ServerGateway {
async fn poll(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse> {
let request = tonic::Request::new(PollWorkflowTaskQueueRequest {
namespace: self.opts.namespace.to_string(),
task_queue: Some(TaskQueue {
name: task_queue.to_string(),
kind: TaskQueueKind::Unspecified as i32,
}),
identity: self.opts.identity.to_string(),
binary_checksum: self.opts.binary_checksum.to_string(),
});

Ok(self
.service
.clone()
.poll_workflow_task_queue(request)
.await?
.into_inner())
}
}

#[async_trait::async_trait]
impl WorkflowTaskProvider for ServerGateway {
async fn get_work(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse> {
self.poll(task_queue).await
}
}
6 changes: 0 additions & 6 deletions src/pollers/poll_task.rs

This file was deleted.

40 changes: 0 additions & 40 deletions src/pollers/workflow_poll_task.rs

This file was deleted.