Skip to content

Commit

Permalink
Move top-level pipeline aggs out of QuerySearchResult (elastic#40319)
Browse files Browse the repository at this point in the history
As part of elastic#40177 we have added top-level pipeline aggs to
`InternalAggregations`. Given that `QuerySearchResult` holds an
`InternalAggregations` instance, there is no need to keep on setting
top-level pipeline aggs separately. Top-level pipeline aggs can then
always be transported through `InternalAggregations`. Such change is
made in a backwards compatible manner.
  • Loading branch information
javanna committed Mar 29, 2019
1 parent 91c442a commit c5367b7
Show file tree
Hide file tree
Showing 6 changed files with 174 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
}
ReduceContext reduceContext = reduceContextFunction.apply(performFinalReduce);
final InternalAggregations aggregations = aggregationsList.isEmpty() ? null :
InternalAggregations.reduce(aggregationsList, firstResult.pipelineAggregators(), reduceContext);
InternalAggregations.reduce(aggregationsList, reduceContext);
final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
final SortedTopDocs sortedTopDocs = sortDocs(isScrollRequest, queryResults, bufferedTopDocs, topDocsStats, from, size,
reducedCompletionSuggestions);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ public void execute(SearchContext context) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}
context.queryResult().aggregations(new InternalAggregations(aggregations));
List<PipelineAggregator> pipelineAggregators = context.aggregations().factories().createPipelineAggregators();
List<SiblingPipelineAggregator> siblingPipelineAggregators = new ArrayList<>(pipelineAggregators.size());
for (PipelineAggregator pipelineAggregator : pipelineAggregators) {
Expand All @@ -144,7 +143,7 @@ public void execute(SearchContext context) {
+ "allowed at the top level");
}
}
context.queryResult().pipelineAggregators(siblingPipelineAggregators);
context.queryResult().aggregations(new InternalAggregations(aggregations, siblingPipelineAggregators));

