Skip to content

Commit

Permalink
More testing and some fixes in SimpleLongLayout for this scenario
Browse files Browse the repository at this point in the history
  • Loading branch information
tinwelint authored and burqen committed Feb 18, 2019
1 parent b49225e commit 84e1a2e
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 29 deletions.
Expand Up @@ -148,6 +148,7 @@ public int valueSize( MutableLong value )
public void writeKey( PageCursor cursor, MutableLong key ) public void writeKey( PageCursor cursor, MutableLong key )
{ {
cursor.putLong( key.longValue() ); cursor.putLong( key.longValue() );
cursor.putBytes( keyPadding, (byte) 0 );
} }


@Override @Override
Expand All @@ -160,6 +161,7 @@ public void writeValue( PageCursor cursor, MutableLong value )
public void readKey( PageCursor cursor, MutableLong into, int keySize ) public void readKey( PageCursor cursor, MutableLong into, int keySize )
{ {
into.setValue( cursor.getLong() ); into.setValue( cursor.getLong() );
cursor.getBytes( new byte[keyPadding] );
} }


@Override @Override
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.neo4j.io.fs.FileSystemAbstraction; import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.fs.StoreChannel; import org.neo4j.io.fs.StoreChannel;
import org.neo4j.io.pagecache.ByteArrayPageCursor; import org.neo4j.io.pagecache.ByteArrayPageCursor;
import org.neo4j.util.Preconditions;


import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.getOverhead; import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.getOverhead;
import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.putKeyValueSize; import static org.neo4j.index.internal.gbptree.DynamicSizeUtil.putKeyValueSize;
Expand All @@ -49,35 +50,38 @@ class BlockStorage<KEY, VALUE> implements Closeable
private final Comparator<BlockEntry<KEY,VALUE>> comparator; private final Comparator<BlockEntry<KEY,VALUE>> comparator;
private final StoreChannel storeChannel; private final StoreChannel storeChannel;
private final Monitor monitor; private final Monitor monitor;
private final int bufferSize; private final int blockSize;
private final ByteBufferFactory bufferFactory; private final ByteBufferFactory bufferFactory;
private int currentBufferSize; private int currentBufferSize;
private int mergeIteration; private int mergeIteration;
private boolean doneAdding;


BlockStorage( Layout<KEY,VALUE> layout, ByteBufferFactory bufferFactory, FileSystemAbstraction fs, File blockFile, Monitor monitor, int bufferSize ) BlockStorage( Layout<KEY,VALUE> layout, ByteBufferFactory bufferFactory, FileSystemAbstraction fs, File blockFile, Monitor monitor, int blockSize )
throws IOException throws IOException
{ {
this.layout = layout; this.layout = layout;
this.fs = fs; this.fs = fs;
this.blockFile = blockFile; this.blockFile = blockFile;
this.monitor = monitor; this.monitor = monitor;
this.bufferSize = bufferSize; this.blockSize = blockSize;
this.bufferedEntries = Lists.mutable.empty(); this.bufferedEntries = Lists.mutable.empty();
this.bufferFactory = bufferFactory; this.bufferFactory = bufferFactory;
this.byteBuffer = bufferFactory.newBuffer( bufferSize ); this.byteBuffer = bufferFactory.newBuffer( blockSize );
this.comparator = ( e0, e1 ) -> layout.compare( e0.key(), e1.key() ); this.comparator = ( e0, e1 ) -> layout.compare( e0.key(), e1.key() );
this.storeChannel = fs.create( blockFile ); this.storeChannel = fs.create( blockFile );
resetBuffer(); resetBuffer();
} }


