Skip to content
Permalink
Browse files

Store retried wait keys in a single container

When a wait key is expired, all its retried duplicates must expire
as well.
  • Loading branch information...
metanet authored and mdogan committed Jan 7, 2019
1 parent b79c559 commit 4214776dd35bbead33209ceb2fb52fe9656b334e
Showing with 1,141 additions and 540 deletions.
  1. +1 −1 hazelcast-client/src/main/java/com/hazelcast/client/ClientOutOfMemoryHandler.java
  2. +12 −8 .../java/com/hazelcast/client/cp/internal/datastructures/countdownlatch/RaftCountDownLatchProxy.java
  3. +4 −2 hazelcast-client/src/main/java/com/hazelcast/client/impl/clientside/HazelcastClientInstanceImpl.java
  4. +10 −6 hazelcast-client/src/main/java/com/hazelcast/client/impl/clientside/LifecycleServiceImpl.java
  5. +48 −24 hazelcast/src/main/java/com/hazelcast/cp/internal/MetadataRaftGroupManager.java
  6. +7 −2 ...ast/src/main/java/com/hazelcast/cp/internal/datastructures/RaftDataServiceDataSerializerHook.java
  7. +19 −17 ...ast/src/main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/AwaitInvocationKey.java
  8. +14 −17 ...ast/src/main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/RaftCountDownLatch.java
  9. +6 −6 ...main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/RaftCountDownLatchRegistry.java
  10. +6 −6 .../main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/RaftCountDownLatchService.java
  11. +9 −3 ...rc/main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/client/AwaitMessageTask.java
  12. +5 −5 ...ain/java/com/hazelcast/cp/internal/datastructures/countdownlatch/client/CountDownMessageTask.java
  13. +11 −3 ...cast/src/main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/operation/AwaitOp.java
  14. +13 −13 .../src/main/java/com/hazelcast/cp/internal/datastructures/countdownlatch/operation/CountDownOp.java
  15. +2 −2 ...n/java/com/hazelcast/cp/internal/datastructures/countdownlatch/proxy/RaftCountDownLatchProxy.java
  16. +25 −29 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/LockInvocationKey.java
  17. +33 −53 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/RaftLock.java
  18. +10 −0 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/RaftLockOwnershipState.java
  19. +11 −12 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/RaftLockRegistry.java
  20. +54 −47 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/RaftLockService.java
  21. +5 −5 ...st/src/main/java/com/hazelcast/cp/internal/datastructures/lock/client/ForceUnlockMessageTask.java
  22. +4 −4 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/client/LockMessageTask.java
  23. +5 −5 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/client/UnlockMessageTask.java
  24. +8 −8 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/operation/AbstractLockOp.java
  25. +7 −7 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/operation/ForceUnlockOp.java
  26. +2 −2 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/operation/LockOp.java
  27. +2 −2 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/operation/TryLockOp.java
  28. +1 −1 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/lock/operation/UnlockOp.java
  29. +5 −5 ...rc/main/java/com/hazelcast/cp/internal/datastructures/lock/proxy/AbstractRaftFencedLockProxy.java
  30. +16 −27 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/AcquireInvocationKey.java
  31. +48 −42 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/RaftSemaphore.java
  32. +9 −9 ...lcast/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/RaftSemaphoreRegistry.java
  33. +44 −17 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/RaftSemaphoreService.java
  34. +6 −6 ...ain/java/com/hazelcast/cp/internal/datastructures/semaphore/client/AcquirePermitsMessageTask.java
  35. +3 −3 ...main/java/com/hazelcast/cp/internal/datastructures/semaphore/client/ChangePermitsMessageTask.java
  36. +3 −3 .../main/java/com/hazelcast/cp/internal/datastructures/semaphore/client/DrainPermitsMessageTask.java
  37. +3 −3 ...ain/java/com/hazelcast/cp/internal/datastructures/semaphore/client/ReleasePermitsMessageTask.java
  38. +6 −6 ...c/main/java/com/hazelcast/cp/internal/datastructures/semaphore/operation/AbstractSemaphoreOp.java
  39. +1 −1 ...t/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/operation/ChangePermitsOp.java
  40. +1 −1 ...st/src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/operation/DrainPermitsOp.java
  41. +1 −1 .../src/main/java/com/hazelcast/cp/internal/datastructures/semaphore/operation/ReleasePermitsOp.java
  42. +36 −30 .../src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java
  43. +53 −30 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/BlockingResource.java
  44. +44 −37 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/ResourceRegistry.java
  45. +7 −5 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/WaitKey.java
  46. +132 −0 hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/WaitKeyContainer.java
  47. +18 −14 ...c/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/operation/ExpireWaitKeysOp.java
  48. +4 −1 hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java
  49. +52 −0 hazelcast/src/main/java/com/hazelcast/cp/internal/util/UUIDSerializationUtil.java
  50. +29 −0 ...lcast/src/test/java/com/hazelcast/cp/internal/datastructures/lock/RaftFencedLockAdvancedTest.java
  51. +138 −5 hazelcast/src/test/java/com/hazelcast/cp/internal/datastructures/lock/RaftFencedLockFailureTest.java
  52. +146 −0 ...t/src/test/java/com/hazelcast/cp/internal/datastructures/semaphore/RaftSemaphoreAdvancedTest.java
  53. +2 −4 ...st/src/test/java/com/hazelcast/cp/internal/datastructures/semaphore/RaftSemaphoreFailureTest.java
