5 changes: 5 additions & 0 deletions LICENSE-3rdparty.csv
@@ -155,6 +155,7 @@ enum-iterator,https://github.com/stephaneyfx/enum-iterator,MIT,Stephane Raux <st
enum-iterator-derive,https://github.com/stephaneyfx/enum-iterator,0BSD,Stephane Raux <stephaneyfx@gmail.com>
env_logger,https://github.com/rust-cli/env_logger,MIT OR Apache-2.0,The env_logger Authors
equivalent,https://github.com/indexmap-rs/equivalent,Apache-2.0 OR MIT,The equivalent Authors
+erased-serde,https://github.com/dtolnay/erased-serde,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
errno,https://github.com/lambda-fairy/rust-errno,MIT OR Apache-2.0,"Chris Wong <lambda.fairy@gmail.com>, Dan Gohman <dev@sunfishcode.online>"
fail,https://github.com/tikv/fail-rs,Apache-2.0,The TiKV Project Developers
fastdivide,https://github.com/fulmicoton/fastdivide,zlib-acknowledgement OR MIT,Paul Masurel <paul.masurel@gmail.com>
@@ -229,6 +230,7 @@ indexmap,https://github.com/indexmap-rs/indexmap,Apache-2.0 OR MIT,The indexmap
indicatif,https://github.com/console-rs/indicatif,MIT,The indicatif Authors
inout,https://github.com/RustCrypto/utils,MIT OR Apache-2.0,RustCrypto Developers
instant,https://github.com/sebcrozet/instant,BSD-3-Clause,sebcrozet <developer@crozet.re>
+inventory,https://github.com/dtolnay/inventory,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
io-uring,https://github.com/tokio-rs/io-uring,MIT OR Apache-2.0,quininer <quininer@live.com>
ipnet,https://github.com/krisprice/ipnet,MIT OR Apache-2.0,Kris Price <kris@krisprice.nz>
ipnetwork,https://github.com/achanda/ipnetwork,MIT OR Apache-2.0,"Abhishek Chanda <abhishek.becs@gmail.com>, Linus Färnstrand <faern@faern.net>"
@@ -512,7 +514,10 @@ tracing-serde,https://github.com/tokio-rs/tracing,MIT,Tokio Contributors <team@t
tracing-subscriber,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <eliza@buoyant.io>, David Barsky <me@davidbarsky.com>, Tokio Contributors <team@tokio.rs>"
try-lock,https://github.com/seanmonstar/try-lock,MIT,Sean McArthur <sean@seanmonstar.com>
ttl_cache,https://github.com/stusmall/ttl_cache,MIT OR Apache-2.0,Stu Small <stuart.alan.small@gmail.com>
+typeid,https://github.com/dtolnay/typeid,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
typenum,https://github.com/paholg/typenum,MIT OR Apache-2.0,"Paho Lurie-Gregg <paho@paholg.com>, Andre Bogus <bogusandre@gmail.com>"
+typetag,https://github.com/dtolnay/typetag,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
+typetag-impl,https://github.com/dtolnay/typetag,MIT OR Apache-2.0,David Tolnay <dtolnay@gmail.com>
ulid,https://github.com/dylanhart/ulid-rs,MIT,dylanhart <dylan96hart@gmail.com>
unarray,https://github.com/cameron1024/unarray,MIT OR Apache-2.0,The unarray Authors
unicase,https://github.com/seanmonstar/unicase,MIT OR Apache-2.0,Sean McArthur <sean@seanmonstar.com>
68 changes: 59 additions & 9 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion quickwit/Cargo.toml
@@ -346,7 +346,7 @@ quickwit-serve = { path = "quickwit-serve" }
quickwit-storage = { path = "quickwit-storage" }
quickwit-telemetry = { path = "quickwit-telemetry" }

-tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "dabcaa5", default-features = false, features = [
+tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "70e591e", default-features = false, features = [
"lz4-compression",
"mmap",
"quickwit",
3 changes: 3 additions & 0 deletions quickwit/quickwit-query/src/aggregations.rs
@@ -189,6 +189,9 @@ impl From<TantivyBucketResult> for BucketResult {
sum_other_doc_count,
doc_count_error_upper_bound,
},
+TantivyBucketResult::Filter(_filter_bucket_result) => {
+unimplemented!("filter aggregation is not yet supported in quickwit")
+}
}
}
}
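Context for the new arm: the bumped tantivy revision introduces a `Filter` bucket variant, and this `From` conversion matches exhaustively, so the build breaks until the variant is handled. A minimal sketch of the pattern, with variant fields invented for illustration:

```rust
// Illustrative stand-ins for tantivy's bucket-result enums. When the
// upstream enum gains a `Filter` variant, the exhaustive match below forces
// a new arm; `unimplemented!` keeps the build green while making the
// unsupported aggregation explicit at runtime.
enum TantivyBucketResult {
    Terms { sum_other_doc_count: u64 },
    Filter { doc_count: u64 },
}

enum BucketResult {
    Terms { sum_other_doc_count: u64 },
}

impl From<TantivyBucketResult> for BucketResult {
    fn from(result: TantivyBucketResult) -> Self {
        match result {
            TantivyBucketResult::Terms { sum_other_doc_count } => {
                BucketResult::Terms { sum_other_doc_count }
            }
            TantivyBucketResult::Filter { .. } => {
                unimplemented!("filter aggregation is not yet supported in quickwit")
            }
        }
    }
}
```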
24 changes: 16 additions & 8 deletions quickwit/quickwit-search/src/collector.rs
@@ -27,10 +27,11 @@ use quickwit_proto::types::SplitId;
use serde::Deserialize;
use tantivy::aggregation::agg_req::{Aggregations, get_fast_field_names};
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
-use tantivy::aggregation::{AggregationLimitsGuard, AggregationSegmentCollector};
+use tantivy::aggregation::{AggContextParams, AggregationLimitsGuard, AggregationSegmentCollector};
use tantivy::collector::{Collector, SegmentCollector};
use tantivy::columnar::{ColumnType, MonotonicallyMappableToU64};
use tantivy::fastfield::Column;
+use tantivy::tokenizer::TokenizerManager;
use tantivy::{DateTime, DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};

use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector, Span};
@@ -715,7 +716,7 @@ pub(crate) struct QuickwitCollector {
pub max_hits: usize,
pub sort_by: SortByPair,
pub aggregation: Option<QuickwitAggregations>,
-pub aggregation_limits: AggregationLimitsGuard,
+pub agg_context_params: AggContextParams,
search_after: Option<PartialHit>,
}

@@ -785,7 +786,7 @@ impl Collector for QuickwitCollector {
aggs,
segment_reader,
segment_ord,
-&self.aggregation_limits,
+&self.agg_context_params,
)?,
),
),
@@ -1033,7 +1034,7 @@ pub(crate) fn sort_by_from_request(search_request: &SearchRequest) -> SortByPair
pub(crate) fn make_collector_for_split(
split_id: SplitId,
search_request: &SearchRequest,
-aggregation_limits: AggregationLimitsGuard,
+agg_context_params: AggContextParams,
) -> crate::Result<QuickwitCollector> {
let aggregation = match &search_request.aggregation_request {
Some(aggregation) => Some(serde_json::from_str(aggregation)?),
@@ -1046,16 +1047,23 @@ pub(crate) fn make_collector_for_split(
max_hits: search_request.max_hits as usize,
sort_by,
aggregation,
-aggregation_limits,
+agg_context_params,
search_after: search_request.search_after.clone(),
})
}

/// Builds a QuickwitCollector that's only useful for merging fruits.
pub(crate) fn make_merge_collector(
search_request: &SearchRequest,
-aggregation_limits: &AggregationLimitsGuard,
+agg_limits: AggregationLimitsGuard,
) -> crate::Result<QuickwitCollector> {
+// Note: at this point, the tokenizer manager is no longer used by aggregations (filter query),
+// so we can pass an empty one. If it were ever used again, this would panic.
+let agg_context_params = AggContextParams {
+limits: agg_limits,
+tokenizers: TokenizerManager::new(),
+};
+
let aggregation = match &search_request.aggregation_request {
Some(aggregation) => Some(serde_json::from_str(aggregation)?),
None => None,
@@ -1067,7 +1075,7 @@ pub(crate) fn make_merge_collector(
max_hits: search_request.max_hits as usize,
sort_by,
aggregation,
-aggregation_limits: aggregation_limits.clone(),
+agg_context_params,
search_after: search_request.search_after.clone(),
})
}
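AggContextParams itself is defined in the quickwit-oss/tantivy fork rather than in this diff. Judging from the two construction sites above, its shape is presumably something like this sketch (field types taken from the imports in this file):

```rust
use tantivy::aggregation::AggregationLimitsGuard;
use tantivy::tokenizer::TokenizerManager;

// Assumed shape, inferred from usage in this PR; the real definition lives
// in the quickwit-oss/tantivy fork. It bundles the pre-existing aggregation
// memory/bucket limits with the tokenizers that filter-query aggregations
// need at collection time. Clone is required because QuickwitCollector,
// which embeds it, is cloned per split.
#[derive(Clone)]
pub struct AggContextParams {
    pub limits: AggregationLimitsGuard,
    pub tokenizers: TokenizerManager,
}
```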
@@ -1748,7 +1756,7 @@ mod tests {
request: &SearchRequest,
results: Vec<LeafSearchResponse>,
) -> LeafSearchResponse {
-let collector = make_merge_collector(request, &Default::default()).unwrap();
+let collector = make_merge_collector(request, Default::default()).unwrap();
let mut incremental_collector = IncrementalCollector::new(collector.clone());

let result = collector
12 changes: 8 additions & 4 deletions quickwit/quickwit-search/src/leaf.rs
@@ -35,8 +35,8 @@ use quickwit_storage::{
BundleStorage, ByteRangeCache, MemorySizedCache, OwnedBytes, SplitCache, Storage,
StorageResolver, TimeoutAndRetryStorage, wrap_storage_with_cache,
};
-use tantivy::aggregation::AggregationLimitsGuard;
use tantivy::aggregation::agg_req::{AggregationVariants, Aggregations};
+use tantivy::aggregation::{AggContextParams, AggregationLimitsGuard};
use tantivy::directory::FileSlice;
use tantivy::fastfield::FastFieldReaders;
use tantivy::schema::Field;
@@ -489,8 +489,12 @@ async fn leaf_search_single_split(
.try_into()?;
let searcher = reader.searcher();

+let agg_context_params = AggContextParams {
+limits: aggregations_limits,
+tokenizers: doc_mapper.tokenizer_manager().tantivy_manager().clone(),
+};
let mut collector =
-make_collector_for_split(split_id.clone(), &search_request, aggregations_limits)?;
+make_collector_for_split(split_id.clone(), &search_request, agg_context_params)?;

let split_schema = index.schema();
let (query, mut warmup_info) = doc_mapper.query(split_schema.clone(), &query_ast, false)?;
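Seen next to make_merge_collector in collector.rs, the two construction sites differ only in which tokenizers they carry. A hedged sketch of that contrast (helper names are invented; the accessor chain mirrors the hunk above):

```rust
use tantivy::aggregation::{AggContextParams, AggregationLimitsGuard};
use tantivy::tokenizer::TokenizerManager;

// Per-split collection may tokenize terms for filter-query aggregations, so
// callers pass the index's real tokenizers (in the hunk above:
// doc_mapper.tokenizer_manager().tantivy_manager().clone()).
fn split_agg_params(
    tokenizers: &TokenizerManager,
    limits: AggregationLimitsGuard,
) -> AggContextParams {
    AggContextParams {
        limits,
        tokenizers: tokenizers.clone(),
    }
}

// Merging only folds intermediate aggregation results and never tokenizes,
// so an empty manager suffices.
fn merge_agg_params(limits: AggregationLimitsGuard) -> AggContextParams {
    AggContextParams {
        limits,
        tokenizers: TokenizerManager::new(),
    }
}
```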
@@ -1226,7 +1230,7 @@ pub async fn multi_index_leaf_search(
try_join_all(leaf_request_tasks),
)
.await??;
-let merge_collector = make_merge_collector(&search_request, &aggregation_limits)?;
+let merge_collector = make_merge_collector(&search_request, aggregation_limits)?;
let mut incremental_merge_collector = IncrementalCollector::new(merge_collector);
for result in leaf_responses {
match result {
@@ -1310,7 +1314,7 @@ pub async fn single_doc_mapping_leaf_search(
let mut leaf_search_single_split_join_handles: Vec<(String, tokio::task::JoinHandle<()>)> =
Vec::with_capacity(split_with_req.len());

-let merge_collector = make_merge_collector(&request, &aggregations_limits)?;
+let merge_collector = make_merge_collector(&request, aggregations_limits.clone())?;
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));

5 changes: 2 additions & 3 deletions quickwit/quickwit-search/src/root.rs
@@ -751,9 +751,8 @@ pub(crate) async fn search_partial_hits_phase(
try_join_all(leaf_request_tasks).await?
};

-// Creates a collector which merges responses into one
let merge_collector =
-make_merge_collector(search_request, &searcher_context.get_aggregation_limits())?;
+make_merge_collector(search_request, searcher_context.get_aggregation_limits())?;

// Merging is a cpu-bound task.
// It should be executed by Tokio's blocking threads.
@@ -1287,7 +1286,7 @@ pub async fn search_plan(
&request_metadata.query_ast_resolved,
true,
)?;
-let merge_collector = make_merge_collector(&search_request, &Default::default())?;
+let merge_collector = make_merge_collector(&search_request, Default::default())?;
warmup_info.merge(merge_collector.warmup_info());
warmup_info.simplify();
