Skip to content

Commit

Permalink
Don't overwrite dirty state and writer use correct lock
Browse files Browse the repository at this point in the history
- GBPTree leave state 'dirty' on close if it was not cleaned.
- GBPTree.SingleWriter now keeps track of which lock it uses
  and unlock correct one.
- Testing lock for recovery cleanup.
  • Loading branch information
burqen committed May 18, 2017
1 parent 18241e8 commit 0de2f0f
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 14 deletions.
Expand Up @@ -319,6 +319,11 @@ public void startupState( boolean clean )
*/ */
private boolean clean; private boolean clean;


/**
* Don't overwrite 'dirty' bit in header on close if tree still needs cleaning.
*/
private volatile boolean needsCleaning;

/** /**
* Clean jobs part of recovery is posted here. * Clean jobs part of recovery is posted here.
*/ */
Expand Down Expand Up @@ -390,7 +395,7 @@ public GBPTree( PageCache pageCache, File indexFile, Layout<KEY,VALUE> layout, i
this.monitor.startupState( clean ); this.monitor.startupState( clean );


// Prepare tree for action // Prepare tree for action
boolean needsCleaning = !clean; needsCleaning = !clean;
clean = false; clean = false;
bumpUnstableGeneration(); bumpUnstableGeneration();
forceState(); forceState();
Expand Down Expand Up @@ -850,7 +855,7 @@ public void close() throws IOException


private void internalIndexClose() throws IOException private void internalIndexClose() throws IOException
{ {
if ( !changesSinceLastCheckpoint ) if ( !changesSinceLastCheckpoint && !needsCleaning )
{ {
clean = true; clean = true;
forceState(); forceState();
Expand Down Expand Up @@ -941,6 +946,7 @@ private void cleanCrashedPointers() throws IOException
try try
{ {
crashGenerationCleaner.clean(); crashGenerationCleaner.clean();
needsCleaning = false;
} }
catch ( Throwable e ) catch ( Throwable e )
{ {
Expand Down Expand Up @@ -1016,6 +1022,7 @@ private class SingleWriter implements Writer<KEY,VALUE>
private final InternalTreeLogic<KEY,VALUE> treeLogic; private final InternalTreeLogic<KEY,VALUE> treeLogic;
private final StructurePropagation<KEY> structurePropagation; private final StructurePropagation<KEY> structurePropagation;
private PageCursor cursor; private PageCursor cursor;
private Lock readLock;


// Writer can't live past a checkpoint because of the mutex with checkpoint, // Writer can't live past a checkpoint because of the mutex with checkpoint,
// therefore safe to locally cache these generation fields from the volatile generation in the tree // therefore safe to locally cache these generation fields from the volatile generation in the tree
Expand Down Expand Up @@ -1056,9 +1063,10 @@ void initialize() throws IOException
} }


boolean success = false; boolean success = false;
writerCheckpointMutex.readLock().lock();
try try
{ {
readLock = writerCheckpointMutex.readLock();
readLock.lock();
cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK ); cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK );
stableGeneration = stableGeneration( generation ); stableGeneration = stableGeneration( generation );
unstableGeneration = unstableGeneration( generation ); unstableGeneration = unstableGeneration( generation );
Expand All @@ -1076,7 +1084,7 @@ void initialize() throws IOException
{ {
closeCursor(); closeCursor();
writerTaken.set( false ); writerTaken.set( false );
writerCheckpointMutex.readLock().unlock(); releaseLock();
} }
} }
} }
Expand Down Expand Up @@ -1164,7 +1172,16 @@ public void close() throws IOException
", but writer is already closed." ); ", but writer is already closed." );
} }
closeCursor(); closeCursor();
writerCheckpointMutex.readLock().unlock(); releaseLock();
}

private void releaseLock()
{
if ( readLock != null )
{
readLock.unlock();
readLock = null;
}
} }


private void closeCursor() private void closeCursor()
Expand Down
Expand Up @@ -585,7 +585,7 @@ private void verifyHeaderDataAfterClose( BiConsumer<GBPTree<MutableLong,MutableL
assertArrayEquals( expectedHeader, readHeader ); assertArrayEquals( expectedHeader, readHeader );
} }


/* Check-pointing tests */ /* Mutex tests */


@Test( timeout = 5_000L ) @Test( timeout = 5_000L )
public void checkPointShouldLockOutWriter() throws Exception public void checkPointShouldLockOutWriter() throws Exception
Expand All @@ -608,7 +608,7 @@ public void checkPointShouldLockOutWriter() throws Exception
Future<?> writerClose = executor.submit( throwing( () -> index.writer().close() ) ); Future<?> writerClose = executor.submit( throwing( () -> index.writer().close() ) );


// THEN // THEN
wait( writerClose ); shouldWait( writerClose );
monitor.barrier.release(); monitor.barrier.release();


writerClose.get(); writerClose.get();
Expand All @@ -624,7 +624,7 @@ public void checkPointShouldWaitForWriter() throws Exception
{ {
// WHEN // WHEN
Barrier.Control barrier = new Barrier.Control(); Barrier.Control barrier = new Barrier.Control();
Future<?> write = executor.submit( throwing( () -> Future<?> write = executor.submit( throwing( () ->
{ {
try ( Writer<MutableLong,MutableLong> writer = index.writer() ) try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{ {
Expand All @@ -634,7 +634,7 @@ public void checkPointShouldWaitForWriter() throws Exception
} ) ); } ) );
barrier.awaitUninterruptibly(); barrier.awaitUninterruptibly();
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) ); Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) );
wait( checkpoint ); shouldWait( checkpoint );


