Skip to content

Commit

Permalink
Raft lock & semaphore proxy clean up
Browse files Browse the repository at this point in the history
* Make fenced lock proxy stateless
* Do not retry on OperationTimeoutException
* Minor cleanups & refactorings
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent 1fb9eaa commit e70df3c
Show file tree
Hide file tree
Showing 15 changed files with 129 additions and 247 deletions.
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public RaftGroupId decodeClientMessage(ClientMessage msg) {


private final HazelcastClientInstanceImpl client; private final HazelcastClientInstanceImpl client;


public RaftFencedLockProxy(HazelcastInstance instance, RaftGroupId groupId, String name) { private RaftFencedLockProxy(HazelcastInstance instance, RaftGroupId groupId, String name) {
super(SessionManagerProvider.get(getClient(instance)), groupId, name); super(SessionManagerProvider.get(getClient(instance)), groupId, name);
this.client = getClient(instance); this.client = getClient(instance);
} }
Expand Down Expand Up @@ -111,14 +111,14 @@ protected InternalCompletableFuture<Object> doForceUnlock(RaftGroupId groupId, S
} }


@Override @Override
protected InternalCompletableFuture<Long> doGetLockFence(RaftGroupId groupId, String name) { protected InternalCompletableFuture<Long> doGetLockFence(RaftGroupId groupId, String name, long sessionId, long threadId) {
ClientMessage msg = encodeRequest(LOCK_FENCE_TYPE, groupId, name, -1, -1); ClientMessage msg = encodeRequest(LOCK_FENCE_TYPE, groupId, name, sessionId, threadId);
return invoke(client, name, msg, LONG_RESPONSE_DECODER); return invoke(client, name, msg, LONG_RESPONSE_DECODER);
} }


@Override @Override
protected InternalCompletableFuture<Integer> doGetLockCount(RaftGroupId groupId, String name) { protected InternalCompletableFuture<Integer> doGetLockCount(RaftGroupId groupId, String name, long sessionId, long threadId) {
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, -1, -1); ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, sessionId, threadId);
return invoke(client, name, msg, INT_RESPONSE_DECODER); return invoke(client, name, msg, INT_RESPONSE_DECODER);
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@


import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize; import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize;
import static com.hazelcast.raft.impl.RaftGroupIdImpl.dataSize; import static com.hazelcast.raft.impl.RaftGroupIdImpl.dataSize;
import static com.hazelcast.raft.service.lock.RaftLockService.INVALID_FENCE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.CREATE_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.CREATE_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.DESTROY_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.DESTROY_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.FORCE_UNLOCK_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.FORCE_UNLOCK_TYPE;
Expand All @@ -49,6 +50,7 @@
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.TRY_LOCK_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.TRY_LOCK_TYPE;
import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.UNLOCK_TYPE; import static com.hazelcast.raft.service.lock.client.LockMessageTaskFactoryProvider.UNLOCK_TYPE;
import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static com.hazelcast.raft.service.util.ClientAccessor.getClient; import static com.hazelcast.raft.service.util.ClientAccessor.getClient;
import static com.hazelcast.util.ThreadUtil.getThreadId; import static com.hazelcast.util.ThreadUtil.getThreadId;


