GBPTree checkpoint/close can write header data
The header data is stored in the selected state page, after the other state information.
The checkpoint/close methods come in two variants: one that takes a header writer and one that does not.
The variant with a header writer replaces the header data with new data,
whereas the variant without carries the previous header data over to
the new state page.

When opening a GBPTree, the (newly introduced) header reader supplied to the constructor can access
the header data.
tinwelint committed Mar 6, 2017
1 parent 8f2d5cc commit d1a2593
Showing 9 changed files with 279 additions and 31 deletions.
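
A minimal usage sketch of the API introduced here (not part of the commit): it assumes a page cache,
an index file and a layout are already available, that the code lives where GBPTree, Header, Layout
and NO_MONITOR are visible, and it uses a single int as the header payload purely for illustration.

import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.commons.lang3.mutable.MutableLong;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;

class GBPTreeHeaderExample // hypothetical class, same package as GBPTree assumed
{
    static void checkpointWithHeader( PageCache pageCache, File indexFile,
            Layout<MutableLong,MutableLong> layout ) throws IOException
    {
        AtomicInteger headerValue = new AtomicInteger();
        // Header reader: may be invoked several times (page cache retries), so only record the
        // value here and interpret it after the constructor has returned.
        Header.Reader headerReader = ( cursor, length ) ->
        {
            if ( length >= Integer.BYTES )
            {
                headerValue.set( cursor.getInt() );
            }
        };

        try ( GBPTree<MutableLong,MutableLong> tree = new GBPTree<>( pageCache, indexFile, layout,
                0 /*page size from page cache*/, GBPTree.NO_MONITOR, headerReader ) )
        {
            // Check point and replace any previously written header data with a new value.
            tree.checkpoint( IOLimiter.unlimited(), cursor -> cursor.putInt( 42 ) );

            // Check point without a header writer: the header written above is carried over.
            tree.checkpoint( IOLimiter.unlimited() );
        } // close() also carries the previous header over; close( headerWriter ) would replace it.
    }
}
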
@@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

@@ -46,6 +47,8 @@
import static org.neo4j.index.internal.gbptree.Generation.generation;
import static org.neo4j.index.internal.gbptree.Generation.stableGeneration;
import static org.neo4j.index.internal.gbptree.Generation.unstableGeneration;
import static org.neo4j.index.internal.gbptree.Header.CARRY_OVER_PREVIOUS_HEADER;
import static org.neo4j.index.internal.gbptree.Header.replace;
import static org.neo4j.index.internal.gbptree.PageCursorUtil.checkOutOfBounds;

/**
@@ -161,6 +164,11 @@ default void noStoreFile()
{ // does nothing
};

/**
* No-op header reader.
*/
public static final Header.Reader NO_HEADER = (cursor,length) -> {};

/**
* Paged file in a {@link PageCache} providing the means of storage.
*/
@@ -282,10 +290,12 @@ default void noStoreFile()
* @param tentativePageSize page size, i.e. tree node size. Must be less than or equal to that of the page cache.
* A pageSize of {@code 0} means to use whatever the page cache has (at creation)
* @param monitor {@link Monitor} for monitoring {@link GBPTree}.
* @param headerReader reads header data, previously written using {@link #checkpoint(IOLimiter, Consumer)}
* or {@link #close()}
* @throws IOException on page cache error
*/
public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, int tentativePageSize,
Monitor monitor ) throws IOException
Monitor monitor, Header.Reader headerReader ) throws IOException
{
this.indexFile = indexFile;
this.monitor = monitor;
@@ -306,7 +316,7 @@ public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, i
}
else
{
loadState( pagedFile );
loadState( pagedFile, headerReader );
}
}
catch ( Throwable t )
@@ -396,10 +406,21 @@ private PagedFile openOrCreate( PageCache pageCache, File indexFile,
}
}

private void loadState( PagedFile pagedFile ) throws IOException
private void loadState( PagedFile pagedFile, Header.Reader headerReader ) throws IOException
{
Pair<TreeState,TreeState> states = readStatePages( pagedFile );
TreeState state = TreeStatePair.selectNewestValidState( states );
try ( PageCursor cursor = pagedFile.io( state.pageId(), PagedFile.PF_SHARED_READ_LOCK ) )
{
PageCursorUtil.goTo( cursor, "header data", state.pageId() );
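// The state page stores the tree state first, then an int holding the header length, followed
// by the header bytes themselves; the loop re-reads if shouldRetry() reports a concurrent change.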
do
{
TreeState.read( cursor );
int length = cursor.getInt();
headerReader.read( cursor, length );
}
while ( cursor.shouldRetry() );
}
generation = Generation.generation( state.stableGeneration(), state.unstableGeneration() );
setRoot( state.rootId(), state.rootGen() );

@@ -411,7 +432,7 @@ private void loadState( PagedFile pagedFile ) throws IOException
freeList.initialize( lastId, freeListWritePageId, freeListReadPageId, freeListWritePos, freeListReadPos );
}

private void writeState( PagedFile pagedFile ) throws IOException
private void writeState( PagedFile pagedFile, Header.Writer headerWriter ) throws IOException
{
Pair<TreeState,TreeState> states = readStatePages( pagedFile );
TreeState oldestState = TreeStatePair.selectOldestOrInvalid( states );
@@ -424,10 +445,43 @@ private void writeState( PagedFile pagedFile ) throws IOException
root.id(), root.generation(),
freeList.lastId(), freeList.writePageId(), freeList.readPageId(),
freeList.writePos(), freeList.readPos() );

// Write/carry over header
int headerOffset = cursor.getOffset();
int headerDataOffset = headerOffset + Integer.BYTES; // will contain length of written header data (below)
TreeState otherState = other( states, oldestState );
if ( otherState.isValid() )
{
PageCursor previousCursor = pagedFile.io( otherState.pageId(), PagedFile.PF_SHARED_READ_LOCK );
PageCursorUtil.goTo( previousCursor, "previous state page", otherState.pageId() );
do
{
// Place the previous state cursor after state data
TreeState.read( previousCursor );
// Read length of previous header
int previousLength = previousCursor.getInt();
// Reserve space to store length
cursor.setOffset( headerDataOffset );
// Write
headerWriter.write( previousCursor, previousLength, cursor );
}
while ( previousCursor.shouldRetry() );
checkOutOfBounds( previousCursor );
checkOutOfBounds( cursor );

int length = cursor.getOffset() - headerDataOffset;
cursor.putInt( headerOffset, length );
}

checkOutOfBounds( cursor );
}
}

private static TreeState other( Pair<TreeState,TreeState> states, TreeState state )
{
return states.getLeft() == state ? states.getRight() : states.getLeft();
}

private static Pair<TreeState,TreeState> readStatePages( PagedFile pagedFile ) throws IOException
{
Pair<TreeState,TreeState> states;
@@ -590,11 +644,30 @@ public RawCursor<Hit<KEY,VALUE>,IOException> seek( KEY fromInclusive, KEY toExcl
* since last call to {@link #checkpoint(IOLimiter)} or since opening this tree.
*
* @param ioLimiter for controlling I/O usage.
* @param headerWriter hook for writing header data.
* @throws IOException on error flushing to storage.
*/
public void checkpoint( IOLimiter ioLimiter, Consumer<PageCursor> headerWriter ) throws IOException
{
checkpoint( ioLimiter, replace( headerWriter ) );
}

/**
* Performs a {@link #checkpoint(IOLimiter, Consumer) check point}, keeping any header information
* written in the previous check point.
*
* @param ioLimiter for controlling I/O usage.
* @throws IOException on error flushing to storage.
* @see #checkpoint(IOLimiter, Consumer)
*/
public void checkpoint( IOLimiter ioLimiter ) throws IOException
{
if ( !changesSinceLastCheckpoint )
checkpoint( ioLimiter, CARRY_OVER_PREVIOUS_HEADER );
}

private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throws IOException
{
if ( !changesSinceLastCheckpoint && headerWriter == CARRY_OVER_PREVIOUS_HEADER )
{
// No changes have happened since the last checkpoint was called, no need to do another checkpoint
return;
@@ -618,7 +691,7 @@ public void checkpoint( IOLimiter ioLimiter ) throws IOException
// and write the tree state (rootId, lastId, generation a.s.o.) to state page.
long unstableGeneration = unstableGeneration( generation );
generation = Generation.generation( unstableGeneration, unstableGeneration + 1 );
writeState( pagedFile );
writeState( pagedFile, headerWriter );

// Flush the state page.
pagedFile.flushAndForce();
@@ -646,14 +719,30 @@ public void checkpoint( IOLimiter ioLimiter ) throws IOException
*/
@Override
public void close() throws IOException
{
close( CARRY_OVER_PREVIOUS_HEADER );
}

/**
* Closes the {@link GBPTree}, also writing header data using the supplied writer.
*
* @param headerWriter hook for writing header data.
* @throws IOException on error either checkpointing or closing resources.
*/
public void close( Consumer<PageCursor> headerWriter ) throws IOException
{
close( replace( headerWriter ) );
}

private void close( Header.Writer headerWriter ) throws IOException
{
writer.close();

try
{
// Perform a checkpoint before closing. If no changes have happened since the last checkpoint,
// no new checkpoint will be created.
checkpoint( IOLimiter.unlimited() );
checkpoint( IOLimiter.unlimited(), headerWriter );
}
finally
{
@@ -732,7 +821,7 @@ public void prepareForRecovery() throws IOException
// Increment unstable generation, widening the gap between stable and unstable generation
// so that generations in between are considered crash generation(s).
generation = generation( stableGeneration( generation ), unstableGeneration( generation ) + 1 );
writeState( pagedFile );
writeState( pagedFile, CARRY_OVER_PREVIOUS_HEADER );
pagedFile.flushAndForce();
}

@@ -0,0 +1,79 @@
/*
* Copyright (c) 2002-2017 "Neo Technology,"
* Network Engine for Objects in Lund AB [http://neotechnology.com]
*
* This file is part of Neo4j.
*
* Neo4j is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package org.neo4j.index.internal.gbptree;

import java.util.function.Consumer;

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;

/**
* Defines interfaces and common implementations of header reader/writer for {@link GBPTree}.
*/
public class Header
{
/**
* Writes a header into a {@link GBPTree} state page during
* {@link GBPTree#checkpoint(org.neo4j.io.pagecache.IOLimiter)}.
*/
public interface Writer
{
/**
* Writes header data into {@code to}. The previous valid header data, {@code length} bytes in size,
* is available for reading in {@code from}.
*
* @param from {@link PageCursor} positioned at the header data written in the previous check point.
* @param length size in bytes of the previous header data.
* @param to {@link PageCursor} to write new header into.
*/
void write( PageCursor from, int length, PageCursor to );
}

final Consumer<PageCursor> CARRY_OVER = cursor -> {};

static final Writer CARRY_OVER_PREVIOUS_HEADER = (from,length,to) ->
{
from.copyTo( from.getOffset(), to, to.getOffset(), length );
};

static Writer replace( Consumer<PageCursor> writer )
{
// Discard the previous state, just write the new
return (from,length,to) -> writer.accept( to );
}

/**
* Reads a header from a {@link GBPTree} state page during opening it.
*/
public interface Reader
{
/**
* Called when it's time to read header data from the most up-to-date and valid state page.
* Due to the nature of the underlying {@link PageCache} this method may be called several times,
* sometimes with invalid data in the {@link PageCursor}. Because of this, no exceptions must be
* thrown and no decisions made based on the read data until the GBPTree constructor has
* completed.
*
* @param from {@link PageCursor} positioned at beginning of the header data to read.
* @param length number of bytes available to read in the header data.
*/
void read( PageCursor from, int length );
}
}
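
For illustration only (not part of this commit), a reader honouring the retry contract described
above could simply capture the raw bytes and defer all interpretation; the class name and size
bound below are assumptions, and the sketch assumes it sits where Header and PageCursor are importable.

// Hypothetical sketch: capture the header bytes and decode them only after the GBPTree
// constructor has returned, since read() may be called with retried/invalid data.
class CapturingHeaderReader implements Header.Reader
{
    private static final int MAX_HEADER_BYTES = 1024; // assumed sanity bound

    private byte[] data = new byte[0];

    @Override
    public void read( PageCursor from, int length )
    {
        // Guard against a garbage length from an inconsistent read; the final, consistent
        // invocation (once shouldRetry() settles) overwrites anything captured here.
        if ( length < 0 || length > MAX_HEADER_BYTES )
        {
            return;
        }
        byte[] copy = new byte[length];
        from.getBytes( copy, 0, length );
        data = copy;
    }

    byte[] data()
    {
        return data;
    }
}

Such a reader would be passed as the new last constructor argument, and its data() decoded once the
tree has been opened.
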
@@ -20,7 +20,6 @@
package org.neo4j.index.internal.gbptree;

import java.util.Objects;

import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;

@@ -49,6 +49,8 @@
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

@@ -96,7 +98,7 @@ public void shouldDetectFormatChange() throws Throwable
// THEN everything should work, otherwise there has likely been a format change
PageCache pageCache = pageCacheRule.getPageCache( fsRule.get() );
try ( GBPTree<MutableLong,MutableLong> tree =
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR ) )
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER ) )
{
try
{
@@ -172,7 +174,7 @@ private void createAndZipTree( File storeFile ) throws IOException
{
PageCache pageCache = pageCacheRule.getPageCache( fsRule.get() );
try ( GBPTree<MutableLong,MutableLong> tree =
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR ) )
new GBPTree<>( pageCache, storeFile, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER ) )
{
MutableLong insertKey = new MutableLong();
MutableLong insertValue = new MutableLong();
@@ -60,6 +60,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

@@ -105,7 +107,7 @@ private GBPTree<MutableLong,MutableLong> createIndex( GBPTree.Monitor monitor )
PageCache pageCache =
pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
return index = new GBPTree<>( pageCache, directory.file( "index" ),
layout, 0/*use whatever page cache says*/, monitor );
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER );
}

@After
@@ -219,7 +221,7 @@ private class TestCoordinator implements Supplier<ReaderInstruction>
// Instructions for reader
private final boolean forwardsSeek;
private final double writePercentage;
private AtomicReference<ReaderInstruction> currentReaderInstruction;
private final AtomicReference<ReaderInstruction> currentReaderInstruction;
TreeSet<Long> readersShouldSee;

// Progress
@@ -45,6 +45,8 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.rules.RuleChain.outerRule;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.test.rule.PageCacheRule.config;

@@ -74,7 +76,7 @@ private GBPTree<MutableLong,MutableLong> createIndex( int pageSize, GBPTree.Moni
{
pageCache = pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ).withAccessChecks( true ) );
return index = new GBPTree<>( pageCache, directory.file( "index" ),
layout, 0/*use whatever page cache says*/, monitor );
layout, 0/*use whatever page cache says*/, monitor, NO_HEADER );
}

@After
@@ -44,6 +44,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.rules.RuleChain.outerRule;

import static org.neo4j.index.internal.gbptree.GBPTree.NO_HEADER;
import static org.neo4j.index.internal.gbptree.GBPTree.NO_MONITOR;
import static org.neo4j.index.internal.gbptree.ThrowingRunnable.throwing;
import static org.neo4j.io.pagecache.IOLimiter.unlimited;
@@ -363,7 +364,7 @@ private long[] modificationData( int min, int max )

private static GBPTree<MutableLong,MutableLong> createIndex( PageCache pageCache, File file ) throws IOException
{
return new GBPTree<>( pageCache, file, new SimpleLongLayout(), 0, NO_MONITOR );
return new GBPTree<>( pageCache, file, new SimpleLongLayout(), 0, NO_MONITOR, NO_HEADER );
}

private PageCache createPageCache()
