Skip to content

Commit

Permalink
multi index leaf requests (#4962)
Browse files Browse the repository at this point in the history
* multi index leaf search request

execute one request per node, instead of one request per index.
A leaf can now receive a request spanning multiple indices.

Especially in cases with search requests on many indices this saves a
lot of requests and therefore memory. It also allows better control of
the memory consumption of a request on a node, since it is not split up
over multiple requests.

* introduce IntermediateLeafResult

LeafSearchResponse includes serialized aggregations to send them between
nodes. This is also used on a leaf, which causes the results to be
serialized and then deserialized for merging again _per_ split.

This PR introduces IntermediateLeafResult to be used instead.
It also adds a field `aggregation_type` to LeafSearchResponse to be able
to convert self contained between IntermediateLeafResult and
LeafSearchResponse.

* multiple incremental collectors, pass agg limits

* Revert "introduce IntermediateLeafResult"

This reverts commit c968e4d.

* fmt, cleanup

* remove outer incremental collector

* cleanup

* fix test

* fix test

* cleanup

* minor fixes

* clippy
  • Loading branch information
PSeitz committed May 16, 2024
1 parent 2ceab75 commit e7091f8
Show file tree
Hide file tree
Showing 11 changed files with 578 additions and 194 deletions.
16 changes: 7 additions & 9 deletions quickwit/quickwit-janitor/src/actors/delete_task_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use quickwit_proto::metastore::{
};
use quickwit_proto::search::SearchRequest;
use quickwit_proto::types::IndexUid;
use quickwit_search::{jobs_to_leaf_requests, IndexMetasForLeafSearch, SearchJob, SearchJobPlacer};
use quickwit_search::{jobs_to_leaf_request, IndexMetasForLeafSearch, SearchJob, SearchJobPlacer};
use serde::Serialize;
use tantivy::Inventory;
use tracing::{debug, info};
Expand Down Expand Up @@ -325,17 +325,15 @@ impl DeleteTaskPlanner {
index_uri,
},
);
let leaf_search_request = jobs_to_leaf_requests(
let leaf_search_request = jobs_to_leaf_request(
&search_request,
&search_indexes_metas,
vec![search_job.clone()],
)?;
for leaf_request in leaf_search_request {
let response = search_client.leaf_search(leaf_request).await?;
ctx.record_progress();
if response.num_hits > 0 {
return Ok(true);
}
let response = search_client.leaf_search(leaf_search_request).await?;
ctx.record_progress();
if response.num_hits > 0 {
return Ok(true);
}
}
Ok(false)
Expand Down Expand Up @@ -522,7 +520,7 @@ mod tests {
move |request: LeafSearchRequest| {
// Search on body:delete should return one hit only on the last split
// that should contains the doc.
if request.split_offsets[0].split_id == split_id_with_doc_to_delete
if request.leaf_requests[0].split_offsets[0].split_id == split_id_with_doc_to_delete
&& request.search_request.as_ref().unwrap().query_ast == body_delete_ast
{
return Ok(LeafSearchResponse {
Expand Down
26 changes: 20 additions & 6 deletions quickwit/quickwit-proto/protos/quickwit/search.proto
Original file line number Diff line number Diff line change
Expand Up @@ -309,22 +309,36 @@ message SplitSearchError {
bool retryable_error = 3;
}

/// A LeafSearchRequest can span multiple indices.
///
message LeafSearchRequest {
// Search request. This is a perfect copy of the original search request,
// that was sent to root apart from the start_offset & max_hits params.
SearchRequest search_request = 1;

// Index split ids to apply the query on.
// These ids are resolved from the index_uri defined in the search_request.
repeated SplitIdAndFooterOffsets split_offsets = 4;
// List of leaf requests, one per index.
repeated LeafRequestRef leaf_requests = 7;

// `DocMapper` as json serialized trait.
string doc_mapper = 5;
// List of unique doc_mappers serialized as json.
repeated string doc_mappers = 8;

// List of index uris
// Index URI. The index URI defines the location of the storage that contains the
// split files.
string index_uri = 6;
repeated string index_uris = 9;
}

/// LeafRequestRef references data in LeafSearchRequest to deduplicate data.
message LeafRequestRef {
// The ordinal of the doc_mapper in `LeafSearchRequest.doc_mappers`
uint32 doc_mapper_ord = 1;

// The ordinal of the index uri in LeafSearchRequest.index_uris
uint32 index_uri_ord = 2;

// Index split ids to apply the query on.
// These ids are resolved from the index_uri defined in the search_request.
repeated SplitIdAndFooterOffsets split_offsets = 3;
}

message SplitIdAndFooterOffsets {
Expand Down
34 changes: 26 additions & 8 deletions quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

52 changes: 28 additions & 24 deletions quickwit/quickwit-search/src/cluster_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,11 @@ impl ClusterClient {
let mut response_res = client.leaf_search(request.clone()).await;
let retry_policy = LeafSearchRetryPolicy {};
if let Some(retry_request) = retry_policy.retry_request(request, &response_res) {
assert!(!retry_request.split_offsets.is_empty());
assert!(!retry_request.leaf_requests.is_empty());
client = retry_client(
&self.search_job_placer,
client.grpc_addr(),
&retry_request.split_offsets[0].split_id,
&retry_request.leaf_requests[0].split_offsets[0].split_id,
)
.await?;
debug!(
Expand Down Expand Up @@ -351,8 +351,8 @@ mod tests {
use std::net::SocketAddr;

use quickwit_proto::search::{
PartialHit, SearchRequest, SearchStreamRequest, SortValue, SplitIdAndFooterOffsets,
SplitSearchError,
LeafRequestRef, PartialHit, SearchRequest, SearchStreamRequest, SortValue,
SplitIdAndFooterOffsets, SplitSearchError,
};
use quickwit_query::query_ast::qast_json_helper;

Expand Down Expand Up @@ -394,24 +394,28 @@ mod tests {
};
LeafSearchRequest {
search_request: Some(search_request),
doc_mapper: "doc_mapper".to_string(),
index_uri: "uri".to_string(),
split_offsets: vec![
SplitIdAndFooterOffsets {
split_id: "split_1".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
},
],
doc_mappers: vec!["doc_mapper".to_string()],
index_uris: vec!["uri".to_string()],
leaf_requests: vec![LeafRequestRef {
index_uri_ord: 0,
doc_mapper_ord: 0,
split_offsets: vec![
SplitIdAndFooterOffsets {
split_id: "split_1".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
},
SplitIdAndFooterOffsets {
split_id: "split_2".to_string(),
split_footer_start: 0,
split_footer_end: 100,
timestamp_start: None,
timestamp_end: None,
},
],
}],
}
}

Expand Down Expand Up @@ -555,7 +559,7 @@ mod tests {
let mut mock_search_service = MockSearchService::new();
mock_search_service
.expect_leaf_search()
.withf(|request| request.split_offsets[0].split_id == "split_1")
.withf(|request| request.leaf_requests[0].split_offsets[0].split_id == "split_1")
.return_once(|_: LeafSearchRequest| {
Ok(LeafSearchResponse {
num_hits: 1,
Expand All @@ -570,7 +574,7 @@ mod tests {
});
mock_search_service
.expect_leaf_search()
.withf(|request| request.split_offsets[0].split_id == "split_2")
.withf(|request| request.leaf_requests[0].split_offsets[0].split_id == "split_2")
.return_once(|_: LeafSearchRequest| {
Ok(LeafSearchResponse {
num_hits: 1,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-search/src/collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1187,7 +1187,7 @@ impl IncrementalCollector {
}

/// Merge one search result with the current state
pub(crate) fn add_split(&mut self, leaf_response: LeafSearchResponse) -> tantivy::Result<()> {
pub(crate) fn add_result(&mut self, leaf_response: LeafSearchResponse) -> tantivy::Result<()> {
let LeafSearchResponse {
num_hits,
partial_hits,
Expand Down Expand Up @@ -1766,7 +1766,7 @@ mod tests {
.unwrap();

for split_result in results {
incremental_collector.add_split(split_result).unwrap();
incremental_collector.add_result(split_result).unwrap();
}

let incremental_result = incremental_collector.finalize().unwrap();
Expand Down
Loading

0 comments on commit e7091f8

Please sign in to comment.