diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
index 0d2fba6ea5772..88e4b06421e0c 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -51,6 +51,7 @@
 import java.io.Closeable;
 import java.io.EOFException;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
@@ -440,7 +441,7 @@ public Location add(Operation operation) throws IOException {
                 if (config.isSyncOnEachOperation()) {
                     current.sync();
                 }
-                assert current.assertBytesAtLocation(location, bytes);
+                assert assertBytesAtLocation(location, bytes);
                 return location;
             }
         } catch (AlreadyClosedException | IOException ex) {
@@ -454,6 +455,13 @@ public Location add(Operation operation) throws IOException {
         }
     }
 
+    boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
+        // reads location.size bytes back from the current writer and compares them to expectedBytes; tests can override this
+        ByteBuffer buffer = ByteBuffer.allocate(location.size);
+        current.readBytes(buffer, location.translogLocation);
+        return new BytesArray(buffer.array()).equals(expectedBytes);
+    }
+
     /**
      * Snapshots the current transaction log allowing to safely iterate over the snapshot.
      * Snapshots are fixed in time and will not be updated with future operations.
diff --git a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
index 867b94a85053d..6a4d40ec54501 100644
--- a/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
+++ b/core/src/main/java/org/elasticsearch/index/translog/TranslogWriter.java
@@ -218,11 +218,6 @@ public ImmutableTranslogReader immutableReader() throws TranslogException {
         }
     }
 
-    boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
-        ByteBuffer buffer = ByteBuffer.allocate(location.size);
-        readBytes(buffer, location.translogLocation);
-        return new BytesArray(buffer.array()).equals(expectedBytes);
-    }
 
     private long getWrittenOffset() throws IOException {
         return channelReference.getChannel().position();
diff --git a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
index b31aafa432d34..ddd8c2f649f4e 100644
--- a/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
+++ b/core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
@@ -32,6 +32,7 @@
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.cluster.metadata.IndexMetaData;
 import org.elasticsearch.common.bytes.BytesArray;
+import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.FileSystemUtils;
 import org.elasticsearch.common.io.stream.BytesStreamOutput;
 import org.elasticsearch.common.io.stream.StreamInput;
@@ -1316,7 +1317,7 @@ protected void afterAdd() throws IOException {
 
     public void testFailFlush() throws IOException {
         Path tempDir = createTempDir();
-        final AtomicBoolean fail = new AtomicBoolean();
+        final FailSwitch fail = new FailSwitch();
         TranslogConfig config = getTranslogConfig(tempDir);
         Translog translog = getFailableTranslog(fail, config);
 
@@ -1336,9 +1337,13 @@ public void testFailFlush() throws IOException {
                 assertFalse(translog.isOpen());
                 assertEquals("__FAKE__ no space left on device", ex.getMessage());
             }
-            fail.set(randomBoolean());
+            if (randomBoolean()) {
+                fail.failAlways();
+            } else {
+                fail.failNever();
+            }
         }
-        fail.set(false);
+        fail.failNever();
         if (randomBoolean()) {
             try {
                 locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
@@ -1409,13 +1414,13 @@ public void testTranslogOpsCountIsCorrect() throws IOException {
 
     public void testTragicEventCanBeAnyException() throws IOException {
         Path tempDir = createTempDir();
-        final AtomicBoolean fail = new AtomicBoolean();
+        final FailSwitch fail = new FailSwitch();
         TranslogConfig config = getTranslogConfig(tempDir);
         assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
         Translog translog = getFailableTranslog(fail, config, false, true);
         LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
         translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
-        fail.set(true);
+        fail.failAlways();
         try {
             Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
             if (randomBoolean()) {
@@ -1436,7 +1441,7 @@ public void testTragicEventCanBeAnyException() throws IOException {
 
     public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
         Path tempDir = createTempDir();
-        final AtomicBoolean fail = new AtomicBoolean(false);
+        final FailSwitch fail = new FailSwitch();
 
         TranslogConfig config = getTranslogConfig(tempDir);
         Translog translog = getFailableTranslog(fail, config);
@@ -1473,7 +1478,7 @@ protected void afterAdd() throws IOException {
             // this holds a reference to the current tlog channel such that it's not closed
             // if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip
             // otherwise our assertions here are off by one sometimes.
-            fail.set(true);
+            fail.failAlways();
             for (int i = 0; i < threadCount; i++) {
                 threads[i].join();
             }
@@ -1525,11 +1530,40 @@ protected void afterAdd() throws IOException {
         }
     }
 
-    private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
+    private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
         return getFailableTranslog(fail, config, randomBoolean(), false);
     }
 
-    private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
+    private static class FailSwitch { // shared switch deciding whether the next intercepted channel call should fail
+        private volatile int failRate; // compared against randomIntBetween(1, 100): 0 = never fail, 100 = always fail
+        private volatile boolean onceFailedFailAlways = false; // when set, the first reported failure switches to failAlways()
+        public boolean fail() {
+            boolean fail = randomIntBetween(1, 100) <= failRate;
+            if (fail && onceFailedFailAlways) {
+                failAlways();
+            }
+            return fail;
+        }
+
+        public void failNever() {
+            failRate = 0;
+        }
+
+        public void failAlways() {
+            failRate = 100;
+        }
+
+        public void failRandomly() {
+            failRate = randomIntBetween(1, 100);
+        }
+
+        public void onceFailedFailAlways() {
+            onceFailedFailAlways = true;
+        }
+    }
+
+
+    private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
         return new Translog(config) {
             @Override
             TranslogWriter.ChannelFactory getChannelFactory() {
@@ -1539,23 +1573,56 @@ TranslogWriter.ChannelFactory getChannelFactory() {
                     @Override
                     public FileChannel open(Path file) throws IOException {
                         FileChannel channel = factory.open(file);
-                        return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
+                        boolean success = false;
+                        try {
+                            ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
+                            success = true;
+                            return throwingFileChannel;
+                        } finally {
+                            if (success == false) { // the wrapper's constructor can throw; don't leak the underlying channel
+                                IOUtils.closeWhileHandlingException(channel);
+                            }
+                        }
                     }
                 };
             }
+
+            @Override
+            protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
+                return true; // we don't wanna fail in the assert
+            }
         };
     }
 
     public static class ThrowingFileChannel extends FilterFileChannel {
-        private final AtomicBoolean fail;
+        private final FailSwitch fail;
         private final boolean partialWrite;
         private final boolean throwUnknownException;
 
-        public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) {
+        public ThrowingFileChannel(FailSwitch fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) throws MockDirectoryWrapper.FakeIOException {
             super(delegate);
             this.fail = fail;
             this.partialWrite = partialWrite;
             this.throwUnknownException = throwUnknownException;
+            if (fail.fail()) { // creating the channel wrapper is itself a failure point
+                throw new MockDirectoryWrapper.FakeIOException();
+            }
+        }
+
+        @Override
+        public int read(ByteBuffer dst) throws IOException {
+            if (fail.fail()) {
+                throw new MockDirectoryWrapper.FakeIOException();
+            }
+            return super.read(dst);
+        }
+
+        @Override
+        public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
+            if (fail.fail()) {
+                throw new MockDirectoryWrapper.FakeIOException();
+            }
+            return super.read(dsts, offset, length);
         }
 
         @Override
@@ -1570,7 +1637,7 @@ public int write(ByteBuffer src, long position) throws IOException {
 
 
         public int write(ByteBuffer src) throws IOException {
-            if (fail.get()) {
+            if (fail.fail()) {
                 if (partialWrite) {
                     if (src.hasRemaining()) {
                         final int pos = src.position();
@@ -1590,6 +1657,22 @@ public int write(ByteBuffer src) throws IOException {
             }
             return super.write(src);
         }
+
+        @Override
+        public void force(boolean metaData) throws IOException {
+            if (fail.fail()) {
+                throw new MockDirectoryWrapper.FakeIOException();
+            }
+            super.force(metaData);
+        }
+
+        @Override
+        public long position() throws IOException {
+            if (fail.fail()) {
+                throw new MockDirectoryWrapper.FakeIOException();
+            }
+            return super.position();
+        }
     }
 
     private static final class UnknownException extends RuntimeException {
@@ -1711,4 +1794,78 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
         }
     }
 
+    /**
+     * This test adds operations to the translog which might randomly throw an IOException. The only thing this test verifies is
+     * that we can, after we hit an exception, open and recover the translog successfully and retrieve all successfully synced operations
+     * from the transaction log.
+     */
+    public void testWithRandomException() throws IOException {
+        final int runs = randomIntBetween(5, 10);
+        for (int run = 0; run < runs; run++) {
+            Path tempDir = createTempDir();
+            final FailSwitch fail = new FailSwitch();
+            fail.failRandomly();
+            TranslogConfig config = getTranslogConfig(tempDir);
+            final int numOps = randomIntBetween(100, 200);
+            List<String> syncedDocs = new ArrayList<>();
+            List<String> unsynced = new ArrayList<>();
+            if (randomBoolean()) {
+                fail.onceFailedFailAlways();
+            }
+            try {
+                final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false);
+                try {
+                    LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer borders regularly
+                    for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
+                        String doc = lineFileDocs.nextDoc().toString();
+                        failableTLog.add(new Translog.Index("test", "" + opsAdded, doc.getBytes(Charset.forName("UTF-8"))));
+                        unsynced.add(doc);
+                        if (randomBoolean()) {
+                            failableTLog.sync();
+                            syncedDocs.addAll(unsynced);
+                            unsynced.clear();
+                        }
+                        if (randomFloat() < 0.1) {
+                            failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
+                            syncedDocs.addAll(unsynced);
+                            unsynced.clear();
+                            if (randomBoolean()) {
+                                failableTLog.prepareCommit();
+                            }
+                            failableTLog.commit();
+                            syncedDocs.clear();
+                        }
+                    }
+                } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
+                    // fair enough
+                } catch (IOException ex) {
+                    assertEquals("__FAKE__ no space left on device", ex.getMessage());
+                } finally {
+                    config.setTranslogGeneration(failableTLog.getGeneration());
+                    IOUtils.closeWhileHandlingException(failableTLog);
+                }
+            } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
+                // failed - that's ok, we didn't even create it
+            }
+            // now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
+            if (randomBoolean()) {
+                try {
+                    IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false));
+                } catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
+                    // failed - that's ok, we didn't even create it
+                }
+            }
+
+            try (Translog translog = new Translog(config)) {
+                try (Translog.Snapshot snapshot = translog.newSnapshot()) {
+                    assertEquals(syncedDocs.size(), snapshot.estimatedTotalOperations());
+                    for (int i = 0; i < syncedDocs.size(); i++) {
+                        Translog.Operation next = snapshot.next();
+                        assertNotNull("operation " + i + " must be non-null", next); // check before dereferencing next below
+                        assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8());
+                    }
+                }
+            }
+        }
+    }
 }