Skip to content

Commit

Permalink
Close is not blocked by 'need clean'
Browse files Browse the repository at this point in the history
It is fine to close the GBPTree even if it still needs cleaning.
The cleaning job will fail because the pagedFile is will be
closed, but that is fine. It will just start over next time
the tree is started.
  • Loading branch information
burqen committed May 19, 2017
1 parent 074419d commit b2cbed1
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 37 deletions.
Expand Up @@ -28,7 +28,7 @@
import java.nio.file.StandardOpenOption; import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock; import java.util.concurrent.locks.StampedLock;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.function.LongSupplier; import java.util.function.LongSupplier;
Expand Down Expand Up @@ -249,12 +249,24 @@ public void startupState( boolean clean )
private volatile boolean changesSinceLastCheckpoint; private volatile boolean changesSinceLastCheckpoint;


/** /**
* Check-pointing flushes updates to stable storage. * There are a few different scenarios that involve writing or flushing that can not be happen concurrently:
* There's a critical section in check-pointing where, in order to guarantee a consistent check-pointed state * <ul>
* on stable storage, no writes are allowed to happen. * <li>Checkpoint and writing</li>
* For this reason both writer and check-pointing acquires this lock. * <li>Checkpoint and close</li>
* <li>Write and checkpoint</li>
* </ul>
* This lock is acquired as part of all of those tasks.
*/
private final Lock writerLock = new ReentrantLock();

/**
* If cleaning of crash pointers is needed the tree can not be allowed to perform a checkpoint until that job
* has finished. Therefore this lock is acquired by checkpoint and when cleaning is needed.
* <p>
* The cleaner job is responsible for releasing this lock after it has finished. This job will be executed by
* some other thread, that's why this lock is a {@link StampedLock}.
*/ */
private final ReadWriteLock writerCheckpointMutex = new StampedLock().asReadWriteLock(); private final StampedLock cleanerLock = new StampedLock();


