Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

schedulers: region should be split if region is too hot. #6618

Merged
merged 23 commits into from
Jul 11, 2023
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -808,8 +808,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
StartKey: []byte(fmt.Sprintf("%20d", regionID)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before this change, region keys were ordered as [1,2,3,4,5], so there was no key range available for the buckets.
With this PR, region keys are ordered as [10,20,30,40,50], so I can put several buckets inside one region.

EndKey: []byte(fmt.Sprintf("%20d", regionID+1)),
StartKey: []byte(fmt.Sprintf("%20d0", regionID)),
EndKey: []byte(fmt.Sprintf("%20d0", regionID+1)),
RegionEpoch: epoch,
}
var leader *metapb.Peer
Expand Down
98 changes: 72 additions & 26 deletions pkg/schedule/schedulers/hot_region.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@
hotSchedulerSnapshotSenderLimitCounter = schedulerCounter.WithLabelValues(HotRegionName, "snapshot_sender_limit")

// counter related with the split region
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerNotFoundSplitKeysCounter = schedulerCounter.WithLabelValues(HotRegionName, "not_found_split_keys")
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerRegionBucketsSingleHotSpotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_single_hot_spot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
hotSchedulerRegionTooHotNeedSplitCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot_need_split")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
Expand Down Expand Up @@ -159,21 +161,23 @@
// It makes each dim rate or count become `weight` times to the origin value.
func (h *baseHotScheduler) summaryPendingInfluence() {
for id, p := range h.regionPendings {
from := h.stInfos[p.from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}
for _, from := range p.froms {
from := h.stInfos[from]
to := h.stInfos[p.to]
maxZombieDur := p.maxZombieDuration
weight, needGC := calcPendingInfluence(p.op, maxZombieDur)

if needGC {
delete(h.regionPendings, id)
continue
}

if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
if from != nil && weight > 0 {
from.AddInfluence(&p.origin, -weight)
}
if to != nil && weight > 0 {
to.AddInfluence(&p.origin, weight)
}
}
}
for storeID, info := range h.stInfos {
Expand Down Expand Up @@ -203,6 +207,7 @@

minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
regionTooHotThreshold = 0.3
)

var (
Expand Down Expand Up @@ -294,7 +299,7 @@
return nil
}

func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
Expand Down Expand Up @@ -664,6 +669,16 @@
}
}
bs.cur.mainPeerStat = mainPeerStat
if tooHotNeedSplit(srcStore, mainPeerStat) {
hotSchedulerRegionTooHotNeedSplitCounter.Inc()
ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region}, true /*too hot need to split*/)
if len(ops) > 0 {
bs.ops = ops
bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
bs.best = bs.cur
return ops
}
}

for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
Expand Down Expand Up @@ -723,24 +738,38 @@
if bs.best == nil || len(bs.ops) == 0 {
return false
}
if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
isSplit := bs.ops[0].Kind() == operator.OpSplit
if !isSplit && bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
hotSchedulerNotSameEngineCounter.Inc()
return false
}
maxZombieDur := bs.calcMaxZombieDur()

// TODO: Process operators atomically.
// main peer
srcStoreID := bs.best.srcStore.GetID()
dstStoreID := bs.best.dstStore.GetID()

srcStoreIDs := make([]uint64, 0)
dstStoreID := uint64(0)
if isSplit {
region := bs.GetRegion(bs.ops[0].RegionID())
for id := range region.GetStoreIDs() {
srcStoreIDs = append(srcStoreIDs, id)
}
} else {
srcStoreIDs = append(srcStoreIDs, bs.best.srcStore.GetID())
dstStoreID = bs.best.dstStore.GetID()
}
infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
bufferflies marked this conversation as resolved.
Show resolved Hide resolved
return false
}
if isSplit {
return true
}
// revert peers
if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
if !bs.sche.tryAddPendingInfluence(bs.ops[1], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
return false
}
}
Expand Down Expand Up @@ -1435,7 +1464,7 @@
}
}
if len(splitRegions) > 0 {
return bs.createSplitOperator(splitRegions)
return bs.createSplitOperator(splitRegions, false /* region is too big need split before move */)

Check warning on line 1467 in pkg/schedule/schedulers/hot_region.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/hot_region.go#L1467

Added line #L1467 was not covered by tests
}

