Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ message CancelActivity {

message ActivityTask {
string activity_id = 1;
oneof job {
oneof variant {
// Start activity execution.
StartActivity start = 2;
// Attempt to cancel activity execution.
Expand Down Expand Up @@ -185,12 +185,12 @@ message WFActivationCompletion {
}
}

/// Used to report activity completion to core and to resolve the activity in a workflow activtion
/// Used to report activity completion to core and to resolve the activity in a workflow activation
message ActivityResult {
oneof status {
ActivityTaskSuccess completed = 1;
ActivityTaskCancelation canceled = 2;
ActivityTaskFailure failed = 3;
ActivityTaskFailure failed = 2;
ActivityTaskCancelation canceled = 3;
}
}

Expand Down Expand Up @@ -231,6 +231,7 @@ message WFActivationFailure {

/// Used in ActivityResult to report cancellation
message ActivityTaskCancelation {
temporal.api.common.v1.Payloads details = 1;
}

/// Used in ActivityResult to report successful completion
Expand Down
153 changes: 134 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,20 @@ mod workflow;
mod test_help;

pub use core_tracing::tracing_init;
pub use pollers::{ServerGateway, ServerGatewayApis, ServerGatewayOptions};
pub use pollers::{
PollTaskRequest, PollTaskResponse, ServerGateway, ServerGatewayApis, ServerGatewayOptions,
};
pub use url::Url;

use crate::protos::coresdk::{
activity_result, task, ActivityTaskCancelation, ActivityTaskFailure, ActivityTaskSuccess,
};
use crate::{
machines::{InconvertibleCommandError, ProtoCommand, WFCommand, WFMachinesError},
pending_activations::{PendingActivation, PendingActivations},
protos::{
coresdk::{
task_completion, wf_activation_completion::Status, Task, TaskCompletion,
task_completion, wf_activation_completion, ActivityResult, Task, TaskCompletion,
WfActivationCompletion, WfActivationSuccess,
},
temporal::api::{
Expand Down Expand Up @@ -62,17 +67,24 @@ pub type Result<T, E = CoreError> = std::result::Result<T, E>;
/// expected that only one instance of an implementation will exist for the lifetime of the
/// worker(s) using it.
pub trait Core: Send + Sync {
/// Ask the core for some work, returning a [Task], which will eventually contain either a
/// [protos::coresdk::WfActivation] or an [protos::coresdk::ActivityTask]. It is then the
/// language SDK's responsibility to call the appropriate code with the provided inputs.
/// Ask the core for some work, returning a [Task], which will contain a [protos::coresdk::WfActivation].
/// It is then the language SDK's responsibility to call the appropriate code with the provided inputs.
///
/// TODO: Examples
/// TODO: rename to poll_workflow_task and change result type to WfActivation
fn poll_task(&self, task_queue: &str) -> Result<Task>;

/// Ask the core for some work, returning a [protos::coresdk::Task], which will contain a [protos::coresdk::ActivityTask].
/// It is then the language SDK's responsibility to call the completion API.
fn poll_activity_task(&self, task_queue: &str) -> Result<Task>;

/// Tell the core that some work has been completed - whether as a result of running workflow
/// code or executing an activity.
fn complete_task(&self, req: TaskCompletion) -> Result<()>;

/// Tell the core that activity has completed. This will result in core calling the server and completing activity synchronously.
fn complete_activity_task(&self, req: TaskCompletion) -> Result<()>;

/// Returns an instance of ServerGateway.
fn server_gateway(&self) -> Result<Arc<dyn ServerGatewayApis>>;

Expand Down Expand Up @@ -158,8 +170,8 @@ where

// This will block forever (unless interrupted by shutdown) in the event there is no work
// from the server
match self.poll_server(task_queue) {
Ok(work) => {
match self.poll_server(PollTaskRequest::Workflow(task_queue.to_owned())) {
Ok(PollTaskResponse::WorkflowTask(work)) => {
let task_token = work.task_token.clone();
debug!(
task_token = %fmt_task_token(&task_token),
Expand All @@ -180,8 +192,29 @@ where
variant: next_activation.activation.map(Into::into),
})
}
// Drain pending activations in case of shutdown.
Err(CoreError::ShuttingDown) => self.poll_task(task_queue),
Err(e) => Err(e),
Ok(PollTaskResponse::ActivityTask(_)) => Err(CoreError::UnexpectedResult),
}
}

#[instrument(skip(self))]
fn poll_activity_task(&self, task_queue: &str) -> Result<Task> {
if self.shutdown_requested.load(Ordering::SeqCst) {
return Err(CoreError::ShuttingDown);
}

match self.poll_server(PollTaskRequest::Activity(task_queue.to_owned())) {
Ok(PollTaskResponse::ActivityTask(work)) => {
let task_token = work.task_token.clone();
Ok(Task {
task_token,
variant: Some(task::Variant::Activity(work.into())),
})
}
Err(e) => Err(e),
Ok(PollTaskResponse::WorkflowTask(_)) => Err(CoreError::UnexpectedResult),
}
}

Expand All @@ -201,7 +234,7 @@ where
.map(|x| x.value().clone())
.ok_or_else(|| CoreError::NothingFoundForTaskToken(task_token.clone()))?;
match wfstatus {
Status::Successful(success) => {
wf_activation_completion::Status::Successful(success) => {
let commands = self.push_lang_commands(&run_id, success)?;
// We only actually want to send commands back to the server if there are
// no more pending activations -- in other words the lang SDK has caught
Expand All @@ -213,7 +246,7 @@ where
)?;
}
}
Status::Failed(failure) => {
wf_activation_completion::Status::Failed(failure) => {
// Blow up any cached data associated with the workflow
self.evict_run(&run_id);

Expand All @@ -229,10 +262,41 @@ where
}
Ok(())
}
_ => Err(CoreError::MalformedCompletion(req)),
}
}

#[instrument(skip(self))]
fn complete_activity_task(&self, req: TaskCompletion) -> Result<()> {
match req {
TaskCompletion {
variant: Some(task_completion::Variant::Activity(_)),
..
} => unimplemented!(),
task_token,
variant:
Some(task_completion::Variant::Activity(ActivityResult {
status: Some(status),
})),
} => {
match status {
activity_result::Status::Completed(ActivityTaskSuccess { result }) => {
self.runtime.block_on(
self.server_gateway
.complete_activity_task(task_token, result),
)?;
}
activity_result::Status::Failed(ActivityTaskFailure { failure }) => {
self.runtime.block_on(
self.server_gateway.fail_activity_task(task_token, failure),
)?;
}
activity_result::Status::Canceled(ActivityTaskCancelation { details }) => {
self.runtime.block_on(
self.server_gateway
.cancel_activity_task(task_token, details),
)?;
}
}
Ok(())
}
_ => Err(CoreError::MalformedCompletion(req)),
}
}
Expand Down Expand Up @@ -298,7 +362,7 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {

/// Blocks polling the server until it responds, or until the shutdown flag is set (aborting
/// the poll)
fn poll_server(&self, task_queue: &str) -> Result<PollWorkflowTaskQueueResponse> {
fn poll_server(&self, req: PollTaskRequest) -> Result<PollTaskResponse> {
self.runtime.block_on(async {
let shutdownfut = async {
loop {
Expand All @@ -308,14 +372,12 @@ impl<WP: ServerGatewayApis> CoreSDK<WP> {
tokio::time::sleep(Duration::from_millis(100)).await;
}
};
let pollfut = self
.server_gateway
.poll_workflow_task(task_queue.to_owned());
let poll_result_future = self.server_gateway.poll_task(req);
tokio::select! {
_ = shutdownfut => {
Err(CoreError::ShuttingDown)
}
r = pollfut => r
r = poll_result_future => r
}
})
}
Expand Down Expand Up @@ -377,12 +439,17 @@ pub enum CoreError {
/// When thrown from complete_task, it means you should poll for a new task, receive a new
/// task token, and complete that task.
UnhandledCommandWhenCompleting,
/// Indicates that underlying function returned Ok, but result type was incorrect.
/// This is likely a result of a bug and should never happen.
UnexpectedResult,
}

#[cfg(test)]
mod test {
use super::*;
use crate::protos::temporal::api::command::v1::FailWorkflowExecutionCommandAttributes;
use crate::protos::temporal::api::command::v1::{
FailWorkflowExecutionCommandAttributes, ScheduleActivityTaskCommandAttributes,
};
use crate::{
machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
protos::{
Expand Down Expand Up @@ -415,6 +482,14 @@ mod test {
build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
}

#[fixture(hist_batches = &[])]
fn single_activity_setup(hist_batches: &[usize]) -> FakeCore {
let wfid = "fake_wf_id";

let mut t = canned_histories::single_activity("fake_activity");
build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
}

#[rstest(core,
case::incremental(single_timer_setup(&[1, 2])),
case::replay(single_timer_setup(&[2]))
Expand Down Expand Up @@ -455,7 +530,47 @@ mod test {
.unwrap();
}

#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
#[rstest(core,
case::incremental(single_activity_setup(&[1, 2])),
case::replay(single_activity_setup(&[2]))
)]
fn single_activity_completion(core: FakeCore) {
let res = core.poll_task(TASK_Q).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);
assert!(core.workflow_machines.exists(RUN_ID));

let task_tok = res.task_token;
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![ScheduleActivityTaskCommandAttributes {
activity_id: "fake_activity".to_string(),
..Default::default()
}
.into()],
task_tok,
))
.unwrap();

let res = core.poll_task(TASK_Q).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::ResolveActivity(_)),
}]
);
let task_tok = res.task_token;
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
task_tok,
))
.unwrap();
}

#[rstest(hist_batches, case::incremental(& [1, 2]), case::replay(& [2]))]
fn parallel_timer_test_across_wf_bridge(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let run_id = "fake_run_id";
Expand Down
Loading