Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions crates/sdk-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,19 +241,35 @@ pub struct CoreRuntime {
}

/// Holds telemetry options, as well as worker heartbeat_interval. Construct with [RuntimeOptionsBuilder]
#[derive(derive_builder::Builder)]
#[derive(Default, derive_builder::Builder)]
#[builder(build_fn(validate = "Self::validate"))]
#[non_exhaustive]
#[derive(Default)]
pub struct RuntimeOptions {
/// Telemetry configuration options.
#[builder(default)]
telemetry_options: TelemetryOptions,
/// Optional worker heartbeat interval - This configures the heartbeat setting of all
/// workers created using this runtime.
///
/// Interval must be between 1s and 60s, inclusive.
#[builder(default = "Some(Duration::from_secs(60))")]
heartbeat_interval: Option<Duration>,
}

impl RuntimeOptionsBuilder {
fn validate(&self) -> Result<(), String> {
if let Some(Some(interval)) = self.heartbeat_interval
&& (interval < Duration::from_secs(1) || interval > Duration::from_secs(60))
{
return Err(format!(
"heartbeat_interval ({interval:?}) must be between 1s and 60s",
));
}

Ok(())
}
}

/// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions
pub struct TokioRuntimeBuilder<F> {
/// The underlying tokio runtime builder
Expand Down
28 changes: 14 additions & 14 deletions crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ fn within_duration(dur: PbDuration, threshold: Duration) -> bool {
fn new_no_metrics_starter(wf_name: &str) -> CoreWfStarter {
let runtimeopts = RuntimeOptionsBuilder::default()
.telemetry_options(TelemetryOptionsBuilder::default().build().unwrap())
.heartbeat_interval(Some(Duration::from_millis(100)))
.heartbeat_interval(Some(Duration::from_secs(1)))
.build()
.unwrap();
CoreWfStarter::new_with_runtime(wf_name, CoreRuntime::new_assume_tokio(runtimeopts).unwrap())
Expand Down Expand Up @@ -105,7 +105,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
};
let runtimeopts = RuntimeOptionsBuilder::default()
.telemetry_options(telemopts)
.heartbeat_interval(Some(Duration::from_millis(100)))
.heartbeat_interval(Some(Duration::from_secs(1)))
.build()
.unwrap();
let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
Expand Down Expand Up @@ -153,7 +153,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b
ctx.activity(ActivityOptions {
activity_type: "pass_fail_act".to_string(),
input: "pass".as_json_payload().expect("serializes fine"),
start_to_close_timeout: Some(Duration::from_secs(1)),
start_to_close_timeout: Some(Duration::from_secs(5)),
..Default::default()
})
.await;
Expand Down Expand Up @@ -184,7 +184,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b

let test_fut = async {
// Give enough time to ensure heartbeat interval has been hit
tokio::time::sleep(Duration::from_millis(110)).await;
tokio::time::sleep(Duration::from_millis(1500)).await;
acts_started.notified().await;
let client = starter.get_client().await;
let mut raw_client = (*client).clone();
Expand Down Expand Up @@ -267,7 +267,7 @@ async fn docker_worker_heartbeat_tuner() {
}
let runtimeopts = RuntimeOptionsBuilder::default()
.telemetry_options(get_integ_telem_options())
.heartbeat_interval(Some(Duration::from_millis(100)))
.heartbeat_interval(Some(Duration::from_secs(1)))
.build()
.unwrap();
let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
Expand Down Expand Up @@ -454,8 +454,8 @@ fn after_shutdown_checks(
assert!(!host_info.host_name.is_empty());
assert!(!host_info.process_key.is_empty());
assert!(!host_info.process_id.is_empty());
assert_ne!(host_info.current_host_cpu_usage, 0.0);
assert_ne!(host_info.current_host_mem_usage, 0.0);
assert!(host_info.current_host_cpu_usage >= 0.0);
assert!(host_info.current_host_mem_usage >= 0.0);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Test assertions weakened for CPU and memory usage

The assertions for current_host_cpu_usage and current_host_mem_usage were changed from verifying non-zero values to only checking non-negative values. This weakens the test significantly because it no longer validates that host metrics are actually being collected and reported, only that they aren't negative, which provides minimal value and could allow bugs in metrics collection to go undetected.

Fix in Cursor Fix in Web


assert!(heartbeat.task_queue.starts_with(wf_name));
assert_eq!(
Expand All @@ -479,7 +479,7 @@ fn after_shutdown_checks(
);
assert!(within_duration(
heartbeat.elapsed_since_last_heartbeat.unwrap(),
Duration::from_millis(200)
Duration::from_millis(2000)
));

let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap();
Expand Down Expand Up @@ -647,7 +647,7 @@ async fn worker_heartbeat_sticky_cache_miss() {

tokio::join!(orchestrator, runner);

sleep(Duration::from_millis(200)).await;
sleep(Duration::from_secs(2)).await;
let mut heartbeats =
list_worker_heartbeats(&client, format!("WorkerInstanceKey=\"{worker_key}\"")).await;
assert_eq!(heartbeats.len(), 1);
Expand Down Expand Up @@ -694,7 +694,7 @@ async fn worker_heartbeat_multiple_workers() {
let _ = starter_b.start_with_worker(wf_name, &mut worker_b).await;
worker_b.run_until_done().await.unwrap();

sleep(Duration::from_millis(200)).await;
sleep(Duration::from_secs(2)).await;

let all = list_worker_heartbeats(&client, String::new()).await;
let keys: HashSet<_> = all
Expand Down Expand Up @@ -776,7 +776,7 @@ async fn worker_heartbeat_failure_metrics() {
.activity(ActivityOptions {
activity_type: "failing_act".to_string(),
input: "boom".as_json_payload().expect("serialize"),
start_to_close_timeout: Some(Duration::from_secs(1)),
start_to_close_timeout: Some(Duration::from_secs(5)),
retry_policy: Some(RetryPolicy {
initial_interval: Some(prost_dur!(from_millis(10))),
backoff_coefficient: 1.0,
Expand Down Expand Up @@ -859,7 +859,7 @@ async fn worker_heartbeat_failure_metrics() {
}
Err("activity_slots.last_interval_failure_tasks still 0, retrying")
},
Duration::from_millis(150),
Duration::from_millis(1500),
)
.await
.unwrap();
Expand Down Expand Up @@ -901,7 +901,7 @@ async fn worker_heartbeat_failure_metrics() {
}
Err("workflow_slots.last_interval_failure_tasks still 0, retrying")
},
Duration::from_millis(150),
Duration::from_millis(1500),
)
.await
.unwrap();
Expand Down Expand Up @@ -1000,7 +1000,7 @@ async fn worker_heartbeat_skip_client_worker_set_check() {
let wf_name = "worker_heartbeat_skip_client_worker_set_check";
let runtimeopts = RuntimeOptionsBuilder::default()
.telemetry_options(get_integ_telem_options())
.heartbeat_interval(Some(Duration::from_millis(100)))
.heartbeat_interval(Some(Duration::from_secs(1)))
.build()
.unwrap();
let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();
Expand Down
Loading