scheduler: hot-region-scheduler store pick consider the history loads to decrease incorrect operator #6276

Closed
wants to merge 17 commits into from
8 changes: 8 additions & 0 deletions pkg/mock/mockcluster/mockcluster.go
@@ -45,6 +45,8 @@ const (
defaultRegionSize = 96 * units.MiB // 96MiB
)

var _ statistics.StoreStatInformer = &Cluster{}

// Cluster is used to mock a cluster for test purpose.
type Cluster struct {
*core.BasicCluster
@@ -114,6 +116,12 @@ func (mc *Cluster) GetStoresLoads() map[uint64][]float64 {
return mc.HotStat.GetStoresLoads()
}

// GetStoresHistoryLoads gets the history load statistics of all stores.
func (mc *Cluster) GetStoresHistoryLoads() map[uint64][][]float64 {
mc.HotStat.FilterUnhealthyStore(mc)
return mc.HotStat.GetStoresHistoryLoads()
}

// GetStore gets a store with a given store ID.
func (mc *Cluster) GetStore(storeID uint64) *core.StoreInfo {
return mc.Stores.GetStore(storeID)
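For orientation (not part of the diff): GetStoresHistoryLoads mirrors GetStoresLoads, but each store maps to a slice of load dimensions, and each dimension to a window of historical samples. A minimal, self-contained sketch of reading that shape, with made-up store IDs and values:

package main

import "fmt"

// printHistoryLoads walks the map shape returned by GetStoresHistoryLoads:
// store ID -> load dimension -> history samples. The data below is invented
// purely to show the nesting.
func printHistoryLoads(loads map[uint64][][]float64) {
	for storeID, dims := range loads {
		for dim, samples := range dims {
			fmt.Printf("store %d, dim %d: %v\n", storeID, dim, samples)
		}
	}
}

func main() {
	printHistoryLoads(map[uint64][][]float64{
		1: {{120, 118, 122}, {5.1, 5.0, 5.3}},
	})
}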
1 change: 1 addition & 0 deletions pkg/movingaverage/avg_over_time_test.go
@@ -84,6 +84,7 @@ func TestMinFilled(t *testing.T) {
for aotSize := 2; aotSize < 10; aotSize++ {
for mfSize := 2; mfSize < 10; mfSize++ {
tm := NewTimeMedian(aotSize, mfSize, interval)
re.Equal([]float64{}, tm.GetAll())
for i := 0; i < aotSize; i++ {
re.Equal(0.0, tm.Get())
tm.Add(rate*interval.Seconds(), interval)
7 changes: 7 additions & 0 deletions pkg/movingaverage/exponential_moving_average.go
@@ -31,6 +31,8 @@ type EMA struct {
instantaneous float64
}

var _ MovingAvg = &EMA{}

// NewEMA returns an EMA.
func NewEMA(decays ...float64) *EMA {
decay := defaultDecay
@@ -70,6 +72,11 @@ func (e *EMA) Get() float64 {
return e.value
}

// GetAll returns all the data points.
func (e *EMA) GetAll() []float64 {
return []float64{e.Get()}
}

// Reset cleans the data set.
func (e *EMA) Reset() {
e.count = 0
7 changes: 7 additions & 0 deletions pkg/movingaverage/hull_moving_average.go
@@ -26,6 +26,8 @@ type HMA struct {
wma []*WMA
}

var _ MovingAvg = &HMA{}

// NewHMA returns a WMA.
func NewHMA(sizes ...float64) *HMA {
size := defaultHMASize
@@ -54,6 +56,11 @@ func (h *HMA) Get() float64 {
return h.wma[2].Get()
}

// GetAll returns all the data points.
func (h *HMA) GetAll() []float64 {
return h.wma[2].GetAll()
}

// Reset cleans the data set.
func (h *HMA) Reset() {
h.wma[0] = NewWMA(int(h.size / 2))
10 changes: 10 additions & 0 deletions pkg/movingaverage/max_filter.go
@@ -16,6 +16,8 @@ package movingaverage

import "github.com/elliotchance/pie/v2"

var _ MovingAvg = &MaxFilter{}

// MaxFilter works as a maximum filter with specified window size.
// There are at most `size` data points for calculating.
type MaxFilter struct {
@@ -50,6 +52,14 @@ func (r *MaxFilter) Get() float64 {
return pie.Max(records)
}

// GetAll returns all the data points.
func (r *MaxFilter) GetAll() []float64 {
if r.count < r.size {
return r.records[:r.count]
}
return r.records
}

// Reset cleans the data set.
func (r *MaxFilter) Reset() {
r.count = 0
1 change: 1 addition & 0 deletions pkg/movingaverage/max_filter_test.go
@@ -29,6 +29,7 @@ func TestMaxFilter(t *testing.T) {

mf := NewMaxFilter(5)
re.Equal(empty, mf.Get())
re.Equal([]float64{}, mf.GetAll())

checkReset(re, mf, empty)
checkAdd(re, mf, data, expected)
8 changes: 8 additions & 0 deletions pkg/movingaverage/median_filter.go
@@ -54,6 +54,14 @@ func (r *MedianFilter) Get() float64 {
return r.result
}

// GetAll returns all the data points.
func (r *MedianFilter) GetAll() []float64 {
if r.count < r.size {
return r.records[:r.count]
}
return r.records
}

// Reset cleans the data set.
func (r *MedianFilter) Reset() {
r.count = 0
2 changes: 2 additions & 0 deletions pkg/movingaverage/moving_average.go
@@ -21,6 +21,8 @@ type MovingAvg interface {
Add(data float64)
// Get returns the moving average.
Get() float64
// GetAll returns all the data points.
GetAll() []float64
// GetInstantaneous returns the value just added.
GetInstantaneous() float64
// Reset cleans the data set.
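A minimal usage sketch of the extended interface (illustrative only, not part of the diff), assuming the github.com/tikv/pd module path and the existing NewMedianFilter constructor; the printed values are only indicative:

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/movingaverage"
)

func main() {
	// MedianFilter satisfies MovingAvg, so GetAll is reachable through the interface.
	var ma movingaverage.MovingAvg = movingaverage.NewMedianFilter(5)
	for _, v := range []float64{1, 2, 3} {
		ma.Add(v)
	}
	fmt.Println(ma.Get())    // the aggregate (median of the retained window)
	fmt.Println(ma.GetAll()) // every retained sample, which is what the history checks consume
}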
1 change: 1 addition & 0 deletions pkg/movingaverage/moving_average_test.go
@@ -80,6 +80,7 @@ func TestMedianFilter(t *testing.T) {

mf := NewMedianFilter(5)
re.Equal(empty, mf.Get())
re.Equal([]float64{}, mf.GetAll())

checkReset(re, mf, empty)
checkAdd(re, mf, data, expected)
5 changes: 5 additions & 0 deletions pkg/movingaverage/time_median.go
@@ -38,6 +38,11 @@ func (t *TimeMedian) Get() float64 {
return t.mf.Get()
}

// GetAll returns all the data points in the median filter.
func (t *TimeMedian) GetAll() []float64 {
return t.mf.GetAll()
}

// Add adds recent change to TimeMedian.
func (t *TimeMedian) Add(delta float64, interval time.Duration) {
t.aot.Add(delta, interval)
10 changes: 10 additions & 0 deletions pkg/movingaverage/weight_moving_average.go
@@ -16,6 +16,8 @@ package movingaverage

const defaultWMASize = 10

var _ MovingAvg = &WMA{}

// WMA works as a weight with specified window size.
// There are at most `size` data points for calculating.
// References:https://en.wikipedia.org/wiki/Moving_average#Weighted_moving_average
@@ -64,6 +66,14 @@ func (w *WMA) Get() float64 {
return w.score / float64((w.size+1)*w.size/2)
}

// GetAll returns all the data points.
func (w *WMA) GetAll() []float64 {
if w.count < w.size {
return w.records[:w.count]
}
return w.records
}

// Reset cleans the data set.
func (w *WMA) Reset() {
w.count = 0
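One behavioural note worth illustrating (inferred only from the code above, not stated in the PR): until size samples have been added, the filter-style implementations return just the filled prefix of the window, so callers see a shorter slice early on. A small sketch:

package main

import (
	"fmt"

	"github.com/tikv/pd/pkg/movingaverage"
)

func main() {
	wma := movingaverage.NewWMA(3)
	wma.Add(10)
	fmt.Println(wma.GetAll()) // only one sample so far, e.g. [10]
	wma.Add(20)
	wma.Add(30)
	fmt.Println(wma.GetAll()) // the full three-sample window once it has filled
}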
94 changes: 82 additions & 12 deletions pkg/schedule/schedulers/hot_region.go
@@ -112,12 +112,14 @@ func (h *baseHotScheduler) prepareForBalance(rw statistics.RWType, cluster sched
h.summaryPendingInfluence(cluster)
h.storesLoads = cluster.GetStoresLoads()
isTraceRegionFlow := cluster.GetOpts().IsTraceRegionFlow()
storesHistoryLoads := cluster.GetStoresHistoryLoads()

prepare := func(regionStats map[uint64][]*statistics.HotPeerStat, resource constant.ResourceKind) {
ty := buildResourceType(rw, resource)
h.stLoadInfos[ty] = statistics.SummaryStoresLoad(
h.stInfos,
h.storesLoads,
storesHistoryLoads,
regionStats,
isTraceRegionFlow,
rw, resource)
@@ -266,7 +268,6 @@ func (h *hotScheduler) dispatch(typ statistics.RWType, cluster schedule.Cluster)
if h.conf.IsForbidRWType(typ) {
return nil
}

switch typ {
case statistics.Read:
return h.balanceHotReadRegions(cluster)
@@ -463,6 +464,7 @@ type balanceSolver struct
betterThan func(*solution) bool
rankToDimString func() string
checkByPriorityAndTolerance func(loads []float64, f func(int) bool) bool
checkHistoryLoadsByPriority func(loads [][]float64, f func(int) bool) bool
}

func (bs *balanceSolver) init() {
@@ -527,10 +529,13 @@ func (bs *balanceSolver) pickCheckPolicyV1() {
switch {
case bs.resourceTy == writeLeader:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly
case bs.sche.conf.IsStrictPickingStoreEnabled():
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceAllOf
bs.checkHistoryLoadsByPriority = bs.checkHistoryLoadsByPriorityAndToleranceAllOf
default:
bs.checkByPriorityAndTolerance = bs.checkByPriorityAndToleranceFirstOnly
bs.checkHistoryLoadsByPriority = bs.checkHistoryByPriorityAndToleranceFirstOnly
}
}

@@ -599,7 +604,6 @@ func (bs *balanceSolver) solve() []*operator.Operator {
if !bs.isValid() {
return nil
}

bs.cur = &solution{}
tryUpdateBestSolution := func() {
if label, ok := bs.filterUniformStore(); ok {
@@ -781,12 +785,17 @@ func (bs *balanceSolver) filterSrcStores() map[uint64]*statistics.StoreLoadDetai
continue
}

if bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("src-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkSrcByPriorityAndTolerance(detail.LoadPred.Min(), &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}

if !bs.checkSrcHistoryLoadByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, srcToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("src-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("src-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
}
return ret
}
@@ -797,6 +806,17 @@ func (bs *balanceSolver) checkSrcByPriorityAndTolerance(minLoad, expectLoad *sta
})
}

func (bs *balanceSolver) checkSrcHistoryLoadByPriorityAndTolerance(current, expectLoad *statistics.StoreLoad, toleranceRatio float64) bool {
if len(current.HistoryLoads) == 0 {
return true
}
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j] > toleranceRatio*expectLoad.HistoryLoads[i][j]
})
})
}

// filterHotPeers filtered hot peers from statistics.HotPeerStat and deleted the peer if its region is in pending status.
// The returned hotPeer count in controlled by `max-peer-number`.
func (bs *balanceSolver) filterHotPeers(storeLoad *statistics.StoreLoadDetail) (ret []*statistics.HotPeerStat) {
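Restated outside the scheduler for clarity (an illustrative paraphrase, not code from the PR): a store stays in the source candidate set only if, for each checked dimension, every sample in its history window exceeds toleranceRatio times the expected history load. The real code routes the per-dimension policy through checkHistoryLoadsByPriority; the sketch below simplifies that to an all-dimensions check and uses invented numbers.

package main

import "fmt"

// srcPassesHistoryCheck paraphrases the source-side gate: each history sample of
// every checked dimension must exceed toleranceRatio * expected, so only stores
// that have been persistently hot are picked as sources.
func srcPassesHistoryCheck(current, expect [][]float64, toleranceRatio float64) bool {
	for dim := range current {
		for i, v := range current[dim] {
			if v <= toleranceRatio*expect[dim][i] {
				return false
			}
		}
	}
	return true
}

func main() {
	current := [][]float64{{120, 130, 125}} // candidate source, one dimension of history
	expect := [][]float64{{100, 100, 100}}  // expected history load
	fmt.Println(srcPassesHistoryCheck(current, expect, 1.05)) // true: every sample > 105
}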
@@ -995,12 +1015,17 @@ func (bs *balanceSolver) pickDstStores(filters []filter.Filter, candidates []*st
}
if filter.Target(bs.GetOpts(), store, filters) {
id := store.GetID()
if bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
ret[id] = detail
hotSchedulerResultCounter.WithLabelValues("dst-store-succ", strconv.FormatUint(id, 10)).Inc()
} else {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed", strconv.FormatUint(id, 10)).Inc()
if !bs.checkDstByPriorityAndTolerance(detail.LoadPred.Max(), &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}

if !bs.checkDstHistoryLoadsByPriorityAndTolerance(&detail.LoadPred.Current, &detail.LoadPred.Expect, dstToleranceRatio) {
hotSchedulerResultCounter.WithLabelValues("dst-store-history-loads-failed-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
continue
}
hotSchedulerResultCounter.WithLabelValues("dst-store-succ-"+bs.resourceTy.String(), strconv.FormatUint(id, 10)).Inc()
ret[id] = detail
}
}
return ret
@@ -1012,6 +1037,14 @@ func (bs *balanceSolver) checkDstByPriorityAndTolerance(maxLoad, expect *statist
})
}

func (bs *balanceSolver) checkDstHistoryLoadsByPriorityAndTolerance(current, expect *statistics.StoreLoad, toleranceRatio float64) bool {
return bs.checkHistoryLoadsByPriority(current.HistoryLoads, func(i int) bool {
return slice.AllOf(current.HistoryLoads[i], func(j int) bool {
return current.HistoryLoads[i][j]*toleranceRatio < expect.HistoryLoads[i][j]
})
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1021,6 +1054,15 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAllOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryLoadsByPriorityAndToleranceAllOf(loads [][]float64, f func(int) bool) bool {
return slice.AllOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return true
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
@@ -1030,10 +1072,23 @@ func (bs *balanceSolver) checkByPriorityAndToleranceAnyOf(loads []float64, f fun
})
}

func (bs *balanceSolver) checkHistoryByPriorityAndToleranceAnyOf(loads [][]float64, f func(int) bool) bool {
return slice.AnyOf(loads, func(i int) bool {
if bs.isSelectedDim(i) {
return f(i)
}
return false
})
}

func (bs *balanceSolver) checkByPriorityAndToleranceFirstOnly(loads []float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) checkHistoryByPriorityAndToleranceFirstOnly(_ [][]float64, f func(int) bool) bool {
return f(bs.firstPriority)
}

func (bs *balanceSolver) enableExpectation() bool {
return bs.sche.conf.GetDstToleranceRatio() > 0 && bs.sche.conf.GetSrcToleranceRatio() > 0
}
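The destination-side gate mirrors the source-side one: every history sample of a candidate destination, scaled by toleranceRatio, must stay below the expected history load, and whether all selected dimensions or only the first priority are checked follows the policy chosen in pickCheckPolicyV1 (first priority only for write leaders and the non-strict default, all selected dimensions under strict picking). A matching sketch under the same assumptions as the source-side paraphrase, with invented numbers:

package main

import "fmt"

// dstPassesHistoryCheck paraphrases the destination-side condition:
// current[i][j] * toleranceRatio < expect[i][j] for every sample of every checked
// dimension, i.e. the store must have been persistently colder than expected.
func dstPassesHistoryCheck(current, expect [][]float64, toleranceRatio float64) bool {
	for dim := range current {
		for i, v := range current[dim] {
			if v*toleranceRatio >= expect[dim][i] {
				return false
			}
		}
	}
	return true
}

func main() {
	current := [][]float64{{70, 80, 75}}   // candidate destination history
	expect := [][]float64{{100, 100, 100}} // expected history
	fmt.Println(dstPassesHistoryCheck(current, expect, 1.05)) // true: 80*1.05 = 84 < 100
}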
@@ -1542,6 +1597,21 @@ func (ty opType) String() string {

type resourceType int

func (rt resourceType) String() string {
switch rt {
case writePeer:
return "write-peer"
case writeLeader:
return "write-leader"
case readLeader:
return "read-leader"
case readPeer:
return "read-peer"
default:
return "unknown"
}
}

const (
writePeer resourceType = iota
writeLeader