Skip to content

Commit

Permalink
feat: support read timeout for inbound HTTP requests (#337)
Browse files Browse the repository at this point in the history
* feat: initial implementation for request read timeout

* style: do format rust files correctly

* chore(cli): fill `description` field

* stamp: move cli flags into a separated module

* stamp: update `Cargo.lock`

* stamp: polishing

* stamp: add a flag for read timeout for request

* stamp: niche optimization

* stamp: apply timeout passed from cli

* stamp: update integration test macro

* stamp: rustfmt

* stamp: update integration test macro (2)

* stamp: downcast the downstream correctly

* stamp: add tests for slowloris attack

* stamp: makes clippy happy

* stamp: polishing
  • Loading branch information
nyannyacha committed May 13, 2024
1 parent ac5e2d6 commit b104b1e
Show file tree
Hide file tree
Showing 19 changed files with 967 additions and 305 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ deno_webidl = { version = "0.136.0" }
deno_web = { version = "0.167.0" }
deno_websocket = { version = "0.141.0" }
deno_webstorage = { version = "0.131.0" }
deno_lockfile = { version = "0.18.0" }
enum-as-inner = "0.6.0"
serde = { version = "1.0.149", features = ["derive"] }
hyper = "0.14.26"
tokio = { version = "1.35", features = ["full"] }
bytes = { version = "1.4.0" }
once_cell = "1.17.1"
thiserror = "1.0.40"
deno_lockfile = "0.18.0"
async-trait = "0.1.73"
indexmap = { version = "2.0.0", features = ["serde"] }
flate2 = "=1.0.26"
Expand All @@ -78,6 +78,7 @@ glob = "0.3.1"
httparse = "1.8"
http = "0.2"
faster-hex = "0.9.0"

# DEBUG
#[patch.crates-io]
#deno_core = { path = "/your/path/to/deno_core/core" }
Expand Down
4 changes: 2 additions & 2 deletions crates/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ tokio-rustls = { version = "0.25.0" }
rustls-pemfile = { version = "2.1.0" }
futures-util = { workspace = true }
url.workspace = true
event_worker ={ version = "0.1.0", path = "../event_worker" }
event_worker = { version = "0.1.0", path = "../event_worker" }
sb_workers = { version = "0.1.0", path = "../sb_workers" }
sb_env = { version = "0.1.0", path = "../sb_env" }
sb_core = { version = "0.1.0", path = "../sb_core" }
Expand Down Expand Up @@ -109,7 +109,7 @@ reqwest.workspace = true
serde = { workspace = true, features = ["derive"] }
tokio.workspace = true
url.workspace = true
event_worker ={ version = "0.1.0", path = "../event_worker" }
event_worker = { version = "0.1.0", path = "../event_worker" }
deno_broadcast_channel.workspace = true
deno_core.workspace = true
deno_canvas.workspace = true
Expand Down
1 change: 1 addition & 0 deletions crates/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub mod snapshot;
pub mod utils;

mod inspector_server;
mod timeout;

pub use inspector_server::InspectorOption;
pub use sb_graph::DecoratorType;
82 changes: 60 additions & 22 deletions crates/base/src/macros/test_macros.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,36 @@
#[macro_export]
macro_rules! integration_test {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
macro_rules! integration_test_listen_fut {
($port:expr, $tls:expr, $main_file:expr, $policy:expr, $import_map:expr, $flag:expr, $tx:expr, $token:expr) => {{
use futures_util::FutureExt;

let tls: Option<base::server::Tls> = $tls.clone();

base::commands::start_server(
"0.0.0.0",
$port,
tls,
String::from($main_file),
None,
None,
$policy,
$import_map,
$flag,
Some($tx.clone()),
$crate::server::WorkerEntrypoints {
main: None,
events: None,
},
$token.clone(),
vec![],
None,
)
.boxed()
}};
}

#[macro_export]
macro_rules! integration_test_with_server_flag {
($flag:expr, $main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
use futures_util::FutureExt;
use $crate::macros::test_macros::__private;

Expand All @@ -11,39 +41,29 @@ macro_rules! integration_test {
let schema = if tls.is_some() { "https" } else { "http" };
let signal = tokio::spawn(async move {
while let Some(base::server::ServerHealth::Listening(event_rx, metric_src)) = rx.recv().await {
integration_test!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+));
$crate::integration_test_with_server_flag!(@req event_rx, metric_src, schema, $port, $url, req_builder, ($($function)+));
}
None
});

let token = integration_test!(@term $(, $($token)+)?);
let mut listen_fut = base::commands::start_server(
"0.0.0.0",
let token = $crate::integration_test_with_server_flag!(@term $(, $($token)+)?);
let mut listen_fut = $crate::integration_test_listen_fut!(
$port,
tls,
String::from($main_file),
None,
None,
$main_file,
$policy,
$import_map,
$crate::server::ServerFlags::default(),
Some(tx.clone()),
$crate::server::WorkerEntrypoints {
main: None,
events: None,
},
token.clone(),
vec![],
None
)
.boxed();
$flag,
tx,
token
);

tokio::select! {
resp = signal => {
if let Ok(maybe_response_from_server) = resp {
// then, after checking the response... (2)
let resp = maybe_response_from_server.unwrap();
integration_test!(@resp resp, ($($function)+)).await;
$crate::integration_test_with_server_flag!(@resp resp, ($($function)+)).await;
} else {
panic!("Request thread had a heart attack");
}
Expand All @@ -62,7 +82,7 @@ macro_rules! integration_test {
let _ = listen_fut.await;
});

integration_test!(@term_cleanup $($($token)+)?, token, join_fut);
$crate::integration_test_with_server_flag!(@term_cleanup $($($token)+)?, token, join_fut);
}
};

Expand Down Expand Up @@ -136,6 +156,24 @@ macro_rules! integration_test {
};
}

#[macro_export]
macro_rules! integration_test {
($main_file:expr, $port:expr, $url:expr, $policy:expr, $import_map:expr, $req_builder:expr, $tls:expr, ($($function:tt)+) $(, $($token:tt)+)?) => {
$crate::integration_test_with_server_flag!(
$crate::server::ServerFlags::default(),
$main_file,
$port,
$url,
$policy,
$import_map,
$req_builder,
$tls,
($($function)+)
$(,$($token)+)?
)
};
}

#[doc(hidden)]
pub mod __private {
use std::future::Future;
Expand Down
10 changes: 4 additions & 6 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::deno_runtime::DenoRuntime;
use crate::inspector_server::Inspector;
use crate::timeout;
use crate::utils::send_event_if_event_worker_available;
use crate::utils::units::bytes_to_display;

Expand Down Expand Up @@ -184,7 +185,7 @@ async fn relay_upgraded_request_and_response(
match copy_bidirectional(&mut upstream, &mut downstream).await {
Ok(_) => {}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
let Ok(_) = downstream.downcast::<TlsStream<TcpStream>>() else {
let Ok(_) = downstream.downcast::<timeout::Stream<TlsStream<TcpStream>>>() else {
// TODO(Nyannyacha): It would be better if we send
// `close_notify` before shutdown an upstream if downstream is a
// TLS stream.
Expand Down Expand Up @@ -253,7 +254,7 @@ pub fn create_supervisor(
debug!("Hard memory limit triggered");

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
error!("failed to send memory limit reached notification - isolate may already be terminating");
}

true
Expand All @@ -263,10 +264,7 @@ pub fn create_supervisor(
worker_runtime.js_runtime.add_near_heap_limit_callback({
let memory_limit_tx = memory_limit_tx.clone();
move |cur, _| {
debug!(
"Low memory alert triggered: {}",
bytes_to_display(cur as u64),
);
debug!("Low memory alert triggered: {}", bytes_to_display(cur as u64),);

if memory_limit_tx.send(()).is_err() {
error!("failed to send memory limit reached notification - isolate may already be terminating");
Expand Down
31 changes: 28 additions & 3 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ pub struct ServerFlags {
pub tcp_nodelay: bool,
pub graceful_exit_deadline_sec: u64,
pub graceful_exit_keepalive_deadline_ms: Option<u64>,
pub request_read_timeout_ms: Option<u64>,
}

#[derive(Debug)]
Expand Down Expand Up @@ -467,11 +468,13 @@ impl Server {

let ServerFlags {
tcp_nodelay,
request_read_timeout_ms,
mut graceful_exit_deadline_sec,
mut graceful_exit_keepalive_deadline_ms,
..
} = flags;

let request_read_timeout_dur = request_read_timeout_ms.map(Duration::from_millis);
let mut terminate_signal_fut = get_termination_signal();

loop {
Expand All @@ -487,7 +490,14 @@ impl Server {
let _ = stream.set_nodelay(true);
}

accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone())
accept_stream(
stream,
main_worker_req_tx,
event_tx,
metric_src,
graceful_exit_token.clone(),
request_read_timeout_dur
)
}
Err(e) => error!("socket error: {}", e)
}
Expand All @@ -507,7 +517,14 @@ impl Server {
let _ = stream.get_ref().0.set_nodelay(true);
}

accept_stream(stream, main_worker_req_tx, event_tx, metric_src, graceful_exit_token.clone());
accept_stream(
stream,
main_worker_req_tx,
event_tx,
metric_src,
graceful_exit_token.clone(),
request_read_timeout_dur
)
}
Err(e) => error!("socket error: {}", e)
}
Expand Down Expand Up @@ -675,21 +692,29 @@ fn accept_stream<I>(
event_tx: Option<UnboundedSender<ServerEvent>>,
metric_src: SharedMetricSource,
graceful_exit_token: CancellationToken,
maybe_req_read_timeout_dur: Option<Duration>,
) where
I: AsyncRead + AsyncWrite + Unpin + Send + 'static,
{
metric_src.incl_active_io();
tokio::task::spawn({
async move {
let (service, cancel) = WorkerService::new(metric_src.clone(), req_tx);
let (io, maybe_timeout_tx) = if let Some(timeout_dur) = maybe_req_read_timeout_dur {
crate::timeout::Stream::with_timeout(io, timeout_dur)
} else {
crate::timeout::Stream::with_bypass(io)
};

let _guard = cancel.drop_guard();
let _active_io_count_guard = scopeguard::guard(metric_src, |it| {
it.decl_active_io();
});

let conn_fut = Http::new().serve_connection(io, service).with_upgrades();
let mut shutting_down = false;
let conn_fut = Http::new()
.serve_connection(io, crate::timeout::Service::new(service, maybe_timeout_tx))
.with_upgrades();

pin!(conn_fut);

Expand Down

0 comments on commit b104b1e

Please sign in to comment.