Skip to content

Commit

Permalink
Merge pull request hazelcast#14693 from metanet/cp-subsystem/master/f…
Browse files Browse the repository at this point in the history
…ix/membership-change-bug

Change membership change order in the CP Subsystem
  • Loading branch information
metanet committed Mar 14, 2019
2 parents add5201 + 183c605 commit 210f531
Show file tree
Hide file tree
Showing 14 changed files with 248 additions and 88 deletions.
Expand Up @@ -58,6 +58,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.STEPPED_DOWN;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.TERMINATED;
import static com.hazelcast.spi.ExecutionService.ASYNC_EXECUTOR;

Expand Down Expand Up @@ -234,9 +235,14 @@ private boolean send(AsyncRaftOp operation, Endpoint target) {
@Override
public void onNodeStatusChange(RaftNodeStatus status) {
if (status == TERMINATED) {
Collection<RaftGroupLifecycleAwareService> services = nodeEngine.getServices(RaftGroupLifecycleAwareService.class);
for (RaftGroupLifecycleAwareService service : services) {
service.onGroupDestroy(groupId);
Collection<RaftNodeLifecycleAwareService> services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class);
for (RaftNodeLifecycleAwareService service : services) {
service.onRaftGroupDestroyed(groupId);
}
} else if (status == STEPPED_DOWN) {
Collection<RaftNodeLifecycleAwareService> services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class);
for (RaftNodeLifecycleAwareService service : services) {
service.onRaftNodeSteppedDown(groupId);
}
}
}
Expand Down
Expand Up @@ -247,11 +247,11 @@ public void run() {
}

List<CPGroupMembershipChange> changes = schedule.getChanges();
Map<CPGroupId, Tuple2<Long, Long>> changedGroups = new ConcurrentHashMap<CPGroupId, Tuple2<Long, Long>>();
CountDownLatch latch = new CountDownLatch(changes.size());
Map<CPGroupId, Tuple2<Long, Long>> changedGroups = new ConcurrentHashMap<CPGroupId, Tuple2<Long, Long>>();

for (CPGroupMembershipChange change : changes) {
applyMembershipChangeOnRaftGroup(changedGroups, latch, change);
applyOnRaftGroup(latch, changedGroups, change);
}

try {
Expand All @@ -269,105 +269,112 @@ private MembershipChangeSchedule getMembershipChangeSchedule() {
return f.join();
}

private void applyMembershipChangeOnRaftGroup(final Map<CPGroupId, Tuple2<Long, Long>> changedGroups,
final CountDownLatch latch,
final CPGroupMembershipChange change) {
final CPGroupId groupId = change.getGroupId();
private void applyOnRaftGroup(final CountDownLatch latch, final Map<CPGroupId, Tuple2<Long, Long>> changedGroups,
final CPGroupMembershipChange change) {
ICompletableFuture<Long> future;
if (change.getMemberToAdd() == null) {
future = new SimpleCompletedFuture<Long>(change.getMembersCommitIndex());
if (change.getMemberToRemove() != null) {
future = invocationManager.changeMembership(change.getGroupId(), change.getMembersCommitIndex(),
change.getMemberToRemove(), MembershipChangeMode.REMOVE);
} else {
logger.fine("Adding " + change.getMemberToAdd() + " to " + groupId);

future = invocationManager.changeMembership(groupId, change.getMembersCommitIndex(),
change.getMemberToAdd(), MembershipChangeMode.ADD);
future = new SimpleCompletedFuture<Long>(change.getMembersCommitIndex());
}

future.andThen(new ExecutionCallback<Long>() {
@Override
public void onResponse(Long addCommitIndex) {
if (change.getMemberToRemove() == null) {
changedGroups.put(groupId, Tuple2.of(change.getMembersCommitIndex(), addCommitIndex));
latch.countDown();
public void onResponse(Long removeCommitIndex) {
if (change.getMemberToAdd() != null) {
addMember(latch, changedGroups, change, removeCommitIndex);
} else {
removeMember(changedGroups, latch, change, addCommitIndex);
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex));
latch.countDown();
}
}

@Override
public void onFailure(Throwable t) {
long addCommitIndex = getMemberAddCommitIndex(changedGroups, change, t);
if (addCommitIndex != NA_MEMBERS_COMMIT_INDEX) {
onResponse(addCommitIndex);
long removeCommitIndex = checkMemberRemoveCommitIndex(changedGroups, change, t);
if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) {
onResponse(removeCommitIndex);
} else {
latch.countDown();
}
}
});
}

private void removeMember(final Map<CPGroupId, Tuple2<Long, Long>> changedGroups,
final CountDownLatch latch,
final CPGroupMembershipChange change,
final long currentCommitIndex) {
private void addMember(final CountDownLatch latch, final Map<CPGroupId, Tuple2<Long, Long>> changedGroups,
final CPGroupMembershipChange change, final long currentCommitIndex) {
ICompletableFuture<Long> future = invocationManager.changeMembership(change.getGroupId(), currentCommitIndex,
change.getMemberToRemove(), MembershipChangeMode.REMOVE);
change.getMemberToAdd(), MembershipChangeMode.ADD);
future.andThen(new ExecutionCallback<Long>() {
@Override
public void onResponse(Long removeCommitIndex) {
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex));
public void onResponse(Long addCommitIndex) {
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), addCommitIndex));
latch.countDown();
}

@Override
public void onFailure(Throwable t) {
long removeCommitIndex = getMemberRemoveCommitIndex(change, t);
if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) {
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex));
}
checkMemberAddCommitIndex(changedGroups, change, t);
latch.countDown();
}
});
}

