Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
"#[derive(::derive_more::From)]",
)
.type_attribute("coresdk.Command.variant", "#[derive(::derive_more::From)]")
.type_attribute("coresdk.WFActivationJob", "#[derive(::derive_more::From)]")
.type_attribute(
"coresdk.WFActivation.attributes",
"#[derive(::derive_more::From)]",
)
.type_attribute(
"coresdk.WorkflowTask.attributes",
"coresdk.WFActivationJob.attributes",
"#[derive(::derive_more::From)]",
)
.type_attribute("coresdk.Task.variant", "#[derive(::derive_more::From)]")
Expand Down
15 changes: 10 additions & 5 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,19 +38,24 @@ message Task {
}

// An instruction to the lang sdk to run some workflow code, whether for the first time or from
// a cached state
// a cached state.
message WFActivation {
// Time the activation was requested
// Time the activation(s) were requested
google.protobuf.Timestamp timestamp = 1 [(gogoproto.stdtime) = true];
// The id of the currently active run of the workflow
string run_id = 2;
// The things to do upon activating the workflow
repeated WFActivationJob jobs = 3;
}

message WFActivationJob {
oneof attributes {
// TODO could literally be attributes from events? -- maybe we don't need our own types

// Begin a workflow for the first time
StartWorkflowTaskAttributes start_workflow = 3;
// A timer has fired, allowing whatever was waiting on it
TimerFiredTaskAttributes unblock_timer = 4;
StartWorkflowTaskAttributes start_workflow = 1;
// A timer has fired, allowing whatever was waiting on it (if anything) to proceed
TimerFiredTaskAttributes timer_fired = 2;
}
}

