Skip to content

Commit

Permalink
Address pull request comments.
Browse files Browse the repository at this point in the history
Also avoid clearing the waitList if it was not changed since last clearing, which would happen when we get a lock without doing any waiting.
  • Loading branch information
chrisvest committed Jun 15, 2017
1 parent 12bfc84 commit 75f30be
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 46 deletions.
Expand Up @@ -44,6 +44,7 @@
*/
public class SimpleBitSet extends StampedLock implements PrimitiveIntIterable
{
private long lastCheckPointKey;
private long[] data;

public SimpleBitSet( int size )
Expand Down Expand Up @@ -108,11 +109,37 @@ public void remove( SimpleBitSet other )
unlockWrite( stamp );
}

public void clear()
public long checkPointAndPut( long checkPoint, int key )
{
long stamp = writeLock();
Arrays.fill( data, 0 );
unlockWrite( stamp );
// We only need to clear the bit set if it was modified since the last check point
if ( !validate( checkPoint ) || key != lastCheckPointKey )
{
long stamp = writeLock();
int idx = key >>> 6;
if ( idx < data.length )
{
Arrays.fill( data, 0 );
}
else
{
int len = data.length;
len = findNewLength( idx, len );
data = new long[len];
}
data[idx] = data[idx] | (1L<< (key & 63));
lastCheckPointKey = key;
checkPoint = tryConvertToOptimisticRead( stamp );
}
return checkPoint;
}

private int findNewLength( int idx, int len )
{
while ( len <= idx )
{
len *= 2;
}
return len;
}

public int size()
Expand All @@ -125,8 +152,7 @@ public int size()

private void ensureCapacity( int arrayIndex )
{
while(data.length <= arrayIndex)
data = Arrays.copyOf(data, data.length * 2);
data = Arrays.copyOf( data, findNewLength( arrayIndex, data.length ) );
}

//
Expand Down
Expand Up @@ -29,6 +29,8 @@
import static junit.framework.Assert.assertTrue;
import static junit.framework.TestCase.assertFalse;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.junit.Assert.assertThat;

public class SimpleBitSetTest
Expand Down Expand Up @@ -162,4 +164,95 @@ public void shouldAllowIterating() throws Exception
assertThat( found, equalTo( asList( 4, 7, 63, 78 ) ));
}

@Test
public void checkPointOnUnchangedSetMustDoNothing() throws Exception
{
SimpleBitSet set = new SimpleBitSet( 16 );
int key = 10;
set.put( key );
long checkpoint = 0;
checkpoint = set.checkPointAndPut( checkpoint, key );
assertThat( set.checkPointAndPut( checkpoint, key ), is( checkpoint ) );
assertTrue( set.contains( key ) );
}

@Test
public void checkPointOnUnchangedSetButWithDifferentKeyMustUpdateSet() throws Exception
{
SimpleBitSet set = new SimpleBitSet( 16 );
int key = 10;
set.put( key );
long checkpoint = 0;
checkpoint = set.checkPointAndPut( checkpoint, key );
assertThat( set.checkPointAndPut( checkpoint, key + 1 ), is( not( checkpoint ) ) );
assertTrue( set.contains( key + 1 ) );
assertFalse( set.contains( key ) );
}

@Test
public void checkPointOnChangedSetMustClearState() throws Exception
{
SimpleBitSet set = new SimpleBitSet( 16 );
int key = 10;
set.put( key );
long checkpoint = 0;
checkpoint = set.checkPointAndPut( checkpoint, key );
set.put( key + 1 );
assertThat( set.checkPointAndPut( checkpoint, key ), is( not( checkpoint ) ) );
assertTrue( set.contains( key ) );
assertFalse( set.contains( key + 1 ) );
}

@Test
public void checkPointMustBeAbleToExpandCapacity() throws Exception
{
SimpleBitSet set = new SimpleBitSet( 16 );
int key = 10;
int key2 = 255;
set.put( key );
long checkpoint = 0;
checkpoint = set.checkPointAndPut( checkpoint, key );
assertThat( set.checkPointAndPut( checkpoint, key2 ), is( not( checkpoint ) ) );
assertTrue( set.contains( key2 ) );
assertFalse( set.contains( key ) );
}

@Test
public void modificationsMustTakeWriteLocks() throws Exception
{
// We can observe that a write lock was taken, by seeing that an optimistic read lock was invalidated.
SimpleBitSet set = new SimpleBitSet( 16 );
long stamp = set.tryOptimisticRead();

set.put( 8 );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

set.put( 8 );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

SimpleBitSet other = new SimpleBitSet( 16 );
other.put( 3 );
set.put( other );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

set.remove( 3 );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

set.remove( 3 );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

other.put( 8 );
set.remove( other );
assertFalse( set.validate( stamp ) );
stamp = set.tryOptimisticRead();

other.put( 8 );
set.remove( other );
assertFalse( set.validate( stamp ) );
}
}
Expand Up @@ -31,6 +31,7 @@
import org.neo4j.collection.primitive.PrimitiveIntIterator;
import org.neo4j.collection.primitive.PrimitiveLongIntMap;
import org.neo4j.collection.primitive.PrimitiveLongVisitor;
import org.neo4j.graphdb.TransactionFailureException;
import org.neo4j.kernel.DeadlockDetectedException;
import org.neo4j.kernel.impl.enterprise.lock.forseti.ForsetiLockManager.DeadlockResolutionStrategy;
import org.neo4j.kernel.impl.locking.LockClientStateHolder;
Expand Down Expand Up @@ -85,11 +86,12 @@ public class ForsetiClient implements Locks.Client
*/
private final PrimitiveLongIntMap[] sharedLockCounts;