public void add( KEY key, VALUE value ) throws IOException public void add( KEY key, VALUE value ) throws IOException
{ {
Preconditions.checkState( !doneAdding, "Cannot add more after done adding" );

int keySize = layout.keySize( key ); int keySize = layout.keySize( key );
int valueSize = layout.valueSize( value ); int valueSize = layout.valueSize( value );
int overhead = getOverhead( keySize, valueSize ); int overhead = getOverhead( keySize, valueSize );
int entrySize = keySize + valueSize + overhead; int entrySize = keySize + valueSize + overhead;


if ( currentBufferSize + entrySize > bufferSize ) if ( currentBufferSize + entrySize > blockSize )
{ {
// append buffer to file and clear buffers // append buffer to file and clear buffers
flushAndResetBuffer(); flushAndResetBuffer();
Expand All @@ -94,6 +98,7 @@ public void doneAdding() throws IOException
{ {
flushAndResetBuffer(); flushAndResetBuffer();
} }
doneAdding = true;
} }


private void resetBuffer() private void resetBuffer()
Expand All @@ -108,6 +113,7 @@ private void flushAndResetBuffer() throws IOException
bufferedEntries.sortThis( comparator ); bufferedEntries.sortThis( comparator );
ByteArrayPageCursor pageCursor = new ByteArrayPageCursor( byteBuffer ); ByteArrayPageCursor pageCursor = new ByteArrayPageCursor( byteBuffer );


System.out.println( "Writing block at " + storeChannel.position() );
// Header // Header
pageCursor.putLong( bufferedEntries.size() ); pageCursor.putLong( bufferedEntries.size() );


Expand All @@ -123,7 +129,7 @@ private void flushAndResetBuffer() throws IOException


// TODO solve the BIG padding problem // TODO solve the BIG padding problem
// Zero pad // Zero pad
pageCursor.putBytes( bufferSize - currentBufferSize, (byte) 0 ); pageCursor.putBytes( blockSize - currentBufferSize, (byte) 0 );


// Append to file // Append to file
byteBuffer.flip(); byteBuffer.flip();
Expand All @@ -140,7 +146,7 @@ public void close() throws IOException


private long calculateBlockSize() private long calculateBlockSize()
{ {
return (long) Math.pow( 2, mergeIteration ) * bufferSize; return (long) Math.pow( 2, mergeIteration ) * blockSize;
} }


public BlockStorageReader<KEY,VALUE> reader() throws IOException public BlockStorageReader<KEY,VALUE> reader() throws IOException
Expand All @@ -152,7 +158,7 @@ public interface Monitor
{ {
void entryAdded( int entrySize ); void entryAdded( int entrySize );


void blockFlushed( long keyCount, int bufferSize, long positionAfterFlush ); void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush );


class Adapter implements Monitor class Adapter implements Monitor
{ {
Expand All @@ -162,7 +168,7 @@ public void entryAdded( int entrySize )
} }


@Override @Override
public void blockFlushed( long keyCount, int bufferSize, long positionAfterFlush ) public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush )
{ // no-op { // no-op
} }
} }
Expand Down
Expand Up @@ -58,6 +58,7 @@ public BlockReader<KEY,VALUE> nextBlock() throws IOException
blockChannel.position( position ); blockChannel.position( position );
channel.position( position + blockSize ); channel.position( position + blockSize );
PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) ); PageCursor pageCursor = new ReadableChannelPageCursor( new ReadAheadChannel<>( blockChannel ) );
System.out.println( "Opening block reader at " + position );
return new BlockReader<>( pageCursor, layout ); return new BlockReader<>( pageCursor, layout );
} }


Expand Down
Expand Up @@ -23,9 +23,12 @@
import static java.nio.ByteBuffer.allocate; import static java.nio.ByteBuffer.allocate;
import static java.util.Collections.singletonList; import static java.util.Collections.singletonList;
import static java.util.Comparator.comparingLong; import static java.util.Comparator.comparingLong;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.neo4j.kernel.impl.index.schema.BlockStorage.Monitor.NO_MONITOR; import static org.neo4j.kernel.impl.index.schema.BlockStorage.Monitor.NO_MONITOR;


