Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alternative mixed field aggregation collection #2135

Merged
merged 2 commits into from
Jul 27, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 48 additions & 40 deletions src/aggregation/agg_req_with_accessor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ pub struct AggregationWithAccessor {
pub(crate) accessor: Column<u64>,
pub(crate) str_dict_column: Option<StrColumn>,
pub(crate) field_type: ColumnType,
/// In case there are multiple types of fast fields, e.g. string and numeric.
/// Only used for term aggregations currently.
pub(crate) accessor2: Option<(Column<u64>, ColumnType)>,
pub(crate) sub_aggregation: AggregationsWithAccessor,
pub(crate) limits: ResourceLimitGuard,
pub(crate) column_block_accessor: ColumnBlockAccessor<u64>,
Expand All @@ -52,20 +49,31 @@ impl AggregationWithAccessor {
sub_aggregation: &Aggregations,
reader: &SegmentReader,
limits: AggregationLimits,
) -> crate::Result<AggregationWithAccessor> {
) -> crate::Result<Vec<AggregationWithAccessor>> {
let mut str_dict_column = None;
let mut accessor2 = None;
use AggregationVariants::*;
let (accessor, field_type) = match &agg.agg {
let acc_field_types: Vec<(Column, ColumnType)> = match &agg.agg {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for putting the type!

Range(RangeAggregation {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
Histogram(HistogramAggregation {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
DateHistogram(DateHistogramAggregationReq {
field: field_name, ..
}) => get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?,
}) => vec![get_ff_reader(
reader,
field_name,
Some(get_numeric_or_date_column_types()),
)?],
Terms(TermsAggregation {
field: field_name, ..
}) => {
Expand All @@ -80,11 +88,7 @@ impl AggregationWithAccessor {
// ColumnType::IpAddr Unsupported
// ColumnType::DateTime Unsupported
];
let mut columns =
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?;
let first = columns.pop().unwrap();
accessor2 = columns.pop();
first
get_all_ff_reader_or_empty(reader, field_name, Some(&allowed_column_types))?
}
Average(AverageAggregation { field: field_name })
| Count(CountAggregation { field: field_name })
Expand All @@ -95,33 +99,37 @@ impl AggregationWithAccessor {
let (accessor, field_type) =
get_ff_reader(reader, field_name, Some(get_numeric_or_date_column_types()))?;

(accessor, field_type)
vec![(accessor, field_type)]
}
Percentiles(percentiles) => {
let (accessor, field_type) = get_ff_reader(
reader,
percentiles.field_name(),
Some(get_numeric_or_date_column_types()),
)?;
(accessor, field_type)
vec![(accessor, field_type)]
}
};

let sub_aggregation = sub_aggregation.clone();
Ok(AggregationWithAccessor {
accessor,
accessor2,
field_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
&sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column,
limits: limits.new_guard(),
column_block_accessor: Default::default(),
})
let aggs: Vec<_> = acc_field_types
PSeitz marked this conversation as resolved.
Show resolved Hide resolved
.into_iter()
.map(|(accessor, field_type)| {
Ok(AggregationWithAccessor {
accessor,
field_type,
sub_aggregation: get_aggs_with_segment_accessor_and_validate(
sub_aggregation,
reader,
&limits,
)?,
agg: agg.clone(),
str_dict_column: str_dict_column.clone(),
limits: limits.new_guard(),
column_block_accessor: Default::default(),
})
})
.collect::<crate::Result<_>>()?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the return type fixes the types here, so that let aggs: ..., Ok(aggs) and the ? are unnecessary, i.e. it could just end in }).collect().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand, we need to collect to Result to forward the error

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Certainly, but the extra binding and type hints seem unnecessary because the type that is collected already matches the return type, does it not?

So in think the function could just end in

        acc_field_types
            .into_iter()
            .map(|(accessor, field_type)| {
                Ok(AggregationWithAccessor {
                    accessor,
                    field_type,
                    sub_aggregation: get_aggs_with_segment_accessor_and_validate(
                        sub_aggregation,
                        reader,
                        &limits,
                    )?,
                    agg: agg.clone(),
                    str_dict_column: str_dict_column.clone(),
                    limits: limits.new_guard(),
                    column_block_accessor: Default::default(),
                })
            })
            .collect()

Copy link
Contributor Author

@PSeitz PSeitz Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we return a Result<Vec> not Vec<Result>

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, which is what you are collecting by giving the type hint ::<crate::Result<_>> but that type hint just matches the return type. I am not suggesting to change any types, just to drop the unnecessary temporary binding and error forwarding because the collection already produces exactly the required type.

(Turning Iterator<Item=Result<S>> into Result<T> where T: FromIterator<Item=S> is the usual FromIterator impl which you invoke either way.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the binding with type assignment increases readability

Copy link
Collaborator

@adamreichold adamreichold Jul 25, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can still spell out the full return type if desired in the turbo-fish if you want to, e.g.

.collect::<crate::Result<Vec<AggregationWithAccessor>>>()

but the extra binding, error forwarding and ok wrapping are unnecessary.

(There is even a Clippy lint for unnecessary let bindings but IIRC it does not firing when ok wrapping is involved because this is sometimes necessary to change the error type via the From invocation hidding in ?.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't agree, the binding and type serves the purpose of increased readability by giving it a name and a type, which is important if the transformation gets nested

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with @PSeitz . It is more readable when we put the type on the variable side.

Ok(aggs)
}
}

Expand All @@ -141,15 +149,15 @@ pub(crate) fn get_aggs_with_segment_accessor_and_validate(
) -> crate::Result<AggregationsWithAccessor> {
let mut aggss = Vec::new();
adamreichold marked this conversation as resolved.
Show resolved Hide resolved
for (key, agg) in aggs.iter() {
aggss.push((
key.to_string(),
AggregationWithAccessor::try_from_agg(
agg,
agg.sub_aggregation(),
reader,
limits.clone(),
)?,
));
let aggs = AggregationWithAccessor::try_from_agg(
agg,
agg.sub_aggregation(),
reader,
limits.clone(),
)?;
for agg in aggs {
aggss.push((key.to_string(), agg));
}
}
Ok(AggregationsWithAccessor::from_data(
VecWithNames::from_entries(aggss),
Expand Down
2 changes: 1 addition & 1 deletion src/aggregation/bucket/range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ mod tests {
SegmentRangeCollector::from_req_and_validate(
&req,
&mut Default::default(),
&mut AggregationLimits::default().new_guard(),
&AggregationLimits::default().new_guard(),
field_type,
0,
)
Expand Down
104 changes: 0 additions & 104 deletions src/aggregation/bucket/term_agg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,110 +224,6 @@ impl TermBuckets {
}
}

/// The composite collector is used, when we have different types under one field, to support a term
/// aggregation on both.
#[derive(Clone, Debug)]
pub struct SegmentTermCollectorComposite {
term_agg1: SegmentTermCollector, // field type 1, e.g. strings
term_agg2: SegmentTermCollector, // field type 2, e.g. u64
accessor_idx: usize,
}
impl SegmentAggregationCollector for SegmentTermCollectorComposite {
fn add_intermediate_aggregation_result(
self: Box<Self>,
agg_with_accessor: &AggregationsWithAccessor,
results: &mut IntermediateAggregationResults,
) -> crate::Result<()> {
let name = agg_with_accessor.aggs.keys[self.accessor_idx].to_string();
let agg_with_accessor = &agg_with_accessor.aggs.values[self.accessor_idx];

let bucket = self
.term_agg1
.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(
name.to_string(),
IntermediateAggregationResult::Bucket(bucket),
)?;
let bucket = self
.term_agg2
.into_intermediate_bucket_result(agg_with_accessor)?;
results.push(name, IntermediateAggregationResult::Bucket(bucket))?;

Ok(())
}

#[inline]
fn collect(
&mut self,
doc: crate::DocId,
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.term_agg1.collect_block(&[doc], agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.collect_block(&[doc], agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
Ok(())
}

#[inline]
fn collect_block(
&mut self,
docs: &[crate::DocId],
agg_with_accessor: &mut AggregationsWithAccessor,
) -> crate::Result<()> {
self.term_agg1.collect_block(docs, agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.collect_block(docs, agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);

Ok(())
}

fn flush(&mut self, agg_with_accessor: &mut AggregationsWithAccessor) -> crate::Result<()> {
self.term_agg1.flush(agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);
self.term_agg2.flush(agg_with_accessor)?;
self.swap_accessor(&mut agg_with_accessor.aggs.values[self.accessor_idx]);

Ok(())
}
}

impl SegmentTermCollectorComposite {
/// Swaps the accessor and field type with the second accessor and field type.
/// This way we can use the same code for both aggregations.
fn swap_accessor(&self, aggregations: &mut AggregationWithAccessor) {
if let Some(accessor) = aggregations.accessor2.as_mut() {
std::mem::swap(&mut accessor.0, &mut aggregations.accessor);
std::mem::swap(&mut accessor.1, &mut aggregations.field_type);
}
}

pub(crate) fn from_req_and_validate(
req: &TermsAggregation,
sub_aggregations: &mut AggregationsWithAccessor,
field_type: ColumnType,
field_type2: ColumnType,
accessor_idx: usize,
) -> crate::Result<Self> {
Ok(Self {
term_agg1: SegmentTermCollector::from_req_and_validate(
req,
sub_aggregations,
field_type,
accessor_idx,
)?,
term_agg2: SegmentTermCollector::from_req_and_validate(
req,
sub_aggregations,
field_type2,
accessor_idx,
)?,
accessor_idx,
})
}
}

/// The collector puts values from the fast field into the correct buckets and does a conversion to
/// the correct datatype.
#[derive(Clone, Debug)]
Expand Down
27 changes: 6 additions & 21 deletions src/aggregation/segment_agg_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use super::metric::{
SegmentPercentilesCollector, SegmentStatsCollector, SegmentStatsType, StatsAggregation,
SumAggregation,
};
use crate::aggregation::bucket::SegmentTermCollectorComposite;

pub(crate) trait SegmentAggregationCollector: CollectorClone + Debug {
fn add_intermediate_aggregation_result(
Expand Down Expand Up @@ -81,26 +80,12 @@ pub(crate) fn build_single_agg_segment_collector(
) -> crate::Result<Box<dyn SegmentAggregationCollector>> {
use AggregationVariants::*;
match &req.agg.agg {
Terms(terms_req) => {
if let Some(acc2) = req.accessor2.as_ref() {
Ok(Box::new(
SegmentTermCollectorComposite::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
acc2.1,
accessor_idx,
)?,
))
} else {
Ok(Box::new(SegmentTermCollector::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
accessor_idx,
)?))
}
}
Terms(terms_req) => Ok(Box::new(SegmentTermCollector::from_req_and_validate(
terms_req,
&mut req.sub_aggregation,
req.field_type,
accessor_idx,
)?)),
Range(range_req) => Ok(Box::new(SegmentRangeCollector::from_req_and_validate(
range_req,
&mut req.sub_aggregation,
Expand Down
Loading