Skip to content

Commit

Permalink
[processor/memory_limiter] Update config validation errors
Browse files Browse the repository at this point in the history
  - Fix names of the config fields that are validated in the error messages
  - Move the validation from start to the initialization phrase
  • Loading branch information
dmitryax committed Dec 11, 2023
1 parent b0f618e commit 78ad81b
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 130 deletions.
27 changes: 27 additions & 0 deletions .chloggen/memory-limiter-update-config-validation-errors.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: processor/memory_limiter

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update config validation errors

# One or more tracking issues or pull requests related to the change
issues: [9059]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
- Fix names of the config fields that are validated in the error messages
- Move the validation from start to the initialization phrase
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
15 changes: 15 additions & 0 deletions processor/memorylimiterprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,20 @@ var _ component.Config = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
if cfg.CheckInterval <= 0 {
return errCheckIntervalOutOfRange
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return errLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 {
return errPercentageLimitOutOfRange
}
if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB {
return errMemSpikeLimitOutOfRange
}
if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage {
return errMemSpikePercentageLimitOutOfRange
}
return nil
}
68 changes: 68 additions & 0 deletions processor/memorylimiterprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,71 @@ func TestUnmarshalConfig(t *testing.T) {
MemorySpikeLimitMiB: 500,
}, cfg)
}

func TestConfigValidate(t *testing.T) {
tests := []struct {
name string
cfg *Config
err error
}{
{
name: "valid",
cfg: func() *Config {
cfg := createDefaultConfig().(*Config)
cfg.MemoryLimitMiB = 5722
cfg.MemorySpikeLimitMiB = 1907
cfg.CheckInterval = 100 * time.Millisecond
return cfg
}(),
err: nil,
},
{
name: "zero check interval",
cfg: &Config{
CheckInterval: 0,
},
err: errCheckIntervalOutOfRange,
},
{
name: "unset memory limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 0,
MemoryLimitPercentage: 0,
},
err: errLimitOutOfRange,
},
{
name: "invalid memory spike limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitMiB: 10,
MemorySpikeLimitMiB: 10,
},
err: errMemSpikeLimitOutOfRange,
},
{
name: "invalid memory percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 101,
},
err: errPercentageLimitOutOfRange,
},
{
name: "invalid memory spike percentage limit",
cfg: &Config{
CheckInterval: 1 * time.Second,
MemoryLimitPercentage: 50,
MemorySpikePercentage: 60,
},
err: errMemSpikePercentageLimitOutOfRange,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := tt.cfg.Validate()
assert.Equal(t, tt.err, err)
})
}
}
19 changes: 3 additions & 16 deletions processor/memorylimiterprocessor/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,38 +31,25 @@ func TestCreateProcessor(t *testing.T) {

cfg := factory.CreateDefaultConfig()

// This processor can't be created with the default config.
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, tp)
assert.Error(t, err, "created processor with invalid settings")

mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, mp)
assert.Error(t, err, "created processor with invalid settings")

lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.Nil(t, lp)
assert.Error(t, err, "created processor with invalid settings")

// Create processor with a valid config.
pCfg := cfg.(*Config)
pCfg.MemoryLimitMiB = 5722
pCfg.MemorySpikeLimitMiB = 1907
pCfg.CheckInterval = 100 * time.Millisecond

tp, err = factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, tp)
// test if we can shutdown a monitoring routine that has not started
assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted)
assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost()))

mp, err = factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, mp)
assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost()))

lp, err = factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop())
assert.NoError(t, err)
assert.NotNil(t, lp)
assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost()))
Expand Down
38 changes: 12 additions & 26 deletions processor/memorylimiterprocessor/memorylimiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,18 +34,16 @@ var (

// Construction errors

errCheckIntervalOutOfRange = errors.New(
"checkInterval must be greater than zero")
errCheckIntervalOutOfRange = errors.New("check_interval must be greater than zero")

errLimitOutOfRange = errors.New(
"memAllocLimit or memoryLimitPercentage must be greater than zero")
errLimitOutOfRange = errors.New("limit_mib or limit_percentage must be greater than zero")

errMemSpikeLimitOutOfRange = errors.New(
"memSpikeLimit must be smaller than memAllocLimit")
errMemSpikeLimitOutOfRange = errors.New("spike_limit_mib must be smaller than limit_mib")

errMemSpikePercentageLimitOutOfRange = errors.New("spike_limit_percentage must be smaller than limit_percentage")

errPercentageLimitOutOfRange = errors.New(
"memoryLimitPercentage and memorySpikePercentage must be greater than zero and less than or equal to hundred",
)
"limit_percentage and spike_limit_percentage must be greater than zero and less than or equal to hundred")

errShutdownNotStarted = errors.New("no existing monitoring routine is running")
)
Expand Down Expand Up @@ -86,13 +84,6 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second

