diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index e84b65cad..7a20b9a56 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -9,6 +9,18 @@ steps: - docker-compose#v3.0.0: run: unit-test config: .buildkite/docker/docker-compose.yaml + - label: "doc" + agents: + queue: "default" + docker: "*" + command: "cargo doc --workspace --all-features --no-deps" + timeout_in_minutes: 15 + plugins: + - docker-compose#v3.0.0: + run: unit-test + config: .buildkite/docker/docker-compose.yaml + env: + - RUSTDOCFLAGS=-Dwarnings - label: "lint" agents: queue: "default" diff --git a/Cargo.toml b/Cargo.toml index 4d79882bf..a03e50e26 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ base64 = "0.13" crossbeam = "0.8" dashmap = "4.0" derive_more = "0.99" -displaydoc = "0.2" futures = "0.3" itertools = "0.10" once_cell = "1.5" diff --git a/protos/local/workflow_completion.proto b/protos/local/workflow_completion.proto index 9c1e636f3..4fbc9beec 100644 --- a/protos/local/workflow_completion.proto +++ b/protos/local/workflow_completion.proto @@ -7,7 +7,7 @@ import "workflow_commands.proto"; /// Result of a single workflow activation, reported from lang to core message WFActivationCompletion { - // The token from the [WfActivation] you are completing + // The token from the workflow activation you are completing bytes task_token = 1; oneof status { Success successful = 2; diff --git a/src/errors.rs b/src/errors.rs new file mode 100644 index 000000000..d4100e2e1 --- /dev/null +++ b/src/errors.rs @@ -0,0 +1,145 @@ +use crate::{ + protos::coresdk::activity_result::ActivityResult, + protos::coresdk::workflow_completion::WfActivationCompletion, + protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse, + workflow::WorkflowError, +}; +use tonic::codegen::http::uri::InvalidUri; + +pub(crate) struct ShutdownErr; +pub(crate) struct WorkflowUpdateError { + /// Underlying workflow error + pub source: WorkflowError, + /// The run id of the erring workflow + pub run_id: String, +} + +/// Errors thrown during initialization of [crate::Core] +#[derive(thiserror::Error, Debug)] +pub enum CoreInitError { + /// Invalid URI. Configuration error, fatal. + #[error("Invalid URI: {0:?}")] + InvalidUri(#[from] InvalidUri), + /// Server connection error. Crashing and restarting the worker is likely best. + #[error("Server connection error: {0:?}")] + TonicTransportError(#[from] tonic::transport::Error), +} + +/// Errors thrown by [crate::Core::poll_workflow_task] +#[derive(thiserror::Error, Debug)] +pub enum PollWfError { + /// There was an error specific to a workflow instance. The cached workflow should be deleted + /// from lang side. + #[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")] + WorkflowUpdateError { + /// Underlying workflow error + source: WorkflowError, + /// The run id of the erring workflow + run_id: String, + }, + /// The server returned a malformed polling response. Either we aren't handling a valid form, + /// or the server is bugging out. Likely fatal. + #[error("Poll workflow response from server was malformed: {0:?}")] + BadPollResponseFromServer(PollWorkflowTaskQueueResponse), + /// [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. Lang + /// must call [crate::Core::complete_workflow_task] for any remaining tasks, and then may + /// exit. + #[error("Core is shut down and there are no more workflow replay tasks")] + ShutDown, + /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal + /// errors, so lang should consider this fatal. + #[error("Unhandled error when calling the temporal server: {0:?}")] + TonicError(#[from] tonic::Status), +} + +impl From for PollWfError { + fn from(e: WorkflowUpdateError) -> Self { + Self::WorkflowUpdateError { + source: e.source, + run_id: e.run_id, + } + } +} + +impl From for PollWfError { + fn from(_: ShutdownErr) -> Self { + Self::ShutDown + } +} + +/// Errors thrown by [crate::Core::poll_activity_task] +#[derive(thiserror::Error, Debug)] +pub enum PollActivityError { + /// [crate::Core::shutdown] was called, we will no longer fetch new activity tasks. Lang must + /// ensure it is finished with any workflow replay, see [PollWfError::ShutDown] + #[error("Core is shut down")] + ShutDown, + /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal + /// errors, so lang should consider this fatal. + #[error("Unhandled error when calling the temporal server: {0:?}")] + TonicError(#[from] tonic::Status), +} + +impl From for PollActivityError { + fn from(_: ShutdownErr) -> Self { + Self::ShutDown + } +} + +/// Errors thrown by [crate::Core::complete_workflow_task] +#[derive(thiserror::Error, Debug)] +#[allow(clippy::large_enum_variant)] +pub enum CompleteWfError { + /// Lang SDK sent us a malformed workflow completion. This likely means a bug in the lang sdk. + #[error("Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}")] + MalformedWorkflowCompletion { + /// Reason the completion was malformed + reason: String, + /// The completion, which may not be included to avoid unnecessary copies. + completion: Option, + }, + /// There was an error specific to a workflow instance. The cached workflow should be deleted + /// from lang side. + #[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")] + WorkflowUpdateError { + /// Underlying workflow error + source: WorkflowError, + /// The run id of the erring workflow + run_id: String, + }, + /// There exists a pending command in this workflow's history which has not yet been handled. + /// When thrown from [crate::Core::complete_workflow_task], it means you should poll for a new + /// task, receive a new task token, and complete that new task. + #[error("Unhandled command when completing workflow activation")] + UnhandledCommandWhenCompleting, + /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal + /// errors, so lang should consider this fatal. + #[error("Unhandled error when calling the temporal server: {0:?}")] + TonicError(#[from] tonic::Status), +} + +impl From for CompleteWfError { + fn from(e: WorkflowUpdateError) -> Self { + Self::WorkflowUpdateError { + source: e.source, + run_id: e.run_id, + } + } +} + +/// Errors thrown by [crate::Core::complete_activity_task] +#[derive(thiserror::Error, Debug)] +pub enum CompleteActivityError { + /// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk. + #[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")] + MalformedActivityCompletion { + /// Reason the completion was malformed + reason: String, + /// The completion, which may not be included to avoid unnecessary copies. + completion: Option, + }, + /// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal + /// errors, so lang should consider this fatal. + #[error("Unhandled error when calling the temporal server: {0:?}")] + TonicError(#[from] tonic::Status), +} diff --git a/src/lib.rs b/src/lib.rs index 558ae5b16..0575f5d81 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -13,6 +13,7 @@ extern crate tracing; pub mod protos; pub(crate) mod core_tracing; +mod errors; mod machines; mod pending_activations; mod pollers; @@ -22,19 +23,23 @@ mod workflow; #[cfg(test)] mod test_help; +pub use crate::errors::{ + CompleteActivityError, CompleteWfError, CoreInitError, PollActivityError, PollWfError, +}; pub use core_tracing::tracing_init; pub use pollers::{PollTaskRequest, ServerGateway, ServerGatewayApis, ServerGatewayOptions}; pub use url::Url; use crate::{ + errors::{ShutdownErr, WorkflowUpdateError}, machines::{EmptyWorkflowCommandErr, ProtoCommand, WFCommand}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ - activity_result::{self as ar, activity_result, ActivityResult}, + activity_result::{self as ar, activity_result}, activity_task::ActivityTask, workflow_activation::WfActivation, - workflow_completion::{wf_activation_completion, WfActivationCompletion}, + workflow_completion::{self, wf_activation_completion, WfActivationCompletion}, ActivityHeartbeat, ActivityTaskCompletion, }, temporal::api::{ @@ -54,12 +59,8 @@ use std::{ }, }; use tokio::sync::Notify; -use tonic::codegen::http::uri::InvalidUri; use tracing::Span; -/// A result alias having [CoreError] as the error type -pub type Result = std::result::Result; - /// This trait is the primary way by which language specific SDKs interact with the core SDK. It is /// expected that only one instance of an implementation will exist for the lifetime of the /// worker(s) using it. @@ -67,35 +68,43 @@ pub type Result = std::result::Result; pub trait Core: Send + Sync { /// Ask the core for some work, returning a [WfActivation]. It is then the language SDK's /// responsibility to call the appropriate workflow code with the provided inputs. Blocks - /// indefinitely until such work is available or [shutdown] is called. + /// indefinitely until such work is available or [Core::shutdown] is called. /// /// TODO: Examples - async fn poll_workflow_task(&self, task_queue: &str) -> Result; + async fn poll_workflow_task(&self, task_queue: &str) -> Result; /// Ask the core for some work, returning an [ActivityTask]. It is then the language SDK's /// responsibility to call the appropriate activity code with the provided inputs. Blocks - /// indefinitely until such work is available or [shutdown] is called. + /// indefinitely until such work is available or [Core::shutdown] is called. /// /// TODO: Examples - async fn poll_activity_task(&self, task_queue: &str) -> Result; + async fn poll_activity_task(&self, task_queue: &str) + -> Result; /// Tell the core that a workflow activation has completed - async fn complete_workflow_task(&self, completion: WfActivationCompletion) -> Result<()>; + async fn complete_workflow_task( + &self, + completion: WfActivationCompletion, + ) -> Result<(), CompleteWfError>; /// Tell the core that an activity has finished executing - async fn complete_activity_task(&self, completion: ActivityTaskCompletion) -> Result<()>; + async fn complete_activity_task( + &self, + completion: ActivityTaskCompletion, + ) -> Result<(), CompleteActivityError>; /// Indicate that a long running activity is still making progress - async fn send_activity_heartbeat(&self, task_token: ActivityHeartbeat) -> Result<()>; + async fn send_activity_heartbeat(&self, task_token: ActivityHeartbeat) -> Result<(), ()>; /// Returns core's instance of the [ServerGatewayApis] implementor it is using. fn server_gateway(&self) -> Arc; - /// Eventually ceases all polling of the server. [Core::poll_task] should be called until it - /// returns [CoreError::ShuttingDown] to ensure that any workflows which are still undergoing - /// replay have an opportunity to finish. This means that the lang sdk will need to call - /// [Core::complete_task] for those workflows until they are done. At that point, the lang - /// SDK can end the process, or drop the [Core] instance, which will close the connection. + /// Eventually ceases all polling of the server. [Core::poll_workflow_task] should be called + /// until it returns [PollWfError::ShutDown] to ensure that any workflows which are still + /// undergoing replay have an opportunity to finish. This means that the lang sdk will need to + /// call [Core::complete_workflow_task] for those workflows until they are done. At that point, + /// the lang SDK can end the process, or drop the [Core] instance, which will close the + /// connection. fn shutdown(&self); } @@ -152,7 +161,7 @@ macro_rules! abort_on_shutdown { let poll_result_future = $self.server_gateway.$gateway_fn($poll_arg); tokio::select! { _ = shutdownfut => { - Err(CoreError::ShuttingDown) + Err(ShutdownErr.into()) } r = poll_result_future => r.map_err(Into::into) } @@ -164,69 +173,47 @@ impl Core for CoreSDK where WP: ServerGatewayApis + Send + Sync + 'static, { - #[instrument(skip(self), fields(pending_activation))] - async fn poll_workflow_task(&self, task_queue: &str) -> Result { + #[instrument(skip(self))] + async fn poll_workflow_task(&self, task_queue: &str) -> Result { // The poll needs to be in a loop because we can't guarantee tail call optimization in Rust // (simply) and we really, really need that for long-poll retries. loop { // We must first check if there are pending workflow tasks for workflows that are // currently replaying, and issue those tasks before bothering the server. - if let Some(pa) = self.pending_activations.pop() { - Span::current().record("pending_activation", &format!("{}", &pa).as_str()); - - if let Some(mut next_activation) = - self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())? - { - // TODO: What do do about task token - let task_token = pa.task_token.clone(); - if next_activation.more_activations_needed { - self.pending_activations.push(pa); - } - next_activation.activation.task_token = task_token; - return Ok(next_activation.activation); - } + if let Some(pa) = self + .pending_activations + .pop() + .and_then(|p| self.prepare_pending_activation(p).transpose()) + { + return pa; } if self.shutdown_requested.load(Ordering::SeqCst) { - return Err(CoreError::ShuttingDown); + return Err(PollWfError::ShutDown); } match abort_on_shutdown!(self, poll_workflow_task, task_queue.to_owned()) { Ok(work) => { - if work == PollWorkflowTaskQueueResponse::default() { - // We get the default proto in the event that the long poll times out. - continue; - } - let task_token = work.task_token.clone(); - debug!( - task_token = %fmt_task_token(&task_token), - "Received workflow task from server" - ); - - let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; - - if let Some(mut na) = next_activation { - if na.more_activations_needed { - self.pending_activations.push(PendingActivation { - run_id, - task_token: task_token.clone(), - }); - } - - // TODO: This sucks - na.activation.task_token = task_token; - return Ok(na.activation); + if let Some(activation) = self.prepare_new_activation(work)? { + return Ok(activation); } } // Drain pending activations in case of shutdown. - Err(CoreError::ShuttingDown) => continue, + Err(PollWfError::ShutDown) => continue, Err(e) => return Err(e), } } } #[instrument(skip(self))] - async fn poll_activity_task(&self, task_queue: &str) -> Result { + async fn poll_activity_task( + &self, + task_queue: &str, + ) -> Result { + if self.shutdown_requested.load(Ordering::SeqCst) { + return Err(PollActivityError::ShutDown); + } + match abort_on_shutdown!(self, poll_activity_task, task_queue.to_owned()) { Ok(work) => { let task_token = work.task_token.clone(); @@ -237,14 +224,17 @@ where } #[instrument(skip(self))] - async fn complete_workflow_task(&self, completion: WfActivationCompletion) -> Result<()> { + async fn complete_workflow_task( + &self, + completion: WfActivationCompletion, + ) -> Result<(), CompleteWfError> { let task_token = completion.task_token; let wfstatus = completion.status; let run_id = self .workflow_task_tokens .get(&task_token) .map(|x| x.value().clone()) - .ok_or_else(|| CoreError::MalformedWorkflowCompletion { + .ok_or_else(|| CompleteWfError::MalformedWorkflowCompletion { reason: format!( "Task token {} had no workflow run associated with it", fmt_task_token(&task_token) @@ -253,51 +243,15 @@ where })?; match wfstatus { Some(wf_activation_completion::Status::Successful(success)) => { - // Convert to wf commands - let cmds = success - .commands - .into_iter() - .map(|c| c.try_into()) - .collect::, EmptyWorkflowCommandErr>>() - .map_err(|_| CoreError::MalformedWorkflowCompletion { - reason: "At least one workflow command in the completion \ - contained an empty variant" - .to_owned(), - completion: None, - })?; - let commands = self.push_lang_commands(&run_id, cmds)?; - // We only actually want to send commands back to the server if there are - // no more pending activations -- in other words the lang SDK has caught - // up on replay. - if !self.pending_activations.has_pending(&run_id) { - self.server_gateway - .complete_workflow_task(task_token, commands) - .await - .map_err(|ts| { - if ts.code() == tonic::Code::InvalidArgument - && ts.message() == "UnhandledCommand" - { - CoreError::UnhandledCommandWhenCompleting - } else { - ts.into() - } - })?; - } + self.wf_completion_success(task_token, &run_id, success) + .await?; } Some(wf_activation_completion::Status::Failed(failure)) => { - // Blow up any cached data associated with the workflow - self.evict_run(&run_id); - - self.server_gateway - .fail_workflow_task( - task_token, - WorkflowTaskFailedCause::Unspecified, - failure.failure.map(Into::into), - ) + self.wf_completion_failed(task_token, &run_id, failure) .await?; } None => { - return Err(CoreError::MalformedWorkflowCompletion { + return Err(CompleteWfError::MalformedWorkflowCompletion { reason: "Workflow completion had empty status field".to_owned(), completion: None, }) @@ -307,22 +261,17 @@ where } #[instrument(skip(self))] - async fn complete_activity_task(&self, completion: ActivityTaskCompletion) -> Result<()> { + async fn complete_activity_task( + &self, + completion: ActivityTaskCompletion, + ) -> Result<(), CompleteActivityError> { let task_token = completion.task_token; - let result = if let Some(r) = completion.result { - r - } else { - return Err(CoreError::MalformedActivityCompletion { - reason: "Activity completion had empty result field".to_owned(), - completion: None, - }); - }; - let status = if let Some(s) = result.status { + let status = if let Some(s) = completion.result.and_then(|r| r.status) { s } else { - return Err(CoreError::MalformedActivityCompletion { - reason: "Activity result had empty status field".to_owned(), - completion: Some(result), + return Err(CompleteActivityError::MalformedActivityCompletion { + reason: "Activity completion had empty result/status field".to_owned(), + completion: None, }); }; match status { @@ -345,7 +294,7 @@ where Ok(()) } - async fn send_activity_heartbeat(&self, _task_token: ActivityHeartbeat) -> Result<()> { + async fn send_activity_heartbeat(&self, _task_token: ActivityHeartbeat) -> Result<(), ()> { unimplemented!() } @@ -372,6 +321,113 @@ impl CoreSDK { } } + /// Given a pending activation, prepare it to be sent to lang + #[instrument(skip(self))] + fn prepare_pending_activation( + &self, + pa: PendingActivation, + ) -> Result, PollWfError> { + if let Some(next_activation) = + self.access_wf_machine(&pa.run_id, move |mgr| mgr.get_next_activation())? + { + let task_token = pa.task_token.clone(); + if next_activation.more_activations_needed { + self.pending_activations.push(pa); + } + return Ok(Some(next_activation.finalize(task_token))); + } + Ok(None) + } + + /// Given a wf task from the server, prepare an activation (if there is one) to be sent to lang + fn prepare_new_activation( + &self, + work: PollWorkflowTaskQueueResponse, + ) -> Result, PollWfError> { + if work == PollWorkflowTaskQueueResponse::default() { + // We get the default proto in the event that the long poll times out. + return Ok(None); + } + let task_token = work.task_token.clone(); + debug!( + task_token = %fmt_task_token(&task_token), + "Received workflow task from server" + ); + + let (next_activation, run_id) = self.instantiate_or_update_workflow(work)?; + + if let Some(na) = next_activation { + if na.more_activations_needed { + self.pending_activations.push(PendingActivation { + run_id, + task_token: task_token.clone(), + }); + } + return Ok(Some(na.finalize(task_token))); + } + Ok(None) + } + + /// Handle a successful workflow completion + async fn wf_completion_success( + &self, + task_token: Vec, + run_id: &str, + success: workflow_completion::Success, + ) -> Result<(), CompleteWfError> { + // Convert to wf commands + let cmds = success + .commands + .into_iter() + .map(|c| c.try_into()) + .collect::, EmptyWorkflowCommandErr>>() + .map_err(|_| CompleteWfError::MalformedWorkflowCompletion { + reason: "At least one workflow command in the completion \ + contained an empty variant" + .to_owned(), + completion: None, + })?; + let commands = self.push_lang_commands(&run_id, cmds)?; + // We only actually want to send commands back to the server if there are + // no more pending activations -- in other words the lang SDK has caught + // up on replay. + if !self.pending_activations.has_pending(&run_id) { + self.server_gateway + .complete_workflow_task(task_token, commands) + .await + .map_err(|ts| { + if ts.code() == tonic::Code::InvalidArgument + && ts.message() == "UnhandledCommand" + { + CompleteWfError::UnhandledCommandWhenCompleting + } else { + ts.into() + } + })?; + } + Ok(()) + } + + /// Handle a failed workflow completion + async fn wf_completion_failed( + &self, + task_token: Vec, + run_id: &str, + failure: workflow_completion::Failure, + ) -> Result<(), CompleteWfError> { + // Blow up any cached data associated with the workflow + self.evict_run(&run_id); + + self.server_gateway + .fail_workflow_task( + task_token, + WorkflowTaskFailedCause::Unspecified, + failure.failure.map(Into::into), + ) + .await?; + Ok(()) + } + /// Will create a new workflow manager if needed for the workflow task, if not, it will /// feed the existing manager the updated history we received from the server. /// @@ -382,7 +438,7 @@ impl CoreSDK { fn instantiate_or_update_workflow( &self, poll_wf_resp: PollWorkflowTaskQueueResponse, - ) -> Result<(Option, String)> { + ) -> Result<(Option, String), PollWfError> { match poll_wf_resp { PollWorkflowTaskQueueResponse { task_token, @@ -399,16 +455,20 @@ impl CoreSDK { .create_or_update(&run_id, history, workflow_execution) { Ok(activation) => Ok((activation, run_id)), - Err(source) => Err(CoreError::WorkflowError { source, run_id }), + Err(source) => Err(PollWfError::WorkflowUpdateError { source, run_id }), } } - p => Err(CoreError::BadPollResponseFromServer(p)), + p => Err(PollWfError::BadPollResponseFromServer(p)), } } /// Feed commands from the lang sdk into appropriate workflow manager which will iterate /// the state machines and return commands ready to be sent to the server - fn push_lang_commands(&self, run_id: &str, cmds: Vec) -> Result> { + fn push_lang_commands( + &self, + run_id: &str, + cmds: Vec, + ) -> Result, WorkflowUpdateError> { self.access_wf_machine(run_id, move |mgr| mgr.push_commands(cmds)) } @@ -419,7 +479,11 @@ impl CoreSDK { /// Wraps access to `self.workflow_machines.access`, properly passing in the current tracing /// span to the wf machines thread. - fn access_wf_machine(&self, run_id: &str, mutator: F) -> Result + fn access_wf_machine( + &self, + run_id: &str, + mutator: F, + ) -> Result where F: FnOnce(&mut WorkflowManager) -> Result + Send + 'static, Fout: Send + Debug + 'static, @@ -431,61 +495,13 @@ impl CoreSDK { }; self.workflow_machines .access(run_id, mutator) - .map_err(|source| CoreError::WorkflowError { + .map_err(|source| WorkflowUpdateError { source, run_id: run_id.to_owned(), }) } } -/// The error type returned by interactions with [Core] -#[derive(thiserror::Error, Debug, displaydoc::Display)] -#[allow(clippy::large_enum_variant)] -// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc -pub enum CoreError { - /** [Core::shutdown] was called, and there are no more replay tasks to be handled. You must - call [Core::complete_task] for any remaining tasks, and then may exit.*/ - ShuttingDown, - /// Poll workflow response from server was malformed: {0:?} - BadPollResponseFromServer(PollWorkflowTaskQueueResponse), - /// Lang SDK sent us a malformed workflow completion ({reason}): {completion:?} - MalformedWorkflowCompletion { - /// Reason the completion was malformed - reason: String, - /// The completion, which may not be included to avoid unnecessary copies. - completion: Option, - }, - /// Lang SDK sent us a malformed activity completion ({reason}): {completion:?} - MalformedActivityCompletion { - /// Reason the completion was malformed - reason: String, - /// The completion, which may not be included to avoid unnecessary copies. - completion: Option, - }, - /// There was an error specific to a workflow instance with id ({run_id}): {source:?} - WorkflowError { - /// Underlying workflow error - source: WorkflowError, - /// The run id of the erring workflow - run_id: String, - }, - /** There exists a pending command in this workflow's history which has not yet been handled. - When thrown from [Core::complete_task], it means you should poll for a new task, receive a - new task token, and complete that new task. */ - UnhandledCommandWhenCompleting, - /// Unhandled error when calling the temporal server: {0:?} - TonicError(#[from] tonic::Status), -} - -/// Errors thrown during initialization of [Core] -#[derive(thiserror::Error, Debug, displaydoc::Display)] -pub enum CoreInitError { - /// Invalid URI: {0:?} - InvalidUri(#[from] InvalidUri), - /// Server connection error: {0:?} - TonicTransportError(#[from] tonic::transport::Error), -} - #[cfg(test)] mod test { use super::*; @@ -765,7 +781,7 @@ mod test { .poll_workflow_task(TASK_Q) .await .unwrap_err(), - CoreError::ShuttingDown + PollWfError::ShutDown ); } diff --git a/src/pollers/mod.rs b/src/pollers/mod.rs index 09cb0e116..6ba1e32b2 100644 --- a/src/pollers/mod.rs +++ b/src/pollers/mod.rs @@ -115,8 +115,8 @@ pub trait ServerGatewayApis { -> Result; /// Complete a workflow activation. `task_token` is the task token that would've been received - /// from [crate::Core::poll_task] API. `commands` is a list of new commands to send to the - /// server, such as starting a timer. + /// from [crate::Core::poll_workflow_task] API. `commands` is a list of new commands to send to + /// the server, such as starting a timer. async fn complete_workflow_task( &self, task_token: Vec, @@ -151,7 +151,7 @@ pub trait ServerGatewayApis { ) -> Result; /// Fail task by sending the failure to the server. `task_token` is the task token that would've - /// been received from [crate::Core::poll_task]. + /// been received from [crate::Core::poll_workflow_task]. async fn fail_workflow_task( &self, task_token: Vec, diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 0bb82109f..a831819fb 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -21,21 +21,25 @@ type Result = std::result::Result; /// Errors relating to workflow management and machine logic. These are going to be passed up and /// out to the lang SDK where it will need to handle them. Generally that will usually mean /// showing an error to the user and/or invalidating the workflow cache. -#[derive(thiserror::Error, Debug, displaydoc::Display)] +#[derive(thiserror::Error, Debug)] #[allow(clippy::large_enum_variant)] -// NOTE: Docstrings take the place of #[error("xxxx")] here b/c of displaydoc pub enum WorkflowError { /// The workflow machines associated with `run_id` were not found in memory + #[error("Workflow machines associated with `{run_id}` not found")] MissingMachine { run_id: String }, - /// Underlying error in state machines: {0:?} + /// Underlying error in state machines + #[error("Underlying error in state machines: {0:?}")] UnderlyingMachinesError(#[from] WFMachinesError), /// There was an error in the history associated with the workflow: {0:?} + #[error("There was an error in the history associated with the workflow: {0:?}")] HistoryError(#[from] HistoryInfoError), - /** Error buffering commands coming in from the lang side. This shouldn't happen unless we've - run out of memory or there is a logic bug. Considered fatal. */ + /// Error buffering commands coming in from the lang side. This shouldn't happen unless we've + /// run out of memory or there is a logic bug. Considered fatal. + #[error("Internal error buffering workflow commands")] CommandBufferingError(#[from] SendError>), - /** We tried to instantiate a workflow instance, but the provided history resulted in no - new activations. There is nothing to do. */ + /// We tried to instantiate a workflow instance, but the provided history resulted in no + /// new activations. There is nothing to do. + #[error("Machine created with no activations for run_id {run_id}")] MachineWasCreatedWithNoActivations { run_id: String }, } @@ -78,10 +82,20 @@ impl WorkflowManager { #[derive(Debug)] pub(crate) struct NextWfActivation { - pub activation: WfActivation, + /// Keep this private, so we can ensure task tokens are attached via [Self::finalize] + activation: WfActivation, pub more_activations_needed: bool, } +impl NextWfActivation { + /// Attach a task token to the activation so it can be sent out to the lang sdk + pub(crate) fn finalize(self, task_token: Vec) -> WfActivation { + let mut a = self.activation; + a.task_token = task_token; + a + } +} + impl WorkflowManager { /// Given history that was just obtained from the server, pipe it into this workflow's machines. /// diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index eaf6690bc..d67a79712 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -19,7 +19,8 @@ use temporal_sdk_core::{ }, workflow_completion::WfActivationCompletion, }, - Core, CoreError, CoreInitOptions, ServerGatewayApis, ServerGatewayOptions, Url, + CompleteWfError, Core, CoreInitOptions, PollWfError, ServerGatewayApis, ServerGatewayOptions, + Url, }; // TODO: These tests can get broken permanently if they break one time and the server is not @@ -551,14 +552,14 @@ async fn shutdown_aborts_actively_blocked_poll() { }); assert_matches!( core.poll_workflow_task(task_q).await.unwrap_err(), - CoreError::ShuttingDown + PollWfError::ShutDown ); handle.join().unwrap(); // Ensure double-shutdown doesn't explode core.shutdown(); assert_matches!( core.poll_workflow_task(task_q).await.unwrap_err(), - CoreError::ShuttingDown + PollWfError::ShutDown ); } @@ -767,7 +768,7 @@ async fn signal_workflow_signal_not_handled_on_workflow_completion() { )) .await .unwrap_err(); - assert_matches!(unhandled, CoreError::UnhandledCommandWhenCompleting); + assert_matches!(unhandled, CompleteWfError::UnhandledCommandWhenCompleting); // We should get a new task with the signal let res = core.poll_workflow_task(task_q).await.unwrap();