unsafe recovery: Fix learner stores being ignored in 7.1 (tikv#6744)

ref tikv#6690, ref tikv#6710

Fix learner stores/replicas being ignored in auto-detect mode.

Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
v01dstar and ti-chi-bot[bot] committed Jul 5, 2023
1 parent 2164c07 commit 9954576
Showing 2 changed files with 42 additions and 3 deletions.
3 changes: 0 additions & 3 deletions server/cluster/unsafe_recovery_controller.go
@@ -706,9 +706,6 @@ func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*metapb.Peer
 
 	var failedPeers []*metapb.Peer
 	for _, peer := range region.Peers {
-		if peer.Role == metapb.PeerRole_Learner || peer.Role == metapb.PeerRole_DemotingVoter {
-			continue
-		}
 		if u.isFailed(peer) {
 			failedPeers = append(failedPeers, peer)
 		}
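
After this change, learner and demoting-voter peers go through the same failure check as voters. For context, the resulting function reads roughly as follows (a sketch assembled from the hunk above; the trailing return falls outside the visible context and is assumed):

	func (u *unsafeRecoveryController) getFailedPeers(region *metapb.Region) []*metapb.Peer {
		var failedPeers []*metapb.Peer
		for _, peer := range region.Peers {
			// Learners and demoting voters are no longer skipped here, so a
			// store that hosts only learner replicas can now be detected as failed.
			if u.isFailed(peer) {
				failedPeers = append(failedPeers, peer)
			}
		}
		return failedPeers // assumed: outside the visible hunk
	}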
42 changes: 42 additions & 0 deletions server/cluster/unsafe_recovery_controller_test.go
@@ -554,6 +554,48 @@ func TestForceLeaderForCommitMerge(t *testing.T) {
 	re.Equal(demoteFailedVoter, recoveryController.GetStage())
 }
 
+// Failed learner replicas/stores should be considered by auto-detect recovery.
+func TestAutoDetectModeWithOneLearner(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	_, opt, _ := newTestScheduleConfig()
+	cluster := newTestRaftCluster(ctx, mockid.NewIDAllocator(), opt, storage.NewStorageWithMemoryBackend(), core.NewBasicCluster())
+	cluster.coordinator = newCoordinator(ctx, cluster, hbstream.NewTestHeartbeatStreams(ctx, cluster.meta.GetId(), cluster, true))
+	cluster.coordinator.run()
+	for _, store := range newTestStores(1, "6.0.0") {
+		re.NoError(cluster.PutStore(store.GetMeta()))
+	}
+	recoveryController := newUnsafeRecoveryController(cluster)
+	re.NoError(recoveryController.RemoveFailedStores(nil, 60, true))
+
+	storeReport := pdpb.StoreReport{
+		PeerReports: []*pdpb.PeerReport{
+			{
+				RaftState: &raft_serverpb.RaftLocalState{LastIndex: 10, HardState: &eraftpb.HardState{Term: 1, Commit: 10}},
+				RegionState: &raft_serverpb.RegionLocalState{
+					Region: &metapb.Region{
+						Id:          1001,
+						RegionEpoch: &metapb.RegionEpoch{ConfVer: 7, Version: 10},
+						Peers: []*metapb.Peer{
+							{Id: 11, StoreId: 1}, {Id: 12, StoreId: 2}, {Id: 13, StoreId: 3, Role: metapb.PeerRole_Learner}}}}},
+		},
+	}
+	req := newStoreHeartbeat(1, &storeReport)
+	req.StoreReport.Step = 1
+	resp := &pdpb.StoreHeartbeatResponse{}
+	recoveryController.HandleStoreHeartbeat(req, resp)
+	hasStore3AsFailedStore := false
+	for _, failedStore := range resp.RecoveryPlan.ForceLeader.FailedStores {
+		if failedStore == 3 {
+			hasStore3AsFailedStore = true
+			break
+		}
+	}
+	re.True(hasStore3AsFailedStore)
+}
+
 func TestAutoDetectMode(t *testing.T) {
 	re := require.New(t)
 	ctx, cancel := context.WithCancel(context.Background())
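
The new test can be run in isolation; assuming a checkout of the PD repository and the package path shown in the diff:

	go test ./server/cluster -run TestAutoDetectModeWithOneLearner

It enables auto-detect recovery via RemoveFailedStores(nil, 60, true), reports a region whose store-3 peer is a learner, and asserts that store 3 now appears in the force-leader plan's FailedStores, exactly the case the removed guard used to miss.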
