From 9ae18a3a2bef192d7b1853f87ce2d488897fe630 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 5 Jun 2019 12:00:09 +0800 Subject: [PATCH] add light peer without considering the influence (#1554) (#1563) Signed-off-by: Ryan Leung --- server/schedule/mockcluster.go | 29 ++++- server/schedule/operator.go | 116 ++++++++++++++++---- server/schedule/operator_controller.go | 31 ++++++ server/schedule/operator_controller_test.go | 13 +-- server/schedule/region_scatterer.go | 6 +- server/schedulers/scheduler_test.go | 28 +++++ 6 files changed, 189 insertions(+), 34 deletions(-) diff --git a/server/schedule/mockcluster.go b/server/schedule/mockcluster.go index 4bdcc337aee..25fdf7b2e93 100644 --- a/server/schedule/mockcluster.go +++ b/server/schedule/mockcluster.go @@ -27,7 +27,15 @@ import ( "go.uber.org/zap" ) -// MockCluster is used to mock clusterInfo for test use +// MockHeadbeatStream is used to mock HeadbeatStream for test use. +type MockHeadbeatStream struct{} + +// SendMsg is used to send the message. +func (m MockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { + return +} + +// MockCluster is used to mock clusterInfo for test use. 
type MockCluster struct { *BasicCluster *core.MockIDAllocator @@ -396,6 +404,15 @@ func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator) StoreId: s.ToStore, } region = region.Clone(core.WithAddPeer(peer)) + case AddLightPeer: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add peer that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + } + region = region.Clone(core.WithAddPeer(peer)) case RemovePeer: if region.GetStorePeer(s.FromStore) == nil { panic("Remove peer that doesn't exist") @@ -414,6 +431,16 @@ func (mc *MockCluster) ApplyOperatorStep(region *core.RegionInfo, op *Operator) IsLearner: true, } region = region.Clone(core.WithAddPeer(peer)) + case AddLightLearner: + if region.GetStorePeer(s.ToStore) != nil { + panic("Add learner that exists") + } + peer := &metapb.Peer{ + Id: s.PeerID, + StoreId: s.ToStore, + IsLearner: true, + } + region = region.Clone(core.WithAddPeer(peer)) case PromoteLearner: if region.GetStoreLearner(s.ToStore) == nil { panic("Promote peer that doesn't exist") diff --git a/server/schedule/operator.go b/server/schedule/operator.go index c0a7a176f82..c6c880a355e 100644 --- a/server/schedule/operator.go +++ b/server/schedule/operator.go @@ -60,7 +60,7 @@ func (tl TransferLeader) IsFinish(region *core.RegionInfo) bool { return region.GetLeader().GetStoreId() == tl.ToStore } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. func (tl TransferLeader) Influence(opInfluence OpInfluence, region *core.RegionInfo) { from := opInfluence.GetStoreInfluence(tl.FromStore) to := opInfluence.GetStoreInfluence(tl.ToStore) @@ -92,7 +92,7 @@ func (ap AddPeer) IsFinish(region *core.RegionInfo) bool { return false } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. 
func (ap AddPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(ap.ToStore) @@ -122,7 +122,7 @@ func (al AddLearner) IsFinish(region *core.RegionInfo) bool { return false } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. func (al AddLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { to := opInfluence.GetStoreInfluence(al.ToStore) @@ -151,7 +151,7 @@ func (pl PromoteLearner) IsFinish(region *core.RegionInfo) bool { return false } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. func (pl PromoteLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) {} // RemovePeer is an OperatorStep that removes a region peer. @@ -168,7 +168,7 @@ func (rp RemovePeer) IsFinish(region *core.RegionInfo) bool { return region.GetStorePeer(rp.FromStore) == nil } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. func (rp RemovePeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { from := opInfluence.GetStoreInfluence(rp.FromStore) @@ -183,9 +183,9 @@ type MergeRegion struct { // there are two regions involved in merge process, // so to keep them from other scheduler, // both of them should add MerRegion operatorStep. - // But actually, tikv just need the region want to be merged to get the merge request, + // But actually, TiKV just needs the region want to be merged to get the merge request, // thus use a IsPassive mark to indicate that - // this region doesn't need to send merge request to tikv. + // this region doesn't need to send merge request to TiKV. 
IsPassive bool } @@ -193,7 +193,7 @@ func (mr MergeRegion) String() string { return fmt.Sprintf("merge region %v into region %v", mr.FromRegion.GetId(), mr.ToRegion.GetId()) } -// IsFinish checks if current step is finished +// IsFinish checks if current step is finished. func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { if mr.IsPassive { return !bytes.Equal(region.GetStartKey(), mr.ToRegion.StartKey) || !bytes.Equal(region.GetEndKey(), mr.ToRegion.EndKey) @@ -201,7 +201,7 @@ func (mr MergeRegion) IsFinish(region *core.RegionInfo) bool { return false } -// Influence calculates the store difference that current step make +// Influence calculates the store difference that current step makes. func (mr MergeRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) { if mr.IsPassive { for _, p := range region.GetPeers() { @@ -229,7 +229,7 @@ func (sr SplitRegion) IsFinish(region *core.RegionInfo) bool { return !bytes.Equal(region.GetStartKey(), sr.StartKey) || !bytes.Equal(region.GetEndKey(), sr.EndKey) } -// Influence calculates the store difference that current step make. +// Influence calculates the store difference that current step makes. func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo) { for _, p := range region.GetPeers() { inf := opInfluence.GetStoreInfluence(p.GetStoreId()) @@ -240,6 +240,64 @@ func (sr SplitRegion) Influence(opInfluence OpInfluence, region *core.RegionInfo } } +// AddLightPeer is an OperatorStep that adds a region peer without considering the influence. +type AddLightPeer struct { + ToStore, PeerID uint64 +} + +func (ap AddLightPeer) String() string { + return fmt.Sprintf("add peer %v on store %v", ap.PeerID, ap.ToStore) +} + +// IsFinish checks if current step is finished. 
+func (ap AddLightPeer) IsFinish(region *core.RegionInfo) bool { + if p := region.GetStoreVoter(ap.ToStore); p != nil { + if p.GetId() != ap.PeerID { + log.Warn("obtain unexpected peer", zap.String("expect", ap.String()), zap.Uint64("obtain-voter", p.GetId())) + return false + } + return region.GetPendingVoter(p.GetId()) == nil + } + return false +} + +// Influence calculates the store difference that current step makes. +func (ap AddLightPeer) Influence(opInfluence OpInfluence, region *core.RegionInfo) { + to := opInfluence.GetStoreInfluence(ap.ToStore) + + to.RegionSize += region.GetApproximateSize() + to.RegionCount++ +} + +// AddLightLearner is an OperatorStep that adds a region learner peer without considering the influence. +type AddLightLearner struct { + ToStore, PeerID uint64 +} + +func (al AddLightLearner) String() string { + return fmt.Sprintf("add learner peer %v on store %v", al.PeerID, al.ToStore) +} + +// IsFinish checks if current step is finished. +func (al AddLightLearner) IsFinish(region *core.RegionInfo) bool { + if p := region.GetStoreLearner(al.ToStore); p != nil { + if p.GetId() != al.PeerID { + log.Warn("obtain unexpected peer", zap.String("expect", al.String()), zap.Uint64("obtain-learner", p.GetId())) + return false + } + return region.GetPendingLearner(p.GetId()) == nil + } + return false +} + +// Influence calculates the store difference that current step makes. +func (al AddLightLearner) Influence(opInfluence OpInfluence, region *core.RegionInfo) { + to := opInfluence.GetStoreInfluence(al.ToStore) + + to.RegionSize += region.GetApproximateSize() + to.RegionCount++ +} + // Operator contains execution steps generated by scheduler. type Operator struct { desc string @@ -278,7 +336,7 @@ func (o *Operator) String() string { return s } -// MarshalJSON serialize custom types to JSON +// MarshalJSON serializes custom types to JSON. 
func (o *Operator) MarshalJSON() ([]byte, error) { return []byte(`"` + o.String() + `"`), nil } @@ -347,12 +405,12 @@ func (o *Operator) Check(region *core.RegionInfo) OperatorStep { return nil } -// SetPriorityLevel set the priority level for operator +// SetPriorityLevel sets the priority level for operator. func (o *Operator) SetPriorityLevel(level core.PriorityLevel) { o.level = level } -// GetPriorityLevel get the priority level +// GetPriorityLevel gets the priority level. func (o *Operator) GetPriorityLevel() core.PriorityLevel { return o.level } @@ -379,7 +437,7 @@ func (o *Operator) IsTimeout() bool { return false } -// UnfinishedInfluence calculates the store difference which unfinished operator steps make +// UnfinishedInfluence calculates the store difference which unfinished operator steps make. func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.RegionInfo) { for step := atomic.LoadInt32(&o.currentStep); int(step) < len(o.steps); step++ { if !o.steps[int(step)].IsFinish(region) { @@ -388,7 +446,7 @@ func (o *Operator) UnfinishedInfluence(opInfluence OpInfluence, region *core.Reg } } -// TotalInfluence calculates the store difference which whole operator steps make +// TotalInfluence calculates the store difference which whole operator steps make. 
func (o *Operator) TotalInfluence(opInfluence OpInfluence, region *core.RegionInfo) { for step := 0; step < len(o.steps); step++ { o.steps[int(step)].Influence(opInfluence, region) @@ -418,8 +476,12 @@ func (o *Operator) History() []OperatorHistory { }) case AddPeer: addPeerStores = append(addPeerStores, s.ToStore) + case AddLightPeer: + addPeerStores = append(addPeerStores, s.ToStore) case AddLearner: addPeerStores = append(addPeerStores, s.ToStore) + case AddLightLearner: + addPeerStores = append(addPeerStores, s.ToStore) case RemovePeer: removePeerStores = append(removePeerStores, s.FromStore) } @@ -437,7 +499,7 @@ func (o *Operator) History() []OperatorHistory { return histories } -// CreateRemovePeerOperator creates an Operator that removes a peer from region. +// CreateRemovePeerOperator creates an operator that removes a peer from region. func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, region *core.RegionInfo, storeID uint64) (*Operator, error) { removeKind, steps, err := removePeerSteps(cluster, region, storeID, getRegionFollowerIDs(region)) if err != nil { @@ -446,7 +508,7 @@ func CreateRemovePeerOperator(desc string, cluster Cluster, kind OperatorKind, r return NewOperator(desc, region.GetID(), region.GetRegionEpoch(), removeKind|kind, steps...), nil } -// CreateAddPeerSteps creates an OperatorStep list that add a new Peer. +// CreateAddPeerSteps creates an OperatorStep list that add a new peer. func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep { var st []OperatorStep if cluster.IsRaftLearnerEnabled() { @@ -462,7 +524,23 @@ func CreateAddPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []Opera return st } -// CreateMovePeerOperator creates an Operator that replaces an old peer with a new peer. +// CreateAddLightPeerSteps creates an OperatorStep list that add a new peer without considering the influence. 
+func CreateAddLightPeerSteps(newStore uint64, peerID uint64, cluster Cluster) []OperatorStep { + var st []OperatorStep + if cluster.IsRaftLearnerEnabled() { + st = []OperatorStep{ + AddLightLearner{ToStore: newStore, PeerID: peerID}, + PromoteLearner{ToStore: newStore, PeerID: peerID}, + } + } else { + st = []OperatorStep{ + AddLightPeer{ToStore: newStore, PeerID: peerID}, + } + } + return st +} + +// CreateMovePeerOperator creates an operator that replaces an old peer with a new peer. func CreateMovePeerOperator(desc string, cluster Cluster, region *core.RegionInfo, kind OperatorKind, oldStore, newStore uint64, peerID uint64) (*Operator, error) { removeKind, steps, err := removePeerSteps(cluster, region, oldStore, append(getRegionFollowerIDs(region), newStore)) if err != nil { @@ -503,7 +581,7 @@ func removePeerSteps(cluster Cluster, region *core.RegionInfo, storeID uint64, f return } -// CreateMergeRegionOperator creates an Operator that merge two region into one +// CreateMergeRegionOperator creates an operator that merge two region into one. func CreateMergeRegionOperator(desc string, cluster Cluster, source *core.RegionInfo, target *core.RegionInfo, kind OperatorKind) ([]*Operator, error) { steps, kinds, err := matchPeerSteps(cluster, source, target) if err != nil { diff --git a/server/schedule/operator_controller.go b/server/schedule/operator_controller.go index 2390cc0df43..9e11924daef 100644 --- a/server/schedule/operator_controller.go +++ b/server/schedule/operator_controller.go @@ -338,6 +338,21 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step }, } oc.hbStreams.SendMsg(region, cmd) + case AddLightPeer: + if region.GetStorePeer(st.ToStore) != nil { + // The newly added peer is pending. 
+ return + } + cmd := &pdpb.RegionHeartbeatResponse{ + ChangePeer: &pdpb.ChangePeer{ + ChangeType: eraftpb.ConfChangeType_AddNode, + Peer: &metapb.Peer{ + Id: st.PeerID, + StoreId: st.ToStore, + }, + }, + } + oc.hbStreams.SendMsg(region, cmd) case AddLearner: if region.GetStorePeer(st.ToStore) != nil { // The newly added peer is pending. @@ -354,6 +369,22 @@ func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step }, } oc.hbStreams.SendMsg(region, cmd) + case AddLightLearner: + if region.GetStorePeer(st.ToStore) != nil { + // The newly added peer is pending. + return + } + cmd := &pdpb.RegionHeartbeatResponse{ + ChangePeer: &pdpb.ChangePeer{ + ChangeType: eraftpb.ConfChangeType_AddLearnerNode, + Peer: &metapb.Peer{ + Id: st.PeerID, + StoreId: st.ToStore, + IsLearner: true, + }, + }, + } + oc.hbStreams.SendMsg(region, cmd) case PromoteLearner: cmd := &pdpb.RegionHeartbeatResponse{ ChangePeer: &pdpb.ChangePeer{ diff --git a/server/schedule/operator_controller_test.go b/server/schedule/operator_controller_test.go index 4eb3ad378cd..e6fa52211f0 100644 --- a/server/schedule/operator_controller_test.go +++ b/server/schedule/operator_controller_test.go @@ -20,7 +20,6 @@ import ( . 
"github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" - "github.com/pingcap/pd/server/core" ) var _ = Suite(&testOperatorControllerSuite{}) @@ -56,17 +55,10 @@ func (t *testOperatorControllerSuite) TestGetOpInfluence(c *C) { c.Assert(oc.GetOperator(2), NotNil) } -type mockHeadbeatStream struct{} - -func (m mockHeadbeatStream) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) { - return -} - func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { opt := NewMockSchedulerOptions() tc := NewMockCluster(opt) - oc := NewOperatorController(tc, nil) - oc.hbStreams = mockHeadbeatStream{} + oc := NewOperatorController(tc, MockHeadbeatStream{}) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) @@ -98,8 +90,7 @@ func (t *testOperatorControllerSuite) TestOperatorStatus(c *C) { func (t *testOperatorControllerSuite) TestPollDispatchRegion(c *C) { opt := NewMockSchedulerOptions() tc := NewMockCluster(opt) - oc := NewOperatorController(tc, nil) - oc.hbStreams = mockHeadbeatStream{} + oc := NewOperatorController(tc, MockHeadbeatStream{}) tc.AddLeaderStore(1, 2) tc.AddLeaderStore(2, 0) tc.AddLeaderRegion(1, 1, 2) diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go index 83b57fe0ec3..815e9076012 100644 --- a/server/schedule/region_scatterer.go +++ b/server/schedule/region_scatterer.go @@ -160,7 +160,7 @@ func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, // Creates the first step if _, ok := originStoreIDs[targetLeaderPeer.GetStoreId()]; !ok { - st := CreateAddPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster) + st := CreateAddLightPeerSteps(targetLeaderPeer.GetStoreId(), targetLeaderPeer.GetId(), r.cluster) steps = append(steps, st...) 
// Do not transfer leader to the newly added peer // Ref: https://github.com/tikv/tikv/issues/3819 @@ -184,13 +184,13 @@ func (r *RegionScatterer) createOperator(origin *core.RegionInfo, replacedPeers, continue } if replacedPeers[j].GetStoreId() == originLeaderStoreID { - st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + st := CreateAddLightPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) st = append(st, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) deferSteps = append(deferSteps, st...) kind |= OpRegion | OpLeader continue } - st := CreateAddPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) + st := CreateAddLightPeerSteps(peer.GetStoreId(), peer.GetId(), r.cluster) steps = append(steps, st...) steps = append(steps, RemovePeer{FromStore: replacedPeers[j].GetStoreId()}) kind |= OpRegion diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index c398ea0455b..4638ab5b494 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -219,6 +219,34 @@ func (s *testScatterRegionSuite) scatter(c *C, numStores, numRegions uint64) { } } +func (s *testScatterRegionSuite) TestStorelimit(c *C) { + opt := schedule.NewMockSchedulerOptions() + tc := schedule.NewMockCluster(opt) + oc := schedule.NewOperatorController(tc, schedule.MockHeadbeatStream{}) + + // Add stores 1~5. + for i := uint64(1); i <= 5; i++ { + tc.AddRegionStore(i, 0) + } + + // Add regions 1~5. + seq := newSequencer(3) + // Region 1 has the same distribution as Region 2, which is used to test selectPeerToReplace. 
+ tc.AddLeaderRegion(1, 1, 2, 3) + for i := uint64(2); i <= 5; i++ { + tc.AddLeaderRegion(i, seq.next(), seq.next(), seq.next()) + } + + scatterer := schedule.NewRegionScatterer(tc, namespace.DefaultClassifier) + + for i := uint64(1); i <= 5; i++ { + region := tc.GetRegion(i) + if op, _ := scatterer.Scatter(region); op != nil { + c.Assert(oc.AddOperator(op), IsTrue) + } + } +} + var _ = Suite(&testRejectLeaderSuite{}) type testRejectLeaderSuite struct{}