Skip to content

Commit

Permalink
Fix membership change bugs in the Raft core
Browse files Browse the repository at this point in the history
* When the last appended entry of a follower is a membership change,
it must update its status to ACTIVE once it installs a snapshot because
snapshots always contain a committed member list

* Multiple membership changes can be committed before a slow follower
appends & commits them. When a slow follower appends these changes,
it needs to commit them one by one.

* Handle invocation exceptions properly on the Raft service layer
  • Loading branch information
metanet committed Feb 6, 2019
1 parent 9a0d299 commit 55bd69c
Show file tree
Hide file tree
Showing 13 changed files with 280 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -765,12 +765,11 @@ public boolean addActiveMember(long commitIndex, CPMemberInfo member) {

List<CPGroupMembershipChangeContext> changes = getGroupMembershipChangesForNewMember(member);
if (changes.size() > 0) {
membershipChangeContext = MembershipChangeContext.memberAdded(singletonList(commitIndex), member, changes);
if (logger.isFineEnabled()) {
logger.fine("CP group rebalancing is triggered for " + changes);
logger.fine("CP group rebalancing is triggered for " + member + ", changes: " + membershipChangeContext);
}

membershipChangeContext = MembershipChangeContext.memberAdded(singletonList(commitIndex), member, changes);

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,9 @@ public void run() {
return;
}

logger.fine("Handling " + membershipChangeContext);
if (logger.isFineEnabled()) {
logger.fine("Handling " + membershipChangeContext);
}

List<CPGroupMembershipChangeContext> changes = membershipChangeContext.getChanges();
Map<CPGroupId, Tuple2<Long, Long>> changedGroups = new ConcurrentHashMap<CPGroupId, Tuple2<Long, Long>>();
Expand Down Expand Up @@ -340,9 +342,8 @@ private ICompletableFuture<Long> newCompletedFuture(long idx) {
}

private long getMemberAddCommitIndex(CPGroupMembershipChangeContext ctx, Throwable t) {
if (t.getCause() instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t.getCause();

if (t instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t;
String msg = "MEMBER ADD commit of " + ctx.getMemberToAdd() + " to " + ctx.getGroupId()
+ " with members commit index: " + ctx.getMembersCommitIndex() + " failed. Actual group members: "
+ m.getMembers() + " with commit index: " + m.getCommitIndex();
Expand Down Expand Up @@ -377,9 +378,8 @@ private long getMemberAddCommitIndex(CPGroupMembershipChangeContext ctx, Throwab
private long getMemberRemoveCommitIndex(CPGroupMembershipChangeContext ctx, Throwable t) {
CPMemberInfo removedMember = ctx.getMemberToRemove();

if (t.getCause() instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t.getCause();

if (t instanceof MismatchingGroupMembersCommitIndexException) {
MismatchingGroupMembersCommitIndexException m = (MismatchingGroupMembersCommitIndexException) t;
String msg = "MEMBER REMOVE commit of " + removedMember + " to " + ctx.getGroupId()
+ " failed. Actual group members: " + m.getMembers() + " with commit index: " + m.getCommitIndex();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;

import static com.hazelcast.cp.internal.raft.QueryPolicy.LEADER_LOCAL;
Expand Down Expand Up @@ -128,7 +129,7 @@ public void onResponse(List<CPMemberInfo> members) {

@Override
public void onFailure(Throwable t) {
resultFuture.setResult(t);
resultFuture.setResult(new ExecutionException(t));
}
});
}
Expand All @@ -146,7 +147,7 @@ public void onResponse(RaftGroupId groupId) {

@Override
public void onFailure(Throwable t) {
if (t.getCause() instanceof CannotCreateRaftGroupException) {
if (t instanceof CannotCreateRaftGroupException) {
logger.fine("Could not create CP group: " + groupName + " with members: " + members,
t.getCause());
invokeGetMembersToCreateRaftGroup(groupName, groupSize, resultFuture);
Expand Down
14 changes: 2 additions & 12 deletions hazelcast/src/main/java/com/hazelcast/cp/internal/RaftService.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
Expand All @@ -97,7 +96,6 @@
import static com.hazelcast.internal.config.ConfigValidator.checkCPSubsystemConfig;
import static com.hazelcast.spi.ExecutionService.ASYNC_EXECUTOR;
import static com.hazelcast.spi.ExecutionService.SYSTEM_EXECUTOR;
import static com.hazelcast.util.ExceptionUtil.peel;
import static com.hazelcast.util.Preconditions.checkFalse;
import static com.hazelcast.util.Preconditions.checkTrue;
import static java.util.Collections.newSetFromMap;
Expand Down Expand Up @@ -722,13 +720,6 @@ private ICompletableFuture<Void> invokeTriggerRemoveMember(CPMemberInfo member)
return invocationManager.invoke(getMetadataGroupId(), new TriggerRemoveCPMemberOp(member));
}

private boolean isRemoved(CPMemberInfo member) {
RaftOp op = new GetActiveCPMembersOp();
InternalCompletableFuture<List<CPMemberInfo>> f = invocationManager.query(getMetadataGroupId(), op, LEADER_LOCAL);
List<CPMemberInfo> members = f.join();
return !members.contains(member);
}

public static String withoutDefaultGroupName(String name) {
name = name.trim();
int i = name.indexOf("@");
Expand Down Expand Up @@ -809,9 +800,8 @@ public void onResponse(CPGroupInfo group) {

@Override
public void onFailure(Throwable t) {
RuntimeException cause = peel(t);
if (cause instanceof CPGroupDestroyedException) {
CPGroupId destroyedGroupId = ((CPGroupDestroyedException) cause).getGroupId();
if (t instanceof CPGroupDestroyedException) {
CPGroupId destroyedGroupId = ((CPGroupDestroyedException) t).getGroupId();
destroyedGroupIds.add(destroyedGroupId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@
import com.hazelcast.util.RandomPicker;
import com.hazelcast.util.collection.Long2ObjectHashMap;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
Expand All @@ -70,7 +69,7 @@
import java.util.concurrent.TimeUnit;

import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.ACTIVE;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.CHANGING_MEMBERSHIP;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.STEPPED_DOWN;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.TERMINATED;
import static com.hazelcast.cp.internal.raft.impl.RaftNodeStatus.TERMINATING;
Expand Down Expand Up @@ -342,7 +341,7 @@ public boolean canReplicateNewEntry(Object operation) {

if (status == TERMINATING) {
return false;
} else if (status == CHANGING_MEMBERSHIP) {
} else if (status == UPDATING_GROUP_MEMBER_LIST) {
return !(operation instanceof RaftGroupCmd);
}

Expand Down Expand Up @@ -493,11 +492,6 @@ public void sendAppendRequest(Endpoint follower) {

assert prevEntry != null : "Follower: " + follower + ", next index: " + nextIndex;

if (prevEntry.index() < state.commitIndex()) {
// send at most one ApplyRaftGroupMembersOp in single batch
entries = trimEntriesIfContainsMultipleMembershipChanges(entries);
}

AppendRequest appendRequest = new AppendRequest(getLocalMember(), state.term(), prevEntry.term(), prevEntry.index(),
state.commitIndex(), entries);

Expand All @@ -513,32 +507,6 @@ public void sendAppendRequest(Endpoint follower) {
send(appendRequest, follower);
}

/**
* If log entries contains multiple membership change entries, then splits entries to send only a single
* membership change in single append-entries request.
*/
private LogEntry[] trimEntriesIfContainsMultipleMembershipChanges(LogEntry[] entries) {
int trim = entries.length;
boolean found = false;
for (int i = 0; i < entries.length; i++) {
LogEntry entry = entries[i];
if (entry.operation() instanceof ApplyRaftGroupMembersCmd) {
if (found) {
trim = i;
break;
} else {
found = true;
}
}
}

if (trim < entries.length) {
logger.fine("Trimming append entries up to index of the second ApplyRaftGroupMembersOp: " + trim);
return Arrays.copyOf(entries, trim);
}
return entries;
}

/**
* Applies committed log entries between {@code lastApplied} and {@code commitIndex}, if there's any available.
* If new entries are applied, {@link RaftState}'s {@code lastApplied} field is updated.
Expand Down Expand Up @@ -592,10 +560,17 @@ private void applyLogEntry(LogEntry entry) {
Object operation = entry.operation();
if (operation instanceof RaftGroupCmd) {
if (operation instanceof DestroyRaftGroupCmd) {
assert status == TERMINATING;
setStatus(TERMINATED);
} else if (operation instanceof ApplyRaftGroupMembersCmd) {
assert status == CHANGING_MEMBERSHIP : "STATUS: " + status;
if (state.lastGroupMembers().index() < entry.index()) {
setStatus(UPDATING_GROUP_MEMBER_LIST);
ApplyRaftGroupMembersCmd op = (ApplyRaftGroupMembersCmd) operation;
updateGroupMembers(entry.index(), op.getMembers());
}

assert status == UPDATING_GROUP_MEMBER_LIST : "STATUS: " + status;
assert state.lastGroupMembers().index() == entry.index();

state.commitGroupMembers();
ApplyRaftGroupMembersCmd cmd = (ApplyRaftGroupMembersCmd) operation;
if (cmd.getMember().equals(localMember) && cmd.getChangeType() == MembershipChangeType.REMOVE) {
Expand Down Expand Up @@ -692,7 +667,9 @@ public void invalidateFuturesFrom(long entryIndex) {
}
}

logger.warning("Invalidated " + count + " futures from log index: " + entryIndex);
if (count > 0) {
logger.warning("Invalidated " + count + " futures from log index: " + entryIndex);
}
}

/**
Expand All @@ -712,7 +689,9 @@ private void invalidateFuturesUntil(long entryIndex) {
}
}

logger.warning("Invalidated " + count + " futures until log index: " + entryIndex);
if (count > 0) {
logger.warning("Invalidated " + count + " futures until log index: " + entryIndex);
}
}

/**
Expand Down Expand Up @@ -777,8 +756,12 @@ public boolean installSnapshot(SnapshotEntry snapshot) {

raftIntegration.restoreSnapshot(snapshot.operation(), snapshot.index());

// If I am installing a snapshot, it means I am still present in the last member list so I don't need to update status.
// If I am installing a snapshot, it means I am still present in the last member list,
// but it is possible that the last entry I appended before the snapshot could be a membership change.
// Because of this, I need to update my status.
// Nevertheless, I may not be present in the restored member list, which is ok.

setStatus(ACTIVE);
state.restoreGroupMembers(snapshot.groupMembersLogIndex(), snapshot.groupMembers());
printMemberState();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ public enum RaftNodeStatus {

/**
* During membership changes, node statuses become
* {@code CHANGING_MEMBERSHIP} and they apply requested change once
* {@code UPDATING_GROUP_MEMBER_LIST} and they apply requested change once
* the entry is appended to the log. Once log is committed, if the related
* node is the being removed from group, status becomes
* {@link #STEPPED_DOWN}, otherwise {@link #ACTIVE}.
*/
CHANGING_MEMBERSHIP,
UPDATING_GROUP_MEMBER_LIST,

/**
* When a node is removed from the cluster after a membership change is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.hazelcast.cp.internal.raft.impl.handler;

import com.hazelcast.cp.internal.raft.command.DestroyRaftGroupCmd;
import com.hazelcast.cp.internal.raft.command.RaftGroupCmd;
import com.hazelcast.cp.internal.raft.impl.RaftNodeImpl;
import com.hazelcast.cp.internal.raft.impl.RaftNodeStatus;
import com.hazelcast.cp.internal.raft.impl.command.ApplyRaftGroupMembersCmd;
Expand All @@ -28,12 +29,12 @@
import com.hazelcast.cp.internal.raft.impl.state.RaftState;
import com.hazelcast.cp.internal.raft.impl.task.RaftNodeStatusAwareTask;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static com.hazelcast.cp.internal.raft.impl.RaftRole.FOLLOWER;
import static java.lang.Math.min;
import static java.util.Arrays.asList;

/**
* Handles {@link AppendRequest} sent by the leader. Responds with
Expand Down Expand Up @@ -126,12 +127,12 @@ protected void innerRun() {
}
}

LogEntry[] newEntries = null;
// Process any new entries
if (req.entryCount() > 0) {
// Delete any conflicting entries, skip any duplicates
long lastLogIndex = raftLog.lastLogOrSnapshotIndex();

LogEntry[] newEntries = null;
for (int i = 0; i < req.entryCount(); i++) {
LogEntry reqEntry = req.entries()[i];

Expand All @@ -156,7 +157,7 @@ protected void innerRun() {
}

raftNode.invalidateFuturesFrom(reqEntry.index());
handleRaftGroupCmd(truncated, true);
revertRaftGroupCmd(truncated);

newEntries = Arrays.copyOfRange(req.entries(), i, req.entryCount());
break;
Expand All @@ -170,7 +171,6 @@ protected void innerRun() {
}

raftLog.appendEntries(newEntries);
handleRaftGroupCmd(asList(newEntries), false);
}
}

Expand All @@ -185,34 +185,68 @@ protected void innerRun() {
logger.fine("Setting commit index: " + newCommitIndex);
}
state.commitIndex(newCommitIndex);
raftNode.applyLogEntries();
}

raftNode.updateLastAppendEntriesTimestamp();

// If I just appended any new entry or the leader is trying to adjust my match index, I must send a response.
// Otherwise, I just learnt the last commit index and I don't need to send a response.
if (req.entryCount() > 0 || oldCommitIndex == state.commitIndex()) {
AppendSuccessResponse resp = new AppendSuccessResponse(raftNode.getLocalMember(), state.term(), lastLogIndex);
raftNode.send(resp, req.leader());
try {
// If I just appended any new entry or the leader is trying to adjust my match index, I must send a response.
// Otherwise, I just learnt the last commit index and I don't need to send a response.
if (req.entryCount() > 0 || oldCommitIndex == state.commitIndex()) {
AppendSuccessResponse resp = new AppendSuccessResponse(raftNode.getLocalMember(), state.term(), lastLogIndex);
raftNode.send(resp, req.leader());
}
} finally {
if (state.commitIndex() > oldCommitIndex) {
raftNode.applyLogEntries();
}
if (newEntries != null) {
preApplyRaftGroupCmd(newEntries, state.commitIndex());
}
}
}


private void preApplyRaftGroupCmd(LogEntry[] entries, long commitIndex) {
// There can be at most one appended & not-committed command in the log
for (LogEntry entry : entries) {
Object operation = entry.operation();
if (entry.index() <= commitIndex || !(operation instanceof RaftGroupCmd)) {
continue;
}

if (operation instanceof DestroyRaftGroupCmd) {
raftNode.setStatus(RaftNodeStatus.TERMINATING);
} else if (operation instanceof ApplyRaftGroupMembersCmd) {
raftNode.setStatus(RaftNodeStatus.UPDATING_GROUP_MEMBER_LIST);
ApplyRaftGroupMembersCmd op = (ApplyRaftGroupMembersCmd) operation;
raftNode.updateGroupMembers(entry.index(), op.getMembers());
} else {
assert false : "Invalid command: " + operation + " in " + raftNode.getGroupId();
}

return;
}
}

private void handleRaftGroupCmd(List<LogEntry> entries, boolean revert) {
private void revertRaftGroupCmd(List<LogEntry> entries) {
// I am reverting appended-but-uncommitted entries and there can be at most 1 uncommitted Raft command...
List<LogEntry> commandEntries = new ArrayList<LogEntry>();
for (LogEntry entry : entries) {
if (entry.operation() instanceof RaftGroupCmd) {
commandEntries.add(entry);
}
}

assert commandEntries.size() <= 1 : "Reverted command entries: " + commandEntries;

for (LogEntry entry : entries) {
if (entry.operation() instanceof DestroyRaftGroupCmd) {
RaftNodeStatus status = revert ? RaftNodeStatus.ACTIVE : RaftNodeStatus.TERMINATING;
raftNode.setStatus(status);
raftNode.setStatus(RaftNodeStatus.ACTIVE);
return;
} else if (entry.operation() instanceof ApplyRaftGroupMembersCmd) {
RaftNodeStatus status = revert ? RaftNodeStatus.ACTIVE : RaftNodeStatus.CHANGING_MEMBERSHIP;
raftNode.setStatus(status);
if (revert) {
raftNode.resetGroupMembers();
} else {
ApplyRaftGroupMembersCmd op = (ApplyRaftGroupMembersCmd) entry.operation();
raftNode.updateGroupMembers(entry.index(), op.getMembers());
}
raftNode.setStatus(RaftNodeStatus.ACTIVE);
raftNode.resetGroupMembers();
return;
}
}
Expand Down
Loading

0 comments on commit 55bd69c

Please sign in to comment.