Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions protos/local/core_interface.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ message WFActivationJob {
StartWorkflowTaskAttributes start_workflow = 1;
// A timer has fired, allowing whatever was waiting on it (if anything) to proceed
TimerFiredTaskAttributes timer_fired = 2;
// Workflow was reset. The randomness seed has to be updated.
RandomSeedUpdatedAttributes random_seed_updated = 3;

QueryWorkflowJob query_workflow = 3;
CancelWorkflowTaskAttributes cancel_workflow = 4;
QueryWorkflowJob query_workflow = 4;
CancelWorkflowTaskAttributes cancel_workflow = 5;
}
}

Expand All @@ -76,6 +78,9 @@ message StartWorkflowTaskAttributes {
string workflow_id = 2;
// Input to the workflow code
temporal.api.common.v1.Payloads arguments = 3;
// The seed must be used to initialize the random generator used by SDK.
// RandomSeedUpdatedAttributes are used to deliver seed updates.
uint64 randomness_seed = 4;

// TODO: Do we need namespace here, or should that just be fetchable easily?
// will be others - workflow exe started attrs, etc
Expand All @@ -89,6 +94,10 @@ message TimerFiredTaskAttributes {
string timer_id = 1;
}

message RandomSeedUpdatedAttributes {
uint64 randomness_seed = 1;
}

message QueryWorkflowJob {
temporal.api.query.v1.WorkflowQuery query = 1;
}
Expand Down
98 changes: 93 additions & 5 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,7 @@ where
TaskCompletion {
variant: Some(task_completion::Variant::Activity(_)),
..
} => {
unimplemented!()
}
} => unimplemented!(),
_ => Err(CoreError::MalformedCompletion(req)),
}
}
Expand Down Expand Up @@ -332,13 +330,14 @@ mod test {
machines::test_help::{build_fake_core, TestHistoryBuilder},
protos::{
coresdk::{
wf_activation_job, TaskCompletion, TimerFiredTaskAttributes, WfActivationJob,
wf_activation_job, RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes,
TaskCompletion, TimerFiredTaskAttributes, WfActivationJob,
},
temporal::api::{
command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
},
enums::v1::EventType,
enums::v1::{EventType, WorkflowTaskFailedCause},
history::v1::{history_event, TimerFiredEventAttributes},
},
},
Expand Down Expand Up @@ -585,4 +584,93 @@ mod test {
CoreError::ShuttingDown
);
}

