Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "temporal/api/workflowservice/v1/request_response.proto";
import "temporal/api/taskqueue/v1/message.proto";
import "temporal/api/enums/v1/failed_cause.proto";
import "temporal/api/failure/v1/message.proto";
import "temporal/api/history/v1/message.proto";
import "temporal/api/common/v1/message.proto";
import "temporal/api/command/v1/message.proto";
import "temporal/api/query/v1/message.proto";
Expand Down Expand Up @@ -69,6 +70,8 @@ message WFActivationJob {
QueryWorkflow query_workflow = 5;
// A request to cancel the workflow was received.
CancelWorkflow cancel_workflow = 6;
// A request to signal the workflow was received.
SignalWorkflow signal_workflow = 7;
}
}

Expand All @@ -87,10 +90,6 @@ message StartWorkflow {
// will be others - workflow exe started attrs, etc
}

message CancelWorkflow {
// TODO: add attributes here
}

message FireTimer {
string timer_id = 1;
}
Expand All @@ -107,6 +106,15 @@ message QueryWorkflow {
temporal.api.query.v1.WorkflowQuery query = 1;
}

message CancelWorkflow {
// TODO: add attributes here
}

message SignalWorkflow {
// The signal information from the workflow's history
temporal.api.history.v1.WorkflowExecutionSignaledEventAttributes signal = 1;
}

message ActivityTask {
// Original task from temporal service
temporal.api.workflowservice.v1.PollActivityTaskQueueResponse original = 1;
Expand Down
31 changes: 31 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,10 @@ pub enum CoreError {
InvalidUri(#[from] InvalidUri),
/// State machines are missing for the workflow with run id {0}!
MissingMachines(String),
/// There exists a pending command in this workflow's history which has not yet been handled.
/// When thrown from complete_task, it means you should poll for a new task, receive a new
/// task token, and complete that task.
UnhandledCommandWhenCompleting,
}

#[cfg(test)]
Expand Down Expand Up @@ -766,4 +770,31 @@ mod test {
))
.unwrap();
}

#[rstest(hist_batches, case::incremental(&[1, 2]), case::replay(&[2]))]
fn two_signals(hist_batches: &[usize]) {
let wfid = "fake_wf_id";
let run_id = "fake_run_id";

let mut t = canned_histories::two_signals("sig1", "sig2");
let core = build_fake_core(wfid, run_id, &mut t, hist_batches);

let res = core.poll_task(TASK_Q).unwrap();
// Task is completed with no commands
core.complete_task(TaskCompletion::ok_from_api_attrs(vec![], res.task_token))
.unwrap();

let res = core.poll_task(TASK_Q).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[
WfActivationJob {
variant: Some(wf_activation_job::Variant::SignalWorkflow(_)),
},
WfActivationJob {
variant: Some(wf_activation_job::Variant::SignalWorkflow(_)),
}
]
);
}
}
11 changes: 11 additions & 0 deletions src/machines/test_help/history_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use super::Result;
use crate::protos::temporal::api::common::v1::{Payload, Payloads};
use crate::protos::temporal::api::failure::v1::Failure;
use crate::protos::temporal::api::history::v1::WorkflowExecutionSignaledEventAttributes;
use crate::{
machines::{workflow_machines::WorkflowMachines, ProtoCommand},
protos::temporal::api::{
Expand Down Expand Up @@ -128,6 +130,15 @@ impl TestHistoryBuilder {
self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into());
}

pub fn add_we_signaled(&mut self, signal_name: &str, payloads: Vec<Payload>) {
let attrs = WorkflowExecutionSignaledEventAttributes {
signal_name: signal_name.to_string(),
input: Some(Payloads { payloads }),
..Default::default()
};
self.build_and_push_event(EventType::WorkflowExecutionSignaled, attrs.into());
}

pub fn as_history(&self) -> History {
History {
events: self.events.clone(),
Expand Down
2 changes: 2 additions & 0 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ where
// to re-run the workflow again.
// TODO: This would be better to solve by actually pausing the workflow properly rather
// than doing the re-run the whole thing every time deal.
// TODO: Probably screws up signals edge case where signal rcvd after complete makes it to
// server
if self.sent_final_execution {
return vec![];
}
Expand Down
9 changes: 8 additions & 1 deletion src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,14 @@ impl WorkflowMachines {
self.machines_by_event_id.insert(event.event_id, key);
}
Some(EventType::WorkflowExecutionSignaled) => {
// TODO: Signal callbacks
if let Some(history_event::Attributes::WorkflowExecutionSignaledEventAttributes(
attrs,
)) = &event.attributes
{
self.drive_me.signal(attrs.clone());
} else {
// err
}
}
Some(EventType::WorkflowExecutionCancelRequested) => {
// TODO: Cancel callbacks
Expand Down
17 changes: 13 additions & 4 deletions src/pollers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
PollWorkflowTaskQueueApi, RespondWorkflowTaskCompletedApi, RespondWorkflowTaskFailedApi,
StartWorkflowExecutionApi,
},
Result,
CoreError, Result,
};
use tonic::{transport::Channel, Request, Status};
use url::Url;
Expand Down Expand Up @@ -140,12 +140,21 @@ impl RespondWorkflowTaskCompletedApi for ServerGateway {
namespace: self.opts.namespace.to_string(),
..Default::default()
};
Ok(self
match self
.service
.clone()
.respond_workflow_task_completed(request)
.await?
.into_inner())
.await
{
Ok(pwtr) => Ok(pwtr.into_inner()),
Err(ts) => {
if ts.code() == tonic::Code::InvalidArgument && ts.message() == "UnhandledCommand" {
Err(CoreError::UnhandledCommandWhenCompleting)
} else {
Err(ts.into())
}
}
}
}
}