Expand Down
26 changes: 9 additions & 17 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ mod test {
use crate::{
machines::test_help::TestHistoryBuilder,
protos::{
coresdk::{task, wf_activation, WfActivation},
coresdk::{wf_activation_job, WfActivationJob},
temporal::api::{
command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
Expand Down Expand Up @@ -417,14 +417,10 @@ mod test {
let res = dbg!(core.poll_task().unwrap());
// TODO: uggo
assert_matches!(
res,
Task {
variant: Some(task::Variant::Workflow(WfActivation {
attributes: Some(wf_activation::Attributes::StartWorkflow(_)),
..
})),
..
}
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(_)),
}]
);
assert!(core.workflow_machines.get(run_id).is_some());

Expand All @@ -443,14 +439,10 @@ mod test {
let res = dbg!(core.poll_task().unwrap());
// TODO: uggo
assert_matches!(
res,
Task {
variant: Some(task::Variant::Workflow(WfActivation {
attributes: Some(wf_activation::Attributes::UnblockTimer(_)),
..
})),
..
}
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(_)),
}]
);
let task_tok = res.task_token;
core.complete_task(CompleteTaskReq::ok_from_api_attrs(
Expand Down
7 changes: 3 additions & 4 deletions src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,8 @@ pub(crate) use workflow_machines::{WFMachinesError, WorkflowMachines};

use crate::{
machines::workflow_machines::WorkflowTrigger,
protos::coresdk::wf_activation,
protos::{
coresdk::{self, command::Variant},
coresdk::{self, command::Variant, wf_activation_job},
temporal::api::{
command::v1::{
command::Attributes, Command, CompleteWorkflowExecutionCommandAttributes,
Expand Down Expand Up @@ -93,10 +92,10 @@ pub(crate) trait DrivenWorkflow: ActivationListener + Send {
) -> Result<(), anyhow::Error>;
}

/// Allows observers to listen to newly generated outgoing activations. Used for testing, where
/// Allows observers to listen to newly generated outgoing activation jobs. Used for testing, where
/// some activations must be handled before outgoing commands are issued to avoid deadlocking.
pub(crate) trait ActivationListener {
fn on_activation(&mut self, _activation: &wf_activation::Attributes) {}
fn on_activation_job(&mut self, _activation: &wf_activation_job::Attributes) {}
}

/// The struct for [WFCommand::AddCommand]
Expand Down
6 changes: 3 additions & 3 deletions src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::Result;
use crate::{
machines::{ActivationListener, DrivenWorkflow, WFCommand},
protos::{
coresdk::{wf_activation::Attributes, TimerFiredTaskAttributes},
coresdk::{wf_activation_job::Attributes, TimerFiredTaskAttributes},
temporal::api::{
command::v1::StartTimerCommandAttributes,
history::v1::{
Expand Down Expand Up @@ -58,8 +58,8 @@ where
}

impl<F> ActivationListener for TestWorkflowDriver<F> {
fn on_activation(&mut self, activation: &Attributes) {
if let Attributes::UnblockTimer(TimerFiredTaskAttributes { timer_id }) = activation {
fn on_activation_job(&mut self, activation: &Attributes) {
if let Attributes::TimerFired(TimerFiredTaskAttributes { timer_id }) = activation {
Arc::get_mut(&mut self.cache)
.unwrap()
.unblocked_timers
Expand Down
2 changes: 1 addition & 1 deletion src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
AddCommand, CancellableCommand, WFCommand, WFMachinesAdapter,
},
protos::{
coresdk::{wf_activation, HistoryEventId, TimerFiredTaskAttributes, WfActivation},
coresdk::{HistoryEventId, TimerFiredTaskAttributes, WfActivation},
temporal::api::{
command::v1::{
command::Attributes, CancelTimerCommandAttributes, Command,
Expand Down
37 changes: 22 additions & 15 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::machines::ActivationListener;
use crate::protos::coresdk::WfActivationJob;
use crate::{
machines::{
complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer,
workflow_task_state_machine::WorkflowTaskMachine, CancellableCommand, DrivenWorkflow,
ProtoCommand, TemporalStateMachine, WFCommand,
},
protos::{
coresdk::{wf_activation, StartWorkflowTaskAttributes, WfActivation},
coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation},
temporal::api::{
command::v1::StartTimerCommandAttributes,
common::v1::WorkflowExecution,
Expand Down Expand Up @@ -62,8 +63,8 @@ pub(crate) struct WorkflowMachines {
/// Old note: It is a queue as commands can be added (due to marker based commands) while
/// iterating over already added commands.
current_wf_task_commands: VecDeque<CancellableCommand>,
/// Outgoing activations that need to be sent to the lang sdk
outgoing_wf_activations: VecDeque<wf_activation::Attributes>,
/// Outgoing activation jobs that need to be sent to the lang sdk
outgoing_wf_activation_jobs: VecDeque<wf_activation_job::Attributes>,

/// The workflow that is being driven by this instance of the machines
drive_me: Box<dyn DrivenWorkflow + 'static>,
Expand All @@ -73,7 +74,7 @@ pub(crate) struct WorkflowMachines {
#[derive(Debug, derive_more::From)]
#[must_use]
pub(super) enum WorkflowTrigger {
PushWFActivation(#[from(forward)] wf_activation::Attributes),
PushWFJob(#[from(forward)] wf_activation_job::Attributes),
TriggerWFTaskStarted {
task_started_event_id: i64,
time: SystemTime,
Expand Down Expand Up @@ -114,7 +115,7 @@ impl WorkflowMachines {
machines_by_id: Default::default(),
commands: Default::default(),
current_wf_task_commands: Default::default(),
outgoing_wf_activations: Default::default(),
outgoing_wf_activation_jobs: Default::default(),
}
}

Expand Down Expand Up @@ -287,7 +288,7 @@ impl WorkflowMachines {
{
self.run_id = attrs.original_execution_run_id.clone();
// We need to notify the lang sdk that it's time to kick off a workflow
self.outgoing_wf_activations.push_back(
self.outgoing_wf_activation_jobs.push_back(
StartWorkflowTaskAttributes {
// TODO: This needs to be set during init
workflow_type: "".to_string(),
Expand Down Expand Up @@ -344,14 +345,20 @@ impl WorkflowMachines {
/// Returns the next activation that needs to be performed by the lang sdk. Things like unblock
/// timer, etc.
pub(crate) fn get_wf_activation(&mut self) -> Option<WfActivation> {
self.outgoing_wf_activations
.pop_front()
.map(|attrs| WfActivation {
// todo wat ?
timestamp: None,
if self.outgoing_wf_activation_jobs.is_empty() {
None
} else {
let jobs = self
.outgoing_wf_activation_jobs
.drain(..)
.map(Into::into)
.collect();
Some(WfActivation {
timestamp: self.current_wf_time.map(Into::into),
run_id: self.run_id.clone(),
attributes: attrs.into(),
jobs,
})
}
}

/// Given an event id (possibly zero) of the last successfully executed workflow task and an
Expand Down Expand Up @@ -394,9 +401,9 @@ impl WorkflowMachines {
event!(Level::DEBUG, msg = "Machine produced triggers", ?triggers);
for trigger in triggers {
match trigger {
WorkflowTrigger::PushWFActivation(a) => {
self.drive_me.on_activation(&a);
self.outgoing_wf_activations.push_back(a);
WorkflowTrigger::PushWFJob(a) => {
self.drive_me.on_activation_job(&a);
self.outgoing_wf_activation_jobs.push_back(a);
}
WorkflowTrigger::TriggerWFTaskStarted {
task_started_event_id,
Expand Down
19 changes: 19 additions & 0 deletions src/protos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ pub mod coresdk {
use super::temporal::api::command::v1 as api_command;
use super::temporal::api::command::v1::Command as ApiCommand;
use crate::protos::coresdk::complete_task_req::Completion;
use crate::protos::coresdk::wf_activation_job::Attributes;
use command::Variant;

pub type HistoryEventId = i64;
Expand All @@ -23,6 +24,24 @@ pub mod coresdk {
variant: Some(t.into()),
}
}

/// Returns any contained jobs if this task was a wf activation and it had some
#[cfg(test)]
pub fn get_wf_jobs(&self) -> Vec<WfActivationJob> {
if let Some(task::Variant::Workflow(a)) = &self.variant {
a.jobs.clone()
} else {
vec![]
}
}
}

impl From<wf_activation_job::Attributes> for WfActivationJob {
fn from(a: Attributes) -> Self {
WfActivationJob {
attributes: Some(a),
}
}
}

impl From<Vec<ApiCommand>> for WfActivationSuccess {
Expand Down