Skip to content

Commit

Permalink
Make deferring lock client track reentrancy count
Browse files Browse the repository at this point in the history
This is done to make sure sequence:
 1) acquireExclusive(1)
 2) acquireExclusive(1)
 3) releaseExclusive(1)
 4) acquireAllDeferredLocks()

results in one exclusive lock being grabbed for (1).
  • Loading branch information
lutovich committed Jul 21, 2016
1 parent 3a9e67b commit 7e353a5
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 58 deletions.
Expand Up @@ -55,11 +55,6 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
{ {
} }


@Override
public void releaseAll()
{
}

@Override @Override
public void stop() public void stop()
{ {
Expand Down
@@ -1,8 +1,10 @@
package org.neo4j.kernel.impl.locking.deferred; package org.neo4j.kernel.impl.locking.deferred;


import org.apache.commons.lang3.mutable.MutableInt;

import java.util.Arrays; import java.util.Arrays;
import java.util.Set; import java.util.Map;
import java.util.TreeSet; import java.util.TreeMap;


import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException; import org.neo4j.kernel.impl.locking.AcquireLockTimeoutException;
import org.neo4j.kernel.impl.locking.LockClientStoppedException; import org.neo4j.kernel.impl.locking.LockClientStoppedException;
Expand All @@ -12,7 +14,7 @@
public class DeferringLockClient implements Locks.Client public class DeferringLockClient implements Locks.Client
{ {
private final Locks.Client clientDelegate; private final Locks.Client clientDelegate;
private final Set<LockUnit> locks = new TreeSet<>(); private final Map<LockUnit,MutableInt> locks = new TreeMap<>();
private volatile boolean stopped; private volatile boolean stopped;


public DeferringLockClient( Locks.Client clientDelegate ) public DeferringLockClient( Locks.Client clientDelegate )
Expand All @@ -25,23 +27,17 @@ public void acquireShared( Locks.ResourceType resourceType, long... resourceIds
{ {
for ( long resourceId : resourceIds ) for ( long resourceId : resourceIds )
{ {
queueLock( resourceType, resourceId, false ); addLock( resourceType, resourceId, false );
} }
} }


private void queueLock( Locks.ResourceType resourceType, long resourceId, boolean exclusive )
{
assertNotStopped();
locks.add( new LockUnit( resourceType, resourceId, exclusive ) );
}

@Override @Override
public void acquireExclusive( Locks.ResourceType resourceType, long... resourceIds ) public void acquireExclusive( Locks.ResourceType resourceType, long... resourceIds )
throws AcquireLockTimeoutException throws AcquireLockTimeoutException
{ {
for ( long resourceId : resourceIds ) for ( long resourceId : resourceIds )
{ {
queueLock( resourceType, resourceId, true ); addLock( resourceType, resourceId, true );
} }
} }


Expand All @@ -60,23 +56,13 @@ public boolean trySharedLock( Locks.ResourceType resourceType, long resourceId )
@Override @Override
public void releaseShared( Locks.ResourceType resourceType, long resourceId ) public void releaseShared( Locks.ResourceType resourceType, long resourceId )
{ {
final LockUnit unit = new LockUnit( resourceType, resourceId, false ); removeLock( resourceType, resourceId, false );
if ( !locks.remove( unit ) )
{
throw new IllegalStateException( "Cannot release lock that it does not hold: " +
resourceType + "[" + resourceId + "]." );
}
} }


@Override @Override
public void releaseExclusive( Locks.ResourceType resourceType, long resourceId ) public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
{ {
final LockUnit unit = new LockUnit( resourceType, resourceId, true ); removeLock( resourceType, resourceId, true );
if ( !locks.remove( unit ) )
{
throw new IllegalStateException( "Cannot release lock that it does not hold: " +
resourceType + "[" + resourceId + "]." );
}
} }


void acquireDeferredLocks() void acquireDeferredLocks()
Expand All @@ -85,18 +71,15 @@ void acquireDeferredLocks()
int cursor = 0; int cursor = 0;
Locks.ResourceType currentType = null; Locks.ResourceType currentType = null;
boolean currentExclusive = false; boolean currentExclusive = false;
for ( LockUnit lockUnit : locks ) for ( LockUnit lockUnit : locks.keySet() )
{ {
// TODO perhaps also add a condition which sends batches over a certain size threshold // TODO perhaps also add a condition which sends batches over a certain size threshold
if ( currentType == null || if ( currentType == null ||
(currentType.typeId() != lockUnit.resourceType().typeId() || (currentType.typeId() != lockUnit.resourceType().typeId() ||
currentExclusive != lockUnit.isExclusive()) ) currentExclusive != lockUnit.isExclusive()) )
{ {
// New type, i.e. flush the current array down to delegate in one call // New type, i.e. flush the current array down to delegate in one call
if ( !flushLocks( current, cursor, currentType, currentExclusive ) ) flushLocks( current, cursor, currentType, currentExclusive );
{
break;
}


cursor = 0; cursor = 0;
currentType = lockUnit.resourceType(); currentType = lockUnit.resourceType();
Expand All @@ -113,14 +96,14 @@ void acquireDeferredLocks()
flushLocks( current, cursor, currentType, currentExclusive ); flushLocks( current, cursor, currentType, currentExclusive );
} }


private boolean flushLocks( long[] current, int cursor, Locks.ResourceType currentType, boolean currentExclusive ) private void flushLocks( long[] current, int cursor, Locks.ResourceType currentType, boolean exclusive )
{ {
assertNotStopped(); assertNotStopped();


if ( cursor > 0 ) if ( cursor > 0 )
{ {
long[] resourceIds = Arrays.copyOf( current, cursor ); long[] resourceIds = Arrays.copyOf( current, cursor );
if ( currentExclusive ) if ( exclusive )
{ {
clientDelegate.acquireExclusive( currentType, resourceIds ); clientDelegate.acquireExclusive( currentType, resourceIds );
} }
Expand All @@ -129,7 +112,6 @@ private boolean flushLocks( long[] current, int cursor, Locks.ResourceType curre
clientDelegate.acquireShared( currentType, resourceIds ); clientDelegate.acquireShared( currentType, resourceIds );
} }
} }
return true;
} }


@Override @Override
Expand All @@ -152,9 +134,46 @@ public int getLockSessionId()
return clientDelegate.getLockSessionId(); return clientDelegate.getLockSessionId();
} }


private void assertNotStopped() { private void assertNotStopped()
if( stopped ) { {
if ( stopped )
{
throw new LockClientStoppedException( this ); throw new LockClientStoppedException( this );
} }
} }

private void addLock( Locks.ResourceType resourceType, long resourceId, boolean exclusive )
{
assertNotStopped();

LockUnit lockUnit = new LockUnit( resourceType, resourceId, exclusive );
MutableInt lockCount = locks.get( lockUnit );
if ( lockCount == null )
{
lockCount = new MutableInt();
locks.put( lockUnit, lockCount );
}
lockCount.increment();
}

private void removeLock( Locks.ResourceType resourceType, long resourceId, boolean exclusive )
{
assertNotStopped();

LockUnit lockUnit = new LockUnit( resourceType, resourceId, exclusive );
MutableInt lockCount = locks.get( lockUnit );
if ( lockCount == null )
{
throw new IllegalStateException(
"Cannot release " + (exclusive ? "exclusive" : "shared") + " lock that it " +
"does not hold: " + resourceType + "[" + resourceId + "]." );
}

lockCount.decrement();

if ( lockCount.intValue() == 0 )
{
locks.remove( lockUnit );
}
}
} }
Expand Up @@ -38,6 +38,8 @@
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;


public class DeferringLockClientTest public class DeferringLockClientTest
{ {
Expand Down Expand Up @@ -122,25 +124,106 @@ public void shouldDeferAllLocks() throws Exception
@Test @Test
public void shouldStopUnderlyingClient() throws Exception public void shouldStopUnderlyingClient() throws Exception
{ {
// GIVEN
Locks.Client actualClient = mock( Locks.Client.class );
DeferringLockClient client = new DeferringLockClient( actualClient );

// WHEN
client.stop();

// THEN
verify( actualClient ).stop();
} }


@Test @Test
public void shouldCloseUnderlyingClient() throws Exception public void shouldCloseUnderlyingClient() throws Exception
{ {
// GIVEN
Locks.Client actualClient = mock( Locks.Client.class );
DeferringLockClient client = new DeferringLockClient( actualClient );

// WHEN
client.close();

// THEN
verify( actualClient ).close();
} }


@Test @Test
public void acquireReleasedLocksOnCommit() throws Exception public void exclusiveLockAcquiredMultipleTimesCanNotBeReleasedAtOnce() throws Exception
{ {
// TODO: Is this really what we want to do? // GIVEN
TestLocks actualLocks = new TestLocks();
TestLocksClient actualClient = actualLocks.newClient();
DeferringLockClient client = new DeferringLockClient( actualClient );

client.acquireExclusive( ResourceTypes.NODE, 1 );
client.acquireExclusive( ResourceTypes.NODE, 1 );
client.releaseExclusive( ResourceTypes.NODE, 1 );

// WHEN
client.acquireDeferredLocks();

// THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) );
} }


@Test @Test
public void shouldKeepCounterOverLocks() throws Exception public void sharedLockAcquiredMultipleTimesCanNotBeReleasedAtOnce() throws Exception
{ {
// GIVEN
TestLocks actualLocks = new TestLocks();
TestLocksClient actualClient = actualLocks.newClient();
DeferringLockClient client = new DeferringLockClient( actualClient );

client.acquireShared( ResourceTypes.NODE, 1 );
client.acquireShared( ResourceTypes.NODE, 1 );
client.releaseShared( ResourceTypes.NODE, 1 );

// WHEN
client.acquireDeferredLocks();

// THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) );
} }


// TODO: Not a complete list of tests @Test
public void acquireBothSharedAndExclusiveLockThenReleaseShared()
{
// GIVEN
TestLocks actualLocks = new TestLocks();
TestLocksClient actualClient = actualLocks.newClient();
DeferringLockClient client = new DeferringLockClient( actualClient );

client.acquireShared( ResourceTypes.NODE, 1 );
client.acquireExclusive( ResourceTypes.NODE, 1 );
client.releaseShared( ResourceTypes.NODE, 1 );

// WHEN
client.acquireDeferredLocks();

// THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, true ) ) );
}

@Test
public void acquireBothSharedAndExclusiveLockThenReleaseExclusive()
{
// GIVEN
TestLocks actualLocks = new TestLocks();
TestLocksClient actualClient = actualLocks.newClient();
DeferringLockClient client = new DeferringLockClient( actualClient );

client.acquireShared( ResourceTypes.NODE, 1 );
client.acquireExclusive( ResourceTypes.NODE, 1 );
client.releaseExclusive( ResourceTypes.NODE, 1 );

// WHEN
client.acquireDeferredLocks();

// THEN
actualClient.assertRegisteredLocks( Collections.singleton( new LockUnit( ResourceTypes.NODE, 1, false ) ) );
}


private static class TestLocks extends LifecycleAdapter implements Locks private static class TestLocks extends LifecycleAdapter implements Locks
{ {
Expand Down
Expand Up @@ -468,22 +468,6 @@ public void releaseExclusive( Locks.ResourceType resourceType, long resourceId )
} }
} }


@Override
public void releaseAll()
{
// increment number of active clients if we can't do so we are closed so exiting
stateHolder.incrementActiveClients( this );

try
{
releaseAllClientLocks();
}
finally
{
stateHolder.decrementActiveClients();
}
}

private void releaseAllClientLocks() private void releaseAllClientLocks()
{ {
// Force the release of all locks held. // Force the release of all locks held.
Expand Down

0 comments on commit 7e353a5

Please sign in to comment.