[Concurrent Segment Search]: Implement concurrent aggregations support without profile option (opensearch-project#7514)

* Refactor AggregationReduceContext for use in SearchContext. This will be used to perform the shard-level reduce of aggregations in the concurrent segment search use case

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Support non-global aggregations with concurrent segment search. This PR does not include support for the profile option with aggregations in the concurrent model

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Implement AggregationCollectorManager's reduce

Signed-off-by: Andriy Redko <andriy.redko@aiven.io>

* Use CollectorManager for both the concurrent and non-concurrent use cases
Add a CollectorManager for global aggregations to support the concurrent use case

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Address review comments

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

* Serialize the pipelineTree in QueryResult in 2.x for backwards compatibility (bwc) and fix the new tests as well

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>

---------

Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
Co-authored-by: Andriy Redko <andriy.redko@aiven.io>
sohami and reta committed Jun 8, 2023
1 parent 740956a commit 5066a31
Showing 38 changed files with 1,306 additions and 320 deletions.
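
For orientation, here is a minimal sketch of the two-level reduce flow this commit introduces, assuming a reduceContextBuilder and the slice/shard result lists are already in scope; all names are illustrative, not part of the commit:

    // 1) Shard level: each slice produces InternalAggregations; when a request
    //    ran with multiple slices they are combined using a *partial* reduce
    //    context, which defers pipeline aggregations.
    InternalAggregations shardResult = InternalAggregations.reduce(
        sliceResults, reduceContextBuilder.forPartialReduction());
    // 2) Coordinator level: shard results are combined using the *final* reduce
    //    context, which also evaluates the pipeline aggregation tree.
    InternalAggregations searchResult = InternalAggregations.topLevelReduce(
        shardResults, reduceContextBuilder.forFinalReduction());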
4 changes: 3 additions & 1 deletion CHANGELOG.md
@@ -4,7 +4,9 @@ All notable changes to this project are documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). See the [CONTRIBUTING guide](./CONTRIBUTING.md#Changelog) for instructions on how to add changelog entries.

## [Unreleased 2.x]
### Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
### Added
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))

### Dependencies
- Bump `com.azure:azure-storage-common` from 12.21.0 to 12.21.1 (#7566, #7814)
@@ -435,7 +435,9 @@ public void testMap() {
assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
// with script-based aggregations, if reduce is not supported then the aggregationList size
// will be numShards * slicesCount
assertThat(aggregationList.size(), greaterThanOrEqualTo(getNumShards("idx").numPrimaries));
int numShardsRun = 0;
for (Object object : aggregationList) {
assertThat(object, notNullValue());
@@ -483,7 +485,9 @@ public void testMapWithParams() {
assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
// with script-based aggregations, if reduce is not supported then the aggregationList size
// will be numShards * slicesCount
assertThat(aggregationList.size(), greaterThanOrEqualTo(getNumShards("idx").numPrimaries));
int numShardsRun = 0;
for (Object object : aggregationList) {
assertThat(object, notNullValue());
@@ -535,7 +539,9 @@ public void testInitMutatesParams() {
assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
// with script-based aggregations, if reduce is not supported then the aggregationList size
// will be numShards * slicesCount
assertThat(aggregationList.size(), greaterThanOrEqualTo(getNumShards("idx").numPrimaries));
long totalCount = 0;
for (Object object : aggregationList) {
assertThat(object, notNullValue());
@@ -588,7 +594,9 @@ public void testMapCombineWithParams() {
assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
// with script-based aggregations, if reduce is not supported then the aggregationList size
// will be numShards * slicesCount
assertThat(aggregationList.size(), greaterThanOrEqualTo(getNumShards("idx").numPrimaries));
long totalCount = 0;
for (Object object : aggregationList) {
assertThat(object, notNullValue());
@@ -651,7 +659,9 @@ public void testInitMapCombineWithParams() {
assertThat(scriptedMetricAggregation.aggregation(), notNullValue());
assertThat(scriptedMetricAggregation.aggregation(), instanceOf(ArrayList.class));
List<?> aggregationList = (List<?>) scriptedMetricAggregation.aggregation();
assertThat(aggregationList.size(), equalTo(getNumShards("idx").numPrimaries));
// with script-based aggregations, if reduce is not supported then the aggregationList size
// will be numShards * slicesCount
assertThat(aggregationList.size(), greaterThanOrEqualTo(getNumShards("idx").numPrimaries));
long totalCount = 0;
for (Object object : aggregationList) {
assertThat(object, notNullValue());
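
The relaxed assertions above follow directly from slice-level execution; a small worked example with hypothetical shard and slice counts:

    // Hypothetical: 3 primary shards, each searched with up to 4 slices.
    int numShards = 3;
    int slicesPerShard = 4; // actual value depends on segment count and settings
    // Sequential path: one map-script state per shard.
    int minStates = numShards;                  // 3
    // Concurrent path with a map-only script (no reduce): one state per slice.
    int maxStates = numShards * slicesPerShard; // 12
    // hence: assertThat(aggregationList.size(), greaterThanOrEqualTo(minStates));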
@@ -95,11 +95,11 @@ public final class SearchPhaseController {
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

private final NamedWriteableRegistry namedWriteableRegistry;
private final Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

public SearchPhaseController(
NamedWriteableRegistry namedWriteableRegistry,
Function<SearchRequest, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
) {
this.namedWriteableRegistry = namedWriteableRegistry;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
@@ -737,7 +737,7 @@ public InternalSearchResponse buildResponse(SearchHits hits) {
}

InternalAggregation.ReduceContextBuilder getReduceContext(SearchRequest request) {
return requestToAggReduceContextBuilder.apply(request);
return requestToAggReduceContextBuilder.apply(request.source());
}

/**
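
With the narrowed signature, node-level wiring can keep passing the same method reference; a hedged sketch, with surrounding variables assumed to be in scope:

    SearchPhaseController controller = new SearchPhaseController(
        namedWriteableRegistry,
        // now a Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder>,
        // so shard-level code can build reduce contexts without a full SearchRequest
        searchService::aggReduceContextBuilder
    );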
@@ -443,7 +443,7 @@ private void executeRequest(
localIndices,
remoteClusterIndices,
timeProvider,
searchService.aggReduceContextBuilder(searchRequest),
searchService.aggReduceContextBuilder(searchRequest.source()),
remoteClusterService,
threadPool,
listener,
@@ -64,7 +64,9 @@
import org.opensearch.index.search.NestedHelper;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.similarity.SimilarityService;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.SearchContextAggregations;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.search.collapse.CollapseContext;
import org.opensearch.search.dfs.DfsSearchResult;
import org.opensearch.search.fetch.FetchPhase;
@@ -99,6 +101,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.function.LongSupplier;

/**
@@ -175,6 +178,7 @@ final class DefaultSearchContext extends SearchContext {
private final Map<Class<?>, CollectorManager<? extends Collector, ReduceableSearchResult>> queryCollectorManagers = new HashMap<>();
private final QueryShardContext queryShardContext;
private final FetchPhase fetchPhase;
private final Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder;

DefaultSearchContext(
ReaderContext readerContext,
@@ -188,7 +192,8 @@ final class DefaultSearchContext extends SearchContext {
boolean lowLevelCancellation,
Version minNodeVersion,
boolean validate,
Executor executor
Executor executor,
Function<SearchSourceBuilder, InternalAggregation.ReduceContextBuilder> requestToAggReduceContextBuilder
) throws IOException {
this.readerContext = readerContext;
this.request = request;
@@ -225,6 +230,7 @@ final class DefaultSearchContext extends SearchContext {
);
queryBoost = request.indexBoost();
this.lowLevelCancellation = lowLevelCancellation;
this.requestToAggReduceContextBuilder = requestToAggReduceContextBuilder;
}

@Override
@@ -886,4 +892,9 @@ public boolean isCancelled() {
public ReaderContext readerContext() {
return readerContext;
}

@Override
public InternalAggregation.ReduceContext partial() {
return requestToAggReduceContextBuilder.apply(request.source()).forPartialReduction();
}
}
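
The new partial() override exposes a partial-reduction context to collectors running on the shard. A sketch of the intended call site, mirroring the collector manager added later in this commit (variable names assumed):

    // Combine one shard's slice-level aggregations; pipeline aggregations stay
    // deferred until the final reduce on the coordinator node.
    InternalAggregations reduced = InternalAggregations.reduce(
        Collections.singletonList(perSliceAggregations),
        searchContext.partial()
    );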
9 changes: 8 additions & 1 deletion server/src/main/java/org/opensearch/search/SearchModule.java
@@ -1326,7 +1326,14 @@ public FetchPhase getFetchPhase() {
}

public QueryPhase getQueryPhase() {
return (queryPhaseSearcher == null) ? new QueryPhase() : new QueryPhase(queryPhaseSearcher);
QueryPhase queryPhase;
if (queryPhaseSearcher == null) {
// use the defaults
queryPhase = new QueryPhase();
} else {
queryPhase = new QueryPhase(queryPhaseSearcher);
}
return queryPhase;
}

public @Nullable ExecutorService getIndexSearcherExecutor(ThreadPool pool) {
18 changes: 9 additions & 9 deletions server/src/main/java/org/opensearch/search/SearchService.java
@@ -45,7 +45,6 @@
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitSearchContextIdForNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.UpdatePitContextRequest;
@@ -1039,7 +1038,8 @@ private DefaultSearchContext createSearchContext(ReaderContext reader, ShardSear
lowLevelCancellation,
clusterService.state().nodes().getMinNodeVersion(),
validate,
indexSearcherExecutor
indexSearcherExecutor,
this::aggReduceContextBuilder
);
// we clone the query shard context here just for rewriting otherwise we
// might end up with incorrect state since we are using now() or script services
@@ -1621,22 +1621,22 @@ public IndicesService getIndicesService() {

/**
* Returns a builder for {@link InternalAggregation.ReduceContext}. This
* builder retains a reference to the provided {@link SearchRequest}.
* builder retains a reference to the provided {@link SearchSourceBuilder}.
*/
public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchRequest request) {
public InternalAggregation.ReduceContextBuilder aggReduceContextBuilder(SearchSourceBuilder searchSourceBuilder) {
return new InternalAggregation.ReduceContextBuilder() {
@Override
public InternalAggregation.ReduceContext forPartialReduction() {
return InternalAggregation.ReduceContext.forPartialReduction(
bigArrays,
scriptService,
() -> requestToPipelineTree(request)
() -> requestToPipelineTree(searchSourceBuilder)
);
}

@Override
public ReduceContext forFinalReduction() {
PipelineTree pipelineTree = requestToPipelineTree(request);
PipelineTree pipelineTree = requestToPipelineTree(searchSourceBuilder);
return InternalAggregation.ReduceContext.forFinalReduction(
bigArrays,
scriptService,
@@ -1647,11 +1647,11 @@ public ReduceContext forFinalReduction() {
};
}

private static PipelineTree requestToPipelineTree(SearchRequest request) {
if (request.source() == null || request.source().aggregations() == null) {
private static PipelineTree requestToPipelineTree(SearchSourceBuilder searchSourceBuilder) {
if (searchSourceBuilder == null || searchSourceBuilder.aggregations() == null) {
return PipelineTree.EMPTY;
}
return request.source().aggregations().buildPipelineTree();
return searchSourceBuilder.aggregations().buildPipelineTree();
}

/**
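
A hedged usage sketch of the updated builder API; the aggregation and field names are made up:

    SearchSourceBuilder source = new SearchSourceBuilder()
        .aggregation(AggregationBuilders.terms("genres").field("genre"));
    InternalAggregation.ReduceContextBuilder builder = searchService.aggReduceContextBuilder(source);
    InternalAggregation.ReduceContext partialCtx = builder.forPartialReduction(); // shard/slice level
    InternalAggregation.ReduceContext finalCtx = builder.forFinalReduction();     // coordinator level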
@@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.search.aggregations;

import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.opensearch.common.CheckedFunction;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.InternalProfileCollector;
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;

/**
 * Common {@link CollectorManager} used by both the concurrent and non-concurrent aggregation paths, and by both global and
 * non-global aggregation operators
*/
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
private final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;

AggregationCollectorManager(
SearchContext context,
CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider,
String collectorReason
) {
this.context = context;
this.aggProvider = aggProvider;
this.collectorReason = collectorReason;
}

@Override
public Collector newCollector() throws IOException {
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
// For aggregations we should not have a NO_OP_COLLECTOR
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
List<Aggregator> aggregators = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof Aggregator) {
aggregators.add((Aggregator) currentCollector);
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof Aggregator) {
aggregators.add((Aggregator) ((InternalProfileCollector) currentCollector).getCollector());
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}

final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
aggregator.postCollection();
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
// Reduce the aggregations across slices before sending them to the coordinator. We perform the shard-level reduce iff
// multiple slices were created to execute this request and it used the concurrent segment search path
// TODO: Add a check for the flag indicating that the request was executed using concurrent search
if (collectors.size() > 1) {
// using reduce is fine here instead of topLevelReduce as pipeline aggregation is evaluated on the coordinator after all
// documents are collected across shards for an aggregation
return new AggregationReduceableSearchResult(
InternalAggregations.reduce(Collections.singletonList(internalAggregations), context.partial())
);
} else {
return new AggregationReduceableSearchResult(internalAggregations);
}
}

static Collector createCollector(SearchContext context, List<Aggregator> collectors, String reason) throws IOException {
Collector collector = MultiBucketCollector.wrap(collectors);
((BucketCollector) collector).preCollection();
if (context.getProfilers() != null) {
collector = new InternalProfileCollector(
collector,
reason,
// TODO: report on child aggs as well
Collections.emptyList()
);
}
return collector;
}
}
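
A usage sketch showing how Lucene drives this manager in both execution modes; searcher, query, and aggCollectorManager are assumed to be in scope:

    // Concurrent path (IndexSearcher constructed with an executor and multiple
    // leaf slices): newCollector() is invoked once per slice, slices are
    // collected in parallel, and reduce() runs exactly once over all of them.
    // Sequential path: a single collector is created and reduce() receives a
    // singleton collection, so no slice-level reduce is performed.
    ReduceableSearchResult result = searcher.search(query, aggCollectorManager);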