srcStoreID := bs.cur.srcStore.GetID()
Expand Down Expand Up @@ -1475,7 +1504,8 @@
}

// createSplitOperator creates split operators for the given regions.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*operator.Operator {
// isTooHot true indicates that the region is too hot and needs split.
func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo, isTooHot bool) []*operator.Operator {
if len(regions) == 0 {
return nil
}
Expand All @@ -1492,6 +1522,11 @@
hotSchedulerRegionBucketsNotHotCounter.Inc()
return
}
// skip if only one hot bucket exists on this region.
if len(stats) <= 1 && isTooHot {
hotSchedulerRegionBucketsSingleHotSpotCounter.Inc()
return

Check warning on line 1528 in pkg/schedule/schedulers/hot_region.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/schedulers/hot_region.go#L1527-L1528

Added lines #L1527 - L1528 were not covered by tests
}
startKey, endKey := region.GetStartKey(), region.GetEndKey()
splitKey := make([][]byte, 0)
for _, stat := range stats {
Expand Down Expand Up @@ -1529,6 +1564,10 @@
for _, region := range regions {
createFunc(region)
}
// the split bucket's priority is highest
if len(operators) > 0 {
bs.cur.progressiveRank = -5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is it used for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It makes the split strategy the preferred strategy.

}
return operators
}

Expand Down Expand Up @@ -1789,3 +1828,10 @@
func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
return stringToDim(priorities[0]), stringToDim(priorities[1])
}

// tooHotNeedSplit reports whether the region is hot enough to be split:
// it returns true when any load dimension of the region exceeds
// regionTooHotThreshold (30%) of the store's corresponding current load.
func tooHotNeedSplit(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat) bool {
	storeLoads := store.LoadPred.Current.Loads
	for dim := range storeLoads {
		if region.Loads[dim] > storeLoads[dim]*regionTooHotThreshold {
			return true
		}
	}
	return false
}
68 changes: 66 additions & 2 deletions pkg/schedule/schedulers/hot_region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package schedulers

