Skip to content

Commit

Permalink
[ENGINE] Wait until engine is started up when acquiring searcher
Browse files Browse the repository at this point in the history
Today there is a small window in which a searcher can be acquired while the
engine is still starting up. If the caller is fast enough, this causes an NPE
that triggers a shard failure. This commit handles this situation
gracefully.

Closes elastic#7455
  • Loading branch information
s1monw committed Aug 26, 2014
1 parent 11d3c7e commit 0296da6
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 5 deletions.
Expand Up @@ -689,12 +689,23 @@ public void delete(DeleteByQuery delete) throws EngineException {
@Override
public final Searcher acquireSearcher(String source) throws EngineException {
boolean success = false;
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
store.incRef();
try {
/* Acquire order here is store -> manager since we need
* to make sure that the store is not closed before
* the searcher is acquired. */
store.incRef();
final SearcherManager manager = this.searcherManager;
SearcherManager manager = this.searcherManager;
if (manager == null) {
ensureOpen();
try (InternalLock _ = this.readLock.acquire()) {
// we might start up right now and the searcherManager is not initialized
// we take the read lock and retry again since write lock is taken
// while start() is called and otherwise the ensureOpen() call will
// barf.
manager = this.searcherManager;
assert manager != null : "SearcherManager is null but shouldn't";
}
}
/* This might throw NPE but that's fine we will run ensureOpen()
* in the catch block and throw the right exception */
final IndexSearcher searcher = manager.acquire();
Expand All @@ -707,6 +718,8 @@ public final Searcher acquireSearcher(String source) throws EngineException {
manager.release(searcher);
}
}
} catch (EngineClosedException ex) {
throw ex;
} catch (Throwable ex) {
ensureOpen(); // throw EngineCloseException here if we are already closed
logger.error("failed to acquire searcher, source {}", ex, source);
Expand Down
Expand Up @@ -31,6 +31,7 @@
import org.apache.lucene.index.IndexDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.bytes.BytesArray;
Expand Down Expand Up @@ -81,6 +82,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import static org.elasticsearch.common.settings.ImmutableSettings.Builder.EMPTY_SETTINGS;
Expand Down Expand Up @@ -322,6 +324,33 @@ public void testSegments() throws Exception {
assertThat(segments.get(2).isCompound(), equalTo(true));
}

/**
 * Races {@code engine.start()} on a background thread against repeated
 * {@code acquireSearcher} calls on the test thread.
 *
 * An {@link EngineClosedException} from {@code acquireSearcher} is tolerated
 * while the start is still pending; any other failure propagates and fails
 * the test. The loop ends as soon as one searcher has been acquired and
 * released, or once the starter thread has finished.
 *
 * @throws InterruptedException if the test thread is interrupted while
 *                              waiting for the starter thread to finish
 */
@Test
public void testStartAndAcquireConcurrently() throws InterruptedException {
    ConcurrentMergeSchedulerProvider mergeSchedulerProvider = new ConcurrentMergeSchedulerProvider(shardId, EMPTY_SETTINGS, threadPool, new IndexSettingsService(shardId.index(), EMPTY_SETTINGS));
    final Engine engine = createEngine(engineSettingsService, store, createTranslog(), mergeSchedulerProvider);
    // Flipped to false by the starter thread once engine.start() has returned or thrown.
    final AtomicBoolean startPending = new AtomicBoolean(true);
    Thread thread = new Thread() {
        @Override
        public void run() {
            try {
                // Give the acquiring thread a chance to run before start() begins.
                Thread.yield();
                engine.start();
            } finally {
                startPending.set(false);
            }
        }
    };
    thread.start();
    try {
        while (startPending.get()) {
            try {
                engine.acquireSearcher("foobar").close();
                break;
            } catch (EngineClosedException ex) {
                // all good - tolerated while the start is still pending, retry
            }
        }
    } finally {
        try {
            // Wait for the starter thread so it cannot outlive the test or
            // still be running when the engine is closed.
            thread.join();
        } finally {
            engine.close();
        }
    }
}


@Test
public void testSegmentsWithMergeFlag() throws Exception {
Expand Down

0 comments on commit 0296da6

Please sign in to comment.