Skip to content

Commit

Permalink
Support for writing header on newly instantiated GBPTree
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen committed Jun 13, 2017
1 parent 6ae860d commit 8ebde02
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 15 deletions.
Expand Up @@ -201,6 +201,11 @@ public void startupState( boolean clean )
*/
static final Header.Reader NO_HEADER = (cursor,length) -> {};

/**
* No-op header writer.
*/
static final Consumer<PageCursor> NO_HEADER_WRITER = pc -> {};

/**
* Paged file in a {@link PageCache} providing the means of storage.
*/
Expand Down Expand Up @@ -340,12 +345,13 @@ public void startupState( boolean clean )
* @param tentativePageSize page size, i.e. tree node size. Must be less than or equal to that of the page cache.
* A pageSize of {@code 0} means to use whatever the page cache has (at creation)
* @param monitor {@link Monitor} for monitoring {@link GBPTree}.
* @param headerReader reads header data, previously written using {@link #checkpoint(IOLimiter, Consumer)}
* or {@link #close()}
* @param headerReader reads header data if indexFile already exists,
* previously written using {@link #checkpoint(IOLimiter, Consumer)} or {@link #close()}
* @param headerWriter writes header data if indexFile is created as a result of this call.
* @throws IOException on page cache error
*/
public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, int tentativePageSize,
Monitor monitor, Header.Reader headerReader ) throws IOException
Monitor monitor, Header.Reader headerReader, Consumer<PageCursor> headerWriter ) throws IOException
{
this.indexFile = indexFile;
this.monitor = monitor;
Expand All @@ -363,7 +369,7 @@ public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, i
// Create or load state
if ( created )
{
initializeAfterCreation( layout );
initializeAfterCreation( layout, headerWriter );
}
else
{
Expand Down Expand Up @@ -396,7 +402,7 @@ public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, i
}
}

private void initializeAfterCreation( Layout<KEY,VALUE> layout ) throws IOException
private void initializeAfterCreation( Layout<KEY,VALUE> layout, Consumer<PageCursor> headerWriter ) throws IOException
{
// Initialize meta
writeMeta( layout, pagedFile );
Expand All @@ -419,7 +425,7 @@ private void initializeAfterCreation( Layout<KEY,VALUE> layout ) throws IOExcept
// Initialize free-list
freeList.initializeAfterCreation();
changesSinceLastCheckpoint = true;
checkpoint( IOLimiter.unlimited() );
checkpoint( IOLimiter.unlimited(), headerWriter );
clean = true;
}

Expand Down Expand Up @@ -523,7 +529,7 @@ private static void writerHeader( PagedFile pagedFile, Header.Writer headerWrite
// Write/carry over header
int headerOffset = cursor.getOffset();
int headerDataOffset = headerOffset + Integer.BYTES; // will contain length of written header data (below)
if ( otherState.isValid() )
if ( otherState.isValid() || headerWriter != CARRY_OVER_PREVIOUS_HEADER )
{
PageCursor previousCursor = pagedFile.io( otherState.pageId(), PagedFile.PF_SHARED_READ_LOCK );
PageCursorUtil.goTo( previousCursor, "previous state page", otherState.pageId() );
Expand Down
Expand Up @@ -50,6 +50,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

Expand Down Expand Up @@ -96,8 +97,8 @@ public void shouldDetectFormatChange() throws Throwable
// WHEN reading from the tree
// THEN everything should work, otherwise there has likely been a format change
PageCache pageCache = pageCacheRule.getPageCache( fsRule.get() );
try ( GBPTree<MutableLong,MutableLong> tree =
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER ) )
try ( GBPTree<MutableLong,MutableLong> tree = new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0,
NO_MONITOR, NO_HEADER, NO_HEADER_WRITER ) )
{
try
{
Expand Down Expand Up @@ -172,8 +173,8 @@ private void unzipTo( File storeFile ) throws IOException
private void createAndZipTree( File storeFile ) throws IOException
{
PageCache pageCache = pageCacheRule.getPageCache( fsRule.get() );
try ( GBPTree<MutableLong,MutableLong> tree =
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER ) )
try ( GBPTree<MutableLong,MutableLong> tree = new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0,
NO_MONITOR, NO_HEADER, NO_HEADER_WRITER ) )
{
MutableLong insertKey = new MutableLong();
MutableLong insertValue = new MutableLong();
Expand Down
Expand Up @@ -61,6 +61,7 @@
import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

Expand Down Expand Up @@ -106,7 +107,7 @@ private GBPTree<MutableLong,MutableLong> createIndex( GBPTree.Monitor monitor )
PageCache pageCache =
pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
return index = new GBPTree<>( pageCache, directory.file( "index" ),
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER );
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER, NO_HEADER_WRITER );
}

@After
Expand Down
Expand Up @@ -46,6 +46,7 @@
import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

Expand Down Expand Up @@ -75,7 +76,7 @@ private GBPTree<MutableLong,MutableLong> createIndex( int pageSize, GBPTree.Moni
{
pageCache = pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
return index = new GBPTree<>( pageCache, directory.file( "index" ),
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER );
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER, NO_HEADER_WRITER );
}

@After
Expand Down
Expand Up @@ -44,6 +44,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing;
import static org.neo4j.io.pagecache.IOLimiter.unlimited;
Expand Down Expand Up @@ -381,7 +382,7 @@ private long[] modificationData( int min, int max )

private static GBPTree<MutableLong,MutableLong> createIndex( PageCache pageCache, File file ) throws IOException
{
return new GBPTree<>( pageCache, file, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER );
return new GBPTree<>( pageCache, file, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER, NO_HEADER_WRITER );
}

private PageCache createPageCache()
Expand Down
Expand Up @@ -44,6 +44,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.neo4j.collection.primitive.Primitive;
import org.neo4j.collection.primitive.PrimitiveLongCollections;
Expand Down Expand Up @@ -73,6 +74,7 @@
import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER_WRITER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing;
import static org.neo4j.io.pagecache.IOLimiter.unlimited;
Expand Down Expand Up @@ -557,6 +559,50 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception
verifyHeaderDataAfterClose( beforeClose );
}