private long getMemberAddCommitIndex(Map<CPGroupId, Tuple2<Long, Long>> changedGroups, CPGroupMembershipChange change,
Throwable t) {
private void checkMemberAddCommitIndex(Map<CPGroupId, Tuple2<Long, Long>> changedGroups, CPGroupMembershipChange change,
Throwable t) {
CPMemberInfo memberToAdd = change.getMemberToAdd();
if (t instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t;
String msg = "MEMBER ADD commit of " + change + " failed. Actual group members: " + m.getMembers()
+ " with commit index: " + m.getCommitIndex();

// learnt group members must contain the added member and the current members I know

if (!m.getMembers().contains(change.getMemberToAdd())) {
if (!m.getMembers().contains(memberToAdd)) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
return;
}

for (CPMemberInfo member : change.getMembers()) {
if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) {
if (change.getMemberToRemove() != null) {
// I expect the member scheduled for removal to have already left the group
if (m.getMembers().contains(change.getMemberToRemove())) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
return;
}

// I know the removed member has left the group and the added member has joined.
// So member counts must be same...
if (m.getMembers().size() != change.getMembers().size()) {
logger.severe(msg);
return;
}
} else if (m.getMembers().size() != (change.getMembers().size() + 1)) {
// if there is no removed member, I expect the number of learnt group members to be 1 greater than
// the number of current members I know
logger.severe(msg);
return;
}

// If the scheduled member-remove is already done, put its result and do not retry it
if (change.getMemberToRemove() != null && !m.getMembers().contains(change.getMemberToRemove())) {
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex()));
return NA_MEMBERS_COMMIT_INDEX;
for (CPMemberInfo member : change.getMembers()) {
if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) {
logger.severe(msg);
return;
}
}

return m.getCommitIndex();
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex()));
return;
}

logger.severe("Cannot get MEMBER ADD result of " + change.getMemberToAdd() + " to " + change.getGroupId()
logger.severe("Cannot get MEMBER ADD result of " + memberToAdd + " to " + change.getGroupId()
+ " with members commit index: " + change.getMembersCommitIndex(), t);
return NA_MEMBERS_COMMIT_INDEX;
}

