diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 4df301d43..e84b65cad 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -13,7 +13,7 @@ steps: agents: queue: "default" docker: "*" - command: "cargo clippy --all -- -D warnings" + command: "cargo clippy --workspace --all-features --all-targets -- -D warnings && cargo clippy --test integ_tests --all-features -- --D warnings" timeout_in_minutes: 15 plugins: - docker-compose#v3.0.0: diff --git a/build.rs b/build.rs index 474f91870..040b1f1e0 100644 --- a/build.rs +++ b/build.rs @@ -14,10 +14,16 @@ fn main() -> Result<(), Box> { "temporal.api.command.v1.Command.attributes", "#[derive(::derive_more::From)]", ) - .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") - .type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.WFActivationJob.variant", + "coresdk.workflow_commands.WorkflowCommand.variant", + "#[derive(::derive_more::From)]", + ) + .type_attribute( + "coresdk.workflow_activation.wf_activation_job", + "#[derive(::derive_more::From)]", + ) + .type_attribute( + "coresdk.workflow_activation.WFActivationJob.variant", "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") diff --git a/protos/local/activity_result.proto b/protos/local/activity_result.proto new file mode 100644 index 000000000..59aa21d15 --- /dev/null +++ b/protos/local/activity_result.proto @@ -0,0 +1,30 @@ +syntax = "proto3"; + +package coresdk.activity_result; + +import "common.proto"; + +/// Used to report activity completion to core and to resolve the activity in a workflow activation +message ActivityResult { + oneof status { + Success completed = 1; + Failure failed = 2; + Cancelation canceled = 3; + } +} + +/// Used in ActivityResult to report cancellation +message Cancelation { + repeated common.Payload details = 1; +} + +/// Used in ActivityResult to report successful completion +message Success { + repeated common.Payload result = 1; +} + +/// Used in ActivityResult to report failure +message Failure { + common.UserCodeFailure failure = 1; +} + diff --git a/protos/local/activity_task.proto b/protos/local/activity_task.proto new file mode 100644 index 000000000..73625455f --- /dev/null +++ b/protos/local/activity_task.proto @@ -0,0 +1,55 @@ +syntax = "proto3"; + +/** + * Definitions of the different activity tasks returned from [crate::Core::poll_task]. + */ +package coresdk.activity_task; + +import "common.proto"; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; + +message ActivityTask { + string activity_id = 1; + oneof variant { + // Start activity execution. + Start start = 2; + // Attempt to cancel activity execution. + Cancel cancel = 3; + } +} + +/// Begin executing an activity +message Start { + string workflow_namespace = 1; + /// The workflow's type name or function identifier + string workflow_type = 2; + common.WorkflowExecution workflow_execution = 3; + /// The activity's type name or function identifier + string activity_type = 4; + map header_fields = 5; + /// Arguments to the activity + repeated common.Payload input = 6; + repeated common.Payload heartbeat_details = 7; + + google.protobuf.Timestamp scheduled_time = 8; + google.protobuf.Timestamp current_attempt_scheduled_time = 9; + google.protobuf.Timestamp started_time = 10; + int32 attempt = 11; + + google.protobuf.Duration schedule_to_close_timeout = 12; + google.protobuf.Duration start_to_close_timeout = 13; + google.protobuf.Duration heartbeat_timeout = 14; + /// This is an actual retry policy the service uses. It can be different from the one provided + /// (or not) during activity scheduling as the service can override the provided one in case its + /// values are not specified or exceed configured system limits. + common.RetryPolicy retry_policy = 15; +} + +/// Attempt to cancel a running activity +message Cancel { + // TODO: add attributes +} + + diff --git a/protos/local/common.proto b/protos/local/common.proto new file mode 100644 index 000000000..71a7f7e10 --- /dev/null +++ b/protos/local/common.proto @@ -0,0 +1,61 @@ +syntax = "proto3"; + +package coresdk.common; + +import "google/protobuf/duration.proto"; + +// Many of the messages in here are exact or near duplicates of the protobufs defined by the +// Temporal API. We dupe them here to introduce better ergonomics wherever possible, and to +// decouple ourselves from upstream changes. Additionally, we have no need for wire compatibility +// between core and lang sdks, since the lang SDK chooses which version of core it wants to use. + +// Used as arguments to activities, signals, queries, etc. +message Payload { + map metadata = 1; + bytes data = 2; +} + +// Identifying information about a particular workflow execution +message WorkflowExecution { + string workflow_id = 1; + string run_id = 2; +} + +// Defines how an activity or workflow should be retried in the event of failure, timeout, etc. +message RetryPolicy { + // Interval of the first retry. If backoff_coefficient is 1.0 then it is used for all + // retries. + google.protobuf.Duration initial_interval = 1; + // Coefficient used to calculate the next retry interval. The next retry interval is previous + // interval multiplied by the coefficient. Must be 1 or larger. + double backoff_coefficient = 2; + // Maximum interval between retries. Exponential backoff leads to interval increase. This value + // caps that interval. Default is 100x of the initial interval. + google.protobuf.Duration maximum_interval = 3; + // Maximum number of attempts. When exceeded, retrying will stop. 1 disables retries. 0 means + // unlimited retries (until the activity or workflow's total timeout is reached). + int32 maximum_attempts = 4; + // If a stringified error matches something in this list, retries will cease. + repeated string non_retryable_error_types = 5; +} + +// Represents a failure in user code, workflow or activity, which could've been triggered by +// an exception or similar error mechanism like the error half of a Result type. +// +// This eventually needs to be converted into an upstream `Failure` which needs to handle a lot +// more cases that the lang sdk does not care about. By default any lang sdk failure is an upstream +// `ApplicationFailureInfo`. +message UserCodeFailure { + // Human-specified or otherwise most-human-readable representation of the error. + string message = 1; + // A type identifier for the error, if the error is well-typed. + string type = 2; + // If known, the location the error was issued at. + string source = 3; + // If collected, a stack trace for the error. + string stack_trace = 4; + // Explicitly thrown user errors are able to indicate that retries should be prevented + bool non_retryable = 5; + + UserCodeFailure cause = 6; +} diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 797fba8b5..c80c3ae26 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -2,22 +2,19 @@ syntax = "proto3"; package coresdk; +import "activity_result.proto"; +import "activity_task.proto"; +import "common.proto"; +import "workflow_activation.proto"; +import "workflow_commands.proto"; +import "workflow_completion.proto"; + // Note: Intellij will think these imports don't work because of the slightly odd nature of // the include paths. You can make it work by going to the "Protobuf Support" settings section // and adding the "api_upstream" subdir as an include path. - import "google/protobuf/timestamp.proto"; import "google/protobuf/duration.proto"; import "google/protobuf/empty.proto"; -import "dependencies/gogoproto/gogo.proto"; -import "temporal/api/workflowservice/v1/request_response.proto"; -import "temporal/api/taskqueue/v1/message.proto"; -import "temporal/api/enums/v1/failed_cause.proto"; -import "temporal/api/failure/v1/message.proto"; -import "temporal/api/history/v1/message.proto"; -import "temporal/api/common/v1/message.proto"; -import "temporal/api/command/v1/message.proto"; -import "temporal/api/query/v1/message.proto"; // A request as given to [crate::Core::poll_task] message PollTaskReq { @@ -40,214 +37,26 @@ message Task { // The type of task to be performed oneof variant { // Wake up a workflow - WFActivation workflow = 2; + workflow_activation.WFActivation workflow = 2; // Run an activity - ActivityTask activity = 3; + activity_task.ActivityTask activity = 3; } } -// An instruction to the lang sdk to run some workflow code, whether for the first time or from -// a cached state. -message WFActivation { - // The current time as understood by the workflow, which is set by workflow task started events - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; - // The id of the currently active run of the workflow - string run_id = 2; - // The things to do upon activating the workflow - repeated WFActivationJob jobs = 3; -} - -message WFActivationJob { - oneof variant { - // Begin a workflow for the first time - StartWorkflow start_workflow = 1; - // A timer has fired, allowing whatever was waiting on it (if anything) to proceed - FireTimer fire_timer = 2; - // A timer was canceled, and needs to be unblocked on the lang side. - // TODO: No reason for this to exist. Lang is always doing the cancel, so it can remove - // it itself. - CancelTimer cancel_timer = 3; - // Workflow was reset. The randomness seed must be updated. - UpdateRandomSeed update_random_seed = 4; - // A request to query the workflow was received. - QueryWorkflow query_workflow = 5; - // A request to cancel the workflow was received. - CancelWorkflow cancel_workflow = 6; - // A request to signal the workflow was received. - SignalWorkflow signal_workflow = 7; - // An activity was resolved with, result could be completed, failed or cancelled - ResolveActivity resolve_activity = 8; - } -} - -message StartWorkflow { - // The identifier the lang-specific sdk uses to execute workflow code - string workflow_type = 1; - // The workflow id used on the temporal server - string workflow_id = 2; - // Input to the workflow code - temporal.api.common.v1.Payloads arguments = 3; - // The seed must be used to initialize the random generator used by SDK. - // RandomSeedUpdatedAttributes are used to deliver seed updates. - uint64 randomness_seed = 4; - - // TODO: Do we need namespace here, or should that just be fetchable easily? - // will be others - workflow exe started attrs, etc -} - -message FireTimer { - string timer_id = 1; -} - -message ResolveActivity { - string activity_id = 1; - ActivityResult result = 2; -} - -message CancelTimer { - string timer_id = 1; -} - -message UpdateRandomSeed { - uint64 randomness_seed = 1; -} - -message QueryWorkflow { - temporal.api.query.v1.WorkflowQuery query = 1; -} - -message CancelWorkflow { - // TODO: add attributes here -} - -message SignalWorkflow { - // The signal information from the workflow's history - temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes signal = 1; -} - -message StartActivity { - string workflow_namespace = 1; - temporal.api.common.v1.WorkflowType workflow_type = 2; - temporal.api.common.v1.WorkflowExecution workflow_execution = 3; - temporal.api.common.v1.ActivityType activity_type = 4; - temporal.api.common.v1.Header header = 5; - temporal.api.common.v1.Payloads input = 6; - temporal.api.common.v1.Payloads heartbeat_details = 7; - google.protobuf.Timestamp scheduled_time = 8 [(gogoproto.stdtime) = true]; - google.protobuf.Timestamp current_attempt_scheduled_time = 9 [(gogoproto.stdtime) = true]; - google.protobuf.Timestamp started_time = 10 [(gogoproto.stdtime) = true]; - int32 attempt = 11; - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration schedule_to_close_timeout = 12 [(gogoproto.stdduration) = true]; - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration start_to_close_timeout = 13 [(gogoproto.stdduration) = true]; - google.protobuf.Duration heartbeat_timeout = 14 [(gogoproto.stdduration) = true]; - // This is an actual retry policy the service uses. - // It can be different from the one provided (or not) during activity scheduling - // as the service can override the provided one in case its values are not specified - // or exceed configured system limits. - temporal.api.common.v1.RetryPolicy retry_policy = 15; -} - -message CancelActivity { - // TODO: add attributes -} - -message ActivityTask { - string activity_id = 1; - oneof variant { - // Start activity execution. - StartActivity start = 2; - // Attempt to cancel activity execution. - CancelActivity cancel = 3; - } -} - - // Sent from lang side to core when calling [crate::Core::complete_task] message TaskCompletion { // The id from the [Task] being completed bytes task_token = 1; oneof variant { // Complete a workflow task - WFActivationCompletion workflow = 2; + workflow_completion.WFActivationCompletion workflow = 2; // Complete an activity task - ActivityResult activity = 3; - } -} - -message WFActivationCompletion { - oneof status { - WFActivationSuccess successful = 1; - WFActivationFailure failed = 2; - } -} - -/// Used to report activity completion to core and to resolve the activity in a workflow activation -message ActivityResult { - oneof status { - ActivityTaskSuccess completed = 1; - ActivityTaskFailure failed = 2; - ActivityTaskCancelation canceled = 3; - } -} - -/// Request cancellation of an activity from a workflow -message RequestActivityCancellation { - string activity_id = 1; - string reason = 2; -} - -message CoreCommand { - oneof variant { - temporal.api.query.v1.WorkflowQueryResult respond_to_query = 1; - RequestActivityCancellation request_activity_cancellation = 2; + activity_result.ActivityResult activity = 3; } } -// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next -message Command { - oneof variant { - temporal.api.command.v1.Command api = 1; - CoreCommand core = 2; - } -} - -message WFActivationSuccess { - // A list of commands to send back to the temporal server - repeated Command commands = 1; - - // Other bits from RespondWorkflowTaskCompletedRequest as needed -} - -message WFActivationFailure { - temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; - temporal.api.failure.v1.Failure failure = 2; - - // Other bits from RespondWorkflowTaskFailedRequest as needed -} - -/// Used in ActivityResult to report cancellation -message ActivityTaskCancelation { - temporal.api.common.v1.Payloads details = 1; -} - -/// Used in ActivityResult to report successful completion -message ActivityTaskSuccess { - temporal.api.common.v1.Payloads result = 1; - // Other bits from RespondActivityTaskCompletedRequest as needed -} - -/// Used in ActivityResult to report failure -message ActivityTaskFailure { - temporal.api.failure.v1.Failure failure = 1; - // Other bits from RespondActivityTaskFailedRequest as needed -} - // A request as given to [crate::Core::send_activity_heartbeat] message ActivityHeartbeat { string activity_id = 1; - temporal.api.common.v1.Payloads details = 2; + repeated common.Payload details = 2; } diff --git a/protos/local/workflow_activation.proto b/protos/local/workflow_activation.proto new file mode 100644 index 000000000..97fbb4d1a --- /dev/null +++ b/protos/local/workflow_activation.proto @@ -0,0 +1,93 @@ +syntax = "proto3"; + +/** + * Definitions of the different workflow activation jobs returned from [crate::Core::poll_task]. The + * lang SDK applies these activation jobs to drive workflows. + */ +package coresdk.workflow_activation; + +import "common.proto"; +import "activity_result.proto"; + +import "google/protobuf/timestamp.proto"; + +/// An instruction to the lang sdk to run some workflow code, whether for the first time or from +/// a cached state. +message WFActivation { + /// The current time as understood by the workflow, which is set by workflow task started events + google.protobuf.Timestamp timestamp = 1; + /// The id of the currently active run of the workflow + string run_id = 2; + /// The things to do upon activating the workflow + repeated WFActivationJob jobs = 3; +} + +message WFActivationJob { + oneof variant { + /// Begin a workflow for the first time + StartWorkflow start_workflow = 1; + /// A timer has fired, allowing whatever was waiting on it (if anything) to proceed + FireTimer fire_timer = 2; + /// Workflow was reset. The randomness seed must be updated. + UpdateRandomSeed update_random_seed = 4; + /// A request to query the workflow was received. + QueryWorkflow query_workflow = 5; + /// A request to cancel the workflow was received. + CancelWorkflow cancel_workflow = 6; + /// A request to signal the workflow was received. + SignalWorkflow signal_workflow = 7; + /// An activity was resolved with, result could be completed, failed or cancelled + ResolveActivity resolve_activity = 8; + } +} + +/// Start a new workflow +message StartWorkflow { + /// The identifier the lang-specific sdk uses to execute workflow code + string workflow_type = 1; + /// The workflow id used on the temporal server + string workflow_id = 2; + /// Inputs to the workflow code + repeated common.Payload arguments = 3; + /// The seed must be used to initialize the random generator used by SDK. + /// RandomSeedUpdatedAttributes are used to deliver seed updates. + uint64 randomness_seed = 4; + + // TODO: Do we need namespace here, or should that just be fetchable easily? + // will be others - workflow exe started attrs, etc +} + +/// Notify a workflow that a timer has fired +message FireTimer { + string timer_id = 1; +} + +/// Notify a workflow that an activity has been resolved +message ResolveActivity { + string activity_id = 1; + activity_result.ActivityResult result = 2; +} + +/// Update the workflow's random seed +message UpdateRandomSeed { + uint64 randomness_seed = 1; +} + +/// Query a workflow +message QueryWorkflow { + string query_type = 1; + repeated common.Payload arguments = 2; +} + +/// Cancel a running workflow +message CancelWorkflow { + // TODO: add attributes here +} + +/// Send a signal to a workflow +message SignalWorkflow { + string signal_name = 1; + repeated common.Payload input = 2; + string identity = 3; +} + diff --git a/protos/local/workflow_commands.proto b/protos/local/workflow_commands.proto new file mode 100644 index 000000000..93f7cfea8 --- /dev/null +++ b/protos/local/workflow_commands.proto @@ -0,0 +1,104 @@ +syntax = "proto3"; + +/** + * Definitions for commands from a workflow in lang SDK to core. While a workflow processes a batch + * of activation jobs, it accumulates these commands to be sent back to core to conclude that + * activation. + */ +package coresdk.workflow_commands; + +import "common.proto"; + +import "google/protobuf/duration.proto"; + +message WorkflowCommand { + oneof variant { + StartTimer start_timer = 1; + ScheduleActivity schedule_activity = 2; + QueryResult respond_to_query = 3; + RequestCancelActivity request_cancel_activity = 4; + CancelTimer cancel_timer = 5; + CompleteWorkflowExecution complete_workflow_execution = 6; + FailWorkflowExecution fail_workflow_execution = 7; + + // To be added as/if needed: + // RequestCancelActivityTask request_cancel_activity_task_command_attributes = 6; + // CancelWorkflowExecution cancel_workflow_execution_command_attributes = 8; + // RequestCancelExternalWorkflowExecution request_cancel_external_workflow_execution_command_attributes = 9; + // RecordMarker record_marker_command_attributes = 10; + // ContinueAsNewWorkflowExecution continue_as_new_workflow_execution_command_attributes = 11; + // StartChildWorkflowExecution start_child_workflow_execution_command_attributes = 12; + // SignalExternalWorkflowExecution signal_external_workflow_execution_command_attributes = 13; + // UpsertWorkflowSearchAttributes upsert_workflow_search_attributes_command_attributes = 14; + } +} + +message StartTimer { + string timer_id = 1; + google.protobuf.Duration start_to_fire_timeout = 2; +} + +message CancelTimer { + string timer_id = 1; +} + +message ScheduleActivity { + string activity_id = 1; + string activity_type = 2; + string namespace = 3; + // The name of the task queue to place this activity request in + // TODO: Do we care about sticky/not sticky? + string task_queue = 4; + map header_fields = 5; + /// Arguments/input to the activity. Called "input" upstream. + repeated common.Payload arguments = 6; + /// Indicates how long the caller is willing to wait for an activity completion. Limits how long + /// retries will be attempted. Either this or start_to_close_timeout_seconds must be specified. + /// When not specified defaults to the workflow execution timeout. + google.protobuf.Duration schedule_to_close_timeout = 7; + /// Limits time an activity task can stay in a task queue before a worker picks it up. This + /// timeout is always non retryable as all a retry would achieve is to put it back into the same + /// queue. Defaults to schedule_to_close_timeout or workflow execution timeout if not specified. + google.protobuf.Duration schedule_to_start_timeout = 8; + /// Maximum time an activity is allowed to execute after a pick up by a worker. This timeout is + /// always retryable. Either this or schedule_to_close_timeout must be specified. + /// TODO: Is this really either or can you do both? Make oneof if mutually exclusive + google.protobuf.Duration start_to_close_timeout = 9; + /// Maximum time allowed between successful worker heartbeats. + google.protobuf.Duration heartbeat_timeout = 10; + /// Activities are provided by a default retry policy controlled through the service dynamic + /// configuration. Retries are happening up to schedule_to_close_timeout. To disable retries set + /// retry_policy.maximum_attempts to 1. + common.RetryPolicy retry_policy = 11; +} + +message RequestCancelActivity { + string activity_id = 1; +} + +message QueryResult { + oneof variant { + QuerySuccess succeeded = 1; + string failed_with_message = 2; + } +} + +message QuerySuccess { + common.Payload response = 1; +} + +/// Request cancellation of an activity from a workflow +message RequestActivityCancellation { + string activity_id = 1; + string reason = 2; +} + +/// Issued when the workflow completes successfully +message CompleteWorkflowExecution { + repeated common.Payload result = 1; +} + +/// Issued when the workflow errors out +message FailWorkflowExecution { + common.UserCodeFailure failure = 1; +} \ No newline at end of file diff --git a/protos/local/workflow_completion.proto b/protos/local/workflow_completion.proto new file mode 100644 index 000000000..02aefee6c --- /dev/null +++ b/protos/local/workflow_completion.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +package coresdk.workflow_completion; + +import "common.proto"; +import "workflow_commands.proto"; + +/// Result of a single workflow activation, reported from lang to core +message WFActivationCompletion { + oneof status { + Success successful = 1; + Failure failed = 2; + } +} + +/// Successful workflow activation with a list of commands generated by the workflow execution +message Success { + // A list of commands to send back to the temporal server + repeated workflow_commands.WorkflowCommand commands = 1; +} + +/// Failure to activate or execute a workflow +message Failure { + common.UserCodeFailure failure = 1; +} + diff --git a/src/lib.rs b/src/lib.rs index 6eeddd7ce..96cc60c27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,16 +27,17 @@ pub use pollers::{ }; pub use url::Url; -use crate::protos::coresdk::{ - activity_result, task, ActivityTaskCancelation, ActivityTaskFailure, ActivityTaskSuccess, -}; +use crate::machines::EmptyWorkflowCommandErr; +use crate::protos::coresdk::task; use crate::{ - machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError}, + machines::{ProtoCommand, WFCommand, WFMachinesError}, pending_activations::{PendingActivation, PendingActivations}, protos::{ coresdk::{ - task_completion, wf_activation_completion, ActivityResult, Task, TaskCompletion, - WfActivationCompletion, WfActivationSuccess, + activity_result::{self as ar, activity_result, ActivityResult}, + task_completion, workflow_completion, + workflow_completion::{wf_activation_completion, WfActivationCompletion}, + PayloadsExt, Task, TaskCompletion, }, temporal::api::{ enums::v1::WorkflowTaskFailedCause, workflowservice::v1::PollWorkflowTaskQueueResponse, @@ -250,14 +251,12 @@ where // Blow up any cached data associated with the workflow self.evict_run(&run_id); - self.runtime.block_on( - self.server_gateway.fail_workflow_task( + self.runtime + .block_on(self.server_gateway.fail_workflow_task( task_token, - WorkflowTaskFailedCause::from_i32(failure.cause) - .unwrap_or(WorkflowTaskFailedCause::Unspecified), - failure.failure, - ), - )?; + WorkflowTaskFailedCause::Unspecified, + failure.failure.map(Into::into), + ))?; } } Ok(()) @@ -277,21 +276,22 @@ where })), } => { match status { - activity_result::Status::Completed(ActivityTaskSuccess { result }) => { + activity_result::Status::Completed(ar::Success { result }) => { self.runtime.block_on( self.server_gateway - .complete_activity_task(task_token, result), + .complete_activity_task(task_token, result.into_payloads()), )?; } - activity_result::Status::Failed(ActivityTaskFailure { failure }) => { + activity_result::Status::Failed(ar::Failure { failure }) => { self.runtime.block_on( - self.server_gateway.fail_activity_task(task_token, failure), + self.server_gateway + .fail_activity_task(task_token, failure.map(Into::into)), )?; } - activity_result::Status::Canceled(ActivityTaskCancelation { details }) => { + activity_result::Status::Canceled(ar::Cancelation { details }) => { self.runtime.block_on( self.server_gateway - .cancel_activity_task(task_token, details), + .cancel_activity_task(task_token, details.into_payloads()), )?; } } @@ -349,7 +349,7 @@ impl CoreSDK { fn push_lang_commands( &self, run_id: &str, - success: WfActivationSuccess, + success: workflow_completion::Success, ) -> Result> { // Convert to wf commands let cmds = success @@ -417,8 +417,8 @@ pub enum CoreError { MalformedCompletion(TaskCompletion), /// Error buffering commands CantSendCommands(#[from] SendError>), - /// Couldn't interpret command from - UninterpretableCommand(#[from] InconvertibleCommandError), + /// Land SDK sent us an empty workflow command (no variant) + UninterpretableCommand(#[from] EmptyWorkflowCommandErr), /// Underlying error in history processing UnderlyingHistError(#[from] HistoryInfoError), /// Underlying error in state machines: {0:?} @@ -447,24 +447,20 @@ pub enum CoreError { #[cfg(test)] mod test { use super::*; - use crate::protos::temporal::api::command::v1::{ - FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes, + use crate::protos::coresdk::common::UserCodeFailure; + use crate::protos::coresdk::workflow_commands::{ + CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, ScheduleActivity, StartTimer, }; use crate::{ machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder}, protos::{ coresdk::{ - wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed, - WfActivationJob, + workflow_activation::{wf_activation_job, WfActivationJob}, + workflow_activation::{FireTimer, StartWorkflow, UpdateRandomSeed}, + TaskCompletion, }, temporal::api::{ - command::v1::{ - CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, - StartTimerCommandAttributes, - }, - enums::v1::{EventType, WorkflowTaskFailedCause}, - failure::v1::Failure, - workflowservice::v1::RespondWorkflowTaskFailedResponse, + enums::v1::EventType, workflowservice::v1::RespondWorkflowTaskFailedResponse, }, }, test_help::canned_histories, @@ -506,7 +502,7 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: "fake_timer".to_string(), ..Default::default() } @@ -524,7 +520,7 @@ mod test { ); let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task_tok, )) .unwrap(); @@ -546,7 +542,7 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![ScheduleActivityTaskCommandAttributes { + vec![ScheduleActivity { activity_id: "fake_activity".to_string(), ..Default::default() } @@ -564,13 +560,13 @@ mod test { ); let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task_tok, )) .unwrap(); } - #[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))] + #[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))] fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) { let wfid = "fake_wf_id"; let run_id = "fake_run_id"; @@ -593,12 +589,12 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: timer_1_id.to_string(), ..Default::default() } .into(), - StartTimerCommandAttributes { + StartTimer { timer_id: timer_2_id.to_string(), ..Default::default() } @@ -629,7 +625,7 @@ mod test { ); let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task_tok, )) .unwrap(); @@ -658,12 +654,12 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: cancel_timer_id.to_string(), ..Default::default() } .into(), - StartTimerCommandAttributes { + StartTimer { timer_id: timer_id.to_string(), ..Default::default() } @@ -683,11 +679,11 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - CancelTimerCommandAttributes { + CancelTimer { timer_id: cancel_timer_id.to_string(), } .into(), - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + CompleteWorkflowExecution { result: vec![] }.into(), ], task_tok, )) @@ -734,7 +730,7 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: timer_1_id.to_string(), ..Default::default() } @@ -757,7 +753,7 @@ mod test { ); let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task_tok, )) .unwrap(); @@ -788,16 +784,16 @@ mod test { let task_tok = res.task_token; core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: cancel_timer_id.to_string(), ..Default::default() } .into(), - CancelTimerCommandAttributes { + CancelTimer { timer_id: cancel_timer_id.to_string(), } .into(), - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + CompleteWorkflowExecution { result: vec![] }.into(), ], task_tok, )) @@ -823,7 +819,7 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: timer_id.to_string(), ..Default::default() } @@ -835,8 +831,7 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); core.complete_task(TaskCompletion::fail( res.task_token, - WorkflowTaskFailedCause::BadBinary, - Failure { + UserCodeFailure { message: "oh noooooooo".to_string(), ..Default::default() }, @@ -852,7 +847,7 @@ mod test { ); // Need to re-issue the start timer command (we are replaying) core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: timer_id.to_string(), ..Default::default() } @@ -869,7 +864,7 @@ mod test { }] ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], res.task_token, )) .unwrap(); @@ -886,7 +881,7 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: timer_id.to_string(), ..Default::default() } @@ -897,8 +892,8 @@ mod test { let res = core.poll_task(TASK_Q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![FailWorkflowExecutionCommandAttributes { - failure: Some(Failure { + vec![FailWorkflowExecution { + failure: Some(UserCodeFailure { message: "I'm ded".to_string(), ..Default::default() }), diff --git a/src/machines/activity_state_machine.rs b/src/machines/activity_state_machine.rs index 4b62328e9..1bc7eb76b 100644 --- a/src/machines/activity_state_machine.rs +++ b/src/machines/activity_state_machine.rs @@ -1,14 +1,23 @@ -use crate::machines::workflow_machines::MachineResponse; -use crate::machines::{Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError}; -use crate::protos::coresdk::{ - activity_result, activity_task, ActivityResult, ActivityTaskSuccess, CancelActivity, - ResolveActivity, StartActivity, -}; -use crate::protos::temporal::api::command::v1::{Command, ScheduleActivityTaskCommandAttributes}; -use crate::protos::temporal::api::common::v1::Payloads; -use crate::protos::temporal::api::enums::v1::{CommandType, EventType}; -use crate::protos::temporal::api::history::v1::{ - history_event, ActivityTaskCompletedEventAttributes, HistoryEvent, +use crate::protos::coresdk::PayloadsExt; +use crate::{ + machines::{ + workflow_machines::MachineResponse, Cancellable, NewMachineWithCommand, WFMachinesAdapter, + WFMachinesError, + }, + protos::coresdk::workflow_commands::ScheduleActivity, + protos::{ + coresdk::{ + activity_result::{self as ar, activity_result, ActivityResult}, + activity_task, + workflow_activation::ResolveActivity, + }, + temporal::api::{ + command::v1::Command, + common::v1::Payloads, + enums::v1::{CommandType, EventType}, + history::v1::{history_event, ActivityTaskCompletedEventAttributes, HistoryEvent}, + }, + }, }; use rustfsm::{fsm, MachineError, StateMachine, TransitionResult}; use std::convert::TryFrom; @@ -92,9 +101,7 @@ impl Default for ActivityCancellationType { } /// Creates a new activity state machine and a command to schedule it on the server. -pub(super) fn new_activity( - attribs: ScheduleActivityTaskCommandAttributes, -) -> NewMachineWithCommand { +pub(super) fn new_activity(attribs: ScheduleActivity) -> NewMachineWithCommand { let (activity, add_cmd) = ActivityMachine::new_scheduled(attribs); NewMachineWithCommand { command: add_cmd, @@ -104,7 +111,7 @@ pub(super) fn new_activity( impl ActivityMachine { /// Create a new activity and immediately schedule it. - pub(crate) fn new_scheduled(attribs: ScheduleActivityTaskCommandAttributes) -> (Self, Command) { + pub(crate) fn new_scheduled(attribs: ScheduleActivity) -> (Self, Command) { let mut s = Self { state: Created {}.into(), shared_state: SharedState { @@ -179,8 +186,8 @@ impl WFMachinesAdapter for ActivityMachine { ActivityMachineCommand::Complete(result) => vec![ResolveActivity { activity_id: self.shared_state.attrs.activity_id.clone(), result: Some(ActivityResult { - status: Some(activity_result::Status::Completed(ActivityTaskSuccess { - result, + status: Some(activity_result::Status::Completed(ar::Success { + result: Vec::from_payloads(result), })), }), } @@ -201,7 +208,7 @@ impl Cancellable for ActivityMachine { #[derive(Default, Clone)] pub(super) struct SharedState { - attrs: ScheduleActivityTaskCommandAttributes, + attrs: ScheduleActivity, cancellation_type: ActivityCancellationType, } diff --git a/src/machines/complete_workflow_state_machine.rs b/src/machines/complete_workflow_state_machine.rs index 300b218df..e9d88b5a7 100644 --- a/src/machines/complete_workflow_state_machine.rs +++ b/src/machines/complete_workflow_state_machine.rs @@ -3,10 +3,13 @@ use crate::{ workflow_machines::MachineResponse, Cancellable, NewMachineWithCommand, WFMachinesAdapter, WFMachinesError, }, - protos::temporal::api::{ - command::v1::{Command, CompleteWorkflowExecutionCommandAttributes}, - enums::v1::{CommandType, EventType}, - history::v1::HistoryEvent, + protos::{ + coresdk::workflow_commands::CompleteWorkflowExecution, + temporal::api::{ + command::v1::Command, + enums::v1::{CommandType, EventType}, + history::v1::HistoryEvent, + }, }, }; use rustfsm::{fsm, StateMachine, TransitionResult}; @@ -17,7 +20,7 @@ fsm! { name CompleteWorkflowMachine; command CompleteWFCommand; error WFMachinesError; - shared_state CompleteWorkflowExecutionCommandAttributes; + shared_state CompleteWorkflowExecution; Created --(Schedule, shared on_schedule) --> CompleteWorkflowCommandCreated; @@ -34,7 +37,7 @@ pub(super) enum CompleteWFCommand { /// Complete a workflow pub(super) fn complete_workflow( - attribs: CompleteWorkflowExecutionCommandAttributes, + attribs: CompleteWorkflowExecution, ) -> NewMachineWithCommand { let (machine, add_cmd) = CompleteWorkflowMachine::new_scheduled(attribs); NewMachineWithCommand { @@ -45,9 +48,7 @@ pub(super) fn complete_workflow( impl CompleteWorkflowMachine { /// Create a new WF machine and schedule it - pub(crate) fn new_scheduled( - attribs: CompleteWorkflowExecutionCommandAttributes, - ) -> (Self, Command) { + pub(crate) fn new_scheduled(attribs: CompleteWorkflowExecution) -> (Self, Command) { let mut s = Self { state: Created {}.into(), shared_state: attribs, @@ -97,7 +98,7 @@ pub(super) struct Created {} impl Created { pub(super) fn on_schedule( self, - dat: CompleteWorkflowExecutionCommandAttributes, + dat: CompleteWorkflowExecution, ) -> CompleteWorkflowMachineTransition { let cmd = Command { command_type: CommandType::CompleteWorkflowExecution as i32, diff --git a/src/machines/fail_workflow_state_machine.rs b/src/machines/fail_workflow_state_machine.rs index 89235d5c9..3fd6985b4 100644 --- a/src/machines/fail_workflow_state_machine.rs +++ b/src/machines/fail_workflow_state_machine.rs @@ -1,9 +1,9 @@ +use crate::protos::coresdk::workflow_commands::FailWorkflowExecution; use crate::{ machines::{ workflow_machines::MachineResponse, Cancellable, NewMachineWithCommand, ProtoCommand, WFMachinesAdapter, WFMachinesError, }, - protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes, protos::temporal::api::enums::v1::CommandType, protos::temporal::api::enums::v1::EventType, protos::temporal::api::history::v1::HistoryEvent, @@ -15,7 +15,7 @@ fsm! { pub(super) name FailWorkflowMachine; command FailWFCommand; error WFMachinesError; - shared_state FailWorkflowExecutionCommandAttributes; + shared_state FailWorkflowExecution; Created --(Schedule, shared on_schedule) --> FailWorkflowCommandCreated; @@ -30,7 +30,7 @@ pub(super) enum FailWFCommand { /// Fail a workflow pub(super) fn fail_workflow( - attribs: FailWorkflowExecutionCommandAttributes, + attribs: FailWorkflowExecution, ) -> NewMachineWithCommand { let (machine, add_cmd) = FailWorkflowMachine::new_scheduled(attribs); NewMachineWithCommand { @@ -41,9 +41,7 @@ pub(super) fn fail_workflow( impl FailWorkflowMachine { /// Create a new WF machine and schedule it - pub(crate) fn new_scheduled( - attribs: FailWorkflowExecutionCommandAttributes, - ) -> (Self, ProtoCommand) { + pub(crate) fn new_scheduled(attribs: FailWorkflowExecution) -> (Self, ProtoCommand) { let mut s = Self { state: Created {}.into(), shared_state: attribs, @@ -64,10 +62,7 @@ impl FailWorkflowMachine { pub(super) struct Created {} impl Created { - pub(super) fn on_schedule( - self, - dat: FailWorkflowExecutionCommandAttributes, - ) -> FailWorkflowMachineTransition { + pub(super) fn on_schedule(self, dat: FailWorkflowExecution) -> FailWorkflowMachineTransition { let cmd = ProtoCommand { command_type: CommandType::FailWorkflowExecution as i32, attributes: Some(dat.into()), diff --git a/src/machines/mod.rs b/src/machines/mod.rs index 86df95148..4c12e9886 100644 --- a/src/machines/mod.rs +++ b/src/machines/mod.rs @@ -33,23 +33,15 @@ pub(crate) mod test_help; pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines}; -use crate::protos::temporal::api::command::v1::{ - FailWorkflowExecutionCommandAttributes, RequestCancelActivityTaskCommandAttributes, - ScheduleActivityTaskCommandAttributes, -}; use crate::{ core_tracing::VecDisplayer, machines::workflow_machines::MachineResponse, protos::{ - coresdk::{self, command::Variant}, - temporal::api::{ - command::v1::{ - command::Attributes, CancelTimerCommandAttributes, Command, - CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, - }, - enums::v1::CommandType, - history::v1::HistoryEvent, + coresdk::workflow_commands::{ + workflow_command, CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, + RequestCancelActivity, ScheduleActivity, StartTimer, WorkflowCommand, }, + temporal::api::{command::v1::Command, enums::v1::CommandType, history::v1::HistoryEvent}, }, }; use prost::alloc::fmt::Formatter; @@ -68,44 +60,34 @@ pub(crate) type ProtoCommand = Command; pub enum WFCommand { /// Returned when we need to wait for the lang sdk to send us something NoCommandsFromLang, - AddActivity(ScheduleActivityTaskCommandAttributes), - RequestCancelActivity(RequestCancelActivityTaskCommandAttributes), - AddTimer(StartTimerCommandAttributes), - CancelTimer(CancelTimerCommandAttributes), - CompleteWorkflow(CompleteWorkflowExecutionCommandAttributes), - FailWorkflow(FailWorkflowExecutionCommandAttributes), + AddActivity(ScheduleActivity), + RequestCancelActivity(RequestCancelActivity), + AddTimer(StartTimer), + CancelTimer(CancelTimer), + CompleteWorkflow(CompleteWorkflowExecution), + FailWorkflow(FailWorkflowExecution), } #[derive(thiserror::Error, Debug, derive_more::From)] -#[error("Couldn't convert command")] -pub struct InconvertibleCommandError(pub coresdk::Command); +#[error("Lang provided workflow command with empty variant")] +pub struct EmptyWorkflowCommandErr; -impl TryFrom for WFCommand { - type Error = InconvertibleCommandError; +impl TryFrom for WFCommand { + type Error = EmptyWorkflowCommandErr; - fn try_from(c: coresdk::Command) -> Result { - match c.variant { - Some(Variant::Api(Command { - attributes: Some(attrs), - .. - })) => match attrs { - Attributes::StartTimerCommandAttributes(s) => Ok(WFCommand::AddTimer(s)), - Attributes::CancelTimerCommandAttributes(s) => Ok(WFCommand::CancelTimer(s)), - Attributes::ScheduleActivityTaskCommandAttributes(s) => { - Ok(WFCommand::AddActivity(s)) - } - Attributes::RequestCancelActivityTaskCommandAttributes(s) => { - Ok(WFCommand::RequestCancelActivity(s)) - } - Attributes::CompleteWorkflowExecutionCommandAttributes(c) => { - Ok(WFCommand::CompleteWorkflow(c)) - } - Attributes::FailWorkflowExecutionCommandAttributes(s) => { - Ok(WFCommand::FailWorkflow(s)) - } - _ => unimplemented!(), - }, - _ => Err(c.into()), + fn try_from(c: WorkflowCommand) -> Result { + match c.variant.ok_or(EmptyWorkflowCommandErr)? { + workflow_command::Variant::StartTimer(s) => Ok(WFCommand::AddTimer(s)), + workflow_command::Variant::CancelTimer(s) => Ok(WFCommand::CancelTimer(s)), + workflow_command::Variant::ScheduleActivity(s) => Ok(WFCommand::AddActivity(s)), + workflow_command::Variant::RequestCancelActivity(s) => { + Ok(WFCommand::RequestCancelActivity(s)) + } + workflow_command::Variant::CompleteWorkflowExecution(c) => { + Ok(WFCommand::CompleteWorkflow(c)) + } + workflow_command::Variant::FailWorkflowExecution(s) => Ok(WFCommand::FailWorkflow(s)), + _ => unimplemented!(), } } } diff --git a/src/machines/test_help/async_workflow_driver.rs b/src/machines/test_help/async_workflow_driver.rs index 1b126530c..6ad4cfd37 100644 --- a/src/machines/test_help/async_workflow_driver.rs +++ b/src/machines/test_help/async_workflow_driver.rs @@ -1,9 +1,8 @@ -use crate::machines::workflow_machines::CommandID; use crate::{ - machines::WFCommand, - protos::{ - coresdk::{wf_activation_job, FireTimer}, - temporal::api::command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes}, + machines::{workflow_machines::CommandID, WFCommand}, + protos::coresdk::{ + workflow_activation::{wf_activation_job, FireTimer}, + workflow_commands::{CancelTimer, StartTimer}, }, workflow::{ActivationListener, WorkflowFetcher}, }; @@ -127,7 +126,7 @@ impl CommandSender { } /// Request to create a timer - pub fn timer(&mut self, a: StartTimerCommandAttributes) -> impl Future { + pub fn timer(&mut self, a: StartTimer) -> impl Future { let tid = a.timer_id.clone(); let c = WFCommand::AddTimer(a); self.send(c); @@ -141,7 +140,7 @@ impl CommandSender { /// Cancel a timer pub fn cancel_timer(&self, timer_id: &str) { - let c = WFCommand::CancelTimer(CancelTimerCommandAttributes { + let c = WFCommand::CancelTimer(CancelTimer { timer_id: timer_id.to_owned(), }); self.twd_cache.cancel_timer(timer_id); diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs new file mode 100644 index 000000000..8b1378917 --- /dev/null +++ b/src/machines/test_help/workflow_driver.rs @@ -0,0 +1 @@ + diff --git a/src/machines/timer_state_machine.rs b/src/machines/timer_state_machine.rs index 56c8696f3..05973379c 100644 --- a/src/machines/timer_state_machine.rs +++ b/src/machines/timer_state_machine.rs @@ -6,9 +6,13 @@ use crate::{ Cancellable, NewMachineWithCommand, WFMachinesAdapter, }, protos::{ - coresdk::{CancelTimer, FireTimer, HistoryEventId}, + coresdk::{ + workflow_activation::FireTimer, + workflow_commands::{CancelTimer, StartTimer}, + HistoryEventId, + }, temporal::api::{ - command::v1::{CancelTimerCommandAttributes, Command, StartTimerCommandAttributes}, + command::v1::Command, enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent, TimerFiredEventAttributes}, }, @@ -48,14 +52,12 @@ pub(super) enum TimerMachineCommand { #[derive(Default, Clone)] pub(super) struct SharedState { - attrs: StartTimerCommandAttributes, + attrs: StartTimer, cancelled_before_sent: bool, } /// Creates a new, scheduled, timer as a [CancellableCommand] -pub(super) fn new_timer( - attribs: StartTimerCommandAttributes, -) -> NewMachineWithCommand { +pub(super) fn new_timer(attribs: StartTimer) -> NewMachineWithCommand { let (timer, add_cmd) = TimerMachine::new_scheduled(attribs); NewMachineWithCommand { command: add_cmd, @@ -65,7 +67,7 @@ pub(super) fn new_timer( impl TimerMachine { /// Create a new timer and immediately schedule it - pub(crate) fn new_scheduled(attribs: StartTimerCommandAttributes) -> (Self, Command) { + pub(crate) fn new_scheduled(attribs: StartTimer) -> (Self, Command) { let mut s = Self::new(attribs); s.on_event_mut(TimerMachineEvents::Schedule) .expect("Scheduling timers doesn't fail"); @@ -76,7 +78,7 @@ impl TimerMachine { (s, cmd) } - fn new(attribs: StartTimerCommandAttributes) -> Self { + fn new(attribs: StartTimer) -> Self { Self { state: Created {}.into(), shared_state: SharedState { @@ -207,7 +209,7 @@ impl StartCommandRecorded { let cmd = Command { command_type: CommandType::CancelTimer as i32, attributes: Some( - CancelTimerCommandAttributes { + CancelTimer { timer_id: dat.attrs.timer_id, } .into(), @@ -233,10 +235,10 @@ impl WFMachinesAdapter for TimerMachine { timer_id: self.shared_state.attrs.timer_id.clone(), } .into()], - TimerMachineCommand::Canceled => vec![CancelTimer { - timer_id: self.shared_state.attrs.timer_id.clone(), - } - .into()], + // We don't issue activations for timer cancellations. Lang SDK is expected to cancel + // it's own timers when user calls cancel, and they cannot be cancelled by any other + // means. + TimerMachineCommand::Canceled => vec![], TimerMachineCommand::IssueCancelCmd(c) => vec![MachineResponse::IssueNewCommand(c)], }) } @@ -264,7 +266,7 @@ mod test { test_help::{CommandSender, TestHistoryBuilder, TestWorkflowDriver}, workflow_machines::WorkflowMachines, }, - protos::temporal::api::command::v1::CompleteWorkflowExecutionCommandAttributes, + protos::coresdk::workflow_commands::CompleteWorkflowExecution, test_help::canned_histories, }; use rstest::{fixture, rstest}; @@ -284,13 +286,13 @@ mod test { task, hence no extra loop. */ let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { - let timer = StartTimerCommandAttributes { + let timer = StartTimer { timer_id: "timer1".to_string(), start_to_fire_timeout: Some(Duration::from_secs(5).into()), }; command_sink.timer(timer).await; - let complete = CompleteWorkflowExecutionCommandAttributes::default(); + let complete = CompleteWorkflowExecution::default(); command_sink.send(complete.into()); }); @@ -342,7 +344,7 @@ mod test { #[test] fn mismatched_timer_ids_errors() { let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move { - let timer = StartTimerCommandAttributes { + let timer = StartTimer { timer_id: "realid".to_string(), start_to_fire_timeout: Some(Duration::from_secs(5).into()), }; @@ -366,12 +368,12 @@ mod test { #[fixture] fn cancellation_setup() -> (TestHistoryBuilder, WorkflowMachines) { let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move { - let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes { + let cancel_timer_fut = cmd_sink.timer(StartTimer { timer_id: "cancel_timer".to_string(), start_to_fire_timeout: Some(Duration::from_secs(500).into()), }); cmd_sink - .timer(StartTimerCommandAttributes { + .timer(StartTimer { timer_id: "wait_timer".to_string(), start_to_fire_timeout: Some(Duration::from_secs(5).into()), }) @@ -380,7 +382,7 @@ mod test { cmd_sink.cancel_timer("cancel_timer"); cancel_timer_fut.await; - let complete = CompleteWorkflowExecutionCommandAttributes::default(); + let complete = CompleteWorkflowExecution::default(); cmd_sink.send(complete.into()); }); @@ -431,7 +433,7 @@ mod test { #[test] fn cancel_before_sent_to_server() { let twd = TestWorkflowDriver::new(|mut cmd_sink: CommandSender| async move { - let cancel_timer_fut = cmd_sink.timer(StartTimerCommandAttributes { + let cancel_timer_fut = cmd_sink.timer(StartTimer { timer_id: "cancel_timer".to_string(), start_to_fire_timeout: Some(Duration::from_secs(500).into()), }); @@ -439,7 +441,7 @@ mod test { cmd_sink.cancel_timer("cancel_timer"); cancel_timer_fut.await; - let complete = CompleteWorkflowExecutionCommandAttributes::default(); + let complete = CompleteWorkflowExecution::default(); cmd_sink.send(complete.into()); }); diff --git a/src/machines/workflow_machines.rs b/src/machines/workflow_machines.rs index b25c285a4..ca29d88f2 100644 --- a/src/machines/workflow_machines.rs +++ b/src/machines/workflow_machines.rs @@ -1,21 +1,25 @@ -use crate::machines::activity_state_machine::new_activity; -use crate::workflow::{DrivenWorkflow, WorkflowFetcher}; use crate::{ core_tracing::VecDisplayer, machines::{ - complete_workflow_state_machine::complete_workflow, + activity_state_machine::new_activity, complete_workflow_state_machine::complete_workflow, fail_workflow_state_machine::fail_workflow, timer_state_machine::new_timer, workflow_task_state_machine::WorkflowTaskMachine, NewMachineWithCommand, ProtoCommand, TemporalStateMachine, WFCommand, }, protos::{ - coresdk::{wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation}, + coresdk::{ + workflow_activation::{ + wf_activation_job, StartWorkflow, UpdateRandomSeed, WfActivation, + }, + PayloadsExt, + }, temporal::api::{ enums::v1::{CommandType, EventType}, history::v1::{history_event, HistoryEvent}, }, }, protosext::HistoryInfo, + workflow::{DrivenWorkflow, WorkflowFetcher}, }; use slotmap::SlotMap; use std::{ @@ -333,7 +337,7 @@ impl WorkflowMachines { .map(|wt| wt.name.clone()) .unwrap_or_default(), workflow_id: self.workflow_id.clone(), - arguments: attrs.input.clone(), + arguments: Vec::from_payloads(attrs.input.clone()), randomness_seed: str_to_randomness_seed( &attrs.original_execution_run_id, ), @@ -360,7 +364,7 @@ impl WorkflowMachines { attrs, )) = &event.attributes { - self.drive_me.signal(attrs.clone()); + self.drive_me.signal(attrs.clone().into()); } else { // err } diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 223b1dbf9..39580156e 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -9,12 +9,38 @@ pub mod coresdk { //! Contains all protobufs relating to communication between core and lang-specific SDKs include!("coresdk.rs"); - use super::temporal::api::command::v1 as api_command; - use super::temporal::api::command::v1::Command as ApiCommand; - use super::temporal::api::enums::v1::WorkflowTaskFailedCause; + #[allow(clippy::module_inception)] + pub mod activity_task { + include!("coresdk.activity_task.rs"); + } + #[allow(clippy::module_inception)] + pub mod activity_result { + include!("coresdk.activity_result.rs"); + } + pub mod common { + include!("coresdk.common.rs"); + } + pub mod workflow_activation { + include!("coresdk.workflow_activation.rs"); + } + pub mod workflow_completion { + include!("coresdk.workflow_completion.rs"); + } + pub mod workflow_commands { + include!("coresdk.workflow_commands.rs"); + } + use super::temporal::api::failure::v1::Failure; - use crate::protos::temporal::api::common::v1::Payloads; - use command::Variant; + use crate::protos::coresdk::activity_result::ActivityResult; + use crate::protos::coresdk::common::{Payload, UserCodeFailure}; + use crate::protos::coresdk::workflow_activation::SignalWorkflow; + use crate::protos::temporal::api::common::v1::{Payloads, WorkflowExecution}; + use crate::protos::temporal::api::failure::v1::failure::FailureInfo; + use crate::protos::temporal::api::failure::v1::ApplicationFailureInfo; + use crate::protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes; + use workflow_activation::{wf_activation_job, WfActivation, WfActivationJob}; + use workflow_commands::{workflow_command, WorkflowCommand}; + use workflow_completion::{wf_activation_completion, WfActivationCompletion}; pub type HistoryEventId = i64; @@ -35,7 +61,7 @@ pub mod coresdk { } } /// Returns any contained jobs if this task was a wf activation and it had some - pub fn get_activity_variant(&self) -> Option { + pub fn get_activity_variant(&self) -> Option { if let Some(task::Variant::Activity(a)) = &self.variant { a.variant.clone() } else { @@ -59,27 +85,23 @@ pub mod coresdk { } } - impl From> for WfActivationSuccess { - fn from(v: Vec) -> Self { - WfActivationSuccess { - commands: v - .into_iter() - .map(|cmd| Command { - variant: Some(Variant::Api(cmd)), - }) - .collect(), - } + impl From> for workflow_completion::Success { + fn from(v: Vec) -> Self { + Self { commands: v } } } impl TaskCompletion { /// Build a successful completion from some api command attributes and a task token pub fn ok_from_api_attrs( - cmds: Vec, + cmds: Vec, task_token: Vec, ) -> Self { - let cmds: Vec = cmds.into_iter().map(Into::into).collect(); - let success: WfActivationSuccess = cmds.into(); + let cmds: Vec<_> = cmds + .into_iter() + .map(|c| WorkflowCommand { variant: Some(c) }) + .collect(); + let success: workflow_completion::Success = cmds.into(); TaskCompletion { task_token, variant: Some(task_completion::Variant::Workflow(WfActivationCompletion { @@ -88,24 +110,23 @@ pub mod coresdk { } } - pub fn ok_activity(result: Option, task_token: Vec) -> Self { + pub fn ok_activity(result: Vec, task_token: Vec) -> Self { TaskCompletion { task_token, variant: Some(task_completion::Variant::Activity(ActivityResult { - status: Some(activity_result::Status::Completed(ActivityTaskSuccess { - result, - })), + status: Some(activity_result::activity_result::Status::Completed( + activity_result::Success { result }, + )), })), } } - pub fn fail(task_token: Vec, cause: WorkflowTaskFailedCause, failure: Failure) -> Self { + pub fn fail(task_token: Vec, failure: UserCodeFailure) -> Self { Self { task_token, variant: Some(task_completion::Variant::Workflow(WfActivationCompletion { status: Some(wf_activation_completion::Status::Failed( - WfActivationFailure { - cause: cause as i32, + workflow_completion::Failure { failure: Some(failure), }, )), @@ -113,6 +134,85 @@ pub mod coresdk { } } } + + impl From for Failure { + fn from(f: UserCodeFailure) -> Self { + Self { + message: f.message, + source: f.source, + stack_trace: f.stack_trace, + cause: f.cause.map(|b| Box::new((*b).into())), + failure_info: Some(FailureInfo::ApplicationFailureInfo( + ApplicationFailureInfo { + r#type: f.r#type, + non_retryable: f.non_retryable, + details: None, + }, + )), + } + } + } + + impl From for super::temporal::api::common::v1::Payload { + fn from(p: Payload) -> Self { + Self { + metadata: p.metadata, + data: p.data, + } + } + } + + impl From for common::Payload { + fn from(p: super::temporal::api::common::v1::Payload) -> Self { + Self { + metadata: p.metadata, + data: p.data, + } + } + } + + pub trait PayloadsExt { + fn into_payloads(self) -> Option; + fn from_payloads(p: Option) -> Self; + } + + impl PayloadsExt for Vec { + fn into_payloads(self) -> Option { + if self.is_empty() { + None + } else { + Some(Payloads { + payloads: self.into_iter().map(Into::into).collect(), + }) + } + } + + fn from_payloads(p: Option) -> Self { + match p { + None => vec![], + Some(p) => p.payloads.into_iter().map(Into::into).collect(), + } + } + } + + impl From for SignalWorkflow { + fn from(a: WorkflowExecutionSignaledEventAttributes) -> Self { + Self { + signal_name: a.signal_name, + input: Vec::from_payloads(a.input), + identity: a.identity, + } + } + } + + impl From for common::WorkflowExecution { + fn from(w: WorkflowExecution) -> Self { + Self { + workflow_id: w.workflow_id, + run_id: w.run_id, + } + } + } } // No need to lint these @@ -124,9 +224,15 @@ pub mod temporal { pub mod command { pub mod v1 { include!("temporal.api.command.v1.rs"); - use crate::protos::coresdk::{activity_task, ActivityTask, StartActivity}; - use crate::protos::temporal::api::enums::v1::CommandType; - use crate::protos::temporal::api::workflowservice::v1::PollActivityTaskQueueResponse; + + use crate::protos::{ + coresdk::{ + activity_task, activity_task::ActivityTask, workflow_commands, PayloadsExt, + }, + temporal::api::common::v1::ActivityType, + temporal::api::enums::v1::CommandType, + temporal::api::workflowservice::v1::PollActivityTaskQueueResponse, + }; use command::Attributes; use std::fmt::{Display, Formatter}; @@ -166,23 +272,32 @@ pub mod temporal { fn from(r: PollActivityTaskQueueResponse) -> Self { ActivityTask { activity_id: r.activity_id, - variant: Some(activity_task::Variant::Start(StartActivity { - workflow_namespace: r.workflow_namespace, - workflow_type: r.workflow_type, - workflow_execution: r.workflow_execution, - activity_type: r.activity_type, - header: r.header, - input: r.input, - heartbeat_details: r.heartbeat_details, - scheduled_time: r.scheduled_time, - current_attempt_scheduled_time: r.current_attempt_scheduled_time, - started_time: r.started_time, - attempt: r.attempt, - schedule_to_close_timeout: r.schedule_to_close_timeout, - start_to_close_timeout: r.start_to_close_timeout, - heartbeat_timeout: r.heartbeat_timeout, - retry_policy: r.retry_policy, - })), + variant: Some(activity_task::activity_task::Variant::Start( + activity_task::Start { + workflow_namespace: r.workflow_namespace, + workflow_type: r + .workflow_type + .map(|wt| wt.name) + .unwrap_or("".to_string()), + workflow_execution: r.workflow_execution.map(Into::into), + activity_type: r + .activity_type + .map(|at| at.name) + .unwrap_or("".to_string()), + header_fields: r.header.map(Into::into).unwrap_or_default(), + input: Vec::from_payloads(r.input), + heartbeat_details: Vec::from_payloads(r.heartbeat_details), + scheduled_time: r.scheduled_time, + current_attempt_scheduled_time: r + .current_attempt_scheduled_time, + started_time: r.started_time, + attempt: r.attempt, + schedule_to_close_timeout: r.schedule_to_close_timeout, + start_to_close_timeout: r.start_to_close_timeout, + heartbeat_timeout: r.heartbeat_timeout, + retry_policy: r.retry_policy.map(Into::into), + }, + )), } } } @@ -194,6 +309,65 @@ pub mod temporal { write!(f, "{:?}", ct) } } + + impl From for command::Attributes { + fn from(s: workflow_commands::StartTimer) -> Self { + Self::StartTimerCommandAttributes(StartTimerCommandAttributes { + timer_id: s.timer_id, + start_to_fire_timeout: s.start_to_fire_timeout, + }) + } + } + + impl From for command::Attributes { + fn from(s: workflow_commands::CancelTimer) -> Self { + Self::CancelTimerCommandAttributes(CancelTimerCommandAttributes { + timer_id: s.timer_id, + }) + } + } + + impl From for command::Attributes { + fn from(s: workflow_commands::ScheduleActivity) -> Self { + Self::ScheduleActivityTaskCommandAttributes( + ScheduleActivityTaskCommandAttributes { + activity_id: s.activity_id, + activity_type: Some(ActivityType { + name: s.activity_type, + }), + namespace: s.namespace, + task_queue: Some(s.task_queue.into()), + header: Some(s.header_fields.into()), + input: s.arguments.into_payloads(), + schedule_to_close_timeout: s.schedule_to_close_timeout, + schedule_to_start_timeout: s.schedule_to_start_timeout, + start_to_close_timeout: s.start_to_close_timeout, + heartbeat_timeout: s.heartbeat_timeout, + retry_policy: s.retry_policy.map(Into::into), + }, + ) + } + } + + impl From for command::Attributes { + fn from(c: workflow_commands::CompleteWorkflowExecution) -> Self { + Self::CompleteWorkflowExecutionCommandAttributes( + CompleteWorkflowExecutionCommandAttributes { + result: c.result.into_payloads(), + }, + ) + } + } + + impl From for command::Attributes { + fn from(c: workflow_commands::FailWorkflowExecution) -> Self { + Self::FailWorkflowExecutionCommandAttributes( + FailWorkflowExecutionCommandAttributes { + failure: c.failure.map(Into::into), + }, + ) + } + } } } pub mod enums { @@ -213,7 +387,47 @@ pub mod temporal { } pub mod common { pub mod v1 { + use crate::protos::coresdk::common; + use std::collections::HashMap; include!("temporal.api.common.v1.rs"); + + impl From> for Header { + fn from(h: HashMap) -> Self { + Self { + fields: h.into_iter().map(|(k, v)| (k, v.into())).collect(), + } + } + } + + impl From
for HashMap { + fn from(h: Header) -> Self { + h.fields.into_iter().map(|(k, v)| (k, v.into())).collect() + } + } + + impl From for RetryPolicy { + fn from(r: common::RetryPolicy) -> Self { + Self { + initial_interval: r.initial_interval, + backoff_coefficient: r.backoff_coefficient, + maximum_interval: r.maximum_interval, + maximum_attempts: r.maximum_attempts, + non_retryable_error_types: r.non_retryable_error_types, + } + } + } + + impl From for common::RetryPolicy { + fn from(r: RetryPolicy) -> Self { + Self { + initial_interval: r.initial_interval, + backoff_coefficient: r.backoff_coefficient, + maximum_interval: r.maximum_interval, + maximum_attempts: r.maximum_attempts, + non_retryable_error_types: r.non_retryable_error_types, + } + } + } } } pub mod history { @@ -391,7 +605,17 @@ pub mod temporal { pub mod taskqueue { pub mod v1 { + use crate::protos::temporal::api::enums::v1::TaskQueueKind; include!("temporal.api.taskqueue.v1.rs"); + + impl From for TaskQueue { + fn from(name: String) -> Self { + Self { + name, + kind: TaskQueueKind::Normal as i32, + } + } + } } } diff --git a/src/workflow/driven_workflow.rs b/src/workflow/driven_workflow.rs index 4decbee1e..978c0b0f5 100644 --- a/src/workflow/driven_workflow.rs +++ b/src/workflow/driven_workflow.rs @@ -1,11 +1,10 @@ -use crate::protos::coresdk::SignalWorkflow; use crate::{ machines::WFCommand, - protos::coresdk::wf_activation_job, - protos::coresdk::WfActivationJob, - protos::temporal::api::history::v1::{ - WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes, - WorkflowExecutionStartedEventAttributes, + protos::{ + coresdk::workflow_activation::{wf_activation_job, SignalWorkflow, WfActivationJob}, + temporal::api::history::v1::{ + WorkflowExecutionCanceledEventAttributes, WorkflowExecutionStartedEventAttributes, + }, }, }; use std::collections::VecDeque; @@ -54,10 +53,8 @@ impl DrivenWorkflow { } /// Signal the workflow - pub fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes) { - self.send_job(wf_activation_job::Variant::SignalWorkflow(SignalWorkflow { - signal: Some(attribs), - })) + pub fn signal(&mut self, signal: SignalWorkflow) { + self.send_job(wf_activation_job::Variant::SignalWorkflow(signal)) } /// Cancel the workflow diff --git a/src/workflow/mod.rs b/src/workflow/mod.rs index 67413bbc3..ca57345e7 100644 --- a/src/workflow/mod.rs +++ b/src/workflow/mod.rs @@ -9,7 +9,7 @@ pub(crate) use driven_workflow::{ActivationListener, DrivenWorkflow, WorkflowFet use crate::{ machines::{ProtoCommand, WFCommand, WorkflowMachines}, protos::{ - coresdk::WfActivation, + coresdk::workflow_activation::WfActivation, temporal::api::{history::v1::History, workflowservice::v1::PollWorkflowTaskQueueResponse}, }, protosext::HistoryInfo, diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index 7ea3726cf..71065dfbd 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -11,23 +11,19 @@ use std::{ }, time::Duration, }; -use temporal_sdk_core::protos::temporal::api::command::v1::ScheduleActivityTaskCommandAttributes; -use temporal_sdk_core::protos::temporal::api::common::v1::{ActivityType, Payload, Payloads}; -use temporal_sdk_core::protos::temporal::api::taskqueue::v1::TaskQueue; use temporal_sdk_core::{ - protos::{ - coresdk::{ - activity_result, activity_task, wf_activation_job, ActivityResult, ActivityTaskSuccess, - FireTimer, ResolveActivity, StartWorkflow, Task, TaskCompletion, WfActivationJob, + protos::coresdk::{ + activity_result::{self, activity_result as act_res, ActivityResult}, + activity_task::activity_task as act_task, + common::{Payload, UserCodeFailure}, + workflow_activation::{ + wf_activation_job, FireTimer, ResolveActivity, StartWorkflow, WfActivationJob, }, - temporal::api::{ - command::v1::{ - CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, - FailWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, - }, - enums::v1::WorkflowTaskFailedCause, - failure::v1::Failure, + workflow_commands::{ + CancelTimer, CompleteWorkflowExecution, FailWorkflowExecution, ScheduleActivity, + StartTimer, }, + Task, TaskCompletion, }, Core, CoreError, CoreInitOptions, ServerGatewayApis, ServerGatewayOptions, Url, }; @@ -84,8 +80,7 @@ fn get_integ_server_options() -> ServerGatewayOptions { fn get_integ_core() -> impl Core { let gateway_opts = get_integ_server_options(); - let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); - core + temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap() } #[test] @@ -98,10 +93,9 @@ fn timer_workflow() { let timer_id: String = rng.gen::().to_string(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { - timer_id: timer_id.to_string(), + vec![StartTimer { + timer_id, start_to_fire_timeout: Some(Duration::from_secs(1).into()), - ..Default::default() } .into()], task.task_token, @@ -109,7 +103,7 @@ fn timer_workflow() { .unwrap(); let task = dbg!(core.poll_task(task_q).unwrap()); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task.task_token, )) .unwrap(); @@ -126,16 +120,11 @@ fn activity_workflow() { let activity_id: String = rng.gen::().to_string(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![ScheduleActivityTaskCommandAttributes { + vec![ScheduleActivity { activity_id: activity_id.to_string(), - activity_type: Some(ActivityType { - name: "test_activity".to_string(), - }), + activity_type: "test_activity".to_string(), namespace: NAMESPACE.to_owned(), - task_queue: Some(TaskQueue { - name: task_q.to_owned(), - kind: 1, - }), + task_queue: task_q.to_owned(), schedule_to_start_timeout: Some(Duration::from_secs(30).into()), start_to_close_timeout: Some(Duration::from_secs(30).into()), schedule_to_close_timeout: Some(Duration::from_secs(60).into()), @@ -149,20 +138,16 @@ fn activity_workflow() { let task = dbg!(core.poll_activity_task(task_q).unwrap()); assert_matches!( task.get_activity_variant(), - Some(activity_task::Variant::Start(start_activity)) => { - assert_eq!(start_activity.activity_type, Some(ActivityType { - name: "test_activity".to_string(), - })) + Some(act_task::Variant::Start(start_activity)) => { + assert_eq!(start_activity.activity_type, "test_activity".to_string()) } ); let response_payloads = vec![Payload { - metadata: Default::default(), data: b"hello ".to_vec(), + metadata: Default::default(), }]; core.complete_activity_task(TaskCompletion::ok_activity( - Some(Payloads { - payloads: response_payloads.clone(), - }), + response_payloads.clone(), task.task_token, )) .unwrap(); @@ -173,16 +158,16 @@ fn activity_workflow() { WfActivationJob { variant: Some(wf_activation_job::Variant::ResolveActivity( ResolveActivity {activity_id: a_id, result: Some(ActivityResult{ - status: Some(activity_result::Status::Completed(ActivityTaskSuccess{result: Some(r)}))})} + status: Some(act_res::Status::Completed(activity_result::Success{result: r}))})} )), }, ] => { assert_eq!(a_id, &activity_id); - assert_eq!(r, &Payloads{ payloads: response_payloads.clone()}); + assert_eq!(r, &response_payloads); } ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task.task_token, )) .unwrap() @@ -200,16 +185,14 @@ fn parallel_timer_workflow() { let task = dbg!(core.poll_task(task_q).unwrap()); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: timer_id.clone(), start_to_fire_timeout: Some(Duration::from_millis(50).into()), - ..Default::default() } .into(), - StartTimerCommandAttributes { + StartTimer { timer_id: timer_2_id.clone(), start_to_fire_timeout: Some(Duration::from_millis(100).into()), - ..Default::default() } .into(), ], @@ -239,7 +222,7 @@ fn parallel_timer_workflow() { } ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task.task_token, )) .unwrap(); @@ -262,16 +245,14 @@ fn timer_cancel_workflow() { let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: timer_id.to_string(), start_to_fire_timeout: Some(Duration::from_millis(50).into()), - ..Default::default() } .into(), - StartTimerCommandAttributes { + StartTimer { timer_id: cancel_timer_id.to_string(), start_to_fire_timeout: Some(Duration::from_secs(10).into()), - ..Default::default() } .into(), ], @@ -281,11 +262,11 @@ fn timer_cancel_workflow() { let task = dbg!(core.poll_task(task_q).unwrap()); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - CancelTimerCommandAttributes { + CancelTimer { timer_id: cancel_timer_id.to_string(), } .into(), - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + CompleteWorkflowExecution { result: vec![] }.into(), ], task.task_token, )) @@ -303,17 +284,16 @@ fn timer_immediate_cancel_workflow() { let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( vec![ - StartTimerCommandAttributes { + StartTimer { timer_id: cancel_timer_id.to_string(), ..Default::default() } .into(), - CancelTimerCommandAttributes { + CancelTimer { timer_id: cancel_timer_id.to_string(), - ..Default::default() } .into(), - CompleteWorkflowExecutionCommandAttributes { result: None }.into(), + CompleteWorkflowExecution { result: vec![] }.into(), ], task.task_token, )) @@ -346,10 +326,9 @@ fn parallel_workflows_same_queue() { }] => assert_eq!(&workflow_type, &"wf-type-1") ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: "timer".to_string(), start_to_fire_timeout: Some(Duration::from_secs(1).into()), - ..Default::default() } .into()], task.task_token, @@ -357,7 +336,7 @@ fn parallel_workflows_same_queue() { .unwrap(); let task = task_chan.recv().unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task.task_token, )) .unwrap(); @@ -417,10 +396,9 @@ fn fail_wf_task() { // Start with a timer let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: "best-timer".to_string(), start_to_fire_timeout: Some(Duration::from_millis(200).into()), - ..Default::default() } .into()], task.task_token, @@ -434,8 +412,7 @@ fn fail_wf_task() { let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::fail( task.task_token, - WorkflowTaskFailedCause::WorkflowWorkerUnhandledFailure, - Failure { + UserCodeFailure { message: "I did an oopsie".to_string(), ..Default::default() }, @@ -446,10 +423,9 @@ fn fail_wf_task() { // to poll a couple of times as there will be more than one required workflow activation. let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: "best-timer".to_string(), start_to_fire_timeout: Some(Duration::from_millis(200).into()), - ..Default::default() } .into()], task.task_token, @@ -457,7 +433,7 @@ fn fail_wf_task() { .unwrap(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task.task_token, )) .unwrap(); @@ -473,10 +449,9 @@ fn fail_workflow_execution() { let timer_id: String = rng.gen::().to_string(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { - timer_id: timer_id.to_string(), + vec![StartTimer { + timer_id, start_to_fire_timeout: Some(Duration::from_secs(1).into()), - ..Default::default() } .into()], task.task_token, @@ -484,8 +459,8 @@ fn fail_workflow_execution() { .unwrap(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![FailWorkflowExecutionCommandAttributes { - failure: Some(Failure { + vec![FailWorkflowExecution { + failure: Some(UserCodeFailure { message: "I'm ded".to_string(), ..Default::default() }), @@ -547,7 +522,7 @@ fn signal_workflow() { ] ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], res.task_token, )) .unwrap(); @@ -565,13 +540,12 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { let res = core.poll_task(task_q).unwrap(); // Task is completed with a timer core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![StartTimerCommandAttributes { + vec![StartTimer { timer_id: "sometimer".to_string(), start_to_fire_timeout: Some(Duration::from_millis(10).into()), - ..Default::default() } .into()], - res.task_token.clone(), + res.task_token, )) .unwrap(); @@ -600,7 +574,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { // Send completion - not having seen a poll response with a signal in it yet let unhandled = core .complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], task_token, )) .unwrap_err(); @@ -615,7 +589,7 @@ fn signal_workflow_signal_not_handled_on_workflow_completion() { }] ); core.complete_task(TaskCompletion::ok_from_api_attrs( - vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + vec![CompleteWorkflowExecution { result: vec![] }.into()], res.task_token, )) .unwrap();