From 9ad0ff946730df77ce274ae350b47d60b3b30ccb Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 11 Aug 2025 05:36:53 +0000 Subject: [PATCH 1/3] feat: add early shutdown when no requests received for X seconds --- .../worker/supervisor/strategy_per_worker.rs | 23 +++++++++++++++++++ types/global.d.ts | 3 +++ 2 files changed, 26 insertions(+) diff --git a/crates/base/src/worker/supervisor/strategy_per_worker.rs b/crates/base/src/worker/supervisor/strategy_per_worker.rs index ca5364393..280c16d43 100644 --- a/crates/base/src/worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/worker/supervisor/strategy_per_worker.rs @@ -3,6 +3,7 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; +use std::time::SystemTime; use base_rt::RuntimeState; use deno_core::unsync::sync::AtomicFlag; @@ -37,6 +38,8 @@ use super::V8HandleTerminationData; #[derive(Debug, Default)] struct State { + req_absent_duration: Option, + is_worker_entered: bool, is_wall_clock_limit_disabled: bool, is_wall_clock_beforeunload_armed: bool, @@ -49,6 +52,7 @@ struct State { wall_clock_alerts: usize, req_ack_count: usize, + last_req_ack: Option, req_demand: Arc, runtime: Arc, @@ -80,6 +84,7 @@ impl State { fn req_acknowledged(&mut self) { self.req_ack_count += 1; + self.last_req_ack = Some(SystemTime::now()); self.update_runtime_state(); } @@ -92,6 +97,14 @@ impl State { || self.is_cpu_time_soft_limit_reached || self.is_mem_half_reached || self.wall_clock_alerts == 2 + || matches!( + self + .last_req_ack + .as_ref() + .zip(self.req_absent_duration) + .and_then(|(t, d)| t.checked_add(d)), + Some(t) if t < SystemTime::now() + ) } fn have_all_reqs_been_acknowledged(&self) -> bool { @@ -143,6 +156,16 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { let mut complete_reason = None::; let mut state = State { + req_absent_duration: runtime_opts + .context + .as_ref() + .and_then(|it| it.get("supervisor")) + .and_then(|it| { + it.get("requestAbsentTimeoutMs") + .and_then(|it| it.as_u64()) + .map(|it| Duration::from_millis(it)) + }), + is_wall_clock_limit_disabled: worker_timeout_ms == 0, is_cpu_time_limit_disabled: cpu_time_soft_limit_ms == 0 && cpu_time_hard_limit_ms == 0, diff --git a/types/global.d.ts b/types/global.d.ts index cc549960c..853d139d4 100644 --- a/types/global.d.ts +++ b/types/global.d.ts @@ -49,6 +49,9 @@ interface UserWorkerCreateContext { shouldBootstrapMockFnThrowError?: boolean | null; suppressEszipMigrationWarning?: boolean | null; useReadSyncFileAPI?: boolean | null; + supervisor?: { + requestAbsentTimeoutMs?: number | null; + }; } interface UserWorkerCreateOptions { From 97ad1bf86e5dbe03d6ae3d3e306ab5cf9e6b7a75 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 11 Aug 2025 22:39:26 +0000 Subject: [PATCH 2/3] chore: add an integration test --- crates/base/test_cases/main/index.ts | 11 ++++-- crates/base/tests/integration_tests.rs | 48 +++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 3 deletions(-) diff --git a/crates/base/test_cases/main/index.ts b/crates/base/test_cases/main/index.ts index e02931849..b8b85cf34 100644 --- a/crates/base/test_cases/main/index.ts +++ b/crates/base/test_cases/main/index.ts @@ -1,6 +1,6 @@ console.log("main function started"); -function parseIntFromHeadersOrDefault(req: Request, key: string, val: number) { +function parseIntFromHeadersOrDefault(req: Request, key: string, val?: number) { const headerValue = req.headers.get(key); if (!headerValue) { return val; @@ -62,7 +62,14 @@ Deno.serve((req: Request) => { const envVars = Object.keys(envVarsObj).map((k) => [k, envVarsObj[k]]); const context = { sourceMap: req.headers.get("x-context-source-map") == "true", - useReadSyncFileAPI: req.headers.get("x-use-read-sync-file-api") == "true", + useReadSyncFileAPI: + req.headers.get("x-context-use-read-sync-file-api") == "true", + supervisor: { + requestAbsentTimeoutMs: parseIntFromHeadersOrDefault( + req, + "x-context-supervisor-request-absent-timeout-ms", + ), + }, }; return await EdgeRuntime.userWorkers.create({ diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index 3891db66b..92fdfe0e5 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -2522,7 +2522,10 @@ async fn test_issue_func_205() { b.uri("/issue-func-205") .header("x-cpu-time-soft-limit-ms", HeaderValue::from_static("500")) .header("x-cpu-time-hard-limit-ms", HeaderValue::from_static("1000")) - .header("x-use-read-sync-file-api", HeaderValue::from_static("true")) + .header( + "x-context-use-read-sync-file-api", + HeaderValue::from_static("true"), + ) .body(Body::empty()) .context("can't make request") }) @@ -3865,6 +3868,49 @@ async fn test_eszip_wasm_import() { ); } +#[tokio::test] +#[serial] +async fn test_request_absent_timeout() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let tb = TestBedBuilder::new("./test_cases/main") + .with_per_worker_policy(None) + .with_worker_event_sender(Some(tx)) + .build() + .await; + + let resp = tb + .request(|b| { + b.uri("/sleep-5000ms") + .header("x-worker-timeout-ms", HeaderValue::from_static("3600000")) + .header( + "x-context-supervisor-request-absent-timeout-ms", + HeaderValue::from_static("1000"), + ) + .body(Body::empty()) + .context("can't make request") + }) + .await + .unwrap(); + + assert_eq!(resp.status().as_u16(), StatusCode::OK); + + sleep(Duration::from_secs(3)).await; + rx.close(); + tb.exit(Duration::from_secs(TESTBED_DEADLINE_SEC)).await; + + while let Some(ev) = rx.recv().await { + let WorkerEvents::Shutdown(ev) = ev.event else { + continue; + }; + if ev.reason != ShutdownReason::EarlyDrop { + break; + } + return; + } + + unreachable!("test failed"); +} + #[derive(Deserialize)] struct ErrorResponsePayload { msg: String, From 220284593c4c26c56410fa66500d9a26ea225c68 Mon Sep 17 00:00:00 2001 From: Nyannyacha Date: Mon, 11 Aug 2025 23:19:45 +0000 Subject: [PATCH 3/3] stamp: clippy --- crates/base/src/worker/supervisor/strategy_per_worker.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/base/src/worker/supervisor/strategy_per_worker.rs b/crates/base/src/worker/supervisor/strategy_per_worker.rs index 280c16d43..29cccb0b3 100644 --- a/crates/base/src/worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/worker/supervisor/strategy_per_worker.rs @@ -163,7 +163,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { .and_then(|it| { it.get("requestAbsentTimeoutMs") .and_then(|it| it.as_u64()) - .map(|it| Duration::from_millis(it)) + .map(Duration::from_millis) }), is_wall_clock_limit_disabled: worker_timeout_ms == 0,