private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwable t) {
@SuppressWarnings("checkstyle:cyclomaticcomplexity")
private long checkMemberRemoveCommitIndex(Map<CPGroupId, Tuple2<Long, Long>> changedGroups,
CPGroupMembershipChange change, Throwable t) {
CPMemberInfo removedMember = change.getMemberToRemove();

if (t instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t;
String msg = "MEMBER REMOVE commit of " + change + " failed. Actual group members: " + m.getMembers()
Expand All @@ -378,19 +385,26 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl
return NA_MEMBERS_COMMIT_INDEX;
}

if (change.getMemberToAdd() != null) {
// I expect the added member to be joined to the group
if (!m.getMembers().contains(change.getMemberToAdd())) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
}

// I know the removed member has left the group and the added member has joined.
// The member to be added may have already joined the group
if (change.getMemberToAdd() != null && m.getMembers().contains(change.getMemberToAdd())) {
// I know the removed member has left the group and the member to be added has already joined the group.
// So member sizes must be same...
if (m.getMembers().size() != change.getMembers().size()) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
}

for (CPMemberInfo member : change.getMembers()) {
// Other group members except the removed one must be still present...
if (!member.equals(removedMember) && !m.getMembers().contains(member)) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
}
}

// both member-remove and member-add are done.
changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex()));
return NA_MEMBERS_COMMIT_INDEX;
} else if (m.getMembers().size() != (change.getMembers().size() - 1)) {
// if there is no added member, I expect number of the learnt group members to be 1 less than
// the current members I know
Expand All @@ -399,7 +413,7 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl
}

