From 342cdc7e68cd90e90d9e2500601b9e687dc4c192 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Tue, 16 Sep 2025 14:15:07 +0200 Subject: [PATCH] Simplifying serve function In particular, there is no need to parse TLS config upon the reception of every connections. --- quickwit/quickwit-serve/src/rest.rs | 61 +++++++++++++++++------------ 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index e0e8e637f6b..a1240cb3c53 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -13,6 +13,7 @@ // limitations under the License. use std::fmt::Formatter; +use std::io; use std::sync::Arc; use hyper_util::rt::{TokioExecutor, TokioIo}; @@ -21,7 +22,8 @@ use hyper_util::service::TowerToHyperService; use quickwit_common::tower::BoxFutureInfaillible; use quickwit_config::{disable_ingest_v1, enable_ingest_v2}; use quickwit_search::SearchService; -use tokio::net::TcpListener; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio::net::{TcpListener, TcpStream}; use tokio_rustls::TlsAcceptor; use tokio_util::either::Either; use tower::ServiceBuilder; @@ -120,6 +122,20 @@ impl Predicate for CompressionPredicate { } } +async fn apply_tls_if_necessary( + tcp_stream: TcpStream, + tls_acceptor_opt: &Option, +) -> io::Result { + let Some(tls_acceptor) = &tls_acceptor_opt else { + return Ok(Either::Right(tcp_stream)); + }; + let tls_stream_res = tls_acceptor + .accept(tcp_stream) + .await + .inspect_err(|err| error!("failed to perform tls handshake: {err:#}"))?; + Ok(Either::Left(tls_stream_res)) +} + /// Starts REST services. pub(crate) async fn start_rest_server( tcp_listener: TcpListener, @@ -227,37 +243,33 @@ pub(crate) async fn start_rest_server( let mut shutdown_signal = std::pin::pin!(shutdown_signal); readiness_trigger.await; + let tls_acceptor_opt: Option = + if let Some(tls_config) = &quickwit_services.node_config.rest_config.tls { + let rustls_config = tls::make_rustls_config(tls_config)?; + Some(TlsAcceptor::from(rustls_config)) + } else { + None + }; + loop { tokio::select! { - conn = tcp_listener.accept() => { - let (stream, _remote_addr) = match conn { - Ok(conn) => conn, + tcp_accept_res = tcp_listener.accept() => { + let tcp_stream = match tcp_accept_res { + Ok((tcp_stream, _remote_addr)) => tcp_stream, Err(err) => { error!("failed to accept connection: {err:#}"); continue; } }; - let either_stream = - if let Some(tls_config) = &quickwit_services.node_config.rest_config.tls { - let rustls_config = tls::make_rustls_config(tls_config)?; - let acceptor = TlsAcceptor::from(rustls_config); - let tls_stream = match acceptor.accept(stream).await { - Ok(tls_stream) => tls_stream, - Err(err) => { - error!("failed to perform tls handshake: {err:#}"); - continue; - } - }; - Either::Left(tls_stream) - } else { - Either::Right(stream) + let Ok(tcp_or_tls_stream) = apply_tls_if_necessary(tcp_stream, &tls_acceptor_opt).await else { + continue; }; - let conn = server.serve_connection_with_upgrades(TokioIo::new(either_stream), service.clone()); - let conn = graceful.watch(conn.into_owned()); + let serve_fut = server.serve_connection_with_upgrades(TokioIo::new(tcp_or_tls_stream), service.clone()); + let serve_with_shutdown_fut = graceful.watch(serve_fut.into_owned()); tokio::spawn(async move { - if let Err(err) = conn.await { + if let Err(err) = serve_with_shutdown_fut.await { error!("failed to serve connection: {err:#}"); } }); @@ -269,11 +281,8 @@ pub(crate) async fn start_rest_server( } } - tokio::select! { - _ = graceful.shutdown() => { - info!("gracefully shutdown"); - } - } + graceful.shutdown().await; + info!("gracefully shutdown"); Ok(()) }