Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
[workspace]
members = [
"./crates/base",
"./crates/base_mem_check",
"./crates/cli",
"./crates/sb_workers",
"./crates/sb_env",
Expand Down
1 change: 1 addition & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
http_utils = { version = "0.1.0", path = "../http_utils" }
async-trait.workspace = true
thiserror.workspace = true
Expand Down
91 changes: 46 additions & 45 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::rt_worker::worker::DuplexStreamEntry;
use crate::utils::units::{bytes_to_display, mib_to_bytes};

use anyhow::{anyhow, bail, Context, Error};
use base_mem_check::{MemCheckState, WorkerHeapStatistics};
use cooked_waker::{IntoWaker, WakeRef};
use cpu_timer::get_thread_time;
use ctor::ctor;
Expand Down Expand Up @@ -33,8 +34,7 @@ use std::collections::HashMap;
use std::ffi::c_void;
use std::fmt;
use std::marker::PhantomData;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use std::task::Poll;
use std::time::Duration;
use tokio::sync::{mpsc, Notify};
Expand Down Expand Up @@ -118,25 +118,22 @@ fn get_error_class_name(e: &AnyError) -> &'static str {
sb_core::errors_rt::get_error_class_name(e).unwrap_or("Error")
}

#[derive(Default, Clone)]
struct MemCheckState {
#[derive(Default)]
struct MemCheck {
drop_token: CancellationToken,
limit: Option<usize>,
waker: Arc<AtomicWaker>,
notify: Arc<Notify>,
current_bytes: Arc<AtomicUsize>,

#[cfg(debug_assertions)]
exceeded: Arc<AtomicFlag>,
state: Arc<RwLock<MemCheckState>>,
}

impl Drop for MemCheckState {
impl Drop for MemCheck {
fn drop(&mut self) {
self.drop_token.cancel();
}
}

impl MemCheckState {
impl MemCheck {
fn check(&self, isolate: &mut Isolate) -> usize {
let Some(limit) = self.limit else {
return 0;
Expand All @@ -158,25 +155,22 @@ impl MemCheckState {
.saturating_add(used_heap_bytes)
.saturating_add(external_bytes);

self.current_bytes.store(total_bytes, Ordering::Release);
let heap_stats = WorkerHeapStatistics::from(&stats);
let mut state = self.state.write().unwrap();

if total_bytes >= limit {
self.notify.notify_waiters();
state.current = heap_stats;

#[cfg(debug_assertions)]
if !self.exceeded.is_raised() {
self.exceeded.raise();
if total_bytes >= limit {
if !state.exceeded {
state.exceeded = true;
}

drop(state);
self.notify.notify_waiters();
}

total_bytes
}

#[allow(dead_code)]
#[cfg(debug_assertions)]
fn is_exceeded(&self) -> bool {
self.exceeded.is_raised()
}
}

pub trait GetRuntimeContext {
Expand All @@ -201,7 +195,7 @@ pub struct DenoRuntime<RuntimeContext = ()> {
main_module_id: ModuleId,
maybe_inspector: Option<Inspector>,

mem_check_state: Arc<MemCheckState>,
mem_check: Arc<MemCheck>,
waker: Arc<AtomicWaker>,

_phantom_runtime_context: PhantomData<RuntimeContext>,
Expand All @@ -212,7 +206,7 @@ impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
if self.conf.is_user_worker() {
self.js_runtime.v8_isolate().remove_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Arc::as_ptr(&self.mem_check_state) as *mut _,
Arc::as_ptr(&self.mem_check) as *mut _,
);
}
}
Expand Down Expand Up @@ -458,25 +452,25 @@ where
];

let mut create_params = None;
let mut mem_check_state = MemCheckState::default();
let mut mem_check = MemCheck::default();

if conf.is_user_worker() {
let memory_limit =
mib_to_bytes(conf.as_user_worker().unwrap().memory_limit_mb) as usize;

let allocator = CustomAllocator::new(memory_limit);

allocator.set_waker(mem_check_state.waker.clone());
allocator.set_waker(mem_check.waker.clone());

mem_check_state.limit = Some(memory_limit);
mem_check.limit = Some(memory_limit);
create_params = Some(
deno_core::v8::CreateParams::default()
.heap_limits(mib_to_bytes(0) as usize, memory_limit)
.array_buffer_allocator(allocator.into_v8_allocator()),
)
};

let mem_check_state = Arc::new(mem_check_state);
let mem_check = Arc::new(mem_check);
let runtime_options = RuntimeOptions {
extensions,
is_main: true,
Expand Down Expand Up @@ -543,14 +537,14 @@ where
if is_user_worker {
js_runtime.v8_isolate().add_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Arc::as_ptr(&mem_check_state) as *mut _,
Arc::as_ptr(&mem_check) as *mut _,
GCType::ALL,
);

js_runtime
.op_state()
.borrow_mut()
.put(MemCheckWaker::from(mem_check_state.waker.clone()));
.put(MemCheckWaker::from(mem_check.waker.clone()));
}

js_runtime
Expand Down Expand Up @@ -607,8 +601,8 @@ where

if is_user_worker {
drop(rt::SUPERVISOR_RT.spawn({
let drop_token = mem_check_state.drop_token.clone();
let waker = mem_check_state.waker.clone();
let drop_token = mem_check.drop_token.clone();
let waker = mem_check.waker.clone();

async move {
// TODO(Nyannyacha): Should we introduce exponential
Expand Down Expand Up @@ -641,7 +635,7 @@ where
main_module_id,
maybe_inspector,

mem_check_state,
mem_check,
waker: Arc::default(),

_phantom_runtime_context: PhantomData,
Expand Down Expand Up @@ -735,7 +729,7 @@ where
let is_termination_requested = self.is_termination_requested.clone();
let is_user_worker = self.conf.is_user_worker();
let global_waker = self.waker.clone();
let mem_check_state = is_user_worker.then(|| self.mem_check_state.clone());
let mem_check = is_user_worker.then(|| self.mem_check.clone());

let poll_result = poll_fn(|cx| unsafe {
// INVARIANT: Only can steal current task by other threads when LIFO
Expand Down Expand Up @@ -809,7 +803,7 @@ where
}));

if is_user_worker {
let mem_state = mem_check_state.as_ref().unwrap();
let mem_state = mem_check.as_ref().unwrap();
let total_malloced_bytes = mem_state.check(js_runtime.v8_isolate().as_mut());

mem_state.waker.register(waker);
Expand Down Expand Up @@ -859,24 +853,31 @@ where
self.maybe_inspector.clone()
}

pub fn mem_check_captured_bytes(&self) -> Arc<AtomicUsize> {
self.mem_check_state.current_bytes.clone()
pub fn mem_check_state(&self) -> Arc<RwLock<MemCheckState>> {
self.mem_check.state.clone()
}

pub fn add_memory_limit_callback<C>(&self, mut cb: C)
where
// XXX(Nyannyacha): Should we relax bounds a bit more?
C: FnMut(usize) -> bool + Send + 'static,
C: FnMut(MemCheckState) -> bool + Send + 'static,
{
let notify = self.mem_check_state.notify.clone();
let drop_token = self.mem_check_state.drop_token.clone();
let current_bytes = self.mem_check_state.current_bytes.clone();
let notify = self.mem_check.notify.clone();
let drop_token = self.mem_check.drop_token.clone();
let state = self.mem_check_state();

drop(rt::SUPERVISOR_RT.spawn(async move {
loop {
tokio::select! {
_ = notify.notified() => {
if cb(current_bytes.load(Ordering::Acquire)) {
let state = tokio::task::spawn_blocking({
let state = state.clone();
move || {
*state.read().unwrap()
}
}).await.unwrap();

if cb(state) {
break;
}
}
Expand Down Expand Up @@ -931,7 +932,7 @@ extern "C" fn mem_check_gc_prologue_callback_fn(
data: *mut c_void,
) {
unsafe {
(*(data as *mut MemCheckState)).check(&mut *isolate);
(*(data as *mut MemCheck)).check(&mut *isolate);
}
}

Expand Down Expand Up @@ -1607,7 +1608,7 @@ mod test {
assert!(result.is_ok(), "expected no errors");

// however, mem checker must be raised because it aggregates heap usage
assert!(user_rt.mem_check_state.is_exceeded());
assert!(user_rt.mem_check.state.read().unwrap().exceeded);
}

#[tokio::test]
Expand Down Expand Up @@ -1661,7 +1662,7 @@ mod test {

callback_rx.recv().await.unwrap();

assert!(user_rt.mem_check_state.is_exceeded());
assert!(user_rt.mem_check.state.read().unwrap().exceeded);
};

if timeout(Duration::from_secs(10), wait_fut).await.is_err() {
Expand Down
3 changes: 2 additions & 1 deletion crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use crate::rt_worker::utils::{get_event_metadata, parse_worker_conf};
use crate::rt_worker::worker_ctx::create_supervisor;
use crate::utils::send_event_if_event_worker_available;
use anyhow::{anyhow, Error};
use base_mem_check::MemCheckState;
use event_worker::events::{
EventLoopCompletedEvent, EventMetadata, ShutdownEvent, ShutdownReason, UncaughtExceptionEvent,
WorkerEventWithMetadata, WorkerEvents, WorkerMemoryUsed,
Expand Down Expand Up @@ -224,7 +225,7 @@ impl Worker {
total: 0,
heap: 0,
external: 0,
mem_check_captured: 0,
mem_check_captured: MemCheckState::default(),
},
},
));
Expand Down
Loading