From d3bdb82142e41d3e008cdb4beb9dd60b9efc9a73 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Wed, 7 Feb 2024 16:55:59 -0800 Subject: [PATCH 1/2] [component] Deprecate ErrNilNextConsumer --- .chloggen/deprecate_ErrNilNextConsumer.yaml | 25 ++++ component/component.go | 2 + connector/connector.go | 32 ++++ connector/connector_test.go | 139 +++++++++++------- processor/processor.go | 14 ++ processor/processor_test.go | 73 +++++---- processor/processorhelper/logs.go | 4 - processor/processorhelper/logs_test.go | 3 - processor/processorhelper/metrics.go | 4 - processor/processorhelper/metrics_test.go | 3 - processor/processorhelper/traces.go | 4 - processor/processorhelper/traces_test.go | 3 - receiver/otlpreceiver/factory.go | 12 +- receiver/otlpreceiver/factory_test.go | 45 ------ receiver/otlpreceiver/otlp.go | 18 +-- receiver/otlpreceiver/otlp_test.go | 6 +- receiver/receiver.go | 20 ++- receiver/receiver_test.go | 67 ++++++--- receiver/scraperhelper/scrapercontroller.go | 3 - .../scraperhelper/scrapercontroller_test.go | 14 +- 20 files changed, 275 insertions(+), 216 deletions(-) create mode 100755 .chloggen/deprecate_ErrNilNextConsumer.yaml diff --git a/.chloggen/deprecate_ErrNilNextConsumer.yaml b/.chloggen/deprecate_ErrNilNextConsumer.yaml new file mode 100755 index 00000000000..67c2a8c4cb0 --- /dev/null +++ b/.chloggen/deprecate_ErrNilNextConsumer.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: component + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: deprecate component.ErrNilNextConsumer + +# One or more tracking issues or pull requests related to the change +issues: [9526] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/component/component.go b/component/component.go index e5f9e78c978..2408fa647b3 100644 --- a/component/component.go +++ b/component/component.go @@ -11,6 +11,8 @@ import ( var ( // ErrNilNextConsumer can be returned by receiver, or processor Start factory funcs that create the Component if the // expected next Consumer is nil. + // Deprecated: [v0.95.0] The next consumer is now checked as part of the creation of the pipelines. + // This error will be removed in a future release. ErrNilNextConsumer = errors.New("nil next Consumer") // ErrDataTypeIsNotSupported can be returned by receiver, exporter or processor factory funcs that create the diff --git a/connector/connector.go b/connector/connector.go index 9fc576cb54e..94927092117 100644 --- a/connector/connector.go +++ b/connector/connector.go @@ -5,6 +5,7 @@ package connector // import "go.opentelemetry.io/collector/connector" import ( "context" + "errors" "fmt" "go.uber.org/zap" @@ -13,6 +14,10 @@ import ( "go.opentelemetry.io/collector/consumer" ) +var ( + errNilNextConsumer = errors.New("nil next Consumer") +) + // A Traces connector acts as an exporter from a traces pipeline and a receiver // to one or more traces, metrics, or logs pipelines. // Traces feeds a consumer.Traces, consumer.Metrics, or consumer.Logs with data. @@ -456,6 +461,9 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component. // CreateTracesToTraces creates a Traces connector based on the settings and config. func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -472,6 +480,9 @@ func (b *Builder) CreateTracesToTraces(ctx context.Context, set CreateSettings, // CreateTracesToMetrics creates a Traces connector based on the settings and config. func (b *Builder) CreateTracesToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -488,6 +499,9 @@ func (b *Builder) CreateTracesToMetrics(ctx context.Context, set CreateSettings, // CreateTracesToLogs creates a Traces connector based on the settings and config. func (b *Builder) CreateTracesToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -504,6 +518,9 @@ func (b *Builder) CreateTracesToLogs(ctx context.Context, set CreateSettings, ne // CreateMetricsToTraces creates a Metrics connector based on the settings and config. func (b *Builder) CreateMetricsToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -520,6 +537,9 @@ func (b *Builder) CreateMetricsToTraces(ctx context.Context, set CreateSettings, // CreateMetricsToMetrics creates a Metrics connector based on the settings and config. func (b *Builder) CreateMetricsToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -536,6 +556,9 @@ func (b *Builder) CreateMetricsToMetrics(ctx context.Context, set CreateSettings // CreateMetricsToLogs creates a Metrics connector based on the settings and config. func (b *Builder) CreateMetricsToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -552,6 +575,9 @@ func (b *Builder) CreateMetricsToLogs(ctx context.Context, set CreateSettings, n // CreateLogsToTraces creates a Logs connector based on the settings and config. func (b *Builder) CreateLogsToTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -568,6 +594,9 @@ func (b *Builder) CreateLogsToTraces(ctx context.Context, set CreateSettings, ne // CreateLogsToMetrics creates a Logs connector based on the settings and config. func (b *Builder) CreateLogsToMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) @@ -584,6 +613,9 @@ func (b *Builder) CreateLogsToMetrics(ctx context.Context, set CreateSettings, n // CreateLogsToLogs creates a Logs connector based on the settings and config. func (b *Builder) CreateLogsToLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("connector %q is not configured", set.ID) diff --git a/connector/connector_test.go b/connector/connector_test.go index 9c7facd8512..7384b50621d 100644 --- a/connector/connector_test.go +++ b/connector/connector_test.go @@ -28,25 +28,25 @@ func TestNewFactoryNoOptions(t *testing.T) { assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) - _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) - _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) - _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) - _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) - _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) - _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) - _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) - _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) } @@ -60,30 +60,30 @@ func TestNewFactoryWithSameTypes(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToTracesStability()) - _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToMetricsStability()) - _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToLogsStability()) - _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) - _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeMetrics)) - _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeLogs)) - _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeTraces)) - _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeLogs)) - _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeTraces)) - _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeMetrics)) } @@ -99,35 +99,35 @@ func TestNewFactoryWithTranslateTypes(t *testing.T) { assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeTraces, component.DataTypeTraces)) - _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeMetrics, component.DataTypeMetrics)) - _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.Equal(t, err, errDataTypes(testID, component.DataTypeLogs, component.DataTypeLogs)) assert.Equal(t, component.StabilityLevelDevelopment, factory.TracesToMetricsStability()) - _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToLogsStability()) - _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToTracesStability()) - _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelStable, factory.MetricsToLogsStability()) - _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelDeprecated, factory.LogsToTracesStability()) - _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToMetricsStability()) - _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, nil) + _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{ID: testID}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } @@ -147,33 +147,33 @@ func TestNewFactoryWithAllTypes(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToTracesStability()) - _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err := factory.CreateTracesToTraces(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelDevelopment, factory.TracesToMetricsStability()) - _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateTracesToMetrics(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesToLogsStability()) - _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateTracesToLogs(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToTracesStability()) - _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsToTraces(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsToMetricsStability()) - _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsToMetrics(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelStable, factory.MetricsToLogsStability()) - _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsToLogs(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelDeprecated, factory.LogsToTracesStability()) - _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsToTraces(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToMetricsStability()) - _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsToMetrics(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsToLogsStability()) - _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsToLogs(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } @@ -236,9 +236,12 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) testCases := []struct { - name string - id component.ID - err func(component.DataType, component.DataType) string + name string + id component.ID + err func(component.DataType, component.DataType) string + nextTraces consumer.Traces + nextLogs consumer.Logs + nextMetrics consumer.Metrics }{ { name: "unknown", @@ -246,6 +249,9 @@ func TestBuilder(t *testing.T) { err: func(component.DataType, component.DataType) string { return "connector factory not available for: \"unknown\"" }, + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { name: "err", @@ -253,6 +259,9 @@ func TestBuilder(t *testing.T) { err: func(expType, rcvType component.DataType) string { return fmt.Sprintf("connector \"err\" cannot connect from %s to %s: telemetry type is not supported", expType, rcvType) }, + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { name: "all", @@ -260,6 +269,9 @@ func TestBuilder(t *testing.T) { err: func(component.DataType, component.DataType) string { return "" }, + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { name: "all/named", @@ -267,6 +279,19 @@ func TestBuilder(t *testing.T) { err: func(component.DataType, component.DataType) string { return "" }, + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), + }, + { + name: "no next consumer", + id: component.MustNewID("unknown"), + err: func(_, _ component.DataType) string { + return "nil next Consumer" + }, + nextTraces: nil, + nextLogs: nil, + nextMetrics: nil, }, } @@ -275,7 +300,7 @@ func TestBuilder(t *testing.T) { cfgs := map[component.ID]component.Config{tt.id: defaultCfg} b := NewBuilder(cfgs, factories) - t2t, err := b.CreateTracesToTraces(context.Background(), createSettings(tt.id), nil) + t2t, err := b.CreateTracesToTraces(context.Background(), createSettings(tt.id), tt.nextTraces) if expectedErr := tt.err(component.DataTypeTraces, component.DataTypeTraces); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, t2t) @@ -283,7 +308,7 @@ func TestBuilder(t *testing.T) { assert.NoError(t, err) assert.Equal(t, nopInstance, t2t) } - t2m, err := b.CreateTracesToMetrics(context.Background(), createSettings(tt.id), nil) + t2m, err := b.CreateTracesToMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics) if expectedErr := tt.err(component.DataTypeTraces, component.DataTypeMetrics); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, t2m) @@ -291,7 +316,7 @@ func TestBuilder(t *testing.T) { assert.NoError(t, err) assert.Equal(t, nopInstance, t2m) } - t2l, err := b.CreateTracesToLogs(context.Background(), createSettings(tt.id), nil) + t2l, err := b.CreateTracesToLogs(context.Background(), createSettings(tt.id), tt.nextLogs) if expectedErr := tt.err(component.DataTypeTraces, component.DataTypeLogs); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, t2l) @@ -300,7 +325,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, t2l) } - m2t, err := b.CreateMetricsToTraces(context.Background(), createSettings(tt.id), nil) + m2t, err := b.CreateMetricsToTraces(context.Background(), createSettings(tt.id), tt.nextTraces) if expectedErr := tt.err(component.DataTypeMetrics, component.DataTypeTraces); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, m2t) @@ -309,7 +334,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, m2t) } - m2m, err := b.CreateMetricsToMetrics(context.Background(), createSettings(tt.id), nil) + m2m, err := b.CreateMetricsToMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics) if expectedErr := tt.err(component.DataTypeMetrics, component.DataTypeMetrics); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, m2m) @@ -318,7 +343,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, m2m) } - m2l, err := b.CreateMetricsToLogs(context.Background(), createSettings(tt.id), nil) + m2l, err := b.CreateMetricsToLogs(context.Background(), createSettings(tt.id), tt.nextLogs) if expectedErr := tt.err(component.DataTypeMetrics, component.DataTypeLogs); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, m2l) @@ -327,7 +352,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, m2l) } - l2t, err := b.CreateLogsToTraces(context.Background(), createSettings(tt.id), nil) + l2t, err := b.CreateLogsToTraces(context.Background(), createSettings(tt.id), tt.nextTraces) if expectedErr := tt.err(component.DataTypeLogs, component.DataTypeTraces); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, l2t) @@ -336,7 +361,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, l2t) } - l2m, err := b.CreateLogsToMetrics(context.Background(), createSettings(tt.id), nil) + l2m, err := b.CreateLogsToMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics) if expectedErr := tt.err(component.DataTypeLogs, component.DataTypeMetrics); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, l2m) @@ -345,7 +370,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, l2m) } - l2l, err := b.CreateLogsToLogs(context.Background(), createSettings(tt.id), nil) + l2l, err := b.CreateLogsToLogs(context.Background(), createSettings(tt.id), tt.nextLogs) if expectedErr := tt.err(component.DataTypeLogs, component.DataTypeLogs); expectedErr != "" { assert.EqualError(t, err, expectedErr) assert.Nil(t, l2l) @@ -380,39 +405,39 @@ func TestBuilderMissingConfig(t *testing.T) { bErr := NewBuilder(map[component.ID]component.Config{}, factories) missingID := component.MustNewIDWithName("all", "missing") - t2t, err := bErr.CreateTracesToTraces(context.Background(), createSettings(missingID), nil) + t2t, err := bErr.CreateTracesToTraces(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, t2t) - t2m, err := bErr.CreateTracesToMetrics(context.Background(), createSettings(missingID), nil) + t2m, err := bErr.CreateTracesToMetrics(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, t2m) - t2l, err := bErr.CreateTracesToLogs(context.Background(), createSettings(missingID), nil) + t2l, err := bErr.CreateTracesToLogs(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, t2l) - m2t, err := bErr.CreateMetricsToTraces(context.Background(), createSettings(missingID), nil) + m2t, err := bErr.CreateMetricsToTraces(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, m2t) - m2m, err := bErr.CreateMetricsToMetrics(context.Background(), createSettings(missingID), nil) + m2m, err := bErr.CreateMetricsToMetrics(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, m2m) - m2l, err := bErr.CreateMetricsToLogs(context.Background(), createSettings(missingID), nil) + m2l, err := bErr.CreateMetricsToLogs(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, m2l) - l2t, err := bErr.CreateLogsToTraces(context.Background(), createSettings(missingID), nil) + l2t, err := bErr.CreateLogsToTraces(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, l2t) - l2m, err := bErr.CreateLogsToMetrics(context.Background(), createSettings(missingID), nil) + l2m, err := bErr.CreateLogsToMetrics(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, l2m) - l2l, err := bErr.CreateLogsToLogs(context.Background(), createSettings(missingID), nil) + l2l, err := bErr.CreateLogsToLogs(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "connector \"all/missing\" is not configured") assert.Nil(t, l2l) } diff --git a/processor/processor.go b/processor/processor.go index 5bb3f5953cb..98feba69a57 100644 --- a/processor/processor.go +++ b/processor/processor.go @@ -5,6 +5,7 @@ package processor // import "go.opentelemetry.io/collector/processor" import ( "context" + "errors" "fmt" "go.uber.org/zap" @@ -13,6 +14,10 @@ import ( "go.opentelemetry.io/collector/consumer" ) +var ( + errNilNextConsumer = errors.New("nil next Consumer") +) + // Traces is a processor that can consume traces. type Traces interface { component.Component @@ -229,6 +234,9 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component. // CreateTraces creates a Traces processor based on the settings and config. func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("processor %q is not configured", set.ID) @@ -245,6 +253,9 @@ func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next con // CreateMetrics creates a Metrics processor based on the settings and config. func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("processor %q is not configured", set.ID) @@ -261,6 +272,9 @@ func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next co // CreateLogs creates a Logs processor based on the settings and config. func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("processor %q is not configured", set.ID) diff --git a/processor/processor_test.go b/processor/processor_test.go index c72f7edb698..83207ba6955 100644 --- a/processor/processor_test.go +++ b/processor/processor_test.go @@ -24,11 +24,11 @@ func TestNewFactory(t *testing.T) { func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - _, err := factory.CreateTracesProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err := factory.CreateTracesProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) - _, err = factory.CreateMetricsProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) - _, err = factory.CreateLogsProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) } @@ -45,15 +45,15 @@ func TestNewFactoryWithOptions(t *testing.T) { assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) assert.Equal(t, component.StabilityLevelAlpha, factory.TracesProcessorStability()) - _, err := factory.CreateTracesProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err := factory.CreateTracesProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelBeta, factory.MetricsProcessorStability()) - _, err = factory.CreateMetricsProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) assert.Equal(t, component.StabilityLevelUnmaintained, factory.LogsProcessorStability()) - _, err = factory.CreateLogsProcessor(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsProcessor(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.NoError(t, err) } @@ -110,27 +110,50 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) testCases := []struct { - name string - id component.ID - err string + name string + id component.ID + err string + nextTraces consumer.Traces + nextLogs consumer.Logs + nextMetrics consumer.Metrics }{ { - name: "unknown", - id: component.MustNewID("unknown"), - err: "processor factory not available for: \"unknown\"", + name: "unknown", + id: component.MustNewID("unknown"), + err: "processor factory not available for: \"unknown\"", + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), + }, + { + name: "err", + id: component.MustNewID("err"), + err: "telemetry type is not supported", + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "err", - id: component.MustNewID("err"), - err: "telemetry type is not supported", + name: "all", + id: component.MustNewID("all"), + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "all", - id: component.MustNewID("all"), + name: "all/named", + id: component.MustNewIDWithName("all", "named"), + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "all/named", - id: component.MustNewIDWithName("all", "named"), + name: "no next consumer", + id: component.MustNewID("unknown"), + err: "nil next Consumer", + nextTraces: nil, + nextLogs: nil, + nextMetrics: nil, }, } @@ -139,7 +162,7 @@ func TestBuilder(t *testing.T) { cfgs := map[component.ID]component.Config{tt.id: defaultCfg} b := NewBuilder(cfgs, factories) - te, err := b.CreateTraces(context.Background(), createSettings(tt.id), nil) + te, err := b.CreateTraces(context.Background(), createSettings(tt.id), tt.nextTraces) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, te) @@ -148,7 +171,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, te) } - me, err := b.CreateMetrics(context.Background(), createSettings(tt.id), nil) + me, err := b.CreateMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, me) @@ -157,7 +180,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, me) } - le, err := b.CreateLogs(context.Background(), createSettings(tt.id), nil) + le, err := b.CreateLogs(context.Background(), createSettings(tt.id), tt.nextLogs) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, le) @@ -186,15 +209,15 @@ func TestBuilderMissingConfig(t *testing.T) { bErr := NewBuilder(map[component.ID]component.Config{}, factories) missingID := component.MustNewIDWithName("all", "missing") - te, err := bErr.CreateTraces(context.Background(), createSettings(missingID), nil) + te, err := bErr.CreateTraces(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "processor \"all/missing\" is not configured") assert.Nil(t, te) - me, err := bErr.CreateMetrics(context.Background(), createSettings(missingID), nil) + me, err := bErr.CreateMetrics(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "processor \"all/missing\" is not configured") assert.Nil(t, me) - le, err := bErr.CreateLogs(context.Background(), createSettings(missingID), nil) + le, err := bErr.CreateLogs(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "processor \"all/missing\" is not configured") assert.Nil(t, le) } diff --git a/processor/processorhelper/logs.go b/processor/processorhelper/logs.go index b392702584a..ade2f45a385 100644 --- a/processor/processorhelper/logs.go +++ b/processor/processorhelper/logs.go @@ -39,10 +39,6 @@ func NewLogsProcessor( return nil, errors.New("nil logsFunc") } - if nextConsumer == nil { - return nil, component.ErrNilNextConsumer - } - eventOptions := spanAttributes(set.ID) bs := fromOptions(options) logsConsumer, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { diff --git a/processor/processorhelper/logs_test.go b/processor/processorhelper/logs_test.go index 383459dc689..0fe43773309 100644 --- a/processor/processorhelper/logs_test.go +++ b/processor/processorhelper/logs_test.go @@ -47,9 +47,6 @@ func TestNewLogsProcessor_WithOptions(t *testing.T) { func TestNewLogsProcessor_NilRequiredFields(t *testing.T) { _, err := NewLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), &testLogsCfg, consumertest.NewNop(), nil) assert.Error(t, err) - - _, err = NewLogsProcessor(context.Background(), processortest.NewNopCreateSettings(), &testLogsCfg, nil, newTestLProcessor(nil)) - assert.Equal(t, component.ErrNilNextConsumer, err) } func TestNewLogsProcessor_ProcessLogError(t *testing.T) { diff --git a/processor/processorhelper/metrics.go b/processor/processorhelper/metrics.go index 08168c460d5..ac3802722d0 100644 --- a/processor/processorhelper/metrics.go +++ b/processor/processorhelper/metrics.go @@ -39,10 +39,6 @@ func NewMetricsProcessor( return nil, errors.New("nil metricsFunc") } - if nextConsumer == nil { - return nil, component.ErrNilNextConsumer - } - eventOptions := spanAttributes(set.ID) bs := fromOptions(options) metricsConsumer, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { diff --git a/processor/processorhelper/metrics_test.go b/processor/processorhelper/metrics_test.go index 19bd1b159a0..b965c4d45c3 100644 --- a/processor/processorhelper/metrics_test.go +++ b/processor/processorhelper/metrics_test.go @@ -47,9 +47,6 @@ func TestNewMetricsProcessor_WithOptions(t *testing.T) { func TestNewMetricsProcessor_NilRequiredFields(t *testing.T) { _, err := NewMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), &testMetricsCfg, consumertest.NewNop(), nil) assert.Error(t, err) - - _, err = NewMetricsProcessor(context.Background(), processortest.NewNopCreateSettings(), &testMetricsCfg, nil, newTestMProcessor(nil)) - assert.Equal(t, component.ErrNilNextConsumer, err) } func TestNewMetricsProcessor_ProcessMetricsError(t *testing.T) { diff --git a/processor/processorhelper/traces.go b/processor/processorhelper/traces.go index 703fa98e076..578f65c7efa 100644 --- a/processor/processorhelper/traces.go +++ b/processor/processorhelper/traces.go @@ -39,10 +39,6 @@ func NewTracesProcessor( return nil, errors.New("nil tracesFunc") } - if nextConsumer == nil { - return nil, component.ErrNilNextConsumer - } - eventOptions := spanAttributes(set.ID) bs := fromOptions(options) traceConsumer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { diff --git a/processor/processorhelper/traces_test.go b/processor/processorhelper/traces_test.go index 1ba4f6d1951..f2a59473a44 100644 --- a/processor/processorhelper/traces_test.go +++ b/processor/processorhelper/traces_test.go @@ -47,9 +47,6 @@ func TestNewTracesProcessor_WithOptions(t *testing.T) { func TestNewTracesProcessor_NilRequiredFields(t *testing.T) { _, err := NewTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), &testTracesCfg, consumertest.NewNop(), nil) assert.Error(t, err) - - _, err = NewTracesProcessor(context.Background(), processortest.NewNopCreateSettings(), &testTracesCfg, nil, newTestTProcessor(nil)) - assert.Equal(t, component.ErrNilNextConsumer, err) } func TestNewTracesProcessor_ProcessTraceError(t *testing.T) { diff --git a/receiver/otlpreceiver/factory.go b/receiver/otlpreceiver/factory.go index 204887f4788..3e1df39032b 100644 --- a/receiver/otlpreceiver/factory.go +++ b/receiver/otlpreceiver/factory.go @@ -80,9 +80,7 @@ func createTraces( return nil, err } - if err = r.Unwrap().registerTraceConsumer(nextConsumer); err != nil { - return nil, err - } + r.Unwrap().registerTraceConsumer(nextConsumer) return r, nil } @@ -105,9 +103,7 @@ func createMetrics( return nil, err } - if err = r.Unwrap().registerMetricsConsumer(consumer); err != nil { - return nil, err - } + r.Unwrap().registerMetricsConsumer(consumer) return r, nil } @@ -130,9 +126,7 @@ func createLog( return nil, err } - if err = r.Unwrap().registerLogsConsumer(consumer); err != nil { - return nil, err - } + r.Unwrap().registerLogsConsumer(consumer) return r, nil } diff --git a/receiver/otlpreceiver/factory_test.go b/receiver/otlpreceiver/factory_test.go index 82f8ea9b090..21a3a051f27 100644 --- a/receiver/otlpreceiver/factory_test.go +++ b/receiver/otlpreceiver/factory_test.go @@ -116,21 +116,6 @@ func TestCreateTracesReceiver(t *testing.T) { wantStartErr: true, sink: consumertest.NewNop(), }, - { - name: "no_next_consumer", - cfg: &Config{ - Protocols: Protocols{ - GRPC: defaultGRPCSettings, - HTTP: &HTTPConfig{ - ServerConfig: &confighttp.ServerConfig{ - Endpoint: "127.0.0.1:1122", - }, - }, - }, - }, - wantErr: true, - sink: nil, - }, { name: "no_http_or_grcp_config", cfg: &Config{ @@ -225,21 +210,6 @@ func TestCreateMetricReceiver(t *testing.T) { wantStartErr: true, sink: consumertest.NewNop(), }, - { - name: "no_next_consumer", - cfg: &Config{ - Protocols: Protocols{ - GRPC: defaultGRPCSettings, - HTTP: &HTTPConfig{ - ServerConfig: &confighttp.ServerConfig{ - Endpoint: "127.0.0.1:1122", - }, - }, - }, - }, - wantErr: true, - sink: nil, - }, { name: "no_http_or_grcp_config", cfg: &Config{ @@ -334,21 +304,6 @@ func TestCreateLogReceiver(t *testing.T) { wantStartErr: true, sink: consumertest.NewNop(), }, - { - name: "no_next_consumer", - cfg: &Config{ - Protocols: Protocols{ - GRPC: defaultGRPCSettings, - HTTP: &HTTPConfig{ - ServerConfig: &confighttp.ServerConfig{ - Endpoint: "127.0.0.1:1122", - }, - }, - }, - }, - wantErr: true, - sink: nil, - }, { name: "no_http_or_grcp_config", cfg: &Config{ diff --git a/receiver/otlpreceiver/otlp.go b/receiver/otlpreceiver/otlp.go index da94a6eca71..29f83765475 100644 --- a/receiver/otlpreceiver/otlp.go +++ b/receiver/otlpreceiver/otlp.go @@ -198,26 +198,14 @@ func (r *otlpReceiver) Shutdown(ctx context.Context) error { return err } -func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) error { - if tc == nil { - return component.ErrNilNextConsumer - } +func (r *otlpReceiver) registerTraceConsumer(tc consumer.Traces) { r.nextTraces = tc - return nil } -func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) error { - if mc == nil { - return component.ErrNilNextConsumer - } +func (r *otlpReceiver) registerMetricsConsumer(mc consumer.Metrics) { r.nextMetrics = mc - return nil } -func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) error { - if lc == nil { - return component.ErrNilNextConsumer - } +func (r *otlpReceiver) registerLogsConsumer(lc consumer.Logs) { r.nextLogs = lc - return nil } diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 11ae3222822..8ae1ac66843 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -788,9 +788,9 @@ func newReceiver(t *testing.T, settings component.TelemetrySettings, cfg *Config set.ID = id r, err := newOtlpReceiver(cfg, &set) require.NoError(t, err) - require.NoError(t, r.registerTraceConsumer(c)) - require.NoError(t, r.registerMetricsConsumer(c)) - require.NoError(t, r.registerLogsConsumer(c)) + r.registerTraceConsumer(c) + r.registerMetricsConsumer(c) + r.registerLogsConsumer(c) return r } diff --git a/receiver/receiver.go b/receiver/receiver.go index bb525a1e03c..3b0f0ac8371 100644 --- a/receiver/receiver.go +++ b/receiver/receiver.go @@ -5,6 +5,7 @@ package receiver // import "go.opentelemetry.io/collector/receiver" import ( "context" + "errors" "fmt" "go.uber.org/zap" @@ -13,6 +14,10 @@ import ( "go.opentelemetry.io/collector/consumer" ) +var ( + errNilNextConsumer = errors.New("nil next Consumer") +) + // Traces receiver receives traces. // Its purpose is to translate data from any format to the collector's internal trace format. // TracesReceiver feeds a consumer.Traces with data. @@ -60,7 +65,7 @@ type Factory interface { // CreateTracesReceiver creates a TracesReceiver based on this config. // If the receiver type does not support tracing or if the config is not valid - // an error will be returned instead. + // an error will be returned instead. `nextConsumer` is never nil. CreateTracesReceiver(ctx context.Context, set CreateSettings, cfg component.Config, nextConsumer consumer.Traces) (Traces, error) // TracesReceiverStability gets the stability level of the TracesReceiver. @@ -68,7 +73,7 @@ type Factory interface { // CreateMetricsReceiver creates a MetricsReceiver based on this config. // If the receiver type does not support metrics or if the config is not valid - // an error will be returned instead. + // an error will be returned instead. `nextConsumer` is never nil. CreateMetricsReceiver(ctx context.Context, set CreateSettings, cfg component.Config, nextConsumer consumer.Metrics) (Metrics, error) // MetricsReceiverStability gets the stability level of the MetricsReceiver. @@ -76,7 +81,7 @@ type Factory interface { // CreateLogsReceiver creates a LogsReceiver based on this config. // If the receiver type does not support the data type or if the config is not valid - // an error will be returned instead. + // an error will be returned instead. `nextConsumer` is never nil. CreateLogsReceiver(ctx context.Context, set CreateSettings, cfg component.Config, nextConsumer consumer.Logs) (Logs, error) // LogsReceiverStability gets the stability level of the LogsReceiver. @@ -236,6 +241,9 @@ func NewBuilder(cfgs map[component.ID]component.Config, factories map[component. // CreateTraces creates a Traces receiver based on the settings and config. func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next consumer.Traces) (Traces, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("receiver %q is not configured", set.ID) @@ -252,6 +260,9 @@ func (b *Builder) CreateTraces(ctx context.Context, set CreateSettings, next con // CreateMetrics creates a Metrics receiver based on the settings and config. func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next consumer.Metrics) (Metrics, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("receiver %q is not configured", set.ID) @@ -268,6 +279,9 @@ func (b *Builder) CreateMetrics(ctx context.Context, set CreateSettings, next co // CreateLogs creates a Logs receiver based on the settings and config. func (b *Builder) CreateLogs(ctx context.Context, set CreateSettings, next consumer.Logs) (Logs, error) { + if next == nil { + return nil, errNilNextConsumer + } cfg, existsCfg := b.cfgs[set.ID] if !existsCfg { return nil, fmt.Errorf("receiver %q is not configured", set.ID) diff --git a/receiver/receiver_test.go b/receiver/receiver_test.go index 910d562d476..00b93dba549 100644 --- a/receiver/receiver_test.go +++ b/receiver/receiver_test.go @@ -24,11 +24,11 @@ func TestNewFactory(t *testing.T) { func() component.Config { return &defaultCfg }) assert.EqualValues(t, testType, factory.Type()) assert.EqualValues(t, &defaultCfg, factory.CreateDefaultConfig()) - _, err := factory.CreateTracesReceiver(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err := factory.CreateTracesReceiver(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) - _, err = factory.CreateMetricsReceiver(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateMetricsReceiver(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) - _, err = factory.CreateLogsReceiver(context.Background(), CreateSettings{}, &defaultCfg, nil) + _, err = factory.CreateLogsReceiver(context.Background(), CreateSettings{}, &defaultCfg, consumertest.NewNop()) assert.Error(t, err) } @@ -110,27 +110,50 @@ func TestBuilder(t *testing.T) { require.NoError(t, err) testCases := []struct { - name string - id component.ID - err string + name string + id component.ID + err string + nextTraces consumer.Traces + nextLogs consumer.Logs + nextMetrics consumer.Metrics }{ { - name: "unknown", - id: component.MustNewID("unknown"), - err: "receiver factory not available for: \"unknown\"", + name: "unknown", + id: component.MustNewID("unknown"), + err: "receiver factory not available for: \"unknown\"", + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), + }, + { + name: "err", + id: component.MustNewID("err"), + err: "telemetry type is not supported", + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "err", - id: component.MustNewID("err"), - err: "telemetry type is not supported", + name: "all", + id: component.MustNewID("all"), + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "all", - id: component.MustNewID("all"), + name: "all/named", + id: component.MustNewIDWithName("all", "named"), + nextTraces: consumertest.NewNop(), + nextLogs: consumertest.NewNop(), + nextMetrics: consumertest.NewNop(), }, { - name: "all/named", - id: component.MustNewIDWithName("all", "named"), + name: "no next consumer", + id: component.MustNewID("unknown"), + err: "nil next Consumer", + nextTraces: nil, + nextLogs: nil, + nextMetrics: nil, }, } @@ -139,7 +162,7 @@ func TestBuilder(t *testing.T) { cfgs := map[component.ID]component.Config{tt.id: defaultCfg} b := NewBuilder(cfgs, factories) - te, err := b.CreateTraces(context.Background(), createSettings(tt.id), nil) + te, err := b.CreateTraces(context.Background(), createSettings(tt.id), tt.nextTraces) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, te) @@ -148,7 +171,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, te) } - me, err := b.CreateMetrics(context.Background(), createSettings(tt.id), nil) + me, err := b.CreateMetrics(context.Background(), createSettings(tt.id), tt.nextMetrics) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, me) @@ -157,7 +180,7 @@ func TestBuilder(t *testing.T) { assert.Equal(t, nopInstance, me) } - le, err := b.CreateLogs(context.Background(), createSettings(tt.id), nil) + le, err := b.CreateLogs(context.Background(), createSettings(tt.id), tt.nextLogs) if tt.err != "" { assert.EqualError(t, err, tt.err) assert.Nil(t, le) @@ -186,15 +209,15 @@ func TestBuilderMissingConfig(t *testing.T) { bErr := NewBuilder(map[component.ID]component.Config{}, factories) missingID := component.MustNewIDWithName("all", "missing") - te, err := bErr.CreateTraces(context.Background(), createSettings(missingID), nil) + te, err := bErr.CreateTraces(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "receiver \"all/missing\" is not configured") assert.Nil(t, te) - me, err := bErr.CreateMetrics(context.Background(), createSettings(missingID), nil) + me, err := bErr.CreateMetrics(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "receiver \"all/missing\" is not configured") assert.Nil(t, me) - le, err := bErr.CreateLogs(context.Background(), createSettings(missingID), nil) + le, err := bErr.CreateLogs(context.Background(), createSettings(missingID), consumertest.NewNop()) assert.EqualError(t, err, "receiver \"all/missing\" is not configured") assert.Nil(t, le) } diff --git a/receiver/scraperhelper/scrapercontroller.go b/receiver/scraperhelper/scrapercontroller.go index d2b7410da67..2f2896f6903 100644 --- a/receiver/scraperhelper/scrapercontroller.go +++ b/receiver/scraperhelper/scrapercontroller.go @@ -70,9 +70,6 @@ func NewScraperControllerReceiver( nextConsumer consumer.Metrics, options ...ScraperControllerOption, ) (component.Component, error) { - if nextConsumer == nil { - return nil, component.ErrNilNextConsumer - } if cfg.CollectionInterval <= 0 { return nil, errors.New("collection_interval must be a positive duration") diff --git a/receiver/scraperhelper/scrapercontroller_test.go b/receiver/scraperhelper/scrapercontroller_test.go index 8e8ceca1d94..1c41258ff18 100644 --- a/receiver/scraperhelper/scrapercontroller_test.go +++ b/receiver/scraperhelper/scrapercontroller_test.go @@ -17,7 +17,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" @@ -76,7 +75,6 @@ type metricsTestCase struct { scrapers int scraperControllerSettings *ControllerConfig - nilNextConsumer bool scrapeErr error expectedNewErr string expectScraped bool @@ -97,12 +95,6 @@ func TestScrapeController(t *testing.T) { scrapers: 2, expectScraped: true, }, - { - name: "AddMetricsScrapers_NilNextConsumerError", - scrapers: 2, - nilNextConsumer: true, - expectedNewErr: "nil next Consumer", - }, { name: "AddMetricsScrapersWithCollectionInterval_InvalidCollectionIntervalError", scrapers: 2, @@ -147,17 +139,13 @@ func TestScrapeController(t *testing.T) { tickerCh := make(chan time.Time) options = append(options, WithTickerChannel(tickerCh)) - var nextConsumer consumer.Metrics sink := new(consumertest.MetricsSink) - if !test.nilNextConsumer { - nextConsumer = sink - } cfg := newTestNoDelaySettings() if test.scraperControllerSettings != nil { cfg = test.scraperControllerSettings } - mr, err := NewScraperControllerReceiver(cfg, receiver.CreateSettings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, nextConsumer, options...) + mr, err := NewScraperControllerReceiver(cfg, receiver.CreateSettings{ID: receiverID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, sink, options...) if test.expectedNewErr != "" { assert.EqualError(t, err, test.expectedNewErr) return From 2b750e1453be2ad9482d16d90150a8c7f84ffe24 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 23 Feb 2024 13:44:46 -0800 Subject: [PATCH 2/2] Update component/component.go --- component/component.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/component/component.go b/component/component.go index 2408fa647b3..32d69f7eda0 100644 --- a/component/component.go +++ b/component/component.go @@ -11,7 +11,7 @@ import ( var ( // ErrNilNextConsumer can be returned by receiver, or processor Start factory funcs that create the Component if the // expected next Consumer is nil. - // Deprecated: [v0.95.0] The next consumer is now checked as part of the creation of the pipelines. + // Deprecated: [v0.96.0] The next consumer is now checked as part of the creation of the pipelines. // This error will be removed in a future release. ErrNilNextConsumer = errors.New("nil next Consumer")