Skip to content

Commit

Permalink
Add time_series_dimension property into _field_caps API method
Browse files Browse the repository at this point in the history
`time_series_dimension` determine whether field will be used by Opensearch Dashboards to build histogram or not.

Fixing issue: #5003
  • Loading branch information
kuzaxak committed May 22, 2024
1 parent ec502f0 commit bc8af38
Show file tree
Hide file tree
Showing 3 changed files with 144 additions and 4 deletions.
1 change: 1 addition & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn elastic_api_handlers(
.or(es_compat_index_multi_search_handler(search_service.clone()))
.or(es_compat_index_field_capabilities_handler(
search_service.clone(),
metastore.clone(),
))
.or(es_compat_bulk_handler(
ingest_service.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use std::collections::HashMap;

use quickwit_proto::search::{ListFieldType, ListFieldsEntryResponse, ListFieldsResponse};
use serde::{Deserialize, Serialize};
use quickwit_metastore::IndexMetadata;

use super::search_query_params::*;
use super::ElasticsearchError;
Expand Down Expand Up @@ -61,14 +62,14 @@ pub struct FieldCapabilityRequestBody {
pub runtime_mappings: serde_json::Value,
}

#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
pub struct FieldCapabilityResponse {
indices: Vec<String>,
fields: HashMap<String, FieldCapabilityFieldTypesResponse>,
}

type FieldCapabilityFieldTypesResponse =
HashMap<FieldCapabilityEntryType, FieldCapabilityEntryResponse>;
HashMap<FieldCapabilityEntryType, FieldCapabilityEntryResponse>;

#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq, Hash)]
enum FieldCapabilityEntryType {
Expand Down Expand Up @@ -96,11 +97,13 @@ enum FieldCapabilityEntryType {
Object,
}

#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
struct FieldCapabilityEntryResponse {
metadata_field: bool, // Always false
searchable: bool,
aggregatable: bool,
// Option since for non-time-series indices this field is not present.
time_series_dimension: Option<bool>,
// Option since it is filled later
#[serde(rename = "type")]
typ: Option<FieldCapabilityEntryType>,
Expand All @@ -117,6 +120,7 @@ impl FieldCapabilityEntryResponse {
metadata_field: false,
searchable: entry.searchable,
aggregatable: entry.aggregatable,
time_series_dimension: None,
typ: None,
indices: entry.index_ids.clone(),
non_aggregatable_indices: entry.non_aggregatable_index_ids,
Expand All @@ -133,6 +137,7 @@ struct FieldCapabilityEntry {

pub fn convert_to_es_field_capabilities_response(
resp: ListFieldsResponse,
index_metadata: Vec<IndexMetadata>,
) -> FieldCapabilityResponse {
let mut indices = resp
.fields
Expand All @@ -142,13 +147,20 @@ pub fn convert_to_es_field_capabilities_response(
indices.sort();
indices.dedup();

let timestamp_fields: Vec<String> = index_metadata
.iter()
.filter_map(|index| index.index_config.doc_mapping.timestamp_field.clone())
.collect();

let mut fields: HashMap<String, FieldCapabilityFieldTypesResponse> = HashMap::new();
for list_field_resp in resp.fields {
let entry = fields
.entry(list_field_resp.field_name.to_string())
.or_default();

let field_type = ListFieldType::from_i32(list_field_resp.field_type).unwrap();
let field_name = list_field_resp.field_name.to_string();

let add_entry =
FieldCapabilityEntryResponse::from_list_field_entry_response(list_field_resp);
let types = match field_type {
Expand Down Expand Up @@ -178,6 +190,11 @@ pub fn convert_to_es_field_capabilities_response(
add_entry.indices = Vec::new();
}

// Check if the field_name is in timestamp_fields and set time_series_dimension
if timestamp_fields.contains(&field_name) {
add_entry.time_series_dimension = Some(true);
}

entry.insert(field_type, add_entry);
}
}
Expand All @@ -196,3 +213,120 @@ pub fn build_list_field_request_for_es_api(
end_timestamp: search_params.end_timestamp,
})
}

#[cfg(test)]
mod tests {
use super::*;
use std::collections::{BTreeSet};
use std::num::NonZeroU32;
use quickwit_common::uri::Uri;
use quickwit_config::{DocMapping, IndexConfig, IndexingSettings, SearchSettings};
use quickwit_doc_mapper::Mode;

#[test]
fn test_convert_to_es_field_capabilities_response() {
// Define test input data
let list_fields_response = ListFieldsResponse {
fields: vec![
ListFieldsEntryResponse {
field_name: "timestamp_field_1".to_string(),
field_type: ListFieldType::Date as i32,
index_ids: vec!["index1".to_string()],
searchable: true,
aggregatable: true,
non_aggregatable_index_ids: vec![],
non_searchable_index_ids: vec![],
},
ListFieldsEntryResponse {
field_name: "non_timestamp_field".to_string(),
field_type: ListFieldType::Str as i32,
index_ids: vec!["index1".to_string()],
searchable: true,
aggregatable: true,
non_aggregatable_index_ids: vec![],
non_searchable_index_ids: vec![],
},
],
};

let index_metadata = vec![
IndexMetadata::new(IndexConfig {
index_id: "index1".parse().unwrap(),
index_uri: Uri::for_test("s3://config.json"),
doc_mapping: DocMapping {
field_mappings: vec![],
tag_fields: BTreeSet::new(),
store_source: false,
index_field_presence: false,
timestamp_field: Some("timestamp_field_1".to_string()),
mode: Mode::default(),
partition_key: None,
max_num_partitions: NonZeroU32::new(1).unwrap(),
tokenizers: vec![],
document_length: false,
},
indexing_settings: IndexingSettings::default(),
search_settings: SearchSettings::default(),
retention_policy_opt: None,
})
];

// Expected output data
let mut expected_fields: HashMap<String, FieldCapabilityFieldTypesResponse> = HashMap::new();
let mut timestamp_field_1_entry: FieldCapabilityFieldTypesResponse = HashMap::new();
timestamp_field_1_entry.insert(
FieldCapabilityEntryType::DateNanos,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: Some(true),
typ: Some(FieldCapabilityEntryType::DateNanos),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
expected_fields.insert("timestamp_field_1".to_string(), timestamp_field_1_entry);

let mut non_timestamp_field_entry: FieldCapabilityFieldTypesResponse = HashMap::new();
non_timestamp_field_entry.insert(
FieldCapabilityEntryType::Text,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: None,
typ: Some(FieldCapabilityEntryType::Text),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
non_timestamp_field_entry.insert(
FieldCapabilityEntryType::Keyword,
FieldCapabilityEntryResponse {
metadata_field: false,
searchable: true,
aggregatable: true,
time_series_dimension: None,
typ: Some(FieldCapabilityEntryType::Keyword),
indices: vec![],
non_aggregatable_indices: vec![],
non_searchable_indices: vec![],
},
);
expected_fields.insert("non_timestamp_field".to_string(), non_timestamp_field_entry);

let expected_response = FieldCapabilityResponse {
indices: vec!["index1".to_string()],
fields: expected_fields,
};

// Call the function with test data
let result = convert_to_es_field_capabilities_response(list_fields_response, index_metadata);

// Verify the output
assert_eq!(result, expected_response);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,13 @@ pub fn es_compat_search_handler(
/// GET or POST _elastic/{index}/_field_caps
pub fn es_compat_index_field_capabilities_handler(
search_service: Arc<dyn SearchService>,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_field_capabilities_filter()
.or(elastic_field_capabilities_filter())
.unify()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_index_field_capabilities)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}
Expand Down Expand Up @@ -547,13 +549,16 @@ async fn es_compat_index_field_capabilities(
search_params: FieldCapabilityQueryParams,
search_body: FieldCapabilityRequestBody,
search_service: Arc<dyn SearchService>,
mut metastore: MetastoreServiceClient,
) -> Result<FieldCapabilityResponse, ElasticsearchError> {
let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?;

let search_request =
build_list_field_request_for_es_api(index_id_patterns, search_params, search_body)?;
let search_response: ListFieldsResponse =
search_service.root_list_fields(search_request).await?;
let search_response_rest: FieldCapabilityResponse =
convert_to_es_field_capabilities_response(search_response);
convert_to_es_field_capabilities_response(search_response, indexes_metadata);
Ok(search_response_rest)
}

Expand Down

0 comments on commit bc8af38

Please sign in to comment.