Skip to content

Commit

Permalink
New FencedLock proxy implementation with local reentrancy
Browse files Browse the repository at this point in the history
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent f160d0b commit d547f0d
Show file tree
Hide file tree
Showing 27 changed files with 1,062 additions and 506 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.RaftGroupIdImpl;
import com.hazelcast.raft.service.lock.FencedLock;
import com.hazelcast.raft.service.lock.RaftLockOwnershipState;
import com.hazelcast.raft.service.lock.proxy.AbstractRaftFencedLockProxy;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.spi.InternalCompletableFuture;
Expand All @@ -36,14 +37,12 @@
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.CREATE_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.DESTROY_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.FORCE_UNLOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_COUNT_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_FENCE_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_OWNERSHIP_STATE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.TRY_LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.UNLOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.BOOLEAN_RESPONSE_DECODER;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.INT_RESPONSE_DECODER;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.LONG_RESPONSE_DECODER;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.LOCK_OWNERSHIP_STATE_RESPONSE_DECODER;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.encodeRequest;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.invoke;
import static com.hazelcast.raft.service.lock.client.RaftLockProxy.prepareClientMessage;
Expand Down Expand Up @@ -86,40 +85,41 @@ private RaftFencedLockProxy(HazelcastInstance instance, RaftGroupId groupId, Str
}

