From 7ba5caf75a1b46ea66f7106c2dc1beb4f7b131b1 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 22 Jan 2025 14:21:47 -0800 Subject: [PATCH 1/2] Write sdk name & version to WFT completes --- .gitignore | 1 + core/src/core_tests/workers.rs | 2 + core/src/internal_flags.rs | 99 ++++++++++++++++--- core/src/worker/client.rs | 8 ++ core/src/worker/client/mocks.rs | 5 + core/src/worker/mod.rs | 28 ++---- .../machines/activity_state_machine.rs | 4 +- .../machines/child_workflow_state_machine.rs | 4 +- .../workflow/machines/workflow_machines.rs | 6 +- core/src/worker/workflow/mod.rs | 4 + core/src/worker/workflow/run_cache.rs | 5 + core/src/worker/workflow/workflow_stream.rs | 1 + 12 files changed, 130 insertions(+), 37 deletions(-) diff --git a/.gitignore b/.gitignore index c81a5cf2e..5eb7db15f 100644 --- a/.gitignore +++ b/.gitignore @@ -16,3 +16,4 @@ src/protos/*.rs /bindings/ /core/machine_coverage/ /.cloud_certs/ +.aider* diff --git a/core/src/core_tests/workers.rs b/core/src/core_tests/workers.rs index 44bc5f419..9076b80f1 100644 --- a/core/src/core_tests/workers.rs +++ b/core/src/core_tests/workers.rs @@ -313,6 +313,8 @@ async fn worker_shutdown_api(#[case] use_cache: bool, #[case] api_success: bool) mock.expect_workers() .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); mock.expect_is_mock().returning(|| true); + mock.expect_sdk_name_and_version() + .returning(|| ("test-core".to_string(), "0.0.0".to_string())); if use_cache { if api_success { mock.expect_shutdown_worker() diff --git a/core/src/internal_flags.rs b/core/src/internal_flags.rs index 7497ff0f4..47d90c029 100644 --- a/core/src/internal_flags.rs +++ b/core/src/internal_flags.rs @@ -47,25 +47,44 @@ pub(crate) enum InternalFlags { lang: BTreeSet, core_since_last_complete: HashSet, lang_since_last_complete: HashSet, + last_sdk_name: String, + last_sdk_version: String, + sdk_name: String, + sdk_version: String, }, Disabled, } impl InternalFlags { - pub(crate) fn new(server_capabilities: &get_system_info_response::Capabilities) -> Self { + pub(crate) fn new( + server_capabilities: &get_system_info_response::Capabilities, + sdk_name: String, + sdk_version: String, + ) -> Self { match server_capabilities.sdk_metadata { true => Self::Enabled { core: Default::default(), lang: Default::default(), core_since_last_complete: Default::default(), lang_since_last_complete: Default::default(), + last_sdk_name: "".to_string(), + last_sdk_version: "".to_string(), + sdk_name, + sdk_version, }, false => Self::Disabled, } } pub(crate) fn add_from_complete(&mut self, e: &WorkflowTaskCompletedEventAttributes) { - if let Self::Enabled { core, lang, .. } = self { + if let Self::Enabled { + core, + lang, + last_sdk_name, + last_sdk_version, + .. + } = self + { if let Some(metadata) = e.sdk_metadata.as_ref() { core.extend( metadata @@ -74,6 +93,12 @@ impl InternalFlags { .map(|u| CoreInternalFlags::from_u32(*u)), ); lang.extend(metadata.lang_used_flags.iter()); + if !metadata.sdk_name.is_empty() { + *last_sdk_name = metadata.sdk_name.clone(); + } + if !metadata.sdk_version.is_empty() { + *last_sdk_version = metadata.sdk_version.clone(); + } } } } @@ -133,6 +158,10 @@ impl InternalFlags { lang_since_last_complete, core, lang, + last_sdk_name, + last_sdk_version, + sdk_name, + sdk_version, } => { let core_newly_used: Vec<_> = core_since_last_complete .iter() @@ -146,11 +175,17 @@ impl InternalFlags { .collect(); core.extend(core_since_last_complete.iter()); lang.extend(lang_since_last_complete.iter()); + let (sdk_name, sdk_version) = + if last_sdk_name != sdk_name || last_sdk_version != sdk_version { + (sdk_name.clone(), sdk_version.clone()) + } else { + ("".to_owned(), "".to_owned()) + }; WorkflowTaskCompletedMetadata { core_used_flags: core_newly_used, lang_used_flags: lang_newly_used, - sdk_name: "".to_string(), - sdk_version: "".to_string(), + sdk_name, + sdk_version, } } Self::Disabled => WorkflowTaskCompletedMetadata::default(), @@ -186,9 +221,19 @@ mod tests { use super::*; use temporal_sdk_core_protos::temporal::api::workflowservice::v1::get_system_info_response::Capabilities; + impl Default for InternalFlags { + fn default() -> Self { + Self::Disabled + } + } + #[test] fn disabled_in_capabilities_disables() { - let mut f = InternalFlags::new(&Capabilities::default()); + let mut f = InternalFlags::new( + &Capabilities::default(), + "name".to_string(), + "ver".to_string(), + ); f.add_lang_used([1]); f.add_from_complete(&WorkflowTaskCompletedEventAttributes { sdk_metadata: Some(WorkflowTaskCompletedMetadata { @@ -214,23 +259,29 @@ mod tests { } #[test] - fn only_writes_new_flags() { - let mut f = InternalFlags::new(&Capabilities { - sdk_metadata: true, - ..Default::default() - }); + fn only_writes_new_flags_and_sdk_info() { + let mut f = InternalFlags::new( + &Capabilities { + sdk_metadata: true, + ..Default::default() + }, + "name".to_string(), + "ver".to_string(), + ); f.add_lang_used([1]); f.try_use(CoreInternalFlags::IdAndTypeDeterminismChecks, true); let gathered = f.gather_for_wft_complete(); assert_matches!(gathered.core_used_flags.as_slice(), &[1]); assert_matches!(gathered.lang_used_flags.as_slice(), &[1]); + assert_matches!(gathered.sdk_name.as_str(), "name"); + assert_matches!(gathered.sdk_version.as_str(), "ver"); f.add_from_complete(&WorkflowTaskCompletedEventAttributes { sdk_metadata: Some(WorkflowTaskCompletedMetadata { core_used_flags: vec![2], lang_used_flags: vec![2], - sdk_name: "".to_string(), - sdk_version: "".to_string(), + sdk_name: "name".to_string(), + sdk_version: "ver".to_string(), }), ..Default::default() }); @@ -239,5 +290,29 @@ mod tests { let gathered = f.gather_for_wft_complete(); assert_matches!(gathered.core_used_flags.as_slice(), &[]); assert_matches!(gathered.lang_used_flags.as_slice(), &[]); + assert!(gathered.sdk_name.is_empty()); + assert!(gathered.sdk_version.is_empty()); + + f.add_from_complete(&WorkflowTaskCompletedEventAttributes { + sdk_metadata: Some(WorkflowTaskCompletedMetadata::default()), + ..Default::default() + }); + let gathered = f.gather_for_wft_complete(); + assert_matches!(gathered.core_used_flags.as_slice(), &[]); + assert_matches!(gathered.lang_used_flags.as_slice(), &[]); + assert!(gathered.sdk_name.is_empty()); + assert!(gathered.sdk_version.is_empty()); + + f.add_from_complete(&WorkflowTaskCompletedEventAttributes { + sdk_metadata: Some(WorkflowTaskCompletedMetadata { + sdk_name: "other sdk".to_string(), + sdk_version: "ver".to_string(), + ..Default::default() + }), + ..Default::default() + }); + let gathered = f.gather_for_wft_complete(); + assert_matches!(gathered.sdk_name.as_str(), "name"); + assert_matches!(gathered.sdk_version.as_str(), "ver"); } } diff --git a/core/src/worker/client.rs b/core/src/worker/client.rs index eb3c21228..5a32931b2 100644 --- a/core/src/worker/client.rs +++ b/core/src/worker/client.rs @@ -164,6 +164,8 @@ pub(crate) trait WorkerClient: Sync + Send { fn capabilities(&self) -> Option; fn workers(&self) -> Arc; fn is_mock(&self) -> bool; + /// Return (sdk_name, sdk_version) from the underlying client configuration + fn sdk_name_and_version(&self) -> (String, String); } #[async_trait::async_trait] @@ -486,6 +488,12 @@ impl WorkerClient for WorkerClientBag { fn is_mock(&self) -> bool { false } + + fn sdk_name_and_version(&self) -> (String, String) { + let lock = self.replaceable_client.read(); + let opts = lock.get_client().inner().options(); + (opts.client_name.clone(), opts.client_version.clone()) + } } /// A version of [RespondWorkflowTaskCompletedRequest] that will finish being filled out by the diff --git a/core/src/worker/client/mocks.rs b/core/src/worker/client/mocks.rs index 41fcc5a62..312025872 100644 --- a/core/src/worker/client/mocks.rs +++ b/core/src/worker/client/mocks.rs @@ -31,6 +31,8 @@ pub(crate) fn mock_workflow_client() -> MockWorkerClient { r.expect_is_mock().returning(|| true); r.expect_shutdown_worker() .returning(|_| Ok(ShutdownWorkerResponse {})); + r.expect_sdk_name_and_version() + .returning(|| ("test-core".to_string(), "0.0.0".to_string())); r } @@ -42,6 +44,8 @@ pub(crate) fn mock_manual_workflow_client() -> MockManualWorkerClient { r.expect_workers() .returning(|| DEFAULT_WORKERS_REGISTRY.clone()); r.expect_is_mock().returning(|| true); + r.expect_sdk_name_and_version() + .returning(|| ("test-core".to_string(), "0.0.0".to_string())); r } @@ -146,5 +150,6 @@ mockall::mock! { fn capabilities(&self) -> Option; fn workers(&self) -> Arc; fn is_mock(&self) -> bool; + fn sdk_name_and_version(&self) -> (String, String); } } diff --git a/core/src/worker/mod.rs b/core/src/worker/mod.rs index fae51cc07..63ffd2a1a 100644 --- a/core/src/worker/mod.rs +++ b/core/src/worker/mod.rs @@ -68,7 +68,6 @@ use temporal_sdk_core_protos::{ temporal::api::{ enums::v1::TaskQueueKind, taskqueue::v1::{StickyExecutionAttributes, TaskQueue}, - workflowservice::v1::get_system_info_response, }, TaskToken, }; @@ -508,16 +507,19 @@ impl Worker { external_wft_tx, ); let worker_key = Mutex::new(client.workers().register(Box::new(provider))); + let sdk_name_and_ver = client.sdk_name_and_version(); Self { worker_key, client: client.clone(), workflows: Workflows::new( - build_wf_basics( - config.clone(), + WorkflowBasics { + worker_config: Arc::new(config.clone()), + shutdown_token: shutdown_token.child_token(), metrics, - shutdown_token.child_token(), - client.capabilities().unwrap_or_default(), - ), + server_capabilities: client.capabilities().unwrap_or_default(), + sdk_name: sdk_name_and_ver.0, + sdk_version: sdk_name_and_ver.1, + }, sticky_queue_name.map(|sq| StickyExecutionAttributes { worker_task_queue: Some(TaskQueue { name: sq, @@ -864,20 +866,6 @@ pub(crate) struct PostActivateHookData<'a> { pub(crate) replaying: bool, } -fn build_wf_basics( - config: WorkerConfig, - metrics: MetricsContext, - shutdown_token: CancellationToken, - server_capabilities: get_system_info_response::Capabilities, -) -> WorkflowBasics { - WorkflowBasics { - worker_config: Arc::new(config), - shutdown_token, - metrics, - server_capabilities, - } -} - pub(crate) enum TaskPollers { Real, #[cfg(test)] diff --git a/core/src/worker/workflow/machines/activity_state_machine.rs b/core/src/worker/workflow/machines/activity_state_machine.rs index a7925e961..cfde59020 100644 --- a/core/src/worker/workflow/machines/activity_state_machine.rs +++ b/core/src/worker/workflow/machines/activity_state_machine.rs @@ -876,7 +876,7 @@ mod test { attrs: Default::default(), cancellation_type: Default::default(), cancelled_before_sent: false, - internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))), + internal_flags: Rc::new(RefCell::new(InternalFlags::default())), }, ); let cmds = s.cancel().unwrap(); @@ -894,7 +894,7 @@ mod test { cancellation_type: ActivityCancellationType::Abandon.into(), ..Default::default() }, - Rc::new(RefCell::new(InternalFlags::new(&Default::default()))), + Rc::new(RefCell::new(InternalFlags::default())), true, ); let mut s = if let Machines::ActivityMachine(am) = s.machine { diff --git a/core/src/worker/workflow/machines/child_workflow_state_machine.rs b/core/src/worker/workflow/machines/child_workflow_state_machine.rs index e3505c9f3..f6c9d0547 100644 --- a/core/src/worker/workflow/machines/child_workflow_state_machine.rs +++ b/core/src/worker/workflow/machines/child_workflow_state_machine.rs @@ -985,7 +985,7 @@ mod test { workflow_type: "".to_string(), cancelled_before_sent: false, cancel_type: Default::default(), - internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))), + internal_flags: Rc::new(RefCell::new(InternalFlags::default())), }, ); let cmds = s.cancel().unwrap(); @@ -1006,7 +1006,7 @@ mod test { workflow_type: "".to_string(), cancelled_before_sent: false, cancel_type: ChildWorkflowCancellationType::Abandon, - internal_flags: Rc::new(RefCell::new(InternalFlags::new(&Default::default()))), + internal_flags: Rc::new(RefCell::new(InternalFlags::default())), }; let state = Cancelled { seen_cancelled_event: true, diff --git a/core/src/worker/workflow/machines/workflow_machines.rs b/core/src/worker/workflow/machines/workflow_machines.rs index 9545391ae..0ae8d8120 100644 --- a/core/src/worker/workflow/machines/workflow_machines.rs +++ b/core/src/worker/workflow/machines/workflow_machines.rs @@ -240,7 +240,11 @@ where impl WorkflowMachines { pub(crate) fn new(basics: RunBasics, driven_wf: DrivenWorkflow) -> Self { let replaying = basics.history.previous_wft_started_id > 0; - let mut observed_internal_flags = InternalFlags::new(basics.capabilities); + let mut observed_internal_flags = InternalFlags::new( + basics.capabilities, + basics.sdk_name.to_owned(), + basics.sdk_version.to_owned(), + ); // Peek ahead to determine used flags in the first WFT. if let Some(attrs) = basics.history.peek_next_wft_completed(0) { observed_internal_flags.add_from_complete(attrs); diff --git a/core/src/worker/workflow/mod.rs b/core/src/worker/workflow/mod.rs index 61d0df6a4..08470a8f4 100644 --- a/core/src/worker/workflow/mod.rs +++ b/core/src/worker/workflow/mod.rs @@ -131,6 +131,8 @@ pub(crate) struct WorkflowBasics { pub(crate) shutdown_token: CancellationToken, pub(crate) metrics: MetricsContext, pub(crate) server_capabilities: get_system_info_response::Capabilities, + pub(crate) sdk_name: String, + pub(crate) sdk_version: String, } pub(crate) struct RunBasics<'a> { @@ -141,6 +143,8 @@ pub(crate) struct RunBasics<'a> { pub(crate) history: HistoryUpdate, pub(crate) metrics: MetricsContext, pub(crate) capabilities: &'a get_system_info_response::Capabilities, + pub(crate) sdk_name: &'a str, + pub(crate) sdk_version: &'a str, } impl Workflows { diff --git a/core/src/worker/workflow/run_cache.rs b/core/src/worker/workflow/run_cache.rs index 9990168f5..dc173ebb8 100644 --- a/core/src/worker/workflow/run_cache.rs +++ b/core/src/worker/workflow/run_cache.rs @@ -16,6 +16,7 @@ use temporal_sdk_core_protos::{ pub(super) struct RunCache { worker_config: Arc, + sdk_name_and_version: (String, String), server_capabilities: get_system_info_response::Capabilities, /// Run id -> Data runs: LruCache, @@ -27,6 +28,7 @@ pub(super) struct RunCache { impl RunCache { pub(super) fn new( worker_config: Arc, + sdk_name_and_version: (String, String), server_capabilities: get_system_info_response::Capabilities, local_activity_request_sink: impl LocalActivityRequestSink, metrics: MetricsContext, @@ -40,6 +42,7 @@ impl RunCache { }; Self { worker_config, + sdk_name_and_version, server_capabilities, runs: LruCache::new( NonZeroUsize::new(lru_size).expect("LRU size is guaranteed positive"), @@ -73,6 +76,8 @@ impl RunCache { history: HistoryUpdate::dummy(), metrics, capabilities: &self.server_capabilities, + sdk_name: &self.sdk_name_and_version.0, + sdk_version: &self.sdk_name_and_version.1, }, pwft, self.local_activity_request_sink.clone(), diff --git a/core/src/worker/workflow/workflow_stream.rs b/core/src/worker/workflow/workflow_stream.rs index 4ac9bf413..1e4e7e928 100644 --- a/core/src/worker/workflow/workflow_stream.rs +++ b/core/src/worker/workflow/workflow_stream.rs @@ -87,6 +87,7 @@ impl WFStream { buffered_polls_need_cache_slot: Default::default(), runs: RunCache::new( basics.worker_config.clone(), + (basics.sdk_name.clone(), basics.sdk_version.clone()), basics.server_capabilities, local_activity_request_sink, basics.metrics.clone(), From a9d6b7e3046704671d24fe51e123e320fa76caf6 Mon Sep 17 00:00:00 2001 From: Spencer Judge Date: Wed, 22 Jan 2025 14:36:38 -0800 Subject: [PATCH 2/2] Address review comment --- core/src/internal_flags.rs | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/core/src/internal_flags.rs b/core/src/internal_flags.rs index 47d90c029..d613ba849 100644 --- a/core/src/internal_flags.rs +++ b/core/src/internal_flags.rs @@ -175,12 +175,16 @@ impl InternalFlags { .collect(); core.extend(core_since_last_complete.iter()); lang.extend(lang_since_last_complete.iter()); - let (sdk_name, sdk_version) = - if last_sdk_name != sdk_name || last_sdk_version != sdk_version { - (sdk_name.clone(), sdk_version.clone()) - } else { - ("".to_owned(), "".to_owned()) - }; + let sdk_name = if last_sdk_name != sdk_name { + sdk_name.clone() + } else { + "".to_string() + }; + let sdk_version = if last_sdk_version != sdk_version { + sdk_version.clone() + } else { + "".to_string() + }; WorkflowTaskCompletedMetadata { core_used_flags: core_newly_used, lang_used_flags: lang_newly_used, @@ -306,7 +310,7 @@ mod tests { f.add_from_complete(&WorkflowTaskCompletedEventAttributes { sdk_metadata: Some(WorkflowTaskCompletedMetadata { sdk_name: "other sdk".to_string(), - sdk_version: "ver".to_string(), + sdk_version: "other ver".to_string(), ..Default::default() }), ..Default::default() @@ -314,5 +318,17 @@ mod tests { let gathered = f.gather_for_wft_complete(); assert_matches!(gathered.sdk_name.as_str(), "name"); assert_matches!(gathered.sdk_version.as_str(), "ver"); + + f.add_from_complete(&WorkflowTaskCompletedEventAttributes { + sdk_metadata: Some(WorkflowTaskCompletedMetadata { + sdk_name: "name".to_string(), + sdk_version: "ver2".to_string(), + ..Default::default() + }), + ..Default::default() + }); + let gathered = f.gather_for_wft_complete(); + assert!(gathered.sdk_name.is_empty()); + assert_matches!(gathered.sdk_version.as_str(), "ver"); } }