// THEN // THEN
barrier.release(); barrier.release();
Expand Down Expand Up @@ -675,7 +675,7 @@ public void closeShouldLockOutWriter() throws Exception
} }
} ); } );


wait( write ); shouldWait( write );
barrier.release(); barrier.release();


// THEN // THEN
Expand Down Expand Up @@ -709,7 +709,7 @@ public void close() throws IOException
} }


@Test( timeout = 5_000L ) @Test( timeout = 5_000L )
public void closeShouldWaitForWriter() throws Exception public void writerShouldLockOutClose() throws Exception
{ {
// GIVEN // GIVEN
GBPTree<MutableLong,MutableLong> index = index().build(); GBPTree<MutableLong,MutableLong> index = index().build();
Expand All @@ -726,14 +726,125 @@ public void closeShouldWaitForWriter() throws Exception
} ) ); } ) );
barrier.awaitUninterruptibly(); barrier.awaitUninterruptibly();
Future<?> close = executor.submit( throwing( index::close ) ); Future<?> close = executor.submit( throwing( index::close ) );
wait( close ); shouldWait( close );


// THEN // THEN
barrier.release(); barrier.release();
close.get(); close.get();
write.get(); write.get();
} }


@Test( timeout = 5_000L )
public void cleanJobShouldLockOutCheckpoint() throws Exception
{
// GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
{
// Make dirty
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
try ( GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build() )
{
// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
monitor.barrier.awaitUninterruptibly();
index.writer().close();

// THEN
Future<?> checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) );
shouldWait( checkpoint );

monitor.barrier.release();
cleanup.get();
checkpoint.get();
}
}

@Test( timeout = 5_000L )
public void cleanJobShouldLockOutClose() throws Exception
{
// GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
{
// Make dirty
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build();

// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
monitor.barrier.awaitUninterruptibly();

// THEN
Future<?> close = executor.submit( throwing( index::close ) );
shouldWait( close );

monitor.barrier.release();
cleanup.get();
close.get();
}

@Test( timeout = 5_000L )
public void cleanJobShouldNotLockOutWriter() throws Exception
{
// GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
{
// Make dirty
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
CleanJobControlledMonitor monitor = new CleanJobControlledMonitor();
try ( GBPTree<MutableLong,MutableLong> index = index().with( monitor ).with( cleanupWork ).build() )
{
// WHEN
// Cleanup not finished
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
monitor.barrier.awaitUninterruptibly();

// THEN
Future<?> writer = executor.submit( throwing( () -> index.writer().close() ) );
writer.get();

monitor.barrier.release();
cleanup.get();
}
}

@Test
public void writerShouldNotLockOutCleanJob() throws Exception
{
// GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() )
{
// Make dirty
index.writer().close();
}

RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector();
try ( GBPTree<MutableLong,MutableLong> index = index().with( cleanupWork ).build() )
{
// WHEN
try ( Writer<MutableLong,MutableLong> writer = index.writer() )
{
// THEN
Future<?> cleanup = executor.submit( throwing( cleanupWork::run ) );
// Move writer to let cleaner pass
writer.put( new MutableLong( 1 ), new MutableLong( 1 ) );
cleanup.get();
}
}
}

/* Insertion and read tests */ /* Insertion and read tests */


@Test @Test
Expand Down Expand Up @@ -1209,7 +1320,7 @@ private void insert( GBPTree<MutableLong,MutableLong> index, long key, long valu
} }
} }


private void wait( Future<?> future ) throws InterruptedException, ExecutionException private void shouldWait( Future<?> future )throws InterruptedException, ExecutionException
{ {
try try
{ {
Expand Down Expand Up @@ -1240,6 +1351,7 @@ private class GBPTreeBuilder
private Header.Reader headerReader = NO_HEADER; private Header.Reader headerReader = NO_HEADER;
private Layout<MutableLong,MutableLong> layout = GBPTreeTest.layout; private Layout<MutableLong,MutableLong> layout = GBPTreeTest.layout;
private PageCache specificPageCache; private PageCache specificPageCache;
private RecoveryCleanupWorkCollector recoveryCleanupWorkCollector = RecoveryCleanupWorkCollector.IMMEDIATE;


private GBPTreeBuilder withPageCachePageSize( int pageSize ) private GBPTreeBuilder withPageCachePageSize( int pageSize )
{ {
Expand Down Expand Up @@ -1277,6 +1389,12 @@ private GBPTreeBuilder with( PageCache pageCache )
return this; return this;
} }


private GBPTreeBuilder with( RecoveryCleanupWorkCollector recoveryCleanupWorkCollector )
{
this.recoveryCleanupWorkCollector = recoveryCleanupWorkCollector;
return this;
}

private GBPTree<MutableLong,MutableLong> build() throws IOException private GBPTree<MutableLong,MutableLong> build() throws IOException
{ {
PageCache pageCacheToUse; PageCache pageCacheToUse;
Expand All @@ -1295,7 +1413,18 @@ private GBPTree<MutableLong,MutableLong> build() throws IOException
} }


return new GBPTree<>( pageCacheToUse, indexFile, layout, tentativePageSize, monitor, headerReader, return new GBPTree<>( pageCacheToUse, indexFile, layout, tentativePageSize, monitor, headerReader,
RecoveryCleanupWorkCollector.IMMEDIATE ); recoveryCleanupWorkCollector );
}
}

private static class CleanJobControlledMonitor implements Monitor
{
private final Barrier.Control barrier = new Barrier.Control();

@Override
public void cleanupFinished( long numberOfPagesVisited, long numberOfCleanedCrashPointers, long durationMillis )
{
barrier.reached();
} }
} }


Expand Down

0 comments on commit 0de2f0f

Please sign in to comment.