Skip to content

Commit

Permalink
Never checkpoint GBPTree on close
Browse files Browse the repository at this point in the history
A checkpoint increments the stable and unstable generations of the tree,
making it impossible to recognise crash pointers during the recovery
that follows a database crash.

Responsibility for checkpointing is left entirely to the surrounding system,
so GBPTree will only checkpoint when the rest of the system is checkpointing.

A first attempt at this prevented the checkpoint on close only when the system
had seen a panic, but that did not guard against failures during recovery
that did not trigger a panic.
  • Loading branch information
burqen committed Mar 29, 2017
1 parent f62546c commit 831f69e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 61 deletions.
Expand Up @@ -290,7 +290,7 @@ default void recoveryCompleted( long numberOfPagesVisited, long numberOfCleanedC

/**
* Whether or not this tree has been closed. Accessed and changed solely in
* {@link #close()} and {@link #close(Consumer)} to be able to close tree multiple times gracefully.
* {@link #close()} to be able to close tree multiple times gracefully.
*/
private boolean closed;

Expand Down Expand Up @@ -734,30 +734,14 @@ private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throw
}

/**
 * Closes this tree and its associated resources.
 * NOTE: No {@link #checkpoint(IOLimiter) checkpoint} is performed by this call. To make data
 * persistent in the store a checkpoint needs to be performed explicitly by the owning system.
 * Safe to call multiple times; subsequent calls are no-ops.
 *
 * @throws IOException on error closing resources.
 */
@Override
public void close() throws IOException
{
    // Carry over the previously written header unchanged, i.e. close without touching header data.
    close( CARRY_OVER_PREVIOUS_HEADER );
}

/**
 * Closes the {@link GBPTree}, also writing header data using the supplied writer.
 *
 * @param headerWriter hook for writing header data; presumably replaces the previously
 * written header (see {@code replace(...)}) — confirm against {@code Header.Writer}.
 * @throws IOException on error closing resources.
 */
public void close( Consumer<PageCursor> headerWriter ) throws IOException
{
    // Adapt the raw cursor consumer into a Header.Writer before delegating.
    close( replace( headerWriter ) );
}

private void close( Header.Writer headerWriter ) throws IOException
{
writerCheckpointMutex.lock();
try
Expand All @@ -767,19 +751,10 @@ private void close( Header.Writer headerWriter ) throws IOException
return;
}

try
{
writer.close();

// Perform a checkpoint before closing. If no changes has happened since last checkpoint,
// no new checkpoint will be created.
checkpoint( IOLimiter.unlimited(), headerWriter );
}
finally
{
closed = true;
pagedFile.close();
}
closed = true;
// Force close on writer to not risk deadlock on pagedFile.close()
writer.close();
pagedFile.close();
}
finally
{
Expand Down
Expand Up @@ -28,10 +28,12 @@

import java.io.File;
import java.io.IOException;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -40,6 +42,8 @@
import org.neo4j.collection.primitive.PrimitiveLongSet;
import org.neo4j.cursor.RawCursor;
import org.neo4j.index.internal.gbptree.GBPTree.Monitor;
import org.neo4j.io.pagecache.DelegatingPageCache;
import org.neo4j.io.pagecache.DelegatingPagedFile;
import org.neo4j.io.pagecache.IOLimiter;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.PageCursor;
Expand Down Expand Up @@ -99,6 +103,13 @@ private GBPTree<MutableLong,MutableLong> createIndex( int pageSize, Monitor moni
0/*use whatever page cache says*/, monitor, headerReader );
}

/**
 * Creates a GBPTree backed by the given page cache, remembering both the cache and
 * the tree in fields so that test teardown can reach them.
 *
 * @param pageCache page cache to back the tree with.
 * @return the newly created tree, also stored in the {@code index} field.
 * @throws IOException on error creating the tree.
 */
