Skip to content

Commit

Permalink
Fail and close translog hard if writing to disk fails
Browse files Browse the repository at this point in the history
Today we are super lenient (how could I missed that for f**k sake) with failing
/ closing the translog writer when we hit an exception. It's actually worse, we allow
to further write to it and don't care what has been already written to disk and what hasn't.
We keep the buffer in memory and try to write it again on the next operation.

When we hit a disk-full expcetion due to for instance a big merge we are likely adding document to the
translog but fail to write them to disk. Once the merge failed and freed up it's diskspace (note this is
a small window when concurrently indexing and failing the shard due to out of space exceptions) we will
allow in-flight operations to add to the translog and then once we fail the shard fsync it. These operations
are written to disk and fsynced which is fine but the previous buffer flush might have written some bytes
to disk which are not corrupting the translog. That wouldn't be an issue if we prevented the fsync.

Closes elastic#15333
  • Loading branch information
s1monw committed Dec 14, 2015
1 parent 2602439 commit 2d03a6b
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 56 deletions.
Expand Up @@ -48,13 +48,17 @@ public BufferingTranslogWriter(ShardId shardId, long generation, ChannelReferenc
public Translog.Location add(BytesReference data) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
operationCounter++;
final long offset = totalOffset;
if (data.length() >= buffer.length) {
flush();
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
data.writeTo(channel);
try {
data.writeTo(channel);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
writtenOffset += data.length();
totalOffset += data.length();
return new Translog.Location(generation, offset, data.length());
Expand All @@ -64,17 +68,25 @@ public Translog.Location add(BytesReference data) throws IOException {
}
data.writeTo(bufferOs);
totalOffset += data.length();
operationCounter++;
return new Translog.Location(generation, offset, data.length());
}
}

protected final void flush() throws IOException {
assert writeLock.isHeldByCurrentThread();
if (bufferCount > 0) {
ensureOpen();
// we use the channel to write, since on windows, writing to the RAF might not be reflected
// when reading through the channel
Channels.writeToChannel(buffer, 0, bufferCount, channel);
writtenOffset += bufferCount;
final int bufferSize = bufferCount;
try {
Channels.writeToChannel(buffer, 0, bufferSize, channel);
} catch (Throwable ex) {
closeWithTragicEvent(ex);
throw ex;
}
writtenOffset += bufferSize;
bufferCount = 0;
}
}
Expand All @@ -97,7 +109,7 @@ protected void readBytes(ByteBuffer targetBuffer, long position) throws IOExcept
}