Expand Down
32 changes: 32 additions & 0 deletions src/test_help/canned_histories.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::machines::test_help::TestHistoryBuilder;
use crate::protos::temporal::api::common::v1::Payload;
use crate::protos::temporal::api::enums::v1::{EventType, WorkflowTaskFailedCause};
use crate::protos::temporal::api::failure::v1::Failure;
use crate::protos::temporal::api::history::v1::{
Expand Down Expand Up @@ -150,3 +151,34 @@ pub fn workflow_fails_with_failure_after_timer(timer_id: &str) -> TestHistoryBui
t.add_workflow_task_scheduled_and_started();
t
}

/// First signal's payload is "hello " and second is "world" (no metadata for either)
/// 1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
/// 2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 3: EVENT_TYPE_WORKFLOW_TASK_STARTED
/// 4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// 5: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
/// 6: EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED
/// 7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// 8: EVENT_TYPE_WORKFLOW_TASK_STARTED
pub fn two_signals(sig_1_id: &str, sig_2_id: &str) -> TestHistoryBuilder {
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_full_wf_task();
t.add_we_signaled(
sig_1_id,
vec![Payload {
metadata: Default::default(),
data: b"hello ".to_vec(),
}],
);
t.add_we_signaled(
sig_2_id,
vec![Payload {
metadata: Default::default(),
data: b"world".to_vec(),
}],
);
t.add_workflow_task_scheduled_and_started();
t
}
7 changes: 6 additions & 1 deletion src/workflow/driven_workflow.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::protos::coresdk::SignalWorkflow;
use crate::{
machines::WFCommand,
protos::coresdk::wf_activation_job,
Expand Down Expand Up @@ -53,7 +54,11 @@ impl DrivenWorkflow {
}

/// Signal the workflow
pub fn _signal(&mut self, _attribs: WorkflowExecutionSignaledEventAttributes) {}
pub fn signal(&mut self, attribs: WorkflowExecutionSignaledEventAttributes) {
self.send_job(wf_activation_job::Variant::SignalWorkflow(SignalWorkflow {
signal: Some(attribs),
}))
}

/// Cancel the workflow
pub fn _cancel(&mut self, _attribs: WorkflowExecutionCanceledEventAttributes) {}
Expand Down
Loading