diff --git a/src/aggregation/metric/percentiles.rs b/src/aggregation/metric/percentiles.rs index 9f90bcbd04..87c6d547bb 100644 --- a/src/aggregation/metric/percentiles.rs +++ b/src/aggregation/metric/percentiles.rs @@ -11,7 +11,7 @@ use crate::aggregation::intermediate_agg_result::{ IntermediateAggregationResult, IntermediateAggregationResults, IntermediateMetricResult, }; use crate::aggregation::segment_agg_result::SegmentAggregationCollector; -use crate::aggregation::{f64_from_fastfield_u64, AggregationError}; +use crate::aggregation::{f64_from_fastfield_u64, f64_to_fastfield_u64, AggregationError}; use crate::{DocId, TantivyError}; /// # Percentiles @@ -134,6 +134,7 @@ pub(crate) struct SegmentPercentilesCollector { pub(crate) percentiles: PercentilesCollector, pub(crate) accessor_idx: usize, val_cache: Vec, + missing: Option, } #[derive(Clone, Serialize, Deserialize)] @@ -234,11 +235,16 @@ impl SegmentPercentilesCollector { accessor_idx: usize, ) -> crate::Result { req.validate()?; + let missing = req + .missing + .and_then(|val| f64_to_fastfield_u64(val, &field_type)); + Ok(Self { field_type, percentiles: PercentilesCollector::new(), accessor_idx, val_cache: Default::default(), + missing, }) } #[inline] @@ -247,9 +253,17 @@ impl SegmentPercentilesCollector { docs: &[DocId], agg_accessor: &mut AggregationWithAccessor, ) { - agg_accessor - .column_block_accessor - .fetch_block(docs, &agg_accessor.accessor); + if let Some(missing) = self.missing.as_ref() { + agg_accessor.column_block_accessor.fetch_block_with_missing( + docs, + &agg_accessor.accessor, + *missing, + ); + } else { + agg_accessor + .column_block_accessor + .fetch_block(docs, &agg_accessor.accessor); + } for val in agg_accessor.column_block_accessor.iter_vals() { let val1 = f64_from_fastfield_u64(val, &self.field_type); @@ -284,9 +298,22 @@ impl SegmentAggregationCollector for SegmentPercentilesCollector { ) -> crate::Result<()> { let field = &agg_with_accessor.aggs.values[self.accessor_idx].accessor; - for val in field.values_for_doc(doc) { - let val1 = f64_from_fastfield_u64(val, &self.field_type); - self.percentiles.collect(val1); + if let Some(missing) = self.missing { + let mut has_val = false; + for val in field.values_for_doc(doc) { + let val1 = f64_from_fastfield_u64(val, &self.field_type); + self.percentiles.collect(val1); + has_val = true; + } + if !has_val { + self.percentiles + .collect(f64_from_fastfield_u64(missing, &self.field_type)); + } + } else { + for val in field.values_for_doc(doc) { + let val1 = f64_from_fastfield_u64(val, &self.field_type); + self.percentiles.collect(val1); + } } Ok(()) @@ -316,10 +343,12 @@ mod tests { use crate::aggregation::agg_req::Aggregations; use crate::aggregation::agg_result::AggregationResults; use crate::aggregation::tests::{ - get_test_index_from_values, get_test_index_from_values_and_terms, + exec_request_with_query, get_test_index_from_values, get_test_index_from_values_and_terms, }; use crate::aggregation::AggregationCollector; use crate::query::AllQuery; + use crate::schema::{Schema, FAST}; + use crate::Index; #[test] fn test_aggregation_percentiles_empty_index() -> crate::Result<()> { @@ -552,4 +581,110 @@ mod tests { Ok(()) } + + #[test] + fn test_percentiles_missing_sub_agg() -> crate::Result<()> { + // This test verifies the `collect` method (in contrast to `collect_block`), which is + // called when the sub-aggregations are flushed. + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("texts", FAST); + let score_field_f64 = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + + { + let mut index_writer = index.writer_for_tests()?; + // writing the segment + index_writer.add_document(doc!( + score_field_f64 => 10.0f64, + text_field => "a" + ))?; + index_writer.add_document(doc!( + score_field_f64 => 10.0f64, + text_field => "a" + ))?; + + index_writer.add_document(doc!(text_field => "a"))?; + + index_writer.commit()?; + } + + let agg_req: Aggregations = { + serde_json::from_value(json!({ + "range_with_stats": { + "terms": { + "field": "texts" + }, + "aggs": { + "percentiles": { + "percentiles": { + "field": "score", + "missing": 5.0 + } + } + } + } + })) + .unwrap() + }; + + let res = exec_request_with_query(agg_req, &index, None)?; + assert_eq!(res["range_with_stats"]["buckets"][0]["doc_count"], 3); + + assert_eq!( + res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["1.0"], + 5.0028295751107414 + ); + assert_eq!( + res["range_with_stats"]["buckets"][0]["percentiles"]["values"]["99.0"], + 10.07469668951144 + ); + + Ok(()) + } + + #[test] + fn test_percentiles_missing() -> crate::Result<()> { + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field("texts", FAST); + let score_field_f64 = schema_builder.add_f64_field("score", FAST); + let schema = schema_builder.build(); + let index = Index::create_in_ram(schema); + + { + let mut index_writer = index.writer_for_tests()?; + // writing the segment + index_writer.add_document(doc!( + score_field_f64 => 10.0f64, + text_field => "a" + ))?; + index_writer.add_document(doc!( + score_field_f64 => 10.0f64, + text_field => "a" + ))?; + + index_writer.add_document(doc!(text_field => "a"))?; + + index_writer.commit()?; + } + + let agg_req: Aggregations = { + serde_json::from_value(json!({ + "percentiles": { + "percentiles": { + "field": "score", + "missing": 5.0 + } + } + })) + .unwrap() + }; + + let res = exec_request_with_query(agg_req, &index, None)?; + + assert_eq!(res["percentiles"]["values"]["1.0"], 5.0028295751107414); + assert_eq!(res["percentiles"]["values"]["99.0"], 10.07469668951144); + + Ok(()) + } }