diff --git a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java index 92bc032811754..0d03e6781330e 100644 --- a/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java +++ b/community/kernel/src/main/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReader.java @@ -19,74 +19,129 @@ */ package org.neo4j.kernel.impl.index.schema; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.PriorityQueue; import org.neo4j.index.internal.gbptree.Layout; -import org.neo4j.io.IOUtils; + +import static org.neo4j.io.IOUtils.closeAll; /** * Take multiple {@link BlockEntryCursor} that each by themselves provide block entries in sorted order and lazily merge join, providing a view over all * entries from given cursors in sorted order. - * Merging is done by keeping the cursors in a priority queue with a comparator that compare {@link BlockEntryCursor#key()} (current key on cursor). + * Merging is done by keeping the cursors in an array amd pick the next lowest among them until all are exhausted, comparing + * {@link BlockEntryCursor#key()} (current key on each cursor). * Instances handed out from {@link #key()} and {@link #value()} are reused, consumer is responsible for creating copy if there is a need to cache results. */ public class MergingBlockEntryReader implements BlockEntryCursor { - private final PriorityQueue> sortedReaders; - private final List> readersToClose = new ArrayList<>(); - private BlockEntryCursor lastReturned; + // Means that a cursor needs to be advanced, i.e. its current head has already been used, or that it has no head yet + private static final byte STATE_NEED_ADVANCE = 0; + // Means that a cursor has been advanced and its current key() contains its current head + private static final byte STATE_HAS = 1; + // Means that a cursor has been exhausted and has no more entries in it + private static final byte STATE_EXHAUSTED = 2; + + private final Layout layout; + private List sources = new ArrayList<>(); + private Source lastReturned; MergingBlockEntryReader( Layout layout ) { - this.sortedReaders = new PriorityQueue<>( ( o1, o2 ) -> layout.compare( o1.key(), o2.key() ) ); + this.layout = layout; } - void addSource( BlockEntryCursor source ) throws IOException + void addSource( BlockEntryCursor source ) { - readersToClose.add( source ); - if ( source.next() ) - { - sortedReaders.add( source ); - } + sources.add( new Source( source ) ); } @Override public boolean next() throws IOException { - if ( lastReturned != null ) + // Figure out lowest among cursor heads + KEY lowest = null; + Source lowestSource = null; + for ( Source source : sources ) { - if ( lastReturned.next() ) + KEY candidate = source.tryNext(); + if ( candidate != null && (lowest == null || layout.compare( candidate, lowest ) < 0) ) { - sortedReaders.add( lastReturned ); + lowest = candidate; + lowestSource = source; } } - if ( sortedReaders.isEmpty() ) + // Make state transitions so that this entry is now considered used + if ( lowest != null ) { - return false; + lastReturned = lowestSource.takeHead(); + return true; } - lastReturned = sortedReaders.poll(); - return true; + return false; } @Override public KEY key() { - return lastReturned.key(); + return lastReturned.cursor.key(); } @Override public VALUE value() { - return lastReturned.value(); + return lastReturned.cursor.value(); } @Override public void close() throws IOException { - IOUtils.closeAll( readersToClose ); + closeAll( sources ); + } + + private class Source implements Closeable + { + private final BlockEntryCursor cursor; + private byte state; + + Source( BlockEntryCursor cursor ) + { + this.cursor = cursor; + } + + KEY tryNext() throws IOException + { + if ( state == STATE_NEED_ADVANCE ) + { + if ( cursor.next() ) + { + state = STATE_HAS; + return cursor.key(); + } + else + { + state = STATE_EXHAUSTED; + } + } + else if ( state == STATE_HAS ) + { + return cursor.key(); + } + return null; + } + + Source takeHead() + { + state = STATE_NEED_ADVANCE; + return this; + } + + @Override + public void close() throws IOException + { + cursor.close(); + } } } diff --git a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java index 8ac5788e45286..f977891ce9525 100644 --- a/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java +++ b/community/kernel/src/test/java/org/neo4j/kernel/impl/index/schema/MergingBlockEntryReaderTest.java @@ -170,7 +170,8 @@ private static CloseTrackingBlockEntryCursor newReader( List> someBlockEntries( Set uniqueKeys ) { List> entries = new ArrayList<>(); - for ( int i = 0; i < rnd.nextInt( 10 ); i++ ) + int size = rnd.nextInt( 10 ); + for ( int i = 0; i < size; i++ ) { MutableLong key; do