Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use warp::{Filter, Rejection};

use super::model::{
CatIndexQueryParams, DeleteQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
IndexMappingQueryParams, MultiSearchQueryParams, SearchQueryParamsCount,
};
use crate::Body;
use crate::decompression::get_body_bytes;
Expand Down Expand Up @@ -285,9 +285,10 @@ pub(crate) fn elastic_aliases_filter() -> impl Filter<Extract = (), Error = Reje
}

pub(crate) fn elastic_index_mapping_filter()
-> impl Filter<Extract = (String,), Error = Rejection> + Clone {
-> impl Filter<Extract = (String, IndexMappingQueryParams), Error = Rejection> + Clone {
warp::path!("_elastic" / String / "_mapping")
.or(warp::path!("_elastic" / String / "_mappings"))
.unify()
.and(warp::get())
.and(warp::query())
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright 2021-Present Datadog, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use serde::{Deserialize, Serialize};

/// Query parameters for `_mapping(s)`. Unknown params are silently ignored.
///
/// Timestamps are epoch seconds, half-open `[start, end)` — forwarded into
/// `ListFieldsRequest` to prune splits. `field_patterns` is a comma-separated
/// hint mirroring `ListFieldsRequest.field_patterns`: it is pushed down to the
/// leaves for dynamic-field filtering and, when every pattern matches a flat
/// declared field, triggers a fast path that skips `list_fields` entirely.
#[serde_with::skip_serializing_none]
#[derive(Default, Debug, Clone, Serialize, Deserialize)]
pub struct IndexMappingQueryParams {
#[serde(default)]
pub start_timestamp: Option<i64>,
#[serde(default)]
pub end_timestamp: Option<i64>,
#[serde(default)]
pub field_patterns: Option<String>,
}

#[cfg(test)]
mod tests {
use super::IndexMappingQueryParams;

#[test]
fn empty_query_string_yields_none() {
let params: IndexMappingQueryParams = serde_qs::from_str("").unwrap();
assert!(params.start_timestamp.is_none());
assert!(params.end_timestamp.is_none());
assert!(params.field_patterns.is_none());
}

#[test]
fn both_params_present_yield_some() {
let qs = "start_timestamp=1712160204&end_timestamp=1712764984";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1712160204));
assert_eq!(params.end_timestamp, Some(1712764984));
assert!(params.field_patterns.is_none());
}

#[test]
fn only_start_timestamp_present() {
let qs = "start_timestamp=1712160204";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1712160204));
assert!(params.end_timestamp.is_none());
}

#[test]
fn only_end_timestamp_present() {
let qs = "end_timestamp=1712764984";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert!(params.start_timestamp.is_none());
assert_eq!(params.end_timestamp, Some(1712764984));
}

#[test]
fn unknown_field_is_ignored() {
let qs = "start_timestamp=1&pretty=true&ignore_unavailable=true";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1));
assert!(params.end_timestamp.is_none());
assert!(params.field_patterns.is_none());
}

#[test]
fn field_patterns_param_present() {
let qs = "field_patterns=host,message,status";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert_eq!(
params.field_patterns.as_deref(),
Some("host,message,status")
);
}

#[test]
fn field_patterns_combined_with_timestamps() {
let qs = "start_timestamp=1&end_timestamp=2&field_patterns=host";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert_eq!(params.start_timestamp, Some(1));
assert_eq!(params.end_timestamp, Some(2));
assert_eq!(params.field_patterns.as_deref(), Some("host"));
}

