Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]")
.type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]")
.type_attribute(
"coresdk.WFActivationJob.attributes",
"coresdk.WFActivationJob.variant",
"#[derive(::derive_more::From)]",
)
.type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]")
Expand Down
33 changes: 16 additions & 17 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -56,24 +56,23 @@ message WFActivation {
}

message WFActivationJob {
oneof attributes {
// TODO could literally be attributes from events? -- maybe we don't need our own types

oneof variant {
// Begin a workflow for the first time
StartWorkflowTaskAttributes start_workflow = 1;
StartWorkflow start_workflow = 1;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with dropping the suffix altogether but maybe let's put the job definitions in a different proto file so they get a proper namespace in all languages?
@mfateev WDYT?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also discussed using passive form for jobs so it'd be WorkflowStarted but I feel like the imperative form is more suitable here.

// A timer has fired, allowing whatever was waiting on it (if anything) to proceed
TimerFiredTaskAttributes timer_fired = 2;
// A timer was canceled
TimerCanceledTaskAttributes timer_canceled = 3;
FireTimer fire_timer = 2;
// A timer was canceled, and needs to be unblocked on the lang side.
CancelTimer cancel_timer = 3;
// Workflow was reset. The randomness seed must be updated.
RandomSeedUpdatedAttributes random_seed_updated = 4;

QueryWorkflowJob query_workflow = 5;
CancelWorkflowTaskAttributes cancel_workflow = 6;
UpdateRandomSeed update_random_seed = 4;
// A request to query the workflow was received.
QueryWorkflow query_workflow = 5;
// A request to cancel the workflow was received.
CancelWorkflow cancel_workflow = 6;
}
}

message StartWorkflowTaskAttributes {
message StartWorkflow {
// The identifier the lang-specific sdk uses to execute workflow code
string workflow_type = 1;
// The workflow id used on the temporal server
Expand All @@ -88,23 +87,23 @@ message StartWorkflowTaskAttributes {
// will be others - workflow exe started attrs, etc
}

message CancelWorkflowTaskAttributes {
message CancelWorkflow {
// TODO: add attributes here
}

message TimerFiredTaskAttributes {
message FireTimer {
string timer_id = 1;
}

message TimerCanceledTaskAttributes {
message CancelTimer {
string timer_id = 1;
}

message RandomSeedUpdatedAttributes {
message UpdateRandomSeed {
uint64 randomness_seed = 1;
}

message QueryWorkflowJob {
message QueryWorkflow {
temporal.api.query.v1.WorkflowQuery query = 1;
}

Expand Down
36 changes: 17 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,8 @@ mod test {
machines::test_help::{build_fake_core, FakeCore},
protos::{
coresdk::{
wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes,
TaskCompletion, TimerFiredTaskAttributes, WfActivationJob,
wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed,
WfActivationJob,
},
temporal::api::command::v1::{
CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
Expand All @@ -359,8 +359,7 @@ mod test {
let wfid = "fake_wf_id";

let mut t = canned_histories::single_timer("fake_timer");
let core = build_fake_core(wfid, RUN_ID, &mut t, hist_batches);
core
build_fake_core(wfid, RUN_ID, &mut t, hist_batches)
}

#[rstest(core,
Expand All @@ -372,7 +371,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);
assert!(core.workflow_machines.exists(RUN_ID));
Expand All @@ -392,7 +391,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(_)),
variant: Some(wf_activation_job::Variant::FireTimer(_)),
}]
);
let task_tok = res.task_token;
Expand All @@ -418,7 +417,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);
assert!(core.workflow_machines.exists(run_id));
Expand Down Expand Up @@ -446,13 +445,13 @@ mod test {
res.get_wf_jobs().as_slice(),
[
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(
TimerFiredTaskAttributes { timer_id: t1_id }
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t1_id }
)),
},
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(
TimerFiredTaskAttributes { timer_id: t2_id }
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t2_id }
)),
}
] => {
Expand Down Expand Up @@ -483,7 +482,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);
assert!(core.workflow_machines.exists(run_id));
Expand All @@ -510,7 +509,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(_)),
variant: Some(wf_activation_job::Variant::FireTimer(_)),
}]
);
let task_tok = res.task_token;
Expand Down Expand Up @@ -558,8 +557,8 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(
StartWorkflowTaskAttributes{randomness_seed, ..}
variant: Some(wf_activation_job::Variant::StartWorkflow(
StartWorkflow{randomness_seed, ..}
)),
}] => {
randomness_seed_from_start = *randomness_seed;
Expand All @@ -582,10 +581,10 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(_),),
variant: Some(wf_activation_job::Variant::FireTimer(_),),
},
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})),
variant: Some(wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed{randomness_seed})),
}] => {
assert_ne!(randomness_seed_from_start, *randomness_seed)
}
Expand Down Expand Up @@ -616,7 +615,7 @@ mod test {
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
variant: Some(wf_activation_job::Variant::StartWorkflow(_)),
}]
);

