Refactoring code for reusability
Signed-off-by: Ankit Jain <akjain@amazon.com>
jainankitk committed Nov 28, 2023
1 parent be20dde commit e225781
Showing 3 changed files with 152 additions and 125 deletions.
server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java
@@ -33,12 +33,9 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Rounding;
import org.opensearch.common.Rounding.Prepared;
import org.opensearch.common.lease.Releasables;
@@ -59,7 +56,6 @@
import org.opensearch.search.aggregations.bucket.MergingBucketsDeferringCollector;
import org.opensearch.search.aggregations.bucket.histogram.AutoDateHistogramAggregationBuilder.RoundingInfo;
import org.opensearch.search.aggregations.bucket.terms.LongKeyedBucketOrds;
import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSource;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;
@@ -131,8 +127,8 @@ static AutoDateHistogramAggregator build(
* {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
*/
private MergingBucketsDeferringCollector deferringCollector;
private Weight[] filters = null;
private DateFieldMapper.DateFieldType fieldType;
private final Weight[] filters;
private final DateFieldMapper.DateFieldType fieldType;

protected final RoundingInfo[] roundingInfos;
protected final int targetBuckets;
@@ -160,27 +156,23 @@ private AutoDateHistogramAggregator(
this.roundingPreparer = roundingPreparer;
this.preparedRounding = prepareRounding(0);

// Create the filters for fast aggregation only if the query is instance
// of point range query and there aren't any parent/sub aggregations
if (parent() == null && subAggregators.length == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) {
final FieldContext fieldContext = valuesSourceConfig.fieldContext();
if (fieldContext != null) {
final String fieldName = fieldContext.field();
final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
if (bounds != null) {
assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
filters = FilterRewriteHelper.createFilterForAggregations(
context,
getMinimumRounding(bounds[0], bounds[1]),
preparedRounding,
fieldName,
fieldType,
bounds[0],
bounds[1]
);
}
}
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent(),
subAggregators.length,
context,
b -> getMinimumRounding(b[0], b[1]),
// Passing prepared rounding as supplier to ensure the correct prepared
// rounding is set as it is done during getMinimumRounding
() -> preparedRounding,
valuesSourceConfig,
fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
fieldType = null;
filters = null;
}
}
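The third changed file, presumably FilterRewriteHelper.java, is not captured on this page, so the new buildFastFilterContext entry point is only visible through its call sites. As orientation, here is a minimal sketch of what that entry point could look like, inferred from the two constructors touched by this commit; the class shape, parameter types, and null checks are assumptions rather than the committed code, and createFilterForAggregations is the pre-existing helper already called in the removed blocks.

```java
// Hedged sketch only: the real FilterRewriteHelper.java is the third changed file and is
// not shown in this capture. Names, generics, and checks are inferred from the call sites.
import java.io.IOException;
import java.util.function.Function;
import java.util.function.Supplier;

import org.apache.lucene.search.Weight;
import org.opensearch.common.Rounding;
import org.opensearch.index.mapper.DateFieldMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.support.FieldContext;
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
import org.opensearch.search.internal.SearchContext;

public final class FilterRewriteHelper {

    /** Bundles the two values each aggregator now keeps in final fields. */
    public static class FilterContext {
        public final DateFieldMapper.DateFieldType fieldType;
        public final Weight[] filters;

        FilterContext(DateFieldMapper.DateFieldType fieldType, Weight[] filters) {
            this.fieldType = fieldType;
            this.filters = filters;
        }
    }

    /** Assumed throwing-function type for the bounds callback (DateHistogramAggregator#computeBounds throws IOException). */
    @FunctionalInterface
    public interface BoundsComputer {
        long[] compute(FieldContext fieldContext) throws IOException;
    }

    /**
     * Returns null unless the fast-filter optimization applies: no parent or sub aggregations,
     * no missing value, no script, a date field, and resolvable bounds.
     */
    public static FilterContext buildFastFilterContext(
        Aggregator parent,
        int subAggregatorsLength,
        SearchContext context,
        Function<long[], Rounding> roundingFunction,
        Supplier<Rounding.Prepared> preparedRoundingSupplier,
        ValuesSourceConfig valuesSourceConfig,
        BoundsComputer computeBounds
    ) throws IOException {
        if (parent != null || subAggregatorsLength != 0 || valuesSourceConfig.missing() != null || valuesSourceConfig.script() != null) {
            return null;
        }
        final FieldContext fieldContext = valuesSourceConfig.fieldContext();
        if (fieldContext == null || (fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType) == false) {
            return null;
        }
        final long[] bounds = computeBounds.compute(fieldContext);
        if (bounds == null) {
            return null;
        }
        final DateFieldMapper.DateFieldType fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();
        final Weight[] filters = createFilterForAggregations(
            context,
            roundingFunction.apply(bounds),
            preparedRoundingSupplier.get(),
            fieldContext.field(),
            fieldType,
            bounds[0],
            bounds[1]
        );
        return filters == null ? null : new FilterContext(fieldType, filters);
    }

    // Pre-existing method (its call sites are visible in the removed blocks above);
    // stubbed here only so the sketch is self-contained.
    static Weight[] createFilterForAggregations(
        SearchContext context,
        Rounding rounding,
        Rounding.Prepared preparedRounding,
        String field,
        DateFieldMapper.DateFieldType fieldType,
        long low,
        long high
    ) throws IOException {
        throw new UnsupportedOperationException("not part of this sketch");
    }
}
```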

@@ -207,37 +199,6 @@ private Rounding getMinimumRounding(final long low, final long high) {

protected abstract LongKeyedBucketOrds getBucketOrds();

private boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
counts[i] = filters[i].count(ctx);
if (counts[i] == -1) {
// Cannot use the optimization if any of the counts
// is -1 indicating the segment might have deleted documents
return false;
}
}

for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long bucketOrd = getBucketOrds().add(
owningBucketOrd,
preparedRounding.round(
fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
)
)
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
}
incrementBucketDocCount(bucketOrd, counts[i]);
}
}
throw new CollectionTerminatedException();
}

@Override
public final ScoreMode scoreMode() {
if (valuesSource != null && valuesSource.needsScores()) {
@@ -279,7 +240,12 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = tryFastFilterAggregation(ctx, owningBucketOrd);
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
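Both collect() methods touched by this commit now funnel their fast path through FilterRewriteHelper.tryFastFilterAggregation and FilterRewriteHelper.getBucketOrd. Those bodies are likewise not shown in this capture; the sketch below reconstructs them from the per-aggregator tryFastFilterAggregation methods that this commit deletes, and the BiConsumer callback type is an assumption drawn from the (key, count) lambda at the call sites.

```java
// Hedged sketch, continuing the hypothetical helper above; reconstructed from the removed
// per-aggregator methods, with signatures assumed from the call sites in this commit.
import java.io.IOException;
import java.util.function.BiConsumer;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.index.mapper.DateFieldMapper;

public final class FilterRewriteHelper {

    /** LongKeyedBucketOrds#add returns (-1 - ordinal) for an already-seen key; decode it. */
    public static long getBucketOrd(long bucketOrd) {
        if (bucketOrd < 0) { // already seen
            bucketOrd = -1 - bucketOrd;
        }
        return bucketOrd;
    }

    /**
     * Counts each pre-built bucket filter against the segment. Returns false if any count is -1
     * (the segment may contain deleted documents, so the counts cannot be trusted); otherwise
     * feeds (bucketKeyMillis, docCount) pairs to the callback and terminates collection.
     */
    public static boolean tryFastFilterAggregation(
        LeafReaderContext ctx,
        Weight[] filters,
        DateFieldMapper.DateFieldType fieldType,
        BiConsumer<Long, Integer> incrementDocCount
    ) throws IOException {
        final int[] counts = new int[filters.length];
        for (int i = 0; i < filters.length; i++) {
            counts[i] = filters[i].count(ctx);
            if (counts[i] == -1) {
                return false;
            }
        }
        for (int i = 0; i < filters.length; i++) {
            if (counts[i] > 0) {
                // The lower point of each range filter encodes that bucket's key.
                final long key = fieldType.convertNanosToMillis(
                    NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
                );
                incrementDocCount.accept(key, counts[i]);
            }
        }
        // All matching documents were accounted for by the counts alone; skip per-document collection.
        throw new CollectionTerminatedException();
    }
}
```

The caller then rounds the key and resolves the bucket ordinal itself, which is why the lambdas above wrap getBucketOrds().add(...) (or bucketOrds.add(...)) in FilterRewriteHelper.getBucketOrd before incrementing the doc count.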
server/src/main/java/org/opensearch/search/aggregations/bucket/histogram/DateHistogramAggregator.java
@@ -33,12 +33,9 @@

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.PointRangeQuery;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Weight;
import org.apache.lucene.util.CollectionUtil;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.Nullable;
import org.opensearch.common.Rounding;
import org.opensearch.common.lease.Releasables;
@@ -84,9 +81,9 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg
private final long minDocCount;
private final LongBounds extendedBounds;
private final LongBounds hardBounds;
private Weight[] filters = null;
private final Weight[] filters;
private final LongKeyedBucketOrds bucketOrds;
private DateFieldMapper.DateFieldType fieldType;
private final DateFieldMapper.DateFieldType fieldType;

DateHistogramAggregator(
String name,
@@ -119,35 +116,34 @@ class DateHistogramAggregator extends BucketsAggregator implements SizedBucketAg

bucketOrds = LongKeyedBucketOrds.build(context.bigArrays(), cardinality);

// Create the filters for fast aggregation only if the query is instance
// of point range query and there aren't any parent/sub aggregations
if (parent() == null && subAggregators.length == 0 && valuesSourceConfig.missing() == null && valuesSourceConfig.script() == null) {
final FieldContext fieldContext = valuesSourceConfig.fieldContext();
if (fieldContext != null) {
final String fieldName = fieldContext.field();
final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldName);
if (bounds != null) {
assert fieldContext.fieldType() instanceof DateFieldMapper.DateFieldType;
fieldType = (DateFieldMapper.DateFieldType) fieldContext.fieldType();

// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
parent,
subAggregators.length,
context,
x -> rounding,
() -> preparedRounding,
valuesSourceConfig,
this::computeBounds
);
if (filterContext != null) {
fieldType = filterContext.fieldType;
filters = filterContext.filters;
} else {
filters = null;
fieldType = null;
}
}

filters = FilterRewriteHelper.createFilterForAggregations(
context,
rounding,
preparedRounding,
fieldName,
fieldType,
bounds[0],
bounds[1]
);
}
private long[] computeBounds(final FieldContext fieldContext) throws IOException {
final long[] bounds = FilterRewriteHelper.getAggregationBounds(context, fieldContext.field());
if (bounds != null) {
// Update min/max limit if user specified any hard bounds
if (hardBounds != null) {
bounds[0] = Math.max(bounds[0], hardBounds.getMin());
bounds[1] = Math.min(bounds[1], hardBounds.getMax() - 1); // hard bounds max is exclusive
}
}
return bounds;
}

@Override
@@ -176,7 +172,12 @@ public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = tryFastFilterAggregation(ctx, owningBucketOrd);
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(bucketOrds.add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

if (values.advanceExact(doc)) {
@@ -272,35 +273,4 @@ public double bucketSize(long bucket, Rounding.DateTimeUnit unitSize) {
return 1.0;
}
}

private boolean tryFastFilterAggregation(LeafReaderContext ctx, long owningBucketOrd) throws IOException {
final int[] counts = new int[filters.length];
int i;
for (i = 0; i < filters.length; i++) {
counts[i] = filters[i].count(ctx);
if (counts[i] == -1) {
// Cannot use the optimization if any of the counts
// is -1 indicating the segment might have deleted documents
return false;
}
}

for (i = 0; i < filters.length; i++) {
if (counts[i] > 0) {
long bucketOrd = bucketOrds.add(
owningBucketOrd,
preparedRounding.round(
fieldType.convertNanosToMillis(
NumericUtils.sortableBytesToLong(((PointRangeQuery) filters[i].getQuery()).getLowerPoint(), 0)
)
)
);
if (bucketOrd < 0) { // already seen
bucketOrd = -1 - bucketOrd;
}
incrementBucketDocCount(bucketOrd, counts[i]);
}
}
throw new CollectionTerminatedException();
}
}
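FilterRewriteHelper.getAggregationBounds, used both by the AutoDateHistogramAggregator lambda and by computeBounds above, predates this commit and is also not shown here. As background only, a bounds lookup of this kind can be derived from the field's per-segment point values, roughly as sketched below; the method name and body are illustrative, and the real implementation may additionally intersect these index bounds with a top-level point range query, as the removed comments hint.

```java
// Illustrative only: one plausible way to resolve the [min, max] bounds for the date field
// from per-segment point values; not the committed getAggregationBounds implementation.
import java.io.IOException;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.search.internal.SearchContext;

final class IndexBoundsSketch {

    static long[] getIndexBounds(SearchContext context, String fieldName) throws IOException {
        long min = Long.MAX_VALUE;
        long max = Long.MIN_VALUE;
        for (LeafReaderContext leaf : context.searcher().getIndexReader().leaves()) {
            final PointValues values = leaf.reader().getPointValues(fieldName);
            if (values != null) {
                min = Math.min(min, NumericUtils.sortableBytesToLong(values.getMinPackedValue(), 0));
                max = Math.max(max, NumericUtils.sortableBytesToLong(values.getMaxPackedValue(), 0));
            }
        }
        // No points for this field in any segment: callers treat null as "optimization not applicable".
        return min > max ? null : new long[] { min, max };
    }
}
```

DateHistogramAggregator#computeBounds then clamps whatever bounds it receives to any user-supplied hard bounds, as shown in the diff above.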
