Skip to content

Commit

Permalink
Sort field tiebreaker for PIT (point in time) readers
Browse files Browse the repository at this point in the history
This commit introduces a new sort field called `_shard_doc` that
can be used in conjunction with a PIT to consistently tiebreak
identical sort values. The sort value is a numeric long that is
composed of the ordinal of the shard (assigned by the coordinating node)
and the internal Lucene document ID. These two values are consistent within
a PIT, so this sort criterion can be used as the tiebreaker of any search
request.
Since this sort criterion is stable, we'd like to add it automatically to any
sorted search request that uses a PIT, but we also need to expose it explicitly
in order to be able to:
* Reverse the order of the tiebreaking, useful to search "before" `search_after`.
* Force the primary sort to use it in order to benefit from the `search_after` optimization when sorting by index order (to be released in Lucene 8.8).

I plan to add the documentation and the automatic configuration for PIT in a follow-up, since this change is already big.

Relates elastic#56828
  • Loading branch information
jimczi committed Dec 9, 2020
1 parent e1d594b commit b34df54
Show file tree
Hide file tree
Showing 54 changed files with 489 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void setup() {
indexService = createIndex("test", settings, "t",
"text_shingle", "type=text,analyzer=text_shingle",
"text_shingle_unigram", "type=text,analyzer=text_shingle_unigram");
shardContext = indexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap());
shardContext = indexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap());

// parsed queries for "text_shingle_unigram:(foo bar baz)" with query parsers
// that ignores position length attribute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ private static Response prepareRamIndex(Request request,
searcher.setQueryCache(null);
final long absoluteStartMillis = System.currentTimeMillis();
QueryShardContext context =
indexService.newQueryShardContext(0, searcher, () -> absoluteStartMillis, null, emptyMap());
indexService.newQueryShardContext(0, 0, searcher, () -> absoluteStartMillis, null, emptyMap());
return handler.apply(context, indexReader.leaves().get(0));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testNeedsScores() {
contexts.put(NumberSortScript.CONTEXT, Whitelist.BASE_WHITELISTS);
PainlessScriptEngine service = new PainlessScriptEngine(Settings.EMPTY, contexts);

QueryShardContext shardContext = index.newQueryShardContext(0, null, () -> 0, null, emptyMap());
QueryShardContext shardContext = index.newQueryShardContext(0, 0, null, () -> 0, null, emptyMap());

NumberSortScript.Factory factory = service.compile(null, "1.2", NumberSortScript.CONTEXT, Collections.emptyMap());
NumberSortScript.LeafFactory ss = factory.newFactory(Collections.emptyMap(), shardContext.lookup());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ public void testQueryWithRewrite() throws Exception {
XContentType.JSON));
BytesRef qbSource = doc.rootDoc().getFields(fieldType.queryBuilderField.name())[0].binaryValue();
QueryShardContext shardContext = indexService.newQueryShardContext(
randomInt(20), null, () -> {
randomInt(20), 0, null, () -> {
throw new UnsupportedOperationException();
}, null, emptyMap());
PlainActionFuture<QueryBuilder> future = new PlainActionFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ public void testRangeQueriesWithNow() throws Exception {
try (Engine.Searcher searcher = indexService.getShard(0).acquireSearcher("test")) {
long[] currentTime = new long[] {System.currentTimeMillis()};
QueryShardContext queryShardContext =
indexService.newQueryShardContext(0, searcher, () -> currentTime[0], null, emptyMap());
indexService.newQueryShardContext(0, 0, searcher, () -> currentTime[0], null, emptyMap());

BytesReference source = BytesReference.bytes(jsonBuilder().startObject()
.field("field1", "value")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@
---
"date_nanos":
- skip:
version: " - 7.99.99"
reason: fixed in 8.0.0 to be backported to 7.10.0
version: " - 7.9.99"
reason: fixed in 7.10.0

- do:
indices.create:
Expand Down Expand Up @@ -218,3 +218,27 @@
- match: {hits.hits.0._source.timestamp: "2019-10-21 00:30:04.828740" }
- match: {hits.hits.0.sort: [1571617804828740000] }


---
"_shard_doc sort":
- skip:
version: " - 7.99.99"
reason: _shard_doc sort was added in 8.0 (TODO adapt version after backport)

- do:
indices.create:
index: test
- do:
index:
index: test
id: 1
body: { id: 1, foo: bar, age: 18 }

- do:
catch: /\[_shard_doc\] sort field cannot be used without \[point in time\]/
search:
index: test
body:
size: 1
sort: [{ _shard_doc }]
search_after: [ 0L ]
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public static Template resolveTemplate(final String matchingTemplate, final Stri
resolvedAliases, tempClusterState.metadata(), aliasValidator, xContentRegistry,
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
tempIndexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap())));
tempIndexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap())));
Map<String, AliasMetadata> aliasesByName = aliases.stream().collect(
Collectors.toMap(AliasMetadata::getAlias, Function.identity()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,9 +290,13 @@ long ramBytesUsedQueryResult(QuerySearchResult result) {
if (hasAggs == false) {
return 0;
}
return result.aggregations()
.asSerialized(InternalAggregations::readFrom, namedWriteableRegistry)
.ramBytesUsed();
if (result.aggregations().isSerialized()) {
return result.aggregations()
.asSerialized(InternalAggregations::readFrom, namedWriteableRegistry)
.ramBytesUsed();
} else {
return result.aggregations().expand().getSerializedSize();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.search.sort.ShardDocSortField;
import org.elasticsearch.tasks.TaskId;

import java.io.IOException;
Expand Down Expand Up @@ -280,6 +283,14 @@ public ActionRequestValidationException validate() {
if (scroll) {
validationException = addValidationError("using [point in time] is not allowed in a scroll context", validationException);
}
} else if (source != null && source.sorts() != null) {
for (SortBuilder<?> sortBuilder : source.sorts()) {
if (sortBuilder instanceof FieldSortBuilder
&& ShardDocSortField.NAME.equals(((FieldSortBuilder) sortBuilder).getFieldName())) {
validationException = addValidationError("[" + FieldSortBuilder.SHARD_DOC_FIELD_NAME
+ "] sort field cannot be used without [point in time]", validationException);
}
}
}
return validationException;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ private ClusterState applyCreateIndexRequestWithV1Templates(final ClusterState c
MetadataIndexTemplateService.resolveAliases(templates), currentState.metadata(), aliasValidator,
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap())),
xContentRegistry, indexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap())),
templates.stream().map(IndexTemplateMetadata::getName).collect(toList()), metadataTransformer);
}

