Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ glob = "0.3.1"
httparse = "1.8"
http = "0.2"
faster-hex = "0.9.0"
strum = "0.25"

# DEBUG
#[patch.crates-io]
Expand Down
15 changes: 12 additions & 3 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use std::borrow::Cow;
use std::collections::HashMap;
use std::ffi::c_void;
use std::fmt;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Poll;
use std::time::Duration;
Expand Down Expand Up @@ -123,6 +124,7 @@ struct MemCheckState {
limit: Option<usize>,
waker: Arc<AtomicWaker>,
notify: Arc<Notify>,
current_bytes: Arc<AtomicUsize>,

#[cfg(debug_assertions)]
exceeded: Arc<AtomicFlag>,
Expand Down Expand Up @@ -156,6 +158,8 @@ impl MemCheckState {
.saturating_add(used_heap_bytes)
.saturating_add(external_bytes);

self.current_bytes.store(total_bytes, Ordering::Release);

if total_bytes >= limit {
self.notify.notify_waiters();

Expand Down Expand Up @@ -816,19 +820,24 @@ impl DenoRuntime {
self.maybe_inspector.clone()
}

pub fn mem_check_captured_bytes(&self) -> Arc<AtomicUsize> {
self.mem_check_state.current_bytes.clone()
}

pub fn add_memory_limit_callback<C>(&self, mut cb: C)
where
// XXX(Nyannyacha): Should we relax bounds a bit more?
C: FnMut() -> bool + Send + 'static,
C: FnMut(usize) -> bool + Send + 'static,
{
let notify = self.mem_check_state.notify.clone();
let drop_token = self.mem_check_state.drop_token.clone();
let current_bytes = self.mem_check_state.current_bytes.clone();

drop(rt::SUPERVISOR_RT.spawn(async move {
loop {
tokio::select! {
_ = notify.notified() => {
if cb() {
if cb(current_bytes.load(Ordering::Acquire)) {
break;
}
}
Expand Down Expand Up @@ -1586,7 +1595,7 @@ mod test {
let waker = user_rt.js_runtime.op_state().borrow().waker.clone();
let handle = user_rt.js_runtime.v8_isolate().thread_safe_handle();

user_rt.add_memory_limit_callback(move || {
user_rt.add_memory_limit_callback(move |_| {
handle.terminate_execution();
waker.wake();
callback_tx.send(()).unwrap();
Expand Down
3 changes: 2 additions & 1 deletion crates/base/src/rt_worker/supervisor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::sync::Arc;
use cpu_timer::{CPUAlarmVal, CPUTimer};
use deno_core::v8::IsolateHandle;
use enum_as_inner::EnumAsInner;
use event_worker::events::MemoryLimitDetail;
use futures_util::task::AtomicWaker;
use log::error;
use sb_workers::context::{Timing, UserWorkerMsgs, UserWorkerRuntimeOpts};
Expand Down Expand Up @@ -126,7 +127,7 @@ pub struct Arguments {
pub cpu_timer_param: CPUTimerParam,
pub supervisor_policy: SupervisorPolicy,
pub timing: Option<Timing>,
pub memory_limit_rx: mpsc::UnboundedReceiver<()>,
pub memory_limit_rx: mpsc::UnboundedReceiver<MemoryLimitDetail>,
pub pool_msg_tx: Option<mpsc::UnboundedSender<UserWorkerMsgs>>,
pub isolate_memory_usage_tx: oneshot::Sender<IsolateMemoryStats>,
pub thread_safe_handle: IsolateHandle,
Expand Down
12 changes: 6 additions & 6 deletions crates/base/src/rt_worker/supervisor/strategy_per_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)

if !cpu_timer_param.is_disabled() {
if cpu_usage_ms >= hard_limit_ms as i64 {
error!("CPU time limit reached. isolate: {:?}", key);
error!("CPU time limit reached: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::CPUTime);
}

Expand All @@ -130,7 +130,7 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)

Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
if is_worker_entered && req_start_ack {
error!("CPU time limit reached. isolate: {:?}", key);
error!("CPU time limit reached: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::CPUTime);
}
}
Expand Down Expand Up @@ -171,14 +171,14 @@ pub async fn supervise(args: Arguments, oneshot: bool) -> (ShutdownReason, i64)

continue;
} else {
error!("wall clock duraiton reached. isolate: {:?}", key);
error!("wall clock duraiton reached: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::WallClockTime);
}
}

Some(_) = memory_limit_rx.recv() => {
error!("memory limit reached for the worker. isolate: {:?}", key);
complete_reason = Some(ShutdownReason::Memory);
Some(detail) = memory_limit_rx.recv() => {
error!("memory limit reached for the worker: isolate: {:?}", key);
complete_reason = Some(ShutdownReason::Memory(detail));
}
}

Expand Down
24 changes: 12 additions & 12 deletions crates/base/src/rt_worker/supervisor/strategy_per_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,15 +134,15 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
if !cpu_timer_param.is_disabled() {
if cpu_usage_ms >= hard_limit_ms as i64 {
terminate_fn();
error!("CPU time hard limit reached. isolate: {:?}", key);
error!("CPU time hard limit reached: isolate: {:?}", key);
return (ShutdownReason::CPUTime, cpu_usage_ms);
} else if cpu_usage_ms >= soft_limit_ms as i64 && !cpu_time_soft_limit_reached {
error!("CPU time soft limit reached. isolate: {:?}", key);
error!("CPU time soft limit reached: isolate: {:?}", key);
cpu_time_soft_limit_reached = true;

if req_ack_count == demand.load(Ordering::Acquire) {
terminate_fn();
error!("early termination due to the last request being completed. isolate: {:?}", key);
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
}
}
Expand All @@ -154,17 +154,17 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
Some(_) = wait_cpu_alarm(cpu_alarms_rx.as_mut()) => {
if is_worker_entered {
if !cpu_time_soft_limit_reached {
error!("CPU time soft limit reached. isolate: {:?}", key);
error!("CPU time soft limit reached: isolate: {:?}", key);
cpu_time_soft_limit_reached = true;

if req_ack_count == demand.load(Ordering::Acquire) {
terminate_fn();
error!("early termination due to the last request being completed. isolate: {:?}", key);
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
}
} else {
terminate_fn();
error!("CPU time hard limit reached. isolate: {:?}", key);
error!("CPU time hard limit reached: isolate: {:?}", key);
return (ShutdownReason::CPUTime, cpu_usage_ms);
}
}
Expand All @@ -186,7 +186,7 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
}

terminate_fn();
error!("early termination due to the last request being completed. isolate: {:?}", key);
error!("early termination due to the last request being completed: isolate: {:?}", key);
return (ShutdownReason::EarlyDrop, cpu_usage_ms);
}

Expand All @@ -195,24 +195,24 @@ pub async fn supervise(args: Arguments) -> (ShutdownReason, i64) {
// first tick completes immediately
wall_clock_alerts += 1;
} else if wall_clock_alerts == 1 {
error!("wall clock duration warning. isolate: {:?}", key);
error!("wall clock duration warning: isolate: {:?}", key);
wall_clock_alerts += 1;
} else {
let is_in_flight_req_exists = req_ack_count != demand.load(Ordering::Acquire);

terminate_fn();

error!("wall clock duration reached. isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);
error!("wall clock duration reached: isolate: {:?} (in_flight_req_exists = {})", key, is_in_flight_req_exists);

return (ShutdownReason::WallClockTime, cpu_usage_ms);
}
}

// memory usage
Some(_) = memory_limit_rx.recv() => {
Some(detail) = memory_limit_rx.recv() => {
terminate_fn();
error!("memory limit reached for the worker. isolate: {:?}", key);
return (ShutdownReason::Memory, cpu_usage_ms);
error!("memory limit reached for the worker: isolate: {:?}", key);
return (ShutdownReason::Memory(detail), cpu_usage_ms);
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@ impl Worker {
total: 0,
heap: 0,
external: 0,
mem_check_captured: 0,
},
},
));
Expand Down
52 changes: 34 additions & 18 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use crate::deno_runtime::DenoRuntime;
use crate::inspector_server::Inspector;
use crate::timeout::{self, CancelOnWriteTimeout, ReadTimeoutStream};
use crate::utils::send_event_if_event_worker_available;
use crate::utils::units::bytes_to_display;

use crate::rt_worker::worker::{Worker, WorkerHandler};
use crate::rt_worker::worker_pool::WorkerPool;
Expand All @@ -11,7 +10,8 @@ use cpu_timer::CPUTimer;
use deno_config::JsxImportSourceConfig;
use deno_core::{InspectorSessionProxy, LocalInspectorSession};
use event_worker::events::{
BootEvent, ShutdownEvent, WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
BootEvent, MemoryLimitDetail, MemoryLimitDetailMemCheck, MemoryLimitDetailV8, ShutdownEvent,
WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
};
use futures_util::pin_mut;
use http::StatusCode;
Expand All @@ -31,6 +31,7 @@ use sb_workers::errors::WorkerError;
use std::future::pending;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::io::{self, copy_bidirectional};
Expand Down Expand Up @@ -260,7 +261,7 @@ pub fn create_supervisor(
timing: Option<Timing>,
termination_token: Option<TerminationToken>,
) -> Result<(Option<CPUTimer>, CancellationToken), Error> {
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<()>();
let (memory_limit_tx, memory_limit_rx) = mpsc::unbounded_channel::<MemoryLimitDetail>();
let (waker, thread_safe_handle) = {
let js_runtime = &mut worker_runtime.js_runtime;
(
Expand All @@ -271,6 +272,7 @@ pub fn create_supervisor(

// we assert supervisor is only run for user workers
let conf = worker_runtime.conf.as_user_worker().unwrap().clone();
let mem_check_captured_bytes = worker_runtime.mem_check_captured_bytes();
let is_termination_requested = worker_runtime.is_termination_requested.clone();

let giveup_process_requests_token = cancel.clone();
Expand All @@ -292,32 +294,43 @@ pub fn create_supervisor(
)
});

worker_runtime.add_memory_limit_callback({
let memory_limit_tx = memory_limit_tx.clone();
move || {
debug!("Hard memory limit triggered");
let send_memory_limit_fn = move |detail: MemoryLimitDetail| {
debug!(
"memory limit triggered: isolate: {:?}, detail: {}",
key, detail
);

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
}
if memory_limit_tx.send(detail).is_err() {
error!(
"failed to send memory limit reached notification - isolate may already be terminating: kind: {}",
<&'static str>::from(&detail)
);
}
};

worker_runtime.add_memory_limit_callback({
let send_fn = send_memory_limit_fn.clone();
move |captured| {
send_fn(MemoryLimitDetail::MemCheck(MemoryLimitDetailMemCheck {
captured,
}));

true
}
});

worker_runtime.js_runtime.add_near_heap_limit_callback({
let memory_limit_tx = memory_limit_tx.clone();
move |cur, _| {
debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),);

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
}
let send_fn = send_memory_limit_fn;
move |current, initial| {
send_fn(MemoryLimitDetail::V8(MemoryLimitDetailV8 {
current,
initial,
}));

// give an allowance on current limit (until the isolate is
// terminated) we do this so that oom won't end up killing the
// edge-runtime process
cur * (conf.low_memory_multiplier as usize)
current * (conf.low_memory_multiplier as usize)
}
});

Expand Down Expand Up @@ -471,7 +484,9 @@ pub fn create_supervisor(
total: v.used_heap_size + v.external_memory,
heap: v.used_heap_size,
external: v.external_memory,
mem_check_captured: mem_check_captured_bytes.load(Ordering::Acquire),
},

Err(_) => {
if !supervise_cancel_token_inner.is_cancelled() {
error!("isolate memory usage sender dropped");
Expand All @@ -481,6 +496,7 @@ pub fn create_supervisor(
total: 0,
heap: 0,
external: 0,
mem_check_captured: 0,
}
}
};
Expand Down
3 changes: 2 additions & 1 deletion crates/event_worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ uuid.workspace = true
serde.workspace = true
anyhow.workspace = true
tokio.workspace = true
log.workspace = true
log.workspace = true
strum = { workspace = true, features = ["derive"] }
Loading