Skip to content

Commit

Permalink
Polish SlaveLocksClient#stop()
Browse files Browse the repository at this point in the history
Commit makes SlaveLocksClient better handle stopping. It's methods throw
LockClientStoppedException when invoked on a stopped client.

Removed Locks.Client#releaseAll() because it was only used by SlaveLocksClient
and same functionality is available via Locks.Client#close(). Updated all
#releaseAll() tests respectively.
  • Loading branch information
lutovich committed Jun 8, 2016
1 parent c33b805 commit b7fe258
Show file tree
Hide file tree
Showing 10 changed files with 203 additions and 129 deletions.
Expand Up @@ -99,12 +99,10 @@ interface Client extends AutoCloseable
/** Release a set of exclusive locks */
void releaseExclusive( ResourceType resourceType, long resourceId );

/** Release all locks. */
void releaseAll();

/**
* Stop all active lock waiters and release them. All already held locks remains.
* All new attempts to acquire any locks will cause exceptions.
* This client can and should only be {@link #close() closed} afterwards.
*/
void stop();

Expand Down
Expand Up @@ -53,11 +53,6 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
{
}

@Override
public void releaseAll()
{
}

@Override
public void stop()
{
Expand Down
Expand Up @@ -234,20 +234,6 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
}
}

@Override
public void releaseAll()
{
stateHolder.incrementActiveClients( this );
try
{
releaseLocks();
}
finally
{
stateHolder.decrementActiveClients();
}
}

@Override
public void stop()
{
Expand Down
Expand Up @@ -91,23 +91,6 @@ public void sharedShouldWaitForExclusive() throws Exception
assertNotWaiting( clientB, clientBLock );
}

@Test
public void shouldReleaseAllLocks() throws Exception
{
// When
clientA.acquireExclusive( NODE, 1L );
clientA.acquireShared( NODE, 2l );

// Then shared locks should wait
Future<Object> clientBLock = acquireShared( clientB, NODE, 1L ).callAndAssertWaiting();

// And when
clientA.releaseAll();

// Then this should not block
assertNotWaiting( clientB, clientBLock );
}

@Test
public void shouldTrySharedLock() throws Exception
{
Expand Down
Expand Up @@ -101,11 +101,17 @@ public void shouldNotBeAbleToAcquireExclusiveLockFromClosedClient()
clientA.acquireExclusive( NODE, 1l );
}

@Test
public void shouldNotBeAbleToAcquireLocksUsingTryFromClosedClient()
@Test( expected = LockClientStoppedException.class )
public void shouldNotBeAbleToTryAcquireSharedLockFromClosedClient()
{
clientA.close();
clientA.trySharedLock( NODE, 1L );
}

@Test( expected = LockClientStoppedException.class )
public void shouldNotBeAbleToTryAcquireExclusiveLockFromClosedClient()
{
clientA.close();
Assert.assertFalse( clientA.trySharedLock( NODE, 1l ) );
Assert.assertFalse( clientA.tryExclusiveLock( NODE, 1l ) );
clientA.tryExclusiveLock( NODE, 1L );
}
}
Expand Up @@ -217,7 +217,7 @@ public void shouldUpgradeAndDowngradeSameSharedLock() throws InterruptedExceptio
Future<Object> exclusiveLockFuture = acquireExclusive( clientB, NODE, 1L ).callAndAssertWaiting();

// and when
clientA.releaseAll();
clientA.releaseShared( NODE, 1L );

// exclusive lock should be received
assertNotWaiting( clientB, exclusiveLockFuture );
Expand Down
Expand Up @@ -153,12 +153,6 @@ public void releaseExclusiveThrowsWhenClientStopped()
stoppedClient().releaseExclusive( ResourceTypes.NODE, 1 );
}

@Test( expected = LockClientStoppedException.class )
public void releaseAllThrowsWhenClientStopped()
{
stoppedClient().releaseAll();
}

