Skip to content

Commit

Permalink
Pre-sort shards based on the max/min value of the primary sort field
Browse files Browse the repository at this point in the history
This change automatically pre-sort search shards on search requests that use a primary sort based on the value
of a field. When possible, the can_match phase will extract the min/max (depending on the provided sort order) values
of each shard and use it to pre-sort the shards prior to running the subsequent phases. This feature can be useful to
ensure that shards that contain recent data are executed first so that intermediate merge have more chance to contain
contiguous data (think of date_histogram for instance) but it could also be used in a follow up to early terminate sorted
top-hits queries that don't require the total hit count. The latter could significantly speed up the retrieval of the most/least
recent documents from time-based indices.
I took two shortcuts here:
* I reused the can_match phase to add the required information for the shard sort. We could instead introduce a new phase
 but it make sense to me to use the existing phase to add more informations as long as the additional ops are lightweight.
* The shard sort is done automatically if the primary search sort is based on a field. However this sorting only makes sense
if the range of values in each shard doesn't overlap (time-based indices sorted on timestamp for instance). We could add
a new option to enable/disable this behavior or even add an additional `shard_sort` criteria but I also like the fact that
users don't need to set any option to benefit from this feature.

Relates elastic#49091
  • Loading branch information
jimczi committed Nov 14, 2019
1 parent 2723a52 commit 5f91d33
Show file tree
Hide file tree
Showing 13 changed files with 479 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
iterators.add(iterator);
}
}
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
this.shardsIts = new GroupShardsIterator<>(iterators);
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators, false);
this.shardsIts = new GroupShardsIterator<>(iterators, false);
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,23 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.routing.GroupShardsIterator;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.search.SearchService;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchService.CanMatchResponse;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.AliasFilter;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.transport.Transport;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

