Skip to content

Commit 0bb6683

Browse files
metanetmdogan
authored andcommitted
RaftSemaphore operation timeout fixes
1 parent a871011 commit 0bb6683

22 files changed

+918
-107
lines changed

hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/semaphore/client/RaftSessionAwareSemaphoreProxy.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import static com.hazelcast.raft.service.util.ClientAccessor.getClient;
5353
import static com.hazelcast.util.Preconditions.checkNotNegative;
5454
import static com.hazelcast.util.Preconditions.checkPositive;
55+
import static com.hazelcast.util.ThreadUtil.getThreadId;
5556
import static com.hazelcast.util.UuidUtil.newUnsecureUUID;
5657
import static java.lang.Math.max;
5758

@@ -118,12 +119,14 @@ public void acquire() {
118119
@Override
119120
public void acquire(int permits) {
120121
checkPositive(permits, "Permits must be positive!");
122+
long threadId = getThreadId();
121123
UUID invocationUid = newUnsecureUUID();
122-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
124+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5
123125
+ Bits.INT_SIZE_IN_BYTES;
124126
for (;;) {
125127
long sessionId = acquireSession(permits);
126128
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, ACQUIRE_PERMITS_TYPE);
129+
msg.set(threadId);
127130
msg.set(invocationUid.getLeastSignificantBits());
128131
msg.set(invocationUid.getMostSignificantBits());
129132
msg.set(permits);
@@ -158,14 +161,16 @@ public boolean tryAcquire(long timeout, TimeUnit unit) {
158161
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
159162
checkPositive(permits, "Permits must be positive!");
160163
long timeoutMs = max(0, unit.toMillis(timeout));
164+
long threadId = getThreadId();
161165
UUID invocationUid = newUnsecureUUID();
162-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
166+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5
163167
+ Bits.INT_SIZE_IN_BYTES;
164168
long start;
165169
for (;;) {
166170
start = Clock.currentTimeMillis();
167171
long sessionId = acquireSession(permits);
168172
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, ACQUIRE_PERMITS_TYPE);
173+
msg.set(threadId);
169174
msg.set(invocationUid.getLeastSignificantBits());
170175
msg.set(invocationUid.getMostSignificantBits());
171176
msg.set(permits);
@@ -202,10 +207,12 @@ public void release(int permits) {
202207
throw new IllegalStateException("No valid session!");
203208
}
204209

210+
long threadId = getThreadId();
205211
UUID invocationUid = newUnsecureUUID();
206212
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) +
207-
Bits.LONG_SIZE_IN_BYTES * 3 + Bits.INT_SIZE_IN_BYTES;
213+
Bits.LONG_SIZE_IN_BYTES * 4 + Bits.INT_SIZE_IN_BYTES;
208214
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, RELEASE_PERMITS_TYPE);
215+
msg.set(threadId);
209216
msg.set(invocationUid.getLeastSignificantBits());
210217
msg.set(invocationUid.getMostSignificantBits());
211218
msg.set(permits);
@@ -232,11 +239,13 @@ public int availablePermits() {
232239

233240
@Override
234241
public int drainPermits() {
242+
long threadId = getThreadId();
235243
UUID invocationUid = newUnsecureUUID();
236-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 3;
244+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4;
237245
for (;;) {
238246
long sessionId = acquireSession(DRAIN_SESSION_ACQ_COUNT);
239247
ClientMessage msg = prepareClientMessage(groupId, name, sessionId, dataSize, DRAIN_PERMITS_TYPE);
248+
msg.set(threadId);
240249
msg.set(invocationUid.getLeastSignificantBits());
241250
msg.set(invocationUid.getMostSignificantBits());
242251
msg.updateFrameLength();

hazelcast-raft-client/src/main/java/com/hazelcast/raft/service/semaphore/client/RaftSessionlessSemaphoreProxy.java

Lines changed: 45 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import com.hazelcast.raft.impl.RaftGroupIdImpl;
3030
import com.hazelcast.raft.service.semaphore.RaftSemaphoreService;
3131
import com.hazelcast.spi.InternalCompletableFuture;
32+
import com.hazelcast.util.ConstructorFunction;
3233

3334
import java.util.UUID;
3435
import java.util.concurrent.TimeUnit;
3536

3637
import static com.hazelcast.client.impl.protocol.util.ParameterUtil.calculateDataSize;
3738
import static com.hazelcast.raft.impl.RaftGroupIdImpl.dataSize;
39+
import static com.hazelcast.raft.service.atomiclong.client.AtomicLongMessageTaskFactoryProvider.ADD_AND_GET_TYPE;
3840
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.ACQUIRE_PERMITS_TYPE;
3941
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.AVAILABLE_PERMITS_TYPE;
4042
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.CHANGE_PERMITS_TYPE;
@@ -43,6 +45,7 @@
4345
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.DRAIN_PERMITS_TYPE;
4446
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.INIT_SEMAPHORE_TYPE;
4547
import static com.hazelcast.raft.service.semaphore.client.SemaphoreMessageTaskFactoryProvider.RELEASE_PERMITS_TYPE;
48+
import static com.hazelcast.raft.service.semaphore.proxy.GloballyUniqueThreadIdUtil.getGlobalThreadId;
4649
import static com.hazelcast.raft.service.session.AbstractSessionManager.NO_SESSION_ID;
4750
import static com.hazelcast.raft.service.util.ClientAccessor.getClient;
4851
import static com.hazelcast.util.Preconditions.checkNotNegative;
@@ -56,6 +59,7 @@
5659
public class RaftSessionlessSemaphoreProxy implements ISemaphore {
5760

5861
private static final ClientMessageDecoder INT_RESPONSE_DECODER = new IntResponseDecoder();
62+
private static final ClientMessageDecoder LONG_RESPONSE_DECODER = new LongResponseDecoder();
5963
private static final ClientMessageDecoder BOOLEAN_RESPONSE_DECODER = new BooleanResponseDecoder();
6064

6165
public static ISemaphore create(HazelcastInstance instance, String name) {
@@ -85,11 +89,32 @@ public RaftGroupId decodeClientMessage(ClientMessage msg) {
8589
private final HazelcastClientInstanceImpl client;
8690
private final RaftGroupId groupId;
8791
private final String name;
92+
private final ConstructorFunction<RaftGroupId, Long> globallyUniqueThreadIdCtor;
8893

89-
private RaftSessionlessSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, String name) {
94+
private RaftSessionlessSemaphoreProxy(HazelcastInstance instance, RaftGroupId groupId, final String name) {
9095
this.client = getClient(instance);
9196
this.groupId = groupId;
9297
this.name = name;
98+
this.globallyUniqueThreadIdCtor = new ConstructorFunction<RaftGroupId, Long>() {
99+
@Override
100+
public Long createNew(RaftGroupId groupId) {
101+
int dataSize = ClientMessage.HEADER_SIZE
102+
+ RaftGroupIdImpl.dataSize(groupId) + calculateDataSize(name)
103+
+ Bits.LONG_SIZE_IN_BYTES;
104+
105+
ClientMessage msg = ClientMessage.createForEncode(dataSize);
106+
msg.setMessageType(ADD_AND_GET_TYPE);
107+
msg.setRetryable(false);
108+
msg.setOperationName("");
109+
RaftGroupIdImpl.writeTo(groupId, msg);
110+
msg.set(name);
111+
msg.set(1L);
112+
msg.updateFrameLength();
113+
114+
ClientInvocationFuture future = new ClientInvocation(client, msg, getName()).invoke();
115+
return new ClientDelegatingFuture<Long>(future, client.getSerializationService(), LONG_RESPONSE_DECODER).join();
116+
}
117+
};
93118
}
94119

95120
@Override
@@ -115,10 +140,12 @@ public void acquire() {
115140
public void acquire(int permits) {
116141
checkPositive(permits, "Permits must be positive!");
117142

143+
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
118144
UUID invocationUid = newUnsecureUUID();
119-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
145+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5
120146
+ Bits.INT_SIZE_IN_BYTES;
121147
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, ACQUIRE_PERMITS_TYPE);
148+
msg.set(globalThreadId);
122149
msg.set(invocationUid.getLeastSignificantBits());
123150
msg.set(invocationUid.getMostSignificantBits());
124151
msg.set(permits);
@@ -147,11 +174,13 @@ public boolean tryAcquire(long timeout, TimeUnit unit) {
147174
public boolean tryAcquire(int permits, long timeout, TimeUnit unit) {
148175
checkPositive(permits, "Permits must be positive!");
149176

177+
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
150178
UUID invocationUid = newUnsecureUUID();
151179
long timeoutMs = max(0, unit.toMillis(timeout));
152-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
180+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 5
153181
+ Bits.INT_SIZE_IN_BYTES;
154182
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, ACQUIRE_PERMITS_TYPE);
183+
msg.set(globalThreadId);
155184
msg.set(invocationUid.getLeastSignificantBits());
156185
msg.set(invocationUid.getMostSignificantBits());
157186
msg.set(permits);
@@ -171,10 +200,12 @@ public void release() {
171200
public void release(int permits) {
172201
checkPositive(permits, "Permits must be positive!");
173202

203+
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
174204
UUID invocationUid = newUnsecureUUID();
175-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 3
205+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4
176206
+ Bits.INT_SIZE_IN_BYTES;
177207
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, RELEASE_PERMITS_TYPE);
208+
msg.set(globalThreadId);
178209
msg.set(invocationUid.getLeastSignificantBits());
179210
msg.set(invocationUid.getMostSignificantBits());
180211
msg.set(permits);
@@ -195,9 +226,11 @@ public int availablePermits() {
195226

196227
@Override
197228
public int drainPermits() {
229+
long globalThreadId = getGlobalThreadId(groupId, globallyUniqueThreadIdCtor);
198230
UUID invocationUid = newUnsecureUUID();
199-
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 3;
231+
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(name) + Bits.LONG_SIZE_IN_BYTES * 4;
200232
ClientMessage msg = prepareClientMessage(groupId, name, dataSize, DRAIN_PERMITS_TYPE);
233+
msg.set(globalThreadId);
201234
msg.set(invocationUid.getLeastSignificantBits());
202235
msg.set(invocationUid.getMostSignificantBits());
203236
msg.updateFrameLength();
@@ -294,6 +327,13 @@ public Integer decodeClientMessage(ClientMessage msg) {
294327
}
295328
}
296329

330+
private static class LongResponseDecoder implements ClientMessageDecoder {
331+
@Override
332+
public Long decodeClientMessage(ClientMessage msg) {
333+
return msg.getLong();
334+
}
335+
}
336+
297337
private static class BooleanResponseDecoder implements ClientMessageDecoder {
298338
@Override
299339
public Boolean decodeClientMessage(ClientMessage msg) {

hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/semaphore/client/RaftSessionAwareSemaphoreClientBasicTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.hazelcast.core.ISemaphore;
66
import com.hazelcast.raft.RaftGroupId;
77
import com.hazelcast.raft.impl.session.SessionExpiredException;
8-
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
8+
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
99
import com.hazelcast.raft.service.semaphore.RaftSessionAwareSemaphoreBasicTest;
1010
import com.hazelcast.raft.service.session.AbstractSessionManager;
1111
import com.hazelcast.raft.service.session.SessionManagerProvider;
@@ -34,7 +34,7 @@ protected HazelcastInstance[] createInstances() {
3434
TestHazelcastFactory f = (TestHazelcastFactory) factory;
3535
semaphoreInstance = f.newHazelcastClient();
3636
SessionExpiredException.register(getClient(semaphoreInstance).getClientExceptionFactory());
37-
LockRequestCancelledException.register(getClient(semaphoreInstance).getClientExceptionFactory());
37+
WaitKeyCancelledException.register(getClient(semaphoreInstance).getClientExceptionFactory());
3838
return instances;
3939
}
4040

hazelcast-raft-client/src/test/java/com/hazelcast/raft/service/semaphore/client/RaftSessionlessSemaphoreClientBasicTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import com.hazelcast.core.ISemaphore;
66
import com.hazelcast.raft.RaftGroupId;
77
import com.hazelcast.raft.impl.session.SessionExpiredException;
8-
import com.hazelcast.raft.service.lock.exception.LockRequestCancelledException;
8+
import com.hazelcast.raft.service.exception.WaitKeyCancelledException;
99
import com.hazelcast.raft.service.semaphore.RaftSessionlessSemaphoreBasicTest;
1010
import com.hazelcast.test.HazelcastSerialClassRunner;
1111
import com.hazelcast.test.TestHazelcastInstanceFactory;
@@ -34,7 +34,7 @@ protected HazelcastInstance[] createInstances() {
3434
TestHazelcastFactory f = (TestHazelcastFactory) factory;
3535
client = f.newHazelcastClient();
3636
SessionExpiredException.register(getClient(client).getClientExceptionFactory());
37-
LockRequestCancelledException.register(getClient(client).getClientExceptionFactory());
37+
WaitKeyCancelledException.register(getClient(client).getClientExceptionFactory());
3838
return instances;
3939
}
4040

0 commit comments

Comments
 (0)