From 2e0c7a5efb553ca84baf1711b813fa5e5a7a4ed3 Mon Sep 17 00:00:00 2001 From: Shay Banon Date: Wed, 18 Jan 2012 15:08:59 +0200 Subject: [PATCH] 0.18.7 Failure exception while executing a valid query after an invalid query, closes #1617. --- .../search/type/TransportSearchCache.java | 16 --------- ...TransportSearchDfsQueryAndFetchAction.java | 2 +- ...ransportSearchDfsQueryThenFetchAction.java | 4 +-- .../search/type/TransportSearchHelper.java | 15 -------- .../TransportSearchQueryThenFetchAction.java | 2 +- ...nsportSearchScrollQueryAndFetchAction.java | 34 ++++++++++++++---- ...sportSearchScrollQueryThenFetchAction.java | 34 ++++++++++++++---- .../type/TransportSearchScrollScanAction.java | 35 +++++++++++++++---- .../type/TransportSearchTypeAction.java | 30 ++++++++++------ 9 files changed, 108 insertions(+), 64 deletions(-) diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java index d7fe85deaa8db..fa5a76d942435 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchCache.java @@ -19,7 +19,6 @@ package org.elasticsearch.action.search.type; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.search.SearchShardTarget; @@ -37,8 +36,6 @@ */ public class TransportSearchCache { - private final Queue> cacheShardFailures = new LinkedTransferQueue>(); - private final Queue> cacheDfsResults = new LinkedTransferQueue>(); private final Queue> cacheQueryResults = new LinkedTransferQueue>(); @@ -48,19 +45,6 @@ public class TransportSearchCache { private final Queue> cacheQueryFetchResults = new LinkedTransferQueue>(); - public Collection obtainShardFailures() { - Collection shardFailures; - while ((shardFailures = cacheShardFailures.poll()) == null) { - cacheShardFailures.offer(new LinkedTransferQueue()); - } - return shardFailures; - } - - public void releaseShardFailures(Collection shardFailures) { - shardFailures.clear(); - cacheShardFailures.offer(shardFailures); - } - public Collection obtainDfsResults() { Collection dfsSearchResults; while ((dfsSearchResults = cacheDfsResults.poll()) == null) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java index 115e51f96b6e3..f82ecd4e89f8a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryAndFetchAction.java @@ -147,7 +147,7 @@ void executeSecondPhase(final DfsSearchResult dfsResult, final AtomicInteger cou if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java index 41858bb44ebfb..e90f0559fea59 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchDfsQueryThenFetchAction.java @@ -154,7 +154,7 @@ void executeQuery(final DfsSearchResult dfsResult, final AtomicInteger counter, if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, querySearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -241,7 +241,7 @@ void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger count if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java index 5bce195f5eac2..18defc3606360 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchHelper.java @@ -24,7 +24,6 @@ import org.elasticsearch.action.search.SearchRequest; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.common.Base64; import org.elasticsearch.common.Nullable; @@ -47,20 +46,6 @@ public abstract class TransportSearchHelper { - /** - * Builds the shard failures, and releases the cache (meaning this should only be called once!). - */ - public static ShardSearchFailure[] buildShardFailures(Collection shardFailures, TransportSearchCache searchCache) { - ShardSearchFailure[] ret; - if (shardFailures.isEmpty()) { - ret = ShardSearchFailure.EMPTY_ARRAY; - } else { - ret = shardFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); - } - searchCache.releaseShardFailures(shardFailures); - return ret; - } - public static InternalSearchRequest internalSearchRequest(ShardRouting shardRouting, int numberOfShards, SearchRequest request, String[] filteringAliases, long nowInMillis) { InternalSearchRequest internalRequest = new InternalSearchRequest(shardRouting, numberOfShards, request.searchType()); internalRequest.source(request.source(), request.sourceOffset(), request.sourceLength()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java index 67bf4377765fd..95ab8e5efc55a 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchQueryThenFetchAction.java @@ -155,7 +155,7 @@ void executeFetch(final SearchShardTarget shardTarget, final AtomicInteger count if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute fetch phase", t, fetchSearchRequest.id()); } - AsyncAction.this.shardFailures.add(new ShardSearchFailure(t)); + AsyncAction.this.addShardFailure(new ShardSearchFailure(t)); successulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java index 596eb4ddf4d94..e5d544d6405d5 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryAndFetchAction.java @@ -20,7 +20,12 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -28,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -37,7 +43,6 @@ import org.elasticsearch.search.internal.InternalSearchResponse; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -83,7 +88,7 @@ private class AsyncAction { private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -102,6 +107,23 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action this.counter = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); @@ -187,7 +209,7 @@ private void executePhase(DiscoveryNode node, final long searchId) { if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -200,7 +222,7 @@ private void finishHim() { try { innerFinishHim(); } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); } } @@ -213,7 +235,7 @@ private void innerFinishHim() { } searchCache.releaseQueryFetchResults(queryFetchResults); listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java index f76cf0d1ed20a..4b913a3535488 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollQueryThenFetchAction.java @@ -20,7 +20,12 @@ package org.elasticsearch.action.search.type; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchPhaseExecutionException; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -29,6 +34,7 @@ import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -41,7 +47,6 @@ import org.elasticsearch.search.query.QuerySearchResultProvider; import org.elasticsearch.threadpool.ThreadPool; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -87,7 +92,7 @@ private class AsyncAction { private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; private final Map queryResults = searchCache.obtainQueryResults(); @@ -107,6 +112,23 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action this.successfulOps = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { listener.onFailure(new SearchPhaseExecutionException("query", "no nodes to search on", null)); @@ -179,7 +201,7 @@ private void executeQueryPhase(final AtomicInteger counter, DiscoveryNode node, if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { executeFetchPhase(); @@ -229,7 +251,7 @@ private void finishHim() { try { innerFinishHim(); } catch (Exception e) { - listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache))); + listener.onFailure(new ReduceSearchPhaseException("fetch", "", e, buildShardFailures())); } } @@ -240,7 +262,7 @@ private void innerFinishHim() { scrollId = request.scrollId(); } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); searchCache.releaseQueryResults(queryResults); searchCache.releaseFetchResults(fetchResults); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java index 4def8bea7b7b3..b8d24d5f8fa1b 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchScrollScanAction.java @@ -21,7 +21,11 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.search.*; +import org.elasticsearch.action.search.ReduceSearchPhaseException; +import org.elasticsearch.action.search.SearchOperationThreading; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequest; +import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.ClusterService; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -29,6 +33,7 @@ import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; import org.elasticsearch.search.action.SearchServiceTransportAction; @@ -41,7 +46,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.io.IOException; -import java.util.Collection; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -87,7 +91,7 @@ private class AsyncAction { private final DiscoveryNodes nodes; - protected final Collection shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; private final Map queryFetchResults = searchCache.obtainQueryFetchResults(); @@ -106,11 +110,28 @@ private AsyncAction(SearchScrollRequest request, ParsedScrollId scrollId, Action this.counter = new AtomicInteger(scrollId.context().length); } + protected final ShardSearchFailure[] buildShardFailures() { + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); + } + public void start() { if (scrollId.context().length == 0) { final InternalSearchResponse internalResponse = new InternalSearchResponse(new InternalSearchHits(InternalSearchHits.EMPTY, Long.parseLong(this.scrollId.attributes().get("total_hits")), 0.0f), null, false); searchCache.releaseQueryFetchResults(queryFetchResults); - listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, TransportSearchHelper.buildShardFailures(shardFailures, searchCache))); + listener.onResponse(new SearchResponse(internalResponse, request.scrollId(), 0, 0, 0l, buildShardFailures())); return; } @@ -193,7 +214,7 @@ private void executePhase(DiscoveryNode node, final long searchId) { if (logger.isDebugEnabled()) { logger.debug("[{}] Failed to execute query phase", t, searchId); } - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); successfulOps.decrementAndGet(); if (counter.decrementAndGet() == 0) { finishHim(); @@ -206,7 +227,7 @@ private void finishHim() { try { innerFinishHim(); } catch (Exception e) { - ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures(shardFailures, searchCache)); + ReduceSearchPhaseException failure = new ReduceSearchPhaseException("fetch", "", e, buildShardFailures()); if (logger.isDebugEnabled()) { logger.debug("failed to reduce search", failure); } @@ -246,7 +267,7 @@ private void innerFinishHim() throws IOException { scrollId = TransportSearchHelper.buildScrollId(this.scrollId.type(), queryFetchResults.values(), this.scrollId.attributes()); // continue moving the total_hits } listener.onResponse(new SearchResponse(internalResponse, scrollId, this.scrollId.context().length, successfulOps.get(), - System.currentTimeMillis() - startTime, buildShardFailures(shardFailures, searchCache))); + System.currentTimeMillis() - startTime, buildShardFailures())); } } } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java index 2dfb0af68d730..af4eaa2104af3 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/search/type/TransportSearchTypeAction.java @@ -38,6 +38,7 @@ import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.trove.ExtTIntArrayList; +import org.elasticsearch.common.util.concurrent.jsr166y.LinkedTransferQueue; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; import org.elasticsearch.search.action.SearchServiceListener; @@ -49,7 +50,6 @@ import org.elasticsearch.threadpool.ThreadPool; import java.util.Arrays; -import java.util.Collection; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; @@ -97,7 +97,7 @@ protected abstract class BaseAsyncAction private final AtomicInteger totalOps = new AtomicInteger(); - protected final Collection shardFailures = searchCache.obtainShardFailures(); + private volatile LinkedTransferQueue shardFailures; protected volatile ShardDoc[] sortedShardList; @@ -249,9 +249,9 @@ void onFirstPhaseResult(@Nullable ShardRouting shard, final ShardIterator shardI // no more shards, add a failure if (t == null) { // no active shards - shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); + addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); } else { - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); } if (successulOps.get() == 0) { // no successful ops, raise an exception @@ -291,9 +291,9 @@ void onFirstPhaseResult(@Nullable ShardRouting shard, final ShardIterator shardI } if (t == null) { // no active shards - shardFailures.add(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); + addShardFailure(new ShardSearchFailure("No active shards", new SearchShardTarget(null, shardIt.shardId().index().name(), shardIt.shardId().id()))); } else { - shardFailures.add(new ShardSearchFailure(t)); + addShardFailure(new ShardSearchFailure(t)); } } } @@ -306,11 +306,21 @@ protected final long buildTookInMillis() { return System.currentTimeMillis() - startTime; } - /** - * Builds the shard failures, and releases the cache (meaning this should only be called once!). - */ protected final ShardSearchFailure[] buildShardFailures() { - return TransportSearchHelper.buildShardFailures(shardFailures, searchCache); + LinkedTransferQueue localFailures = shardFailures; + if (localFailures == null) { + return ShardSearchFailure.EMPTY_ARRAY; + } + return localFailures.toArray(ShardSearchFailure.EMPTY_ARRAY); + } + + // we do our best to return the shard failures, but its ok if its not fully concurrently safe + // we simply try and return as much as possible + protected final void addShardFailure(ShardSearchFailure failure) { + if (shardFailures == null) { + shardFailures = new LinkedTransferQueue(); + } + shardFailures.add(failure); } /**