From 5c6d2e18bdc2daec12df4cefad9c12bab3ea4b30 Mon Sep 17 00:00:00 2001 From: Ensar Basri Kahveci Date: Wed, 13 Mar 2019 13:14:59 +0300 Subject: [PATCH 1/2] Fix a membership change bug in the Raft core When a new member is added to a CP group, it can initialize its local RaftNode after the new group member list is appended on other nodes. It does not have to wait until the new member list is committed. Dually, if the new member list is not committed and reverted, this RaftNode must step down. --- .../internal/NodeEngineRaftIntegration.java | 12 +++++-- ...ava => RaftNodeLifecycleAwareService.java} | 15 +++++--- .../hazelcast/cp/internal/RaftService.java | 21 +++++++++-- .../atomiclong/RaftAtomicLongService.java | 10 ++++-- .../atomicref/RaftAtomicRefService.java | 10 ++++-- .../spi/blocking/AbstractBlockingService.java | 10 ++++-- .../cp/internal/raft/impl/RaftNode.java | 10 +++++- .../cp/internal/raft/impl/RaftNodeImpl.java | 5 +++ .../cp/internal/raft/impl/RaftNodeStatus.java | 3 +- .../handler/AppendRequestHandlerTask.java | 36 +++++++++++++++---- ...aftGroupMembersIfCurrentGroupMemberOp.java | 2 +- .../internal/session/RaftSessionService.java | 10 ++++-- .../cp/internal/CPMemberAddRemoveTest.java | 27 +++++++++++++- 13 files changed, 139 insertions(+), 32 deletions(-) rename hazelcast/src/main/java/com/hazelcast/cp/internal/{RaftGroupLifecycleAwareService.java => RaftNodeLifecycleAwareService.java} (71%) diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java index 7262e61e9c26..8c7efa282e31 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/NodeEngineRaftIntegration.java @@ -58,6 +58,7 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.STEPPED_DOWN; import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.TERMINATED; import static com.hazelcast.spi.ExecutionService.ASYNC_EXECUTOR; @@ -234,9 +235,14 @@ private boolean send(AsyncRaftOp operation, Endpoint target) { @Override public void onNodeStatusChange(RaftNodeStatus status) { if (status == TERMINATED) { - Collection services = nodeEngine.getServices(RaftGroupLifecycleAwareService.class); - for (RaftGroupLifecycleAwareService service : services) { - service.onGroupDestroy(groupId); + Collection services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class); + for (RaftNodeLifecycleAwareService service : services) { + service.onRaftGroupDestroyed(groupId); + } + } else if (status == STEPPED_DOWN) { + Collection services = nodeEngine.getServices(RaftNodeLifecycleAwareService.class); + for (RaftNodeLifecycleAwareService service : services) { + service.onRaftNodeSteppedDown(groupId); } } } diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java similarity index 71% rename from hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java rename to hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java index 96e21735f2b2..e6d1cfafdb66 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupLifecycleAwareService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftNodeLifecycleAwareService.java @@ -19,13 +19,20 @@ import com.hazelcast.cp.CPGroupId; /** - * Contains methods that are invoked on life cycle changes of a Raft group + * Contains methods that are invoked on life cycle changes of a Raft node */ -public interface RaftGroupLifecycleAwareService { +public interface RaftNodeLifecycleAwareService { /** - * Called on the thread of the Raft group when the given Raft group is + * Called on the thread of the Raft group when the given Raft node is * destroyed, either gracefully or forcefully. */ - void onGroupDestroy(CPGroupId groupId); + void onRaftGroupDestroyed(CPGroupId groupId); + + /** + * Called on the thread of the Raft group when the given Raft node is + * stepped down, either because it is shutting down, or it could not be + * added to the Raft group + */ + void onRaftNodeSteppedDown(CPGroupId groupId); } diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java index 32fc3b10abf3..38f3ebf4d48a 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java @@ -109,7 +109,8 @@ */ @SuppressWarnings({"checkstyle:methodcount", "checkstyle:classfanoutcomplexity", "checkstyle:classdataabstractioncoupling"}) public class RaftService implements ManagedService, SnapshotAwareService, GracefulShutdownAwareService, - MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService { + MembershipAwareService, CPSubsystemManagementService, PreJoinAwareService, + RaftNodeLifecycleAwareService { public static final String SERVICE_NAME = "hz:core:raft"; @@ -632,8 +633,12 @@ void createRaftNode(CPGroupId groupId, Collection members, CPMembe logger.warning("Not creating RaftNode[" + groupId + "] since the CP group is already destroyed"); return; } else if (steppedDownGroupIds.contains(groupId)) { - logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down"); - return; + if (!nodeEngine.isRunning()) { + logger.fine("Not creating RaftNode[" + groupId + "] since the local CP member is already stepped down"); + return; + } + + steppedDownGroupIds.remove(groupId); } RaftIntegration integration = new NodeEngineRaftIntegration(nodeEngine, groupId, localCPMember); @@ -840,6 +845,16 @@ public void handleActiveCPMembers(RaftGroupId latestMetadataGroupId, long member metadataGroupManager.handleMetadataGroupId(latestMetadataGroupId); } + @Override + public void onRaftGroupDestroyed(CPGroupId groupId) { + destroyRaftNode(groupId); + } + + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + stepDownRaftNode(groupId); + } + private class InitializeRaftNodeTask implements Runnable { private final CPGroupId groupId; diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java index aa470a9ba18e..bb7c83488b10 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomiclong/RaftAtomicLongService.java @@ -19,7 +19,7 @@ import com.hazelcast.core.IAtomicLong; import com.hazelcast.cp.CPGroupId; import com.hazelcast.cp.internal.RaftGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.atomiclong.proxy.RaftAtomicLongProxy; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; @@ -47,7 +47,7 @@ * Contains Raft-based atomic long instances, implements snapshotting, * and creates proxies */ -public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService, +public class RaftAtomicLongService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService, SnapshotAwareService { /** @@ -128,7 +128,7 @@ public boolean destroyRaftObject(CPGroupId groupId, String name) { } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { Iterator> iter = atomicLongs.keySet().iterator(); while (iter.hasNext()) { Tuple2 next = iter.next(); @@ -139,6 +139,10 @@ public void onGroupDestroy(CPGroupId groupId) { } } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + public RaftAtomicLong getAtomicLong(CPGroupId groupId, String name) { checkNotNull(groupId); checkNotNull(name); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java index 6fe9e03a8bea..ac8347a27ad3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/atomicref/RaftAtomicRefService.java @@ -19,7 +19,7 @@ import com.hazelcast.core.IAtomicReference; import com.hazelcast.cp.CPGroupId; import com.hazelcast.cp.internal.RaftGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.atomicref.proxy.RaftAtomicRefProxy; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; @@ -48,7 +48,7 @@ * Contains Raft-based atomic reference instances, implements snapshotting, * and creates proxies */ -public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftGroupLifecycleAwareService, +public class RaftAtomicRefService implements RaftManagedService, RaftRemoteService, RaftNodeLifecycleAwareService, SnapshotAwareService { /** @@ -121,7 +121,7 @@ public void restoreSnapshot(CPGroupId groupId, long commitIndex, RaftAtomicRefSn } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { Iterator> iter = atomicRefs.keySet().iterator(); while (iter.hasNext()) { Tuple2 next = iter.next(); @@ -132,6 +132,10 @@ public void onGroupDestroy(CPGroupId groupId) { } } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public boolean destroyRaftObject(CPGroupId groupId, String name) { Tuple2 key = Tuple2.of(groupId, name); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java index e129c0418eb3..d9935a881740 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/datastructures/spi/blocking/AbstractBlockingService.java @@ -17,7 +17,7 @@ package com.hazelcast.cp.internal.datastructures.spi.blocking; import com.hazelcast.cp.CPGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.datastructures.spi.RaftManagedService; import com.hazelcast.cp.internal.datastructures.spi.RaftRemoteService; @@ -65,7 +65,7 @@ * @param concrete ty;e lf the resource registry */ public abstract class AbstractBlockingService, RR extends ResourceRegistry> - implements RaftManagedService, RaftGroupLifecycleAwareService, RaftRemoteService, SessionAwareService, + implements RaftManagedService, RaftNodeLifecycleAwareService, RaftRemoteService, SessionAwareService, SnapshotAwareService, LiveOperationsTracker { public static final long WAIT_TIMEOUT_TASK_UPPER_BOUND_MILLIS = 1500; @@ -212,7 +212,7 @@ public final Collection getAttachedSessions(CPGroupId groupId) { } @Override - public final void onGroupDestroy(CPGroupId groupId) { + public final void onRaftGroupDestroyed(CPGroupId groupId) { ResourceRegistry registry = registries.get(groupId); if (registry != null) { Collection indices = registry.destroy(); @@ -220,6 +220,10 @@ public final void onGroupDestroy(CPGroupId groupId) { } } + @Override + public final void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public final void populate(LiveOperations liveOperations) { for (RR registry : registries.values()) { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java index d49e35490f8b..45d647d22554 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNode.java @@ -70,11 +70,19 @@ public interface RaftNode { /** * Returns the last committed member list of the raft group this node * belongs to. Please note that the returned member list can be different - * from the current effective member list, if there is an ongoing + * from the currently effective member list, if there is an ongoing * membership change in the group. */ Collection getCommittedMembers(); + /** + * Returns the currently effective member list of the raft group this node + * belongs to. Please note that the returned member list can be different + * from the committed member list, if there is an ongoing + * membership change in the group. + */ + Collection getAppliedMembers(); + /** * Returns true if this node is {@link RaftNodeStatus#TERMINATED} or * {@link RaftNodeStatus#STEPPED_DOWN}, false otherwise. diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java index a5345e4cf164..f2f6a64cfdf3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java @@ -171,6 +171,11 @@ public Collection getCommittedMembers() { return state.committedGroupMembers().members(); } + @Override + public Collection getAppliedMembers() { + return state.lastGroupMembers().members(); + } + @Override public void forceSetTerminatedStatus() { execute(new Runnable() { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java index 6468669ded25..e64238b650ce 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeStatus.java @@ -38,7 +38,8 @@ public enum RaftNodeStatus { /** * When a node is removed from the cluster after a membership change is - * committed, its status becomes {@code STEPPED_DOWN}. + * committed, or a new Raft node could not be added to the Raft group, + * its status becomes {@code STEPPED_DOWN}. */ STEPPED_DOWN, diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java index 85083b8ab801..5130c9bcb92f 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/handler/AppendRequestHandlerTask.java @@ -16,6 +16,8 @@ package com.hazelcast.cp.internal.raft.impl.handler; +import com.hazelcast.core.Endpoint; +import com.hazelcast.cp.internal.raft.MembershipChangeMode; import com.hazelcast.cp.internal.raft.command.DestroyRaftGroupCmd; import com.hazelcast.cp.internal.raft.command.RaftGroupCmd; import com.hazelcast.cp.internal.raft.impl.RaftNodeImpl; @@ -26,6 +28,7 @@ import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse; import com.hazelcast.cp.internal.raft.impl.log.LogEntry; import com.hazelcast.cp.internal.raft.impl.log.RaftLog; +import com.hazelcast.cp.internal.raft.impl.state.RaftGroupMembers; import com.hazelcast.cp.internal.raft.impl.state.RaftState; import com.hazelcast.cp.internal.raft.impl.task.RaftNodeStatusAwareTask; @@ -159,7 +162,9 @@ protected void innerRun() { } raftNode.invalidateFuturesFrom(reqEntry.index()); - revertRaftGroupCmd(truncatedEntries); + if (revertPreAppliedRaftGroupCmd(truncatedEntries)) { + return; + } newEntries = Arrays.copyOfRange(req.entries(), i, req.entryCount()); break; @@ -244,7 +249,7 @@ private void preApplyRaftGroupCmd(LogEntry[] entries, long commitIndex) { } } - private void revertRaftGroupCmd(List entries) { + private boolean revertPreAppliedRaftGroupCmd(List entries) { // I am reverting appended-but-uncommitted entries and there can be at most 1 uncommitted Raft command... List commandEntries = new ArrayList(); for (LogEntry entry : entries) { @@ -258,13 +263,32 @@ private void revertRaftGroupCmd(List entries) { for (LogEntry entry : entries) { if (entry.operation() instanceof DestroyRaftGroupCmd) { raftNode.setStatus(RaftNodeStatus.ACTIVE); - return; + return false; } else if (entry.operation() instanceof UpdateRaftGroupMembersCmd) { - raftNode.setStatus(RaftNodeStatus.ACTIVE); - raftNode.resetGroupMembers(); - return; + UpdateRaftGroupMembersCmd cmd = (UpdateRaftGroupMembersCmd) entry.operation(); + Endpoint localMember = raftNode.getLocalMember(); + if (cmd.getMode() == MembershipChangeMode.ADD && cmd.getMember().equals(localMember)) { + // The Raft Dissertation, Chapter 4.1 + // Unfortunately, this decision does imply that a log entry for a configuration change + // can be removed (if leadership changes); in this case, a server must be prepared to + // fall back to the previous configuration in its log. + RaftGroupMembers lastGroupMembers = raftNode.state().lastGroupMembers(); + RaftGroupMembers committedGroupMembers = raftNode.state().committedGroupMembers(); + assert lastGroupMembers.isKnownMember(localMember) && !committedGroupMembers.isKnownMember(localMember) + : "Applied Group Members: " + lastGroupMembers + " Committed Group Members: " + committedGroupMembers + + " Reverted: " + cmd; + logger.warning("Local Endpoint: " + localMember + " could not join to the group :( Maybe another time..."); + raftNode.setStatus(RaftNodeStatus.STEPPED_DOWN); + return true; + } else { + raftNode.setStatus(RaftNodeStatus.ACTIVE); + raftNode.resetGroupMembers(); + return false; + } } } + + return false; } private AppendFailureResponse createFailureResponse(int term) { diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java index 28d659fcd20c..0c74d9620e29 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raftop/GetInitialRaftGroupMembersIfCurrentGroupMemberOp.java @@ -69,7 +69,7 @@ public void setRaftNode(RaftNode raftNode) { @Override public Object run(CPGroupId groupId, long commitIndex) { checkState(raftNode != null, "RaftNode is not injected in " + groupId); - Collection members = raftNode.getCommittedMembers(); + Collection members = raftNode.getAppliedMembers(); checkState(members.contains(cpMember), cpMember + " is not in the current committed member list: " + members + " of " + groupId); return new ArrayList(raftNode.getInitialMembers()); diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java index 2a48b2c269af..3dc8e5ebe3c3 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/session/RaftSessionService.java @@ -21,7 +21,7 @@ import com.hazelcast.core.ICompletableFuture; import com.hazelcast.cp.CPGroup; import com.hazelcast.cp.CPGroupId; -import com.hazelcast.cp.internal.RaftGroupLifecycleAwareService; +import com.hazelcast.cp.internal.RaftNodeLifecycleAwareService; import com.hazelcast.cp.internal.RaftService; import com.hazelcast.cp.internal.TermChangeAwareService; import com.hazelcast.cp.internal.raft.SnapshotAwareService; @@ -84,7 +84,7 @@ */ @SuppressWarnings({"checkstyle:methodcount"}) public class RaftSessionService implements ManagedService, SnapshotAwareService, SessionAccessor, - TermChangeAwareService, RaftGroupLifecycleAwareService, CPSessionManagementService { + TermChangeAwareService, RaftNodeLifecycleAwareService, CPSessionManagementService { public static final String SERVICE_NAME = "hz:core:raftSession"; @@ -150,10 +150,14 @@ public void onNewTermCommit(CPGroupId groupId, long commitIndex) { } @Override - public void onGroupDestroy(CPGroupId groupId) { + public void onRaftGroupDestroyed(CPGroupId groupId) { registries.remove(groupId); } + @Override + public void onRaftNodeSteppedDown(CPGroupId groupId) { + } + @Override public ICompletableFuture> getAllSessions(String groupName) { checkTrue(!METADATA_CP_GROUP_NAME.equals(groupName), "Cannot query CP sessions on the METADATA CP group!"); diff --git a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java index 4c4867c6b757..3719c5800178 100644 --- a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java @@ -898,7 +898,6 @@ public void run() { List activeMembers = new ArrayList(service.getMetadataGroupManager().getActiveMembers()); assertEquals(cpMembers, activeMembers); } - } }); } @@ -969,4 +968,30 @@ public void run() { }); } + @Test + public void when_crashedMemberIsReplacedByAnotherAvailableCPMember_then_membershipChangeSucceeds() throws InterruptedException, ExecutionException { + final int cpMemberCount = 3; + final HazelcastInstance[] instances = newInstances(cpMemberCount); + waitUntilCPDiscoveryCompleted(instances); + + final HazelcastInstance instance4 = factory.newHazelcastInstance(createConfig(cpMemberCount, cpMemberCount)); + instance4.getCPSubsystem().getCPSubsystemManagementService().promoteToCPMember().get(); + + CPMember cpMember3 = instances[2].getCPSubsystem().getLocalCPMember(); + instances[2].getLifecycleService().terminate(); + instances[0].getCPSubsystem().getCPSubsystemManagementService().removeCPMember(cpMember3.getUuid()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertTrue(metadataGroup.members().contains(instance4.getCPSubsystem().getLocalCPMember())); + assertEquals(cpMemberCount, metadataGroup.members().size()); + } + }); + } + } From 183c605ccadaa07d91d794bc74c498d6374a97da Mon Sep 17 00:00:00 2001 From: Ensar Basri Kahveci Date: Wed, 13 Mar 2019 17:42:42 +0300 Subject: [PATCH 2/2] Change membership change order in the CP Subsystem When a CP member is being removed from the CP Subsystem, it is removed from all CP groups. For each CP group, if another CP member is going to be added to this group as a substitute of the leaving member, the member addition is applied before the member removal. However, member addition can cause the majority to be increased unnecessarily. There is no need to increase the majority here because there might be crashed members already. To avoid this situation, we first apply the member removal, and then the member addition. --- .../internal/RaftGroupMembershipManager.java | 126 ++++++++++-------- .../cp/internal/CPMemberAddRemoveTest.java | 39 ++++++ 2 files changed, 109 insertions(+), 56 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java index a5ed1b0f2470..e37b71dd858e 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/RaftGroupMembershipManager.java @@ -247,11 +247,11 @@ public void run() { } List changes = schedule.getChanges(); - Map> changedGroups = new ConcurrentHashMap>(); CountDownLatch latch = new CountDownLatch(changes.size()); + Map> changedGroups = new ConcurrentHashMap>(); for (CPGroupMembershipChange change : changes) { - applyMembershipChangeOnRaftGroup(changedGroups, latch, change); + applyOnRaftGroup(latch, changedGroups, change); } try { @@ -269,36 +269,32 @@ private MembershipChangeSchedule getMembershipChangeSchedule() { return f.join(); } - private void applyMembershipChangeOnRaftGroup(final Map> changedGroups, - final CountDownLatch latch, - final CPGroupMembershipChange change) { - final CPGroupId groupId = change.getGroupId(); + private void applyOnRaftGroup(final CountDownLatch latch, final Map> changedGroups, + final CPGroupMembershipChange change) { ICompletableFuture future; - if (change.getMemberToAdd() == null) { - future = new SimpleCompletedFuture(change.getMembersCommitIndex()); + if (change.getMemberToRemove() != null) { + future = invocationManager.changeMembership(change.getGroupId(), change.getMembersCommitIndex(), + change.getMemberToRemove(), MembershipChangeMode.REMOVE); } else { - logger.fine("Adding " + change.getMemberToAdd() + " to " + groupId); - - future = invocationManager.changeMembership(groupId, change.getMembersCommitIndex(), - change.getMemberToAdd(), MembershipChangeMode.ADD); + future = new SimpleCompletedFuture(change.getMembersCommitIndex()); } future.andThen(new ExecutionCallback() { @Override - public void onResponse(Long addCommitIndex) { - if (change.getMemberToRemove() == null) { - changedGroups.put(groupId, Tuple2.of(change.getMembersCommitIndex(), addCommitIndex)); - latch.countDown(); + public void onResponse(Long removeCommitIndex) { + if (change.getMemberToAdd() != null) { + addMember(latch, changedGroups, change, removeCommitIndex); } else { - removeMember(changedGroups, latch, change, addCommitIndex); + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); + latch.countDown(); } } @Override public void onFailure(Throwable t) { - long addCommitIndex = getMemberAddCommitIndex(changedGroups, change, t); - if (addCommitIndex != NA_MEMBERS_COMMIT_INDEX) { - onResponse(addCommitIndex); + long removeCommitIndex = checkMemberRemoveCommitIndex(changedGroups, change, t); + if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) { + onResponse(removeCommitIndex); } else { latch.countDown(); } @@ -306,32 +302,28 @@ public void onFailure(Throwable t) { }); } - private void removeMember(final Map> changedGroups, - final CountDownLatch latch, - final CPGroupMembershipChange change, - final long currentCommitIndex) { + private void addMember(final CountDownLatch latch, final Map> changedGroups, + final CPGroupMembershipChange change, final long currentCommitIndex) { ICompletableFuture future = invocationManager.changeMembership(change.getGroupId(), currentCommitIndex, - change.getMemberToRemove(), MembershipChangeMode.REMOVE); + change.getMemberToAdd(), MembershipChangeMode.ADD); future.andThen(new ExecutionCallback() { @Override - public void onResponse(Long removeCommitIndex) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); + public void onResponse(Long addCommitIndex) { + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), addCommitIndex)); latch.countDown(); } @Override public void onFailure(Throwable t) { - long removeCommitIndex = getMemberRemoveCommitIndex(change, t); - if (removeCommitIndex != NA_MEMBERS_COMMIT_INDEX) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), removeCommitIndex)); - } + checkMemberAddCommitIndex(changedGroups, change, t); latch.countDown(); } }); } - private long getMemberAddCommitIndex(Map> changedGroups, CPGroupMembershipChange change, - Throwable t) { + private void checkMemberAddCommitIndex(Map> changedGroups, CPGroupMembershipChange change, + Throwable t) { + CPMemberInfo memberToAdd = change.getMemberToAdd(); if (t instanceof MismatchingGroupMembersCommitIndexException) { MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t; String msg = "MEMBER ADD commit of " + change + " failed. Actual group members: " + m.getMembers() @@ -339,35 +331,50 @@ private long getMemberAddCommitIndex(Map> changedG // learnt group members must contain the added member and the current members I know - if (!m.getMembers().contains(change.getMemberToAdd())) { + if (!m.getMembers().contains(memberToAdd)) { logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; + return; } - for (CPMemberInfo member : change.getMembers()) { - if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) { + if (change.getMemberToRemove() != null) { + // I expect the member to be removed to be removed from the group + if (m.getMembers().contains(change.getMemberToRemove())) { logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; + return; + } + + // I know the removed member has left the group and the added member has joined. + // So member counts must be same... + if (m.getMembers().size() != change.getMembers().size()) { + logger.severe(msg); + return; } + } else if (m.getMembers().size() != (change.getMembers().size() + 1)) { + // if there is no removed member, I expect number of the learnt number of group members to be 1 greater than + // the current members I know + logger.severe(msg); + return; } - // If the scheduled member-remove is already done, put its result and do not retry it - if (change.getMemberToRemove() != null && !m.getMembers().contains(change.getMemberToRemove())) { - changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); - return NA_MEMBERS_COMMIT_INDEX; + for (CPMemberInfo member : change.getMembers()) { + if (!member.equals(change.getMemberToRemove()) && !m.getMembers().contains(member)) { + logger.severe(msg); + return; + } } - return m.getCommitIndex(); + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); + return; } - logger.severe("Cannot get MEMBER ADD result of " + change.getMemberToAdd() + " to " + change.getGroupId() + logger.severe("Cannot get MEMBER ADD result of " + memberToAdd + " to " + change.getGroupId() + " with members commit index: " + change.getMembersCommitIndex(), t); - return NA_MEMBERS_COMMIT_INDEX; } - private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwable t) { + @SuppressWarnings("checkstyle:cyclomaticcomplexity") + private long checkMemberRemoveCommitIndex(Map> changedGroups, + CPGroupMembershipChange change, Throwable t) { CPMemberInfo removedMember = change.getMemberToRemove(); - if (t instanceof MismatchingGroupMembersCommitIndexException) { MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t; String msg = "MEMBER REMOVE commit of " + change + " failed. Actual group members: " + m.getMembers() @@ -378,19 +385,26 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl return NA_MEMBERS_COMMIT_INDEX; } - if (change.getMemberToAdd() != null) { - // I expect the added member to be joined to the group - if (!m.getMembers().contains(change.getMemberToAdd())) { - logger.severe(msg); - return NA_MEMBERS_COMMIT_INDEX; - } - - // I know the removed member has left the group and the added member has joined. + // The member to be added can be already joined to the group + if (change.getMemberToAdd() != null && m.getMembers().contains(change.getMemberToAdd())) { + // I know the removed member has left the group and the member to be added has already joined to the group. // So member sizes must be same... if (m.getMembers().size() != change.getMembers().size()) { logger.severe(msg); return NA_MEMBERS_COMMIT_INDEX; } + + for (CPMemberInfo member : change.getMembers()) { + // Other group members except the removed one must be still present... + if (!member.equals(removedMember) && !m.getMembers().contains(member)) { + logger.severe(msg); + return NA_MEMBERS_COMMIT_INDEX; + } + } + + // both member-remove and member-add are done. + changedGroups.put(change.getGroupId(), Tuple2.of(change.getMembersCommitIndex(), m.getCommitIndex())); + return NA_MEMBERS_COMMIT_INDEX; } else if (m.getMembers().size() != (change.getMembers().size() - 1)) { // if there is no added member, I expect number of the learnt group members to be 1 less than // the current members I know @@ -399,7 +413,7 @@ private long getMemberRemoveCommitIndex(CPGroupMembershipChange change, Throwabl } for (CPMemberInfo member : change.getMembers()) { - // Other group members except the removed one and added one must be still present... + // Other group members except the removed one must be still present... if (!member.equals(removedMember) && !m.getMembers().contains(member)) { logger.severe(msg); return NA_MEMBERS_COMMIT_INDEX; diff --git a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java index 3719c5800178..7cf3a81e8053 100644 --- a/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cp/internal/CPMemberAddRemoveTest.java @@ -994,4 +994,43 @@ public void run() throws Exception { }); } + @Test + public void when_crashedMemberIsRemovedAndThenNewCPMemberIsPromoted_then_membershipChangeSucceeds() + throws ExecutionException, InterruptedException { + final int cpMemberCount = 3; + final HazelcastInstance[] instances = newInstances(cpMemberCount); + waitUntilCPDiscoveryCompleted(instances); + + final CPMember cpMember3 = instances[2].getCPSubsystem().getLocalCPMember(); + instances[2].getLifecycleService().terminate(); + instances[0].getCPSubsystem().getCPSubsystemManagementService().removeCPMember(cpMember3.getUuid()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertEquals(cpMemberCount - 1, metadataGroup.members().size()); + assertFalse(metadataGroup.members().contains(cpMember3)); + } + }); + + final HazelcastInstance instance4 = factory.newHazelcastInstance(createConfig(cpMemberCount, cpMemberCount)); + instance4.getCPSubsystem().getCPSubsystemManagementService().promoteToCPMember().get(); + + assertTrueEventually(new AssertTask() { + @Override + public void run() throws Exception { + CPGroup metadataGroup = instances[0].getCPSubsystem() + .getCPSubsystemManagementService() + .getCPGroup(CPGroup.METADATA_CP_GROUP_NAME) + .get(); + assertTrue(metadataGroup.members().contains(instance4.getCPSubsystem().getLocalCPMember())); + assertEquals(cpMemberCount, metadataGroup.members().size()); + } + }); + } + }