Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions crates/sdk-core/src/worker/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ impl AllPermitsTracker {

#[derive(Clone)]
pub(crate) struct WorkerTelemetry {
metric_meter: Option<TemporalMeter>,
temporal_metric_meter: Option<TemporalMeter>,
trace_subscriber: Option<Arc<dyn Subscriber + Send + Sync>>,
}
Expand Down Expand Up @@ -313,7 +312,6 @@ impl Worker {
info!(task_queue=%config.task_queue, namespace=%config.namespace, "Initializing worker");

let worker_telemetry = telem_instance.map(|telem| WorkerTelemetry {
metric_meter: telem.get_metric_meter(),
temporal_metric_meter: telem.get_temporal_metric_meter(),
trace_subscriber: telem.trace_subscriber(),
});
Expand Down Expand Up @@ -382,7 +380,7 @@ impl Worker {
config.task_queue.clone(),
wt.temporal_metric_meter.clone(),
),
wt.metric_meter.clone(),
wt.temporal_metric_meter.clone(),
)
} else {
(MetricsContext::no_op(), None)
Expand Down
30 changes: 14 additions & 16 deletions crates/sdk-core/src/worker/tuner/resource_based.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,14 +233,10 @@ impl PidControllers {

impl MetricInstruments {
fn new(meter: TemporalMeter) -> Self {
let mem_usage = meter.inner.gauge_f64("resource_slots_mem_usage".into());
let cpu_usage = meter.inner.gauge_f64("resource_slots_cpu_usage".into());
let mem_pid_output = meter
.inner
.gauge_f64("resource_slots_mem_pid_output".into());
let cpu_pid_output = meter
.inner
.gauge_f64("resource_slots_cpu_pid_output".into());
let mem_usage = meter.gauge_f64("resource_slots_mem_usage".into());
let cpu_usage = meter.gauge_f64("resource_slots_cpu_usage".into());
let mem_pid_output = meter.gauge_f64("resource_slots_mem_pid_output".into());
let cpu_pid_output = meter.gauge_f64("resource_slots_cpu_pid_output".into());
let attribs = meter.inner.new_attributes(meter.default_attribs);
Self {
attribs,
Expand Down Expand Up @@ -732,15 +728,17 @@ impl CGroupCpuFileSystem for CgroupV2CpuFileSystem {
mod tests {
use super::*;
use crate::{abstractions::MeteredPermitDealer, telemetry::metrics::MetricsContext};
use std::cell::RefCell;
use std::env;
use std::hint::black_box;
use std::rc::Rc;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
use std::{
cell::RefCell,
env,
hint::black_box,
rc::Rc,
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
thread::sleep,
};
use std::thread::sleep;
use temporalio_common::worker::WorkflowSlotKind;

struct FakeMIS {
Expand Down
58 changes: 58 additions & 0 deletions crates/sdk-core/tests/integ_tests/metrics_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1415,3 +1415,61 @@ async fn sticky_queue_label_strategy(
_ => unreachable!("Test only covers UseNormal and UseNormalAndSticky"),
}
}

#[tokio::test]
async fn resource_based_tuner_metrics() {
use temporalio_sdk_core::ResourceBasedTuner;

let (telemopts, addr, _aborter) = prom_metrics(None);
let rt = CoreRuntime::new_assume_tokio(get_integ_runtime_options(telemopts)).unwrap();
let wf_name = "resource_based_tuner_metrics";
let mut starter = CoreWfStarter::new_with_runtime(wf_name, rt);
starter.worker_config.no_remote_activities(true);
starter.worker_config.clear_max_outstanding_opts();

// Create a resource-based tuner with reasonable thresholds
let tuner = ResourceBasedTuner::new(0.8, 0.8);
starter.worker_config.tuner(Arc::new(tuner));

let mut worker = starter.worker().await;

worker.register_wf(wf_name.to_string(), |ctx: WfContext| async move {
ctx.timer(Duration::from_millis(100)).await;
Ok(().into())
});

worker
.submit_wf(
wf_name.to_owned(),
wf_name.to_owned(),
vec![],
WorkflowOptions::default(),
)
.await
.unwrap();

worker.run_until_done().await.unwrap();

// Give metrics time to be recorded (metrics are emitted every 1 second)
tokio::time::sleep(Duration::from_millis(1500)).await;

let body = get_text(format!("http://{addr}/metrics")).await;

// Verify that the resource-based tuner metrics are present
assert!(
body.contains("temporal_resource_slots_mem_usage"),
"Memory usage metric should be present"
);
assert!(
body.contains("temporal_resource_slots_cpu_usage"),
"CPU usage metric should be present"
);
assert!(
body.contains("temporal_resource_slots_mem_pid_output"),
"Memory PID output metric should be present"
);
assert!(
body.contains("temporal_resource_slots_cpu_pid_output"),
"CPU PID output metric should be present"
);
}
Loading