Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
a78439e
Creating new UT, want to dedupe next
Sushisource Feb 9, 2021
79495e6
Dedupe core response setup
Sushisource Feb 9, 2021
070f863
Response batch control
Sushisource Feb 9, 2021
ab685cc
Saving here while waiting for some feedback
Sushisource Feb 9, 2021
72327b9
Replay test works. Just need to do a bit of cleanup
Sushisource Feb 10, 2021
a29489e
Fix task token stuff
Sushisource Feb 10, 2021
4bf1703
Some refactoring to eliminate invalid states in workflow representation
Sushisource Feb 11, 2021
fed36fb
Make parallel timer integ test actually test two timers
Sushisource Feb 11, 2021
28a45bd
Merge branch 'master' into more-tests
Sushisource Feb 11, 2021
121977a
Clippy lints
Sushisource Feb 11, 2021
c03b304
Fix a todo
Sushisource Feb 11, 2021
7300cd6
Clean up uggo todos, it's clear we want to reduce test verbosity
Sushisource Feb 11, 2021
f722f3e
Fix merge problem in integ tests
Sushisource Feb 11, 2021
d59c394
Use separate task queues to enable running integ tests in parallel
Sushisource Feb 11, 2021
600e82b
Use explicit rust image version to avoid clippy inconsistencies
Sushisource Feb 12, 2021
e472fb3
CI server needs a bit more time to fire timers under test
Sushisource Feb 12, 2021
edc3254
Simplify timer fsm code before proceeding
Sushisource Feb 12, 2021
b7499e5
Ensure wf machines attaches full event to malformed details
Sushisource Feb 12, 2021
8e8aac6
Test for mismatched id error
Sushisource Feb 12, 2021
a2d7db7
Merge remote-tracking branch 'upstream/master' into cancellations
Sushisource Feb 12, 2021
d9d5167
Some various error enhancements
Sushisource Feb 13, 2021
5c7901f
Rename WorkflowTrigger -> MachineResponse
Sushisource Feb 16, 2021
e1f7627
Moving to desktop
Sushisource Feb 17, 2021
23b2f47
Most of the way there. Fixing deadlocks.
Sushisource Feb 17, 2021
0155521
Deadlocks are fixed!
Sushisource Feb 17, 2021
b23ae72
Some cleanup and UTs
Sushisource Feb 18, 2021
ace7a1c
Replace all unwraps with expectations
Sushisource Feb 18, 2021
2d171ba
Clean up unused warning
Sushisource Feb 18, 2021
75e3da4
Sensibly drop workflow managers when send side is dropped
Sushisource Feb 18, 2021
42feb9b
Extract function for dedicated thread
Sushisource Feb 18, 2021
139d43f
Function breakdown
Sushisource Feb 18, 2021
39f75ca
Fix busy looping
Sushisource Feb 18, 2021
e2566eb
Server is a bit inconsistent w/ timing in parallel timer test.
Sushisource Feb 18, 2021
bb97e12
Remove leftover temp RC variable
Sushisource Feb 18, 2021
fa15c49
Comment correction
Sushisource Feb 18, 2021
89c824b
Merge branch 'single-threaded-workflows' into cancellations
Sushisource Feb 18, 2021
8f1cb1b
Got things compiling with Rc but it's a mess
Sushisource Feb 19, 2021
1d8caaf
Merge branch 'master' into cancellations
Sushisource Feb 19, 2021
7e7b44a
Fixed exploding Rc, still a mess.
Sushisource Feb 19, 2021
0f4a32c
Incremental timer cancel UT is working
Sushisource Feb 19, 2021
fe41ba2
:tada: Working!
Sushisource Feb 19, 2021
8495c98
Fix errors and remove extra indirection in timer machine lookup
Sushisource Feb 23, 2021
6015f20
Fix tracing warnings
Sushisource Feb 23, 2021
98f4dc9
Clippy fixes
Sushisource Feb 23, 2021
6bc7bb1
Use slotmap instead of Rc<RefCell<>> stuff
Sushisource Feb 24, 2021
e577b3e
Merge branch 'master' into cancellations
Sushisource Feb 24, 2021
6653bfc
Add higher level test, dedupe replay testing stuff with rstest
Sushisource Feb 24, 2021
64b7527
Integration test
Sushisource Feb 24, 2021
6dcc989
Various cleanups
Sushisource Feb 24, 2021
eae5143
Merge branch 'master' into cancellations
Sushisource Feb 24, 2021
f688423
Use shutdown of workflow managers in core shutdown
Sushisource Feb 24, 2021
ce8600d
Remove probably unneeded todo
Sushisource Feb 24, 2021
a8cabe8
Dedupe histories in tests
Sushisource Feb 24, 2021
54e3216
Merge remote-tracking branch 'upstream/master' into cancellations
Sushisource Feb 24, 2021
68ca783
Fix up merge problems
Sushisource Feb 24, 2021
1e2f7c7
Add UT for cancelling before sending to server
Sushisource Feb 25, 2021
431064e
Add higher level & integ test. Need to fix hist ending w/ WECompleted
Sushisource Feb 25, 2021
b5b601c
Make history w/ workflow execution completed work for cancel b4 sent
Sushisource Feb 25, 2021
51d1b34
Move history application from history info to workflow machines
Sushisource Feb 25, 2021
e07ced1
Merge remote-tracking branch 'upstream/master' into invert-history-ap…
Sushisource Mar 1, 2021
08f1ac7
Fix merge && clippy lints / bool logic simplification
Sushisource Mar 1, 2021
7271e5f
Merge branch 'master' into invert-history-application
Sushisource Mar 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,10 +333,8 @@ pub enum CoreError {
#[cfg(test)]
mod test {
use super::*;
use crate::machines::test_help::TestHistoryBuilder;
use crate::protos::temporal::api::enums::v1::EventType;
use crate::{
machines::test_help::{build_fake_core, FakeCore},
machines::test_help::{build_fake_core, FakeCore, TestHistoryBuilder},
protos::{
coresdk::{
wf_activation_job, FireTimer, StartWorkflow, TaskCompletion, UpdateRandomSeed,
Expand All @@ -346,6 +344,7 @@ mod test {
CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes,
},
temporal::api::enums::v1::EventType,
},
test_help::canned_histories,
};
Expand Down
2 changes: 1 addition & 1 deletion src/machines/test_help/history_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ impl TestHistoryBuilder {
to_wf_task_num: Option<usize>,
) -> Result<()> {
let histinfo = HistoryInfo::new_from_events(&self.events, to_wf_task_num)?;
histinfo.apply_history_events(wf_machines)?;
wf_machines.apply_history_events(&histinfo)?;
Ok(())
}

Expand Down
2 changes: 1 addition & 1 deletion src/machines/test_help/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub(crate) fn build_fake_core(
let responses: Vec<_> = response_batches
.iter()
.map(|to_task_num| {
let batch = t.get_history_info(*to_task_num).unwrap().events;
let batch = t.get_history_info(*to_task_num).unwrap().events().to_vec();
let task_token: [u8; 16] = thread_rng().gen();
PollWorkflowTaskQueueResponse {
history: Some(History { events: batch }),
Expand Down
38 changes: 38 additions & 0 deletions src/machines/workflow_machines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
history::v1::{history_event, HistoryEvent},
},
},
protosext::HistoryInfo,
};
use slotmap::SlotMap;
use std::{
Expand Down Expand Up @@ -426,6 +427,43 @@ impl WorkflowMachines {
Ok(())
}

/// Apply events from history to this machines instance
pub(crate) fn apply_history_events(&mut self, history_info: &HistoryInfo) -> Result<()> {
let (_, events) = history_info
.events()
.split_at(self.get_last_started_event_id() as usize);
let mut history = events.iter().peekable();

self.set_started_ids(
history_info.previous_started_event_id,
history_info.workflow_task_started_event_id,
);

// HistoryInfo's constructor enforces some rules about the structure of history that
// could be enforced here, but needn't be because they have already been guaranteed by it.
// See the errors that can be returned from [HistoryInfo::new_from_events] for detail.

while let Some(event) = history.next() {
let next_event = history.peek();

if event.event_type == EventType::WorkflowTaskStarted as i32 && next_event.is_none() {
self.handle_event(event, false)?;
return Ok(());
}

self.handle_event(event, next_event.is_some())?;

if next_event.is_none() {
if event.is_final_wf_execution_event() {
return Ok(());
}
unreachable!()
}
}

Ok(())
}

/// Wrapper for calling [TemporalStateMachine::handle_event] which appropriately takes action
/// on the returned machine responses
fn submachine_handle_event(
Expand Down
68 changes: 6 additions & 62 deletions src/protosext/history_info.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
use crate::{
machines::WorkflowMachines,
protos::temporal::api::enums::v1::EventType,
protos::temporal::api::history::v1::{History, HistoryEvent},
};

#[derive(Clone, Debug, derive_more::Constructor, PartialEq)]
#[derive(Clone, Debug, PartialEq)]
pub(crate) struct HistoryInfo {
pub previous_started_event_id: i64,
pub workflow_task_started_event_id: i64,
pub events: Vec<HistoryEvent>,
// This needs to stay private so the struct can't be instantiated outside of the constructor,
// which enforces some invariants regarding history structure that need to be upheld.
events: Vec<HistoryEvent>,
}

type Result<T, E = HistoryInfoError> = std::result::Result<T, E>;
Expand Down Expand Up @@ -101,65 +102,8 @@ impl HistoryInfo {
Self::new_from_events(&h.events, to_wf_task_num)
}

/// Apply events from history to workflow machines. Remember that only the events that exist
/// in this instance will be applied, which is determined by `to_wf_task_num` passed into the
/// constructor.
pub(crate) fn apply_history_events(&self, wf_machines: &mut WorkflowMachines) -> Result<()> {
let (_, events) = self
.events
.split_at(wf_machines.get_last_started_event_id() as usize);
let mut history = events.iter().peekable();

wf_machines.set_started_ids(
self.previous_started_event_id,
self.workflow_task_started_event_id,
);
let mut started_id = self.previous_started_event_id;

while let Some(event) = history.next() {
let next_event = history.peek();

if event.event_type == EventType::WorkflowTaskStarted as i32 {
let next_is_completed = next_event.map_or(false, |ne| {
ne.event_type == EventType::WorkflowTaskCompleted as i32
});
let next_is_failed_or_timeout = next_event.map_or(false, |ne| {
ne.event_type == EventType::WorkflowTaskFailed as i32
|| ne.event_type == EventType::WorkflowTaskTimedOut as i32
});

if next_event.is_none() || next_is_completed {
started_id = event.event_id;
if next_event.is_none() {
wf_machines
.handle_event(event, false)
.map_err(anyhow::Error::from)?;
return Ok(());
}
} else if next_event.is_some() && !next_is_failed_or_timeout {
return Err(HistoryInfoError::FailedOrTimeout(event.clone()));
}
}

wf_machines
.handle_event(event, next_event.is_some())
.map_err(anyhow::Error::from)?;

if next_event.is_none() {
if event.is_final_wf_execution_event() {
return Ok(());
}
if started_id != event.event_id {
return Err(HistoryInfoError::UnexpectedEventId {
previous_started_event_id: started_id,
workflow_task_started_event_id: event.event_id,
});
}
unreachable!()
}
}

Ok(())
pub(crate) fn events(&self) -> &[HistoryEvent] {
&self.events
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/workflow/concurrency_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ mod tests {
"some_run_id",
PollWorkflowTaskQueueResponse {
history: Some(History {
events: t.get_history_info(1).unwrap().events,
events: t.get_history_info(1).unwrap().events().to_vec(),
}),
workflow_execution: Some(WorkflowExecution {
workflow_id: "wid".to_string(),
Expand Down
36 changes: 31 additions & 5 deletions src/workflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,16 @@ pub(crate) struct NextWfActivation {
impl WorkflowManager {
/// Given history that was just obtained from the server, pipe it into this workflow's machines.
///
/// Should only be called when a workflow has caught up on replay. It will return a workflow
/// activation if one is needed, as well as a bool indicating if there are more workflow tasks
/// that need to be performed to replay the remaining history.
/// Should only be called when a workflow has caught up on replay (or is just beginning). It
/// will return a workflow activation if one is needed, as well as a bool indicating if there
/// are more workflow tasks that need to be performed to replay the remaining history.
#[instrument(skip(self))]
pub fn feed_history_from_server(&mut self, hist: History) -> Result<NextWfActivation> {
let task_hist = HistoryInfo::new_from_history(&hist, Some(self.current_wf_task_num))?;
let task_ct = hist.get_workflow_task_count(None)?;
self.last_history_task_count = task_ct;
self.last_history_from_server = hist;
task_hist.apply_history_events(&mut self.machines)?;
self.machines.apply_history_events(&task_hist)?;
let activation = self.machines.get_wf_activation();
let more_activations_needed = task_ct > self.current_wf_task_num;

Expand All @@ -137,7 +137,7 @@ impl WorkflowManager {
pub fn get_next_activation(&mut self) -> Result<NextWfActivation> {
let hist = &self.last_history_from_server;
let task_hist = HistoryInfo::new_from_history(hist, Some(self.current_wf_task_num))?;
task_hist.apply_history_events(&mut self.machines)?;
self.machines.apply_history_events(&task_hist)?;
let activation = self.machines.get_wf_activation();

self.current_wf_task_num += 1;
Expand All @@ -149,3 +149,29 @@ impl WorkflowManager {
})
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::{
protos::temporal::api::common::v1::WorkflowExecution, test_help::canned_histories,
};
use rand::{thread_rng, Rng};

#[test]
fn full_history_application() {
let t = canned_histories::single_timer("fake_timer");
let task_token: [u8; 16] = thread_rng().gen();
let pwtqr = PollWorkflowTaskQueueResponse {
history: Some(t.as_history()),
workflow_execution: Some(WorkflowExecution {
workflow_id: "wfid".to_string(),
run_id: "runid".to_string(),
}),
task_token: task_token.to_vec(),
..Default::default()
};
let mut wfm = WorkflowManager::new(pwtqr).unwrap();
wfm.get_next_activation().unwrap();
}
}