From 5c92bd3731c04b96a81c3eef24e37ec8350dabf5 Mon Sep 17 00:00:00 2001 From: sh0rez Date: Mon, 13 May 2024 14:15:09 +0200 Subject: [PATCH 1/2] deltatocumulative: evict only stale streams changes eviction behavior at limit to only delete streams if they are actually stale. Current behavior just deletes the oldest, which leads to rapid deletion of all streams under heavy load, making this processor unusable. --- .../deltatocumulative-evict-only-stale.yaml | 30 ++++++++++++ internal/exp/metrics/staleness/staleness.go | 10 ++-- internal/exp/metrics/streams/streams.go | 2 +- .../internal/streams/limit.go | 43 +++++++++++----- .../internal/streams/limit_test.go | 49 +++++++++++++++++++ .../internal/telemetry/faults_test.go | 4 +- 6 files changed, 119 insertions(+), 19 deletions(-) create mode 100644 .chloggen/deltatocumulative-evict-only-stale.yaml diff --git a/.chloggen/deltatocumulative-evict-only-stale.yaml b/.chloggen/deltatocumulative-evict-only-stale.yaml new file mode 100644 index 000000000000..eb253a3c61d7 --- /dev/null +++ b/.chloggen/deltatocumulative-evict-only-stale.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Evict only stale streams + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33014] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Changes eviction behavior to only evict streams that are actually stale. + Currently, once the stream limit is hit, on each new stream the oldest tracked one is evicted. + Under heavy load this can rapidly delete all streams over and over, rendering the processor useless. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index ce69321cadd9..b4982a57af23 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -88,10 +88,14 @@ func (s *Staleness[T]) Next() time.Time { return ts } -func (s *Staleness[T]) Evict() identity.Stream { - id, _ := s.pq.Pop() +func (s *Staleness[T]) Evict() (identity.Stream, bool) { + id, ts := s.pq.Pop() + if ts.Add(s.Max).Before(time.Now()) { + return identity.Stream{}, false + } + s.items.Delete(id) - return id + return id, true } func (s *Staleness[T]) Clear() { diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 03fba7812a43..9fd04ad278a1 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -59,5 +59,5 @@ func (m HashMap[T]) Clear() { // Evictors remove the "least important" stream based on some strategy such as // the oldest, least active, etc. type Evictor interface { - Evict() identity.Stream + Evict() (identity.Stream, bool) } diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 3e021b6d5d74..dd1d927687c9 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -12,7 +12,12 @@ import ( ) func Limit[T any](m Map[T], max int) LimitMap[T] { - return LimitMap[T]{Map: m, Max: max} + return LimitMap[T]{ + Map: m, Max: max, + Evictor: EvictorFunc(func() (identity.Stream, bool) { + return identity.Stream{}, false + }), + } } type LimitMap[T any] struct { @@ -23,21 +28,27 @@ type LimitMap[T any] struct { } func (m LimitMap[T]) Store(id identity.Stream, v T) error { - _, ok := m.Map.Load(id) - avail := m.Map.Len() < m.Max - if ok || avail { - return m.Map.Store(id, v) - } + _, exist := m.Map.Load(id) - errl := ErrLimit(m.Max) - if m.Evictor != nil { - gone := m.Evictor.Evict() - if err := m.Map.Store(id, v); err != nil { - return err + var errEv error + // if not already tracked and no space: try to evict + if !exist && m.Map.Len() >= m.Max { + errl := ErrLimit(m.Max) + gone, ok := m.Evictor.Evict() + if !ok { + // if no eviction possible, fail as there is no space + return errl } - return ErrEvicted{ErrLimit: errl, Ident: gone} + errEv = ErrEvicted{ErrLimit: errl, Ident: gone} + } + + // there was space, or we made space: store it + if err := m.Map.Store(id, v); err != nil { + return err } - return errl + + // we may have evicted something, let the caller know + return errEv } type ErrLimit int @@ -59,3 +70,9 @@ type ErrEvicted struct { func (e ErrEvicted) Error() string { return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident) } + +type EvictorFunc func() (identity.Stream, bool) + +func (ev EvictorFunc) Evict() (identity.Stream, bool) { + return ev() +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go index 04ffffbde5f5..440e466dc2e3 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -57,3 +57,52 @@ func TestLimit(t *testing.T) { require.NoError(t, err) } } + +func TestLimitEvict(t *testing.T) { + sum := random.Sum() + evictable := make(map[identity.Stream]struct{}) + + items := make(exp.HashMap[data.Number]) + lim := streams.Limit(items, 5) + lim.Evictor = streams.EvictorFunc(func() (identity.Stream, bool) { + for id := range evictable { + delete(evictable, id) + return id, true + } + return identity.Stream{}, false + }) + + ids := make([]identity.Stream, 10) + dps := make([]data.Number, 10) + for i := 0; i < 10; i++ { + id, dp := sum.Stream() + ids[i] = id + dps[i] = dp + } + + // store up to limit must work + for i := 0; i < 5; i++ { + err := lim.Store(ids[i], dps[i]) + require.NoError(t, err) + } + + // store beyond limit must fail + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + require.Equal(t, streams.ErrLimit(5), err) + } + + // put two streams up for eviction + evictable[ids[2]] = struct{}{} + evictable[ids[3]] = struct{}{} + + // while evictable do so, fail again afterwards + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + if i < 7 { + require.Equal(t, streams.ErrEvicted{ErrLimit: streams.ErrLimit(5), Ident: ids[i-3]}, err) + } else { + require.Equal(t, streams.ErrLimit(5), err) + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 4809d74fda04..51081099995d 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -140,11 +140,11 @@ type ts = pcommon.Timestamp // HeadEvictor drops the first stream on Evict() type HeadEvictor[T any] struct{ streams.Map[T] } -func (e HeadEvictor[T]) Evict() (evicted identity.Stream) { +func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) { e.Items()(func(id identity.Stream, _ T) bool { e.Delete(id) evicted = id return false }) - return evicted + return evicted, true } From eece2d992998dc6d48e12680dde865b43f3cdb3f Mon Sep 17 00:00:00 2001 From: sh0rez Date: Wed, 15 May 2024 21:51:01 +0200 Subject: [PATCH 2/2] staleness: peek, doc --- internal/exp/metrics/staleness/staleness.go | 3 ++- internal/exp/metrics/streams/streams.go | 4 +++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index b4982a57af23..975e7750a446 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -89,11 +89,12 @@ func (s *Staleness[T]) Next() time.Time { } func (s *Staleness[T]) Evict() (identity.Stream, bool) { - id, ts := s.pq.Pop() + _, ts := s.pq.Peek() if ts.Add(s.Max).Before(time.Now()) { return identity.Stream{}, false } + id, _ := s.pq.Pop() s.items.Delete(id) return id, true } diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 9fd04ad278a1..8e64ae0e0305 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -58,6 +58,8 @@ func (m HashMap[T]) Clear() { // Evictors remove the "least important" stream based on some strategy such as // the oldest, least active, etc. +// +// Returns whether a stream was evicted and if so the now gone stream id type Evictor interface { - Evict() (identity.Stream, bool) + Evict() (gone identity.Stream, ok bool) }