Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scheduler: fix region scatter may transfer leader to removed peer #1482

Merged
merged 11 commits into from Apr 2, 2019
7 changes: 6 additions & 1 deletion server/grpc_service.go
Expand Up @@ -602,10 +602,15 @@ func (s *Server) ScatterRegion(ctx context.Context, request *pdpb.ScatterRegionR
}
region = core.NewRegionInfo(request.GetRegion(), request.GetLeader())
}

cluster.RLock()
defer cluster.RUnlock()
co := cluster.coordinator
if op := co.regionScatterer.Scatter(region); op != nil {
op, err := co.regionScatterer.Scatter(region)
if err != nil {
return nil, err
}
if op != nil {
co.opController.AddOperator(op)
}

Expand Down
6 changes: 5 additions & 1 deletion server/handler.go
Expand Up @@ -642,7 +642,11 @@ func (h *Handler) AddScatterRegionOperator(regionID uint64) error {
return ErrRegionNotFound(regionID)
}

op := c.regionScatterer.Scatter(region)
op, err := c.regionScatterer.Scatter(region)
if err != nil {
return err
}

if op == nil {
return nil
}
Expand Down
3 changes: 3 additions & 0 deletions server/schedule/mockcluster.go
Expand Up @@ -398,6 +398,9 @@ func (mc *MockCluster) ApplyOperator(op *Operator) {
if region.GetStorePeer(s.FromStore) == nil {
panic("Remove peer that doesn't exist")
}
if region.GetLeader().GetStoreId() == s.FromStore {
panic("Cannot remove the leader peer")
}
region = region.Clone(core.WithRemoveStorePeer(s.FromStore))
case AddLearner:
if region.GetStorePeer(s.ToStore) != nil {
Expand Down
39 changes: 33 additions & 6 deletions server/schedule/operator.go
Expand Up @@ -436,12 +436,8 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r
return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil
}

// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore))
if err != nil {
return nil, err
}
// CreateAddPeerSteps creates an OperatorStep list that adds a new Peer.
func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep {
var st []OperatorStep
if cluster.IsRaftLearnerEnabled() {
st = []OperatorStep{
Expand All @@ -453,6 +449,16 @@ func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInf
AddPeer{ToStore: newStore, PeerID: peerID},
}
}
return st
}

// CreateMovePeerOperator creates an Operator that replaces the peer on
// oldStore with a new peer (peerID) on newStore.
//
// The steps produced by CreateAddPeerSteps for newStore are placed ahead of
// the steps returned by removePeerSteps for oldStore. newStore is appended to
// the region's follower store IDs when calling removePeerSteps (presumably so
// it can be considered as a leader-transfer target; confirm against
// removePeerSteps). The operator kind is the kind returned by removePeerSteps
// combined with kind and OpRegion. Any error from removePeerSteps is returned
// unchanged together with a nil Operator.
func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) {
	candidates := append(getRegionFollowerIDs(region), newStore)
	removeKind, removeSteps, err := removePeerSteps(cluster, region, oldStore, candidates)
	if err != nil {
		return nil, err
	}

	// Add the new peer first, then run the removal steps.
	allSteps := append(CreateAddPeerSteps(newStore, peerID, cluster), removeSteps...)
	opKind := removeKind | kind | OpRegion
	return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), opKind, allSteps...), nil
}
Expand Down Expand Up @@ -630,3 +636,24 @@ func getIntersectionStores(a []*metapb.Peer, b []*metapb.Peer) map[uint64]struct

return intersection
}

// CheckOperatorValid checks if the operator is valid.
//
// The steps are scanned in order. The operator is reported invalid (false) as
// soon as a TransferLeader step names, as either its source or its target, a
// store that an earlier RemovePeer step has already removed. If no such step
// is found it returns true. Step types other than TransferLeader and
// RemovePeer are ignored.
func CheckOperatorValid(op *Operator) bool {
	// Set of stores whose peer has been removed by a preceding step.
	removed := make(map[uint64]struct{})
	for _, step := range op.steps {
		switch s := step.(type) {
		case TransferLeader:
			if _, gone := removed[s.FromStore]; gone {
				return false
			}
			if _, gone := removed[s.ToStore]; gone {
				return false
			}
		case RemovePeer:
			removed[s.FromStore] = struct{}{}
		}
	}
	return true
}
101 changes: 83 additions & 18 deletions server/schedule/region_scatterer.go
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/pd/server/core"
"github.com/pingcap/pd/server/namespace"
"github.com/pkg/errors"
)

type selectedStores struct {
Expand Down Expand Up @@ -79,23 +80,28 @@ func NewRegionScatterer(cluster Cluster, classifier namespace.Classifier) *Regio
}

