Skip to content

Commit

Permalink
Make Kafka payload encoding configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Aug 19, 2020
1 parent 0079740 commit 3b849e3
Show file tree
Hide file tree
Showing 28 changed files with 895 additions and 105 deletions.
10 changes: 7 additions & 3 deletions exporter/kafkaexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

Kafka exporter exports traces to Kafka. This exporter uses a synchronous producer
that blocks and does not batch messages; therefore, it should be used with batch and queued retry
processors for higher throughput and resiliency.
processors for higher throughput and resiliency. Message payload encoding is configurable.

Message payloads are serialized OTLP `ExportTraceServiceRequest`.

The following settings are required:
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of Kafka brokers
- `topic` (default = otlp_spans): The name of the Kafka topic to export to
- `encoding` (default = otlp_proto): The encoding of the payload sent to Kafka. Available encodings:
- `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
- `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
- `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `zipkin_thrift`: the payload is serialized into Zipkin Thrift spans.
- `zipkin_json`: the payload is serialized into Zipkin V2 JSON spans.
- `metadata`
- `full` (default = true): Whether to maintain a full set of metadata.
When disabled, the client does not make the initial request to the broker at startup.
Expand Down
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Config struct {
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to export to (default "otlp_spans")
Topic string `mapstructure:"topic"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
Expand Down
5 changes: 3 additions & 2 deletions exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,9 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Brokers: []string{"foo:123", "bar:456"},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Metadata: Metadata{
Full: false,
Retry: MetadataRetry{
Expand Down
8 changes: 5 additions & 3 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ import (
)

const (
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultBroker = "localhost:9092"
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
// default from sarama.NewConfig()
Expand Down Expand Up @@ -57,6 +58,7 @@ func createDefaultConfig() configmodels.Exporter {
QueueSettings: qs,
Brokers: []string{defaultBroker},
Topic: defaultTopic,
Encoding: defaultEncoding,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
Expand Down
105 changes: 105 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"bytes"

"github.com/gogo/protobuf/jsonpb"
jaegerproto "github.com/jaegertracing/jaeger/model"

"go.opentelemetry.io/collector/component/componenterror"
"go.opentelemetry.io/collector/consumer/pdata"
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

func init() {
jaegerProtoMarshaller := &jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
}
RegisterMarshaller(jaegerProtoMarshaller)
jaegerJSONMarshaller := &jaegerMarshaller{
marshaller: jaegerJSONSpanMarshaller{
pbMarshaller: &jsonpb.Marshaler{},
},
}
RegisterMarshaller(jaegerJSONMarshaller)
}

// jaegerMarshaller converts traces to Jaeger spans and delegates the
// serialization of each individual span to the wrapped jaegerSpanMarshaller.
type jaegerMarshaller struct {
// marshaller serializes a single Jaeger span and names the resulting encoding.
marshaller jaegerSpanMarshaller
}

// Compile-time check that *jaegerMarshaller implements Marshaller.
var _ Marshaller = (*jaegerMarshaller)(nil)

// Marshal translates traces into Jaeger batches and serializes every span
// into its own Message.
//
// A translation failure aborts with a nil slice. A failure to serialize an
// individual span does not: the span is skipped, its error is collected, and
// the messages that could be produced are returned together with the
// combined error.
func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
	batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
	if err != nil {
		return nil, err
	}

	var (
		out      []Message
		failures []error
	)
	for _, b := range batches {
		for _, s := range b.Spans {
			// Each span is emitted on its own, so attach the batch process to it.
			s.Process = b.Process
			payload, marshalErr := j.marshaller.marshall(s)
			if marshalErr == nil {
				out = append(out, Message{Value: payload})
				continue
			}
			// Keep going so that spans which do serialize are still returned.
			failures = append(failures, marshalErr)
		}
	}
	return out, componenterror.CombineErrors(failures)
}

// Encoding returns the encoding name reported by the wrapped span marshaller.
func (j jaegerMarshaller) Encoding() string {
return j.marshaller.encoding()
}

// jaegerSpanMarshaller serializes a single Jaeger span into bytes.
type jaegerSpanMarshaller interface {
// marshall serializes span and returns the resulting bytes.
marshall(span *jaegerproto.Span) ([]byte, error)
// encoding returns the name of the encoding produced by marshall.
encoding() string
}

// jaegerProtoSpanMarshaller is the jaegerSpanMarshaller for the "jaeger_proto"
// encoding; it carries no state.
type jaegerProtoSpanMarshaller struct {
}

// Compile-time check that jaegerProtoSpanMarshaller implements jaegerSpanMarshaller.
var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil)

// marshall serializes span by calling the span's own Marshal method and
// returns its result unchanged.
func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
return span.Marshal()
}

