From 0de2f0f9fda8db0bdebabeb973df1b4a7aa7002b Mon Sep 17 00:00:00 2001 From: Anton Persson Date: Mon, 3 Apr 2017 10:38:14 +0200 Subject: [PATCH] Don't overwrite dirty state and writer use correct lock - GBPTree leave state 'dirty' on close if it was not cleaned. - GBPTree.SingleWriter now keeps track of which lock it uses and unlock correct one. - Testing lock for recovery cleanup. --- .../neo4j/index/internal/gbptree/GBPTree.java | 27 +++- .../index/internal/gbptree/GBPTreeTest.java | 147 ++++++++++++++++-- 2 files changed, 160 insertions(+), 14 deletions(-) diff --git a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java index fee1e81135168..6acd1c1704ac8 100644 --- a/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java +++ b/community/index/src/main/java/org/neo4j/index/internal/gbptree/GBPTree.java @@ -319,6 +319,11 @@ public void startupState( boolean clean ) */ private boolean clean; + /** + * Don't overwrite 'dirty' bit in header on close if tree still needs cleaning. + */ + private volatile boolean needsCleaning; + /** * Clean jobs part of recovery is posted here. */ @@ -390,7 +395,7 @@ public GBPTree( PageCache pageCache, File indexFile, Layout layout, i this.monitor.startupState( clean ); // Prepare tree for action - boolean needsCleaning = !clean; + needsCleaning = !clean; clean = false; bumpUnstableGeneration(); forceState(); @@ -850,7 +855,7 @@ public void close() throws IOException private void internalIndexClose() throws IOException { - if ( !changesSinceLastCheckpoint ) + if ( !changesSinceLastCheckpoint && !needsCleaning ) { clean = true; forceState(); @@ -941,6 +946,7 @@ private void cleanCrashedPointers() throws IOException try { crashGenerationCleaner.clean(); + needsCleaning = false; } catch ( Throwable e ) { @@ -1016,6 +1022,7 @@ private class SingleWriter implements Writer private final InternalTreeLogic treeLogic; private final StructurePropagation structurePropagation; private PageCursor cursor; + private Lock readLock; // Writer can't live past a checkpoint because of the mutex with checkpoint, // therefore safe to locally cache these generation fields from the volatile generation in the tree @@ -1056,9 +1063,10 @@ void initialize() throws IOException } boolean success = false; - writerCheckpointMutex.readLock().lock(); try { + readLock = writerCheckpointMutex.readLock(); + readLock.lock(); cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK ); stableGeneration = stableGeneration( generation ); unstableGeneration = unstableGeneration( generation ); @@ -1076,7 +1084,7 @@ void initialize() throws IOException { closeCursor(); writerTaken.set( false ); - writerCheckpointMutex.readLock().unlock(); + releaseLock(); } } } @@ -1164,7 +1172,16 @@ public void close() throws IOException ", but writer is already closed." ); } closeCursor(); - writerCheckpointMutex.readLock().unlock(); + releaseLock(); + } + + private void releaseLock() + { + if ( readLock != null ) + { + readLock.unlock(); + readLock = null; + } } private void closeCursor() diff --git a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java index 5c698b23afb56..5938fedc64604 100644 --- a/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java +++ b/community/index/src/test/java/org/neo4j/index/internal/gbptree/GBPTreeTest.java @@ -585,7 +585,7 @@ private void verifyHeaderDataAfterClose( BiConsumer writerClose = executor.submit( throwing( () -> index.writer().close() ) ); // THEN - wait( writerClose ); + shouldWait( writerClose ); monitor.barrier.release(); writerClose.get(); @@ -624,7 +624,7 @@ public void checkPointShouldWaitForWriter() throws Exception { // WHEN Barrier.Control barrier = new Barrier.Control(); - Future write = executor.submit( throwing( () -> + Future write = executor.submit( throwing( () -> { try ( Writer writer = index.writer() ) { @@ -634,7 +634,7 @@ public void checkPointShouldWaitForWriter() throws Exception } ) ); barrier.awaitUninterruptibly(); Future checkpoint = executor.submit( throwing( () -> index.checkpoint( unlimited() ) ) ); - wait( checkpoint ); + shouldWait( checkpoint ); // THEN barrier.release(); @@ -675,7 +675,7 @@ public void closeShouldLockOutWriter() throws Exception } } ); - wait( write ); + shouldWait( write ); barrier.release(); // THEN @@ -709,7 +709,7 @@ public void close() throws IOException } @Test( timeout = 5_000L ) - public void closeShouldWaitForWriter() throws Exception + public void writerShouldLockOutClose() throws Exception { // GIVEN GBPTree index = index().build(); @@ -726,7 +726,7 @@ public void closeShouldWaitForWriter() throws Exception } ) ); barrier.awaitUninterruptibly(); Future close = executor.submit( throwing( index::close ) ); - wait( close ); + shouldWait( close ); // THEN barrier.release(); @@ -734,6 +734,117 @@ public void closeShouldWaitForWriter() throws Exception write.get(); } + @Test( timeout = 5_000L ) + public void cleanJobShouldLockOutCheckpoint() throws Exception + { + // GIVEN + try ( GBPTree index = index().build() ) + { + // Make dirty + index.writer().close(); + } + + RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); + try ( GBPTree index = index().with( monitor ).with( cleanupWork ).build() ) + { + // WHEN + // Cleanup not finished + Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + monitor.barrier.awaitUninterruptibly(); + index.writer().close(); + + // THEN + Future checkpoint = executor.submit( throwing( () -> index.checkpoint( IOLimiter.unlimited() ) ) ); + shouldWait( checkpoint ); + + monitor.barrier.release(); + cleanup.get(); + checkpoint.get(); + } + } + + @Test( timeout = 5_000L ) + public void cleanJobShouldLockOutClose() throws Exception + { + // GIVEN + try ( GBPTree index = index().build() ) + { + // Make dirty + index.writer().close(); + } + + RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); + GBPTree index = index().with( monitor ).with( cleanupWork ).build(); + + // WHEN + // Cleanup not finished + Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + monitor.barrier.awaitUninterruptibly(); + + // THEN + Future close = executor.submit( throwing( index::close ) ); + shouldWait( close ); + + monitor.barrier.release(); + cleanup.get(); + close.get(); + } + + @Test( timeout = 5_000L ) + public void cleanJobShouldNotLockOutWriter() throws Exception + { + // GIVEN + try ( GBPTree index = index().build() ) + { + // Make dirty + index.writer().close(); + } + + RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + CleanJobControlledMonitor monitor = new CleanJobControlledMonitor(); + try ( GBPTree index = index().with( monitor ).with( cleanupWork ).build() ) + { + // WHEN + // Cleanup not finished + Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + monitor.barrier.awaitUninterruptibly(); + + // THEN + Future writer = executor.submit( throwing( () -> index.writer().close() ) ); + writer.get(); + + monitor.barrier.release(); + cleanup.get(); + } + } + + @Test + public void writerShouldNotLockOutCleanJob() throws Exception + { + // GIVEN + try ( GBPTree index = index().build() ) + { + // Make dirty + index.writer().close(); + } + + RecoveryCleanupWorkCollector cleanupWork = new GroupingRecoveryCleanupWorkCollector(); + try ( GBPTree index = index().with( cleanupWork ).build() ) + { + // WHEN + try ( Writer writer = index.writer() ) + { + // THEN + Future cleanup = executor.submit( throwing( cleanupWork::run ) ); + // Move writer to let cleaner pass + writer.put( new MutableLong( 1 ), new MutableLong( 1 ) ); + cleanup.get(); + } + } + } + /* Insertion and read tests */ @Test @@ -1209,7 +1320,7 @@ private void insert( GBPTree index, long key, long valu } } - private void wait( Future future ) throws InterruptedException, ExecutionException + private void shouldWait( Future future )throws InterruptedException, ExecutionException { try { @@ -1240,6 +1351,7 @@ private class GBPTreeBuilder private Header.Reader headerReader = NO_HEADER; private Layout layout = GBPTreeTest.layout; private PageCache specificPageCache; + private RecoveryCleanupWorkCollector recoveryCleanupWorkCollector = RecoveryCleanupWorkCollector.IMMEDIATE; private GBPTreeBuilder withPageCachePageSize( int pageSize ) { @@ -1277,6 +1389,12 @@ private GBPTreeBuilder with( PageCache pageCache ) return this; } + private GBPTreeBuilder with( RecoveryCleanupWorkCollector recoveryCleanupWorkCollector ) + { + this.recoveryCleanupWorkCollector = recoveryCleanupWorkCollector; + return this; + } + private GBPTree build() throws IOException { PageCache pageCacheToUse; @@ -1295,7 +1413,18 @@ private GBPTree build() throws IOException } return new GBPTree<>( pageCacheToUse, indexFile, layout, tentativePageSize, monitor, headerReader, - RecoveryCleanupWorkCollector.IMMEDIATE ); + recoveryCleanupWorkCollector ); + } + } + + private static class CleanJobControlledMonitor implements Monitor + { + private final Barrier.Control barrier = new Barrier.Control(); + + @Override + public void cleanupFinished( long numberOfPagesVisited, long numberOfCleanedCrashPointers, long durationMillis ) + { + barrier.reached(); } }