diff --git a/Cargo.toml b/Cargo.toml index 7c07e9756..6eb064d9c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ anyhow = "1.0" async-trait = "0.1" dashmap = "4.0" derive_more = "0.99" +displaydoc = "0.1" env_logger = "0.8" futures = "0.3" log = "0.4" diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 920e1f008..ab31048d0 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -16,24 +16,44 @@ import "temporal/api/failure/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; -service Core { - rpc PollTask (PollTaskReq) returns (Task) {} - rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {} -} - +// A request as given to [crate::Core::poll_task] message PollTaskReq { + // If true, poll for workflow tasks bool workflows = 1; + // If true, poll for activity tasks bool activities = 2; } +// An instruction to perform work from core->lang sdk message Task { + // A unique identifier for this task bytes task_token = 1; + // The type of task to be performed oneof variant { + // Wake up a workflow WFActivation workflow = 2; + // Run an activity ActivityTask activity = 3; } } +// An instruction to the lang sdk to run some workflow code, whether for the first time or from +// a cached state +message WFActivation { + // Time the activation was requested + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; + // The id of the currently active run of the workflow + string run_id = 2; + oneof attributes { + // TODO could literally be attributes from events? -- maybe we don't need our own types + + // Begin a workflow for the first time + StartWorkflowTaskAttributes start_workflow = 3; + // A timer has fired, allowing whatever was waiting on it + TimerFiredTaskAttributes unblock_timer = 4; + } +} + message StartWorkflowTaskAttributes { // The identifier the lang-specific sdk uses to execute workflow code string workflow_type = 1; @@ -43,35 +63,27 @@ message StartWorkflowTaskAttributes { temporal.api.common.v1.Payloads arguments = 3; // TODO: Do we need namespace here, or should that just be fetchable easily? - - // will be others - workflow exe started attrs, etc + // will be others - workflow exe started attrs, etc } -// maybe we just go back to timer fired to keep consistent -message UnblockTimerTaskAttributes { +message TimerFiredTaskAttributes { string timer_id = 1; } -message WFActivation { - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; - string run_id = 2; - oneof attributes { - // could literally be attributes from events -- maybe we don't need our own types - StartWorkflowTaskAttributes start_workflow = 3; - UnblockTimerTaskAttributes unblock_timer = 4; - } -} - message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; } +// Sent from lang side to core when calling [crate::Core::complete_task] message CompleteTaskReq { + // The id from the [Task] being completed bytes task_token = 1; oneof completion { + // Complete a workflow task WFActivationCompletion workflow = 2; + // Complete an activity task ActivityTaskCompletion activity = 3; } } @@ -94,6 +106,7 @@ message CoreCommand { // Reserved for specific commands } +// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next message Command { oneof variant { temporal.api.command.v1.Command api = 1; @@ -102,13 +115,16 @@ message Command { } message WFActivationSuccess { + // A list of commands to send back to the temporal server repeated Command commands = 1; + // Other bits from RespondWorkflowTaskCompletedRequest as needed } message WFActivationFailure { temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; temporal.api.failure.v1.Failure failure = 2; + // Other bits from RespondWorkflowTaskFailedRequest as needed } diff --git a/src/lib.rs b/src/lib.rs index 1fcf135b5..ce918d93b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,16 +1,20 @@ +#![warn(missing_docs)] // error if there are missing docs + +//! This crate provides a basis for creating new Temporal SDKs without completely starting from +//! scratch + #[macro_use] extern crate tracing; #[cfg(test)] #[macro_use] extern crate assert_matches; +pub mod protos; + mod machines; mod pollers; -pub mod protos; mod protosext; -pub use protosext::HistoryInfo; - use crate::{ machines::{ ActivationListener, DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines, @@ -30,7 +34,7 @@ use crate::{ workflowservice::v1::PollWorkflowTaskQueueResponse, }, }, - protosext::HistoryInfoError, + protosext::{HistoryInfo, HistoryInfoError}, }; use anyhow::Error; use dashmap::DashMap; @@ -41,34 +45,51 @@ use std::{ use tokio::runtime::Runtime; use url::Url; +/// A result alias having [CoreError] as the error type pub type Result = std::result::Result; -// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have -// any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can -// be accessed from many threads, but each workflow is pinned to one thread (possibly with many -// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should -// be both, ideally. +/// This trait is the primary way by which language specific SDKs interact with the core SDK. It is +/// expected that only one instance of an implementation will exist for the lifetime of the +/// worker(s) using it. pub trait Core { + /// Ask the core for some work, returning a [Task], which will eventually contain either a + /// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the + /// language SDK's responsibility to call the appropriate code with the provided inputs. + /// + /// TODO: Examples fn poll_task(&self) -> Result; + + /// Tell the core that some work has been completed - whether as a result of running workflow + /// code or executing an activity. fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; } +/// Holds various configuration information required to call [init] pub struct CoreInitOptions { - target_url: Url, - namespace: String, - _task_queue: Vec, - identity: String, - binary_checksum: String, + /// The URL of the Temporal server to connect to + pub target_url: Url, + + /// The namespace on the server your worker will be using + pub namespace: String, + + /// A human-readable string that can identify your worker + /// + /// TODO: Probably belongs in future worker abstraction + pub identity: String, + + /// A string that should be unique to the exact worker code/binary being executed + pub worker_binary_id: String, } -/// Initializes instance of the core sdk and establishes connection to the temporal server. -/// Creates tokio runtime that will be used for all client-server interactions. +/// Initializes an instance of the core sdk and establishes a connection to the temporal server. +/// +/// Note: Also creates tokio runtime that will be used for all client-server interactions. pub fn init(opts: CoreInitOptions) -> Result { let runtime = Runtime::new().map_err(CoreError::TokioInitError)?; let gateway_opts = ServerGatewayOptions { namespace: opts.namespace, identity: opts.identity, - binary_checksum: opts.binary_checksum, + worker_binary_id: opts.worker_binary_id, }; // Initialize server client let work_provider = runtime.block_on(gateway_opts.connect(opts.target_url))?; @@ -81,7 +102,7 @@ pub fn init(opts: CoreInitOptions) -> Result { }) } -pub struct CoreSDK { +struct CoreSDK { runtime: Runtime, /// Provides work in the form of responses the server would send from polling task Qs work_provider: WP, @@ -226,7 +247,7 @@ pub trait WorkflowTaskProvider { /// expects to be polled by the lang sdk. This struct acts as the bridge between the two, buffering /// output from calls to [DrivenWorkflow] and offering them to [CoreSDKService] #[derive(Debug)] -pub struct WorkflowBridge { +pub(crate) struct WorkflowBridge { // does wf id belong in here? started_attrs: Option, incoming_commands: Receiver>, @@ -276,30 +297,32 @@ impl DrivenWorkflow for WorkflowBridge { // Real bridge doesn't actually need to listen impl ActivationListener for WorkflowBridge {} -#[derive(thiserror::Error, Debug)] +/// The error type returned by interactions with [Core] +#[derive(thiserror::Error, Debug, displaydoc::Display)] #[allow(clippy::large_enum_variant)] +// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum CoreError { - #[error("Unknown service error")] + /// Unknown service error Unknown, - #[error("No tasks to perform for now")] + /// No tasks to perform for now NoWork, - #[error("Poll response from server was malformed: {0:?}")] + /// Poll response from server was malformed: {0:?} BadDataFromWorkProvider(PollWorkflowTaskQueueResponse), - #[error("Lang SDK sent us a malformed completion: {0:?}")] + /// Lang SDK sent us a malformed completion: {0:?} MalformedCompletion(CompleteTaskReq), - #[error("Error buffering commands")] + /// Error buffering commands CantSendCommands(#[from] SendError>), - #[error("Couldn't interpret command from ")] + /// Couldn't interpret command from UninterprableCommand(#[from] InconvertibleCommandError), - #[error("Underlying error in history processing")] + /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), - #[error("Task token had nothing associated with it: {0:?}")] + /// Task token had nothing associated with it: {0:?} NothingFoundForTaskToken(Vec), - #[error("Error calling the service: {0:?}")] + /// Error calling the service: {0:?} TonicError(#[from] tonic::Status), - #[error("Server connection error: {0:?}")] + /// Server connection error: {0:?} TonicTransportError(#[from] tonic::transport::Error), - #[error("Failed to initialize tokio runtime: {0:?}")] + /// Failed to initialize tokio runtime: {0:?} TokioInitError(std::io::Error), } diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index d6dd8fe05..4cfcd45b9 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -2,7 +2,7 @@ use super::Result; use crate::{ machines::{ActivationListener, DrivenWorkflow, WFCommand}, protos::{ - coresdk::{wf_activation::Attributes, UnblockTimerTaskAttributes}, + coresdk::{wf_activation::Attributes, TimerFiredTaskAttributes}, temporal::api::{ command::v1::StartTimerCommandAttributes, history::v1::{ @@ -59,7 +59,7 @@ where impl ActivationListener for TestWorkflowDriver { fn on_activation(&mut self, activation: &Attributes) { - if let Attributes::UnblockTimer(UnblockTimerTaskAttributes { timer_id }) = activation { + if let Attributes::UnblockTimer(TimerFiredTaskAttributes { timer_id }) = activation { Arc::get_mut(&mut self.cache) .unwrap() .unblocked_timers diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index e29006fb1..6d60687fa 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -1,14 +1,12 @@ #![allow(clippy::large_enum_variant)] -use crate::machines::workflow_machines::WorkflowTrigger; -use crate::protos::coresdk::{wf_activation, UnblockTimerTaskAttributes, WfActivation}; use crate::{ machines::{ - workflow_machines::WFMachinesError, workflow_machines::WorkflowMachines, AddCommand, - CancellableCommand, WFCommand, WFMachinesAdapter, + workflow_machines::{WFMachinesError, WorkflowMachines, WorkflowTrigger}, + AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter, }, protos::{ - coresdk::HistoryEventId, + coresdk::{wf_activation, HistoryEventId, TimerFiredTaskAttributes, WfActivation}, temporal::api::{ command::v1::{ command::Attributes, CancelTimerCommandAttributes, Command, @@ -224,7 +222,7 @@ impl WFMachinesAdapter for TimerMachine { ) -> Result, WFMachinesError> { match my_command { // Fire the completion - TimerMachineCommand::Complete(_event) => Ok(vec![UnblockTimerTaskAttributes { + TimerMachineCommand::Complete(_event) => Ok(vec![TimerFiredTaskAttributes { timer_id: self.shared_state.timer_attributes.timer_id.clone(), } .into()]), diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 9cfbc03b4..d26b5d9d5 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -12,7 +12,7 @@ use url::Url; pub(crate) struct ServerGatewayOptions { pub namespace: String, pub identity: String, - pub binary_checksum: String, + pub worker_binary_id: String, } impl ServerGatewayOptions { @@ -39,7 +39,7 @@ impl ServerGateway { kind: TaskQueueKind::Unspecified as i32, }), identity: self.opts.identity.to_string(), - binary_checksum: self.opts.binary_checksum.to_string(), + binary_checksum: self.opts.worker_binary_id.to_string(), }); Ok(self diff --git a/src/protos/mod.rs b/src/protos/mod.rs index f7e1231e5..1592e9bf9 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -1,5 +1,13 @@ +//! Contains the protobuf definitions used as arguments to and return values from interactions with +//! [super::Core]. Language SDK authors can generate structs using the proto definitions that will match +//! the generated structs in this module. + #[allow(clippy::large_enum_variant)] +// I'd prefer not to do this, but there are some generated things that just don't need it. +#[allow(missing_docs)] pub mod coresdk { + //! Contains all protobufs relating to communication between core and lang-specific SDKs + include!("coresdk.rs"); use super::temporal::api::command::v1 as api_command; use super::temporal::api::command::v1::Command as ApiCommand; @@ -50,6 +58,7 @@ pub mod coresdk { // No need to lint these #[allow(clippy::all)] +#[allow(missing_docs)] // This is disgusting, but unclear to me how to avoid it. TODO: Discuss w/ prost maintainer pub mod temporal { pub mod api { diff --git a/src/protosext/history_info.rs b/src/protosext/history_info.rs index 23524fb55..4ef36537b 100644 --- a/src/protosext/history_info.rs +++ b/src/protosext/history_info.rs @@ -1,9 +1,11 @@ -use crate::machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::enums::v1::EventType; -use crate::protos::temporal::api::history::v1::{History, HistoryEvent}; +use crate::{ + machines::{WFMachinesError, WorkflowMachines}, + protos::temporal::api::enums::v1::EventType, + protos::temporal::api::history::v1::{History, HistoryEvent}, +}; #[derive(Clone, Debug, derive_more::Constructor, PartialEq)] -pub struct HistoryInfo { +pub(crate) struct HistoryInfo { pub previous_started_event_id: i64, pub workflow_task_started_event_id: i64, pub events: Vec, @@ -25,6 +27,7 @@ pub enum HistoryInfoError { #[error("Underlying error in workflow machine")] UnderlyingMachineError(#[from] WFMachinesError), } + impl HistoryInfo { /// Constructs a new instance, retaining only enough events to reach the provided workflow /// task number. If not provided, all events are retained. diff --git a/src/protosext/mod.rs b/src/protosext/mod.rs index f55576e31..91d15b0d8 100644 --- a/src/protosext/mod.rs +++ b/src/protosext/mod.rs @@ -1,3 +1,3 @@ mod history_info; -pub use history_info::HistoryInfo; +pub(crate) use history_info::HistoryInfo; pub(crate) use history_info::HistoryInfoError;