diff --git a/server/cluster.go b/server/cluster.go index 0a52ee99f28..119473db7a0 100644 --- a/server/cluster.go +++ b/server/cluster.go @@ -628,23 +628,23 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo { } // RandLeaderRegion returns a random region that has leader on the store. -func (c *RaftCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return c.core.RandLeaderRegion(storeID, opts...) +func (c *RaftCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return c.core.RandLeaderRegion(storeID, ranges, opts...) } // RandFollowerRegion returns a random region that has a follower on the store. -func (c *RaftCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return c.core.RandFollowerRegion(storeID, opts...) +func (c *RaftCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return c.core.RandFollowerRegion(storeID, ranges, opts...) } // RandPendingRegion returns a random region that has a pending peer on the store. -func (c *RaftCluster) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return c.core.RandPendingRegion(storeID, opts...) +func (c *RaftCluster) RandPendingRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return c.core.RandPendingRegion(storeID, ranges, opts...) } // RandLearnerRegion returns a random region that has a learner peer on the store. -func (c *RaftCluster) RandLearnerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return c.core.RandLearnerRegion(storeID, opts...) +func (c *RaftCluster) RandLearnerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return c.core.RandLearnerRegion(storeID, ranges, opts...) } // RandHotRegionFromStore randomly picks a hot region in specified store. diff --git a/server/cluster_test.go b/server/cluster_test.go index fe1f32bff6d..991d6e8bdda 100644 --- a/server/cluster_test.go +++ b/server/cluster_test.go @@ -840,10 +840,10 @@ func (s *testRegionsInfoSuite) Test(c *C) { } for i := uint64(0); i < n; i++ { - region := cluster.RandLeaderRegion(i, core.HealthRegion()) + region := cache.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}) c.Assert(region.GetLeader().GetStoreId(), Equals, i) - region = cluster.RandFollowerRegion(i, core.HealthRegion()) + region = cache.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}) c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i) c.Assert(region.GetStorePeer(i), NotNil) @@ -859,14 +859,14 @@ func (s *testRegionsInfoSuite) Test(c *C) { // All regions will be filtered out if they have pending peers. 
for i := uint64(0); i < n; i++ { for j := 0; j < cache.GetStoreLeaderCount(i); j++ { - region := cluster.RandLeaderRegion(i, core.HealthRegion()) + region := cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()) newRegion := region.Clone(core.WithPendingPeers(region.GetPeers())) cache.SetRegion(newRegion) } - c.Assert(cluster.RandLeaderRegion(i, core.HealthRegion()), IsNil) + c.Assert(cluster.RandLeaderRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil) } for i := uint64(0); i < n; i++ { - c.Assert(cluster.RandFollowerRegion(i, core.HealthRegion()), IsNil) + c.Assert(cluster.RandFollowerRegion(i, []core.KeyRange{core.NewKeyRange("", "")}, core.HealthRegion()), IsNil) } } diff --git a/server/coordinator_test.go b/server/coordinator_test.go index 8b6a3dbac31..021972983c1 100644 --- a/server/coordinator_test.go +++ b/server/coordinator_test.go @@ -715,13 +715,14 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { co.run() storage = tc.RaftCluster.storage c.Assert(co.schedulers, HasLen, 3) - bls, err := schedule.CreateScheduler("balance-leader", oc, storage, nil) + bls, err := schedule.CreateScheduler("balance-leader", oc, storage, schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) c.Assert(err, IsNil) c.Assert(co.addScheduler(bls), IsNil) - brs, err := schedule.CreateScheduler("balance-region", oc, storage, nil) + brs, err := schedule.CreateScheduler("balance-region", oc, storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) c.Assert(co.addScheduler(brs), IsNil) c.Assert(co.schedulers, HasLen, 5) + // the scheduler option should contain 7 items // the `hot scheduler` and `label scheduler` are disabled c.Assert(co.cluster.opt.GetSchedulers(), HasLen, 7) @@ -732,7 +733,6 @@ func (s *testCoordinatorSuite) TestPersistScheduler(c *C) { c.Assert(co.cluster.opt.Persist(co.cluster.storage), IsNil) co.stop() co.wg.Wait() - _, newOpt, err = newTestScheduleConfig() c.Assert(err, IsNil) c.Assert(newOpt.Reload(co.cluster.storage), IsNil) @@ -931,7 +931,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloaded(c *C) { }, nil, nil, c) defer cleanup() oc := co.opController - lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil) + lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) c.Assert(tc.addRegionStore(4, 100), IsNil) c.Assert(tc.addRegionStore(3, 100), IsNil) @@ -971,7 +971,7 @@ func (s *testOperatorControllerSuite) TestStoreOverloadedWithReplace(c *C) { }, nil, nil, c) defer cleanup() oc := co.opController - lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, nil) + lb, err := schedule.CreateScheduler("balance-region", oc, tc.storage, schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) c.Assert(tc.addRegionStore(4, 100), IsNil) @@ -1031,7 +1031,7 @@ func (s *testScheduleControllerSuite) TestController(c *C) { c.Assert(tc.addLeaderRegion(1, 1), IsNil) c.Assert(tc.addLeaderRegion(2, 2), IsNil) - scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil) + scheduler, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) c.Assert(err, IsNil) lb := &mockLimitScheduler{ Scheduler: scheduler, @@ -1101,7 +1101,7 @@ func (s *testScheduleControllerSuite) 
TestInterval(c *C) { _, co, cleanup := prepare(nil, nil, nil, c) defer cleanup() - lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), nil) + lb, err := schedule.CreateScheduler("balance-leader", co.opController, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) c.Assert(err, IsNil) sc := newScheduleController(co, lb) diff --git a/server/core/basic_cluster.go b/server/core/basic_cluster.go index 9bb99409427..3730ff5b85c 100644 --- a/server/core/basic_cluster.go +++ b/server/core/basic_cluster.go @@ -155,39 +155,42 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regio const randomRegionMaxRetry = 10 // RandFollowerRegion returns a random region that has a follower on the store. -func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo { +func (bc *BasicCluster) RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { bc.RLock() - regions := bc.Regions.RandFollowerRegions(storeID, randomRegionMaxRetry) + regions := bc.Regions.RandFollowerRegions(storeID, ranges, randomRegionMaxRetry) bc.RUnlock() return bc.selectRegion(regions, opts...) } // RandLeaderRegion returns a random region that has leader on the store. -func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo { +func (bc *BasicCluster) RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { bc.RLock() - regions := bc.Regions.RandLeaderRegions(storeID, randomRegionMaxRetry) + regions := bc.Regions.RandLeaderRegions(storeID, ranges, randomRegionMaxRetry) bc.RUnlock() return bc.selectRegion(regions, opts...) } // RandPendingRegion returns a random region that has a pending peer on the store. -func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo { +func (bc *BasicCluster) RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { bc.RLock() - regions := bc.Regions.RandPendingRegions(storeID, randomRegionMaxRetry) + regions := bc.Regions.RandPendingRegions(storeID, ranges, randomRegionMaxRetry) bc.RUnlock() return bc.selectRegion(regions, opts...) } // RandLearnerRegion returns a random region that has a learner peer on the store. -func (bc *BasicCluster) RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo { +func (bc *BasicCluster) RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo { bc.RLock() - regions := bc.Regions.RandLearnerRegions(storeID, randomRegionMaxRetry) + regions := bc.Regions.RandLearnerRegions(storeID, ranges, randomRegionMaxRetry) bc.RUnlock() return bc.selectRegion(regions, opts...) } func (bc *BasicCluster) selectRegion(regions []*RegionInfo, opts ...RegionOption) *RegionInfo { for _, r := range regions { + if r == nil { + break + } if slice.AllOf(opts, func(i int) bool { return opts[i](r) }) { return r } @@ -331,10 +334,10 @@ func (bc *BasicCluster) Length() int { // RegionSetInformer provides access to a shared informer of regions. 
type RegionSetInformer interface { - RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo - RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo - RandLearnerRegion(storeID uint64, opts ...RegionOption) *RegionInfo - RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo + RandFollowerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo + RandLeaderRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo + RandLearnerRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo + RandPendingRegion(storeID uint64, ranges []KeyRange, opts ...RegionOption) *RegionInfo GetAverageRegionSize() int64 GetStoreRegionCount(storeID uint64) int GetRegion(id uint64) *RegionInfo @@ -359,3 +362,17 @@ type StoreSetController interface { AttachAvailableFunc(id uint64, f func() bool) } + +// KeyRange is a key range. +type KeyRange struct { + StartKey []byte `json:"start-key"` + EndKey []byte `json:"end-key"` +} + +// NewKeyRange creates a KeyRange with the given start key and end key. +func NewKeyRange(startKey, endKey string) KeyRange { + return KeyRange{ + StartKey: []byte(startKey), + EndKey: []byte(endKey), + } +} diff --git a/server/core/region.go b/server/core/region.go index 4a69e12ce97..2b726d05b42 100644 --- a/server/core/region.go +++ b/server/core/region.go @@ -17,6 +17,7 @@ import ( "bytes" "encoding/hex" "fmt" + "math/rand" "reflect" "strings" @@ -507,9 +508,14 @@ func (rst *regionSubTree) RandomRegions(n int, startKey, endKey []byte) []*Regio if rst.length() == 0 { return nil } + regions := make([]*RegionInfo, 0, n) for i := 0; i < n; i++ { - regions = append(regions, rst.regionTree.RandomRegion(startKey, endKey)) + region := rst.regionTree.RandomRegion(startKey, endKey) + if region == nil || !isInvolved(region, startKey, endKey) { + continue + } + regions = append(regions, region) } return regions } @@ -742,43 +748,57 @@ func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int { } // RandPendingRegion randomly gets a store's region with a pending peer. -func (r *RegionsInfo) RandPendingRegion(storeID uint64) *RegionInfo { - return r.pendingPeers[storeID].RandomRegion(nil, nil) +func (r *RegionsInfo) RandPendingRegion(storeID uint64, ranges []KeyRange) *RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.pendingPeers[storeID].RandomRegion(startKey, endKey) } // RandPendingRegions randomly gets a store's n regions with a pending peer. -func (r *RegionsInfo) RandPendingRegions(storeID uint64, n int) []*RegionInfo { - return r.pendingPeers[storeID].RandomRegions(n, nil, nil) +func (r *RegionsInfo) RandPendingRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.pendingPeers[storeID].RandomRegions(n, startKey, endKey) } // RandLeaderRegion randomly gets a store's leader region. -func (r *RegionsInfo) RandLeaderRegion(storeID uint64) *RegionInfo { - return r.leaders[storeID].RandomRegion(nil, nil) +func (r *RegionsInfo) RandLeaderRegion(storeID uint64, ranges []KeyRange) *RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.leaders[storeID].RandomRegion(startKey, endKey) } // RandLeaderRegions randomly gets a store's n leader regions.
-func (r *RegionsInfo) RandLeaderRegions(storeID uint64, n int) []*RegionInfo { - return r.leaders[storeID].RandomRegions(n, nil, nil) +func (r *RegionsInfo) RandLeaderRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.leaders[storeID].RandomRegions(n, startKey, endKey) } // RandFollowerRegion randomly gets a store's follower region. -func (r *RegionsInfo) RandFollowerRegion(storeID uint64) *RegionInfo { - return r.followers[storeID].RandomRegion(nil, nil) +func (r *RegionsInfo) RandFollowerRegion(storeID uint64, ranges []KeyRange) *RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.followers[storeID].RandomRegion(startKey, endKey) } // RandFollowerRegions randomly gets a store's n follower regions. -func (r *RegionsInfo) RandFollowerRegions(storeID uint64, n int) []*RegionInfo { - return r.followers[storeID].RandomRegions(n, nil, nil) +func (r *RegionsInfo) RandFollowerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.followers[storeID].RandomRegions(n, startKey, endKey) } // RandLearnerRegion randomly gets a store's learner region. -func (r *RegionsInfo) RandLearnerRegion(storeID uint64) *RegionInfo { - return r.learners[storeID].RandomRegion(nil, nil) +func (r *RegionsInfo) RandLearnerRegion(storeID uint64, ranges []KeyRange) *RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.learners[storeID].RandomRegion(startKey, endKey) } // RandLearnerRegions randomly gets a store's n learner regions. -func (r *RegionsInfo) RandLearnerRegions(storeID uint64, n int) []*RegionInfo { - return r.learners[storeID].RandomRegions(n, nil, nil) +func (r *RegionsInfo) RandLearnerRegions(storeID uint64, ranges []KeyRange, n int) []*RegionInfo { + startKey, endKey := r.GetKeys(ranges) + return r.learners[storeID].RandomRegions(n, startKey, endKey) +} + +// GetKeys gets the start key and end key from a random key range. +func (r *RegionsInfo) GetKeys(ranges []KeyRange) ([]byte, []byte) { + idx := rand.Intn(len(ranges)) + return ranges[idx].StartKey, ranges[idx].EndKey } // GetLeader returns the leader RegionInfo by storeID and regionID (now only used in tests) @@ -883,6 +903,10 @@ func DiffRegionKeyInfo(origin *RegionInfo, other *RegionInfo) string { return strings.Join(ret, ", ") } +func isInvolved(region *RegionInfo, startKey, endKey []byte) bool { + return bytes.Compare(region.GetStartKey(), startKey) >= 0 && (len(endKey) == 0 || (len(region.GetEndKey()) > 0 && bytes.Compare(region.GetEndKey(), endKey) <= 0)) +} + // HexRegionKey converts region key to hex format. Used for formatting region in // logs. func HexRegionKey(key []byte) []byte { diff --git a/server/core/region_test.go b/server/core/region_test.go index 47497aad9ab..0bf52d5685e 100644 --- a/server/core/region_test.go +++ b/server/core/region_test.go @@ -145,7 +145,7 @@ func BenchmarkRandomRegion(b *testing.B) { } b.ResetTimer() for i := 0; i < b.N; i++ { - regions.RandLeaderRegion(1) + regions.RandLeaderRegion(1, []KeyRange{NewKeyRange("", "")}) } } diff --git a/server/schedule/range_cluster.go b/server/schedule/range_cluster.go index 77f7d879a0f..37a6f474268 100644 --- a/server/schedule/range_cluster.go +++ b/server/schedule/range_cluster.go @@ -102,13 +102,13 @@ func (r *RangeCluster) GetTolerantSizeRatio() float64 { } // RandFollowerRegion returns a random region that has a follower on the store.
-func (r *RangeCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return r.subCluster.RandFollowerRegion(storeID, opts...) +func (r *RangeCluster) RandFollowerRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return r.subCluster.RandFollowerRegion(storeID, ranges, opts...) } // RandLeaderRegion returns a random region that has leader on the store. -func (r *RangeCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo { - return r.subCluster.RandLeaderRegion(storeID, opts...) +func (r *RangeCluster) RandLeaderRegion(storeID uint64, ranges []core.KeyRange, opts ...core.RegionOption) *core.RegionInfo { + return r.subCluster.RandLeaderRegion(storeID, ranges, opts...) } // GetAverageRegionSize returns the average region approximate size. diff --git a/server/schedulers/adjacent_region.go b/server/schedulers/adjacent_region.go index fa6083c85c9..6f5654f5e6f 100644 --- a/server/schedulers/adjacent_region.go +++ b/server/schedulers/adjacent_region.go @@ -60,6 +60,7 @@ func init() { } conf.LeaderLimit = defaultAdjacentLeaderLimit conf.PeerLimit = defaultAdjacentPeerLimit + conf.Name = balanceAdjacentRegionName return nil } }) @@ -77,6 +78,7 @@ func init() { } type balanceAdjacentRegionConfig struct { + Name string `json:"name"` LeaderLimit uint64 `json:"leader-limit"` PeerLimit uint64 `json:"peer-limit"` } @@ -128,7 +130,7 @@ func newBalanceAdjacentRegionScheduler(opController *schedule.OperatorController } func (l *balanceAdjacentRegionScheduler) GetName() string { - return balanceAdjacentRegionName + return l.conf.Name } func (l *balanceAdjacentRegionScheduler) GetType() string { diff --git a/server/schedulers/balance_leader.go b/server/schedulers/balance_leader.go index a674e1ce273..e16ad7c053d 100644 --- a/server/schedulers/balance_leader.go +++ b/server/schedulers/balance_leader.go @@ -23,28 +23,49 @@ import ( "github.com/pingcap/pd/server/schedule/filter" "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) +const ( + balanceLeaderName = "balance-leader-scheduler" + // balanceLeaderRetryLimit is the retry limit for scheduling between the selected source store and target store. + balanceLeaderRetryLimit = 10 +) + func init() { schedule.RegisterSliceDecoderBuilder("balance-leader", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*balanceLeaderSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = balanceLeaderName return nil } }) - schedule.RegisterScheduler("balance-leader", func(opController *schedule.OperatorController, storage *core.Storage, mapper schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newBalanceLeaderScheduler(opController), nil + schedule.RegisterScheduler("balance-leader", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &balanceLeaderSchedulerConfig{} + decoder(conf) + return newBalanceLeaderScheduler(opController, conf), nil }) } -// balanceLeaderRetryLimit is the limit to retry schedule for selected source store and target store.
-const balanceLeaderRetryLimit = 10 +type balanceLeaderSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} type balanceLeaderScheduler struct { *baseScheduler - name string + conf *balanceLeaderSchedulerConfig opController *schedule.OperatorController filters []filter.Filter counter *prometheus.CounterVec @@ -52,11 +73,12 @@ type balanceLeaderScheduler struct { // newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on // each store balanced. -func newBalanceLeaderScheduler(opController *schedule.OperatorController, opts ...BalanceLeaderCreateOption) schedule.Scheduler { +func newBalanceLeaderScheduler(opController *schedule.OperatorController, conf *balanceLeaderSchedulerConfig, opts ...BalanceLeaderCreateOption) schedule.Scheduler { base := newBaseScheduler(opController) s := &balanceLeaderScheduler{ baseScheduler: base, + conf: conf, opController: opController, counter: balanceLeaderCounter, } @@ -80,21 +102,22 @@ func WithBalanceLeaderCounter(counter *prometheus.CounterVec) BalanceLeaderCreat // WithBalanceLeaderName sets the name for the scheduler. func WithBalanceLeaderName(name string) BalanceLeaderCreateOption { return func(s *balanceLeaderScheduler) { - s.name = name + s.conf.Name = name } } func (l *balanceLeaderScheduler) GetName() string { - if l.name != "" { - return l.name - } - return "balance-leader-scheduler" + return l.conf.Name } func (l *balanceLeaderScheduler) GetType() string { return "balance-leader" } +func (l *balanceLeaderScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(l.conf) +} + func (l *balanceLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return l.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit() } @@ -154,7 +177,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera // the best follower peer and transfers the leader. func (l *balanceLeaderScheduler) transferLeaderOut(cluster opt.Cluster, source *core.StoreInfo) []*operator.Operator { sourceID := source.GetID() - region := cluster.RandLeaderRegion(sourceID, core.HealthRegion()) + region := cluster.RandLeaderRegion(sourceID, l.conf.Ranges, core.HealthRegion()) if region == nil { log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID)) schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc() @@ -181,7 +204,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(cluster opt.Cluster, source * // the worst follower peer and transfers the leader. 
func (l *balanceLeaderScheduler) transferLeaderIn(cluster opt.Cluster, target *core.StoreInfo) []*operator.Operator { targetID := target.GetID() - region := cluster.RandFollowerRegion(targetID, core.HealthRegion()) + region := cluster.RandFollowerRegion(targetID, l.conf.Ranges, core.HealthRegion()) if region == nil { log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID)) schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc() diff --git a/server/schedulers/balance_region.go b/server/schedulers/balance_region.go index 879546cae3b..6622c1970e1 100644 --- a/server/schedulers/balance_region.go +++ b/server/schedulers/balance_region.go @@ -25,6 +25,7 @@ import ( "github.com/pingcap/pd/server/schedule/filter" "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" ) @@ -32,11 +33,23 @@ import ( func init() { schedule.RegisterSliceDecoderBuilder("balance-region", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*balanceRegionSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = balanceRegionName return nil } }) schedule.RegisterScheduler("balance-region", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newBalanceRegionScheduler(opController), nil + conf := &balanceRegionSchedulerConfig{} + decoder(conf) + return newBalanceRegionScheduler(opController, conf), nil }) } @@ -46,9 +59,14 @@ const ( balanceRegionName = "balance-region-scheduler" ) +type balanceRegionSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} + type balanceRegionScheduler struct { *baseScheduler - name string + conf *balanceRegionSchedulerConfig opController *schedule.OperatorController filters []filter.Filter counter *prometheus.CounterVec @@ -56,10 +74,11 @@ type balanceRegionScheduler struct { // newBalanceRegionScheduler creates a scheduler that tends to keep regions on // each store balanced. -func newBalanceRegionScheduler(opController *schedule.OperatorController, opts ...BalanceRegionCreateOption) schedule.Scheduler { +func newBalanceRegionScheduler(opController *schedule.OperatorController, conf *balanceRegionSchedulerConfig, opts ...BalanceRegionCreateOption) schedule.Scheduler { base := newBaseScheduler(opController) s := &balanceRegionScheduler{ baseScheduler: base, + conf: conf, opController: opController, counter: balanceRegionCounter, } @@ -83,21 +102,22 @@ func WithBalanceRegionCounter(counter *prometheus.CounterVec) BalanceRegionCreat // WithBalanceRegionName sets the name for the scheduler. 
func WithBalanceRegionName(name string) BalanceRegionCreateOption { return func(s *balanceRegionScheduler) { - s.name = name + s.conf.Name = name } } func (s *balanceRegionScheduler) GetName() string { - if s.name != "" { - return s.name - } - return balanceRegionName + return s.conf.Name } func (s *balanceRegionScheduler) GetType() string { return "balance-region" } +func (s *balanceRegionScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(s.conf) +} + func (s *balanceRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit() } @@ -115,14 +135,14 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera for i := 0; i < balanceRegionRetryLimit; i++ { // Priority picks the region that has a pending peer. // A pending region may mean the disk is overloaded, so remove the pending region first. - region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending()) + region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, core.HealthRegionAllowPending()) if region == nil { // Then picks the region that has a follower in the source store. - region = cluster.RandFollowerRegion(sourceID, core.HealthRegion()) + region = cluster.RandFollowerRegion(sourceID, s.conf.Ranges, core.HealthRegion()) } if region == nil { // Last, picks the region that has the leader in the source store. - region = cluster.RandLeaderRegion(sourceID, core.HealthRegion()) + region = cluster.RandLeaderRegion(sourceID, s.conf.Ranges, core.HealthRegion()) } if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() @@ -166,7 +186,7 @@ func (s *balanceRegionScheduler) transferPeer(cluster opt.Cluster, region *core. scoreGuard := filter.NewDistinctScoreFilter(s.GetName(), cluster.GetLocationLabels(), stores, source) checker := checker.NewReplicaChecker(cluster, s.GetName()) exclude := make(map[uint64]struct{}) - excludeFilter := filter.NewExcludedFilter(s.name, nil, exclude) + excludeFilter := filter.NewExcludedFilter(s.GetName(), nil, exclude) for { storeID, _ := checker.SelectBestReplacementStore(region, oldPeer, scoreGuard, excludeFilter) if storeID == 0 { diff --git a/server/schedulers/balance_test.go b/server/schedulers/balance_test.go index 58a21ffe187..9754974f683 100644 --- a/server/schedulers/balance_test.go +++ b/server/schedulers/balance_test.go @@ -165,7 +165,7 @@ func (s *testBalanceLeaderSchedulerSuite) SetUpTest(c *C) { opt := mockoption.NewScheduleOptions() s.tc = mockcluster.NewCluster(opt) s.oc = schedule.NewOperatorController(s.ctx, nil, nil) - lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), nil) + lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) c.Assert(err, IsNil) s.lb = lb } @@ -213,10 +213,10 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceLimit(c *C) { } func (s *testBalanceLeaderSchedulerSuite) TestBalanceLeaderScheduleStrategy(c *C) { - // Stores: 1 2 3 4 - // Leader Count: 10 10 10 10 - // Leader Size : 10000 100 100 100 - // Region1: L F F F + // Stores: 1 2 3 4 + // Leader Count: 10 10 10 10 + // Leader Size : 10000 100 100 100 + // Region1: L F F F s.tc.AddLeaderStore(1, 10, 10000) s.tc.AddLeaderStore(2, 10, 100) s.tc.AddLeaderStore(3, 10, 100) @@ -251,10 +251,10 @@ func (s *testBalanceSpeedSuite) TestTolerantRatio(c *C) { func (s *testBalanceLeaderSchedulerSuite)
TestBalanceLeaderTolerantRatio(c *C) { // default leader tolerant ratio is 5, when schedule by count - // Stores: 1 2 3 4 - // Leader Count: 14->15 10 10 10 - // Leader Size : 100 100 100 100 - // Region1: L F F F + // Stores: 1 2 3 4 + // Leader Count: 14->15 10 10 10 + // Leader Size : 100 100 100 100 + // Region1: L F F F s.tc.AddLeaderStore(1, 14, 100) s.tc.AddLeaderStore(2, 10, 100) s.tc.AddLeaderStore(3, 10, 100) @@ -335,10 +335,10 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceFilter(c *C) { } func (s *testBalanceLeaderSchedulerSuite) TestLeaderWeight(c *C) { - // Stores: 1 2 3 4 + // Stores: 1 2 3 4 // Leaders: 10 10 10 10 - // Weight: 0.5 0.9 1 2 - // Region1: L F F F + // Weight: 0.5 0.9 1 2 + // Region1: L F F F s.tc.AddLeaderStore(1, 10) s.tc.AddLeaderStore(2, 10) @@ -414,6 +414,97 @@ func (s *testBalanceLeaderSchedulerSuite) TestBalanceSelector(c *C) { testutil.CheckTransferLeader(c, s.schedule()[0], operator.OpBalance, 4, 3) } +var _ = Suite(&testBalanceLeaderRangeSchedulerSuite{}) + +type testBalanceLeaderRangeSchedulerSuite struct { + ctx context.Context + cancel context.CancelFunc + tc *mockcluster.Cluster + lb schedule.Scheduler + oc *schedule.OperatorController +} + +func (s *testBalanceLeaderRangeSchedulerSuite) SetUpTest(c *C) { + s.ctx, s.cancel = context.WithCancel(context.Background()) + opt := mockoption.NewScheduleOptions() + s.tc = mockcluster.NewCluster(opt) + s.oc = schedule.NewOperatorController(s.ctx, nil, nil) +} + +func (s *testBalanceLeaderRangeSchedulerSuite) TearDownTest(c *C) { + s.cancel() +} + +func (s *testBalanceLeaderRangeSchedulerSuite) TestSingleRangeBalance(c *C) { + // Stores: 1 2 3 4 + // Leaders: 10 10 10 10 + // Weight: 0.5 0.9 1 2 + // Region1: L F F F + + s.tc.AddLeaderStore(1, 10) + s.tc.AddLeaderStore(2, 10) + s.tc.AddLeaderStore(3, 10) + s.tc.AddLeaderStore(4, 10) + s.tc.UpdateStoreLeaderWeight(1, 0.5) + s.tc.UpdateStoreLeaderWeight(2, 0.9) + s.tc.UpdateStoreLeaderWeight(3, 1) + s.tc.UpdateStoreLeaderWeight(4, 2) + s.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4) + lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), NotNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"h", "n"})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"b", "f"})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", "a"})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"g", ""})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", "f"})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + lb, err = schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", 
[]string{"b", ""})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) +} + +func (s *testBalanceLeaderRangeSchedulerSuite) TestMultiRangeBalance(c *C) { + // Stores: 1 2 3 4 + // Leaders: 10 10 10 10 + // Weight: 0.5 0.9 1 2 + // Region1: L F F F + + s.tc.AddLeaderStore(1, 10) + s.tc.AddLeaderStore(2, 10) + s.tc.AddLeaderStore(3, 10) + s.tc.AddLeaderStore(4, 10) + s.tc.UpdateStoreLeaderWeight(1, 0.5) + s.tc.UpdateStoreLeaderWeight(2, 0.9) + s.tc.UpdateStoreLeaderWeight(3, 1) + s.tc.UpdateStoreLeaderWeight(4, 2) + s.tc.AddLeaderRegionWithRange(1, "a", "g", 1, 2, 3, 4) + lb, err := schedule.CreateScheduler("balance-leader", s.oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", "g", "o", "t"})) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc)[0].RegionID(), Equals, uint64(1)) + s.tc.RemoveRegion(s.tc.GetRegion(1)) + s.tc.AddLeaderRegionWithRange(2, "p", "r", 1, 2, 3, 4) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc)[0].RegionID(), Equals, uint64(2)) + s.tc.RemoveRegion(s.tc.GetRegion(2)) + s.tc.AddLeaderRegionWithRange(3, "u", "w", 1, 2, 3, 4) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) + s.tc.RemoveRegion(s.tc.GetRegion(3)) + s.tc.AddLeaderRegionWithRange(4, "", "", 1, 2, 3, 4) + c.Assert(err, IsNil) + c.Assert(lb.Schedule(s.tc), IsNil) +} + var _ = Suite(&testBalanceRegionSchedulerSuite{}) type testBalanceRegionSchedulerSuite struct { @@ -434,7 +525,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance(c *C) { tc := mockcluster.NewCluster(opt) oc := schedule.NewOperatorController(s.ctx, nil, nil) - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) opt.SetMaxReplicas(1) @@ -469,7 +560,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas3(c *C) { newTestReplication(opt, 3, "zone", "rack", "host") - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) // Store 1 has the largest region score, so the balancer try to replace peer in store 1. 
@@ -524,7 +615,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplicas5(c *C) { newTestReplication(opt, 5, "zone", "rack", "host") - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) tc.AddLabelsStore(1, 4, map[string]string{"zone": "z1", "rack": "r1", "host": "h1"}) @@ -579,7 +670,7 @@ func (s *testBalanceRegionSchedulerSuite) TestBalance1(c *C) { opt.TolerantSizeRatio = 1 - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) tc.AddRegionStore(1, 11) @@ -614,7 +705,7 @@ func (s *testBalanceRegionSchedulerSuite) TestStoreWeight(c *C) { tc := mockcluster.NewCluster(opt) oc := schedule.NewOperatorController(s.ctx, nil, nil) - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) opt.SetMaxReplicas(1) @@ -641,7 +732,7 @@ func (s *testBalanceRegionSchedulerSuite) TestReplacePendingRegion(c *C) { newTestReplication(opt, 3, "zone", "rack", "host") - sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), nil) + sb, err := schedule.CreateScheduler("balance-region", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-region", []string{"", ""})) c.Assert(err, IsNil) // Store 1 has the largest region score, so the balancer try to replace peer in store 1. 
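For reference, the creation pattern these tests switched to: instead of a nil decoder, CreateScheduler now takes a ConfigSliceDecoder whose string args are the URL-escaped start and end keys (two empty strings keep the full key space). A hedged usage sketch, assuming the same mock packages the tests import; the function name is illustrative:

package example

import (
	"context"

	"github.com/pingcap/pd/pkg/mock/mockcluster"
	"github.com/pingcap/pd/pkg/mock/mockoption"
	"github.com/pingcap/pd/server/core"
	"github.com/pingcap/pd/server/kv"
	"github.com/pingcap/pd/server/schedule"
)

// newRangeScopedBalancer builds a balance-region scheduler that only
// considers regions fully contained in ["a", "g").
func newRangeScopedBalancer(ctx context.Context) (schedule.Scheduler, *mockcluster.Cluster, error) {
	opt := mockoption.NewScheduleOptions()
	tc := mockcluster.NewCluster(opt)
	oc := schedule.NewOperatorController(ctx, nil, nil)
	sb, err := schedule.CreateScheduler("balance-region", oc,
		core.NewStorage(kv.NewMemoryKV()),
		schedule.ConfigSliceDecoder("balance-region", []string{"a", "g"}))
	return sb, tc, err
}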
@@ -1000,7 +1091,7 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) { hb := mockhbstream.NewHeartbeatStreams(tc.ID) oc := schedule.NewOperatorController(ctx, tc, hb) - mb, err := schedule.CreateScheduler("random-merge", oc, core.NewStorage(kv.NewMemoryKV()), nil) + mb, err := schedule.CreateScheduler("random-merge", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("random-merge", []string{"", ""})) c.Assert(err, IsNil) tc.AddRegionStore(1, 4) diff --git a/server/schedulers/evict_leader.go b/server/schedulers/evict_leader.go index 9d6f9f2aedc..00da1e50a99 100644 --- a/server/schedulers/evict_leader.go +++ b/server/schedulers/evict_leader.go @@ -41,9 +41,14 @@ func init() { if err != nil { return errors.WithStack(err) } + ranges, err := getKeyRanges(args[1:]) + if err != nil { + return errors.WithStack(err) + } name := fmt.Sprintf("evict-leader-scheduler-%d", id) conf.StoreID = id conf.Name = name + conf.Ranges = ranges return nil } @@ -57,8 +62,9 @@ func init() { } type evictLeaderSchedulerConfig struct { - Name string `json:"name"` - StoreID uint64 `json:"store-id"` + Name string `json:"name"` + StoreID uint64 `json:"store-id"` + Ranges []core.KeyRange `json:"ranges"` } type evictLeaderScheduler struct { @@ -108,7 +114,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { func (s *evictLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - region := cluster.RandLeaderRegion(s.conf.StoreID, core.HealthRegion()) + region := cluster.RandLeaderRegion(s.conf.StoreID, s.conf.Ranges, core.HealthRegion()) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-leader").Inc() return nil diff --git a/server/schedulers/grant_leader.go b/server/schedulers/grant_leader.go index 141d8cd966e..721ea066c51 100644 --- a/server/schedulers/grant_leader.go +++ b/server/schedulers/grant_leader.go @@ -40,8 +40,13 @@ func init() { if err != nil { return errors.WithStack(err) } + ranges, err := getKeyRanges(args[1:]) + if err != nil { + return errors.WithStack(err) + } conf.StoreID = id conf.Name = fmt.Sprintf("grant-leader-scheduler-%d", id) + conf.Ranges = ranges return nil } }) @@ -54,8 +59,9 @@ func init() { } type grandLeaderConfig struct { - Name string `json:"name"` - StoreID uint64 `json:"store-id"` + Name string `json:"name"` + StoreID uint64 `json:"store-id"` + Ranges []core.KeyRange `json:"ranges"` } // grantLeaderScheduler transfers all leaders to peers in the store. 
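The evict-leader and grant-leader decoders above keep the store ID as args[0] and treat any remaining args as start/end key pairs for getKeyRanges. A sketch of the resulting config, assuming it runs inside the schedulers package (the function name is illustrative):

func exampleGrantLeaderConfig() (*grandLeaderConfig, error) {
	conf := &grandLeaderConfig{}
	// Store 1, restricted to the half-open key range ["a", "g").
	decoder := schedule.ConfigSliceDecoder("grant-leader", []string{"1", "a", "g"})
	if err := decoder(conf); err != nil {
		// Fails if args[0] is not a uint64 or a key is badly percent-encoded.
		return nil, err
	}
	// conf.Name == "grant-leader-scheduler-1", conf.StoreID == 1,
	// and conf.Ranges holds the single range ["a", "g").
	return conf, nil
}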
@@ -100,7 +106,7 @@ func (s *grantLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { func (s *grantLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Operator { schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc() - region := cluster.RandFollowerRegion(s.conf.StoreID, core.HealthRegion()) + region := cluster.RandFollowerRegion(s.conf.StoreID, s.conf.Ranges, core.HealthRegion()) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc() return nil diff --git a/server/schedulers/label.go b/server/schedulers/label.go index 623946fee17..39d40b3b029 100644 --- a/server/schedulers/label.go +++ b/server/schedulers/label.go @@ -21,52 +21,74 @@ import ( "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" "github.com/pingcap/pd/server/schedule/selector" + "github.com/pkg/errors" "go.uber.org/zap" ) +const labelSchedulerName = "label-scheduler" + func init() { schedule.RegisterSliceDecoderBuilder("label", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*labelSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = labelSchedulerName return nil } }) schedule.RegisterScheduler("label", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newLabelScheduler(opController), nil + conf := &labelSchedulerConfig{} + decoder(conf) + return newLabelScheduler(opController, conf), nil }) } -const labelSchedulerName = "label-scheduler" +type labelSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} type labelScheduler struct { - name string *baseScheduler + conf *labelSchedulerConfig selector *selector.BalanceSelector } // LabelScheduler is mainly based on the store's label information for scheduling. // Now only used for reject leader schedule, that will move the leader out of // the store with the specific label. 
-func newLabelScheduler(opController *schedule.OperatorController) schedule.Scheduler { +func newLabelScheduler(opController *schedule.OperatorController, conf *labelSchedulerConfig) schedule.Scheduler { filters := []filter.Filter{ filter.StoreStateFilter{ActionScope: labelSchedulerName, TransferLeader: true}, } kind := core.NewScheduleKind(core.LeaderKind, core.ByCount) return &labelScheduler{ - name: labelSchedulerName, baseScheduler: newBaseScheduler(opController), + conf: conf, selector: selector.NewBalanceSelector(kind, filters), } } func (s *labelScheduler) GetName() string { - return s.name + return s.conf.Name } func (s *labelScheduler) GetType() string { return "label" } +func (s *labelScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(s.conf) +} + func (s *labelScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit() } @@ -86,7 +108,7 @@ func (s *labelScheduler) Schedule(cluster opt.Cluster) []*operator.Operator { } log.Debug("label scheduler reject leader store list", zap.Reflect("stores", rejectLeaderStores)) for id := range rejectLeaderStores { - if region := cluster.RandLeaderRegion(id); region != nil { + if region := cluster.RandLeaderRegion(id, s.conf.Ranges); region != nil { log.Debug("label scheduler selects region to transfer leader", zap.Uint64("region-id", region.GetID())) excludeStores := make(map[uint64]struct{}) for _, p := range region.GetDownPeers() { diff --git a/server/schedulers/random_merge.go b/server/schedulers/random_merge.go index 567546ad28b..80574caa0f8 100644 --- a/server/schedulers/random_merge.go +++ b/server/schedulers/random_merge.go @@ -22,49 +22,71 @@ import ( "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" "github.com/pingcap/pd/server/schedule/selector" + "github.com/pkg/errors" ) +const randomMergeName = "random-merge-scheduler" + func init() { schedule.RegisterSliceDecoderBuilder("random-merge", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*randomMergeSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = randomMergeName return nil } }) schedule.RegisterScheduler("random-merge", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newRandomMergeScheduler(opController), nil + conf := &randomMergeSchedulerConfig{} + decoder(conf) + return newRandomMergeScheduler(opController, conf), nil }) } -const randomMergeName = "random-merge-scheduler" +type randomMergeSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} type randomMergeScheduler struct { - name string *baseScheduler + conf *randomMergeSchedulerConfig selector *selector.RandomSelector } // newRandomMergeScheduler creates an admin scheduler that randomly picks two adjacent regions // then merges them. 
-func newRandomMergeScheduler(opController *schedule.OperatorController) schedule.Scheduler { +func newRandomMergeScheduler(opController *schedule.OperatorController, conf *randomMergeSchedulerConfig) schedule.Scheduler { filters := []filter.Filter{ - filter.StoreStateFilter{ActionScope: randomMergeName, MoveRegion: true}, + filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true}, } base := newBaseScheduler(opController) return &randomMergeScheduler{ - name: randomMergeName, baseScheduler: base, + conf: conf, selector: selector.NewRandomSelector(filters), } } func (s *randomMergeScheduler) GetName() string { - return s.name + return s.conf.Name } func (s *randomMergeScheduler) GetType() string { return "random-merge" } +func (s *randomMergeScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(s.conf) +} + func (s *randomMergeScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return s.opController.OperatorCount(operator.OpMerge) < cluster.GetMergeScheduleLimit() } @@ -78,7 +100,7 @@ func (s *randomMergeScheduler) Schedule(cluster opt.Cluster) []*operator.Operato schedulerCounter.WithLabelValues(s.GetName(), "no-source-store").Inc() return nil } - region := cluster.RandLeaderRegion(store.GetID(), core.HealthRegion()) + region := cluster.RandLeaderRegion(store.GetID(), s.conf.Ranges, core.HealthRegion()) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() return nil diff --git a/server/schedulers/scatter_range.go b/server/schedulers/scatter_range.go index 09234abc2a7..2cb8461af98 100644 --- a/server/schedulers/scatter_range.go +++ b/server/schedulers/scatter_range.go @@ -29,7 +29,6 @@ import ( ) func init() { - // args: [start-key, end-key, range-name]. schedule.RegisterSliceDecoderBuilder("scatter-range", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { @@ -154,11 +153,13 @@ func newScatterRangeScheduler(opController *schedule.OperatorController, storage name: name, balanceLeader: newBalanceLeaderScheduler( opController, + &balanceLeaderSchedulerConfig{Ranges: []core.KeyRange{core.NewKeyRange("", "")}}, WithBalanceLeaderName("scatter-range-leader"), WithBalanceLeaderCounter(scatterRangeLeaderCounter), ), balanceRegion: newBalanceRegionScheduler( opController, + &balanceRegionSchedulerConfig{Ranges: []core.KeyRange{core.NewKeyRange("", "")}}, WithBalanceRegionName("scatter-range-region"), WithBalanceRegionCounter(scatterRangeRegionCounter), ), diff --git a/server/schedulers/scheduler_test.go b/server/schedulers/scheduler_test.go index 96a5ee5bc2c..fbc9d1b6010 100644 --- a/server/schedulers/scheduler_test.go +++ b/server/schedulers/scheduler_test.go @@ -40,7 +40,7 @@ func (s *testShuffleLeaderSuite) TestShuffle(c *C) { opt := mockoption.NewScheduleOptions() tc := mockcluster.NewCluster(opt) - sl, err := schedule.CreateScheduler("shuffle-leader", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + sl, err := schedule.CreateScheduler("shuffle-leader", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("shuffle-leader", []string{"", ""})) c.Assert(err, IsNil) c.Assert(sl.Schedule(tc), IsNil) @@ -294,7 +294,7 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { // The label scheduler transfers leader out of store1. 
oc := schedule.NewOperatorController(ctx, nil, nil) - sl, err := schedule.CreateScheduler("label", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigJSONDecoder(nil)) + sl, err := schedule.CreateScheduler("label", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("label", []string{"", ""})) c.Assert(err, IsNil) op := sl.Schedule(tc) testutil.CheckTransferLeader(c, op[0], operator.OpLeader, 1, 3) @@ -306,7 +306,7 @@ func (s *testRejectLeaderSuite) TestRejectLeader(c *C) { // As store3 is disconnected, store1 rejects leader. Balancer will not create // any operators. - bs, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), nil) + bs, err := schedule.CreateScheduler("balance-leader", oc, core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("balance-leader", []string{"", ""})) c.Assert(err, IsNil) op = bs.Schedule(tc) c.Assert(op, IsNil) @@ -450,7 +450,7 @@ func (s *testShuffleRegionSuite) TestShuffle(c *C) { opt := mockoption.NewScheduleOptions() tc := mockcluster.NewCluster(opt) - sl, err := schedule.CreateScheduler("shuffle-region", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), nil) + sl, err := schedule.CreateScheduler("shuffle-region", schedule.NewOperatorController(ctx, nil, nil), core.NewStorage(kv.NewMemoryKV()), schedule.ConfigSliceDecoder("shuffle-region", []string{"", ""})) c.Assert(err, IsNil) c.Assert(sl.IsScheduleAllowed(tc), IsTrue) c.Assert(sl.Schedule(tc), IsNil) diff --git a/server/schedulers/shuffle_hot_region.go b/server/schedulers/shuffle_hot_region.go index dbe73dc45de..90fd9ec00eb 100644 --- a/server/schedulers/shuffle_hot_region.go +++ b/server/schedulers/shuffle_hot_region.go @@ -28,6 +28,8 @@ import ( "go.uber.org/zap" ) +const shuffleHotRegionName = "shuffle-hot-region-scheduler" + func init() { schedule.RegisterSliceDecoderBuilder("shuffle-hot-region", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { @@ -43,6 +45,7 @@ func init() { } conf.Limit = limit } + conf.Name = shuffleHotRegionName return nil } }) @@ -55,6 +58,7 @@ func init() { } type shuffleHotRegionSchedulerConfig struct { + Name string `json:"name"` Limit uint64 `json:"limit"` } @@ -83,7 +87,7 @@ func newShuffleHotRegionScheduler(opController *schedule.OperatorController, con } func (s *shuffleHotRegionScheduler) GetName() string { - return "shuffle-hot-region-scheduler" + return s.conf.Name } func (s *shuffleHotRegionScheduler) GetType() string { diff --git a/server/schedulers/shuffle_leader.go b/server/schedulers/shuffle_leader.go index 3690232c606..f56ecfba38f 100644 --- a/server/schedulers/shuffle_leader.go +++ b/server/schedulers/shuffle_leader.go @@ -20,50 +20,72 @@ import ( "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" "github.com/pingcap/pd/server/schedule/selector" + "github.com/pkg/errors" ) +const shuffleLeaderName = "shuffle-leader-scheduler" + func init() { schedule.RegisterSliceDecoderBuilder("shuffle-leader", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*shuffleLeaderSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = shuffleLeaderName return nil } }) schedule.RegisterScheduler("shuffle-leader", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, 
error) { - return newShuffleLeaderScheduler(opController), nil + conf := &shuffleLeaderSchedulerConfig{} + decoder(conf) + return newShuffleLeaderScheduler(opController, conf), nil }) } -const shuffleLeaderName = "shuffle-leader-scheduler" +type shuffleLeaderSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} type shuffleLeaderScheduler struct { - name string *baseScheduler + conf *shuffleLeaderSchedulerConfig selector *selector.RandomSelector } // newShuffleLeaderScheduler creates an admin scheduler that shuffles leaders // between stores. -func newShuffleLeaderScheduler(opController *schedule.OperatorController) schedule.Scheduler { +func newShuffleLeaderScheduler(opController *schedule.OperatorController, conf *shuffleLeaderSchedulerConfig) schedule.Scheduler { filters := []filter.Filter{ - filter.StoreStateFilter{ActionScope: shuffleLeaderName, TransferLeader: true}, + filter.StoreStateFilter{ActionScope: conf.Name, TransferLeader: true}, } base := newBaseScheduler(opController) return &shuffleLeaderScheduler{ - name: shuffleLeaderName, baseScheduler: base, + conf: conf, selector: selector.NewRandomSelector(filters), } } func (s *shuffleLeaderScheduler) GetName() string { - return s.name + return s.conf.Name } func (s *shuffleLeaderScheduler) GetType() string { return "shuffle-leader" } +func (s *shuffleLeaderScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(s.conf) +} + func (s *shuffleLeaderScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return s.opController.OperatorCount(operator.OpLeader) < cluster.GetLeaderScheduleLimit() } @@ -79,7 +101,7 @@ func (s *shuffleLeaderScheduler) Schedule(cluster opt.Cluster) []*operator.Opera schedulerCounter.WithLabelValues(s.GetName(), "no-target-store").Inc() return nil } - region := cluster.RandFollowerRegion(targetStore.GetID(), core.HealthRegion()) + region := cluster.RandFollowerRegion(targetStore.GetID(), s.conf.Ranges, core.HealthRegion()) if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-follower").Inc() return nil diff --git a/server/schedulers/shuffle_region.go b/server/schedulers/shuffle_region.go index d288e406944..1946bea53d0 100644 --- a/server/schedulers/shuffle_region.go +++ b/server/schedulers/shuffle_region.go @@ -22,50 +22,72 @@ import ( "github.com/pingcap/pd/server/schedule/operator" "github.com/pingcap/pd/server/schedule/opt" "github.com/pingcap/pd/server/schedule/selector" + "github.com/pkg/errors" "go.uber.org/zap" ) +const shuffleRegionName = "shuffle-region-scheduler" + func init() { schedule.RegisterSliceDecoderBuilder("shuffle-region", func(args []string) schedule.ConfigDecoder { return func(v interface{}) error { + conf, ok := v.(*shuffleRegionSchedulerConfig) + if !ok { + return ErrScheduleConfigNotExist + } + ranges, err := getKeyRanges(args) + if err != nil { + return errors.WithStack(err) + } + conf.Ranges = ranges + conf.Name = shuffleRegionName return nil } }) - schedule.RegisterScheduler("shuffle-region", func(opController *schedule.OperatorController, straoge *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { - return newShuffleRegionScheduler(opController), nil + schedule.RegisterScheduler("shuffle-region", func(opController *schedule.OperatorController, storage *core.Storage, decoder schedule.ConfigDecoder) (schedule.Scheduler, error) { + conf := &shuffleRegionSchedulerConfig{} + decoder(conf) + return newShuffleRegionScheduler(opController, conf), nil }) } -const shuffleRegionName = 
"shuffle-region-scheduler" +type shuffleRegionSchedulerConfig struct { + Name string `json:"name"` + Ranges []core.KeyRange `json:"ranges"` +} type shuffleRegionScheduler struct { - name string *baseScheduler + conf *shuffleRegionSchedulerConfig selector *selector.RandomSelector } // newShuffleRegionScheduler creates an admin scheduler that shuffles regions // between stores. -func newShuffleRegionScheduler(opController *schedule.OperatorController) schedule.Scheduler { +func newShuffleRegionScheduler(opController *schedule.OperatorController, conf *shuffleRegionSchedulerConfig) schedule.Scheduler { filters := []filter.Filter{ - filter.StoreStateFilter{ActionScope: shuffleRegionName, MoveRegion: true}, + filter.StoreStateFilter{ActionScope: conf.Name, MoveRegion: true}, } base := newBaseScheduler(opController) return &shuffleRegionScheduler{ - name: shuffleRegionName, baseScheduler: base, + conf: conf, selector: selector.NewRandomSelector(filters), } } func (s *shuffleRegionScheduler) GetName() string { - return s.name + return s.conf.Name } func (s *shuffleRegionScheduler) GetType() string { return "shuffle-region" } +func (s *shuffleRegionScheduler) EncodeConfig() ([]byte, error) { + return schedule.EncodeConfig(s.conf) +} + func (s *shuffleRegionScheduler) IsScheduleAllowed(cluster opt.Cluster) bool { return s.opController.OperatorCount(operator.OpRegion) < cluster.GetRegionScheduleLimit() } @@ -104,9 +126,9 @@ func (s *shuffleRegionScheduler) scheduleRemovePeer(cluster opt.Cluster) (*core. return nil, nil } - region := cluster.RandFollowerRegion(source.GetID(), core.HealthRegion()) + region := cluster.RandFollowerRegion(source.GetID(), s.conf.Ranges, core.HealthRegion()) if region == nil { - region = cluster.RandLeaderRegion(source.GetID(), core.HealthRegion()) + region = cluster.RandLeaderRegion(source.GetID(), s.conf.Ranges, core.HealthRegion()) } if region == nil { schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc() diff --git a/server/schedulers/utils.go b/server/schedulers/utils.go index d16f2efc770..85cc63c1d3c 100644 --- a/server/schedulers/utils.go +++ b/server/schedulers/utils.go @@ -15,6 +15,7 @@ package schedulers import ( "context" + "net/url" "time" "github.com/montanaflynn/stats" @@ -149,3 +150,23 @@ const ( func newTaintCache(ctx context.Context) *cache.TTLUint64 { return cache.NewIDTTL(ctx, taintCacheGCInterval, taintCacheTTL) } + +func getKeyRanges(args []string) ([]core.KeyRange, error) { + var ranges []core.KeyRange + for len(args) > 1 { + startKey, err := url.QueryUnescape(args[0]) + if err != nil { + return nil, err + } + endKey, err := url.QueryUnescape(args[1]) + if err != nil { + return nil, err + } + args = args[2:] + ranges = append(ranges, core.NewKeyRange(startKey, endKey)) + } + if len(ranges) == 0 { + return []core.KeyRange{core.NewKeyRange("", "")}, nil + } + return ranges, nil +}