// disable aggregations so that they don't run on next pages in case of scrolling
context.aggregations(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public InternalAggregations(List<InternalAggregation> aggregations, List<Sibling
* Note that top-level pipeline aggregators become normal aggregation once the final reduction has been performed, after which they
* become part of the list of {@link InternalAggregation}s.
*/
List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
public List<SiblingPipelineAggregator> getTopLevelPipelineAggregators() {
return topLevelPipelineAggregators;
}

Expand All @@ -92,20 +92,7 @@ public static InternalAggregations reduce(List<InternalAggregations> aggregation
if (aggregationsList.isEmpty()) {
return null;
}
InternalAggregations first = aggregationsList.get(0);
return reduce(aggregationsList, first.topLevelPipelineAggregators, context);
}

/**
* Reduces the given list of aggregations as well as the provided top-level pipeline aggregators.
* Note that top-level pipeline aggregators are reduced only as part of the final reduction phase, otherwise they are left untouched.
*/
public static InternalAggregations reduce(List<InternalAggregations> aggregationsList,
List<SiblingPipelineAggregator> topLevelPipelineAggregators,
ReduceContext context) {
if (aggregationsList.isEmpty()) {
return null;
}
List<SiblingPipelineAggregator> topLevelPipelineAggregators = aggregationsList.get(0).getTopLevelPipelineAggregators();

// first we collect all aggregations of the same type and list them together
Map<String, List<InternalAggregation>> aggByName = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
Expand All @@ -38,7 +39,6 @@
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static org.elasticsearch.common.lucene.Lucene.readTopDocs;
Expand All @@ -55,7 +55,6 @@ public final class QuerySearchResult extends SearchPhaseResult {
private DocValueFormat[] sortValueFormats;
private InternalAggregations aggregations;
private boolean hasAggs;
private List<SiblingPipelineAggregator> pipelineAggregators = Collections.emptyList();
private Suggest suggest;
private boolean searchTimedOut;
private Boolean terminatedEarly = null;
Expand Down Expand Up @@ -199,14 +198,6 @@ public void profileResults(ProfileShardResult shardResults) {
hasProfileResults = shardResults != null;
}

public List<SiblingPipelineAggregator> pipelineAggregators() {
return pipelineAggregators;
}

public void pipelineAggregators(List<SiblingPipelineAggregator> pipelineAggregators) {
this.pipelineAggregators = Objects.requireNonNull(pipelineAggregators);
}

public Suggest suggest() {
return suggest;
}
Expand Down Expand Up @@ -295,8 +286,18 @@ public void readFromWithId(long id, StreamInput in) throws IOException {
if (hasAggs = in.readBoolean()) {
aggregations = InternalAggregations.readAggregations(in);
}
pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream().map(a -> (SiblingPipelineAggregator) a)
.collect(Collectors.toList());
if (in.getVersion().before(Version.V_7_1_0)) {
List<SiblingPipelineAggregator> pipelineAggregators = in.readNamedWriteableList(PipelineAggregator.class).stream()
.map(a -> (SiblingPipelineAggregator) a).collect(Collectors.toList());
if (hasAggs && pipelineAggregators.isEmpty() == false) {
List<InternalAggregation> internalAggs = aggregations.asList().stream()
.map(agg -> (InternalAggregation) agg).collect(Collectors.toList());
//Earlier versions serialize sibling pipeline aggs separately as they used to be set to QuerySearchResult directly, while
//later versions include them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1.
this.aggregations = new InternalAggregations(internalAggs, pipelineAggregators);
}
}
if (in.readBoolean()) {
suggest = new Suggest(in);
}
Expand Down Expand Up @@ -338,7 +339,16 @@ public void writeToNoId(StreamOutput out) throws IOException {
out.writeBoolean(true);
aggregations.writeTo(out);
}
out.writeNamedWriteableList(pipelineAggregators);
if (out.getVersion().before(Version.V_7_1_0)) {
//Earlier versions expect sibling pipeline aggs separately as they used to be set to QuerySearchResult directly,
//while later versions expect them in InternalAggregations. Note that despite serializing sibling pipeline aggs as part of
//InternalAggregations is supported since 6.7.0, the shards set sibling pipeline aggs to InternalAggregations only from 7.1 on.
if (aggregations == null) {
out.writeNamedWriteableList(Collections.emptyList());
} else {
out.writeNamedWriteableList(aggregations.getTopLevelPipelineAggregators());
}
}
if (suggest == null) {
out.writeBoolean(false);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,18 +50,19 @@ public class InternalAggregationsTests extends ESTestCase {
public void testReduceEmptyAggs() {
List<InternalAggregations> aggs = Collections.emptyList();
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, randomBoolean());
assertNull(InternalAggregations.reduce(aggs, Collections.emptyList(), reduceContext));
assertNull(InternalAggregations.reduce(aggs, reduceContext));
}

public void testNonFinalReduceTopLevelPipelineAggs() {
InternalAggregation terms = new StringTerms("name", BucketOrder.key(true),
10, 1, Collections.emptyList(), Collections.emptyMap(), DocValueFormat.RAW, 25, false, 10, Collections.emptyList(), 0);
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms)));
List<SiblingPipelineAggregator> topLevelPipelineAggs = new ArrayList<>();
MaxBucketPipelineAggregationBuilder maxBucketPipelineAggregationBuilder = new MaxBucketPipelineAggregationBuilder("test", "test");
topLevelPipelineAggs.add((SiblingPipelineAggregator)maxBucketPipelineAggregationBuilder.create());
List<InternalAggregations> aggs = Collections.singletonList(new InternalAggregations(Collections.singletonList(terms),
topLevelPipelineAggs));
InternalAggregation.ReduceContext reduceContext = new InternalAggregation.ReduceContext(null, null, false);
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, topLevelPipelineAggs, reduceContext);
InternalAggregations reducedAggs = InternalAggregations.reduce(aggs, reduceContext);
assertEquals(1, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(1, reducedAggs.aggregations.size());
}
Expand All @@ -79,15 +80,15 @@ public void testFinalReduceTopLevelPipelineAggs() {
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
} else {
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms));
List<SiblingPipelineAggregator> topLevelPipelineAggs = Collections.singletonList(siblingPipelineAggregator);
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), topLevelPipelineAggs, reduceContext);
InternalAggregations aggs = new InternalAggregations(Collections.singletonList(terms),
Collections.singletonList(siblingPipelineAggregator));
reducedAggs = InternalAggregations.reduce(Collections.singletonList(aggs), reduceContext);
}
assertEquals(0, reducedAggs.getTopLevelPipelineAggregators().size());
assertEquals(2, reducedAggs.aggregations.size());
}

