Skip to content

Commit

Permalink
[processor/deltatocumulative] limit tracked streams (#31488)
Browse files Browse the repository at this point in the history
**Description:** Adds a configurable upper limit to the number of
tracked streams. This allows to introduce a upper bound to the memory
usage.

**Testing:** Test case was added

**Documentation:** README was updated
  • Loading branch information
sh0rez committed Mar 11, 2024
1 parent 0672df1 commit 97f685e
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 8 deletions.
30 changes: 30 additions & 0 deletions .chloggen/deltatocumulative-limits.yaml
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: "enhancement"

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: "deltatocumulativeprocessor"

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: introduce configurable stream limit

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [31488]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Adds `max_streams` option that allows to set upper bound (default = unlimited)
to the number of tracked streams. Any additional streams exceeding the limit
are dropped.
# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
11 changes: 11 additions & 0 deletions internal/exp/metrics/staleness/staleness.go
Expand Up @@ -13,6 +13,11 @@ import (
// We override how Now() is returned, so we can have deterministic tests
var NowFunc = time.Now

var (
_ streams.Map[any] = (*Staleness[any])(nil)
_ streams.Evictor = (*Staleness[any])(nil)
)

// Staleness a a wrapper over a map that adds an additional "staleness" value to each entry. Users can
// call ExpireOldEntries() to automatically remove all entries from the map whole staleness value is
// older than the `max`
Expand Down Expand Up @@ -82,3 +87,9 @@ func (s *Staleness[T]) Next() time.Time {
_, ts := s.pq.Peek()
return ts
}

func (s *Staleness[T]) Evict() identity.Stream {
id, _ := s.pq.Pop()
s.items.Delete(id)
return id
}
6 changes: 6 additions & 0 deletions internal/exp/metrics/streams/streams.go
Expand Up @@ -50,3 +50,9 @@ func (m HashMap[T]) Items() func(yield func(identity.Stream, T) bool) bool {
func (m HashMap[T]) Len() int {
return len((map[identity.Stream]T)(m))
}

// Evictors remove the "least important" stream based on some strategy such as
// the oldest, least active, etc.
type Evictor interface {
Evict() identity.Stream
}
6 changes: 4 additions & 2 deletions processor/deltatocumulativeprocessor/README.md
Expand Up @@ -25,6 +25,8 @@ processors:
deltatocumulative:
# how long until a series not receiving new samples is removed
[ max_stale: <duration> | default = 5m ]

# upper limit of streams to track. new streams exceeding this limit
# will be dropped
[ max_streams: <int> | default = 0 (off) ]
```

There is no further configuration required. All delta samples are converted to cumulative.
16 changes: 15 additions & 1 deletion processor/deltatocumulativeprocessor/config.go
Expand Up @@ -13,12 +13,26 @@ import (
var _ component.ConfigValidator = (*Config)(nil)

type Config struct {
MaxStale time.Duration `json:"max_stale"`
MaxStale time.Duration `json:"max_stale"`
MaxStreams int `json:"max_streams"`
}

func (c *Config) Validate() error {
if c.MaxStale <= 0 {
return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale)
}
if c.MaxStreams <= 0 {
return fmt.Errorf("max_streams must be a positive number (got %d)", c.MaxStreams)
}
return nil
}

func createDefaultConfig() component.Config {
return &Config{
MaxStale: 5 * time.Minute,

// disable. TODO: find good default
// https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/31603
MaxStreams: 0,
}
}
5 changes: 0 additions & 5 deletions processor/deltatocumulativeprocessor/factory.go
Expand Up @@ -6,7 +6,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -23,10 +22,6 @@ func NewFactory() processor.Factory {
)
}

func createDefaultConfig() component.Config {
return &Config{MaxStale: 5 * time.Minute}
}

func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
pcfg, ok := cfg.(*Config)
if !ok {
Expand Down
63 changes: 63 additions & 0 deletions processor/deltatocumulativeprocessor/internal/streams/limit.go
@@ -0,0 +1,63 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"

import (
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

func Limit[T any](m Map[T], max int) LimitMap[T] {
return LimitMap[T]{Map: m, Max: max}
}

type LimitMap[T any] struct {
Max int

Evictor streams.Evictor
streams.Map[T]
}

func (m LimitMap[T]) Store(id identity.Stream, v T) error {
if m.Map.Len() < m.Max {
return m.Map.Store(id, v)
}

errl := ErrLimit(m.Max)
if m.Evictor != nil {
gone := m.Evictor.Evict()
if err := m.Map.Store(id, v); err != nil {
return err
}
return ErrEvicted{ErrLimit: errl, id: gone}
}
return errl
}

type ErrLimit int

func (e ErrLimit) Error() string {
return fmt.Sprintf("stream limit of %d reached", e)
}

func AtLimit(err error) bool {
var errLimit ErrLimit
return errors.As(err, &errLimit)
}

type ErrEvicted struct {
ErrLimit
id Ident
}

func (e ErrEvicted) Error() string {
return fmt.Sprintf("%s. evicted stream %s", e.ErrLimit, e.id)
}

func (e ErrEvicted) Unwrap() error {
return e.ErrLimit
}
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package streams_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
)

func TestLimit(t *testing.T) {
sum := random.Sum()

items := make(exp.HashMap[data.Number])
lim := streams.Limit(items, 10)

ids := make([]identity.Stream, 10)

// write until limit must work
for i := 0; i < 10; i++ {
id, dp := sum.Stream()
ids[i] = id
err := lim.Store(id, dp)
require.NoError(t, err)
}

// one over limit must be rejected
{
id, dp := sum.Stream()
err := lim.Store(id, dp)
want := streams.ErrLimit(10)
require.ErrorAs(t, err, &want)
require.True(t, streams.AtLimit(err))
}

// after removing one, must be accepted again
{
lim.Delete(ids[0])

id, dp := sum.Stream()
err := lim.Store(id, dp)
require.NoError(t, err)
}
}
Expand Up @@ -33,3 +33,5 @@ func (a MapAggr[D]) Aggregate(id Ident, dp D) (D, error) {
v, _ := a.Map.Load(id)
return v, err
}

type Evictor = streams.Evictor
7 changes: 7 additions & 0 deletions processor/deltatocumulativeprocessor/processor.go
Expand Up @@ -55,6 +55,13 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo
proc.stale = stale
dps = stale
}
if cfg.MaxStreams > 0 {
lim := streams.Limit(dps, cfg.MaxStreams)
if proc.exp != nil {

Check failure on line 60 in processor/deltatocumulativeprocessor/processor.go

View workflow job for this annotation

GitHub Actions / govulncheck (processor)

proc.exp undefined (type Processor has no field or method exp)
lim.Evictor = proc.exp

Check failure on line 61 in processor/deltatocumulativeprocessor/processor.go

View workflow job for this annotation

GitHub Actions / govulncheck (processor)

proc.exp undefined (type Processor has no field or method exp)
}
dps = lim
}

proc.aggr = streams.IntoAggregator(dps)
return &proc
Expand Down

0 comments on commit 97f685e

Please sign in to comment.