Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

movingaverage: reduce memory consume #5798

Merged
merged 3 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/mock/mockcluster/mockcluster.go
Expand Up @@ -389,7 +389,7 @@ func (mc *Cluster) AddRegionWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetReadQuery(readQuery))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand All @@ -410,7 +410,7 @@ func (mc *Cluster) AddRegionWithPeerReadInfo(regionID, leaderStoreID, targetStor
otherPeerStoreIDs []uint64, filledNums ...int) []*statistics.HotPeerStat {
r := mc.newMockRegionInfo(regionID, leaderStoreID, otherPeerStoreIDs...)
r = r.Clone(core.SetReadBytes(readBytes), core.SetReadKeys(readKeys), core.SetReportInterval(0, reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down Expand Up @@ -438,7 +438,7 @@ func (mc *Cluster) AddRegionLeaderWithReadInfo(
r = r.Clone(core.SetReadKeys(readKeys))
r = r.Clone(core.SetReadQuery(readQuery))
r = r.Clone(core.SetReportInterval(0, reportInterval))
filledNum := mc.HotCache.GetFilledPeriod(statistics.Read)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down Expand Up @@ -466,7 +466,7 @@ func (mc *Cluster) AddLeaderRegionWithWriteInfo(
r = r.Clone(core.SetReportInterval(0, reportInterval))
r = r.Clone(core.SetWrittenQuery(writtenQuery))

filledNum := mc.HotCache.GetFilledPeriod(statistics.Write)
filledNum := statistics.DefaultAotSize
if len(filledNums) > 0 {
filledNum = filledNums[0]
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/movingaverage/avg_over_time.go
Expand Up @@ -131,3 +131,11 @@ func (aot *AvgOverTime) Clone() *AvgOverTime {
func (aot *AvgOverTime) GetIntervalSum() time.Duration {
return aot.intervalSum
}

// GetInstantaneous returns the value just added.
func (aot *AvgOverTime) GetInstantaneous() float64 {
if aot.que.Len() == 0 || aot.que.Back() == nil {
return 0
}
return aot.que.Back().(deltaWithInterval).delta
}
2 changes: 1 addition & 1 deletion pkg/movingaverage/avg_over_time_test.go
Expand Up @@ -84,7 +84,7 @@ func TestMinFilled(t *testing.T) {
for aotSize := 2; aotSize < 10; aotSize++ {
for mfSize := 2; mfSize < 10; mfSize++ {
tm := NewTimeMedian(aotSize, mfSize, interval)
for i := 0; i < tm.GetFilledPeriod(); i++ {
for i := 0; i < aotSize; i++ {
re.Equal(0.0, tm.Get())
tm.Add(rate*interval.Seconds(), interval)
}
Expand Down
27 changes: 11 additions & 16 deletions pkg/movingaverage/median_filter.go
Expand Up @@ -23,12 +23,11 @@ type MedianFilter struct {
// It is not thread safe to read and write records at the same time.
// If there are concurrent read and write, the read may get an old value.
// And we should avoid concurrent write.
records []float64
size uint64
count uint64
instantaneous float64
isUpdated bool
result float64
records []float64
size uint64
count uint64
isUpdated bool
result float64
}

// NewMedianFilter returns a MedianFilter.
Expand All @@ -43,7 +42,6 @@ func NewMedianFilter(size int) *MedianFilter {

// Add adds a data point.
func (r *MedianFilter) Add(n float64) {
r.instantaneous = n
r.records[r.count%r.size] = n
r.count++
r.isUpdated = true
Expand All @@ -68,34 +66,31 @@ func (r *MedianFilter) Get() float64 {

// Reset cleans the data set.
func (r *MedianFilter) Reset() {
r.instantaneous = 0
r.count = 0
r.isUpdated = true
}

// Set = Reset + Add.
func (r *MedianFilter) Set(n float64) {
r.instantaneous = n
r.records[0] = n
r.count = 1
r.isUpdated = true
}

// GetInstantaneous returns the value just added.
func (r *MedianFilter) GetInstantaneous() float64 {
return r.instantaneous
return r.records[(r.count-1)%r.size]
}

// Clone returns a copy of MedianFilter
func (r *MedianFilter) Clone() *MedianFilter {
records := make([]float64, len(r.records))
copy(records, r.records)
return &MedianFilter{
records: records,
size: r.size,
count: r.count,
instantaneous: r.instantaneous,
isUpdated: r.isUpdated,
result: r.result,
records: records,
size: r.size,
count: r.count,
isUpdated: r.isUpdated,
result: r.result,
}
}
1 change: 1 addition & 0 deletions pkg/movingaverage/moving_average_test.go
Expand Up @@ -43,6 +43,7 @@ func checkAdd(re *require.Assertions, ma MovingAvg, data []float64, expected []f
re.Len(data, len(expected))
for i, x := range data {
ma.Add(x)
re.Equal(x, ma.GetInstantaneous())
re.LessOrEqual(math.Abs(ma.Get()-expected[i]), 1e-7)
}
}
Expand Down
28 changes: 7 additions & 21 deletions pkg/movingaverage/time_median.go
Expand Up @@ -21,20 +21,15 @@ import "time"
// Delay is aotSize * mfSize * reportInterval/4
// and the min filled period is aotSize * reportInterval, which is not related with mfSize
type TimeMedian struct {
aot *AvgOverTime
mf *MedianFilter
aotSize int
mfSize int
instantaneous float64
aot *AvgOverTime
mf *MedianFilter
}

// NewTimeMedian returns a TimeMedian with given size.
func NewTimeMedian(aotSize, mfSize int, reportInterval time.Duration) *TimeMedian {
return &TimeMedian{
aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval),
mf: NewMedianFilter(mfSize),
aotSize: aotSize,
mfSize: mfSize,
aot: NewAvgOverTime(time.Duration(aotSize) * reportInterval),
mf: NewMedianFilter(mfSize),
}
}

Expand All @@ -45,7 +40,6 @@ func (t *TimeMedian) Get() float64 {

// Add adds recent change to TimeMedian.
func (t *TimeMedian) Add(delta float64, interval time.Duration) {
t.instantaneous = delta / interval.Seconds()
t.aot.Add(delta, interval)
if t.aot.IsFull() {
t.mf.Add(t.aot.Get())
Expand All @@ -57,23 +51,15 @@ func (t *TimeMedian) Set(avg float64) {
t.mf.Set(avg)
}

// GetFilledPeriod returns filled period.
func (t *TimeMedian) GetFilledPeriod() int { // it is unrelated with mfSize
return t.aotSize
}

// GetInstantaneous returns instantaneous speed
func (t *TimeMedian) GetInstantaneous() float64 {
return t.instantaneous
return t.aot.GetInstantaneous()
}

// Clone returns a copy of TimeMedian
func (t *TimeMedian) Clone() *TimeMedian {
return &TimeMedian{
aot: t.aot.Clone(),
mf: t.mf.Clone(),
aotSize: t.aotSize,
mfSize: t.mfSize,
instantaneous: t.instantaneous,
aot: t.aot.Clone(),
mf: t.mf.Clone(),
}
}
15 changes: 0 additions & 15 deletions server/statistics/hot_cache.go
Expand Up @@ -16,9 +16,7 @@ package statistics

import (
"context"
"time"

"github.com/tikv/pd/pkg/movingaverage"
"github.com/tikv/pd/server/core"
)

Expand Down Expand Up @@ -190,19 +188,6 @@ func (w *HotCache) ExpiredWriteItems(region *core.RegionInfo) []*HotPeerStat {
return w.writeCache.collectExpiredItems(region)
}

// GetFilledPeriod returns filled period.
// This is used for mockcluster, for test purpose.
func (w *HotCache) GetFilledPeriod(kind RWType) int {
var reportIntervalSecs int
switch kind {
case Write:
reportIntervalSecs = w.writeCache.kind.ReportInterval()
case Read:
reportIntervalSecs = w.readCache.kind.ReportInterval()
}
return movingaverage.NewTimeMedian(DefaultAotSize, rollingWindowsSize, time.Duration(reportIntervalSecs)*time.Second).GetFilledPeriod()
}

// GetThresholds returns thresholds.
// This is used for test purpose.
func (w *HotCache) GetThresholds(kind RWType, storeID uint64) []float64 {
Expand Down