diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml
index 3821e703d..aa7762708 100644
--- a/crates/client/Cargo.toml
+++ b/crates/client/Cargo.toml
@@ -38,6 +38,7 @@ tower = { version = "0.5", features = ["util"] }
 tracing = "0.1"
 url = "2.5"
 uuid = { version = "1.18", features = ["v4"] }
+rand = "0.9.2"
 
 [dependencies.temporalio-common]
 path = "../common"
diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs
index f76157df3..5f3e9d6ec 100644
--- a/crates/client/src/lib.rs
+++ b/crates/client/src/lib.rs
@@ -1243,7 +1243,7 @@ pub struct WorkflowOptions {
     pub search_attributes: Option<HashMap<String, Payload>>,
 
     /// Optionally enable Eager Workflow Start, a latency optimization using local workers
-    /// NOTE: Experimental and incompatible with versioning with BuildIDs
+    /// NOTE: Experimental
    pub enable_eager_workflow_start: bool,
 
     /// Optionally set a retry policy for the workflow
diff --git a/crates/client/src/raw.rs b/crates/client/src/raw.rs
index eee55efab..a3213d9bf 100644
--- a/crates/client/src/raw.rs
+++ b/crates/client/src/raw.rs
@@ -659,8 +659,7 @@ proxier! {
                     } else {
                         temporalio_common::protos::temporal::api::enums::v1::WorkerVersioningMode::Unversioned.into()
                     },
-                        });
-                    }
+                    });
                 }
                 slot = Some(reservation.slot);
             }
             None => req_mut.request_eager_execution = false
diff --git a/crates/client/src/worker_registry/mod.rs b/crates/client/src/worker_registry/mod.rs
index 3c81bae14..d77ac81ec 100644
--- a/crates/client/src/worker_registry/mod.rs
+++ b/crates/client/src/worker_registry/mod.rs
@@ -4,6 +4,7 @@
 use anyhow::bail;
 use parking_lot::RwLock;
