Skip to content

Commit

Permalink
[deltatocumulative]: limit tracked streams
Browse files Browse the repository at this point in the history
  • Loading branch information
sh0rez committed Mar 5, 2024
1 parent f003e0e commit 181f34a
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 8 deletions.
6 changes: 4 additions & 2 deletions processor/deltatocumulativeprocessor/README.md
Expand Up @@ -25,6 +25,8 @@ processors:
deltatocumulative:
# how long until a series not receiving new samples is removed
[ max_stale: <duration> | default = 5m ]

# upper limit of streams to track. new streams exceeding this limit
# will be dropped
[ max_streams: <int> | default = off ]
```

There is no further configuration required. All delta samples are converted to cumulative.
13 changes: 12 additions & 1 deletion processor/deltatocumulativeprocessor/config.go
Expand Up @@ -13,12 +13,23 @@ import (
var _ component.ConfigValidator = (*Config)(nil)

// Config configures the deltatocumulative processor.
type Config struct {
	// MaxStale is how long a series may go without receiving new samples
	// before it is removed from tracking.
	MaxStale time.Duration `json:"max_stale"`
	// MaxStreams is the upper limit of concurrently tracked streams.
	// Zero disables the limit (see createDefaultConfig and Validate).
	MaxStreams int `json:"max_streams"`
}

// Validate checks the configuration for invalid values.
//
// MaxStale must be strictly positive. MaxStreams may be zero, which is the
// documented default and disables the stream limit entirely; only negative
// values are rejected. (Rejecting zero here would make the default config
// produced by createDefaultConfig fail validation.)
func (c *Config) Validate() error {
	if c.MaxStale <= 0 {
		return fmt.Errorf("max_stale must be a positive duration (got %s)", c.MaxStale)
	}
	if c.MaxStreams < 0 {
		return fmt.Errorf("max_streams must be a positive number or 0 to disable (got %d)", c.MaxStreams)
	}
	return nil
}

// createDefaultConfig returns the configuration used when the user sets no
// options: series expire after 5 minutes without samples, and the number of
// tracked streams is unlimited.
func createDefaultConfig() component.Config {
	cfg := Config{
		MaxStale:   5 * time.Minute,
		MaxStreams: 0, // disable. TODO: find good default
	}
	return &cfg
}
5 changes: 0 additions & 5 deletions processor/deltatocumulativeprocessor/factory.go
Expand Up @@ -6,7 +6,6 @@ package deltatocumulativeprocessor // import "github.com/open-telemetry/opentele
import (
"context"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
Expand All @@ -23,10 +22,6 @@ func NewFactory() processor.Factory {
)
}

func createDefaultConfig() component.Config {
return &Config{MaxStale: 5 * time.Minute}
}

func createMetricsProcessor(_ context.Context, set processor.CreateSettings, cfg component.Config, next consumer.Metrics) (processor.Metrics, error) {
pcfg, ok := cfg.(*Config)
if !ok {
Expand Down
39 changes: 39 additions & 0 deletions processor/deltatocumulativeprocessor/internal/streams/limit.go
@@ -0,0 +1,39 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package streams // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"

import (
"errors"
"fmt"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
)

// Limit wraps m so that it tracks at most max streams; stores of new
// streams beyond that are rejected with ErrLimit.
func Limit[T any](m Map[T], max int) Map[T] {
	limited := LimitMap[T]{Max: max, Map: m}
	return limited
}

// LimitMap is a stream Map that refuses to grow past Max entries.
// All other operations are delegated to the embedded Map.
type LimitMap[T any] struct {
	streams.Map[T]

	// Max is the maximum number of streams to keep.
	Max int
}

// Store saves the value for the given stream identity.
//
// While under the limit any store succeeds. At the limit, stores for ids the
// map already tracks are still accepted — otherwise samples of existing
// series would be dropped once the map fills up — and only genuinely new
// streams are rejected with ErrLimit.
func (m LimitMap[T]) Store(id identity.Stream, v T) error {
	if m.Map.Len() < m.Max {
		return m.Map.Store(id, v)
	}
	// NOTE(review): assumes streams.Map exposes Load to test membership —
	// confirm against the interface definition in internal/exp/metrics/streams.
	if _, ok := m.Map.Load(id); ok {
		return m.Map.Store(id, v)
	}
	return ErrLimit(m.Max)
}

// ErrLimit is returned by LimitMap.Store when the stream limit is reached.
// Its integer value is the limit that was hit.
type ErrLimit int

// Error implements the error interface.
func (e ErrLimit) Error() string {
	return fmt.Sprintf("stream limit of %d reached", int(e))
}

// AtLimit reports whether err is, or wraps, an ErrLimit.
func AtLimit(err error) bool {
	var lim ErrLimit
	return errors.As(err, &lim)
}
@@ -0,0 +1,51 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package streams_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/identity"
exp "github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/data"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/streams"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/deltatocumulativeprocessor/internal/testdata/random"
)

// TestLimit verifies that a limited map accepts stores up to its capacity,
// rejects the first store past it with ErrLimit, and accepts new streams
// again once a slot is freed.
func TestLimit(t *testing.T) {
	sum := random.Sum()

	backing := make(exp.HashMap[data.Number])
	limited := streams.Limit(backing, 10)

	ids := make([]identity.Stream, 10)

	// filling up to the limit must succeed
	for i := range ids {
		id, dp := sum.Stream()
		ids[i] = id
		require.NoError(t, limited.Store(id, dp))
	}

	// the next distinct stream must be rejected
	id, dp := sum.Stream()
	err := limited.Store(id, dp)
	want := streams.ErrLimit(10)
	require.ErrorAs(t, err, &want)
	require.True(t, streams.AtLimit(err))

	// freeing a slot makes room for a new stream again
	limited.Delete(ids[0])
	next, ndp := sum.Stream()
	require.NoError(t, limited.Store(next, ndp))
}
3 changes: 3 additions & 0 deletions processor/deltatocumulativeprocessor/processor.go
Expand Up @@ -53,6 +53,9 @@ func newProcessor(cfg *Config, log *zap.Logger, next consumer.Metrics) *Processo
proc.exp = &exp
dps = &exp
}
if cfg.MaxStreams > 0 {
dps = streams.Limit(dps, cfg.MaxStreams)
}

proc.aggr = streams.IntoAggregator(dps)
return &proc
Expand Down

0 comments on commit 181f34a

Please sign in to comment.