import (
"encoding/hex"
"fmt"
"math"
"testing"
"time"
Expand Down Expand Up @@ -141,7 +142,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
op.Start()
op.SetStatusReachTime(operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
op.SetStatusReachTime(operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
return newPendingInfluence(op, 2, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
Expand Down Expand Up @@ -202,6 +203,68 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
}

// TestSplitIfRegionTooHot verifies that the hot-region scheduler produces a
// split operator (instead of a move) when a single region carries a load that
// is too large relative to its store, for both the read and the write
// schedulers, and that no duplicate operator is produced on the next round.
func TestSplitIfRegionTooHot(t *testing.T) {
	re := require.New(t)
	statistics.Denoising = false
	cancel, _, tc, oc := prepareSchedulersTest()
	defer cancel()
	tc.SetHotRegionCacheHitsThreshold(1)
	hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
	re.NoError(err)
	// Two hot buckets inside region 1, so a split key can be found between them.
	b := &metapb.Buckets{
		RegionId:   1,
		PeriodInMs: 1000,
		Keys: [][]byte{
			[]byte(fmt.Sprintf("%21d", 11)),
			[]byte(fmt.Sprintf("%21d", 12)),
			[]byte(fmt.Sprintf("%21d", 13)),
		},
		Stats: &metapb.BucketStats{
			ReadBytes:  []uint64{10 * units.KiB, 11 * units.KiB},
			ReadKeys:   []uint64{256, 256},
			ReadQps:    []uint64{0, 0},
			WriteBytes: []uint64{0, 0},
			WriteQps:   []uint64{0, 0},
			WriteKeys:  []uint64{0, 0},
		},
	}

	task := buckets.NewCheckPeerTask(b)
	re.True(tc.HotBucketCache.CheckAsync(task))
	// CheckAsync is processed in the background; give it a moment to land.
	time.Sleep(time.Millisecond * 10)

	tc.AddRegionStore(1, 3)
	tc.AddRegionStore(2, 2)
	tc.AddRegionStore(3, 2)

	tc.UpdateStorageReadBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
	tc.UpdateStorageReadBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
	tc.UpdateStorageReadBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
	// Region 1 reads 4 MiB/s — more than 30% of store 1's 6 MiB/s load,
	// so it is too hot and should be split rather than moved.
	addRegionInfo(tc, statistics.Read, []testRegionInfo{
		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
	})
	ops, _ := hb.Schedule(tc, false)
	re.Len(ops, 1)
	re.Equal(operator.OpSplit, ops[0].Kind())
	// The split is now pending; no further operator should be produced.
	ops, _ = hb.Schedule(tc, false)
	re.Len(ops, 0)

	tc.UpdateStorageWrittenBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
	tc.UpdateStorageWrittenBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
	tc.UpdateStorageWrittenBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
	// Same scenario for the write scheduler: region 1 writes 4 MiB/s,
	// exceeding 30% of store 1's write load.
	addRegionInfo(tc, statistics.Write, []testRegionInfo{
		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
	})
	hb, _ = CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
	ops, _ = hb.Schedule(tc, false)
	re.Len(ops, 1)
	re.Equal(operator.OpSplit, ops[0].Kind())
	ops, _ = hb.Schedule(tc, false)
	re.Len(ops, 0)
}

func TestSplitBuckets(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
Expand All @@ -211,6 +274,7 @@ func TestSplitBuckets(t *testing.T) {
hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
re.NoError(err)
solve := newBalanceSolver(hb.(*hotScheduler), tc, statistics.Read, transferLeader)
solve.cur = &solution{}
region := core.NewTestRegionInfo(1, 1, []byte(""), []byte(""))

// the hot range is [a,c],[e,f]
Expand All @@ -231,7 +295,7 @@ func TestSplitBuckets(t *testing.T) {
task := buckets.NewCheckPeerTask(b)
re.True(tc.HotBucketCache.CheckAsync(task))
time.Sleep(time.Millisecond * 10)
ops := solve.createSplitOperator([]*core.RegionInfo{region})
ops := solve.createSplitOperator([]*core.RegionInfo{region}, false)
re.Equal(1, len(ops))
op := ops[0]
re.Equal(splitBucket, op.Desc())
Expand Down
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,16 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) {

// pendingInfluence records the not-yet-settled load influence of a scheduled
// operator, so store load estimates can account for in-flight operations.
type pendingInfluence struct {
	// op is the operator whose influence is being tracked.
	op *operator.Operator
	// froms are the source store IDs the influence is subtracted from
	// (several stores for a split operator, a single store otherwise).
	froms []uint64
	// to is the destination store ID the influence is added to
	// (left as 0 when there is no destination, e.g. for a split).
	to uint64
	// origin is the operator's original influence on store loads.
	origin statistics.Influence
	// maxZombieDuration is how long the influence is retained after the
	// operator completes before it is garbage-collected.
	maxZombieDuration time.Duration
}

func newPendingInfluence(op *operator.Operator, from, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
func newPendingInfluence(op *operator.Operator, froms []uint64, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
from: from,
froms: froms,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
Expand Down
4 changes: 4 additions & 0 deletions server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,10 @@ func (c *ScheduleConfig) adjust(meta *configutil.ConfigMetaData, reloading bool)
configutil.AdjustUint64(&c.HotRegionsReservedDays, defaultHotRegionsReservedDays)
}

if !meta.IsDefined("max-movable-hot-peer-size") {
configutil.AdjustInt64(&c.MaxMovableHotPeerSize, defaultMaxMovableHotPeerSize)
}

if !meta.IsDefined("slow-store-evicting-affected-store-ratio-threshold") {
configutil.AdjustFloat64(&c.SlowStoreEvictingAffectedStoreRatioThreshold, defaultSlowStoreEvictingAffectedStoreRatioThreshold)
}
Expand Down
6 changes: 1 addition & 5 deletions server/config/persist_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -624,11 +624,7 @@ func (o *PersistOptions) IsLocationReplacementEnabled() bool {

// GetMaxMovableHotPeerSize returns the max movable hot peer size.
func (o *PersistOptions) GetMaxMovableHotPeerSize() int64 {
size := o.GetScheduleConfig().MaxMovableHotPeerSize
if size <= 0 {
size = defaultMaxMovableHotPeerSize
}
return size
return o.GetScheduleConfig().MaxMovableHotPeerSize
}

// IsDebugMetricsEnabled returns if debug metrics is enabled.
Expand Down