diff --git a/crates/sdk-core/src/worker/mod.rs b/crates/sdk-core/src/worker/mod.rs index dbba320c7..4f9c67ce7 100644 --- a/crates/sdk-core/src/worker/mod.rs +++ b/crates/sdk-core/src/worker/mod.rs @@ -156,7 +156,6 @@ impl AllPermitsTracker { #[derive(Clone)] pub(crate) struct WorkerTelemetry { - metric_meter: Option, temporal_metric_meter: Option, trace_subscriber: Option>, } @@ -313,7 +312,6 @@ impl Worker { info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker"); let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry { - metric_meter: telem.get_metric_meter(), temporal_metric_meter: telem.get_temporal_metric_meter(), trace_subscriber: telem.trace_subscriber(), }); @@ -382,7 +380,7 @@ impl Worker { config.task_queue.clone(), wt.temporal_metric_meter.clone(), ), - wt.metric_meter.clone(), + wt.temporal_metric_meter.clone(), ) } else { (MetricsContext::no_op(), None) diff --git a/crates/sdk-core/src/worker/tuner/resource_based.rs b/crates/sdk-core/src/worker/tuner/resource_based.rs index ed3abe60a..823b56854 100644 --- a/crates/sdk-core/src/worker/tuner/resource_based.rs +++ b/crates/sdk-core/src/worker/tuner/resource_based.rs @@ -233,14 +233,10 @@ impl PidControllers { impl MetricInstruments { fn new(meter: TemporalMeter) -> Self { - let mem_usage = meter.inner.gauge_f64("resource_slots_mem_usage".into()); - let cpu_usage = meter.inner.gauge_f64("resource_slots_cpu_usage".into()); - let mem_pid_output = meter - .inner - .gauge_f64("resource_slots_mem_pid_output".into()); - let cpu_pid_output = meter - .inner - .gauge_f64("resource_slots_cpu_pid_output".into()); + let mem_usage = meter.gauge_f64("resource_slots_mem_usage".into()); + let cpu_usage = meter.gauge_f64("resource_slots_cpu_usage".into()); + let mem_pid_output = meter.gauge_f64("resource_slots_mem_pid_output".into()); + let cpu_pid_output = meter.gauge_f64("resource_slots_cpu_pid_output".into()); let attribs = meter.inner.new_attributes(meter.default_attribs); Self { attribs, @@ -732,15 +728,17 @@ impl CGroupCpuFileSystem for CgroupV2CpuFileSystem { mod tests { use super::*; use crate::{abstractions::MeteredPermitDealer, telemetry::metrics::MetricsContext}; - use std::cell::RefCell; - use std::env; - use std::hint::black_box; - use std::rc::Rc; - use std::sync::{ - Arc, - atomic::{AtomicU64, Ordering}, + use std::{ + cell::RefCell, + env, + hint::black_box, + rc::Rc, + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + thread::sleep, }; - use std::thread::sleep; use temporalio_common::worker::WorkflowSlotKind; struct FakeMIS { diff --git a/crates/sdk-core/tests/integ_tests/metrics_tests.rs b/crates/sdk-core/tests/integ_tests/metrics_tests.rs index 4e1e8fe27..abdfdca7f 100644 --- a/crates/sdk-core/tests/integ_tests/metrics_tests.rs +++ b/crates/sdk-core/tests/integ_tests/metrics_tests.rs @@ -1415,3 +1415,61 @@ async fn sticky_queue_label_strategy( _ => unreachable!("Test only covers UseNormal and UseNormalAndSticky"), } } + +#[tokio::test] +async fn resource_based_tuner_metrics() { + use temporalio_sdk_core::ResourceBasedTuner; + + let (telemopts, addr, _aborter) = prom_metrics(None); + let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap(); + let wf_name = "resource_based_tuner_metrics"; + let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt); + starter.worker_config.no_remote_activities(true); + starter.worker_config.clear_max_outstanding_opts(); + + // Create a resource-based tuner with reasonable thresholds + let tuner = ResourceBasedTuner::new(0.8, 0.8); + starter.worker_config.tuner(Arc::new(tuner)); + + let mut worker = starter.worker().await; + + worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move { + ctx.timer(Duration::from_millis(100)).await; + Ok(().into()) + }); + + worker + .submit_wf( + wf_name.to_owned(), + wf_name.to_owned(), + vec![], + WorkflowOptions::default(), + ) + .await + .unwrap(); + + worker.run_until_done().await.unwrap(); + + // Give metrics time to be recorded (metrics are emitted every 1 second) + tokio::time::sleep(Duration::from_millis(1500)).await; + + let body = get_text(format!("http://{addr}/metrics")).await; + + // Verify that the resource-based tuner metrics are present + assert!( + body.contains("temporal_resource_slots_mem_usage"), + "Memory usage metric should be present" + ); + assert!( + body.contains("temporal_resource_slots_cpu_usage"), + "CPU usage metric should be present" + ); + assert!( + body.contains("temporal_resource_slots_mem_pid_output"), + "Memory PID output metric should be present" + ); + assert!( + body.contains("temporal_resource_slots_cpu_pid_output"), + "CPU PID output metric should be present" + ); +}