From 97db1984ac5f14cd53e4d09b4ceb827f45b23f2e Mon Sep 17 00:00:00 2001 From: Keran Yang Date: Fri, 16 Jun 2023 12:23:13 -0400 Subject: [PATCH] refactor: remove redundant delta calculations for rater (#795) When we calculate processing rate, we calculate deltas between all pairs of timestamped counts within lookback seconds. If two consecutive timestamped counts all finished collecting metrics, then the delta calculation should only execute once. This PR introduce the concept of `window`. When a timestamp counts finish collecting counts, we mark the it as window-closed and calculate the delta and save the delta. Such that future GetRate calls won't need to re-calculate the delta and instead directly use the delta attribute of that closed window. This brings down the time complexity of calculating vertex level rate from O(lookback seconds * no. of pods) to O(lookback seconds) Signed-off-by: Keran Yang --- pkg/daemon/server/service/rater/helper.go | 74 +++++++++++-------- .../server/service/rater/helper_test.go | 50 ++++++++++++- .../service/rater/timestamped_counts.go | 57 +++++++++++++- .../service/rater/timestamped_counts_test.go | 24 ++++++ 4 files changed, 168 insertions(+), 37 deletions(-) diff --git a/pkg/daemon/server/service/rater/helper.go b/pkg/daemon/server/service/rater/helper.go index f0055332b..3b9252e35 100644 --- a/pkg/daemon/server/service/rater/helper.go +++ b/pkg/daemon/server/service/rater/helper.go @@ -26,16 +26,31 @@ const IndexNotFound = -1 // UpdateCount updates the count of processed messages for a pod at a given time func UpdateCount(q *sharedqueue.OverflowQueue[*TimestampedCounts], time int64, podName string, count float64) { + items := q.Items() + // find the element matching the input timestamp and update it - for _, i := range q.Items() { + for _, i := range items { if i.timestamp == time { i.Update(podName, count) return } } + // if we cannot find a matching element, it means we need to add a new timestamped count to the queue tc := NewTimestampedCounts(time) tc.Update(podName, count) + + // close the window for the most recent timestamped count + switch n := q.Length(); n { + case 0: + // if the queue is empty, we just append the new timestamped count + case 1: + // if the queue has only one element, we close the window for this element + items[0].CloseWindow(nil) + default: + // if the queue has more than one element, we close the window for the most recent element + items[n-1].CloseWindow(items[n-2]) + } q.Append(tc) } @@ -47,43 +62,25 @@ func CalculateRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookbackSec } counts := q.Items() startIndex := findStartIndex(lookbackSeconds, counts) - if startIndex == IndexNotFound { + endIndex := findEndIndex(counts) + if startIndex == IndexNotFound || endIndex == IndexNotFound { return 0 } delta := float64(0) // time diff in seconds. - timeDiff := counts[n-1].timestamp - counts[startIndex].timestamp + timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp if timeDiff == 0 { // if the time difference is 0, we return 0 to avoid division by 0 // this should not happen in practice because we are using a 10s interval return 0 } - for i := startIndex; i < n-1; i++ { - delta = delta + calculateDelta(counts[i], counts[i+1]) - } - return delta / float64(timeDiff) -} - -func calculateDelta(c1, c2 *TimestampedCounts) float64 { - tc1 := c1.Snapshot() - tc2 := c2.Snapshot() - delta := float64(0) - // Iterate over the podCounts of the second TimestampedCounts - for pod, count2 := range tc2 { - // If the pod also exists in the first TimestampedCounts - if count1, ok := tc1[pod]; ok { - // If the count has decreased, it means the pod restarted - if count2 < count1 { - delta += count2 - } else { // If the count has increased or stayed the same - delta += count2 - count1 - } - } else { // If the pod only exists in the second TimestampedCounts, it's a new pod - delta += count2 + for i := startIndex; i < endIndex; i++ { + if counts[i+1] != nil && counts[i+1].IsWindowClosed() { + delta += counts[i+1].delta } } - return delta + return delta / float64(timeDiff) } // CalculatePodRate calculates the rate of a pod in the last lookback seconds @@ -94,20 +91,23 @@ func CalculatePodRate(q *sharedqueue.OverflowQueue[*TimestampedCounts], lookback } counts := q.Items() startIndex := findStartIndex(lookbackSeconds, counts) - if startIndex == IndexNotFound { + endIndex := findEndIndex(counts) + if startIndex == IndexNotFound || endIndex == IndexNotFound { return 0 } delta := float64(0) // time diff in seconds. - timeDiff := counts[n-1].timestamp - counts[startIndex].timestamp + timeDiff := counts[endIndex].timestamp - counts[startIndex].timestamp if timeDiff == 0 { // if the time difference is 0, we return 0 to avoid division by 0 // this should not happen in practice because we are using a 10s interval return 0 } - for i := startIndex; i < n-1; i++ { - delta = delta + calculatePodDelta(counts[i], counts[i+1], podName) + for i := startIndex; i < endIndex; i++ { + if c1, c2 := counts[i], counts[i+1]; c1 != nil && c2 != nil && c1.IsWindowClosed() && c2.IsWindowClosed() { + delta += calculatePodDelta(c1, c2, podName) + } } return delta / float64(timeDiff) } @@ -133,14 +133,14 @@ func calculatePodDelta(c1, c2 *TimestampedCounts, podName string) float64 { func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int { n := len(counts) now := time.Now().Truncate(time.Second * 10).Unix() - if now-counts[n-2].timestamp > lookbackSeconds { + if n < 2 || now-counts[n-2].timestamp > lookbackSeconds { // if the second last element is already outside the lookback window, we return IndexNotFound return IndexNotFound } startIndex := n - 2 for i := n - 2; i >= 0; i-- { - if now-counts[i].timestamp <= lookbackSeconds { + if now-counts[i].timestamp <= lookbackSeconds && counts[i].IsWindowClosed() { startIndex = i } else { break @@ -148,3 +148,13 @@ func findStartIndex(lookbackSeconds int64, counts []*TimestampedCounts) int { } return startIndex } + +func findEndIndex(counts []*TimestampedCounts) int { + for i := len(counts) - 1; i >= 0; i-- { + // if a window is not closed, we exclude it from the rate calculation + if counts[i].IsWindowClosed() { + return i + } + } + return IndexNotFound +} diff --git a/pkg/daemon/server/service/rater/helper_test.go b/pkg/daemon/server/service/rater/helper_test.go index 1c7abb51d..19cfeb00e 100644 --- a/pkg/daemon/server/service/rater/helper_test.go +++ b/pkg/daemon/server/service/rater/helper_test.go @@ -78,7 +78,7 @@ func TestUpdateCount(t *testing.T) { assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) }) - t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenUpdateNewTimeWithPod", func(t *testing.T) { + t.Run("givenTimeNotExistsCountAvailable_whenUpdate_thenUpdateNewTimeWithPodAndCloseWindowForPrevTime", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) tc := NewTimestampedCounts(TestTime) tc.Update("pod1", 10.0) @@ -89,6 +89,8 @@ func TestUpdateCount(t *testing.T) { assert.Equal(t, 2, q.Length()) assert.Equal(t, 10.0, q.Items()[0].podCounts["pod1"]) assert.Equal(t, 20.0, q.Items()[1].podCounts["pod1"]) + assert.Equal(t, true, tc.IsWindowClosed()) + assert.Equal(t, 10.0, tc.delta) }) t.Run("givenTimeNotExistsCountNotAvailable_whenUpdate_thenAddEmptyItem", func(t *testing.T) { @@ -128,12 +130,15 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc1.Update("pod1", 5.0) q.Append(tc1) + tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc2.Update("pod1", 10.0) q.Append(tc2) + tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc3.Update("pod1", 20.0) q.Append(tc3) + tc3.CloseWindow(tc2) assert.Equal(t, 0.0, CalculateRate(q, 5)) assert.Equal(t, 1.0, CalculateRate(q, 15)) @@ -146,6 +151,33 @@ func TestCalculateRate(t *testing.T) { assert.Equal(t, 0.75, CalculatePodRate(q, 100, "pod1")) }) + t.Run("singlePod_givenCountIncreases_whenCalculateRate_thenReturnRate_excludeOpenWindow", func(t *testing.T) { + q := sharedqueue.New[*TimestampedCounts](1800) + now := time.Now() + + tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) + tc1.Update("pod1", 5.0) + q.Append(tc1) + tc1.CloseWindow(nil) + tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) + tc2.Update("pod1", 10.0) + q.Append(tc2) + tc2.CloseWindow(tc1) + tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) + tc3.Update("pod1", 20.0) + q.Append(tc3) + + assert.Equal(t, 0.0, CalculateRate(q, 5)) + assert.Equal(t, 0.0, CalculateRate(q, 15)) + assert.Equal(t, 0.5, CalculateRate(q, 25)) + assert.Equal(t, 0.5, CalculateRate(q, 100)) + + assert.Equal(t, 0.0, CalculatePodRate(q, 5, "pod1")) + assert.Equal(t, 0.0, CalculatePodRate(q, 15, "pod1")) + assert.Equal(t, 0.5, CalculatePodRate(q, 25, "pod1")) + assert.Equal(t, 0.5, CalculatePodRate(q, 100, "pod1")) + }) + t.Run("singlePod_givenCountDecreases_whenCalculateRate_thenReturnRate", func(t *testing.T) { q := sharedqueue.New[*TimestampedCounts](1800) now := time.Now() @@ -153,15 +185,19 @@ func TestCalculateRate(t *testing.T) { tc1 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 30) tc1.Update("pod1", 200.0) q.Append(tc1) + tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 20) tc2.Update("pod1", 100.0) q.Append(tc2) + tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update("pod1", 50.0) q.Append(tc3) + tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update("pod1", 80.0) q.Append(tc4) + tc4.CloseWindow(tc3) assert.Equal(t, 0.0, CalculateRate(q, 5)) assert.Equal(t, 3.0, CalculateRate(q, 15)) @@ -184,18 +220,22 @@ func TestCalculateRate(t *testing.T) { tc1.Update("pod1", 200.0) tc1.Update("pod2", 100.0) q.Append(tc1) + tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) tc2.Update("pod1", 100.0) tc2.Update("pod2", 200.0) q.Append(tc2) + tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update("pod1", 50.0) tc3.Update("pod2", 300.0) q.Append(tc3) + tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update("pod1", 80.0) tc4.Update("pod2", 400.0) q.Append(tc4) + tc4.CloseWindow(tc3) // vertex rate assert.Equal(t, 0.0, CalculateRate(q, 5)) @@ -228,20 +268,24 @@ func TestCalculateRate(t *testing.T) { tc1.Update("pod2", 90.0) tc1.Update("pod3", 50.0) q.Append(tc1) + tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) tc2.Update("pod1", 100.0) tc2.Update("pod2", 200.0) q.Append(tc2) + tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update("pod1", 50.0) tc3.Update("pod2", 300.0) tc3.Update("pod4", 100.0) q.Append(tc3) + tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update("pod2", 400.0) tc4.Update("pod3", 200.0) tc4.Update("pod100", 200.0) q.Append(tc4) + tc4.CloseWindow(tc3) // vertex rate assert.Equal(t, 0.0, CalculateRate(q, 5)) @@ -295,20 +339,24 @@ func TestCalculateRate(t *testing.T) { tc1.Update("pod2", 90.0) tc1.Update("pod3", 50.0) q.Append(tc1) + tc1.CloseWindow(nil) tc2 := NewTimestampedCounts(now.Truncate(time.Second*10).Unix() - 20) tc2.Update("pod1", 100.0) tc2.Update("pod2", 200.0) q.Append(tc2) + tc2.CloseWindow(tc1) tc3 := NewTimestampedCounts(now.Truncate(CountWindow).Unix() - 10) tc3.Update("pod1", 50.0) tc3.Update("pod2", 300.0) tc3.Update("pod4", 100.0) q.Append(tc3) + tc3.CloseWindow(tc2) tc4 := NewTimestampedCounts(now.Truncate(CountWindow).Unix()) tc4.Update("pod2", 400.0) tc4.Update("pod3", 200.0) tc4.Update("pod100", 200.0) q.Append(tc4) + tc4.CloseWindow(tc3) assert.Equal(t, 0.0, CalculateRate(q, 5)) assert.Equal(t, 50.0, CalculateRate(q, 15)) diff --git a/pkg/daemon/server/service/rater/timestamped_counts.go b/pkg/daemon/server/service/rater/timestamped_counts.go index ffa075674..fb772ec98 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts.go +++ b/pkg/daemon/server/service/rater/timestamped_counts.go @@ -31,17 +31,24 @@ type TimestampedCounts struct { timestamp int64 // podName to count mapping podCounts map[string]float64 - lock *sync.RWMutex + // isWindowClosed indicates whether we have finished collecting pod counts for this timestamp + isWindowClosed bool + // delta is the total count change from the previous window, it's valid only when isWindowClosed is true + delta float64 + lock *sync.RWMutex } func NewTimestampedCounts(t int64) *TimestampedCounts { return &TimestampedCounts{ - timestamp: t, - podCounts: make(map[string]float64), - lock: new(sync.RWMutex), + timestamp: t, + podCounts: make(map[string]float64), + isWindowClosed: false, + delta: 0, + lock: new(sync.RWMutex), } } +// Update updates the count for a pod if the current window is not closed func (tc *TimestampedCounts) Update(podName string, count float64) { tc.lock.Lock() defer tc.lock.Unlock() @@ -56,6 +63,10 @@ func (tc *TimestampedCounts) Update(podName string, count float64) { // hence we'd rather keep the count as it is to avoid wrong rate calculation. return } + if tc.isWindowClosed { + // we skip updating if the window is already closed. + return + } tc.podCounts[podName] = count } @@ -71,6 +82,44 @@ func (tc *TimestampedCounts) Snapshot() map[string]float64 { return counts } +// IsWindowClosed returns whether the window is closed +func (tc *TimestampedCounts) IsWindowClosed() bool { + tc.lock.RLock() + defer tc.lock.RUnlock() + return tc.isWindowClosed +} + +// CloseWindow closes the window and calculates the delta by comparing the current pod counts with the previous window +func (tc *TimestampedCounts) CloseWindow(prev *TimestampedCounts) { + // prepare pod counts for both current and previous window for delta calculation + var prevPodCounts map[string]float64 + if prev == nil { + prevPodCounts = make(map[string]float64) + } else { + prevPodCounts = prev.Snapshot() + } + currPodCounts := tc.Snapshot() + + // calculate the delta by comparing the current pod counts with the previous window + delta := 0.0 + for key, currCount := range currPodCounts { + prevCount := prevPodCounts[key] // if key doesn't exist in prevPodCounts, prevCount is 0 + if currCount < prevCount { + // this can happen when a pod is restarted during the window + // we count the new count as the delta + delta += currCount + } else { + delta += currCount - prevCount + } + } + + // finalize the window by setting isWindowClosed to true and delta to the calculated value + tc.lock.Lock() + defer tc.lock.Unlock() + tc.isWindowClosed = true + tc.delta = delta +} + // ToString returns a string representation of the TimestampedCounts // it's used for debugging purpose func (tc *TimestampedCounts) ToString() string { diff --git a/pkg/daemon/server/service/rater/timestamped_counts_test.go b/pkg/daemon/server/service/rater/timestamped_counts_test.go index d5a8c4944..514f4b285 100644 --- a/pkg/daemon/server/service/rater/timestamped_counts_test.go +++ b/pkg/daemon/server/service/rater/timestamped_counts_test.go @@ -26,6 +26,8 @@ func TestNewTimestampedCounts(t *testing.T) { tc := NewTimestampedCounts(1620000000) assert.Equal(t, int64(1620000000), tc.timestamp) assert.Equal(t, 0, len(tc.podCounts)) + assert.Equal(t, false, tc.isWindowClosed) + assert.Equal(t, 0.0, tc.delta) } func TestTimestampedCounts_Update(t *testing.T) { @@ -41,6 +43,28 @@ func TestTimestampedCounts_Update(t *testing.T) { assert.Equal(t, 2, len(tc.podCounts)) assert.Equal(t, 20, int(tc.podCounts["pod1"])) assert.Equal(t, 30, int(tc.podCounts["pod2"])) + assert.Equal(t, false, tc.isWindowClosed) + assert.Equal(t, 0.0, tc.delta) + + tc.CloseWindow(nil) + assert.Equal(t, true, tc.isWindowClosed) + // (20-0) + (30-0) = 50 + assert.Equal(t, 50.0, tc.delta) + // verify that updating pod counts doesn't take effect if the window is already closed + tc.Update("pod1", 10.0) + assert.Equal(t, 20, int(tc.podCounts["pod1"])) + tc.Update("pod2", 20.0) + assert.Equal(t, 30, int(tc.podCounts["pod2"])) + + tc2 := NewTimestampedCounts(1620000001) + tc2.Update("pod1", 40.0) + assert.Equal(t, 40.0, tc2.podCounts["pod1"]) + tc2.Update("pod2", 10.0) + assert.Equal(t, 10.0, tc2.podCounts["pod2"]) + tc2.CloseWindow(tc) + assert.Equal(t, true, tc2.isWindowClosed) + // (40-20) + 10 = 30 + assert.Equal(t, 30.0, tc2.delta) } func TestTimestampedCounts_Snapshot(t *testing.T) {