Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
64 commits
Select commit Hold shift + click to select a range
a78439e
Creating new UT, want to dedupe next
Sushisource Feb 9, 2021
79495e6
Dedupe core response setup
Sushisource Feb 9, 2021
070f863
Response batch control
Sushisource Feb 9, 2021
ab685cc
Saving here while waiting for some feedback
Sushisource Feb 9, 2021
72327b9
Replay test works. Just need to do a bit of cleanup
Sushisource Feb 10, 2021
a29489e
Fix task token stuff
Sushisource Feb 10, 2021
4bf1703
Some refactoring to eliminate invalid states in workflow representation
Sushisource Feb 11, 2021
fed36fb
Make parallel timer integ test actually test two timers
Sushisource Feb 11, 2021
28a45bd
Merge branch 'master' into more-tests
Sushisource Feb 11, 2021
121977a
Clippy lints
Sushisource Feb 11, 2021
c03b304
Fix a todo
Sushisource Feb 11, 2021
7300cd6
Clean up uggo todos, it's clear we want to reduce test verbosity
Sushisource Feb 11, 2021
f722f3e
Fix merge problem in integ tests
Sushisource Feb 11, 2021
d59c394
Use separate task queues to enable running integ tests in parallel
Sushisource Feb 11, 2021
600e82b
Use explicit rust image version to avoid clippy inconsistencies
Sushisource Feb 12, 2021
e472fb3
CI server needs a bit more time to fire timers under test
Sushisource Feb 12, 2021
edc3254
Simplify timer fsm code before proceeding
Sushisource Feb 12, 2021
b7499e5
Ensure wf machines attaches full event to malformed details
Sushisource Feb 12, 2021
8e8aac6
Test for mismatched id error
Sushisource Feb 12, 2021
a2d7db7
Merge remote-tracking branch 'upstream/master' into cancellations
Sushisource Feb 12, 2021
d9d5167
Some various error enhancements
Sushisource Feb 13, 2021
5c7901f
Rename WorkflowTrigger -> MachineResponse
Sushisource Feb 16, 2021
e1f7627
Moving to desktop
Sushisource Feb 17, 2021
23b2f47
Most of the way there. Fixing deadlocks.
Sushisource Feb 17, 2021
0155521
Deadlocks are fixed!
Sushisource Feb 17, 2021
b23ae72
Some cleanup and UTs
Sushisource Feb 18, 2021
ace7a1c
Replace all unwraps with expectations
Sushisource Feb 18, 2021
2d171ba
Clean up unused warning
Sushisource Feb 18, 2021
75e3da4
Sensibly drop workflow managers when send side is dropped
Sushisource Feb 18, 2021
42feb9b
Extract function for dedicated thread
Sushisource Feb 18, 2021
139d43f
Function breakdown
Sushisource Feb 18, 2021
39f75ca
Fix busy looping
Sushisource Feb 18, 2021
e2566eb
Server is a bit inconsistent w/ timing in parallel timer test.
Sushisource Feb 18, 2021
bb97e12
Remove leftover temp RC variable
Sushisource Feb 18, 2021
fa15c49
Comment correction
Sushisource Feb 18, 2021
89c824b
Merge branch 'single-threaded-workflows' into cancellations
Sushisource Feb 18, 2021
8f1cb1b
Got things compiling with Rc but it's a mess
Sushisource Feb 19, 2021
1d8caaf
Merge branch 'master' into cancellations
Sushisource Feb 19, 2021
7e7b44a
Fixed exploding Rc, still a mess.
Sushisource Feb 19, 2021
0f4a32c
Incremental timer cancel UT is working
Sushisource Feb 19, 2021
fe41ba2
:tada: Working!
Sushisource Feb 19, 2021
8495c98
Fix errors and remove extra indirection in timer machine lookup
Sushisource Feb 23, 2021
6015f20
Fix tracing warnings
Sushisource Feb 23, 2021
98f4dc9
Clippy fixes
Sushisource Feb 23, 2021
6bc7bb1
Use slotmap instead of Rc<RefCell<>> stuff
Sushisource Feb 24, 2021
e577b3e
Merge branch 'master' into cancellations
Sushisource Feb 24, 2021
6653bfc
Add higher level test, dedupe replay testing stuff with rstest
Sushisource Feb 24, 2021
64b7527
Integration test
Sushisource Feb 24, 2021
6dcc989
Various cleanups
Sushisource Feb 24, 2021
eae5143
Merge branch 'master' into cancellations
Sushisource Feb 24, 2021
f688423
Use shutdown of workflow managers in core shutdown
Sushisource Feb 24, 2021
ce8600d
Remove probably unneeded todo
Sushisource Feb 24, 2021
a8cabe8
Dedupe histories in tests
Sushisource Feb 24, 2021
54e3216
Merge remote-tracking branch 'upstream/master' into cancellations
Sushisource Feb 24, 2021
68ca783
Fix up merge problems
Sushisource Feb 24, 2021
1e2f7c7
Add UT for cancelling before sending to server
Sushisource Feb 25, 2021
431064e
Add higher level & integ test. Need to fix hist ending w/ WECompleted
Sushisource Feb 25, 2021
b5b601c
Make history w/ workflow execution completed work for cancel b4 sent
Sushisource Feb 25, 2021
711c30e
Add integ test that polls and completes sorta-parallel
Sushisource Feb 26, 2021
56aee67
MORE PARALLEL!
Sushisource Feb 26, 2021
c82dc5f
Optionally enable temporal web in docker compose
Sushisource Feb 26, 2021
7050d68
Merge remote-tracking branch 'upstream/master' into parallel-polling
Sushisource Mar 1, 2021
47de750
Fix merge error / restore no logger in docker compose
Sushisource Mar 1, 2021
e7e974e
Fix integ test compile issues
Sushisource Mar 1, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions .buildkite/docker/docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,6 @@ services:
ports:
- "9042:9042"