@Override
protected InternalCompletableFuture<Long> doLock(RaftGroupId groupId, String name, long sessionId, long threadId, UUID invocationUid) {
protected final InternalCompletableFuture<RaftLockOwnershipState> doLock(RaftGroupId groupId, String name,
long sessionId, long threadId,
UUID invocationUid) {
ClientMessage msg = encodeRequest(LOCK_TYPE, groupId, name, sessionId, threadId, invocationUid);
return invoke(client, name, msg, LONG_RESPONSE_DECODER);
return invoke(client, name, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Long> doTryLock(RaftGroupId groupId, String name, long sessionId, long threadId, UUID invocationUid,
long timeoutMillis) {
protected final InternalCompletableFuture<RaftLockOwnershipState> doTryLock(RaftGroupId groupId, String name,
long sessionId, long threadId,
UUID invocationUid, long timeoutMillis) {
ClientMessage msg = encodeRequest(TRY_LOCK_TYPE, groupId, name, sessionId, threadId, invocationUid, timeoutMillis);
return invoke(client, name, msg, LONG_RESPONSE_DECODER);
return invoke(client, name, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Object> doUnlock(RaftGroupId groupId, String name, long sessionId, long threadId, UUID invocationUid) {
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, threadId, invocationUid);
protected final InternalCompletableFuture<Object> doUnlock(RaftGroupId groupId, String name,
long sessionId, long threadId,
UUID invocationUid, int releaseCount) {
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, threadId, invocationUid, releaseCount);
return invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Object> doForceUnlock(RaftGroupId groupId, String name, long expectedFence, UUID invocationUid) {
protected final InternalCompletableFuture<Object> doForceUnlock(RaftGroupId groupId, String name,
UUID invocationUid, long expectedFence) {
ClientMessage msg = encodeRequest(FORCE_UNLOCK_TYPE, groupId, name, -1, -1, invocationUid, expectedFence);
return invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Long> doGetLockFence(RaftGroupId groupId, String name, long sessionId, long threadId) {
ClientMessage msg = encodeRequest(LOCK_FENCE_TYPE, groupId, name, sessionId, threadId);
return invoke(client, name, msg, LONG_RESPONSE_DECODER);
}

@Override
protected InternalCompletableFuture<Integer> doGetLockCount(RaftGroupId groupId, String name, long sessionId, long threadId) {
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, sessionId, threadId);
return invoke(client, name, msg, INT_RESPONSE_DECODER);
protected final InternalCompletableFuture<RaftLockOwnershipState> doGetLockOwnershipState(RaftGroupId groupId,
String name) {
ClientMessage msg = encodeRequest(LOCK_OWNERSHIP_STATE, groupId, name, -1, -1);
return invoke(client, name, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,38 @@
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.RaftGroupIdImpl;
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.service.lock.RaftLockService;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.lock.RaftLockOwnershipState;
import com.hazelcast.raft.service.lock.RaftLockService;
import com.hazelcast.raft.service.session.SessionAwareProxy;
import com.hazelcast.raft.service.session.SessionManagerProvider;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.util.UuidUtil;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;

import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize;
import static com.hazelcast.raft.impl.RaftGroupIdImpl.dataSize;
import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.CREATE_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.DESTROY_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.FORCE_UNLOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_COUNT_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_FENCE_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_OWNERSHIP_STATE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.TRY_LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.UNLOCK_TYPE;
import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static com.hazelcast.raft.service.util.ClientAccessor.getClient;
import static com.hazelcast.util.ThreadUtil.getThreadId;
import static com.hazelcast.util.UuidUtil.newUnsecureUUID;

/**
* TODO: Javadoc Pending...
*/
public class RaftLockProxy extends SessionAwareProxy implements ILock {

static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder();
static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();
static final ClientMessageDecoder LONG_RESPONSE_DECODER = new LongResponseDecoder();
static final ClientMessageDecoder LOCK_OWNERSHIP_STATE_RESPONSE_DECODER = new RaftLockOwnershipStateResponseDecoder();

public static ILock create(HazelcastInstance instance, String name) {
int dataSize = ClientMessage.HEADER_SIZE + calculateDataSize(name);
Expand Down Expand Up @@ -101,12 +99,14 @@ private RaftLockProxy(HazelcastInstance instance, RaftGroupId groupId, String na

@Override
public void lock() {
UUID invUid = UuidUtil.newUnsecureUUID();
UUID invUid = newUnsecureUUID();
for (;;) {
long sessionId = acquireSession();
ClientMessage msg = encodeRequest(LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid);
try {
invoke(client, name, msg, LONG_RESPONSE_DECODER).join();
RaftLockOwnershipState ownership = RaftLockProxy.<RaftLockOwnershipState>invoke(client, name, msg,
LOCK_OWNERSHIP_STATE_RESPONSE_DECODER).join();
assert ownership.isLocked();
break;
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
Expand All @@ -121,18 +121,18 @@ public boolean tryLock() {

@Override
public boolean tryLock(long time, TimeUnit unit) {
UUID invUid = UuidUtil.newUnsecureUUID();
UUID invUid = newUnsecureUUID();
long timeoutMs = Math.max(0, unit.toMillis(time));
for (;;) {
long sessionId = acquireSession();
ClientMessage msg = encodeRequest(TRY_LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid, timeoutMs);
try {
InternalCompletableFuture<Long> future = invoke(client, name, msg, LONG_RESPONSE_DECODER);
boolean locked = (future.join() != INVALID_FENCE);
if (!locked) {
releaseSession(sessionId);
RaftLockOwnershipState ownership = RaftLockProxy.<RaftLockOwnershipState>invoke(client, name, msg,
LOCK_OWNERSHIP_STATE_RESPONSE_DECODER).join();
if (ownership.isLocked()) {
return ownership.isLocked();
}
return locked;
releaseSession(sessionId);
} catch (WaitKeyCancelledException e) {
return false;
} catch (SessionExpiredException e) {
Expand All @@ -147,8 +147,8 @@ public void unlock() {
if (sessionId == NO_SESSION_ID) {
throw new IllegalMonitorStateException();
}
UUID invUid = UuidUtil.newUnsecureUUID();
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid);
UUID invUid = newUnsecureUUID();
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid, 1);
try {
invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER).join();
} catch (SessionExpiredException e) {
Expand All @@ -169,20 +169,23 @@ public boolean isLockedByCurrentThread() {
if (sessionId == NO_SESSION_ID) {
return false;
}
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, sessionId, getThreadId());
InternalCompletableFuture<Integer> future = invoke(client, name, msg, INT_RESPONSE_DECODER);
return future.join() > 0;

ClientMessage msg = encodeRequest(LOCK_OWNERSHIP_STATE, groupId, name, -1, -1);
InternalCompletableFuture<RaftLockOwnershipState> f = invoke(client, name, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
RaftLockOwnershipState ownership = f.join();
return (ownership.getSessionId() == sessionId && ownership.getThreadId() == getThreadId());
}

@Override
public int getLockCount() {
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, NO_SESSION_ID, 0);
InternalCompletableFuture<Integer> future = invoke(client, name, msg, INT_RESPONSE_DECODER);
return future.join();
ClientMessage msg = encodeRequest(LOCK_OWNERSHIP_STATE, groupId, name, -1, -1);
InternalCompletableFuture<RaftLockOwnershipState> f = invoke(client, name, msg, LOCK_OWNERSHIP_STATE_RESPONSE_DECODER);
RaftLockOwnershipState ownership = f.join();
return ownership.getLockCount();
}

@Override
public boolean tryLock(long time, TimeUnit unit, long leaseTime, TimeUnit leaseUnit) throws InterruptedException {
public boolean tryLock(long time, TimeUnit unit, long leaseTime, TimeUnit leaseUnit) {
throw new UnsupportedOperationException();
}

Expand All @@ -193,15 +196,14 @@ public void lock(long leaseTime, TimeUnit timeUnit) {

@Override
public void forceUnlock() {
ClientMessage msg = encodeRequest(LOCK_FENCE_TYPE, groupId, name, -1, -1);
long fence = RaftLockProxy.<Long>invoke(client, name, msg, LONG_RESPONSE_DECODER).join();

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5;
msg = prepareClientMessage(groupId, name, dataSize, FORCE_UNLOCK_TYPE);
setRequestParams(msg, -1, -1, UuidUtil.newUnsecureUUID());
msg.set(fence);
msg.updateFrameLength();
ClientMessage msg = encodeRequest(LOCK_OWNERSHIP_STATE, groupId, name, -1, -1);
RaftLockOwnershipState ownership = RaftLockProxy.<RaftLockOwnershipState>invoke(client, name, msg,
LOCK_OWNERSHIP_STATE_RESPONSE_DECODER).join();
if (!ownership.isLocked()) {
throw new IllegalMonitorStateException("Lock[" + name + "] has no owner!");
}

msg = encodeRequest(FORCE_UNLOCK_TYPE, groupId, name, -1, -1, newUnsecureUUID(), ownership.getFence());
invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER).join();
}

Expand All @@ -221,7 +223,7 @@ public long getRemainingLeaseTime() {
}

@Override
public void lockInterruptibly() throws InterruptedException {
public void lockInterruptibly() {
throw new UnsupportedOperationException();
}

Expand Down Expand Up @@ -270,6 +272,17 @@ static ClientMessage encodeRequest(int messageTypeId, RaftGroupId groupId, Strin
return msg;
}

static ClientMessage encodeRequest(int messageTypeId, RaftGroupId groupId, String name, long sessionId,
long threadId, UUID invUid, int val) {
int dataSize = ClientMessage.HEADER_SIZE
+ dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4 + Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, messageTypeId);
setRequestParams(msg, sessionId, threadId, invUid);
msg.set(val);
msg.updateFrameLength();
return msg;
}

static ClientMessage encodeRequest(int messageTypeId, RaftGroupId groupId, String name, long sessionId,
long threadId, UUID invUid, long val) {

Expand Down Expand Up @@ -309,24 +322,21 @@ static ClientMessage prepareClientMessage(RaftGroupId groupId, String name, int
return msg;
}

private static class IntResponseDecoder implements ClientMessageDecoder {
@Override
public Integer decodeClientMessage(ClientMessage msg) {
return msg.getInt();
}
}

private static class BooleanResponseDecoder implements ClientMessageDecoder {
@Override
public Boolean decodeClientMessage(ClientMessage msg) {
return msg.getBoolean();
}
}

private static class LongResponseDecoder implements ClientMessageDecoder {
private static class RaftLockOwnershipStateResponseDecoder implements ClientMessageDecoder {
@Override
public Long decodeClientMessage(ClientMessage msg) {
return msg.getLong();
public RaftLockOwnershipState decodeClientMessage(ClientMessage msg) {
long fence = msg.getLong();
int lockCount = msg.getInt();
long sessionId = msg.getLong();
long threadId = msg.getLong();
return new RaftLockOwnershipState(fence, lockCount, sessionId, threadId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public interface FencedLock extends DistributedObject {

boolean isLockedByCurrentThread();

// returns the true lock count if the lock is acquired by the caller endpoint
// returns 1 if the lock is acquired by another endpoint because reentrant acquires are local
// returns 0 otherwise
int getLockCount();

RaftGroupId getGroupId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@

import java.util.UUID;

import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;

/**
* TODO: Javadoc Pending...
*/
Expand Down Expand Up @@ -56,26 +54,25 @@ AcquireResult acquire(String name, LockEndpoint endpoint, long commitIndex, UUID
AcquireResult tryAcquire(String name, LockEndpoint endpoint, long commitIndex, UUID invocationUid, long timeoutMs) {
boolean wait = (timeoutMs > 0);
AcquireResult result = getOrInitResource(name).acquire(endpoint, commitIndex, invocationUid, wait);
long fence = result.fence;

for (LockInvocationKey waitKey : result.cancelled) {
removeWaitKey(waitKey);
}

if (wait && fence == INVALID_FENCE) {
if (wait && !result.ownership.isLocked()) {
addWaitKey(new LockInvocationKey(name, endpoint, commitIndex, invocationUid), timeoutMs);
}

return result;
}

ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid) {
ReleaseResult release(String name, LockEndpoint endpoint, UUID invocationUid, int lockCount) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return ReleaseResult.FAILED;
}

ReleaseResult result = lock.release(endpoint, invocationUid);
ReleaseResult result = lock.release(endpoint, invocationUid, lockCount);
for (LockInvocationKey key : result.notifications) {
removeWaitKey(key);
}
Expand All @@ -97,34 +94,9 @@ ReleaseResult forceRelease(String name, long expectedFence, UUID invocationUid)
return result;
}

int getLockCount(String name, LockEndpoint endpoint) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
return 0;
}

if (endpoint != null) {
LockInvocationKey owner = lock.owner();
return (owner != null && endpoint.equals(owner.endpoint())) ? lock.lockCount() : 0;
}

return lock.lockCount();
}

long getLockFence(String name, LockEndpoint endpoint) {
RaftLockOwnershipState getLockOwnershipState(String name) {
RaftLock lock = getResourceOrNull(name);
if (lock == null) {
throw new IllegalMonitorStateException();
}

LockInvocationKey owner = lock.owner();
if (owner == null) {
throw new IllegalMonitorStateException("Lock[" + name + "] has no owner!");
} else if (endpoint != null && !owner.endpoint().equals(endpoint)) {
throw new IllegalMonitorStateException("Lock[" + name + "] is owned by " + owner.endpoint() + "!");
}

return owner.commitIndex();
return lock != null ? lock.lockOwnershipState() : RaftLockOwnershipState.NOT_LOCKED;
}

@Override
Expand Down

0 comments on commit d547f0d

Please sign in to comment.