diff --git a/protos/local/activity_result.proto b/protos/local/activity_result.proto new file mode 100644 index 000000000..8e99a1e89 --- /dev/null +++ b/protos/local/activity_result.proto @@ -0,0 +1,29 @@ +syntax = "proto3"; + +package coresdk.activity_result; + +import "temporal/api/failure/v1/message.proto"; +import "temporal/api/common/v1/message.proto"; + +/// Used in ActivityResult to report cancellation +message Cancellation { +} + +/// Used in ActivityResult to report successful completion +message Success { + temporal.api.common.v1.Payload result = 1; +} + +/// Used in ActivityResult to report failure +message Failure { + temporal.api.failure.v1.Failure failure = 1; +} + +/// Used to report activity completion to core and to resolve the activity in a workflow activation +message ActivityResult { + oneof status { + Success successful = 1; + Cancellation cancelled = 2; + Failure failed = 3; + } +} diff --git a/protos/local/activity_task.proto b/protos/local/activity_task.proto new file mode 100644 index 000000000..c2f8fad61 --- /dev/null +++ b/protos/local/activity_task.proto @@ -0,0 +1,51 @@ +/** + * Definitions of the different activity tasks returned from [crate::Core::poll_task]. + */ + +syntax = "proto3"; + +package coresdk.activity_task; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "dependencies/gogoproto/gogo.proto"; +import "temporal/api/common/v1/message.proto"; +import "activity_result.proto"; +import "timer_result.proto"; + +// TODO: docstrings everywhere +message StartActivity { + string workflow_namespace = 1; + temporal.api.common.v1.WorkflowType workflow_type = 2; + temporal.api.common.v1.WorkflowExecution workflow_execution = 3; + temporal.api.common.v1.ActivityType activity_type = 4; + temporal.api.common.v1.Header header = 5; + temporal.api.common.v1.Payloads input = 6; + temporal.api.common.v1.Payloads heartbeat_details = 7; + google.protobuf.Timestamp scheduled_time = 8 [(gogoproto.stdtime) = true]; + google.protobuf.Timestamp current_attempt_scheduled_time = 9 [(gogoproto.stdtime) = true]; + google.protobuf.Timestamp started_time = 10 [(gogoproto.stdtime) = true]; + int32 attempt = 11; + google.protobuf.Duration schedule_to_close_timeout = 12 [(gogoproto.stdduration) = true]; + google.protobuf.Duration start_to_close_timeout = 13 [(gogoproto.stdduration) = true]; + google.protobuf.Duration heartbeat_timeout = 14 [(gogoproto.stdduration) = true]; + // This is an actual retry policy the service uses. + // It can be different from the one provided (or not) during activity scheduling + // as the service can override the provided one in case its values are not specified + // or exceed configured system limits. + temporal.api.common.v1.RetryPolicy retry_policy = 15; +} + +/// Request to cancel an activity from a workflow +message CancelActivity { +} + +message ActivityTask { + string activity_id = 1; + oneof variant { + // Start activity execution. + StartActivity start = 2; + // Attempt to cancel activity execution. + CancelActivity cancel = 3; + } +} diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 0880ad626..403211ca8 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -1,3 +1,7 @@ +/** + * Core<->Lang request / response definitions + */ + syntax = "proto3"; package coresdk; @@ -6,245 +10,46 @@ package coresdk; // the include paths. You can make it work by going to the "Protobuf Support" settings section // and adding the "api_upstream" subdir as an include path. -import "google/protobuf/timestamp.proto"; -import "google/protobuf/duration.proto"; -import "google/protobuf/empty.proto"; -import "dependencies/gogoproto/gogo.proto"; -import "temporal/api/workflowservice/v1/request_response.proto"; -import "temporal/api/taskqueue/v1/message.proto"; -import "temporal/api/enums/v1/failed_cause.proto"; -import "temporal/api/failure/v1/message.proto"; -import "temporal/api/history/v1/message.proto"; import "temporal/api/common/v1/message.proto"; -import "temporal/api/command/v1/message.proto"; -import "temporal/api/query/v1/message.proto"; +import "workflow_activation.proto"; +import "workflow_activation_completion.proto"; +import "activity_task.proto"; +import "activity_result.proto"; -// A request as given to [crate::Core::poll_task] +/// A request as given to [crate::Core::poll_task] message PollTaskReq { - // What type of task to poll for - enum TaskType { - // Poll for workflows - WORKFLOWS = 0; - // Poll for activities - ACTIVITIES = 1; - } - - // A list of task types to poll for - repeated TaskType types = 1; + bool workflows = 1; + bool activities = 2; } -// An instruction to perform work from core->lang sdk +/// An instruction to perform work from core->lang SDK, returned from [crate::Core::poll_task] message Task { // A unique identifier for this task bytes task_token = 1; // The type of task to be performed oneof variant { // Wake up a workflow - WFActivation workflow = 2; - // Run an activity - ActivityTask activity = 3; - } -} - -// An instruction to the lang sdk to run some workflow code, whether for the first time or from -// a cached state. -message WFActivation { - // The current time as understood by the workflow, which is set by workflow task started events - google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; - // The id of the currently active run of the workflow - string run_id = 2; - // The things to do upon activating the workflow - repeated WFActivationJob jobs = 3; -} - -message WFActivationJob { - oneof variant { - // Begin a workflow for the first time - StartWorkflow start_workflow = 1; - // A timer has fired, allowing whatever was waiting on it (if anything) to proceed - FireTimer fire_timer = 2; - // A timer was canceled, and needs to be unblocked on the lang side. - CancelTimer cancel_timer = 3; - // Workflow was reset. The randomness seed must be updated. - UpdateRandomSeed update_random_seed = 4; - // A request to query the workflow was received. - QueryWorkflow query_workflow = 5; - // A request to cancel the workflow was received. - CancelWorkflow cancel_workflow = 6; - // A request to signal the workflow was received. - SignalWorkflow signal_workflow = 7; - // An activity was resolved with, result could be completed, failed or cancelled - ResolveActivity resolve_activity = 8; - } -} - -message StartWorkflow { - // The identifier the lang-specific sdk uses to execute workflow code - string workflow_type = 1; - // The workflow id used on the temporal server - string workflow_id = 2; - // Input to the workflow code - temporal.api.common.v1.Payloads arguments = 3; - // The seed must be used to initialize the random generator used by SDK. - // RandomSeedUpdatedAttributes are used to deliver seed updates. - uint64 randomness_seed = 4; - - // TODO: Do we need namespace here, or should that just be fetchable easily? - // will be others - workflow exe started attrs, etc -} - -message FireTimer { - string timer_id = 1; -} - -message ResolveActivity { - string activity_id = 1; - ActivityResult result = 2; -} - -message CancelTimer { - string timer_id = 1; -} - -message UpdateRandomSeed { - uint64 randomness_seed = 1; -} - -message QueryWorkflow { - temporal.api.query.v1.WorkflowQuery query = 1; -} - -message CancelWorkflow { - // TODO: add attributes here -} - -message SignalWorkflow { - // The signal information from the workflow's history - temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes signal = 1; -} - -message StartActivity { - string workflow_namespace = 1; - temporal.api.common.v1.WorkflowType workflow_type = 2; - temporal.api.common.v1.WorkflowExecution workflow_execution = 3; - temporal.api.common.v1.ActivityType activity_type = 4; - temporal.api.common.v1.Header header = 5; - temporal.api.common.v1.Payloads input = 6; - temporal.api.common.v1.Payloads heartbeat_details = 7; - google.protobuf.Timestamp scheduled_time = 8 [(gogoproto.stdtime) = true]; - google.protobuf.Timestamp current_attempt_scheduled_time = 9 [(gogoproto.stdtime) = true]; - google.protobuf.Timestamp started_time = 10 [(gogoproto.stdtime) = true]; - int32 attempt = 11; - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration schedule_to_close_timeout = 12 [(gogoproto.stdduration) = true]; - // (-- api-linter: core::0140::prepositions=disabled - // aip.dev/not-precedent: "to" is used to indicate interval. --) - google.protobuf.Duration start_to_close_timeout = 13 [(gogoproto.stdduration) = true]; - google.protobuf.Duration heartbeat_timeout = 14 [(gogoproto.stdduration) = true]; - // This is an actual retry policy the service uses. - // It can be different from the one provided (or not) during activity scheduling - // as the service can override the provided one in case its values are not specified - // or exceed configured system limits. - temporal.api.common.v1.RetryPolicy retry_policy = 15; -} - -message CancelActivity { - // TODO: add attributes -} - -message ActivityTask { - string activity_id = 1; - oneof job { - // Start activity execution. - StartActivity start = 2; - // Attempt to cancel activity execution. - CancelActivity cancel = 3; + coresdk.workflow_activation.WFActivation workflow = 2; + // Start or cancel an activity + coresdk.activity_task.ActivityTask activity = 3; } } -// Sent from lang side to core when calling [crate::Core::complete_task] +/// Sent from lang side to core when calling [crate::Core::complete_task] message TaskCompletion { // The id from the [Task] being completed bytes task_token = 1; oneof variant { - // Complete a workflow task - WFActivationCompletion workflow = 2; - // Complete an activity task - ActivityResult activity = 3; - } -} - -message WFActivationCompletion { - oneof status { - WFActivationSuccess successful = 1; - WFActivationFailure failed = 2; - } -} - -/// Used to report activity completion to core and to resolve the activity in a workflow activtion -message ActivityResult { - oneof status { - ActivityTaskSuccess completed = 1; - ActivityTaskCancelation canceled = 2; - ActivityTaskFailure failed = 3; - } -} - -/// Request cancellation of an activity from a workflow -message RequestActivityCancellation { - string activity_id = 1; - string reason = 2; -} - -message CoreCommand { - oneof variant { - temporal.api.query.v1.WorkflowQueryResult respond_to_query = 1; - RequestActivityCancellation request_activity_cancellation = 2; + /// Complete a workflow task + coresdk.workflow_activation_completion.WFActivationCompletion workflow = 2; + /// Complete an activity task + coresdk.activity_result.ActivityResult activity = 3; } } -// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next -message Command { - oneof variant { - temporal.api.command.v1.Command api = 1; - CoreCommand core = 2; - } -} - -message WFActivationSuccess { - // A list of commands to send back to the temporal server - repeated Command commands = 1; - - // Other bits from RespondWorkflowTaskCompletedRequest as needed -} - -message WFActivationFailure { - temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; - temporal.api.failure.v1.Failure failure = 2; - - // Other bits from RespondWorkflowTaskFailedRequest as needed -} - -/// Used in ActivityResult to report cancellation -message ActivityTaskCancelation { -} - -/// Used in ActivityResult to report successful completion -message ActivityTaskSuccess { - temporal.api.common.v1.Payloads result = 1; - // Other bits from RespondActivityTaskCompletedRequest as needed -} - -/// Used in ActivityResult to report failure -message ActivityTaskFailure { - temporal.api.failure.v1.Failure failure = 1; - // Other bits from RespondActivityTaskFailedRequest as needed -} - -// A request as given to [crate::Core::send_activity_heartbeat] +/// A request as given to [crate::Core::send_activity_heartbeat] message ActivityHeartbeat { string activity_id = 1; - temporal.api.common.v1.Payloads details = 2; + repeated temporal.api.common.v1.Payload details = 2; } diff --git a/protos/local/timer_result.proto b/protos/local/timer_result.proto new file mode 100644 index 000000000..509051fc5 --- /dev/null +++ b/protos/local/timer_result.proto @@ -0,0 +1,19 @@ +syntax = "proto3"; + +package coresdk.timer_result; + +/// Used in TimerResult to report cancellation +message Cancellation { +} + +/// Used in TimerResult to report successful completion +message Success { +} + +/// Used to resolve a timer in a workflow activation +message TimerResult { + oneof status { + Success successful = 1; + Cancellation cancelled = 2; + } +} diff --git a/protos/local/workflow_activation.proto b/protos/local/workflow_activation.proto new file mode 100644 index 000000000..442f96177 --- /dev/null +++ b/protos/local/workflow_activation.proto @@ -0,0 +1,95 @@ +/** + * Definitions of the different workflow activation jobs returned from [crate::Core::poll_task]. + * The lang SDK applies these activation jobs to drives workflows. + */ + +syntax = "proto3"; + +package coresdk.workflow_activation; + +import "google/protobuf/timestamp.proto"; +import "dependencies/gogoproto/gogo.proto"; +import "temporal/api/common/v1/message.proto"; +import "activity_result.proto"; +import "timer_result.proto"; + +/// Start a new workflow +message StartWorkflow { + /// The identifier the lang-specific sdk uses to locate which workflow to execute + string workflow_type = 1; + /// The workflow id used on the temporal server + string workflow_id = 2; + /// Input to the workflow code - apply to the workflow method + repeated temporal.api.common.v1.Payload arguments = 3; + /// The seed must be used to initialize the random generator used by SDK. + /// UpdateRandomSeed is used to deliver seed updates. + uint64 randomness_seed = 4; + + // TODO: Do we need namespace here, or should that just be fetchable easily? + // will be others - workflow exe started attrs, etc +} + +/// Cancel a running workflow +message CancelWorkflow { +} + +/// Send a signal to a workflow +message SignalWorkflow { + string name = 1; + repeated temporal.api.common.v1.Payload arguments = 2; +} + +/// Query a workflow +message QueryWorkflow { + string name = 1; + repeated temporal.api.common.v1.Payload arguments = 2; +} + + +/// Notify a workflow that a timer has been resolved +message ResolveTimer { + string timer_id = 1; + coresdk.timer_result.TimerResult result = 2; +} + +/// Notify a workflow that an activity has been resolved +message ResolveActivity { + string activity_id = 1; + coresdk.activity_result.ActivityResult result = 2; +} + +/// Update the workflow's random seed +message UpdateRandomSeed { + uint64 randomness_seed = 1; +} + +/// An instruction to the lang sdk to run some workflow code, whether for the first time or from +/// a cached state. +message WFActivation { + /// The current time as understood by the workflow, which is set by workflow task started events + google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; + /// The id of the currently active run of the workflow + string run_id = 2; + /// The things to do upon activating the workflow + repeated WFActivationJob jobs = 3; +} + +message WFActivationJob { + oneof variant { + /// Begin a workflow for the first time + StartWorkflow start_workflow = 1; + /// A timer has fired or was cancelled, allowing whatever was waiting on it (if anything) to proceed + ResolveTimer resolve_timer = 2; + /// An activity was resolved with, result could be completed, failed or cancelled + ResolveActivity resolve_activity = 3; + /// Workflow was reset. The randomness seed must be updated + UpdateRandomSeed update_random_seed = 4; + /// A request to cancel the workflow was received + CancelWorkflow cancel_workflow = 5; + /// A request to query the workflow was received + QueryWorkflow query_workflow = 6; + /// A request to signal the workflow was received + SignalWorkflow signal_workflow = 7; + } +} + diff --git a/protos/local/workflow_activation_completion.proto b/protos/local/workflow_activation_completion.proto new file mode 100644 index 000000000..eb4c0f66f --- /dev/null +++ b/protos/local/workflow_activation_completion.proto @@ -0,0 +1,32 @@ +syntax = "proto3"; + +package coresdk.workflow_activation_completion; + +import "temporal/api/enums/v1/failed_cause.proto"; +import "temporal/api/failure/v1/message.proto"; +import "workflow_command.proto"; + +/// Result of a single workflow activation, reported from lang to core +message WFActivationCompletion { + oneof status { + Success successful = 1; + Failure failed = 2; + } +} + +/// Successful workflow activation with a list of commands generated by the workflow execution +message Success { + // A list of commands to send back to the temporal server + repeated coresdk.workflow_command.WorkflowCommand commands = 1; + + // Other bits from RespondWorkflowTaskCompletedRequest as needed +} + +/// Failure to activate a workflow +message Failure { + // TODO: port these protos + temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; + temporal.api.failure.v1.Failure failure = 2; + + // Other bits from RespondWorkflowTaskFailedRequest as needed +} diff --git a/protos/local/workflow_command.proto b/protos/local/workflow_command.proto new file mode 100644 index 000000000..24536cbf9 --- /dev/null +++ b/protos/local/workflow_command.proto @@ -0,0 +1,31 @@ +/** + * Definitions for commands from a workflow in lang SDK to core. + * While a workflow processes a batch of activiation jobs, + * it accumulates these commands to be sent back to core to conclude an activation. + */ + +syntax = "proto3"; + +package coresdk.workflow_command; + +import "google/protobuf/timestamp.proto"; +import "google/protobuf/duration.proto"; +import "temporal/api/common/v1/message.proto"; +import "temporal/api/query/v1/message.proto"; +import "activity_result.proto"; +import "timer_result.proto"; + +/// Request cancellation of an activity from a workflow +message RequestActivityCancellation { + string activity_id = 1; + string reason = 2; +} + +// Included in successful [WfActivationCompletion]s, indicates what the workflow wishes to do next +message WorkflowCommand { + oneof variant { + temporal.api.query.v1.WorkflowQueryResult respond_to_query = 1; + RequestActivityCancellation request_activity_cancellation = 2; + // TODO: add more, port API commands + } +}