Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,18 @@ steps:
- docker-compose#v3.0.0:
run: unit-test
config: .buildkite/docker/docker-compose.yaml
- label: "doc"
agents:
queue: "default"
docker: "*"
command: "cargo doc --workspace --all-features --no-deps"
timeout_in_minutes: 15
plugins:
- docker-compose#v3.0.0:
run: unit-test
config: .buildkite/docker/docker-compose.yaml
env:
- RUSTDOCFLAGS=-Dwarnings
- label: "lint"
agents:
queue: "default"
Expand Down
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ base64 = "0.13"
crossbeam = "0.8"
dashmap = "4.0"
derive_more = "0.99"
displaydoc = "0.2"
futures = "0.3"
itertools = "0.10"
once_cell = "1.5"
Expand Down
2 changes: 1 addition & 1 deletion protos/local/workflow_completion.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import "workflow_commands.proto";

/// Result of a single workflow activation, reported from lang to core
message WFActivationCompletion {
// The token from the [WfActivation] you are completing
// The token from the workflow activation you are completing
bytes task_token = 1;
oneof status {
Success successful = 2;
Expand Down
145 changes: 145 additions & 0 deletions src/errors.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use crate::{
protos::coresdk::activity_result::ActivityResult,
protos::coresdk::workflow_completion::WfActivationCompletion,
protos::temporal::api::workflowservice::v1::PollWorkflowTaskQueueResponse,
workflow::WorkflowError,
};
use tonic::codegen::http::uri::InvalidUri;

pub(crate) struct ShutdownErr;
pub(crate) struct WorkflowUpdateError {
/// Underlying workflow error
pub source: WorkflowError,
/// The run id of the erring workflow
pub run_id: String,
}

/// Errors thrown during initialization of [crate::Core]
#[derive(thiserror::Error, Debug)]
pub enum CoreInitError {
/// Invalid URI. Configuration error, fatal.
#[error("Invalid URI: {0:?}")]
InvalidUri(#[from] InvalidUri),
/// Server connection error. Crashing and restarting the worker is likely best.
#[error("Server connection error: {0:?}")]
TonicTransportError(#[from] tonic::transport::Error),
}

/// Errors thrown by [crate::Core::poll_workflow_task]
#[derive(thiserror::Error, Debug)]
pub enum PollWfError {
/// There was an error specific to a workflow instance. The cached workflow should be deleted
/// from lang side.
#[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")]
WorkflowUpdateError {
/// Underlying workflow error
source: WorkflowError,
/// The run id of the erring workflow
run_id: String,
},
/// The server returned a malformed polling response. Either we aren't handling a valid form,
/// or the server is bugging out. Likely fatal.
#[error("Poll workflow response from server was malformed: {0:?}")]
BadPollResponseFromServer(PollWorkflowTaskQueueResponse),
/// [crate::Core::shutdown] was called, and there are no more replay tasks to be handled. Lang
/// must call [crate::Core::complete_workflow_task] for any remaining tasks, and then may
/// exit.
#[error("Core is shut down and there are no more workflow replay tasks")]
ShutDown,
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled error when calling the temporal server: {0:?}")]
TonicError(#[from] tonic::Status),
}

impl From<WorkflowUpdateError> for PollWfError {
fn from(e: WorkflowUpdateError) -> Self {
Self::WorkflowUpdateError {
source: e.source,
run_id: e.run_id,
}
}
}

impl From<ShutdownErr> for PollWfError {
fn from(_: ShutdownErr) -> Self {
Self::ShutDown
}
}

/// Errors thrown by [crate::Core::poll_activity_task]
#[derive(thiserror::Error, Debug)]
pub enum PollActivityError {
/// [crate::Core::shutdown] was called, we will no longer fetch new activity tasks. Lang must
/// ensure it is finished with any workflow replay, see [PollWfError::ShutDown]
#[error("Core is shut down")]
ShutDown,
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled error when calling the temporal server: {0:?}")]
TonicError(#[from] tonic::Status),
}

impl From<ShutdownErr> for PollActivityError {
fn from(_: ShutdownErr) -> Self {
Self::ShutDown
}
}

/// Errors thrown by [crate::Core::complete_workflow_task]
#[derive(thiserror::Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum CompleteWfError {
/// Lang SDK sent us a malformed workflow completion. This likely means a bug in the lang sdk.
#[error("Lang SDK sent us a malformed workflow completion ({reason}): {completion:?}")]
MalformedWorkflowCompletion {
/// Reason the completion was malformed
reason: String,
/// The completion, which may not be included to avoid unnecessary copies.
completion: Option<WfActivationCompletion>,
},
/// There was an error specific to a workflow instance. The cached workflow should be deleted
/// from lang side.
#[error("There was an error with the workflow instance with run id ({run_id}): {source:?}")]
WorkflowUpdateError {
/// Underlying workflow error
source: WorkflowError,
/// The run id of the erring workflow
run_id: String,
},
/// There exists a pending command in this workflow's history which has not yet been handled.
/// When thrown from [crate::Core::complete_workflow_task], it means you should poll for a new
/// task, receive a new task token, and complete that new task.
#[error("Unhandled command when completing workflow activation")]
UnhandledCommandWhenCompleting,
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled error when calling the temporal server: {0:?}")]
TonicError(#[from] tonic::Status),
}

impl From<WorkflowUpdateError> for CompleteWfError {
fn from(e: WorkflowUpdateError) -> Self {
Self::WorkflowUpdateError {
source: e.source,
run_id: e.run_id,
}
}
}

/// Errors thrown by [crate::Core::complete_activity_task]
#[derive(thiserror::Error, Debug)]
pub enum CompleteActivityError {
/// Lang SDK sent us a malformed activity completion. This likely means a bug in the lang sdk.
#[error("Lang SDK sent us a malformed activity completion ({reason}): {completion:?}")]
MalformedActivityCompletion {
/// Reason the completion was malformed
reason: String,
/// The completion, which may not be included to avoid unnecessary copies.
completion: Option<ActivityResult>,
},
/// Unhandled error when calling the temporal server. Core will attempt to retry any non-fatal
/// errors, so lang should consider this fatal.
#[error("Unhandled error when calling the temporal server: {0:?}")]
TonicError(#[from] tonic::Status),
}
Loading