[processor/memorylimiter] Only drops traces, not logs or metrics
Signed-off-by: Dani Louca <dlouca@splunk.com>

**Description:** 

Change 062e64f caused the memory limiter to start the `checkMemLimits` routine only for the `memoryLimiter` instance used by the traces processor.
In other words, the metrics and logs processors will NOT [drop/refuse](https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/memorylimiterprocessor/memorylimiter.go#L206) data; they will pass it down to the next consumer regardless of the current memory pressure, because `forcingDrop` is never set on their `memoryLimiter` instances.
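
The sketch below (a simplified, self-contained stand-in rather than the collector's actual types) shows why this matters: the processing path only refuses data while `forcingDrop` is set, and that flag is only ever flipped by the `checkMemLimits` goroutine, so an instance without the routine never drops anything.

````go
package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

// errDataRefused mirrors the kind of error returned under memory pressure.
var errDataRefused = errors.New("data refused due to high memory usage")

// memoryLimiter is a stripped-down stand-in: forcingDrop is only ever set
// by the checkMemLimits goroutine.
type memoryLimiter struct {
	forcingDrop int64
}

// processLogs refuses data only while forcingDrop is set. If checkMemLimits
// was never started for this instance, the flag stays 0 and everything passes.
func (ml *memoryLimiter) processLogs(logs string) (string, error) {
	if atomic.LoadInt64(&ml.forcingDrop) == 1 {
		return "", errDataRefused
	}
	return logs, nil
}

func main() {
	ml := &memoryLimiter{} // no monitoring goroutine started for this instance
	out, err := ml.processLogs("some log record")
	fmt.Println(out, err) // "some log record <nil>": data is never refused
}
````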

The simplest solution is to call `start` for each processor (metrics, logs, traces), but that is inefficient: we would be running three instances of `checkMemLimits`, i.e. triggering multiple GC cycles.
At the same time, we need to allow multiple instances with different configs, for example `memory_limiter` and `memory_limiter/another`:

````
extensions:
  memory_ballast:
    size_mib: 4

receivers:
  otlp:
    protocols:
      grpc:
      http:
processors:
  memory_limiter:
    check_interval: 2s
    limit_mib: 10

  memory_limiter/another:
    check_interval: 1s
    limit_mib: 100
exporters:
  logging:
    logLevel: info

service:
  telemetry:
    logs:
      level: "info"
  pipelines:
    metrics:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [logging]
    metrics/default:
      receivers: [otlp]
      processors: [memory_limiter]
      exporters: [logging]
    traces:
      receivers: [otlp]
      processors: [memory_limiter/another]
      exporters: [logging]
  extensions: [memory_ballast]
````

The fix adds a map on the factory, keyed by the processor config ID, to keep track of the different memory limiter instances, and a ~~sync.Once~~ mutex-protected reference counter around the `start` and `shutdown` calls, so that only the first processor launches the `checkMemLimits` routine and only the last one to call `shutdown` takes it down.
If `shutdown` is called when no `checkMemLimits` routine has been started, an error is returned; the unit tests were updated to cover this.
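
For illustration, here is a minimal, self-contained sketch of that reference-counting pattern (names such as `monitor` and `check` are placeholders, not the collector's API; the actual implementation is in the diff below):

````go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// monitor sketches the reference-counted lifecycle applied per memoryLimiter
// instance: the first start launches the checking goroutine, the last
// shutdown stops the ticker, and shutting down with nothing running is an error.
type monitor struct {
	mu         sync.Mutex
	refCounter int
	ticker     *time.Ticker
	check      func() // stands in for checkMemLimits
}

func (m *monitor) start() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.refCounter++
	if m.refCounter == 1 { // only the first caller launches the goroutine
		go func() {
			for range m.ticker.C {
				m.check()
			}
		}()
	}
}

func (m *monitor) shutdown() error {
	m.mu.Lock()
	defer m.mu.Unlock()
	switch {
	case m.refCounter == 0:
		return errors.New("no existing monitoring routine is running")
	case m.refCounter == 1: // last caller stops the ticker; no further checks run
		m.ticker.Stop()
	}
	m.refCounter--
	return nil
}

func main() {
	m := &monitor{
		ticker: time.NewTicker(100 * time.Millisecond),
		check:  func() { fmt.Println("checking memory limits") },
	}
	m.start()                 // traces processor: launches the goroutine
	m.start()                 // logs processor: reuses the same goroutine
	fmt.Println(m.shutdown()) // <nil>, one reference still held
	fmt.Println(m.shutdown()) // <nil>, ticker stopped
	fmt.Println(m.shutdown()) // error: nothing is running
}
````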


**Testing:** 
Tested with the above config, using a Splunk OTel instance with valid data.
Made sure only a single `checkMemLimits` routine is running when there is a single `memory_limiter` config, and more than one when there are multiple configs.
I also verified that under memory pressure, once the soft limit is exceeded, all data types (traces, logs, and metrics) are dropped.
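
One possible way to spot-check how many monitoring goroutines are alive (shown here only for illustration, not necessarily the exact check used) is to dump a goroutine profile and count the matching stacks; matching on `startMonitoring` is more reliable than matching on `checkMemLimits`, since the goroutine normally sits blocked on the ticker inside that function.

````go
package main

import (
	"bytes"
	"fmt"
	"runtime/pprof"
	"strings"
)

// countGoroutines returns how many goroutine stacks in a full goroutine dump
// contain the given substring (e.g. "startMonitoring").
func countGoroutines(substr string) int {
	var buf bytes.Buffer
	// debug=2 writes a full stack trace for every goroutine, one block per goroutine.
	_ = pprof.Lookup("goroutine").WriteTo(&buf, 2)
	count := 0
	for _, stack := range strings.Split(buf.String(), "\n\n") {
		if strings.Contains(stack, substr) {
			count++
		}
	}
	return count
}

func main() {
	fmt.Println(countGoroutines("startMonitoring"))
}
````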

Once we agree on this solution, I will look into adding more unit tests to validate the change.
dloucasfx committed Feb 24, 2022
1 parent 3cbba96 commit 52d6518
Showing 5 changed files with 98 additions and 27 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
@@ -39,6 +39,11 @@
## 🧰 Bug fixes 🧰

- Initialized logger with collector to avoid potential race condition panic on `Shutdown` (#4827)
- In addition to the traces processor, the logs and metrics processors now also start the memory limiter.
  Added thread-safe logic so that only the first processor launches the `checkMemLimits` goroutine and only the last
  processor to call shutdown terminates it; this is done per memory limiter instance.
  Added a memory limiter factory that caches initialized instances so they can be reused by processors sharing the
  same config. This guarantees a single running `checkMemLimits` per config (#4886)

## v0.45.0 Beta

64 changes: 48 additions & 16 deletions processor/memorylimiterprocessor/factory.go
@@ -16,6 +16,7 @@ package memorylimiterprocessor // import "go.opentelemetry.io/collector/processo

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config"
@@ -30,14 +31,24 @@ const (

var processorCapabilities = consumer.Capabilities{MutatesData: false}

type factory struct {
// memoryLimiters stores memoryLimiter instances with unique configs that multiple processors can reuse.
// This avoids running multiple memory checks (ie: GC) for every processor using the same processor config.
memoryLimiters map[config.ComponentID]*memoryLimiter
lock sync.Mutex
}

// NewFactory returns a new factory for the Memory Limiter processor.
func NewFactory() component.ProcessorFactory {
f := &factory{
memoryLimiters: map[config.ComponentID]*memoryLimiter{},
}
return component.NewProcessorFactory(
typeStr,
createDefaultConfig,
component.WithTracesProcessor(createTracesProcessor),
component.WithMetricsProcessor(createMetricsProcessor),
component.WithLogsProcessor(createLogsProcessor))
component.WithTracesProcessor(f.createTracesProcessor),
component.WithMetricsProcessor(f.createMetricsProcessor),
component.WithLogsProcessor(f.createLogsProcessor))
}

// CreateDefaultConfig creates the default configuration for processor. Notice
Expand All @@ -48,57 +59,78 @@ func createDefaultConfig() config.Processor {
}
}

func createTracesProcessor(
func (f *factory) createTracesProcessor(
_ context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Traces,
) (component.TracesProcessor, error) {
ml, err := newMemoryLimiter(set, cfg.(*Config))
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewTracesProcessor(
cfg,
nextConsumer,
ml.processTraces,
memLimiter.processTraces,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithStart(ml.start),
processorhelper.WithShutdown(ml.shutdown))
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}

func createMetricsProcessor(
func (f *factory) createMetricsProcessor(
_ context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Metrics,
) (component.MetricsProcessor, error) {
ml, err := newMemoryLimiter(set, cfg.(*Config))
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewMetricsProcessor(
cfg,
nextConsumer,
ml.processMetrics,
memLimiter.processMetrics,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}

func createLogsProcessor(
func (f *factory) createLogsProcessor(
_ context.Context,
set component.ProcessorCreateSettings,
cfg config.Processor,
nextConsumer consumer.Logs,
) (component.LogsProcessor, error) {
ml, err := newMemoryLimiter(set, cfg.(*Config))
memLimiter, err := f.getMemoryLimiter(set, cfg)
if err != nil {
return nil, err
}
return processorhelper.NewLogsProcessor(
cfg,
nextConsumer,
ml.processLogs,
memLimiter.processLogs,
processorhelper.WithCapabilities(processorCapabilities),
processorhelper.WithShutdown(ml.shutdown))
processorhelper.WithStart(memLimiter.start),
processorhelper.WithShutdown(memLimiter.shutdown))
}

// getMemoryLimiter checks if we have a cached memoryLimiter with a specific config,
// otherwise initialize and add one to the store.
func (f *factory) getMemoryLimiter(set component.ProcessorCreateSettings, cfg config.Processor) (*memoryLimiter, error) {
f.lock.Lock()
defer f.lock.Unlock()

if memLimiter, ok := f.memoryLimiters[cfg.ID()]; ok {
return memLimiter, nil
}

memLimiter, err := newMemoryLimiter(set, cfg.(*Config))
if err != nil {
return nil, err
}

f.memoryLimiters[cfg.ID()] = memLimiter
return memLimiter, nil
}
20 changes: 18 additions & 2 deletions processor/memorylimiterprocessor/factory_test.go
@@ -16,6 +16,7 @@ package memorylimiterprocessor

import (
"context"
"fmt"
"testing"
"time"

@@ -61,18 +62,33 @@ func TestCreateProcessor(t *testing.T) {
pCfg.MemorySpikeLimitMiB = 1907
pCfg.CheckInterval = 100 * time.Millisecond

errorCheck := fmt.Errorf("no existing monitoring routine is running")
tp, err = factory.CreateTracesProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, tp)
assert.NoError(t, tp.Shutdown(context.Background()))
// test if we can shutdown a monitoring routine that has not started
assert.Error(t, errorCheck, tp.Shutdown(context.Background()))
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))

mp, err = factory.CreateMetricsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, mp)
assert.NoError(t, mp.Shutdown(context.Background()))
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

lp, err = factory.CreateLogsProcessor(context.Background(), componenttest.NewNopProcessorCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, lp)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))

assert.NoError(t, lp.Shutdown(context.Background()))
assert.NoError(t, tp.Shutdown(context.Background()))
assert.NoError(t, mp.Shutdown(context.Background()))
// verify that no monitoring routine is running
assert.Error(t, errorCheck, tp.Shutdown(context.Background()))

// start and shutdown a new monitoring routine
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, lp.Shutdown(context.Background()))
// calling it again should throw an error
assert.Error(t, errorCheck, lp.Shutdown(context.Background()))
}
35 changes: 26 additions & 9 deletions processor/memorylimiterprocessor/memorylimiter.go
@@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"runtime"
"sync"
"sync/atomic"
"time"

@@ -81,6 +82,9 @@ type memoryLimiter struct {
configMismatchedLogged bool

obsrep *obsreport.Processor

refCounterLock sync.Mutex
refCounter int
}

// Minimum interval between forced GC when in soft limited mode. We don't want to
@@ -148,13 +152,20 @@ func (ml *memoryLimiter) start(_ context.Context, host component.Host) error {
break
}
}

ml.startMonitoring()
return nil
}

func (ml *memoryLimiter) shutdown(context.Context) error {
ml.ticker.Stop()
ml.refCounterLock.Lock()
defer ml.refCounterLock.Unlock()

if ml.refCounter == 0 {
return fmt.Errorf("no existing monitoring routine is running")
} else if ml.refCounter == 1 {
ml.ticker.Stop()
}
ml.refCounter--
return nil
}

@@ -230,14 +241,20 @@ func (ml *memoryLimiter) readMemStats() *runtime.MemStats {
return ms
}

// startMonitoring starts a ticker'd goroutine that will check memory usage
// every checkInterval period.
// startMonitoring starts a single ticker'd goroutine per instance
// that will check memory usage every checkInterval period.
func (ml *memoryLimiter) startMonitoring() {
go func() {
for range ml.ticker.C {
ml.checkMemLimits()
}
}()
ml.refCounterLock.Lock()
defer ml.refCounterLock.Unlock()

ml.refCounter++
if ml.refCounter == 1 {
go func() {
for range ml.ticker.C {
ml.checkMemLimits()
}
}()
}
}

// forcingDrop indicates when memory resources need to be released.
1 change: 1 addition & 0 deletions processor/memorylimiterprocessor/memorylimiter_test.go
@@ -95,6 +95,7 @@ func TestNew(t *testing.T) {
return
}
if got != nil {
assert.NoError(t, got.start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, got.shutdown(context.Background()))
}
})