Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ anyhow = "1.0"
async-trait = "0.1"
dashmap = "4.0"
derive_more = "0.99"
displaydoc = "0.1"
env_logger = "0.8"
futures = "0.3"
log = "0.4"
Expand Down
54 changes: 35 additions & 19 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,24 +16,44 @@ import "temporal/api/failure/v1/message.proto";
import "temporal/api/common/v1/message.proto";
import "temporal/api/command/v1/message.proto";

// The interface core exposes to language-specific SDKs: poll for a task, then report its
// completion.
service Core {
// Ask core for the next task; the request states which kinds of task are wanted
rpc PollTask (PollTaskReq) returns (Task) {}
// Report the outcome of a task, identified by its task token
rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {}
}

// A request as given to [crate::Core::poll_task]. The two flags select which kinds of task the
// poll is asking for.
message PollTaskReq {
// If true, poll for workflow tasks
bool workflows = 1;
// If true, poll for activity tasks
bool activities = 2;
}

// An instruction to perform work from core->lang sdk
message Task {
// A unique identifier for this task. The same token is supplied in [CompleteTaskReq] when the
// task is completed.
bytes task_token = 1;
// The type of task to be performed (exactly one of the following is set)
oneof variant {
// Wake up a workflow
WFActivation workflow = 2;
// Run an activity
ActivityTask activity = 3;
}
}

// An instruction to the lang sdk to run some workflow code, whether for the first time or from
// a cached state
message WFActivation {
// Time the activation was requested
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true];
// The id of the currently active run of the workflow
string run_id = 2;
// Why the workflow is being activated
oneof attributes {
// TODO could literally be attributes from events? -- maybe we don't need our own types

// Begin a workflow for the first time
StartWorkflowTaskAttributes start_workflow = 3;
// A timer has fired, allowing whatever was waiting on it to proceed
TimerFiredTaskAttributes unblock_timer = 4;
}
}