Expand Down Expand Up @@ -520,7 +520,7 @@ private ClusterState applyCreateIndexRequestWithV2Template(final ClusterState cu
MetadataIndexTemplateService.resolveAliases(currentState.metadata(), templateName), currentState.metadata(), aliasValidator,
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
xContentRegistry, indexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap())),
xContentRegistry, indexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap())),
Collections.singletonList(templateName), metadataTransformer);
}

Expand Down Expand Up @@ -566,7 +566,7 @@ private ClusterState applyCreateIndexRequestWithExistingMetadata(final ClusterSt
currentState.metadata(), aliasValidator, xContentRegistry,
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
indexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap())),
indexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap())),
List.of(), metadataTransformer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,8 @@ public ClusterState applyAliasActions(ClusterState currentState, Iterable<AliasA
}
// the context is only used for validation so it's fine to pass fake values for the shard id,
// but the current timestamp should be set to real value as we may use `now` in a filtered alias
aliasValidator.validateAliasFilter(alias, filter, indexService.newQueryShardContext(0, null,
() -> System.currentTimeMillis(), null, emptyMap()), xContentRegistry);
aliasValidator.validateAliasFilter(alias, filter, indexService.newQueryShardContext(0, -1,
null, () -> System.currentTimeMillis(), null, emptyMap()), xContentRegistry);
}
};
if (action.apply(newAliasValidator, metadata, index)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1118,7 +1118,7 @@ private static void validateCompositeTemplate(final ClusterState state,
new AliasValidator(),
// the context is only used for validation so it's fine to pass fake values for the
// shard id and the current timestamp
xContentRegistry, tempIndexService.newQueryShardContext(0, null, () -> 0L, null, emptyMap()));
xContentRegistry, tempIndexService.newQueryShardContext(0, 0, null, () -> 0L, null, emptyMap()));

