diff --git a/build.rs b/build.rs index b473d3cfa..05d28921b 100644 --- a/build.rs +++ b/build.rs @@ -15,11 +15,11 @@ fn main() -> Result<(), Box> { "#[derive(::derive_more::From)]", ) .type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]") - .type_attribute("coresdk.SDKWFTask.task", "#[derive(::derive_more::From)]") .type_attribute( - "coresdk.PollSDKTaskResp.task", + "coresdk.WorkflowTask.attributes", "#[derive(::derive_more::From)]", ) + .type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]") .compile( &[ "protos/local/core_interface.proto", diff --git a/protos/local/core_interface.proto b/protos/local/core_interface.proto index 3b68fa1b2..6548ec66f 100644 --- a/protos/local/core_interface.proto +++ b/protos/local/core_interface.proto @@ -16,7 +16,6 @@ import "temporal/api/failure/v1/message.proto"; import "temporal/api/common/v1/message.proto"; import "temporal/api/command/v1/message.proto"; -// TODO: Use the SDK prefix? message WorkflowIdentifier { string namespace = 1; string name = 2; @@ -32,19 +31,17 @@ message RegistrationReq { repeated ActivityIdentifier activities = 2; } -// TODO: SDK prefix in front of everything is maybe pointless given it's all within this package - -service CoreSDKService { - rpc PollSDKTask (google.protobuf.Empty) returns (PollSDKTaskResp) {} - rpc CompleteSDKTask (CompleteSDKTaskReq) returns (google.protobuf.Empty) {} +service Core { + rpc PollTask (google.protobuf.Empty) returns (Task) {} + rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {} rpc RegisterImplementations (RegistrationReq) returns (google.protobuf.Empty) {} } -message PollSDKTaskResp { +message Task { bytes task_token = 1; - oneof task { - SDKWFTask wf_task = 2; - SDKActivityTask activity_task = 3; + oneof variant { + WorkflowTask workflow = 2; + ActivityTask activity = 3; } } @@ -54,75 +51,75 @@ message StartWorkflowTaskAttributes { temporal.api.common.v1.Payloads arguments = 3; } -message UnblockTimerTaskAttibutes { +message TriggerTimerTaskAttributes { string timer_id = 1; } -message SDKWFTask { +message WorkflowTask { google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true]; string workflow_id = 2; - oneof task { + oneof attributes { StartWorkflowTaskAttributes start_workflow = 3; - UnblockTimerTaskAttibutes unblock_timer = 4; + TriggerTimerTaskAttributes trigger_timer = 4; } } -message SDKActivityTask { +message ActivityTask { // Original task from temporal service temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1; } -message CompleteSDKTaskReq { +message CompleteTaskReq { bytes task_token = 1; oneof completion { - SDKWFTaskCompletion workflow = 2; - SDKActivityTaskCompletion activity = 3; + WorkflowTaskCompletion workflow = 2; + ActivityTaskCompletion activity = 3; } } -message SDKWFTaskCompletion { +message WorkflowTaskCompletion { oneof status { - SDKWFTaskSuccess successful = 1; - SDKWFTaskFailure failed = 2; + WorkflowTaskSuccess successful = 1; + WorkflowTaskFailure failed = 2; } } -message SDKActivityTaskCompletion { +message ActivityTaskCompletion { oneof status { - SDKActivityTaskSuccess successful = 1; - SDKActivityTaskFailure failed = 2; + ActivityTaskSuccess successful = 1; + ActivityTaskFailure failed = 2; } } -message SDKCommand { - // Reserved for SDK specific commands +message CoreCommand { + // Reserved for specific commands } message Command { oneof variant { temporal.api.command.v1.Command api = 1; - SDKCommand sdk = 2; + CoreCommand core = 2; } } -message SDKWFTaskSuccess { +message WorkflowTaskSuccess { repeated Command commands = 1; // Other bits from RespondWorkflowTaskCompletedRequest as needed } -message SDKWFTaskFailure { +message WorkflowTaskFailure { temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1; temporal.api.failure.v1.Failure failure = 2; // Other bits from RespondWorkflowTaskFailedRequest as needed } -message SDKActivityTaskSuccess { +message ActivityTaskSuccess { temporal.api.common.v1.Payloads result = 1; // Other bits from RespondActivityTaskCompletedRequest as needed } -message SDKActivityTaskFailure { +message ActivityTaskFailure { temporal.api.failure.v1.Failure failure = 1; // Other bits from RespondActivityTaskFailedRequest as needed } diff --git a/src/lib.rs b/src/lib.rs index b9db375c1..53726438c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,8 +9,8 @@ use crate::{ machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines}, protos::{ coresdk::{ - complete_sdk_task_req::Completion, sdkwf_task_completion::Status, CompleteSdkTaskReq, - PollSdkTaskResp, RegistrationReq, SdkwfTaskCompletion, + complete_task_req::Completion, workflow_task_completion::Status, CompleteTaskReq, + RegistrationReq, Task, WorkflowTaskCompletion, }, temporal::api::{ common::v1::WorkflowExecution, @@ -29,27 +29,27 @@ use std::{ sync::mpsc::{self, Receiver, SendError, Sender}, }; -pub type Result = std::result::Result; +pub type Result = std::result::Result; // TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have // any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can // be accessed from many threads, but each workflow is pinned to one thread (possibly with many // sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should // be both, ideally. -pub trait CoreSDKService { - fn poll_sdk_task(&self) -> Result; - fn complete_sdk_task(&self, req: CompleteSdkTaskReq) -> Result<()>; +pub trait Core { + fn poll_task(&self) -> Result; + fn complete_task(&self, req: CompleteTaskReq) -> Result<()>; fn register_implementations(&self, req: RegistrationReq) -> Result<()>; } -pub struct CoreSDKInitOptions { +pub struct CoreInitOptions { _queue_name: String, _max_concurrent_workflow_executions: u32, _max_concurrent_activity_executions: u32, } -pub fn init_sdk(_opts: CoreSDKInitOptions) -> Result> { - Err(SDKServiceError::Unknown {}) +pub fn init(_opts: CoreInitOptions) -> Result> { + Err(CoreError::Unknown {}) } pub struct CoreSDK { @@ -78,11 +78,11 @@ pub trait WorkProvider { fn get_work(&self, task_queue: &str) -> Result; } -impl CoreSDKService for CoreSDK +impl Core for CoreSDK where WP: WorkProvider, { - fn poll_sdk_task(&self) -> Result { + fn poll_task(&self) -> Result { // This will block forever in the event there is no work from the server let work = self.work_provider.get_work("TODO: Real task queue")?; match &work.workflow_execution { @@ -91,18 +91,18 @@ where self.new_workflow(workflow_id.to_string()); } // TODO: Appropriate error - None => return Err(SDKServiceError::Unknown), + None => return Err(CoreError::Unknown), } // TODO: Apply history to machines, get commands out, convert them to task - Ok(PollSdkTaskResp { + Ok(Task { task_token: work.task_token, - task: None, + variant: None, }) } - fn complete_sdk_task(&self, req: CompleteSdkTaskReq) -> Result<(), SDKServiceError> { + fn complete_task(&self, req: CompleteTaskReq) -> Result<(), CoreError> { match req.completion { - Some(Completion::Workflow(SdkwfTaskCompletion { + Some(Completion::Workflow(WorkflowTaskCompletion { status: Some(wfstatus), })) => { match wfstatus { @@ -127,11 +127,11 @@ where Some(Completion::Activity(_)) => { unimplemented!() } - _ => Err(SDKServiceError::MalformedCompletion(req)), + _ => Err(CoreError::MalformedCompletion(req)), } } - fn register_implementations(&self, _req: RegistrationReq) -> Result<(), SDKServiceError> { + fn register_implementations(&self, _req: RegistrationReq) -> Result<(), CoreError> { unimplemented!() } } @@ -184,13 +184,13 @@ impl DrivenWorkflow for WorkflowBridge { #[derive(thiserror::Error, Debug)] #[allow(clippy::large_enum_variant)] -pub enum SDKServiceError { +pub enum CoreError { #[error("Unknown service error")] Unknown, #[error("No tasks to perform for now")] NoWork, #[error("Lang SDK sent us a malformed completion: {0:?}")] - MalformedCompletion(CompleteSdkTaskReq), + MalformedCompletion(CompleteTaskReq), #[error("Error buffering commands")] CantSendCommands(#[from] SendError>), #[error("Couldn't interpret command from ")] @@ -203,7 +203,7 @@ mod test { use crate::{ machines::test_help::TestHistoryBuilder, protos::{ - coresdk::{self, command::Variant, SdkwfTaskSuccess}, + coresdk::{self, command::Variant, WorkflowTaskSuccess}, temporal::api::{ command::v1::{command, Command, StartTimerCommandAttributes}, enums::v1::EventType, @@ -248,7 +248,7 @@ mod test { }; core.new_workflow(wfid.to_string()); - // TODO: These are what poll_sdk_task should end up returning + // TODO: These are what poll_task should end up returning // SdkwfTask { // timestamp: None, // workflow_id: wfid.to_string(), @@ -274,7 +274,7 @@ mod test { // } // .into(), - let res = core.poll_sdk_task().unwrap(); + let res = core.poll_task().unwrap(); let task_tok = res.task_token; let timer_atom = Arc::new(AtomicBool::new(false)); let cmd: command::Attributes = StartTimerCommandAttributes { @@ -283,14 +283,14 @@ mod test { } .into(); let cmd: Command = cmd.into(); - let success = SdkwfTaskSuccess { + let success = WorkflowTaskSuccess { commands: vec![coresdk::Command { variant: Some(Variant::Api(cmd)), }], }; - core.complete_sdk_task(CompleteSdkTaskReq { + core.complete_task(CompleteTaskReq { task_token: task_tok, - completion: Some(Completion::Workflow(SdkwfTaskCompletion { + completion: Some(Completion::Workflow(WorkflowTaskCompletion { status: Some(Status::Successful(success)), })), }) diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 33c6a0e13..60fec832f 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -4,11 +4,11 @@ pub mod coresdk { pub type HistoryEventId = i64; - impl PollSdkTaskResp { - pub fn from_wf_task(task_token: Vec, t: SdkwfTask) -> Self { - PollSdkTaskResp { + impl Task { + pub fn from_wf_task(task_token: Vec, t: WorkflowTask) -> Self { + Task { task_token, - task: Some(t.into()), + variant: Some(t.into()), } } }