/** @see {@link #sharedLockCounts} */
/** @see #sharedLockCounts */
private final PrimitiveLongIntMap[] exclusiveLockCounts;

/** List of other clients this client is waiting for. */
private final SimpleBitSet waitList = new SimpleBitSet( 64 );
private long waitListCheckPoint;

// To be able to close Locks.Client instance properly we should be able to do couple of things:
// - have a possibility to prevent new clients to come
Expand All @@ -113,12 +115,12 @@ public class ForsetiClient implements Locks.Client
*/
private volatile ForsetiLockManager.Lock waitingForLock;

public ForsetiClient( int id,
ConcurrentMap<Long,ForsetiLockManager.Lock>[] lockMaps,
WaitStrategy<AcquireLockTimeoutException>[] waitStrategies,
Pool<ForsetiClient> clientPool,
DeadlockResolutionStrategy deadlockResolutionStrategy,
IntFunction<ForsetiClient> clientById )
ForsetiClient( int id,
ConcurrentMap<Long,ForsetiLockManager.Lock>[] lockMaps,
WaitStrategy<AcquireLockTimeoutException>[] waitStrategies,
Pool<ForsetiClient> clientPool,
DeadlockResolutionStrategy deadlockResolutionStrategy,
IntFunction<ForsetiClient> clientById )
{
this.clientId = id;
this.lockMaps = lockMaps;
Expand Down Expand Up @@ -237,15 +239,14 @@ else if ( existingLock instanceof ExclusiveLock )
waitFor( existingLock, resourceType, resourceId, tries++ );
}

// Got the lock, no longer waiting for anyone.
clearWaitList();

// Make a local note about the fact that we now hold this lock
heldShareLocks.put( resourceId, 1 );
}
}
finally
{
clearWaitList();
waitingForLock = null;
stateHolder.decrementActiveClients();
}
}
Expand Down Expand Up @@ -294,12 +295,13 @@ public void acquireExclusive( ResourceType resourceType, long... resourceIds ) t
waitFor( existingLock, resourceType, resourceId, tries++ );
}

clearWaitList();
heldLocks.put( resourceId, 1 );
}
}
finally
{
clearWaitList();
waitingForLock = null;
stateHolder.decrementActiveClients();
}
}
Expand Down Expand Up @@ -597,17 +599,17 @@ public int getLockSessionId()
return clientId;
}

public int waitListSize()
int waitListSize()
{
return waitList.size();
}

public void copyWaitListTo( SimpleBitSet other )
void copyWaitListTo( SimpleBitSet other )
{
other.put( waitList );
}

public boolean isWaitingFor( int clientId )
boolean isWaitingFor( int clientId )
{
return clientId != this.clientId && waitList.contains( clientId );
}
Expand Down Expand Up @@ -738,39 +740,28 @@ private boolean tryUpgradeToExclusiveWithShareLockHeld(
}

return true;

}
catch ( DeadlockDetectedException e )
catch ( Throwable e )
{
sharedLock.releaseUpdateLock( this );
// wait list is not cleared here as in other catch blocks because it is cleared in
// markAsWaitingFor() before throwing DeadlockDetectedException
throw e;
}
catch ( LockClientStoppedException e )
{
handleUpgradeToExclusiveFailure( sharedLock );
throw e;
if ( e instanceof DeadlockDetectedException || e instanceof LockClientStoppedException )
{
throw (RuntimeException) e;
}
throw new TransactionFailureException( "Failed to upgrade shared lock to exclusive: " + sharedLock, e );
}
catch ( Throwable e )
finally
{
handleUpgradeToExclusiveFailure( sharedLock );
throw new RuntimeException( e );
clearWaitList();
waitingForLock = null;
}
}
return false;
}

private void handleUpgradeToExclusiveFailure( SharedLock sharedLock )
{
sharedLock.releaseUpdateLock( this );
clearWaitList();
}

private void clearWaitList()
{
waitList.clear();
waitList.put( clientId );
waitListCheckPoint = waitList.checkPointAndPut( waitListCheckPoint, clientId );
}

private void waitFor( ForsetiLockManager.Lock lock, ResourceType type, long resourceId, int tries )
Expand Down Expand Up @@ -805,13 +796,10 @@ private void waitFor( ForsetiLockManager.Lock lock, ResourceType type, long reso
if ( isDeadlockReal( lock, tries ) )
{
// After checking several times, this really does look like a real deadlock.
waitList.clear();
waitingForLock = null;
throw new DeadlockDetectedException( message );
}
}
}
waitingForLock = null;
}

private boolean isDeadlockReal( ForsetiLockManager.Lock lock, int tries )
Expand Down Expand Up @@ -871,7 +859,7 @@ private void collectNextOwners( Set<ForsetiLockManager.Lock> waitedUpon, Set<For
* @return an approximate (assuming data is concurrently being edited) count of the number of locks held by this
* client.
*/
public int lockCount()
int lockCount()
{
int count = 0;
for ( PrimitiveLongIntMap sharedLockCount : sharedLockCounts )
Expand All @@ -885,7 +873,7 @@ public int lockCount()
return count;
}

public String describeWaitList()
String describeWaitList()
{
StringBuilder sb = new StringBuilder( format( "%nClient[%d] waits for [", id() ) );
PrimitiveIntIterator iter = waitList.iterator();
Expand Down

0 comments on commit 75f30be

Please sign in to comment.