#[test]
fn empty_field_patterns_value() {
// `serde_qs` collapses an empty value to `None`.
let qs = "field_patterns=";
let params: IndexMappingQueryParams = serde_qs::from_str(qs).unwrap();
assert!(params.field_patterns.is_none());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ enum FieldMapping {
}

impl ElasticsearchMappingsResponse {
/// Builds a response from declared doc-mapping field mappings, optionally
/// merged with dynamic fields discovered via `ListFields`. Dynamic-field
/// filtering (when requested via `field_patterns`) is handled at the
/// leaves; this function does not filter on its own.
pub fn from_doc_mapping(
indexes_metadata: Vec<IndexMetadata>,
list_fields_response: Option<&ListFieldsResponse>,
Expand Down Expand Up @@ -272,10 +276,10 @@ mod tests {
assert_eq!(meta["properties"]["source"]["type"], "keyword");
}

use quickwit_proto::search::ListFieldsEntry;

#[test]
fn test_merge_dynamic_fields_skips_existing_and_internal() {
use quickwit_proto::search::ListFieldsEntry;

let mut properties = HashMap::new();
properties.insert("title".to_string(), FieldMapping::Leaf { typ: "text" });

Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mod bulk_query_params;
mod cat_indices;
mod error;
mod field_capability;
mod index_mapping_query_params;
mod mappings;
mod multi_search;
mod scroll;
Expand All @@ -36,6 +37,7 @@ pub use field_capability::{
FieldCapabilityQueryParams, FieldCapabilityRequestBody, FieldCapabilityResponse,
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
};
pub use index_mapping_query_params::IndexMappingQueryParams;
pub(crate) use mappings::ElasticsearchMappingsResponse;
pub use multi_search::{
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
Expand Down
78 changes: 68 additions & 10 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::collections::{HashMap, HashSet};
use std::str::from_utf8;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -58,9 +58,9 @@ use super::model::{
CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError,
ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse,
ElasticsearchResponse, ElasticsearchStatsResponse, FieldCapabilityQueryParams,
FieldCapabilityRequestBody, FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams,
MultiSearchResponse, MultiSearchSingleResponse, ScrollQueryParams, SearchBody,
SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
FieldCapabilityRequestBody, FieldCapabilityResponse, IndexMappingQueryParams,
MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse, MultiSearchSingleResponse,
ScrollQueryParams, SearchBody, SearchQueryParams, SearchQueryParamsCount, StatsResponseEntry,
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
};
use super::{TrackTotalHits, make_elastic_api_response};
Expand Down Expand Up @@ -199,8 +199,45 @@ async fn get_index_metadata(
Ok(index_metadata)
}

/// Splits `field_patterns=` on commas, trims, drops empties. `None` if nothing
/// usable remains.
fn parse_field_patterns(params: &IndexMappingQueryParams) -> Option<Vec<String>> {
let raw = params.field_patterns.as_deref()?;
let patterns: Vec<String> = raw
.split(',')
.map(str::trim)
.filter(|s| !s.is_empty())
.map(str::to_string)
.collect();
if patterns.is_empty() {
None
} else {
Some(patterns)
}
}

fn collect_declared_top_level_names(indexes_metadata: &[IndexMetadata]) -> HashSet<String> {
let upper_bound: usize = indexes_metadata
.iter()
.map(|m| m.index_config.doc_mapping.field_mappings.len())
.sum();
let mut names = HashSet::with_capacity(upper_bound);
for metadata in indexes_metadata {
for entry in &metadata.index_config.doc_mapping.field_mappings {
names.insert(entry.name.clone());
}
}
names
}

/// `_mapping(s)` handler. `field_patterns` is a hint: when every requested
/// pattern matches a flat declared field, we skip `list_fields` and return the
/// declared schema directly. Anything else falls through to a split fan-out
/// filtered by `[start, end)`, with the same patterns pushed down to the
/// leaves for dynamic-field filtering.
pub(crate) async fn es_compat_index_mapping(
index_id: String,
params: IndexMappingQueryParams,
mut metastore: MetastoreServiceClient,
search_service: Arc<dyn SearchService>,
) -> Result<ElasticsearchMappingsResponse, ElasticsearchError> {
Expand All @@ -214,22 +251,43 @@ pub(crate) async fn es_compat_index_mapping(
.iter()
.map(|m| m.index_id().to_string())
.collect();

let field_patterns = parse_field_patterns(&params);

// Fast path: every requested name is a flat declared field — skip
// `list_fields` entirely. The declared schema is returned as-is; dynamic
// discovery is not needed.
if let Some(field_patterns) = &field_patterns {
let declared_top: HashSet<String> = collect_declared_top_level_names(&indexes_metadata);
let all_declared = field_patterns.iter().all(|pattern| {
!pattern.contains('*')
&& !pattern.contains('?')
&& !pattern.contains('.')
&& declared_top.contains(pattern.as_str())
});
if all_declared {
return Ok(ElasticsearchMappingsResponse::from_doc_mapping(
indexes_metadata,
None,
));
}
}

let list_fields_request = quickwit_proto::search::ListFieldsRequest {
index_id_patterns,
field_patterns: Vec::new(),
start_timestamp: None,
end_timestamp: None,
field_patterns: field_patterns.unwrap_or_default(),
start_timestamp: params.start_timestamp,
end_timestamp: params.end_timestamp,
query_ast: None,
};
let list_fields_response = search_service
.root_list_fields(list_fields_request)
.await
.ok();
let response = ElasticsearchMappingsResponse::from_doc_mapping(
Ok(ElasticsearchMappingsResponse::from_doc_mapping(
indexes_metadata,
list_fields_response.as_ref(),
);
Ok(response)
))
}

/// GET or POST _elastic/_search
Expand Down
Loading