From 78ad81bfedf2087ce5922e8d2b7df9006b9a09c3 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Sun, 10 Dec 2023 15:42:21 -0800 Subject: [PATCH] [processor/memory_limiter] Update config validation errors - Fix names of the config fields that are validated in the error messages - Move the validation from start to the initialization phrase --- ...miter-update-config-validation-errors.yaml | 27 ++++++ processor/memorylimiterprocessor/config.go | 15 +++ .../memorylimiterprocessor/config_test.go | 68 ++++++++++++++ .../memorylimiterprocessor/factory_test.go | 19 +--- .../memorylimiterprocessor/memorylimiter.go | 38 +++----- .../memorylimiter_test.go | 91 +------------------ 6 files changed, 128 insertions(+), 130 deletions(-) create mode 100755 .chloggen/memory-limiter-update-config-validation-errors.yaml diff --git a/.chloggen/memory-limiter-update-config-validation-errors.yaml b/.chloggen/memory-limiter-update-config-validation-errors.yaml new file mode 100755 index 00000000000..eb7730e8da6 --- /dev/null +++ b/.chloggen/memory-limiter-update-config-validation-errors.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: processor/memory_limiter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Update config validation errors + +# One or more tracking issues or pull requests related to the change +issues: [9059] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + - Fix names of the config fields that are validated in the error messages + - Move the validation from start to the initialization phrase + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] \ No newline at end of file diff --git a/processor/memorylimiterprocessor/config.go b/processor/memorylimiterprocessor/config.go index e16ebeb67ee..3f526fb4f4f 100644 --- a/processor/memorylimiterprocessor/config.go +++ b/processor/memorylimiterprocessor/config.go @@ -39,5 +39,20 @@ var _ component.Config = (*Config)(nil) // Validate checks if the processor configuration is valid func (cfg *Config) Validate() error { + if cfg.CheckInterval <= 0 { + return errCheckIntervalOutOfRange + } + if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 { + return errLimitOutOfRange + } + if cfg.MemoryLimitPercentage > 100 || cfg.MemorySpikePercentage > 100 { + return errPercentageLimitOutOfRange + } + if cfg.MemoryLimitMiB > 0 && cfg.MemoryLimitMiB <= cfg.MemorySpikeLimitMiB { + return errMemSpikeLimitOutOfRange + } + if cfg.MemoryLimitPercentage > 0 && cfg.MemoryLimitPercentage <= cfg.MemorySpikePercentage { + return errMemSpikePercentageLimitOutOfRange + } return nil } diff --git a/processor/memorylimiterprocessor/config_test.go b/processor/memorylimiterprocessor/config_test.go index 076d037dba0..37f9f7a51c5 100644 --- a/processor/memorylimiterprocessor/config_test.go +++ b/processor/memorylimiterprocessor/config_test.go @@ -36,3 +36,71 @@ func TestUnmarshalConfig(t *testing.T) { MemorySpikeLimitMiB: 500, }, cfg) } + +func TestConfigValidate(t *testing.T) { + tests := []struct { + name string + cfg *Config + err error + }{ + { + name: "valid", + cfg: func() *Config { + cfg := createDefaultConfig().(*Config) + cfg.MemoryLimitMiB = 5722 + cfg.MemorySpikeLimitMiB = 1907 + cfg.CheckInterval = 100 * time.Millisecond + return cfg + }(), + err: nil, + }, + { + name: "zero check interval", + cfg: &Config{ + CheckInterval: 0, + }, + err: errCheckIntervalOutOfRange, + }, + { + name: "unset memory limit", + cfg: &Config{ + CheckInterval: 1 * time.Second, + MemoryLimitMiB: 0, + MemoryLimitPercentage: 0, + }, + err: errLimitOutOfRange, + }, + { + name: "invalid memory spike limit", + cfg: &Config{ + CheckInterval: 1 * time.Second, + MemoryLimitMiB: 10, + MemorySpikeLimitMiB: 10, + }, + err: errMemSpikeLimitOutOfRange, + }, + { + name: "invalid memory percentage limit", + cfg: &Config{ + CheckInterval: 1 * time.Second, + MemoryLimitPercentage: 101, + }, + err: errPercentageLimitOutOfRange, + }, + { + name: "invalid memory spike percentage limit", + cfg: &Config{ + CheckInterval: 1 * time.Second, + MemoryLimitPercentage: 50, + MemorySpikePercentage: 60, + }, + err: errMemSpikePercentageLimitOutOfRange, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + assert.Equal(t, tt.err, err) + }) + } +} diff --git a/processor/memorylimiterprocessor/factory_test.go b/processor/memorylimiterprocessor/factory_test.go index e7a274dba06..29816264b64 100644 --- a/processor/memorylimiterprocessor/factory_test.go +++ b/processor/memorylimiterprocessor/factory_test.go @@ -31,38 +31,25 @@ func TestCreateProcessor(t *testing.T) { cfg := factory.CreateDefaultConfig() - // This processor can't be created with the default config. - tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) - assert.Nil(t, tp) - assert.Error(t, err, "created processor with invalid settings") - - mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) - assert.Nil(t, mp) - assert.Error(t, err, "created processor with invalid settings") - - lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) - assert.Nil(t, lp) - assert.Error(t, err, "created processor with invalid settings") - // Create processor with a valid config. pCfg := cfg.(*Config) pCfg.MemoryLimitMiB = 5722 pCfg.MemorySpikeLimitMiB = 1907 pCfg.CheckInterval = 100 * time.Millisecond - tp, err = factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + tp, err := factory.CreateTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, tp) // test if we can shutdown a monitoring routine that has not started assert.ErrorIs(t, tp.Shutdown(context.Background()), errShutdownNotStarted) assert.NoError(t, tp.Start(context.Background(), componenttest.NewNopHost())) - mp, err = factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + mp, err := factory.CreateMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, mp) assert.NoError(t, mp.Start(context.Background(), componenttest.NewNopHost())) - lp, err = factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) + lp, err := factory.CreateLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), cfg, consumertest.NewNop()) assert.NoError(t, err) assert.NotNil(t, lp) assert.NoError(t, lp.Start(context.Background(), componenttest.NewNopHost())) diff --git a/processor/memorylimiterprocessor/memorylimiter.go b/processor/memorylimiterprocessor/memorylimiter.go index 0520e13339a..6cf4a3e67f6 100644 --- a/processor/memorylimiterprocessor/memorylimiter.go +++ b/processor/memorylimiterprocessor/memorylimiter.go @@ -34,18 +34,16 @@ var ( // Construction errors - errCheckIntervalOutOfRange = errors.New( - "checkInterval must be greater than zero") + errCheckIntervalOutOfRange = errors.New("check_interval must be greater than zero") - errLimitOutOfRange = errors.New( - "memAllocLimit or memoryLimitPercentage must be greater than zero") + errLimitOutOfRange = errors.New("limit_mib or limit_percentage must be greater than zero") - errMemSpikeLimitOutOfRange = errors.New( - "memSpikeLimit must be smaller than memAllocLimit") + errMemSpikeLimitOutOfRange = errors.New("spike_limit_mib must be smaller than limit_mib") + + errMemSpikePercentageLimitOutOfRange = errors.New("spike_limit_percentage must be smaller than limit_percentage") errPercentageLimitOutOfRange = errors.New( - "memoryLimitPercentage and memorySpikePercentage must be greater than zero and less than or equal to hundred", - ) + "limit_percentage and spike_limit_percentage must be greater than zero and less than or equal to hundred") errShutdownNotStarted = errors.New("no existing monitoring routine is running") ) @@ -86,13 +84,6 @@ const minGCIntervalWhenSoftLimited = 10 * time.Second // newMemoryLimiter returns a new memorylimiter processor. func newMemoryLimiter(set processor.CreateSettings, cfg *Config) (*memoryLimiter, error) { - if cfg.CheckInterval <= 0 { - return nil, errCheckIntervalOutOfRange - } - if cfg.MemoryLimitMiB == 0 && cfg.MemoryLimitPercentage == 0 { - return nil, errLimitOutOfRange - } - logger := set.Logger usageChecker, err := getMemUsageChecker(cfg, logger) if err != nil { @@ -129,7 +120,7 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro memAllocLimit := uint64(cfg.MemoryLimitMiB) * mibBytes memSpikeLimit := uint64(cfg.MemorySpikeLimitMiB) * mibBytes if cfg.MemoryLimitMiB != 0 { - return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit) + return newFixedMemUsageChecker(memAllocLimit, memSpikeLimit), nil } totalMemory, err := getMemoryFn() if err != nil { @@ -139,7 +130,8 @@ func getMemUsageChecker(cfg *Config, logger *zap.Logger) (*memUsageChecker, erro zap.Uint64("total_memory_mib", totalMemory/mibBytes), zap.Uint32("limit_percentage", cfg.MemoryLimitPercentage), zap.Uint32("spike_limit_percentage", cfg.MemorySpikePercentage)) - return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), uint64(cfg.MemorySpikePercentage)) + return newPercentageMemUsageChecker(totalMemory, uint64(cfg.MemoryLimitPercentage), + uint64(cfg.MemorySpikePercentage)), nil } func (ml *memoryLimiter) start(_ context.Context, host component.Host) error { @@ -319,10 +311,7 @@ func (d memUsageChecker) aboveHardLimit(ms *runtime.MemStats) bool { return ms.Alloc >= d.memAllocLimit } -func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChecker, error) { - if memSpikeLimit >= memAllocLimit { - return nil, errMemSpikeLimitOutOfRange - } +func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) *memUsageChecker { if memSpikeLimit == 0 { // If spike limit is unspecified use 20% of mem limit. memSpikeLimit = memAllocLimit / 5 @@ -330,12 +319,9 @@ func newFixedMemUsageChecker(memAllocLimit, memSpikeLimit uint64) (*memUsageChec return &memUsageChecker{ memAllocLimit: memAllocLimit, memSpikeLimit: memSpikeLimit, - }, nil + } } -func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) (*memUsageChecker, error) { - if percentageLimit > 100 || percentageLimit <= 0 || percentageSpike > 100 || percentageSpike <= 0 { - return nil, errPercentageLimitOutOfRange - } +func newPercentageMemUsageChecker(totalMemory uint64, percentageLimit, percentageSpike uint64) *memUsageChecker { return newFixedMemUsageChecker(percentageLimit*totalMemory/100, percentageSpike*totalMemory/100) } diff --git a/processor/memorylimiterprocessor/memorylimiter_test.go b/processor/memorylimiterprocessor/memorylimiter_test.go index c13e1b90b8b..c1afdc17bcf 100644 --- a/processor/memorylimiterprocessor/memorylimiter_test.go +++ b/processor/memorylimiterprocessor/memorylimiter_test.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configtelemetry" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/internal/iruntime" "go.opentelemetry.io/collector/pdata/plog" @@ -29,71 +28,6 @@ import ( "go.opentelemetry.io/collector/processor/processortest" ) -func TestNew(t *testing.T) { - type args struct { - nextConsumer consumer.Traces - checkInterval time.Duration - memoryLimitMiB uint32 - memorySpikeLimitMiB uint32 - } - sink := new(consumertest.TracesSink) - tests := []struct { - name string - args args - wantErr error - }{ - { - name: "zero_checkInterval", - args: args{ - nextConsumer: sink, - }, - wantErr: errCheckIntervalOutOfRange, - }, - { - name: "zero_memAllocLimit", - args: args{ - nextConsumer: sink, - checkInterval: 100 * time.Millisecond, - }, - wantErr: errLimitOutOfRange, - }, - { - name: "memSpikeLimit_gt_memAllocLimit", - args: args{ - nextConsumer: sink, - checkInterval: 100 * time.Millisecond, - memoryLimitMiB: 1, - memorySpikeLimitMiB: 2, - }, - wantErr: errMemSpikeLimitOutOfRange, - }, - { - name: "success", - args: args{ - nextConsumer: sink, - checkInterval: 100 * time.Millisecond, - memoryLimitMiB: 1024, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - cfg := createDefaultConfig().(*Config) - cfg.CheckInterval = tt.args.checkInterval - cfg.MemoryLimitMiB = tt.args.memoryLimitMiB - cfg.MemorySpikeLimitMiB = tt.args.memorySpikeLimitMiB - got, err := newMemoryLimiter(processortest.NewNopCreateSettings(), cfg) - if tt.wantErr != nil { - assert.ErrorIs(t, err, tt.wantErr) - return - } - assert.NoError(t, err) - assert.NoError(t, got.start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, got.shutdown(context.Background())) - }) - } -} - // TestMetricsMemoryPressureResponse manipulates results from querying memory and // check expected side effects. func TestMetricsMemoryPressureResponse(t *testing.T) { @@ -309,11 +243,6 @@ func TestGetDecision(t *testing.T) { memSpikeLimit: 20 * mibBytes, }, d) }) - t.Run("fixed_limit_error", func(t *testing.T) { - d, err := getMemUsageChecker(&Config{MemoryLimitMiB: 20, MemorySpikeLimitMiB: 100}, zap.NewNop()) - require.Error(t, err) - assert.Nil(t, d) - }) t.Cleanup(func() { getMemoryFn = iruntime.TotalMemory @@ -329,26 +258,12 @@ func TestGetDecision(t *testing.T) { memSpikeLimit: 10 * mibBytes, }, d) }) - t.Run("percentage_limit_error", func(t *testing.T) { - d, err := getMemUsageChecker(&Config{MemoryLimitPercentage: 101, MemorySpikePercentage: 10}, zap.NewNop()) - require.Error(t, err) - assert.Nil(t, d) - d, err = getMemUsageChecker(&Config{MemoryLimitPercentage: 99, MemorySpikePercentage: 101}, zap.NewNop()) - require.Error(t, err) - assert.Nil(t, d) - }) } func TestRefuseDecision(t *testing.T) { - decison1000Limit30Spike30, err := newPercentageMemUsageChecker(1000, 60, 30) - require.NoError(t, err) - decison1000Limit60Spike50, err := newPercentageMemUsageChecker(1000, 60, 50) - require.NoError(t, err) - decison1000Limit40Spike20, err := newPercentageMemUsageChecker(1000, 40, 20) - require.NoError(t, err) - decison1000Limit40Spike60, err := newPercentageMemUsageChecker(1000, 40, 60) - require.Error(t, err) - assert.Nil(t, decison1000Limit40Spike60) + decison1000Limit30Spike30 := newPercentageMemUsageChecker(1000, 60, 30) + decison1000Limit60Spike50 := newPercentageMemUsageChecker(1000, 60, 50) + decison1000Limit40Spike20 := newPercentageMemUsageChecker(1000, 40, 20) tests := []struct { name string