/** /**
* Page size, i.e. tree node size, of the tree nodes in this tree. The page size is determined on * Page size, i.e. tree node size, of the tree nodes in this tree. The page size is determined on
Expand Down Expand Up @@ -759,8 +771,8 @@ private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throw


// Block writers, or if there's a current writer then wait for it to complete and then block // Block writers, or if there's a current writer then wait for it to complete and then block
// From this point and till the lock is released we know that the tree won't change. // From this point and till the lock is released we know that the tree won't change.
Lock writeLock = writerCheckpointMutex.writeLock(); long stamp = cleanerLock.writeLock();
writeLock.lock(); writerLock.lock();
try try
{ {
// Flush dirty pages since that last flush above. This should be a very small set of pages // Flush dirty pages since that last flush above. This should be a very small set of pages
Expand All @@ -787,7 +799,8 @@ private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throw
{ {
// Unblock writers, any writes after this point and up until the next checkpoint will have // Unblock writers, any writes after this point and up until the next checkpoint will have
// the new unstable generation. // the new unstable generation.
writeLock.unlock(); writerLock.unlock();
cleanerLock.unlockWrite( stamp );
} }
} }


Expand All @@ -809,8 +822,7 @@ private void assertRecoveryCleanSuccessful() throws IOException
@Override @Override
public void close() throws IOException public void close() throws IOException
{ {
Lock writeLock = writerCheckpointMutex.writeLock(); writerLock.lock();
writeLock.lock();
try try
{ {
if ( closed ) if ( closed )
Expand All @@ -835,7 +847,7 @@ public void close() throws IOException
} }
finally finally
{ {
writeLock.unlock(); writerLock.unlock();
} }
} }


Expand Down Expand Up @@ -911,8 +923,7 @@ private CleanupJob createCleanupJob( boolean needsCleaning ) throws IOException
} }
else else
{ {
Lock readLock = writerCheckpointMutex.readLock(); long stamp = cleanerLock.writeLock();
readLock.lock();


long generation = this.generation; long generation = this.generation;
long stableGeneration = stableGeneration( generation ); long stableGeneration = stableGeneration( generation );
Expand All @@ -922,7 +933,7 @@ private CleanupJob createCleanupJob( boolean needsCleaning ) throws IOException
CrashGenerationCleaner crashGenerationCleaner = CrashGenerationCleaner crashGenerationCleaner =
new CrashGenerationCleaner( pagedFile, bTreeNode, IdSpace.MIN_TREE_NODE_ID, highTreeNodeId, new CrashGenerationCleaner( pagedFile, bTreeNode, IdSpace.MIN_TREE_NODE_ID, highTreeNodeId,
stableGeneration, unstableGeneration, monitor ); stableGeneration, unstableGeneration, monitor );
return new GBPTreeCleanupJob( crashGenerationCleaner, readLock ); return new GBPTreeCleanupJob( crashGenerationCleaner, cleanerLock, stamp );
} }
} }


Expand Down Expand Up @@ -987,7 +998,7 @@ private class SingleWriter implements Writer<KEY,VALUE>
private final InternalTreeLogic<KEY,VALUE> treeLogic; private final InternalTreeLogic<KEY,VALUE> treeLogic;
private final StructurePropagation<KEY> structurePropagation; private final StructurePropagation<KEY> structurePropagation;
private PageCursor cursor; private PageCursor cursor;
private Lock readLock; private Lock sessionLock;


// Writer can't live past a checkpoint because of the mutex with checkpoint, // Writer can't live past a checkpoint because of the mutex with checkpoint,
// therefore safe to locally cache these generation fields from the volatile generation in the tree // therefore safe to locally cache these generation fields from the volatile generation in the tree
Expand All @@ -1006,13 +1017,13 @@ private class SingleWriter implements Writer<KEY,VALUE>
* Either fully initialized: * Either fully initialized:
* <ul> * <ul>
* <li>{@link #writerTaken} - true</li> * <li>{@link #writerTaken} - true</li>
* <li>{@link #writerCheckpointMutex} - locked</li> * <li>{@link #writerLock} - locked</li>
* <li>{@link #cursor} - not null</li> * <li>{@link #cursor} - not null</li>
* </ul> * </ul>
* Of fully closed: * Of fully closed:
* <ul> * <ul>
* <li>{@link #writerTaken} - false</li> * <li>{@link #writerTaken} - false</li>
* <li>{@link #writerCheckpointMutex} - unlocked</li> * <li>{@link #writerLock} - unlocked</li>
* <li>{@link #cursor} - null</li> * <li>{@link #cursor} - null</li>
* </ul> * </ul>
* *
Expand All @@ -1030,8 +1041,7 @@ void initialize() throws IOException
boolean success = false; boolean success = false;
try try
{ {
readLock = writerCheckpointMutex.readLock(); writerLock.lock();
readLock.lock();
cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK ); cursor = openRootCursor( PagedFile.PF_SHARED_WRITE_LOCK );
stableGeneration = stableGeneration( generation ); stableGeneration = stableGeneration( generation );
unstableGeneration = unstableGeneration( generation ); unstableGeneration = unstableGeneration( generation );
Expand Down Expand Up @@ -1135,16 +1145,7 @@ public void close() throws IOException
", but writer is already closed." ); ", but writer is already closed." );
} }
closeCursor(); closeCursor();
releaseLock(); writerLock.unlock();
}

private void releaseLock()
{
if ( readLock != null )
{
readLock.unlock();
readLock = null;
}
} }


private void closeCursor() private void closeCursor()
Expand Down
Expand Up @@ -21,22 +21,25 @@


import java.io.IOException; import java.io.IOException;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.StampedLock;


class GBPTreeCleanupJob implements CleanupJob class GBPTreeCleanupJob implements CleanupJob
{ {
private final CrashGenerationCleaner crashGenerationCleaner; private final CrashGenerationCleaner crashGenerationCleaner;
private final Lock readLock; private final StampedLock stampedLock;
private final long stamp;
private volatile boolean needed; private volatile boolean needed;
private volatile Exception failure; private volatile Exception failure;


/** /**
* @param crashGenerationCleaner {@link CrashGenerationCleaner} to use for cleaning. * @param crashGenerationCleaner {@link CrashGenerationCleaner} to use for cleaning.
* @param lock {@link Lock} to be released when job has either successfully finished or failed. * @param lock {@link Lock} to be released when job has either successfully finished or failed.
*/ */
GBPTreeCleanupJob( CrashGenerationCleaner crashGenerationCleaner, Lock lock ) GBPTreeCleanupJob( CrashGenerationCleaner crashGenerationCleaner, StampedLock lock, long stamp )
{ {
this.crashGenerationCleaner = crashGenerationCleaner; this.crashGenerationCleaner = crashGenerationCleaner;
this.readLock = lock; this.stampedLock = lock;
this.stamp = stamp;
this.needed = true; this.needed = true;


} }
Expand Down Expand Up @@ -73,7 +76,7 @@ public void run()
} }
finally finally
{ {
readLock.unlock(); stampedLock.unlockWrite( stamp );
} }
} }
} }
Expand Up @@ -769,7 +769,7 @@ public void cleanJobShouldLockOutCheckpoint() throws Exception
} }


@Test( timeout = 5_000L ) @Test( timeout = 5_000L )
public void cleanJobShouldLockOutClose() throws Exception public void cleanJobShouldNotLockOutClose() throws Exception
{ {
// GIVEN // GIVEN
try ( GBPTree<MutableLong,MutableLong> index = index().build() ) try ( GBPTree<MutableLong,MutableLong> index = index().build() )
Expand All @@ -789,11 +789,10 @@ public void cleanJobShouldLockOutClose() throws Exception


// THEN // THEN
Future<?> close = executor.submit( throwing( index::close ) ); Future<?> close = executor.submit( throwing( index::close ) );
shouldWait( close ); close.get();


monitor.barrier.release(); monitor.barrier.release();
cleanup.get(); cleanup.get();
close.get();
} }


@Test( timeout = 5_000L ) @Test( timeout = 5_000L )
Expand Down

0 comments on commit b2cbed1

Please sign in to comment.