Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support idle timeout for HTTP requests #339

Merged
merged 6 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 63 additions & 9 deletions crates/base/src/rt_worker/worker_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::deno_runtime::DenoRuntime;
use crate::inspector_server::Inspector;
use crate::timeout;
use crate::timeout::{self, CancelOnWriteTimeout, ReadTimeoutStream};
use crate::utils::send_event_if_event_worker_available;
use crate::utils::units::bytes_to_display;

Expand Down Expand Up @@ -36,6 +36,7 @@ use tokio::io::{self, copy_bidirectional};
use tokio::net::TcpStream;
use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::{mpsc, oneshot, Mutex};
use tokio::time::sleep;
use tokio_rustls::server::TlsStream;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
Expand Down Expand Up @@ -96,6 +97,7 @@ async fn handle_request(
worker_kind: WorkerKind,
duplex_stream_tx: mpsc::UnboundedSender<DuplexStreamEntry>,
msg: WorkerRequestMsg,
maybe_request_idle_timeout: Option<u64>,
) -> Result<(), Error> {
let (ours, theirs) = io::duplex(1024);
let WorkerRequestMsg {
Expand Down Expand Up @@ -135,6 +137,7 @@ async fn handle_request(
tokio::spawn(relay_upgraded_request_and_response(
req_upgrade,
parts,
maybe_request_idle_timeout,
));

return;
Expand All @@ -152,7 +155,23 @@ async fn handle_request(

tokio::task::yield_now().await;

let res = request_sender.send_request(req).await;
let maybe_cancel_fut = async move {
if let Some(timeout_ms) = maybe_request_idle_timeout {
sleep(Duration::from_millis(timeout_ms)).await;
} else {
pending::<()>().await;
unreachable!()
}
};

let res = tokio::select! {
resp = request_sender.send_request(req) => resp,
_ = maybe_cancel_fut => {
// XXX(Nyannyacha): Should we add a more detailed message?
Ok(emit_status_code(http::StatusCode::REQUEST_TIMEOUT, None, true))
}
};

let Ok(res) = res else {
drop(res_tx.send(res));
return Ok(());
Expand All @@ -165,26 +184,51 @@ async fn handle_request(
match res_upgrade_type {
Some(accepted) if accepted == requested => {}
_ => {
drop(res_tx.send(Ok(emit_status_code(StatusCode::BAD_GATEWAY))));
drop(res_tx.send(Ok(emit_status_code(StatusCode::BAD_GATEWAY, None, true))));
return Ok(());
}
}
}

if let Some(timeout_ms) = maybe_request_idle_timeout {
let headers = res.headers();
let is_streamed_response = !headers.contains_key(http::header::CONTENT_LENGTH);

if is_streamed_response {
let duration = Duration::from_millis(timeout_ms);
let (parts, body) = res.into_parts();

drop(res_tx.send(Ok(Response::from_parts(
parts,
Body::wrap_stream(CancelOnWriteTimeout::new(body, duration)),
))));

return Ok(());
}
}

drop(res_tx.send(Ok(res)));
Ok(())
}

async fn relay_upgraded_request_and_response(
downstream: OnUpgrade,
parts: http1::Parts<io::DuplexStream>,
maybe_idle_timeout: Option<u64>,
) {
let mut upstream = Upgraded2::new(parts.io, parts.read_buf);
let upstream = Upgraded2::new(parts.io, parts.read_buf);
let mut upstream = if let Some(timeout_ms) = maybe_idle_timeout {
ReadTimeoutStream::with_timeout(upstream, Duration::from_millis(timeout_ms))
} else {
ReadTimeoutStream::with_bypass(upstream)
};

let mut downstream = downstream.await.expect("failed to upgrade request");

match copy_bidirectional(&mut upstream, &mut downstream).await {
Ok(_) => {}
Err(err) if err.kind() == ErrorKind::UnexpectedEof => {
Err(err) if matches!(err.kind(), ErrorKind::TimedOut | ErrorKind::BrokenPipe) => {}
Err(err) if matches!(err.kind(), ErrorKind::UnexpectedEof) => {
let Ok(_) = downstream.downcast::<timeout::Stream<TlsStream<TcpStream>>>() else {
// TODO(Nyannyacha): It would be better if we send
// `close_notify` before shutdown an upstream if downstream is a
Expand All @@ -196,8 +240,8 @@ async fn relay_upgraded_request_and_response(
};
}

_ => {
unreachable!("coping between upgraded connections failed");
value => {
unreachable!("coping between upgraded connections failed: {:?}", value);
}
}

Expand Down Expand Up @@ -512,6 +556,7 @@ impl CreateWorkerArgs {
pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
init_opts: Opt,
inspector: Option<Inspector>,
maybe_request_idle_timeout: Option<u64>,
) -> Result<(MetricSource, mpsc::UnboundedSender<WorkerRequestMsg>), Error> {
let (duplex_stream_tx, duplex_stream_rx) = mpsc::unbounded_channel::<DuplexStreamEntry>();
let (worker_boot_result_tx, worker_boot_result_rx) =
Expand Down Expand Up @@ -553,8 +598,13 @@ pub async fn create_worker<Opt: Into<CreateWorkerArgs>>(
tokio::task::spawn({
let stream_tx_inner = stream_tx.clone();
async move {
if let Err(err) =
handle_request(worker_kind, stream_tx_inner, msg).await
if let Err(err) = handle_request(
worker_kind,
stream_tx_inner,
msg,
maybe_request_idle_timeout,
)
.await
{
error!("worker failed to handle request: {:?}", err);
}
Expand Down Expand Up @@ -666,6 +716,7 @@ pub async fn create_main_worker(
termination_token,
),
inspector,
None,
)
.await
.map_err(|err| anyhow!("main worker boot error: {}", err))?;
Expand Down Expand Up @@ -713,6 +764,7 @@ pub async fn create_events_worker(
termination_token,
),
None,
None,
)
.await
.map_err(|err| anyhow!("events worker boot error: {}", err))?;
Expand All @@ -726,6 +778,7 @@ pub async fn create_user_worker_pool(
termination_token: Option<TerminationToken>,
static_patterns: Vec<String>,
inspector: Option<Inspector>,
request_idle_timeout: Option<u64>,
) -> Result<(SharedMetricSource, mpsc::UnboundedSender<UserWorkerMsgs>), Error> {
let metric_src = SharedMetricSource::default();
let (user_worker_msgs_tx, mut user_worker_msgs_rx) =
Expand All @@ -744,6 +797,7 @@ pub async fn create_user_worker_pool(
worker_event_sender,
user_worker_msgs_tx_clone,
inspector,
request_idle_timeout,
);

// Note: Keep this loop non-blocking. Spawn a task to run blocking calls.
Expand Down
12 changes: 9 additions & 3 deletions crates/base/src/rt_worker/worker_pool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use crate::inspector_server::Inspector;
use crate::rt_worker::worker_ctx::{create_worker, send_user_worker_request};
use crate::server::ServerFlags;
use anyhow::{anyhow, bail, Context, Error};
use enum_as_inner::EnumAsInner;
use event_worker::events::WorkerEventWithMetadata;
Expand Down Expand Up @@ -88,15 +89,15 @@ impl WorkerPoolPolicy {
pub fn new(
supervisor: impl Into<Option<SupervisorPolicy>>,
max_parallelism: impl Into<Option<usize>>,
request_wait_timeout_ms: impl Into<Option<u64>>,
server_flags: ServerFlags,
) -> Self {
let default = Self::default();

Self {
supervisor_policy: supervisor.into().unwrap_or(default.supervisor_policy),
max_parallelism: max_parallelism.into().unwrap_or(default.max_parallelism),
request_wait_timeout_ms: request_wait_timeout_ms
.into()
request_wait_timeout_ms: server_flags
.request_wait_timeout_ms
.unwrap_or(default.request_wait_timeout_ms),
}
}
Expand Down Expand Up @@ -211,6 +212,7 @@ pub struct WorkerPool {
pub active_workers: HashMap<String, ActiveWorkerRegistry>,
pub worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
pub maybe_inspector: Option<Inspector>,
pub maybe_request_idle_timeout: Option<u64>,

// TODO: refactor this out of worker pool
pub worker_event_sender: Option<mpsc::UnboundedSender<WorkerEventWithMetadata>>,
Expand All @@ -223,6 +225,7 @@ impl WorkerPool {
worker_event_sender: Option<UnboundedSender<WorkerEventWithMetadata>>,
worker_pool_msgs_tx: mpsc::UnboundedSender<UserWorkerMsgs>,
inspector: Option<Inspector>,
request_idle_timeout: Option<u64>,
) -> Self {
Self {
policy,
Expand All @@ -231,6 +234,7 @@ impl WorkerPool {
user_workers: HashMap::new(),
active_workers: HashMap::new(),
maybe_inspector: inspector,
maybe_request_idle_timeout: request_idle_timeout,
worker_pool_msgs_tx,
}
}
Expand All @@ -249,6 +253,7 @@ impl WorkerPool {

let is_oneshot_policy = self.policy.supervisor_policy.is_oneshot();
let inspector = self.maybe_inspector.clone();
let request_idle_timeout = self.maybe_request_idle_timeout;

let force_create = worker_options
.conf
Expand Down Expand Up @@ -418,6 +423,7 @@ impl WorkerPool {
match create_worker(
(worker_options, supervisor_policy, termination_token.clone()),
inspector,
request_idle_timeout,
)
.await
{
Expand Down
3 changes: 3 additions & 0 deletions crates/base/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,8 @@ pub struct ServerFlags {
pub tcp_nodelay: bool,
pub graceful_exit_deadline_sec: u64,
pub graceful_exit_keepalive_deadline_ms: Option<u64>,
pub request_wait_timeout_ms: Option<u64>,
pub request_idle_timeout_ms: Option<u64>,
pub request_read_timeout_ms: Option<u64>,
}

Expand Down Expand Up @@ -379,6 +381,7 @@ impl Server {
Some(termination_tokens.pool.clone()),
static_patterns,
inspector.clone(),
flags.request_idle_timeout_ms,
)
.await?;

Expand Down
Loading
Loading