#[test]
fn workflow_update_random_seed_on_workflow_reset() {
let s = span!(Level::DEBUG, "Test start", t = "bridge");
let _enter = s.enter();

let wfid = "fake_wf_id";
let run_id = "CA733AB0-8133-45F6-A4C1-8D375F61AE8B";
let original_run_id = "86E39A5F-AE31-4626-BDFE-398EE072D156";
let timer_1_id = "timer1".to_string();
let task_queue = "test-task-queue";

/*
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_TIMER_STARTED
6: EVENT_TYPE_TIMER_FIRED
7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
8: EVENT_TYPE_WORKFLOW_TASK_STARTED
9: EVENT_TYPE_WORKFLOW_TASK_FAILED
10: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
11: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_workflow_task();
let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
t.add(
EventType::TimerFired,
history_event::Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
started_event_id: timer_started_event_id,
timer_id: timer_1_id.clone(),
}),
);
t.add_workflow_task_scheduled_and_started();
t.add_workflow_task_failed(WorkflowTaskFailedCause::ResetWorkflow, original_run_id);

t.add_workflow_task_scheduled_and_started();

// NOTE! What makes this a replay test is the server only responds with *one* batch here.
// So, server is polled once, but lang->core interactions look just like non-replay test.
let core = build_fake_core(wfid, run_id, &mut t, &[2]);

let res = core.poll_task(task_queue).unwrap();
let randomness_seed_from_start: u64;
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::StartWorkflow(
StartWorkflowTaskAttributes{randomness_seed, ..}
)),
}] => {
randomness_seed_from_start = *randomness_seed;
}
);
assert!(core.workflow_machines.exists(run_id));

let task_tok = res.task_token;
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![StartTimerCommandAttributes {
timer_id: timer_1_id,
..Default::default()
}
.into()],
task_tok,
))
.unwrap();

let res = core.poll_task(task_queue).unwrap();
assert_matches!(
res.get_wf_jobs().as_slice(),
[WfActivationJob {
attributes: Some(wf_activation_job::Attributes::TimerFired(_),),
},
WfActivationJob {
attributes: Some(wf_activation_job::Attributes::RandomSeedUpdated(RandomSeedUpdatedAttributes{randomness_seed})),
}] => {
assert_ne!(randomness_seed_from_start, *randomness_seed)
}
);
let task_tok = res.task_token;
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
task_tok,
))
.unwrap();
}
}
20 changes: 17 additions & 3 deletions src/machines/test_help/history_builder.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use super::Result;
use crate::protos::temporal::api::history::v1::History;
use crate::protos::temporal::api::enums::v1::WorkflowTaskFailedCause;
use crate::protos::temporal::api::history::v1::{History, WorkflowTaskFailedEventAttributes};
use crate::{
machines::{workflow_machines::WorkflowMachines, ProtoCommand},
protos::temporal::api::{
Expand All @@ -14,6 +15,7 @@ use crate::{
};
use anyhow::bail;
use std::time::SystemTime;
use uuid::Uuid;

#[derive(Default, Debug)]
pub struct TestHistoryBuilder {
Expand Down Expand Up @@ -88,6 +90,16 @@ impl TestHistoryBuilder {
self.build_and_push_event(EventType::WorkflowTaskCompleted, attrs.into());
}

pub fn add_workflow_task_failed(&mut self, cause: WorkflowTaskFailedCause, new_run_id: &str) {
let attrs = WorkflowTaskFailedEventAttributes {
scheduled_event_id: self.workflow_task_scheduled_event_id,
cause: cause.into(),
new_run_id: new_run_id.into(),
..Default::default()
};
self.build_and_push_event(EventType::WorkflowTaskFailed, attrs.into());
}

pub fn as_history(&self) -> History {
History {
events: self.events.clone(),
Expand Down Expand Up @@ -142,9 +154,11 @@ impl TestHistoryBuilder {

fn default_attribs(et: EventType) -> Result<Attributes> {
Ok(match et {
EventType::WorkflowExecutionStarted => {
WorkflowExecutionStartedEventAttributes::default().into()
EventType::WorkflowExecutionStarted => WorkflowExecutionStartedEventAttributes {
original_execution_run_id: Uuid::new_v4().to_string(),
..Default::default()
}
.into(),
EventType::WorkflowTaskScheduled => WorkflowTaskScheduledEventAttributes::default().into(),
EventType::TimerStarted => TimerStartedEventAttributes::default().into(),
_ => bail!("Don't know how to construct default attrs for {:?}", et),
Expand Down
15 changes: 5 additions & 10 deletions src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ impl Created {

#[derive(Default, Clone)]
pub(super) struct CancelTimerCommandCreated {}

impl CancelTimerCommandCreated {
pub(super) fn on_command_cancel_timer(self, dat: SharedState) -> TimerMachineTransition {
TimerMachineTransition::ok(
Expand All @@ -163,6 +164,7 @@ pub(super) struct CancelTimerCommandSent {}

#[derive(Default, Clone)]
pub(super) struct Canceled {}

impl From<CancelTimerCommandSent> for Canceled {
fn from(_: CancelTimerCommandSent) -> Self {
Self::default()
Expand Down Expand Up @@ -227,9 +229,7 @@ impl WFMachinesAdapter for TimerMachine {
timer_id: self.shared_state.timer_attributes.timer_id.clone(),
}
.into()]),
TimerMachineCommand::AddCommand(_) => {
unreachable!()
}
TimerMachineCommand::AddCommand(_) => unreachable!(),
}
}
}
Expand Down Expand Up @@ -271,13 +271,8 @@ mod test {
8: EVENT_TYPE_WORKFLOW_TASK_STARTED

We have two versions of this test, one which processes the history in two calls,
and one which replays all of it in one go. The former will run the event loop three
times total, and the latter two.

There are two workflow tasks, so it seems we should only loop two times, but the reason
for the extra iteration in the incremental version is that we need to "wait" for the
timer to fire. In the all-in-one-go test, the timer is created and resolved in the same
task, hence no extra loop.
and one which replays all of it in one go. Both versions must produce the same
two activations.
*/
let twd = TestWorkflowDriver::new(|mut command_sink: CommandSender| async move {
let timer = StartTimerCommandAttributes {
Expand Down
33 changes: 31 additions & 2 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::machines::workflow_machines::WFMachinesError::MalformedEvent;
use crate::{
machines::{
complete_workflow_state_machine::complete_workflow, timer_state_machine::new_timer,
Expand All @@ -6,7 +7,10 @@ use crate::{
},
protos::coresdk::WfActivationJob,
protos::{
coresdk::{wf_activation_job, StartWorkflowTaskAttributes, WfActivation},
coresdk::{
wf_activation_job, wf_activation_job::Attributes::RandomSeedUpdated,
RandomSeedUpdatedAttributes, StartWorkflowTaskAttributes, WfActivation,
},
temporal::api::{
command::v1::StartTimerCommandAttributes,
common::v1::WorkflowExecution,
Expand All @@ -20,12 +24,16 @@ use rustfsm::StateMachine;
use std::{
borrow::BorrowMut,
cell::RefCell,
collections::hash_map::DefaultHasher,
collections::{HashMap, HashSet, VecDeque},
hash::Hash,
hash::Hasher,
ops::DerefMut,
sync::{atomic::AtomicBool, Arc},
time::SystemTime,
};
use tracing::Level;
use uuid::Uuid;

type Result<T, E = WFMachinesError> = std::result::Result<T, E>;

Expand All @@ -44,7 +52,7 @@ pub(crate) struct WorkflowMachines {
replaying: bool,
/// Workflow identifier
pub workflow_id: String,
/// Identifies the current run and is used as a seed for faux-randomness.
/// Identifies the current run
pub run_id: String,
/// The current workflow time if it has been established
current_wf_time: Option<SystemTime>,
Expand Down Expand Up @@ -76,6 +84,9 @@ pub(super) enum WorkflowTrigger {
task_started_event_id: i64,
time: SystemTime,
},
UpdateRunIdOnWorkflowReset {
run_id: String,
},
}

#[derive(thiserror::Error, Debug)]
Expand Down Expand Up @@ -292,6 +303,9 @@ impl WorkflowMachines {
.unwrap_or_default(),
workflow_id: self.workflow_id.clone(),
arguments: attrs.input.clone(),
randomness_seed: str_to_randomness_seed(
&attrs.original_execution_run_id,
),
}
.into(),
);
Expand Down Expand Up @@ -410,6 +424,15 @@ impl WorkflowMachines {
} => {
self.task_started(task_started_event_id, time);
}
WorkflowTrigger::UpdateRunIdOnWorkflowReset { run_id: new_run_id } => {
self.outgoing_wf_activation_jobs.push_back(
wf_activation_job::Attributes::RandomSeedUpdated(
RandomSeedUpdatedAttributes {
randomness_seed: str_to_randomness_seed(&new_run_id),
},
),
);
}
}
}
Ok(())
Expand Down Expand Up @@ -441,3 +464,9 @@ impl WorkflowMachines {
}
}
}

fn str_to_randomness_seed(run_id: &str) -> u64 {
let mut s = DefaultHasher::new();
run_id.hash(&mut s);
s.finish()
}
Loading