@Test
public void sharedLockCanBeStopped() throws Exception
{
Expand Down Expand Up @@ -186,15 +180,15 @@ public void exclusiveLockCanBeStopped() throws Exception
@Test
public void acquireSharedLockAfterSharedLockStoppedOtherThread() throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

LockAcquisition sharedLockAcquisition1 = acquireSharedLockInAnotherThread();
assertThreadIsWaitingForLock( sharedLockAcquisition1 );

sharedLockAcquisition1.stop();
assertLockAcquisitionFailed( sharedLockAcquisition1 );

releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();

LockAcquisition sharedLockAcquisition2 = acquireSharedLockInAnotherThread();
assertLockAcquisitionSucceeded( sharedLockAcquisition2 );
Expand All @@ -203,15 +197,15 @@ public void acquireSharedLockAfterSharedLockStoppedOtherThread() throws Exceptio
@Test
public void acquireExclusiveLockAfterExclusiveLockStoppedOtherThread() throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

LockAcquisition exclusiveLockAcquisition1 = acquireExclusiveLockInAnotherThread();
assertThreadIsWaitingForLock( exclusiveLockAcquisition1 );

exclusiveLockAcquisition1.stop();
assertLockAcquisitionFailed( exclusiveLockAcquisition1 );

releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();

LockAcquisition exclusiveLockAcquisition2 = acquireExclusiveLockInAnotherThread();
assertLockAcquisitionSucceeded( exclusiveLockAcquisition2 );
Expand All @@ -220,15 +214,15 @@ public void acquireExclusiveLockAfterExclusiveLockStoppedOtherThread() throws Ex
@Test
public void acquireSharedLockAfterExclusiveLockStoppedOtherThread() throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

LockAcquisition exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread();
assertThreadIsWaitingForLock( exclusiveLockAcquisition );

exclusiveLockAcquisition.stop();
assertLockAcquisitionFailed( exclusiveLockAcquisition );

releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();

LockAcquisition sharedLockAcquisition = acquireSharedLockInAnotherThread();
assertLockAcquisitionSucceeded( sharedLockAcquisition );
Expand All @@ -237,15 +231,15 @@ public void acquireSharedLockAfterExclusiveLockStoppedOtherThread() throws Excep
@Test
public void acquireExclusiveLockAfterSharedLockStoppedOtherThread() throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

LockAcquisition sharedLockAcquisition = acquireSharedLockInAnotherThread();
assertThreadIsWaitingForLock( sharedLockAcquisition );

sharedLockAcquisition.stop();
assertLockAcquisitionFailed( sharedLockAcquisition );

releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();

LockAcquisition exclusiveLockAcquisition = acquireExclusiveLockInAnotherThread();
assertLockAcquisitionSucceeded( exclusiveLockAcquisition );
Expand Down Expand Up @@ -290,7 +284,7 @@ public void closeClientAfterExclusiveLockStopped() throws Exception
@Test
public void acquireExclusiveLockWhileHoldingSharedLockCanBeStopped() throws Exception
{
acquireSharedLockInThisThread();
AcquiredLock thisThreadsSharedLock = acquireSharedLockInThisThread();

CountDownLatch sharedLockAcquired = new CountDownLatch( 1 );
CountDownLatch startExclusiveLock = new CountDownLatch( 1 );
Expand All @@ -304,7 +298,7 @@ public void acquireExclusiveLockWhileHoldingSharedLockCanBeStopped() throws Exce
acquisition.stop();
assertLockAcquisitionFailed( acquisition );

releaseAllLocksInThisThread();
thisThreadsSharedLock.release();
assertNoLocksHeld();
}

Expand All @@ -323,7 +317,7 @@ private Locks.Client stoppedClient()

private void closeClientAfterLockStopped( boolean shared ) throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

CountDownLatch firstLockAcquired = new CountDownLatch( 1 );
LockAcquisition
Expand All @@ -337,14 +331,14 @@ private void closeClientAfterLockStopped( boolean shared ) throws Exception
assertLockAcquisitionFailed( acquisition );
assertLocksHeld( RESOURCE_ID );

releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();
assertNoLocksHeld();
}

private void acquireLockAfterOtherLockStoppedSameThread( boolean firstLockShared, boolean secondLockShared )
throws Exception
{
acquireExclusiveLockInThisThread();
AcquiredLock thisThreadsExclusiveLock = acquireExclusiveLockInThisThread();

CountDownLatch firstLockFailed = new CountDownLatch( 1 );
CountDownLatch startSecondLock = new CountDownLatch( 1 );
Expand All @@ -356,27 +350,24 @@ private void acquireLockAfterOtherLockStoppedSameThread( boolean firstLockShared

lockAcquisition.stop();
await( firstLockFailed );
releaseAllLocksInThisThread();
thisThreadsExclusiveLock.release();
startSecondLock.countDown();

assertLockAcquisitionSucceeded( lockAcquisition );
}

private void acquireSharedLockInThisThread()
private AcquiredLock acquireSharedLockInThisThread()
{
client.acquireShared( RESOURCE_TYPE, RESOURCE_ID );
assertLocksHeld( RESOURCE_ID );
return AcquiredLock.shared( client, RESOURCE_TYPE, RESOURCE_ID );
}

private void acquireExclusiveLockInThisThread()
private AcquiredLock acquireExclusiveLockInThisThread()
{
client.acquireExclusive( RESOURCE_TYPE, RESOURCE_ID );
assertLocksHeld( RESOURCE_ID );
}

private void releaseAllLocksInThisThread()
{
client.releaseAll();
return AcquiredLock.exclusive( client, RESOURCE_TYPE, RESOURCE_ID );
}

private LockAcquisition acquireSharedLockInAnotherThread()
Expand Down Expand Up @@ -684,4 +675,42 @@ void stop()
getClient().stop();
}
}

private static class AcquiredLock
{
final Locks.Client client;
final boolean shared;
final Locks.ResourceType resourceType;
final long resourceId;

AcquiredLock( Locks.Client client, boolean shared, Locks.ResourceType resourceType, long resourceId )
{
this.client = client;
this.shared = shared;
this.resourceType = resourceType;
this.resourceId = resourceId;
}

static AcquiredLock shared( Locks.Client client, Locks.ResourceType resourceType, long resourceId )
{
return new AcquiredLock( client, true, resourceType, resourceId );
}

static AcquiredLock exclusive( Locks.Client client, Locks.ResourceType resourceType, long resourceId )
{
return new AcquiredLock( client, false, resourceType, resourceId );
}

void release()
{
if ( shared )
{
client.releaseShared( resourceType, resourceId );
}
else
{
client.releaseExclusive( resourceType, resourceId );
}
}
}
}

0 comments on commit b7fe258

Please sign in to comment.