diff --git a/core/src/main/java/org/infinispan/persistence/sifs/Index.java b/core/src/main/java/org/infinispan/persistence/sifs/Index.java index 506e334e5f82..f309e2b9a570 100644 --- a/core/src/main/java/org/infinispan/persistence/sifs/Index.java +++ b/core/src/main/java/org/infinispan/persistence/sifs/Index.java @@ -8,6 +8,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PrimitiveIterator; @@ -72,6 +73,41 @@ class Index { private final FlowableProcessor[] flowableProcessors; + private final IndexNode.OverwriteHook movedHook = new IndexNode.OverwriteHook() { + @Override + public boolean check(IndexRequest request, int oldFile, int oldOffset) { + return oldFile == request.getPrevFile() && oldOffset == request.getPrevOffset(); + } + + @Override + public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) { + if (overwritten && request.getOffset() < 0 && request.getPrevOffset() >= 0) { + sizePerSegment.decrementAndGet(cacheSegment); + } + } + }; + + private final IndexNode.OverwriteHook updateHook = new IndexNode.OverwriteHook() { + @Override + public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) { + nonBlockingManager.complete(request, overwritten); + if (request.getOffset() >= 0 && prevOffset < 0) { + sizePerSegment.incrementAndGet(cacheSegment); + } else if (request.getOffset() < 0 && prevOffset >= 0) { + sizePerSegment.decrementAndGet(cacheSegment); + } + } + }; + + private final IndexNode.OverwriteHook droppedHook = new IndexNode.OverwriteHook() { + @Override + public void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) { + if (request.getPrevFile() == prevFile && request.getPrevOffset() == prevOffset) { + sizePerSegment.decrementAndGet(cacheSegment); + } + } + }; + public Index(NonBlockingManager nonBlockingManager, FileProvider fileProvider, Path indexDir, int segments, int cacheSegments, int minNodeSize, int maxNodeSize, TemporaryTable temporaryTable, Compactor compactor, TimeService timeService) throws IOException { @@ -485,38 +521,15 @@ public void accept(IndexRequest request) throws Throwable { return; case MOVED: recordChange = IndexNode.RecordChange.MOVE; - overwriteHook = new IndexNode.OverwriteHook() { - @Override - public boolean check(int oldFile, int oldOffset) { - return oldFile == request.getPrevFile() && oldOffset == request.getPrevOffset(); - } - - @Override - public void setOverwritten(int cacheSegment, boolean overwritten, int prevFile, int prevOffset) { - if (overwritten && request.getOffset() < 0 && request.getPrevOffset() >= 0) { - index.sizePerSegment.decrementAndGet(cacheSegment); - } - } - }; + overwriteHook = index.movedHook; break; case UPDATE: recordChange = IndexNode.RecordChange.INCREASE; - overwriteHook = (cacheSegment, overwritten, prevFile, prevOffset) -> { - index.nonBlockingManager.complete(request, overwritten); - if (request.getOffset() >= 0 && prevOffset < 0) { - index.sizePerSegment.incrementAndGet(cacheSegment); - } else if (request.getOffset() < 0 && prevOffset >= 0) { - index.sizePerSegment.decrementAndGet(cacheSegment); - } - }; + overwriteHook = index.updateHook; break; case DROPPED: recordChange = IndexNode.RecordChange.DECREASE; - overwriteHook = (cacheSegment, overwritten, prevFile, prevOffset) -> { - if (request.getPrevFile() == prevFile && request.getPrevOffset() == prevOffset) { - index.sizePerSegment.decrementAndGet(cacheSegment); - } - }; + overwriteHook = index.droppedHook; break; case FOUND_OLD: recordChange = IndexNode.RecordChange.INCREASE_FOR_OLD; @@ -526,8 +539,7 @@ public void setOverwritten(int cacheSegment, boolean overwritten, int prevFile, throw new IllegalArgumentException(request.toString()); } try { - IndexNode.setPosition(root, request.getSegment(), request.getKey(), request.getSerializedKey(), request.getFile(), request.getOffset(), - request.getSize(), overwriteHook, recordChange); + IndexNode.setPosition(root, request, overwriteHook, recordChange); } catch (IllegalStateException e) { request.completeExceptionally(e); } @@ -605,19 +617,22 @@ private void loadFreeBlocks(long freeBlocksOffset) throws IOException { int blockLength = buffer.getInt(0); assert blockLength <= Short.MAX_VALUE; int listSize = buffer.getInt(4); - int requiredSize = 10 * listSize; - buffer = buffer.capacity() < requiredSize ? ByteBuffer.allocate(requiredSize) : buffer; - buffer.position(0); - buffer.limit(requiredSize); - if (!read(indexFile, buffer)) { - throw new IOException("Cannot read free blocks lists!"); - } - buffer.flip(); - ArrayList list = new ArrayList<>(listSize); - for (int j = 0; j < listSize; ++j) { - list.add(new IndexSpace(buffer.getLong(), buffer.getShort())); + // Ignore any free block that had no entries as it adds time complexity to our lookup + if (listSize > 0) { + int requiredSize = 10 * listSize; + buffer = buffer.capacity() < requiredSize ? ByteBuffer.allocate(requiredSize) : buffer; + buffer.position(0); + buffer.limit(requiredSize); + if (!read(indexFile, buffer)) { + throw new IOException("Cannot read free blocks lists!"); + } + buffer.flip(); + ArrayList list = new ArrayList<>(listSize); + for (int j = 0; j < listSize; ++j) { + list.add(new IndexSpace(buffer.getLong(), buffer.getShort())); + } + freeBlocks.put((short) blockLength, list); } - freeBlocks.put((short) blockLength, list); } } @@ -654,14 +669,29 @@ public int getMinNodeSize() { // this should be accessed only from the updater thread IndexSpace allocateIndexSpace(short length) { - Map.Entry> entry = freeBlocks.ceilingEntry(length); - if (entry == null || entry.getValue().isEmpty()) { - long oldSize = indexFileSize; - indexFileSize += length; - return new IndexSpace(oldSize, length); - } else { - return entry.getValue().remove(entry.getValue().size() - 1); + // Use tailMap so that we only require O(logN) to find the iterator + // This avoids an additional O(logN) to do an entry removal + Iterator>> iter = freeBlocks.tailMap(length).entrySet().iterator(); + while (iter.hasNext()) { + Map.Entry> entry = iter.next(); + short spaceLength = entry.getKey(); + // Only use the space if it is only 25% larger to avoid too much fragmentation + if ((length + (length >> 2)) < spaceLength) { + break; + } + List list = entry.getValue(); + if (!list.isEmpty()) { + IndexSpace spaceToReturn = list.remove(list.size() - 1); + if (list.isEmpty()) { + iter.remove(); + } + return spaceToReturn; + } + iter.remove(); } + long oldSize = indexFileSize; + indexFileSize += length; + return new IndexSpace(oldSize, length); } // this should be accessed only from the updater thread diff --git a/core/src/main/java/org/infinispan/persistence/sifs/IndexNode.java b/core/src/main/java/org/infinispan/persistence/sifs/IndexNode.java index 61b7324aa153..6a10e4b5466c 100644 --- a/core/src/main/java/org/infinispan/persistence/sifs/IndexNode.java +++ b/core/src/main/java/org/infinispan/persistence/sifs/IndexNode.java @@ -8,6 +8,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.Comparator; import java.util.Deque; import java.util.List; import java.util.concurrent.locks.Lock; @@ -52,6 +53,7 @@ class IndexNode { private static final byte HAS_LEAVES = 1; private static final byte HAS_NODES = 2; + // Prefix length (short) + keyNode length (short) + flag (byte) private static final int INNER_NODE_HEADER_SIZE = 5; private static final int INNER_NODE_REFERENCE_SIZE = 10; private static final int LEAF_NODE_REFERENCE_SIZE = 14; @@ -66,6 +68,7 @@ class IndexNode { private LeafNode[] leafNodes = LeafNode.EMPTY_ARRAY; private final ReadWriteLock lock = new ReentrantReadWriteLock(); private long offset = -1; + private short keyPartsLength = -1; private short contentLength = -1; private short totalLength = -1; private short occupiedSpace; @@ -90,23 +93,28 @@ public enum RecordChange { byte flags = buffer.get(); int numKeyParts = buffer.getShort(); + int afterHeaderPos = buffer.position(); keyParts = new byte[numKeyParts][]; for (int i = 0; i < numKeyParts; ++i) { keyParts[i] = new byte[buffer.getShort()]; buffer.get(keyParts[i]); } + assert (buffer.position() - afterHeaderPos) < Short.MAX_VALUE; + keyPartsLength = (short) (buffer.position() - afterHeaderPos); if ((flags & HAS_LEAVES) != 0) { leafNodes = new LeafNode[numKeyParts + 1]; for (int i = 0; i < numKeyParts + 1; ++i) { leafNodes[i] = new LeafNode(buffer.getInt(), buffer.getInt(), buffer.getShort(), buffer.getInt()); } - } else if ((flags & HAS_NODES) != 0){ + } else if ((flags & HAS_NODES) != 0) { innerNodes = new InnerNode[numKeyParts + 1]; for (int i = 0; i < numKeyParts + 1; ++i) { innerNodes[i] = new InnerNode(buffer.getLong(), buffer.getShort()); } } + assert (buffer.position() - afterHeaderPos) < Short.MAX_VALUE; + contentLength = (short) (buffer.position() - afterHeaderPos); if (log.isTraceEnabled()) { log.tracef("Loaded %08x from %d:%d (length %d)", System.identityHashCode(this), offset, occupiedSpace, length()); @@ -168,6 +176,7 @@ private void replaceContent(IndexNode other) throws IOException { this.innerNodes = other.innerNodes; this.leafNodes = other.leafNodes; this.contentLength = -1; + this.keyPartsLength = -1; this.totalLength = -1; } finally { lock.writeLock().unlock(); @@ -372,9 +381,7 @@ private void updateFileOffsetInFile(int leafOffset, int newFile, int newOffset, // Root is -1, so that means the beginning of the file long offset = this.offset >= 0 ? this.offset : 0; offset += headerLength(); - for (byte[] keyPart : this.keyParts) { - offset += 2 + keyPart.length; - } + offset += keyPartsLength(); offset += (long) leafOffset * LEAF_NODE_REFERENCE_SIZE; ByteBuffer buffer = ByteBuffer.allocate(10); @@ -400,14 +407,15 @@ private static IndexNode findParentNode(IndexNode root, byte[] indexKey, Deque

stack = new ArrayDeque<>(); IndexNode node = findParentNode(root, indexKey, stack); - IndexNode copy = node.copyWith(cacheSegment, objectKey, indexKey, file, offset, size, overwriteHook, recordChange); + IndexNode copy = node.copyWith(request, cacheSegment, indexKey, overwriteHook, recordChange); if (copy == node) { // no change was executed return; @@ -658,12 +666,15 @@ private byte[] rightmostKey() throws IOException, IndexNodeOutdatedException { /** * Called on the most bottom node */ - private IndexNode copyWith(int cacheSegment, Object objectKey, byte[] indexKey, int file, int offset, int size, OverwriteHook overwriteHook, RecordChange recordChange) throws IOException { + private IndexNode copyWith(IndexRequest request, int cacheSegment, byte[] indexKey, OverwriteHook overwriteHook, RecordChange recordChange) throws IOException { if (leafNodes == null) throw new IllegalArgumentException(); byte[] newPrefix; + int file = request.getFile(); + int offset = request.getOffset(); + int size = request.getSize(); if (leafNodes.length == 0) { - overwriteHook.setOverwritten(cacheSegment, false, -1, -1); - if (overwriteHook.check(-1, -1)) { + overwriteHook.setOverwritten(request, cacheSegment, false, -1, -1); + if (overwriteHook.check(request, -1, -1)) { return new IndexNode(segment, prefix, keyParts, new LeafNode[]{new LeafNode(file, offset, (short) 1, cacheSegment)}); } else { segment.getCompactor().free(file, size); @@ -700,9 +711,10 @@ private IndexNode copyWith(int cacheSegment, Object objectKey, byte[] indexKey, } byte[] oldIndexKey = Index.toIndexKey(oldLeafNode.cacheSegment, hak.getKey()); int keyComp = compare(oldIndexKey, indexKey); + Object objectKey = request.getKey(); if (keyComp == 0) { if (numRecords > 0) { - if (overwriteHook.check(oldLeafNode.file, oldLeafNode.offset)) { + if (overwriteHook.check(request, oldLeafNode.file, oldLeafNode.offset)) { if (recordChange == RecordChange.INCREASE || recordChange == RecordChange.MOVE) { if (log.isTraceEnabled()) { log.trace(String.format("Overwriting %s %d:%d with %d:%d (%d)", objectKey, @@ -732,15 +744,15 @@ private IndexNode copyWith(int cacheSegment, Object objectKey, byte[] indexKey, lock.writeLock().unlock(); } - overwriteHook.setOverwritten(cacheSegment, true, oldLeafNode.file, oldLeafNode.offset); + overwriteHook.setOverwritten(request, cacheSegment, true, oldLeafNode.file, oldLeafNode.offset); return this; } else { - overwriteHook.setOverwritten(cacheSegment, false, -1, -1); + overwriteHook.setOverwritten(request, cacheSegment, false, -1, -1); segment.getCompactor().free(file, size); return this; } } else { - overwriteHook.setOverwritten(cacheSegment, true, oldLeafNode.file, oldLeafNode.offset); + overwriteHook.setOverwritten(request, cacheSegment, true, oldLeafNode.file, oldLeafNode.offset); if (keyParts.length <= 1) { newPrefix = Util.EMPTY_BYTE_ARRAY; newKeyParts = Util.EMPTY_BYTE_ARRAY_ARRAY; @@ -766,7 +778,7 @@ private IndexNode copyWith(int cacheSegment, Object objectKey, byte[] indexKey, } else { // IndexRequest cannot be MOVED or DROPPED when the key is not in the index assert recordChange == RecordChange.INCREASE; - overwriteHook.setOverwritten(cacheSegment, false, -1, -1); + overwriteHook.setOverwritten(request, cacheSegment, false, -1, -1); // We have to insert the record even if this is a delete request and the key was not found // because otherwise we would have incorrect numRecord count. Eventually, Compactor will @@ -807,7 +819,7 @@ private int getIterationPoint(byte[] key, int cacheSegment) { insertionPoint = keyParts.length; } else { byte[] keyPostfix = substring(key, prefix.length, key.length); - insertionPoint = Arrays.binarySearch(keyParts, keyPostfix, (o1, o2) -> IndexNode.compare(o2, o1)); + insertionPoint = Arrays.binarySearch(keyParts, keyPostfix, REVERSED_COMPARE_TO); if (insertionPoint < 0) { insertionPoint = -insertionPoint - 1; } else { @@ -843,7 +855,7 @@ private int getInsertionPoint(byte[] key) { insertionPoint = keyParts.length; } else { byte[] keyPostfix = substring(key, prefix.length, key.length); - insertionPoint = Arrays.binarySearch(keyParts, keyPostfix, (o1, o2) -> IndexNode.compare(o2, o1)); + insertionPoint = Arrays.binarySearch(keyParts, keyPostfix, REVERSED_COMPARE_TO); if (insertionPoint < 0) { insertionPoint = -insertionPoint - 1; } else { @@ -964,42 +976,49 @@ private static byte[] substring(byte[] key, int begin, int end) { } private static byte[] commonPrefix(byte[] oldPrefix, byte[] newKey) { - int i; - for (i = 0; i < oldPrefix.length && i < newKey.length; ++i) { - if (newKey[i] != oldPrefix[i]) break; - } + int i = Arrays.mismatch(oldPrefix, newKey); if (i == oldPrefix.length) { return oldPrefix; } if (i == newKey.length) { return newKey; } - byte[] prefix = new byte[i]; - for (--i; i >= 0; --i) { - prefix[i] = oldPrefix[i]; + if (i == 0) { + return Util.EMPTY_BYTE_ARRAY; } + byte[] prefix = new byte[i]; + System.arraycopy(oldPrefix, 0, prefix, 0, i); return prefix; } // Compares the two arrays. This is different from a regular compare that if the second array has more bytes than // the first but contains all the same bytes it is treated equal private static int compare(byte[] first, byte[] second, int secondLength) { - for (int i = 0; i < secondLength; ++i) { - if (i >= first.length) { - return 1; - } - if (second[i] == first[i]) continue; - return second[i] > first[i] ? 1 : -1; + if (secondLength == 0) { + return 0; } - return 0; + int mismatchPos = Arrays.mismatch(first, 0, first.length, second, 0, secondLength); + if (mismatchPos == -1 || mismatchPos == secondLength) { + return 0; + } else if (mismatchPos >= first.length) { + return first.length + 1; + } + return second[mismatchPos] > first[mismatchPos] ? mismatchPos + 1 : -mismatchPos - 1; } + public static final Comparator REVERSED_COMPARE_TO = ((Comparator) IndexNode::compare).reversed(); private static int compare(byte[] first, byte[] second) { - for (int i = 0; i < first.length && i < second.length; ++i) { - if (second[i] == first[i]) continue; - return second[i] > first[i] ? i + 1 : -i - 1; + // Use Arrays.mismatch as it doesn't do boundary check for every byte and uses vectorized comparison for arrays + // larger than 7 + int mismatchPos = Arrays.mismatch(first, second); + if (mismatchPos == -1) { + return 0; + } else if (mismatchPos >= first.length) { + return first.length + 1; + } else if (mismatchPos >= second.length) { + return -second.length - 1; } - return second.length > first.length ? first.length + 1 : (second.length < first.length ? -second.length - 1 : 0); + return second[mismatchPos] > first[mismatchPos] ? mismatchPos + 1 : -mismatchPos - 1; } private short headerLength() { @@ -1008,14 +1027,24 @@ private short headerLength() { return (short) headerLength; } - private int contentLength() { - if (contentLength >= 0) { - return contentLength; + private short keyPartsLength() { + if (keyPartsLength >= 0) { + return keyPartsLength; } int sum = 0; for (byte[] keyPart : keyParts) { sum += 2 + keyPart.length; } + + assert sum <= Short.MAX_VALUE; + return keyPartsLength = (short) sum; + } + + private short contentLength() { + if (contentLength >= 0) { + return contentLength; + } + int sum = keyPartsLength(); if (innerNodes != null) { sum += INNER_NODE_REFERENCE_SIZE * innerNodes.length; } else if (leafNodes != null) { @@ -1042,14 +1071,14 @@ private static IndexNode emptyWithInnerNodes(Index.Segment segment) { return new IndexNode(segment, Util.EMPTY_BYTE_ARRAY, Util.EMPTY_BYTE_ARRAY_ARRAY, new InnerNode[]{new InnerNode(-1L, (short) -1)}); } - static final OverwriteHook NOOP_HOOK = (int cacheSegment, boolean overwritten, int prevFile, int prevOffset) -> { }; + static final OverwriteHook NOOP_HOOK = (IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset) -> { }; public interface OverwriteHook { - default boolean check(int oldFile, int oldOffset) { + default boolean check(IndexRequest request, int oldFile, int oldOffset) { return true; } - void setOverwritten(int cacheSegment, boolean overwritten, int prevFile, int prevOffset); + void setOverwritten(IndexRequest request, int cacheSegment, boolean overwritten, int prevFile, int prevOffset); } static class InnerNode extends Index.IndexSpace { @@ -1212,7 +1241,7 @@ Flowable publish(IntSet cacheSegments, boolean loadValues) { byte[] segmentPrefix = new byte[UnsignedNumeric.sizeUnsignedInt(cacheSegment)]; UnsignedNumeric.writeUnsignedInt(segmentPrefix, 0, cacheSegment); return segmentPrefix; - }).sorted((o1, o2) -> IndexNode.compare(o2, o1)) + }).sorted(REVERSED_COMPARE_TO) .collect(Collectors.toCollection(ArrayDeque::new)); if (sortedSegmentPrefixes.isEmpty()) { return Flowable.empty();