Skip to content

Commit

Permalink
Address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Sorabh Hamirwasia <sohami.apache@gmail.com>
  • Loading branch information
sohami committed Jun 2, 2023
1 parent 18da9fe commit 62005f7
Show file tree
Hide file tree
Showing 10 changed files with 45 additions and 21 deletions.
4 changes: 0 additions & 4 deletions server/src/main/java/org/opensearch/search/SearchModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@
import org.opensearch.plugins.SearchPlugin.SuggesterSpec;
import org.opensearch.search.aggregations.AggregationBuilder;
import org.opensearch.search.aggregations.BaseAggregationBuilder;
import org.opensearch.search.aggregations.ConcurrentAggregationProcessor;
import org.opensearch.search.aggregations.InternalAggregation;
import org.opensearch.search.aggregations.PipelineAggregationBuilder;
import org.opensearch.search.aggregations.bucket.adjacency.AdjacencyMatrixAggregationBuilder;
Expand Down Expand Up @@ -1295,9 +1294,6 @@ public QueryPhase getQueryPhase() {
if (queryPhaseSearcher == null) {
// use the defaults
queryPhase = new QueryPhase();
} else if (queryPhaseSearcher instanceof ConcurrentQueryPhaseSearcher) {
// use ConcurrentAggregationProcessor only with ConcurrentQueryPhaseSearcher
queryPhase = new QueryPhase(queryPhaseSearcher, new ConcurrentAggregationProcessor());
} else {
queryPhase = new QueryPhase(queryPhaseSearcher);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,29 +28,27 @@
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
* aggregation operators
*/
public class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
class AggregationCollectorManager implements CollectorManager<Collector, ReduceableSearchResult> {
private final SearchContext context;
private final CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider;
private final String collectorReason;

protected Collector collector;

public AggregationCollectorManager(
AggregationCollectorManager(
SearchContext context,
CheckedFunction<SearchContext, List<Aggregator>, IOException> aggProvider,
String collectorReason
) throws IOException {
) {
this.context = context;
this.aggProvider = aggProvider;
this.collectorReason = collectorReason;
collector = createCollector(context, aggProvider.apply(context), collectorReason);
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
}

@Override
public Collector newCollector() throws IOException {
return createCollector(context, aggProvider.apply(context), collectorReason);
final Collector collector = createCollector(context, aggProvider.apply(context), collectorReason);
// For Aggregations we should not have a NO_OP_Collector
assert collector != BucketCollector.NO_OP_COLLECTOR;
return collector;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
* {@link CollectorManager} to take care of global aggregation operators in case of concurrent segment search
*/
public class GlobalAggCollectorManager extends AggregationCollectorManager {

private Collector collector;

public GlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = super.newCollector();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
* returns the same collector instance (i.e. created in constructor of super class) on each newCollector call
*/
public class GlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager {

private final Collector collector;

public GlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelGlobalAggregators, CollectorResult.REASON_AGGREGATION_GLOBAL);
collector = super.newCollector();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,12 @@
* {@link CollectorManager} to take care of non-global aggregation operators in case of concurrent segment search
*/
public class NonGlobalAggCollectorManager extends AggregationCollectorManager {

private Collector collector;

public NonGlobalAggCollectorManager(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = super.newCollector();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@
* CollectorManager returns the same collector instance (i.e. created in constructor of super class) on each newCollector call
*/
public class NonGlobalAggCollectorManagerWithSingleCollector extends AggregationCollectorManager {

private final Collector collector;

public NonGlobalAggCollectorManagerWithSingleCollector(SearchContext context) throws IOException {
super(context, context.aggregations().factories()::createTopLevelNonGlobalAggregators, CollectorResult.REASON_AGGREGATION);
collector = super.newCollector();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.aggregations.ConcurrentAggregationProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.query.ProfileCollectorManager;
Expand Down Expand Up @@ -101,6 +103,11 @@ private static boolean searchWithCollectorManager(
return topDocsFactory.shouldRescore();
}

@Override
public AggregationProcessor newAggregationProcessor() {
return new ConcurrentAggregationProcessor();
}

private static boolean allowConcurrentSegmentSearch(final ContextIndexSearcher searcher) {
return (searcher.getExecutor() != null);
}
Expand Down
11 changes: 4 additions & 7 deletions server/src/main/java/org/opensearch/search/query/QueryPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@
import org.opensearch.search.SearchContextSourcePrinter;
import org.opensearch.search.SearchService;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.aggregations.DefaultAggregationProcessor;
import org.opensearch.search.aggregations.GlobalAggCollectorManager;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.ScrollContext;
Expand Down Expand Up @@ -93,7 +92,6 @@ public class QueryPhase {
// TODO: remove this property
public static final boolean SYS_PROP_REWRITE_SORT = Booleans.parseBoolean(System.getProperty("opensearch.search.rewrite_sort", "true"));
public static final QueryPhaseSearcher DEFAULT_QUERY_PHASE_SEARCHER = new DefaultQueryPhaseSearcher();
public static final AggregationProcessor DEFAULT_AGGREGATION_PROCESSOR = new DefaultAggregationProcessor();
private final QueryPhaseSearcher queryPhaseSearcher;
private final AggregationProcessor aggregationProcessor;
private final SuggestPhase suggestPhase;
Expand All @@ -104,12 +102,11 @@ public QueryPhase() {
}

public QueryPhase(QueryPhaseSearcher queryPhaseSearcher) {
this(queryPhaseSearcher, DEFAULT_AGGREGATION_PROCESSOR);
}

public QueryPhase(QueryPhaseSearcher queryPhaseSearcher, AggregationProcessor aggregationProcessor) {
this.queryPhaseSearcher = Objects.requireNonNull(queryPhaseSearcher, "QueryPhaseSearcher is required");
this.aggregationProcessor = aggregationProcessor;
this.aggregationProcessor = Objects.requireNonNull(
queryPhaseSearcher.newAggregationProcessor(),
"AggregationProcessor is required"
);
this.suggestPhase = new SuggestPhase();
this.rescorePhase = new RescorePhase();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

import org.apache.lucene.search.CollectorManager;
import org.apache.lucene.search.Query;
import org.opensearch.search.aggregations.AggregationProcessor;
import org.opensearch.search.aggregations.DefaultAggregationProcessor;
import org.opensearch.search.internal.ContextIndexSearcher;
import org.opensearch.search.internal.SearchContext;

Expand Down Expand Up @@ -40,4 +42,12 @@ boolean searchWith(
boolean hasFilterCollector,
boolean hasTimeout
) throws IOException;

/**
* {@link AggregationProcessor} to use to setup and post process aggregation related collectors during search request
* @return {@link AggregationProcessor} to use
*/
default AggregationProcessor newAggregationProcessor() {
return new DefaultAggregationProcessor();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,7 @@ public void testConcurrentQueryPhaseSearcher() {
public void testPluginQueryPhaseSearcher() {
Settings settings = Settings.builder().put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, true).build();
FeatureFlags.initializeFeatureFlags(settings);
QueryPhaseSearcher queryPhaseSearcher = mock(QueryPhaseSearcher.class);
QueryPhaseSearcher queryPhaseSearcher = (searchContext, searcher, query, collectors, hasFilterCollector, hasTimeout) -> false;
SearchPlugin plugin1 = new SearchPlugin() {
@Override
public Optional<QueryPhaseSearcher> getQueryPhaseSearcher() {
Expand Down

0 comments on commit 62005f7

Please sign in to comment.