Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add time_series_dimension property into _field_caps API method #5004

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn elastic_api_handlers(
.or(es_compat_index_multi_search_handler(search_service.clone()))
.or(es_compat_index_field_capabilities_handler(
search_service.clone(),
metastore.clone(),
))
.or(es_compat_bulk_handler(
ingest_service.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;

use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse, ListFieldsResponse};
use serde::{Deserialize, Serialize};
use quickwit_metastore::IndexMetadata;

use super::search_query_params::*;
use super::ElasticsearchError;
Expand Down Expand Up @@ -61,14 +62,14 @@ pub struct FieldCapabilityRequestBody {
pub runtime_mappings: serde_json::Value,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct FieldCapabilityResponse {
indices: Vec<String>,
fields: HashMap<String, FieldCapabilityFieldTypesResponse>,
}

type FieldCapabilityFieldTypesResponse =
HashMap<FieldCapabilityEntryType, FieldCapabilityEntryResponse>;
HashMap<FieldCapabilityEntryType, FieldCapabilityEntryResponse>;

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
enum FieldCapabilityEntryType {
Expand Down Expand Up @@ -96,11 +97,13 @@ enum FieldCapabilityEntryType {
Object,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct FieldCapabilityEntryResponse {
metadata_field: bool, // Always false
searchable: bool,
aggregatable: bool,
// Option since for non-time-series indices this field is not present.
time_series_dimension: Option<bool>,
// Option since it is filled later
#[serde(rename = "type")]
typ: Option<FieldCapabilityEntryType>,
Expand All @@ -117,6 +120,7 @@ impl FieldCapabilityEntryResponse {
metadata_field: false,
searchable: entry.searchable,
aggregatable: entry.aggregatable,
time_series_dimension: None,
typ: None,
indices: entry.index_ids.clone(),
non_aggregatable_indices: entry.non_aggregatable_index_ids,
Expand All @@ -133,6 +137,7 @@ struct FieldCapabilityEntry {

pub fn convert_to_es_field_capabilities_response(
resp: ListFieldsResponse,
index_metadata: Vec<IndexMetadata>,
) -> FieldCapabilityResponse {
let mut indices = resp
.fields
Expand All @@ -142,13 +147,20 @@ pub fn convert_to_es_field_capabilities_response(
indices.sort();
indices.dedup();

let timestamp_fields: Vec<String> = index_metadata
.iter()
.filter_map(|index| index.index_config.doc_mapping.timestamp_field.clone())
.collect();

let mut fields: HashMap<String, FieldCapabilityFieldTypesResponse> = HashMap::new();
for list_field_resp in resp.fields {
let entry = fields
.entry(list_field_resp.field_name.to_string())
.or_default();

let field_type = ListFieldType::from_i32(list_field_resp.field_type).unwrap();
let field_name = list_field_resp.field_name.to_string();

let add_entry =
FieldCapabilityEntryResponse::from_list_field_entry_response(list_field_resp);
let types = match field_type {
Expand Down Expand Up @@ -178,6 +190,11 @@ pub fn convert_to_es_field_capabilities_response(
add_entry.indices = Vec::new();
}

// Check if the field_name is in timestamp_fields and set time_series_dimension
if timestamp_fields.contains(&field_name) {
add_entry.time_series_dimension = Some(true);
}

entry.insert(field_type, add_entry);
}
}
Expand All @@ -196,3 +213,120 @@ pub fn build_list_field_request_for_es_api(
end_timestamp: search_params.end_timestamp,
})
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::{BTreeSet};
use std::num::NonZeroU32;
use quickwit_common::uri::Uri;
use quickwit_config::{DocMapping, IndexConfig, IndexingSettings, SearchSettings};
use quickwit_doc_mapper::Mode;

#[test]
fn test_convert_to_es_field_capabilities_response() {
// Define test input data
let list_fields_response = ListFieldsResponse {
fields: vec![
ListFieldsEntryResponse {
field_name: "timestamp_field_1".to_string(),
field_type: ListFieldType::Date as i32,
index_ids: vec!["index1".to_string()],
searchable: true,
aggregatable: true,
non_aggregatable_index_ids: vec![],
non_searchable_index_ids: vec![],
},
ListFieldsEntryResponse {
field_name: "non_timestamp_field".to_string(),
field_type: ListFieldType::Str as i32,
index_ids: vec!["index1".to_string()],
searchable: true,
aggregatable: true,
non_aggregatable_index_ids: vec![],
non_searchable_index_ids: vec![],
},
],
};

let index_metadata = vec![
IndexMetadata::new(IndexConfig {
index_id: "index1".parse().unwrap(),
index_uri: Uri::for_test("s3://config.json"),
doc_mapping: DocMapping {
field_mappings: vec![],
tag_fields: BTreeSet::new(),
store_source: false,
index_field_presence: false,
timestamp_field: Some("timestamp_field_1".to_string()),
mode: Mode::default(),
partition_key: None,
max_num_partitions: NonZeroU32::new(1).unwrap(),
tokenizers: vec![],
document_length: false,
},
indexing_settings: IndexingSettings::default(),
search_settings: SearchSettings::default(),
retention_policy_opt: None,
})
];

// Expected output data
let mut expected_fields: HashMap<String, FieldCapabilityFieldTypesResponse> = HashMap::new();
let mut timestamp_field_1_entry: FieldCapabilityFieldTypesResponse = HashMap::new();
timestamp_field_1_entry.insert(
FieldCapabilityEntryType::DateNanos,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: Some(true),
typ: Some(FieldCapabilityEntryType::DateNanos),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
expected_fields.insert("timestamp_field_1".to_string(), timestamp_field_1_entry);

let mut non_timestamp_field_entry: FieldCapabilityFieldTypesResponse = HashMap::new();
non_timestamp_field_entry.insert(
FieldCapabilityEntryType::Text,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: None,
typ: Some(FieldCapabilityEntryType::Text),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
non_timestamp_field_entry.insert(
FieldCapabilityEntryType::Keyword,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: None,
typ: Some(FieldCapabilityEntryType::Keyword),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
expected_fields.insert("non_timestamp_field".to_string(), non_timestamp_field_entry);

let expected_response = FieldCapabilityResponse {
indices: vec!["index1".to_string()],
fields: expected_fields,
};

// Call the function with test data
let result = convert_to_es_field_capabilities_response(list_fields_response, index_metadata);

// Verify the output
assert_eq!(result, expected_response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ pub fn es_compat_search_handler(
/// GET or POST _elastic/{index}/_field_caps
pub fn es_compat_index_field_capabilities_handler(
search_service: Arc<dyn SearchService>,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_field_capabilities_filter()
.or(elastic_field_capabilities_filter())
.unify()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_index_field_capabilities)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}
Expand Down Expand Up @@ -547,13 +549,16 @@ async fn es_compat_index_field_capabilities(
search_params: FieldCapabilityQueryParams,
search_body: FieldCapabilityRequestBody,
search_service: Arc<dyn SearchService>,
mut metastore: MetastoreServiceClient,
) -> Result<FieldCapabilityResponse, ElasticsearchError> {
let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?;
Copy link
Contributor

@fulmicoton fulmicoton May 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?;
let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?;

I'm really not fond of doubling the calls to resolve_index_patterns.
We happen to do one in root_list_fields too.

It seems like we pre-build /cache the ListFieldsEntryResponse in the packager (and indexing time) so that root_list_fields(..) is a light operation.

It would make sense to add the bool property there, in the ListFieldsEntryResponse.

message ListFieldsEntryResponse {
  ...
  // True means that the field is the timestamp field of all of the requested indexes.
  bool is_timestamp = 8;
}

We then need to know how and when we populate it.

Solution 1:

The logical place would be the packager.
Pros: that's where we build the rest.
Cons: Right now the packager does not have access to the docmapper and does not know what the timestamp field is. We would have to pass that information. It is reasonable however.
Cons2: the data is not available for existing data.

Solution 2:

Keep it set to false in the packager, and actually compute it in the root_list_fields function.
At this point we have access to the index_metadatas already.
Pros: easy
Cons: ugly.


let search_request =
build_list_field_request_for_es_api(index_id_patterns, search_params, search_body)?;
let search_response: ListFieldsResponse =
search_service.root_list_fields(search_request).await?;
let search_response_rest: FieldCapabilityResponse =
convert_to_es_field_capabilities_response(search_response);
convert_to_es_field_capabilities_response(search_response, indexes_metadata);
Ok(search_response_rest)
}

Expand Down
Loading