From d08365b127b92e4fcec763cb563ebf568a728985 Mon Sep 17 00:00:00 2001 From: JF Technology <57098161+jf-tech@users.noreply.github.com> Date: Wed, 23 Feb 2022 15:35:09 +1300 Subject: [PATCH 1/3] add more maths helpers --- maths/maths.go | 37 ++++++++++++++++++++++++ maths/maths_test.go | 68 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) diff --git a/maths/maths.go b/maths/maths.go index 9015bd7..9af577b 100644 --- a/maths/maths.go +++ b/maths/maths.go @@ -1,5 +1,7 @@ package maths +import "math" + // MaxInt returns the bigger value of the two input ints. func MaxInt(x, y int) int { if x > y { @@ -16,6 +18,41 @@ func MinInt(x, y int) int { return y } +// AbsInt returns the absolute value of an int value. +func AbsInt(a int) int { + if a < 0 { + return -a + } + return a +} + // MaxIntValue is the max value for type int. // https://groups.google.com/forum/#!msg/golang-nuts/a9PitPAHSSU/ziQw1-QHw3EJ const MaxIntValue = int(^uint(0) >> 1) + +// MaxI64 returns the bigger value of the two input int64s. +func MaxI64(x, y int64) int64 { + if x > y { + return x + } + return y +} + +// MinI64 returns the smaller value of the two input int64s. +func MinI64(x, y int64) int64 { + if x < y { + return x + } + return y +} + +// AbsI64 returns the absolute value of an int64 value. +func AbsI64(a int64) int64 { + if a < 0 { + return -a + } + return a +} + +// MaxI64Value is the max value for type int64. +const MaxI64Value = math.MaxInt64 diff --git a/maths/maths_test.go b/maths/maths_test.go index 6759098..a53e3e9 100644 --- a/maths/maths_test.go +++ b/maths/maths_test.go @@ -43,3 +43,71 @@ func TestMinMaxInt(t *testing.T) { }) } } + +func TestAbsIntAndInt64(t *testing.T) { + tests := []struct { + name string + in int + expected int + }{ + { + name: "in > 0", + in: 1, + expected: 1, + }, + { + name: "in == 0", + in: 0, + expected: 0, + }, + { + name: "in < 0", + in: -4, + expected: 4, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expected, AbsInt(test.in)) + assert.Equal(t, int64(test.expected), AbsI64(int64(test.in))) + }) + } +} + +func TestMinMaxI64(t *testing.T) { + tests := []struct { + name string + x int64 + y int64 + expectedMin int64 + expectedMax int64 + }{ + { + name: "x less than y", + x: 1, + y: 2, + expectedMin: 1, + expectedMax: 2, + }, + { + name: "x greater than y", + x: 2, + y: 1, + expectedMin: 1, + expectedMax: 2, + }, + { + name: "x equal to y", + x: 2, + y: 2, + expectedMin: 2, + expectedMax: 2, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + assert.Equal(t, test.expectedMin, MinI64(test.x, test.y)) + assert.Equal(t, test.expectedMax, MaxI64(test.x, test.y)) + }) + } +} From ff69e6204a9cf0d0f9a3c2c6c80ef0ba076dba1d Mon Sep 17 00:00:00 2001 From: JF Technology <57098161+jf-tech@users.noreply.github.com> Date: Thu, 24 Feb 2022 14:42:37 +1300 Subject: [PATCH 2/3] Add NewTimedSlidingWindowI64 --- times/clock.go | 17 ++++ times/clock_test.go | 15 +++ times/timedSlidingWindow.go | 156 +++++++++++++++++++++++++++++++ times/timedSlidingWindow_test.go | 90 ++++++++++++++++++ 4 files changed, 278 insertions(+) create mode 100644 times/clock.go create mode 100644 times/clock_test.go create mode 100644 times/timedSlidingWindow.go create mode 100644 times/timedSlidingWindow_test.go diff --git a/times/clock.go b/times/clock.go new file mode 100644 index 0000000..b5db075 --- /dev/null +++ b/times/clock.go @@ -0,0 +1,17 @@ +package times + +import "time" + +type Clock interface { + Now() time.Time +} + +type osClock struct{} + +func (*osClock) Now() time.Time { + return time.Now() +} + +func NewOSClock() *osClock { + return &osClock{} +} diff --git a/times/clock_test.go b/times/clock_test.go new file mode 100644 index 0000000..568be8c --- /dev/null +++ b/times/clock_test.go @@ -0,0 +1,15 @@ +package times + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOSClock(t *testing.T) { + c := NewOSClock() + cnow := c.Now() + osnow := time.Now() + assert.True(t, cnow.Before(osnow) || cnow.Equal(osnow)) +} diff --git a/times/timedSlidingWindow.go b/times/timedSlidingWindow.go new file mode 100644 index 0000000..a3debb1 --- /dev/null +++ b/times/timedSlidingWindow.go @@ -0,0 +1,156 @@ +package times + +import ( + "time" + + "github.com/jf-tech/go-corelib/maths" +) + +/* +Until we move to golang generic, the interface{} based generic implementation is simply too +slow, compared with raw type (int, int64, etc) implementation. For example, we compared this +generic interface{} based implementation against an nearly identical but with direct int type +implementation, the benchmark is not even close: too many int<->interface{} conversion induced +heap escape: + +BenchmarkTimedSlidingWindowIntRaw-8 100 11409978 ns/op 600 B/op 3 allocs/op +BenchmarkTimedSlidingWindowIntIFace-8 31 37520116 ns/op 11837979 B/op 1479595 allocs/op + +So the decision is to comment out the interface{} implementation for reference only. + +type TimedSlidingWindowOp func(a, b interface{}) interface{} + +type TimedSlidingWindowCfg struct { + Clock Clock + Window, Bucket time.Duration + Adder, Subtracter TimedSlidingWindowOp +} + +type TimedSlidingWindow struct { + cfg TimedSlidingWindowCfg + n int + buckets []interface{} + start, end int + startTime time.Time + total interface{} +} + +func (s *TimedSlidingWindow) Add(amount interface{}) { + now := s.cfg.Clock.Now() + idx := int(now.Sub(s.startTime) / s.cfg.Bucket) + e2 := s.end + if s.end < s.start { + e2 += s.n + } + if s.start+idx-e2 < s.n { + for i := e2 + 1; i <= s.start+idx; i++ { + s.total = s.cfg.Subtracter(s.total, s.buckets[i%s.n]) + s.buckets[i%s.n] = nil + } + s.end = (s.start + idx) % s.n + newStart := maths.MaxInt(s.start+idx-s.n+1, s.start) + s.startTime = s.startTime.Add(time.Duration(newStart-s.start) * s.cfg.Bucket) + s.start = newStart + s.buckets[s.end] = s.cfg.Adder(s.buckets[s.end], amount) + s.total = s.cfg.Adder(s.total, amount) + } else { + for i := 0; i < s.n; i++ { + s.buckets[i] = nil + } + s.start, s.end = 0, 0 + s.buckets[0] = amount + s.total = amount + s.startTime = now + } +} + +func (s *TimedSlidingWindow) Total() interface{} { + s.Add(nil) + return s.total +} + +func NewTimedSlidingWindow(cfg TimedSlidingWindowCfg) *TimedSlidingWindow { + if cfg.Window == 0 || cfg.Window%cfg.Bucket != 0 { + panic("time window must be non-zero multiple of bucket") + } + n := int(cfg.Window / cfg.Bucket) + return &TimedSlidingWindow{ + cfg: cfg, + n: n, + buckets: make([]interface{}, n), + startTime: cfg.Clock.Now(), + } +} +*/ + +type TimedSlidingWindowI64 struct { + clock Clock + window, bucket time.Duration + n int + buckets []int64 + start, end int + startTime time.Time + total int64 +} + +func (t *TimedSlidingWindowI64) Add(amount int64) { + now := t.clock.Now() + idx := int(now.Sub(t.startTime) / t.bucket) + e2 := t.end + if t.end < t.start { + e2 += t.n + } + if t.start+idx-e2 < t.n { + for i := e2 + 1; i <= t.start+idx; i++ { + t.total -= t.buckets[i%t.n] + t.buckets[i%t.n] = 0 + } + t.end = (t.start + idx) % t.n + newStart := maths.MaxInt(t.start+idx-t.n+1, t.start) + t.startTime = t.startTime.Add(time.Duration(newStart-t.start) * t.bucket) + t.start = newStart + t.buckets[t.end] += amount + t.total += amount + } else { + for i := 0; i < t.n; i++ { + t.buckets[i] = 0 + } + t.start, t.end = 0, 0 + t.buckets[0] = amount + t.total = amount + t.startTime = now + } +} + +func (t *TimedSlidingWindowI64) Total() int64 { + t.Add(0) + return t.total +} + +func (t *TimedSlidingWindowI64) Reset() { + for i := 0; i < t.n; i++ { + t.buckets[i] = 0 + } + t.start, t.end = 0, 0 + t.startTime = t.clock.Now() + t.total = 0 +} + +func NewTimedSlidingWindowI64(window, bucket time.Duration, clock ...Clock) *TimedSlidingWindowI64 { + if window == 0 || bucket == 0 || window%bucket != 0 { + panic("window must be a non-zero multiple of non-zero bucket") + } + c := Clock(NewOSClock()) + if len(clock) > 0 { + c = clock[0] + } + n := int(window / bucket) + return &TimedSlidingWindowI64{ + clock: c, + window: window, + bucket: bucket, + n: n, + buckets: make([]int64, n), + startTime: c.Now(), + } +} diff --git a/times/timedSlidingWindow_test.go b/times/timedSlidingWindow_test.go new file mode 100644 index 0000000..23cf78a --- /dev/null +++ b/times/timedSlidingWindow_test.go @@ -0,0 +1,90 @@ +package times + +import ( + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +type testClock struct { + now time.Duration +} + +func (tc *testClock) Now() time.Time { + return time.Unix(0, int64(tc.now.Round(time.Nanosecond))) +} + +func (tc *testClock) adv(d time.Duration) { + tc.now += d +} + +func TestTimedSlidingWindowI64(t *testing.T) { + tc := &testClock{} + sw := NewTimedSlidingWindowI64(5*time.Second, 1*time.Second, tc) + test := func(adv time.Duration, add int64, expected []int64) { + tc.adv(adv) + sw.Add(add) + assert.Equal(t, expected, sw.buckets) + total := int64(0) + for i := 0; i < len(expected); i++ { + total += expected[i] + } + assert.Equal(t, total, sw.Total()) + } + // [ 0 1 2 3 4] + // 5 + test(0, 5, []int64{5, 0, 0, 0, 0}) + + // [ 0 1 2 3 4] + // 11 + test(0, 6, []int64{11, 0, 0, 0, 0}) + + // [ 0 1 2 3 4] + // 11 4 + test(2*time.Second, 4, []int64{11, 0, 4, 0, 0}) + + // [ 0 1 2 3 4] + // 2 4 + test(4*time.Second, 2, []int64{0, 2, 4, 0, 0}) + + // [ 0 1 2 3 4] + // 9 + test(10*time.Second, 9, []int64{9, 0, 0, 0, 0}) + + sw.Reset() + test(10*time.Second, 7, []int64{7, 0, 0, 0, 0}) + + assert.PanicsWithValue(t, "window must be a non-zero multiple of non-zero bucket", func() { + NewTimedSlidingWindowI64(0, time.Second) + }) + + assert.PanicsWithValue(t, "window must be a non-zero multiple of non-zero bucket", func() { + NewTimedSlidingWindowI64(time.Minute, 0, tc) + }) +} + +const ( + tswBenchSeed = int64(1234) + tswBenchWindow = time.Minute + tswBenchBucket = time.Second + tswBenchAddCount = 100000 + tswBenchAddRange = 1000 + tswBenchClockAdvRange = 2 * time.Minute +) + +func BenchmarkTimedSlidingWindowI64(b *testing.B) { + rand.Seed(tswBenchSeed) + tc := &testClock{} + sw := NewTimedSlidingWindowI64(tswBenchWindow, tswBenchBucket, tc) + for i := 0; i < b.N; i++ { + sw.Reset() + for j := 0; j < tswBenchAddCount; j++ { + tc.adv(time.Duration(rand.Int63() % int64(tswBenchClockAdvRange))) + add := rand.Int63() % tswBenchAddRange + sw.Add(add) + sw.Total() + } + } +} From d190269efc85d80d33c52bc0bf9e06138aa6b908 Mon Sep 17 00:00:00 2001 From: JF Technology <57098161+jf-tech@users.noreply.github.com> Date: Thu, 24 Feb 2022 14:49:14 +1300 Subject: [PATCH 3/3] comments --- times/clock.go | 2 ++ times/timedSlidingWindow.go | 10 ++++++++++ 2 files changed, 12 insertions(+) diff --git a/times/clock.go b/times/clock.go index b5db075..fe06ee7 100644 --- a/times/clock.go +++ b/times/clock.go @@ -2,6 +2,7 @@ package times import "time" +// Clock tells the current time. type Clock interface { Now() time.Time } @@ -12,6 +13,7 @@ func (*osClock) Now() time.Time { return time.Now() } +// NewOSClock returns a Clock interface implementation that uses time.Now. func NewOSClock() *osClock { return &osClock{} } diff --git a/times/timedSlidingWindow.go b/times/timedSlidingWindow.go index a3debb1..9c45968 100644 --- a/times/timedSlidingWindow.go +++ b/times/timedSlidingWindow.go @@ -83,6 +83,7 @@ func NewTimedSlidingWindow(cfg TimedSlidingWindowCfg) *TimedSlidingWindow { } */ +// TimedSlidingWindowI64 offers a way to aggregate int64 values over a time-based sliding window. type TimedSlidingWindowI64 struct { clock Clock window, bucket time.Duration @@ -93,6 +94,7 @@ type TimedSlidingWindowI64 struct { total int64 } +// Add adds a new int64 value into the current sliding window. func (t *TimedSlidingWindowI64) Add(amount int64) { now := t.clock.Now() idx := int(now.Sub(t.startTime) / t.bucket) @@ -122,11 +124,13 @@ func (t *TimedSlidingWindowI64) Add(amount int64) { } } +// Total returns the aggregated int64 value over the current sliding window. func (t *TimedSlidingWindowI64) Total() int64 { t.Add(0) return t.total } +// Reset resets the sliding window and clear the existing aggregated value. func (t *TimedSlidingWindowI64) Reset() { for i := 0; i < t.n; i++ { t.buckets[i] = 0 @@ -136,6 +140,12 @@ func (t *TimedSlidingWindowI64) Reset() { t.total = 0 } +// NewTimedSlidingWindowI64 creates a new time-based sliding window for int64 value +// aggregation. window is the sliding window "width", and bucket is the granularity of +// how the window is divided. Both must be non-zero and window must be of an integer +// multiple of bucket. Be careful of not making bucket too small as it would increase +// the internal bucket memory allocation. If no clock is passed in, then os time.Now +// clock will be used. func NewTimedSlidingWindowI64(window, bucket time.Duration, clock ...Clock) *TimedSlidingWindowI64 { if window == 0 || bucket == 0 || window%bucket != 0 { panic("window must be a non-zero multiple of non-zero bucket")