Skip to content

Commit

Permalink
Make fenced lock re-entrancy non-local
Browse files Browse the repository at this point in the history
Consider the following scenario:

1) client.lock();
Successful.
2) The client encounters a network partition
3) client.unlock();
Fails with operation timeout and also not executed
on the server, but still local lock state is deleted.
4) Network partition issue is gone. The client still owns the lock.
5) client.getLockCount();
Returns 1, because client asks to the server.
6) client.lock();
Successful
7) client.getLockCount();
Returns 1, because client uses the local lock state to get the count,
but the actual lock count is 2.

Keeping local lock state is a difficult task and easily leads to
problematic scenarios as above.
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent 135162e commit 60cda09
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.lock.FencedLock;
import com.hazelcast.raft.service.lock.RaftLockService;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.session.SessionAwareProxy;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.util.Clock;

import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
Expand All @@ -43,8 +41,6 @@
public abstract class AbstractRaftFencedLockProxy extends SessionAwareProxy implements FencedLock {

protected final String name;
// thread id -> lock state
private final ConcurrentMap<Long, LockState> lockStates = new ConcurrentHashMap<Long, LockState>();

public AbstractRaftFencedLockProxy(AbstractSessionManager sessionManager, RaftGroupId groupId, String name) {
super(sessionManager, groupId);
Expand All @@ -54,18 +50,11 @@ public AbstractRaftFencedLockProxy(AbstractSessionManager sessionManager, RaftGr
@Override
public final long lock() {
long threadId = getThreadId();
long fence = tryReentrantLock(threadId);
if (fence != INVALID_FENCE) {
return fence;
}

UUID invocationUid = newUnsecureUUID();
for (;;) {
long sessionId = acquireSession();
try {
fence = doLock(groupId, name, sessionId, threadId, invocationUid).join();
lockStates.put(threadId, new LockState(sessionId, fence));
return fence;
return doLock(groupId, name, sessionId, threadId, invocationUid).join();
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
}
Expand All @@ -82,22 +71,15 @@ public final long tryLock(long time, TimeUnit unit) {
checkNotNull(unit);

long threadId = getThreadId();
long fence = tryReentrantLock(threadId);
if (fence != INVALID_FENCE) {
return fence;
}

UUID invocationUid = newUnsecureUUID();
long timeoutMillis = Math.max(0, unit.toMillis(time));
long start;
for (;;) {
start = Clock.currentTimeMillis();
long sessionId = acquireSession();
try {
fence = doTryLock(groupId, name, sessionId, threadId, invocationUid, timeoutMillis).join();
if (fence != INVALID_FENCE) {
lockStates.put(threadId, new LockState(sessionId, fence));
} else {
long fence = doTryLock(groupId, name, sessionId, threadId, invocationUid, timeoutMillis).join();
if (fence == INVALID_FENCE) {
releaseSession(sessionId);
}
return fence;
Expand All @@ -113,62 +95,28 @@ public final long tryLock(long time, TimeUnit unit) {
}
}

private long tryReentrantLock(long threadId) {
LockState lockState = lockStates.get(threadId);
if (lockState != null) {
if (lockState.sessionId == getSession()) {
lockState.lockCount++;
return lockState.fence;
}
lockStates.remove(threadId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ lockState.sessionId + "] is closed by server!");
}
return INVALID_FENCE;
}

@Override
public final void unlock() {
long sessionId = getSession();
if (sessionId == NO_SESSION_ID) {
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name
+ "] because session not found!");
}
long threadId = getThreadId();
LockState lockState = lockStates.get(threadId);
if (lockState == null) {
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "]");
}
if (lockState.sessionId != sessionId) {
lockStates.remove(threadId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ lockState.sessionId + "] is closed by server!");
}
if (lockState.lockCount > 1) {
lockState.lockCount--;
return;
}

try {
doUnlock(groupId, name, sessionId, threadId, newUnsecureUUID()).join();
doUnlock(groupId, name, sessionId, getThreadId(), newUnsecureUUID()).join();
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
throw new IllegalMonitorStateException("Current thread is not owner of the Lock[" + name + "] because Session["
+ sessionId + "] is closed by server!");
} finally {
lockStates.remove(threadId);
releaseSession(sessionId);
}
}

@Override
public final void forceUnlock() {
try {
long fence = doGetLockFence(groupId, name, NO_SESSION_ID, 0).join();
doForceUnlock(groupId, name, fence, newUnsecureUUID()).join();
} finally {
lockStates.remove(getThreadId());
}
long fence = doGetLockFence(groupId, name, NO_SESSION_ID, 0).join();
doForceUnlock(groupId, name, fence, newUnsecureUUID()).join();
}

@Override
Expand All @@ -178,12 +126,7 @@ public final long getFence() {
throw new IllegalMonitorStateException();
}

LockState lockState = lockStates.get(getThreadId());
if (lockState == null) {
throw new IllegalMonitorStateException();
}

return lockState.fence;
return doGetLockFence(groupId, name, sessionId, getThreadId()).join();
}

@Override
Expand All @@ -193,17 +136,16 @@ public final boolean isLocked() {

@Override
public final boolean isLockedByCurrentThread() {
LockState lockState = lockStates.get(getThreadId());
return (lockState != null && lockState.sessionId == getSession());
long sessionId = getSession();
if (sessionId == NO_SESSION_ID) {
return false;
}

return doGetLockCount(groupId, name, sessionId, getThreadId()).join() > 0;
}

@Override
public final int getLockCount() {
LockState lockState = lockStates.get(getThreadId());
if (lockState != null && lockState.sessionId == getSession()) {
return lockState.lockCount;
}

return doGetLockCount(groupId, name, NO_SESSION_ID, 0).join();
}

Expand Down Expand Up @@ -240,15 +182,4 @@ protected abstract InternalCompletableFuture<Long> doGetLockFence(RaftGroupId gr
protected abstract InternalCompletableFuture<Integer> doGetLockCount(RaftGroupId groupId, String name, long sessionId,
long threadId);

private static class LockState {
final long sessionId;
final long fence;
int lockCount;

LockState(long sessionId, long fence) {
this.sessionId = sessionId;
this.fence = fence;
this.lockCount = 1;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ public void run() {
lock.lock();

assertTrue(lock.isLockedByCurrentThread());
assertEquals(2, lock.getLockCount());
assertEquals(1, lock.getLockCount());
}

@Test(timeout = 60000)
Expand Down

0 comments on commit 60cda09

Please sign in to comment.