Skip to content

Commit

Permalink
Internal: split shard search request into a local and a transport var…
Browse files Browse the repository at this point in the history
…iant

In some cases a shard search request gets created on a node to be only used there and never sent over the transport. This commit clarifies that and creates a new base class called `ShardSearchLocalRequest` that can and will be only used locally. `ShardSearchTransportRequest` on the other hand delegates to the local version but extends `TransportRequest` and is `Streamable`, which means that it is supposed to be sent over the transport.

This way we can make the `OriginalIndices` only required (and mandatory now) in the transport variant.

Took the chance to remove an unused InternalScrollSearchRequest constructor and an empty else branch in `TransportSearchScrollQueryAndFetchAction`.

Closes elastic#7855
  • Loading branch information
javanna committed Oct 8, 2014
1 parent 476cbda commit 9e01c90
Show file tree
Hide file tree
Showing 22 changed files with 563 additions and 351 deletions.
4 changes: 2 additions & 2 deletions src/main/java/org/elasticsearch/action/OriginalIndices.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,12 @@ private OriginalIndices() {
}

public OriginalIndices(IndicesRequest indicesRequest) {
this.indices = indicesRequest.indices();
this.indicesOptions = indicesRequest.indicesOptions();
this(indicesRequest.indices(), indicesRequest.indicesOptions());
}