// encoding returns the encoding name of this marshaller, "jaeger_proto".
func (p jaegerProtoSpanMarshaller) encoding() string {
return "jaeger_proto"
}

// jaegerJSONSpanMarshaller is the jaegerSpanMarshaller for the "jaeger_json"
// encoding.
type jaegerJSONSpanMarshaller struct {
// pbMarshaller is the jsonpb marshaller used to write each span as JSON.
pbMarshaller *jsonpb.Marshaler
}

// Compile-time check that jaegerJSONSpanMarshaller implements jaegerSpanMarshaller.
var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)

// marshall serializes span to JSON with the configured jsonpb marshaller.
//
// On failure it returns nil bytes together with the error, so whatever was
// written to the buffer before the failure is never handed to the caller as
// if it were a usable payload.
func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
	var buf bytes.Buffer
	if err := p.pbMarshaller.Marshal(&buf, span); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// encoding returns the encoding name of this marshaller, "jaeger_json".
func (p jaegerJSONSpanMarshaller) encoding() string {
return "jaeger_json"
}
95 changes: 95 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
"bytes"
"testing"

"github.com/gogo/protobuf/jsonpb"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/consumer/pdata"
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

// TestJaegerMarshaller verifies that the proto and JSON Jaeger marshallers
// produce one message for the single input span, that the payload equals the
// span serialized directly, and that each marshaller reports its encoding.
func TestJaegerMarshaller(t *testing.T) {
	td := pdata.NewTraces()
	td.ResourceSpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)

	// Populate the only span of the trace data.
	span := td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0)
	span.SetName("foo")
	span.SetStartTime(pdata.TimestampUnixNano(10))
	span.SetEndTime(pdata.TimestampUnixNano(20))
	span.SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
	span.SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8}))

	batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
	require.NoError(t, err)

	// Build the expected payloads straight from the translated span.
	wantSpan := batches[0].Spans[0]
	wantSpan.Process = batches[0].Process
	protoPayload, err := wantSpan.Marshal()
	require.NoError(t, err)
	require.NotNil(t, protoPayload)

	jsonPayload := new(bytes.Buffer)
	require.NoError(t, (&jsonpb.Marshaler{}).Marshal(jsonPayload, wantSpan))

	cases := []struct {
		marshaller Marshaller
		encoding   string
		want       []Message
	}{
		{
			marshaller: jaegerMarshaller{marshaller: jaegerProtoSpanMarshaller{}},
			encoding:   "jaeger_proto",
			want:       []Message{{Value: protoPayload}},
		},
		{
			marshaller: jaegerMarshaller{
				marshaller: jaegerJSONSpanMarshaller{pbMarshaller: &jsonpb.Marshaler{}},
			},
			encoding: "jaeger_json",
			want:     []Message{{Value: jsonPayload.Bytes()}},
		},
	}
	for _, tc := range cases {
		t.Run(tc.encoding, func(t *testing.T) {
			got, err := tc.marshaller.Marshal(td)
			require.NoError(t, err)
			assert.Equal(t, tc.want, got)
			assert.Equal(t, tc.encoding, tc.marshaller.Encoding())
		})
	}
}

func TestJaegerMarshaller_error_covert_traceID(t *testing.T) {
marshaller := jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
}
td := pdata.NewTraces()
td.ResourceSpans().Resize(1)
td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
// fails in zero traceID
messages, err := marshaller.Marshal(td)
require.Error(t, err)
assert.Nil(t, messages)
}
31 changes: 23 additions & 8 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package kafkaexporter

