Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base_rt = { version = "0.1.0", path = "../base_rt" }
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
http_utils = { version = "0.1.0", path = "../http_utils" }
async-trait.workspace = true
Expand Down
14 changes: 11 additions & 3 deletions crates/base/src/deno_runtime.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use crate::inspector_server::Inspector;
use crate::rt_worker::rt;
use crate::rt_worker::supervisor::{CPUUsage, CPUUsageMetrics};
use crate::rt_worker::worker::DuplexStreamEntry;
use crate::utils::units::{bytes_to_display, mib_to_bytes};
Expand All @@ -25,6 +24,7 @@ use futures_util::future::poll_fn;
use futures_util::task::AtomicWaker;
use log::{error, trace};
use once_cell::sync::{Lazy, OnceCell};
use sb_core::conn_sync::DenoRuntimeDropToken;
use sb_core::http::sb_core_http;
use sb_core::http_start::sb_core_http_start;
use sb_core::util::sync::AtomicFlag;
Expand Down Expand Up @@ -184,6 +184,7 @@ impl GetRuntimeContext for () {
}

pub struct DenoRuntime<RuntimeContext = ()> {
pub drop_token: CancellationToken,
pub js_runtime: JsRuntime,
pub env_vars: HashMap<String, String>, // TODO: does this need to be pub?
pub conf: WorkerRuntimeOpts,
Expand All @@ -203,6 +204,8 @@ pub struct DenoRuntime<RuntimeContext = ()> {

impl<RuntimeContext> Drop for DenoRuntime<RuntimeContext> {
fn drop(&mut self) {
self.drop_token.cancel();

if self.conf.is_user_worker() {
self.js_runtime.v8_isolate().remove_gc_prologue_callback(
mem_check_gc_prologue_callback_fn,
Expand Down Expand Up @@ -238,13 +241,16 @@ where
..
} = opts;

let drop_token = CancellationToken::default();

let base_dir_path = std::env::current_dir().map(|p| p.join(&service_path))?;
let base_url = Url::from_directory_path(&base_dir_path).unwrap();

let is_user_worker = conf.is_user_worker();

let potential_exts = vec!["ts", "tsx", "js", "jsx"];
let mut main_module_url = base_url.join("index.ts")?;

for potential_ext in potential_exts {
main_module_url = base_url.join(format!("index.{}", potential_ext).as_str())?;
if main_module_url.to_file_path().unwrap().exists() {
Expand Down Expand Up @@ -587,6 +593,7 @@ where
}

op_state.put::<sb_env::EnvVars>(env_vars);
op_state.put(DenoRuntimeDropToken(drop_token.clone()))
}

let main_module_id = {
Expand All @@ -600,7 +607,7 @@ where
};

if is_user_worker {
drop(rt::SUPERVISOR_RT.spawn({
drop(base_rt::SUPERVISOR_RT.spawn({
let drop_token = mem_check.drop_token.clone();
let waker = mem_check.waker.clone();

Expand All @@ -624,6 +631,7 @@ where
}

Ok(Self {
drop_token,
js_runtime,
env_vars,
conf,
Expand Down Expand Up @@ -866,7 +874,7 @@ where
let drop_token = self.mem_check.drop_token.clone();
let state = self.mem_check_state();

drop(rt::SUPERVISOR_RT.spawn(async move {
drop(base_rt::SUPERVISOR_RT.spawn(async move {
loop {
tokio::select! {
_ = notify.notified() => {
Expand Down
1 change: 0 additions & 1 deletion crates/base/src/rt_worker/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
pub mod implementation;
pub mod rt;
pub mod supervisor;
pub mod utils;
pub mod worker;
Expand Down
7 changes: 3 additions & 4 deletions crates/base/src/rt_worker/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use tokio::time::Instant;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use super::rt;
use super::supervisor::CPUUsageMetrics;
use super::worker_ctx::TerminationToken;
use super::worker_pool::SupervisorPolicy;
Expand Down Expand Up @@ -115,9 +114,9 @@ impl Worker {

let cancel = self.cancel.clone();
let rt = if worker_kind.is_user_worker() {
&rt::USER_WORKER_RT
&base_rt::USER_WORKER_RT
} else {
&rt::PRIMARY_WORKER_RT
&base_rt::PRIMARY_WORKER_RT
};

let _worker_handle = rt.spawn_pinned(move || {
Expand Down Expand Up @@ -194,7 +193,7 @@ impl Worker {
)
};

rt::SUPERVISOR_RT
base_rt::SUPERVISOR_RT
.spawn(async move {
token.inbound.cancelled().await;
is_termination_requested.raise();
Expand Down
7 changes: 3 additions & 4 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ use tokio_rustls::server::TlsStream;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;

use super::rt;
use super::supervisor::{self, CPUTimerParam, CPUUsageMetrics};
use super::worker::DuplexStreamEntry;
use super::worker_pool::{SupervisorPolicy, WorkerPoolPolicy};
Expand Down Expand Up @@ -333,7 +332,7 @@ pub fn create_supervisor(
cpu_timer_param.get_cpu_timer(supervisor_policy).unzip();

drop({
let _rt_guard = rt::SUPERVISOR_RT.enter();
let _rt_guard = base_rt::SUPERVISOR_RT.enter();
let maybe_cpu_timer_inner = maybe_cpu_timer.clone();
let supervise_cancel_token_inner = supervise_cancel_token.clone();

Expand Down Expand Up @@ -378,7 +377,7 @@ pub fn create_supervisor(
use deno_core::futures::channel::mpsc;
use deno_core::serde_json::Value;

rt::SUPERVISOR_RT
base_rt::SUPERVISOR_RT
.spawn_blocking(move || {
let wait_inspector_disconnect_fut = async move {
let ls = tokio::task::LocalSet::new();
Expand Down Expand Up @@ -449,7 +448,7 @@ pub fn create_supervisor(
.await;
};

rt::SUPERVISOR_RT.block_on(wait_inspector_disconnect_fut);
base_rt::SUPERVISOR_RT.block_on(wait_inspector_disconnect_fut);
})
.await
.unwrap();
Expand Down
9 changes: 9 additions & 0 deletions crates/base_rt/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
[package]
name = "base_rt"
version = "0.1.0"
edition = "2021"

[dependencies]
tokio.workspace = true
tokio-util.workspace = true
once_cell.workspace = true
File renamed without changes.
4 changes: 3 additions & 1 deletion crates/sb_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ serde.workspace = true
bytes.workspace = true
deno_tls.workspace = true
thiserror.workspace = true
base_rt = { version = "0.1.0", path = "../base_rt" }
base_mem_check = { version = "0.1.0", path = "../base_mem_check" }
sb_node = { version = "0.1.0", path = "../node" }
deno_crypto.workspace = true
Expand Down Expand Up @@ -54,4 +55,5 @@ enum-as-inner.workspace = true
httparse.workspace = true
http.workspace = true
memmem = "0.1"
faster-hex.workspace=true
faster-hex.workspace = true
tracing.workspace = true
3 changes: 3 additions & 0 deletions crates/sb_core/conn_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,6 @@ impl ConnWatcher {
self.0.clone()
}
}

#[derive(Clone)]
pub struct DenoRuntimeDropToken(pub CancellationToken);
35 changes: 32 additions & 3 deletions crates/sb_core/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ use std::sync::atomic::Ordering;
use tokio::io;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::span;
use tracing::Level;

use crate::conn_sync::DenoRuntimeDropToken;

pub struct TokioDuplexResource {
id: usize,
Expand Down Expand Up @@ -95,10 +99,18 @@ pub async fn op_net_accept(
// we do not want to keep the op_state locked,
// so we take the channel receiver from it and release op state.
// we need to add it back later after processing a message.
let rx = {
let (rx, runtime_token) = {
let mut op_state = state.borrow_mut();
op_state
.try_take::<mpsc::UnboundedReceiver<(io::DuplexStream, Option<CancellationToken>)>>()

(
op_state
.try_take::<mpsc::UnboundedReceiver<(io::DuplexStream, Option<CancellationToken>)>>(
),
op_state
.try_borrow::<DenoRuntimeDropToken>()
.cloned()
.unwrap(),
)
};

if rx.is_none() {
Expand Down Expand Up @@ -131,6 +143,23 @@ pub async fn op_net_accept(
let rid = op_state.resource_table.add(resource);

if let Some(token) = conn_token {
// connection token should only last as long as the worker is alive.
drop(base_rt::SUPERVISOR_RT.spawn({
let token = token.clone();
async move {
let _lt_track = span!(Level::DEBUG, "lt_track", id);
tokio::select! {
_ = runtime_token.0.cancelled_owned() => {
if !token.is_cancelled() {
token.cancel();
}
}

_ = token.cancelled() => {}
}
}
}));

let _ = op_state
.borrow_mut::<HashMap<usize, CancellationToken>>()
.insert(id, token);
Expand Down