Expand Down Expand Up @@ -68,10 +71,10 @@ void shouldAddSingleEntryInLastBlock() throws IOException
// given // given
SimpleLongLayout layout = layout( 0 ); SimpleLongLayout layout = layout( 0 );
TrackingMonitor monitor = new TrackingMonitor(); TrackingMonitor monitor = new TrackingMonitor();
int bufferSize = 100; int blockSize = 100;
MutableLong key = new MutableLong( 10 ); MutableLong key = new MutableLong( 10 );
MutableLong value = new MutableLong( 20 ); MutableLong value = new MutableLong( 20 );
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, bufferSize ) ) try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) )
{ {
// when // when
storage.add( key, value ); storage.add( key, value );
Expand All @@ -80,8 +83,9 @@ void shouldAddSingleEntryInLastBlock() throws IOException
// then // then
assertEquals( 1, monitor.blockFlushedCallCount ); assertEquals( 1, monitor.blockFlushedCallCount );
assertEquals( 1, monitor.lastKeyCount ); assertEquals( 1, monitor.lastKeyCount );
assertEquals( BlockStorage.BLOCK_HEADER_SIZE + monitor.totalEntrySize, monitor.lastBufferSize ); assertEquals( BlockStorage.BLOCK_HEADER_SIZE + monitor.totalEntrySize, monitor.lastNumberOfBytes );
assertEquals( bufferSize, monitor.lastPositionAfterFlush ); assertEquals( blockSize, monitor.lastPositionAfterFlush );
assertThat( monitor.lastNumberOfBytes, lessThan( blockSize ) );
assertContents( layout, storage, singletonList( Pair.of( key, value ) ) ); assertContents( layout, storage, singletonList( Pair.of( key, value ) ) );
} }
} }
Expand All @@ -92,9 +96,9 @@ void shouldSortAndAddMultipleEntriesInLastBlock() throws IOException
// given // given
SimpleLongLayout layout = layout( 0 ); SimpleLongLayout layout = layout( 0 );
TrackingMonitor monitor = new TrackingMonitor(); TrackingMonitor monitor = new TrackingMonitor();
int bufferSize = 1_000; int blockSize = 1_000;
List<Pair<MutableLong,MutableLong>> expected = new ArrayList<>(); List<Pair<MutableLong,MutableLong>> expected = new ArrayList<>();
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, bufferSize ) ) try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) )
{ {
// when // when
for ( int i = 0; i < 10; i++ ) for ( int i = 0; i < 10; i++ )
Expand All @@ -117,16 +121,16 @@ void shouldSortAndAddMultipleEntriesInLastBlock() throws IOException
void shouldSortAndAddMultipleEntriesInMultipleBlocks() throws IOException void shouldSortAndAddMultipleEntriesInMultipleBlocks() throws IOException
{ {
// given // given
SimpleLongLayout layout = layout( 0 ); SimpleLongLayout layout = layout( random.nextInt( 900 ) );
TrackingMonitor monitor = new TrackingMonitor(); TrackingMonitor monitor = new TrackingMonitor();
int bufferSize = 1_000; int blockSize = 1_000;
List<List<Pair<MutableLong,MutableLong>>> expected = new ArrayList<>(); List<List<Pair<MutableLong,MutableLong>>> expected = new ArrayList<>();
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, bufferSize ) ) try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, blockSize ) )
{ {
// when // when
List<Pair<MutableLong,MutableLong>> currentExpected = new ArrayList<>(); List<Pair<MutableLong,MutableLong>> currentExpected = new ArrayList<>();
long currentBlock = 0; long currentBlock = 0;
for ( long i = 0; monitor.blockFlushedCallCount < 10; i++ ) for ( long i = 0; monitor.blockFlushedCallCount < 3; i++ )
{ {
long keyNumber = random.nextLong( 10_000_000 ); long keyNumber = random.nextLong( 10_000_000 );
MutableLong key = new MutableLong( keyNumber ); MutableLong key = new MutableLong( keyNumber );
Expand All @@ -151,16 +155,42 @@ void shouldSortAndAddMultipleEntriesInMultipleBlocks() throws IOException
} }
} }


