Skip to content

Commit

Permalink
[TEST] Test that translog can recover after random IOException
Browse files Browse the repository at this point in the history
This commit adds a new test that can throw an IOException at any point in time
and ensures that all previously synced documents can be successfully recovered after hitting
an excepiton.

Relates to elastic#15788
  • Loading branch information
s1monw committed Jan 7, 2016
1 parent 132df10 commit e7f9d68
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 19 deletions.
Expand Up @@ -51,6 +51,7 @@
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
Expand Down Expand Up @@ -440,7 +441,7 @@ public Location add(Operation operation) throws IOException {
if (config.isSyncOnEachOperation()) {
current.sync();
}
assert current.assertBytesAtLocation(location, bytes);
assert assertBytesAtLocation(location, bytes);
return location;
}
} catch (AlreadyClosedException | IOException ex) {
Expand All @@ -454,6 +455,13 @@ public Location add(Operation operation) throws IOException {
}
}

boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
// tests can override this
ByteBuffer buffer = ByteBuffer.allocate(location.size);
current.readBytes(buffer, location.translogLocation);
return new BytesArray(buffer.array()).equals(expectedBytes);
}

/**
* Snapshots the current transaction log allowing to safely iterate over the snapshot.
* Snapshots are fixed in time and will not be updated with future operations.
Expand Down
Expand Up @@ -218,11 +218,6 @@ public ImmutableTranslogReader immutableReader() throws TranslogException {
}
}

boolean assertBytesAtLocation(Translog.Location location, BytesReference expectedBytes) throws IOException {
ByteBuffer buffer = ByteBuffer.allocate(location.size);
readBytes(buffer, location.translogLocation);
return new BytesArray(buffer.array()).equals(expectedBytes);
}

private long getWrittenOffset() throws IOException {
return channelReference.getChannel().position();
Expand Down
183 changes: 170 additions & 13 deletions core/src/test/java/org/elasticsearch/index/translog/TranslogTests.java
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.FileSystemUtils;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -1316,7 +1317,7 @@ protected void afterAdd() throws IOException {

public void testFailFlush() throws IOException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean();
final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = getFailableTranslog(fail, config);

Expand All @@ -1336,9 +1337,13 @@ public void testFailFlush() throws IOException {
assertFalse(translog.isOpen());
assertEquals("__FAKE__ no space left on device", ex.getMessage());
}
fail.set(randomBoolean());
if (randomBoolean()) {
fail.failAlways();
} else {
fail.failNever();
}
}
fail.set(false);
fail.failNever();
if (randomBoolean()) {
try {
locations.add(translog.add(new Translog.Index("test", "" + opsSynced, Integer.toString(opsSynced).getBytes(Charset.forName("UTF-8")))));
Expand Down Expand Up @@ -1409,13 +1414,13 @@ public void testTranslogOpsCountIsCorrect() throws IOException {

public void testTragicEventCanBeAnyException() throws IOException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean();
final FailSwitch fail = new FailSwitch();
TranslogConfig config = getTranslogConfig(tempDir);
assumeFalse("this won't work if we sync on any op", config.isSyncOnEachOperation());
Translog translog = getFailableTranslog(fail, config, false, true);
LineFileDocs lineFileDocs = new LineFileDocs(random()); // writes pretty big docs so we cross buffer boarders regularly
translog.add(new Translog.Index("test", "1", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
fail.set(true);
fail.failAlways();
try {
Translog.Location location = translog.add(new Translog.Index("test", "2", lineFileDocs.nextDoc().toString().getBytes(Charset.forName("UTF-8"))));
if (randomBoolean()) {
Expand All @@ -1436,7 +1441,7 @@ public void testTragicEventCanBeAnyException() throws IOException {

public void testFatalIOExceptionsWhileWritingConcurrently() throws IOException, InterruptedException {
Path tempDir = createTempDir();
final AtomicBoolean fail = new AtomicBoolean(false);
final FailSwitch fail = new FailSwitch();

TranslogConfig config = getTranslogConfig(tempDir);
Translog translog = getFailableTranslog(fail, config);
Expand Down Expand Up @@ -1473,7 +1478,7 @@ protected void afterAdd() throws IOException {
// this holds a reference to the current tlog channel such that it's not closed
// if we hit a tragic event. this is important to ensure that asserts inside the Translog#add doesn't trip
// otherwise our assertions here are off by one sometimes.
fail.set(true);
fail.failAlways();
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
Expand Down Expand Up @@ -1525,11 +1530,40 @@ protected void afterAdd() throws IOException {
}
}

private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config) throws IOException {
private Translog getFailableTranslog(FailSwitch fail, final TranslogConfig config) throws IOException {
return getFailableTranslog(fail, config, randomBoolean(), false);
}

private Translog getFailableTranslog(final AtomicBoolean fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
private static class FailSwitch {
private volatile int failRate;
private volatile boolean onceFailedFailAlways = false;
public boolean fail() {
boolean fail = randomIntBetween(1, 100) <= failRate;
if (fail && onceFailedFailAlways) {
failAlways();
}
return fail;
}

public void failNever() {
failRate = 0;
}

public void failAlways() {
failRate = 100;
}

public void failRandomly() {
failRate = randomIntBetween(1, 100);
}

public void onceFailedFailAlways() {
onceFailedFailAlways = true;
}
}


private Translog getFailableTranslog(final FailSwitch fail, final TranslogConfig config, final boolean paritalWrites, final boolean throwUnknownException) throws IOException {
return new Translog(config) {
@Override
TranslogWriter.ChannelFactory getChannelFactory() {
Expand All @@ -1539,23 +1573,56 @@ TranslogWriter.ChannelFactory getChannelFactory() {
@Override
public FileChannel open(Path file) throws IOException {
FileChannel channel = factory.open(file);
return new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
boolean success = false;
try {
ThrowingFileChannel throwingFileChannel = new ThrowingFileChannel(fail, paritalWrites, throwUnknownException, channel);
success = true;
return throwingFileChannel;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(channel);
}
}
}
};
}

@Override
protected boolean assertBytesAtLocation(Location location, BytesReference expectedBytes) throws IOException {
return true; // we don't wanna fail in the assert
}
};
}

public static class ThrowingFileChannel extends FilterFileChannel {
private final AtomicBoolean fail;
private final FailSwitch fail;
private final boolean partialWrite;
private final boolean throwUnknownException;

public ThrowingFileChannel(AtomicBoolean fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) {
public ThrowingFileChannel(FailSwitch fail, boolean partialWrite, boolean throwUnknownException, FileChannel delegate) throws MockDirectoryWrapper.FakeIOException {
super(delegate);
this.fail = fail;
this.partialWrite = partialWrite;
this.throwUnknownException = throwUnknownException;
if (fail.fail()) {
throw new MockDirectoryWrapper.FakeIOException();
}
}

@Override
public int read(ByteBuffer dst) throws IOException {
if (fail.fail()) {
throw new MockDirectoryWrapper.FakeIOException();
}
return super.read(dst);
}

@Override
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
if (fail.fail()) {
throw new MockDirectoryWrapper.FakeIOException();
}
return super.read(dsts, offset, length);
}

@Override
Expand All @@ -1570,7 +1637,7 @@ public int write(ByteBuffer src, long position) throws IOException {


public int write(ByteBuffer src) throws IOException {
if (fail.get()) {
if (fail.fail()) {
if (partialWrite) {
if (src.hasRemaining()) {
final int pos = src.position();
Expand All @@ -1590,6 +1657,22 @@ public int write(ByteBuffer src) throws IOException {
}
return super.write(src);
}

@Override
public void force(boolean metaData) throws IOException {
if (fail.fail()) {
throw new MockDirectoryWrapper.FakeIOException();
}
super.force(metaData);
}

@Override
public long position() throws IOException {
if (fail.fail()) {
throw new MockDirectoryWrapper.FakeIOException();
}
return super.position();
}
}

private static final class UnknownException extends RuntimeException {
Expand Down Expand Up @@ -1711,4 +1794,78 @@ public void testRecoverWithUnbackedNextGenAndFutureFile() throws IOException {
}
}

/**
* This test adds operations to the translog which might randomly throw an IOException. The only thing this test verifies is
* that we can, after we hit an exception, open and recover the translog successfully and retrieve all successfully synced operations
* from the transaction log.
*/
public void testWithRandomException() throws IOException {
final int runs = randomIntBetween(5, 10);
for (int run = 0; run < runs; run++) {
Path tempDir = createTempDir();
final FailSwitch fail = new FailSwitch();
fail.failRandomly();
TranslogConfig config = getTranslogConfig(tempDir);
final int numOps = randomIntBetween(100, 200);
List<String> syncedDocs = new ArrayList<>();
List<String> unsynced = new ArrayList<>();
if (randomBoolean()) {
fail.onceFailedFailAlways();
}
try {
final Translog failableTLog = getFailableTranslog(fail, config, randomBoolean(), false);
try {
LineFileDocs lineFileDocs = new LineFileDocs(random()); //writes pretty big docs so we cross buffer boarders regularly
for (int opsAdded = 0; opsAdded < numOps; opsAdded++) {
String doc = lineFileDocs.nextDoc().toString();
failableTLog.add(new Translog.Index("test", "" + opsAdded, doc.getBytes(Charset.forName("UTF-8"))));
unsynced.add(doc);
if (randomBoolean()) {
failableTLog.sync();
syncedDocs.addAll(unsynced);
unsynced.clear();
}
if (randomFloat() < 0.1) {
failableTLog.sync(); // we have to sync here first otherwise we don't know if the sync succeeded if the commit fails
syncedDocs.addAll(unsynced);
unsynced.clear();
if (randomBoolean()) {
failableTLog.prepareCommit();
}
failableTLog.commit();
syncedDocs.clear();
}
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// fair enough
} catch (IOException ex) {
assertEquals(ex.getMessage(), "__FAKE__ no space left on device");
} finally {
config.setTranslogGeneration(failableTLog.getGeneration());
IOUtils.closeWhileHandlingException(failableTLog);
}
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
}
// now randomly open this failing tlog again just to make sure we can also recover from failing during recovery
if (randomBoolean()) {
try {
IOUtils.close(getFailableTranslog(fail, config, randomBoolean(), false));
} catch (TranslogException | MockDirectoryWrapper.FakeIOException ex) {
// failed - that's ok, we didn't even create it
}
}

try (Translog translog = new Translog(config)) {
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
assertEquals(syncedDocs.size(), snapshot.estimatedTotalOperations());
for (int i = 0; i < syncedDocs.size(); i++) {
Translog.Operation next = snapshot.next();
assertEquals(syncedDocs.get(i), next.getSource().source.toUtf8());
assertNotNull("operation " + i + " must be non-null", next);
}
}
}
}
}
}

0 comments on commit e7f9d68

Please sign in to comment.