Skip to content

Commit

Permalink
Add possibility to grab cleaner and writer lock atomicly
Browse files Browse the repository at this point in the history
  • Loading branch information
burqen committed May 19, 2017
1 parent cd4d8ee commit d64fe5c
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 18 deletions.
Expand Up @@ -764,8 +764,7 @@ private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throw

// Block writers, or if there's a current writer then wait for it to complete and then block
// From this point and till the lock is released we know that the tree won't change.
lock.cleanerLock();
lock.writerLock();
lock.writerAndCleanerLock();
try
{
// Flush dirty pages since that last flush above. This should be a very small set of pages
Expand All @@ -792,8 +791,7 @@ private void checkpoint( IOLimiter ioLimiter, Header.Writer headerWriter ) throw
{
// Unblock writers, any writes after this point and up until the next checkpoint will have
// the new unstable generation.
lock.writerUnlock();
lock.cleanerUnlock();
lock.writerAndCleanerUnlock();
}
}

Expand Down
Expand Up @@ -31,6 +31,14 @@ class GBPTreeLock
private static final long cleanerLockBit = 0x00000000_00000002L;
private volatile long state;

// Used for testing
GBPTreeLock copy()
{
GBPTreeLock copy = new GBPTreeLock();
copy.state = state;
return copy;
}

void writerLock()
{
doLock( writerLockBit );
Expand All @@ -51,14 +59,24 @@ void cleanerUnlock()
doUnlock( cleanerLockBit );
}

void writerAndCleanerLock()
{
doLock( writerLockBit | cleanerLockBit );
}

void writerAndCleanerUnlock()
{
doUnlock( writerLockBit | cleanerLockBit );
}

private void doLock( long targetLockBit )
{
long currentState;
long newState;
do
{
currentState = state;
while ( isLocked( currentState, targetLockBit ) )
while ( !canLock( currentState, targetLockBit ) )
{
// sleep
sleep();
Expand All @@ -68,19 +86,14 @@ private void doLock( long targetLockBit )
} while ( !UnsafeUtil.compareAndSwapLong( this, stateOffset, currentState, newState ) );
}

private boolean isLocked( long state, long targetLockBit )
{
return (state & targetLockBit) == targetLockBit;
}

private void doUnlock( long targetLockBit )
{
long currentState;
long newState;
do
{
currentState = state;
if ( !isLocked( currentState, targetLockBit) )
if ( !canUnlock( currentState, targetLockBit) )
{
throw new IllegalStateException( "Can not unlock lock that is already locked" );
}
Expand All @@ -89,6 +102,16 @@ private void doUnlock( long targetLockBit )
while ( !UnsafeUtil.compareAndSwapLong( this, stateOffset, currentState, newState ) );
}

private boolean canLock( long state, long targetLockBit )
{
return (state & targetLockBit) == 0;
}

private boolean canUnlock( long state, long targetLockBit )
{
return (state & targetLockBit) == targetLockBit;
}

private void sleep()
{
LockSupport.parkNanos( TimeUnit.MILLISECONDS.toNanos( 10 ) );
Expand Down
Expand Up @@ -41,6 +41,7 @@ public class GBPTreeLockTest
// State LL - locked | locked

private GBPTreeLock lock = new GBPTreeLock();
private GBPTreeLock copy;
private final ExecutorService executor = Executors.newSingleThreadExecutor();

@Test
Expand Down Expand Up @@ -103,6 +104,20 @@ public void test_LU_UU_LU() throws Exception
assertLU();
}

@Test
public void test_UU_LL_UU() throws Exception
{
// given
assertUU();

// then
lock.writerAndCleanerLock();
assertLL();

lock.writerAndCleanerUnlock();
assertUU();
}

private void assertThrow( Runnable unlock )
{
try
Expand All @@ -116,11 +131,11 @@ private void assertThrow( Runnable unlock )
}
}

private void assertBlock( Runnable lock, Runnable unlock ) throws ExecutionException, InterruptedException
private void assertBlock( Runnable runLock, Runnable runUnlock ) throws ExecutionException, InterruptedException
{
Future<?> future = executor.submit( lock );
Future<?> future = executor.submit( runLock );
shouldWait( future );
unlock.run();
runUnlock.run();
future.get();
}

Expand All @@ -141,23 +156,34 @@ private void assertUU()
{
assertThrow( lock::writerUnlock );
assertThrow( lock::cleanerUnlock );
assertThrow( lock::writerAndCleanerUnlock );
}

private void assertUL() throws ExecutionException, InterruptedException
{
assertThrow( lock::writerUnlock );
assertBlock( lock::cleanerLock, lock::cleanerUnlock );
assertThrow( lock::writerAndCleanerUnlock );
copy = lock.copy();
assertBlock( copy::cleanerLock, copy::cleanerUnlock );
copy = lock.copy();
assertBlock( copy::writerAndCleanerLock, copy::cleanerUnlock );
}

private void assertLU() throws ExecutionException, InterruptedException
{
assertBlock( lock::writerLock, lock::writerUnlock );
assertThrow( lock::cleanerUnlock );
assertThrow( lock::writerAndCleanerUnlock );
copy = lock.copy();
assertBlock( copy::writerLock, copy::writerUnlock );
}

private void assertLL() throws ExecutionException, InterruptedException
{
assertBlock( lock::writerLock, lock::writerUnlock );
assertBlock( lock::cleanerLock, lock::cleanerUnlock );
copy = lock.copy();
assertBlock( copy::writerLock, copy::writerUnlock );
copy = lock.copy();
assertBlock( copy::cleanerLock, copy::cleanerUnlock );
copy = lock.copy();
assertBlock( copy::writerAndCleanerLock, copy::writerAndCleanerUnlock );
}
}

0 comments on commit d64fe5c

Please sign in to comment.