Skip to content

Commit

Permalink
Revert fenced lock proxy to local re-entrancy
Browse files Browse the repository at this point in the history
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent bd2eb6d commit a0985c6
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,25 @@
import com.hazelcast.raft.service.session.SessionAwareProxy;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.util.Clock;
import com.hazelcast.util.UuidUtil;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static com.hazelcast.util.Preconditions.checkNotNull;
import static com.hazelcast.util.ThreadUtil.getThreadId;
import static com.hazelcast.util.UuidUtil.newUnsecureUUID;

/**
* TODO: Javadoc Pending...
*/
public abstract class AbstractRaftFencedLockProxy extends SessionAwareProxy implements FencedLock {

// thread id -> lock state
private final ConcurrentMap<Long, LockState> lockStates = new ConcurrentHashMap<Long, LockState>();
protected final String name;

public AbstractRaftFencedLockProxy(AbstractSessionManager sessionManager, RaftGroupId groupId, String name) {
Expand All @@ -49,11 +53,18 @@ public AbstractRaftFencedLockProxy(AbstractSessionManager sessionManager, RaftGr
@Override
public final long lock() {
long threadId = getThreadId();
UUID invocationUid = UuidUtil.newUnsecureUUID();
long fence = tryReentrantLock(threadId);
if (fence != INVALID_FENCE) {
return fence;
}

UUID invocationUid = newUnsecureUUID();
for (;;) {
long sessionId = acquireSession();
try {
return doLock(groupId, name, sessionId, threadId, invocationUid).join();
fence = doLock(groupId, name, sessionId, threadId, invocationUid).join();
lockStates.put(threadId, new LockState(sessionId, fence));
return fence;
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
}
Expand All @@ -70,15 +81,22 @@ public final long tryLock(long time, TimeUnit unit) {
checkNotNull(unit);

long threadId = getThreadId();
UUID invocationUid = UuidUtil.newUnsecureUUID();
long fence = tryReentrantLock(threadId);
if (fence != INVALID_FENCE) {
return fence;
}

UUID invocationUid = newUnsecureUUID();
long timeoutMillis = Math.max(0, unit.toMillis(time));
long start;
for (;;) {
start = Clock.currentTimeMillis();
long sessionId = acquireSession();
try {
long fence = doTryLock(groupId, name, sessionId, threadId, invocationUid, timeoutMillis).join();
if (fence == INVALID_FENCE) {
fence = doTryLock(groupId, name, sessionId, threadId, invocationUid, timeoutMillis).join();
if (fence != INVALID_FENCE) {
lockStates.put(threadId, new LockState(sessionId, fence));
} else {
releaseSession(sessionId);
}
return fence;
Expand All @@ -92,30 +110,62 @@ public final long tryLock(long time, TimeUnit unit) {
}
}

private long tryReentrantLock(long threadId) {
LockState lockState = lockStates.get(threadId);
if (lockState != null) {
if (lockState.sessionId == getSession()) {
lockState.lockCount++;
return lockState.fence;
}
lockStates.remove(threadId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ lockState.sessionId + "] is closed by server!");
}
return INVALID_FENCE;
}

@Override
public final void unlock() {
long sessionId = getSession();
if (sessionId == NO_SESSION_ID) {
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name
+ "] because session not found!");
}
UUID invocationUid = UuidUtil.newUnsecureUUID();
long threadId = getThreadId();
LockState lockState = lockStates.get(threadId);
if (lockState == null) {
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "]");
}
if (lockState.sessionId != sessionId) {
lockStates.remove(threadId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ lockState.sessionId + "] is closed by server!");
}
if (lockState.lockCount > 1) {
lockState.lockCount--;
return;
}

try {
doUnlock(groupId, name, sessionId, getThreadId(), invocationUid).join();
doUnlock(groupId, name, sessionId, threadId, newUnsecureUUID()).join();
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ sessionId + "] is closed by server!");
} finally {
lockStates.remove(threadId);
releaseSession(sessionId);
}
}

@Override
public final void forceUnlock() {
long fence = doGetLockFence(groupId, name, NO_SESSION_ID, 0).join();
UUID invocationUid = UuidUtil.newUnsecureUUID();
doForceUnlock(groupId, name, fence, invocationUid).join();
try {
long fence = doGetLockFence(groupId, name, NO_SESSION_ID, 0).join();
doForceUnlock(groupId, name, fence, newUnsecureUUID()).join();
} finally {
lockStates.remove(getThreadId());
}
}

@Override
Expand All @@ -125,7 +175,12 @@ public final long getFence() {
throw new IllegalMonitorStateException();
}

return doGetLockFence(groupId, name, sessionId, getThreadId()).join();
LockState lockState = lockStates.get(getThreadId());
if (lockState == null) {
throw new IllegalMonitorStateException();
}

return lockState.fence;
}

@Override
Expand All @@ -135,16 +190,17 @@ public final boolean isLocked() {

@Override
public final boolean isLockedByCurrentThread() {
long sessionId = getSession();
if (sessionId == NO_SESSION_ID) {
return false;
}

return doGetLockCount(groupId, name, sessionId, getThreadId()).join() > 0;
LockState lockState = lockStates.get(getThreadId());
return (lockState != null && lockState.sessionId == getSession());
}

@Override
public final int getLockCount() {
LockState lockState = lockStates.get(getThreadId());
if (lockState != null && lockState.sessionId == getSession()) {
return lockState.lockCount;
}

return doGetLockCount(groupId, name, NO_SESSION_ID, 0).join();
}

Expand Down Expand Up @@ -181,4 +237,15 @@ protected abstract InternalCompletableFuture<Long> doGetLockFence(RaftGroupId gr
protected abstract InternalCompletableFuture<Integer> doGetLockCount(RaftGroupId groupId, String name, long sessionId,
long threadId);

private static class LockState {
final long sessionId;
final long fence;
int lockCount;

LockState(long sessionId, long fence) {
this.sessionId = sessionId;
this.fence = fence;
this.lockCount = 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,6 @@ public void run() {
}
});

sleepSeconds(3);
lock.unlock();

assertTrueEventually(new AssertTask() {
Expand Down Expand Up @@ -449,11 +448,10 @@ public void run() {
}
}).get();

long fence2 = lock.lock();
lock.lock();

assertTrue(fence2 > fence1);
assertTrue(lock.isLockedByCurrentThread());
assertEquals(1, lock.getLockCount());
assertEquals(2, lock.getLockCount());
}

@Test(timeout = 60000)
Expand Down

0 comments on commit a0985c6

Please sign in to comment.