diff --git a/.chloggen/deltatocumulative-evict-only-stale.yaml b/.chloggen/deltatocumulative-evict-only-stale.yaml new file mode 100644 index 000000000000..eb253a3c61d7 --- /dev/null +++ b/.chloggen/deltatocumulative-evict-only-stale.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: deltatocumulativeprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Evict only stale streams + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [33014] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Changes eviction behavior to only evict streams that are actually stale. + Currently, once the stream limit is hit, on each new stream the oldest tracked one is evicted. + Under heavy load this can rapidly delete all streams over and over, rendering the processor useless. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index ce69321cadd9..975e7750a446 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -88,10 +88,15 @@ func (s *Staleness[T]) Next() time.Time { return ts } -func (s *Staleness[T]) Evict() identity.Stream { +func (s *Staleness[T]) Evict() (identity.Stream, bool) { + _, ts := s.pq.Peek() + if ts.Add(s.Max).Before(time.Now()) { + return identity.Stream{}, false + } + id, _ := s.pq.Pop() s.items.Delete(id) - return id + return id, true } func (s *Staleness[T]) Clear() { diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 03fba7812a43..8e64ae0e0305 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -58,6 +58,8 @@ func (m HashMap[T]) Clear() { // Evictors remove the "least important" stream based on some strategy such as // the oldest, least active, etc. +// +// Returns whether a stream was evicted and if so the now gone stream id type Evictor interface { - Evict() identity.Stream + Evict() (gone identity.Stream, ok bool) } diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go index 3e021b6d5d74..dd1d927687c9 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -12,7 +12,12 @@ import ( ) func Limit[T any](m Map[T], max int) LimitMap[T] { - return LimitMap[T]{Map: m, Max: max} + return LimitMap[T]{ + Map: m, Max: max, + Evictor: EvictorFunc(func() (identity.Stream, bool) { + return identity.Stream{}, false + }), + } } type LimitMap[T any] struct { @@ -23,21 +28,27 @@ type LimitMap[T any] struct { } func (m LimitMap[T]) Store(id identity.Stream, v T) error { - _, ok := m.Map.Load(id) - avail := m.Map.Len() < m.Max - if ok || avail { - return m.Map.Store(id, v) - } + _, exist := m.Map.Load(id) - errl := ErrLimit(m.Max) - if m.Evictor != nil { - gone := m.Evictor.Evict() - if err := m.Map.Store(id, v); err != nil { - return err + var errEv error + // if not already tracked and no space: try to evict + if !exist && m.Map.Len() >= m.Max { + errl := ErrLimit(m.Max) + gone, ok := m.Evictor.Evict() + if !ok { + // if no eviction possible, fail as there is no space + return errl } - return ErrEvicted{ErrLimit: errl, Ident: gone} + errEv = ErrEvicted{ErrLimit: errl, Ident: gone} + } + + // there was space, or we made space: store it + if err := m.Map.Store(id, v); err != nil { + return err } - return errl + + // we may have evicted something, let the caller know + return errEv } type ErrLimit int @@ -59,3 +70,9 @@ type ErrEvicted struct { func (e ErrEvicted) Error() string { return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.Ident) } + +type EvictorFunc func() (identity.Stream, bool) + +func (ev EvictorFunc) Evict() (identity.Stream, bool) { + return ev() +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go index 04ffffbde5f5..440e466dc2e3 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -57,3 +57,52 @@ func TestLimit(t *testing.T) { require.NoError(t, err) } } + +func TestLimitEvict(t *testing.T) { + sum := random.Sum() + evictable := make(map[identity.Stream]struct{}) + + items := make(exp.HashMap[data.Number]) + lim := streams.Limit(items, 5) + lim.Evictor = streams.EvictorFunc(func() (identity.Stream, bool) { + for id := range evictable { + delete(evictable, id) + return id, true + } + return identity.Stream{}, false + }) + + ids := make([]identity.Stream, 10) + dps := make([]data.Number, 10) + for i := 0; i < 10; i++ { + id, dp := sum.Stream() + ids[i] = id + dps[i] = dp + } + + // store up to limit must work + for i := 0; i < 5; i++ { + err := lim.Store(ids[i], dps[i]) + require.NoError(t, err) + } + + // store beyond limit must fail + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + require.Equal(t, streams.ErrLimit(5), err) + } + + // put two streams up for eviction + evictable[ids[2]] = struct{}{} + evictable[ids[3]] = struct{}{} + + // while evictable do so, fail again afterwards + for i := 5; i < 10; i++ { + err := lim.Store(ids[i], dps[i]) + if i < 7 { + require.Equal(t, streams.ErrEvicted{ErrLimit: streams.ErrLimit(5), Ident: ids[i-3]}, err) + } else { + require.Equal(t, streams.ErrLimit(5), err) + } + } +} diff --git a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go index 4809d74fda04..51081099995d 100644 --- a/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go +++ b/processor/deltatocumulativeprocessor/internal/telemetry/faults_test.go @@ -140,11 +140,11 @@ type ts = pcommon.Timestamp // HeadEvictor drops the first stream on Evict() type HeadEvictor[T any] struct{ streams.Map[T] } -func (e HeadEvictor[T]) Evict() (evicted identity.Stream) { +func (e HeadEvictor[T]) Evict() (evicted identity.Stream, ok bool) { e.Items()(func(id identity.Stream, _ T) bool { e.Delete(id) evicted = id return false }) - return evicted + return evicted, true }