message StartWorkflowTaskAttributes {
// The identifier the lang-specific sdk uses to execute workflow code
string workflow_type = 1;
Expand All @@ -43,35 +63,27 @@ message StartWorkflowTaskAttributes {
temporal.api.common.v1.Payloads arguments = 3;

// TODO: Do we need namespace here, or should that just be fetchable easily?

// will be others - workflow exe started attrs, etc
// will be others - workflow exe started attrs, etc
}

// maybe we just go back to timer fired to keep consistent
message UnblockTimerTaskAttributes {
message TimerFiredTaskAttributes {
string timer_id = 1;
}

message WFActivation {
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true];
string run_id = 2;
oneof attributes {
// could literally be attributes from events -- maybe we don't need our own types
StartWorkflowTaskAttributes start_workflow = 3;
UnblockTimerTaskAttributes unblock_timer = 4;
}
}

// An instruction to the lang sdk to run an activity. Currently just wraps the server's poll
// response.
message ActivityTask {
// Original task from temporal service
temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1;
}


// Sent from lang side to core when calling [crate::Core::complete_task]
message CompleteTaskReq {
// The id from the [Task] being completed
bytes task_token = 1;
// The result being reported (exactly one of the following is set)
oneof completion {
// Complete a workflow task
WFActivationCompletion workflow = 2;
// Complete an activity task
ActivityTaskCompletion activity = 3;
}
}
Expand All @@ -94,6 +106,7 @@ message CoreCommand {
// Reserved for specific commands
}

// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next
message Command {
oneof variant {
temporal.api.command.v1.Command api = 1;
Expand All @@ -102,13 +115,16 @@ message Command {
}

// The successful outcome of a workflow activation.
// NOTE(review): presumably carried inside WFActivationCompletion, whose definition is not
// visible here -- confirm.
message WFActivationSuccess {
// A list of commands to send back to the temporal server
repeated Command commands = 1;

// Other bits from RespondWorkflowTaskCompletedRequest as needed
}

// The failed outcome of a workflow activation.
// NOTE(review): presumably carried inside WFActivationCompletion, whose definition is not
// visible here -- confirm.
message WFActivationFailure {
// The server-defined category describing why the workflow task failed
temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1;
// Details of the failure, in the temporal API's failure format
temporal.api.failure.v1.Failure failure = 2;

// Other bits from RespondWorkflowTaskFailedRequest as needed
}

Expand Down
85 changes: 54 additions & 31 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
#![warn(missing_docs)] // warn if there are missing docs

//! This crate provides a basis for creating new Temporal SDKs without completely starting from
//! scratch

#[macro_use]
extern crate tracing;
#[cfg(test)]
#[macro_use]
extern crate assert_matches;

pub mod protos;

mod machines;
mod pollers;
pub mod protos;
mod protosext;

pub use protosext::HistoryInfo;

use crate::{
machines::{
ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines,
Expand All @@ -30,7 +34,7 @@ use crate::{
workflowservice::v1::PollWorkflowTaskQueueResponse,
},
},
protosext::HistoryInfoError,
protosext::{HistoryInfo, HistoryInfoError},
};
use anyhow::Error;
use dashmap::DashMap;
Expand All @@ -41,34 +45,51 @@ use std::{
use tokio::runtime::Runtime;
use url::Url;

/// A result alias having [CoreError] as the error type
pub type Result<T, E = CoreError> = std::result::Result<T, E>;

// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have
// any given WorkflowMachines instance accessed on more than one thread. Ideally this trait can
// be accessed from many threads, but each workflow is pinned to one thread (possibly with many
// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should
// be both, ideally.
/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is
/// expected that only one instance of an implementation will exist for the lifetime of the
/// worker(s) using it.
pub trait Core {
/// Ask the core for some work, returning a [Task], which will eventually contain either a
/// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the
/// language SDK's responsibility to call the appropriate code with the provided inputs.
///
/// # Errors
///
/// Returns a [CoreError] if no task could be produced.
///
/// TODO: Examples
fn poll_task(&self) -> Result<Task>;

/// Tell the core that some work has been completed - whether as a result of running workflow
/// code or executing an activity.
///
/// # Errors
///
/// Returns a [CoreError] if the completion could not be processed.
fn complete_task(&self, req: CompleteTaskReq) -> Result<()>;
}

/// Holds various configuration information required to call [init]
pub struct CoreInitOptions {
target_url: Url,
namespace: String,
_task_queue: Vec<String>,
identity: String,
binary_checksum: String,
/// The URL of the Temporal server to connect to
pub target_url: Url,

/// The namespace on the server your worker will be using
pub namespace: String,

/// A human-readable string that can identify your worker
///
/// TODO: Probably belongs in future worker abstraction
pub identity: String,

/// A string that should be unique to the exact worker code/binary being executed
pub worker_binary_id: String,
}

/// Initializes instance of the core sdk and establishes connection to the temporal server.
/// Creates tokio runtime that will be used for all client-server interactions.
/// Initializes an instance of the core sdk and establishes a connection to the temporal server.
///
/// Note: Also creates tokio runtime that will be used for all client-server interactions.
pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
let runtime = Runtime::new().map_err(CoreError::TokioInitError)?;
let gateway_opts = ServerGatewayOptions {
namespace: opts.namespace,
identity: opts.identity,
binary_checksum: opts.binary_checksum,
worker_binary_id: opts.worker_binary_id,
};
// Initialize server client
let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?;
Expand All @@ -81,7 +102,7 @@ pub fn init(opts: CoreInitOptions) -> Result<impl Core> {
})
}

pub struct CoreSDK<WP> {
struct CoreSDK<WP> {
runtime: Runtime,
/// Provides work in the form of responses the server would send from polling task Qs
work_provider: WP,
Expand Down Expand Up @@ -226,7 +247,7 @@ pub trait WorkflowTaskProvider {
/// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering
/// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService]
#[derive(Debug)]
pub struct WorkflowBridge {
pub(crate) struct WorkflowBridge {
// does wf id belong in here?
started_attrs: Option<WorkflowExecutionStartedEventAttributes>,
incoming_commands: Receiver<Vec<WFCommand>>,
Expand Down Expand Up @@ -276,30 +297,32 @@ impl DrivenWorkflow for WorkflowBridge {
// Real bridge doesn't actually need to listen
impl ActivationListener for WorkflowBridge {}

#[derive(thiserror::Error, Debug)]
/// The error type returned by interactions with [Core]
#[derive(thiserror::Error, Debug, displaydoc::Display)]
#[allow(clippy::large_enum_variant)]
// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc
pub enum CoreError {
#[error("Unknown service error")]
/// Unknown service error
Unknown,
#[error("No tasks to perform for now")]
/// No tasks to perform for now
NoWork,
#[error("Poll response from server was malformed: {0:?}")]
/// Poll response from server was malformed: {0:?}
BadDataFromWorkProvider(PollWorkflowTaskQueueResponse),
#[error("Lang SDK sent us a malformed completion: {0:?}")]
/// Lang SDK sent us a malformed completion: {0:?}
MalformedCompletion(CompleteTaskReq),
#[error("Error buffering commands")]
/// Error buffering commands
CantSendCommands(#[from] SendError<Vec<WFCommand>>),
#[error("Couldn't interpret command from <lang>")]
/// Couldn't interpret command from <lang>
UninterprableCommand(#[from] InconvertibleCommandError),
#[error("Underlying error in history processing")]
/// Underlying error in history processing
UnderlyingHistError(#[from] HistoryInfoError),
#[error("Task token had nothing associated with it: {0:?}")]
/// Task token had nothing associated with it: {0:?}
NothingFoundForTaskToken(Vec<u8>),
#[error("Error calling the service: {0:?}")]
/// Error calling the service: {0:?}
TonicError(#[from] tonic::Status),
#[error("Server connection error: {0:?}")]
/// Server connection error: {0:?}
TonicTransportError(#[from] tonic::transport::Error),
#[error("Failed to initialize tokio runtime: {0:?}")]
/// Failed to initialize tokio runtime: {0:?}
TokioInitError(std::io::Error),
}

Expand Down
4 changes: 2 additions & 2 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::Result;
use crate::{
machines::{ActivationListener, DrivenWorkflow, WFCommand},
protos::{
coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes},
coresdk::{wf_activation::Attributes, TimerFiredTaskAttributes},
temporal::api::{
command::v1::StartTimerCommandAttributes,
history::v1::{
Expand Down Expand Up @@ -59,7 +59,7 @@ where

impl<F> ActivationListener for TestWorkflowDriver<F> {
fn on_activation(&mut self, activation: &Attributes) {
if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation {
if let Attributes::UnblockTimer(TimerFiredTaskAttributes { timer_id }) = activation {
Arc::get_mut(&mut self.cache)
.unwrap()
.unblocked_timers
Expand Down
10 changes: 4 additions & 6 deletions src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
#![allow(clippy::large_enum_variant)]

use crate::machines::workflow_machines::WorkflowTrigger;
use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation};
use crate::{
machines::{
workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand,
CancellableCommand, WFCommand, WFMachinesAdapter,
workflow_machines::{WFMachinesError, WorkflowMachines, WorkflowTrigger},
AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter,
},
protos::{
coresdk::HistoryEventId,
coresdk::{wf_activation, HistoryEventId, TimerFiredTaskAttributes, WfActivation},
temporal::api::{
command::v1::{
command::Attributes, CancelTimerCommandAttributes, Command,
Expand Down Expand Up @@ -224,7 +222,7 @@ impl WFMachinesAdapter for TimerMachine {
) -> Result<Vec<WorkflowTrigger>, WFMachinesError> {
match my_command {
// Fire the completion
TimerMachineCommand::Complete(_event) => Ok(vec![UnblockTimerTaskAttributes {
TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes {
timer_id: self.shared_state.timer_attributes.timer_id.clone(),
}
.into()]),
Expand Down
4 changes: 2 additions & 2 deletions src/pollers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use url::Url;
pub(crate) struct ServerGatewayOptions {
pub namespace: String,
pub identity: String,
pub binary_checksum: String,
pub worker_binary_id: String,
}

impl ServerGatewayOptions {
Expand All @@ -39,7 +39,7 @@ impl ServerGateway {
kind: TaskQueueKind::Unspecified as i32,
}),
identity: self.opts.identity.to_string(),
binary_checksum: self.opts.binary_checksum.to_string(),
binary_checksum: self.opts.worker_binary_id.to_string(),
});

Ok(self
Expand Down
9 changes: 9 additions & 0 deletions src/protos/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
//! Contains the protobuf definitions used as arguments to and return values from interactions with
//! [super::Core]. Language SDK authors can generate structs using the proto definitions that will match
//! the generated structs in this module.

#[allow(clippy::large_enum_variant)]
// I'd prefer not to do this, but there are some generated things that just don't need it.
#[allow(missing_docs)]
pub mod coresdk {
//! Contains all protobufs relating to communication between core and lang-specific SDKs

include!("coresdk.rs");
use super::temporal::api::command::v1 as api_command;
use super::temporal::api::command::v1::Command as ApiCommand;
Expand Down Expand Up @@ -50,6 +58,7 @@ pub mod coresdk {

// No need to lint these
#[allow(clippy::all)]
#[allow(missing_docs)]
// This is disgusting, but unclear to me how to avoid it. TODO: Discuss w/ prost maintainer
pub mod temporal {
pub mod api {
Expand Down
11 changes: 7 additions & 4 deletions src/protosext/history_info.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::machines::{WFMachinesError, WorkflowMachines};
use crate::protos::temporal::api::enums::v1::EventType;
use crate::protos::temporal::api::history::v1::{History, HistoryEvent};
use crate::{
machines::{WFMachinesError, WorkflowMachines},
protos::temporal::api::enums::v1::EventType,
protos::temporal::api::history::v1::{History, HistoryEvent},
};

#[derive(Clone, Debug, derive_more::Constructor, PartialEq)]
pub struct HistoryInfo {
pub(crate) struct HistoryInfo {
pub previous_started_event_id: i64,
pub workflow_task_started_event_id: i64,
pub events: Vec<HistoryEvent>,
Expand All @@ -25,6 +27,7 @@ pub enum HistoryInfoError {
#[error("Underlying error in workflow machine")]
UnderlyingMachineError(#[from] WFMachinesError),
}

impl HistoryInfo {
/// Constructs a new instance, retaining only enough events to reach the provided workflow
/// task number. If not provided, all events are retained.
Expand Down
2 changes: 1 addition & 1 deletion src/protosext/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
mod history_info;
pub use history_info::HistoryInfo;
pub(crate) use history_info::HistoryInfo;
pub(crate) use history_info::HistoryInfoError;