Skip to content

Commit

Permalink
Faster merging of block entry readers
Browse files Browse the repository at this point in the history
By using a manual brute-force loop over heads, instead of PriorityQueue.
This results in ~15% faster merging.
  • Loading branch information
tinwelint committed Jun 5, 2019
1 parent b5d145c commit b560558
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 24 deletions.
Expand Up @@ -19,74 +19,129 @@
*/
package org.neo4j.kernel.impl.index.schema;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

import org.neo4j.index.internal.gbptree.Layout;
import org.neo4j.io.IOUtils;

import static org.neo4j.io.IOUtils.closeAll;

/**
* Take multiple {@link BlockEntryCursor} that each by themselves provide block entries in sorted order and lazily merge join, providing a view over all
* entries from given cursors in sorted order.
* Merging is done by keeping the cursors in a priority queue with a comparator that compare {@link BlockEntryCursor#key()} (current key on cursor).
* Merging is done by keeping the cursors in an array amd pick the next lowest among them until all are exhausted, comparing
* {@link BlockEntryCursor#key()} (current key on each cursor).
* Instances handed out from {@link #key()} and {@link #value()} are reused, consumer is responsible for creating copy if there is a need to cache results.
*/
public class MergingBlockEntryReader<KEY,VALUE> implements BlockEntryCursor<KEY,VALUE>
{
private final PriorityQueue<BlockEntryCursor<KEY,VALUE>> sortedReaders;
private final List<BlockEntryCursor<KEY,VALUE>> readersToClose = new ArrayList<>();
private BlockEntryCursor<KEY,VALUE> lastReturned;
// Means that a cursor needs to be advanced, i.e. its current head has already been used, or that it has no head yet
private static final byte STATE_NEED_ADVANCE = 0;
// Means that a cursor has been advanced and its current key() contains its current head
private static final byte STATE_HAS = 1;
// Means that a cursor has been exhausted and has no more entries in it
private static final byte STATE_EXHAUSTED = 2;

private final Layout<KEY,VALUE> layout;
private List<Source> sources = new ArrayList<>();
private Source lastReturned;

MergingBlockEntryReader( Layout<KEY,VALUE> layout )
{
this.sortedReaders = new PriorityQueue<>( ( o1, o2 ) -> layout.compare( o1.key(), o2.key() ) );
this.layout = layout;
}

void addSource( BlockEntryCursor<KEY,VALUE> source ) throws IOException
void addSource( BlockEntryCursor<KEY,VALUE> source )
{
readersToClose.add( source );
if ( source.next() )
{
sortedReaders.add( source );
}
sources.add( new Source( source ) );
}

@Override
public boolean next() throws IOException
{
if ( lastReturned != null )
// Figure out lowest among cursor heads
KEY lowest = null;
Source lowestSource = null;
for ( Source source : sources )
{
if ( lastReturned.next() )
KEY candidate = source.tryNext();
if ( candidate != null && (lowest == null || layout.compare( candidate, lowest ) < 0) )
{
sortedReaders.add( lastReturned );
lowest = candidate;
lowestSource = source;
}
}

if ( sortedReaders.isEmpty() )
// Make state transitions so that this entry is now considered used
if ( lowest != null )
{
return false;
lastReturned = lowestSource.takeHead();
return true;
}
lastReturned = sortedReaders.poll();
return true;
return false;
}

@Override
public KEY key()
{
return lastReturned.key();
return lastReturned.cursor.key();
}

@Override
public VALUE value()
{
return lastReturned.value();
return lastReturned.cursor.value();
}

@Override
public void close() throws IOException
{
IOUtils.closeAll( readersToClose );
closeAll( sources );
}

private class Source implements Closeable
{
private final BlockEntryCursor<KEY,VALUE> cursor;
private byte state;

Source( BlockEntryCursor<KEY,VALUE> cursor )
{
this.cursor = cursor;
}

KEY tryNext() throws IOException
{
if ( state == STATE_NEED_ADVANCE )
{
if ( cursor.next() )
{
state = STATE_HAS;
return cursor.key();
}
else
{
state = STATE_EXHAUSTED;
}
}
else if ( state == STATE_HAS )
{
return cursor.key();
}
return null;
}

Source takeHead()
{
state = STATE_NEED_ADVANCE;
return this;
}

@Override
public void close() throws IOException
{
cursor.close();
}
}
}
Expand Up @@ -170,7 +170,8 @@ private static CloseTrackingBlockEntryCursor newReader( List<BlockEntry<MutableL
private List<BlockEntry<MutableLong,MutableLong>> someBlockEntries( Set<MutableLong> uniqueKeys )
{
List<BlockEntry<MutableLong,MutableLong>> entries = new ArrayList<>();
for ( int i = 0; i < rnd.nextInt( 10 ); i++ )
int size = rnd.nextInt( 10 );
for ( int i = 0; i < size; i++ )
{
MutableLong key;
do
Expand Down

0 comments on commit b560558

Please sign in to comment.