private GBPTree<MutableLong,MutableLong> createIndex( PageCache pageCache ) throws IOException
{
    this.pageCache = pageCache;
    index = new GBPTree<>( pageCache, indexFile, layout,
            0 /*use whatever page cache says*/, NO_MONITOR, NO_HEADER );
    return index;
}

// Creates a page cache from the test rule's file system, configured with the given page size.
private PageCache createPageCache( int pageSize )
{
    return pageCacheRule.getPageCache( fs.get(), config().withPageSize( pageSize ) );
Expand Down Expand Up @@ -471,7 +482,8 @@ public void shouldPutHeaderDataInCheckPoint() throws Exception
byte[] expectedHeader = new byte[12];
ThreadLocalRandom.current().nextBytes( expectedHeader );
index = createIndex( 256 );
index.close( cursor -> cursor.putBytes( expectedHeader ) );
index.checkpoint( IOLimiter.unlimited(), cursor -> cursor.putBytes( expectedHeader ) );
index.close();

// WHEN
byte[] readHeader = new byte[expectedHeader.length];
Expand Down Expand Up @@ -514,7 +526,8 @@ public void shouldReplaceHeaderDataInNextCheckPoint() throws Exception
index = createIndex( 256 );
index.checkpoint( IOLimiter.unlimited(), cursor -> cursor.putBytes( expectedHeader ) );
ThreadLocalRandom.current().nextBytes( expectedHeader );
index.close( cursor -> cursor.putBytes( expectedHeader ) );
index.checkpoint( IOLimiter.unlimited(), cursor -> cursor.putBytes( expectedHeader ) );
index.close();

// WHEN
byte[] readHeader = new byte[expectedHeader.length];
Expand Down Expand Up @@ -586,19 +599,21 @@ public void checkPointShouldWaitForWriter() throws Exception
public void closeShouldLockOutWriter() throws Exception
{
// GIVEN
CheckpointControlledMonitor monitor = new CheckpointControlledMonitor();
index = createIndex( 1024, monitor );
AtomicBoolean enabled = new AtomicBoolean();
Barrier.Control barrier = new Barrier.Control();
PageCache pageCacheWithBarrier = pageCacheWithBarrierInClose( enabled, barrier );
index = createIndex( pageCacheWithBarrier );
long key = 10;
try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{
writer.put( new MutableLong( key ), new MutableLong( key ) );
}

// WHEN
monitor.enabled = true;
enabled.set( true );
Thread closer = new Thread( throwing( () -> index.close() ) );
closer.start();
monitor.barrier.awaitUninterruptibly();
barrier.awaitUninterruptibly();
// now we're in the smack middle of a close/checkpoint
AtomicReference<Exception> writerError = new AtomicReference<>();
Thread t2 = new Thread( () ->
Expand All @@ -616,7 +631,7 @@ public void closeShouldLockOutWriter() throws Exception
t2.start();
t2.join( 200 );
assertTrue( Arrays.toString( closer.getStackTrace() ), t2.isAlive() );
monitor.barrier.release();
barrier.release();

// THEN
t2.join();
Expand All @@ -625,6 +640,29 @@ public void closeShouldLockOutWriter() throws Exception
index = null;
}

/**
 * Wraps a real page cache so that {@code PagedFile#close()} can be stalled from the test:
 * once {@code enabled} is set, the thread closing the paged file parks on {@code barrier}
 * before delegating, letting the test observe a close that is in progress.
 *
 * @param enabled when {@code true}, closing a mapped file hits the barrier first.
 * @param barrier control point the closing thread parks on while enabled.
 * @return a delegating page cache whose mapped files stall on close when enabled.
 */
private PageCache pageCacheWithBarrierInClose( final AtomicBoolean enabled, final Barrier.Control barrier )
{
    return new DelegatingPageCache( createPageCache( 1024 ) )
    {
        @Override
        public PagedFile map( File file, int pageSize, OpenOption... openOptions ) throws IOException
        {
            return new DelegatingPagedFile( super.map( file, pageSize, openOptions ) )
            {
                @Override
                public void close() throws IOException
                {
                    if ( enabled.get() )
                    {
                        // Park here until the test releases the barrier.
                        barrier.reached();
                    }
                    super.close();
                }
            };
        }
    };
}

@Test
public void closeShouldWaitForWriter() throws Exception
{
Expand Down Expand Up @@ -744,25 +782,6 @@ public void shouldCheckpointAfterInitialCreation() throws Exception
assertEquals( 1, checkpointCounter.count );
}

@Test
public void shouldCheckpointOnCloseAfterChangesHappened() throws Exception
{
    // GIVEN a monitor that counts checkpoints
    CheckpointCounter checkpointCounter = new CheckpointCounter();

    // WHEN creating a tree and making a change after the initial-creation checkpoint
    GBPTree<MutableLong,MutableLong> index = createIndex( 256, checkpointCounter );
    int countBefore = checkpointCounter.count;
    try ( Writer<MutableLong,MutableLong> writer = index.writer() )
    {
        writer.put( new MutableLong( 0 ), new MutableLong( 1 ) );
    }

    // THEN closing the tree should perform exactly one additional checkpoint
    closeIndex();
    assertEquals( countBefore + 1, checkpointCounter.count );
}

@Test
public void shouldNotCheckpointOnCloseIfNoChangesHappened() throws Exception
{
Expand Down
Expand Up @@ -19,6 +19,7 @@
*/
package org.neo4j.unsafe.batchinsert.internal;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
Expand Down Expand Up @@ -48,6 +49,7 @@
import org.neo4j.helpers.collection.IteratorWrapper;
import org.neo4j.helpers.collection.Iterators;
import org.neo4j.helpers.collection.Visitor;
import org.neo4j.io.IOUtils;
import org.neo4j.io.fs.FileSystemAbstraction;
import org.neo4j.io.pagecache.PageCache;
import org.neo4j.io.pagecache.tracing.PageCacheTracer;
Expand Down Expand Up @@ -217,6 +219,7 @@ public Label apply( long from )
private final Locks.Client noopLockClient = new NoOpClient();
private final long maxNodeId;
private final RecordCursors cursors;
private final List<Closeable> resources = new ArrayList<>();

public BatchInserterImpl( final File storeDir, final FileSystemAbstraction fileSystem,
Map<String, String> stringParams, Iterable<KernelExtensionFactory<?>> kernelExtensions ) throws IOException
Expand Down Expand Up @@ -496,6 +499,7 @@ private void repopulateAllIndexes() throws IOException, IndexEntryConflictExcept
.toArray();

InitialNodeLabelCreationVisitor labelUpdateVisitor = new InitialNodeLabelCreationVisitor();
resources.add( labelUpdateVisitor );
StoreScan<IOException> storeScan = indexStoreView.visitNodes( labelIds,
(propertyKeyId) -> PrimitiveIntCollections.contains( propertyKeyIds, propertyKeyId ),
propertyUpdateVisitor, labelUpdateVisitor, true );
Expand Down Expand Up @@ -524,7 +528,7 @@ private void rebuildCounts()
CountsComputer.recomputeCounts( neoStores );
}

private class InitialNodeLabelCreationVisitor implements Visitor<NodeLabelUpdate, IOException>
private class InitialNodeLabelCreationVisitor implements Visitor<NodeLabelUpdate, IOException>, Closeable
{
LabelScanWriter writer = labelScanStore.newWriter();

Expand All @@ -535,6 +539,7 @@ public boolean visit( NodeLabelUpdate update ) throws IOException
return true;
}

@Override
public void close() throws IOException
{
writer.close();
Expand Down Expand Up @@ -967,6 +972,7 @@ public void shutdown()
{
cursors.close();
neoStores.close();
IOUtils.closeAll( RuntimeException.class, resources.toArray( new Closeable[resources.size()] ) );

try
{
Expand Down

0 comments on commit 831f69e

Please sign in to comment.