@@ -65,7 +65,7 @@ private static void closeSockets(HazelcastClientInstanceImpl client) {

private static void tryShutdown(HazelcastClientInstanceImpl client) {
try {
client.doShutdown();
client.doShutdown(false);
} catch (Throwable ignored) {
ignore(ignored);
}
@@ -31,7 +31,6 @@
import com.hazelcast.nio.Bits;
import com.hazelcast.spi.InternalCompletableFuture;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.UuidUtil;

import java.util.UUID;
import java.util.concurrent.TimeUnit;
@@ -45,6 +44,7 @@
import static com.hazelcast.cp.internal.datastructures.countdownlatch.client.CountDownLatchMessageTaskFactoryProvider.GET_ROUND_TYPE;
import static com.hazelcast.cp.internal.datastructures.countdownlatch.client.CountDownLatchMessageTaskFactoryProvider.TRY_SET_COUNT_TYPE;
import static com.hazelcast.util.Preconditions.checkNotNull;
import static com.hazelcast.util.UuidUtil.newUnsecureUUID;

/**
* Client-side Raft-based proxy implementation of {@link ICountDownLatch}
@@ -67,10 +67,14 @@
public boolean await(long timeout, TimeUnit unit) {
checkNotNull(unit);

UUID invocationUid = newUnsecureUUID();
long timeoutMillis = Math.max(0, unit.toMillis(timeout));

int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(objectName) + Bits.LONG_SIZE_IN_BYTES;
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(objectName)
+ Bits.LONG_SIZE_IN_BYTES * 3;
ClientMessage msg = prepareClientMessage(groupId, objectName, dataSize, AWAIT_TYPE);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(timeoutMillis);
msg.updateFrameLength();

@@ -80,13 +84,13 @@ public boolean await(long timeout, TimeUnit unit) {
@Override
public void countDown() {
int round = getRound();
UUID invocationUid = UuidUtil.newUnsecureUUID();
UUID invocationUid = newUnsecureUUID();
for (;;) {
try {
countDown(round, invocationUid);
return;
} catch (OperationTimeoutException ignored) {
EmptyStatement.ignore(ignored);
} catch (OperationTimeoutException e) {
EmptyStatement.ignore(e);
// I can retry safely because my retry would be idempotent...
}
}
@@ -101,12 +105,12 @@ private int getRound() {
}

private void countDown(int round, UUID invocationUid) {
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(objectName) + Bits.INT_SIZE_IN_BYTES
+ Bits.LONG_SIZE_IN_BYTES * 2;
int dataSize = ClientMessage.HEADER_SIZE + dataSize(groupId) + calculateDataSize(objectName) + Bits.LONG_SIZE_IN_BYTES * 2
+ Bits.INT_SIZE_IN_BYTES;
ClientMessage msg = prepareClientMessage(groupId, objectName, dataSize, COUNT_DOWN_TYPE);
msg.set(round);
msg.set(invocationUid.getLeastSignificantBits());
msg.set(invocationUid.getMostSignificantBits());
msg.set(round);
msg.updateFrameLength();

invoke(msg, INT_RESPONSE_DECODER).join();
@@ -960,8 +960,10 @@ public void shutdown() {
getLifecycleService().shutdown();
}

public void doShutdown() {
proxySessionManager.shutdown();
public void doShutdown(boolean isGraceful) {
if (isGraceful) {
proxySessionManager.shutdown();
}
proxyManager.destroy();
connectionManager.shutdown();
clusterService.shutdown();
@@ -143,23 +143,27 @@ public boolean isRunning() {

@Override
public void shutdown() {
doShutdown(true);
}

@Override
public void terminate() {
doShutdown(false);
}

private void doShutdown(boolean isGraceful) {
if (!active.compareAndSet(true, false)) {
return;
}

fireLifecycleEvent(SHUTTING_DOWN);
HazelcastClient.shutdown(client.getName());
client.doShutdown();
client.doShutdown(isGraceful);
fireLifecycleEvent(SHUTDOWN);

shutdownExecutor();
}

@Override
public void terminate() {
shutdown();
}

private void shutdownExecutor() {
executor.shutdown();
try {
@@ -426,6 +426,10 @@ public void triggerRemoveMember(CPMember leavingMember) {
return;
}

initializeMembershipChangeContextForLeavingMember(leavingMember);
}

private void initializeMembershipChangeContextForLeavingMember(CPMember leavingMember) {
List<CPGroupId> leavingGroupIds = new ArrayList<CPGroupId>();
List<CPGroupMembershipChangeContext> leavingGroups = new ArrayList<CPGroupMembershipChangeContext>();
for (RaftGroup group : groups.values()) {
@@ -439,16 +443,10 @@ public void triggerRemoveMember(CPMember leavingMember) {
leavingGroupIds.add(groupId);
leavingGroups.add(new CPGroupMembershipChangeContext(groupId, group.getMembersCommitIndex(),
group.memberImpls(), substitute, leavingMember));
if (logger.isFineEnabled()) {
logger.fine("Substituted " + leavingMember + " with " + substitute + " in " + group);
}
} else {
leavingGroupIds.add(groupId);
leavingGroups.add(new CPGroupMembershipChangeContext(groupId, group.getMembersCommitIndex(),
group.memberImpls(), null, leavingMember));
if (logger.isFineEnabled()) {
logger.fine("Could not find a substitute for " + leavingMember + " in " + group);
}
}
}

@@ -760,12 +758,7 @@ public void run() {

@Override
public void run() {
if (isDiscoveryCompleted()) {
return;
}

if (!nodeEngine.getClusterService().isJoined()) {
scheduleDiscoveryInitialCPMembersTask();
if (shouldSkipOrReschedule()) {
return;
}

@@ -777,25 +770,16 @@ public void run() {
return;
}
}

latestMembers = members;

if (members.size() < config.getCPMemberCount()) {
if (logger.isFineEnabled()) {
logger.warning("Waiting for " + config.getCPMemberCount() + " CP members to join the cluster. "
+ "Current CP member count: " + members.size());
}
scheduleDiscoveryInitialCPMembersTask();
if (rescheduleIfCPMemberCountNotSatisfied(members)) {
return;
}

List<CPMember> cpMembers = getInitialCPMembers(members);

if (!cpMembers.contains(getLocalMember())) {
if (logger.isFineEnabled()) {
logger.fine("I am not an initial CP member! I'll serve as an AP member.");
}
localMember.set(null);
disableDiscovery();
if (completeDiscoveryIfNotCPMember(cpMembers)) {
return;
}

@@ -813,6 +797,46 @@ public void run() {
scheduleRaftGroupMembershipManagementTasks();
}

private boolean shouldSkipOrReschedule() {
if (isDiscoveryCompleted()) {
return true;
}

if (!nodeEngine.getClusterService().isJoined()) {
scheduleDiscoveryInitialCPMembersTask();
return true;
}

return false;
}

private boolean rescheduleIfCPMemberCountNotSatisfied(Collection<Member> members) {
if (members.size() < config.getCPMemberCount()) {
if (logger.isFineEnabled()) {
logger.fine("Waiting for " + config.getCPMemberCount() + " CP members to join the cluster. "
+ "Current CP member count: " + members.size());
}

scheduleDiscoveryInitialCPMembersTask();
return true;
}
return false;
}

private boolean completeDiscoveryIfNotCPMember(List<CPMember> cpMembers) {
if (!cpMembers.contains(getLocalMember())) {
if (logger.isFineEnabled()) {
logger.fine("I am not an initial CP member! I'll serve as an AP member.");
}

localMember.set(null);
disableDiscovery();
return true;
}

return false;
}

@SuppressWarnings("unchecked")
private boolean commitInitialMetadataRaftGroup(List<CPMember> initialCPMembers) {
int metadataGroupSize = config.getGroupSize();
@@ -16,6 +16,7 @@

package com.hazelcast.cp.internal.datastructures;

import com.hazelcast.cp.internal.datastructures.spi.blocking.WaitKeyContainer;
import com.hazelcast.internal.serialization.DataSerializerHook;
import com.hazelcast.internal.serialization.impl.FactoryIdHelper;
import com.hazelcast.nio.serialization.DataSerializableFactory;
@@ -33,8 +34,10 @@

public static final int F_ID = FactoryIdHelper.getFactoryId(RAFT_DS_FACTORY, FACTORY_ID);

public static final int EXPIRE_WAIT_KEYS_OP = 1;
public static final int DESTROY_RAFT_OBJECT_OP = 2;

public static final int WAIT_KEY_CONTAINER = 1;
public static final int EXPIRE_WAIT_KEYS_OP = 2;
public static final int DESTROY_RAFT_OBJECT_OP = 3;

@Override
public int getFactoryId() {
@@ -47,6 +50,8 @@ public DataSerializableFactory createFactory() {
@Override
public IdentifiedDataSerializable create(int typeId) {
switch (typeId) {
case WAIT_KEY_CONTAINER:
return new WaitKeyContainer();
case EXPIRE_WAIT_KEYS_OP:
return new ExpireWaitKeysOp();
case DESTROY_RAFT_OBJECT_OP:
@@ -17,37 +17,35 @@
package com.hazelcast.cp.internal.datastructures.countdownlatch;

import com.hazelcast.core.ICountDownLatch;
import com.hazelcast.cp.internal.datastructures.spi.blocking.WaitKey;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.IdentifiedDataSerializable;
import com.hazelcast.cp.internal.datastructures.spi.blocking.WaitKey;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.cp.internal.session.AbstractProxySessionManager.NO_SESSION_ID;
import static com.hazelcast.cp.internal.util.UUIDSerializationUtil.readUUID;
import static com.hazelcast.cp.internal.util.UUIDSerializationUtil.writeUUID;
import static com.hazelcast.util.Preconditions.checkNotNull;

/**
* Represents a {@link ICountDownLatch#await(long, TimeUnit)}} invocation
*/
public class AwaitInvocationKey implements WaitKey, IdentifiedDataSerializable {

private String name;
private long commitIndex;
private UUID invocationUid;

AwaitInvocationKey() {
}

AwaitInvocationKey(String name, long commitIndex) {
checkNotNull(name);
this.name = name;
AwaitInvocationKey(long commitIndex, UUID invocationUid) {
checkNotNull(invocationUid);
this.commitIndex = commitIndex;
}

@Override
public String name() {
return name;
this.invocationUid = invocationUid;
}

@Override
@@ -60,6 +58,11 @@ public long commitIndex() {
return commitIndex;
}

@Override
public UUID invocationUid() {
return invocationUid;
}

@Override
public int getFactoryId() {
return RaftCountDownLatchDataSerializerHook.F_ID;
@@ -72,14 +75,14 @@ public int getId() {

@Override
public void writeData(ObjectDataOutput out) throws IOException {
out.writeUTF(name);
out.writeLong(commitIndex);
writeUUID(out, invocationUid);
}

@Override
public void readData(ObjectDataInput in) throws IOException {
name = in.readUTF();
commitIndex = in.readLong();
invocationUid = readUUID(in);
}

@Override
@@ -96,19 +99,18 @@ public boolean equals(Object o) {
if (commitIndex != that.commitIndex) {
return false;
}
return name.equals(that.name);
return invocationUid.equals(that.invocationUid);
}

@Override
public int hashCode() {
int result = name.hashCode();
result = 31 * result + (int) (commitIndex ^ (commitIndex >>> 32));
int result = (int) (commitIndex ^ (commitIndex >>> 32));
result = 31 * result + invocationUid.hashCode();
return result;
}

@Override
public String toString() {
return "AwaitInvocationKey{" + "name='" + name + '\'' + ", commitIndex=" + commitIndex + '}';
return "AwaitInvocationKey{" + "commitIndex=" + commitIndex + ", invocationUid=" + invocationUid + '}';
}

}

0 comments on commit 4214776

Please sign in to comment.
You can’t perform that action at this time.