Add a new cluster setting to limit the total number of buckets returned by a request (elastic#27581)

This commit adds a new dynamic cluster setting named `search.max_buckets` that limits the number of buckets created per shard or by the reduce phase. Each multi-bucket aggregator can consume buckets during the final build of the aggregation at the shard level, or during the reduce phase (final or not) on the coordinating node. When an aggregator consumes a bucket, a global count for the request is incremented; if that count exceeds the limit, a `TooManyBucketsException` is thrown.
This change gives multi-bucket aggregators the ability to "consume" buckets against the global limit; the default limit is 10,000. The consumer is opt-in, so each multi-bucket aggregator must explicitly call it when a bucket is added to the response.

Closes elastic#27452 elastic#26012
jimczi committed Dec 6, 2017
1 parent 70f8ea3 commit caea6b7
Showing 49 changed files with 658 additions and 84 deletions.
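
Before the per-file diffs, a minimal sketch of the mechanism described above may help. This is not the committed implementation: the field names, the reset() helper, and the exception message are assumptions; only the class and exception names come from the diff below.

import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import java.util.function.IntConsumer;

// Sketch: one consumer is created per request and shared by every
// multi-bucket aggregator that opts in, both at shard level and at reduce.
class MultiBucketConsumer implements IntConsumer {
    private final int limit; // value of the dynamic search.max_buckets setting
    private int count;       // running bucket total for this request

    MultiBucketConsumer(int limit) {
        this.limit = limit;
    }

    @Override
    public void accept(int value) {
        count += value;
        if (count > limit) {
            // The real exception lives in MultiBucketConsumerService and is
            // registered for the wire protocol below; this message is illustrative.
            throw new MultiBucketConsumerService.TooManyBucketsException(
                "Trying to create too many buckets. Must be less than or equal to: [" + limit + "].", limit);
        }
    }

    // Assumed helper: zero the count between the shard and reduce phases.
    void reset() {
        count = 0;
    }
}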
core/src/main/java/org/elasticsearch/ElasticsearchException.java
@@ -34,6 +34,7 @@
 import org.elasticsearch.index.Index;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.rest.RestStatus;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.transport.TcpTransport;
 
 import java.io.IOException;
@@ -986,7 +987,10 @@ private enum ElasticsearchExceptionHandle {
         SHARD_LOCK_OBTAIN_FAILED_EXCEPTION(org.elasticsearch.env.ShardLockObtainFailedException.class,
                 org.elasticsearch.env.ShardLockObtainFailedException::new, 147, Version.V_5_0_2),
         UNKNOWN_NAMED_OBJECT_EXCEPTION(org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException.class,
-                org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException::new, 148, Version.V_5_2_0);
+                org.elasticsearch.common.xcontent.NamedXContentRegistry.UnknownNamedObjectException::new, 148, Version.V_5_2_0),
+        TOO_MANY_BUCKETS_EXCEPTION(MultiBucketConsumerService.TooManyBucketsException.class,
+                MultiBucketConsumerService.TooManyBucketsException::new, 149,
+                Version.V_7_0_0_alpha1);
 
         final Class<? extends ElasticsearchException> exceptionClass;
         final CheckedFunction<StreamInput, ? extends ElasticsearchException, IOException> constructor;
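
For the TOO_MANY_BUCKETS_EXCEPTION handle registered above to work, the exception must extend ElasticsearchException and provide a StreamInput constructor plus a matching writeTo. A hedged sketch of that shape (the maxBuckets field and its serialization are assumptions, not the committed code):

// Sketch of the shape implied by the handle above.
public static class TooManyBucketsException extends ElasticsearchException {
    private final int maxBuckets;

    public TooManyBucketsException(String msg, int maxBuckets) {
        super(msg);
        this.maxBuckets = maxBuckets;
    }

    // Deserialization constructor referenced by the handle (id 149).
    public TooManyBucketsException(StreamInput in) throws IOException {
        super(in);
        maxBuckets = in.readInt();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeInt(maxBuckets);
    }
}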
core/src/main/java/org/elasticsearch/action/search/SearchPhaseController.java
@@ -65,6 +65,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Function;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
@@ -73,13 +74,16 @@ public final class SearchPhaseController extends AbstractComponent {
 
     private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];
 
-    private final BigArrays bigArrays;
-    private final ScriptService scriptService;
+    private final Function<Boolean, ReduceContext> reduceContextFunction;
 
-    public SearchPhaseController(Settings settings, BigArrays bigArrays, ScriptService scriptService) {
+    /**
+     * Constructor.
+     * @param settings Node settings
+     * @param reduceContextFunction A function that builds a context for the reduce of an {@link InternalAggregation}
+     */
+    public SearchPhaseController(Settings settings, Function<Boolean, ReduceContext> reduceContextFunction) {
         super(settings);
-        this.bigArrays = bigArrays;
-        this.scriptService = scriptService;
+        this.reduceContextFunction = reduceContextFunction;
     }
 
     public AggregatedDfs aggregateDfs(Collection<DfsSearchResult> results) {
@@ -496,7 +500,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
             }
         }
         final Suggest suggest = groupedSuggestions.isEmpty() ? null : new Suggest(Suggest.reduce(groupedSuggestions));
-        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, true);
+        ReduceContext reduceContext = reduceContextFunction.apply(true);
         final InternalAggregations aggregations = aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             firstResult.pipelineAggregators(), reduceContext);
         final SearchProfileShardResults shardResults = profileResults.isEmpty() ? null : new SearchProfileShardResults(profileResults);
@@ -513,7 +517,7 @@ private ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResu
      * that relevant for the final reduce step. For final reduce see {@link #reduceAggs(List, List, ReduceContext)}
      */
    private InternalAggregations reduceAggsIncrementally(List<InternalAggregations> aggregationsList) {
-        ReduceContext reduceContext = new ReduceContext(bigArrays, scriptService, false);
+        ReduceContext reduceContext = reduceContextFunction.apply(false);
         return aggregationsList.isEmpty() ? null : reduceAggs(aggregationsList,
             null, reduceContext);
     }
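
With the Function<Boolean, ReduceContext> indirection above, the controller no longer needs BigArrays or ScriptService itself; production wiring passes searchService::createReduceContext (see the Node.java hunk below), while a test can hand in a plain lambda. A hedged sketch of the latter, assuming the four-argument ReduceContext constructor added further down in InternalAggregation.java:

// Sketch: a reduce context with a no-op bucket consumer, so no limit is enforced.
SearchPhaseController controller = new SearchPhaseController(Settings.EMPTY,
    finalReduce -> new InternalAggregation.ReduceContext(
        BigArrays.NON_RECYCLING_INSTANCE, null, bucketCount -> {}, finalReduce));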
core/src/main/java/org/elasticsearch/common/settings/ClusterSettings.java
@@ -85,6 +85,7 @@
 import org.elasticsearch.script.ScriptService;
 import org.elasticsearch.search.SearchModule;
 import org.elasticsearch.search.SearchService;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.fetch.subphase.highlight.FastVectorHighlighter;
 import org.elasticsearch.threadpool.ThreadPool;
 import org.elasticsearch.transport.RemoteClusterAware;
@@ -360,6 +361,7 @@ public void apply(Settings value, Settings current, Settings previous) {
                 SearchService.DEFAULT_KEEPALIVE_SETTING,
                 SearchService.KEEPALIVE_INTERVAL_SETTING,
                 SearchService.MAX_KEEPALIVE_SETTING,
+                MultiBucketConsumerService.MAX_BUCKET_SETTING,
                 SearchService.LOW_LEVEL_CANCELLATION_SETTING,
                 Node.WRITE_PORTS_FILE_SETTING,
                 Node.NODE_NAME_SETTING,
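
MAX_BUCKET_SETTING itself is declared in MultiBucketConsumerService, which is not part of this excerpt. Given the commit message (dynamic setting, key search.max_buckets, default 10,000), its declaration plausibly looks like the sketch below; the lower bound of 0 is an assumption:

// Sketch of the setting registered above; bounds are assumed.
public static final Setting<Integer> MAX_BUCKET_SETTING =
    Setting.intSetting("search.max_buckets", 10000, 0,
        Setting.Property.NodeScope, Setting.Property.Dynamic);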
14 changes: 8 additions & 6 deletions core/src/main/java/org/elasticsearch/node/Node.java
@@ -100,7 +100,6 @@
 import org.elasticsearch.indices.breaker.HierarchyCircuitBreakerService;
 import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.indices.cluster.IndicesClusterStateService;
-import org.elasticsearch.indices.mapper.MapperRegistry;
 import org.elasticsearch.indices.recovery.PeerRecoverySourceService;
 import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
 import org.elasticsearch.indices.recovery.RecoverySettings;
@@ -449,6 +448,11 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
                 transportService, indicesService, pluginsService, circuitBreakerService, scriptModule.getScriptService(),
                 httpServerTransport, ingestService, clusterService, settingsModule.getSettingsFilter(), responseCollectorService,
                 searchTransportService);
+
+            final SearchService searchService = newSearchService(clusterService, indicesService,
+                threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
+                responseCollectorService);
+
             modules.add(b -> {
                     b.bind(Node.class).toInstance(this);
                     b.bind(NodeService.class).toInstance(nodeService);
@@ -470,12 +474,10 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
                     b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
                     b.bind(MetaStateService.class).toInstance(metaStateService);
                     b.bind(IndicesService.class).toInstance(indicesService);
-                    b.bind(SearchService.class).toInstance(newSearchService(clusterService, indicesService,
-                        threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
-                        responseCollectorService));
+                    b.bind(SearchService.class).toInstance(searchService);
                     b.bind(SearchTransportService.class).toInstance(searchTransportService);
-                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings, bigArrays,
-                        scriptModule.getScriptService()));
+                    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,
+                        searchService::createReduceContext));
                     b.bind(Transport.class).toInstance(transport);
                     b.bind(TransportService.class).toInstance(transportService);
                     b.bind(NetworkService.class).toInstance(networkService);
12 changes: 11 additions & 1 deletion core/src/main/java/org/elasticsearch/search/SearchService.java
@@ -60,6 +60,8 @@
 import org.elasticsearch.script.SearchScript;
 import org.elasticsearch.search.aggregations.AggregationInitializationException;
 import org.elasticsearch.search.aggregations.AggregatorFactories;
+import org.elasticsearch.search.aggregations.InternalAggregation;
+import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
 import org.elasticsearch.search.aggregations.SearchContextAggregations;
 import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.collapse.CollapseContext;
@@ -118,6 +120,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
         Setting.positiveTimeSetting("search.max_keep_alive", timeValueHours(24), Property.NodeScope, Property.Dynamic);
     public static final Setting<TimeValue> KEEPALIVE_INTERVAL_SETTING =
         Setting.positiveTimeSetting("search.keep_alive_interval", timeValueMinutes(1), Property.NodeScope);
+
     /**
      * Enables low-level, frequent search cancellation checks. Enabling low-level checks will make long running searches to react
      * to the cancellation request faster. However, since it will produce more cancellation checks it might slow the search performance
@@ -163,6 +166,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv
 
     private final ConcurrentMapLong<SearchContext> activeContexts = ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency();
 
+    private final MultiBucketConsumerService multiBucketConsumerService;
+
     public SearchService(ClusterService clusterService, IndicesService indicesService,
                          ThreadPool threadPool, ScriptService scriptService, BigArrays bigArrays, FetchPhase fetchPhase,
                          ResponseCollectorService responseCollectorService) {
@@ -175,6 +180,7 @@ public SearchService(ClusterService clusterService, IndicesService indicesServic
         this.bigArrays = bigArrays;
         this.queryPhase = new QueryPhase(settings);
         this.fetchPhase = fetchPhase;
+        this.multiBucketConsumerService = new MultiBucketConsumerService(clusterService, settings);
 
         TimeValue keepAliveInterval = KEEPALIVE_INTERVAL_SETTING.get(settings);
         setKeepAlives(DEFAULT_KEEPALIVE_SETTING.get(settings), MAX_KEEPALIVE_SETTING.get(settings));
@@ -741,7 +747,7 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
         if (source.aggregations() != null) {
             try {
                 AggregatorFactories factories = source.aggregations().build(context, null);
-                context.aggregations(new SearchContextAggregations(factories));
+                context.aggregations(new SearchContextAggregations(factories, multiBucketConsumerService.create()));
             } catch (IOException e) {
                 throw new AggregationInitializationException("Failed to create aggregators", e);
             }
@@ -1017,4 +1023,8 @@ public QueryRewriteContext getRewriteContext(LongSupplier nowInMillis) {
     public IndicesService getIndicesService() {
         return indicesService;
     }
+
+    public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce) {
+        return new InternalAggregation.ReduceContext(bigArrays, scriptService, multiBucketConsumerService.create(), finalReduce);
+    }
 }
core/src/main/java/org/elasticsearch/search/aggregations/AggregationPhase.java
@@ -123,6 +123,7 @@ public void execute(SearchContext context) {
         }
 
         List<InternalAggregation> aggregations = new ArrayList<>(aggregators.length);
+        context.aggregations().resetBucketMultiConsumer();
         for (Aggregator aggregator : context.aggregations().aggregators()) {
             try {
                 aggregator.postCollection();
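
resetBucketMultiConsumer() is not shown in this excerpt, and the concrete consumer type is an assumption; the diff only exposes it as an IntConsumer. Assuming SearchContextAggregations simply stores the consumer produced by MultiBucketConsumerService.create() (see the SearchService hunk above), the relevant pieces might look like this sketch:

// Hedged sketch of the SearchContextAggregations pieces this hunk relies on.
public class SearchContextAggregations {
    private final AggregatorFactories factories;
    private final MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer;

    public SearchContextAggregations(AggregatorFactories factories,
                                     MultiBucketConsumerService.MultiBucketConsumer multiBucketConsumer) {
        this.factories = factories;
        this.multiBucketConsumer = multiBucketConsumer;
    }

    // Assumed: zero the per-request count so the shard-level build starts fresh.
    public void resetBucketMultiConsumer() {
        multiBucketConsumer.reset();
    }
}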
core/src/main/java/org/elasticsearch/search/aggregations/InternalAggregation.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.io.stream.NamedWriteable;
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
+import org.elasticsearch.common.util.BigArray;
 import org.elasticsearch.common.util.BigArrays;
 import org.elasticsearch.common.xcontent.XContentBuilder;
 import org.elasticsearch.rest.action.search.RestSearchAction;
@@ -33,6 +34,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.function.IntConsumer;
 
 /**
  * An internal implementation of {@link Aggregation}. Serves as a base class for all aggregation implementations.
@@ -43,11 +45,17 @@ public static class ReduceContext {
 
         private final BigArrays bigArrays;
         private final ScriptService scriptService;
+        private final IntConsumer multiBucketConsumer;
         private final boolean isFinalReduce;
 
         public ReduceContext(BigArrays bigArrays, ScriptService scriptService, boolean isFinalReduce) {
+            this(bigArrays, scriptService, (s) -> {}, isFinalReduce);
+        }
+
+        public ReduceContext(BigArrays bigArrays, ScriptService scriptService, IntConsumer multiBucketConsumer, boolean isFinalReduce) {
             this.bigArrays = bigArrays;
             this.scriptService = scriptService;
+            this.multiBucketConsumer = multiBucketConsumer;
             this.isFinalReduce = isFinalReduce;
         }
 
@@ -67,6 +75,14 @@ public BigArrays bigArrays() {
         public ScriptService scriptService() {
             return scriptService;
         }
+
+        /**
+         * Adds <tt>size</tt> buckets to the global count for the request and fails if this number is greater than
+         * the maximum number of buckets allowed in a response
+         */
+        public void consumeBucketsAndMaybeBreak(int size) {
+            multiBucketConsumer.accept(size);
+        }
     }
 
     protected final String name;
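
Because the consumer is opt-in, nothing trips automatically: each multi-bucket aggregator must call consumeBucketsAndMaybeBreak itself as it builds its reduced buckets. An illustrative (not committed) reduce loop:

// Illustrative: charge one unit per bucket kept in the reduced result;
// the consumer throws TooManyBucketsException once the request-wide
// limit is crossed.
for (InternalMultiBucketAggregation.InternalBucket bucket : reducedBuckets) {
    reduceContext.consumeBucketsAndMaybeBreak(1);
    result.add(bucket);
}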
core/src/main/java/org/elasticsearch/search/aggregations/InternalMultiBucketAggregation.java
@@ -22,6 +22,7 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.Writeable;
 import org.elasticsearch.search.aggregations.bucket.MultiBucketsAggregation;
+import org.elasticsearch.search.aggregations.bucket.SingleBucketAggregation;
 import org.elasticsearch.search.aggregations.pipeline.PipelineAggregator;
 
 import java.io.IOException;
@@ -82,6 +83,39 @@ public Object getProperty(List<String> path) {
         }
     }
 
+    /**
+     * Counts the number of inner buckets inside the provided {@link InternalBucket}
+     */
+    public static int countInnerBucket(InternalBucket bucket) {
+        int count = 0;
+        for (Aggregation agg : bucket.getAggregations().asList()) {
+            count += countInnerBucket(agg);
+        }
+        return count;
+    }
+
+    /**
+     * Counts the number of inner buckets inside the provided {@link Aggregation}
+     */
+    public static int countInnerBucket(Aggregation agg) {
+        int size = 0;
+        if (agg instanceof MultiBucketsAggregation) {
+            MultiBucketsAggregation multi = (MultiBucketsAggregation) agg;
+            for (MultiBucketsAggregation.Bucket bucket : multi.getBuckets()) {
+                ++size;
+                for (Aggregation bucketAgg : bucket.getAggregations().asList()) {
+                    size += countInnerBucket(bucketAgg);
+                }
+            }
+        } else if (agg instanceof SingleBucketAggregation) {
+            SingleBucketAggregation single = (SingleBucketAggregation) agg;
+            for (Aggregation bucketAgg : single.getAggregations().asList()) {
+                size += countInnerBucket(bucketAgg);
+            }
+        }
+        return size;
+    }
+
     public abstract static class InternalBucket implements Bucket, Writeable {
 
         public Object getProperty(String containingAggName, List<String> path) {
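
countInnerBucket lets an aggregator charge not just a bucket itself but everything nested under it, which matters when a reduce folds whole subtrees into one response bucket. An illustrative call combining it with the ReduceContext method above:

// Illustrative: one unit for the bucket plus all of its nested sub-buckets.
reduceContext.consumeBucketsAndMaybeBreak(
    1 + InternalMultiBucketAggregation.countInnerBucket(reducedBucket));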
(Diff truncated: the remaining changed files are not shown.)
