Skip to content

Commit

Permalink
Request-level circuit breaker support on coordinating nodes
Browse files Browse the repository at this point in the history
This commit allows the coordinating node to account for the memory used to perform partial and final reduces of
aggregations in the request circuit breaker. The search coordinator adds the memory that it uses to save
and reduce the results of shard aggregations to the request circuit breaker. Before any partial or final
reduce, the memory needed to reduce the aggregations is estimated and a CircuitBreakingException is thrown
if it exceeds the maximum memory allowed in this breaker.
This size is estimated as roughly 1.5 times the size of the serialized aggregations that need to be reduced.
This estimation can be completely off for some aggregations but it is corrected with the real size after
the reduce completes.
If the reduce is successful, we update the circuit breaker to remove the size of the source aggregations
and replace the estimation with the serialized size of the newly reduced result.

As a follow-up, we could trigger partial reduces based on the memory accounted in the circuit breaker instead
of relying on a static number of shard responses. A simpler follow-up that could be done in the meantime is
to [reduce the default batch reduce size](elastic#51857) of blocking
search requests to a more sane number.

Closes elastic#37182
  • Loading branch information
jimczi committed Sep 10, 2020
1 parent aefca5e commit 934a3d8
Show file tree
Hide file tree
Showing 20 changed files with 871 additions and 378 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lease.Releasables;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.index.shard.ShardId;
Expand Down Expand Up @@ -77,7 +79,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
**/
private final BiFunction<String, String, Transport.Connection> nodeIdToConnection;
private final SearchTask task;
final SearchPhaseResults<Result> results;
protected final SearchPhaseResults<Result> results;
private final ClusterState clusterState;
private final Map<String, AliasFilter> aliasFilter;
private final Map<String, Float> concreteIndexBoosts;
Expand All @@ -98,6 +100,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
private final boolean throttleConcurrentRequests;

private final List<Releasable> releasables = new ArrayList<>();

AbstractSearchAsyncAction(String name, Logger logger, SearchTransportService searchTransportService,
BiFunction<String, String, Transport.Connection> nodeIdToConnection,
Map<String, AliasFilter> aliasFilter, Map<String, Float> concreteIndexBoosts,
Expand Down Expand Up @@ -133,7 +137,7 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.executor = executor;
this.request = request;
this.task = task;
this.listener = listener;
this.listener = ActionListener.runAfter(listener, this::releaseContext);
this.nodeIdToConnection = nodeIdToConnection;
this.clusterState = clusterState;
this.concreteIndexBoosts = concreteIndexBoosts;
Expand All @@ -143,6 +147,15 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
this.clusters = clusters;
}

/**
 * Registers a {@link Releasable} with this action by appending it to the
 * {@code releasables} list, which is closed by {@code releaseContext()}.
 *
 * @param releasable the resource to remember for later release
 */
@Override
public void addReleasable(Releasable releasable) {
// NOTE(review): releasables is a plain ArrayList; confirm callers only register from a single thread.
releasables.add(releasable);
}

/**
 * Closes every {@link Releasable} previously registered through {@code addReleasable}.
 * The constructor wraps the response listener with
 * {@code ActionListener.runAfter(listener, this::releaseContext)}, so this is invoked
 * once the listener has been notified.
 */
public void releaseContext() {
Releasables.close(releasables);
}

/**
* Builds how long it took to execute the search.
*/
Expand Down Expand Up @@ -529,7 +542,7 @@ public void sendSearchResponse(InternalSearchResponse internalSearchResponse, At
ShardSearchFailure[] failures = buildShardFailures();
Boolean allowPartialResults = request.allowPartialSearchResults();
assert allowPartialResults != null : "SearchRequest missing setting for allowPartialSearchResults";
if (request.pointInTimeBuilder() == null && allowPartialResults == false && failures.length > 0) {
if (allowPartialResults == false && failures.length > 0) {
raisePhaseFailure(new SearchPhaseExecutionException("", "Shard failures", null, failures));
} else {
final Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
Expand Down Expand Up @@ -567,6 +580,7 @@ private void raisePhaseFailure(SearchPhaseExecutionException exception) {
}
});
}
Releasables.close(releasables);
listener.onFailure(exception);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.builder.SearchSourceBuilder;
Expand Down Expand Up @@ -76,6 +77,11 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMa
this.shardsIts = shardsIts;
}

/**
 * Rejects any attempt to register a {@link Releasable} in this phase.
 *
 * @param releasable ignored; this method never stores it
 * @throws RuntimeException always, with a message naming this phase via {@code getName()}
 */
@Override
public void addReleasable(Releasable releasable) {
throw new RuntimeException("cannot add releasable in " + getName() + " phase");
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarget shard,
SearchActionListener<CanMatchResponse> listener) {
Expand All @@ -84,8 +90,7 @@ protected void executePhaseOnShard(SearchShardIterator shardIt, SearchShardTarge
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results, SearchPhaseContext context) {

return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@

import java.io.IOException;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/**
Expand All @@ -50,18 +49,21 @@ final class DfsQueryPhase extends SearchPhase {

DfsQueryPhase(List<DfsSearchResult> searchResults,
AggregatedDfs dfs,
SearchPhaseController searchPhaseController,
QueryPhaseResultConsumer queryResult,
Function<ArraySearchPhaseResults<SearchPhaseResult>, SearchPhase> nextPhaseFactory,
SearchPhaseContext context, Consumer<Exception> onPartialMergeFailure) {
SearchPhaseContext context) {
super("dfs_query");
this.progressListener = context.getTask().getProgressListener();
this.queryResult = searchPhaseController.newSearchPhaseResults(context, progressListener,
context.getRequest(), context.getNumShards(), onPartialMergeFailure);
this.queryResult = queryResult;
this.searchResults = searchResults;
this.dfs = dfs;
this.nextPhaseFactory = nextPhaseFactory;
this.context = context;
this.searchTransportService = context.getSearchTransport();

// register the release of the query consumer to free up the circuit breaker memory
// at the end of the search
context.addReleasable(queryResult);
}

@Override
Expand Down
Loading

0 comments on commit 934a3d8

Please sign in to comment.