/**
Expand All @@ -40,8 +48,12 @@
* from the search. The extra round trip to the search shards is very cheap and is not subject to rejections
* which allows to fan out to more shards at the same time without running into rejections even if we are hitting a
* large portion of the clusters indices.
* This phase can also be used to pre-sort shards based on the maximum value in each shard of the provided primary sort.
* When the query primary sort is perform on a field, this phase extracts the maximum possible value in each shard and
* sort them according to the provided order. This can be useful for instance to ensure that shards that contain recent
* data are executed first.
*/
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<SearchService.CanMatchResponse> {
final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<CanMatchResponse> {

private final Function<GroupShardsIterator<SearchShardIterator>, SearchPhase> phaseFactory;
private final GroupShardsIterator<SearchShardIterator> shardsIts;
Expand All @@ -58,26 +70,26 @@ final class CanMatchPreFilterSearchPhase extends AbstractSearchAsyncAction<Searc
//We set max concurrent shard requests to the number of shards so no throttling happens for can_match requests
super("can_match", logger, searchTransportService, nodeIdToConnection, aliasFilter, concreteIndexBoosts, indexRoutings,
executor, request, listener, shardsIts, timeProvider, clusterStateVersion, task,
new BitSetSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
new CanMatchSearchPhaseResults(shardsIts.size()), shardsIts.size(), clusters);
this.phaseFactory = phaseFactory;
this.shardsIts = shardsIts;
}

@Override
protected void executePhaseOnShard(SearchShardIterator shardIt, ShardRouting shard,
SearchActionListener<SearchService.CanMatchResponse> listener) {
SearchActionListener<CanMatchResponse> listener) {
getSearchTransport().sendCanMatch(getConnection(shardIt.getClusterAlias(), shard.currentNodeId()),
buildShardSearchRequest(shardIt), getTask(), listener);
}

@Override
protected SearchPhase getNextPhase(SearchPhaseResults<SearchService.CanMatchResponse> results,
protected SearchPhase getNextPhase(SearchPhaseResults<CanMatchResponse> results,
SearchPhaseContext context) {

return phaseFactory.apply(getIterator((BitSetSearchPhaseResults) results, shardsIts));
return phaseFactory.apply(getIterator((CanMatchSearchPhaseResults) results, shardsIts));
}

private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseResults results,
private GroupShardsIterator<SearchShardIterator> getIterator(CanMatchSearchPhaseResults results,
GroupShardsIterator<SearchShardIterator> shardsIts) {
int cardinality = results.getNumPossibleMatches();
FixedBitSet possibleMatches = results.getPossibleMatches();
Expand All @@ -86,6 +98,7 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
// to produce a valid search result with all the aggs etc.
possibleMatches.set(0);
}
SearchSourceBuilder source = getRequest().source();
int i = 0;
for (SearchShardIterator iter : shardsIts) {
if (possibleMatches.get(i++)) {
Expand All @@ -94,24 +107,56 @@ private GroupShardsIterator<SearchShardIterator> getIterator(BitSetSearchPhaseRe
iter.resetAndSkip();
}
}
return shardsIts;
if (shouldSortShards(results.sortValues) == false) {
return shardsIts;
}
int sortMul = FieldSortBuilder.getPrimaryFieldSortOrNull(source).order() == SortOrder.ASC ? 1 : -1;
return new GroupShardsIterator<>(sortShards(shardsIts, results.sortValues, sortMul), false);
}

private static List<SearchShardIterator> sortShards(GroupShardsIterator<SearchShardIterator> shardsIts,
Comparable[] sortValues,
int sortMul) {
return IntStream.range(0, shardsIts.size())
.boxed()
.sorted((a, b) -> compareShardSortValues(shardsIts.get(a).shardId(), shardsIts.get(b).shardId(),
sortValues[a], sortValues[b], sortMul))
.map(ord -> shardsIts.get(ord))
.collect(Collectors.toList());
}

private static final class BitSetSearchPhaseResults extends SearchPhaseResults<SearchService.CanMatchResponse> {
private static boolean shouldSortShards(Comparable[] sortValues) {
return Arrays.stream(sortValues).anyMatch(e -> e != null);
}

static int compareShardSortValues(ShardId shard1, ShardId shard2, Comparable v1, Comparable v2, int sortMul) {
final int cmp;
if (v1 == null && v2 == null) {
cmp = 0;
} else if (v1 == null) {
cmp = -1;
} else if (v2 == null) {
cmp = 1 * sortMul;
} else {
cmp = v1.compareTo(v2);
}
return cmp != 0 ? cmp * sortMul : shard1.compareTo(shard2);
}

private static final class CanMatchSearchPhaseResults extends SearchPhaseResults<CanMatchResponse> {
private final FixedBitSet possibleMatches;
private final Comparable[] sortValues;
private int numPossibleMatches;

BitSetSearchPhaseResults(int size) {
CanMatchSearchPhaseResults(int size) {
super(size);
possibleMatches = new FixedBitSet(size);
sortValues = new Comparable[size];
}

@Override
void consumeResult(SearchService.CanMatchResponse result) {
if (result.canMatch()) {
consumeShardFailure(result.getShardIndex());
}
void consumeResult(CanMatchResponse result) {
consumeResult(result.getShardIndex(), result.canMatch(), result.sortValue());
}

@Override
Expand All @@ -120,10 +165,18 @@ boolean hasResult(int shardIndex) {
}

@Override
synchronized void consumeShardFailure(int shardIndex) {
void consumeShardFailure(int shardIndex) {
// we have to carry over shard failures in order to account for them in the response.
possibleMatches.set(shardIndex);
numPossibleMatches++;
consumeResult(shardIndex, true, null);
}

synchronized void consumeResult(int shardIndex, boolean canMatch, Comparable sortValue) {
if (canMatch) {
possibleMatches.set(shardIndex);
numPossibleMatches++;
}
sortValues[shardIndex] = sortValue;

}


Expand All @@ -136,7 +189,7 @@ synchronized FixedBitSet getPossibleMatches() {
}

@Override
Stream<SearchService.CanMatchResponse> getSuccessfulResults() {
Stream<CanMatchResponse> getSuccessfulResults() {
return Stream.empty();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
*/
abstract class SearchActionListener<T extends SearchPhaseResult> implements ActionListener<T> {

private final int requestIndex;
final int requestIndex;
private final SearchShardTarget searchShardTarget;

protected SearchActionListener(SearchShardTarget searchShardTarget,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
Expand Down Expand Up @@ -616,8 +617,8 @@ private static boolean shouldPreFilterSearchShards(SearchRequest searchRequest,
GroupShardsIterator<SearchShardIterator> shardIterators) {
SearchSourceBuilder source = searchRequest.source();
return searchRequest.searchType() == QUERY_THEN_FETCH && // we can't do this for DFS it needs to fan out to all shards all the time
SearchService.canRewriteToMatchNone(source) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
(SearchService.canRewriteToMatchNone(source) || FieldSortBuilder.hasPrimaryFieldSort(source)) &&
searchRequest.getPreFilterShardSize() < shardIterators.size();
}

static GroupShardsIterator<SearchShardIterator> mergeShardsIterators(GroupShardsIterator<ShardIterator> localShardsIterator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,16 @@ public final class GroupShardsIterator<ShardIt extends ShardIterator> implements
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators) {
CollectionUtil.timSort(iterators);
this(iterators, true);
}

/**
* Constructs a enw GroupShardsIterator from the given list.
*/
public GroupShardsIterator(List<ShardIt> iterators, boolean useSort) {
if (useSort) {
CollectionUtil.timSort(iterators);
}
this.iterators = iterators;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public static final class DateFieldType extends MappedFieldType {
protected DateMathParser dateMathParser;
protected Resolution resolution;

DateFieldType() {
public DateFieldType() {
super();
setTokenized(false);
setHasDocValues(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -816,7 +816,7 @@ public final String typeName() {
return name;
}
/** Get the associated numeric type */
final NumericType numericType() {
public final NumericType numericType() {
return numericType;
}
public abstract Query termQuery(String field, Object value);
Expand Down
33 changes: 27 additions & 6 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@
import org.apache.lucene.search.FieldDoc;
import org.apache.lucene.search.TopDocs;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -92,6 +94,7 @@
import org.elasticsearch.search.query.ScrollQuerySearchResult;
import org.elasticsearch.search.rescore.RescorerBuilder;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortAndFormats;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.suggest.Suggest;
Expand Down Expand Up @@ -1012,7 +1015,7 @@ public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String
* This method can have false positives while if it returns <code>false</code> the query won't match any documents on the current
* shard.
*/
public boolean canMatch(ShardSearchRequest request) throws IOException {
public CanMatchResponse canMatch(ShardSearchRequest request) throws IOException {
assert request.searchType() == SearchType.QUERY_THEN_FETCH : "unexpected search type: " + request.searchType();
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
IndexShard indexShard = indexService.getShard(request.shardId().getId());
Expand All @@ -1022,18 +1025,20 @@ public boolean canMatch(ShardSearchRequest request) throws IOException {
QueryShardContext context = indexService.newQueryShardContext(request.shardId().id(), searcher,
request::nowInMillis, request.getClusterAlias());
Rewriteable.rewrite(request.getRewriteable(), context, false);
FieldSortBuilder sortBuilder = FieldSortBuilder.getPrimaryFieldSortOrNull(request.source());
Comparable sortValue = sortBuilder != null ? FieldSortBuilder.getMaxSortValueOrNull(context, sortBuilder) : null;
if (canRewriteToMatchNone(request.source())) {
QueryBuilder queryBuilder = request.source().query();
return queryBuilder instanceof MatchNoneQueryBuilder == false;
return new CanMatchResponse(queryBuilder instanceof MatchNoneQueryBuilder == false, sortValue);
}
return true; // null query means match_all
// null query means match_all
return new CanMatchResponse(true, sortValue);
}
}


public void canMatch(ShardSearchRequest request, ActionListener<CanMatchResponse> listener) {
try {
listener.onResponse(new CanMatchResponse(canMatch(request)));
listener.onResponse(canMatch(request));
} catch (IOException e) {
listener.onFailure(e);
}
Expand All @@ -1052,6 +1057,7 @@ public static boolean canRewriteToMatchNone(SearchSourceBuilder source) {
return aggregations == null || aggregations.mustVisitAllDocs() == false;
}


/*
* Rewrites the search request with a light weight rewrite context in order to fetch resources asynchronously
* The action listener is guaranteed to be executed on the search thread-pool
Expand Down Expand Up @@ -1087,24 +1093,39 @@ public InternalAggregation.ReduceContext createReduceContext(boolean finalReduce

public static final class CanMatchResponse extends SearchPhaseResult {
private final boolean canMatch;
private final Comparable sortValue;

public CanMatchResponse(StreamInput in) throws IOException {
super(in);
this.canMatch = in.readBoolean();
if (in.getVersion().onOrAfter(Version.V_8_0_0)) {
sortValue = Lucene.readSortValue(in);
} else {
sortValue = null;
}
}

public CanMatchResponse(boolean canMatch) {
public CanMatchResponse(boolean canMatch, @Nullable Comparable sortValue) {
this.canMatch = canMatch;
this.sortValue = sortValue;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(canMatch);
if (out.getVersion().onOrAfter(Version.V_8_0_0)) {
Lucene.writeSortValue(out, sortValue);
}
}

public boolean canMatch() {
return canMatch;
}

@Nullable
public Comparable sortValue() {
return sortValue;
}
}

/**
Expand Down
Loading

0 comments on commit 5f91d33

Please sign in to comment.