diff --git a/crates/sdk-core/src/lib.rs b/crates/sdk-core/src/lib.rs index 663af8fd8..d46d5f311 100644 --- a/crates/sdk-core/src/lib.rs +++ b/crates/sdk-core/src/lib.rs @@ -241,19 +241,35 @@ pub struct CoreRuntime { } /// Holds telemetry options, as well as worker heartbeat_interval. Construct with [RuntimeOptionsBuilder] -#[derive(derive_builder::Builder)] +#[derive(Default, derive_builder::Builder)] +#[builder(build_fn(validate = "Self::validate"))] #[non_exhaustive] -#[derive(Default)] pub struct RuntimeOptions { /// Telemetry configuration options. #[builder(default)] telemetry_options: TelemetryOptions, /// Optional worker heartbeat interval - This configures the heartbeat setting of all /// workers created using this runtime. + /// + /// Interval must be between 1s and 60s, inclusive. #[builder(default = "Some(Duration::from_secs(60))")] heartbeat_interval: Option, } +impl RuntimeOptionsBuilder { + fn validate(&self) -> Result<(), String> { + if let Some(Some(interval)) = self.heartbeat_interval + && (interval < Duration::from_secs(1) || interval > Duration::from_secs(60)) + { + return Err(format!( + "heartbeat_interval ({interval:?}) must be between 1s and 60s", + )); + } + + Ok(()) + } +} + /// Wraps a [tokio::runtime::Builder] to allow layering multiple on_thread_start functions pub struct TokioRuntimeBuilder { /// The underlying tokio runtime builder diff --git a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs index 941490257..3f168816e 100644 --- a/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs +++ b/crates/sdk-core/tests/integ_tests/worker_heartbeat_tests.rs @@ -56,7 +56,7 @@ fn within_duration(dur: PbDuration, threshold: Duration) -> bool { fn new_no_metrics_starter(wf_name: &str) -> CoreWfStarter { let runtimeopts = RuntimeOptionsBuilder::default() .telemetry_options(TelemetryOptionsBuilder::default().build().unwrap()) - .heartbeat_interval(Some(Duration::from_millis(100))) + .heartbeat_interval(Some(Duration::from_secs(1))) .build() .unwrap(); CoreWfStarter::new_with_runtime(wf_name, CoreRuntime::new_assume_tokio(runtimeopts).unwrap()) @@ -105,7 +105,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b }; let runtimeopts = RuntimeOptionsBuilder::default() .telemetry_options(telemopts) - .heartbeat_interval(Some(Duration::from_millis(100))) + .heartbeat_interval(Some(Duration::from_secs(1))) .build() .unwrap(); let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap(); @@ -153,7 +153,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b ctx.activity(ActivityOptions { activity_type: "pass_fail_act".to_string(), input: "pass".as_json_payload().expect("serializes fine"), - start_to_close_timeout: Some(Duration::from_secs(1)), + start_to_close_timeout: Some(Duration::from_secs(5)), ..Default::default() }) .await; @@ -184,7 +184,7 @@ async fn docker_worker_heartbeat_basic(#[values("otel", "prom", "no_metrics")] b let test_fut = async { // Give enough time to ensure heartbeat interval has been hit - tokio::time::sleep(Duration::from_millis(110)).await; + tokio::time::sleep(Duration::from_millis(1500)).await; acts_started.notified().await; let client = starter.get_client().await; let mut raw_client = (*client).clone(); @@ -267,7 +267,7 @@ async fn docker_worker_heartbeat_tuner() { } let runtimeopts = RuntimeOptionsBuilder::default() .telemetry_options(get_integ_telem_options()) - .heartbeat_interval(Some(Duration::from_millis(100))) + .heartbeat_interval(Some(Duration::from_secs(1))) .build() .unwrap(); let mut rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap(); @@ -454,8 +454,8 @@ fn after_shutdown_checks( assert!(!host_info.host_name.is_empty()); assert!(!host_info.process_key.is_empty()); assert!(!host_info.process_id.is_empty()); - assert_ne!(host_info.current_host_cpu_usage, 0.0); - assert_ne!(host_info.current_host_mem_usage, 0.0); + assert!(host_info.current_host_cpu_usage >= 0.0); + assert!(host_info.current_host_mem_usage >= 0.0); assert!(heartbeat.task_queue.starts_with(wf_name)); assert_eq!( @@ -479,7 +479,7 @@ fn after_shutdown_checks( ); assert!(within_duration( heartbeat.elapsed_since_last_heartbeat.unwrap(), - Duration::from_millis(200) + Duration::from_millis(2000) )); let workflow_task_slots = heartbeat.workflow_task_slots_info.clone().unwrap(); @@ -647,7 +647,7 @@ async fn worker_heartbeat_sticky_cache_miss() { tokio::join!(orchestrator, runner); - sleep(Duration::from_millis(200)).await; + sleep(Duration::from_secs(2)).await; let mut heartbeats = list_worker_heartbeats(&client, format!("WorkerInstanceKey=\"{worker_key}\"")).await; assert_eq!(heartbeats.len(), 1); @@ -694,7 +694,7 @@ async fn worker_heartbeat_multiple_workers() { let _ = starter_b.start_with_worker(wf_name, &mut worker_b).await; worker_b.run_until_done().await.unwrap(); - sleep(Duration::from_millis(200)).await; + sleep(Duration::from_secs(2)).await; let all = list_worker_heartbeats(&client, String::new()).await; let keys: HashSet<_> = all @@ -776,7 +776,7 @@ async fn worker_heartbeat_failure_metrics() { .activity(ActivityOptions { activity_type: "failing_act".to_string(), input: "boom".as_json_payload().expect("serialize"), - start_to_close_timeout: Some(Duration::from_secs(1)), + start_to_close_timeout: Some(Duration::from_secs(5)), retry_policy: Some(RetryPolicy { initial_interval: Some(prost_dur!(from_millis(10))), backoff_coefficient: 1.0, @@ -859,7 +859,7 @@ async fn worker_heartbeat_failure_metrics() { } Err("activity_slots.last_interval_failure_tasks still 0, retrying") }, - Duration::from_millis(150), + Duration::from_millis(1500), ) .await .unwrap(); @@ -901,7 +901,7 @@ async fn worker_heartbeat_failure_metrics() { } Err("workflow_slots.last_interval_failure_tasks still 0, retrying") }, - Duration::from_millis(150), + Duration::from_millis(1500), ) .await .unwrap(); @@ -1000,7 +1000,7 @@ async fn worker_heartbeat_skip_client_worker_set_check() { let wf_name = "worker_heartbeat_skip_client_worker_set_check"; let runtimeopts = RuntimeOptionsBuilder::default() .telemetry_options(get_integ_telem_options()) - .heartbeat_interval(Some(Duration::from_millis(100))) + .heartbeat_interval(Some(Duration::from_secs(1))) .build() .unwrap(); let rt = CoreRuntime::new_assume_tokio(runtimeopts).unwrap();