for (CPMemberInfo member : change.getMembers()) {
// Other group members except the removed one and added one must be still present...
// Other group members except the removed one must be still present...
if (!member.equals(removedMember) && !m.getMembers().contains(member)) {
logger.severe(msg);
return NA_MEMBERS_COMMIT_INDEX;
Expand Down
Expand Up @@ -19,13 +19,20 @@
import com.hazelcast.cp.CPGroupId;

/**
* Contains methods that are invoked on life cycle changes of a Raft group
* Contains methods that are invoked on life cycle changes of a Raft node
*/
public interface RaftGroupLifecycleAwareService {
public interface RaftNodeLifecycleAwareService {

/**
* Called on the thread of the Raft group when the given Raft group is
* Called on the thread of the Raft group when the given Raft node is
* destroyed, either gracefully or forcefully.
*/
void onGroupDestroy(CPGroupId groupId);
void onRaftGroupDestroyed(CPGroupId groupId);

/**
* Called on the thread of the Raft group when the given Raft node is
* stepped down, either because it is shutting down, or it could not be
* added to the Raft group
*/
void onRaftNodeSteppedDown(CPGroupId groupId);
}
21 changes: 18 additions & 3 deletions hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java
Expand Up @@ -109,7 +109,8 @@
*/
@SuppressWarnings({"checkstyle:methodcount", "checkstyle:classfanoutcomplexity", "checkstyle:classdataabstractioncoupling"})
public class RaftService implements ManagedService, SnapshotAwareService<MetadataRaftGroupSnapshot>, GracefulShutdownAwareService,
MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService {
MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService,
RaftNodeLifecycleAwareService {

public static final String SERVICE_NAME = "hz:core:raft";

Expand Down Expand Up @@ -632,8 +633,12 @@ void createRaftNode(CPGroupId groupId, Collection<CPMemberInfo> members, CPMembe
logger.warning("Not creating RaftNode[" + groupId + "] since the CP group is already destroyed");
return;
} else if (steppedDownGroupIds.contains(groupId)) {
logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down");
return;
if (!nodeEngine.isRunning()) {
logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down");
return;
}

steppedDownGroupIds.remove(groupId);
}

RaftIntegration integration = new NodeEngineRaftIntegration(nodeEngine, groupId, localCPMember);
Expand Down Expand Up @@ -840,6 +845,16 @@ public void handleActiveCPMembers(RaftGroupId latestMetadataGroupId, long member
metadataGroupManager.handleMetadataGroupId(latestMetadataGroupId);
}

/**
 * Lifecycle callback for a destroyed Raft group: forwards the given group id
 * to {@code destroyRaftNode}.
 *
 * @param groupId id of the CP group passed on to {@code destroyRaftNode}
 */
@Override
public void onRaftGroupDestroyed(CPGroupId groupId) {
destroyRaftNode(groupId);
}

/**
 * Lifecycle callback for a stepped-down Raft node: forwards the given group id
 * to {@code stepDownRaftNode}.
 *
 * @param groupId id of the CP group passed on to {@code stepDownRaftNode}
 */
@Override
public void onRaftNodeSteppedDown(CPGroupId groupId) {
stepDownRaftNode(groupId);
}

private class InitializeRaftNodeTask implements Runnable {
private final CPGroupId groupId;

Expand Down
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.core.IAtomicLong;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService;
import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService;
import com.hazelcast.cp.internal.RaftService;
import com.hazelcast.cp.internal.datastructures.atomiclong.proxy.RaftAtomicLongProxy;
import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService;
Expand Down Expand Up @@ -47,7 +47,7 @@
* Contains Raft-based atomic long instances, implements snapshotting,
* and creates proxies
*/
public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService,
public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService,
SnapshotAwareService<RaftAtomicLongSnapshot> {

/**
Expand Down Expand Up @@ -128,7 +128,7 @@ public boolean destroyRaftObject(CPGroupId groupId, String name) {
}

@Override
public void onGroupDestroy(CPGroupId groupId) {
public void onRaftGroupDestroyed(CPGroupId groupId) {
Iterator<Tuple2<CPGroupId, String>> iter = atomicLongs.keySet().iterator();
while (iter.hasNext()) {
Tuple2<CPGroupId, String> next = iter.next();
Expand All @@ -139,6 +139,10 @@ public void onGroupDestroy(CPGroupId groupId) {
}
}

/**
 * No-op: this service performs no work in this callback.
 *
 * @param groupId id of the CP group; unused here
 */
@Override
public void onRaftNodeSteppedDown(CPGroupId groupId) {
}

public RaftAtomicLong getAtomicLong(CPGroupId groupId, String name) {
checkNotNull(groupId);
checkNotNull(name);
Expand Down
Expand Up @@ -19,7 +19,7 @@
import com.hazelcast.core.IAtomicReference;
import com.hazelcast.cp.CPGroupId;
import com.hazelcast.cp.internal.RaftGroupId;
import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService;
import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService;
import com.hazelcast.cp.internal.RaftService;
import com.hazelcast.cp.internal.datastructures.atomicref.proxy.RaftAtomicRefProxy;
import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService;
Expand Down Expand Up @@ -48,7 +48,7 @@
* Contains Raft-based atomic reference instances, implements snapshotting,
* and creates proxies
*/
public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService,
public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService,
SnapshotAwareService<RaftAtomicRefSnapshot> {

/**
Expand Down Expand Up @@ -121,7 +121,7 @@ public void restoreSnapshot(CPGroupId groupId, long commitIndex, RaftAtomicRefSn
}

@Override
public void onGroupDestroy(CPGroupId groupId) {
public void onRaftGroupDestroyed(CPGroupId groupId) {
Iterator<Tuple2<CPGroupId, String>> iter = atomicRefs.keySet().iterator();
while (iter.hasNext()) {
Tuple2<CPGroupId, String> next = iter.next();
Expand All @@ -132,6 +132,10 @@ public void onGroupDestroy(CPGroupId groupId) {
}
}

/**
 * No-op: this service performs no work in this callback.
 *
 * @param groupId id of the CP group; unused here
 */
@Override
public void onRaftNodeSteppedDown(CPGroupId groupId) {
}

@Override
public boolean destroyRaftObject(CPGroupId groupId, String name) {
Tuple2<CPGroupId, String> key = Tuple2.of(groupId, name);
Expand Down

0 comments on commit 210f531

Please sign in to comment.