Expand Down Expand Up @@ -102,9 +104,8 @@ public void lock() {
for (;;) { for (;;) {
long sessionId = acquireSession(); long sessionId = acquireSession();
ClientMessage msg = encodeRequest(LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid); ClientMessage msg = encodeRequest(LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid);
InternalCompletableFuture<Object> future = invoke(client, name, msg, LONG_RESPONSE_DECODER);
try { try {
future.join(); invoke(client, name, msg, LONG_RESPONSE_DECODER).join();
break; break;
} catch (SessionExpiredException e) { } catch (SessionExpiredException e) {
invalidateSession(sessionId); invalidateSession(sessionId);
Expand All @@ -124,9 +125,9 @@ public boolean tryLock(long time, TimeUnit unit) {
for (;;) { for (;;) {
long sessionId = acquireSession(); long sessionId = acquireSession();
ClientMessage msg = encodeRequest(TRY_LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid, timeoutMs); ClientMessage msg = encodeRequest(TRY_LOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid, timeoutMs);
InternalCompletableFuture<Long> future = invoke(client, name, msg, LONG_RESPONSE_DECODER);
try { try {
boolean locked = future.join() > RaftLockService.INVALID_FENCE; InternalCompletableFuture<Long> future = invoke(client, name, msg, LONG_RESPONSE_DECODER);
boolean locked = (future.join() != INVALID_FENCE);
if (!locked) { if (!locked) {
releaseSession(sessionId); releaseSession(sessionId);
} }
Expand All @@ -140,14 +141,13 @@ public boolean tryLock(long time, TimeUnit unit) {
@Override @Override
public void unlock() { public void unlock() {
long sessionId = getSession(); long sessionId = getSession();
if (sessionId < 0) { if (sessionId == NO_SESSION_ID) {
throw new IllegalMonitorStateException(); throw new IllegalMonitorStateException();
} }
UUID invUid = UuidUtil.newUnsecureUUID(); UUID invUid = UuidUtil.newUnsecureUUID();
ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid); ClientMessage msg = encodeRequest(UNLOCK_TYPE, groupId, name, sessionId, getThreadId(), invUid);
InternalCompletableFuture<Object> future = invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER);
try { try {
future.join(); invoke(client, name, msg, BOOLEAN_RESPONSE_DECODER).join();
} catch (SessionExpiredException e) { } catch (SessionExpiredException e) {
throw new IllegalMonitorStateException("Current thread is not owner of the lock!"); throw new IllegalMonitorStateException("Current thread is not owner of the lock!");
} finally { } finally {
Expand All @@ -163,7 +163,7 @@ public boolean isLocked() {
@Override @Override
public boolean isLockedByCurrentThread() { public boolean isLockedByCurrentThread() {
long sessionId = getSession(); long sessionId = getSession();
if (sessionId < 0) { if (sessionId == NO_SESSION_ID) {
return false; return false;
} }
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, sessionId, getThreadId()); ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, sessionId, getThreadId());
Expand All @@ -173,7 +173,7 @@ public boolean isLockedByCurrentThread() {


@Override @Override
public int getLockCount() { public int getLockCount() {
ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, -1, -1); ClientMessage msg = encodeRequest(LOCK_COUNT_TYPE, groupId, name, NO_SESSION_ID, 0);
InternalCompletableFuture<Integer> future = invoke(client, name, msg, INT_RESPONSE_DECODER); InternalCompletableFuture<Integer> future = invoke(client, name, msg, INT_RESPONSE_DECODER);
return future.join(); return future.join();
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.hazelcast.client.util.ClientDelegatingFuture; import com.hazelcast.client.util.ClientDelegatingFuture;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ISemaphore; import com.hazelcast.core.ISemaphore;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.nio.Bits; import com.hazelcast.nio.Bits;
import com.hazelcast.raft.RaftGroupId; import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.RaftGroupIdImpl; import com.hazelcast.raft.impl.RaftGroupIdImpl;
Expand Down Expand Up @@ -61,8 +60,8 @@
*/ */
public class RaftSessionAwareSemaphoreProxy extends SessionAwareProxy implements ISemaphore { public class RaftSessionAwareSemaphoreProxy extends SessionAwareProxy implements ISemaphore {


static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder(); private static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder();
static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder(); private static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();


public static ISemaphore create(HazelcastInstance instance, String name) { public static ISemaphore create(HazelcastInstance instance, String name) {
int dataSize = ClientMessage.HEADER_SIZE + calculateDataSize(name); int dataSize = ClientMessage.HEADER_SIZE + calculateDataSize(name);
Expand Down Expand Up @@ -91,7 +90,7 @@ public RaftGroupId decodeClientMessage(ClientMessage msg) {
private final HazelcastClientInstanceImpl client; private final HazelcastClientInstanceImpl client;
private final String name; private final String name;


public RaftSessionAwareSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, String name) { private RaftSessionAwareSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, String name) {
super(SessionManagerProvider.get(getClient(instance)), groupId); super(SessionManagerProvider.get(getClient(instance)), groupId);
this.client = getClient(instance); this.client = getClient(instance);
this.name = name; this.name = name;
Expand Down Expand Up @@ -131,12 +130,9 @@ public void acquire(int permits) {
msg.set(-1L); msg.set(-1L);
msg.updateFrameLength(); msg.updateFrameLength();


InternalCompletableFuture<Object> future = invoke(msg, BOOLEAN_RESPONSE_DECODER);
try { try {
future.join(); invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
return; return;
} catch (OperationTimeoutException ignored) {
// I can retry safely because my retry would be idempotent...
} catch (SessionExpiredException e) { } catch (SessionExpiredException e) {
invalidateSession(sessionId); invalidateSession(sessionId);
} }
Expand Down Expand Up @@ -176,17 +172,19 @@ public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
msg.set(timeoutMs); msg.set(timeoutMs);
msg.updateFrameLength(); msg.updateFrameLength();


InternalCompletableFuture<Boolean> future = invoke(msg, BOOLEAN_RESPONSE_DECODER);
try { try {
InternalCompletableFuture<Boolean> future = invoke(msg, BOOLEAN_RESPONSE_DECODER);
boolean acquired = future.join(); boolean acquired = future.join();
if (!acquired) { if (!acquired) {
releaseSession(sessionId, permits); releaseSession(sessionId, permits);
} }
return acquired; return acquired;
} catch (OperationTimeoutException e) {
timeoutMs = Math.max(0, (timeoutMs - (Clock.currentTimeMillis() - start)));
} catch (SessionExpiredException e) { } catch (SessionExpiredException e) {
invalidateSession(sessionId); invalidateSession(sessionId);
timeoutMs -= (Clock.currentTimeMillis() - start);
if (timeoutMs <= 0) {
return false;
}
} }
} }
} }
Expand All @@ -200,30 +198,23 @@ public void release() {
public void release(int permits) { public void release(int permits) {
checkPositive(permits, "Permits must be positive!"); checkPositive(permits, "Permits must be positive!");
long sessionId = getSession(); long sessionId = getSession();
if (sessionId < 0) { if (sessionId == NO_SESSION_ID) {
throw new IllegalStateException("No valid session!"); throw new IllegalStateException("No valid session!");
} }


UUID invocationUid = newUnsecureUUID(); UUID invocationUid = newUnsecureUUID();
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) +
Bits.LONG_SIZE_IN_BYTES * 3 + Bits.INT_SIZE_IN_BYTES; Bits.LONG_SIZE_IN_BYTES * 3 + Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, RELEASE_PERMITS_TYPE);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(permits);
msg.updateFrameLength();
try { try {
for (;;) { invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, RELEASE_PERMITS_TYPE); } catch (SessionExpiredException e) {
msg.set(invocationUid.getLeastSignificantBits()); invalidateSession(sessionId);
msg.set(invocationUid.getMostSignificantBits()); throw e;
msg.set(permits);
msg.updateFrameLength();
try {
invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
return;
} catch (OperationTimeoutException ignored) {
// I can retry safely because my retry would be idempotent...
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
throw e;
}
}
} finally { } finally {
releaseSession(sessionId, permits); releaseSession(sessionId, permits);
} }
Expand All @@ -250,13 +241,11 @@ public int drainPermits() {
msg.set(invocationUid.getMostSignificantBits()); msg.set(invocationUid.getMostSignificantBits());
msg.updateFrameLength(); msg.updateFrameLength();


InternalCompletableFuture<Integer> future = invoke(msg, INT_RESPONSE_DECODER);
try { try {
InternalCompletableFuture<Integer> future = invoke(msg, INT_RESPONSE_DECODER);
int count = future.join(); int count = future.join();
releaseSession(sessionId, DRAIN_SESSION_ACQ_COUNT - count); releaseSession(sessionId, DRAIN_SESSION_ACQ_COUNT - count);
return count; return count;
} catch (OperationTimeoutException ignored) {
// I can retry safely because my retry would be idempotent...
} catch (SessionExpiredException e) { } catch (SessionExpiredException e) {
invalidateSession(sessionId); invalidateSession(sessionId);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@
*/ */
public class RaftSessionlessSemaphoreProxy implements ISemaphore { public class RaftSessionlessSemaphoreProxy implements ISemaphore {


static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder(); private static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder();
static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder(); private static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();


public static ISemaphore create(HazelcastInstance instance, String name) { public static ISemaphore create(HazelcastInstance instance, String name) {
int dataSize = ClientMessage.HEADER_SIZE + calculateDataSize(name); int dataSize = ClientMessage.HEADER_SIZE + calculateDataSize(name);
Expand Down Expand Up @@ -86,7 +86,7 @@ public RaftGroupId decodeClientMessage(ClientMessage msg) {
private final RaftGroupId groupId; private final RaftGroupId groupId;
private final String name; private final String name;


public RaftSessionlessSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, String name) { private RaftSessionlessSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, String name) {
this.client = getClient(instance); this.client = getClient(instance);
this.groupId = groupId; this.groupId = groupId;
this.name = name; this.name = name;
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ int getLockCount(String name, LockEndpoint endpoint) {
return lock.lockCount(); return lock.lockCount();
} }


long getLockFence(String name) { long getLockFence(String name, LockEndpoint endpoint) {
RaftLock lock = getResourceOrNull(name); RaftLock lock = getResourceOrNull(name);
if (lock == null) { if (lock == null) {
throw new IllegalMonitorStateException(); throw new IllegalMonitorStateException();
Expand All @@ -108,6 +108,8 @@ long getLockFence(String name) {
LockInvocationKey owner = lock.owner(); LockInvocationKey owner = lock.owner();
if (owner == null) { if (owner == null) {
throw new IllegalMonitorStateException("Lock[" + name + "] has no owner!"); throw new IllegalMonitorStateException("Lock[" + name + "] has no owner!");
} else if (endpoint != null && !owner.endpoint().equals(endpoint)) {
throw new IllegalMonitorStateException("Lock[" + name + "] is owned by " + owner.endpoint() + "!");
} }


return owner.commitIndex(); return owner.commitIndex();
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ public int getLockCount(RaftGroupId groupId, String name, LockEndpoint endpoint)
return registry != null ? registry.getLockCount(name, endpoint) : 0; return registry != null ? registry.getLockCount(name, endpoint) : 0;
} }


public long getLockFence(RaftGroupId groupId, String name) { public long getLockFence(RaftGroupId groupId, String name, LockEndpoint endpoint) {
checkNotNull(groupId); checkNotNull(groupId);
checkNotNull(name); checkNotNull(name);


return getLockRegistryOrFail(groupId, name).getLockFence(name); return getLockRegistryOrFail(groupId, name).getLockFence(name, endpoint);
} }


private LockRegistry getLockRegistryOrFail(RaftGroupId groupId, String name) { private LockRegistry getLockRegistryOrFail(RaftGroupId groupId, String name) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public class GetLockFenceMessageTask extends AbstractLockMessageTask {
@Override @Override
protected void processMessage() { protected void processMessage() {
RaftInvocationManager invocationManager = getRaftInvocationManager(); RaftInvocationManager invocationManager = getRaftInvocationManager();
invocationManager.invoke(groupId, new GetLockFenceOp(name)).andThen(this); invocationManager.invoke(groupId, new GetLockFenceOp(name, sessionId, threadId)).andThen(this);
} }


@Override @Override
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -27,24 +27,20 @@


import java.io.IOException; import java.io.IOException;


import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;

/** /**
* TODO: Javadoc Pending... * TODO: Javadoc Pending...
*/ */
public class GetLockCountOp extends RaftOp implements IdentifiedDataSerializable { public class GetLockCountOp extends RaftOp implements IdentifiedDataSerializable {


private static int NO_SESSION = -1;

private String name; private String name;
private long sessionId = NO_SESSION; private long sessionId;
private long threadId; private long threadId;


public GetLockCountOp() { public GetLockCountOp() {
} }


public GetLockCountOp(String name) {
this.name = name;
}

public GetLockCountOp(String name, long sessionId, long threadId) { public GetLockCountOp(String name, long sessionId, long threadId) {
this.name = name; this.name = name;
this.sessionId = sessionId; this.sessionId = sessionId;
Expand All @@ -54,7 +50,7 @@ public GetLockCountOp(String name, long sessionId, long threadId) {
@Override @Override
public Object run(RaftGroupId groupId, long commitIndex) { public Object run(RaftGroupId groupId, long commitIndex) {
RaftLockService service = getService(); RaftLockService service = getService();
LockEndpoint endpoint = (sessionId != NO_SESSION) ? new LockEndpoint(sessionId, threadId) : null; LockEndpoint endpoint = (sessionId != NO_SESSION_ID) ? new LockEndpoint(sessionId, threadId) : null;


return service.getLockCount(groupId, name, endpoint); return service.getLockCount(groupId, name, endpoint);
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -21,29 +21,38 @@
import com.hazelcast.nio.serialization.IdentifiedDataSerializable; import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.raft.RaftGroupId; import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.RaftOp; import com.hazelcast.raft.impl.RaftOp;
import com.hazelcast.raft.service.lock.LockEndpoint;
import com.hazelcast.raft.service.lock.RaftLockDataSerializerHook; import com.hazelcast.raft.service.lock.RaftLockDataSerializerHook;
import com.hazelcast.raft.service.lock.RaftLockService; import com.hazelcast.raft.service.lock.RaftLockService;


import java.io.IOException; import java.io.IOException;


import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;

/** /**
* TODO: Javadoc Pending... * TODO: Javadoc Pending...
*/ */
public class GetLockFenceOp extends RaftOp implements IdentifiedDataSerializable { public class GetLockFenceOp extends RaftOp implements IdentifiedDataSerializable {


private String name; private String name;
private long sessionId;
private long threadId;


public GetLockFenceOp() { public GetLockFenceOp() {
} }


public GetLockFenceOp(String name) { public GetLockFenceOp(String name, long sessionId, long threadId) {
this.name = name; this.name = name;
this.sessionId = sessionId;
this.threadId = threadId;
} }


@Override @Override
public Object run(RaftGroupId groupId, long commitIndex) { public Object run(RaftGroupId groupId, long commitIndex) {
RaftLockService service = getService(); RaftLockService service = getService();
return service.getLockFence(groupId, name); LockEndpoint endpoint = (sessionId != NO_SESSION_ID) ? new LockEndpoint(sessionId, threadId) : null;

return service.getLockFence(groupId, name, endpoint);
} }


@Override @Override
Expand All @@ -64,15 +73,21 @@ public int getId() {
@Override @Override
public void writeData(ObjectDataOutput out) throws IOException { public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name); out.writeUTF(name);
out.writeLong(sessionId);
out.writeLong(threadId);
} }


@Override @Override
public void readData(ObjectDataInput in) throws IOException { public void readData(ObjectDataInput in) throws IOException {
name = in.readUTF(); name = in.readUTF();
sessionId = in.readLong();
threadId = in.readLong();
} }


@Override @Override
protected void toString(StringBuilder sb) { protected void toString(StringBuilder sb) {
sb.append(", name=").append(name); sb.append(", name=").append(name);
sb.append(", sessionId=").append(sessionId);
sb.append(", threadId=").append(threadId);
} }
} }
Loading

0 comments on commit e70df3c

Please sign in to comment.