Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,14 @@ steps:
- docker-compose#v3.0.0:
run: unit-test
config: .buildkite/docker/docker-compose.yaml
- label: "integ-test"
agents:
queue: "default"
docker: "*"
command: "cargo test --test integ_tests"
timeout_in_minutes: 15
plugins:
- docker-compose#v3.0.0:
run: unit-test
config: .buildkite/docker/docker-compose.yaml
- wait
17 changes: 15 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,17 @@ prost = "0.7"
prost-types = "0.7"
thiserror = "1.0"
tokio = { version = "1.1", features = ["rt", "rt-multi-thread"] }
tonic = "0.4"
tracing = { version = "0.1", features = ["log"] }
tracing-opentelemetry = "0.10"
tracing-opentelemetry = "0.11"
tracing-subscriber = "0.2"
url = "2.2"
rand = "0.8.3"

[dependencies.tonic]
version = "0.4"
#path = "../tonic/tonic"
# Using our fork for now which fixes grpc-timeout header getting stripped
git = "https://github.com/temporalio/tonic"

[dependencies.rustfsm]
path = "fsm"
Expand All @@ -40,3 +46,10 @@ tonic-build = "0.4"

[workspace]
members = [".", "fsm"]

[[test]]
name = "integ_tests"
path = "tests/main.rs"
# Prevents autodiscovery, and hence these getting run with `cargo test`. Run with
# `cargo test --test integ_tests`
test = false
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
Core SDK that can be used as a base for all other SDKs.
[![Build status](https://badge.buildkite.com/c23f47f4a827f04daece909963bd3a248496f0cdbabfbecee4.svg)](https://buildkite.com/temporal/core-sdk?branch=master)

Core SDK that can be used as a base for all other Temporal SDKs.

# Getting started
This repo uses a submodule for upstream protobuf files. The path `protos/api_upstream` is a
Expand Down
123 changes: 94 additions & 29 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,28 @@
//! This crate provides a basis for creating new Temporal SDKs without completely starting from
//! scratch

#[cfg(test)]
#[macro_use]
pub extern crate assert_matches;
#[macro_use]
extern crate tracing;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;

pub mod protos;
extern crate mockall;

mod machines;
mod pollers;
pub mod protos;
mod protosext;

pub use pollers::{ServerGateway, ServerGatewayOptions};
pub use url::Url;

use crate::{
machines::{
ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines,
ActivationListener, DrivenWorkflow, InconvertibleCommandError, ProtoCommand, WFCommand,
WorkflowMachines,
},
pollers::ServerGatewayOptions,
protos::{
coresdk::{
complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task,
Expand All @@ -31,7 +36,9 @@ use crate::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes,
WorkflowExecutionStartedEventAttributes,
},
workflowservice::v1::PollWorkflowTaskQueueResponse,
workflowservice::v1::{
PollWorkflowTaskQueueResponse, RespondWorkflowTaskCompletedResponse,
},
},
},
protosext::{HistoryInfo, HistoryInfoError},
Expand All @@ -41,12 +48,14 @@ use dashmap::DashMap;
use std::{
convert::TryInto,
sync::mpsc::{self, Receiver, SendError, Sender},
time::Duration,
};
use tokio::runtime::Runtime;
use url::Url;
use tonic::codegen::http::uri::InvalidUri;

/// A result alias having [CoreError] as the error type
pub type Result<T, E = CoreError> = std::result::Result<T, E>;
const DEFAULT_LONG_POLL_TIMEOUT: u64 = 60;

/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is
/// expected that only one instance of an implementation will exist for the lifetime of the
Expand All @@ -57,7 +66,7 @@ pub trait Core {
/// language SDK's responsibility to call the appropriate code with the provided inputs.
///
/// TODO: Examples
fn poll_task(&self) -> Result<Task>;
fn poll_task(&self, task_queue: &str) -> Result<Task>;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it mean that we are not supporting core polling on multiple queues and language SDK being single threaded?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be possible, that's the whole reason why we pass it as a parameter as opposed to a field during initialization.


/// Tell the core that some work has been completed - whether as a result of running workflow
/// code or executing an activity.
Expand All @@ -79,33 +88,48 @@ pub struct CoreInitOptions {

/// A string that should be unique to the exact worker code/binary being executed
pub worker_binary_id: String,

/// Optional tokio runtime
pub runtime: Option<Runtime>,
}

/// Initializes an instance of the core sdk and establishes a connection to the temporal server.
///
/// Note: Also creates tokio runtime that will be used for all client-server interactions.
pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
let runtime = opts
.runtime
.map(Ok)
.unwrap_or_else(|| Runtime::new().map_err(CoreError::TokioInitError))?;
let gateway_opts = ServerGatewayOptions {
namespace: opts.namespace,
identity: opts.identity,
worker_binary_id: opts.worker_binary_id,
long_poll_timeout: Duration::from_secs(DEFAULT_LONG_POLL_TIMEOUT),
};
// Initialize server client
let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?;

Ok(CoreSDK {
runtime,
work_provider,
server_gateway: work_provider,
workflow_machines: Default::default(),
workflow_task_tokens: Default::default(),
})
}

/// Type of task queue to poll.
pub enum TaskQueue {
/// Workflow task
Workflow(String),
/// Activity task
_Activity(String),
}

struct CoreSDK<WP> {
runtime: Runtime,
/// Provides work in the form of responses the server would send from polling task Qs
work_provider: WP,
server_gateway: WP,
/// Key is run id
workflow_machines: DashMap<String, (WorkflowMachines, Sender<Vec<WFCommand>>)>,
/// Maps task tokens to workflow run ids
Expand All @@ -114,14 +138,14 @@ struct CoreSDK<WP> {

impl<WP> Core for CoreSDK<WP>
where
WP: WorkflowTaskProvider,
WP: PollWorkflowTaskQueueApi + RespondWorkflowTaskCompletedApi,
{
#[instrument(skip(self))]
fn poll_task(&self) -> Result<Task, CoreError> {
fn poll_task(&self, task_queue: &str) -> Result<Task, CoreError> {
// This will block forever in the event there is no work from the server
let work = self
.runtime
.block_on(self.work_provider.get_work("TODO: Real task queue"))?;
.block_on(self.server_gateway.poll(task_queue))?;
let run_id = match &work.workflow_execution {
Some(we) => {
self.instantiate_workflow_if_needed(we);
Expand Down Expand Up @@ -167,13 +191,20 @@ where
status: Some(wfstatus),
})),
} => {
let wf_run_id = self
let run_id = self
.workflow_task_tokens
.get(&task_token)
.map(|x| x.value().clone())
.ok_or(CoreError::NothingFoundForTaskToken(task_token))?;
.ok_or(CoreError::NothingFoundForTaskToken(task_token.clone()))?;
match wfstatus {
Status::Successful(success) => self.push_lang_commands(&wf_run_id, success)?,
Status::Successful(success) => {
self.push_lang_commands(&run_id, success)?;
if let Some(mut machines) = self.workflow_machines.get_mut(&run_id) {
let commands = machines.0.get_commands();
self.runtime
.block_on(self.server_gateway.complete(task_token, commands))?;
}
}
Status::Failed(_) => {}
}
Ok(())
Expand All @@ -186,13 +217,12 @@ where
}
_ => Err(CoreError::MalformedCompletion(req)),
}
// TODO: Get fsm commands and send them to server (get_commands)
}
}

impl<WP> CoreSDK<WP>
where
WP: WorkflowTaskProvider,
WP: PollWorkflowTaskQueueApi,
{
fn instantiate_workflow_if_needed(&self, workflow_execution: &WorkflowExecution) {
if self
Expand Down Expand Up @@ -230,6 +260,11 @@ where
.unwrap()
.1
.send(cmds)?;
self.workflow_machines
.get_mut(run_id)
.unwrap()
.0
.event_loop();
Ok(())
}
}
Expand All @@ -238,9 +273,22 @@ where
/// implementor.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait WorkflowTaskProvider {
pub trait PollWorkflowTaskQueueApi {
/// Fetch new work. Should block indefinitely if there is no work.
async fn get_work(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
async fn poll(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
}

/// Implementors can deliver the results of completed workflow tasks back to the server. The
/// connection to the server is the real
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub trait RespondWorkflowTaskCompletedApi {
/// Complete a workflow task by sending the commands produced while driving the workflow.
async fn complete(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The above comment doesn't make sense anymore.

&self,
task_token: Vec<u8>,
commands: Vec<ProtoCommand>,
) -> Result<RespondWorkflowTaskCompletedResponse>;
}

/// The [DrivenWorkflow] trait expects to be called to make progress, but the [CoreSDKService]
Expand Down Expand Up @@ -324,11 +372,16 @@ pub enum CoreError {
TonicTransportError(#[from] tonic::transport::Error),
/// Failed to initialize tokio runtime: {0:?}
TokioInitError(std::io::Error),
/// Invalid URI: {0:?}
InvalidUri(#[from] InvalidUri),
}

#[cfg(test)]
mod test {
use super::*;
use std::collections::VecDeque;

use tracing::Level;

use crate::{
machines::test_help::TestHistoryBuilder,
protos::{
Expand All @@ -342,9 +395,20 @@ mod test {
},
},
};
use std::collections::VecDeque;
use tracing::Level;

mock! {
ServerGateway {}
#[async_trait::async_trait]
impl PollWorkflowTaskQueueApi for ServerGateway {
async fn poll(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
}
#[async_trait::async_trait]
impl RespondWorkflowTaskCompletedApi for ServerGateway {
async fn complete(&self, task_token: Vec<u8>, commands: Vec<ProtoCommand>) -> Result<RespondWorkflowTaskCompletedResponse>;
}
}

use super::*;
#[test]
fn workflow_bridge() {
let s = span!(Level::DEBUG, "Test start");
Expand All @@ -353,6 +417,7 @@ mod test {
let wfid = "fake_wf_id";
let run_id = "fake_run_id";
let timer_id = "fake_timer";
let task_queue = "test-task-queue";

let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
Expand Down Expand Up @@ -401,20 +466,20 @@ mod test {
let responses = vec![first_response, second_response];

let mut tasks = VecDeque::from(responses);
let mut mock_provider = MockWorkflowTaskProvider::new();
let mut mock_provider = MockServerGateway::new();
mock_provider
.expect_get_work()
.expect_poll()
.returning(move |_| Ok(tasks.pop_front().unwrap()));

let runtime = Runtime::new().unwrap();
let core = CoreSDK {
runtime,
work_provider: mock_provider,
server_gateway: mock_provider,
workflow_machines: DashMap::new(),
workflow_task_tokens: DashMap::new(),
};

let res = dbg!(core.poll_task().unwrap());
let res = dbg!(core.poll_task(task_queue).unwrap());
// TODO: uggo
assert_matches!(
res.get_wf_jobs().as_slice(),
Expand All @@ -436,7 +501,7 @@ mod test {
.unwrap();
dbg!("sent completion w/ start timer");

let res = dbg!(core.poll_task().unwrap());
let res = dbg!(core.poll_task(task_queue).unwrap());
// TODO: uggo
assert_matches!(
res.get_wf_jobs().as_slice(),
Expand Down
2 changes: 1 addition & 1 deletion src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ impl WorkflowMachines {
.expect("We have just ensured this is populated")
}

fn event_loop(&mut self) -> Result<()> {
pub fn event_loop(&mut self) -> Result<()> {
let results = self.drive_me.iterate_wf()?;
self.handle_driven_results(results);

Expand Down
Loading