From 0ad1b4fd3ecb08ad7655ac3352b0df629cef85b6 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Thu, 27 Nov 2025 22:21:48 +0100 Subject: [PATCH 1/5] Skipping running tantivy search if cannot do better. Also, - removes some noisy intermediary traces/span. - logs the ast/agg in the root node. - logs a summary about the split execution (cache, pruned, etc.) in the leaf node --- quickwit/quickwit-common/src/metrics.rs | 37 ++- quickwit/quickwit-search/src/leaf.rs | 251 ++++++++++++------ quickwit/quickwit-search/src/list_terms.rs | 2 +- quickwit/quickwit-search/src/metrics.rs | 94 ++++++- quickwit/quickwit-search/src/root.rs | 25 +- .../src/search_permit_provider.rs | 3 - 6 files changed, 308 insertions(+), 104 deletions(-) diff --git a/quickwit/quickwit-common/src/metrics.rs b/quickwit/quickwit-common/src/metrics.rs index c59bf953937..db88280794f 100644 --- a/quickwit/quickwit-common/src/metrics.rs +++ b/quickwit/quickwit-common/src/metrics.rs @@ -40,6 +40,26 @@ pub struct IntCounterVec { } impl IntCounterVec { + pub fn new( + name: &str, + help: &str, + subsystem: &str, + const_labels: &[(&str, &str)], + label_names: [&str; N], + ) -> IntCounterVec { + let owned_const_labels: HashMap = const_labels + .iter() + .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) + .collect(); + let counter_opts = Opts::new(name, help) + .namespace("quickwit") + .subsystem(subsystem) + .const_labels(owned_const_labels); + let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) + .expect("failed to create counter vec"); + IntCounterVec { underlying } + } + pub fn with_label_values(&self, label_values: [&str; N]) -> IntCounter { self.underlying.with_label_values(&label_values) } @@ -92,21 +112,10 @@ pub fn new_counter_vec( const_labels: &[(&str, &str)], label_names: [&str; N], ) -> IntCounterVec { - let owned_const_labels: HashMap = const_labels - .iter() - .map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string())) 
- .collect(); - let counter_opts = Opts::new(name, help) - .namespace("quickwit") - .subsystem(subsystem) - .const_labels(owned_const_labels); - let underlying = PrometheusIntCounterVec::new(counter_opts, &label_names) - .expect("failed to create counter vec"); - - let collector = Box::new(underlying.clone()); + let int_counter_vec = IntCounterVec::new(name, help, subsystem, const_labels, label_names); + let collector = Box::new(int_counter_vec.underlying.clone()); prometheus::register(collector).expect("failed to register counter vec"); - - IntCounterVec { underlying } + int_counter_vec } pub fn new_float_gauge( diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index c1362d83600..2029711f56b 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -45,12 +45,12 @@ use tokio::task::JoinError; use tracing::*; use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector}; +use crate::metrics::SplitSearchOutcomeCounters; use crate::root::is_metadata_count_request_with_ast; use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation}; use crate::service::{SearcherContext, deserialize_doc_mapper}; use crate::{QuickwitAggregations, SearchError}; -#[instrument(skip_all)] async fn get_split_footer_from_cache_or_fetch( index_storage: Arc, split_and_footer_offsets: &SplitIdAndFooterOffsets, @@ -88,7 +88,6 @@ async fn get_split_footer_from_cache_or_fetch( /// Returns hotcache_bytes and the split directory (`BundleStorage`) with cache layer: /// - A split footer cache given by `SearcherContext.split_footer_cache`. 
-#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] pub(crate) async fn open_split_bundle( searcher_context: &SearcherContext, index_storage: Arc, @@ -143,7 +142,6 @@ fn configure_storage_retries( /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. /// - An ephemeral unbounded cache directory (whose lifetime is tied to the /// returned `Index` if no `ByteRangeCache` is provided). -#[instrument(skip_all, fields(split_footer_start=split_and_footer_offsets.split_footer_start, split_footer_end=split_and_footer_offsets.split_footer_end))] pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, @@ -434,25 +432,28 @@ fn compute_index_size(hot_directory: &HotDirectory) -> ByteSize { /// Apply a leaf search on a single split. #[allow(clippy::too_many_arguments)] async fn leaf_search_single_split( - searcher_context: &SearcherContext, mut search_request: SearchRequest, + ctx: Arc, storage: Arc, split: SplitIdAndFooterOffsets, - doc_mapper: Arc, - split_filter: Arc>, aggregations_limits: AggregationLimitsGuard, search_permit: &mut SearchPermit, -) -> crate::Result { +) -> crate::Result> { + let mut leaf_search_state_guard = + SplitSearchStateGuard::new(ctx.split_outcome_counters.clone()); + rewrite_request( &mut search_request, &split, - doc_mapper.timestamp_field_name(), + ctx.doc_mapper.timestamp_field_name(), ); - if let Some(cached_answer) = searcher_context + if let Some(cached_answer) = ctx + .searcher_context .leaf_search_cache .get(split.clone(), search_request.clone()) { - return Ok(cached_answer); + leaf_search_state_guard.set_state(SplitSearchState::CacheHit); + return Ok(Some(cached_answer)); } let query_ast: QueryAst = serde_json::from_str(search_request.query_ast.as_str()) @@ -463,17 +464,18 @@ async fn leaf_search_single_split( // split can't have better results. 
// if is_metadata_count_request_with_ast(&query_ast, &search_request) { - return Ok(get_leaf_resp_from_count(split.num_docs)); + leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmUp); + return Ok(Some(get_leaf_resp_from_count(split.num_docs))); } let split_id = split.split_id.to_string(); let byte_range_cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); let (index, hot_directory) = open_index_with_caches( - searcher_context, + &*ctx.searcher_context, storage, &split, - Some(doc_mapper.tokenizer_manager()), + Some(ctx.doc_mapper.tokenizer_manager()), Some(byte_range_cache.clone()), ) .await?; @@ -491,19 +493,22 @@ async fn leaf_search_single_split( let agg_context_params = AggContextParams { limits: aggregations_limits, - tokenizers: doc_mapper.tokenizer_manager().tantivy_manager().clone(), + tokenizers: ctx.doc_mapper.tokenizer_manager().tantivy_manager().clone(), }; let mut collector = make_collector_for_split(split_id.clone(), &search_request, agg_context_params)?; let split_schema = index.schema(); - let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?; + let (query, mut warmup_info) = ctx + .doc_mapper + .query(split_schema.clone(), &query_ast, false)?; let collector_warmup_info = collector.warmup_info(); warmup_info.merge(collector_warmup_info); warmup_info.simplify(); let warmup_start = Instant::now(); + leaf_search_state_guard.set_state(SplitSearchState::WarmUp); warmup(&searcher, &warmup_info).await?; let warmup_end = Instant::now(); let warmup_duration: Duration = warmup_end.duration_since(warmup_start); @@ -525,20 +530,28 @@ async fn leaf_search_single_split( let span = info_span!("tantivy_search"); - let (search_request, leaf_search_response) = { - let split = split.clone(); + let split_clone = split.clone(); + let ctx_clone = ctx.clone(); + leaf_search_state_guard.set_state(SplitSearchState::CpuQueue); + let search_request_and_result: 
Option<(SearchRequest, LeafSearchResponse)> = crate::search_thread_pool() .run_cpu_intensive(move || { + leaf_search_state_guard.set_state(SplitSearchState::Cpu); let cpu_start = Instant::now(); let cpu_thread_pool_wait_microsecs = cpu_start.duration_since(warmup_end); let _span_guard = span.enter(); // Our search execution has been scheduled, let's check if we can improve the // request based on the results of the preceding searches - check_optimize_search_request(&mut search_request, &split, &split_filter); - collector.update_search_param(&search_request); + let Some(simplified_search_request) = + simplify_search_request(search_request, &split_clone, &ctx_clone.split_filter) + else { + leaf_search_state_guard.set_state(SplitSearchState::PrunedAfterWarmup); + return Ok(None); + }; + collector.update_search_param(&simplified_search_request); let mut leaf_search_response: LeafSearchResponse = - if is_metadata_count_request_with_ast(&query_ast, &search_request) { + if is_metadata_count_request_with_ast(&query_ast, &simplified_search_request) { get_leaf_resp_from_count(searcher.num_docs()) } else if collector.is_count_only() { let count = query.count(&searcher)? as u64; @@ -554,18 +567,28 @@ async fn leaf_search_single_split( cpu_thread_pool_wait_microsecs: cpu_thread_pool_wait_microsecs.as_micros() as u64, }); - Result::<_, TantivyError>::Ok((search_request, leaf_search_response)) + leaf_search_state_guard.set_state(SplitSearchState::Success); + Result::<_, TantivyError>::Ok(Some(( + simplified_search_request, + leaf_search_response, + ))) }) .await .map_err(|_| { crate::SearchError::Internal(format!("leaf search panicked. split={split_id}")) - })?? - }; + })??; - searcher_context - .leaf_search_cache - .put(split, search_request, leaf_search_response.clone()); - Ok(leaf_search_response) + // Let's cache this result in the partial result cache. 
+ if let Some((leaf_search_req, leaf_search_resp)) = search_request_and_result { + ctx.searcher_context.leaf_search_cache.put( + split, + leaf_search_req, + leaf_search_resp.clone(), + ); + Ok(Some(leaf_search_resp)) + } else { + Ok(None) + } } /// Rewrite a request removing parts which incur additional download or computation with no @@ -1255,17 +1278,39 @@ pub async fn multi_index_leaf_search( } /// Optimizes the search_request based on CanSplitDoBetter -/// Returns true if the split can return better results -fn check_optimize_search_request( - search_request: &mut SearchRequest, +/// Returns None if the search request does nothing and can be skipped. +#[must_use] +fn simplify_search_request( + mut search_request: SearchRequest, split: &SplitIdAndFooterOffsets, - split_filter: &Arc>, -) -> bool { - let can_be_better = split_filter.read().unwrap().can_be_better(split); + split_filter_lock: &Arc>, +) -> Option { + let can_be_better: bool; + let is_trace_req: bool; + { + let split_filter_guard = split_filter_lock.read().unwrap(); + can_be_better = split_filter_guard.can_be_better(split); + // The info is originally from the search_request.aggregation as a string (yes we need to + // clean this eventually). We don't want to parse it again, so we use the + // split_filter variant to get that info. + is_trace_req = matches!( + &*split_filter_guard, + &CanSplitDoBetter::FindTraceIdsAggregation(_) + ); + } if !can_be_better { - disable_search_request_hits(search_request); + disable_search_request_hits(&mut search_request); + } + if search_request.max_hits > 0 { + return Some(search_request); + } + if search_request.aggregation_request.is_some() && !is_trace_req { + return Some(search_request); } - can_be_better + if search_request.count_hits() == CountHits::CountAll { + return Some(search_request); + } + None } /// Alter the search request so it does not return any docs. 
@@ -1283,7 +1328,6 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) { /// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in /// charge to consolidate, identify the actual final top hits to display, and /// fetch the actual documents to convert the partial hits into actual Hits. -#[instrument(skip_all, fields(index = ?request.index_id_patterns))] pub async fn single_doc_mapping_leaf_search( searcher_context: Arc, request: Arc, @@ -1294,21 +1338,11 @@ pub async fn single_doc_mapping_leaf_search( ) -> Result { let num_docs: u64 = splits.iter().map(|split| split.num_docs).sum(); let num_splits = splits.len(); - let current_span = tracing::Span::current(); - current_span.record("num_docs", num_docs); - current_span.record("num_splits", num_splits); - info!(num_docs, num_splits, split_offsets = ?PrettySample::new(&splits, 5)); let split_filter = CanSplitDoBetter::from_request(&request, doc_mapper.timestamp_field_name()); let split_with_req = split_filter.optimize(request.clone(), splits)?; - // if client wants full count, or we are doing an aggregation, we want to run every splits. - // However if the aggregation is the tracing aggregation, we don't actually need all splits. 
- let run_all_splits = request.count_hits() == CountHits::CountAll - || (request.aggregation_request.is_some() - && !matches!(split_filter, CanSplitDoBetter::FindTraceIdsAggregation(_))); - let split_filter = Arc::new(RwLock::new(split_filter)); let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> = @@ -1333,29 +1367,35 @@ pub async fn single_doc_mapping_leaf_search( .get_permits(permit_sizes) .await; - for ((split, mut request), permit_fut) in + let leaf_search_context = Arc::new(LeafSearchContext { + searcher_context: searcher_context.clone(), + split_outcome_counters: Arc::new(SplitSearchOutcomeCounters::new_unregistered()), + incremental_merge_collector: incremental_merge_collector.clone(), + doc_mapper: doc_mapper.clone(), + split_filter: split_filter.clone(), + }); + + for ((split, search_request), permit_fut) in split_with_req.into_iter().zip(permit_futures.into_iter()) { let leaf_split_search_permit = permit_fut .instrument(info_span!("waiting_for_leaf_search_split_semaphore")) .await; - let can_be_better = check_optimize_search_request(&mut request, &split, &split_filter); - if !can_be_better && !run_all_splits { + let Some(simplified_search_request) = + simplify_search_request(search_request, &split, &split_filter) + else { continue; - } + }; leaf_search_single_split_join_handles.push(( split.split_id.clone(), tokio::spawn( leaf_search_single_split_wrapper( - request, - searcher_context.clone(), + simplified_search_request, + leaf_search_context.clone(), index_storage.clone(), - doc_mapper.clone(), split, - split_filter.clone(), - incremental_merge_collector.clone(), leaf_split_search_permit, aggregations_limits.clone(), ) @@ -1385,6 +1425,8 @@ pub async fn single_doc_mapping_leaf_search( } } + info!(split_outcome_counters=%leaf_search_context.split_outcome_counters, "leaf split search finished"); + // we can't use unwrap_or_clone because mutexes aren't Clone let mut incremental_merge_collector = match 
Arc::try_unwrap(incremental_merge_collector) { Ok(filter_merger) => filter_merger.into_inner().unwrap(), @@ -1409,46 +1451,102 @@ pub async fn single_doc_mapping_leaf_search( Ok(leaf_search_response_reresult??) } +#[derive(Copy, Clone)] +enum SplitSearchState { + Start, + CacheHit, + PrunedBeforeWarmUp, + WarmUp, + PrunedAfterWarmup, + CpuQueue, + Cpu, + Success, +} + +impl SplitSearchState { + pub fn inc(self, counters: &SplitSearchOutcomeCounters) { + match self { + SplitSearchState::Start => counters.cancel_before_warmup.inc(), + SplitSearchState::CacheHit => counters.cache_hit.inc(), + SplitSearchState::PrunedBeforeWarmUp => counters.pruned_before_warmup.inc(), + SplitSearchState::WarmUp => counters.cancel_warmup.inc(), + SplitSearchState::PrunedAfterWarmup => counters.pruned_after_warmup.inc(), + SplitSearchState::CpuQueue => counters.cancel_cpu_queue.inc(), + SplitSearchState::Cpu => counters.cancel_cpu.inc(), + SplitSearchState::Success => counters.success.inc(), + } + } +} + +impl Drop for SplitSearchStateGuard { + fn drop(&mut self) { + self.state + .inc(&crate::metrics::SEARCH_METRICS.split_search_outcome_total); + self.state.inc(&self.local_split_search_outcome_counters); + } +} + +struct SplitSearchStateGuard { + state: SplitSearchState, + local_split_search_outcome_counters: Arc, +} + +impl SplitSearchStateGuard { + pub fn new(local_split_search_outcome_counters: Arc) -> Self { + SplitSearchStateGuard { + state: SplitSearchState::Start, + local_split_search_outcome_counters: local_split_search_outcome_counters.clone(), + } + } + + pub fn set_state(&mut self, state: SplitSearchState) { + self.state = state; + } +} + +struct LeafSearchContext { + searcher_context: Arc, + split_outcome_counters: Arc, + incremental_merge_collector: Arc>, + doc_mapper: Arc, + split_filter: Arc>, +} + #[allow(clippy::too_many_arguments)] #[instrument(skip_all, fields(split_id = split.split_id, num_docs = split.num_docs))] async fn leaf_search_single_split_wrapper( request: 
SearchRequest, - searcher_context: Arc, + ctx: Arc, index_storage: Arc, - doc_mapper: Arc, split: SplitIdAndFooterOffsets, - split_filter: Arc>, - incremental_merge_collector: Arc>, mut search_permit: SearchPermit, aggregations_limits: AggregationLimitsGuard, ) { - crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); let timer = crate::SEARCH_METRICS .leaf_search_split_duration_secs .start_timer(); - let leaf_search_single_split_res = leaf_search_single_split( - &searcher_context, - request, - index_storage, - split.clone(), - doc_mapper, - split_filter.clone(), - aggregations_limits, - &mut search_permit, - ) - .await; + let leaf_search_single_split_opt_res: crate::Result> = + leaf_search_single_split( + request, + ctx.clone(), + index_storage, + split.clone(), + aggregations_limits, + &mut search_permit, + ) + .await; // Explicitly drop the permit for readability. // This should always happen after the ephemeral search cache is dropped. std::mem::drop(search_permit); - if leaf_search_single_split_res.is_ok() { + if leaf_search_single_split_opt_res.is_ok() { timer.observe_duration(); } - let mut locked_incremental_merge_collector = incremental_merge_collector.lock().unwrap(); - match leaf_search_single_split_res { - Ok(split_search_res) => { + let mut locked_incremental_merge_collector = ctx.incremental_merge_collector.lock().unwrap(); + match leaf_search_single_split_opt_res { + Ok(Some(split_search_res)) => { if let Err(err) = locked_incremental_merge_collector.add_result(split_search_res) { locked_incremental_merge_collector.add_failed_split(SplitSearchError { split_id: split.split_id.clone(), @@ -1457,6 +1555,7 @@ async fn leaf_search_single_split_wrapper( }); } } + Ok(None) => {} Err(err) => locked_incremental_merge_collector.add_failed_split(SplitSearchError { split_id: split.split_id.clone(), error: format!("{err}"), @@ -1466,7 +1565,7 @@ async fn leaf_search_single_split_wrapper( if let Some(last_hit) = 
locked_incremental_merge_collector.peek_worst_hit() { // TODO: we could use the RWLock instead and read the value instead of updating it // unconditionally. - split_filter + ctx.split_filter .write() .unwrap() .record_new_worst_hit(last_hit.as_ref()); diff --git a/quickwit/quickwit-search/src/list_terms.rs b/quickwit/quickwit-search/src/list_terms.rs index c69f232f9d8..45dd1256f36 100644 --- a/quickwit/quickwit-search/src/list_terms.rs +++ b/quickwit/quickwit-search/src/list_terms.rs @@ -345,7 +345,7 @@ pub async fn leaf_list_terms( async move { let leaf_split_search_permit = search_permit_recv.await; // TODO dedicated counter and timer? - crate::SEARCH_METRICS.leaf_searches_splits_total.inc(); + crate::SEARCH_METRICS.leaf_list_terms_splits_total.inc(); let timer = crate::SEARCH_METRICS .leaf_search_split_duration_secs .start_timer(); diff --git a/quickwit/quickwit-search/src/metrics.rs b/quickwit/quickwit-search/src/metrics.rs index 42e3d289d0c..9ed11d7ecbb 100644 --- a/quickwit/quickwit-search/src/metrics.rs +++ b/quickwit/quickwit-search/src/metrics.rs @@ -14,6 +14,8 @@ // See https://prometheus.io/docs/practices/naming/ +use std::fmt; + use bytesize::ByteSize; use once_cell::sync::Lazy; use quickwit_common::metrics::{ @@ -22,6 +24,86 @@ use quickwit_common::metrics::{ new_histogram_vec, }; +fn print_if_not_null( + field_name: &'static str, + counter: &IntCounter, + f: &mut fmt::Formatter, +) -> fmt::Result { + let val = counter.get(); + if val > 0 { + write!(f, "{}={} ", field_name, val)?; + } + Ok(()) +} + +pub struct SplitSearchOutcomeCounters { + pub cancel_before_warmup: IntCounter, + pub cache_hit: IntCounter, + pub pruned_before_warmup: IntCounter, + pub cancel_warmup: IntCounter, + pub pruned_after_warmup: IntCounter, + pub cancel_cpu_queue: IntCounter, + pub cancel_cpu: IntCounter, + pub success: IntCounter, +} + +impl fmt::Display for SplitSearchOutcomeCounters { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + 
print_if_not_null("cancel_before_warmup", &self.cancel_before_warmup, f)?; + print_if_not_null("cache_hit", &self.cache_hit, f)?; + print_if_not_null("pruned_before_warmup", &self.pruned_before_warmup, f)?; + print_if_not_null("cancel_warmup", &self.cancel_warmup, f)?; + print_if_not_null("pruned_after_warmup", &self.pruned_after_warmup, f)?; + print_if_not_null("cancel_cpu_queue", &self.cancel_cpu_queue, f)?; + print_if_not_null("cancel_cpu", &self.cancel_cpu, f)?; + print_if_not_null("success", &self.success, f)?; + Ok(()) + } +} + +impl SplitSearchOutcomeCounters { + /// Create a new SplitSearchOutcomeCounters instance, registered in prometheus. + pub fn new_registered() -> Self { + let search_split_outcome_vec = new_counter_vec( + "split_search_outcome", + "Count the state in which each leaf search split ended", + "search", + &[], + ["category"], + ); + Self::new_from_counter_vec(search_split_outcome_vec) + } + + /// Create a new SplitSearchOutcomeCounters instance, but this one won't be reported to + /// prometheus. 
+ pub fn new_unregistered() -> Self { + let search_split_outcome_vec = IntCounterVec::new( + "split_search_outcome", + "Count the state in which each leaf search split ended", + "search", + &[], + ["category"], + ); + Self::new_from_counter_vec(search_split_outcome_vec) + } + + pub fn new_from_counter_vec(search_split_outcome_vec: IntCounterVec<1>) -> Self { + SplitSearchOutcomeCounters { + cancel_before_warmup: search_split_outcome_vec + .with_label_values(["cancel_before_warmup"]), + cache_hit: search_split_outcome_vec.with_label_values(["cache_hit"]), + pruned_before_warmup: search_split_outcome_vec + .with_label_values(["pruned_before_warmup"]), + cancel_warmup: search_split_outcome_vec.with_label_values(["cancel_warmup"]), + pruned_after_warmup: search_split_outcome_vec + .with_label_values(["pruned_after_warmup"]), + cancel_cpu_queue: search_split_outcome_vec.with_label_values(["cancel_cpu_queue"]), + cancel_cpu: search_split_outcome_vec.with_label_values(["cancel_cpu"]), + success: search_split_outcome_vec.with_label_values(["success"]), + } + } +} + pub struct SearchMetrics { pub root_search_requests_total: IntCounterVec<1>, pub root_search_request_duration_seconds: HistogramVec<1>, @@ -29,7 +111,8 @@ pub struct SearchMetrics { pub leaf_search_requests_total: IntCounterVec<1>, pub leaf_search_request_duration_seconds: HistogramVec<1>, pub leaf_search_targeted_splits: HistogramVec<1>, - pub leaf_searches_splits_total: IntCounter, + pub leaf_list_terms_splits_total: IntCounter, + pub split_search_outcome_total: SplitSearchOutcomeCounters, pub leaf_search_split_duration_secs: Histogram, pub job_assigned_total: IntCounterVec<1>, pub leaf_search_single_split_tasks_pending: IntGauge, @@ -123,12 +206,15 @@ impl Default for SearchMetrics { ["status"], targeted_splits_buckets, ), - leaf_searches_splits_total: new_counter( - "leaf_searches_splits_total", - "Number of leaf searches (count of splits) started.", + + leaf_list_terms_splits_total: new_counter( + 
"leaf_list_terms_splits_total", + "Number of list terms splits total", "search", &[], ), + split_search_outcome_total: SplitSearchOutcomeCounters::new_registered(), + leaf_search_split_duration_secs: new_histogram( "leaf_search_split_duration_secs", "Number of seconds required to run a leaf search over a single split. The timer \ diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index 345cfdacd19..c2225b7524e 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -45,7 +45,7 @@ use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; -use tracing::{debug, info_span, instrument}; +use tracing::{debug, info, info_span, instrument}; use crate::cluster_client::ClusterClient; use crate::collector::{QuickwitAggregations, make_merge_collector}; @@ -784,7 +784,12 @@ pub(crate) async fn search_partial_hits_phase( ) { // We log at most 5 times per minute. - quickwit_common::rate_limited_info!(limit_per_min=5, split_num_docs=resource_stats.split_num_docs, %search_request.query_ast, short_lived_cached_num_bytes=resource_stats.short_lived_cache_num_bytes, query=%search_request.query_ast, "memory intensive query"); + quickwit_common::rate_limited_info!( + limit_per_min = 5, + split_num_docs = resource_stats.split_num_docs, + short_lived_cached_num_bytes = resource_stats.short_lived_cache_num_bytes, + "memory intensive query" + ); } if !leaf_search_response.failed_splits.is_empty() { @@ -958,7 +963,6 @@ fn get_sort_field_datetime_format( /// 2. Merges the search results. /// 3. Sends fetch docs requests to multiple leaf nodes. /// 4. Builds the response with docs and returns. 
-#[instrument(skip_all, fields(num_splits=%split_metadatas.len()))] async fn root_search_aux( searcher_context: &SearcherContext, indexes_metas_for_leaf_search: &IndexesMetasForLeafSearch, @@ -1207,9 +1211,18 @@ pub async fn root_search( let num_docs: usize = split_metadatas.iter().map(|split| split.num_docs).sum(); let num_splits = split_metadatas.len(); - let current_span = tracing::Span::current(); - current_span.record("num_docs", num_docs); - current_span.record("num_splits", num_splits); + + // It would have been nice to add those in the context of the trace span, + // but with our current logging setting, it makes logs too verbose. + info!( + query_ast = search_request.query_ast.as_str(), + agg = search_request.aggregation_request(), + start_ts = ?(search_request.start_timestamp()..search_request.end_timestamp()), + count_required = search_request.count_hits().as_str_name(), + num_docs = num_docs, + num_splits = num_splits, + "root_search" + ); let mut search_response_result = RootSearchMetricsFuture { start: start_instant, diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index 3a9eff7e8be..d75c72587a6 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -38,7 +38,6 @@ pub struct SearchPermitProvider { actor_stopped: watch::Receiver, } -#[derive(Debug)] pub enum SearchPermitMessage { Request { permit_sender: oneshot::Sender>, @@ -235,7 +234,6 @@ impl SearchPermitActor { } } -#[derive(Debug)] pub struct SearchPermit { _ongoing_gauge_guard: GaugeGuard<'static>, msg_sender: mpsc::WeakUnboundedSender, @@ -288,7 +286,6 @@ impl Drop for SearchPermit { } } -#[derive(Debug)] pub struct SearchPermitFuture(oneshot::Receiver); impl Future for SearchPermitFuture { From fed8a1bf21263b624ff37a1b8f81f3c8cebc89d9 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 28 Nov 2025 14:50:02 +0100 Subject: [PATCH 2/5] CR 
comment --- quickwit/quickwit-search/src/leaf.rs | 10 +++++++--- quickwit/quickwit-search/src/search_permit_provider.rs | 10 +++++----- 2 files changed, 12 insertions(+), 8 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 2029711f56b..1d6a096e423 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -464,7 +464,7 @@ async fn leaf_search_single_split( // split can't have better results. // if is_metadata_count_request_with_ast(&query_ast, &search_request) { - leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmUp); + leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup); return Ok(Some(get_leaf_resp_from_count(split.num_docs))); } @@ -1320,6 +1320,7 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) { search_request.max_hits = 0; search_request.start_offset = 0; search_request.sort_fields.clear(); + search_request.search_after = None; } /// Searches multiple splits for a specific index and a single doc mapping @@ -1385,6 +1386,9 @@ pub async fn single_doc_mapping_leaf_search( let Some(simplified_search_request) = simplify_search_request(search_request, &split, &split_filter) else { + let mut leaf_search_state_guard = + SplitSearchStateGuard::new(leaf_search_context.split_outcome_counters.clone()); + leaf_search_state_guard.set_state(SplitSearchState::PrunedBeforeWarmup); continue; }; @@ -1455,7 +1459,7 @@ pub async fn single_doc_mapping_leaf_search( enum SplitSearchState { Start, CacheHit, - PrunedBeforeWarmUp, + PrunedBeforeWarmup, WarmUp, PrunedAfterWarmup, CpuQueue, @@ -1468,7 +1472,7 @@ impl SplitSearchState { match self { SplitSearchState::Start => counters.cancel_before_warmup.inc(), SplitSearchState::CacheHit => counters.cache_hit.inc(), - SplitSearchState::PrunedBeforeWarmUp => counters.pruned_before_warmup.inc(), + SplitSearchState::PrunedBeforeWarmup => counters.pruned_before_warmup.inc(), 
SplitSearchState::WarmUp => counters.cancel_warmup.inc(), SplitSearchState::PrunedAfterWarmup => counters.pruned_after_warmup.inc(), SplitSearchState::CpuQueue => counters.cancel_cpu_queue.inc(), diff --git a/quickwit/quickwit-search/src/search_permit_provider.rs b/quickwit/quickwit-search/src/search_permit_provider.rs index d75c72587a6..315962c6a5c 100644 --- a/quickwit/quickwit-search/src/search_permit_provider.rs +++ b/quickwit/quickwit-search/src/search_permit_provider.rs @@ -418,14 +418,14 @@ mod tests { .await; // the next permit is blocked by the memory budget let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - try_get(next_blocked_permit_fut).await.unwrap_err(); + try_get(next_blocked_permit_fut).await.err().unwrap(); // if we drop one of the permits, we can get a new one permits.drain(0..1); let next_permit_fut = remaining_permit_futs.next().unwrap(); let _new_permit = try_get(next_permit_fut).await.unwrap(); // the next permit is blocked again by the memory budget let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - try_get(next_blocked_permit_fut).await.unwrap_err(); + try_get(next_blocked_permit_fut).await.err().unwrap(); // by setting a more accurate memory usage after a completed warmup, we can get more permits permits[0].update_memory_usage(ByteSize::mb(4)); permits[1].update_memory_usage(ByteSize::mb(6)); @@ -448,14 +448,14 @@ mod tests { .await; // the next permit is blocked by the warmup slots let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - try_get(next_blocked_permit_fut).await.unwrap_err(); + try_get(next_blocked_permit_fut).await.err().unwrap(); // if we drop one of the permits, we can get a new one permits.drain(0..1); let next_permit_fut = remaining_permit_futs.next().unwrap(); permits.push(try_get(next_permit_fut).await.unwrap()); // the next permit is blocked again by the warmup slots let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - 
try_get(next_blocked_permit_fut).await.unwrap_err(); + try_get(next_blocked_permit_fut).await.err().unwrap(); // we can explicitly free the warmup slot on a permit permits[0].free_warmup_slot(); let next_permit_fut = remaining_permit_futs.next().unwrap(); @@ -463,7 +463,7 @@ mod tests { // dropping that same permit does not free up another slot permits.drain(0..1); let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); - try_get(next_blocked_permit_fut).await.unwrap_err(); + try_get(next_blocked_permit_fut).await.err().unwrap(); // but dropping a permit for which the slot wasn't explicitly free does free up a slot permits.drain(0..1); let next_blocked_permit_fut = remaining_permit_futs.next().unwrap(); From 31d58575ef691f55ac5ceaddee0d66e446fa0554 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 28 Nov 2025 15:42:19 +0100 Subject: [PATCH 3/5] fixing unit test --- quickwit/quickwit-cli/src/lib.rs | 12 ++++++++++++ quickwit/quickwit-cli/src/main.rs | 6 ++---- quickwit/quickwit-common/src/io.rs | 2 +- .../src/actors/indexing_pipeline.rs | 14 ++++++++++++-- .../src/test_utils/cluster_sandbox.rs | 5 +---- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/quickwit/quickwit-cli/src/lib.rs b/quickwit/quickwit-cli/src/lib.rs index 5a591ac76e3..378d71f260f 100644 --- a/quickwit/quickwit-cli/src/lib.rs +++ b/quickwit/quickwit-cli/src/lib.rs @@ -16,6 +16,7 @@ use std::collections::HashSet; use std::str::FromStr; +use std::sync::OnceLock; use anyhow::Context; use clap::{Arg, ArgMatches, arg}; @@ -107,6 +108,17 @@ fn client_args() -> Vec { ] } +pub fn install_default_crypto_ring_provider() { + static CALL_ONLY_ONCE: OnceLock> = OnceLock::new(); + CALL_ONLY_ONCE + .get_or_init(|| { + rustls::crypto::ring::default_provider() + .install_default() + .map_err(|_| ()) + }) + .expect("rustls crypto ring default provider installation should not fail"); +} + #[derive(Debug, Eq, PartialEq)] pub struct ClientArgs { pub cluster_endpoint: Url, diff --git 
a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index 01b37a34266..2bfca7de352 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -19,12 +19,12 @@ use std::collections::BTreeMap; use anyhow::Context; use colored::Colorize; use opentelemetry::global; -use quickwit_cli::busy_detector; use quickwit_cli::checklist::RED_COLOR; use quickwit_cli::cli::{CliCommand, build_cli}; #[cfg(feature = "jemalloc")] use quickwit_cli::jemalloc::start_jemalloc_metrics_loop; use quickwit_cli::logger::setup_logging_and_tracing; +use quickwit_cli::{busy_detector, install_default_crypto_ring_provider}; use quickwit_common::runtimes::scrape_tokio_runtime_metrics; use quickwit_serve::BuildInfo; use tracing::error; @@ -93,9 +93,7 @@ async fn main_impl() -> anyhow::Result<()> { } }; - rustls::crypto::ring::default_provider() - .install_default() - .expect("rustls crypto ring default provider installation should not fail"); + install_default_crypto_ring_provider(); #[cfg(feature = "jemalloc")] start_jemalloc_metrics_loop(); diff --git a/quickwit/quickwit-common/src/io.rs b/quickwit/quickwit-common/src/io.rs index 102393f33ce..69c2091c237 100644 --- a/quickwit/quickwit-common/src/io.rs +++ b/quickwit/quickwit-common/src/io.rs @@ -377,7 +377,7 @@ mod tests { } controlled_write.flush().await.unwrap(); let elapsed = start.elapsed(); - assert!(elapsed <= Duration::from_millis(5)); + assert!(elapsed <= Duration::from_millis(10)); assert_eq!(io_controls.num_bytes(), 2_000_000u64); } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index b41eba19c79..99065651db1 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -733,8 +733,18 @@ mod tests { let pipeline = IndexingPipeline::new(pipeline_params); let (_pipeline_mailbox, pipeline_handle) = 
universe.spawn_builder().spawn(pipeline); let (pipeline_exit_status, pipeline_statistics) = pipeline_handle.join().await; - assert_eq!(pipeline_statistics.generation, 1); - assert_eq!(pipeline_statistics.num_spawn_attempts, 1 + num_fails); + assert_eq!( + pipeline_statistics.generation, 1, + "generation is {}, expected 1", + pipeline_statistics.generation + ); + assert_eq!( + pipeline_statistics.num_spawn_attempts, + 1 + num_fails, + "num spawn attempts is {}, expected 1 + {}", + pipeline_statistics.num_spawn_attempts, + 1 + num_fails + ); assert!(pipeline_exit_status.is_success()); Ok(()) } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 36fbadbbcea..2e8bdd1e3d4 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -176,10 +176,7 @@ pub struct ResolvedClusterConfig { impl ResolvedClusterConfig { /// Start a cluster using this config and waits for the nodes to be ready pub async fn start(self) -> ClusterSandbox { - rustls::crypto::ring::default_provider() - .install_default() - .expect("rustls crypto ring default provider installation should not fail"); - + quickwit_cli::install_default_crypto_ring_provider(); let mut node_shutdown_handles = Vec::new(); let runtimes_config = RuntimesConfig::light_for_tests(); let storage_resolver = StorageResolver::unconfigured(); From 9003cf780abf46cb7d35c89ee9da84b4c2183cf6 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 28 Nov 2025 16:14:34 +0100 Subject: [PATCH 4/5] clippy --- quickwit/quickwit-search/src/leaf.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 1d6a096e423..a3f5fb63488 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -140,8 +140,8 @@ fn 
configure_storage_retries( /// Opens a `tantivy::Index` for the given split with several cache layers: /// - A split footer cache given by `SearcherContext.split_footer_cache`. /// - A fast fields cache given by `SearcherContext.storage_long_term_cache`. -/// - An ephemeral unbounded cache directory (whose lifetime is tied to the -/// returned `Index` if no `ByteRangeCache` is provided). +/// - An ephemeral unbounded cache directory (whose lifetime is tied to the returned `Index` if no +/// `ByteRangeCache` is provided). pub(crate) async fn open_index_with_caches( searcher_context: &SearcherContext, index_storage: Arc, @@ -472,7 +472,7 @@ async fn leaf_search_single_split( let byte_range_cache = ByteRangeCache::with_infinite_capacity(&quickwit_storage::STORAGE_METRICS.shortlived_cache); let (index, hot_directory) = open_index_with_caches( - &*ctx.searcher_context, + &ctx.searcher_context, storage, &split, Some(ctx.doc_mapper.tokenizer_manager()), From 610bd1721e25d90df746709221da361875abf869 Mon Sep 17 00:00:00 2001 From: fulmicoton Date: Fri, 28 Nov 2025 19:04:46 +0100 Subject: [PATCH 5/5] bugfix --- quickwit/quickwit-search/src/leaf.rs | 5 ++++- .../src/elasticsearch_api/rest_handler.rs | 3 ++- .../scenarii/es_compatibility/0006-term_query.yaml | 12 ++++++++++++ .../scenarii/es_compatibility/0025-msearch.yaml | 6 +++--- .../es_compatibility/0028-fast_only_field_query.yaml | 8 ++++++++ 5 files changed, 29 insertions(+), 5 deletions(-) diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index a3f5fb63488..f8aa60cf98f 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -1301,10 +1301,13 @@ fn simplify_search_request( if !can_be_better { disable_search_request_hits(&mut search_request); } + if is_trace_req { + return Some(search_request); + } if search_request.max_hits > 0 { return Some(search_request); } - if search_request.aggregation_request.is_some() && !is_trace_req { + if 
search_request.aggregation_request.is_some() { return Some(search_request); } if search_request.count_hits() == CountHits::CountAll { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs index 79ab0025ae3..db14c37700a 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs @@ -491,7 +491,8 @@ async fn es_compat_index_count( search_body: SearchBody, search_service: Arc, ) -> Result { - let search_params: SearchQueryParams = search_params.into(); + let mut search_params: SearchQueryParams = search_params.into(); + search_params.track_total_hits = Some(TrackTotalHits::Track(true)); let (search_request, _append_shard_doc) = build_request_for_es_api(index_id_patterns, search_params, search_body)?; let search_response: SearchResponse = search_service.root_search(search_request).await?; diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0006-term_query.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0006-term_query.yaml index 14599566959..320ea03b47b 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0006-term_query.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0006-term_query.yaml @@ -4,6 +4,7 @@ params: # this overrides the query sent in body apparently size: 3 json: + track_total_hits: true query: term: type: @@ -21,6 +22,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: type: @@ -35,6 +37,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: type: @@ -49,6 +52,7 @@ params: size: 0 # Testing the format without the "value" object json: + track_total_hits: true query: term: type: "pushevent" @@ -63,6 +67,7 @@ engines: ["elasticsearch"] params: size: 0 json: + track_total_hits: true query: term: actor.id: 1762355 @@ -75,6 +80,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: actor.id: 
"1762355" @@ -87,6 +93,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: actor.id: @@ -100,6 +107,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: actor.id: @@ -114,6 +122,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: id: @@ -127,6 +136,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: id: @@ -140,6 +150,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: id: 2549961272 @@ -152,6 +163,7 @@ expected: params: size: 0 json: + track_total_hits: true query: term: id: "2549961272" diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0025-msearch.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0025-msearch.yaml index 2ab8f12fa0c..e0a5c1505c4 100644 --- a/quickwit/rest-api-tests/scenarii/es_compatibility/0025-msearch.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0025-msearch.yaml @@ -2,7 +2,7 @@ endpoint: "_msearch" method: POST ndjson: - {"index":"gharchive"} - - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0} + - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0, "track_total_hits": true} expected: responses: - hits: @@ -16,7 +16,7 @@ params: extra_filters: "type:PushEvent,actor.login:jadonk" ndjson: - {"index":"gharchive"} - - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0} + - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0, "track_total_hits": true} expected: responses: - hits: @@ -110,7 +110,7 @@ endpoint: "_msearch" method: POST ndjson: - {"index":"idontexist"} - - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0} + - {"query" : {"match" : { "type": "PushEvent"}}, "size": 0, "track_total_hits": true} expected: responses: - status: 404 diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/0028-fast_only_field_query.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/0028-fast_only_field_query.yaml index 1c397f42810..8e55d54b471 100644 --- 
a/quickwit/rest-api-tests/scenarii/es_compatibility/0028-fast_only_field_query.yaml +++ b/quickwit/rest-api-tests/scenarii/es_compatibility/0028-fast_only_field_query.yaml @@ -5,6 +5,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: term: fast_text: "abc-123" @@ -20,6 +21,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: term: fast_text: "zzz" @@ -36,6 +38,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: terms: fast_text: @@ -54,6 +57,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: terms: fast_text: @@ -72,6 +76,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: term: obj.nested_text: "abc-123" @@ -88,6 +93,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: term: obj.nested_text: "zzz" @@ -104,6 +110,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: terms: obj.nested_text: @@ -122,6 +129,7 @@ endpoint: "fast_only/_search" params: size: 0 json: + track_total_hits: true query: terms: obj.nested_text: