Skip to content

Commit

Permalink
Fix membership change & snapshot bug
Browse files Browse the repository at this point in the history
Leader can take a snapshot even when there is an ongoing membership
change using the committed member list.
  • Loading branch information
metanet committed Feb 13, 2019
1 parent f10b7aa commit 658115e
Show file tree
Hide file tree
Showing 8 changed files with 220 additions and 30 deletions.
Expand Up @@ -48,6 +48,7 @@
import com.hazelcast.cp.internal.raft.impl.log.SnapshotEntry;
import com.hazelcast.cp.internal.raft.impl.state.FollowerState;
import com.hazelcast.cp.internal.raft.impl.state.LeaderState;
import com.hazelcast.cp.internal.raft.impl.state.RaftGroupMembers;
import com.hazelcast.cp.internal.raft.impl.state.RaftState;
import com.hazelcast.cp.internal.raft.impl.task.MembershipChangeTask;
import com.hazelcast.cp.internal.raft.impl.task.PreVoteTask;
Expand Down Expand Up @@ -720,22 +721,24 @@ private void takeSnapshotIfCommitIndexAdvanced() {
return;
}

// We don't support snapshots while there's a membership change or the Raft group is being destroyed...
if (status != ACTIVE) {
if (isTerminatedOrSteppedDown()) {
// If the status is UPDATING_MEMBER_LIST or TERMINATING, it means the status is normally ACTIVE
// and there is an appended but not committed RaftGroupCmd.
// If the status is TERMINATED or STEPPED_DOWN, then there will not be any new appends.
return;
}

RaftLog log = state.log();
Object snapshot = raftIntegration.takeSnapshot(commitIndex);
if (snapshot instanceof Throwable) {
Throwable t = (Throwable) snapshot;
logger.severe("Could not take snapshot for " + groupId + " commit index: " + commitIndex, t);
logger.severe("Could not take snapshot at commit index: " + commitIndex, t);
return;
}

LogEntry committedEntry = log.getLogEntry(commitIndex);
SnapshotEntry snapshotEntry = new SnapshotEntry(committedEntry.term(), commitIndex, snapshot,
state.membersLogIndex(), state.members());
int snapshotTerm = log.getLogEntry(commitIndex).term();
RaftGroupMembers members = state.committedGroupMembers();
SnapshotEntry snapshotEntry = new SnapshotEntry(snapshotTerm, commitIndex, snapshot, members.index(), members.members());

long minMatchIndex = 0L;
LeaderState leaderState = state.leaderState();
Expand Down
Expand Up @@ -146,6 +146,10 @@ public List<LogEntry> truncateEntriesFrom(long entryIndex) {
return truncated;
}

public boolean checkAvailableCapacity(int requestedCapacity) {
return (logs.getCapacity() - logs.size()) >= requestedCapacity;
}

/**
* Appends new entries to the Raft log.
*
Expand All @@ -159,9 +163,10 @@ public void appendEntries(LogEntry... newEntries) {
int lastTerm = lastLogOrSnapshotTerm();
long lastIndex = lastLogOrSnapshotIndex();

assert logs.getCapacity() - logs.size() >= newEntries.length
: "Not enough capacity! Capacity: " + logs.getCapacity()
+ ", Size: " + logs.size() + ", New entries: " + newEntries.length;
if (!checkAvailableCapacity(newEntries.length)) {
throw new IllegalStateException("Not enough capacity! Capacity: " + logs.getCapacity()
+ ", Size: " + logs.size() + ", New entries: " + newEntries.length);
}

for (LogEntry entry : newEntries) {
if (entry.term() < lastTerm) {
Expand Down
Expand Up @@ -24,6 +24,7 @@
import com.hazelcast.cp.internal.raft.impl.RaftNodeStatus;
import com.hazelcast.cp.internal.raft.impl.command.ApplyRaftGroupMembersCmd;
import com.hazelcast.cp.internal.raft.impl.log.LogEntry;
import com.hazelcast.cp.internal.raft.impl.log.RaftLog;
import com.hazelcast.cp.internal.raft.impl.state.RaftState;
import com.hazelcast.cp.internal.raft.impl.util.SimpleCompletableFuture;
import com.hazelcast.logging.ILogger;
Expand Down Expand Up @@ -78,9 +79,16 @@ public void run() {
logger.fine("Replicating: " + operation + " in term: " + state.term());
}

long newEntryLogIndex = state.log().lastLogOrSnapshotIndex() + 1;
RaftLog log = state.log();

if (!log.checkAvailableCapacity(1)) {
resultFuture.setResult(new IllegalStateException("Not enough capacity in RaftLog!"));
return;
}

long newEntryLogIndex = log.lastLogOrSnapshotIndex() + 1;
raftNode.registerFuture(newEntryLogIndex, resultFuture);
state.log().appendEntries(new LogEntry(state.term(), newEntryLogIndex, operation));
log.appendEntries(new LogEntry(state.term(), newEntryLogIndex, operation));

handleRaftGroupCmd(newEntryLogIndex, operation);

Expand Down
Expand Up @@ -393,7 +393,7 @@ public void when_followerSlowsDown_then_itCatchesLeaderEventually() throws Execu

assertEquals(0, getCommitIndex(slowFollower));

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
Expand Down Expand Up @@ -437,7 +437,7 @@ public void run() {
}
}, 5);

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());
group.merge();

RaftNodeImpl newLeader = group.waitUntilLeaderElected();
Expand Down
Expand Up @@ -444,7 +444,7 @@ public void run() {
}
});

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
Expand Down
Expand Up @@ -21,25 +21,29 @@
import com.hazelcast.core.ICompletableFuture;
import com.hazelcast.cp.exception.StaleAppendRequestException;
import com.hazelcast.cp.internal.raft.MembershipChangeType;
import com.hazelcast.cp.internal.raft.impl.command.ApplyRaftGroupMembersCmd;
import com.hazelcast.cp.internal.raft.impl.dataservice.ApplyRaftRunnable;
import com.hazelcast.cp.internal.raft.impl.dataservice.RaftDataService;
import com.hazelcast.cp.internal.raft.impl.dto.AppendFailureResponse;
import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest;
import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse;
import com.hazelcast.cp.internal.raft.impl.dto.InstallSnapshot;
import com.hazelcast.cp.internal.raft.impl.log.LogEntry;
import com.hazelcast.cp.internal.raft.impl.testing.LocalRaftGroup;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.util.function.Function;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
Expand All @@ -55,6 +59,7 @@
import static com.hazelcast.cp.internal.raft.impl.RaftUtil.newGroupWithService;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
Expand Down Expand Up @@ -175,7 +180,7 @@ public void run() {

leader.replicate(new ApplyRaftRunnable("valFinal")).get();

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
Expand Down Expand Up @@ -226,7 +231,7 @@ public void run() {

leader.replicate(new ApplyRaftRunnable("valFinal")).get();

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
Expand All @@ -250,7 +255,7 @@ public void run() {
}
});

group.resetAllDropRulesFrom(slowFollower.getLocalMember());
group.resetAllRulesFrom(slowFollower.getLocalMember());

final long commitIndex = getCommitIndex(leader);

Expand Down Expand Up @@ -302,7 +307,7 @@ public void run() {

leader.replicate(new ApplyRaftRunnable("valFinal")).get();

group.resetAllDropRulesFrom(leader.getLocalMember());
group.resetAllRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
Expand Down Expand Up @@ -484,7 +489,143 @@ public void run() {
assertEquals(ACTIVE, getStatus(slowFollower));
}
});
}

@Test
public void testMembershipChangeBlocksSnapshotBug() throws ExecutionException, InterruptedException {
// The comments below show how the code behaves before the mentioned bug is fixed.

int commitIndexAdvanceCount = 50;
final int uncommittedEntryCount = 10;
RaftAlgorithmConfig config = new RaftAlgorithmConfig()
.setCommitIndexAdvanceCountToSnapshot(commitIndexAdvanceCount)
.setUncommittedEntryCountToRejectNewAppends(uncommittedEntryCount);
group = newGroupWithService(3, config);
group.start();

final RaftNodeImpl leader = group.waitUntilLeaderElected();
final RaftNodeImpl[] followers = group.getNodesExcept(leader.getLocalMember());

group.dropMessagesToMember(leader.getLocalMember(), followers[0].getLocalMember(), AppendRequest.class);

while (getSnapshotEntry(leader).index() == 0) {
leader.replicate(new ApplyRaftRunnable("into_snapshot")).get();
}

// now, the leader has taken a snapshot.
// It also keeps some already committed entries in the log because followers[0] hasn't appended them.
// LOG: [ <46 - 49>, <50>], SNAPSHOT INDEX: 50, COMMIT INDEX: 50

long leaderCommitIndex = getCommitIndex(leader);
do {
leader.replicate(new ApplyRaftRunnable("committed_after_snapshot")).get();
} while (getCommitIndex(leader) < leaderCommitIndex + commitIndexAdvanceCount - 1);

// committing new entries.
// one more entry is needed to take the next snapshot.
// LOG: [ <46 - 49>, <50>, <51 - 99 (committed)> ], SNAPSHOT INDEX: 50, COMMIT INDEX: 99

group.dropMessagesToMember(leader.getLocalMember(), followers[1].getLocalMember(), AppendRequest.class);

for (int i = 0; i < uncommittedEntryCount - 1; i++) {
leader.replicate(new ApplyRaftRunnable("uncommitted_after_snapshot"));
}

// appended some more entries which will not be committed because the leader has no majority.
// the last uncommitted index is reserved for membership changed.
// LOG: [ <46 - 49>, <50>, <51 - 99 (committed)>, <100 - 108 (uncommitted)> ], SNAPSHOT INDEX: 50, COMMIT INDEX: 99
// There are only 2 empty indices in the log.

RaftNodeImpl newRaftNode = group.createNewRaftNode();

Function<Object, Object> alterFunc = new Function<Object, Object>() {
@Override
public Object apply(Object o) {
if (o instanceof AppendRequest) {
AppendRequest request = (AppendRequest) o;
LogEntry[] entries = request.entries();
if (entries.length > 0) {
if (entries[entries.length - 1].operation() instanceof ApplyRaftGroupMembersCmd) {
entries = Arrays.copyOf(entries, entries.length - 1);
return new AppendRequest(request.leader(), request.term(), request.prevLogTerm(), request.prevLogIndex(), request.leaderCommitIndex(), entries);
} else if (entries[0].operation() instanceof ApplyRaftGroupMembersCmd) {
entries = new LogEntry[0];
return new AppendRequest(request.leader(), request.term(), request.prevLogTerm(), request.prevLogIndex(), request.leaderCommitIndex(), entries);
}
}
}

return null;
}
};

group.alterMessagesToMember(leader.getLocalMember(), followers[1].getLocalMember(), alterFunc);
group.alterMessagesToMember(leader.getLocalMember(), newRaftNode.getLocalMember(), alterFunc);

final long lastLogIndex1 = getLastLogOrSnapshotEntry(leader).index();

leader.replicateMembershipChange(newRaftNode.getLocalMember(), MembershipChangeType.ADD);

// When the membership change entry is appended, the leader's Log will be as following:
// LOG: [ <46 - 49>, <50>, <51 - 99 (committed)>, <100 - 108 (uncommitted)>, <109 (membership change)> ], SNAPSHOT INDEX: 50, COMMIT INDEX: 99

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertTrue(getLastLogOrSnapshotEntry(leader).index() > lastLogIndex1);
}
});

group.allowMessagesToMember(leader.getLocalMember(), followers[1].getLocalMember(), AppendRequest.class);

System.out.println();

// Then, only the entries before the membership change will be committed because we alter the append request. The log will be:
// LOG: [ <46 - 49>, <50>, <51 - 108 (committed)>, <109 (membership change)> ], SNAPSHOT INDEX: 50, COMMIT INDEX: 108
// There is only 1 empty index in the log.

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(lastLogIndex1, getCommitIndex(leader));
assertEquals(lastLogIndex1, getCommitIndex(followers[1]));
}
});


// assertTrueEventually(new AssertTask() {
// @Override
// public void run() {
// assertEquals(lastLogIndex1 + 1, getCommitIndex(leader));
// assertEquals(lastLogIndex1 + 1, getCommitIndex(followers[1]));
// }
// });

final long lastLogIndex2 = getLastLogOrSnapshotEntry(leader).index();

leader.replicate(new ApplyRaftRunnable("after_membership_change_append"));

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertTrue(getLastLogOrSnapshotEntry(leader).index() > lastLogIndex2);
}
});

// Now the log is full. There is no empty space left.
// LOG: [ <46 - 49>, <50>, <51 - 108 (committed)>, <109 (membership change)>, <110 (uncommitted)> ], SNAPSHOT INDEX: 50, COMMIT INDEX: 108

final long lastLogIndex3 = getLastLogOrSnapshotEntry(leader).index();

ICompletableFuture f = leader.replicate(new ApplyRaftRunnable("after_membership_change_append"));

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertTrue(getLastLogOrSnapshotEntry(leader).index() > lastLogIndex3);
}
});

assertFalse(f.isDone());
}
}
Expand Up @@ -23,6 +23,7 @@
import com.hazelcast.cp.internal.raft.impl.RaftNodeImpl;
import com.hazelcast.cp.internal.raft.impl.RaftUtil;
import com.hazelcast.test.AssertTask;
import com.hazelcast.util.function.Function;
import org.junit.Assert;

import java.util.Arrays;
Expand Down Expand Up @@ -450,10 +451,18 @@ public void allowMessagesToAll(Endpoint from, Class messageType) {
}

/**
* Resets all drop rules from endpoint.
* Resets all rules from endpoint.
*/
public void resetAllDropRulesFrom(Endpoint endpoint) {
getIntegration(getIndexOf(endpoint)).resetAllDropRules();
public void resetAllRulesFrom(Endpoint endpoint) {
getIntegration(getIndexOf(endpoint)).resetAllRules();
}

public void alterMessagesToMember(Endpoint from, Endpoint to, Function<Object, Object> function) {
getIntegration(getIndexOf(from)).alterMessagesToEndpoint(to, function);
}

void removeAlterMessageRuleToMember(Endpoint from, Endpoint to) {
getIntegration(getIndexOf(from)).removeAlterMessageRuleToEndpoint(to);
}

public void terminateNode(int index) {
Expand Down

0 comments on commit 658115e

Please sign in to comment.