import (
"context"
"fmt"

"github.com/Shopify/sarama"
"go.uber.org/zap"
Expand All @@ -25,16 +26,23 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

// errUnrecognizedEncoding is returned by newExporter when GetMarshaller
// returns nil for the configured encoding.
var errUnrecognizedEncoding = fmt.Errorf("unrecognized encoding")

// kafkaProducer uses sarama to produce messages to Kafka.
type kafkaProducer struct {
producer sarama.SyncProducer
topic string
marshaller marshaller
marshaller Marshaller
logger *zap.Logger
}

// newExporter creates Kafka exporter.
func newExporter(config Config, params component.ExporterCreateParams) (*kafkaProducer, error) {
marshaller := GetMarshaller(config.Encoding)
if marshaller == nil {
return nil, errUnrecognizedEncoding
}

c := sarama.NewConfig()
// These settings are required by the sarama.SyncProducer implementation.
c.Producer.Return.Successes = true
Expand All @@ -60,21 +68,17 @@ func newExporter(config Config, params component.ExporterCreateParams) (*kafkaPr
return &kafkaProducer{
producer: producer,
topic: config.Topic,
marshaller: &protoMarshaller{},
marshaller: marshaller,
logger: params.Logger,
}, nil
}

func (e *kafkaProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int, error) {
bts, err := e.marshaller.Marshal(td)
messages, err := e.marshaller.Marshal(td)
if err != nil {
return td.SpanCount(), consumererror.Permanent(err)
}
m := &sarama.ProducerMessage{
Topic: e.topic,
Value: sarama.ByteEncoder(bts),
}
_, _, err = e.producer.SendMessage(m)
err = e.producer.SendMessages(producerMessages(messages, e.topic))
if err != nil {
return td.SpanCount(), err
}
Expand All @@ -84,3 +88,14 @@ func (e *kafkaProducer) traceDataPusher(_ context.Context, td pdata.Traces) (int
// Close closes the underlying sarama producer and returns the error it
// reports, if any. The context argument is ignored.
func (e *kafkaProducer) Close(context.Context) error {
return e.producer.Close()
}

// producerMessages wraps the value of every Message in a
// sarama.ProducerMessage addressed to topic. The result has the same length
// and order as messages.
func producerMessages(messages []Message, topic string) []*sarama.ProducerMessage {
	out := make([]*sarama.ProducerMessage, 0, len(messages))
	for _, msg := range messages {
		out = append(out, &sarama.ProducerMessage{
			Topic: topic,
			Value: sarama.ByteEncoder(msg.Value),
		})
	}
	return out
}
19 changes: 15 additions & 4 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,20 @@ import (
"go.opentelemetry.io/collector/internal/data/testdata"
)

func TestNewExporter_wrong_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0"}
func TestNewExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
exp, err := newExporter(c, component.ExporterCreateParams{})
assert.Error(t, err)
assert.Nil(t, exp)
}

// TestNewExporter_err_encoding verifies that newExporter rejects an encoding
// with no registered marshaller and returns no exporter.
func TestNewExporter_err_encoding(t *testing.T) {
	cfg := Config{Encoding: "foo"}
	wantMsg := errUnrecognizedEncoding.Error()

	exporter, err := newExporter(cfg, component.ExporterCreateParams{})

	assert.EqualError(t, err, wantMsg)
	assert.Nil(t, exporter)
}

func TestTraceDataPusher(t *testing.T) {
c := sarama.NewConfig()
producer := mocks.NewSyncProducer(t, c)
Expand Down Expand Up @@ -91,8 +98,12 @@ type errorMarshaller struct {
err error
}

var _ marshaller = (*errorMarshaller)(nil)
var _ Marshaller = (*errorMarshaller)(nil)

func (e errorMarshaller) Marshal(pdata.Traces) ([]byte, error) {
func (e errorMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
return nil, e.err
}

// Encoding is left unimplemented on this test stub and panics when called.
func (e errorMarshaller) Encoding() string {
panic("implement me")
}
Loading

0 comments on commit 3b849e3

Please sign in to comment.