Skip to content

Commit

Permalink
Make increasePermits() and decreasePermits() idempontent for session …
Browse files Browse the repository at this point in the history
…aware semaphore
  • Loading branch information
metanet authored and mdogan committed Feb 1, 2019
1 parent ba68a51 commit 135162e
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,13 +268,31 @@ public void reducePermits(int reduction) {
return;
}

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES
long sessionId = acquireSession();
if (sessionId == NO_SESSION_ID) {
throw new IllegalStateException("No valid session!");
}

long threadId = getThreadId();
UUID invocationUid = newUnsecureUUID();

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, NO_SESSION_ID, dataSize, CHANGE_PERMITS_TYPE);
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, CHANGE_PERMITS_TYPE);
msg.set(threadId);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(-reduction);
msg.updateFrameLength();

invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
try {
invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
throw e;
} finally {
releaseSession(sessionId);
}
}

@Override
Expand All @@ -284,13 +302,31 @@ public void increasePermits(int increase) {
return;
}

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES
long sessionId = acquireSession();
if (sessionId == NO_SESSION_ID) {
throw new IllegalStateException("No valid session!");
}

long threadId = getThreadId();
UUID invocationUid = newUnsecureUUID();

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, NO_SESSION_ID, dataSize, CHANGE_PERMITS_TYPE);
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, CHANGE_PERMITS_TYPE);
msg.set(threadId);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(increase);
msg.updateFrameLength();

invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
try {
invoke(msg, BOOLEAN_RESPONSE_DECODER).join();
} catch (SessionExpiredException e) {
invalidateSession(sessionId);
throw e;
} finally {
releaseSession(sessionId);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,9 +246,15 @@ public void reducePermits(int reduction) {
return;
}

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
UUID invocationUid = newUnsecureUUID();

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, CHANGE_PERMITS_TYPE);
msg.set(globalThreadId);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(-reduction);
msg.updateFrameLength();

Expand All @@ -262,9 +268,15 @@ public void increasePermits(int increase) {
return;
}

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
UUID invocationUid = newUnsecureUUID();

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, CHANGE_PERMITS_TYPE);
msg.set(globalThreadId);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(increase);
msg.updateFrameLength();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import com.hazelcast.raft.impl.session.SessionExpiredException;
import com.hazelcast.raft.impl.util.Tuple2;
import com.hazelcast.raft.service.blocking.operation.InvalidateWaitKeysOp;
import com.hazelcast.raft.service.session.AbstractSessionManager;
import com.hazelcast.raft.service.spi.RaftRemoteService;
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.ManagedService;
Expand All @@ -49,6 +48,7 @@
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;

import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
import static com.hazelcast.util.Preconditions.checkNotNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -207,7 +207,7 @@ protected final void scheduleTimeout(RaftGroupId groupId, W waitKey, long timeou
}

