From 4255836c67b8bd5528a3952de954c9ca3612b335 Mon Sep 17 00:00:00 2001 From: Chris Parkins Date: Thu, 7 Dec 2023 03:11:39 -0700 Subject: [PATCH] [kafkareceiver] Add Azure Resource Log Support (#28626) **Description:** Add support for the Azure Resource Log format so that Diagnostic logs from Azure can be consumed by a Kafka Receiver. **Link to tracking Issue:** #18210 **Testing:** Local testing to ensure that data is being converted correctly when Streaming from Azure Diagnostic Settings to Event Hubs and using the Kafka Receiver. **Documentation:** Added a bullet for the added azure format. --- ...receiver-add-azureresourcelog-support.yaml | 16 +++++ receiver/kafkareceiver/README.md | 1 + .../azureresourcelogs_unmarshaler.go | 32 +++++++++ .../azureresourcelogs_unmarshaler_test.go | 16 +++++ receiver/kafkareceiver/factory.go | 70 +++++++++++++++++-- receiver/kafkareceiver/factory_test.go | 28 +++++++- receiver/kafkareceiver/go.mod | 7 +- receiver/kafkareceiver/go.sum | 8 ++- receiver/kafkareceiver/kafka_receiver.go | 48 +++---------- receiver/kafkareceiver/kafka_receiver_test.go | 64 +++++++---------- receiver/kafkareceiver/unmarshaler.go | 13 ++-- receiver/kafkareceiver/unmarshaler_test.go | 4 +- 12 files changed, 213 insertions(+), 94 deletions(-) create mode 100644 .chloggen/kafka-receiver-add-azureresourcelog-support.yaml create mode 100644 receiver/kafkareceiver/azureresourcelogs_unmarshaler.go create mode 100644 receiver/kafkareceiver/azureresourcelogs_unmarshaler_test.go diff --git a/.chloggen/kafka-receiver-add-azureresourcelog-support.yaml b/.chloggen/kafka-receiver-add-azureresourcelog-support.yaml new file mode 100644 index 0000000000000..d73ae5b3d2b65 --- /dev/null +++ b/.chloggen/kafka-receiver-add-azureresourcelog-support.yaml @@ -0,0 +1,16 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: kafkareceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add the ability to consume logs from Azure Diagnostic Settings streamed through Event Hubs using the Kafka API. + +# One or more tracking issues related to the change +issues: [18210] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: diff --git a/receiver/kafkareceiver/README.md b/receiver/kafkareceiver/README.md index bd970d5caaf16..9d622b50976ac 100644 --- a/receiver/kafkareceiver/README.md +++ b/receiver/kafkareceiver/README.md @@ -42,6 +42,7 @@ The following settings can be optionally configured: - `raw`: (logs only) the payload's bytes are inserted as the body of a log record. - `text`: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior. - `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record. + - `azure_resource_logs`: (logs only) the payload is converted from Azure Resource Logs format to OTel format. - `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from - `client_id` (default = otel-collector): The consumer client ID that receiver will use - `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`. diff --git a/receiver/kafkareceiver/azureresourcelogs_unmarshaler.go b/receiver/kafkareceiver/azureresourcelogs_unmarshaler.go new file mode 100644 index 0000000000000..b042bc052390d --- /dev/null +++ b/receiver/kafkareceiver/azureresourcelogs_unmarshaler.go @@ -0,0 +1,32 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver" + +import ( + "go.opentelemetry.io/collector/pdata/plog" + "go.uber.org/zap" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure" +) + +type azureResourceLogsUnmarshaler struct { + unmarshaler *azure.ResourceLogsUnmarshaler +} + +func newAzureResourceLogsUnmarshaler(version string, logger *zap.Logger) LogsUnmarshaler { + return azureResourceLogsUnmarshaler{ + unmarshaler: &azure.ResourceLogsUnmarshaler{ + Version: version, + Logger: logger, + }, + } +} + +func (r azureResourceLogsUnmarshaler) Unmarshal(buf []byte) (plog.Logs, error) { + return r.unmarshaler.UnmarshalLogs(buf) +} + +func (r azureResourceLogsUnmarshaler) Encoding() string { + return "azure_resource_logs" +} diff --git a/receiver/kafkareceiver/azureresourcelogs_unmarshaler_test.go b/receiver/kafkareceiver/azureresourcelogs_unmarshaler_test.go new file mode 100644 index 0000000000000..371a65aa30463 --- /dev/null +++ b/receiver/kafkareceiver/azureresourcelogs_unmarshaler_test.go @@ -0,0 +1,16 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package kafkareceiver + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" +) + +func TestNewAzureResourceLogsUnmarshaler(t *testing.T) { + um := newAzureResourceLogsUnmarshaler("Test Version", zap.NewNop()) + assert.Equal(t, "azure_resource_logs", um.Encoding()) +} diff --git a/receiver/kafkareceiver/factory.go b/receiver/kafkareceiver/factory.go index fab1bcb8e345b..91737264e182d 100644 --- a/receiver/kafkareceiver/factory.go +++ b/receiver/kafkareceiver/factory.go @@ -5,6 +5,8 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" + "fmt" + "strings" "time" "go.opencensus.io/stats/view" @@ -37,6 +39,8 @@ const ( defaultAutoCommitInterval = 1 * time.Second ) +var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") + // FactoryOption applies changes to kafkaExporterFactory. type FactoryOption func(factory *kafkaReceiverFactory) @@ -72,9 +76,9 @@ func NewFactory(options ...FactoryOption) receiver.Factory { _ = view.Register(metricViews()...) f := &kafkaReceiverFactory{ - tracesUnmarshalers: defaultTracesUnmarshalers(), - metricsUnmarshalers: defaultMetricsUnmarshalers(), - logsUnmarshalers: defaultLogsUnmarshalers(), + tracesUnmarshalers: map[string]TracesUnmarshaler{}, + metricsUnmarshalers: map[string]MetricsUnmarshaler{}, + logsUnmarshalers: map[string]LogsUnmarshaler{}, } for _, o := range options { o(f) @@ -129,8 +133,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver( cfg component.Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { + for encoding, unmarshal := range defaultTracesUnmarshalers() { + f.tracesUnmarshalers[encoding] = unmarshal + } + c := cfg.(*Config) - r, err := newTracesReceiver(*c, set, f.tracesUnmarshalers, nextConsumer) + unmarshaler := f.tracesUnmarshalers[c.Encoding] + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } + + r, err := newTracesReceiver(*c, set, unmarshaler, nextConsumer) if err != nil { return nil, err } @@ -143,8 +156,17 @@ func (f *kafkaReceiverFactory) createMetricsReceiver( cfg component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { + for encoding, unmarshal := range defaultMetricsUnmarshalers() { + f.metricsUnmarshalers[encoding] = unmarshal + } + c := cfg.(*Config) - r, err := newMetricsReceiver(*c, set, f.metricsUnmarshalers, nextConsumer) + unmarshaler := f.metricsUnmarshalers[c.Encoding] + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } + + r, err := newMetricsReceiver(*c, set, unmarshaler, nextConsumer) if err != nil { return nil, err } @@ -157,10 +179,46 @@ func (f *kafkaReceiverFactory) createLogsReceiver( cfg component.Config, nextConsumer consumer.Logs, ) (receiver.Logs, error) { + for encoding, unmarshaler := range defaultLogsUnmarshalers(set.BuildInfo.Version, set.Logger) { + f.logsUnmarshalers[encoding] = unmarshaler + } + c := cfg.(*Config) - r, err := newLogsReceiver(*c, set, f.logsUnmarshalers, nextConsumer) + unmarshaler, err := getLogsUnmarshaler(c.Encoding, f.logsUnmarshalers) + if err != nil { + return nil, err + } + + r, err := newLogsReceiver(*c, set, unmarshaler, nextConsumer) if err != nil { return nil, err } return r, nil } + +func getLogsUnmarshaler(encoding string, unmarshalers map[string]LogsUnmarshaler) (LogsUnmarshaler, error) { + var enc string + unmarshaler, ok := unmarshalers[encoding] + if !ok { + split := strings.SplitN(encoding, "_", 2) + prefix := split[0] + if len(split) > 1 { + enc = split[1] + } + unmarshaler, ok = unmarshalers[prefix].(LogsUnmarshalerWithEnc) + if !ok { + return nil, errUnrecognizedEncoding + } + } + + if unmarshalerWithEnc, ok := unmarshaler.(LogsUnmarshalerWithEnc); ok { + // This should be called even when enc is an empty string to initialize the encoding. + unmarshaler, err := unmarshalerWithEnc.WithEnc(enc) + if err != nil { + return nil, err + } + return unmarshaler, nil + } + + return unmarshaler, nil +} diff --git a/receiver/kafkareceiver/factory_test.go b/receiver/kafkareceiver/factory_test.go index 8dbcff101f967..25d999886b900 100644 --- a/receiver/kafkareceiver/factory_test.go +++ b/receiver/kafkareceiver/factory_test.go @@ -5,6 +5,7 @@ package kafkareceiver import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" @@ -14,6 +15,7 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receivertest" + "go.uber.org/zap" ) func TestCreateDefaultConfig(t *testing.T) { @@ -119,7 +121,7 @@ func TestCreateLogsReceiver(t *testing.T) { cfg := createDefaultConfig().(*Config) cfg.Brokers = []string{"invalid:9092"} cfg.ProtocolVersion = "2.0.0" - f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} + f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())} r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) // no available broker require.Error(t, err) @@ -131,12 +133,34 @@ func TestCreateLogsReceiver_error(t *testing.T) { cfg.ProtocolVersion = "2.0.0" // disable contacting broker at startup cfg.Metadata.Full = false - f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()} + f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())} r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil) require.NoError(t, err) assert.NotNil(t, r) } +func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) { + tests := []struct { + name string + encoding string + }{ + { + name: "text encoding has typo", + encoding: "text_uft-8", + }, + { + name: "text encoding is a random string", + encoding: "text_vnbqgoba156", + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers("Test Version", zap.NewNop())) + assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:])) + }) + } +} + func TestWithLogsUnmarshalers(t *testing.T) { unmarshaler := &customLogsUnmarshaler{} f := NewFactory(withLogsUnmarshalers(unmarshaler)) diff --git a/receiver/kafkareceiver/go.mod b/receiver/kafkareceiver/go.mod index 5430c4a10dc1c..b1a8fa6ab90f7 100644 --- a/receiver/kafkareceiver/go.mod +++ b/receiver/kafkareceiver/go.mod @@ -11,6 +11,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.90.1 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.90.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.90.1 github.com/openzipkin/zipkin-go v0.4.2 @@ -59,6 +60,7 @@ require ( github.com/pierrec/lz4/v4 v4.1.18 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + github.com/relvacode/iso8601 v1.3.0 // indirect github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect @@ -76,10 +78,11 @@ require ( go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.15.0 // indirect + golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect golang.org/x/net v0.18.0 // indirect golang.org/x/sys v0.15.0 // indirect golang.org/x/text v0.14.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/grpc v1.59.0 // indirect google.golang.org/protobuf v1.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect @@ -106,3 +109,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden + +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure => ../../pkg/translator/azure diff --git a/receiver/kafkareceiver/go.sum b/receiver/kafkareceiver/go.sum index 2fa3a01bef072..9d8b2ae02eace 100644 --- a/receiver/kafkareceiver/go.sum +++ b/receiver/kafkareceiver/go.sum @@ -141,6 +141,8 @@ github.com/prometheus/procfs v0.11.1 h1:xRC8Iq1yyca5ypa9n1EZnWZkt7dwcoRPQwX/5gwa github.com/prometheus/statsd_exporter v0.22.7 h1:7Pji/i2GuhK6Lu7DHrtTkFmNBCudCPT1pX2CziuyQR0= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/relvacode/iso8601 v1.3.0 h1:HguUjsGpIMh/zsTczGN3DVJFxTU/GX+MMmzcKoMO7ko= +github.com/relvacode/iso8601 v1.3.0/go.mod h1:FlNp+jz+TXpyRqgmM7tnzHHzBnz776kmAH2h3sZCn0I= github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= @@ -218,6 +220,8 @@ golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58 golang.org/x/crypto v0.15.0 h1:frVn1TEaCEaZcn3Tmd7Y2b5KKPaZ+I32Q2OA3kYp5TA= golang.org/x/crypto v0.15.0/go.mod h1:4ChreQoLWfG3xLDer1WdlH5NdlQ3+mwnQq1YTKY+72g= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/exp v0.0.0-20231127185646-65229373498e h1:Gvh4YaCaXNs6dKTlfgismwWZKyjVZXwOPfIyUaqU3No= +golang.org/x/exp v0.0.0-20231127185646-65229373498e/go.mod h1:iRJReGqOEeBhDZGkGbynYwcHlctCvnjTYIamk7uXpHI= golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= @@ -287,8 +291,8 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d h1:uvYuEyMHKNt+lT4K3bN6fGswmK8qSvcreM3BwjDh+y4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d/go.mod h1:+Bk1OCOj40wS2hwAMA+aCW9ypzm63QTBBHp6lQ3p+9M= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b h1:ZlWIi1wSK56/8hn4QcBp/j9M7Gt3U/3hZw3mC7vDICo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b/go.mod h1:swOH3j0KzcDDgGUWr+SNpyTen5YrXjS3eyPzFYKc6lc= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/receiver/kafkareceiver/kafka_receiver.go b/receiver/kafkareceiver/kafka_receiver.go index 9758a5e877435..8ae63f8c9e261 100644 --- a/receiver/kafkareceiver/kafka_receiver.go +++ b/receiver/kafkareceiver/kafka_receiver.go @@ -6,7 +6,6 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect import ( "context" "fmt" - "strings" "sync" "github.com/IBM/sarama" @@ -25,7 +24,6 @@ const ( transport = "kafka" ) -var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding") var errInvalidInitialOffset = fmt.Errorf("invalid initial offset") // kafkaTracesConsumer uses sarama to consume and handle messages from kafka. @@ -80,8 +78,7 @@ var _ receiver.Traces = (*kafkaTracesConsumer)(nil) var _ receiver.Metrics = (*kafkaMetricsConsumer)(nil) var _ receiver.Logs = (*kafkaLogsConsumer)(nil) -func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] +func newTracesReceiver(config Config, set receiver.CreateSettings, unmarshaler TracesUnmarshaler, nextConsumer consumer.Traces) (*kafkaTracesConsumer, error) { if unmarshaler == nil { return nil, errUnrecognizedEncoding } @@ -185,8 +182,7 @@ func (c *kafkaTracesConsumer) Shutdown(context.Context) error { return c.consumerGroup.Close() } -func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) { - unmarshaler := unmarshalers[config.Encoding] +func newMetricsReceiver(config Config, set receiver.CreateSettings, unmarshaler MetricsUnmarshaler, nextConsumer consumer.Metrics) (*kafkaMetricsConsumer, error) { if unmarshaler == nil { return nil, errUnrecognizedEncoding } @@ -287,7 +283,10 @@ func (c *kafkaMetricsConsumer) Shutdown(context.Context) error { return c.consumerGroup.Close() } -func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) { +func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshaler LogsUnmarshaler, nextConsumer consumer.Logs) (*kafkaLogsConsumer, error) { + if unmarshaler == nil { + return nil, errUnrecognizedEncoding + } c := sarama.NewConfig() c.ClientID = config.ClientID c.Metadata.Full = config.Metadata.Full @@ -300,19 +299,15 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma } else { return nil, err } - unmarshaler, err := getLogsUnmarshaler(config.Encoding, unmarshalers) - if err != nil { - return nil, err - } if config.ProtocolVersion != "" { var version sarama.KafkaVersion - version, err = sarama.ParseKafkaVersion(config.ProtocolVersion) + version, err := sarama.ParseKafkaVersion(config.ProtocolVersion) if err != nil { return nil, err } c.Version = version } - if err = kafka.ConfigureAuthentication(config.Authentication, c); err != nil { + if err := kafka.ConfigureAuthentication(config.Authentication, c); err != nil { return nil, err } client, err := sarama.NewConsumerGroup(config.Brokers, config.GroupID, c) @@ -332,33 +327,6 @@ func newLogsReceiver(config Config, set receiver.CreateSettings, unmarshalers ma }, nil } -func getLogsUnmarshaler(encoding string, unmarshalers map[string]LogsUnmarshaler) (LogsUnmarshaler, error) { - var enc string - unmarshaler, ok := unmarshalers[encoding] - if !ok { - split := strings.SplitN(encoding, "_", 2) - prefix := split[0] - if len(split) > 1 { - enc = split[1] - } - unmarshaler, ok = unmarshalers[prefix].(LogsUnmarshalerWithEnc) - if !ok { - return nil, errUnrecognizedEncoding - } - } - - if unmarshalerWithEnc, ok := unmarshaler.(LogsUnmarshalerWithEnc); ok { - // This should be called even when enc is an empty string to initialize the encoding. - unmarshaler, err := unmarshalerWithEnc.WithEnc(enc) - if err != nil { - return nil, err - } - return unmarshaler, nil - } - - return unmarshaler, nil -} - func (c *kafkaLogsConsumer) Start(_ context.Context, host component.Host) error { ctx, cancel := context.WithCancel(context.Background()) c.cancelConsumeLoop = cancel diff --git a/receiver/kafkareceiver/kafka_receiver_test.go b/receiver/kafkareceiver/kafka_receiver_test.go index ea29e0c365a6e..be46036d4ff7f 100644 --- a/receiver/kafkareceiver/kafka_receiver_test.go +++ b/receiver/kafkareceiver/kafka_receiver_test.go @@ -6,7 +6,6 @@ package kafkareceiver import ( "context" "errors" - "fmt" "sync" "testing" "time" @@ -38,7 +37,8 @@ func TestNewTracesReceiver_version_err(t *testing.T) { Encoding: defaultEncoding, ProtocolVersion: "none", } - r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultTracesUnmarshalers()[c.Encoding] + r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, r) } @@ -47,7 +47,8 @@ func TestNewTracesReceiver_encoding_err(t *testing.T) { c := Config{ Encoding: "foo", } - r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultTracesUnmarshalers()[c.Encoding] + r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) @@ -68,7 +69,8 @@ func TestNewTracesReceiver_err_auth_type(t *testing.T) { Full: false, }, } - r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultTracesUnmarshalers()[c.Encoding] + r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") assert.Nil(t, r) @@ -79,7 +81,8 @@ func TestNewTracesReceiver_initial_offset_err(t *testing.T) { InitialOffset: "foo", Encoding: defaultEncoding, } - r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), defaultTracesUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultTracesUnmarshalers()[c.Encoding] + r, err := newTracesReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) @@ -299,7 +302,8 @@ func TestNewMetricsReceiver_version_err(t *testing.T) { Encoding: defaultEncoding, ProtocolVersion: "none", } - r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] + r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, r) } @@ -308,7 +312,8 @@ func TestNewMetricsReceiver_encoding_err(t *testing.T) { c := Config{ Encoding: "foo", } - r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] + r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) @@ -329,7 +334,8 @@ func TestNewMetricsExporter_err_auth_type(t *testing.T) { Full: false, }, } - r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] + r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") assert.Nil(t, r) @@ -340,7 +346,8 @@ func TestNewMetricsReceiver_initial_offset_err(t *testing.T) { InitialOffset: "foo", Encoding: defaultEncoding, } - r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), defaultMetricsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultMetricsUnmarshalers()[c.Encoding] + r, err := newMetricsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) @@ -558,7 +565,8 @@ func TestNewLogsReceiver_version_err(t *testing.T) { Encoding: defaultEncoding, ProtocolVersion: "none", } - r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] + r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Nil(t, r) } @@ -567,7 +575,8 @@ func TestNewLogsReceiver_encoding_err(t *testing.T) { c := Config{ Encoding: "foo", } - r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] + r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errUnrecognizedEncoding.Error()) @@ -588,7 +597,8 @@ func TestNewLogsExporter_err_auth_type(t *testing.T) { Full: false, }, } - r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] + r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) assert.Error(t, err) assert.Contains(t, err.Error(), "failed to load TLS config") assert.Nil(t, r) @@ -599,7 +609,8 @@ func TestNewLogsReceiver_initial_offset_err(t *testing.T) { InitialOffset: "foo", Encoding: defaultEncoding, } - r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[c.Encoding] + r, err := newLogsReceiver(c, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) require.Error(t, err) assert.Nil(t, r) assert.EqualError(t, err, errInvalidInitialOffset.Error()) @@ -917,39 +928,18 @@ func TestGetLogsUnmarshaler_encoding_text(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { - _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers()) + _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers("Test Version", zap.NewNop())) assert.NoError(t, err) }) } } -func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) { - tests := []struct { - name string - encoding string - }{ - { - name: "text encoding has typo", - encoding: "text_uft-8", - }, - { - name: "text encoding is a random string", - encoding: "text_vnbqgoba156", - }, - } - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - _, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers()) - assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:])) - }) - } -} - func TestCreateLogsReceiver_encoding_text_error(t *testing.T) { cfg := Config{ Encoding: "text_uft-8", } - _, err := newLogsReceiver(cfg, receivertest.NewNopCreateSettings(), defaultLogsUnmarshalers(), consumertest.NewNop()) + unmarshaler := defaultLogsUnmarshalers("Test Version", zap.NewNop())[cfg.Encoding] + _, err := newLogsReceiver(cfg, receivertest.NewNopCreateSettings(), unmarshaler, consumertest.NewNop()) // encoding error comes first assert.Error(t, err, "unsupported encoding") } diff --git a/receiver/kafkareceiver/unmarshaler.go b/receiver/kafkareceiver/unmarshaler.go index a1416f53ceede..bf44be7b496ee 100644 --- a/receiver/kafkareceiver/unmarshaler.go +++ b/receiver/kafkareceiver/unmarshaler.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" + "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv1" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin/zipkinv2" @@ -71,15 +72,17 @@ func defaultMetricsUnmarshalers() map[string]MetricsUnmarshaler { } } -func defaultLogsUnmarshalers() map[string]LogsUnmarshaler { +func defaultLogsUnmarshalers(version string, logger *zap.Logger) map[string]LogsUnmarshaler { + azureResourceLogs := newAzureResourceLogsUnmarshaler(version, logger) otlpPb := newPdataLogsUnmarshaler(&plog.ProtoUnmarshaler{}, defaultEncoding) raw := newRawLogsUnmarshaler() text := newTextLogsUnmarshaler() json := newJSONLogsUnmarshaler() return map[string]LogsUnmarshaler{ - otlpPb.Encoding(): otlpPb, - raw.Encoding(): raw, - text.Encoding(): text, - json.Encoding(): json, + azureResourceLogs.Encoding(): azureResourceLogs, + otlpPb.Encoding(): otlpPb, + raw.Encoding(): raw, + text.Encoding(): text, + json.Encoding(): json, } } diff --git a/receiver/kafkareceiver/unmarshaler_test.go b/receiver/kafkareceiver/unmarshaler_test.go index 1e0775db04ba0..fd1f998ee0a7d 100644 --- a/receiver/kafkareceiver/unmarshaler_test.go +++ b/receiver/kafkareceiver/unmarshaler_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" ) func TestDefaultTracesUnMarshaler(t *testing.T) { @@ -51,8 +52,9 @@ func TestDefaultLogsUnMarshaler(t *testing.T) { "raw", "text", "json", + "azure_resource_logs", } - marshalers := defaultLogsUnmarshalers() + marshalers := defaultLogsUnmarshalers("Test Version", zap.NewNop()) assert.Equal(t, len(expectedEncodings), len(marshalers)) for _, e := range expectedEncodings { t.Run(e, func(t *testing.T) {