Commit 6fdc854

Improving the performance of date histogram aggregation (without any sub-aggregation) (opensearch-project#11083)

* Adding filter-based optimization logic to date histogram aggregation
* Reading the field name for aggregation correctly
* Adding a limit on the number of buckets for filter aggregation
* Applying the optimization to the match-all query as well
* Handling the unwrapped match-all query
* Adding logic for recursively unwrapping the query
* Restructuring the code to make it more reusable and unit-testable
* Adding javadocs to fix the build failure
* Fixing minor bugs introduced during refactoring
* Adding logic for optimizing auto date histogram
* Fixing bugs and passing unit tests for date histogram
* Temporarily reverting the auto date histogram changes
* Fixing spotless check failures
* Adding back auto date histogram and passing all unit tests
* Fixing the integration tests for reduced collector work
* Fixing the integration test regression
* Addressing code review comments
* Fixing hard-bounds, missing-value, and script test cases
* Removing collect_count validation to prevent backward-compatibility tests from failing
* Finally fixing the hard-bounds test case
* Refactoring code for reusability

---------

Signed-off-by: Ankit Jain <akjain@amazon.com>
(cherry picked from commit 0ddbd96)
jainankitk committed Nov 29, 2023
1 parent b2770de commit 6fdc854
Showing 8 changed files with 456 additions and 20 deletions.
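For context on the approach: the change lets date histogram (and auto date histogram) aggregations skip per-document collection when the top-level query is effectively a match-all, building one range filter per bucket and counting documents per filter instead. Below is a minimal sketch of that idea, assuming hypothetical helper names; it is not the commit's code, only an illustration of the query unwrapping and Lucene Weight#count counting that the commit message describes.

    import java.io.IOException;

    import org.apache.lucene.index.LeafReaderContext;
    import org.apache.lucene.search.BoostQuery;
    import org.apache.lucene.search.ConstantScoreQuery;
    import org.apache.lucene.search.MatchAllDocsQuery;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.search.Weight;

    // Illustrative sketch of the filter-rewrite idea; not the commit's actual code.
    final class FastFilterSketch {

        // Recursively unwrap wrapper queries that cannot change which documents match.
        static Query unwrap(Query query) {
            if (query instanceof ConstantScoreQuery) {
                return unwrap(((ConstantScoreQuery) query).getQuery());
            }
            if (query instanceof BoostQuery) {
                return unwrap(((BoostQuery) query).getQuery());
            }
            return query;
        }

        // The rewrite applies when the top-level query is effectively a match-all.
        static boolean isEffectiveMatchAll(Query query) {
            return unwrap(query) instanceof MatchAllDocsQuery;
        }

        // With one Weight per histogram bucket (a range filter on the date field),
        // a bucket's doc_count can come from Weight#count, which Lucene can often
        // answer from index metadata without visiting every document. A return of
        // -1 means the count is not cheaply available, and the caller must fall
        // back to normal per-document collection.
        static int countBucket(Weight bucketFilter, LeafReaderContext leaf) throws IOException {
            return bucketFilter.count(leaf);
        }
    }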
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -65,6 +65,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Disallow removing some metadata fields by remove ingest processor ([#10895](https://github.com/opensearch-project/OpenSearch/pull/10895))
 - Performance improvement for MultiTerm Queries on Keyword fields ([#7057](https://github.com/opensearch-project/OpenSearch/issues/7057))
 - Refactor common parts from the Rounding class into a separate 'round' package ([#11023](https://github.com/opensearch-project/OpenSearch/issues/11023))
+- Performance improvement for date histogram aggregations without sub-aggregations ([#11083](https://github.com/opensearch-project/OpenSearch/pull/11083))
 - Disable concurrent aggs for Diversified Sampler and Sampler aggs ([#11087](https://github.com/opensearch-project/OpenSearch/issues/11087))
 - Made leader/follower check timeout setting dynamic ([#10528](https://github.com/opensearch-project/OpenSearch/pull/10528))
 - Change error message when per shard document limit is breached ([#11312](https://github.com/opensearch-project/OpenSearch/pull/11312))
@@ -598,7 +598,6 @@ setup:
   - match: { aggregations.histo.buckets.0.doc_count: 2 }
   - match: { profile.shards.0.aggregations.0.type: DateHistogramAggregator }
   - match: { profile.shards.0.aggregations.0.description: histo }
-  - match: { profile.shards.0.aggregations.0.breakdown.collect_count: 4 }
   - match: { profile.shards.0.aggregations.0.debug.total_buckets: 3 }
 
 ---
38 changes: 27 additions & 11 deletions server/src/main/java/org/opensearch/common/Rounding.java
@@ -99,7 +99,7 @@ long roundFloor(long utcMillis) {
             }
 
             @Override
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return extraLocalOffsetLookup;
             }
         },
@@ -110,7 +110,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundYear(utcMillis);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return extraLocalOffsetLookup;
             }
         },
@@ -121,7 +121,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundQuarterOfYear(utcMillis);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return extraLocalOffsetLookup;
             }
         },
@@ -132,7 +132,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundMonthOfYear(utcMillis);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return extraLocalOffsetLookup;
             }
         },
@@ -141,7 +141,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundFloor(utcMillis, this.ratio);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return ratio;
             }
         },
@@ -150,7 +150,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundFloor(utcMillis, ratio);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return ratio;
             }
         },
@@ -165,7 +165,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundFloor(utcMillis, ratio);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return ratio;
             }
         },
@@ -180,7 +180,7 @@ long roundFloor(long utcMillis) {
                 return DateUtils.roundFloor(utcMillis, ratio);
             }
 
-            long extraLocalOffsetLookup() {
+            public long extraLocalOffsetLookup() {
                 return ratio;
             }
         };
@@ -217,7 +217,7 @@ long extraLocalOffsetLookup() {
          * look up so that we can see transitions that we might have rounded
          * down beyond.
          */
-        abstract long extraLocalOffsetLookup();
+        public abstract long extraLocalOffsetLookup();
 
         public byte getId() {
             return id;
@@ -488,7 +488,7 @@ public double roundingSize(long utcMillis, DateTimeUnit timeUnit) {
      *
      * @opensearch.internal
      */
-    static class TimeUnitRounding extends Rounding {
+    public static class TimeUnitRounding extends Rounding {
         static final byte ID = 1;
 
         private final DateTimeUnit unit;
@@ -523,6 +523,14 @@ public byte id() {
             return ID;
         }
 
+        public DateTimeUnit getUnit() {
+            return this.unit;
+        }
+
+        public ZoneId getTimeZone() {
+            return this.timeZone;
+        }
+
         private LocalDateTime truncateLocalDateTime(LocalDateTime localDateTime) {
             switch (unit) {
                 case SECOND_OF_MINUTE:
@@ -953,7 +961,7 @@ public final long nextRoundingValue(long utcMillis) {
      *
      * @opensearch.internal
      */
-    static class TimeIntervalRounding extends Rounding {
+    public static class TimeIntervalRounding extends Rounding {
         static final byte ID = 2;
 
         private final long interval;
@@ -984,6 +992,14 @@ public byte id() {
             return ID;
         }
 
+        public long getInterval() {
+            return this.interval;
+        }
+
+        public ZoneId getTimeZone() {
+            return this.timeZone;
+        }
+
         @Override
         public Prepared prepare(long minUtcMillis, long maxUtcMillis) {
             long minLookup = minUtcMillis - interval;
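The visibility changes above (public nested rounding classes, new getters, and a public extraLocalOffsetLookup()) let code outside org.opensearch.common inspect a Rounding and derive bucket boundaries from it. A hypothetical consumer, sketched under that assumption:

    import java.time.ZoneId;

    import org.opensearch.common.Rounding;

    // Hypothetical illustration of why the nested classes gained public getters:
    // a rewrite path outside org.opensearch.common needs the calendar unit or
    // fixed interval, plus the time zone, to compute bucket keys.
    final class RoundingIntrospectionSketch {
        static String describe(Rounding rounding) {
            if (rounding instanceof Rounding.TimeUnitRounding) {
                Rounding.TimeUnitRounding unitRounding = (Rounding.TimeUnitRounding) rounding;
                Rounding.DateTimeUnit unit = unitRounding.getUnit();
                ZoneId zone = unitRounding.getTimeZone();
                return "calendar interval " + unit + " in zone " + zone;
            }
            if (rounding instanceof Rounding.TimeIntervalRounding) {
                Rounding.TimeIntervalRounding intervalRounding = (Rounding.TimeIntervalRounding) rounding;
                return "fixed interval " + intervalRounding.getInterval() + "ms in zone " + intervalRounding.getTimeZone();
            }
            return "rounding not eligible for the filter rewrite";
        }
    }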
@@ -414,6 +414,16 @@ public long parse(String value) {
             return resolution.convert(DateFormatters.from(dateTimeFormatter().parse(value), dateTimeFormatter().locale()).toInstant());
         }
 
+        public long convertNanosToMillis(long nanoSecondsSinceEpoch) {
+            if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toMilliSeconds(nanoSecondsSinceEpoch);
+            return nanoSecondsSinceEpoch;
+        }
+
+        public long convertRoundedMillisToNanos(long milliSecondsSinceEpoch) {
+            if (resolution.numericType.equals(NumericType.DATE_NANOSECONDS)) return DateUtils.toNanoSeconds(milliSecondsSinceEpoch);
+            return milliSecondsSinceEpoch;
+        }
+
         @Override
         public ValueFetcher valueFetcher(QueryShardContext context, SearchLookup searchLookup, String format) {
             DateFormatter defaultFormatter = dateTimeFormatter();
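The two helpers above bridge resolutions: Rounding operates on epoch milliseconds, while date_nanos fields store epoch nanoseconds, so stored values are normalized to millis before rounding and the rounded bucket key is widened back to the field's resolution. A sketch of the intended pairing (the method and its arguments are stand-ins, not code from this commit):

    import org.opensearch.common.Rounding;
    import org.opensearch.index.mapper.DateFieldMapper;

    final class ResolutionBridgeSketch {
        // storedValue is millis for a date field and nanos for a date_nanos field;
        // both conversions below are no-ops for millisecond-resolution fields.
        static long bucketKeyInFieldResolution(DateFieldMapper.DateFieldType fieldType, Rounding.Prepared prepared, long storedValue) {
            long millis = fieldType.convertNanosToMillis(storedValue);
            long bucketKeyMillis = prepared.round(millis);
            // Widen the rounded key back so it can serve as a range-filter endpoint
            // over the values as they are actually indexed.
            return fieldType.convertRoundedMillisToNanos(bucketKeyMillis);
        }
    }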
@@ -34,13 +34,15 @@
 import org.apache.lucene.index.LeafReaderContext;
 import org.apache.lucene.index.SortedNumericDocValues;
 import org.apache.lucene.search.ScoreMode;
+import org.apache.lucene.search.Weight;
 import org.apache.lucene.util.CollectionUtil;
 import org.opensearch.common.Rounding;
 import org.opensearch.common.Rounding.Prepared;
 import org.opensearch.common.lease.Releasables;
 import org.opensearch.common.util.IntArray;
 import org.opensearch.common.util.LongArray;
 import org.opensearch.core.common.util.ByteArray;
+import org.opensearch.index.mapper.DateFieldMapper;
 import org.opensearch.search.DocValueFormat;
 import org.opensearch.search.aggregations.Aggregator;
 import org.opensearch.search.aggregations.AggregatorFactories;
@@ -125,9 +127,13 @@ static AutoDateHistogramAggregator build(
      * {@link MergingBucketsDeferringCollector#mergeBuckets(long[])}.
      */
     private MergingBucketsDeferringCollector deferringCollector;
+    private final Weight[] filters;
+    private final DateFieldMapper.DateFieldType fieldType;
 
     protected final RoundingInfo[] roundingInfos;
     protected final int targetBuckets;
+    protected int roundingIdx;
+    protected Rounding.Prepared preparedRounding;
 
     private AutoDateHistogramAggregator(
         String name,
@@ -148,8 +154,51 @@ private AutoDateHistogramAggregator(
         this.formatter = valuesSourceConfig.format();
         this.roundingInfos = roundingInfos;
         this.roundingPreparer = roundingPreparer;
+        this.preparedRounding = prepareRounding(0);
+
+        FilterRewriteHelper.FilterContext filterContext = FilterRewriteHelper.buildFastFilterContext(
+            parent(),
+            subAggregators.length,
+            context,
+            b -> getMinimumRounding(b[0], b[1]),
+            // Passing prepared rounding as supplier to ensure the correct prepared
+            // rounding is set as it is done during getMinimumRounding
+            () -> preparedRounding,
+            valuesSourceConfig,
+            fc -> FilterRewriteHelper.getAggregationBounds(context, fc.field())
+        );
+        if (filterContext != null) {
+            fieldType = filterContext.fieldType;
+            filters = filterContext.filters;
+        } else {
+            fieldType = null;
+            filters = null;
+        }
     }
 
+    private Rounding getMinimumRounding(final long low, final long high) {
+        // (max - min) / targetBuckets = bestDuration
+        // find the right innerInterval this bestDuration belongs to
+        // since we cannot exceed targetBuckets, bestDuration should go up,
+        // so the right innerInterval should be an upper bound
+        long bestDuration = (high - low) / targetBuckets;
+        while (roundingIdx < roundingInfos.length - 1) {
+            final RoundingInfo curRoundingInfo = roundingInfos[roundingIdx];
+            final int temp = curRoundingInfo.innerIntervals[curRoundingInfo.innerIntervals.length - 1];
+            // If the interval duration is covered by the maximum inner interval,
+            // we can start with this outer interval for creating the buckets
+            if (bestDuration <= temp * curRoundingInfo.roughEstimateDurationMillis) {
+                break;
+            }
+            roundingIdx++;
+        }
+
+        preparedRounding = prepareRounding(roundingIdx);
+        return roundingInfos[roundingIdx].rounding;
+    }
+
+    protected abstract LongKeyedBucketOrds getBucketOrds();
+
     @Override
     public final ScoreMode scoreMode() {
         if (valuesSource != null && valuesSource.needsScores()) {
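To make the interval selection in getMinimumRounding concrete, here is a worked example with an illustrative rounding table; the real values come from RoundingInfo, and these numbers are assumptions chosen for demonstration only.

    // Standalone walk-through of the selection arithmetic, runnable on its own.
    final class MinimumRoundingSketch {
        public static void main(String[] args) {
            long low = 0L;                   // min timestamp, in millis
            long high = 86_400_000L;         // max timestamp: 24 hours later
            int targetBuckets = 10;

            // Average duration each bucket would have to cover: 8,640,000 ms (~2.4h).
            long bestDuration = (high - low) / targetBuckets;

            // Simplified stand-in table: rough unit duration in millis and the
            // largest inner interval allowed at that granularity.
            long[] roughDurationMillis = { 1_000L, 60_000L, 3_600_000L, 86_400_000L }; // s, m, h, d
            int[] maxInnerInterval = { 30, 30, 12, 7 };

            int roundingIdx = 0;
            while (roundingIdx < roughDurationMillis.length - 1
                && bestDuration > maxInnerInterval[roundingIdx] * roughDurationMillis[roundingIdx]) {
                roundingIdx++;
            }

            // 8,640,000 ms exceeds 30 * 1,000 ms and 30 * 60,000 ms, but fits within
            // 12 * 3,600,000 ms, so the hour granularity (index 2) is selected.
            System.out.println("selected rounding index: " + roundingIdx);
        }
    }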
Expand All @@ -176,7 +225,32 @@ public final LeafBucketCollector getLeafCollector(LeafReaderContext ctx, LeafBuc
if (valuesSource == null) {
return LeafBucketCollector.NO_OP_COLLECTOR;
}
return getLeafCollector(valuesSource.longValues(ctx), sub);

final SortedNumericDocValues values = valuesSource.longValues(ctx);
final LeafBucketCollector iteratingCollector = getLeafCollector(values, sub);

// Need to be declared as final and array for usage within the
// LeafBucketCollectorBase subclass below
final boolean[] useOpt = new boolean[1];
useOpt[0] = filters != null;

return new LeafBucketCollectorBase(sub, values) {
@Override
public void collect(int doc, long owningBucketOrd) throws IOException {
// Try fast filter aggregation if the filters have been created
// Skip if tried before and gave incorrect/incomplete results
if (useOpt[0]) {
useOpt[0] = FilterRewriteHelper.tryFastFilterAggregation(ctx, filters, fieldType, (key, count) -> {
incrementBucketDocCount(
FilterRewriteHelper.getBucketOrd(getBucketOrds().add(owningBucketOrd, preparedRounding.round(key))),
count
);
});
}

iteratingCollector.collect(doc, owningBucketOrd);
}
};
}

protected final InternalAggregation[] buildAggregations(
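A note on the boolean[1] in the collector above: an anonymous subclass such as the LeafBucketCollectorBase there can only capture effectively final locals, so a final one-element array is the standard idiom for a mutable flag recording whether the fast path is still worth attempting. In isolation:

    // Minimal illustration of the capture idiom; names are placeholders.
    final class MutableCaptureSketch {
        interface Collector {
            void collect(int doc);
        }

        static Collector make() {
            // The reference is final (so it can be captured), but the cell it
            // points to can still be flipped from inside collect().
            final boolean[] useFastPath = new boolean[] { true };
            return doc -> {
                if (useFastPath[0]) {
                    // try the fast path once; on failure, never retry for this segment
                    useFastPath[0] = false;
                }
            };
        }
    }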
@@ -247,8 +321,6 @@ protected final void merge(long[] mergeMap, long newNumBuckets) {
      * @opensearch.internal
      */
     private static class FromSingle extends AutoDateHistogramAggregator {
-        private int roundingIdx;
-        private Rounding.Prepared preparedRounding;
         /**
          * Map from value to bucket ordinals.
          * <p>
@@ -286,10 +358,14 @@ private static class FromSingle extends AutoDateHistogramAggregator {
                 metadata
             );
 
-            preparedRounding = prepareRounding(0);
             bucketOrds = new LongKeyedBucketOrds.FromSingle(context.bigArrays());
         }
 
+        @Override
+        protected LongKeyedBucketOrds getBucketOrds() {
+            return bucketOrds;
+        }
+
         @Override
         protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
             return new LeafBucketCollectorBase(sub, values) {
@@ -507,6 +583,11 @@ private static class FromMany extends AutoDateHistogramAggregator {
             liveBucketCountUnderestimate = context.bigArrays().newIntArray(1, true);
         }
 
+        @Override
+        protected LongKeyedBucketOrds getBucketOrds() {
+            return bucketOrds;
+        }
+
         @Override
         protected LeafBucketCollector getLeafCollector(SortedNumericDocValues values, LeafBucketCollector sub) throws IOException {
             return new LeafBucketCollectorBase(sub, values) {
