70 changes: 70 additions & 0 deletions ARCHITECTURE.md
@@ -0,0 +1,70 @@
Core SDK Architecture
===

## High level description

The diagram below depicts how future SDKs are split into two parts: the `sdk-core` common code, which is written in Rust, and an `sdk-lang` package specific to the language the user is writing their workflow/activity in. For example, a user writing workflows in Rust would pull in (at least) two crates: `temporal-sdk-core` and `temporal-sdk-rust`.

> **Review comment (Member):** I wouldn't give temporal-sdk-rust as an example. We probably will never have just sdk-rust, as Rust workflows are going to use WebAssembly. I would use sdk-python as the example.


![Arch Diagram](https://lucid.app/publicSegments/view/7872bb33-d2b9-4b90-8aa1-bac111136aa5/image.png)

The `core` communicates with the Temporal service in the same way that existing SDKs today do, via gRPC. It's responsible for polling for tasks, processing those tasks according to our state machine logic, and then driving the language-specific code and shuttling events to it and commands back.

The `sdk-lang` side communicates with `sdk-core` via either C bindings, IPC, or (later) bindings to a WASM interface. Many languages already have nice support for calling into Rust code - generally speaking these implementations are using C bindings under the hood. For example, we use [neon](https://neon-bindings.com/) to support the TS/JS sdk, and we will likely use [PyO3](https://github.com/PyO3/pyo3) for Python. It is expected that such usages will layer another crate on top of `core` which brings in these language specific libraries to expose `core` to that language in an ergonomic manner. IPC will exist as a thin layer on top of the C bindings. Care should be taken here to avoid unnecessary copying and [de]serialization. Then `sdk-lang` is responsible for dispatching tasks to the appropriate user code (to whatever extent parts of this can be reasonably put in the core code, we desire that to make lang-specific SDKs as small as possible).

As a general note, the more we can push from `sdk-lang` into `sdk-core`, the easier our ecosystem is to maintain in the long run as we will have less semantically identical code.

### Glossary of terms

There are many concepts involved in the chain of communication from server->core->lang that all roughly mean "Do something". Unfortunately this leads to quite a lot of overloaded terminology in the code. This list should help to disambiguate:

* `HistoryEvent` (often referred to simply as an `Event`): These events come from the server and represent the history of the workflow. They are defined in the protobuf definitions for the Temporal service itself.
* `Command`: These are the commands defined in the temporal service protobufs that are returned by clients upon completing a `WorkflowTask`. For example, starting a timer or an activity.
* `WorkflowTask`: These are how the server represents the need to run user workflow code (or the result of it executing). See the `HistoryEvent` proto definition for more.
  > **Review comment (Member):** In reality, a `WorkflowTask` is represented by `PollWorkflowTaskQueueResponse`.

* `WfActivation` (said "WorkflowActivation"): These are produced by the Core SDK when the lang sdk needs to "activate" the user's workflow code, either running it from the beginning or resuming a cached workflow.
* `WfActivationJob` (shorthand: `Job`s): These are included in `WfActivation`s and represent the actual things that have happened since the last time the workflow was activated (if ever). EX: Firing a timer, proceeding with the result of an activity, etc. They are all ultimately derived from `HistoryEvent`s.
* `Task`: Returned from calls to `Core::poll_task`, these tasks represent work for the lang SDK to perform, which includes both workflow activations, and instructions to run activities (`ActivityTask`).
* `TaskCompletion`: Provided by the lang side when calling `Core::complete_task`. Can be the result of a `WfActivation` or an `ActivityTask`. In the case of workflow activations, the (successful) completion contains one or more `Command`s, which may be literal `Command`s as defined in the Temporal service protobufs, or may be `CoreCommand`s, which support things like query responses.

Additional clarifications that are internal to Core:
* `StateMachine`s also handle events and produce commands, which often map directly to the above `HistoryEvent`s and `Command`s, but are distinct types. The state machine library is Temporal agnostic - but all interactions with the machines pass through a `TemporalStateMachine` trait, which accepts `HistoryEvent`s, and produces `WorkflowTrigger`s.
* `WorkflowTrigger`: These allow the state machines to trigger things to happen to the workflow, including pushing new `WfActivationJob`s or otherwise advancing workflow state.
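
To make the relationships in the glossary concrete, here is a minimal sketch of how the terms nest. The types below are hypothetical, hand-written stand-ins for the protobuf-generated ones (field names, variant names, and the `complete_activation` helper are assumptions made for illustration); only the overall shape, a `Task` carrying a token plus a variant and a `TaskCompletion` answering it with `Command`s, follows the descriptions above.

```rust
// Hypothetical, simplified stand-ins for the generated protobuf types named in
// the glossary. Field and variant names are illustrative assumptions.

/// Roughly a `Command`: something the workflow asks the server to do.
#[derive(Debug, Clone, PartialEq)]
pub enum Command {
    StartTimer { timer_id: String },
    CompleteWorkflowExecution,
}

/// Roughly a `WfActivationJob`: one thing that happened since the last activation.
#[derive(Debug, Clone, PartialEq)]
pub enum WfActivationJob {
    StartWorkflow { workflow_id: String },
    TimerFired { timer_id: String },
}

/// Roughly a `WfActivation`: the jobs the lang side must apply to one workflow run.
#[derive(Debug, Clone, PartialEq)]
pub struct WfActivation {
    pub run_id: String,
    pub jobs: Vec<WfActivationJob>,
}

#[derive(Debug, Clone, PartialEq)]
pub struct ActivityTask {
    pub activity_id: String,
}

/// A `Task` is either a workflow activation or an activity to run.
#[derive(Debug, Clone, PartialEq)]
pub enum TaskVariant {
    Workflow(WfActivation),
    Activity(ActivityTask),
}

#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub task_token: Vec<u8>,
    pub variant: TaskVariant,
}

/// What the lang side hands back for a `Task`.
#[derive(Debug, Clone, PartialEq)]
pub enum CompletionVariant {
    Workflow(Vec<Command>),
    Activity,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskCompletion {
    pub task_token: Vec<u8>,
    pub variant: CompletionVariant,
}

/// Answer a workflow activation: start a timer when the workflow starts,
/// complete the workflow when the timer fires.
pub fn complete_activation(task_token: Vec<u8>, activation: &WfActivation) -> TaskCompletion {
    let commands = activation
        .jobs
        .iter()
        .map(|job| match job {
            WfActivationJob::StartWorkflow { .. } => Command::StartTimer {
                timer_id: "timer-1".to_string(),
            },
            WfActivationJob::TimerFired { .. } => Command::CompleteWorkflowExecution,
        })
        .collect();
    TaskCompletion {
        task_token,
        variant: CompletionVariant::Workflow(commands),
    }
}
```

Calling `complete_activation` with an activation containing a single `StartWorkflow` job yields a completion holding one `StartTimer` command, which mirrors the timer workflow exercised by the tests in this change.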


### Core SDK Responsibilities

- Communication with Temporal service using a generated gRPC client, which is wrapped with somewhat more ergonomic traits.
- Provide an interface for the language-specific SDK to drive the event loop and handle returned commands. The lang SDK will continuously call/poll the core SDK to receive new `Task`s, which represent either workflows being started or awoken (`WfActivation`) or activities to execute (`ActivityTask`). It will then call its workflow/activity functions with the provided information as appropriate, and will then push completed tasks back into the core SDK.
- Advance state machines and report back to the temporal server as appropriate when handling events and commands
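
As a rough illustration of what "advancing state machines" can mean, the sketch below models a single timer as a tiny state machine that consumes inputs and emits outputs. Everything here is a simplified assumption: the state names, `TimerInput`, `TimerOutput`, and `TimerMachine` are hypothetical and are not the types used by the core's state machine library.

```rust
/// Hypothetical states for one timer.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TimerState {
    Created,
    StartCommandSent,
    Started,
    Fired,
}

/// Hypothetical inputs: one originates from the lang side, two from history.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum TimerInput {
    /// The workflow code asked for a timer.
    Schedule,
    /// History says the server recorded the timer start.
    TimerStarted,
    /// History says the timer fired.
    TimerFired,
}

/// Hypothetical outputs the machine asks the rest of the core to act on.
#[derive(Debug, Clone, PartialEq)]
pub enum TimerOutput {
    /// Send a start-timer command to the server.
    IssueStartTimerCommand,
    /// Tell the workflow (via a job in the next activation) that the timer fired.
    NotifyTimerFired,
}

#[derive(Debug, PartialEq)]
pub struct InvalidTransition {
    pub state: TimerState,
    pub input: TimerInput,
}

pub struct TimerMachine {
    state: TimerState,
}

impl TimerMachine {
    pub fn new() -> Self {
        TimerMachine { state: TimerState::Created }
    }

    pub fn state(&self) -> TimerState {
        self.state
    }

    /// Apply one input. On success the state advances and any outputs are
    /// returned; an input that is not valid in the current state is rejected
    /// and leaves the state unchanged.
    pub fn handle(&mut self, input: TimerInput) -> Result<Vec<TimerOutput>, InvalidTransition> {
        use TimerState::*;
        let (next, outputs) = match (self.state, input) {
            (Created, TimerInput::Schedule) => {
                (StartCommandSent, vec![TimerOutput::IssueStartTimerCommand])
            }
            (StartCommandSent, TimerInput::TimerStarted) => (Started, vec![]),
            (Started, TimerInput::TimerFired) => (Fired, vec![TimerOutput::NotifyTimerFired]),
            (state, input) => return Err(InvalidTransition { state, input }),
        };
        self.state = next;
        Ok(outputs)
    }
}
```

Rejecting unexpected inputs, rather than ignoring them, is one way to surface history that does not match what the workflow code actually did; whether the real machines behave this way is not stated in this document.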

### Language Specific SDK Responsibilities

- Periodically poll Core SDK for tasks
  > **Review comment (Member):** I think we want to emphasize that it is a long poll, i.e. the poll blocks until a task is available. "Periodically poll" sounds like a busy poll to me.

- Call workflow and activity functions as appropriate, using information in events it received from Core SDK
- Return results of workflows/activities to Core SDK
- Manage concurrency using language appropriate primitives. For example, it is up to the language side to decide how frequently to poll, and whether or not to execute workflows and activities in separate threads or coroutines, etc.
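
The responsibilities above amount to a poll, run, complete loop. The following is a minimal single-threaded sketch of that loop under stated assumptions: the `Core` trait is cut down to the two calls used here, `Task` and `TaskCompletion` are simplified stand-ins for the generated types, and `ScriptedCore`, `CoreError::ShuttingDown`, and `run_worker` are hypothetical names invented for the example.

```rust
use std::collections::VecDeque;
use std::sync::Mutex;

// Hypothetical, simplified stand-ins for the protobuf-generated types.
#[derive(Debug, Clone, PartialEq)]
pub struct Task {
    pub task_token: Vec<u8>,
    pub payload: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct TaskCompletion {
    pub task_token: Vec<u8>,
    pub result: String,
}

#[derive(Debug, PartialEq)]
pub enum CoreError {
    /// Assumed signal that no more tasks will be delivered.
    ShuttingDown,
}

/// Cut-down version of the `Core` trait: only the two calls the loop needs.
pub trait Core: Send + Sync {
    fn poll_task(&self, task_queue: &str) -> Result<Task, CoreError>;
    fn complete_task(&self, req: TaskCompletion) -> Result<(), CoreError>;
}

/// In-memory fake used only for this sketch. A real core would block in
/// `poll_task` until a task is available instead of returning immediately.
pub struct ScriptedCore {
    pending: Mutex<VecDeque<Task>>,
    pub completed: Mutex<Vec<TaskCompletion>>,
}

impl ScriptedCore {
    pub fn new(tasks: Vec<Task>) -> Self {
        ScriptedCore {
            pending: Mutex::new(tasks.into()),
            completed: Mutex::new(Vec::new()),
        }
    }
}

impl Core for ScriptedCore {
    fn poll_task(&self, _task_queue: &str) -> Result<Task, CoreError> {
        self.pending
            .lock()
            .unwrap()
            .pop_front()
            .ok_or(CoreError::ShuttingDown)
    }

    fn complete_task(&self, req: TaskCompletion) -> Result<(), CoreError> {
        self.completed.lock().unwrap().push(req);
        Ok(())
    }
}

/// Lang-side loop: poll for a task, run user code, report the result.
/// Returns the number of tasks handled before the core shut down.
pub fn run_worker<F>(core: &dyn Core, task_queue: &str, mut handler: F) -> usize
where
    F: FnMut(&Task) -> String,
{
    let mut handled = 0;
    loop {
        let task = match core.poll_task(task_queue) {
            Ok(task) => task,
            Err(CoreError::ShuttingDown) => return handled,
        };
        // Dispatch to user workflow/activity code.
        let result = handler(&task);
        // The completion echoes the task token so the core can match it up.
        core.complete_task(TaskCompletion { task_token: task.task_token, result })
            .expect("core rejected the completion");
        handled += 1;
    }
}
```

A language SDK would typically run several such loops, or hand each polled task to a thread or coroutine, depending on the concurrency primitives it prefers; the loop above processes one task at a time only to keep the sketch short.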

### Example Sequence Diagrams

Here we consider what the sequence of API calls would look like for a simple workflow executing a happy path. The hello-world workflow & activity in imaginary Rust (don't pay too much attention to the specifics, just an example) is below. It is meant to be like our most basic hello world samples.

```rust
#[workflow]
async fn hello_world_workflow(name: &str) -> Result<String, Error> {
    info!("Hello world workflow started! Name: {}", name);
    // Very much TBD how this would actually work in rust sdk. Many options here.
    activity!(hello_activity(name), timeout: 2s).await
}

#[activity]
async fn hello_activity(name: &str) -> String {
    format!("Hello {}!", name)
}
```

[![](https://mermaid.ink/img/eyJjb2RlIjoic2VxdWVuY2VEaWFncmFtXG4gICAgcGFydGljaXBhbnQgUyBhcyBUZW1wb3JhbCBTZXJ2aWNlXG4gICAgcGFydGljaXBhbnQgQyBhcyBDb3JlIFNES1xuICAgIHBhcnRpY2lwYW50IEwgYXMgTGFuZyBTREtcblxuICAgIEwgLT4-IEM6IEluaXRpYWxpemUgd29ya2VyXG4gICAgTCAtPj4gUzogZ3JwYzogU3RhcnRXb3JrZmxvd0V4ZWN1dGlvblxuXG4gICAgbG9vcCB3b3JrZmxvdyB0YXNrIHByb2Nlc3NpbmdcbiAgICBDIC0-PiBTOiBncnBjOiBQb2xsV29ya2Zsb3dUYXNrUXVldWVcbiAgICBTIC0tPj4gQzogVGFza3MgJiBoaXN0b3J5ICAgXG4gICAgQyAtPj4gQzogQXBwbHkgaGlzdG9yeSB0byBzdGF0ZSBtYWNoaW5lc1xuICAgIFxuICAgIGxvb3AgZXZlbnQgbG9vcFxuICAgIEwgLT4-IEM6IFBvbGwgZm9yIHNkayBldmVudHNcbiAgICBMIC0-PiBMOiBSdW4gd29ya2Zsb3csIHByb2R1Y2VzIGNvbW1hbmRzXG4gICAgTCAtLT4-IEM6IFdvcmtmbG93IEFjdGl2YXRpb24gZG9uZSB3LyBjb21tYW5kc1xuICAgIEMgLT4-IEM6IEFkdmFuY2Ugc3RhdGUgbWFjaGluZXNcbiAgICBlbmRcblxuICAgIEMgLT4-IFM6IGdycGM6IFJlc3BvbmRXb3JrZmxvd1Rhc2tDb21wbGV0ZWRcbiAgICBlbmRcblxuICAgIEwgLT4-IEM6IFNodXRkb3duXG4iLCJtZXJtYWlkIjp7InRoZW1lIjoiZGVmYXVsdCJ9LCJ1cGRhdGVFZGl0b3IiOmZhbHNlfQ)](https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoic2VxdWVuY2VEaWFncmFtXG4gICAgcGFydGljaXBhbnQgUyBhcyBUZW1wb3JhbCBTZXJ2aWNlXG4gICAgcGFydGljaXBhbnQgQyBhcyBDb3JlIFNES1xuICAgIHBhcnRpY2lwYW50IEwgYXMgTGFuZyBTREtcblxuICAgIEwgLT4-IEM6IEluaXRpYWxpemUgd29ya2VyXG4gICAgTCAtPj4gUzogZ3JwYzogU3RhcnRXb3JrZmxvd0V4ZWN1dGlvblxuXG4gICAgbG9vcCB3b3JrZmxvdyB0YXNrIHByb2Nlc3NpbmdcbiAgICBDIC0-PiBTOiBncnBjOiBQb2xsV29ya2Zsb3dUYXNrUXVldWVcbiAgICBTIC0tPj4gQzogVGFza3MgJiBoaXN0b3J5ICAgXG4gICAgQyAtPj4gQzogQXBwbHkgaGlzdG9yeSB0byBzdGF0ZSBtYWNoaW5lc1xuICAgIFxuICAgIGxvb3AgZXZlbnQgbG9vcFxuICAgIEwgLT4-IEM6IFBvbGwgZm9yIHNkayBldmVudHNcbiAgICBMIC0-PiBMOiBSdW4gd29ya2Zsb3csIHByb2R1Y2VzIGNvbW1hbmRzXG4gICAgTCAtLT4-IEM6IFdvcmtmbG93IEFjdGl2YXRpb24gZG9uZSB3LyBjb21tYW5kc1xuICAgIEMgLT4-IEM6IEFkdmFuY2Ugc3RhdGUgbWFjaGluZXNcbiAgICBlbmRcblxuICAgIEMgLT4-IFM6IGdycGM6IFJlc3BvbmRXb3JrZmxvd1Rhc2tDb21wbGV0ZWRcbiAgICBlbmRcblxuICAgIEwgLT4-IEM6IFNodXRkb3duXG4iLCJtZXJtYWlkIjp7InRoZW1lIjoiZGVmYXVsdCJ9LCJ1cGRhdGVFZGl0b3IiOmZhbHNlfQ)

## API Definition

We define the interface between the core and lang SDKs in terms of gRPC service definitions. The actual implementations of this "service" are not generated by gRPC generators, but the messages themselves are, and make it easier to hit the ground running in new languages.

See the latest API definition [here](https://github.com/temporalio/sdk-core/blob/master/protos/local/core_interface.proto)
4 changes: 2 additions & 2 deletions protos/local/core_interface.proto
@@ -95,10 +95,10 @@ message ActivityTask {


// Sent from lang side to core when calling [crate::Core::complete_task]
-message CompleteTaskReq {
+message TaskCompletion {
// The id from the [Task] being completed
bytes task_token = 1;
-oneof completion {
+oneof variant {
// Complete a workflow task
WFActivationCompletion workflow = 2;
// Complete an activity task
22 changes: 11 additions & 11 deletions src/lib.rs
@@ -23,7 +23,7 @@ use crate::{
machines::{InconvertibleCommandError, WFCommand},
protos::{
coresdk::{
-complete_task_req::Completion, wf_activation_completion::Status, CompleteTaskReq, Task,
+task_completion, wf_activation_completion::Status, Task, TaskCompletion,
WfActivationCompletion, WfActivationSuccess,
},
temporal::api::{
@@ -55,7 +55,7 @@ pub trait Core: Send + Sync {

/// Tell the core that some work has been completed - whether as a result of running workflow
/// code or executing an activity.
-fn complete_task(&self, req: CompleteTaskReq) -> Result<()>;
+fn complete_task(&self, req: TaskCompletion) -> Result<()>;

/// Returns an instance of ServerGateway.
fn server_gateway(&self) -> Result<Arc<dyn ServerGatewayApis>>;
@@ -158,12 +158,12 @@ where
}

#[instrument(skip(self))]
-fn complete_task(&self, req: CompleteTaskReq) -> Result<()> {
+fn complete_task(&self, req: TaskCompletion) -> Result<()> {
match req {
-CompleteTaskReq {
+TaskCompletion {
task_token,
-completion:
-Some(Completion::Workflow(WfActivationCompletion {
+variant:
+Some(task_completion::Variant::Workflow(WfActivationCompletion {
status: Some(wfstatus),
})),
} => {
@@ -187,8 +187,8 @@ where
}
Ok(())
}
-CompleteTaskReq {
-completion: Some(Completion::Activity(_)),
+TaskCompletion {
+variant: Some(task_completion::Variant::Activity(_)),
..
} => {
unimplemented!()
@@ -259,7 +259,7 @@ pub enum CoreError {
/// Poll response from server was malformed: {0:?}
BadDataFromWorkProvider(PollWorkflowTaskQueueResponse),
/// Lang SDK sent us a malformed completion: {0:?}
-MalformedCompletion(CompleteTaskReq),
+MalformedCompletion(TaskCompletion),
/// Error buffering commands
CantSendCommands(#[from] SendError<Vec<WFCommand>>),
/// Couldn't interpret command from <lang>
@@ -388,7 +388,7 @@ mod test {
assert!(core.workflow_machines.get(run_id).is_some());

let task_tok = res.task_token;
-core.complete_task(CompleteTaskReq::ok_from_api_attrs(
+core.complete_task(TaskCompletion::ok_from_api_attrs(
StartTimerCommandAttributes {
timer_id: timer_id.to_string(),
..Default::default()
@@ -408,7 +408,7 @@ mod test {
}]
);
let task_tok = res.task_token;
-core.complete_task(CompleteTaskReq::ok_from_api_attrs(
+core.complete_task(TaskCompletion::ok_from_api_attrs(
CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
task_tok,
))
7 changes: 3 additions & 4 deletions src/protos/mod.rs
@@ -11,7 +11,6 @@ pub mod coresdk {
include!("coresdk.rs");
use super::temporal::api::command::v1 as api_command;
use super::temporal::api::command::v1::Command as ApiCommand;
-use crate::protos::coresdk::complete_task_req::Completion;
use crate::protos::coresdk::wf_activation_job::Attributes;
use command::Variant;

@@ -57,17 +56,17 @@ pub mod coresdk {
}
}

-impl CompleteTaskReq {
+impl TaskCompletion {
/// Build a successful completion from some api command attributes and a task token
pub fn ok_from_api_attrs(
cmd: api_command::command::Attributes,
task_token: Vec<u8>,
) -> Self {
let cmd: ApiCommand = cmd.into();
let success: WfActivationSuccess = vec![cmd].into();
-CompleteTaskReq {
+TaskCompletion {
task_token,
-completion: Some(Completion::Workflow(WfActivationCompletion {
+variant: Some(task_completion::Variant::Workflow(WfActivationCompletion {
status: Some(wf_activation_completion::Status::Successful(success)),
})),
}
6 changes: 3 additions & 3 deletions tests/integ_tests/poller_test.rs
@@ -2,7 +2,7 @@ use rand::{self, Rng};
use std::{convert::TryFrom, env, time::Duration};
use temporal_sdk_core::{
protos::{
-coresdk::CompleteTaskReq,
+coresdk::TaskCompletion,
temporal::api::command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
@@ -43,7 +43,7 @@ fn timer_workflow() {
let run_id = dbg!(create_workflow(&core, &workflow_id.to_string()));
let timer_id: String = rng.gen::<u32>().to_string();
let task = dbg!(core.poll_task(TASK_QUEUE).unwrap());
-core.complete_task(CompleteTaskReq::ok_from_api_attrs(
+core.complete_task(TaskCompletion::ok_from_api_attrs(
StartTimerCommandAttributes {
timer_id: timer_id.to_string(),
start_to_fire_timeout: Some(Duration::from_secs(1).into()),
@@ -55,7 +55,7 @@ fn timer_workflow() {
.unwrap();
dbg!("sent completion w/ start timer");
let task = dbg!(core.poll_task(TASK_QUEUE).unwrap());
-core.complete_task(CompleteTaskReq::ok_from_api_attrs(
+core.complete_task(TaskCompletion::ok_from_api_attrs(
CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
task.task_token,
))