Skip to content

Commit

Permalink
GEODE-9269: Make the lock holding has the same order. (apache#6470) (a…
Browse files Browse the repository at this point in the history
…pache#6498)

* When handling TXLockService, always trying to acquire destroy read lock
    before getting the synchronized batchLocks.
  * If destroy read lock is not acquired, thread will wait until the grantor
    is destroyed.

(cherry picked from commit 235cd1b)
  • Loading branch information
pivotal-eshu committed May 20, 2021
1 parent dd6a4e1 commit 3f8f9f9
Show file tree
Hide file tree
Showing 2 changed files with 256 additions and 83 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class DLockGrantor {
*
* guarded.By batchLocks
*/
private final Map batchLocks = new HashMap();
final Map<Object, DLockBatch> batchLocks = new HashMap<>();

/**
* Handles special lock-reservation type for transactions.
Expand Down Expand Up @@ -472,7 +472,8 @@ private void throwIfDestroyed(boolean destroyed) {
/**
* Handles request for a batch of locks using optimization for transactions.
* <p>
* Synchronizes on {@link #batchLocks}.
* Acquires destroy read lock before synchronizing on {@link #batchLocks}.
* If read lock not acquired, wait for the Grantor to be destroyed.
*
* @throws LockGrantorDestroyedException if grantor is destroyed
*/
Expand All @@ -483,46 +484,51 @@ void handleLockBatch(DLockRequestMessage request) throws InterruptedException {
// when the member-departure is announced.
handler.waitForInProcessDepartures();

synchronized (this.batchLocks) { // assures serial processing
waitWhileInitializing();
if (request.checkForTimeout()) {
cleanupSuspendState(request);
return;
}

final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch]");
}
if (!acquireDestroyReadLock(0)) {
waitUntilDestroyed();
checkDestroyed();
}
waitWhileInitializing();
if (request.checkForTimeout()) {
cleanupSuspendState(request);
return;
}
if (acquireDestroyReadLock(0)) {
try {
checkDestroyed();
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] request: {}",
request);
}
synchronized (batchLocks) { // assures serial processing
final boolean isTraceEnabled_DLS = logger.isTraceEnabled(LogMarker.DLS_VERBOSE);
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch]");
}

DLockBatch batch = (DLockBatch) request.getObjectName();
checkIfHostDeparted(batch.getOwner());
resMgr.makeReservation((IdentityArrayList) batch.getReqs());
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}",
batch.getBatchId());
checkDestroyed();
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] request: {}",
request);
}

DLockBatch batch = (DLockBatch) request.getObjectName();
checkIfHostDeparted(batch.getOwner());
makeReservation(batch);
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.handleLockBatch] granting {}",
batch.getBatchId());
}
batchLocks.put(batch.getBatchId(), batch);
request.respondWithGrant(Long.MAX_VALUE);
}
this.batchLocks.put(batch.getBatchId(), batch);
request.respondWithGrant(Long.MAX_VALUE);
} catch (CommitConflictException ex) {
request.respondWithTryLockFailed(ex.getMessage());
} finally {
releaseDestroyReadLock();
}
} else {
waitUntilDestroyed();
checkDestroyed();
}
}

