support range for schedulers
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed Oct 11, 2019
1 parent 0858b08 commit 565fbc5
Showing 19 changed files with 166 additions and 75 deletions.
12 changes: 6 additions & 6 deletions server/cluster.go
@@ -614,18 +614,18 @@ func (c *RaftCluster) GetStoreRegions(storeID uint64) []*core.RegionInfo {
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *RaftCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, opts...)
func (c *RaftCluster) RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandLeaderRegion(storeID, startKey, endKey, opts...)
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *RaftCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, opts...)
func (c *RaftCluster) RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandFollowerRegion(storeID, startKey, endKey, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (c *RaftCluster) RandPendingRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, opts...)
func (c *RaftCluster) RandPendingRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
return c.core.RandPendingRegion(storeID, startKey, endKey, opts...)
}

// RandHotRegionFromStore randomly picks a hot region in specified store.
10 changes: 5 additions & 5 deletions server/cluster_test.go
@@ -841,10 +841,10 @@ func (s *testRegionsInfoSuite) Test(c *C) {
}

for i := uint64(0); i < n; i++ {
region := cache.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []byte(""), []byte(""), core.HealthRegion())
c.Assert(region.GetLeader().GetStoreId(), Equals, i)

region = cache.RandFollowerRegion(i, core.HealthRegion())
region = cache.RandFollowerRegion(i, []byte(""), []byte(""), core.HealthRegion())
c.Assert(region.GetLeader().GetStoreId(), Not(Equals), i)

c.Assert(region.GetStorePeer(i), NotNil)
@@ -860,14 +860,14 @@ func (s *testRegionsInfoSuite) Test(c *C) {
// All regions will be filtered out if they have pending peers.
for i := uint64(0); i < n; i++ {
for j := 0; j < cache.GetStoreLeaderCount(i); j++ {
region := cache.RandLeaderRegion(i, core.HealthRegion())
region := cache.RandLeaderRegion(i, []byte(""), []byte(""), core.HealthRegion())
newRegion := region.Clone(core.WithPendingPeers(region.GetPeers()))
cache.SetRegion(newRegion)
}
c.Assert(cache.RandLeaderRegion(i, core.HealthRegion()), IsNil)
c.Assert(cache.RandLeaderRegion(i, []byte(""), []byte(""), core.HealthRegion()), IsNil)
}
for i := uint64(0); i < n; i++ {
c.Assert(cache.RandFollowerRegion(i, core.HealthRegion()), IsNil)
c.Assert(cache.RandFollowerRegion(i, []byte(""), []byte(""), core.HealthRegion()), IsNil)
}
}

18 changes: 9 additions & 9 deletions server/core/basic_cluster.go
@@ -152,24 +152,24 @@ func (bc *BasicCluster) UpdateStoreStatus(storeID uint64, leaderCount int, regio
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandFollowerRegion(storeID, opts...)
return bc.Regions.RandFollowerRegion(storeID, startKey, endKey, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandLeaderRegion(storeID, opts...)
return bc.Regions.RandLeaderRegion(storeID, startKey, endKey, opts...)
}

// RandPendingRegion returns a random region that has a pending peer on the store.
func (bc *BasicCluster) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
func (bc *BasicCluster) RandPendingRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
bc.RLock()
defer bc.RUnlock()
return bc.Regions.RandPendingRegion(storeID, opts...)
return bc.Regions.RandPendingRegion(storeID, startKey, endKey, opts...)
}

// GetRegionCount gets the total count of RegionInfo of regionMap.
@@ -308,9 +308,9 @@ func (bc *BasicCluster) Length() int {

// RegionSetInformer provides access to a shared informer of regions.
type RegionSetInformer interface {
RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo
RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo
RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo
RandPendingRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo
GetAverageRegionSize() int64
GetStoreRegionCount(storeID uint64) int
GetRegion(id uint64) *RegionInfo
20 changes: 10 additions & 10 deletions server/core/region.go
@@ -750,23 +750,23 @@ func (r *RegionsInfo) GetStoreLearnerCount(storeID uint64) int {
}

// RandRegion gets a region at random
func (r *RegionsInfo) RandRegion(opts ...RegionOption) *RegionInfo {
return randRegion(r.tree, opts...)
func (r *RegionsInfo) RandRegion(startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
return randRegion(r.tree, startKey, endKey, opts...)
}

// RandPendingRegion randomly gets a store's region with a pending peer.
func (r *RegionsInfo) RandPendingRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], opts...)
func (r *RegionsInfo) RandPendingRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
return randRegion(r.pendingPeers[storeID], startKey, endKey, opts...)
}

// RandLeaderRegion randomly gets a store's leader region.
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], opts...)
func (r *RegionsInfo) RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
return randRegion(r.leaders[storeID], startKey, endKey, opts...)
}

// RandFollowerRegion randomly gets a store's follower region.
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], opts...)
func (r *RegionsInfo) RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
return randRegion(r.followers[storeID], startKey, endKey, opts...)
}

// GetLeader returns the leader RegionInfo by storeID and regionID (now only used in tests)
@@ -831,9 +831,9 @@ type RegionsContainer interface {
RandomRegion(startKey, endKey []byte) *RegionInfo
}

func randRegion(regions RegionsContainer, opts ...RegionOption) *RegionInfo {
func randRegion(regions RegionsContainer, startKey, endKey []byte, opts ...RegionOption) *RegionInfo {
for i := 0; i < randomRegionMaxRetry; i++ {
region := regions.RandomRegion(nil, nil)
region := regions.RandomRegion(startKey, endKey)
if region == nil {
return nil
}
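A minimal usage sketch of the new parameters (not part of this commit; it assumes the repository's github.com/pingcap/pd import path, and the exact bound semantics come from RegionsContainer.RandomRegion): empty keys keep the old whole-keyspace behaviour, while a non-empty pair confines random picks to one key range.

package example

import "github.com/pingcap/pd/server/core"

// pickScoped is an illustrative sketch, not code from this commit.
func pickScoped(bc *core.BasicCluster, storeID uint64) *core.RegionInfo {
	// Empty keys keep the pre-change behaviour: sample from all leaders on the store.
	if r := bc.RandLeaderRegion(storeID, []byte(""), []byte(""), core.HealthRegion()); r != nil {
		return r
	}
	// "t1" and "t2" are placeholder keys: sampling is restricted to that key range.
	return bc.RandFollowerRegion(storeID, []byte("t1"), []byte("t2"), core.HealthRegion())
}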
2 changes: 1 addition & 1 deletion server/core/region_test.go
Expand Up @@ -148,7 +148,7 @@ func BenchmarkRandomRegion(b *testing.B) {
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
regions.RandRegion()
regions.RandRegion([]byte(""), []byte(""))
}
}

8 changes: 4 additions & 4 deletions server/namespace_cluster.go
@@ -62,9 +62,9 @@ func (c *namespaceCluster) checkRegion(region *core.RegionInfo) bool {
const randRegionMaxRetry = 10

// RandFollowerRegion returns a random region that has a follower on the store.
func (c *namespaceCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
func (c *namespaceCluster) RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
for i := 0; i < randRegionMaxRetry; i++ {
r := c.Cluster.RandFollowerRegion(storeID, opts...)
r := c.Cluster.RandFollowerRegion(storeID, startKey, endKey, opts...)
if r == nil {
return nil
}
@@ -76,9 +76,9 @@ func (c *namespaceCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
}

// RandLeaderRegion returns a random region that has leader on the store.
func (c *namespaceCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
func (c *namespaceCluster) RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
for i := 0; i < randRegionMaxRetry; i++ {
r := c.Cluster.RandLeaderRegion(storeID, opts...)
r := c.Cluster.RandLeaderRegion(storeID, startKey, endKey, opts...)
if r == nil {
return nil
}
8 changes: 4 additions & 4 deletions server/schedule/range_cluster.go
@@ -101,13 +101,13 @@ func (r *RangeCluster) GetTolerantSizeRatio() float64 {
}

// RandFollowerRegion returns a random region that has a follower on the store.
func (r *RangeCluster) RandFollowerRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandFollowerRegion(storeID, opts...)
func (r *RangeCluster) RandFollowerRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandFollowerRegion(storeID, startKey, endKey, opts...)
}

// RandLeaderRegion returns a random region that has leader on the store.
func (r *RangeCluster) RandLeaderRegion(storeID uint64, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandLeaderRegion(storeID, opts...)
func (r *RangeCluster) RandLeaderRegion(storeID uint64, startKey, endKey []byte, opts ...core.RegionOption) *core.RegionInfo {
return r.regions.RandLeaderRegion(storeID, startKey, endKey, opts...)
}

// GetAverageRegionSize returns the average region approximate size.
16 changes: 12 additions & 4 deletions server/schedulers/balance_leader.go
@@ -29,7 +29,11 @@ import (

func init() {
schedule.RegisterScheduler("balance-leader", func(opController *schedule.OperatorController, args []string) (schedule.Scheduler, error) {
return newBalanceLeaderScheduler(opController), nil
startKey, endKey, err := getRangeKeys(args)
if err != nil {
return nil, err
}
return newBalanceLeaderScheduler(opController, []string{startKey, endKey}), nil
})
}

@@ -39,6 +43,8 @@ const balanceLeaderRetryLimit = 10
type balanceLeaderScheduler struct {
*baseScheduler
name string
startKey []byte
endKey []byte
selector *selector.BalanceSelector
taintStores *cache.TTLUint64
opController *schedule.OperatorController
@@ -47,12 +53,14 @@ type balanceLeaderScheduler struct {

// newBalanceLeaderScheduler creates a scheduler that tends to keep leaders on
// each store balanced.
func newBalanceLeaderScheduler(opController *schedule.OperatorController, opts ...BalanceLeaderCreateOption) schedule.Scheduler {
func newBalanceLeaderScheduler(opController *schedule.OperatorController, args []string, opts ...BalanceLeaderCreateOption) schedule.Scheduler {
taintStores := newTaintCache()
base := newBaseScheduler(opController)

s := &balanceLeaderScheduler{
baseScheduler: base,
startKey: []byte(args[0]),
endKey: []byte(args[1]),
taintStores: taintStores,
opController: opController,
counter: balanceLeaderCounter,
@@ -160,7 +168,7 @@ func (l *balanceLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
// the best follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderOut(cluster schedule.Cluster, source *core.StoreInfo) []*operator.Operator {
sourceID := source.GetID()
region := cluster.RandLeaderRegion(sourceID, core.HealthRegion())
region := cluster.RandLeaderRegion(sourceID, l.startKey, l.endKey, core.HealthRegion())
if region == nil {
log.Debug("store has no leader", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", sourceID))
schedulerCounter.WithLabelValues(l.GetName(), "no-leader-region").Inc()
@@ -180,7 +188,7 @@ func (l *balanceLeaderScheduler) transferLeaderOut(cluster schedule.Cluster, source *core.StoreInfo) []*operator.Operator {
// the worst follower peer and transfers the leader.
func (l *balanceLeaderScheduler) transferLeaderIn(cluster schedule.Cluster, target *core.StoreInfo) []*operator.Operator {
targetID := target.GetID()
region := cluster.RandFollowerRegion(targetID, core.HealthRegion())
region := cluster.RandFollowerRegion(targetID, l.startKey, l.endKey, core.HealthRegion())
if region == nil {
log.Debug("store has no follower", zap.String("scheduler", l.GetName()), zap.Uint64("store-id", targetID))
schedulerCounter.WithLabelValues(l.GetName(), "no-follower-region").Inc()
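The constructor now receives the parsed range as a two-element args slice. A small sketch of how the two modes would be wired up inside package schedulers (illustrative only, not code from this commit; "a" and "z" are placeholder keys, and the import path github.com/pingcap/pd is assumed):

package schedulers

import "github.com/pingcap/pd/server/schedule"

// newWholeAndScopedBalanceLeader is an illustrative helper, not part of this
// commit. Empty keys keep the pre-change behaviour of considering every
// leader on a store, while a non-empty pair confines the scheduler to one
// key range.
func newWholeAndScopedBalanceLeader(oc *schedule.OperatorController) (schedule.Scheduler, schedule.Scheduler) {
	whole := newBalanceLeaderScheduler(oc, []string{"", ""})
	scoped := newBalanceLeaderScheduler(oc, []string{"a", "z"})
	return whole, scoped
}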
18 changes: 13 additions & 5 deletions server/schedulers/balance_region.go
@@ -32,7 +32,11 @@ import (

func init() {
schedule.RegisterScheduler("balance-region", func(opController *schedule.OperatorController, args []string) (schedule.Scheduler, error) {
return newBalanceRegionScheduler(opController), nil
startKey, endKey, err := getRangeKeys(args)
if err != nil {
return nil, err
}
return newBalanceRegionScheduler(opController, []string{startKey, endKey}), nil
})
}

@@ -53,6 +57,8 @@ const (
type balanceRegionScheduler struct {
*baseScheduler
name string
startKey []byte
endKey []byte
selector *selector.BalanceSelector
opController *schedule.OperatorController
hitsCounter *hitsStoreBuilder
@@ -61,10 +67,12 @@ type balanceRegionScheduler struct {

// newBalanceRegionScheduler creates a scheduler that tends to keep regions on
// each store balanced.
func newBalanceRegionScheduler(opController *schedule.OperatorController, opts ...BalanceRegionCreateOption) schedule.Scheduler {
func newBalanceRegionScheduler(opController *schedule.OperatorController, args []string, opts ...BalanceRegionCreateOption) schedule.Scheduler {
base := newBaseScheduler(opController)
s := &balanceRegionScheduler{
baseScheduler: base,
startKey: []byte(args[0]),
endKey: []byte(args[1]),
opController: opController,
hitsCounter: newHitsStoreBuilder(hitsStoreTTL, hitsStoreCountThreshold),
counter: balanceRegionCounter,
@@ -135,14 +143,14 @@ func (s *balanceRegionScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
for i := 0; i < balanceRegionRetryLimit; i++ {
// Priority is given to the region that has a pending peer.
// A pending peer may mean the disk is overloaded, so remove such regions first.
region := cluster.RandPendingRegion(sourceID, core.HealthRegionAllowPending())
region := cluster.RandPendingRegion(sourceID, s.startKey, s.endKey, core.HealthRegionAllowPending())
if region == nil {
// Then picks the region that has a follower in the source store.
region = cluster.RandFollowerRegion(sourceID, core.HealthRegion())
region = cluster.RandFollowerRegion(sourceID, s.startKey, s.endKey, core.HealthRegion())
}
if region == nil {
// Last, picks the region that has the leader in the source store.
region = cluster.RandLeaderRegion(sourceID, core.HealthRegion())
region = cluster.RandLeaderRegion(sourceID, s.startKey, s.endKey, core.HealthRegion())
}
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
17 changes: 13 additions & 4 deletions server/schedulers/evict_leader.go
@@ -27,34 +27,43 @@ import (

func init() {
schedule.RegisterScheduler("evict-leader", func(opController *schedule.OperatorController, args []string) (schedule.Scheduler, error) {
if len(args) != 1 {
if len(args) == 0 {
return nil, errors.New("evict-leader needs 1 argument")
}
id, err := strconv.ParseUint(args[0], 10, 64)
if err != nil {
return nil, errors.WithStack(err)
}
return newEvictLeaderScheduler(opController, id), nil

startKey, endKey, err := getRangeKeys(args[1:])
if err != nil {
return nil, err
}
return newEvictLeaderScheduler(opController, id, []string{startKey, endKey}), nil
})
}

type evictLeaderScheduler struct {
*baseScheduler
startKey []byte
endKey []byte
name string
storeID uint64
selector *selector.RandomSelector
}

// newEvictLeaderScheduler creates an admin scheduler that transfers all leaders
// out of a store.
func newEvictLeaderScheduler(opController *schedule.OperatorController, storeID uint64) schedule.Scheduler {
func newEvictLeaderScheduler(opController *schedule.OperatorController, storeID uint64, args []string) schedule.Scheduler {
name := fmt.Sprintf("evict-leader-scheduler-%d", storeID)
filters := []filter.Filter{
filter.StoreStateFilter{ActionScope: name, TransferLeader: true},
}
base := newBaseScheduler(opController)
return &evictLeaderScheduler{
baseScheduler: base,
startKey: []byte(args[0]),
endKey: []byte(args[1]),
name: name,
storeID: storeID,
selector: selector.NewRandomSelector(filters),
@@ -83,7 +92,7 @@ func (s *evictLeaderScheduler) IsScheduleAllowed(cluster schedule.Cluster) bool {

func (s *evictLeaderScheduler) Schedule(cluster schedule.Cluster) []*operator.Operator {
schedulerCounter.WithLabelValues(s.GetName(), "schedule").Inc()
region := cluster.RandLeaderRegion(s.storeID, core.HealthRegion())
region := cluster.RandLeaderRegion(s.storeID, s.startKey, s.endKey, core.HealthRegion())
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-leader").Inc()
return nil
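getRangeKeys is called by all three registered constructors above, but its definition is not among the hunks shown here. A plausible sketch, inferred only from the call sites (no arguments means an unbounded range, otherwise a start/end pair is expected) and not the commit's actual implementation:

package schedulers

import "github.com/pkg/errors"

// getRangeKeysSketch is a guess at the helper's shape, based only on how it
// is called in this diff: an empty argument list yields empty keys (the full
// keyspace), and otherwise exactly a start key and an end key are expected.
func getRangeKeysSketch(args []string) (string, string, error) {
	if len(args) == 0 {
		return "", "", nil
	}
	if len(args) != 2 {
		return "", "", errors.New("should specify both the start key and the end key of the range")
	}
	return args[0], args[1], nil
}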
