Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add the tracing layer to enhance rest api metrics #4392

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ tower = { version = "0.4.13", features = [
"retry",
"util",
] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] }
tower-http = { version = "0.4.0", features = ["compression-gzip", "cors", "trace"] }
tracing = "0.1.37"
tracing-opentelemetry = "0.20.0"
tracing-subscriber = { version = "0.3.16", features = [
Expand Down
150 changes: 145 additions & 5 deletions quickwit/quickwit-serve/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,164 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::marker::PhantomData;
use std::sync::{Arc, Mutex};

use hyper::{Request, Response};
use once_cell::sync::Lazy;
use quickwit_common::metrics::{new_counter, IntCounter};
use quickwit_common::metrics::{
new_counter_vec, new_gauge_vec, new_histogram_vec, HistogramVec, IntCounterVec, IntGaugeVec,
};
use tower_http::classify::{ServerErrorsAsFailures, SharedClassifier};
use tower_http::trace::{
DefaultMakeSpan, DefaultOnBodyChunk, DefaultOnEos, OnFailure, OnRequest, OnResponse, TraceLayer,
};

const UI_RESOURCES_PATH: &str = "/ui/";

/// `RestMetrics` is a structure representing a collection of metrics.
pub struct RestMetrics {
pub http_requests_total: IntCounter,
/// `http_requests_total` (labels: endpoint, method, status): the total number of HTTP requests
/// handled (counter)
pub http_requests_total: IntCounterVec<3>,
/// `http_requests_pending` (labels: endpoint, method): the number of currently in-flight
/// requests (gauge)
pub http_requests_pending: IntGaugeVec<2>,
/// `http_requests_duration_seconds` (labels: endpoint, method, status): the request duration
/// for all HTTP requests handled (histogram)
pub http_requests_duration_seconds: HistogramVec<3>,
}

impl Default for RestMetrics {
fn default() -> Self {
RestMetrics {
http_requests_total: new_counter(
http_requests_total: new_counter_vec(
"http_requests_total",
"Total number of HTTP requests received",
"Total total number of HTTP requests handled (counter)",
etolbakov marked this conversation as resolved.
Show resolved Hide resolved
"quickwit",
["method", "path", "status"],
),
http_requests_pending: new_gauge_vec(
etolbakov marked this conversation as resolved.
Show resolved Hide resolved
"http_requests_pending",
"Number of currently in-flight requests (gauge)",
"quickwit",
["method", "path"],
),
http_requests_duration_seconds: new_histogram_vec(
"http_requests_duration_seconds",
"Request duration for all HTTP requests handled (histogram)",
"quickwit",
["method", "path", "status"],
),
}
}
}

pub type RestMetricsTraceLayer<B> = TraceLayer<
SharedClassifier<ServerErrorsAsFailures>,
DefaultMakeSpan,
RestMetricsRecorder<B>,
RestMetricsRecorder<B>,
DefaultOnBodyChunk,
DefaultOnEos,
RestMetricsRecorder<B>,
>;

/// `RestMetricsRecorder` holds the state(labels) required for recording metrics on a given
/// request/response.
pub struct RestMetricsRecorder<B> {
pub labels: Arc<Mutex<Vec<String>>>,
_phantom: PhantomData<B>,
}

impl<B> Clone for RestMetricsRecorder<B> {
fn clone(&self) -> Self {
Self {
labels: self.labels.clone(),
_phantom: self._phantom,
}
}
}

impl<B> RestMetricsRecorder<B> {
pub fn new() -> Self {
Self {
labels: Arc::new(Mutex::new(vec!["".to_string(); 2])),
_phantom: PhantomData,
}
}
}

/// Serve counters exposes a bunch a set of metrics about the request received to quickwit.
impl<B, FailureClass> OnFailure<FailureClass> for RestMetricsRecorder<B> {
fn on_failure(
&mut self,
_failure_classification: FailureClass,
_latency: std::time::Duration,
_span: &tracing::Span,
) {
let labels = self.labels.lock().expect("Failed to unlock labels").clone();
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was trying to extract method_and_path obtaining in a method but it didn't compile. However, the gut feeling says that it should be possible

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have not checked, but I suspect the span stores the method and path, and hopefully, we can access those fields later to populate the labels.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for raising this point!
I've checked by simply putting dbg!(_span); in both on_failure and on_response and seems that the structure is barely informative:

[quickwit-serve/src/metrics.rs:135] _span = Span {
    name: "request",
    level: Level(
        Debug,
    ),
    target: "tower_http::trace::make_span",
    disabled: true,
    module_path: "tower_http::trace::make_span",
    line: 109,
    file: "/Users/etolbakov/.cargo/index.crates.io-6f17d22bba15001f/tower-http-0.4.4/src/trace/make_span.rs",
}

let labels_str: Vec<&str> = labels.iter().map(String::as_ref).collect();
let method_and_path =
<[&str; 2]>::try_from(labels_str).expect("Failed to convert to slice");

SERVE_METRICS
.http_requests_pending
.with_label_values(method_and_path)
.inc();
}
}

impl<B, RB> OnResponse<RB> for RestMetricsRecorder<B> {
fn on_response(
self,
response: &Response<RB>,
latency: std::time::Duration,
_span: &tracing::Span,
) {
let labels = self.labels.lock().expect("Failed to unlock labels").clone();
let labels_str: Vec<&str> = labels.iter().map(String::as_ref).collect();
let method_and_path =
<[&str; 2]>::try_from(labels_str).expect("Failed to convert to slice");

let code = response.status().to_string();
let method_path_and_code =
<[&str; 3]>::try_from([&method_and_path[..], &[code.as_str()]].concat())
.expect("Failed to convert to slice");

SERVE_METRICS
.http_requests_duration_seconds
.with_label_values(method_path_and_code)
.observe(latency.as_secs_f64());

SERVE_METRICS
.http_requests_total
.with_label_values(method_path_and_code)
.inc();

SERVE_METRICS
.http_requests_pending
.with_label_values(method_and_path)
.dec();
}
}

impl<B, RB> OnRequest<RB> for RestMetricsRecorder<B> {
fn on_request(&mut self, request: &Request<RB>, _span: &tracing::Span) {
let path = request.uri().path();
if !path.starts_with(UI_RESOURCES_PATH) {
*self.labels.lock().expect("Failed to unlock labels") =
vec![request.method().to_string(), path.to_string()]
}
}
}

pub fn make_rest_metrics_layer<B>() -> RestMetricsTraceLayer<B> {
let metrics_recorder = RestMetricsRecorder::new();
TraceLayer::new_for_http()
.on_request(metrics_recorder.clone())
.on_response(metrics_recorder.clone())
.on_failure(metrics_recorder)
}

/// `SERVE_METRICS` exposes a set of metrics about requests/response received to Quickwit.
pub static SERVE_METRICS: Lazy<RestMetrics> = Lazy::new(RestMetrics::default);
6 changes: 2 additions & 4 deletions quickwit/quickwit-serve/src/rest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::indexing_api::indexing_get_handler;
use crate::ingest_api::ingest_api_handlers;
use crate::jaeger_api::jaeger_api_handlers;
use crate::json_api_response::{ApiError, JsonApiResponse};
use crate::metrics::make_rest_metrics_layer;
use crate::metrics_api::metrics_handler;
use crate::node_info_handler::node_info_handler;
use crate::otlp_api::otlp_ingest_api_handlers;
Expand Down Expand Up @@ -70,9 +71,6 @@ pub(crate) async fn start_rest_server(
readiness_trigger: BoxFutureInfaillible<()>,
shutdown_signal: BoxFutureInfaillible<()>,
) -> anyhow::Result<()> {
let request_counter = warp::log::custom(|_| {
crate::SERVE_METRICS.http_requests_total.inc();
});
// Docs routes
let api_doc = warp::path("openapi.json")
.and(warp::get())
Expand Down Expand Up @@ -117,7 +115,6 @@ pub(crate) async fn start_rest_server(
.or(health_check_routes)
.or(metrics_routes)
.or(debugging_routes)
.with(request_counter)
.recover(recover_fn)
.with(extra_headers)
.boxed();
Expand All @@ -128,6 +125,7 @@ pub(crate) async fn start_rest_server(
let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins);

let service = ServiceBuilder::new()
.layer(make_rest_metrics_layer::<hyper::Body>())
.layer(
CompressionLayer::new()
.gzip(true)
Expand Down