Skip to content

Commit

Permalink
Impelement partitioning for OTLP logs
Browse files Browse the repository at this point in the history
  • Loading branch information
epchris committed Jun 18, 2024
1 parent 744141a commit 9098914
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 9 deletions.
1 change: 1 addition & 0 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ The following settings can be optionally configured:
- `raw`: if the log record body is a byte array, it is sent as is. Otherwise, it is serialized to JSON. Resource and record attributes are discarded.
- `partition_traces_by_id` (default = false): configures the exporter to include the trace ID as the message key in trace messages sent to kafka. *Please note:* this setting does not have any effect on Jaeger encoding exporters since Jaeger exporters include trace ID as the message key by default.
- `partition_metrics_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in metric messages sent to kafka.
- `partition_logs_by_resource_attributes` (default = false) configures the exporter to include the hash of sorted resource attributes as the message partitioning key in log messages sent to kafka.
- `auth`
- `plain_text`
- `username`: The username to use.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type Config struct {

PartitionMetricsByResourceAttributes bool `mapstructure:"partition_metrics_by_resource_attributes"`

PartitionLogsByResourceAttributes bool `mapstructure:"partition_logs_by_resource_attributes"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Metadata Metadata `mapstructure:"metadata"`
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
Expand Down Expand Up @@ -114,6 +115,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
Authentication: kafka.Authentication{
Expand Down Expand Up @@ -168,6 +170,7 @@ func TestLoadConfig(t *testing.T) {
Encoding: "otlp_proto",
PartitionTracesByID: true,
PartitionMetricsByResourceAttributes: true,
PartitionLogsByResourceAttributes: true,
Brokers: []string{"foo:123", "bar:456"},
ClientID: "test_client_id",
ResolveCanonicalBootstrapServersOnly: true,
Expand Down
3 changes: 3 additions & 0 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ const (
defaultFluxMaxMessages = 0
// partitioning metrics by resource attributes is disabled by default
defaultPartitionMetricsByResourceAttributesEnabled = false
// partitioning logs by resource attributes is disabled by default
defaultPartitionLogsByResourceAttributesEnabled = false
)

// FactoryOption applies changes to kafkaExporterFactory.
Expand Down Expand Up @@ -102,6 +104,7 @@ func createDefaultConfig() component.Config {
Topic: "",
Encoding: defaultEncoding,
PartitionMetricsByResourceAttributes: defaultPartitionMetricsByResourceAttributesEnabled,
PartitionLogsByResourceAttributes: defaultPartitionLogsByResourceAttributesEnabled,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand Down
62 changes: 62 additions & 0 deletions exporter/kafkaexporter/marshaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/collector/pdata/ptrace"
conventions "go.opentelemetry.io/collector/semconv/v1.6.1"
Expand Down Expand Up @@ -135,6 +136,67 @@ func TestOTLPMetricsJsonMarshaling(t *testing.T) {
}
}

func TestOTLPLogsJsonMarshaling(t *testing.T) {
tests := []struct {
name string
keyEnabled bool
messagePartitionKeys []sarama.Encoder
}{
{
name: "partitioning_disabled",
keyEnabled: false,
messagePartitionKeys: []sarama.Encoder{nil},
},
{
name: "partitioning_enabled",
keyEnabled: true,
messagePartitionKeys: []sarama.Encoder{
sarama.ByteEncoder{0x62, 0x7f, 0x20, 0x34, 0x85, 0x49, 0x55, 0x2e, 0xfa, 0x93, 0xae, 0xd7, 0xde, 0x91, 0xd7, 0x16},
sarama.ByteEncoder{0x75, 0x6b, 0xb4, 0xd6, 0xff, 0xeb, 0x92, 0x22, 0xa, 0x68, 0x65, 0x48, 0xe0, 0xd3, 0x94, 0x44},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
log := plog.NewLogs()
r := pcommon.NewResource()
r.Attributes().PutStr("service.name", "my_service_name")
r.Attributes().PutStr("service.instance.id", "kek_x_1")
r.CopyTo(log.ResourceLogs().AppendEmpty().Resource())

rl := log.ResourceLogs().At(0)
rl.SetSchemaUrl(conventions.SchemaURL)

sl := rl.ScopeLogs().AppendEmpty()
plog.NewScopeLogs()
l := sl.LogRecords().AppendEmpty()
l.Body().SetStr("Hello World")
l.Attributes().PutStr("attribute", "value")

r1 := pcommon.NewResource()
r1.Attributes().PutStr("service.instance.id", "kek_x_2")
r1.Attributes().PutStr("service.name", "my_service_name")
r1.CopyTo(log.ResourceLogs().AppendEmpty().Resource())

standardMarshaler := logsMarshalers()["otlp_json"]
keyableMarshaler, ok := standardMarshaler.(KeyableLogsMarshaler)
require.True(t, ok, "Must be a KeyableLogsMarshaler")
if tt.keyEnabled {
keyableMarshaler.Key()
}

msgs, err := standardMarshaler.Marshal(log, "KafkaTopicX")
require.NoError(t, err, "Must have marshaled the data without error")

require.Len(t, msgs, len(tt.messagePartitionKeys), "Number of messages must be %d, but was %d", len(tt.messagePartitionKeys), len(msgs))

for i := 0; i < len(tt.messagePartitionKeys); i++ {
require.Equal(t, tt.messagePartitionKeys[i], msgs[i].Key, "message %d has incorrect key", i)
}
})
}
}

func TestOTLPTracesJsonMarshaling(t *testing.T) {
t.Parallel()

Expand Down
56 changes: 47 additions & 9 deletions exporter/kafkaexporter/pdata_marshaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,30 +14,68 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
)

// KeyableLogsMarshaler is an extension of the LgosMarshaler interface intended to provide partition key capabilities
// for log messages
type KeyableLogsMarshaler interface {
LogsMarshaler
Key()
}

type pdataLogsMarshaler struct {
marshaler plog.Marshaler
encoding string
keyed bool
}

// Key configures the pdataLogsMarshaler to set the message key on the kafka messages
func (p *pdataLogsMarshaler) Key() {
p.keyed = true
}

func (p pdataLogsMarshaler) Marshal(ld plog.Logs, topic string) ([]*sarama.ProducerMessage, error) {
bts, err := p.marshaler.MarshalLogs(ld)
if err != nil {
return nil, err
}
return []*sarama.ProducerMessage{
{
var msgs []*sarama.ProducerMessage

if p.keyed {
logs := ld.ResourceLogs()

for i := 0; i < logs.Len(); i++ {
resourceLogs := logs.At(i)
var hash = pdatautil.MapHash(resourceLogs.Resource().Attributes())

newLogs := plog.NewLogs()
resourceLogs.CopyTo(newLogs.ResourceLogs().AppendEmpty())

bts, err := p.marshaler.MarshalLogs(newLogs)
if err != nil {
return nil, err
}

msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
Key: sarama.ByteEncoder(hash[:]),
})
}
} else {
bts, err := p.marshaler.MarshalLogs(ld)
if err != nil {
return nil, err
}
msgs = append(msgs, &sarama.ProducerMessage{
Topic: topic,
Value: sarama.ByteEncoder(bts),
},
}, nil
})
}

return msgs, nil
}

func (p pdataLogsMarshaler) Encoding() string {
return p.encoding
}

func newPdataLogsMarshaler(marshaler plog.Marshaler, encoding string) LogsMarshaler {
return pdataLogsMarshaler{
return &pdataLogsMarshaler{
marshaler: marshaler,
encoding: encoding,
}
Expand Down
1 change: 1 addition & 0 deletions exporter/kafkaexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ kafka:
timeout: 10s
partition_traces_by_id: true
partition_metrics_by_resource_attributes: true
partition_logs_by_resource_attributes: true
auth:
plain_text:
username: jdoe
Expand Down

0 comments on commit 9098914

Please sign in to comment.