@Test
void shouldNotAcceptAddedEntriesAfterDoneAdding() throws IOException
{
// given
SimpleLongLayout layout = layout( 0 );
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, NO_MONITOR, 100 ) )
{
// when
storage.doneAdding();

// then
assertThrows( IllegalStateException.class, () -> storage.add( new MutableLong( 0 ), new MutableLong( 1 ) ) );
}
}

@Test
void shouldNotFlushAnythingOnEmptyBufferInDoneAdding() throws IOException
{
// given
SimpleLongLayout layout = layout( 0 );
TrackingMonitor monitor = new TrackingMonitor();
try ( BlockStorage<MutableLong,MutableLong> storage = new BlockStorage<>( layout, BUFFER_FACTORY, fileSystem, file, monitor, 100 ) )
{
// when
storage.doneAdding();

// then
assertEquals( 0, monitor.blockFlushedCallCount );
}
}

private void sortExpectedBlock( List<Pair<MutableLong,MutableLong>> currentExpected ) private void sortExpectedBlock( List<Pair<MutableLong,MutableLong>> currentExpected )
{ {
currentExpected.sort( comparingLong( p -> p.getKey().longValue() ) ); currentExpected.sort( comparingLong( p -> p.getKey().longValue() ) );
} }


// TODO shouldFlushSortedEntriesOnFullBuffer
// TODO shouldFlushPaddedBlocks
// TODO shouldNotAcceptAddedEntriesAfterDoneAdding
// TODO shouldNotFlushAnythingOnEmptyBufferInDoneAdding

private void assertContents( SimpleLongLayout layout, BlockStorage<MutableLong,MutableLong> storage, List<Pair<MutableLong,MutableLong>>... expectedBlocks ) private void assertContents( SimpleLongLayout layout, BlockStorage<MutableLong,MutableLong> storage, List<Pair<MutableLong,MutableLong>>... expectedBlocks )
throws IOException throws IOException
{ {
Expand All @@ -175,8 +205,8 @@ private void assertContents( SimpleLongLayout layout, BlockStorage<MutableLong,M
for ( Pair<MutableLong,MutableLong> expectedEntry : expectedBlock ) for ( Pair<MutableLong,MutableLong> expectedEntry : expectedBlock )
{ {
assertTrue( block.next() ); assertTrue( block.next() );
assertEquals( 0, layout.compare( block.key(), expectedEntry.getKey() ) ); assertEquals( 0, layout.compare( expectedEntry.getKey(), block.key() ) );
assertEquals( block.value(), expectedEntry.getValue() ); assertEquals( expectedEntry.getValue(), block.value() );
} }
} }
} }
Expand All @@ -198,7 +228,7 @@ private static class TrackingMonitor implements BlockStorage.Monitor
// For blockFlushed // For blockFlushed
private int blockFlushedCallCount; private int blockFlushedCallCount;
private long lastKeyCount; private long lastKeyCount;
private int lastBufferSize; private int lastNumberOfBytes;
private long lastPositionAfterFlush; private long lastPositionAfterFlush;


@Override @Override
Expand All @@ -210,11 +240,11 @@ public void entryAdded( int entrySize )
} }


@Override @Override
public void blockFlushed( long keyCount, int bufferSize, long positionAfterFlush ) public void blockFlushed( long keyCount, int numberOfBytes, long positionAfterFlush )
{ {
blockFlushedCallCount++; blockFlushedCallCount++;
lastKeyCount = keyCount; lastKeyCount = keyCount;
lastBufferSize = bufferSize; lastNumberOfBytes = numberOfBytes;
lastPositionAfterFlush = positionAfterFlush; lastPositionAfterFlush = positionAfterFlush;
} }
} }
Expand Down

0 comments on commit 84e1a2e

Please sign in to comment.