Skip to content
Permalink
Browse files

Fix InstallSnapshotRPC bug

If a snapshot is already installed, we must notify the leader
so that it can advance follower's match index
  • Loading branch information...
metanet committed Feb 5, 2019
1 parent 0c77aef commit 45917e927e1efef705388a0371183077bd18afec
@@ -764,7 +764,7 @@ public boolean installSnapshot(SnapshotEntry snapshot) {
return false;
} else if (commitIndex == snapshot.index()) {
logger.warning("Ignored snapshot: " + snapshot + " since commit index is same.");
return false;
return true;
}

state.commitIndex(snapshot.index());
@@ -22,6 +22,7 @@
import com.hazelcast.cp.internal.raft.impl.dataservice.ApplyRaftRunnable;
import com.hazelcast.cp.internal.raft.impl.dataservice.RaftDataService;
import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest;
import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse;
import com.hazelcast.cp.internal.raft.impl.testing.LocalRaftGroup;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastSerialClassRunner;
@@ -182,6 +183,77 @@ public void run() {
});
}

@Test
public void when_leaderMissesInstallSnapshotResponse_then_itAdvancesMatchIndexWithNextInstallSnapshotResponse()
throws ExecutionException, InterruptedException {
final int entryCount = 50;
RaftAlgorithmConfig raftAlgorithmConfig = new RaftAlgorithmConfig().setCommitIndexAdvanceCountToSnapshot(entryCount)
.setAppendRequestBackoffTimeoutInMillis(1000);
group = newGroupWithService(3, raftAlgorithmConfig);
group.start();

final RaftNodeImpl leader = group.waitUntilLeaderElected();

RaftNodeImpl[] followers = group.getNodesExcept(leader.getLocalMember());
final RaftNodeImpl slowFollower = followers[1];

// the leader cannot send AppendEntriesRPC to the follower
group.dropMessagesToMember(leader.getLocalMember(), slowFollower.getLocalMember(), AppendRequest.class);

// the follower cannot send append response to the leader after installing the snapshot
group.dropMessagesToMember(slowFollower.getLocalMember(), leader.getLocalMember(), AppendSuccessResponse.class);

for (int i = 0; i < entryCount; i++) {
leader.replicate(new ApplyRaftRunnable("val" + i)).get();
}

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(entryCount, getSnapshotEntry(leader).index());
}
});

leader.replicate(new ApplyRaftRunnable("valFinal")).get();

group.resetAllDropRulesFrom(leader.getLocalMember());

assertTrueEventually(new AssertTask() {
@Override
public void run() {
for (RaftNodeImpl raftNode : group.getNodesExcept(slowFollower.getLocalMember())) {
assertEquals(entryCount + 1, getCommitIndex(raftNode));
RaftDataService service = group.getService(raftNode);
assertEquals(entryCount + 1, service.size());
for (int i = 0; i < entryCount; i++) {
assertEquals(("val" + i), service.get(i + 1));
}
assertEquals("valFinal", service.get(51));
}

assertEquals(entryCount, getCommitIndex(slowFollower));
RaftDataService service = group.getService(slowFollower);
assertEquals(entryCount, service.size());
for (int i = 0; i < entryCount; i++) {
assertEquals(("val" + i), service.get(i + 1));
}
}
});

group.resetAllDropRulesFrom(slowFollower.getLocalMember());

final long commitIndex = getCommitIndex(leader);

assertTrueEventually(new AssertTask() {
@Override
public void run() {
for (RaftNode raftNode : group.getNodesExcept(leader.getLocalMember())) {
assertEquals(commitIndex, getMatchIndex(leader, raftNode.getLocalMember()));
}
}
});
}

@Test
public void when_followerMissesTheLastEntryThatGoesIntoTheSnapshot_then_itInstallsSnapshot() throws ExecutionException, InterruptedException {
final int entryCount = 50;

0 comments on commit 45917e9

Please sign in to comment.
You can’t perform that action at this time.