+use rand::seq::SliceRandom;
 use std::{
     collections::{
         HashMap,
@@ -54,8 +55,8 @@ impl SlotKey {
 
 /// This is an inner class for [ClientWorkerSet] needed to hide the mutex.
 struct ClientWorkerSetImpl {
-    /// Maps slot keys to slot provider worker.
-    slot_providers: HashMap<SlotKey, Uuid>,
+    /// Maps slot keys to slot provider worker UUIDs and deployment build IDs.
+    slot_providers: HashMap<SlotKey, Vec<(Uuid, Option<String>)>>,
     /// Maps worker_instance_key to registered workers
     all_workers: HashMap<Uuid, Arc<dyn ClientWorker>>,
     /// Maps namespace to shared worker for worker heartbeating
@@ -78,19 +79,38 @@ impl ClientWorkerSetImpl {
         task_queue: String,
     ) -> Option<SlotReservation> {
         let key = SlotKey::new(namespace, task_queue);
-        if let Some(p) = self.slot_providers.get(&key)
-            && let Some(worker) = self.all_workers.get(p)
-            && let Some(slot) = worker.try_reserve_wft_slot()
-        {
-            let deployment_options = worker.deployment_options();
-            return Some(SlotReservation {
-                slot,
-                deployment_options,
-            });
+        if let Some(worker_list) = self.slot_providers.get(&key) {
+            for worker_id in Self::worker_ids_in_selection_order(&worker_list.clone()) {
+                if let Some(worker) = self.all_workers.get(&worker_id)
+                    && let Some(slot) = worker.try_reserve_wft_slot()
+                {
+                    let deployment_options = worker.deployment_options();
+                    return Some(SlotReservation {
+                        slot,
+                        deployment_options,
+                    });
+                }
+            }
         }
         None
     }
 
+    fn worker_ids_in_selection_order(worker_list: &[(Uuid, Option<String>)]) -> Vec<Uuid> {
+        // For tests we return workers in the order they're registered, so we can test
+        // the retry mechanism deterministically.
+        if cfg!(test) {
+            worker_list
+                .iter()
+                .map(|(worker_id, _)| *worker_id)
+                .collect()
+        } else {
+            let mut rng = rand::rng();
+            let mut shuffled: Vec<_> = worker_list.to_vec();
+            shuffled.shuffle(&mut rng);
+            shuffled.iter().map(|(worker_id, _)| *worker_id).collect()
+        }
+    }
+
     fn register(
         &mut self,
         worker: Arc<dyn ClientWorker>,
@@ -100,9 +120,18 @@ impl ClientWorkerSetImpl {
             worker.namespace().to_string(),
             worker.task_queue().to_string(),
         );
-        if self.slot_providers.contains_key(&slot_key) && !skip_client_worker_set_check {
+        let build_id = worker
+            .deployment_options()
+            .map(|opts| opts.version.build_id);
+        if self.slot_providers.contains_key(&slot_key)
+            && self
+                .slot_providers
+                .get(&slot_key)
+                .is_some_and(|vec| vec.iter().any(|(_, opt)| opt.as_ref() == build_id.as_ref()))
+            && !skip_client_worker_set_check
+        {
             bail!(
-                "Registration of multiple workers on the same namespace and task queue for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.",
+                "Registration of multiple workers on the same namespace, task queue, and deployment build ID for the same client not allowed: {slot_key:?}, worker_instance_key: {:?}.",
                 worker.worker_instance_key()
             );
         }
@@ -123,8 +152,12 @@ impl ClientWorkerSetImpl {
             shared_worker.register_callback(worker_instance_key, heartbeat_callback);
         }
 
-        self.slot_providers
-            .insert(slot_key.clone(), worker.worker_instance_key());
+        match self.slot_providers.entry(slot_key.clone()) {
+            Occupied(o) => o.into_mut().push((worker.worker_instance_key(), build_id)),
+            Vacant(v) => {
+                v.insert(vec![(worker.worker_instance_key(), build_id)]);
+            }
+        };
 
         self.all_workers
             .insert(worker.worker_instance_key(), worker);
@@ -148,7 +181,12 @@ impl ClientWorkerSetImpl {
             worker.task_queue().to_string(),
         );
 
-        self.slot_providers.remove(&slot_key);
+        if let Some(slot_vec) = self.slot_providers.get_mut(&slot_key) {
+            slot_vec.retain(|(worker_id, _)| *worker_id != worker_instance_key);
+            if slot_vec.is_empty() {
+                self.slot_providers.remove(&slot_key);
+            }
+        }
 
         if let Some(w) = self.shared_worker.get_mut(worker.namespace()) {
             let (callback, is_empty) = w.unregister_callback(worker.worker_instance_key());
@@ -162,7 +200,7 @@ impl ClientWorkerSetImpl {
 
     #[cfg(test)]
     fn num_providers(&self) -> usize {
-        self.slot_providers.len()
+        self.slot_providers.values().map(|v| v.len()).sum()
     }
 
     #[cfg(test)]
@@ -356,6 +394,159 @@ mod tests {
         mock_provider
     }
 
+    #[test]
+    fn reserve_wft_slot_retries_another_worker_when_first_has_no_slot() {
+        let mut manager = ClientWorkerSetImpl::new();
+        let namespace = "retry_namespace".to_string();
+        let task_queue = "retry_queue".to_string();
+
+        let failing_worker_id = Uuid::new_v4();
+        let mut failing_worker = MockClientWorker::new();
+        failing_worker
+            .expect_try_reserve_wft_slot()
+            .times(1)
+            .returning(|| None);
+        failing_worker
+            .expect_namespace()
+            .return_const(namespace.clone());
+        failing_worker
+            .expect_task_queue()
+            .return_const(task_queue.clone());
+        failing_worker
+            .expect_deployment_options()
+            .return_const(WorkerDeploymentOptions {
+                version: temporalio_common::worker::WorkerDeploymentVersion {
+                    deployment_name: "test-deployment".to_string(),
+                    build_id: "build-fail".to_string(),
+                },
+                use_worker_versioning: true,
+                default_versioning_behavior: None,
+            });
+        failing_worker
+            .expect_worker_instance_key()
+            .return_const(failing_worker_id);
+        failing_worker
+            .expect_heartbeat_enabled()
+            .return_const(false);
+
+        let succeeding_worker_id = Uuid::new_v4();
+        let mut succeeding_worker = MockClientWorker::new();
+        succeeding_worker
+            .expect_try_reserve_wft_slot()
+            .times(1)
+            .returning(|| Some(new_mock_slot(false)));
+        succeeding_worker
+            .expect_namespace()
+            .return_const(namespace.clone());
+        succeeding_worker
+            .expect_task_queue()
+            .return_const(task_queue.clone());
+        let success_deployment_options = WorkerDeploymentOptions {
+            version: temporalio_common::worker::WorkerDeploymentVersion {
+                deployment_name: "test-deployment".to_string(),
+                build_id: "build-success".to_string(),
+            },
+            use_worker_versioning: true,
+            default_versioning_behavior: None,
+        };
+        succeeding_worker
+            .expect_deployment_options()
+            .return_const(success_deployment_options.clone());
+        succeeding_worker
+            .expect_worker_instance_key()
+            .return_const(succeeding_worker_id);
+        succeeding_worker
+            .expect_heartbeat_enabled()
+            .return_const(false);
+
+        manager
+            .register(Arc::new(failing_worker), false)
+            .expect("failing worker registration succeeds");
+        manager
+            .register(Arc::new(succeeding_worker), false)
+            .expect("succeeding worker registration succeeds");
+
+        let reservation = manager.try_reserve_wft_slot(namespace.clone(), task_queue.clone());
+
+        let reservation_deployment_options = reservation
+            .expect("succeeding worker was used after failing worker failed")
+            .deployment_options
+            .unwrap();
+        assert_eq!(
+            reservation_deployment_options, success_deployment_options,
+            "deployment options bubble through from succeeding worker"
+        );
+    }
+
+    #[test]
+    fn reserve_wft_slot_retries_respects_slot_boundary() {
+        let mut manager = ClientWorkerSetImpl::new();
+        let namespace = "retry_namespace".to_string();
+        let task_queue = "retry_queue".to_string();
+
+        let failing_worker_id = Uuid::new_v4();
+        let mut failing_worker = MockClientWorker::new();
+        failing_worker
+            .expect_try_reserve_wft_slot()
+            .times(1)
+            .returning(|| None);
+        failing_worker
+            .expect_namespace()
+            .return_const(namespace.clone());
+        failing_worker
+            .expect_task_queue()
+            .return_const(task_queue.clone());
+        failing_worker
+            .expect_deployment_options()
+            .return_const(WorkerDeploymentOptions {
+                version: temporalio_common::worker::WorkerDeploymentVersion {
+                    deployment_name: "test-deployment".to_string(),
+                    build_id: "build-fail".to_string(),
+                },
+                use_worker_versioning: true,
+                default_versioning_behavior: None,
+            });
+        failing_worker
+            .expect_worker_instance_key()
+            .return_const(failing_worker_id);
+        failing_worker
+            .expect_heartbeat_enabled()
+            .return_const(false);
+
+        // On a separate task queue
+        let succeeding_worker_id = Uuid::new_v4();
+        let mut succeeding_worker = MockClientWorker::new();
+        succeeding_worker.expect_try_reserve_wft_slot().times(0);
+        succeeding_worker
+            .expect_namespace()
+            .return_const(namespace.clone());
+        succeeding_worker
+            .expect_task_queue()
+            .return_const("other_task_queue".to_string());
+        succeeding_worker
+            .expect_deployment_options()
+            .return_const(None);
+        succeeding_worker
+            .expect_worker_instance_key()
+            .return_const(succeeding_worker_id);
+        succeeding_worker
+            .expect_heartbeat_enabled()
+            .return_const(false);
+
+        manager
+            .register(Arc::new(failing_worker), false)
+            .expect("failing worker registration succeeds");
+        manager
+            .register(Arc::new(succeeding_worker), false)
+            .expect("succeeding worker registration succeeds");
+
+        let reservation = manager.try_reserve_wft_slot(namespace.clone(), task_queue.clone());
+        assert!(
+            reservation.is_none(),
+            "succeeding_worker should not be picked due to it being on a separate task queue"
+        );
+    }
+
     #[test]
     fn registry_keeps_one_provider_per_namespace() {
         let manager = ClientWorkerSet::new();
@@ -375,7 +566,7 @@ mod tests {
         } else {
             // Should get error for duplicate namespace+task_queue combinations
             assert!(result.unwrap_err().to_string().contains(
-                "Registration of multiple workers on the same namespace and task queue"
+                "Registration of multiple workers on the same namespace, task queue, and deployment build ID for the same client not allowed"
             ));
         }
     }
@@ -451,7 +642,7 @@ mod tests {
         namespace: String,
         task_queue: String,
         heartbeat_enabled: bool,
-        worker_instance_key: Uuid,
+        build_id: Option<String>,
     ) -> MockClientWorker {
         let mut mock_provider = MockClientWorker::new();
         mock_provider
@@ -466,8 +657,23 @@ mod tests {
             .return_const(heartbeat_enabled);
         mock_provider
             .expect_worker_instance_key()
-            .return_const(worker_instance_key);
-        mock_provider.expect_deployment_options().return_const(None);
+            .return_const(Uuid::new_v4());
+        let deployment_name = "test-deployment".to_string();
+        let build_id_for_closure = build_id.clone();
+        mock_provider
+            .expect_deployment_options()
+            .returning(move || {
+                build_id_for_closure
+                    .as_ref()
+                    .map(|build_id| WorkerDeploymentOptions {
+                        version: temporalio_common::worker::WorkerDeploymentVersion {
+                            deployment_name: deployment_name.clone(),
+                            build_id: build_id.clone(),
+                        },
+                        use_worker_versioning: true,
+                        default_versioning_behavior: None,
+                    })
+            });
 
         if heartbeat_enabled {
             mock_provider
@@ -495,7 +701,7 @@ mod tests {
             "test_namespace".to_string(),
             "test_queue".to_string(),
             true,
-            Uuid::new_v4(),
+            None,
         );
 
         // Same namespace+task_queue but different worker instance
@@ -503,7 +709,7 @@ mod tests {
             "test_namespace".to_string(),
             "test_queue".to_string(),
             true,
-            Uuid::new_v4(),
+            None,
         );
 
         manager.register_worker(Arc::new(worker1), false).unwrap();
@@ -515,7 +721,7 @@ mod tests {
             result
                 .unwrap_err()
                 .to_string()
-                .contains("Registration of multiple workers on the same namespace and task queue")
+                .contains("Registration of multiple workers on the same namespace, task queue, and deployment build ID for the same client not allowed")
         );
 
         assert_eq!(1, manager.num_providers());
@@ -526,6 +732,81 @@ mod tests {
         let impl_ref = manager.worker_manager.read();
         assert!(impl_ref.shared_worker.contains_key("test_namespace"));
     }
 
+    #[test]
+    fn duplicate_namespace_with_different_build_ids_succeeds() {
+        let manager = ClientWorkerSet::new();
+        let namespace = "test_namespace".to_string();
+        let task_queue = "test_queue".to_string();
"test_queue".to_string(); + + let worker1 = + new_mock_provider_with_heartbeat(namespace.clone(), task_queue.clone(), false, None); + let worker1_instance_key = worker1.worker_instance_key(); + let worker2 = new_mock_provider_with_heartbeat( + namespace.clone(), + task_queue.clone(), + false, + Some("build-1".to_string()), + ); + let worker2_instance_key = worker2.worker_instance_key(); + let worker3 = + new_mock_provider_with_heartbeat(namespace.clone(), task_queue.clone(), false, None); + let worker4 = new_mock_provider_with_heartbeat( + namespace.clone(), + task_queue.clone(), + false, + Some("build-1".to_string()), + ); + + manager.register_worker(Arc::new(worker1), false).unwrap(); + + manager + .register_worker(Arc::new(worker2), false) + .expect("worker with new build ID should register"); + assert_eq!(2, manager.num_providers()); + + assert!(manager + .register_worker(Arc::new(worker3), false).unwrap_err().to_string() + .contains("Registration of multiple workers on the same namespace, task queue, and deployment build ID for the same client not allowed")); + + assert!(manager + .register_worker(Arc::new(worker4), false).unwrap_err().to_string() + .contains("Registration of multiple workers on the same namespace, task queue, and deployment build ID for the same client not allowed")); + assert_eq!(2, manager.num_providers()); + + { + let impl_ref = manager.worker_manager.read(); + let slot_key = SlotKey::new(namespace.clone(), task_queue.clone()); + let providers = impl_ref + .slot_providers + .get(&slot_key) + .expect("slot providers should exist for namespace/task queue"); + assert_eq!(2, providers.len()); + + assert_eq!( + providers, + &vec![ + (worker1_instance_key, None), + (worker2_instance_key, Some("build-1".to_string())), + ] + ); + } + + manager.unregister_worker(worker2_instance_key).unwrap(); + + { + let impl_ref = manager.worker_manager.read(); + let slot_key = SlotKey::new(namespace.clone(), task_queue.clone()); + let providers = impl_ref + .slot_providers + .get(&slot_key) + .expect("slot providers should exist for namespace/task queue"); + + assert_eq!(1, providers.len()); + + assert_eq!(providers, &vec![(worker1_instance_key, None),]); + } + } + #[test] fn multiple_workers_same_namespace_share_heartbeat_manager() { let manager = ClientWorkerSet::new(); @@ -534,7 +815,7 @@ mod tests { "shared_namespace".to_string(), "queue1".to_string(), true, - Uuid::new_v4(), + None, ); // Same namespace but different task queue @@ -542,7 +823,7 @@ mod tests { "shared_namespace".to_string(), "queue2".to_string(), true, - Uuid::new_v4(), + None, ); manager.register_worker(Arc::new(worker1), false).unwrap(); @@ -566,13 +847,13 @@ mod tests { "namespace1".to_string(), "queue1".to_string(), true, - Uuid::new_v4(), + None, ); let worker2 = new_mock_provider_with_heartbeat( "namespace2".to_string(), "queue1".to_string(), true, - Uuid::new_v4(), + None, ); manager.register_worker(Arc::new(worker1), false).unwrap(); @@ -596,13 +877,13 @@ mod tests { "test_namespace".to_string(), "queue1".to_string(), true, - Uuid::new_v4(), + None, ); let worker2 = new_mock_provider_with_heartbeat( "test_namespace".to_string(), "queue2".to_string(), true, - Uuid::new_v4(), + None, ); let worker_instance_key1 = worker1.worker_instance_key(); let worker_instance_key2 = worker2.worker_instance_key();