diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java index 7262e61e9c26..8c7efa282e31 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java @@ -58,6 +58,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.STEPPED_DOWN; import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.TERMINATED; import static com.hazelcast.spi.ExecutionService.ASYNC_EXECUTOR; @@ -234,9 +235,14 @@ private boolean send(AsyncRaftOp operation, Endpoint target) { @Override public void onNodeStatusChange(RaftNodeStatus status) { if (status == TERMINATED) { - Collection services = nodeEngine.getServices(RaftGroupLifecycleAwareService.class); - for (RaftGroupLifecycleAwareService service : services) { - service.onGroupDestroy(groupId); + Collection services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class); + for (RaftNodeLifecycleAwareService service : services) { + service.onRaftGroupDestroyed(groupId); + } + } else if (status == STEPPED_DOWN) { + Collection services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class); + for (RaftNodeLifecycleAwareService service : services) { + service.onRaftNodeSteppedDown(groupId); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java index a5ed1b0f2470..e37b71dd858e 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java @@ -247,11 +247,11 @@ public void run() { } List changes = schedule.getChanges(); - Map> changedGroups = new ConcurrentHashMap>(); CountDownLatch latch = new CountDownLatch(changes.size()); + Map> changedGroups = new ConcurrentHashMap>(); for (CPGroupMembershipChange change : changes) { - applyMembershipChangeOnRaftGroup(changedGroups, latch, change); + applyOnRaftGroup(latch, changedGroups, change); } try { @@ -269,36 +269,32 @@ private MembershipChangeSchedule getMembershipChangeSchedule() { return f.join(); } - private void applyMembershipChangeOnRaftGroup(final Map> changedGroups, - final CountDownLatch latch, - final CPGroupMembershipChange change) { - final CPGroupId groupId = change.getGroupId(); + private void applyOnRaftGroup(final CountDownLatch latch, final Map> changedGroups, + final CPGroupMembershipChange change) { ICompletableFuture future; - if (change.getMemberToAdd() == null) { - future = new SimpleCompletedFuture(change.getMembersCommitIndex()); + if (change.getMemberToRemove() != null) { + future = invocationManager.changeMembership(change.getGroupId(), change.getMembersCommitIndex(), + change.getMemberToRemove(), MembershipChangeMode.REMOVE); } else { - logger.fine("Adding " + change.getMemberToAdd() + " to " + groupId); - - future = invocationManager.changeMembership(groupId, change.getMembersCommitIndex(), - change.getMemberToAdd(), MembershipChangeMode.ADD); + future = new SimpleCompletedFuture(change.getMembersCommitIndex()); } future.andThen(new ExecutionCallback() { @Override - public void onResponse(Long addCommitIndex) { - if (change.getMemberToRemove() == null) { - changedGroups.put(groupId, Tuple2.of(change.getMembersCommitIndex(), addCommitIndex)); - latch.countDown(); + public void onResponse(Long removeCommitIndex) { + if (change.getMemberToAdd() != null) { + addMember(latch, changedGroups, change, removeCommitIndex); } else { - removeMember(changedGroups, latch, change, addCommitIndex); + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); + latch.countDown(); } } @Override public void onFailure(Throwable t) { - long addCommitIndex = getMemberAddCommitIndex(changedGroups, change, t); - if (addCommitIndex != NA_MEMBERS_COMMIT_INDEX) { - onResponse(addCommitIndex); + long removeCommitIndex = checkMemberRemoveCommitIndex(changedGroups, change, t); + if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) { + onResponse(removeCommitIndex); } else { latch.countDown(); } @@ -306,32 +302,28 @@ public void onFailure(Throwable t) { }); } - private void removeMember(final Map> changedGroups, - final CountDownLatch latch, - final CPGroupMembershipChange change, - final long currentCommitIndex) { + private void addMember(final CountDownLatch latch, final Map> changedGroups, + final CPGroupMembershipChange change, final long currentCommitIndex) { ICompletableFuture future = invocationManager.changeMembership(change.getGroupId(), currentCommitIndex, - change.getMemberToRemove(), MembershipChangeMode.REMOVE); + change.getMemberToAdd(), MembershipChangeMode.ADD); future.andThen(new ExecutionCallback() { @Override - public void onResponse(Long removeCommitIndex) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); + public void onResponse(Long addCommitIndex) { + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), addCommitIndex)); latch.countDown(); } @Override public void onFailure(Throwable t) { - long removeCommitIndex = getMemberRemoveCommitIndex(change, t); - if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); - } + checkMemberAddCommitIndex(changedGroups, change, t); latch.countDown(); } }); } - private long getMemberAddCommitIndex(Map> changedGroups, CPGroupMembershipChange change, - Throwable t) { + private void checkMemberAddCommitIndex(Map> changedGroups, CPGroupMembershipChange change, + Throwable t) { + CPMemberInfo memberToAdd = change.getMemberToAdd(); if (t instanceof MismatchingGroupMembersCommitIndexException) { MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t; String msg = "MEMBER ADD commit of " + change + " failed. Actual group members: " + m.getMembers() @@ -339,35 +331,50 @@ private long getMemberAddCommitIndex(Map> changedG // learnt group members must contain the added member and the current members I know - if (!m.getMembers().contains(change.getMemberToAdd())) { + if (!m.getMembers().contains(memberToAdd)) { logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; + return; } - for (CPMemberInfo member : change.getMembers()) { - if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) { + if (change.getMemberToRemove() != null) { + // I expect the member to be removed to be removed from the group + if (m.getMembers().contains(change.getMemberToRemove())) { logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; + return; + } + + // I know the removed member has left the group and the added member has joined. + // So member counts must be same... + if (m.getMembers().size() != change.getMembers().size()) { + logger.severe(msg); + return; } + } else if (m.getMembers().size() != (change.getMembers().size() + 1)) { + // if there is no removed member, I expect number of the learnt number of group members to be 1 greater than + // the current members I know + logger.severe(msg); + return; } - // If the scheduled member-remove is already done, put its result and do not retry it - if (change.getMemberToRemove() != null && !m.getMembers().contains(change.getMemberToRemove())) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); - return NA_MEMBERS_COMMIT_INDEX; + for (CPMemberInfo member : change.getMembers()) { + if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) { + logger.severe(msg); + return; + } } - return m.getCommitIndex(); + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); + return; } - logger.severe("Cannot get MEMBER ADD result of " + change.getMemberToAdd() + " to " + change.getGroupId() + logger.severe("Cannot get MEMBER ADD result of " + memberToAdd + " to " + change.getGroupId() + " with members commit index: " + change.getMembersCommitIndex(), t); - return NA_MEMBERS_COMMIT_INDEX; } - private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwable t) { + @SuppressWarnings("checkstyle:cyclomaticcomplexity") + private long checkMemberRemoveCommitIndex(Map> changedGroups, + CPGroupMembershipChange change, Throwable t) { CPMemberInfo removedMember = change.getMemberToRemove(); - if (t instanceof MismatchingGroupMembersCommitIndexException) { MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t; String msg = "MEMBER REMOVE commit of " + change + " failed. Actual group members: " + m.getMembers() @@ -378,19 +385,26 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl return NA_MEMBERS_COMMIT_INDEX; } - if (change.getMemberToAdd() != null) { - // I expect the added member to be joined to the group - if (!m.getMembers().contains(change.getMemberToAdd())) { - logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; - } - - // I know the removed member has left the group and the added member has joined. + // The member to be added can be already joined to the group + if (change.getMemberToAdd() != null && m.getMembers().contains(change.getMemberToAdd())) { + // I know the removed member has left the group and the member to be added has already joined to the group. // So member sizes must be same... if (m.getMembers().size() != change.getMembers().size()) { logger.severe(msg); return NA_MEMBERS_COMMIT_INDEX; } + + for (CPMemberInfo member : change.getMembers()) { + // Other group members except the removed one must be still present... + if (!member.equals(removedMember) && !m.getMembers().contains(member)) { + logger.severe(msg); + return NA_MEMBERS_COMMIT_INDEX; + } + } + + // both member-remove and member-add are done. + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); + return NA_MEMBERS_COMMIT_INDEX; } else if (m.getMembers().size() != (change.getMembers().size() - 1)) { // if there is no added member, I expect number of the learnt group members to be 1 less than // the current members I know @@ -399,7 +413,7 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl } for (CPMemberInfo member : change.getMembers()) { - // Other group members except the removed one and added one must be still present... + // Other group members except the removed one must be still present... if (!member.equals(removedMember) && !m.getMembers().contains(member)) { logger.severe(msg); return NA_MEMBERS_COMMIT_INDEX; diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java similarity index 71% rename from hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java rename to hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java index 96e21735f2b2..e6d1cfafdb66 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java @@ -19,13 +19,20 @@ import com.hazelcast.cp.CPGroupId; /** - * Contains methods that are invoked on life cycle changes of a Raft group + * Contains methods that are invoked on life cycle changes of a Raft node */ -public interface RaftGroupLifecycleAwareService { +public interface RaftNodeLifecycleAwareService { /** - * Called on the thread of the Raft group when the given Raft group is + * Called on the thread of the Raft group when the given Raft node is * destroyed, either gracefully or forcefully. */ - void onGroupDestroy(CPGroupId groupId); + void onRaftGroupDestroyed(CPGroupId groupId); + + /** + * Called on the thread of the Raft group when the given Raft node is + * stepped down, either because it is shutting down, or it could not be + * added to the Raft group + */ + void onRaftNodeSteppedDown(CPGroupId groupId); } diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java index 32fc3b10abf3..38f3ebf4d48a 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java @@ -109,7 +109,8 @@ */ @SuppressWarnings({"checkstyle:methodcount", "checkstyle:classfanoutcomplexity", "checkstyle:classdataabstractioncoupling"}) public class RaftService implements ManagedService, SnapshotAwareService, GracefulShutdownAwareService, - MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService { + MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService, + RaftNodeLifecycleAwareService { public static final String SERVICE_NAME = "hz:core:raft"; @@ -632,8 +633,12 @@ void createRaftNode(CPGroupId groupId, Collection members, CPMembe logger.warning("Not creating RaftNode[" + groupId + "] since the CP group is already destroyed"); return; } else if (steppedDownGroupIds.contains(groupId)) { - logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down"); - return; + if (!nodeEngine.isRunning()) { + logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down"); + return; + } + + steppedDownGroupIds.remove(groupId); } RaftIntegration integration = new NodeEngineRaftIntegration(nodeEngine, groupId, localCPMember); @@ -840,6 +845,16 @@ public void handleActiveCPMembers(RaftGroupId latestMetadataGroupId, long member metadataGroupManager.handleMetadataGroupId(latestMetadataGroupId); } + @Override + public void onRaftGroupDestroyed(CPGroupId groupId) { + destroyRaftNode(groupId); + } + + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + stepDownRaftNode(groupId); + } + private class InitializeRaftNodeTask implements Runnable { private final CPGroupId groupId; diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java index aa470a9ba18e..bb7c83488b10 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java @@ -19,7 +19,7 @@ import com.hazelcast.core.IAtomicLong; import com.hazelcast.cp.CPGroupId; import com.hazelcast.cp.internal.RaftGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.atomiclong.proxy.RaftAtomicLongProxy; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; @@ -47,7 +47,7 @@ * Contains Raft-based atomic long instances, implements snapshotting, * and creates proxies */ -public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService, +public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService, SnapshotAwareService { /** @@ -128,7 +128,7 @@ public boolean destroyRaftObject(CPGroupId groupId, String name) { } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { Iterator> iter = atomicLongs.keySet().iterator(); while (iter.hasNext()) { Tuple2 next = iter.next(); @@ -139,6 +139,10 @@ public void onGroupDestroy(CPGroupId groupId) { } } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + public RaftAtomicLong getAtomicLong(CPGroupId groupId, String name) { checkNotNull(groupId); checkNotNull(name); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java index 6fe9e03a8bea..ac8347a27ad3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java @@ -19,7 +19,7 @@ import com.hazelcast.core.IAtomicReference; import com.hazelcast.cp.CPGroupId; import com.hazelcast.cp.internal.RaftGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.atomicref.proxy.RaftAtomicRefProxy; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; @@ -48,7 +48,7 @@ * Contains Raft-based atomic reference instances, implements snapshotting, * and creates proxies */ -public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService, +public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService, SnapshotAwareService { /** @@ -121,7 +121,7 @@ public void restoreSnapshot(CPGroupId groupId, long commitIndex, RaftAtomicRefSn } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { Iterator> iter = atomicRefs.keySet().iterator(); while (iter.hasNext()) { Tuple2 next = iter.next(); @@ -132,6 +132,10 @@ public void onGroupDestroy(CPGroupId groupId) { } } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public boolean destroyRaftObject(CPGroupId groupId, String name) { Tuple2 key = Tuple2.of(groupId, name); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java index e129c0418eb3..d9935a881740 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java @@ -17,7 +17,7 @@ package com.hazelcast.cp.internal.datastructures.spi.blocking; import com.hazelcast.cp.CPGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; import com.hazelcast.cp.internal.datastructures.spi.RaftRemoteService; @@ -65,7 +65,7 @@ * @param concrete ty;e lf the resource registry */ public abstract class AbstractBlockingService, RR extends ResourceRegistry> - implements RaftManagedService, RaftGroupLifecycleAwareService, RaftRemoteService, SessionAwareService, + implements RaftManagedService, RaftNodeLifecycleAwareService, RaftRemoteService, SessionAwareService, SnapshotAwareService, LiveOperationsTracker { public static final long WAIT_TIMEOUT_TASK_UPPER_BOUND_MILLIS = 1500; @@ -212,7 +212,7 @@ public final Collection getAttachedSessions(CPGroupId groupId) { } @Override - public final void onGroupDestroy(CPGroupId groupId) { + public final void onRaftGroupDestroyed(CPGroupId groupId) { ResourceRegistry registry = registries.get(groupId); if (registry != null) { Collection indices = registry.destroy(); @@ -220,6 +220,10 @@ public final void onGroupDestroy(CPGroupId groupId) { } } + @Override + public final void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public final void populate(LiveOperations liveOperations) { for (RR registry : registries.values()) { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java index d49e35490f8b..45d647d22554 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java @@ -70,11 +70,19 @@ public interface RaftNode { /** * Returns the last committed member list of the raft group this node * belongs to. Please note that the returned member list can be different - * from the current effective member list, if there is an ongoing + * from the currently effective member list, if there is an ongoing * membership change in the group. */ Collection getCommittedMembers(); + /** + * Returns the currently effective member list of the raft group this node + * belongs to. Please note that the returned member list can be different + * from the committed member list, if there is an ongoing + * membership change in the group. + */ + Collection getAppliedMembers(); + /** * Returns true if this node is {@link RaftNodeStatus#TERMINATED} or * {@link RaftNodeStatus#STEPPED_DOWN}, false otherwise. diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java index a5345e4cf164..f2f6a64cfdf3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java @@ -171,6 +171,11 @@ public Collection getCommittedMembers() { return state.committedGroupMembers().members(); } + @Override + public Collection getAppliedMembers() { + return state.lastGroupMembers().members(); + } + @Override public void forceSetTerminatedStatus() { execute(new Runnable() { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java index 6468669ded25..e64238b650ce 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java @@ -38,7 +38,8 @@ public enum RaftNodeStatus { /** * When a node is removed from the cluster after a membership change is - * committed, its status becomes {@code STEPPED_DOWN}. + * committed, or a new Raft node could not be added to the Raft group, + * its status becomes {@code STEPPED_DOWN}. */ STEPPED_DOWN, diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java index 85083b8ab801..5130c9bcb92f 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java @@ -16,6 +16,8 @@ package com.hazelcast.cp.internal.raft.impl.handler; +import com.hazelcast.core.Endpoint; +import com.hazelcast.cp.internal.raft.MembershipChangeMode; import com.hazelcast.cp.internal.raft.command.DestroyRaftGroupCmd; import com.hazelcast.cp.internal.raft.command.RaftGroupCmd; import com.hazelcast.cp.internal.raft.impl.RaftNodeImpl; @@ -26,6 +28,7 @@ import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse; import com.hazelcast.cp.internal.raft.impl.log.LogEntry; import com.hazelcast.cp.internal.raft.impl.log.RaftLog; +import com.hazelcast.cp.internal.raft.impl.state.RaftGroupMembers; import com.hazelcast.cp.internal.raft.impl.state.RaftState; import com.hazelcast.cp.internal.raft.impl.task.RaftNodeStatusAwareTask; @@ -159,7 +162,9 @@ protected void innerRun() { } raftNode.invalidateFuturesFrom(reqEntry.index()); - revertRaftGroupCmd(truncatedEntries); + if (revertPreAppliedRaftGroupCmd(truncatedEntries)) { + return; + } newEntries = Arrays.copyOfRange(req.entries(), i, req.entryCount()); break; @@ -244,7 +249,7 @@ private void preApplyRaftGroupCmd(LogEntry[] entries, long commitIndex) { } } - private void revertRaftGroupCmd(List entries) { + private boolean revertPreAppliedRaftGroupCmd(List entries) { // I am reverting appended-but-uncommitted entries and there can be at most 1 uncommitted Raft command... List commandEntries = new ArrayList(); for (LogEntry entry : entries) { @@ -258,13 +263,32 @@ private void revertRaftGroupCmd(List entries) { for (LogEntry entry : entries) { if (entry.operation() instanceof DestroyRaftGroupCmd) { raftNode.setStatus(RaftNodeStatus.ACTIVE); - return; + return false; } else if (entry.operation() instanceof UpdateRaftGroupMembersCmd) { - raftNode.setStatus(RaftNodeStatus.ACTIVE); - raftNode.resetGroupMembers(); - return; + UpdateRaftGroupMembersCmd cmd = (UpdateRaftGroupMembersCmd) entry.operation(); + Endpoint localMember = raftNode.getLocalMember(); + if (cmd.getMode() == MembershipChangeMode.ADD && cmd.getMember().equals(localMember)) { + // The Raft Dissertation, Chapter 4.1 + // Unfortunately, this decision does imply that a log entry for a configuration change + // can be removed (if leadership changes); in this case, a server must be prepared to + // fall back to the previous configuration in its log. + RaftGroupMembers lastGroupMembers = raftNode.state().lastGroupMembers(); + RaftGroupMembers committedGroupMembers = raftNode.state().committedGroupMembers(); + assert lastGroupMembers.isKnownMember(localMember) && !committedGroupMembers.isKnownMember(localMember) + : "Applied Group Members: " + lastGroupMembers + " Committed Group Members: " + committedGroupMembers + + " Reverted: " + cmd; + logger.warning("Local Endpoint: " + localMember + " could not join to the group :( Maybe another time..."); + raftNode.setStatus(RaftNodeStatus.STEPPED_DOWN); + return true; + } else { + raftNode.setStatus(RaftNodeStatus.ACTIVE); + raftNode.resetGroupMembers(); + return false; + } } } + + return false; } private AppendFailureResponse createFailureResponse(int term) { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java index 28d659fcd20c..0c74d9620e29 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java @@ -69,7 +69,7 @@ public void setRaftNode(RaftNode raftNode) { @Override public Object run(CPGroupId groupId, long commitIndex) { checkState(raftNode != null, "RaftNode is not injected in " + groupId); - Collection members = raftNode.getCommittedMembers(); + Collection members = raftNode.getAppliedMembers(); checkState(members.contains(cpMember), cpMember + " is not in the current committed member list: " + members + " of " + groupId); return new ArrayList(raftNode.getInitialMembers()); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java index 2a48b2c269af..3dc8e5ebe3c3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java @@ -21,7 +21,7 @@ import com.hazelcast.core.ICompletableFuture; import com.hazelcast.cp.CPGroup; import com.hazelcast.cp.CPGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.TermChangeAwareService; import com.hazelcast.cp.internal.raft.SnapshotAwareService; @@ -84,7 +84,7 @@ */ @SuppressWarnings({"checkstyle:methodcount"}) public class RaftSessionService implements ManagedService, SnapshotAwareService, SessionAccessor, - TermChangeAwareService, RaftGroupLifecycleAwareService, CPSessionManagementService { + TermChangeAwareService, RaftNodeLifecycleAwareService, CPSessionManagementService { public static final String SERVICE_NAME = "hz:core:raftSession"; @@ -150,10 +150,14 @@ public void onNewTermCommit(CPGroupId groupId, long commitIndex) { } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { registries.remove(groupId); } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public ICompletableFuture> getAllSessions(String groupName) { checkTrue(!METADATA_CP_GROUP_NAME.equals(groupName), "Cannot query CP sessions on the METADATA CP group!"); diff --git a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java index 4c4867c6b757..7cf3a81e8053 100644 --- a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java @@ -898,7 +898,6 @@ public void run() { List activeMembers = new ArrayList(service.getMetadataGroupManager().getActiveMembers()); assertEquals(cpMembers, activeMembers); } - } }); } @@ -969,4 +968,69 @@ public void run() { }); } + @Test + public void when_crashedMemberIsReplacedByAnotherAvailableCPMember_then_membershipChangeSucceeds() throws InterruptedException, ExecutionException { + final int cpMemberCount = 3; + final HazelcastInstance[] instances = newInstances(cpMemberCount); + waitUntilCPDiscoveryCompleted(instances); + + final HazelcastInstance instance4 = factory.newHazelcastInstance(createConfig(cpMemberCount, cpMemberCount)); + instance4.getCPSubsystem().getCPSubsystemManagementService().promoteToCPMember().get(); + + CPMember cpMember3 = instances[2].getCPSubsystem().getLocalCPMember(); + instances[2].getLifecycleService().terminate(); + instances[0].getCPSubsystem().getCPSubsystemManagementService().removeCPMember(cpMember3.getUuid()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertTrue(metadataGroup.members().contains(instance4.getCPSubsystem().getLocalCPMember())); + assertEquals(cpMemberCount, metadataGroup.members().size()); + } + }); + } + + @Test + public void when_crashedMemberIsRemovedAndThenNewCPMemberIsPromoted_then_membershipChangeSucceeds() + throws ExecutionException, InterruptedException { + final int cpMemberCount = 3; + final HazelcastInstance[] instances = newInstances(cpMemberCount); + waitUntilCPDiscoveryCompleted(instances); + + final CPMember cpMember3 = instances[2].getCPSubsystem().getLocalCPMember(); + instances[2].getLifecycleService().terminate(); + instances[0].getCPSubsystem().getCPSubsystemManagementService().removeCPMember(cpMember3.getUuid()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertEquals(cpMemberCount - 1, metadataGroup.members().size()); + assertFalse(metadataGroup.members().contains(cpMember3)); + } + }); + + final HazelcastInstance instance4 = factory.newHazelcastInstance(createConfig(cpMemberCount, cpMemberCount)); + instance4.getCPSubsystem().getCPSubsystemManagementService().promoteToCPMember().get(); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertTrue(metadataGroup.members().contains(instance4.getCPSubsystem().getLocalCPMember())); + assertEquals(cpMemberCount, metadataGroup.members().size()); + } + }); + } + }