diff --git a/Cargo.lock b/Cargo.lock index f969be05..516933a4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6224,7 +6224,7 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", + "cfg-if 1.0.0", "rand", "static_assertions", ] diff --git a/Cargo.toml b/Cargo.toml index bb5f671f..5b992dad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -45,6 +45,7 @@ deno_webidl = { version = "0.136.0" } deno_web = { version = "0.167.0" } deno_websocket = { version = "0.141.0" } deno_webstorage = { version = "0.131.0" } +deno_lockfile = { version = "0.18.0" } enum-as-inner = "0.6.0" serde = { version = "1.0.149", features = ["derive"] } hyper = "0.14.26" @@ -52,7 +53,6 @@ tokio = { version = "1.35", features = ["full"] } bytes = { version = "1.4.0" } once_cell = "1.17.1" thiserror = "1.0.40" -deno_lockfile = "0.18.0" async-trait = "0.1.73" indexmap = { version = "2.0.0", features = ["serde"] } flate2 = "=1.0.26" @@ -78,6 +78,7 @@ glob = "0.3.1" httparse = "1.8" http = "0.2" faster-hex = "0.9.0" + # DEBUG #[patch.crates-io] #deno_core = { path = "/your/path/to/deno_core/core" } diff --git a/crates/base/Cargo.toml b/crates/base/Cargo.toml index b092725b..98319d5a 100644 --- a/crates/base/Cargo.toml +++ b/crates/base/Cargo.toml @@ -44,7 +44,7 @@ tokio-rustls = { version = "0.25.0" } rustls-pemfile = { version = "2.1.0" } futures-util = { workspace = true } url.workspace = true -event_worker ={ version = "0.1.0", path = "../event_worker" } +event_worker = { version = "0.1.0", path = "../event_worker" } sb_workers = { version = "0.1.0", path = "../sb_workers" } sb_env = { version = "0.1.0", path = "../sb_env" } sb_core = { version = "0.1.0", path = "../sb_core" } @@ -109,7 +109,7 @@ reqwest.workspace = true serde = { workspace = true, features = ["derive"] } tokio.workspace = true url.workspace = true -event_worker ={ version = "0.1.0", path = "../event_worker" } +event_worker = { version = "0.1.0", path = "../event_worker" } deno_broadcast_channel.workspace = true deno_core.workspace = true deno_canvas.workspace = true diff --git a/crates/base/src/lib.rs b/crates/base/src/lib.rs index 20576151..02158ed8 100644 --- a/crates/base/src/lib.rs +++ b/crates/base/src/lib.rs @@ -9,6 +9,7 @@ pub mod snapshot; pub mod utils; mod inspector_server; +mod timeout; pub use inspector_server::InspectorOption; pub use sb_graph::DecoratorType; diff --git a/crates/base/src/macros/test_macros.rs b/crates/base/src/macros/test_macros.rs index 13ca42c6..d74c544a 100644 --- a/crates/base/src/macros/test_macros.rs +++ b/crates/base/src/macros/test_macros.rs @@ -1,6 +1,36 @@ #[macro_export] -macro_rules! integration_test { - ($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { +macro_rules! integration_test_listen_fut { + ($port:expr, $tls:expr, $main_file:expr, $policy:expr, $import_map:expr, $flag:expr, $tx:expr, $token:expr) => {{ + use futures_util::FutureExt; + + let tls: Option = $tls.clone(); + + base::commands::start_server( + "0.0.0.0", + $port, + tls, + String::from($main_file), + None, + None, + $policy, + $import_map, + $flag, + Some($tx.clone()), + $crate::server::WorkerEntrypoints { + main: None, + events: None, + }, + $token.clone(), + vec![], + None, + ) + .boxed() + }}; +} + +#[macro_export] +macro_rules! integration_test_with_server_flag { + ($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { use futures_util::FutureExt; use $crate::macros::test_macros::__private; @@ -11,39 +41,29 @@ macro_rules! integration_test { let schema = if tls.is_some() { "https" } else { "http" }; let signal = tokio::spawn(async move { while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await { - integration_test!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+)); + $crate::integration_test_with_server_flag!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+)); } None }); - let token = integration_test!(@term $(, $($token)+)?); - let mut listen_fut = base::commands::start_server( - "0.0.0.0", + let token = $crate::integration_test_with_server_flag!(@term $(, $($token)+)?); + let mut listen_fut = $crate::integration_test_listen_fut!( $port, tls, - String::from($main_file), - None, - None, + $main_file, $policy, $import_map, - $crate::server::ServerFlags::default(), - Some(tx.clone()), - $crate::server::WorkerEntrypoints { - main: None, - events: None, - }, - token.clone(), - vec![], - None - ) - .boxed(); + $flag, + tx, + token + ); tokio::select! { resp = signal => { if let Ok(maybe_response_from_server) = resp { // then, after checking the response... (2) let resp = maybe_response_from_server.unwrap(); - integration_test!(@resp resp, ($($function)+)).await; + $crate::integration_test_with_server_flag!(@resp resp, ($($function)+)).await; } else { panic!("Request thread had a heart attack"); } @@ -62,7 +82,7 @@ macro_rules! integration_test { let _ = listen_fut.await; }); - integration_test!(@term_cleanup $($($token)+)?, token, join_fut); + $crate::integration_test_with_server_flag!(@term_cleanup $($($token)+)?, token, join_fut); } }; @@ -136,6 +156,24 @@ macro_rules! integration_test { }; } +#[macro_export] +macro_rules! integration_test { + ($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => { + $crate::integration_test_with_server_flag!( + $crate::server::ServerFlags::default(), + $main_file, + $port, + $url, + $policy, + $import_map, + $req_builder, + $tls, + ($($function)+) + $(,$($token)+)? + ) + }; +} + #[doc(hidden)] pub mod __private { use std::future::Future; diff --git a/crates/base/src/rt_worker/worker_ctx.rs b/crates/base/src/rt_worker/worker_ctx.rs index b9e1b0be..bc58fa2f 100644 --- a/crates/base/src/rt_worker/worker_ctx.rs +++ b/crates/base/src/rt_worker/worker_ctx.rs @@ -1,5 +1,6 @@ use crate::deno_runtime::DenoRuntime; use crate::inspector_server::Inspector; +use crate::timeout; use crate::utils::send_event_if_event_worker_available; use crate::utils::units::bytes_to_display; @@ -184,7 +185,7 @@ async fn relay_upgraded_request_and_response( match copy_bidirectional(&mut upstream, &mut downstream).await { Ok(_) => {} Err(err) if err.kind() == ErrorKind::UnexpectedEof => { - let Ok(_) = downstream.downcast::>() else { + let Ok(_) = downstream.downcast::>>() else { // TODO(Nyannyacha): It would be better if we send // `close_notify` before shutdown an upstream if downstream is a // TLS stream. @@ -253,7 +254,7 @@ pub fn create_supervisor( debug!("Hard memory limit triggered"); if memory_limit_tx.send(()).is_err() { - error!("failed to send memory limit reached notification - isolate may already be terminating"); + error!("failed to send memory limit reached notification - isolate may already be terminating"); } true @@ -263,10 +264,7 @@ pub fn create_supervisor( worker_runtime.js_runtime.add_near_heap_limit_callback({ let memory_limit_tx = memory_limit_tx.clone(); move |cur, _| { - debug!( - "Low memory alert triggered: {}", - bytes_to_display(cur as u64), - ); + debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),); if memory_limit_tx.send(()).is_err() { error!("failed to send memory limit reached notification - isolate may already be terminating"); diff --git a/crates/base/src/server.rs b/crates/base/src/server.rs index 107b0697..885e6ff5 100644 --- a/crates/base/src/server.rs +++ b/crates/base/src/server.rs @@ -244,6 +244,7 @@ pub struct ServerFlags { pub tcp_nodelay: bool, pub graceful_exit_deadline_sec: u64, pub graceful_exit_keepalive_deadline_ms: Option, + pub request_read_timeout_ms: Option, } #[derive(Debug)] @@ -467,11 +468,13 @@ impl Server { let ServerFlags { tcp_nodelay, + request_read_timeout_ms, mut graceful_exit_deadline_sec, mut graceful_exit_keepalive_deadline_ms, .. } = flags; + let request_read_timeout_dur = request_read_timeout_ms.map(Duration::from_millis); let mut terminate_signal_fut = get_termination_signal(); loop { @@ -487,7 +490,14 @@ impl Server { let _ = stream.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone()) + accept_stream( + stream, + main_worker_req_tx, + event_tx, + metric_src, + graceful_exit_token.clone(), + request_read_timeout_dur + ) } Err(e) => error!("socket error: {}", e) } @@ -507,7 +517,14 @@ impl Server { let _ = stream.get_ref().0.set_nodelay(true); } - accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone()); + accept_stream( + stream, + main_worker_req_tx, + event_tx, + metric_src, + graceful_exit_token.clone(), + request_read_timeout_dur + ) } Err(e) => error!("socket error: {}", e) } @@ -675,6 +692,7 @@ fn accept_stream( event_tx: Option>, metric_src: SharedMetricSource, graceful_exit_token: CancellationToken, + maybe_req_read_timeout_dur: Option, ) where I: AsyncRead + AsyncWrite + Unpin + Send + 'static, { @@ -682,14 +700,21 @@ fn accept_stream( tokio::task::spawn({ async move { let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx); + let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_read_timeout_dur { + crate::timeout::Stream::with_timeout(io, timeout_dur) + } else { + crate::timeout::Stream::with_bypass(io) + }; let _guard = cancel.drop_guard(); let _active_io_count_guard = scopeguard::guard(metric_src, |it| { it.decl_active_io(); }); - let conn_fut = Http::new().serve_connection(io, service).with_upgrades(); let mut shutting_down = false; + let conn_fut = Http::new() + .serve_connection(io, crate::timeout::Service::new(service, maybe_timeout_tx)) + .with_upgrades(); pin!(conn_fut); diff --git a/crates/base/src/timeout.rs b/crates/base/src/timeout.rs new file mode 100644 index 00000000..4fac82bf --- /dev/null +++ b/crates/base/src/timeout.rs @@ -0,0 +1,280 @@ +// This implementation originated from the link below: +// https://gist.github.com/programatik29/36d371c657392fd7f322e7342957b6d1 + +use std::{ + pin::Pin, + task::{ready, Poll}, + time::Duration, +}; + +use futures_util::Future; +use pin_project::pin_project; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::mpsc::{self, UnboundedReceiver, UnboundedSender}, + time::{sleep, Instant, Sleep}, +}; + +pub(super) enum State { + Wait, + Reset, +} + +enum StreamKind { + UseTimeout { + sleep: Pin>, + duration: Duration, + waiting: bool, + finished: bool, + rx: UnboundedReceiver, + }, + + Bypass, +} + +pub struct Stream { + inner: S, + kind: StreamKind, +} + +impl Stream { + fn new(inner: S, kind: StreamKind) -> Self { + Self { inner, kind } + } + + pub(super) fn with_timeout( + inner: S, + duration: Duration, + ) -> (Self, Option>) { + let (tx, rx) = mpsc::unbounded_channel(); + + ( + Self::new( + inner, + StreamKind::UseTimeout { + sleep: Box::pin(sleep(duration)), + duration, + waiting: false, + finished: false, + rx, + }, + ), + Some(tx), + ) + } + + pub(super) fn with_bypass(inner: S) -> (Self, Option>) { + (Self::new(inner, StreamKind::Bypass), None) + } +} + +impl AsyncRead for Stream { + fn poll_read( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> Poll> { + match &mut self.kind { + StreamKind::UseTimeout { + sleep, + duration, + waiting, + finished, + rx, + } => { + if !*finished { + match Pin::new(rx).poll_recv(cx) { + Poll::Ready(Some(State::Reset)) => { + *waiting = false; + + let deadline = Instant::now() + *duration; + + sleep.as_mut().reset(deadline); + } + + // enter waiting mode (for response body last chunk) + Poll::Ready(Some(State::Wait)) => *waiting = true, + Poll::Ready(None) => *finished = true, + Poll::Pending => (), + } + } + + if !*waiting { + // return error if timer is elapsed + if let Poll::Ready(()) = sleep.as_mut().poll(cx) { + return Poll::Ready(Err(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "request header read timed out", + ))); + } + } + } + + StreamKind::Bypass => {} + } + + Pin::new(&mut self.inner).poll_read(cx, buf) + } +} + +impl AsyncWrite for Stream { + fn poll_write( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write(cx, buf) + } + + fn poll_flush( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_flush(cx) + } + + fn poll_shutdown( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll> { + Pin::new(&mut self.inner).poll_shutdown(cx) + } + + fn poll_write_vectored( + mut self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> Poll> { + Pin::new(&mut self.inner).poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.inner.is_write_vectored() + } +} + +pub struct Service { + inner: S, + tx: Option>, +} + +impl Service { + pub(super) fn new(inner: S, tx: Option>) -> Self { + Self { inner, tx } + } +} + +impl hyper::service::Service for Service +where + S: hyper::service::Service>, +{ + type Response = hyper::Response>; + type Error = S::Error; + type Future = ServiceFuture; + + fn poll_ready(&mut self, cx: &mut std::task::Context<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: Request) -> Self::Future { + if let Some(tx) = self.tx.as_ref() { + // send timer wait signal + let _ = tx.send(State::Wait); + } + + ServiceFuture::new(self.inner.call(req), self.tx.clone()) + } +} + +#[pin_project] +pub struct ServiceFuture { + #[pin] + inner: F, + tx: Option>, +} + +impl ServiceFuture { + fn new(inner: F, tx: Option>) -> Self { + Self { inner, tx } + } +} + +impl Future for ServiceFuture +where + F: Future, Error>>, +{ + type Output = Result>, Error>; + + fn poll(self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll { + let this = self.project(); + + this.inner.poll(cx).map(|result| { + result.map(|response| response.map(|body| Body::new(body, this.tx.take()))) + }) + } +} + +#[pin_project] +pub struct Body { + #[pin] + inner: B, + tx: Option>, +} + +impl Body { + fn new(inner: B, tx: Option>) -> Self { + Self { inner, tx } + } +} + +impl hyper::body::HttpBody for Body +where + B: hyper::body::HttpBody, +{ + type Data = B::Data; + type Error = B::Error; + + fn poll_data( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll>> { + let this = self.project(); + + if let Some(tx) = this.tx.as_ref() { + let option = ready!(this.inner.poll_data(cx)); + + if option.is_none() { + let _ = tx.send(State::Reset); + } + + Poll::Ready(option) + } else { + this.inner.poll_data(cx) + } + } + + fn poll_trailers( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> Poll, Self::Error>> { + self.project().inner.poll_trailers(cx) + } + + fn is_end_stream(&self) -> bool { + if let Some(tx) = self.tx.as_ref() { + let is_end_stream = self.inner.is_end_stream(); + + if is_end_stream { + let _ = tx.send(State::Reset); + } + + is_end_stream + } else { + self.inner.is_end_stream() + } + } + + fn size_hint(&self) -> hyper::body::SizeHint { + self.inner.size_hint() + } +} diff --git a/crates/base/tests/integration_tests.rs b/crates/base/tests/integration_tests.rs index e64fa411..e24f82fb 100644 --- a/crates/base/tests/integration_tests.rs +++ b/crates/base/tests/integration_tests.rs @@ -1,18 +1,26 @@ #[path = "../src/utils/integration_test_helper.rs"] mod integration_test_helper; -use std::{borrow::Cow, collections::HashMap, path::Path, time::Duration}; +use std::{ + borrow::Cow, + collections::HashMap, + io::{self, Cursor}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::Path, + sync::Arc, + time::Duration, +}; use anyhow::Context; use async_tungstenite::WebSocketStream; use base::{ - integration_test, + integration_test, integration_test_listen_fut, rt_worker::worker_ctx::{create_user_worker_pool, create_worker, TerminationToken}, - server::{ServerEvent, Tls}, + server::{ServerEvent, ServerFlags, ServerHealth, Tls}, DecoratorType, }; use deno_core::serde_json; -use futures_util::{Future, SinkExt, StreamExt}; +use futures_util::{future::BoxFuture, Future, FutureExt, SinkExt, StreamExt}; use http::{Method, Request, StatusCode}; use http_utils::utils::get_upgrade_type; use hyper::{body::to_bytes, Body}; @@ -27,10 +35,16 @@ use sb_workers::context::{MainWorkerRuntimeOpts, WorkerContextInitOpts, WorkerRu use serde::Deserialize; use serial_test::serial; use tokio::{ + io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, join, + net::TcpStream, sync::{mpsc, oneshot}, time::{sleep, timeout}, }; +use tokio_rustls::{ + rustls::{pki_types::ServerName, ClientConfig, RootCertStore}, + TlsConnector, +}; use tokio_util::compat::TokioAsyncReadCompatExt; use tungstenite::Message; use urlencoding::encode; @@ -1310,12 +1324,11 @@ async fn test_decorators(ty: Option) { if is_disabled { assert_eq!(resp.status(), StatusCode::INTERNAL_SERVER_ERROR); - assert!( - resp.text().await.unwrap().starts_with( - "{\"msg\":\"InvalidWorkerCreation: worker boot error Uncaught SyntaxError: Invalid or unexpected token" - ), - - ); + assert!(resp + .text() + .await + .unwrap() + .starts_with("{\"msg\":\"InvalidWorkerCreation: worker boot error Uncaught SyntaxError: Invalid or unexpected token"),); } else { assert_eq!(resp.status(), StatusCode::OK); assert_eq!(resp.text().await.unwrap().as_str(), "meow?"); @@ -1415,10 +1428,224 @@ async fn oak_with_jsr_specifier() { ); } +async fn test_slowloris(request_read_timeout_ms: u64, maybe_tls: Option, test_fn: F) +where + F: (FnOnce(Box) -> R) + Send + 'static, + R: Future + Send, +{ + let token = TerminationToken::new(); + + let (health_tx, mut health_rx) = mpsc::channel(1); + let (tx, rx) = oneshot::channel(); + + let mut listen_fut = integration_test_listen_fut!( + NON_SECURE_PORT, + maybe_tls, + "./test_cases/main", + None, + None, + ServerFlags { + request_read_timeout_ms: Some(request_read_timeout_ms), + ..Default::default() + }, + health_tx, + Some(token.clone()) + ); + + let req_fut = { + let token = token.clone(); + async move { + assert!(test_fn(maybe_tls.stream().await).await); + + if timeout(Duration::from_secs(10), token.cancel_and_wait()) + .await + .is_err() + { + panic!("failed to terminate server within 10 seconds"); + } + + tx.send(()).unwrap(); + } + }; + + let join_fut = tokio::spawn(async move { + loop { + if let Some(ServerHealth::Listening(..)) = health_rx.recv().await { + break; + } + } + + req_fut.await; + }); + + tokio::select! { + _ = join_fut => {} + _ = &mut listen_fut => {} + }; + + if timeout(Duration::from_secs(10), rx).await.is_err() { + panic!("failed to check within 10 seconds"); + } +} + +async fn test_slowloris_no_prompt_timeout(maybe_tls: Option, invert: bool) { + test_slowloris( + if invert { u64::MAX } else { 5000 }, + maybe_tls, + move |mut io| async move { + static HEADER: &[u8] = b"GET /oak-with-jsr HTTP/1.1\r\nHost: localhost\r\n\r\n"; + + let check_io_kind_fn = move |err: std::io::Error| { + if invert { + return true; + } + + matches!( + err.kind(), + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof + ) + }; + + // > 5000ms + sleep(Duration::from_secs(10)).await; + + if let Err(err) = io.write_all(HEADER).await { + return check_io_kind_fn(err); + } + + if let Err(err) = io.flush().await { + return check_io_kind_fn(err); + } + + let mut buf = vec![0; 1_048_576]; + + match io.read(&mut buf).await { + Ok(nread) => { + if invert { + nread > 0 + } else { + nread == 0 + } + } + + Err(err) => check_io_kind_fn(err), + } + }, + ) + .await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_no_prompt_timeout_non_secure() { + test_slowloris_no_prompt_timeout(new_localhost_tls(false), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow"] +async fn test_slowloris_no_prompt_timeout_non_secure_inverted() { + test_slowloris_no_prompt_timeout(new_localhost_tls(false), true).await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_no_prompt_timeout_secure() { + test_slowloris_no_prompt_timeout(new_localhost_tls(true), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow"] +async fn test_slowloris_no_prompt_timeout_secure_inverted() { + test_slowloris_no_prompt_timeout(new_localhost_tls(true), true).await; +} + +async fn test_slowloris_slow_header_timedout(maybe_tls: Option, invert: bool) { + test_slowloris( + if invert { u64::MAX } else { 5000 }, + maybe_tls, + move |mut io| async move { + static HEADER: &[u8] = b"GET /oak-with-jsr HTTP/1.1\r\nHost: localhost\r\n\r\n"; + + let check_io_kind_fn = move |err: std::io::Error| { + if invert { + return true; + } + + matches!( + err.kind(), + io::ErrorKind::BrokenPipe | io::ErrorKind::UnexpectedEof + ) + }; + + // takes 1000ms per each character (ie. > 5000ms) + for &b in HEADER { + if let Err(err) = io.write(&[b]).await { + return check_io_kind_fn(err); + } + + if let Err(err) = io.flush().await { + return check_io_kind_fn(err); + } + + sleep(Duration::from_secs(1)).await; + } + + let mut buf = vec![0; 1_048_576]; + + match io.read(&mut buf).await { + Ok(nread) => { + if invert { + nread > 0 + } else { + nread == 0 + } + } + + Err(err) => check_io_kind_fn(err), + } + }, + ) + .await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_slow_header_timedout_non_secure() { + test_slowloris_slow_header_timedout(new_localhost_tls(false), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow 2x"] +async fn test_slowloris_slow_header_timedout_non_secure_inverted() { + test_slowloris_slow_header_timedout(new_localhost_tls(false), true).await; +} + +#[tokio::test] +#[serial] +async fn test_slowloris_slow_header_timedout_secure() { + test_slowloris_slow_header_timedout(new_localhost_tls(true), false).await; +} + +#[tokio::test] +#[serial] +#[ignore = "too slow 2x"] +async fn test_slowloris_slow_header_timedout_secure_inverted() { + test_slowloris_slow_header_timedout(new_localhost_tls(true), true).await; +} + +trait AsyncReadWrite: AsyncRead + AsyncWrite + Send + Unpin {} + +impl AsyncReadWrite for T where T: AsyncRead + AsyncWrite + Send + Unpin {} + trait TlsExt { fn client(&self) -> Client; fn schema(&self) -> &'static str; + fn sock_addr(&self) -> SocketAddr; fn port(&self) -> u16; + fn stream(&self) -> BoxFuture<'static, Box>; } impl TlsExt for Option { @@ -1441,6 +1668,20 @@ impl TlsExt for Option { } } + fn sock_addr(&self) -> SocketAddr { + const SOCK_ADDR_SECURE: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), SECURE_PORT); + + const SOCK_ADDR_NON_SECURE: SocketAddr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), NON_SECURE_PORT); + + if self.is_some() { + SOCK_ADDR_SECURE + } else { + SOCK_ADDR_NON_SECURE + } + } + fn port(&self) -> u16 { if self.is_some() { SECURE_PORT @@ -1448,6 +1689,40 @@ impl TlsExt for Option { NON_SECURE_PORT } } + + fn stream(&self) -> BoxFuture<'static, Box> { + let use_tls = self.is_some(); + let sock_addr = self.sock_addr(); + + async move { + if use_tls { + let mut cursor = Cursor::new(Vec::from(TLS_LOCALHOST_ROOT_CA)); + let certs = rustls_pemfile::certs(&mut cursor) + .collect::, _>>() + .unwrap(); + + let mut root_cert_store = RootCertStore::empty(); + let _ = root_cert_store.add_parsable_certificates(certs); + + let config = ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + + let connector = TlsConnector::from(Arc::new(config)); + let dnsname = ServerName::try_from("localhost").unwrap(); + + let stream = TcpStream::connect(sock_addr).await.unwrap(); + let stream = connector.connect(dnsname, stream).await.unwrap(); + + Box::new(stream) as Box + } else { + let stream = TcpStream::connect(sock_addr).await.unwrap(); + + Box::new(stream) as Box + } + } + .boxed() + } } fn new_localhost_tls(secure: bool) -> Option { diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index fffc676c..67d6cf6d 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -2,6 +2,7 @@ name = "cli" version = "0.1.0" edition = "2021" +description = "A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services" [[bin]] name = "edge-runtime" diff --git a/crates/cli/src/flags.rs b/crates/cli/src/flags.rs new file mode 100644 index 00000000..298bad2b --- /dev/null +++ b/crates/cli/src/flags.rs @@ -0,0 +1,225 @@ +use std::{net::SocketAddr, path::PathBuf}; + +use clap::{ + arg, + builder::{BoolishValueParser, FalseyValueParser, TypedValueParser}, + crate_version, value_parser, ArgAction, ArgGroup, Command, +}; + +pub(super) fn get_cli() -> Command { + Command::new(env!("CARGO_BIN_NAME")) + .about(env!("CARGO_PKG_DESCRIPTION")) + .version(format!( + "{}\ndeno {} ({}, {})", + crate_version!(), + env!("DENO_VERSION"), + env!("PROFILE"), + env!("TARGET") + )) + .arg_required_else_help(true) + .arg( + arg!(-v --verbose "Use verbose output") + .conflicts_with("quiet") + .global(true) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(-q --quiet "Do not print any log messages") + .conflicts_with("verbose") + .global(true) + .action(ArgAction::SetTrue), + ) + .arg( + arg!(--"log-source") + .help("Include source file and line in log messages") + .global(true) + .action(ArgAction::SetTrue), + ) + .subcommand(get_start_command()) + .subcommand(get_bundle_command()) + .subcommand(get_unbundle_command()) +} + +fn get_start_command() -> Command { + Command::new("start") + .about("Start the server") + .arg(arg!(-i --ip ).help("Host IP address to listen on").default_value("0.0.0.0")) + .arg( + arg!(-p --port ) + .help("Port to listen on") + .env("EDGE_RUNTIME_PORT") + .default_value("9000") + .value_parser(value_parser!(u16)), + ) + .arg( + arg!(--tls [PORT]) + .env("EDGE_RUNTIME_TLS") + .num_args(0..=1) + .default_missing_value("443") + .value_parser(value_parser!(u16)) + .requires("key") + .requires("cert"), + ) + .arg( + arg!(--key ) + .help("Path to PEM-encoded key to be used to TLS") + .env("EDGE_RUNTIME_TLS_KEY_PATH") + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!(--cert ) + .help("Path to PEM-encoded X.509 certificate to be used to TLS") + .env("EDGE_RUNTIME_TLS_CERT_PATH") + .value_parser(value_parser!(PathBuf)), + ) + .arg( + arg!(--"main-service" ) + .help("Path to main service directory or eszip") + .default_value("examples/main"), + ) + .arg( + arg!(--"disable-module-cache") + .help("Disable using module cache") + .default_value("false") + .value_parser(FalseyValueParser::new()), + ) + .arg(arg!(--"import-map" ).help("Path to import map file")) + .arg(arg!(--"event-worker" ).help("Path to event worker directory")) + .arg(arg!(--"main-entrypoint" ).help("Path to entrypoint in main service (only for eszips)")) + .arg(arg!(--"events-entrypoint" ).help("Path to entrypoint in events worker (only for eszips)")) + .arg( + arg!(--"policy" ) + .help("Policy to enforce in the worker pool") + .default_value("per_worker") + .value_parser(["per_worker", "per_request", "oneshot"]), + ) + .arg( + arg!(--"decorator" ) + .help(concat!( + "Type of decorator to use on the main worker and event worker. ", + "If not specified, the decorator feature is disabled." + )) + .value_parser(["tc39", "typescript", "typescript_with_metadata"]), + ) + .arg( + arg!(--"graceful-exit-timeout" [SECONDS]) + .help(concat!( + "Maximum time in seconds that can wait for workers before terminating forcibly. ", + "If providing zero value, the runtime will not try a graceful exit." + )) + // NOTE(Nyannyacha): Default timeout value follows the + // value[1] defined in moby. + // + // [1]: https://github.com/moby/moby/blob/master/daemon/config/config.go#L45-L47 + .default_value("15") + .value_parser(value_parser!(u64).range(..u64::MAX)), + ) + .arg( + arg!( + --"experimental-graceful-exit-keepalive-deadline-ratio" + + ) + .help(concat!( + "(Experimental) Maximum period of time that incoming requests can be processed over a", + " pre-established keep-alive HTTP connection. ", + "This is specified as a percentage of the `--graceful-exit-timeout` value. ", + "The percentage cannot be greater than 95." + )) + .value_parser(value_parser!(u64).range(..=95)), + ) + .arg( + arg!(--"max-parallelism" ) + .help("Maximum count of workers that can exist in the worker pool simultaneously") + .value_parser( + // NOTE: Acceptable bounds were chosen arbitrarily. + value_parser!(u32).range(1..9999).map(|it| -> usize { it as usize }), + ), + ) + .arg( + arg!(--"request-wait-timeout" ) + .help("Maximum time in milliseconds that can wait to establish a connection with a worker") + .value_parser(value_parser!(u64)), + ) + .arg( + arg!(--"request-read-timeout" ) + .help("Maximum time in milliseconds that can be waited from when the connection is accepted until the request body is fully read") + .value_parser(value_parser!(u64)), + ) + .arg( + arg!(--"inspect" [HOST_AND_PORT]) + .help("Activate inspector on host:port") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .arg( + arg!(--"inspect-brk" [HOST_AND_PORT]) + .help("Activate inspector on host:port, wait for debugger to connect and break at the start of user script") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .arg( + arg!(--"inspect-wait" [HOST_AND_PORT]) + .help("Activate inspector on host:port and wait for debugger to connect before running user code") + .num_args(0..=1) + .value_parser(value_parser!(SocketAddr)) + .require_equals(true) + .default_missing_value("127.0.0.1:9229"), + ) + .group(ArgGroup::new("inspector").args(["inspect", "inspect-brk", "inspect-wait"])) + .arg( + arg!(--"inspect-main") + .help("Allow creating inspector for main worker") + .requires("inspector") + .action(ArgAction::SetTrue), + ) + .arg(arg!(--"static" ).help("Glob pattern for static files to be included")) + .arg( + arg!(--"tcp-nodelay" [BOOL]) + .help("Disables Nagle's algorithm") + .num_args(0..=1) + .value_parser(BoolishValueParser::new()) + .require_equals(true) + .default_value("true") + .default_missing_value("true"), + ) +} + +fn get_bundle_command() -> Command { + Command::new("bundle") + .about(concat!( + "Creates an 'eszip' file that can be executed by the EdgeRuntime. ", + "Such file contains all the modules in contained in a single binary." + )) + .arg(arg!(--"output" ).help("Path to output eszip file").default_value("bin.eszip")) + .arg( + arg!(--"entrypoint" ) + .help("Path to entrypoint to bundle as an eszip") + .required(true), + ) + .arg(arg!(--"static" ).help("Glob pattern for static files to be included")) + .arg(arg!(--"import-map" ).help("Path to import map file")) + .arg( + arg!(--"decorator" ) + .help("Type of decorator to use when bundling. If not specified, the decorator feature is disabled.") + .value_parser(["tc39", "typescript", "typescript_with_metadata"]), + ) +} + +fn get_unbundle_command() -> Command { + Command::new("unbundle") + .about("Unbundles an .eszip file into the specified directory") + .arg( + arg!(--"output" ) + .help("Path to extract the ESZIP content") + .default_value("./"), + ) + .arg( + arg!(--"eszip" ) + .help("Path of eszip to extract") + .required(true), + ) +} diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 26779826..e4468757 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -1,3 +1,4 @@ +mod flags; mod logger; use anyhow::{anyhow, bail, Error}; @@ -6,9 +7,9 @@ use base::deno_runtime::MAYBE_DENO_VERSION; use base::rt_worker::worker_pool::{SupervisorPolicy, WorkerPoolPolicy}; use base::server::{ServerFlags, Tls, WorkerEntrypoints}; use base::{DecoratorType, InspectorOption}; -use clap::builder::{BoolishValueParser, FalseyValueParser, TypedValueParser}; -use clap::{arg, crate_version, value_parser, ArgAction, ArgGroup, ArgMatches, Command}; +use clap::ArgMatches; use deno_core::url::Url; +use flags::get_cli; use log::warn; use sb_graph::emitter::EmitterFactory; use sb_graph::import_map::load_import_map; @@ -21,185 +22,6 @@ use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Arc; -fn cli() -> Command { - Command::new("edge-runtime") - .about("A server based on Deno runtime, capable of running JavaScript, TypeScript, and WASM services") - .version(format!( - "{}\ndeno {} ({}, {})", - crate_version!(), - env!("DENO_VERSION"), - env!("PROFILE"), - env!("TARGET") - )) - .arg_required_else_help(true) - .arg( - arg!(-v --verbose "Use verbose output") - .conflicts_with("quiet") - .global(true) - .action(ArgAction::SetTrue), - ) - .arg( - arg!(-q --quiet "Do not print any log messages") - .conflicts_with("verbose") - .global(true) - .action(ArgAction::SetTrue), - ) - .arg( - arg!(--"log-source" "Include source file and line in log messages") - .global(true) - .action(ArgAction::SetTrue), - ) - .subcommand( - Command::new("start") - .about("Start the server") - .arg(arg!(-i --ip "Host IP address to listen on").default_value("0.0.0.0")) - .arg( - arg!(-p --port "Port to listen on") - .env("EDGE_RUNTIME_PORT") - .default_value("9000") - .value_parser(value_parser!(u16)) - ) - .arg( - arg!(--tls [PORT]) - .env("EDGE_RUNTIME_TLS") - .num_args(0..=1) - .default_missing_value("443") - .value_parser(value_parser!(u16)) - .requires("key") - .requires("cert") - ) - .arg( - arg!(--key "Path to PEM-encoded key to be used to TLS") - .env("EDGE_RUNTIME_TLS_KEY_PATH") - .value_parser(value_parser!(PathBuf)) - ) - .arg( - arg!(--cert "Path to PEM-encoded X.509 certificate to be used to TLS") - .env("EDGE_RUNTIME_TLS_CERT_PATH") - .value_parser(value_parser!(PathBuf)) - ) - .arg(arg!(--"main-service" "Path to main service directory or eszip").default_value("examples/main")) - .arg(arg!(--"disable-module-cache" "Disable using module cache").default_value("false").value_parser(FalseyValueParser::new())) - .arg(arg!(--"import-map" "Path to import map file")) - .arg(arg!(--"event-worker" "Path to event worker directory")) - .arg(arg!(--"main-entrypoint" "Path to entrypoint in main service (only for eszips)")) - .arg(arg!(--"events-entrypoint" "Path to entrypoint in events worker (only for eszips)")) - .arg( - arg!(--"policy" "Policy to enforce in the worker pool") - .default_value("per_worker") - .value_parser(["per_worker", "per_request", "oneshot"]) - ) - .arg( - arg!(--"decorator" "Type of decorator to use on the main worker and event worker. If not specified, the decorator feature is disabled.") - .value_parser(["tc39", "typescript", "typescript_with_metadata"]) - ) - .arg( - arg!(--"graceful-exit-timeout" [SECONDS]) - .help( - concat!( - "Maximum time in seconds that can wait for workers before terminating forcibly. ", - "If providing zero value, the runtime will not try a graceful exit." - ) - ) - // NOTE(Nyannyacha): Default timeout value follows the - // value[1] defined in moby. - // - // [1]: https://github.com/moby/moby/blob/master/daemon/config/config.go#L45-L47 - .default_value("15") - .value_parser( - value_parser!(u64) - .range(..u64::MAX) - ) - ) - .arg( - arg!( - --"experimental-graceful-exit-keepalive-deadline-ratio" - - ) - .help( - concat!( - "(Experimental) Maximum period of time that incoming requests can be processed over a pre-established ", - "keep-alive HTTP connection. ", - "This is specified as a percentage of the `--graceful-exit-timeout` value. ", - "The percentage cannot be greater than 95." - ) - ) - .value_parser( - value_parser!(u64) - .range(..=95) - ) - ) - .arg( - arg!(--"max-parallelism" "Maximum count of workers that can exist in the worker pool simultaneously") - .value_parser( - // NOTE: Acceptable bounds were chosen arbitrarily. - value_parser!(u32) - .range(1..9999) - .map(|it| -> usize { it as usize }) - ) - ) - .arg( - arg!(--"request-wait-timeout" "Maximum time in milliseconds that can wait to establish a connection with a worker") - .value_parser(value_parser!(u64)) - ) - .arg( - arg!(--"inspect" [HOST_AND_PORT] "Activate inspector on host:port (default: 127.0.0.1:9229)") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .arg( - arg!(--"inspect-brk" [HOST_AND_PORT] "Activate inspector on host:port, wait for debugger to connect and break at the start of user script") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .arg( - arg!(--"inspect-wait" [HOST_AND_PORT] "Activate inspector on host:port and wait for debugger to connect before running user code") - .num_args(0..=1) - .value_parser(value_parser!(SocketAddr)) - .require_equals(true) - .default_missing_value("127.0.0.1:9229") - ) - .group( - ArgGroup::new("inspector") - .args(["inspect", "inspect-brk", "inspect-wait"]) - ) - .arg( - arg!(--"inspect-main" "Allow creating inspector for main worker") - .requires("inspector") - .action(ArgAction::SetTrue) - ) - .arg(arg!(--"static" "Glob pattern for static files to be included")) - .arg(arg!(--"tcp-nodelay" [BOOL] "Disables Nagle's algorithm") - .num_args(0..=1) - .value_parser(BoolishValueParser::new()) - .require_equals(true) - .default_value("true") - .default_missing_value("true") - ) - ) - .subcommand( - Command::new("bundle") - .about("Creates an 'eszip' file that can be executed by the EdgeRuntime. Such file contains all the modules in contained in a single binary.") - .arg(arg!(--"output" "Path to output eszip file").default_value("bin.eszip")) - .arg(arg!(--"entrypoint" "Path to entrypoint to bundle as an eszip").required(true)) - .arg(arg!(--"static" "Glob pattern for static files to be included")) - .arg(arg!(--"import-map" "Path to import map file")) - .arg( - arg!(--"decorator" "Type of decorator to use when bundling. If not specified, the decorator feature is disabled.") - .value_parser(["tc39", "typescript", "typescript_with_metadata"]) - ) - ).subcommand( - Command::new("unbundle") - .about("Unbundles an .eszip file into the specified directory") - .arg(arg!(--"output" "Path to extract the ESZIP content").default_value("./")) - .arg(arg!(--"eszip" "Path of eszip to extract").required(true)) - ) -} - fn main() -> Result<(), anyhow::Error> { MAYBE_DENO_VERSION.get_or_init(|| env!("DENO_VERSION").to_string()); @@ -212,7 +34,7 @@ fn main() -> Result<(), anyhow::Error> { // TODO: Tokio runtime shouldn't be needed here (Address later) let local = tokio::task::LocalSet::new(); let res: Result<(), Error> = local.block_on(&runtime, async { - let matches = cli().get_matches(); + let matches = get_cli().get_matches(); if !matches.get_flag("quiet") { let verbose = matches.get_flag("verbose"); @@ -300,6 +122,8 @@ fn main() -> Result<(), anyhow::Error> { sub_matches.get_one::("max-parallelism").cloned(); let maybe_request_wait_timeout = sub_matches.get_one::("request-wait-timeout").cloned(); + let maybe_request_read_timeout = + sub_matches.get_one::("request-read-timeout").cloned(); let static_patterns = if let Some(val_ref) = sub_matches.get_many::("static") { val_ref.map(|s| s.as_str()).collect::>() @@ -372,6 +196,7 @@ fn main() -> Result<(), anyhow::Error> { tcp_nodelay, graceful_exit_deadline_sec, graceful_exit_keepalive_deadline_ms, + request_read_timeout_ms: maybe_request_read_timeout, }, None, WorkerEntrypoints { diff --git a/crates/cpu_timer/src/lib.rs b/crates/cpu_timer/src/lib.rs index f68c08bf..9d3e40d5 100644 --- a/crates/cpu_timer/src/lib.rs +++ b/crates/cpu_timer/src/lib.rs @@ -212,14 +212,9 @@ fn register_sigalrm() { std::thread::Builder::new() .name("sb-cpu-timer".into()) .spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - + let rt = tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap(); let sig_receiver_handle = rt.spawn(async move { - let mut signals = - SignalsInfo::with_exfiltrator([signal::SIGALRM], raw::WithRawSiginfo).unwrap(); + let mut signals = SignalsInfo::with_exfiltrator([signal::SIGALRM], raw::WithRawSiginfo).unwrap(); while let Some(siginfo) = signals.next().await { let _ = sig_timer_id_tx.send(unsafe { siginfo.si_value().sival_ptr as usize }); diff --git a/crates/node/errors.rs b/crates/node/errors.rs index 16b6b59b..352b5e3c 100644 --- a/crates/node/errors.rs +++ b/crates/node/errors.rs @@ -147,13 +147,13 @@ pub fn err_package_import_not_defined( } pub fn err_unsupported_dir_import(path: &str, base: &str) -> AnyError { - generic_error(format!("[ERR_UNSUPPORTED_DIR_IMPORT] Directory import '{path}' is not supported resolving ES modules imported from {base}")) + generic_error(format!( + "[ERR_UNSUPPORTED_DIR_IMPORT] Directory import '{path}' is not supported resolving ES modules imported from {base}" + )) } pub fn err_unsupported_esm_url_scheme(url: &Url) -> AnyError { - let mut msg = - "[ERR_UNSUPPORTED_ESM_URL_SCHEME] Only file and data URLS are supported by the default ESM loader" - .to_string(); + let mut msg = "[ERR_UNSUPPORTED_ESM_URL_SCHEME] Only file and data URLS are supported by the default ESM loader".to_string(); if cfg!(window) && url.scheme().len() == 2 { msg = format!("{msg}. On Windows, absolute path must be valid file:// URLs"); @@ -181,14 +181,8 @@ mod test { format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] Package subpath './jsx-runtime' is not defined for types by \"exports\" in 'test_path{separator_char}package.json'") ); assert_eq!( - err_package_path_not_exported( - "test_path".to_string(), - ".", - None, - NodeResolutionMode::Types, - ) - .to_string(), - format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] No \"exports\" main defined for types in 'test_path{separator_char}package.json'") - ); + err_package_path_not_exported("test_path".to_string(), ".", None, NodeResolutionMode::Types,).to_string(), + format!("[ERR_PACKAGE_PATH_NOT_EXPORTED] No \"exports\" main defined for types in 'test_path{separator_char}package.json'") + ); } } diff --git a/crates/npm/managed/mod.rs b/crates/npm/managed/mod.rs index b6bb9699..540d3645 100644 --- a/crates/npm/managed/mod.rs +++ b/crates/npm/managed/mod.rs @@ -378,7 +378,10 @@ impl ManagedCliNpmResolver { Ok(nv) => NpmPackageReqResolution::Ok(nv), Err(err) => { if self.api.mark_force_reload() { - log::debug!("Restarting npm specifier resolution to check for new registry information. Error: {:#}", err); + log::debug!( + "Restarting npm specifier resolution to check for new registry information. Error: {:#}", + err + ); NpmPackageReqResolution::ReloadRegistryInfo(err.into()) } else { NpmPackageReqResolution::Err(err.into()) diff --git a/crates/npm/managed/tarball.rs b/crates/npm/managed/tarball.rs index 585d34c5..bfc94684 100644 --- a/crates/npm/managed/tarball.rs +++ b/crates/npm/managed/tarball.rs @@ -75,11 +75,11 @@ fn verify_tarball_integrity( if tarball_checksum != *expected_checksum { bail!( - "Tarball checksum did not match what was provided by npm registry for {}.\n\nExpected: {}\nActual: {}", - package, - expected_checksum, - tarball_checksum, - ) + "Tarball checksum did not match what was provided by npm registry for {}.\n\nExpected: {}\nActual: {}", + package, + expected_checksum, + tarball_checksum, + ) } Ok(()) } @@ -159,8 +159,7 @@ mod test { name: "package".to_string(), version: Version::parse_from_npm("1.0.0").unwrap(), }; - let actual_checksum = - "z4PhNX7vuL3xVChQ1m2AB9Yg5AULVxXcg/SpIdNs6c5H0NE8XYXysP+DGNKHfuwvY7kxvUdBeoGlODJ6+SfaPg=="; + let actual_checksum = "z4PhNX7vuL3xVChQ1m2AB9Yg5AULVxXcg/SpIdNs6c5H0NE8XYXysP+DGNKHfuwvY7kxvUdBeoGlODJ6+SfaPg=="; assert_eq!( verify_tarball_integrity( &package, @@ -196,23 +195,25 @@ mod test { .unwrap_err() .to_string(), concat!( - "Tarball checksum did not match what was provided by npm ", - "registry for package@1.0.0.\n\nExpected: test\nActual: 2jmj7l5rSw0yVb/vlWAYkK/YBwk=", - ), + "Tarball checksum did not match what was provided by npm ", + "registry for package@1.0.0.\n\nExpected: test\nActual: 2jmj7l5rSw0yVb/vlWAYkK/YBwk=", + ), ); assert_eq!( - verify_tarball_integrity( - &package, - &Vec::new(), - &NpmPackageVersionDistInfoIntegrity::Integrity { - algorithm: "sha512", - base64_hash: "test" - } - ) - .unwrap_err() - .to_string(), - format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_checksum}"), - ); + verify_tarball_integrity( + &package, + &Vec::new(), + &NpmPackageVersionDistInfoIntegrity::Integrity { + algorithm: "sha512", + base64_hash: "test" + } + ) + .unwrap_err() + .to_string(), + format!( + "Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_checksum}" + ), + ); assert!(verify_tarball_integrity( &package, &Vec::new(), @@ -224,15 +225,11 @@ mod test { .is_ok()); let actual_hex = "da39a3ee5e6b4b0d3255bfef95601890afd80709"; assert_eq!( - verify_tarball_integrity( - &package, - &Vec::new(), - &NpmPackageVersionDistInfoIntegrity::LegacySha1Hex("test"), - ) - .unwrap_err() - .to_string(), - format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_hex}"), - ); + verify_tarball_integrity(&package, &Vec::new(), &NpmPackageVersionDistInfoIntegrity::LegacySha1Hex("test"),) + .unwrap_err() + .to_string(), + format!("Tarball checksum did not match what was provided by npm registry for package@1.0.0.\n\nExpected: test\nActual: {actual_hex}"), + ); assert!(verify_tarball_integrity( &package, &Vec::new(), diff --git a/crates/sb_core/cache/fetch_cacher.rs b/crates/sb_core/cache/fetch_cacher.rs index e660691b..9a21768b 100644 --- a/crates/sb_core/cache/fetch_cacher.rs +++ b/crates/sb_core/cache/fetch_cacher.rs @@ -178,8 +178,8 @@ impl Loader for FetchCacher { LoaderCacheSetting::Reload => { if matches!(file_fetcher.cache_setting(), CacheSetting::Only) { return Err(deno_core::anyhow::anyhow!( - "Failed to resolve version constraint. Try running again without --cached-only" - )); + "Failed to resolve version constraint. Try running again without --cached-only" + )); } Some(CacheSetting::ReloadAll) } @@ -194,15 +194,12 @@ impl Loader for FetchCacher { }) .await .map(|file| { - let maybe_headers = - match (file.maybe_headers, file_header_overrides.get(&specifier)) { - (Some(headers), Some(overrides)) => { - Some(headers.into_iter().chain(overrides.clone()).collect()) - } - (Some(headers), None) => Some(headers), - (None, Some(overrides)) => Some(overrides.clone()), - (None, None) => None, - }; + let maybe_headers = match (file.maybe_headers, file_header_overrides.get(&specifier)) { + (Some(headers), Some(overrides)) => Some(headers.into_iter().chain(overrides.clone()).collect()), + (Some(headers), None) => Some(headers), + (None, Some(overrides)) => Some(overrides.clone()), + (None, None) => None, + }; Ok(Some(LoadResponse::Module { specifier: file.specifier, maybe_headers, diff --git a/crates/sb_fs/virtual_fs.rs b/crates/sb_fs/virtual_fs.rs index de14b469..a533b308 100644 --- a/crates/sb_fs/virtual_fs.rs +++ b/crates/sb_fs/virtual_fs.rs @@ -96,22 +96,22 @@ impl VfsBuilder { if target.is_file() { // this may change behavior, so warn the user about it log::warn!( - "Symlink target is outside '{}'. Inlining symlink at '{}' to '{}' as file.", - self.root_path.display(), - path.display(), - target.display(), - ); + "Symlink target is outside '{}'. Inlining symlink at '{}' to '{}' as file.", + self.root_path.display(), + path.display(), + target.display(), + ); // inline the symlink and make the target file let file_bytes = std::fs::read(&target) .with_context(|| format!("Reading {}", path.display()))?; self.add_file(&path, file_bytes)?; } else { log::warn!( - "Symlink target is outside '{}'. Excluding symlink at '{}' with target '{}'.", - self.root_path.display(), - path.display(), - target.display(), - ); + "Symlink target is outside '{}'. Excluding symlink at '{}' with target '{}'.", + self.root_path.display(), + path.display(), + target.display(), + ); } } } @@ -527,7 +527,11 @@ impl FileBackedVfsFile { if offset >= 0 { *current_pos += offset as u64; } else if -offset as u64 > *current_pos { - return Err(std::io::Error::new(std::io::ErrorKind::PermissionDenied, "An attempt was made to move the file pointer before the beginning of the file.").into()); + return Err(std::io::Error::new( + std::io::ErrorKind::PermissionDenied, + "An attempt was made to move the file pointer before the beginning of the file.", + ) + .into()); } else { *current_pos -= -offset as u64; } diff --git a/crates/sb_graph/graph_resolver.rs b/crates/sb_graph/graph_resolver.rs index 330ebe21..43f47e62 100644 --- a/crates/sb_graph/graph_resolver.rs +++ b/crates/sb_graph/graph_resolver.rs @@ -49,9 +49,10 @@ fn resolve_package_json_dep( if specifier.starts_with(bare_specifier) { let path = &specifier[bare_specifier.len()..]; if path.is_empty() || path.starts_with('/') { - let req = req_result.as_ref().map_err(|_err| { - anyhow!("Parsing version constraints in the application-level package.json is more strict at the moment.") - })?; + let req = req_result + .as_ref() + .map_err(|_err| anyhow!("Parsing version constraints in the application-level package.json is more strict at the moment."))?; + return Ok(Some(ModuleSpecifier::parse(&format!("npm:{req}{path}"))?)); } } @@ -234,7 +235,9 @@ impl Resolver for CliGraphResolver { if let Some(vendor_specifier) = &self.maybe_vendor_specifier { if let Ok(specifier) = &result { if specifier.as_str().starts_with(vendor_specifier.as_str()) { - return Err(ResolveError::Other(anyhow!("Importing from the vendor directory is not permitted. Use a remote specifier instead or disable vendoring."))); + return Err(ResolveError::Other(anyhow!( + "Importing from the vendor directory is not permitted. Use a remote specifier instead or disable vendoring." + ))); } } }