scheduler: allow empty region to be scheduled and use a separate tolerance config in scatter range scheduler (#4106) #4116

Merged
19 changes: 15 additions & 4 deletions server/schedulers/balance_region.go
@@ -148,24 +148,35 @@ func (s *balanceRegionScheduler) Schedule(cluster opt.Cluster) []*operator.Opera
return stores[i].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), iOp, -1) >
stores[j].RegionScore(opts.GetRegionScoreFormulaVersion(), opts.GetHighSpaceRatio(), opts.GetLowSpaceRatio(), jOp, -1)
})

var allowBalanceEmptyRegion func(*core.RegionInfo) bool

switch cluster.(type) {
case *schedule.RangeCluster:
// allow empty regions to be scheduled in a range cluster
allowBalanceEmptyRegion = func(region *core.RegionInfo) bool { return true }
default:
allowBalanceEmptyRegion = opt.AllowBalanceEmptyRegion(cluster)
}

for _, source := range stores {
sourceID := source.GetID()
retryLimit := s.retryQuota.GetLimit(source)
for i := 0; i < retryLimit; i++ {
// Pick a region that has a pending peer first.
// A pending region may mean the disk is overloaded, so remove the pending region first.
region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
region := cluster.RandPendingRegion(sourceID, s.conf.Ranges, opt.HealthAllowPending(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
if region == nil {
// Then pick the region that has a follower in the source store.
region = cluster.RandFollowerRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
region = cluster.RandFollowerRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if region == nil {
// Then pick the region that has the leader in the source store.
region = cluster.RandLeaderRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
region = cluster.RandLeaderRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if region == nil {
// Finally, pick a learner region.
region = cluster.RandLearnerRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), opt.AllowBalanceEmptyRegion(cluster))
region = cluster.RandLearnerRegion(sourceID, s.conf.Ranges, opt.HealthRegion(cluster), opt.ReplicatedRegion(cluster), allowBalanceEmptyRegion)
}
if region == nil {
schedulerCounter.WithLabelValues(s.GetName(), "no-region").Inc()
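A note on the new control flow in balance_region.go: the scheduler now picks its empty-region filter based on the concrete cluster type, keeping the default behaviour everywhere except the scatter-range path. Below is a minimal, self-contained sketch of that pattern; `regionInfo`, `cluster`, `normalCluster`, and `rangeCluster` are stand-ins for the real `core`, `opt`, and `schedule` types, and the size-based check is only a rough assumption of what `opt.AllowBalanceEmptyRegion` does.

```go
package main

import "fmt"

// regionInfo stands in for core.RegionInfo; only the field the predicate
// needs is modelled here.
type regionInfo struct {
	approximateSize int64
}

// cluster stands in for opt.Cluster.
type cluster interface {
	name() string
}

type normalCluster struct{}

func (*normalCluster) name() string { return "normal" }

// rangeCluster stands in for schedule.RangeCluster, the wrapper that the
// scatter-range scheduler passes to balanceRegionScheduler.Schedule.
type rangeCluster struct{}

func (*rangeCluster) name() string { return "range" }

// pickEmptyRegionFilter mirrors the type switch added in balance_region.go:
// a range cluster is allowed to schedule empty regions, while every other
// cluster keeps the usual "skip empty regions" behaviour.
func pickEmptyRegionFilter(c cluster) func(*regionInfo) bool {
	switch c.(type) {
	case *rangeCluster:
		return func(*regionInfo) bool { return true }
	default:
		// Rough stand-in for opt.AllowBalanceEmptyRegion: reject regions
		// whose approximate size is still zero.
		return func(r *regionInfo) bool { return r.approximateSize > 0 }
	}
}

func main() {
	empty := &regionInfo{approximateSize: 0}
	fmt.Println(pickEmptyRegionFilter(&rangeCluster{})(empty))  // true
	fmt.Println(pickEmptyRegionFilter(&normalCluster{})(empty)) // false
}
```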
48 changes: 27 additions & 21 deletions server/schedulers/balance_test.go
@@ -161,15 +161,22 @@ func (s *testBalanceSuite) TestTolerantRatio(c *C) {

tc.SetTolerantSizeRatio(0)
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}), Equals, int64(leaderTolerantSizeRatio))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
k := core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))
k = core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))
k = core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))

tc.SetTolerantSizeRatio(10)
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}), Equals, int64(tc.GetScheduleConfig().TolerantSizeRatio))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
c.Assert(getTolerantResource(tc, region, core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}), Equals, int64(adjustTolerantRatio(tc)*float64(regionSize)))
k = core.ScheduleKind{Resource: core.LeaderKind, Policy: core.ByCount}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(tc.GetScheduleConfig().TolerantSizeRatio))
k = core.ScheduleKind{Resource: core.LeaderKind, Policy: core.BySize}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))
k = core.ScheduleKind{Resource: core.RegionKind, Policy: core.ByCount}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))
k = core.ScheduleKind{Resource: core.RegionKind, Policy: core.BySize}
c.Assert(getTolerantResource(tc, region, k), Equals, int64(adjustTolerantRatio(tc, k)*float64(regionSize)))
}

var _ = Suite(&testBalanceLeaderSchedulerSuite{})
@@ -1007,28 +1014,29 @@ func (s *testRandomMergeSchedulerSuite) TestMerge(c *C) {
c.Assert(mb.IsScheduleAllowed(tc), IsFalse)
}

var _ = Suite(&testScatterRangeLeaderSuite{})
var _ = Suite(&testScatterRangeSuite{})

type testScatterRangeLeaderSuite struct {
type testScatterRangeSuite struct {
ctx context.Context
cancel context.CancelFunc
}

func (s *testScatterRangeLeaderSuite) SetUpSuite(c *C) {
func (s *testScatterRangeSuite) SetUpSuite(c *C) {
s.ctx, s.cancel = context.WithCancel(context.Background())
}

func (s *testScatterRangeLeaderSuite) TearDownSuite(c *C) {
func (s *testScatterRangeSuite) TearDownSuite(c *C) {
s.cancel()
}

func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
func (s *testScatterRangeSuite) TestBalance(c *C) {
opt := config.NewTestOptions()
// TODO: enable placement rules
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(opt)
tc.DisableFeature(versioninfo.JointConsensus)
tc.SetTolerantSizeRatio(2.5)
// The range cluster uses its own tolerant ratio, so the cluster option has no effect here.
tc.SetTolerantSizeRatio(10000)
// Add stores 1,2,3,4,5.
tc.AddRegionStore(1, 0)
tc.AddRegionStore(2, 0)
@@ -1053,17 +1061,16 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
})
id += 4
}
// empty case
// empty region case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
regionInfo := core.NewRegionInfo(
meta,
meta.Peers[leader],
core.SetApproximateKeys(96),
core.SetApproximateSize(96),
core.SetApproximateKeys(1),
core.SetApproximateSize(1),
)

tc.Regions.SetRegion(regionInfo)
}
for i := 0; i < 100; i++ {
@@ -1087,7 +1094,7 @@ func (s *testScatterRangeLeaderSuite) TestBalance(c *C) {
}
}

func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
func (s *testScatterRangeSuite) TestBalanceLeaderLimit(c *C) {
opt := config.NewTestOptions()
opt.SetPlacementRuleEnabled(false)
tc := mockcluster.NewCluster(opt)
@@ -1118,7 +1125,6 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
id += 4
}

// empty case
regions[49].EndKey = []byte("")
for _, meta := range regions {
leader := rand.Intn(4) % 3
@@ -1163,7 +1169,7 @@ func (s *testScatterRangeLeaderSuite) TestBalanceLeaderLimit(c *C) {
c.Check(maxLeaderCount-minLeaderCount, Greater, 10)
}

func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
func (s *testScatterRangeSuite) TestConcurrencyUpdateConfig(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(opt)
oc := schedule.NewOperatorController(s.ctx, nil, nil)
Expand All @@ -1189,7 +1195,7 @@ func (s *testScatterRangeLeaderSuite) TestConcurrencyUpdateConfig(c *C) {
ch <- struct{}{}
}

func (s *testScatterRangeLeaderSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
func (s *testScatterRangeSuite) TestBalanceWhenRegionNotHeartbeat(c *C) {
opt := config.NewTestOptions()
tc := mockcluster.NewCluster(opt)
// Add stores 1,2,3.
28 changes: 19 additions & 9 deletions server/schedulers/utils.go
@@ -24,6 +24,7 @@ import (
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/core"
"github.com/tikv/pd/server/schedule"
"github.com/tikv/pd/server/schedule/operator"
"github.com/tikv/pd/server/schedule/opt"
"github.com/tikv/pd/server/statistics"
@@ -80,25 +81,34 @@ func shouldBalance(cluster opt.Cluster, source, target *core.StoreInfo, region *
}

func getTolerantResource(cluster opt.Cluster, region *core.RegionInfo, kind core.ScheduleKind) int64 {
tolerantSizeRatio := adjustTolerantRatio(cluster, kind)
if kind.Resource == core.LeaderKind && kind.Policy == core.ByCount {
tolerantSizeRatio := cluster.GetOpts().GetTolerantSizeRatio()
if tolerantSizeRatio == 0 {
tolerantSizeRatio = leaderTolerantSizeRatio
}
leaderCount := int64(1.0 * tolerantSizeRatio)
return leaderCount
return int64(tolerantSizeRatio)
}

regionSize := region.GetApproximateSize()
if regionSize < cluster.GetAverageRegionSize() {
regionSize = cluster.GetAverageRegionSize()
}
regionSize = int64(float64(regionSize) * adjustTolerantRatio(cluster))
regionSize = int64(float64(regionSize) * tolerantSizeRatio)
return regionSize
}

func adjustTolerantRatio(cluster opt.Cluster) float64 {
tolerantSizeRatio := cluster.GetOpts().GetTolerantSizeRatio()
func adjustTolerantRatio(cluster opt.Cluster, kind core.ScheduleKind) float64 {
var tolerantSizeRatio float64
switch c := cluster.(type) {
case *schedule.RangeCluster:
// The range cluster uses a separate tolerant size ratio configuration.
tolerantSizeRatio = c.GetTolerantSizeRatio()
default:
tolerantSizeRatio = cluster.GetOpts().GetTolerantSizeRatio()
}
if kind.Resource == core.LeaderKind && kind.Policy == core.ByCount {
if tolerantSizeRatio == 0 {
return leaderTolerantSizeRatio
}
return tolerantSizeRatio
}
if tolerantSizeRatio == 0 {
var maxRegionCount float64
stores := cluster.GetStores()
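For the utils.go change, the tolerant-resource logic can be summarized in a few lines. The sketch below is a simplified, stand-alone version under two stated assumptions: the leader-by-count default is taken to be 5 (mirroring the `leaderTolerantSizeRatio` constant), and the zero-ratio fallback for size-based balancing (derived from store region counts in the real code) is omitted. The ratio is passed in directly instead of being read from the cluster options or, for a range cluster, from the scatter-range scheduler's own configuration.

```go
package main

import "fmt"

// leaderTolerantSizeRatio is assumed to be 5 here; the real constant lives
// in server/schedulers/utils.go.
const leaderTolerantSizeRatio = 5.0

// tolerantResource is a simplified stand-alone version of
// getTolerantResource/adjustTolerantRatio after this change.
func tolerantResource(ratio float64, leaderByCount bool, regionSize, avgRegionSize int64) int64 {
	if leaderByCount {
		// Leader-count balancing: the ratio itself is the tolerated
		// difference in leader count, with a fixed default when unset.
		if ratio == 0 {
			ratio = leaderTolerantSizeRatio
		}
		return int64(ratio)
	}
	// Size-based balancing: tolerate ratio * regionSize of imbalance, using
	// the cluster average when the region itself is smaller.
	if regionSize < avgRegionSize {
		regionSize = avgRegionSize
	}
	return int64(float64(regionSize) * ratio)
}

func main() {
	fmt.Println(tolerantResource(0, true, 96, 96))    // 5: falls back to the leader default
	fmt.Println(tolerantResource(10, true, 96, 96))   // 10: the configured ratio wins
	fmt.Println(tolerantResource(2.5, false, 96, 96)) // 240: 2.5 * max(regionSize, average)
}
```

This is also why the updated TestBalance sets the cluster-level ratio to 10000: if that value were actually used, no store would ever look unbalanced, so the test passing demonstrates that the range cluster's own ratio is the one applied.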