Skip to content

Commit

Permalink
Check local lock client first on slaves
Browse files Browse the repository at this point in the history
By adding explicit methods for re-entry on the Locks.Client interface we
can use the local lock client for checking if the lock is held locally,
and achieve the same behaviour of locking on the master first, and not
talking to the master on re-entry, while lowering the overhead of lock
maintenance on the slave.

The lock maps in the SlaveLockClient completely disappear, and instead
the state of the local lock client is used, since that already contains
the same state.

Not only does this lower the memory footprint (for slaves that execute
a write load), it also simplifies the code of the SlaveLockClient.
  • Loading branch information
thobe committed Jan 20, 2017
1 parent 2b8fd95 commit b6ed3e6
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 69 deletions.
Expand Up @@ -94,6 +94,10 @@ interface Client extends ResourceLocker, AutoCloseable
/** Try grabbing shared lock, not waiting and returning a boolean indicating if we got the lock. */ /** Try grabbing shared lock, not waiting and returning a boolean indicating if we got the lock. */
boolean trySharedLock( ResourceType resourceType, long resourceId ); boolean trySharedLock( ResourceType resourceType, long resourceId );


boolean reEnterShared( ResourceType resourceType, long resourceId );

boolean reEnterExclusive( ResourceType resourceType, long resourceId );

/** Release a set of shared locks */ /** Release a set of shared locks */
void releaseShared( ResourceType resourceType, long resourceId ); void releaseShared( ResourceType resourceType, long resourceId );


Expand Down
Expand Up @@ -48,6 +48,18 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
return false; return false;
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
return false;
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
return false;
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
Expand Down
Expand Up @@ -227,6 +227,48 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
} }
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
stateHolder.incrementActiveClients( this );
try
{
return reEnter( localShared( resourceType ), resourceId );
}
finally
{
stateHolder.decrementActiveClients();
}
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
stateHolder.incrementActiveClients( this );
try
{
return reEnter( localExclusive( resourceType ), resourceId );
}
finally
{
stateHolder.decrementActiveClients();
}
}

private boolean reEnter( PrimitiveLongObjectMap<LockResource> localLocks, long resourceId )
{
LockResource resource = localLocks.get( resourceId );
if ( resource != null )
{
resource.acquireReference();
return true;
}
else
{
return false;
}
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
Expand Down
Expand Up @@ -230,6 +230,19 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
return localClient.trySharedLock( resourceType, resourceId ); return localClient.trySharedLock( resourceType, resourceId );
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
return localClient.reEnterShared( resourceType, resourceId );
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
ensureHoldingToken();
return localClient.reEnterExclusive( resourceType, resourceId );
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
Expand Down
Expand Up @@ -75,6 +75,18 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
throw new UnsupportedOperationException( "Should not be needed" ); throw new UnsupportedOperationException( "Should not be needed" );
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException( "Should not be needed" );
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException( "Should not be needed" );
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
Expand Down
Expand Up @@ -444,6 +444,18 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
return register( resourceType, false, resourceId ); return register( resourceType, false, resourceId );
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException();
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException();
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
Expand Down
Expand Up @@ -70,8 +70,6 @@ class SlaveLocksClient implements Locks.Client
private final AvailabilityGuard availabilityGuard; private final AvailabilityGuard availabilityGuard;


// Using atomic ints to avoid creating garbage through boxing. // Using atomic ints to avoid creating garbage through boxing.
private final Map<ResourceType, Map<Long, AtomicInteger>> sharedLocks;
private final Map<ResourceType, Map<Long, AtomicInteger>> exclusiveLocks;
private final Log log; private final Log log;
private boolean initialized; private boolean initialized;
private volatile boolean stopped; private volatile boolean stopped;
Expand All @@ -90,43 +88,31 @@ public SlaveLocksClient(
this.requestContextFactory = requestContextFactory; this.requestContextFactory = requestContextFactory;
this.availabilityGuard = availabilityGuard; this.availabilityGuard = availabilityGuard;
this.log = logProvider.getLog( getClass() ); this.log = logProvider.getLog( getClass() );
sharedLocks = new HashMap<>();
exclusiveLocks = new HashMap<>();
}

private Map<Long, AtomicInteger> getLockMap(
Map<ResourceType, Map<Long, AtomicInteger>> resourceMap,
ResourceType resourceType )
{
Map<Long, AtomicInteger> lockMap = resourceMap.get( resourceType );
if ( lockMap == null )
{
lockMap = new HashMap<>();
resourceMap.put( resourceType, lockMap );
}
return lockMap;
} }