// Scatter relocates the region.
func (r *RegionScatterer) Scatter(region *core.RegionInfo) *Operator {
func (r *RegionScatterer) Scatter(region *core.RegionInfo) (*Operator, error) {
if r.cluster.IsRegionHot(region.GetID()) {
return nil
return nil, errors.Errorf("region %d is a hot region", region.GetID())
}

if len(region.GetPeers()) != r.cluster.GetMaxReplicas() {
return nil
return nil, errors.Errorf("the number replicas of region %d is not expected", region.GetID())
}

if region.GetLeader() == nil {
return nil, errors.Errorf("region %d has no leader", region.GetID())
}

return r.scatterRegion(region)
return r.scatterRegion(region), nil
}

func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {
steps := make([]OperatorStep, 0, len(region.GetPeers()))

stores := r.collectAvailableStores(region)
var kind OperatorKind
var (
targetPeers []*metapb.Peer
replacedPeers []*metapb.Peer
)
for _, peer := range region.GetPeers() {
if len(stores) == 0 {
// Reset selected stores if we have no available stores.
Expand All @@ -105,31 +111,90 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo) *Operator {

if r.selected.put(peer.GetStoreId()) {
delete(stores, peer.GetStoreId())
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}
newPeer := r.selectPeerToReplace(stores, region, peer)
if newPeer == nil {
targetPeers = append(targetPeers, peer)
replacedPeers = append(replacedPeers, peer)
continue
}

// Remove it from stores and mark it as selected.
delete(stores, newPeer.GetStoreId())
r.selected.put(newPeer.GetStoreId())
targetPeers = append(targetPeers, newPeer)
replacedPeers = append(replacedPeers, peer)
}
return r.createOperator(region, replacedPeers, targetPeers)
}

op, err := CreateMovePeerOperator("scatter-peer", r.cluster, region, OpAdmin,
peer.GetStoreId(), newPeer.GetStoreId(), newPeer.GetId())
if err != nil {
continue
func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, targetPeers []*metapb.Peer) *Operator {
// Randomly pick a leader
i := rand.Intn(len(targetPeers))
targetLeaderPeer := targetPeers[i]

storeIDs := origin.GetStoreIds()
steps := make([]OperatorStep, 0, len(targetPeers)*2+1)
deferSteps := make([]OperatorStep, 0, 2)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to add a comment explaining why we put these steps at the end.

var kind OperatorKind
sameLeader := targetLeaderPeer.GetStoreId() == origin.GetLeader().GetStoreId()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about using a variable for origin.GetLeader().GetStoreId()?

// No need to do anything
if sameLeader {
isSame := true
for _, peer := range targetPeers {
if _, ok := storeIDs[peer.GetStoreId()]; !ok {
isSame = false
break
}
}
if isSame {
return nil
}
steps = append(steps, op.steps...)
steps = append(steps, TransferLeader{ToStore: newPeer.GetStoreId()})
kind |= op.Kind()
}

if len(steps) == 0 {
return nil
// Creates the first step
if _, ok := storeIDs[targetLeaderPeer.GetStoreId()]; !ok {
st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster)
steps = append(steps, st...)
// Do not transfer leader to the newly added peer
deferSteps = append(deferSteps, TransferLeader{FromStore: origin.GetLeader().GetStoreId(), ToStore: targetLeaderPeer.GetStoreId()})
deferSteps = append(deferSteps, RemovePeer{FromStore: replacedPeers[i].GetStoreId()})
kind |= OpLeader
kind |= OpRegion
} else {
if !sameLeader {
steps = append(steps, TransferLeader{FromStore: origin.GetLeader().GetStoreId(), ToStore: targetLeaderPeer.GetStoreId()})
kind |= OpLeader
}
}

// For the other steps
for j, peer := range targetPeers {
if peer.GetId() == targetLeaderPeer.GetId() {
continue
}
if _, ok := storeIDs[peer.GetStoreId()]; ok {
continue
}
if replacedPeers[j].GetStoreId() == origin.GetLeader().GetStoreId() {
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
deferSteps = append(deferSteps, st...)
kind |= OpRegion | OpLeader
continue
}
st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster)
steps = append(steps, st...)
steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()})
kind |= OpRegion
}
return NewOperator("scatter-region", region.GetID(), region.GetRegionEpoch(), kind, steps...)

steps = append(steps, deferSteps...)
op := NewOperator("scatter-region", origin.GetID(), origin.GetRegionEpoch(), kind, steps...)
op.SetPriorityLevel(core.HighPriority)
return op
}

func (r *RegionScatterer) selectPeerToReplace(stores map[uint64]*core.StoreInfo, region *core.RegionInfo, oldPeer *metapb.Peer) *metapb.Peer {
Expand Down
9 changes: 7 additions & 2 deletions server/schedulers/scheduler_test.go
Expand Up @@ -174,6 +174,10 @@ func (s *testScatterRegionSuite) TestFiveStores(c *C) {
s.scatter(c, 5, 5)
}

// checkOperator asserts that op passes schedule.CheckOperatorValid.
func (s *testScatterRegionSuite) checkOperator(op *schedule.Operator, c *C) {
	valid := schedule.CheckOperatorValid(op)
	c.Assert(valid, IsTrue)
}

func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
opt := schedule.NewMockSchedulerOptions()
tc := schedule.NewMockCluster(opt)
Expand All @@ -184,7 +188,7 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {
}

// Add regions 1~4.
seq := newSequencer(numStores)
seq := newSequencer(3)
// Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace.
tc.AddLeaderRegion(1, 1, 2, 3)
for i := uint64(2); i <= numRegions; i++ {
Expand All @@ -195,7 +199,8 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) {

for i := uint64(1); i <= numRegions; i++ {
region := tc.GetRegion(i)
if op := scatterer.Scatter(region); op != nil {
if op, _ := scatterer.Scatter(region); op != nil {
s.checkOperator(op, c)
tc.ApplyOperator(op)
}
}
Expand Down
2 changes: 1 addition & 1 deletion tests/cmd/pdctl_test.go
Expand Up @@ -1041,7 +1041,7 @@ func (s *cmdTestSuite) TestOperator(c *C) {
args = []string{"-u", pdAddr, "operator", "show", "region"}
_, output, err = executeCommandC(cmd, args...)
c.Assert(err, IsNil)
c.Assert(strings.Contains(string(output), "transfer leader from store 0 to store 3"), IsTrue)
c.Assert(strings.Contains(string(output), "scatter-region"), IsTrue)
}

func (s *cmdTestSuite) TestMember(c *C) {
Expand Down