private void checkIfHostDeparted(InternalDistributedMember owner) {
void makeReservation(DLockBatch batch) {
resMgr.makeReservation((IdentityArrayList) batch.getReqs());
}

void checkIfHostDeparted(InternalDistributedMember owner) {
// Already held batchLocks; hold membersDepartedTime lock just for clarity
synchronized (membersDepartedTime) {
// the transaction host/txLock requester has departed.
Expand All @@ -543,7 +549,7 @@ private void checkIfHostDeparted(InternalDistributedMember owner) {
*/
public DLockBatch[] getLockBatches(InternalDistributedMember owner) {
// Key: Object batchId, Value: DLockBatch batch
synchronized (this.batchLocks) {
synchronized (batchLocks) {
// put owner into the map first so that no new threads will handle in-flight requests
// from the departed member to lock keys
recordMemberDepartedTime(owner);
Expand Down Expand Up @@ -587,7 +593,8 @@ Map getMembersDepartedTimeRecords() {
* Get the batch for the given batchId (for example use a txLockId from TXLockBatch in order to
* update its participants). This operation was added as part of the solution to bug 32999.
* <p>
* Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
* Acquires destroy read lock before synchronizing on {@link #batchLocks}.
* If read lock not acquired, wait for the Grantor to be destroyed.
* <p>
* see org.apache.geode.internal.cache.TXCommitMessage#updateLockMembers()
*
Expand All @@ -602,18 +609,20 @@ public DLockBatch getLockBatch(Object batchId) throws InterruptedException {
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] enter: {}", batchId);
}
synchronized (this.batchLocks) {
waitWhileInitializing();
if (!acquireDestroyReadLock(0)) {
waitUntilDestroyed();
checkDestroyed();
}

waitWhileInitializing();
if (acquireDestroyReadLock(0)) {
try {
checkDestroyed();
ret = (DLockBatch) this.batchLocks.get(batchId);
synchronized (batchLocks) {
checkDestroyed();
ret = batchLocks.get(batchId);
}
} finally {
releaseDestroyReadLock();
}
} else {
waitUntilDestroyed();
checkDestroyed();
}
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.getLockBatch] exit: {}", batchId);
Expand All @@ -625,7 +634,8 @@ public DLockBatch getLockBatch(Object batchId) throws InterruptedException {
* Update the batch for the given batch. This operation was added as part of the solution to bug
* 32999.
* <p>
* Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
* Acquires destroy read lock before synchronizing on {@link #batchLocks}.
* If read lock not acquired, wait for the Grantor to be destroyed.
* <p>
* see org.apache.geode.internal.cache.locks.TXCommitMessage#updateLockMembers()
*
Expand All @@ -639,21 +649,22 @@ public void updateLockBatch(Object batchId, DLockBatch newBatch) throws Interrup
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] enter: {}", batchId);
}
synchronized (this.batchLocks) {
waitWhileInitializing();
if (!acquireDestroyReadLock(0)) {
waitUntilDestroyed();
checkDestroyed();
}
waitWhileInitializing();
if (acquireDestroyReadLock(0)) {
try {
checkDestroyed();
final DLockBatch oldBatch = (DLockBatch) this.batchLocks.get(batchId);
if (oldBatch != null) {
this.batchLocks.put(batchId, newBatch);
synchronized (batchLocks) {
checkDestroyed();
final DLockBatch oldBatch = batchLocks.get(batchId);
if (oldBatch != null) {
batchLocks.put(batchId, newBatch);
}
}
} finally {
releaseDestroyReadLock();
}
} else {
waitUntilDestroyed();
checkDestroyed();
}
if (isTraceEnabled_DLS) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.updateLockBatch] exit: {}", batchId);
Expand All @@ -663,7 +674,8 @@ public void updateLockBatch(Object batchId, DLockBatch newBatch) throws Interrup
/**
* Releases the transaction optimized lock batch.
* <p>
* Acquires acquireDestroyReadLock. Synchronizes on batchLocks.
* Acquires destroy read lock before synchronizing on {@link #batchLocks}.
* If read lock not acquired, wait for the Grantor to be destroyed.
*
* @param batchId the identify of the transaction lock batch to release
* @param owner the member that has created and locked the lock batch
Expand All @@ -674,24 +686,29 @@ public void releaseLockBatch(Object batchId, InternalDistributedMember owner)
if (logger.isTraceEnabled(LogMarker.DLS_VERBOSE)) {
logger.trace(LogMarker.DLS_VERBOSE, "[DLockGrantor.releaseLockBatch]");
}
synchronized (this.batchLocks) {
waitWhileInitializing();
if (!acquireDestroyReadLock(0)) {
waitUntilDestroyed();
checkDestroyed();
}
waitWhileInitializing();
if (acquireDestroyReadLock(0)) {
try {
checkDestroyed();
DLockBatch batch = (DLockBatch) this.batchLocks.remove(batchId);
if (batch != null) {
this.resMgr.releaseReservation((IdentityArrayList) batch.getReqs());
synchronized (batchLocks) {
checkDestroyed();
DLockBatch batch = batchLocks.remove(batchId);
if (batch != null) {
releaseReservation(batch);
}
}
} finally {
releaseDestroyReadLock();
}
} else {
waitUntilDestroyed();
checkDestroyed();
}
}

void releaseReservation(DLockBatch batch) {
resMgr.releaseReservation((IdentityArrayList) batch.getReqs());
}

/**
* Returns true if the request comes from the local member.
*
Expand Down Expand Up @@ -1385,7 +1402,7 @@ void postRemoteReleaseLock(Object objectName) throws InterruptedException {
* @return true if destroy read lock was acquired
* @throws DistributedSystemDisconnectedException if system has been disconnected
*/
private boolean acquireDestroyReadLock(long millis) throws InterruptedException {
boolean acquireDestroyReadLock(long millis) throws InterruptedException {
boolean interrupted = Thread.interrupted();
try {
if (interrupted && this.dlock.isInterruptibleLockRequest()) {
Expand All @@ -1411,7 +1428,7 @@ private boolean acquireDestroyReadLock(long millis) throws InterruptedException
/**
* Releases a read lock on the destroy ReadWrite lock.
*/
private void releaseDestroyReadLock() {
void releaseDestroyReadLock() {
this.destroyLock.readLock().unlock();
}

Expand Down

0 comments on commit 3f8f9f9

Please sign in to comment.