@Test
public void mustWriteHeaderOnInitialization() throws Exception
{
// GIVEN
byte[] headerBytes = new byte[12];
ThreadLocalRandom.current().nextBytes( headerBytes );
Consumer<PageCursor> headerWriter = pc -> pc.putBytes( headerBytes );

// WHEN
try ( GBPTree<MutableLong,MutableLong> ignore = index().with( headerWriter ).build() )
{
}

// THEN
verifyHeader( headerBytes );
}

@Test
public void mustNotOverwriteHeaderOnExistingTree() throws Exception
{
// GIVEN
byte[] expectedBytes = new byte[12];
ThreadLocalRandom.current().nextBytes( expectedBytes );
Consumer<PageCursor> headerWriter = pc -> pc.putBytes( expectedBytes );
try ( GBPTree<MutableLong,MutableLong> ignore = index().with( headerWriter ).build() )
{
}

// WHEN
byte[] fraudulentBytes = new byte[12];
do
{
ThreadLocalRandom.current().nextBytes( fraudulentBytes );
}
while ( Arrays.equals( expectedBytes, fraudulentBytes ) );

try ( GBPTree<MutableLong,MutableLong> ignore = index().with( headerWriter ).build() )
{
}

// THEN
verifyHeader( expectedBytes );
}

private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableLong>,byte[]> beforeClose ) throws IOException
{
byte[] expectedHeader = new byte[12];
Expand All @@ -568,6 +614,11 @@ private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableL
beforeClose.accept( index, expectedHeader );
}

verifyHeader( expectedHeader );
}

private void verifyHeader( byte[] expectedHeader ) throws IOException
{
// WHEN
byte[] readHeader = new byte[expectedHeader.length];
AtomicInteger length = new AtomicInteger();
Expand Down Expand Up @@ -1213,6 +1264,7 @@ private class GBPTreeBuilder
private Header.Reader headerReader = NO_HEADER;
private Layout<MutableLong,MutableLong> layout = GBPTreeTest.layout;
private PageCache specificPageCache;
private Consumer<PageCursor> headerWriter = NO_HEADER_WRITER;

private GBPTreeBuilder withPageCachePageSize( int pageSize )
{
Expand Down Expand Up @@ -1250,6 +1302,12 @@ private GBPTreeBuilder with( PageCache pageCache )
return this;
}

public GBPTreeBuilder with( Consumer<PageCursor> headerWriter )
{
this.headerWriter = headerWriter;
return this;
}

private GBPTree<MutableLong,MutableLong> build() throws IOException
{
PageCache pageCacheToUse;
Expand All @@ -1267,7 +1325,8 @@ private GBPTree<MutableLong,MutableLong> build() throws IOException
pageCacheToUse = specificPageCache;
}

return new GBPTree<>( pageCacheToUse, indexFile, layout, tentativePageSize, monitor, headerReader );
return new GBPTree<>( pageCacheToUse, indexFile, layout, tentativePageSize, monitor, headerReader,
headerWriter );
}
}

Expand Down

0 comments on commit 8ebde02

Please sign in to comment.