// newMemoryLimiter returns a new memorylimiter processor.
func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter, error) {
if cfg.CheckInterval <= 0 {
return nil, errCheckIntervalOutOfRange
}
if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 {
return nil, errLimitOutOfRange
}

logger := set.Logger
usageChecker, err := getMemUsageChecker(cfg, logger)
if err != nil {
Expand Down Expand Up @@ -129,7 +120,7 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes
memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes
if cfg.MemoryLimitMiB != 0 {
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit)
return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil
}
totalMemory, err := getMemoryFn()
if err != nil {
Expand All @@ -139,7 +130,8 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro
zap.Uint64("total_memory_mib", totalMemory/mibBytes),
zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage),
zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage))
return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage),
uint64(cfg.MemorySpikePercentage)), nil
}

func (ml *memoryLimiter) start(_ context.Context, host component.Host) error {
Expand Down Expand Up @@ -319,23 +311,17 @@ func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool {
return ms.Alloc >= d.memAllocLimit
}

func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) {
if memSpikeLimit >= memAllocLimit {
return nil, errMemSpikeLimitOutOfRange
}
func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker {
if memSpikeLimit == 0 {
// If spike limit is unspecified use 20% of mem limit.
memSpikeLimit = memAllocLimit / 5
}
return &memUsageChecker{
memAllocLimit: memAllocLimit,
memSpikeLimit: memSpikeLimit,
}, nil
}
}

func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) (*memUsageChecker, error) {
if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 {
return nil, errPercentageLimitOutOfRange
}
func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker {
return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100)
}
91 changes: 3 additions & 88 deletions processor/memorylimiterprocessor/memorylimiter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configtelemetry"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumertest"
"go.opentelemetry.io/collector/internal/iruntime"
"go.opentelemetry.io/collector/pdata/plog"
Expand All @@ -29,71 +28,6 @@ import (
"go.opentelemetry.io/collector/processor/processortest"
)

func TestNew(t *testing.T) {
type args struct {
nextConsumer consumer.Traces
checkInterval time.Duration
memoryLimitMiB uint32
memorySpikeLimitMiB uint32
}
sink := new(consumertest.TracesSink)
tests := []struct {
name string
args args
wantErr error
}{
{
name: "zero_checkInterval",
args: args{
nextConsumer: sink,
},
wantErr: errCheckIntervalOutOfRange,
},
{
name: "zero_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
},
wantErr: errLimitOutOfRange,
},
{
name: "memSpikeLimit_gt_memAllocLimit",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memoryLimitMiB: 1,
memorySpikeLimitMiB: 2,
},
wantErr: errMemSpikeLimitOutOfRange,
},
{
name: "success",
args: args{
nextConsumer: sink,
checkInterval: 100 * time.Millisecond,
memoryLimitMiB: 1024,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.CheckInterval = tt.args.checkInterval
cfg.MemoryLimitMiB = tt.args.memoryLimitMiB
cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB
got, err := newMemoryLimiter(processortest.NewNopCreateSettings(), cfg)
if tt.wantErr != nil {
assert.ErrorIs(t, err, tt.wantErr)
return
}
assert.NoError(t, err)
assert.NoError(t, got.start(context.Background(), componenttest.NewNopHost()))
assert.NoError(t, got.shutdown(context.Background()))
})
}
}

// TestMetricsMemoryPressureResponse manipulates results from querying memory and
// check expected side effects.
func TestMetricsMemoryPressureResponse(t *testing.T) {
Expand Down Expand Up @@ -309,11 +243,6 @@ func TestGetDecision(t *testing.T) {
memSpikeLimit: 20 * mibBytes,
}, d)
})
t.Run("fixed_limit_error", func(t *testing.T) {
d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
})

t.Cleanup(func() {
getMemoryFn = iruntime.TotalMemory
Expand All @@ -329,26 +258,12 @@ func TestGetDecision(t *testing.T) {
memSpikeLimit: 10 * mibBytes,
}, d)
})
t.Run("percentage_limit_error", func(t *testing.T) {
d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop())
require.Error(t, err)
assert.Nil(t, d)
})
}

func TestRefuseDecision(t *testing.T) {
decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30)
require.NoError(t, err)
decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50)
require.NoError(t, err)
decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20)
require.NoError(t, err)
decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60)
require.Error(t, err)
assert.Nil(t, decison1000Limit40Spike60)
decison1000Limit30Spike30 := newPercentageMemUsageChecker(1000, 60, 30)
decison1000Limit60Spike50 := newPercentageMemUsageChecker(1000, 60, 50)
decison1000Limit40Spike20 := newPercentageMemUsageChecker(1000, 40, 20)

tests := []struct {
name string
Expand Down

0 comments on commit 78ad81b

Please sign in to comment.