diff --git a/Cargo.lock b/Cargo.lock index d21116d0f..c126c737c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1956,6 +1956,7 @@ dependencies = [ "deno_core", "log", "serde", + "strum", "tokio", "uuid", ] diff --git a/Cargo.toml b/Cargo.toml index 641f2482f..74e3f4228 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -78,6 +78,7 @@ glob = "0.3.1" httparse = "1.8" http = "0.2" faster-hex = "0.9.0" +strum = "0.25" # DEBUG #[patch.crates-io] diff --git a/crates/base/src/deno_runtime.rs b/crates/base/src/deno_runtime.rs index 8652169b9..4d1e22c15 100644 --- a/crates/base/src/deno_runtime.rs +++ b/crates/base/src/deno_runtime.rs @@ -32,6 +32,7 @@ use std::borrow::Cow; use std::collections::HashMap; use std::ffi::c_void; use std::fmt; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; use std::time::Duration; @@ -123,6 +124,7 @@ struct MemCheckState { limit: Option, waker: Arc, notify: Arc, + current_bytes: Arc, #[cfg(debug_assertions)] exceeded: Arc, @@ -156,6 +158,8 @@ impl MemCheckState { .saturating_add(used_heap_bytes) .saturating_add(external_bytes); + self.current_bytes.store(total_bytes, Ordering::Release); + if total_bytes >= limit { self.notify.notify_waiters(); @@ -816,19 +820,24 @@ impl DenoRuntime { self.maybe_inspector.clone() } + pub fn mem_check_captured_bytes(&self) -> Arc { + self.mem_check_state.current_bytes.clone() + } + pub fn add_memory_limit_callback(&self, mut cb: C) where // XXX(Nyannyacha): Should we relax bounds a bit more? - C: FnMut() -> bool + Send + 'static, + C: FnMut(usize) -> bool + Send + 'static, { let notify = self.mem_check_state.notify.clone(); let drop_token = self.mem_check_state.drop_token.clone(); + let current_bytes = self.mem_check_state.current_bytes.clone(); drop(rt::SUPERVISOR_RT.spawn(async move { loop { tokio::select! { _ = notify.notified() => { - if cb() { + if cb(current_bytes.load(Ordering::Acquire)) { break; } } @@ -1586,7 +1595,7 @@ mod test { let waker = user_rt.js_runtime.op_state().borrow().waker.clone(); let handle = user_rt.js_runtime.v8_isolate().thread_safe_handle(); - user_rt.add_memory_limit_callback(move || { + user_rt.add_memory_limit_callback(move |_| { handle.terminate_execution(); waker.wake(); callback_tx.send(()).unwrap(); diff --git a/crates/base/src/rt_worker/supervisor/mod.rs b/crates/base/src/rt_worker/supervisor/mod.rs index 835f4f11e..171f6239f 100644 --- a/crates/base/src/rt_worker/supervisor/mod.rs +++ b/crates/base/src/rt_worker/supervisor/mod.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use cpu_timer::{CPUAlarmVal, CPUTimer}; use deno_core::v8::IsolateHandle; use enum_as_inner::EnumAsInner; +use event_worker::events::MemoryLimitDetail; use futures_util::task::AtomicWaker; use log::error; use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts}; @@ -126,7 +127,7 @@ pub struct Arguments { pub cpu_timer_param: CPUTimerParam, pub supervisor_policy: SupervisorPolicy, pub timing: Option, - pub memory_limit_rx: mpsc::UnboundedReceiver<()>, + pub memory_limit_rx: mpsc::UnboundedReceiver, pub pool_msg_tx: Option>, pub isolate_memory_usage_tx: oneshot::Sender, pub thread_safe_handle: IsolateHandle, diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs index f743c0d71..63ad2e100 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_request.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_request.rs @@ -116,7 +116,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) if !cpu_timer_param.is_disabled() { if cpu_usage_ms >= hard_limit_ms as i64 { - error!("CPU time limit reached. isolate: {:?}", key); + error!("CPU time limit reached: isolate: {:?}", key); complete_reason = Some(ShutdownReason::CPUTime); } @@ -130,7 +130,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => { if is_worker_entered && req_start_ack { - error!("CPU time limit reached. isolate: {:?}", key); + error!("CPU time limit reached: isolate: {:?}", key); complete_reason = Some(ShutdownReason::CPUTime); } } @@ -171,14 +171,14 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64) continue; } else { - error!("wall clock duraiton reached. isolate: {:?}", key); + error!("wall clock duraiton reached: isolate: {:?}", key); complete_reason = Some(ShutdownReason::WallClockTime); } } - Some(_) = memory_limit_rx.recv() => { - error!("memory limit reached for the worker. isolate: {:?}", key); - complete_reason = Some(ShutdownReason::Memory); + Some(detail) = memory_limit_rx.recv() => { + error!("memory limit reached for the worker: isolate: {:?}", key); + complete_reason = Some(ShutdownReason::Memory(detail)); } } diff --git a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs index 0ee4c3f3f..c6f4cc2e6 100644 --- a/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs +++ b/crates/base/src/rt_worker/supervisor/strategy_per_worker.rs @@ -134,15 +134,15 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { if !cpu_timer_param.is_disabled() { if cpu_usage_ms >= hard_limit_ms as i64 { terminate_fn(); - error!("CPU time hard limit reached. isolate: {:?}", key); + error!("CPU time hard limit reached: isolate: {:?}", key); return (ShutdownReason::CPUTime, cpu_usage_ms); } else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached { - error!("CPU time soft limit reached. isolate: {:?}", key); + error!("CPU time soft limit reached: isolate: {:?}", key); cpu_time_soft_limit_reached = true; if req_ack_count == demand.load(Ordering::Acquire) { terminate_fn(); - error!("early termination due to the last request being completed. isolate: {:?}", key); + error!("early termination due to the last request being completed: isolate: {:?}", key); return (ShutdownReason::EarlyDrop, cpu_usage_ms); } } @@ -154,17 +154,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => { if is_worker_entered { if !cpu_time_soft_limit_reached { - error!("CPU time soft limit reached. isolate: {:?}", key); + error!("CPU time soft limit reached: isolate: {:?}", key); cpu_time_soft_limit_reached = true; if req_ack_count == demand.load(Ordering::Acquire) { terminate_fn(); - error!("early termination due to the last request being completed. isolate: {:?}", key); + error!("early termination due to the last request being completed: isolate: {:?}", key); return (ShutdownReason::EarlyDrop, cpu_usage_ms); } } else { terminate_fn(); - error!("CPU time hard limit reached. isolate: {:?}", key); + error!("CPU time hard limit reached: isolate: {:?}", key); return (ShutdownReason::CPUTime, cpu_usage_ms); } } @@ -186,7 +186,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { } terminate_fn(); - error!("early termination due to the last request being completed. isolate: {:?}", key); + error!("early termination due to the last request being completed: isolate: {:?}", key); return (ShutdownReason::EarlyDrop, cpu_usage_ms); } @@ -195,24 +195,24 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) { // first tick completes immediately wall_clock_alerts += 1; } else if wall_clock_alerts == 1 { - error!("wall clock duration warning. isolate: {:?}", key); + error!("wall clock duration warning: isolate: {:?}", key); wall_clock_alerts += 1; } else { let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire); terminate_fn(); - error!("wall clock duration reached. isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists); + error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists); return (ShutdownReason::WallClockTime, cpu_usage_ms); } } // memory usage - Some(_) = memory_limit_rx.recv() => { + Some(detail) = memory_limit_rx.recv() => { terminate_fn(); - error!("memory limit reached for the worker. isolate: {:?}", key); - return (ShutdownReason::Memory, cpu_usage_ms); + error!("memory limit reached for the worker: isolate: {:?}", key); + return (ShutdownReason::Memory(detail), cpu_usage_ms); } } } diff --git a/crates/base/src/rt_worker/worker.rs b/crates/base/src/rt_worker/worker.rs index 69f47e276..80c348a23 100644 --- a/crates/base/src/rt_worker/worker.rs +++ b/crates/base/src/rt_worker/worker.rs @@ -224,6 +224,7 @@ impl Worker { total: 0, heap: 0, external: 0, + mem_check_captured: 0, }, }, )); diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index 20c596b77..a25d5dced 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -2,7 +2,6 @@ use crate::deno_runtime::DenoRuntime; use crate::inspector_server::Inspector; use crate::timeout::{self, CancelOnWriteTimeout, ReadTimeoutStream}; use crate::utils::send_event_if_event_worker_available; -use crate::utils::units::bytes_to_display; use crate::rt_worker::worker::{Worker, WorkerHandler}; use crate::rt_worker::worker_pool::WorkerPool; @@ -11,7 +10,8 @@ use cpu_timer::CPUTimer; use deno_config::JsxImportSourceConfig; use deno_core::{InspectorSessionProxy, LocalInspectorSession}; use event_worker::events::{ - BootEvent, ShutdownEvent, WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed, + BootEvent, MemoryLimitDetail, MemoryLimitDetailMemCheck, MemoryLimitDetailV8, ShutdownEvent, + WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed, }; use futures_util::pin_mut; use http::StatusCode; @@ -31,6 +31,7 @@ use sb_workers::errors::WorkerError; use std::future::pending; use std::io::ErrorKind; use std::path::PathBuf; +use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::Duration; use tokio::io::{self, copy_bidirectional}; @@ -260,7 +261,7 @@ pub fn create_supervisor( timing: Option, termination_token: Option, ) -> Result<(Option, CancellationToken), Error> { - let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<()>(); + let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::(); let (waker, thread_safe_handle) = { let js_runtime = &mut worker_runtime.js_runtime; ( @@ -271,6 +272,7 @@ pub fn create_supervisor( // we assert supervisor is only run for user workers let conf = worker_runtime.conf.as_user_worker().unwrap().clone(); + let mem_check_captured_bytes = worker_runtime.mem_check_captured_bytes(); let is_termination_requested = worker_runtime.is_termination_requested.clone(); let giveup_process_requests_token = cancel.clone(); @@ -292,32 +294,43 @@ pub fn create_supervisor( ) }); - worker_runtime.add_memory_limit_callback({ - let memory_limit_tx = memory_limit_tx.clone(); - move || { - debug!("Hard memory limit triggered"); + let send_memory_limit_fn = move |detail: MemoryLimitDetail| { + debug!( + "memory limit triggered: isolate: {:?}, detail: {}", + key, detail + ); - if memory_limit_tx.send(()).is_err() { - error!("failed to send memory limit reached notification - isolate may already be terminating"); - } + if memory_limit_tx.send(detail).is_err() { + error!( + "failed to send memory limit reached notification - isolate may already be terminating: kind: {}", + <&'static str>::from(&detail) + ); + } + }; + + worker_runtime.add_memory_limit_callback({ + let send_fn = send_memory_limit_fn.clone(); + move |captured| { + send_fn(MemoryLimitDetail::MemCheck(MemoryLimitDetailMemCheck { + captured, + })); true } }); worker_runtime.js_runtime.add_near_heap_limit_callback({ - let memory_limit_tx = memory_limit_tx.clone(); - move |cur, _| { - debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),); - - if memory_limit_tx.send(()).is_err() { - error!("failed to send memory limit reached notification - isolate may already be terminating"); - } + let send_fn = send_memory_limit_fn; + move |current, initial| { + send_fn(MemoryLimitDetail::V8(MemoryLimitDetailV8 { + current, + initial, + })); // give an allowance on current limit (until the isolate is // terminated) we do this so that oom won't end up killing the // edge-runtime process - cur * (conf.low_memory_multiplier as usize) + current * (conf.low_memory_multiplier as usize) } }); @@ -471,7 +484,9 @@ pub fn create_supervisor( total: v.used_heap_size + v.external_memory, heap: v.used_heap_size, external: v.external_memory, + mem_check_captured: mem_check_captured_bytes.load(Ordering::Acquire), }, + Err(_) => { if !supervise_cancel_token_inner.is_cancelled() { error!("isolate memory usage sender dropped"); @@ -481,6 +496,7 @@ pub fn create_supervisor( total: 0, heap: 0, external: 0, + mem_check_captured: 0, } } }; diff --git a/crates/event_worker/Cargo.toml b/crates/event_worker/Cargo.toml index 7db04fa7a..e8ba7a8da 100644 --- a/crates/event_worker/Cargo.toml +++ b/crates/event_worker/Cargo.toml @@ -16,4 +16,5 @@ uuid.workspace = true serde.workspace = true anyhow.workspace = true tokio.workspace = true -log.workspace = true \ No newline at end of file +log.workspace = true +strum = { workspace = true, features = ["derive"] } \ No newline at end of file diff --git a/crates/event_worker/events.rs b/crates/event_worker/events.rs index 0c2f0da7c..79e46202c 100644 --- a/crates/event_worker/events.rs +++ b/crates/event_worker/events.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use strum::IntoStaticStr; use uuid::Uuid; #[derive(Serialize, Deserialize, Debug)] @@ -15,13 +16,60 @@ pub struct WorkerMemoryUsed { pub total: usize, pub heap: usize, pub external: usize, + pub mem_check_captured: usize, +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct MemoryLimitDetailMemCheck { + pub captured: usize, +} + +impl std::fmt::Display for MemoryLimitDetailMemCheck { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryLimitDetailMemCheck") + .field("captured", &self.captured) + .finish() + } +} + +#[derive(Serialize, Deserialize, Debug, Clone, Copy)] +pub struct MemoryLimitDetailV8 { + pub current: usize, + pub initial: usize, +} + +impl std::fmt::Display for MemoryLimitDetailV8 { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("MemoryLimitDetailV8") + .field("current", &self.current) + .field("initial", &self.initial) + .finish() + } +} + +#[derive(Serialize, Deserialize, IntoStaticStr, Debug, Clone, Copy)] +#[serde(tag = "limited_by")] +#[serde(rename_all = "snake_case")] +#[strum(serialize_all = "snake_case")] +pub enum MemoryLimitDetail { + MemCheck(MemoryLimitDetailMemCheck), + V8(MemoryLimitDetailV8), +} + +impl std::fmt::Display for MemoryLimitDetail { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::MemCheck(arg) => std::fmt::Display::fmt(arg, f), + Self::V8(arg) => std::fmt::Display::fmt(arg, f), + } + } } #[derive(Serialize, Deserialize, Debug)] pub enum ShutdownReason { WallClockTime, CPUTime, - Memory, + Memory(MemoryLimitDetail), EarlyDrop, TerminationRequested, }