diff --git a/src/frontend/http.rs b/src/frontend/http.rs index acd44dfc..09305aa1 100644 --- a/src/frontend/http.rs +++ b/src/frontend/http.rs @@ -3,6 +3,7 @@ use datafusion::error::DataFusionError; use std::fmt::Debug; use std::io::Write; +use std::time::Instant; use std::{net::SocketAddr, sync::Arc}; use warp::{hyper, Rejection}; @@ -34,6 +35,7 @@ use warp::multipart::{FormData, Part}; use warp::reply::{with_header, Response}; use warp::{hyper::header, hyper::StatusCode, Filter, Reply}; +use super::http_utils::{handle_rejection, into_response, ApiError}; use crate::auth::{token_to_principal, AccessPolicy, Action, UserContext}; use crate::catalog::DEFAULT_DB; use crate::config::schema::{AccessSettings, MEBIBYTES}; @@ -44,13 +46,12 @@ use crate::{ }, }; -use super::http_utils::{handle_rejection, into_response, ApiError}; - const QUERY_HEADER: &str = "X-Seafowl-Query"; const BEARER_PREFIX: &str = "Bearer "; // We have a very lax CORS on this, so we don't mind browsers // caching it for as long as possible. const CORS_MAXAGE: u32 = 86400; +const QUERY_TIME_HEADER: &str = "X-Seafowl-Query-Time"; // Vary on Origin, as warp's CORS responds with Access-Control-Allow-Origin: [origin], // so we can't cache the response in the browser if the origin changes. @@ -155,6 +156,8 @@ pub async fn uncached_read_write_query( query: String, mut context: Arc, ) -> Result { + let timer = Instant::now(); + // If a specific DB name was used as a parameter in the route, scope the context to it, // effectively making it the default DB for the duration of the session. if database_name != context.database { @@ -215,6 +218,12 @@ pub async fn uncached_read_write_query( .headers_mut() .insert(header::CONTENT_TYPE, content_type_with_schema(schema)); } + + let elapsed = timer.elapsed().as_millis().to_string(); + response + .headers_mut() + .insert(QUERY_TIME_HEADER, elapsed.parse().unwrap()); + Ok(response) } @@ -285,6 +294,8 @@ pub async fn cached_read_query( if_none_match: Option, mut context: Arc, ) -> Result { + let timer = Instant::now(); + // Ignore dots at the end let query_or_hash = query_or_hash.split('.').next().unwrap(); @@ -346,6 +357,10 @@ pub async fn cached_read_query( let schema = physical.schema().clone(); let mut response = plan_to_response(context, physical).await?; + let elapsed = timer.elapsed().as_millis().to_string(); + response + .headers_mut() + .insert(QUERY_TIME_HEADER, elapsed.parse().unwrap()); response .headers_mut() .insert(header::ETAG, etag.parse().unwrap()); @@ -477,7 +492,6 @@ pub fn filters( .max_age(CORS_MAXAGE); let log = warp::log(module_path!()); - // Cached read query let ctx = context.clone(); let cached_read_query_route = warp::path!(String / "q" / String) @@ -605,10 +619,10 @@ pub mod tests { use crate::catalog::DEFAULT_DB; use crate::config::schema::{str_to_hex_hash, HttpFrontend}; - use crate::testutils::schema_from_header; + use crate::testutils::{assert_header_is_float, schema_from_header}; use crate::{ context::{test_utils::in_memory_context, DefaultSeafowlContext, SeafowlContext}, - frontend::http::{filters, QUERY_HEADER}, + frontend::http::{filters, QUERY_HEADER, QUERY_TIME_HEADER}, }; fn http_config_from_access_policy_and_cache_control( @@ -1518,7 +1532,10 @@ pub mod tests { )] #[case::uncached_post("POST", "/q")] #[tokio::test] - async fn test_http_type_conversion(#[case] method: &str, #[case] path: &str) { + async fn test_http_type_conversion_and_timing_header( + #[case] method: &str, + #[case] path: &str, + ) { let context = Arc::new(in_memory_context().await); let handler = filters( context.clone(), @@ -1572,5 +1589,11 @@ SELECT "# ) ); + + // Assert the "request-to-response" time header is present + assert!(resp.headers().contains_key(QUERY_TIME_HEADER)); + // Assert that it's a float + let header_value = resp.headers().get(QUERY_TIME_HEADER).unwrap(); + assert_header_is_float(header_value); } } diff --git a/src/testutils.rs b/src/testutils.rs index 38daa48b..78b7574f 100644 --- a/src/testutils.rs +++ b/src/testutils.rs @@ -1,4 +1,5 @@ use std::cmp::min; +use std::str::FromStr; use std::sync::Arc; use arrow::array::Int32Array; @@ -161,3 +162,9 @@ pub fn schema_from_header(headers: &HeaderMap) -> Schema { schema_from_json(&schema_json).expect("arrow schema reconstructable from JSON") } + +pub fn assert_header_is_float(header: &HeaderValue) -> bool { + let float_str = header.to_str().unwrap(); + let parsed_float = f64::from_str(float_str).unwrap(); + parsed_float.is_finite() +}