diff --git a/.chloggen/deltatocumulative-limits.yaml b/.chloggen/deltatocumulative-limits.yaml new file mode 100644 index 0000000000000..1ff0ff7a8fa7e --- /dev/null +++ b/.chloggen/deltatocumulative-limits.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: "enhancement" + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: "deltatocumulativeprocessor" + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: introduce configurable stream limit + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31488] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Adds `max_streams` option that allows to set upper bound (default = unlimited) + to the number of tracked streams. Any additional streams exceeding the limit + are dropped. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/internal/exp/metrics/staleness/staleness.go b/internal/exp/metrics/staleness/staleness.go index 9e5f989495f8a..4bf25c1989204 100644 --- a/internal/exp/metrics/staleness/staleness.go +++ b/internal/exp/metrics/staleness/staleness.go @@ -13,6 +13,11 @@ import ( // We override how Now() is returned, so we can have deterministic tests var NowFunc = time.Now +var ( + _ streams.Map[any] = (*Staleness[any])(nil) + _ streams.Evictor = (*Staleness[any])(nil) +) + // Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can // call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is // older than the `max` @@ -82,3 +87,9 @@ func (s *Staleness[T]) Next() time.Time { _, ts := s.pq.Peek() return ts } + +func (s *Staleness[T]) Evict() identity.Stream { + id, _ := s.pq.Pop() + s.items.Delete(id) + return id +} diff --git a/internal/exp/metrics/streams/streams.go b/internal/exp/metrics/streams/streams.go index 1e25102b3b470..90bebb63c0914 100644 --- a/internal/exp/metrics/streams/streams.go +++ b/internal/exp/metrics/streams/streams.go @@ -50,3 +50,9 @@ func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool { func (m HashMap[T]) Len() int { return len((map[identity.Stream]T)(m)) } + +// Evictors remove the "least important" stream based on some strategy such as +// the oldest, least active, etc. +type Evictor interface { + Evict() identity.Stream +} diff --git a/processor/deltatocumulativeprocessor/README.md b/processor/deltatocumulativeprocessor/README.md index 1a639128fca86..e80eeb2ad441a 100644 --- a/processor/deltatocumulativeprocessor/README.md +++ b/processor/deltatocumulativeprocessor/README.md @@ -25,6 +25,8 @@ processors: deltatocumulative: # how long until a series not receiving new samples is removed [ max_stale: | default = 5m ] + + # upper limit of streams to track. new streams exceeding this limit + # will be dropped + [ max_streams: | default = 0 (off) ] ``` - -There is no further configuration required. All delta samples are converted to cumulative. diff --git a/processor/deltatocumulativeprocessor/config.go b/processor/deltatocumulativeprocessor/config.go index b5744a9779b71..54628386f6e08 100644 --- a/processor/deltatocumulativeprocessor/config.go +++ b/processor/deltatocumulativeprocessor/config.go @@ -13,12 +13,26 @@ import ( var _ component.ConfigValidator = (*Config)(nil) type Config struct { - MaxStale time.Duration `json:"max_stale"` + MaxStale time.Duration `json:"max_stale"` + MaxStreams int `json:"max_streams"` } func (c *Config) Validate() error { if c.MaxStale <= 0 { return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale) } + if c.MaxStreams <= 0 { + return fmt.Errorf("max_streams must be a positive number (got %d)", c.MaxStreams) + } return nil } + +func createDefaultConfig() component.Config { + return &Config{ + MaxStale: 5 * time.Minute, + + // disable. TODO: find good default + // https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603 + MaxStreams: 0, + } +} diff --git a/processor/deltatocumulativeprocessor/factory.go b/processor/deltatocumulativeprocessor/factory.go index b2fba4e00fc2a..47b968d14f17a 100644 --- a/processor/deltatocumulativeprocessor/factory.go +++ b/processor/deltatocumulativeprocessor/factory.go @@ -6,7 +6,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele import ( "context" "fmt" - "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -23,10 +22,6 @@ func NewFactory() processor.Factory { ) } -func createDefaultConfig() component.Config { - return &Config{MaxStale: 5 * time.Minute} -} - func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) { pcfg, ok := cfg.(*Config) if !ok { diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit.go b/processor/deltatocumulativeprocessor/internal/streams/limit.go new file mode 100644 index 0000000000000..0378960ee6573 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/limit.go @@ -0,0 +1,63 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + +import ( + "errors" + "fmt" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" +) + +func Limit[T any](m Map[T], max int) LimitMap[T] { + return LimitMap[T]{Map: m, Max: max} +} + +type LimitMap[T any] struct { + Max int + + Evictor streams.Evictor + streams.Map[T] +} + +func (m LimitMap[T]) Store(id identity.Stream, v T) error { + if m.Map.Len() < m.Max { + return m.Map.Store(id, v) + } + + errl := ErrLimit(m.Max) + if m.Evictor != nil { + gone := m.Evictor.Evict() + if err := m.Map.Store(id, v); err != nil { + return err + } + return ErrEvicted{ErrLimit: errl, id: gone} + } + return errl +} + +type ErrLimit int + +func (e ErrLimit) Error() string { + return fmt.Sprintf("stream limit of %d reached", e) +} + +func AtLimit(err error) bool { + var errLimit ErrLimit + return errors.As(err, &errLimit) +} + +type ErrEvicted struct { + ErrLimit + id Ident +} + +func (e ErrEvicted) Error() string { + return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id) +} + +func (e ErrEvicted) Unwrap() error { + return e.ErrLimit +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/limit_test.go b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go new file mode 100644 index 0000000000000..d0c5af6e56662 --- /dev/null +++ b/processor/deltatocumulativeprocessor/internal/streams/limit_test.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package streams_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity" + exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams" + "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random" +) + +func TestLimit(t *testing.T) { + sum := random.Sum() + + items := make(exp.HashMap[data.Number]) + lim := streams.Limit(items, 10) + + ids := make([]identity.Stream, 10) + + // write until limit must work + for i := 0; i < 10; i++ { + id, dp := sum.Stream() + ids[i] = id + err := lim.Store(id, dp) + require.NoError(t, err) + } + + // one over limit must be rejected + { + id, dp := sum.Stream() + err := lim.Store(id, dp) + want := streams.ErrLimit(10) + require.ErrorAs(t, err, &want) + require.True(t, streams.AtLimit(err)) + } + + // after removing one, must be accepted again + { + lim.Delete(ids[0]) + + id, dp := sum.Stream() + err := lim.Store(id, dp) + require.NoError(t, err) + } +} diff --git a/processor/deltatocumulativeprocessor/internal/streams/streams.go b/processor/deltatocumulativeprocessor/internal/streams/streams.go index c6f725c7bfa27..1b34f806b272a 100644 --- a/processor/deltatocumulativeprocessor/internal/streams/streams.go +++ b/processor/deltatocumulativeprocessor/internal/streams/streams.go @@ -33,3 +33,5 @@ func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) { v, _ := a.Map.Load(id) return v, err } + +type Evictor = streams.Evictor diff --git a/processor/deltatocumulativeprocessor/processor.go b/processor/deltatocumulativeprocessor/processor.go index 909e0c7fbf140..0a351b5eecac6 100644 --- a/processor/deltatocumulativeprocessor/processor.go +++ b/processor/deltatocumulativeprocessor/processor.go @@ -55,6 +55,13 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo proc.stale = stale dps = stale } + if cfg.MaxStreams > 0 { + lim := streams.Limit(dps, cfg.MaxStreams) + if proc.exp != nil { + lim.Evictor = proc.exp + } + dps = lim + } proc.aggr = streams.IntoAggregator(dps) return &proc