diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java index acb8c6e76482..16ffe7addbe9 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java @@ -764,7 +764,7 @@ public boolean installSnapshot(SnapshotEntry snapshot) { return false; } else if (commitIndex == snapshot.index()) { logger.warning("Ignored snapshot: " + snapshot + " since commit index is same."); - return false; + return true; } state.commitIndex(snapshot.index()); diff --git a/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java b/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java index 040eb5063761..bc8511404999 100644 --- a/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java @@ -22,6 +22,7 @@ import com.hazelcast.cp.internal.raft.impl.dataservice.ApplyRaftRunnable; import com.hazelcast.cp.internal.raft.impl.dataservice.RaftDataService; import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest; +import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse; import com.hazelcast.cp.internal.raft.impl.testing.LocalRaftGroup; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -182,6 +183,77 @@ public void run() { }); } + @Test + public void when_leaderMissesInstallSnapshotResponse_then_itAdvancesMatchIndexWithNextInstallSnapshotResponse() + throws ExecutionException, InterruptedException { + final int entryCount = 50; + RaftAlgorithmConfig raftAlgorithmConfig = new RaftAlgorithmConfig().setCommitIndexAdvanceCountToSnapshot(entryCount) + .setAppendRequestBackoffTimeoutInMillis(1000); + group = newGroupWithService(3, raftAlgorithmConfig); + group.start(); + + final RaftNodeImpl leader = group.waitUntilLeaderElected(); + + RaftNodeImpl[] followers = group.getNodesExcept(leader.getLocalMember()); + final RaftNodeImpl slowFollower = followers[1]; + + // the leader cannot send AppendEntriesRPC to the follower + group.dropMessagesToMember(leader.getLocalMember(), slowFollower.getLocalMember(), AppendRequest.class); + + // the follower cannot send append response to the leader after installing the snapshot + group.dropMessagesToMember(slowFollower.getLocalMember(), leader.getLocalMember(), AppendSuccessResponse.class); + + for (int i = 0; i < entryCount; i++) { + leader.replicate(new ApplyRaftRunnable("val" + i)).get(); + } + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + assertEquals(entryCount, getSnapshotEntry(leader).index()); + } + }); + + leader.replicate(new ApplyRaftRunnable("valFinal")).get(); + + group.resetAllDropRulesFrom(leader.getLocalMember()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + for (RaftNodeImpl raftNode : group.getNodesExcept(slowFollower.getLocalMember())) { + assertEquals(entryCount + 1, getCommitIndex(raftNode)); + RaftDataService service = group.getService(raftNode); + assertEquals(entryCount + 1, service.size()); + for (int i = 0; i < entryCount; i++) { + assertEquals(("val" + i), service.get(i + 1)); + } + assertEquals("valFinal", service.get(51)); + } + + assertEquals(entryCount, getCommitIndex(slowFollower)); + RaftDataService service = group.getService(slowFollower); + assertEquals(entryCount, service.size()); + for (int i = 0; i < entryCount; i++) { + assertEquals(("val" + i), service.get(i + 1)); + } + } + }); + + group.resetAllDropRulesFrom(slowFollower.getLocalMember()); + + final long commitIndex = getCommitIndex(leader); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + for (RaftNode raftNode : group.getNodesExcept(leader.getLocalMember())) { + assertEquals(commitIndex, getMatchIndex(leader, raftNode.getLocalMember())); + } + } + }); + } + @Test public void when_followerMissesTheLastEntryThatGoesIntoTheSnapshot_then_itInstallsSnapshot() throws ExecutionException, InterruptedException { final int entryCount = 50;