Skip to content

Commit

Permalink
catch exc + a bit less contention
Browse files Browse the repository at this point in the history
  • Loading branch information
tlrx committed May 29, 2024
1 parent dc4ce41 commit ab1351f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.lang.invoke.VarHandle;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -708,7 +709,7 @@ int getFreq(CacheFileRegion cacheFileRegion) {

@Override
public void close() {
sharedBytes.decRef();
sharedBytes.close();
}

// used by tests
Expand Down Expand Up @@ -884,16 +885,17 @@ private static void throwAlreadyEvicted() {
*/
boolean tryRead(ByteBuffer buf, long offset) throws IOException {
SharedBytes.IO ioRef = this.io;
if (ioRef != null && ioRef.tryIncRef()) {
if (ioRef != null) {
try {
int readBytes = ioRef.read(buf, getRegionRelativePosition(offset));
if (isEvicted()) {
buf.position(buf.position() - readBytes);
return false;
}
return true;
} finally {
ioRef.decRef();
} catch (ClosedChannelException e) {
// the cache file channel has been closed
return false;
}
} else {
// taken by someone else
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.store.AlreadyClosedException;
import org.elasticsearch.blobcache.BlobCacheUtils;
import org.elasticsearch.blobcache.common.ByteBufferReference;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.core.AbstractRefCounted;
import org.elasticsearch.core.IOUtils;
import org.elasticsearch.core.RefCounted;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.env.Environment;
Expand All @@ -32,7 +31,7 @@
import java.nio.file.StandardOpenOption;
import java.util.function.IntConsumer;

public class SharedBytes extends AbstractRefCounted {
public class SharedBytes extends AbstractRefCounted implements Releasable {

/**
* Thread local direct byte buffer to aggregate multiple positional writes to the cache file.
Expand Down Expand Up @@ -118,6 +117,16 @@ public class SharedBytes extends AbstractRefCounted {
this.readBytes = readBytes;
}

@Override
public void close() {
if (ios != null) {
for (var io : ios) {
io.decRef();
}
}
decRef();
}

/**
* Tries to find a suitable path to a searchable snapshots shared cache file in the data paths founds in the environment.
*
Expand Down Expand Up @@ -287,39 +296,22 @@ public IO getFileChannel(int sharedBytesPos) {
return ios[sharedBytesPos];
}

public final class IO implements RefCounted {
public final class IO extends AbstractRefCounted {

private final long pageStart;

private final MappedByteBuffer mappedByteBuffer;

private IO(final int sharedBytesPos, MappedByteBuffer mappedByteBuffer) {
long physicalOffset = (long) sharedBytesPos * regionSize;
assert physicalOffset <= (long) numRegions * regionSize;
this.pageStart = physicalOffset;
this.mappedByteBuffer = mappedByteBuffer;
SharedBytes.this.incRef();
}

@Override
public boolean tryIncRef() {
return SharedBytes.this.tryIncRef();
}

@Override
public void incRef() {
if (tryIncRef() == false) {
throw new AlreadyClosedException("File channel is closed");
}
}

@Override
public boolean decRef() {
return SharedBytes.this.decRef();
}

@Override
public boolean hasReferences() {
return SharedBytes.this.hasReferences();
protected void closeInternal() {
SharedBytes.this.decRef();
}

@SuppressForbidden(reason = "Use positional reads on purpose")
Expand Down

0 comments on commit ab1351f

Please sign in to comment.