diff --git a/server/grpc_service.go b/server/grpc_service.go index b0e343e5398..94f4c58e51c 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -602,10 +602,15 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR } region = core.NewRegionInfo(request.GetRegion(), request.GetLeader()) } + cluster.RLock() defer cluster.RUnlock() co := cluster.coordinator - if op := co.regionScatterer.Scatter(region); op != nil { + op, err := co.regionScatterer.Scatter(region) + if err != nil { + return nil, err + } + if op != nil { co.opController.AddOperator(op) } diff --git a/server/handler.go b/server/handler.go index 26bb3c55629..eb6cad154dc 100644 --- a/server/handler.go +++ b/server/handler.go @@ -642,7 +642,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error { return ErrRegionNotFound(regionID) } - op := c.regionScatterer.Scatter(region) + op, err := c.regionScatterer.Scatter(region) + if err != nil { + return err + } + if op == nil { return nil } diff --git a/server/schedule/mockcluster.go b/server/schedule/mockcluster.go index 24b3568a637..e2223a231db 100644 --- a/server/schedule/mockcluster.go +++ b/server/schedule/mockcluster.go @@ -398,6 +398,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) { if region.GetStorePeer(s.FromStore) == nil { panic("Remove peer that doesn't exist") } + if region.GetLeader().GetStoreId() == s.FromStore { + panic("Cannot remove the leader peer") + } region = region.Clone(core.WithRemoveStorePeer(s.FromStore)) case AddLearner: if region.GetStorePeer(s.ToStore) != nil { diff --git a/server/schedule/operator.go b/server/schedule/operator.go index ca78345d2e2..b2ad340b1bc 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -436,12 +436,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil } -// CreateMovePeerOperator creates 
an Operator that replaces an old peer with a new peer. -func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) { - removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore)) - if err != nil { - return nil, err - } +// CreateAddPeerSteps creates an OperatorStep list that adds a new Peer. +func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep { var st []OperatorStep if cluster.IsRaftLearnerEnabled() { st = []OperatorStep{ @@ -453,6 +449,16 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf AddPeer{ToStore: newStore, PeerID: peerID}, } } + return st +} + +// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer. +func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) { + removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore)) + if err != nil { + return nil, err + } + st := CreateAddPeerSteps(newStore, peerID, cluster) steps = append(st, steps...) return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind|OpRegion, steps...), nil } @@ -630,3 +636,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct return intersection } + +// CheckOperatorValid checks if the operator is valid. 
+func CheckOperatorValid(op *Operator) bool { + removeStores := []uint64{} + for _, step := range op.steps { + if tr, ok := step.(TransferLeader); ok { + for _, store := range removeStores { + if store == tr.FromStore { + return false + } + if store == tr.ToStore { + return false + } + } + } + if rp, ok := step.(RemovePeer); ok { + removeStores = append(removeStores, rp.FromStore) + } + } + return true +} diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 22bbdb8df88..8e205739ad7 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/server/core" "github.com/pingcap/pd/server/namespace" + "github.com/pkg/errors" ) type selectedStores struct { @@ -79,23 +80,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio } // Scatter relocates the region. -func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator { +func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) { if r.cluster.IsRegionHot(region.GetID()) { - return nil + return nil, errors.Errorf("region %d is a hot region", region.GetID()) } if len(region.GetPeers()) != r.cluster.GetMaxReplicas() { - return nil + return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID()) + } + + if region.GetLeader() == nil { + return nil, errors.Errorf("region %d has no leader", region.GetID()) } - return r.scatterRegion(region) + return r.scatterRegion(region), nil } func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator { - steps := make([]OperatorStep, 0, len(region.GetPeers())) - stores := r.collectAvailableStores(region) - var kind OperatorKind + var ( + targetPeers []*metapb.Peer + replacedPeers []*metapb.Peer + ) for _, peer := range region.GetPeers() { if len(stores) == 0 { // Reset selected stores if we have no available stores. 
@@ -105,31 +111,93 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator { if r.selected.put(peer.GetStoreId()) { delete(stores, peer.GetStoreId()) + targetPeers = append(targetPeers, peer) + replacedPeers = append(replacedPeers, peer) continue } newPeer := r.selectPeerToReplace(stores, region, peer) if newPeer == nil { + targetPeers = append(targetPeers, peer) + replacedPeers = append(replacedPeers, peer) continue } - // Remove it from stores and mark it as selected. delete(stores, newPeer.GetStoreId()) r.selected.put(newPeer.GetStoreId()) + targetPeers = append(targetPeers, newPeer) + replacedPeers = append(replacedPeers, peer) + } + return r.createOperator(region, replacedPeers, targetPeers) +} - op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin, - peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId()) - if err != nil { - continue +func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator { + // Randomly pick a leader + i := rand.Intn(len(targetPeers)) + targetLeaderPeer := targetPeers[i] + originLeaderStoreID := origin.GetLeader().GetStoreId() + + originStoreIDs := origin.GetStoreIds() + steps := make([]OperatorStep, 0, len(targetPeers)*3+1) + // deferSteps will append to the end of the steps + deferSteps := make([]OperatorStep, 0, 5) + var kind OperatorKind + sameLeader := targetLeaderPeer.GetStoreId() == originLeaderStoreID + // No need to do anything + if sameLeader { + isSame := true + for _, peer := range targetPeers { + if _, ok := originStoreIDs[peer.GetStoreId()]; !ok { + isSame = false + break + } + } + if isSame { + return nil } - steps = append(steps, op.steps...) 
- steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()}) - kind |= op.Kind() } - if len(steps) == 0 { - return nil + // Creates the first step + if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok { + st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster) + steps = append(steps, st...) + // Do not transfer leader to the newly added peer + // Ref: https://github.com/tikv/tikv/issues/3819 + deferSteps = append(deferSteps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()}) + deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()}) + kind |= OpLeader + kind |= OpRegion + } else { + if !sameLeader { + steps = append(steps, TransferLeader{FromStore: originLeaderStoreID, ToStore: targetLeaderPeer.GetStoreId()}) + kind |= OpLeader + } + } + + // For the other steps + for j, peer := range targetPeers { + if peer.GetId() == targetLeaderPeer.GetId() { + continue + } + if _, ok := originStoreIDs[peer.GetStoreId()]; ok { + continue + } + if replacedPeers[j].GetStoreId() == originLeaderStoreID { + st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) + deferSteps = append(deferSteps, st...) + kind |= OpRegion | OpLeader + continue + } + st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + steps = append(steps, st...) + steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) + kind |= OpRegion } - return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...) + + steps = append(steps, deferSteps...) + op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...) 
+ op.SetPriorityLevel(core.HighPriority) + return op } func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer { diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 3f90cec7243..c398ea0455b 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -174,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) { s.scatter(c, 5, 5) } +func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) { + c.Assert(schedule.CheckOperatorValid(op), IsTrue) +} + func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { opt := schedule.NewMockSchedulerOptions() tc := schedule.NewMockCluster(opt) @@ -184,7 +188,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { } // Add regions 1~4. - seq := newSequencer(numStores) + seq := newSequencer(3) // Region 1 has the same distribution with the Region 2, which is used to test selectPeerToReplace. tc.AddLeaderRegion(1, 1, 2, 3) for i := uint64(2); i <= numRegions; i++ { @@ -195,7 +199,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { for i := uint64(1); i <= numRegions; i++ { region := tc.GetRegion(i) - if op := scatterer.Scatter(region); op != nil { + if op, _ := scatterer.Scatter(region); op != nil { + s.checkOperator(op, c) tc.ApplyOperator(op) } } diff --git a/tests/cmd/pdctl_test.go b/tests/cmd/pdctl_test.go index 68bbea82415..2a0008e27e8 100644 --- a/tests/cmd/pdctl_test.go +++ b/tests/cmd/pdctl_test.go @@ -1041,7 +1041,7 @@ func (s *cmdTestSuite) TestOperator(c *C) { args = []string{"-u", pdAddr, "operator", "show", "region"} _, output, err = executeCommandC(cmd, args...) 
c.Assert(err, IsNil) - c.Assert(strings.Contains(string(output), "transfer leader from store 0 to store 3"), IsTrue) + c.Assert(strings.Contains(string(output), "scatter-region"), IsTrue) } func (s *cmdTestSuite) TestMember(c *C) {