protected final void heartbeatSession(RaftGroupId groupId, long sessionId) {
if (sessionId == AbstractSessionManager.NO_SESSION_ID) {
if (sessionId == NO_SESSION_ID) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,19 +199,35 @@ AcquireResult drain(long sessionId, long threadId, UUID invocationUid) {
return new AcquireResult(drained, cancelled);
}

Tuple2<Boolean, Collection<SemaphoreInvocationKey>> change(int permits) {
ReleaseResult change(long sessionId, long threadId, UUID invocationUid, int permits) {
if (permits == 0) {
Collection<SemaphoreInvocationKey> c = Collections.emptyList();
return Tuple2.of(false, c);
return ReleaseResult.failed(Collections.<SemaphoreInvocationKey>emptyList());
}

Collection<SemaphoreInvocationKey> cancelled = cancelWaitKeys(sessionId, threadId);

if (sessionId != NO_SESSION_ID) {
SessionState state = sessionStates.get(sessionId);
if (state == null) {
state = new SessionState();
sessionStates.put(sessionId, state);
}

if (state.invocationRefUids.containsKey(Tuple2.of(threadId, invocationUid))) {
Collection<SemaphoreInvocationKey> c = Collections.emptyList();
return ReleaseResult.successful(c, c);
}

state.invocationRefUids.put(Tuple2.of(threadId, invocationUid), permits);
}

available += permits;
initialized = true;

Collection<SemaphoreInvocationKey> keys =
Collection<SemaphoreInvocationKey> acquired =
permits > 0 ? assignPermitsToWaitKeys() : Collections.<SemaphoreInvocationKey>emptyList();

return Tuple2.of(true, keys);
return ReleaseResult.successful(acquired, cancelled);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import com.hazelcast.core.ISemaphore;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.service.RaftInvocationManager;
import com.hazelcast.raft.impl.util.Tuple2;
import com.hazelcast.raft.service.blocking.AbstractBlockingService;
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
import com.hazelcast.raft.service.semaphore.RaftSemaphore.AcquireResult;
Expand Down Expand Up @@ -81,11 +80,6 @@ public boolean initSemaphore(RaftGroupId groupId, String name, int permits) {
Collection<SemaphoreInvocationKey> acquired = getOrInitRegistry(groupId).init(name, permits);
notifyWaitKeys(groupId, acquired, true);

if (acquired.size() > 0 && logger.isFineEnabled()) {
logger.fine("Semaphore[" + name + "] in " + groupId + " acquired permits: " + acquired
+ " after initialized with " + permits + " permits");
}

return true;
} catch (IllegalStateException ignored) {
return false;
Expand All @@ -103,15 +97,10 @@ public boolean acquirePermits(RaftGroupId groupId, long commitIndex, String name
SemaphoreInvocationKey key = new SemaphoreInvocationKey(name, commitIndex, sessionId, threadId, invocationUid, permits);
AcquireResult result = getOrInitRegistry(groupId).acquire(name, key, timeoutMs);

// if (logger.isFineEnabled()) {
if (result.acquired > 0) {
logger.warning("Semaphore[" + name + "] in " + groupId + " acquired permits: " + result.acquired
if (logger.isFineEnabled()) {
logger.fine("Semaphore[" + name + "] in " + groupId + " acquired permits: " + result.acquired
+ " by <" + sessionId + ", " + threadId + ", " + invocationUid + ">");
} else {
logger.warning("Semaphore[" + name + "] in " + groupId + " NOT acquired permits by <" + sessionId + ", " + threadId
+ ", " + invocationUid + ">");
}
// }

notifyCancelledWaitKeys(groupId, name, result.cancelled);

Expand All @@ -129,28 +118,26 @@ public void releasePermits(RaftGroupId groupId, String name, long sessionId, lon
notifyWaitKeys(groupId, result.acquired, true);

if (!result.success) {
logger.warning("Semaphore[" + name + "] in " + groupId + " NOT released permits: " + permits
+ " by <" + sessionId + ", " + threadId + ", " + invocationUid + ">");
throw new IllegalArgumentException();
} else {
logger.warning("Semaphore[" + name + "] in " + groupId + " released permits: " + permits
+ " by <" + sessionId + ", " + threadId + ", " + invocationUid + ">");
logger.warning("Semaphore[" + name + "] in " + groupId + " acquired permits: " + result.acquired);
}
}

public int drainPermits(RaftGroupId groupId, String name, long sessionId, long threadId, UUID invocationUid) {
heartbeatSession(groupId, sessionId);
AcquireResult result = getOrInitRegistry(groupId).drainPermits(name, sessionId, threadId, invocationUid);
notifyCancelledWaitKeys(groupId, name, result.cancelled);

return result.acquired;
}

public boolean changePermits(RaftGroupId groupId, String name, int permits) {
Tuple2<Boolean, Collection<SemaphoreInvocationKey>> t = getOrInitRegistry(groupId).changePermits(name, permits);
notifyWaitKeys(groupId, t.element2, true);
public boolean changePermits(RaftGroupId groupId, String name, long sessionId, long threadId, UUID invocationUid,
int permits) {
heartbeatSession(groupId, sessionId);
ReleaseResult result = getOrInitRegistry(groupId).changePermits(name, sessionId, threadId, invocationUid, permits);
notifyCancelledWaitKeys(groupId, name, result.cancelled);
notifyWaitKeys(groupId, result.acquired, true);

return t.element1;
return result.success;
}

private void notifyCancelledWaitKeys(RaftGroupId groupId, String name, Collection<SemaphoreInvocationKey> waitKeys) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.util.Tuple2;
import com.hazelcast.raft.service.blocking.ResourceRegistry;
import com.hazelcast.raft.service.semaphore.RaftSemaphore.AcquireResult;
import com.hazelcast.raft.service.semaphore.RaftSemaphore.ReleaseResult;
Expand Down Expand Up @@ -95,13 +94,17 @@ AcquireResult drainPermits(String name, long sessionId, long threadId, UUID invo
return result;
}

Tuple2<Boolean, Collection<SemaphoreInvocationKey>> changePermits(String name, int permits) {
Tuple2<Boolean, Collection<SemaphoreInvocationKey>> t = getOrInitResource(name).change(permits);
for (SemaphoreInvocationKey key : t.element2) {
ReleaseResult changePermits(String name, long sessionId, long threadId, UUID invocationUid, int permits) {
ReleaseResult result = getOrInitResource(name).change(sessionId, threadId, invocationUid, permits);
for (SemaphoreInvocationKey key : result.acquired) {
removeWaitKey(key);
}

for (SemaphoreInvocationKey key : result.cancelled) {
removeWaitKey(key);
}

return t;
return result;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.instance.Node;
import com.hazelcast.nio.Connection;
import com.hazelcast.raft.impl.RaftOp;
import com.hazelcast.raft.impl.service.RaftInvocationManager;
import com.hazelcast.raft.service.semaphore.operation.ChangePermitsOp;

import java.util.UUID;

/**
* TODO: Javadoc Pending...
*/
public class ChangePermitsMessageTask extends AbstractSemaphoreMessageTask {

private long threadId;
private UUID invocationUid;
private int permits;

ChangePermitsMessageTask(ClientMessage clientMessage, Node node, Connection connection) {
Expand All @@ -36,12 +41,17 @@ public class ChangePermitsMessageTask extends AbstractSemaphoreMessageTask {
@Override
protected void processMessage() {
RaftInvocationManager invocationManager = getRaftInvocationManager();
invocationManager.invoke(groupId, new ChangePermitsOp(name, permits)).andThen(this);
RaftOp op = new ChangePermitsOp(name, sessionId, threadId, invocationUid, permits);
invocationManager.invoke(groupId, op).andThen(this);
}

@Override
protected Object decodeClientMessage(ClientMessage clientMessage) {
super.decodeClientMessage(clientMessage);
threadId = clientMessage.getLong();
long least = clientMessage.getLong();
long most = clientMessage.getLong();
invocationUid = new UUID(most, least);
permits = clientMessage.getInt();
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,32 +20,31 @@
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.raft.RaftGroupId;
import com.hazelcast.raft.impl.RaftOp;
import com.hazelcast.raft.service.semaphore.RaftSemaphoreDataSerializerHook;
import com.hazelcast.raft.service.semaphore.RaftSemaphoreService;

import java.io.IOException;
import java.util.UUID;

/**
* TODO: Javadoc Pending...
*/
public class ChangePermitsOp extends RaftOp implements IdentifiedDataSerializable {
public class ChangePermitsOp extends AbstractSemaphoreOp implements IdentifiedDataSerializable {

private String name;
private int permits;

public ChangePermitsOp() {
}

public ChangePermitsOp(String name, int permits) {
this.name = name;
public ChangePermitsOp(String name, long sessionId, long threadId, UUID invocationUid, int permits) {
super(name, sessionId, threadId, invocationUid);
this.permits = permits;
}

@Override
public Object run(RaftGroupId groupId, long commitIndex) {
RaftSemaphoreService service = getService();
return service.changePermits(groupId, name, permits);
return service.changePermits(groupId, name, sessionId, threadId, invocationUid, permits);
}

@Override
Expand All @@ -65,19 +64,19 @@ public int getId() {

@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
super.writeData(out);
out.writeInt(permits);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
name = in.readUTF();
super.readData(in);
permits = in.readInt();
}

@Override
protected void toString(StringBuilder sb) {
sb.append(", name=").append(name)
.append(", permits=").append(permits);
super.toString(sb);
sb.append(", permits=").append(permits);
}
}
Loading

0 comments on commit 135162e

Please sign in to comment.