
Commit

Merge branch 'master' into pipeline-promises
* master:
  Align REST specs for HEAD requests
  Remove unnecessary result sorting in SearchPhaseController (elastic#23321)
  Fix SamplerAggregatorTests to have stable and predictable docIds
  Tests: Ensure multi node integ tests wait on first node
jasontedor committed Feb 23, 2017
2 parents 70d57b7 + e579629 commit bd88c03
Showing 8 changed files with 184 additions and 84 deletions.
ClusterFormationTasks.groovy
@@ -62,7 +62,7 @@ class ClusterFormationTasks {
sharedDir.mkdirs()
}
}
- List<Task> startTasks = [cleanup]
+ List<Task> startTasks = []
List<NodeInfo> nodes = []
if (config.numNodes < config.numBwcNodes) {
throw new GradleException("numNodes must be >= numBwcNodes [${config.numNodes} < ${config.numBwcNodes}]")
@@ -102,7 +102,8 @@ class ClusterFormationTasks {
}
NodeInfo node = new NodeInfo(config, i, project, prefix, elasticsearchVersion, sharedDir)
nodes.add(node)
- startTasks.add(configureNode(project, prefix, runner, cleanup, node, distro, nodes.get(0)))
+ Task dependsOn = startTasks.empty ? cleanup : startTasks.get(0)
+ startTasks.add(configureNode(project, prefix, runner, dependsOn, node, distro, nodes.get(0)))
}

Task wait = configureWaitTask("${prefix}#wait", project, nodes, startTasks)
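Review note: this is the "Tests: Ensure multi node integ tests wait on first node" change from the merged branch. The first start task now hangs off cleanup, and every later node's start task hangs off the first node's start task (startTasks.get(0)), so no node starts before node 0 is up. A minimal standalone Java sketch of that wiring rule, with plain strings standing in for Gradle Task objects (names are hypothetical):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class StartTaskWiringSketch {
    public static void main(String[] args) {
        String cleanup = "cleanup";
        List<String> startTasks = new ArrayList<>();
        Map<String, String> dependsOn = new LinkedHashMap<>();
        for (int i = 0; i < 3; i++) {
            String startTask = "node" + i + "#start";
            // same rule as the Groovy above: the first start task waits on cleanup,
            // every later one waits on the first node's start task
            dependsOn.put(startTask, startTasks.isEmpty() ? cleanup : startTasks.get(0));
            startTasks.add(startTask);
        }
        dependsOn.forEach((task, dep) -> System.out.println(task + " dependsOn " + dep));
        // node0#start dependsOn cleanup
        // node1#start dependsOn node0#start
        // node2#start dependsOn node0#start
    }
}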
SearchPhaseController.java
@@ -71,14 +71,6 @@

public class SearchPhaseController extends AbstractComponent {

- private static final Comparator<AtomicArray.Entry<? extends QuerySearchResultProvider>> QUERY_RESULT_ORDERING = (o1, o2) -> {
- int i = o1.value.shardTarget().getIndex().compareTo(o2.value.shardTarget().getIndex());
- if (i == 0) {
- i = o1.value.shardTarget().getShardId().id() - o2.value.shardTarget().getShardId().id();
- }
- return i;
- };
-
private static final ScoreDoc[] EMPTY_DOCS = new ScoreDoc[0];

private final BigArrays bigArrays;
@@ -150,6 +142,9 @@ private static long optionalSum(long left, long right) {
* named completion suggestion across all shards. If more than one named completion suggestion is specified in the
* request, the suggest docs for a named suggestion are ordered by the suggestion name.
*
+ * Note: The order of the sorted score docs depends on the shard index in the result array if the merge process needs to disambiguate
+ * the result. In order to obtain stable results the shard index (index of the result in the result array) must be the same.
+ *
* @param ignoreFrom Whether to ignore the from and sort all hits in each shard result.
* Enabled only for scroll search, because that only retrieves hits of length 'size' in the query phase.
* @param resultsArr Shard result holder
@@ -160,26 +155,31 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
return EMPTY_DOCS;
}

+ final QuerySearchResult result;
boolean canOptimize = false;
- QuerySearchResult result = null;
int shardIndex = -1;
if (results.size() == 1) {
canOptimize = true;
result = results.get(0).value.queryResult();
shardIndex = results.get(0).index;
} else {
+ boolean hasResult = false;
+ QuerySearchResult resultToOptimize = null;
// lets see if we only got hits from a single shard, if so, we can optimize...
for (AtomicArray.Entry<? extends QuerySearchResultProvider> entry : results) {
if (entry.value.queryResult().hasHits()) {
- if (result != null) { // we already have one, can't really optimize
+ if (hasResult) { // we already have one, can't really optimize
canOptimize = false;
break;
}
canOptimize = true;
- result = entry.value.queryResult();
+ hasResult = true;
+ resultToOptimize = entry.value.queryResult();
shardIndex = entry.index;
}
}
+ result = canOptimize ? resultToOptimize : results.get(0).value.queryResult();
+ assert result != null;
}
if (canOptimize) {
int offset = result.from();
@@ -225,74 +225,62 @@ public ScoreDoc[] sortDocs(boolean ignoreFrom, AtomicArray<? extends QuerySearchResultProvider> resultsArr) {
return docs;
}

- @SuppressWarnings("unchecked")
- AtomicArray.Entry<? extends QuerySearchResultProvider>[] sortedResults = results.toArray(new AtomicArray.Entry[results.size()]);
- Arrays.sort(sortedResults, QUERY_RESULT_ORDERING);
- QuerySearchResultProvider firstResult = sortedResults[0].value;
-
- int topN = firstResult.queryResult().size();
- int from = firstResult.queryResult().from();
- if (ignoreFrom) {
- from = 0;
- }
+ final int topN = result.queryResult().size();
+ final int from = ignoreFrom ? 0 : result.queryResult().from();

final TopDocs mergedTopDocs;
- int numShards = resultsArr.length();
- if (firstResult.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
- CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) firstResult.queryResult().topDocs();
+ final int numShards = resultsArr.length();
+ if (result.queryResult().topDocs() instanceof CollapseTopFieldDocs) {
+ CollapseTopFieldDocs firstTopDocs = (CollapseTopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);

final CollapseTopFieldDocs[] shardTopDocs = new CollapseTopFieldDocs[numShards];
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+ if (result.size() != shardTopDocs.length) {
+ // TopDocs#merge can't deal with null shard TopDocs
+ final CollapseTopFieldDocs empty = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
+ sort.getSort(), new Object[0], Float.NaN);
+ Arrays.fill(shardTopDocs, empty);
+ }
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (CollapseTopFieldDocs) topDocs;
}
- // TopDocs#merge can't deal with null shard TopDocs
- for (int i = 0; i < shardTopDocs.length; ++i) {
- if (shardTopDocs[i] == null) {
- shardTopDocs[i] = new CollapseTopFieldDocs(firstTopDocs.field, 0, new FieldDoc[0],
- sort.getSort(), new Object[0], Float.NaN);
- }
- }
mergedTopDocs = CollapseTopFieldDocs.merge(sort, from, topN, shardTopDocs);
- } else if (firstResult.queryResult().topDocs() instanceof TopFieldDocs) {
- TopFieldDocs firstTopDocs = (TopFieldDocs) firstResult.queryResult().topDocs();
+ } else if (result.queryResult().topDocs() instanceof TopFieldDocs) {
+ TopFieldDocs firstTopDocs = (TopFieldDocs) result.queryResult().topDocs();
final Sort sort = new Sort(firstTopDocs.fields);

final TopFieldDocs[] shardTopDocs = new TopFieldDocs[resultsArr.length()];
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+ if (result.size() != shardTopDocs.length) {
+ // TopDocs#merge can't deal with null shard TopDocs
+ final TopFieldDocs empty = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
+ Arrays.fill(shardTopDocs, empty);
+ }
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = (TopFieldDocs) topDocs;
}
- // TopDocs#merge can't deal with null shard TopDocs
- for (int i = 0; i < shardTopDocs.length; ++i) {
- if (shardTopDocs[i] == null) {
- shardTopDocs[i] = new TopFieldDocs(0, new FieldDoc[0], sort.getSort(), Float.NaN);
- }
- }
mergedTopDocs = TopDocs.merge(sort, from, topN, shardTopDocs);
} else {
final TopDocs[] shardTopDocs = new TopDocs[resultsArr.length()];
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+ if (result.size() != shardTopDocs.length) {
+ // TopDocs#merge can't deal with null shard TopDocs
+ Arrays.fill(shardTopDocs, Lucene.EMPTY_TOP_DOCS);
+ }
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
TopDocs topDocs = sortedResult.value.queryResult().topDocs();
// the 'index' field is the position in the resultsArr atomic array
shardTopDocs[sortedResult.index] = topDocs;
}
- // TopDocs#merge can't deal with null shard TopDocs
- for (int i = 0; i < shardTopDocs.length; ++i) {
- if (shardTopDocs[i] == null) {
- shardTopDocs[i] = Lucene.EMPTY_TOP_DOCS;
- }
- }
mergedTopDocs = TopDocs.merge(from, topN, shardTopDocs);
}

ScoreDoc[] scoreDocs = mergedTopDocs.scoreDocs;
final Map<String, List<Suggestion<CompletionSuggestion.Entry>>> groupedCompletionSuggestions = new HashMap<>();
// group suggestions and assign shard index
- for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : sortedResults) {
+ for (AtomicArray.Entry<? extends QuerySearchResultProvider> sortedResult : results) {
Suggest shardSuggest = sortedResult.value.queryResult().suggest();
if (shardSuggest != null) {
for (CompletionSuggestion suggestion : shardSuggest.filter(CompletionSuggestion.class)) {
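Review note: the removed QUERY_RESULT_ORDERING pre-sort was unnecessary because each shard's top docs are written into the merge array at a fixed slot, sortedResult.index, so the input order that TopDocs#merge sees never depended on iteration order; merge itself breaks score ties by that slot. A small self-contained sketch of the resulting determinism, assuming Lucene 6.x on the classpath (doc ids and scores are invented):

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;

public class PositionalMergeSketch {
    public static void main(String[] args) {
        TopDocs[] shardTopDocs = new TopDocs[3];
        // results may arrive in any order, but each is slotted at its shard index
        shardTopDocs[2] = new TopDocs(1, new ScoreDoc[] { new ScoreDoc(7, 1.0f) }, 1.0f);
        shardTopDocs[0] = new TopDocs(1, new ScoreDoc[] { new ScoreDoc(3, 1.0f) }, 1.0f);
        shardTopDocs[1] = new TopDocs(1, new ScoreDoc[] { new ScoreDoc(5, 1.0f) }, 1.0f);
        // all scores tie here, so merge order is decided purely by array position:
        // the output is shard 0, 1, 2 no matter how often this runs
        TopDocs merged = TopDocs.merge(0, 3, shardTopDocs);
        for (ScoreDoc doc : merged.scoreDocs) {
            System.out.println("doc=" + doc.doc + " shard=" + doc.shardIndex);
        }
    }
}

This is what the new javadoc note above means by stable results requiring a stable shard index, and it is exactly what the testSortIsIdempotent test added below verifies by sorting the same result set twice.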
SearchPhaseControllerTests.java
@@ -73,7 +73,7 @@ public void testSort() throws Exception {
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
- AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize);
+ AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, suggestions, queryResultSize, false);
ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(true, results);
int accumulatedLength = Math.min(queryResultSize, getTotalQueryHits(results));
for (Suggest.Suggestion<?> suggestion : reducedSuggest(results)) {
@@ -83,14 +83,26 @@ public void testSort() throws Exception {
assertThat(sortedDocs.length, equalTo(accumulatedLength));
}

+ public void testSortIsIdempotent() throws IOException {
+ int nShards = randomIntBetween(1, 20);
+ int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
+ AtomicArray<QuerySearchResultProvider> results = generateQueryResults(nShards, Collections.emptyList(), queryResultSize,
+ randomBoolean() || true);
+ boolean ignoreFrom = randomBoolean();
+ ScoreDoc[] sortedDocs = searchPhaseController.sortDocs(ignoreFrom, results);
+
+ ScoreDoc[] sortedDocs2 = searchPhaseController.sortDocs(ignoreFrom, results);
+ assertArrayEquals(sortedDocs, sortedDocs2);
+ }
+
public void testMerge() throws IOException {
List<CompletionSuggestion> suggestions = new ArrayList<>();
for (int i = 0; i < randomIntBetween(1, 5); i++) {
suggestions.add(new CompletionSuggestion(randomAsciiOfLength(randomIntBetween(1, 5)), randomIntBetween(1, 20)));
}
int nShards = randomIntBetween(1, 20);
int queryResultSize = randomBoolean() ? 0 : randomIntBetween(1, nShards * 2);
- AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize);
+ AtomicArray<QuerySearchResultProvider> queryResults = generateQueryResults(nShards, suggestions, queryResultSize, false);

// calculate offsets and score doc array
List<ScoreDoc> mergedScoreDocs = new ArrayList<>();
@@ -127,7 +139,7 @@ public void testMerge() throws IOException {

private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
List<CompletionSuggestion> suggestions,
- int searchHitsSize) {
+ int searchHitsSize, boolean useConstantScore) {
AtomicArray<QuerySearchResultProvider> queryResults = new AtomicArray<>(nShards);
for (int shardIndex = 0; shardIndex < nShards; shardIndex++) {
QuerySearchResult querySearchResult = new QuerySearchResult(shardIndex,
@@ -138,7 +150,7 @@ private AtomicArray<QuerySearchResultProvider> generateQueryResults(int nShards,
ScoreDoc[] scoreDocs = new ScoreDoc[nDocs];
float maxScore = 0F;
for (int i = 0; i < nDocs; i++) {
- float score = Math.abs(randomFloat());
+ float score = useConstantScore ? 1.0F : Math.abs(randomFloat());
scoreDocs[i] = new ScoreDoc(i, score);
if (score > maxScore) {
maxScore = score;
SamplerAggregatorTests.java
@@ -23,7 +23,10 @@
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.SortedNumericDocValuesField;
+ import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexReader;
+ import org.apache.lucene.index.IndexWriter;
+ import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.RandomIndexWriter;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
@@ -50,8 +53,11 @@ public void testSampler() throws IOException {
MappedFieldType numericFieldType = new NumberFieldMapper.NumberFieldType(NumberFieldMapper.NumberType.LONG);
numericFieldType.setName("int");

+ IndexWriterConfig indexWriterConfig = newIndexWriterConfig();
+ indexWriterConfig.setMaxBufferedDocs(100);
+ indexWriterConfig.setRAMBufferSizeMB(100); // flush on open to have a single segment with predictable docIds
try (Directory dir = newDirectory();
- RandomIndexWriter w = new RandomIndexWriter(random(), dir)) {
+ IndexWriter w = new IndexWriter(dir, indexWriterConfig)) {
for (long value : new long[] {7, 3, -10, -6, 5, 50}) {
Document doc = new Document();
StringBuilder text = new StringBuilder();
@@ -67,7 +73,8 @@
.shardSize(3)
.subAggregation(new MinAggregationBuilder("min")
.field("int"));
- try (IndexReader reader = w.getReader()) {
+ try (IndexReader reader = DirectoryReader.open(w)) {
+ assertEquals("test expects a single segment", 1, reader.leaves().size());
IndexSearcher searcher = new IndexSearcher(reader);
Sampler sampler = searchAndReduce(searcher, new TermQuery(new Term("text", "good")), aggBuilder, textFieldType,
numericFieldType);
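Review note: replacing RandomIndexWriter with a plain IndexWriter whose buffers are larger than the test data is what makes the doc ids stable: nothing flushes until the reader opens, so all documents land in one segment with ids 0..n-1 in insertion order. A minimal sketch of the same setup, assuming Lucene 6.x on the classpath:

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.store.RAMDirectory;

public class SingleSegmentSketch {
    public static void main(String[] args) throws Exception {
        IndexWriterConfig config = new IndexWriterConfig(new StandardAnalyzer());
        config.setMaxBufferedDocs(100);  // buffer comfortably more docs than we add...
        config.setRAMBufferSizeMB(100);  // ...so nothing flushes before the reader opens
        try (RAMDirectory dir = new RAMDirectory();
             IndexWriter w = new IndexWriter(dir, config)) {
            for (int i = 0; i < 6; i++) {
                w.addDocument(new Document());  // doc ids 0..5, in insertion order
            }
            try (DirectoryReader reader = DirectoryReader.open(w)) {
                System.out.println("segments: " + reader.leaves().size());  // 1
            }
        }
    }
}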
exists_source.json (new file)
@@ -0,0 +1,71 @@
{
"exists_source": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/docs-get.html",
"methods": ["HEAD"],
"url": {
"path": "/{index}/{type}/{id}/_source",
"paths": ["/{index}/{type}/{id}/_source"],
"parts": {
"id": {
"type" : "string",
"required" : true,
"description" : "The document ID"
},
"index": {
"type" : "string",
"required" : true,
"description" : "The name of the index"
},
"type": {
"type" : "string",
"required" : true,
"description" : "The type of the document; use `_all` to fetch the first document matching the ID across all types"
}
},
"params": {
"parent": {
"type" : "string",
"description" : "The ID of the parent document"
},
"preference": {
"type" : "string",
"description" : "Specify the node or shard the operation should be performed on (default: random)"
},
"realtime": {
"type" : "boolean",
"description" : "Specify whether to perform the operation in realtime or search mode"
},
"refresh": {
"type" : "boolean",
"description" : "Refresh the shard containing the document before performing the operation"
},
"routing": {
"type" : "string",
"description" : "Specific routing value"
},
"_source": {
"type" : "list",
"description" : "True or false to return the _source field or not, or a list of fields to return"
},
"_source_exclude": {
"type" : "list",
"description" : "A list of fields to exclude from the returned _source field"
},
"_source_include": {
"type" : "list",
"description" : "A list of fields to extract and return from the _source field"
},
"version" : {
"type" : "number",
"description" : "Explicit version number for concurrency control"
},
"version_type": {
"type" : "enum",
"options" : ["internal", "external", "external_gte", "force"],
"description" : "Specific version type"
}
}
},
"body": null
}
}
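Review note: the new spec describes a HEAD-only endpoint at /{index}/{type}/{id}/_source whose answer is carried entirely by the status code. A hedged client sketch using plain java.net (host, index, type, and id are invented; 200 for present and 404 for absent is the usual convention for HEAD existence checks):

import java.net.HttpURLConnection;
import java.net.URL;

public class ExistsSourceSketch {
    public static void main(String[] args) throws Exception {
        // hypothetical host and document coordinates
        URL url = new URL("http://localhost:9200/my-index/my-type/1/_source");
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("HEAD");  // the spec lists HEAD as the only method
        int status = conn.getResponseCode();
        System.out.println("_source exists: " + (status == 200));
        conn.disconnect();
    }
}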
indices.exists.json
@@ -1,27 +1,44 @@
{
"indices.exists": {
"documentation": "http://www.elastic.co/guide/en/elasticsearch/reference/master/indices-exists.html",
"methods": ["HEAD"],
"methods": [ "HEAD" ],
"url": {
"path": "/{index}",
"paths": ["/{index}"],
"paths": [ "/{index}" ],
"parts": {
"index": {
"type" : "list",
"required" : true,
"description" : "A comma-separated list of indices to check"
"type": "list",
"required": true,
"description": "A comma-separated list of index names"
}
},
"params": {
"local": {
"type": "boolean",
"description": "Return local information, do not retrieve the state from master node (default: false)"
},
"ignore_unavailable": {
"type": "boolean",
"description": "Ignore unavailable indexes (default: false)"
},
"allow_no_indices": {
"type": "boolean",
"description": "Ignore if a wildcard expression resolves to no concrete indices (default: false)"
},
"expand_wildcards": {
"type" : "enum",
"options" : ["open","closed","none","all"],
"default" : "open",
"description" : "Whether to expand wildcard expression to concrete indices that are open, closed or both."
"type": "enum",
"options": [ "open", "closed", "none", "all" ],
"default": "open",
"description": "Whether wildcard expressions should get expanded to open or closed indices (default: open)"
},
"local": {
"type": "boolean",
"description": "Return local information, do not retrieve the state from master node (default: false)"
"flat_settings": {
"type": "boolean",
"description": "Return settings in flat format (default: false)"
},
"include_defaults": {
"type": "boolean",
"description": "Whether to return all default setting for each of the indices.",
"default": false
}
}
},
(2 more changed files not shown)
