Skip to content

Commit

Permalink
Take refresh IOExceptions into account when catching ACE in InternalE…
Browse files Browse the repository at this point in the history
…ngine

Since elastic#19975 we are aggressively failing with AssertionError when we catch an ACE
inside the InternalEngine. We treat everything that is neither a tragic even on
the IndexWriter or the Translog as a bug and throw an AssertionError. Yet, if the
engine hits an IOException on refresh of some sort and the IW doesn't realize it since
it's not fully under it's control we fail he engine but neither IW nor Translog are marked
as failed by tragic event while they are already closed.
This change takes the `failedEngine` exception into account and if it's set we know
that the engine failed by some other even than a tragic one and can continue.

This change also uses the `ReferenceManager#RefreshListener` interface in the engine rather
than it's concrete implementation.

Relates to elastic#19975
  • Loading branch information
s1monw committed Sep 19, 2016
1 parent 1894489 commit e113d89
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 31 deletions.
15 changes: 8 additions & 7 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Expand Up @@ -42,6 +42,7 @@
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.Accountables;
import org.apache.lucene.util.SetOnce;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -101,7 +102,7 @@ public abstract class Engine implements Closeable {
protected final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
protected final ReleasableLock readLock = new ReleasableLock(rwl.readLock());
protected final ReleasableLock writeLock = new ReleasableLock(rwl.writeLock());
protected volatile Exception failedEngine = null;
protected final SetOnce<Exception> failedEngine = new SetOnce<>();
/*
* on <tt>lastWriteNanos</tt> we use System.nanoTime() to initialize this since:
* - we use the value for figuring out if the shard / engine is active so if we startup and no write has happened yet we still consider it active
Expand Down Expand Up @@ -377,7 +378,7 @@ public final Searcher acquireSearcher(String source) throws EngineException {

protected void ensureOpen() {
if (isClosed.get()) {
throw new EngineClosedException(shardId, failedEngine);
throw new EngineClosedException(shardId, failedEngine.get());
}
}

Expand Down Expand Up @@ -670,17 +671,17 @@ public void failEngine(String reason, @Nullable Exception failure) {
if (failEngineLock.tryLock()) {
store.incRef();
try {
if (failedEngine.get() == null) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
return;
}
failedEngine.set((failure != null) ? failure : new IllegalStateException(reason));
try {
// we just go and close this engine - no way to recover
closeNoLock("engine failed on: [" + reason + "]");
} finally {
if (failedEngine != null) {
logger.debug((Supplier<?>) () -> new ParameterizedMessage("tried to fail engine but engine is already failed. ignoring. [{}]", reason), failure);
return;
}
logger.warn((Supplier<?>) () -> new ParameterizedMessage("failed engine [{}]", reason), failure);
// we must set a failure exception, generate one if not supplied
failedEngine = (failure != null) ? failure : new IllegalStateException(reason);
// we first mark the store as corrupted before we notify any listeners
// this must happen first otherwise we might try to reallocate so quickly
// on the same node that we don't see the corrupted marker file when
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.lucene.index.SnapshotDeletionPolicy;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.similarities.Similarity;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.common.Nullable;
Expand All @@ -34,7 +35,6 @@
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.codec.CodecService;
import org.elasticsearch.index.shard.RefreshListeners;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.index.shard.TranslogRecoveryPerformer;
import org.elasticsearch.index.store.Store;
Expand Down Expand Up @@ -68,7 +68,7 @@ public final class EngineConfig {
private final QueryCachingPolicy queryCachingPolicy;
private final long maxUnsafeAutoIdTimestamp;
@Nullable
private final RefreshListeners refreshListeners;
private final ReferenceManager.RefreshListener refreshListeners;

/**
* Index setting to change the low level lucene codec used for writing new segments.
Expand Down Expand Up @@ -112,7 +112,7 @@ public EngineConfig(OpenMode openMode, ShardId shardId, ThreadPool threadPool,
MergePolicy mergePolicy, Analyzer analyzer,
Similarity similarity, CodecService codecService, Engine.EventListener eventListener,
TranslogRecoveryPerformer translogRecoveryPerformer, QueryCache queryCache, QueryCachingPolicy queryCachingPolicy,
TranslogConfig translogConfig, TimeValue flushMergesAfter, RefreshListeners refreshListeners,
TranslogConfig translogConfig, TimeValue flushMergesAfter, ReferenceManager.RefreshListener refreshListeners,
long maxUnsafeAutoIdTimestamp) {
if (openMode == null) {
throw new IllegalArgumentException("openMode must not be null");
Expand Down Expand Up @@ -322,9 +322,9 @@ public enum OpenMode {
}

/**
* {@linkplain RefreshListeners} instance to configure.
* {@linkplain ReferenceManager.RefreshListener} instance to configure.
*/
public RefreshListeners getRefreshListeners() {
public ReferenceManager.RefreshListener getRefreshListeners() {
return refreshListeners;
}

Expand Down
Expand Up @@ -82,9 +82,6 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Function;

/**
*
*/
public class InternalEngine extends Engine {
/**
* When we last pruned expired tombstones from versionMap.deletes:
Expand Down Expand Up @@ -170,7 +167,6 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
allowCommits.compareAndSet(true, openMode != EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG);
if (engineConfig.getRefreshListeners() != null) {
searcherManager.addListener(engineConfig.getRefreshListeners());
engineConfig.getRefreshListeners().setTranslog(translog);
}
success = true;
} finally {
Expand Down Expand Up @@ -951,7 +947,7 @@ private void failOnTragicEvent(AlreadyClosedException ex) {
failEngine("already closed by tragic event on the index writer", tragedy);
} else if (translog.isOpen() == false && translog.getTragicException() != null) {
failEngine("already closed by tragic event on the translog", translog.getTragicException());
} else {
} else if (failedEngine == null) { // we are closed but the engine is not failed yet?
// this smells like a bug - we only expect ACE if we are in a fatal case ie. either translog or IW is closed by
// a tragic event or has closed itself. if that is not the case we are in a buggy state and raise an assertion error
throw new AssertionError("Unexpected AlreadyClosedException", ex);
Expand Down
Expand Up @@ -992,6 +992,7 @@ private void internalPerformTranslogRecovery(boolean skipTranslogRecovery, boole
// but we need to make sure we don't loose deletes until we are done recovering
config.setEnableGcDeletes(false);
Engine newEngine = createNewEngine(config);
refreshListeners.setTranslog(newEngine.getTranslog());
verifyNotClosed();
if (openMode == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
Expand Down
Expand Up @@ -59,7 +59,6 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
Expand Down Expand Up @@ -112,7 +111,6 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC

// the list of translog readers is guaranteed to be in order of translog generation
private final List<TranslogReader> readers = new ArrayList<>();
private volatile ScheduledFuture<?> syncScheduler;
// this is a concurrent set and is not protected by any of the locks. The main reason
// is that is being accessed by two separate classes (additions & reading are done by Translog, remove by View when closed)
private final Set<View> outstandingViews = ConcurrentCollections.newConcurrentSet();
Expand Down Expand Up @@ -312,7 +310,6 @@ public void close() throws IOException {
closeFilesIfNoPendingViews();
}
} finally {
FutureUtils.cancel(syncScheduler);
logger.debug("translog closed");
}
}
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHitCountCollector;
Expand Down Expand Up @@ -276,15 +277,16 @@ protected InternalEngine createEngine(Store store, Path translogPath) throws IOE
}

protected InternalEngine createEngine(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy) throws IOException {
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
EngineConfig config = config(indexSettings, store, translogPath, mergePolicy, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);
InternalEngine internalEngine = new InternalEngine(config);
if (config.getOpenMode() == EngineConfig.OpenMode.OPEN_INDEX_AND_TRANSLOG) {
internalEngine.recoverFromTranslog();
}
return internalEngine;
}

public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy, long maxUnsafeAutoIdTimestamp) {
public EngineConfig config(IndexSettings indexSettings, Store store, Path translogPath, MergePolicy mergePolicy,
long maxUnsafeAutoIdTimestamp, ReferenceManager.RefreshListener refreshListener) {
IndexWriterConfig iwc = newIndexWriterConfig();
TranslogConfig translogConfig = new TranslogConfig(shardId, translogPath, indexSettings, BigArrays.NON_RECYCLING_INSTANCE);
final EngineConfig.OpenMode openMode;
Expand All @@ -306,7 +308,8 @@ public void onFailedEngine(String reason, @Nullable Exception e) {
EngineConfig config = new EngineConfig(openMode, shardId, threadPool, indexSettings, null, store, createSnapshotDeletionPolicy(),
mergePolicy, iwc.getAnalyzer(), iwc.getSimilarity(), new CodecService(null, logger), listener,
new TranslogHandler(shardId.getIndexName(), logger), IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), null, maxUnsafeAutoIdTimestamp);
IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig, TimeValue.timeValueMinutes(5), refreshListener,
maxUnsafeAutoIdTimestamp);

return config;
}
Expand Down Expand Up @@ -903,7 +906,7 @@ public void testSearchResultRelease() throws Exception {
public void testSyncedFlush() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
engine.index(new Engine.Index(newUid("1"), doc));
Expand All @@ -930,7 +933,7 @@ public void testRenewSyncFlush() throws Exception {
for (int i = 0; i < iters; i++) {
try (Store store = createStore();
InternalEngine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
new LogDocMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
final String syncId = randomUnicodeOfCodepointLengthBetween(10, 20);
ParsedDocument doc = testParsedDocument("1", "1", "test", null, -1, -1, testDocumentWithTextField(), B_1, null);
Engine.Index doc1 = new Engine.Index(newUid("1"), doc);
Expand Down Expand Up @@ -1158,7 +1161,7 @@ public void testExternalVersioningIndexConflictWithFlush() {
public void testForceMerge() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(),
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) { // use log MP here we test some behavior in ESMP
new LogByteSizeMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) { // use log MP here we test some behavior in ESMP
int numDocs = randomIntBetween(10, 100);
for (int i = 0; i < numDocs; i++) {
ParsedDocument doc = testParsedDocument(Integer.toString(i), Integer.toString(i), "test", null, -1, -1, testDocument(), B_1, null);
Expand Down Expand Up @@ -1592,7 +1595,7 @@ public void testIndexWriterIFDInfoStream() throws IllegalAccessException {

public void testEnableGcDeletes() throws Exception {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
engine.config().setEnableGcDeletes(false);

// Add document
Expand Down Expand Up @@ -1728,7 +1731,7 @@ public void testMissingTranslog() throws IOException {
// expected
}
// now it should be OK.
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
EngineConfig config = copy(config(defaultSettings, store, primaryTranslogDir, newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null), EngineConfig.OpenMode.OPEN_INDEX_CREATE_TRANSLOG);
engine = new InternalEngine(config);
}

Expand Down Expand Up @@ -2103,7 +2106,7 @@ public void run() {

public void testCurrentTranslogIDisCommitted() throws IOException {
try (Store store = createStore()) {
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
EngineConfig config = config(defaultSettings, store, createTempDir(), newMergePolicy(), IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null);

// create
{
Expand Down Expand Up @@ -2368,15 +2371,15 @@ public void run() {
public void testEngineMaxTimestampIsInitialized() throws IOException {
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP))) {
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, null))) {
assertEquals(IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());

}

long maxTimestamp = Math.abs(randomLong());
try (Store store = createStore();
Engine engine = new InternalEngine(config(defaultSettings, store, createTempDir(), NoMergePolicy.INSTANCE,
maxTimestamp))) {
maxTimestamp, null))) {
assertEquals(maxTimestamp, engine.segmentsStats(false).getMaxUnsafeAutoIdTimestamp());
}
}
Expand Down Expand Up @@ -2435,4 +2438,66 @@ public static long getNumVersionLookups(InternalEngine engine) { // for other te
public static long getNumIndexVersionsLookups(InternalEngine engine) { // for other tests to access this
return engine.getNumIndexVersionsLookups();
}

public void testFailEngineOnRandomIO() throws IOException, InterruptedException {
MockDirectoryWrapper wrapper = newMockDirectory();
final Path translogPath = createTempDir("testFailEngineOnRandomIO");
try (Store store = createStore(wrapper)) {
CyclicBarrier join = new CyclicBarrier(2);
CountDownLatch start = new CountDownLatch(1);
AtomicInteger controller = new AtomicInteger(0);
EngineConfig config = config(defaultSettings, store, translogPath, newMergePolicy(),
IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP, new ReferenceManager.RefreshListener() {
@Override
public void beforeRefresh() throws IOException {
}

@Override
public void afterRefresh(boolean didRefresh) throws IOException {
int i = controller.incrementAndGet();
if (i == 1) {
throw new MockDirectoryWrapper.FakeIOException();
} else if (i == 2) {
try {
start.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
throw new AlreadyClosedException("boom");
}
}
});
InternalEngine internalEngine = new InternalEngine(config);
int docId = 0;
final ParsedDocument doc = testParsedDocument(Integer.toString(docId), Integer.toString(docId), "test", null, docId, -1,
testDocumentWithTextField(), new BytesArray("{}".getBytes(Charset.defaultCharset())), null);

Engine.Index index = randomAppendOnly(docId, doc, false);
internalEngine.index(index);
Runnable r = () -> {
try {
join.await();
} catch (Exception e) {
throw new AssertionError(e);
}
try {
internalEngine.refresh("test");
fail();
} catch (RefreshFailedEngineException | AlreadyClosedException ex) {
// fine
} finally {
start.countDown();
}

};
Thread t = new Thread(r);
Thread t1 = new Thread(r);
t.start();
t1.start();
t.join();
t1.join();
assertTrue(internalEngine.isClosed.get());
assertTrue(internalEngine.failedEngine instanceof MockDirectoryWrapper.FakeIOException);
}
}
}
Expand Up @@ -126,6 +126,7 @@ store, new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), newMe
IndexSearcher.getDefaultQueryCache(), IndexSearcher.getDefaultQueryCachingPolicy(), translogConfig,
TimeValue.timeValueMinutes(5), listeners, IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP);
engine = new InternalEngine(config);
listeners.setTranslog(engine.getTranslog());
}

@After
Expand Down

0 comments on commit e113d89

Please sign in to comment.