statsd:
image: hopsoft/graphite-statsd
logging:
driver: none
ports:
- "8080:80"
- "2003:2003"
- "8125:8125"
- "8126:8126"

temporal:
image: temporalio/auto-setup:1.6.3
logging:
Expand All @@ -33,11 +23,20 @@ services:
- "6939:6939"
environment:
- "CASSANDRA_SEEDS=cassandra"
- "STATSD_ENDPOINT=statsd:8125"
- "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml"
depends_on:
- cassandra
- statsd

temporal-web:
image: temporalio/web:1.7.0
logging:
driver: none
ports:
- "8088:8088"
environment:
- "TEMPORAL_GRPC_ENDPOINT=temporal:7233"
depends_on:
- temporal

unit-test:
build:
Expand Down
2 changes: 1 addition & 1 deletion src/machines/test_help/workflow_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use tracing::Level;
/// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control
/// over when commands are returned than a normal workflow would.
///
/// It replaces "TestEnitityTestListenerBase" in java which is pretty hard to follow.
/// It replaces "TestEntityTestListenerBase" in java which is pretty hard to follow.
///
/// It is important to understand that this driver doesn't work like a real workflow in the sense
/// that nothing in it ever blocks, or ever should block. Every workflow task will run through the
Expand Down
9 changes: 9 additions & 0 deletions src/protos/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,15 @@ pub mod coresdk {
vec![]
}
}

/// Returns the workflow run id if the task was a workflow
pub fn get_run_id(&self) -> Option<&str> {
if let Some(task::Variant::Workflow(a)) = &self.variant {
Some(&a.run_id)
} else {
None
}
}
}

