Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"#[derive(::derive_more::From)]",
)
.type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]")
.type_attribute("coresdk.SDKWFTask.task", "#[derive(::derive_more::From)]")
.type_attribute(
"coresdk.PollSDKTaskResp.task",
"coresdk.WorkflowTask.attributes",
"#[derive(::derive_more::From)]",
)
.type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]")
.compile(
&[
"protos/local/core_interface.proto",
Expand Down
59 changes: 28 additions & 31 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import "temporal/api/failure/v1/message.proto";
import "temporal/api/common/v1/message.proto";
import "temporal/api/command/v1/message.proto";

// TODO: Use the SDK prefix?
message WorkflowIdentifier {
string namespace = 1;
string name = 2;
Expand All @@ -32,19 +31,17 @@ message RegistrationReq {
repeated ActivityIdentifier activities = 2;
}

// TODO: SDK prefix in front of everything is maybe pointless given it's all within this package

service CoreSDKService {
rpc PollSDKTask (google.protobuf.Empty) returns (PollSDKTaskResp) {}
rpc CompleteSDKTask (CompleteSDKTaskReq) returns (google.protobuf.Empty) {}
service Core {
rpc PollTask (google.protobuf.Empty) returns (Task) {}
rpc CompleteTask (CompleteTaskReq) returns (google.protobuf.Empty) {}
rpc RegisterImplementations (RegistrationReq) returns (google.protobuf.Empty) {}
}

message PollSDKTaskResp {
message Task {
bytes task_token = 1;
oneof task {
SDKWFTask wf_task = 2;
SDKActivityTask activity_task = 3;
oneof variant {
WorkflowTask workflow = 2;
ActivityTask activity = 3;
}
}

Expand All @@ -54,75 +51,75 @@ message StartWorkflowTaskAttributes {
temporal.api.common.v1.Payloads arguments = 3;
}

message UnblockTimerTaskAttibutes {
message TriggerTimerTaskAttributes {
string timer_id = 1;
}

message SDKWFTask {
message WorkflowTask {
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true];
string workflow_id = 2;
oneof task {
oneof attributes {
StartWorkflowTaskAttributes start_workflow = 3;
UnblockTimerTaskAttibutes unblock_timer = 4;
TriggerTimerTaskAttributes trigger_timer = 4;
}
}

message SDKActivityTask {
message ActivityTask {
// Original task from temporal service
temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1;
}


message CompleteSDKTaskReq {
message CompleteTaskReq {
bytes task_token = 1;
oneof completion {
SDKWFTaskCompletion workflow = 2;
SDKActivityTaskCompletion activity = 3;
WorkflowTaskCompletion workflow = 2;
ActivityTaskCompletion activity = 3;
}
}

message SDKWFTaskCompletion {
message WorkflowTaskCompletion {
oneof status {
SDKWFTaskSuccess successful = 1;
SDKWFTaskFailure failed = 2;
WorkflowTaskSuccess successful = 1;
WorkflowTaskFailure failed = 2;
}
}

message SDKActivityTaskCompletion {
message ActivityTaskCompletion {
oneof status {
SDKActivityTaskSuccess successful = 1;
SDKActivityTaskFailure failed = 2;
ActivityTaskSuccess successful = 1;
ActivityTaskFailure failed = 2;
}
}

message SDKCommand {
// Reserved for SDK specific commands
message CoreCommand {
// Reserved for specific commands
}

message Command {
oneof variant {
temporal.api.command.v1.Command api = 1;
SDKCommand sdk = 2;
CoreCommand core = 2;
}
}

message SDKWFTaskSuccess {
message WorkflowTaskSuccess {
repeated Command commands = 1;
// Other bits from RespondWorkflowTaskCompletedRequest as needed
}

message SDKWFTaskFailure {
message WorkflowTaskFailure {
temporal.api.enums.v1.WorkflowTaskFailedCause cause = 1;
temporal.api.failure.v1.Failure failure = 2;
// Other bits from RespondWorkflowTaskFailedRequest as needed
}

message SDKActivityTaskSuccess {
message ActivityTaskSuccess {
temporal.api.common.v1.Payloads result = 1;
// Other bits from RespondActivityTaskCompletedRequest as needed
}

message SDKActivityTaskFailure {
message ActivityTaskFailure {
temporal.api.failure.v1.Failure failure = 1;
// Other bits from RespondActivityTaskFailedRequest as needed
}
52 changes: 26 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
machines::{DrivenWorkflow, InconvertibleCommandError, WFCommand, WorkflowMachines},
protos::{
coresdk::{
complete_sdk_task_req::Completion, sdkwf_task_completion::Status, CompleteSdkTaskReq,
PollSdkTaskResp, RegistrationReq, SdkwfTaskCompletion,
complete_task_req::Completion, workflow_task_completion::Status, CompleteTaskReq,
RegistrationReq, Task, WorkflowTaskCompletion,
},
temporal::api::{
common::v1::WorkflowExecution,
Expand All @@ -29,27 +29,27 @@ use std::{
sync::mpsc::{self, Receiver, SendError, Sender},
};

pub type Result<T, E = SDKServiceError> = std::result::Result<T, E>;
pub type Result<T, E = CoreError> = std::result::Result<T, E>;

// TODO: Do we actually need this to be send+sync? Probably, but there's also no reason to have
// any given WorfklowMachines instance accessed on more than one thread. Ideally this trait can
// be accessed from many threads, but each workflow is pinned to one thread (possibly with many
// sharing the same thread). IE: WorkflowMachines should be Send but not Sync, and this should
// be both, ideally.
pub trait CoreSDKService {
fn poll_sdk_task(&self) -> Result<PollSdkTaskResp>;
fn complete_sdk_task(&self, req: CompleteSdkTaskReq) -> Result<()>;
pub trait Core {
fn poll_task(&self) -> Result<Task>;
fn complete_task(&self, req: CompleteTaskReq) -> Result<()>;
fn register_implementations(&self, req: RegistrationReq) -> Result<()>;
}

pub struct CoreSDKInitOptions {
pub struct CoreInitOptions {
_queue_name: String,
_max_concurrent_workflow_executions: u32,
_max_concurrent_activity_executions: u32,
}

pub fn init_sdk(_opts: CoreSDKInitOptions) -> Result<Box<dyn CoreSDKService>> {
Err(SDKServiceError::Unknown {})
pub fn init(_opts: CoreInitOptions) -> Result<Box<dyn Core>> {
Err(CoreError::Unknown {})
}

pub struct CoreSDK<WP> {
Expand Down Expand Up @@ -78,11 +78,11 @@ pub trait WorkProvider {
fn get_work(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse>;
}

impl<WP> CoreSDKService for CoreSDK<WP>
impl<WP> Core for CoreSDK<WP>
where
WP: WorkProvider,
{
fn poll_sdk_task(&self) -> Result<PollSdkTaskResp, SDKServiceError> {
fn poll_task(&self) -> Result<Task, CoreError> {
// This will block forever in the event there is no work from the server
let work = self.work_provider.get_work("TODO: Real task queue")?;
match &work.workflow_execution {
Expand All @@ -91,18 +91,18 @@ where
self.new_workflow(workflow_id.to_string());
}
// TODO: Appropriate error
None => return Err(SDKServiceError::Unknown),
None => return Err(CoreError::Unknown),
}
// TODO: Apply history to machines, get commands out, convert them to task
Ok(PollSdkTaskResp {
Ok(Task {
task_token: work.task_token,
task: None,
variant: None,
})
}

fn complete_sdk_task(&self, req: CompleteSdkTaskReq) -> Result<(), SDKServiceError> {
fn complete_task(&self, req: CompleteTaskReq) -> Result<(), CoreError> {
match req.completion {
Some(Completion::Workflow(SdkwfTaskCompletion {
Some(Completion::Workflow(WorkflowTaskCompletion {
status: Some(wfstatus),
})) => {
match wfstatus {
Expand All @@ -127,11 +127,11 @@ where
Some(Completion::Activity(_)) => {
unimplemented!()
}
_ => Err(SDKServiceError::MalformedCompletion(req)),
_ => Err(CoreError::MalformedCompletion(req)),
}
}

fn register_implementations(&self, _req: RegistrationReq) -> Result<(), SDKServiceError> {
fn register_implementations(&self, _req: RegistrationReq) -> Result<(), CoreError> {
unimplemented!()
}
}
Expand Down Expand Up @@ -184,13 +184,13 @@ impl DrivenWorkflow for WorkflowBridge {

#[derive(thiserror::Error, Debug)]
#[allow(clippy::large_enum_variant)]
pub enum SDKServiceError {
pub enum CoreError {
#[error("Unknown service error")]
Unknown,
#[error("No tasks to perform for now")]
NoWork,
#[error("Lang SDK sent us a malformed completion: {0:?}")]
MalformedCompletion(CompleteSdkTaskReq),
MalformedCompletion(CompleteTaskReq),
#[error("Error buffering commands")]
CantSendCommands(#[from] SendError<Vec<WFCommand>>),
#[error("Couldn't interpret command from <lang>")]
Expand All @@ -203,7 +203,7 @@ mod test {
use crate::{
machines::test_help::TestHistoryBuilder,
protos::{
coresdk::{self, command::Variant, SdkwfTaskSuccess},
coresdk::{self, command::Variant, WorkflowTaskSuccess},
temporal::api::{
command::v1::{command, Command, StartTimerCommandAttributes},
enums::v1::EventType,
Expand Down Expand Up @@ -248,7 +248,7 @@ mod test {
};
core.new_workflow(wfid.to_string());

// TODO: These are what poll_sdk_task should end up returning
// TODO: These are what poll_task should end up returning
// SdkwfTask {
// timestamp: None,
// workflow_id: wfid.to_string(),
Expand All @@ -274,7 +274,7 @@ mod test {
// }
// .into(),

let res = core.poll_sdk_task().unwrap();
let res = core.poll_task().unwrap();
let task_tok = res.task_token;
let timer_atom = Arc::new(AtomicBool::new(false));
let cmd: command::Attributes = StartTimerCommandAttributes {
Expand All @@ -283,14 +283,14 @@ mod test {
}
.into();
let cmd: Command = cmd.into();
let success = SdkwfTaskSuccess {
let success = WorkflowTaskSuccess {
commands: vec![coresdk::Command {
variant: Some(Variant::Api(cmd)),
}],
};
core.complete_sdk_task(CompleteSdkTaskReq {
core.complete_task(CompleteTaskReq {
task_token: task_tok,
completion: Some(Completion::Workflow(SdkwfTaskCompletion {
completion: Some(Completion::Workflow(WorkflowTaskCompletion {
status: Some(Status::Successful(success)),
})),
})
Expand Down
8 changes: 4 additions & 4 deletions src/protos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ pub mod coresdk {

pub type HistoryEventId = i64;

impl PollSdkTaskResp {
pub fn from_wf_task(task_token: Vec<u8>, t: SdkwfTask) -> Self {
PollSdkTaskResp {
impl Task {
pub fn from_wf_task(task_token: Vec<u8>, t: WorkflowTask) -> Self {
Task {
task_token,
task: Some(t.into()),
variant: Some(t.into()),
}
}
}
Expand Down