public void testSerialization() throws Exception {
public static InternalAggregations createTestInstance() throws Exception {
List<InternalAggregation> aggsList = new ArrayList<>();
if (randomBoolean()) {
StringTermsTests stringTermsTests = new StringTermsTests();
Expand Down Expand Up @@ -116,7 +117,11 @@ public void testSerialization() throws Exception {
topLevelPipelineAggs.add((SiblingPipelineAggregator)new SumBucketPipelineAggregationBuilder("name3", "bucket3").create());
}
}
InternalAggregations aggregations = new InternalAggregations(aggsList, topLevelPipelineAggs);
return new InternalAggregations(aggsList, topLevelPipelineAggs);
}

public void testSerialization() throws Exception {
InternalAggregations aggregations = createTestInstance();
writeToAndReadFrom(aggregations, 0);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.search.query;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.elasticsearch.Version;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.common.io.stream.NamedWriteableAwareStreamInput;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.SearchModule;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregations;
import org.elasticsearch.search.aggregations.InternalAggregationsTests;
import org.elasticsearch.search.aggregations.pipeline.SiblingPipelineAggregator;
import org.elasticsearch.search.suggest.SuggestTests;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.VersionUtils;

import java.io.IOException;
import java.util.Base64;
import java.util.List;

import static java.util.Collections.emptyList;

public class QuerySearchResultTests extends ESTestCase {

private final NamedWriteableRegistry namedWriteableRegistry;

public QuerySearchResultTests() {
SearchModule searchModule = new SearchModule(Settings.EMPTY, false, emptyList());
this.namedWriteableRegistry = new NamedWriteableRegistry(searchModule.getNamedWriteables());
}

private static QuerySearchResult createTestInstance() throws Exception {
ShardId shardId = new ShardId("index", "uuid", randomInt());
QuerySearchResult result = new QuerySearchResult(randomLong(), new SearchShardTarget("node", shardId, null, OriginalIndices.NONE));
if (randomBoolean()) {
result.terminatedEarly(randomBoolean());
}
TopDocs topDocs = new TopDocs(new TotalHits(randomLongBetween(0, Long.MAX_VALUE), TotalHits.Relation.EQUAL_TO), new ScoreDoc[0]);
result.topDocs(new TopDocsAndMaxScore(topDocs, randomBoolean() ? Float.NaN : randomFloat()), new DocValueFormat[0]);
result.size(randomInt());
result.from(randomInt());
if (randomBoolean()) {
result.suggest(SuggestTests.createTestItem());
}
if (randomBoolean()) {
result.aggregations(InternalAggregationsTests.createTestInstance());
}
return result;
}

public void testSerialization() throws Exception {
QuerySearchResult querySearchResult = createTestInstance();
Version version = VersionUtils.randomVersion(random());
QuerySearchResult deserialized = copyStreamable(querySearchResult, namedWriteableRegistry, QuerySearchResult::new, version);
assertEquals(querySearchResult.getRequestId(), deserialized.getRequestId());
assertNull(deserialized.getSearchShardTarget());
assertEquals(querySearchResult.topDocs().maxScore, deserialized.topDocs().maxScore, 0f);
assertEquals(querySearchResult.topDocs().topDocs.totalHits, deserialized.topDocs().topDocs.totalHits);
assertEquals(querySearchResult.from(), deserialized.from());
assertEquals(querySearchResult.size(), deserialized.size());
assertEquals(querySearchResult.hasAggs(), deserialized.hasAggs());
if (deserialized.hasAggs()) {
Aggregations aggs = querySearchResult.consumeAggs();
Aggregations deserializedAggs = deserialized.consumeAggs();
assertEquals(aggs.asList(), deserializedAggs.asList());
List<SiblingPipelineAggregator> pipelineAggs = ((InternalAggregations) aggs).getTopLevelPipelineAggregators();
List<SiblingPipelineAggregator> deserializedPipelineAggs =
((InternalAggregations) deserializedAggs).getTopLevelPipelineAggregators();
assertEquals(pipelineAggs.size(), deserializedPipelineAggs.size());
for (int i = 0; i < pipelineAggs.size(); i++) {
SiblingPipelineAggregator pipelineAgg = pipelineAggs.get(i);
SiblingPipelineAggregator deserializedPipelineAgg = deserializedPipelineAggs.get(i);
assertArrayEquals(pipelineAgg.bucketsPaths(), deserializedPipelineAgg.bucketsPaths());
assertEquals(pipelineAgg.name(), deserializedPipelineAgg.name());
}
}
assertEquals(querySearchResult.terminatedEarly(), deserialized.terminatedEarly());
}

public void testReadFromPre_7_1_0() throws IOException {
String message = "AAAAAAAAAGQAAAEAAAB/wAAAAAEBBnN0ZXJtcwVJblhNRgoDBVNhdWpvAAVrS3l3cwVHSVVZaAAFZXRUbEUFZGN0WVoABXhzYnVrAAEDAfoN" +
"A3JhdwUBAAJRAAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVkFhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hyd" +
"y0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2RMZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAA" +
"AAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZd3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXh" +
"DSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAAAEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NL" +
"U1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAApydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQ" +
"lFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksAClRJZHJlSkpVc1Y4AAAAAAAAA30DBnN0ZXJtcwVNdVVFRwoAAAEDAfoNA3JhdwUBAAdDAAAAAAAAA30AAApQVk" +
"FhaUxSdHh5TAAAAAAAAAN9AAAKTVRUeUxnd1hydy0AAAAAAAADfQAACnZRQXZ3cWp0SmwPAAAAAAAAA30AAApmYXNyUUhNVWZBCwAAAAAAAAN9AAAKT3FIQ2R" +
"MZ1JZUwUAAAAAAAADfQAACm9jT05aZmZ4ZmUmAAAAAAAAA30AAApvb0tJTkdvbHdzBnN0ZXJtcwVtRmlmZAoAAAEDAfoNA3JhdwUBAARXAAAAAAAAA30AAApZ" +
"d3BwQlpBZEhpMQAAAAAAAAN9AAAKREZ3UVpTSXhDSE4AAAAAAAADfQAAClVMZW1YZGtkSHUUAAAAAAAAA30AAApBUVdKVk1kTlF1BnN0ZXJtcwVxbkJGVgoAA" +
"AEDAfoNA3JhdwUBAAYJAAAAAAAAA30AAApBS2NLU1ZVS25EIQAAAAAAAAN9AAAKWGpCbXZBZmduRhsAAAAAAAADfQAACk54TkJEV3pLRmI7AAAAAAAAA30AAA" +
"pydkdaZnJycXhWSAAAAAAAAAN9AAAKSURVZ3JhQlFHSy4AAAAAAAADfQAACmJmZ0x5YlFlVksACm5rdExLUHp3cGgBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2t" +
"ldDH/A3JhdwEBCm1heF9idWNrZXQFbmFtZTEBB2J1Y2tldDH/A3JhdwEAAAIAAf////8AAAAAAAAAAAAAAAAAAAAAAAAAAAAA";
byte[] bytes = Base64.getDecoder().decode(message);
try (NamedWriteableAwareStreamInput in = new NamedWriteableAwareStreamInput(StreamInput.wrap(bytes), namedWriteableRegistry)) {
in.setVersion(Version.V_7_0_0);
QuerySearchResult querySearchResult = new QuerySearchResult();
querySearchResult.readFrom(in);
assertEquals(100, querySearchResult.getRequestId());
assertTrue(querySearchResult.hasAggs());
InternalAggregations aggs = (InternalAggregations)querySearchResult.consumeAggs();
assertEquals(1, aggs.asList().size());
//top-level pipeline aggs are retrieved as part of InternalAggregations although they were serialized separately
assertEquals(1, aggs.getTopLevelPipelineAggregators().size());
}
}
}

0 comments on commit c5367b7

Please sign in to comment.