@Override @Override
public void acquireShared( LockTracer tracer, ResourceType resourceType, long... resourceIds ) throws AcquireLockTimeoutException public void acquireShared( LockTracer tracer, ResourceType resourceType, long... resourceIds ) throws AcquireLockTimeoutException
{ {
assertNotStopped(); assertNotStopped();


Map<Long,AtomicInteger> lockMap = getLockMap( sharedLocks, resourceType ); long[] newResourceIds = firstTimeSharedLocks( resourceType, resourceIds );
long[] newResourceIds = onlyFirstTimeLocks( lockMap, resourceIds );
if ( newResourceIds.length > 0 ) if ( newResourceIds.length > 0 )
{ {
try ( LockWaitEvent event = tracer.waitForLock( false, resourceType, resourceIds ) ) try ( LockWaitEvent event = tracer.waitForLock( false, resourceType, newResourceIds ) )
{ {
acquireSharedOnMaster( resourceType, newResourceIds ); acquireSharedOnMaster( resourceType, newResourceIds );
} }
for ( long resourceId : newResourceIds ) catch ( Throwable failure )
{ {
if ( client.trySharedLock( resourceType, resourceId ) ) if ( resourceIds != newResourceIds )
{ {
lockMap.put( resourceId, new AtomicInteger( 1 ) ); releaseShared( resourceType, resourceIds, newResourceIds );
} }
else throw failure;
}
for ( long resourceId : newResourceIds )
{
if ( !client.trySharedLock( resourceType, resourceId ) )
{ {
throw new LocalDeadlockDetectedException( throw new LocalDeadlockDetectedException(
client, localLockManager, resourceType, resourceId, READ ); client, localLockManager, resourceType, resourceId, READ );
Expand All @@ -141,21 +127,24 @@ public void acquireExclusive( LockTracer tracer, ResourceType resourceType, long
{ {
assertNotStopped(); assertNotStopped();


Map<Long, AtomicInteger> lockMap = getLockMap( exclusiveLocks, resourceType ); long[] newResourceIds = firstTimeExclusiveLocks( resourceType, resourceIds );
long[] newResourceIds = onlyFirstTimeLocks( lockMap, resourceIds );
if ( newResourceIds.length > 0 ) if ( newResourceIds.length > 0 )
{ {
try ( LockWaitEvent event = tracer.waitForLock( true, resourceType, resourceIds ) ) try ( LockWaitEvent event = tracer.waitForLock( true, resourceType, newResourceIds ) )
{ {
acquireExclusiveOnMaster( resourceType, newResourceIds ); acquireExclusiveOnMaster( resourceType, newResourceIds );
} }
for ( long resourceId : newResourceIds ) catch ( Throwable failure )
{ {
if ( client.tryExclusiveLock( resourceType, resourceId ) ) if ( resourceIds != newResourceIds )
{ {
lockMap.put( resourceId, new AtomicInteger( 1 ) ); releaseExclusive( resourceType, resourceIds, newResourceIds );
} }
else throw failure;
}
for ( long resourceId : newResourceIds )
{
if ( !client.tryExclusiveLock( resourceType, resourceId ) )
{ {
throw new LocalDeadlockDetectedException( throw new LocalDeadlockDetectedException(
client, localLockManager, resourceType, resourceId, WRITE ); client, localLockManager, resourceType, resourceId, WRITE );
Expand All @@ -176,42 +165,32 @@ public boolean trySharedLock( ResourceType resourceType, long resourceId )
throw newUnsupportedDirectTryLockUsageException(); throw newUnsupportedDirectTryLockUsageException();
} }


@Override
public boolean reEnterShared( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException();
}

@Override
public boolean reEnterExclusive( ResourceType resourceType, long resourceId )
{
throw new UnsupportedOperationException();
}

@Override @Override
public void releaseShared( ResourceType resourceType, long resourceId ) public void releaseShared( ResourceType resourceType, long resourceId )
{ {
assertNotStopped(); assertNotStopped();


Map<Long, AtomicInteger> lockMap = getLockMap( sharedLocks, resourceType ); client.releaseShared( resourceType, resourceId );
AtomicInteger counter = lockMap.get( resourceId );
if ( counter == null )
{
throw new IllegalStateException( this + " cannot release lock it does not hold: SHARED " +
resourceType + "[" + resourceId + "]" );
}
if ( counter.decrementAndGet() == 0 )
{
lockMap.remove( resourceId );
client.releaseShared( resourceType, resourceId );
}
} }


@Override @Override
public void releaseExclusive( ResourceType resourceType, long resourceId ) public void releaseExclusive( ResourceType resourceType, long resourceId )
{ {
assertNotStopped(); assertNotStopped();


Map<Long, AtomicInteger> lockMap = getLockMap( exclusiveLocks, resourceType ); client.releaseExclusive( resourceType, resourceId );
AtomicInteger counter = lockMap.get( resourceId );
if ( counter == null )
{
throw new IllegalStateException( this + " cannot release lock it does not hold: EXCLUSIVE " +
resourceType + "[" + resourceId + "]" );
}
if ( counter.decrementAndGet() == 0 )
{
lockMap.remove( resourceId );
client.releaseExclusive( resourceType, resourceId );
}
} }


@Override @Override
Expand All @@ -226,8 +205,6 @@ public void stop()
public void close() public void close()
{ {
client.close(); client.close();
sharedLocks.clear();
exclusiveLocks.clear();
if ( initialized ) if ( initialized )
{ {
if ( !stopped ) if ( !stopped )
Expand Down Expand Up @@ -299,18 +276,29 @@ private void endLockSessionOnMaster( boolean success )
} }
} }


private long[] onlyFirstTimeLocks( Map<Long,AtomicInteger> lockMap, long[] resourceIds ) private long[] firstTimeSharedLocks( ResourceType resourceType, long[] resourceIds )
{ {
int cursor = 0; int cursor = 0;
for ( int i = 0; i < resourceIds.length; i++ ) for ( int i = 0; i < resourceIds.length; i++ )
{ {
AtomicInteger preExistingLock = lockMap.get( resourceIds[i] ); if ( !client.reEnterShared( resourceType, resourceIds[i] ) )
if ( preExistingLock != null )
{ {
// We already hold this lock, just increment the local reference count resourceIds[cursor++] = resourceIds[i];
preExistingLock.incrementAndGet();
} }
else }
if ( cursor == 0 )
{
return PrimitiveLongCollections.EMPTY_LONG_ARRAY;
}
return cursor == resourceIds.length ? resourceIds : Arrays.copyOf( resourceIds, cursor );
}

private long[] firstTimeExclusiveLocks( ResourceType resourceType, long[] resourceIds )
{
int cursor = 0;
for ( int i = 0; i < resourceIds.length; i++ )
{
if ( !client.reEnterExclusive( resourceType, resourceIds[i] ) )
{ {
resourceIds[cursor++] = resourceIds[i]; resourceIds[cursor++] = resourceIds[i];
} }
Expand All @@ -322,6 +310,36 @@ private long[] onlyFirstTimeLocks( Map<Long,AtomicInteger> lockMap, long[] resou
return cursor == resourceIds.length ? resourceIds : Arrays.copyOf( resourceIds, cursor ); return cursor == resourceIds.length ? resourceIds : Arrays.copyOf( resourceIds, cursor );
} }


private void releaseShared( ResourceType resourceType, long[] resourceIds, long[] excludedIds )
{
for ( int i = 0, j = 0; i < resourceIds.length; i++ )
{
if ( resourceIds[i] == excludedIds[j] )
{
j++;
}
else
{
client.releaseShared( resourceType, resourceIds[i] );
}
}
}

private void releaseExclusive( ResourceType resourceType, long[] resourceIds, long[] excludedIds )
{
for ( int i = 0, j = 0; i < resourceIds.length; i++ )
{
if ( resourceIds[i] == excludedIds[j] )
{
j++;
}
else
{
client.releaseShared( resourceType, resourceIds[i] );
}
}
}

private void acquireSharedOnMaster( ResourceType resourceType, long... resourceIds ) private void acquireSharedOnMaster( ResourceType resourceType, long... resourceIds )
{ {
if ( resourceType == ResourceTypes.NODE if ( resourceType == ResourceTypes.NODE
Expand Down

0 comments on commit b6ed3e6

Please sign in to comment.