impl From<wf_activation_job::Variant> for WfActivationJob {
Expand Down
139 changes: 112 additions & 27 deletions tests/integ_tests/simple_wf_tests.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
use assert_matches::assert_matches;
use rand::{self, Rng};
use std::{convert::TryFrom, env, time::Duration};
use temporal_sdk_core::protos::temporal::api::command::v1::CancelTimerCommandAttributes;
use std::{
collections::HashMap,
convert::TryFrom,
env,
sync::{
mpsc::{channel, Receiver},
Arc,
},
time::Duration,
};
use temporal_sdk_core::{
protos::{
coresdk::{wf_activation_job, FireTimer, TaskCompletion, WfActivationJob},
coresdk::{
wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob,
},
temporal::api::command::v1::{
CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes,
CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes,
StartTimerCommandAttributes,
},
},
Core, CoreInitOptions, ServerGatewayOptions, Url,
Expand All @@ -22,34 +33,49 @@ use temporal_sdk_core::{
const NAMESPACE: &str = "default";

#[tokio::main]
async fn create_workflow(core: &dyn Core, task_q: &str, workflow_id: &str) -> String {
async fn create_workflow(
core: &dyn Core,
task_q: &str,
workflow_id: &str,
wf_type: Option<&str>,
) -> String {
core.server_gateway()
.unwrap()
.start_workflow(NAMESPACE, task_q, workflow_id, "test-workflow")
.start_workflow(
NAMESPACE,
task_q,
workflow_id,
wf_type.unwrap_or("test-workflow"),
)
.await
.unwrap()
.run_id
}

#[test]
fn timer_workflow() {
let task_q = "timer_workflow";
fn get_integ_core() -> impl Core {
let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
Ok(addr) => addr,
Err(_) => "http://localhost:7233".to_owned(),
};
let url = Url::try_from(&*temporal_server_address).unwrap();
let gateway_opts = ServerGatewayOptions {
namespace: NAMESPACE.to_string(),
identity: "none".to_string(),
identity: "integ_tester".to_string(),
worker_binary_id: "".to_string(),
long_poll_timeout: Duration::from_secs(60),
target_url: url,
};
let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
core
}

#[test]
fn timer_workflow() {
let task_q = "timer_workflow";
let core = get_integ_core();
let mut rng = rand::thread_rng();
let workflow_id: u32 = rng.gen();
dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
create_workflow(&core, task_q, &workflow_id.to_string(), None);
let timer_id: String = rng.gen::<u32>().to_string();
let task = core.poll_task(task_q).unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
Expand All @@ -73,22 +99,10 @@ fn timer_workflow() {
#[test]
fn parallel_timer_workflow() {
let task_q = "parallel_timer_workflow";
let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") {
Ok(addr) => addr,
Err(_) => "http://localhost:7233".to_owned(),
};
let url = Url::try_from(&*temporal_server_address).unwrap();
let gateway_opts = ServerGatewayOptions {
namespace: NAMESPACE.to_string(),
identity: "none".to_string(),
worker_binary_id: "".to_string(),
long_poll_timeout: Duration::from_secs(60),
target_url: url,
};
let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
let core = get_integ_core();
let mut rng = rand::thread_rng();
let workflow_id: u32 = rng.gen();
dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
create_workflow(&core, task_q, &workflow_id.to_string(), None);
let timer_id = "timer 1".to_string();
let timer_2_id = "timer 2".to_string();
let task = dbg!(core.poll_task(task_q).unwrap());
Expand Down Expand Up @@ -157,7 +171,12 @@ fn timer_cancel_workflow() {
let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
let mut rng = rand::thread_rng();
let workflow_id: u32 = rng.gen();
dbg!(create_workflow(&core, task_q, &workflow_id.to_string()));
dbg!(create_workflow(
&core,
task_q,
&workflow_id.to_string(),
None
));
let timer_id = "wait_timer";
let cancel_timer_id = "cancel_timer";
let task = core.poll_task(task_q).unwrap();
Expand Down Expand Up @@ -211,7 +230,7 @@ fn timer_immediate_cancel_workflow() {
let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap();
let mut rng = rand::thread_rng();
let workflow_id: u32 = rng.gen();
create_workflow(&core, task_q, &workflow_id.to_string());
create_workflow(&core, task_q, &workflow_id.to_string(), None);
let cancel_timer_id = "cancel_timer";
let task = core.poll_task(task_q).unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
Expand All @@ -232,3 +251,69 @@ fn timer_immediate_cancel_workflow() {
))
.unwrap();
}

#[test]
fn parallel_workflows_same_queue() {
let task_q = "parallel_workflows_same_queue";
let core = get_integ_core();
let num_workflows = 25;

let run_ids: Vec<_> = (0..num_workflows)
.map(|i| create_workflow(&core, task_q, &format!("wf-id-{}", i), Some("wf-type-1")))
.collect();

let mut send_chans = HashMap::new();

fn wf_thread(core: Arc<dyn Core>, task_chan: Receiver<Task>) {
let task = task_chan.recv().unwrap();
assert_matches!(
task.get_wf_jobs().as_slice(),
[WfActivationJob {
variant: Some(wf_activation_job::Variant::StartWorkflow(
StartWorkflow {
workflow_type,
..
}
)),
}] => assert_eq!(&workflow_type, &"wf-type-1")
);
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![StartTimerCommandAttributes {
timer_id: "timer".to_string(),
start_to_fire_timeout: Some(Duration::from_secs(1).into()),
..Default::default()
}
.into()],
task.task_token,
))
.unwrap();
let task = task_chan.recv().unwrap();
core.complete_task(TaskCompletion::ok_from_api_attrs(
vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()],
task.task_token,
))
.unwrap();
}

let core = Arc::new(core);
let handles: Vec<_> = run_ids
.iter()
.map(|run_id| {
let (tx, rx) = channel();
send_chans.insert(run_id.clone(), tx);
let core_c = core.clone();
std::thread::spawn(move || wf_thread(core_c, rx))
})
.collect();

for _ in 0..num_workflows * 2 {
let task = core.poll_task(task_q).unwrap();
send_chans
.get(task.get_run_id().unwrap())
.unwrap()
.send(task)
.unwrap();
}

handles.into_iter().for_each(|h| h.join().unwrap());
}