public OriginalIndices(String[] indices, IndicesOptions indicesOptions) {
this.indices = indices;
assert indicesOptions != null;
this.indicesOptions = indicesOptions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -179,8 +179,7 @@ protected ShardValidateQueryResponse shardOperation(ShardValidateQueryRequest re
String error = null;

DefaultSearchContext searchContext = new DefaultSearchContext(0,
new ShardSearchRequest(request).types(request.types()).nowInMillis(request.nowInMillis())
.filteringAliases(request.filteringAliases()),
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
null, indexShard.acquireSearcher("validate_query"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -170,9 +170,7 @@ protected ShardCountResponse shardOperation(ShardCountRequest request) throws El

SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchRequest(request).types(request.types())
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("count"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

Expand Down Expand Up @@ -108,7 +108,7 @@ protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest.request).types(request.types()).nowInMillis(request.nowInMillis()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchLocalRequest(request.types(), request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API), indexService, indexShard, scriptService, cacheRecycler,
pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Expand All @@ -130,7 +130,7 @@ protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
IndexService indexService = indicesService.indexServiceSafe(shardRequest.shardId.getIndex());
IndexShard indexShard = indexService.shardSafe(shardRequest.shardId.id());

SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchRequest(shardRequest).types(request.types()).nowInMillis(request.nowInMillis()), null,
SearchContext.setCurrent(new DefaultSearchContext(0, new ShardSearchLocalRequest(request.types(), request.nowInMillis()), null,
indexShard.acquireSearcher(DELETE_BY_QUERY_API, IndexShard.Mode.WRITE), indexService, indexShard, scriptService,
cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter()));
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.query.QueryPhaseExecutionException;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
Expand Down Expand Up @@ -171,9 +171,7 @@ protected ShardExistsResponse shardOperation(ShardExistsRequest request) throws

SearchShardTarget shardTarget = new SearchShardTarget(clusterService.localNode().id(), request.shardId().getIndex(), request.shardId().id());
SearchContext context = new DefaultSearchContext(0,
new ShardSearchRequest(request).types(request.types())
.filteringAliases(request.filteringAliases())
.nowInMillis(request.nowInMillis()),
new ShardSearchLocalRequest(request.types(), request.nowInMillis(), request.filteringAliases()),
shardTarget, indexShard.acquireSearcher("exists"), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler, bigArrays, threadPool.estimatedTimeInMillisCounter());
SearchContext.setCurrent(context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.internal.DefaultSearchContext;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchLocalRequest;
import org.elasticsearch.search.rescore.RescoreSearchContext;
import org.elasticsearch.search.rescore.Rescorer;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -117,10 +117,7 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
}

SearchContext context = new DefaultSearchContext(
0,
new ShardSearchRequest(request).types(new String[]{request.type()})
.filteringAliases(request.filteringAlias())
.nowInMillis(request.nowInMillis),
0, new ShardSearchLocalRequest(new String[]{request.type()}, request.nowInMillis, request.filteringAlias()),
null, result.searcher(), indexService, indexShard,
scriptService, cacheRecycler, pageCacheRecycler,
bigArrays, threadPool.estimatedTimeInMillisCounter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -68,7 +67,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
import org.elasticsearch.search.dfs.DfsSearchResult;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -74,7 +74,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -84,7 +84,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<DfsSearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<DfsSearchResult> listener) {
searchService.sendExecuteDfs(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.internal.InternalScrollSearchRequest;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;

import java.io.IOException;
import java.util.Map;
Expand All @@ -46,11 +46,8 @@
*/
public abstract class TransportSearchHelper {

public static ShardSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis, boolean useSlowScroll) {
ShardSearchRequest shardRequest = new ShardSearchRequest(request, shardRouting, numberOfShards, useSlowScroll);
shardRequest.filteringAliases(filteringAliases);
shardRequest.nowInMillis(nowInMillis);
return shardRequest;
public static ShardSearchTransportRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis, boolean useSlowScroll) {
return new ShardSearchTransportRequest(request, shardRouting, numberOfShards, useSlowScroll, filteringAliases, nowInMillis);
}

public static InternalScrollSearchRequest internalScrollSearchRequest(long id, SearchScrollRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.QueryFetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.threadpool.ThreadPool;

import static org.elasticsearch.action.search.type.TransportSearchHelper.buildScrollId;
Expand Down Expand Up @@ -67,7 +67,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QueryFetchSearchResult> listener) {
searchService.sendExecuteFetch(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.fetch.FetchSearchResult;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -78,7 +78,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResultProvider> listener) {
searchService.sendExecuteQuery(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.FetchSearchResultProvider;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -65,7 +65,7 @@ protected String firstPhaseName() {
}

@Override
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<QuerySearchResult> listener) {
protected void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<QuerySearchResult> listener) {
searchService.sendExecuteScan(node, request, listener);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,6 @@ public void start() {
if (counter.decrementAndGet() == 0) {
finishHim();
}
} else {
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
import org.elasticsearch.search.controller.SearchPhaseController;
import org.elasticsearch.search.fetch.ShardFetchSearchRequest;
import org.elasticsearch.search.internal.InternalSearchResponse;
import org.elasticsearch.search.internal.ShardSearchRequest;
import org.elasticsearch.search.internal.ShardSearchTransportRequest;
import org.elasticsearch.search.query.QuerySearchResult;
import org.elasticsearch.search.query.QuerySearchResultProvider;
import org.elasticsearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -357,7 +357,7 @@ protected ShardFetchSearchRequest createFetchRequest(QuerySearchResult queryResu
}
}

protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchRequest request, SearchServiceListener<FirstResult> listener);
protected abstract void sendExecuteFirstPhase(DiscoveryNode node, ShardSearchTransportRequest request, SearchServiceListener<FirstResult> listener);

protected final void processFirstPhaseResult(int shardIndex, ShardRouting shard, FirstResult result) {
firstResults.set(shardIndex, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -415,13 +415,9 @@ private static boolean verifyCacheSerializationSameAsQueryResult(BytesReference
private static Key buildKey(ShardSearchRequest request, SearchContext context) throws Exception {
// TODO: for now, this will create different keys for different JSON order
// TODO: tricky to get around this, need to parse and order all, which can be expensive
BytesStreamOutput out = new BytesStreamOutput();
request.writeTo(out, true);
// copy it over, most requests are small, we might as well copy to make sure we are not sliced...
// we could potentially keep it without copying, but then pay the price of extra unused bytes up to a page
return new Key(context.indexShard(),
((DirectoryReader) context.searcher().getIndexReader()).getVersion(),
out.bytes().copyBytesArray());
request.cacheKey());
}

/**
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/org/elasticsearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -937,11 +937,8 @@ public void run() {
SearchContext context = null;
try {
long now = System.nanoTime();
ShardSearchRequest request = new ShardSearchRequest(indexShard.shardId().index().name(), indexShard.shardId().id(), indexMetaData.numberOfShards(),
SearchType.QUERY_THEN_FETCH)
.source(entry.source())
.types(entry.types())
.queryCache(entry.queryCache());
ShardSearchRequest request = new ShardSearchLocalRequest(indexShard.shardId(), indexMetaData.numberOfShards(),
SearchType.QUERY_THEN_FETCH, entry.source(), entry.types(), entry.queryCache());
context = createContext(request, warmerContext.searcher());
// if we use sort, we need to do query to sort on it and load relevant field data
// if not, we might as well use COUNT (and cache if needed)
Expand Down
Loading

0 comments on commit 9e01c90

Please sign in to comment.