Skip to content

Commit

Permalink
ISPN-14759 SoftIndexFileStore Index can lag behind LogAppender under …
Browse files Browse the repository at this point in the history
…heavy load

* Cache Index lambdas to prevent allocations
* Remove empty free block list to reduce complexity for reused spaces
* Cache keyParts size to prevent computation for replacements
* Cache compare method
* Use Arrays.mismatch to utilize vectorized compare and single array
  boundary check
  • Loading branch information
wburns committed Oct 6, 2023
1 parent 9048f5f commit 87d6528
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 91 deletions.
124 changes: 77 additions & 47 deletions core/src/main/java/org/infinispan/persistence/sifs/Index.java
Expand Up @@ -8,6 +8,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator;
Expand Down Expand Up @@ -72,6 +73,41 @@ class Index {

private final FlowableProcessor<IndexRequest>[] flowableProcessors;

private final IndexNode.OverwriteHook movedHook = new IndexNode.OverwriteHook() {
@Override
public boolean check(IndexRequest request, int oldFile, int oldOffset) {
return oldFile == request.getPrevFile() && oldOffset == request.getPrevOffset();
}

@Override
public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) {
if (overwritten && request.getOffset() < 0 && request.getPrevOffset() >= 0) {
sizePerSegment.decrementAndGet(cacheSegment);
}
}
};

private final IndexNode.OverwriteHook updateHook = new IndexNode.OverwriteHook() {
@Override
public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) {
nonBlockingManager.complete(request, overwritten);
if (request.getOffset() >= 0 && prevOffset < 0) {
sizePerSegment.incrementAndGet(cacheSegment);
} else if (request.getOffset() < 0 && prevOffset >= 0) {
sizePerSegment.decrementAndGet(cacheSegment);
}
}
};

private final IndexNode.OverwriteHook droppedHook = new IndexNode.OverwriteHook() {
@Override
public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) {
if (request.getPrevFile() == prevFile && request.getPrevOffset() == prevOffset) {
sizePerSegment.decrementAndGet(cacheSegment);
}
}
};

public Index(NonBlockingManager nonBlockingManager, FileProvider fileProvider, Path indexDir, int segments,
int cacheSegments, int minNodeSize, int maxNodeSize, TemporaryTable temporaryTable, Compactor compactor,
TimeService timeService) throws IOException {
Expand Down Expand Up @@ -485,38 +521,15 @@ public void accept(IndexRequest request) throws Throwable {
return;
case MOVED:
recordChange = IndexNode.RecordChange.MOVE;
overwriteHook = new IndexNode.OverwriteHook() {
@Override
public boolean check(int oldFile, int oldOffset) {
return oldFile == request.getPrevFile() && oldOffset == request.getPrevOffset();
}

@Override
public void setOverwritten(int cacheSegment, boolean overwritten, int prevFile, int prevOffset) {
if (overwritten && request.getOffset() < 0 && request.getPrevOffset() >= 0) {
index.sizePerSegment.decrementAndGet(cacheSegment);
}
}
};
overwriteHook = index.movedHook;
break;
case UPDATE:
recordChange = IndexNode.RecordChange.INCREASE;
overwriteHook = (cacheSegment, overwritten, prevFile, prevOffset) -> {
index.nonBlockingManager.complete(request, overwritten);
if (request.getOffset() >= 0 && prevOffset < 0) {
index.sizePerSegment.incrementAndGet(cacheSegment);
} else if (request.getOffset() < 0 && prevOffset >= 0) {
index.sizePerSegment.decrementAndGet(cacheSegment);
}
};
overwriteHook = index.updateHook;
break;
case DROPPED:
recordChange = IndexNode.RecordChange.DECREASE;
overwriteHook = (cacheSegment, overwritten, prevFile, prevOffset) -> {
if (request.getPrevFile() == prevFile && request.getPrevOffset() == prevOffset) {
index.sizePerSegment.decrementAndGet(cacheSegment);
}
};
overwriteHook = index.droppedHook;
break;
case FOUND_OLD:
recordChange = IndexNode.RecordChange.INCREASE_FOR_OLD;
Expand All @@ -526,8 +539,7 @@ public void setOverwritten(int cacheSegment, boolean overwritten, int prevFile,
throw new IllegalArgumentException(request.toString());
}
try {
IndexNode.setPosition(root, request.getSegment(), request.getKey(), request.getSerializedKey(), request.getFile(), request.getOffset(),
request.getSize(), overwriteHook, recordChange);
IndexNode.setPosition(root, request, overwriteHook, recordChange);
} catch (IllegalStateException e) {
request.completeExceptionally(e);
}
Expand Down Expand Up @@ -605,19 +617,22 @@ private void loadFreeBlocks(long freeBlocksOffset) throws IOException {
int blockLength = buffer.getInt(0);
assert blockLength <= Short.MAX_VALUE;
int listSize = buffer.getInt(4);
int requiredSize = 10 * listSize;
buffer = buffer.capacity() < requiredSize ? ByteBuffer.allocate(requiredSize) : buffer;
buffer.position(0);
buffer.limit(requiredSize);
if (!read(indexFile, buffer)) {
throw new IOException("Cannot read free blocks lists!");
}
buffer.flip();
ArrayList<IndexSpace> list = new ArrayList<>(listSize);
for (int j = 0; j < listSize; ++j) {
list.add(new IndexSpace(buffer.getLong(), buffer.getShort()));
// Ignore any free block that had no entries as it adds time complexity to our lookup
if (listSize > 0) {
int requiredSize = 10 * listSize;
buffer = buffer.capacity() < requiredSize ? ByteBuffer.allocate(requiredSize) : buffer;
buffer.position(0);
buffer.limit(requiredSize);
if (!read(indexFile, buffer)) {
throw new IOException("Cannot read free blocks lists!");
}
buffer.flip();
ArrayList<IndexSpace> list = new ArrayList<>(listSize);
for (int j = 0; j < listSize; ++j) {
list.add(new IndexSpace(buffer.getLong(), buffer.getShort()));
}
freeBlocks.put((short) blockLength, list);
}
freeBlocks.put((short) blockLength, list);
}
}

Expand Down Expand Up @@ -654,14 +669,29 @@ public int getMinNodeSize() {

// this should be accessed only from the updater thread
IndexSpace allocateIndexSpace(short length) {
Map.Entry<Short, List<IndexSpace>> entry = freeBlocks.ceilingEntry(length);
if (entry == null || entry.getValue().isEmpty()) {
long oldSize = indexFileSize;
indexFileSize += length;
return new IndexSpace(oldSize, length);
} else {
return entry.getValue().remove(entry.getValue().size() - 1);
// Use tailMap so that we only require O(logN) to find the iterator
// This avoids an additional O(logN) to do an entry removal
Iterator<Map.Entry<Short, List<IndexSpace>>> iter = freeBlocks.tailMap(length).entrySet().iterator();
while (iter.hasNext()) {
Map.Entry<Short, List<IndexSpace>> entry = iter.next();
short spaceLength = entry.getKey();
// Only use the space if it is only 25% larger to avoid too much fragmentation
if ((length + (length >> 2)) < spaceLength) {
break;
}
List<IndexSpace> list = entry.getValue();
if (!list.isEmpty()) {
IndexSpace spaceToReturn = list.remove(list.size() - 1);
if (list.isEmpty()) {
iter.remove();
}
return spaceToReturn;
}
iter.remove();
}
long oldSize = indexFileSize;
indexFileSize += length;
return new IndexSpace(oldSize, length);
}

// this should be accessed only from the updater thread
Expand Down

0 comments on commit 87d6528

Please sign in to comment.