
schedulers: region should be split if region is too hot. #6618

Merged · merged 23 commits · Jul 11, 2023

Changes from 3 commits
4 changes: 2 additions & 2 deletions pkg/mock/mockcluster/mockcluster.go
@@ -828,8 +828,8 @@ func (mc *Cluster) MockRegionInfo(regionID uint64, leaderStoreID uint64,
followerStoreIDs, learnerStoreIDs []uint64, epoch *metapb.RegionEpoch) *core.RegionInfo {
region := &metapb.Region{
Id: regionID,
-		StartKey: []byte(fmt.Sprintf("%20d", regionID)),
Member:

why change it?

Contributor Author:

Before this change, region keys were ordered as [1,2,3,4,5], which left no key range inside a region for buckets. With this PR they are ordered as [10,20,30,40,50], so several bucket boundaries can fall within one region; see the sketch below.
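A runnable sketch of the two key formats (standard library behavior only; the bucket keys match the `%21d` format used in the new test below):

```go
package main

import "fmt"

func main() {
	// Old format: region boundaries are width-20 strings for 1, 2, 3, ...
	fmt.Printf("%q\n", fmt.Sprintf("%20d", 1)) // 19 spaces + "1"
	fmt.Printf("%q\n", fmt.Sprintf("%20d", 2)) // 19 spaces + "2"

	// New format: the trailing "0" turns region 1's range into
	// [...10, ...20), leaving room for bucket keys inside it.
	fmt.Printf("%q\n", fmt.Sprintf("%20d0", 1)) // 19 spaces + "10"
	fmt.Printf("%q\n", fmt.Sprintf("%20d0", 2)) // 19 spaces + "20"
	fmt.Printf("%q\n", fmt.Sprintf("%21d", 11)) // 19 spaces + "11", sorts inside region 1's range
}
```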

-		EndKey:   []byte(fmt.Sprintf("%20d", regionID+1)),
+		StartKey: []byte(fmt.Sprintf("%20d0", regionID)),
+		EndKey:   []byte(fmt.Sprintf("%20d0", regionID+1)),
RegionEpoch: epoch,
}
var leader *metapb.Peer
79 changes: 59 additions & 20 deletions pkg/schedule/schedulers/hot_region.go
@@ -62,6 +62,7 @@ var (
hotSchedulerRegionBucketsNotHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_buckets_not_hot")
hotSchedulerSplitSuccessCounter = schedulerCounter.WithLabelValues(HotRegionName, "split_success")
hotSchedulerNeedSplitBeforeScheduleCounter = schedulerCounter.WithLabelValues(HotRegionName, "need_split_before_move_peer")
+	hotSchedulerRegionIsTooHotCounter = schedulerCounter.WithLabelValues(HotRegionName, "region_is_too_hot")

hotSchedulerMoveLeaderCounter = schedulerCounter.WithLabelValues(HotRegionName, moveLeader.String())
hotSchedulerMovePeerCounter = schedulerCounter.WithLabelValues(HotRegionName, movePeer.String())
@@ -159,21 +160,23 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sche.
// It makes each dim rate or count become `weight` times to the origin value.
func (h *baseHotScheduler) summaryPendingInfluence() {
for id, p := range h.regionPendings {
-		from := h.stInfos[p.from]
-		to := h.stInfos[p.to]
-		maxZombieDur := p.maxZombieDuration
-		weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
-
-		if needGC {
-			delete(h.regionPendings, id)
-			continue
-		}
-
-		if from != nil && weight > 0 {
-			from.AddInfluence(&p.origin, -weight)
-		}
-		if to != nil && weight > 0 {
-			to.AddInfluence(&p.origin, weight)
-		}
+		for _, from := range p.froms {
+			from := h.stInfos[from]
+			to := h.stInfos[p.to]
+			maxZombieDur := p.maxZombieDuration
+			weight, needGC := calcPendingInfluence(p.op, maxZombieDur)
+
+			if needGC {
+				delete(h.regionPendings, id)
+				continue
+			}
+
+			if from != nil && weight > 0 {
+				from.AddInfluence(&p.origin, -weight)
+			}
+			if to != nil && weight > 0 {
+				to.AddInfluence(&p.origin, weight)
+			}
+		}
	}
for storeID, info := range h.stInfos {
@@ -203,6 +206,7 @@ const (

minHotScheduleInterval = time.Second
maxHotScheduleInterval = 20 * time.Second
+	regionTooHotThreshold = 0.3
)

var (
@@ -294,7 +298,7 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster sche.ClusterInfor
return nil
}

-func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
+func (h *hotScheduler) tryAddPendingInfluence(op *operator.Operator, srcStore []uint64, dstStore uint64, infl statistics.Influence, maxZombieDur time.Duration) bool {
regionID := op.RegionID()
_, ok := h.regionPendings[regionID]
if ok {
@@ -664,6 +668,16 @@ func (bs *balanceSolver) solve() []*operator.Operator {
}
}
bs.cur.mainPeerStat = mainPeerStat
+		if regionTooHot(srcStore, mainPeerStat) {
+			hotSchedulerRegionIsTooHotCounter.Inc()
+			ops := bs.createSplitOperator([]*core.RegionInfo{bs.cur.region})
+			if len(ops) > 0 {
+				bs.ops = ops
+				bs.cur.calcPeersRate(bs.firstPriority, bs.secondPriority)
+				bs.best = bs.cur
+				return ops
+			}
+		}

for _, dstStore := range bs.filterDstStores() {
bs.cur.dstStore = dstStore
@@ -723,24 +737,38 @@ func (bs *balanceSolver) tryAddPendingInfluence() bool {
if bs.best == nil || len(bs.ops) == 0 {
return false
}
-	if bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
+	isSplit := bs.ops[0].Kind() == operator.OpSplit
+	if !isSplit && bs.best.srcStore.IsTiFlash() != bs.best.dstStore.IsTiFlash() {
hotSchedulerNotSameEngineCounter.Inc()
return false
}
maxZombieDur := bs.calcMaxZombieDur()

// TODO: Process operators atomically.
// main peer
-	srcStoreID := bs.best.srcStore.GetID()
-	dstStoreID := bs.best.dstStore.GetID()
+	srcStoreIDs := make([]uint64, 0)
+	dstStoreID := uint64(0)
+	if isSplit {
+		region := bs.GetRegion(bs.ops[0].RegionID())
+		for id := range region.GetStoreIDs() {
+			srcStoreIDs = append(srcStoreIDs, id)
+		}
+	} else {
+		srcStoreIDs = append(srcStoreIDs, bs.best.srcStore.GetID())
+		dstStoreID = bs.best.dstStore.GetID()
+	}
infl := bs.collectPendingInfluence(bs.best.mainPeerStat)
-	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreID, dstStoreID, infl, maxZombieDur) {
+	if !bs.sche.tryAddPendingInfluence(bs.ops[0], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
return false
}
+	if isSplit {
+		return true
+	}
// revert peers
if bs.best.revertPeerStat != nil && len(bs.ops) > 1 {
infl := bs.collectPendingInfluence(bs.best.revertPeerStat)
-		if !bs.sche.tryAddPendingInfluence(bs.ops[1], dstStoreID, srcStoreID, infl, maxZombieDur) {
+		if !bs.sche.tryAddPendingInfluence(bs.ops[1], srcStoreIDs, dstStoreID, infl, maxZombieDur) {
return false
}
}
@@ -1529,6 +1557,10 @@ func (bs *balanceSolver) createSplitOperator(regions []*core.RegionInfo) []*oper
for _, region := range regions {
createFunc(region)
}
+	// the split-bucket operator has the highest priority
+	if len(operators) > 0 {
+		bs.cur.progressiveRank = -5
Member:

What is it used for?

Contributor Author:

It makes the split strategy the preferred one, so the solver always picks it; a sketch of the assumed comparison is below.
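For context, a minimal sketch of the comparison this relies on. Assumption (not shown in this diff): the solver prefers the candidate with the smaller progressiveRank, and ordinary move solutions never rank below -4, so -5 always wins:

```go
package main

import "fmt"

// solution stands in for the solver's candidate type; only the rank matters here.
type solution struct{ progressiveRank int64 }

// better mirrors the assumed comparison: the smaller (more negative) rank wins.
func better(cur, best *solution) bool {
	return best == nil || cur.progressiveRank < best.progressiveRank
}

func main() {
	bestMove := &solution{progressiveRank: -4} // best achievable move-peer solution
	split := &solution{progressiveRank: -5}    // rank assigned in createSplitOperator
	fmt.Println(better(split, bestMove))       // true: the split solution is always chosen
}
```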

+	}
return operators
}

@@ -1789,3 +1821,10 @@ func dimToString(dim int) string {
func prioritiesToDim(priorities []string) (firstPriority int, secondPriority int) {
return stringToDim(priorities[0]), stringToDim(priorities[1])
}

+// regionTooHot returns true if any dim of the hot region is greater than the store threshold.
+func regionTooHot(store *statistics.StoreLoadDetail, region *statistics.HotPeerStat) bool {
+	return slice.AnyOf(store.LoadPred.Current.Loads, func(i int) bool {
+		return region.Loads[i] > store.LoadPred.Current.Loads[i]*regionTooHotThreshold
+	})
+}
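To make the 0.3 threshold concrete, here is a self-contained sketch with hypothetical numbers (plain slices stand in for store.LoadPred.Current.Loads and region.Loads): a region counts as too hot as soon as any single dimension carries more than 30% of its store's current load on that dimension.

```go
package main

import "fmt"

const regionTooHotThreshold = 0.3

// tooHot mirrors the regionTooHot check above; index i is one load
// dimension (e.g. byte rate, key rate, query rate).
func tooHot(storeLoads, regionLoads []float64) bool {
	for i := range storeLoads {
		if regionLoads[i] > storeLoads[i]*regionTooHotThreshold {
			return true
		}
	}
	return false
}

func main() {
	store := []float64{100, 100, 100}                 // hypothetical store loads
	fmt.Println(tooHot(store, []float64{25, 25, 25})) // false: every dim at 25%
	fmt.Println(tooHot(store, []float64{40, 10, 10})) // true: first dim at 40% > 30%
}
```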
62 changes: 61 additions & 1 deletion pkg/schedule/schedulers/hot_region_test.go
@@ -16,6 +16,7 @@ package schedulers

import (
"encoding/hex"
"fmt"
"math"
"testing"
"time"
@@ -141,7 +142,7 @@ func checkGCPendingOpInfos(re *require.Assertions, enablePlacementRules bool) {
op.Start()
op.SetStatusReachTime(operator.CREATED, time.Now().Add(-5*statistics.StoreHeartBeatReportInterval*time.Second))
op.SetStatusReachTime(operator.STARTED, time.Now().Add((-5*statistics.StoreHeartBeatReportInterval+1)*time.Second))
-		return newPendingInfluence(op, 2, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
+		return newPendingInfluence(op, []uint64{2}, 4, statistics.Influence{}, hb.conf.GetStoreStatZombieDuration())
}
justDoneOpInfluence := func(region *core.RegionInfo, ty opType) *pendingInfluence {
infl := notDoneOpInfluence(region, ty)
@@ -202,6 +203,65 @@ func TestHotWriteRegionScheduleByteRateOnly(t *testing.T) {
checkHotWriteRegionScheduleByteRateOnly(re, true /* enable placement rules */)
}

+func TestSplitRegionInIfRegionIsTooHot(t *testing.T) {
+	re := require.New(t)
+	statistics.Denoising = false
+	cancel, _, tc, oc := prepareSchedulersTest()
+	defer cancel()
+	tc.SetHotRegionCacheHitsThreshold(1)
+	hb, err := CreateScheduler(statistics.Read.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+	b := &metapb.Buckets{
+		RegionId:   1,
+		PeriodInMs: 1000,
+		Keys: [][]byte{
+			[]byte(fmt.Sprintf("%21d", 11)),
+			[]byte(fmt.Sprintf("%21d", 12)),
+		},
+		Stats: &metapb.BucketStats{
+			ReadBytes:  []uint64{10 * units.KiB},
+			ReadKeys:   []uint64{256},
+			ReadQps:    []uint64{0},
+			WriteBytes: []uint64{0},
+			WriteQps:   []uint64{0},
+			WriteKeys:  []uint64{0},
+		},
+	}
+
+	task := buckets.NewCheckPeerTask(b)
+	re.True(tc.HotBucketCache.CheckAsync(task))
+	time.Sleep(time.Millisecond * 10)
+
+	tc.AddRegionStore(1, 3)
+	tc.AddRegionStore(2, 2)
+	tc.AddRegionStore(3, 2)
+
+	tc.UpdateStorageReadBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageReadBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	// Region 1 is a hot read region on stores 1, 2 and 3.
+	addRegionInfo(tc, statistics.Read, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
+	})
+	ops, _ := hb.Schedule(tc, false)
+	re.Len(ops, 1)
+	re.Equal(operator.OpSplit, ops[0].Kind())
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 0)
+
+	tc.UpdateStorageWrittenBytes(1, 6*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(2, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	tc.UpdateStorageWrittenBytes(3, 1*units.MiB*statistics.StoreHeartBeatReportInterval)
+	// Region 1 is a hot write region on stores 1, 2 and 3.
+	addRegionInfo(tc, statistics.Write, []testRegionInfo{
+		{1, []uint64{1, 2, 3}, 4 * units.MiB, 0, 0},
+	})
+	hb, err = CreateScheduler(statistics.Write.String(), oc, storage.NewStorageWithMemoryBackend(), nil)
+	re.NoError(err)
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 1)
+	re.Equal(operator.OpSplit, ops[0].Kind())
+	ops, _ = hb.Schedule(tc, false)
+	re.Len(ops, 0)
+}

func TestSplitBuckets(t *testing.T) {
re := require.New(t)
statistics.Denoising = false
7 changes: 4 additions & 3 deletions pkg/schedule/schedulers/utils.go
@@ -236,15 +236,16 @@ func getKeyRanges(args []string) ([]core.KeyRange, error) {

type pendingInfluence struct {
op *operator.Operator
-	from, to          uint64
+	froms             []uint64
+	to                uint64
origin statistics.Influence
maxZombieDuration time.Duration
}

-func newPendingInfluence(op *operator.Operator, from, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
+func newPendingInfluence(op *operator.Operator, froms []uint64, to uint64, infl statistics.Influence, maxZombieDur time.Duration) *pendingInfluence {
return &pendingInfluence{
op: op,
-		from:              from,
+		froms:             froms,
to: to,
origin: infl,
maxZombieDuration: maxZombieDur,
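Taken together with the hot_region.go changes above: a split operator has no single source store, so froms carries every store that holds a peer of the region (with to left as 0), and summaryPendingInfluence subtracts the pending influence from each of those stores.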