// triggers inclusion of _timestamp field and its validation:
String indexName = DataStream.BACKING_INDEX_PREFIX + temporaryIndexName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ private DelayableWriteable() {}
* {@code true} if the {@linkplain Writeable} is being stored in
* serialized form, {@code false} otherwise.
*/
abstract boolean isSerialized();
public abstract boolean isSerialized();

private static class Referencing<T extends Writeable> extends DelayableWriteable<T> {
private final T reference;
Expand Down Expand Up @@ -109,7 +109,7 @@ public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry regis
}

@Override
boolean isSerialized() {
public boolean isSerialized() {
return false;
}

Expand Down Expand Up @@ -179,7 +179,7 @@ public Serialized<T> asSerialized(Reader<T> reader, NamedWriteableRegistry regis
}

@Override
boolean isSerialized() {
public boolean isSerialized() {
return true;
}

Expand Down
27 changes: 17 additions & 10 deletions server/src/main/java/org/elasticsearch/common/lucene/Lucene.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.elasticsearch.index.analysis.AnalyzerScope;
import org.elasticsearch.index.analysis.NamedAnalyzer;
import org.elasticsearch.index.fielddata.IndexFieldData;
import org.elasticsearch.search.sort.ShardDocSortField;

import java.io.IOException;
import java.math.BigInteger;
Expand Down Expand Up @@ -566,29 +567,35 @@ public static void writeSortType(StreamOutput out, SortField.Type sortType) thro
out.writeVInt(sortType.ordinal());
}

public static void writeSortField(StreamOutput out, SortField sortField) throws IOException {
/**
* Returns the generic version of the provided {@link SortField} that
* can be used to merge documents coming from different shards.
*/
private static SortField rewriteMergeSortField(SortField sortField) {
if (sortField.getClass() == GEO_DISTANCE_SORT_TYPE_CLASS) {
// for geo sorting, we replace the SortField with a SortField that assumes a double field.
// this works since the SortField is only used for merging top docs
SortField newSortField = new SortField(sortField.getField(), SortField.Type.DOUBLE);
newSortField.setMissingValue(sortField.getMissingValue());
sortField = newSortField;
return newSortField;
} else if (sortField.getClass() == SortedSetSortField.class) {
// for multi-valued sort field, we replace the SortedSetSortField with a simple SortField.
// It works because the sort field is only used to merge results from different shards.
SortField newSortField = new SortField(sortField.getField(), SortField.Type.STRING, sortField.getReverse());
newSortField.setMissingValue(sortField.getMissingValue());
sortField = newSortField;
return newSortField;
} else if (sortField.getClass() == SortedNumericSortField.class) {
// for multi-valued sort field, we replace the SortedSetSortField with a simple SortField.
// It works because the sort field is only used to merge results from different shards.
SortField newSortField = new SortField(sortField.getField(),
((SortedNumericSortField) sortField).getNumericType(),
sortField.getReverse());
newSortField.setMissingValue(sortField.getMissingValue());
sortField = newSortField;
return sortField;
} else if (sortField.getClass() == ShardDocSortField.class) {
SortField newSortField = new SortField(sortField.getField(), SortField.Type.LONG, sortField.getReverse());
return newSortField;
} else {
return sortField;
}
}

public static void writeSortField(StreamOutput out, SortField sortField) throws IOException {
sortField = rewriteMergeSortField(sortField);
if (sortField.getClass() != SortField.class) {
throw new IllegalArgumentException("Cannot serialize SortField impl [" + sortField + "]");
}
Expand Down
26 changes: 22 additions & 4 deletions server/src/main/java/org/elasticsearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ public IndexService(
assert indexAnalyzers != null;
this.mapperService = new MapperService(indexSettings, indexAnalyzers, xContentRegistry, similarityService, mapperRegistry,
// we parse all percolator queries as they would be parsed on shard 0
() -> newQueryShardContext(0, null, System::currentTimeMillis, null, emptyMap()), idFieldDataEnabled, scriptService);
() -> newQueryShardContext(0, 0, null, System::currentTimeMillis, null, emptyMap()), idFieldDataEnabled, scriptService);
this.indexFieldData = new IndexFieldDataService(indexSettings, indicesFieldDataCache, circuitBreakerService, mapperService);
if (indexSettings.getIndexSortConfig().hasIndexSort()) {
// we delay the actual creation of the sort order for this index because the mapping has not been merged yet.
Expand Down Expand Up @@ -588,6 +588,7 @@ public IndexSettings getIndexSettings() {
*/
public QueryShardContext newQueryShardContext(
int shardId,
int shardRequestIndex,
IndexSearcher searcher,
LongSupplier nowInMillis,
String clusterAlias,
Expand All @@ -596,9 +597,26 @@ public QueryShardContext newQueryShardContext(
final SearchIndexNameMatcher indexNameMatcher =
new SearchIndexNameMatcher(index().getName(), clusterAlias, clusterService, expressionResolver);
return new QueryShardContext(
shardId, indexSettings, bigArrays, indexCache.bitsetFilterCache(), indexFieldData::getForField, mapperService(),
similarityService(), scriptService, xContentRegistry, namedWriteableRegistry, client, searcher, nowInMillis, clusterAlias,
indexNameMatcher, allowExpensiveQueries, valuesSourceRegistry, runtimeMappings);
shardId,
shardRequestIndex,
indexSettings,
bigArrays,
indexCache.bitsetFilterCache(),
indexFieldData::getForField,
mapperService(),
similarityService(),
scriptService,
xContentRegistry,
namedWriteableRegistry,
client,
searcher,
nowInMillis,
clusterAlias,
indexNameMatcher,
allowExpensiveQueries,
valuesSourceRegistry,
runtimeMappings
);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public class QueryShardContext extends QueryRewriteContext {
private final BitsetFilterCache bitsetFilterCache;
private final TriFunction<MappedFieldType, String, Supplier<SearchLookup>, IndexFieldData<?>> indexFieldDataService;
private final int shardId;
private final int shardRequestIndex;
private final IndexSearcher searcher;
private boolean cacheable = true;
private final SetOnce<Boolean> frozen = new SetOnce<>();
Expand All @@ -114,6 +115,7 @@ public class QueryShardContext extends QueryRewriteContext {
*/
public QueryShardContext(
int shardId,
int shardRequestIndex,
IndexSettings indexSettings,
BigArrays bigArrays,
BitsetFilterCache bitsetFilterCache,
Expand All @@ -134,6 +136,7 @@ public QueryShardContext(
) {
this(
shardId,
shardRequestIndex,
indexSettings,
bigArrays,
bitsetFilterCache,
Expand All @@ -158,13 +161,30 @@ public QueryShardContext(
}

public QueryShardContext(QueryShardContext source) {
this(source.shardId, source.indexSettings, source.bigArrays, source.bitsetFilterCache, source.indexFieldDataService,
source.mapperService, source.similarityService, source.scriptService, source.getXContentRegistry(),
source.getWriteableRegistry(), source.client, source.searcher, source.nowInMillis, source.indexNameMatcher,
source.fullyQualifiedIndex, source.allowExpensiveQueries, source.valuesSourceRegistry, source.runtimeMappings);
this(
source.shardId,
source.shardRequestIndex,
source.indexSettings,
source.bigArrays,
source.bitsetFilterCache,
source.indexFieldDataService,
source.mapperService,
source.similarityService,
source.scriptService,
source.getXContentRegistry(),
source.getWriteableRegistry(),
source.client, source.searcher,
source.nowInMillis,
source.indexNameMatcher,
source.fullyQualifiedIndex,
source.allowExpensiveQueries,
source.valuesSourceRegistry,
source.runtimeMappings
);
}

private QueryShardContext(int shardId,
int shardRequestIndex,
IndexSettings indexSettings,
BigArrays bigArrays,
BitsetFilterCache bitsetFilterCache,
Expand All @@ -184,6 +204,7 @@ private QueryShardContext(int shardId,
Map<String, MappedFieldType> runtimeMappings) {
super(xContentRegistry, namedWriteableRegistry, client, nowInMillis);
this.shardId = shardId;
this.shardRequestIndex = shardRequestIndex;
this.similarityService = similarityService;
this.mapperService = mapperService;
this.bigArrays = bigArrays;
Expand Down Expand Up @@ -520,6 +541,14 @@ public int getShardId() {
return shardId;
}

/**
* Returns the shard request ordinal that is used by the main search request
* to reference this shard.
*/
public int getShardRequestIndex() {
return shardRequestIndex;
}

@Override
public final long nowInMillis() {
failIfFrozen();
Expand Down
Loading

0 comments on commit b34df54

Please sign in to comment.