Expand All @@ -630,7 +629,6 @@ mod test {
.into(),
CancelTimerCommandAttributes {
timer_id: cancel_timer_id.to_string(),
..Default::default()
}
.into(),
CompleteWorkflowExecutionCommandAttributes { result: None }.into(),
Expand Down
2 changes: 1 addition & 1 deletion src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send {
/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where
/// some activations must be handled before outgoing commands are issued to avoid deadlocking.
pub(crate) trait ActivationListener {
fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {}
fn on_activation_job(&mut self, _activation: &wf_activation_job::Variant) {}
}

/// [DrivenWorkflow]s respond with these when called, to indicate what they want to do next.
Expand Down
9 changes: 4 additions & 5 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use crate::protos::temporal::api::command::v1::CancelTimerCommandAttributes;
use crate::{
machines::{ActivationListener, DrivenWorkflow, WFCommand},
protos::{
coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes},
coresdk::{wf_activation_job, FireTimer},
temporal::api::{
command::v1::StartTimerCommandAttributes,
command::v1::{CancelTimerCommandAttributes, StartTimerCommandAttributes},
history::v1::{
WorkflowExecutionCanceledEventAttributes, WorkflowExecutionSignaledEventAttributes,
WorkflowExecutionStartedEventAttributes,
Expand Down Expand Up @@ -66,8 +65,8 @@ where
}

impl<F> ActivationListener for TestWorkflowDriver<F> {
fn on_activation_job(&mut self, activation: &Attributes) {
if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation {
fn on_activation_job(&mut self, activation: &wf_activation_job::Variant) {
if let wf_activation_job::Variant::FireTimer(FireTimer { timer_id }) = activation {
Arc::get_mut(&mut self.cache)
.unwrap()
.unblocked_timers
Expand Down
6 changes: 3 additions & 3 deletions src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
Cancellable, NewMachineWithCommand, WFMachinesAdapter,
},
protos::{
coresdk::{HistoryEventId, TimerCanceledTaskAttributes, TimerFiredTaskAttributes},
coresdk::{CancelTimer, FireTimer, HistoryEventId},
temporal::api::{
command::v1::{CancelTimerCommandAttributes, Command, StartTimerCommandAttributes},
enums::v1::{CommandType, EventType},
Expand Down Expand Up @@ -229,11 +229,11 @@ impl WFMachinesAdapter for TimerMachine {
) -> Result<Vec<MachineResponse>, WFMachinesError> {
Ok(match my_command {
// Fire the completion
TimerMachineCommand::Complete => vec![TimerFiredTaskAttributes {
TimerMachineCommand::Complete => vec![FireTimer {
timer_id: self.shared_state.attrs.timer_id.clone(),
}
.into()],
TimerMachineCommand::Canceled => vec![TimerCanceledTaskAttributes {
TimerMachineCommand::Canceled => vec![CancelTimer {
timer_id: self.shared_state.attrs.timer_id.clone(),
}
.into()],
Expand Down
20 changes: 8 additions & 12 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
use crate::protos::coresdk::wf_activation_job;
use crate::{
machines::{
complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer,
workflow_task_state_machine::WorkflowTaskMachine, DrivenWorkflow, NewMachineWithCommand,
ProtoCommand, TemporalStateMachine, WFCommand,
},
protos::{
coresdk::{
wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes,
WfActivation,
},
coresdk::{StartWorkflow, UpdateRandomSeed, WfActivation},
temporal::api::{
enums::v1::{CommandType, EventType},
history::v1::{history_event, HistoryEvent},
Expand Down Expand Up @@ -66,7 +64,7 @@ pub(crate) struct WorkflowMachines {
/// iterating over already added commands.
current_wf_task_commands: VecDeque<CommandAndMachine>,
/// Outgoing activation jobs that need to be sent to the lang sdk
outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Attributes>,
outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Variant>,

/// The workflow that is being driven by this instance of the machines
drive_me: Box<dyn DrivenWorkflow + 'static>,
Expand All @@ -84,7 +82,7 @@ struct CommandAndMachine {
#[must_use]
#[allow(clippy::large_enum_variant)]
pub enum MachineResponse {
PushWFJob(#[from(forward)] wf_activation_job::Attributes),
PushWFJob(#[from(forward)] wf_activation_job::Variant),
IssueNewCommand(ProtoCommand),
TriggerWFTaskStarted {
task_started_event_id: i64,
Expand Down Expand Up @@ -318,7 +316,7 @@ impl WorkflowMachines {
self.run_id = attrs.original_execution_run_id.clone();
// We need to notify the lang sdk that it's time to kick off a workflow
self.outgoing_wf_activation_jobs.push_back(
StartWorkflowTaskAttributes {
StartWorkflow {
workflow_type: attrs
.workflow_type
.as_ref()
Expand Down Expand Up @@ -465,11 +463,9 @@ impl WorkflowMachines {
}
MachineResponse::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
self.outgoing_wf_activation_jobs.push_back(
wf_activation_job::Attributes::RandomSeedUpdated(
RandomSeedUpdatedAttributes {
randomness_seed: str_to_randomness_seed(&new_run_id),
},
),
wf_activation_job::Variant::UpdateRandomSeed(UpdateRandomSeed {
randomness_seed: str_to_randomness_seed(&new_run_id),
}),
);
}
MachineResponse::NoOp => (),
Expand Down
9 changes: 3 additions & 6 deletions src/protos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ pub mod coresdk {
include!("coresdk.rs");
use super::temporal::api::command::v1 as api_command;
use super::temporal::api::command::v1::Command as ApiCommand;
use crate::protos::coresdk::wf_activation_job::Attributes;
use command::Variant;

pub type HistoryEventId = i64;
Expand All @@ -34,11 +33,9 @@ pub mod coresdk {
}
}

impl From<wf_activation_job::Attributes> for WfActivationJob {
fn from(a: Attributes) -> Self {
WfActivationJob {
attributes: Some(a),
}
impl From<wf_activation_job::Variant> for WfActivationJob {
fn from(a: wf_activation_job::Variant) -> Self {
WfActivationJob { variant: Some(a) }
}
}

Expand Down
10 changes: 5 additions & 5 deletions tests/integ_tests/simple_wf_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{convert::TryFrom, env, time::Duration};
use temporal_sdk_core::protos::temporal::api::command::v1::CancelTimerCommandAttributes;
use temporal_sdk_core::{
protos::{
coresdk::{wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob},
coresdk::{wf_activation_job, FireTimer, TaskCompletion, WfActivationJob},
temporal::api::command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
Expand Down Expand Up @@ -118,13 +118,13 @@ fn parallel_timer_workflow() {
task.get_wf_jobs().as_slice(),
[
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(
TimerFiredTaskAttributes { timer_id: t1_id }
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t1_id }
)),
},
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(
TimerFiredTaskAttributes { timer_id: t2_id }
variant: Some(wf_activation_job::Variant::FireTimer(
FireTimer { timer_id: t2_id }
)),
}
] => {
Expand Down