@Override
public boolean syncNeeded() {
public synchronized boolean syncNeeded() {
return totalOffset != lastSyncedOffset;
}

Expand All @@ -107,15 +119,24 @@ public void sync() throws IOException {
return;
}
synchronized (this) {
ensureOpen();
channelReference.incRef();
try {
final long offsetToSync;
try (ReleasableLock lock = writeLock.acquire()) {
flush();
lastSyncedOffset = totalOffset;
offsetToSync = totalOffset;
}
// we can do this outside of the write lock but we have to protect from
// concurrent syncs
checkpoint(lastSyncedOffset, operationCounter, channelReference);
try {
ensureOpen();
checkpoint(offsetToSync, operationCounter, channelReference);
} catch (IOException ex) {
closeWithTragicEvent(ex);
throw ex;
}
lastSyncedOffset = offsetToSync;
} finally {
channelReference.decRef();
}
Expand Down
58 changes: 32 additions & 26 deletions core/src/main/java/org/elasticsearch/index/translog/Translog.java
Expand Up @@ -115,7 +115,7 @@ public class Translog extends AbstractIndexShardComponent implements IndexShardC
private final Path location;
private TranslogWriter current;
private volatile ImmutableTranslogReader currentCommittingTranslog;
private long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
private volatile long lastCommittedTranslogFileGeneration = -1; // -1 is safe as it will not cause an translog deletion.
private final AtomicBoolean closed = new AtomicBoolean();
private final TranslogConfig config;
private final String translogUUID;
Expand Down Expand Up @@ -288,10 +288,14 @@ public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try (ReleasableLock lock = writeLock.acquire()) {
try {
IOUtils.close(current, currentCommittingTranslog);
current.sync();
} finally {
IOUtils.close(recoveredTranslogs);
recoveredTranslogs.clear();
try {
IOUtils.close(current, currentCommittingTranslog);
} finally {
IOUtils.close(recoveredTranslogs);
recoveredTranslogs.clear();
}
}
} finally {
FutureUtils.cancel(syncScheduler);
Expand Down Expand Up @@ -354,7 +358,7 @@ public long sizeInBytes() {
TranslogWriter createWriter(long fileGeneration) throws IOException {
TranslogWriter newFile;
try {
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize());
newFile = TranslogWriter.create(config.getType(), shardId, translogUUID, fileGeneration, location.resolve(getFilename(fileGeneration)), new OnCloseRunnable(), config.getBufferSize(), getChannelFactory());
} catch (IOException e) {
throw new TranslogException(shardId, "failed to create new translog file", e);
}
Expand Down Expand Up @@ -548,31 +552,29 @@ public TranslogConfig getConfig() {
private final class OnCloseRunnable implements Callback<ChannelReference> {
@Override
public void handle(ChannelReference channelReference) {
try (ReleasableLock lock = writeLock.acquire()) {
if (isReferencedGeneration(channelReference.getGeneration()) == false) {
Path translogPath = channelReference.getPath();
assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
// if the given translogPath is not the current we can safely delete the file since all references are released
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
if (isReferencedGeneration(channelReference.getGeneration()) == false) {
Path translogPath = channelReference.getPath();
assert channelReference.getPath().getParent().equals(location) : "translog files must be in the location folder: " + location + " but was: " + translogPath;
// if the given translogPath is not the current we can safely delete the file since all references are released
logger.trace("delete translog file - not referenced and not current anymore {}", translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath);
IOUtils.deleteFilesIgnoringExceptions(translogPath.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));

}
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
for (Path path : stream) {
Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString());
if (matcher.matches()) {
long generation = Long.parseLong(matcher.group(1));
if (isReferencedGeneration(generation) == false) {
logger.trace("delete translog file - not referenced and not current anymore {}", path);
IOUtils.deleteFilesIgnoringExceptions(path);
IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
}
}
try (DirectoryStream<Path> stream = Files.newDirectoryStream(location)) {
for (Path path : stream) {
Matcher matcher = PARSE_STRICT_ID_PATTERN.matcher(path.getFileName().toString());
if (matcher.matches()) {
long generation = Long.parseLong(matcher.group(1));
if (isReferencedGeneration(generation) == false) {
logger.trace("delete translog file - not referenced and not current anymore {}", path);
IOUtils.deleteFilesIgnoringExceptions(path);
IOUtils.deleteFilesIgnoringExceptions(path.resolveSibling(getCommitCheckpointFileName(channelReference.getGeneration())));
}
}
} catch (IOException e) {
logger.warn("failed to delete unreferenced translog files", e);
}
} catch (IOException e) {
logger.warn("failed to delete unreferenced translog files", e);
}
}
}
Expand Down Expand Up @@ -1400,4 +1402,8 @@ int getNumOpenViews() {
return outstandingViews.size();
}

TranslogWriter.ChannelFactory getChannelFactory() {
return TranslogWriter.ChannelFactory.DEFAULT;
}

}
Expand Up @@ -140,16 +140,16 @@ protected Translog.Operation read(BufferedChecksumStreamInput inStream) throws I
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
doClose();
channelReference.decRef();
}
}

protected void doClose() throws IOException {
channelReference.decRef();
protected final boolean isClosed() {
return closed.get();
}

protected void ensureOpen() {
if (closed.get()) {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed");
}
}
Expand Down
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.index.translog;

import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.OutputStreamDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.IOUtils;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class TranslogWriter extends TranslogReader {
protected volatile int operationCounter;
/* the offset in bytes written to the file */
protected volatile long writtenOffset;
protected volatile Throwable tragicEvent;


public TranslogWriter(ShardId shardId, long generation, ChannelReference channelReference) throws IOException {
super(generation, channelReference, channelReference.getChannel().position());
Expand All @@ -65,10 +68,10 @@ public TranslogWriter(ShardId shardId, long generation, ChannelReference channel
this.lastSyncedOffset = channelReference.getChannel().position();;
}

public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize) throws IOException {
public static TranslogWriter create(Type type, ShardId shardId, String translogUUID, long fileGeneration, Path file, Callback<ChannelReference> onClose, int bufferSize, ChannelFactory channelFactory) throws IOException {
final BytesRef ref = new BytesRef(translogUUID);
final int headerLength = CodecUtil.headerLength(TRANSLOG_CODEC) + ref.length + RamUsageEstimator.NUM_BYTES_INT;
final FileChannel channel = FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
final FileChannel channel = channelFactory.open(file);
try {
// This OutputStreamDataOutput is intentionally not closed because
// closing it will close the FileChannel
Expand Down Expand Up @@ -118,6 +121,18 @@ public static Type fromString(String type) {
}
}

protected final void closeWithTragicEvent(Throwable throwable) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
if (throwable != null) {
if (tragicEvent == null) {
tragicEvent = throwable;
} else {
tragicEvent.addSuppressed(throwable);
}
}
close();
}
}

/**
* add the given bytes to the translog and return the location they were written at
Expand All @@ -127,9 +142,14 @@ public Translog.Location add(BytesReference data) throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
position = writtenOffset;
data.writeTo(channel);
try {
data.writeTo(channel);
} catch (Throwable e) {
closeWithTragicEvent(e);
throw e;
}
writtenOffset = writtenOffset + data.length();
operationCounter = operationCounter + 1;
operationCounter++;;
}
return new Translog.Location(generation, position, data.length());
}
Expand All @@ -147,6 +167,7 @@ public void sync() throws IOException {
// check if we really need to sync here...
if (syncNeeded()) {
try (ReleasableLock lock = writeLock.acquire()) {
ensureOpen();
lastSyncedOffset = writtenOffset;
checkpoint(lastSyncedOffset, operationCounter, channelReference);
}
Expand Down Expand Up @@ -262,15 +283,6 @@ public boolean syncUpTo(long offset) throws IOException {
return false;
}

@Override
protected final void doClose() throws IOException {
try (ReleasableLock lock = writeLock.acquire()) {
sync();
} finally {
super.doClose();
}
}

@Override
protected void readBytes(ByteBuffer buffer, long position) throws IOException {
try (ReleasableLock lock = readLock.acquire()) {
Expand All @@ -288,4 +300,20 @@ private static void writeCheckpoint(long syncPosition, int numOperations, Path t
Checkpoint checkpoint = new Checkpoint(syncPosition, numOperations, generation);
Checkpoint.write(checkpointFile, checkpoint, options);
}

static class ChannelFactory {

static final ChannelFactory DEFAULT = new ChannelFactory();

// only for testing until we have a disk-full FileSystemt
public FileChannel open(Path file) throws IOException {
return FileChannel.open(file, StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE_NEW);
}
}

protected final void ensureOpen() {
if (isClosed()) {
throw new AlreadyClosedException("translog [" + getGeneration() + "] is already closed", tragicEvent);
}
}
}
Expand Up @@ -34,13 +34,12 @@
public class BufferedTranslogTests extends TranslogTests {

@Override
protected Translog create(Path path) throws IOException {
protected TranslogConfig getTranslogConfig(Path path) {
Settings build = Settings.settingsBuilder()
.put("index.translog.fs.type", TranslogWriter.Type.BUFFERED.name())
.put("index.translog.fs.buffer_size", 10 + randomInt(128 * 1024), ByteSizeUnit.BYTES)
.put(IndexMetaData.SETTING_VERSION_CREATED, org.elasticsearch.Version.CURRENT)
.build();
TranslogConfig translogConfig = new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
return new Translog(translogConfig);
return new TranslogConfig(shardId, path, IndexSettingsModule.newIndexSettings(shardId.index(), build), Translog.Durabilty.REQUEST, BigArrays.NON_RECYCLING_INSTANCE, null);
}
}

0 comments on commit 2d03a6b

Please sign in to comment.