Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 35 additions & 26 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::fmt::Formatter;
use std::io;
use std::sync::Arc;

use hyper_util::rt::{TokioExecutor, TokioIo};
Expand All @@ -21,7 +22,8 @@ use hyper_util::service::TowerToHyperService;
use quickwit_common::tower::BoxFutureInfaillible;
use quickwit_config::{disable_ingest_v1, enable_ingest_v2};
use quickwit_search::SearchService;
use tokio::net::TcpListener;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, TcpStream};
use tokio_rustls::TlsAcceptor;
use tokio_util::either::Either;
use tower::ServiceBuilder;
Expand Down Expand Up @@ -120,6 +122,20 @@ impl Predicate for CompressionPredicate {
}
}

async fn apply_tls_if_necessary(
tcp_stream: TcpStream,
tls_acceptor_opt: &Option<TlsAcceptor>,
) -> io::Result<impl AsyncRead + AsyncWrite + Unpin + 'static> {
let Some(tls_acceptor) = &tls_acceptor_opt else {
return Ok(Either::Right(tcp_stream));
};
let tls_stream_res = tls_acceptor
.accept(tcp_stream)
.await
.inspect_err(|err| error!("failed to perform tls handshake: {err:#}"))?;
Ok(Either::Left(tls_stream_res))
}

/// Starts REST services.
pub(crate) async fn start_rest_server(
tcp_listener: TcpListener,
Expand Down Expand Up @@ -227,37 +243,33 @@ pub(crate) async fn start_rest_server(
let mut shutdown_signal = std::pin::pin!(shutdown_signal);
readiness_trigger.await;

let tls_acceptor_opt: Option<TlsAcceptor> =
if let Some(tls_config) = &quickwit_services.node_config.rest_config.tls {
let rustls_config = tls::make_rustls_config(tls_config)?;
Some(TlsAcceptor::from(rustls_config))
} else {
None
};

loop {
tokio::select! {
conn = tcp_listener.accept() => {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is not a connection.

let (stream, _remote_addr) = match conn {
Ok(conn) => conn,
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

careful with variable shadowing

tcp_accept_res = tcp_listener.accept() => {
let tcp_stream = match tcp_accept_res {
Ok((tcp_stream, _remote_addr)) => tcp_stream,
Err(err) => {
error!("failed to accept connection: {err:#}");
continue;
}
};

let either_stream =
if let Some(tls_config) = &quickwit_services.node_config.rest_config.tls {
let rustls_config = tls::make_rustls_config(tls_config)?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parsing tls config on every connection could be catastrohic. We don't know what code is running there.

let acceptor = TlsAcceptor::from(rustls_config);
let tls_stream = match acceptor.accept(stream).await {
Ok(tls_stream) => tls_stream,
Err(err) => {
error!("failed to perform tls handshake: {err:#}");
continue;
}
};
Either::Left(tls_stream)
} else {
Either::Right(stream)
let Ok(tcp_or_tls_stream) = apply_tls_if_necessary(tcp_stream, &tls_acceptor_opt).await else {
continue;
};

let conn = server.serve_connection_with_upgrades(TokioIo::new(either_stream), service.clone());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn again?

let conn = graceful.watch(conn.into_owned());
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

conn again?

let serve_fut = server.serve_connection_with_upgrades(TokioIo::new(tcp_or_tls_stream), service.clone());
let serve_with_shutdown_fut = graceful.watch(serve_fut.into_owned());
tokio::spawn(async move {
if let Err(err) = conn.await {
if let Err(err) = serve_with_shutdown_fut.await {
error!("failed to serve connection: {err:#}");
}
});
Expand All @@ -269,11 +281,8 @@ pub(crate) async fn start_rest_server(
}
}

tokio::select! {
_ = graceful.shutdown() => {
info!("gracefully shutdown");
}
}
graceful.shutdown().await;
info!("gracefully shutdown");

Ok(())
}
Expand Down