Skip to content

Commit

Permalink
[kafkareceiver] Add Azure Resource Log Support (#28626)
Browse files Browse the repository at this point in the history
**Description:** <Describe what has changed.>
<!--Ex. Fixing a bug - Describe the bug and how this fixes the issue.
Ex. Adding a feature - Explain what this achieves.-->
Add support for the Azure Resource Log format so that Diagnostic logs
from Azure can be consumed by a Kafka Receiver.

**Link to tracking Issue:** <Issue number if applicable>
#18210 

**Testing:** <Describe what testing was performed and which tests were
added.>
Local testing to ensure that data is being converted correctly when
Streaming from Azure Diagnostic Settings to Event Hubs and using the
Kafka Receiver.

**Documentation:** <Describe the documentation added.>
Added a bullet for the added azure format.
  • Loading branch information
cparkins committed Dec 7, 2023
1 parent cdfaa45 commit 4255836
Show file tree
Hide file tree
Showing 12 changed files with 213 additions and 94 deletions.
16 changes: 16 additions & 0 deletions .chloggen/kafka-receiver-add-azureresourcelog-support.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: kafkareceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add the ability to consume logs from Azure Diagnostic Settings streamed through Event Hubs using the Kafka API.

# One or more tracking issues related to the change
issues: [18210]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:
1 change: 1 addition & 0 deletions receiver/kafkareceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The following settings can be optionally configured:
- `raw`: (logs only) the payload's bytes are inserted as the body of a log record.
- `text`: (logs only) the payload are decoded as text and inserted as the body of a log record. By default, it uses UTF-8 to decode. You can use `text_<ENCODING>`, like `text_utf-8`, `text_shift_jis`, etc., to customize this behavior.
- `json`: (logs only) the payload is decoded as JSON and inserted as the body of a log record.
- `azure_resource_logs`: (logs only) the payload is converted from Azure Resource Logs format to OTel format.
- `group_id` (default = otel-collector): The consumer group that receiver will be consuming messages from
- `client_id` (default = otel-collector): The consumer client ID that receiver will use
- `initial_offset` (default = latest): The initial offset to use if no offset was previously committed. Must be `latest` or `earliest`.
Expand Down
32 changes: 32 additions & 0 deletions receiver/kafkareceiver/azureresourcelogs_unmarshaler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver"

import (
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure"
)

type azureResourceLogsUnmarshaler struct {
unmarshaler *azure.ResourceLogsUnmarshaler
}

func newAzureResourceLogsUnmarshaler(version string, logger *zap.Logger) LogsUnmarshaler {
return azureResourceLogsUnmarshaler{
unmarshaler: &azure.ResourceLogsUnmarshaler{
Version: version,
Logger: logger,
},
}
}

func (r azureResourceLogsUnmarshaler) Unmarshal(buf []byte) (plog.Logs, error) {
return r.unmarshaler.UnmarshalLogs(buf)
}

func (r azureResourceLogsUnmarshaler) Encoding() string {
return "azure_resource_logs"
}
16 changes: 16 additions & 0 deletions receiver/kafkareceiver/azureresourcelogs_unmarshaler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package kafkareceiver

import (
"testing"

"github.com/stretchr/testify/assert"
"go.uber.org/zap"
)

func TestNewAzureResourceLogsUnmarshaler(t *testing.T) {
um := newAzureResourceLogsUnmarshaler("Test Version", zap.NewNop())
assert.Equal(t, "azure_resource_logs", um.Encoding())
}
70 changes: 64 additions & 6 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ package kafkareceiver // import "github.com/open-telemetry/opentelemetry-collect

import (
"context"
"fmt"
"strings"
"time"

"go.opencensus.io/stats/view"
Expand Down Expand Up @@ -37,6 +39,8 @@ const (
defaultAutoCommitInterval = 1 * time.Second
)

var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaReceiverFactory)

Expand Down Expand Up @@ -72,9 +76,9 @@ func NewFactory(options ...FactoryOption) receiver.Factory {
_ = view.Register(metricViews()...)

f := &kafkaReceiverFactory{
tracesUnmarshalers: defaultTracesUnmarshalers(),
metricsUnmarshalers: defaultMetricsUnmarshalers(),
logsUnmarshalers: defaultLogsUnmarshalers(),
tracesUnmarshalers: map[string]TracesUnmarshaler{},
metricsUnmarshalers: map[string]MetricsUnmarshaler{},
logsUnmarshalers: map[string]LogsUnmarshaler{},
}
for _, o := range options {
o(f)
Expand Down Expand Up @@ -129,8 +133,17 @@ func (f *kafkaReceiverFactory) createTracesReceiver(
cfg component.Config,
nextConsumer consumer.Traces,
) (receiver.Traces, error) {
for encoding, unmarshal := range defaultTracesUnmarshalers() {
f.tracesUnmarshalers[encoding] = unmarshal
}

c := cfg.(*Config)
r, err := newTracesReceiver(*c, set, f.tracesUnmarshalers, nextConsumer)
unmarshaler := f.tracesUnmarshalers[c.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

r, err := newTracesReceiver(*c, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
Expand All @@ -143,8 +156,17 @@ func (f *kafkaReceiverFactory) createMetricsReceiver(
cfg component.Config,
nextConsumer consumer.Metrics,
) (receiver.Metrics, error) {
for encoding, unmarshal := range defaultMetricsUnmarshalers() {
f.metricsUnmarshalers[encoding] = unmarshal
}

c := cfg.(*Config)
r, err := newMetricsReceiver(*c, set, f.metricsUnmarshalers, nextConsumer)
unmarshaler := f.metricsUnmarshalers[c.Encoding]
if unmarshaler == nil {
return nil, errUnrecognizedEncoding
}

r, err := newMetricsReceiver(*c, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
Expand All @@ -157,10 +179,46 @@ func (f *kafkaReceiverFactory) createLogsReceiver(
cfg component.Config,
nextConsumer consumer.Logs,
) (receiver.Logs, error) {
for encoding, unmarshaler := range defaultLogsUnmarshalers(set.BuildInfo.Version, set.Logger) {
f.logsUnmarshalers[encoding] = unmarshaler
}

c := cfg.(*Config)
r, err := newLogsReceiver(*c, set, f.logsUnmarshalers, nextConsumer)
unmarshaler, err := getLogsUnmarshaler(c.Encoding, f.logsUnmarshalers)
if err != nil {
return nil, err
}

r, err := newLogsReceiver(*c, set, unmarshaler, nextConsumer)
if err != nil {
return nil, err
}
return r, nil
}

func getLogsUnmarshaler(encoding string, unmarshalers map[string]LogsUnmarshaler) (LogsUnmarshaler, error) {
var enc string
unmarshaler, ok := unmarshalers[encoding]
if !ok {
split := strings.SplitN(encoding, "_", 2)
prefix := split[0]
if len(split) > 1 {
enc = split[1]
}
unmarshaler, ok = unmarshalers[prefix].(LogsUnmarshalerWithEnc)
if !ok {
return nil, errUnrecognizedEncoding
}
}

if unmarshalerWithEnc, ok := unmarshaler.(LogsUnmarshalerWithEnc); ok {
// This should be called even when enc is an empty string to initialize the encoding.
unmarshaler, err := unmarshalerWithEnc.WithEnc(enc)
if err != nil {
return nil, err
}
return unmarshaler, nil
}

return unmarshaler, nil
}
28 changes: 26 additions & 2 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package kafkareceiver

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
Expand All @@ -14,6 +15,7 @@ import (
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
"go.opentelemetry.io/collector/receiver/receivertest"
"go.uber.org/zap"
)

func TestCreateDefaultConfig(t *testing.T) {
Expand Down Expand Up @@ -119,7 +121,7 @@ func TestCreateLogsReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()}
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())}
r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
// no available broker
require.Error(t, err)
Expand All @@ -131,12 +133,34 @@ func TestCreateLogsReceiver_error(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers()}
f := kafkaReceiverFactory{logsUnmarshalers: defaultLogsUnmarshalers("Test Version", zap.NewNop())}
r, err := f.createLogsReceiver(context.Background(), receivertest.NewNopCreateSettings(), cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}

func TestGetLogsUnmarshaler_encoding_text_error(t *testing.T) {
tests := []struct {
name string
encoding string
}{
{
name: "text encoding has typo",
encoding: "text_uft-8",
},
{
name: "text encoding is a random string",
encoding: "text_vnbqgoba156",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, err := getLogsUnmarshaler(test.encoding, defaultLogsUnmarshalers("Test Version", zap.NewNop()))
assert.ErrorContains(t, err, fmt.Sprintf("unsupported encoding '%v'", test.encoding[5:]))
})
}
}

func TestWithLogsUnmarshalers(t *testing.T) {
unmarshaler := &customLogsUnmarshaler{}
f := NewFactory(withLogsUnmarshalers(unmarshaler))
Expand Down
7 changes: 6 additions & 1 deletion receiver/kafkareceiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ require (
github.com/open-telemetry/opentelemetry-collector-contrib/exporter/kafkaexporter v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/internal/kafka v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/jaeger v0.90.1
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/zipkin v0.90.1
github.com/openzipkin/zipkin-go v0.4.2
Expand Down Expand Up @@ -59,6 +60,7 @@ require (
github.com/pierrec/lz4/v4 v4.1.18 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/relvacode/iso8601 v1.3.0 // indirect
github.com/uber/jaeger-client-go v2.30.0+incompatible // indirect
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/xdg-go/pbkdf2 v1.0.0 // indirect
Expand All @@ -76,10 +78,11 @@ require (
go.uber.org/atomic v1.11.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.15.0 // indirect
golang.org/x/exp v0.0.0-20231127185646-65229373498e // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231016165738-49dd2c1f3d0b // indirect
google.golang.org/grpc v1.59.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand All @@ -106,3 +109,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil
replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest => ../../pkg/pdatatest

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden

replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/azure => ../../pkg/translator/azure
8 changes: 6 additions & 2 deletions receiver/kafkareceiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 4255836

Please sign in to comment.