Sort leaves on search according to the primary numeric sort field
This change pre-sorts the index reader leaves (segments) prior to search
when the primary sort is a numeric field eligible for the distance feature
optimization. It also adds a tie breaker on `_doc` to the rewritten sort,
since leaves are now collected in a random order rather than in index order.
I ran this patch on the http_logs benchmark and the results are very promising:

```
|                                       50th percentile latency | desc_sort_timestamp |    220.706 |      136544 |   136324 |     ms |
|                                       90th percentile latency | desc_sort_timestamp |    244.847 |      162084 |   161839 |     ms |
|                                       99th percentile latency | desc_sort_timestamp |    316.627 |      172005 |   171688 |     ms |
|                                      100th percentile latency | desc_sort_timestamp |    335.306 |      173325 |   172989 |     ms |
|                                  50th percentile service time | desc_sort_timestamp |    218.369 |     1968.11 |  1749.74 |     ms |
|                                  90th percentile service time | desc_sort_timestamp |    244.182 |      2447.2 |  2203.02 |     ms |
|                                  99th percentile service time | desc_sort_timestamp |    313.176 |     2950.85 |  2637.67 |     ms |
|                                 100th percentile service time | desc_sort_timestamp |    332.924 |     2959.38 |  2626.45 |     ms |
|                                                    error rate | desc_sort_timestamp |          0 |           0 |        0 |      % |
|                                                Min Throughput |  asc_sort_timestamp |   0.801824 |    0.800855 | -0.00097 |  ops/s |
|                                             Median Throughput |  asc_sort_timestamp |   0.802595 |    0.801104 | -0.00149 |  ops/s |
|                                                Max Throughput |  asc_sort_timestamp |   0.803282 |    0.801351 | -0.00193 |  ops/s |
|                                       50th percentile latency |  asc_sort_timestamp |    220.761 |     824.098 |  603.336 |     ms |
|                                       90th percentile latency |  asc_sort_timestamp |    251.741 |     853.984 |  602.243 |     ms |
|                                       99th percentile latency |  asc_sort_timestamp |    368.761 |     893.943 |  525.182 |     ms |
|                                      100th percentile latency |  asc_sort_timestamp |    431.042 |      908.85 |  477.808 |     ms |
|                                  50th percentile service time |  asc_sort_timestamp |    218.547 |     820.757 |  602.211 |     ms |
|                                  90th percentile service time |  asc_sort_timestamp |    249.578 |     849.886 |  600.308 |     ms |
|                                  99th percentile service time |  asc_sort_timestamp |    366.317 |     888.894 |  522.577 |     ms |
|                                 100th percentile service time |  asc_sort_timestamp |    430.952 |     908.401 |   477.45 |     ms |
|                                                    error rate |  asc_sort_timestamp |          0 |           0 |        0 |      % |
```

So roughly 10x faster for the descending sort and 2-3x faster in the ascending case. Note
that I indexed http_logs with a single client in order to simulate real time-based indices
where documents are indexed in timestamp order.

Relates elastic#37043
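
To illustrate the core idea outside of the Elasticsearch codebase: the patch orders the index segments by the minimum value of the primary sort field, so that the segments most likely to contain competitive hits are collected first and later segments can be skipped through early termination. A minimal plain-Lucene sketch of that ordering follows; the class and method names are illustrative, not part of the patch.

```java
import java.io.IOException;
import java.util.Comparator;
import java.util.List;

import org.apache.lucene.document.LongPoint;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PointValues;

class LeafOrderingSketch {
    /**
     * Sorts segments by the minimum indexed value of a long-valued point field,
     * so that for an ascending sort the most competitive segments come first.
     * Pass a mutable copy of reader.leaves(); segments without the field sort last.
     */
    static void sortLeavesByMinValue(List<LeafReaderContext> leaves, String field) throws IOException {
        long[] minValues = new long[leaves.size()];
        for (LeafReaderContext ctx : leaves) {
            PointValues values = ctx.reader().getPointValues(field);
            byte[] min = values == null ? null : values.getMinPackedValue();
            minValues[ctx.ord] = min == null ? Long.MAX_VALUE : LongPoint.decodeDimension(min, 0);
        }
        leaves.sort(Comparator.comparingLong(ctx -> minValues[ctx.ord]));
    }
}
```

For a descending sort the comparator is reversed, and the `_doc` tie breaker mentioned above keeps hits with equal sort values in a deterministic order even though segments are no longer visited in index order.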
jimczi committed Jul 5, 2019
1 parent 04e5e41 commit 3a18a39
Showing 8 changed files with 266 additions and 247 deletions.
@@ -24,6 +24,7 @@
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.TermStates;
+ import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.CollectionStatistics;
import org.apache.lucene.search.CollectionTerminatedException;
@@ -132,12 +133,67 @@ public Weight createWeight(Query query, ScoreMode scoreMode, float boost) throws
}
}

+ private void checkCancelled() {
+ if (checkCancelled != null) {
+ checkCancelled.run();
+ }
+ }
+
@Override
protected void search(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
- final Weight cancellableWeight;
- if (checkCancelled != null) {
- cancellableWeight = new Weight(weight.getQuery()) {
+ for (LeafReaderContext ctx : leaves) { // search each subreader
+ searchLeaf(ctx, weight, collector);
+ }
+ }

+ /**
+ * Lower-level search API.
+ *
+ * {@link LeafCollector#collect(int)} is called for every matching document in
+ * the provided <code>ctx</code>.
+ */
+ public void searchLeaf(LeafReaderContext ctx, Weight weight, Collector collector) throws IOException {
+ checkCancelled();
+ weight = wrapWeight(weight);
+ final LeafCollector leafCollector;
+ try {
+ leafCollector = collector.getLeafCollector(ctx);
+ } catch (CollectionTerminatedException e) {
+ // there is no doc of interest in this reader context
+ // continue with the following leaf
+ return;
+ }
+ Bits liveDocs = ctx.reader().getLiveDocs();
+ BitSet liveDocsBitSet = getSparseBitSetOrNull(ctx.reader().getLiveDocs());
+ if (liveDocsBitSet == null) {
+ BulkScorer bulkScorer = weight.bulkScorer(ctx);
+ if (bulkScorer != null) {
+ try {
+ bulkScorer.score(leafCollector, liveDocs);
+ } catch (CollectionTerminatedException e) {
+ // collection was terminated prematurely
+ // continue with the following leaf
+ }
+ }
+ } else {
+ // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
+ Scorer scorer = weight.scorer(ctx);
+ if (scorer != null) {
+ try {
+ intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
+ checkCancelled == null ? () -> {} : checkCancelled);
+ } catch (CollectionTerminatedException e) {
+ // collection was terminated prematurely
+ // continue with the following leaf
+ }
+ }
+ }
+ }
+
+ private Weight wrapWeight(Weight weight) {
+ if (checkCancelled != null) {
+ return new Weight(weight.getQuery()) {
@Override
public void extractTerms(Set<Term> terms) {
throw new UnsupportedOperationException();
@@ -169,48 +225,10 @@ public BulkScorer bulkScorer(LeafReaderContext context) throws IOException {
}
};
} else {
- cancellableWeight = weight;
+ return weight;
}
- searchInternal(leaves, cancellableWeight, collector);
}

- private void searchInternal(List<LeafReaderContext> leaves, Weight weight, Collector collector) throws IOException {
- for (LeafReaderContext ctx : leaves) { // search each subreader
- final LeafCollector leafCollector;
- try {
- leafCollector = collector.getLeafCollector(ctx);
- } catch (CollectionTerminatedException e) {
- // there is no doc of interest in this reader context
- // continue with the following leaf
- continue;
- }
- Bits liveDocs = ctx.reader().getLiveDocs();
- BitSet liveDocsBitSet = getSparseBitSetOrNull(ctx.reader().getLiveDocs());
- if (liveDocsBitSet == null) {
- BulkScorer bulkScorer = weight.bulkScorer(ctx);
- if (bulkScorer != null) {
- try {
- bulkScorer.score(leafCollector, liveDocs);
- } catch (CollectionTerminatedException e) {
- // collection was terminated prematurely
- // continue with the following leaf
- }
- }
- } else {
- // if the role query result set is sparse then we should use the SparseFixedBitSet for advancing:
- Scorer scorer = weight.scorer(ctx);
- if (scorer != null) {
- try {
- intersectScorerAndBitSet(scorer, liveDocsBitSet, leafCollector,
- checkCancelled == null ? () -> {} : checkCancelled);
- } catch (CollectionTerminatedException e) {
- // collection was terminated prematurely
- // continue with the following leaf
- }
- }
- }
- }
- }

private static BitSet getSparseBitSetOrNull(Bits liveDocs) {
if (liveDocs instanceof SparseFixedBitSet) {
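
The refactoring above turns `search` into a thin loop over the new public `searchLeaf` entry point, which is what lets the query phase (below) pick the order in which leaves are visited. The following self-contained plain-Lucene sketch approximates the same per-leaf collection pattern; it is not Elasticsearch code, and cancellation as well as the sparse live-docs path are omitted.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BulkScorer;
import org.apache.lucene.search.CollectionTerminatedException;
import org.apache.lucene.search.Collector;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.LeafCollector;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.Weight;

class OrderedLeafSearchSketch {
    /** Collects hits leaf by leaf in the order given by leafOrder instead of index order. */
    static void search(IndexSearcher searcher, Query query, Collector collector,
                       Comparator<LeafReaderContext> leafOrder) throws IOException {
        Weight weight = searcher.createWeight(searcher.rewrite(query), collector.scoreMode(), 1f);
        List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
        leaves.sort(leafOrder);
        for (LeafReaderContext ctx : leaves) {
            LeafCollector leafCollector;
            try {
                leafCollector = collector.getLeafCollector(ctx);
            } catch (CollectionTerminatedException e) {
                continue; // no doc of interest in this leaf
            }
            BulkScorer scorer = weight.bulkScorer(ctx);
            if (scorer != null) {
                try {
                    scorer.score(leafCollector, ctx.reader().getLiveDocs());
                } catch (CollectionTerminatedException e) {
                    // this leaf terminated early, continue with the next one
                }
            }
        }
    }
}
```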

This file was deleted.

@@ -150,18 +150,6 @@ protected InternalProfileCollector createWithProfiler(InternalProfileCollector i
};
}

- /**
- * Creates a collector that throws {@link TaskCancelledException} if the search is cancelled
- */
- static QueryCollectorContext createCancellableCollectorContext(BooleanSupplier cancelled) {
- return new QueryCollectorContext(REASON_SEARCH_CANCELLED) {
- @Override
- Collector create(Collector in) throws IOException {
- return new CancellableCollector(cancelled, in);
- }
- };
- }

/**
* Creates collector limiting the collection to the first <code>numHits</code> documents
*/
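
The deleted `createCancellableCollectorContext` is not replaced by another collector: cancellation moves into the searcher itself, which now runs a check at the start of every leaf (`checkCancelled()` above) and, when low-level cancellation is enabled, inside the wrapped scorers. Below is a rough sketch of the runnable installed via `setCheckCancelled`; the exact wiring is an assumption based on the diffs, not the literal Elasticsearch code.

```java
import java.util.function.BooleanSupplier;

import org.elasticsearch.tasks.TaskCancelledException;

class CancellationWiringSketch {
    /** Builds the per-segment check that searchLeaf() runs through checkCancelled(). */
    static Runnable cancellationCheck(BooleanSupplier isCancelled) {
        return () -> {
            if (isCancelled.getAsBoolean()) {
                throw new TaskCancelledException("cancelled"); // unwinds the search
            }
        };
    }
}
```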
server/src/main/java/org/elasticsearch/search/query/QueryPhase.java (75 changes: 53 additions & 22 deletions)
@@ -34,7 +34,6 @@
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocValuesFieldExistsQuery;
import org.apache.lucene.search.FieldDoc;
- import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
@@ -43,8 +42,10 @@
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TopFieldDocs;
import org.apache.lucene.search.TotalHits;
+ import org.apache.lucene.search.Weight;
import org.elasticsearch.action.search.SearchTask;
import org.elasticsearch.common.Booleans;
+ import org.elasticsearch.common.CheckedConsumer;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.search.TopDocsAndMaxScore;
import org.elasticsearch.common.util.concurrent.QueueResizingEsThreadPoolExecutor;
@@ -68,12 +69,14 @@
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
+ import java.util.ArrayList;
import java.util.Arrays;
+ import java.util.Collections;
+ import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

- import static org.elasticsearch.search.query.QueryCollectorContext.createCancellableCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createEarlyTerminationCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createFilteredCollectorContext;
import static org.elasticsearch.search.query.QueryCollectorContext.createMinScoreCollectorContext;
@@ -89,7 +92,7 @@
public class QueryPhase implements SearchPhase {
private static final Logger LOGGER = LogManager.getLogger(QueryPhase.class);
public static final boolean SYS_PROP_LONG_SORT_OPTIMIZED =
- Booleans.parseBoolean(System.getProperty("es.search.long_sort_optimized", "false"));
+ Booleans.parseBoolean(System.getProperty("es.search.long_sort_optimized", "true"));

private final AggregationPhase aggregationPhase;
private final SuggestPhase suggestPhase;
@@ -124,8 +127,7 @@ public void execute(SearchContext searchContext) throws QueryPhaseExecutionExcep
// request, preProcess is called on the DFS phase, this is why we pre-process them
// here to make sure it happens during the QUERY phase
aggregationPhase.preProcess(searchContext);
- final ContextIndexSearcher searcher = searchContext.searcher();
- boolean rescore = execute(searchContext, searchContext.searcher(), searcher::setCheckCancelled);
+ boolean rescore = executeInternal(searchContext);

if (rescore) { // only if we do a regular search
rescorePhase.execute(searchContext);
@@ -145,9 +147,8 @@
* wire everything (mapperService, etc.)
* @return whether the rescoring phase should be executed
*/
- static boolean execute(SearchContext searchContext,
- final IndexSearcher searcher,
- Consumer<Runnable> checkCancellationSetter) throws QueryPhaseExecutionException {
+ static boolean executeInternal(SearchContext searchContext) throws QueryPhaseExecutionException {
+ final ContextIndexSearcher searcher = searchContext.searcher();
SortAndFormats sortAndFormatsForRewrittenNumericSort = null;
final IndexReader reader = searcher.getIndexReader();
QuerySearchResult queryResult = searchContext.queryResult();
@@ -220,6 +221,7 @@
hasFilterCollector = true;
}

+ CheckedConsumer<List<LeafReaderContext>, IOException> leafSorter = l -> {};
// try to rewrite numeric or date sort to the optimized distanceFeatureQuery
if ((searchContext.sort() != null) && SYS_PROP_LONG_SORT_OPTIMIZED) {
Query rewrittenQuery = tryRewriteLongSort(searchContext, searcher.getIndexReader(), query, hasFilterCollector);
@@ -228,14 +230,17 @@
// modify sorts: add sort on _score as 1st sort, and move the sort on the original field as the 2nd sort
SortField[] oldSortFields = searchContext.sort().sort.getSort();
DocValueFormat[] oldFormats = searchContext.sort().formats;
- SortField[] newSortFields = new SortField[oldSortFields.length + 1];
- DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 1];
+ SortField[] newSortFields = new SortField[oldSortFields.length + 2];
+ DocValueFormat[] newFormats = new DocValueFormat[oldSortFields.length + 2];
newSortFields[0] = SortField.FIELD_SCORE;
newFormats[0] = DocValueFormat.RAW;
+ newSortFields[newSortFields.length - 1] = SortField.FIELD_DOC;
+ newFormats[newSortFields.length - 1] = DocValueFormat.RAW;
System.arraycopy(oldSortFields, 0, newSortFields, 1, oldSortFields.length);
System.arraycopy(oldFormats, 0, newFormats, 1, oldFormats.length);
sortAndFormatsForRewrittenNumericSort = searchContext.sort(); // stash SortAndFormats to restore it later
searchContext.sort(new SortAndFormats(new Sort(newSortFields), newFormats));
+ leafSorter = createLeafSorter(oldSortFields[0]);
}
}
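
For context, the rewrite that makes the leaf ordering pay off: `tryRewriteLongSort` replaces a sort on a numeric field with a Lucene distance-feature query whose score grows as a document's value approaches an origin (the field's minimum for ascending sorts, its maximum for descending ones), so sorting by `_score` approximates the field sort while letting the top-k collector skip non-competitive documents. A hedged plain-Lucene sketch of such a rewrite follows; the `origin`/`pivotDistance` handling and the exact boolean structure used by `tryRewriteLongSort` are assumptions, as they are not shown in this diff.

```java
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.Query;

class LongSortRewriteSketch {
    /**
     * Combines the original query (as a non-scoring FILTER clause) with a
     * distance-feature query on the sort field, so that _score reflects only
     * how close the field value is to the origin.
     */
    static Query rewrite(Query original, String field, long origin, long pivotDistance) {
        Query distanceFeature = LongPoint.newDistanceFeatureQuery(field, 1f, origin, pivotDistance);
        return new BooleanQuery.Builder()
            .add(original, BooleanClause.Occur.FILTER)
            .add(distanceFeature, BooleanClause.Occur.SHOULD)
            .build();
    }
}
```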

@@ -279,16 +284,11 @@
checkCancelled = null;
}

- checkCancellationSetter.accept(checkCancelled);
-
- // add cancellable
- // this only performs segment-level cancellation, which is cheap and checked regardless of
- // searchContext.lowLevelCancellation()
- collectors.add(createCancellableCollectorContext(searchContext.getTask()::isCancelled));
+ searcher.setCheckCancelled(checkCancelled);

final boolean doProfile = searchContext.getProfilers() != null;
// create the top docs collector last when the other collectors are known
- final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, reader, hasFilterCollector);
+ final TopDocsCollectorContext topDocsFactory = createTopDocsCollectorContext(searchContext, hasFilterCollector);
// add the top docs collector, the first collector context in the chain
collectors.addFirst(topDocsFactory);

@@ -302,7 +302,12 @@
}

try {
- searcher.search(query, queryCollector);
+ Weight weight = searcher.createWeight(searcher.rewrite(query), queryCollector.scoreMode(), 1f);
+ List<LeafReaderContext> leaves = new ArrayList<>(searcher.getIndexReader().leaves());
+ leafSorter.accept(leaves);
+ for (LeafReaderContext ctx : leaves) {
+ searcher.searchLeaf(ctx, weight, queryCollector);
+ }
} catch (EarlyTerminatingCollector.EarlyTerminationException e) {
queryResult.terminatedEarly(true);
} catch (TimeExceededException e) {
@@ -427,13 +432,39 @@ private static Query tryRewriteLongSort(SearchContext searchContext, IndexReader
return rewrittenQuery;
}

- // Restore fieldsDocs to remove the first _score sort
- // updating in place without creating new FieldDoc objects
+ /**
+ * Creates a sorter of {@link LeafReaderContext} that orders leaves depending on the minimum
+ * value and the sort order of the provided <code>sortField</code>.
+ */
+ static CheckedConsumer<List<LeafReaderContext>, IOException> createLeafSorter(SortField sortField) {
+ return leaves -> {
+ long[] minValues = new long[leaves.size()];
+ long missingValue = (long) sortField.getMissingValue();
+ for (LeafReaderContext ctx : leaves) {
+ PointValues values = ctx.reader().getPointValues(sortField.getField());
+ if (values == null) {
+ minValues[ctx.ord] = missingValue;
+ } else {
+ byte[] minValue = values.getMinPackedValue();
+ minValues[ctx.ord] = minValue == null ? missingValue : LongPoint.decodeDimension(minValue, 0);
+ }
+ }
+ Comparator<LeafReaderContext> comparator = Comparator.comparingLong(l -> minValues[l.ord]);
+ if (sortField.getReverse()) {
+ comparator = comparator.reversed();
+ }
+ Collections.sort(leaves, comparator);
+ };
+ }

+ /**
+ * Restore fieldsDocs to remove the first _score and last _doc sort.
+ */
static void restoreTopFieldDocs(QuerySearchResult result, SortAndFormats originalSortAndFormats) {
TopDocs topDocs = result.topDocs().topDocs;
for (ScoreDoc scoreDoc : topDocs.scoreDocs) {
FieldDoc fieldDoc = (FieldDoc) scoreDoc;
- fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length);
+ fieldDoc.fields = Arrays.copyOfRange(fieldDoc.fields, 1, fieldDoc.fields.length - 1);
}
TopFieldDocs newTopDocs = new TopFieldDocs(topDocs.totalHits, topDocs.scoreDocs, originalSortAndFormats.sort.getSort());
result.topDocs(new TopDocsAndMaxScore(newTopDocs, Float.NaN), originalSortAndFormats.formats);
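
To make the restore step concrete: with an original sort of `[timestamp]`, each collected hit carries the rewritten sort values `[_score, timestamp, _doc]`, and `restoreTopFieldDocs` must hand hits back as if the sort had been `[timestamp]` alone, hence `copyOfRange(fields, 1, fields.length - 1)`, which now drops the trailing `_doc` tie breaker as well as the leading `_score`. A tiny self-contained illustration with made-up values:

```java
import java.util.Arrays;

class RestoreExample {
    public static void main(String[] args) {
        // Sort values of one hit under the rewritten sort [_score, timestamp, _doc]:
        Object[] rewritten = { 0.97f, 1562284800000L, 42 };
        // restoreTopFieldDocs keeps only the original sort fields:
        Object[] restored = Arrays.copyOfRange(rewritten, 1, rewritten.length - 1);
        System.out.println(Arrays.toString(restored)); // prints [1562284800000]
    }
}
```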
@@ -428,8 +428,8 @@ static int shortcutTotalHitCount(IndexReader reader, Query query) throws IOExcep
* @param hasFilterCollector True if the collector chain contains at least one collector that can filters document.
*/
static TopDocsCollectorContext createTopDocsCollectorContext(SearchContext searchContext,
- IndexReader reader,
boolean hasFilterCollector) throws IOException {
+ final IndexReader reader = searchContext.searcher().getIndexReader();
final Query query = searchContext.query();
// top collectors don't like a size of 0
final int totalNumDocs = Math.max(1, reader.numDocs());
