Automatic tie-breaking for sorted queries within a PIT
This change automatically generates a tiebreaker for sorted queries that are executed
under a PIT (point in time reader). This allows consistent pagination over the matching documents without
requiring a sort criterion that is unique per document.
The tiebreaker is automatically added as the last sort value of each search hit in the response.
It is then used by `search_after` to ensure that pagination does not miss any documents and that each document
appears only once.
This commit also allows queries sorted by internal Lucene id (`_doc`) to be optimized when they are executed
under a PIT, in the same way as scroll queries.

Closes elastic#56828
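
A minimal sketch of how such a tiebreaker could be built, assuming it packs the shard index into the upper 32 bits of a `long` and the per-shard Lucene doc id into the lower 32 bits. That layout is an assumption: the body of `SearchAfterBuilder.createTiebreaker` is not shown in this excerpt. It is, however, consistent with the example value `4294967298` in the documentation change, which equals `(1 << 32) | 2`. The class and method names below are hypothetical.

```java
public final class TiebreakerSketch {
    private TiebreakerSketch() {}

    /** Packs a shard index (upper 32 bits) and a per-shard doc id (lower 32 bits) into one long. */
    public static long encode(int shardIndex, int doc) {
        // Mask the doc id so that sign extension cannot leak into the upper 32 bits.
        return ((long) shardIndex << 32) | (doc & 0xFFFFFFFFL);
    }

    /** Recovers the shard index from the upper 32 bits. */
    public static int shardIndex(long tiebreaker) {
        return (int) (tiebreaker >>> 32);
    }

    /** Recovers the doc id from the lower 32 bits. */
    public static int doc(long tiebreaker) {
        return (int) tiebreaker;
    }

    public static void main(String[] args) {
        long tiebreaker = encode(1, 2);
        System.out.println(tiebreaker);
        System.out.println(shardIndex(tiebreaker) + " / " + doc(tiebreaker));
    }
}
```

Because a shard index and a doc id together identify one document within a fixed point-in-time reader, a value built this way is unique per document for the lifetime of the PIT, but it has no meaning outside of it.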
jimczi committed Nov 24, 2020
1 parent 7505d7d commit 826e2fc
Showing 13 changed files with 324 additions and 90 deletions.
@@ -71,9 +71,9 @@ To get the first page of results, submit a search request with a `sort`
argument. If using a PIT, specify the PIT ID in the `pit.id` parameter and omit
the target data stream or index from the request path.

IMPORTANT: We recommend you include a tiebreaker field in your `sort`. This
tiebreaker field should contain a unique value for each document. If you don't
include a tiebreaker field, your paged results could miss or duplicate hits.
NOTE: Search after requests have optimizations that make them faster when the sort
order is `_doc` and total hits are not tracked. If you want to iterate over all documents regardless of the
order, this is the most efficient option.

[source,console]
----
@@ -90,8 +90,7 @@ GET /_search
"keep_alive": "1m"
},
"sort": [ <2>
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
]
}
----
@@ -101,7 +100,9 @@ GET /_search
<2> Sorts hits for the search.

The search response includes an array of `sort` values for each hit. If you used
a PIT, the response's `pit_id` parameter contains an updated PIT ID.
a PIT, the response's `pit_id` parameter contains an updated PIT ID and a tiebreaker
is included as the last `sort` values for each hit. This tiebreaker is a unique value for each document that allows
consistent pagination within a `pit_id`.

[source,console-result]
----
@@ -122,7 +123,7 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.
"_source" : ...,
"sort" : [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
4294967298 <3>
]
}
]
@@ -133,9 +134,10 @@ a PIT, the response's `pit_id` parameter contains an updated PIT ID.

<1> Updated `id` for the point in time.
<2> Sort values for the last returned hit.
<3> The tiebreaker value, unique per document within the `pit_id`.

To get the next page of results, rerun the previous search using the last hit's
sort values as the `search_after` argument. If using a PIT, use the latest PIT
sort values (including the tiebreaker) as the `search_after` argument. If using a PIT, use the latest PIT
ID in the `pit.id` parameter. The search's `query` and `sort` arguments must
remain unchanged. If provided, the `from` argument must be `0` (default) or `-1`.

@@ -154,19 +156,20 @@ GET /_search
"keep_alive": "1m"
},
"sort": [
{"@timestamp": "asc"},
{"tie_breaker_id": "asc"}
{"@timestamp": "asc"}
],
"search_after": [ <2>
4098435132000,
"FaslK3QBySSL_rrj9zM5"
]
4294967298
],
"track_total_hits": false <3>
}
----
// TEST[catch:missing]

<1> PIT ID returned by the previous search.
<2> Sort values from the previous search's last hit.
<3> Disable the tracking of total hits to speed up pagination.
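
The paging contract described above can be sketched without a cluster. The example below is an in-memory simulation, not the Elasticsearch client API: each hit is a hypothetical `{timestamp, tiebreaker}` pair, and `page` returns the hits that sort strictly after the previous page's last sort values. Because the tiebreaker makes every pair unique, hits that share a timestamp are neither skipped nor repeated across pages.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class SearchAfterSketch {
    private SearchAfterSketch() {}

    // Orders hits by timestamp first, then by tiebreaker. Each hit is {timestamp, tiebreaker}.
    static final Comparator<long[]> ORDER =
        Comparator.<long[]>comparingLong(h -> h[0]).thenComparingLong(h -> h[1]);

    /** Returns up to size hits that sort strictly after searchAfter (null requests the first page). */
    public static List<long[]> page(List<long[]> sortedHits, long[] searchAfter, int size) {
        List<long[]> out = new ArrayList<>();
        for (long[] hit : sortedHits) {
            if (searchAfter != null && ORDER.compare(hit, searchAfter) <= 0) {
                continue; // already returned on an earlier page
            }
            out.add(hit);
            if (out.size() == size) {
                break;
            }
        }
        return out;
    }

    /** Pages through every hit, feeding the last hit's sort values back as the next cursor. */
    public static List<long[]> collectAll(List<long[]> hits, int size) {
        List<long[]> sorted = new ArrayList<>(hits);
        sorted.sort(ORDER);
        List<long[]> seen = new ArrayList<>();
        long[] cursor = null;
        while (true) {
            List<long[]> current = page(sorted, cursor, size);
            if (current.isEmpty()) {
                break;
            }
            seen.addAll(current);
            cursor = current.get(current.size() - 1);
        }
        return seen;
    }
}
```

If the cursor held only the timestamp, a page boundary falling between two hits with the same timestamp would either drop or repeat one of them, which is the failure mode the tiebreaker is meant to prevent.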

You can repeat this process to get additional pages of results. If using a PIT,
you can extend the PIT's retention period using the
@@ -212,8 +212,9 @@ public final void run() {
}
for (int index = 0; index < shardsIts.size(); index++) {
final SearchShardIterator shardRoutings = shardsIts.get(index);
shardRoutings.setShardIndex(index);
assert shardRoutings.skip() == false;
performPhaseOnShard(index, shardRoutings, shardRoutings.nextOrNull());
performPhaseOnShard(shardRoutings, shardRoutings.nextOrNull());
}
}
}
@@ -225,7 +226,7 @@ void skipShard(SearchShardIterator iterator) {
successfulShardExecution(iterator);
}

private void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
private void performPhaseOnShard(final SearchShardIterator shardIt, final SearchShardTarget shard) {
/*
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
@@ -236,7 +237,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
if (shard == null) {
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
shardIt.getClusterAlias(), shardIt.getOriginalIndices());
fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
fork(() -> onShardFailure(unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
} else {
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
@@ -245,7 +246,7 @@ private void performPhaseOnShard(final int shardIndex, final SearchShardIterator
final Thread thread = Thread.currentThread();
try {
executePhaseOnShard(shardIt, shard,
new SearchActionListener<Result>(shard, shardIndex) {
new SearchActionListener<Result>(shard, shardIt.getShardIndex()) {
@Override
public void innerOnResponse(Result result) {
try {
@@ -258,7 +259,7 @@ public void innerOnResponse(Result result) {
@Override
public void onFailure(Exception t) {
try {
onShardFailure(shardIndex, shard, shardIt, t);
onShardFailure(shard, shardIt, t);
} finally {
executeNext(pendingExecutions, thread);
}
@@ -270,7 +271,7 @@ public void onFailure(Exception t) {
* It is possible to run into connection exceptions here because we are getting the connection early and might
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
*/
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
fork(() -> onShardFailure(shard, shardIt, e));
} finally {
executeNext(pendingExecutions, thread);
}
@@ -387,10 +388,10 @@ ShardSearchFailure[] buildShardFailures() {
return failures;
}

private void onShardFailure(final int shardIndex, SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
private void onShardFailure(SearchShardTarget shard, final SearchShardIterator shardIt, Exception e) {
// we always add the shard failure for a specific shard instance
// we do make sure to clean it on a successful response from a shard
onShardFailure(shardIndex, shard, e);
onShardFailure(shardIt.getShardIndex(), shard, e);
final SearchShardTarget nextShard = shardIt.nextOrNull();
final boolean lastShard = nextShard == null;
logger.debug(() -> new ParameterizedMessage("{}: Failed to execute [{}] lastShard [{}]", shard, request, lastShard), e);
@@ -404,7 +405,7 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final
}
}
}
onShardGroupFailure(shardIndex, shard, e);
onShardGroupFailure(shardIt.getShardIndex(), shard, e);
}
final int totalOps = this.totalOps.incrementAndGet();
if (totalOps == expectedTotalOps) {
@@ -414,7 +415,7 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures()));
} else {
if (lastShard == false) {
performPhaseOnShard(shardIndex, shardIt, nextShard);
performPhaseOnShard(shardIt, nextShard);
}
}
}
@@ -487,6 +488,7 @@ private static boolean isTaskCancelledException(Exception e) {
*/
protected void onShardResult(Result result, SearchShardIterator shardIt) {
assert result.getShardIndex() != -1 : "shard index is not set";
assert result.getShardIndex() == shardIt.getShardIndex() : "shard index is different";
assert result.getSearchShardTarget() != null : "search shard target must not be null";
hasShardResponse.set(true);
if (logger.isTraceEnabled()) {
@@ -656,19 +658,19 @@ public final ShardSearchRequest buildShardSearchRequest(SearchShardIterator shar
String indexName = shardIt.shardId().getIndex().getName();
final String[] routings = indexRoutings.getOrDefault(indexName, Collections.emptySet())
.toArray(new String[0]);
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(), getNumShards(),
filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings,
ShardSearchRequest shardRequest = new ShardSearchRequest(shardIt.getOriginalIndices(), request, shardIt.shardId(),
getNumShards(), filter, indexBoost, timeProvider.getAbsoluteStartMillis(), shardIt.getClusterAlias(), routings,
shardIt.getSearchContextId(), shardIt.getSearchContextKeepAlive());
// if we already received a search result we can inform the shard that it
// can return a null response if the request rewrites to match none rather
// than creating an empty response in the search thread pool.
// Note that, we have to disable this shortcut for queries that create a context (scroll and search context).
shardRequest.canReturnNullResponseIfMatchNoDocs(hasShardResponse.get() && shardRequest.scroll() == null);
shardRequest.setShardIndex(shardIt.getShardIndex());
return shardRequest;
}

/**
* Returns the next phase based on the results of the initial search phase
* @param results the results of the initial search phase. Each non null element in the result array represent a successfully
* executed shard request
* @param context the search context for the next phase
@@ -55,6 +55,7 @@
import org.elasticsearch.search.profile.ProfileShardResult;
import org.elasticsearch.search.profile.SearchProfileShardResults;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.searchafter.SearchAfterBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.Suggest.Suggestion;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
@@ -349,7 +350,11 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
searchHit.shard(fetchResult.getSearchShardTarget());
if (sortedTopDocs.isSortedByField) {
FieldDoc fieldDoc = (FieldDoc) shardDoc;
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
if (reducedQueryPhase.hasPIT) {
addSortValuesTie(searchHit, fieldDoc, reducedQueryPhase.sortValueFormats);
} else {
searchHit.sortValues(fieldDoc.fields, reducedQueryPhase.sortValueFormats);
}
if (sortScoreIndex != -1) {
searchHit.score(((Number) fieldDoc.fields[sortScoreIndex]).floatValue());
}
@@ -363,6 +368,16 @@ private SearchHits getHits(ReducedQueryPhase reducedQueryPhase, boolean ignoreFr
reducedQueryPhase.maxScore, sortedTopDocs.sortFields, sortedTopDocs.collapseField, sortedTopDocs.collapseValues);
}

private void addSortValuesTie(SearchHit searchHit, FieldDoc fieldDoc, DocValueFormat[] sortValueFormats) {
Object[] newFields = new Object[fieldDoc.fields.length+1];
DocValueFormat[] dvFormats = new DocValueFormat[newFields.length];
System.arraycopy(fieldDoc.fields, 0, newFields, 0, fieldDoc.fields.length);
System.arraycopy(sortValueFormats, 0, dvFormats, 0, fieldDoc.fields.length);
newFields[newFields.length-1] = SearchAfterBuilder.createTiebreaker(fieldDoc);
dvFormats[newFields.length-1] = DocValueFormat.RAW;
searchHit.sortValues(newFields, dvFormats);
}

/**
* Reduces the given query results and consumes all aggregations and profile results.
* @param queryResults a list of non-null query shard results
@@ -416,7 +431,7 @@ ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> quer
if (queryResults.isEmpty()) { // early terminate we have nothing to reduce
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true);
false, null, null, null, null, SortedTopDocs.EMPTY, null, numReducePhases, 0, 0, true, false);
}
int total = queryResults.size();
queryResults = queryResults.stream()
@@ -477,7 +492,7 @@ ReducedQueryPhase reducedQueryPhase(Collection<? extends SearchPhaseResult> quer
final TotalHits totalHits = topDocsStats.getTotalHits();
return new ReducedQueryPhase(totalHits, topDocsStats.fetchHits, topDocsStats.getMaxScore(),
topDocsStats.timedOut, topDocsStats.terminatedEarly, reducedSuggest, aggregations, shardResults, sortedTopDocs,
firstResult.sortValueFormats(), numReducePhases, size, from, false);
firstResult.sortValueFormats(), numReducePhases, size, from, false, firstResult.hasPIT());
}

private static InternalAggregations reduceAggs(InternalAggregation.ReduceContextBuilder aggReduceContextBuilder,
@@ -558,10 +573,23 @@ public static final class ReducedQueryPhase {
final int from;
// sort value formats used to sort / format the result
final DocValueFormat[] sortValueFormats;

ReducedQueryPhase(TotalHits totalHits, long fetchHits, float maxScore, boolean timedOut, Boolean terminatedEarly, Suggest suggest,
InternalAggregations aggregations, SearchProfileShardResults shardResults, SortedTopDocs sortedTopDocs,
DocValueFormat[] sortValueFormats, int numReducePhases, int size, int from, boolean isEmptyResult) {
// <code>true</code> if the search request uses a point in time reader
final boolean hasPIT;

ReducedQueryPhase(TotalHits totalHits,
long fetchHits,
float maxScore,
boolean timedOut,
Boolean terminatedEarly,
Suggest suggest,
InternalAggregations aggregations,
SearchProfileShardResults shardResults,
SortedTopDocs sortedTopDocs,
DocValueFormat[] sortValueFormats,
int numReducePhases,
int size, int from,
boolean isEmptyResult,
boolean hasPIT) {
if (numReducePhases <= 0) {
throw new IllegalArgumentException("at least one reduce phase must have been applied but was: " + numReducePhases);
}
@@ -579,6 +607,7 @@ public static final class ReducedQueryPhase {
this.from = from;
this.isEmptyResult = isEmptyResult;
this.sortValueFormats = sortValueFormats;
this.hasPIT = hasPIT;
}

/**
@@ -52,6 +52,8 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator
private final TimeValue searchContextKeepAlive;
private final PlainIterator<String> targetNodesIterator;

private int searchShardIndex;

/**
* Creates a {@link PlainShardIterator} instance that iterates over a subset of the given shards
* this the a given <code>shardId</code>.
@@ -78,6 +80,17 @@ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId,
assert searchContextKeepAlive == null || searchContextId != null;
}

void setShardIndex(int shardIndex) {
this.searchShardIndex = shardIndex;
}

/**
* Returns the shard index that is used to tiebreak identical sort values coming from different shards.
*/
int getShardIndex() {
return searchShardIndex;
}
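
The role of the shard index as a tiebreaker can be illustrated with a small merge sketch. It assumes that hits with identical sort values are ordered by shard index first and by per-shard doc id second; the `ShardHit` class and `merge` method are hypothetical and are not part of this commit. What it shows is why the shard index has to be stable across requests within a PIT: if it changed between pages, the relative order of tied hits would change too.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public final class ShardMergeSketch {
    private ShardMergeSketch() {}

    /** Hypothetical per-shard hit: one sort value plus the shard and doc it came from. */
    public static final class ShardHit {
        public final long sortValue;
        public final int shardIndex;
        public final int doc;

        public ShardHit(long sortValue, int shardIndex, int doc) {
            this.sortValue = sortValue;
            this.shardIndex = shardIndex;
            this.doc = doc;
        }
    }

    // Sort value decides first; ties fall back to shard index, then to doc id.
    static final Comparator<ShardHit> MERGE_ORDER = Comparator
        .comparingLong((ShardHit h) -> h.sortValue)
        .thenComparingInt(h -> h.shardIndex)
        .thenComparingInt(h -> h.doc);

    /** Flattens the per-shard hit lists and returns them in the global merge order. */
    public static List<ShardHit> merge(List<List<ShardHit>> perShard) {
        List<ShardHit> merged = new ArrayList<>();
        for (List<ShardHit> shardHits : perShard) {
            merged.addAll(shardHits);
        }
        merged.sort(MERGE_ORDER);
        return merged;
    }
}
```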

/**
* Returns the original indices associated with this shard iterator, specifically with the cluster that this shard belongs to.
*/
11 changes: 8 additions & 3 deletions server/src/main/java/org/elasticsearch/search/SearchService.java
@@ -728,7 +728,7 @@ final SearchContext createContext(ReaderContext readerContext,
if (request.scroll() != null) {
context.scrollContext().scroll = request.scroll();
}
parseSource(context, request.source(), includeAggregations);
parseSource(context, request, includeAggregations);

// if the from and size are still not set, default them
if (context.from() == -1) {
@@ -878,7 +878,8 @@ private void processFailure(ReaderContext context, Exception exc) {
}
}

private void parseSource(DefaultSearchContext context, SearchSourceBuilder source, boolean includeAggregations) {
private void parseSource(DefaultSearchContext context, ShardSearchRequest request, boolean includeAggregations) {
final SearchSourceBuilder source = request.source();
// nothing to parse...
if (source == null) {
return;
@@ -1022,7 +1023,11 @@ private void parseSource(DefaultSearchContext context, SearchSourceBuilder sourc
if (context.from() > 0) {
throw new SearchException(shardTarget, "`from` parameter must be set to 0 when `search_after` is used.");
}
FieldDoc fieldDoc = SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter());


FieldDoc fieldDoc = source.pointInTimeBuilder() == null ?
SearchAfterBuilder.buildFieldDoc(context.sort(), source.searchAfter()) :
SearchAfterBuilder.buildFieldDocWithPIT(request.getShardIndex(), context.sort(), source.searchAfter());
context.searchAfter(fieldDoc);
}
