From 45917e927e1efef705388a0371183077bd18afec Mon Sep 17 00:00:00 2001 From: Ensar Basri Kahveci Date: Tue, 5 Feb 2019 12:25:29 +0300 Subject: [PATCH] Fix InstallSnapshotRPC bug If a snapshot is already installed, we must notify the leader so that it can advance follower's match index --- .../cp/internal/raft/impl/RaftNodeImpl.java | 2 +- .../cp/internal/raft/impl/SnapshotTest.java | 72 +++++++++++++++++++ 2 files changed, 73 insertions(+), 1 deletion(-) diff --git a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java index acb8c6e76482..16ffe7addbe9 100644 --- a/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/cp/internal/raft/impl/RaftNodeImpl.java @@ -764,7 +764,7 @@ public boolean installSnapshot(SnapshotEntry snapshot) { return false; } else if (commitIndex == snapshot.index()) { logger.warning("Ignored snapshot: " + snapshot + " since commit index is same."); - return false; + return true; } state.commitIndex(snapshot.index()); diff --git a/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java b/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java index 040eb5063761..bc8511404999 100644 --- a/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java +++ b/hazelcast/src/test/java/com/hazelcast/cp/internal/raft/impl/SnapshotTest.java @@ -22,6 +22,7 @@ import com.hazelcast.cp.internal.raft.impl.dataservice.ApplyRaftRunnable; import com.hazelcast.cp.internal.raft.impl.dataservice.RaftDataService; import com.hazelcast.cp.internal.raft.impl.dto.AppendRequest; +import com.hazelcast.cp.internal.raft.impl.dto.AppendSuccessResponse; import com.hazelcast.cp.internal.raft.impl.testing.LocalRaftGroup; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastSerialClassRunner; @@ -182,6 +183,77 @@ public void run() { }); } + @Test + public void when_leaderMissesInstallSnapshotResponse_then_itAdvancesMatchIndexWithNextInstallSnapshotResponse() + throws ExecutionException, InterruptedException { + final int entryCount = 50; + RaftAlgorithmConfig raftAlgorithmConfig = new RaftAlgorithmConfig().setCommitIndexAdvanceCountToSnapshot(entryCount) + .setAppendRequestBackoffTimeoutInMillis(1000); + group = newGroupWithService(3, raftAlgorithmConfig); + group.start(); + + final RaftNodeImpl leader = group.waitUntilLeaderElected(); + + RaftNodeImpl[] followers = group.getNodesExcept(leader.getLocalMember()); + final RaftNodeImpl slowFollower = followers[1]; + + // the leader cannot send AppendEntriesRPC to the follower + group.dropMessagesToMember(leader.getLocalMember(), slowFollower.getLocalMember(), AppendRequest.class); + + // the follower cannot send append response to the leader after installing the snapshot + group.dropMessagesToMember(slowFollower.getLocalMember(), leader.getLocalMember(), AppendSuccessResponse.class); + + for (int i = 0; i < entryCount; i++) { + leader.replicate(new ApplyRaftRunnable("val" + i)).get(); + } + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + assertEquals(entryCount, getSnapshotEntry(leader).index()); + } + }); + + leader.replicate(new ApplyRaftRunnable("valFinal")).get(); + + group.resetAllDropRulesFrom(leader.getLocalMember()); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + for (RaftNodeImpl raftNode : group.getNodesExcept(slowFollower.getLocalMember())) { + assertEquals(entryCount + 1, getCommitIndex(raftNode)); + RaftDataService service = group.getService(raftNode); + assertEquals(entryCount + 1, service.size()); + for (int i = 0; i < entryCount; i++) { + assertEquals(("val" + i), service.get(i + 1)); + } + assertEquals("valFinal", service.get(51)); + } + + assertEquals(entryCount, getCommitIndex(slowFollower)); + RaftDataService service = group.getService(slowFollower); + assertEquals(entryCount, service.size()); + for (int i = 0; i < entryCount; i++) { + assertEquals(("val" + i), service.get(i + 1)); + } + } + }); + + group.resetAllDropRulesFrom(slowFollower.getLocalMember()); + + final long commitIndex = getCommitIndex(leader); + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + for (RaftNode raftNode : group.getNodesExcept(leader.getLocalMember())) { + assertEquals(commitIndex, getMatchIndex(leader, raftNode.getLocalMember())); + } + } + }); + } + @Test public void when_followerMissesTheLastEntryThatGoesIntoTheSnapshot_then_itInstallsSnapshot() throws ExecutionException, InterruptedException { final int entryCount = 50;