diff --git a/.buildkite/docker/docker-compose.yaml b/.buildkite/docker/docker-compose.yaml index 002bd045e..abc9aca59 100644 --- a/.buildkite/docker/docker-compose.yaml +++ b/.buildkite/docker/docker-compose.yaml @@ -8,16 +8,6 @@ services: ports: - "9042:9042" - statsd: - image: hopsoft/graphite-statsd - logging: - driver: none - ports: - - "8080:80" - - "2003:2003" - - "8125:8125" - - "8126:8126" - temporal: image: temporalio/auto-setup:1.6.3 logging: @@ -33,11 +23,20 @@ services: - "6939:6939" environment: - "CASSANDRA_SEEDS=cassandra" - - "STATSD_ENDPOINT=statsd:8125" - "DYNAMIC_CONFIG_FILE_PATH=config/dynamicconfig/development.yaml" depends_on: - cassandra - - statsd + + temporal-web: + image: temporalio/web:1.7.0 + logging: + driver: none + ports: + - "8088:8088" + environment: + - "TEMPORAL_GRPC_ENDPOINT=temporal:7233" + depends_on: + - temporal unit-test: build: diff --git a/src/machines/test_help/workflow_driver.rs b/src/machines/test_help/workflow_driver.rs index 801eb4969..1ab783337 100644 --- a/src/machines/test_help/workflow_driver.rs +++ b/src/machines/test_help/workflow_driver.rs @@ -22,7 +22,7 @@ use tracing::Level; /// This is a test only implementation of a [DrivenWorkflow] which has finer-grained control /// over when commands are returned than a normal workflow would. /// -/// It replaces "TestEnitityTestListenerBase" in java which is pretty hard to follow. +/// It replaces "TestEntityTestListenerBase" in java which is pretty hard to follow. /// /// It is important to understand that this driver doesn't work like a real workflow in the sense /// that nothing in it ever blocks, or ever should block. Every workflow task will run through the diff --git a/src/protos/mod.rs b/src/protos/mod.rs index 777c187d9..ed8995319 100644 --- a/src/protos/mod.rs +++ b/src/protos/mod.rs @@ -31,6 +31,15 @@ pub mod coresdk { vec![] } } + + /// Returns the workflow run id if the task was a workflow + pub fn get_run_id(&self) -> Option<&str> { + if let Some(task::Variant::Workflow(a)) = &self.variant { + Some(&a.run_id) + } else { + None + } + } } impl From for WfActivationJob { diff --git a/tests/integ_tests/simple_wf_tests.rs b/tests/integ_tests/simple_wf_tests.rs index ce2d1d7cc..6de090128 100644 --- a/tests/integ_tests/simple_wf_tests.rs +++ b/tests/integ_tests/simple_wf_tests.rs @@ -1,12 +1,23 @@ use assert_matches::assert_matches; use rand::{self, Rng}; -use std::{convert::TryFrom, env, time::Duration}; -use temporal_sdk_core::protos::temporal::api::command::v1::CancelTimerCommandAttributes; +use std::{ + collections::HashMap, + convert::TryFrom, + env, + sync::{ + mpsc::{channel, Receiver}, + Arc, + }, + time::Duration, +}; use temporal_sdk_core::{ protos::{ - coresdk::{wf_activation_job, FireTimer, TaskCompletion, WfActivationJob}, + coresdk::{ + wf_activation_job, FireTimer, StartWorkflow, Task, TaskCompletion, WfActivationJob, + }, temporal::api::command::v1::{ - CompleteWorkflowExecutionCommandAttributes, StartTimerCommandAttributes, + CancelTimerCommandAttributes, CompleteWorkflowExecutionCommandAttributes, + StartTimerCommandAttributes, }, }, Core, CoreInitOptions, ServerGatewayOptions, Url, @@ -22,18 +33,26 @@ use temporal_sdk_core::{ const NAMESPACE: &str = "default"; #[tokio::main] -async fn create_workflow(core: &dyn Core, task_q: &str, workflow_id: &str) -> String { +async fn create_workflow( + core: &dyn Core, + task_q: &str, + workflow_id: &str, + wf_type: Option<&str>, +) -> String { core.server_gateway() .unwrap() - .start_workflow(NAMESPACE, task_q, workflow_id, "test-workflow") + .start_workflow( + NAMESPACE, + task_q, + workflow_id, + wf_type.unwrap_or("test-workflow"), + ) .await .unwrap() .run_id } -#[test] -fn timer_workflow() { - let task_q = "timer_workflow"; +fn get_integ_core() -> impl Core { let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { Ok(addr) => addr, Err(_) => "http://localhost:7233".to_owned(), @@ -41,15 +60,22 @@ fn timer_workflow() { let url = Url::try_from(&*temporal_server_address).unwrap(); let gateway_opts = ServerGatewayOptions { namespace: NAMESPACE.to_string(), - identity: "none".to_string(), + identity: "integ_tester".to_string(), worker_binary_id: "".to_string(), long_poll_timeout: Duration::from_secs(60), target_url: url, }; let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + core +} + +#[test] +fn timer_workflow() { + let task_q = "timer_workflow"; + let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - dbg!(create_workflow(&core, task_q, &workflow_id.to_string())); + create_workflow(&core, task_q, &workflow_id.to_string(), None); let timer_id: String = rng.gen::().to_string(); let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( @@ -73,22 +99,10 @@ fn timer_workflow() { #[test] fn parallel_timer_workflow() { let task_q = "parallel_timer_workflow"; - let temporal_server_address = match env::var("TEMPORAL_SERVICE_ADDRESS") { - Ok(addr) => addr, - Err(_) => "http://localhost:7233".to_owned(), - }; - let url = Url::try_from(&*temporal_server_address).unwrap(); - let gateway_opts = ServerGatewayOptions { - namespace: NAMESPACE.to_string(), - identity: "none".to_string(), - worker_binary_id: "".to_string(), - long_poll_timeout: Duration::from_secs(60), - target_url: url, - }; - let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); + let core = get_integ_core(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - dbg!(create_workflow(&core, task_q, &workflow_id.to_string())); + create_workflow(&core, task_q, &workflow_id.to_string(), None); let timer_id = "timer 1".to_string(); let timer_2_id = "timer 2".to_string(); let task = dbg!(core.poll_task(task_q).unwrap()); @@ -157,7 +171,12 @@ fn timer_cancel_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - dbg!(create_workflow(&core, task_q, &workflow_id.to_string())); + dbg!(create_workflow( + &core, + task_q, + &workflow_id.to_string(), + None + )); let timer_id = "wait_timer"; let cancel_timer_id = "cancel_timer"; let task = core.poll_task(task_q).unwrap(); @@ -211,7 +230,7 @@ fn timer_immediate_cancel_workflow() { let core = temporal_sdk_core::init(CoreInitOptions { gateway_opts }).unwrap(); let mut rng = rand::thread_rng(); let workflow_id: u32 = rng.gen(); - create_workflow(&core, task_q, &workflow_id.to_string()); + create_workflow(&core, task_q, &workflow_id.to_string(), None); let cancel_timer_id = "cancel_timer"; let task = core.poll_task(task_q).unwrap(); core.complete_task(TaskCompletion::ok_from_api_attrs( @@ -232,3 +251,69 @@ fn timer_immediate_cancel_workflow() { )) .unwrap(); } + +#[test] +fn parallel_workflows_same_queue() { + let task_q = "parallel_workflows_same_queue"; + let core = get_integ_core(); + let num_workflows = 25; + + let run_ids: Vec<_> = (0..num_workflows) + .map(|i| create_workflow(&core, task_q, &format!("wf-id-{}", i), Some("wf-type-1"))) + .collect(); + + let mut send_chans = HashMap::new(); + + fn wf_thread(core: Arc, task_chan: Receiver) { + let task = task_chan.recv().unwrap(); + assert_matches!( + task.get_wf_jobs().as_slice(), + [WfActivationJob { + variant: Some(wf_activation_job::Variant::StartWorkflow( + StartWorkflow { + workflow_type, + .. + } + )), + }] => assert_eq!(&workflow_type, &"wf-type-1") + ); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![StartTimerCommandAttributes { + timer_id: "timer".to_string(), + start_to_fire_timeout: Some(Duration::from_secs(1).into()), + ..Default::default() + } + .into()], + task.task_token, + )) + .unwrap(); + let task = task_chan.recv().unwrap(); + core.complete_task(TaskCompletion::ok_from_api_attrs( + vec![CompleteWorkflowExecutionCommandAttributes { result: None }.into()], + task.task_token, + )) + .unwrap(); + } + + let core = Arc::new(core); + let handles: Vec<_> = run_ids + .iter() + .map(|run_id| { + let (tx, rx) = channel(); + send_chans.insert(run_id.clone(), tx); + let core_c = core.clone(); + std::thread::spawn(move || wf_thread(core_c, rx)) + }) + .collect(); + + for _ in 0..num_workflows * 2 { + let task = core.poll_task(task_q).unwrap(); + send_chans + .get(task.get_run_id().unwrap()) + .unwrap() + .send(task) + .unwrap(); + } + + handles.into_iter().for_each(|h| h.join().unwrap()); +}