Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/machines/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ mod version_state_machine;
#[allow(unused)]
mod workflow_task_state_machine;

#[cfg(test)]
mod test_help;

/// A command which can be cancelled
#[derive(Debug, Clone)]
pub struct CancellableCommand {
Expand Down
128 changes: 128 additions & 0 deletions src/machines/test_help.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
use crate::protos::temporal::api::{
enums::v1::EventType,
history::v1::{
history_event::Attributes, HistoryEvent, WorkflowTaskCompletedEventAttributes,
WorkflowTaskStartedEventAttributes,
},
};
use std::time::SystemTime;

#[derive(Default, Debug)]
pub(crate) struct TestHistoryBuilder {
events: Vec<HistoryEvent>,
/// Is incremented every time a new event is added, and that *new* value is used as that event's
/// id
current_event_id: i64,
workflow_task_scheduled_event_id: i64,
previous_started_event_id: i64,
}

impl TestHistoryBuilder {
/// Add an event by type with attributes. Bundles both into a [HistoryEvent] with an id that is
/// incremented on each call to add.
pub(crate) fn add(&mut self, event_type: EventType, attribs: Attributes) {
self.build_and_push_event(event_type, Some(attribs));
}

/// Adds an event to the history by type, without attributes
pub(crate) fn add_by_type(&mut self, event_type: EventType) {
self.build_and_push_event(event_type.clone(), None);
}

/// Adds an event, returning the ID that was assigned to it
pub(crate) fn add_get_event_id(
&mut self,
event_type: EventType,
attrs: Option<Attributes>,
) -> i64 {
self.build_and_push_event(event_type, attrs);
self.current_event_id
}

/// Adds the following events:
/// ```text
/// EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
/// EVENT_TYPE_WORKFLOW_TASK_STARTED
/// EVENT_TYPE_WORKFLOW_TASK_COMPLETED
/// ```
pub(crate) fn add_workflow_task(&mut self) {
self.add_workflow_task_scheduled_and_started();
self.add_workflow_task_completed();
}

pub(crate) fn add_workflow_task_scheduled_and_started(&mut self) {
self.add_workflow_task_scheduled();
self.add_workflow_task_started();
}

pub(crate) fn add_workflow_task_scheduled(&mut self) {
// WFStarted always immediately follows WFScheduled
self.previous_started_event_id = self.workflow_task_scheduled_event_id + 1;
self.workflow_task_scheduled_event_id =
self.add_get_event_id(EventType::WorkflowTaskScheduled, None);
}

pub(crate) fn add_workflow_task_started(&mut self) {
let attrs = WorkflowTaskStartedEventAttributes {
scheduled_event_id: self.workflow_task_scheduled_event_id,
..Default::default()
};
self.build_and_push_event(
EventType::WorkflowTaskStarted,
Some(Attributes::WorkflowTaskStartedEventAttributes(attrs)),
);
}

pub(crate) fn add_workflow_task_completed(&mut self) {
let attrs = WorkflowTaskCompletedEventAttributes {
scheduled_event_id: self.workflow_task_scheduled_event_id,
..Default::default()
};
self.build_and_push_event(
EventType::WorkflowTaskCompleted,
Some(Attributes::WorkflowTaskCompletedEventAttributes(attrs)),
);
}

/// Counts the number of whole workflow tasks. Looks for WFTaskStarted followed by
/// WFTaskCompleted, adding one to the count for every match. It will additionally count
/// a WFTaskStarted at the end of the event list.
pub(crate) fn get_workflow_task_count(&self) -> usize {
let mut last_wf_started_id = 0;
let mut count = 0;
for (i, event) in self.events.iter().enumerate() {
let at_last_item = i == self.events.len() - 1;
let next_item_is_wftc = self
.events
.get(i + 1)
.map(|e| e.event_type == EventType::WorkflowTaskCompleted as i32)
.unwrap_or(false);
if event.event_type == EventType::WorkflowTaskStarted as i32
&& (at_last_item || next_item_is_wftc)
{
last_wf_started_id = event.event_id;
count += 1;
}
if at_last_item {
// No more events
if last_wf_started_id != event.event_id {
panic!("Last item in history wasn't WorkflowTaskStarted")
}
return count;
}
}
count
}

fn build_and_push_event(&mut self, event_type: EventType, attribs: Option<Attributes>) {
self.current_event_id += 1;
let evt = HistoryEvent {
event_type: event_type as i32,
event_id: self.current_event_id,
event_time: Some(SystemTime::now().into()),
attributes: attribs,
..Default::default()
};
self.events.push(evt);
}
}
37 changes: 36 additions & 1 deletion src/machines/timer_state_machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,41 @@ impl StartCommandRecorded {

#[cfg(test)]
mod test {
use crate::{
machines::test_help::TestHistoryBuilder,
protos::temporal::api::{
enums::v1::EventType,
history::{v1::history_event::Attributes, v1::TimerFiredEventAttributes},
},
};

#[test]
fn wat() {}
fn test_fire_happy_path() {
// We don't actually have a way to author workflows in rust yet, but the workflow that would
// match up with this is just a wf with one timer in it that fires normally.
/*
1: EVENT_TYPE_WORKFLOW_EXECUTION_STARTED
2: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
3: EVENT_TYPE_WORKFLOW_TASK_STARTED
4: EVENT_TYPE_WORKFLOW_TASK_COMPLETED
5: EVENT_TYPE_TIMER_STARTED
6: EVENT_TYPE_TIMER_FIRED
7: EVENT_TYPE_WORKFLOW_TASK_SCHEDULED
8: EVENT_TYPE_WORKFLOW_TASK_STARTED
*/
let mut t = TestHistoryBuilder::default();
t.add_by_type(EventType::WorkflowExecutionStarted);
t.add_workflow_task();
let timer_started_event_id = t.add_get_event_id(EventType::TimerStarted, None);
t.add(
EventType::TimerFired,
Attributes::TimerFiredEventAttributes(TimerFiredEventAttributes {
started_event_id: timer_started_event_id,
timer_id: "timer1".to_string(),
..Default::default()
}),
);
t.add_workflow_task_scheduled_and_started();
assert_eq!(2, t.get_workflow_task_count());
}
}