Make Kafka payload encoding configurable #1584

Merged: 4 commits, Aug 26, 2020
8 changes: 5 additions & 3 deletions exporter/kafkaexporter/README.md
@@ -2,16 +2,18 @@

Kafka exporter exports traces to Kafka. This exporter uses a synchronous producer
that blocks and does not batch messages, therefore it should be used with batch and queued retry
processors for higher throughput and resiliency.
processors for higher throughput and resiliency. Message payload encoding is configurable.

Message payloads are serialized OTLP `ExportTraceServiceRequest`.

The following settings are required:
- `protocol_version` (no default): Kafka protocol version e.g. 2.0.0

The following settings can be optionally configured:
- `brokers` (default = localhost:9092): The list of kafka brokers
- `topic` (default = otlp_spans): The name of the kafka topic to export to
- `encoding` (default = otlp_proto): The encoding of the payload sent to kafka. Available encodings:
  - `otlp_proto`: the payload is serialized to `ExportTraceServiceRequest`.
  - `jaeger_proto`: the payload is serialized to a single Jaeger proto `Span`.
  - `jaeger_json`: the payload is serialized to a single Jaeger JSON Span using `jsonpb`.
- `metadata`
  - `full` (default = true): Whether to maintain a full set of metadata.
    When disabled the client does not make the initial request to broker at the startup.
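The `encoding` setting described in the README picks a marshaller by name. The following is a minimal, self-contained sketch of such a lookup, assuming a registry keyed by encoding name. The `marshaller` interface, `stubMarshaller`, `defaultRegistry`, and `selectMarshaller` are hypothetical stand-ins for illustration; how the exporter itself reports an unknown encoding is not shown in this diff, so the error below is an assumption.

```go
package main

import (
	"fmt"
	"sort"
)

// marshaller is a hypothetical stand-in for the exporter's Marshaller interface.
type marshaller interface {
	Encoding() string
}

// stubMarshaller only carries its encoding name; a real marshaller also serializes traces.
type stubMarshaller struct{ name string }

func (s stubMarshaller) Encoding() string { return s.name }

// defaultRegistry lists the three encodings named in the README.
func defaultRegistry() map[string]marshaller {
	reg := map[string]marshaller{}
	for _, name := range []string{"otlp_proto", "jaeger_proto", "jaeger_json"} {
		reg[name] = stubMarshaller{name: name}
	}
	return reg
}

// selectMarshaller returns the marshaller registered for encoding,
// or an error that lists the known encodings (assumed behavior).
func selectMarshaller(reg map[string]marshaller, encoding string) (marshaller, error) {
	if m, ok := reg[encoding]; ok {
		return m, nil
	}
	known := make([]string, 0, len(reg))
	for name := range reg {
		known = append(known, name)
	}
	sort.Strings(known)
	return nil, fmt.Errorf("unrecognized encoding %q, expected one of %v", encoding, known)
}

func main() {
	reg := defaultRegistry()
	for _, enc := range []string{"otlp_proto", "zipkin_json"} {
		m, err := selectMarshaller(reg, enc)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("selected:", m.Encoding())
	}
}
```

Failing fast on an unknown name at exporter creation time, rather than at first export, would surface a misconfigured `encoding` during startup.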
2 changes: 2 additions & 0 deletions exporter/kafkaexporter/config.go
@@ -34,6 +34,8 @@ type Config struct {
ProtocolVersion string `mapstructure:"protocol_version"`
// The name of the kafka topic to export to (default "otlp_spans")
Topic string `mapstructure:"topic"`
// Encoding of the messages (default "otlp_proto")
Encoding string `mapstructure:"encoding"`

// Metadata is the namespace for metadata management properties used by the
// Client, and shared by the Producer/Consumer.
5 changes: 3 additions & 2 deletions exporter/kafkaexporter/config_test.go
@@ -58,8 +58,9 @@ func TestLoadConfig(t *testing.T) {
NumConsumers: 2,
QueueSize: 10,
},
Topic: "spans",
Brokers: []string{"foo:123", "bar:456"},
Topic: "spans",
Encoding: "otlp_proto",
Brokers: []string{"foo:123", "bar:456"},
Metadata: Metadata{
Full: false,
Retry: MetadataRetry{
38 changes: 31 additions & 7 deletions exporter/kafkaexporter/factory.go
@@ -24,9 +24,10 @@ import (
)

const (
typeStr = "kafka"
defaultTopic = "otlp_spans"
defaultBroker = "localhost:9092"
typeStr = "kafka"
defaultTopic = "otlp_spans"
Review comment (Contributor): Would it be better to have a generic `defaultTopic` that is not perceived to be associated with the encoding, e.g. just `spans`?

Review comment (Member, Author): Please create a separate issue for this. Ideally, the default should not clash with topic names used by other OSS systems such as Zipkin and Jaeger.

Review comment (Member, Author): I assume that the non-OTLP encodings will be used in legacy deployments that already use different topic names (e.g. jaeger-spans, zipkin).

Review comment (Contributor): I guess it is reasonable to have a default topic aligned with the default encoding. If someone changes the encoding, they are likely to change the topic as well, so never mind.

defaultEncoding = "otlp_proto"
defaultBroker = "localhost:9092"
// default from sarama.NewConfig()
defaultMetadataRetryMax = 3
// default from sarama.NewConfig()
@@ -35,12 +36,30 @@ const (
defaultMetadataFull = true
)

// FactoryOption applies changes to kafkaExporterFactory.
type FactoryOption func(factory *kafkaExporterFactory)

// WithAddMarshallers adds marshallers.
func WithAddMarshallers(encodingMarshaller map[string]Marshaller) FactoryOption {
	return func(factory *kafkaExporterFactory) {
		for encoding, marshaller := range encodingMarshaller {
			factory.marshallers[encoding] = marshaller
		}
	}
}

// NewFactory creates Kafka exporter factory.
func NewFactory() component.ExporterFactory {
func NewFactory(options ...FactoryOption) component.ExporterFactory {
f := &kafkaExporterFactory{
marshallers: defaultMarshallers(),
}
for _, o := range options {
o(f)
}
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(createTraceExporter))
exporterhelper.WithTraces(f.createTraceExporter))
}

func createDefaultConfig() configmodels.Exporter {
@@ -57,6 +76,7 @@ func createDefaultConfig() configmodels.Exporter {
QueueSettings: qs,
Brokers: []string{defaultBroker},
Topic: defaultTopic,
Encoding: defaultEncoding,
Metadata: Metadata{
Full: defaultMetadataFull,
Retry: MetadataRetry{
@@ -67,13 +87,17 @@ func createDefaultConfig() configmodels.Exporter {
}
}

func createTraceExporter(
type kafkaExporterFactory struct {
marshallers map[string]Marshaller
}

func (f *kafkaExporterFactory) createTraceExporter(
_ context.Context,
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
oCfg := cfg.(*Config)
exp, err := newExporter(*oCfg, params)
exp, err := newExporter(*oCfg, params, f.marshallers)
if err != nil {
return nil, err
}
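The `FactoryOption` and `WithAddMarshallers` additions in factory.go follow the functional options pattern: the factory starts with default marshallers and each option mutates it before use. The sketch below reproduces that pattern in isolation. The simplified `Marshaller` interface (without `Marshal`), `namedMarshaller`, `factory`, and `newFactory` are hypothetical stand-ins, not the collector's types.

```go
package main

import "fmt"

// Marshaller is a simplified, hypothetical version of the exporter's interface;
// Marshal is omitted so the example stays self-contained.
type Marshaller interface {
	Encoding() string
}

// namedMarshaller is a stub whose encoding is its own string value.
type namedMarshaller string

func (n namedMarshaller) Encoding() string { return string(n) }

// factory holds the encoding-to-marshaller registry.
type factory struct {
	marshallers map[string]Marshaller
}

// Option mutates a factory before it is used, mirroring FactoryOption in the diff.
type Option func(*factory)

// WithAddMarshallers merges extra marshallers into the registry. An entry whose
// key already exists replaces the default for that encoding.
func WithAddMarshallers(extra map[string]Marshaller) Option {
	return func(f *factory) {
		for enc, m := range extra {
			f.marshallers[enc] = m
		}
	}
}

// newFactory builds the defaults first, then applies options in order.
func newFactory(opts ...Option) *factory {
	f := &factory{marshallers: map[string]Marshaller{
		"otlp_proto": namedMarshaller("otlp_proto"),
	}}
	for _, o := range opts {
		o(f)
	}
	return f
}

func main() {
	custom := namedMarshaller("custom")
	f := newFactory(WithAddMarshallers(map[string]Marshaller{custom.Encoding(): custom}))
	_, ok := f.marshallers["custom"]
	fmt.Println(len(f.marshallers), ok) // prints "2 true"
}
```

Because options run after the defaults are installed, a caller can both add new encodings and override a built-in one by reusing its key.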
43 changes: 40 additions & 3 deletions exporter/kafkaexporter/factory_test.go
@@ -23,6 +23,7 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcheck"
"go.opentelemetry.io/collector/consumer/pdata"
)

func TestCreateDefaultConfig(t *testing.T) {
@@ -39,7 +40,8 @@ func TestCreateTracesExporter(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}
@@ -48,8 +50,43 @@ func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// we get the error because the exporter
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: defaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}

func TestWithMarshallers(t *testing.T) {
	cm := &customMarshaller{}
	f := NewFactory(WithAddMarshallers(map[string]Marshaller{cm.Encoding(): cm}))
	cfg := createDefaultConfig().(*Config)
	// disable contacting broker
	cfg.Metadata.Full = false

	t.Run("custom_encoding", func(t *testing.T) {
		cfg.Encoding = cm.Encoding()
		exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
		require.NoError(t, err)
		require.NotNil(t, exporter)
	})
	t.Run("default_encoding", func(t *testing.T) {
		cfg.Encoding = new(otlpProtoMarshaller).Encoding()
		exporter, err := f.CreateTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
		require.NoError(t, err)
		assert.NotNil(t, exporter)
	})
}

type customMarshaller struct {
}

var _ Marshaller = (*customMarshaller)(nil)

func (c customMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
	panic("implement me")
}

func (c customMarshaller) Encoding() string {
	return "custom"
}
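The `customMarshaller` in the test only needs to satisfy the interface, so its `Marshal` panics and is never called. For comparison, here is a sketch of what a working custom marshaller might look like. `Message`, `Span`, and `Traces` are hypothetical, heavily simplified stand-ins for the exporter's `Message` and `pdata.Traces`, and the `span_name_json` encoding is invented for this example.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message, Span and Traces are hypothetical stand-ins; the real types
// carry far more structure than a span name.
type Message struct{ Value []byte }

type Span struct {
	Name string `json:"name"`
}

type Traces struct{ Spans []Span }

// Marshaller mirrors the two methods used in the diff.
type Marshaller interface {
	Marshal(traces Traces) ([]Message, error)
	Encoding() string
}

// spanNameJSONMarshaller writes each span as its own JSON message.
type spanNameJSONMarshaller struct{}

// Compile-time check that the type satisfies the interface.
var _ Marshaller = spanNameJSONMarshaller{}

func (spanNameJSONMarshaller) Marshal(traces Traces) ([]Message, error) {
	messages := make([]Message, 0, len(traces.Spans))
	for _, s := range traces.Spans {
		b, err := json.Marshal(s)
		if err != nil {
			return nil, err
		}
		messages = append(messages, Message{Value: b})
	}
	return messages, nil
}

func (spanNameJSONMarshaller) Encoding() string { return "span_name_json" }

func main() {
	var m Marshaller = spanNameJSONMarshaller{}
	msgs, err := m.Marshal(Traces{Spans: []Span{{Name: "foo"}}})
	if err != nil {
		panic(err)
	}
	fmt.Println(m.Encoding(), string(msgs[0].Value)) // prints: span_name_json {"name":"foo"}
}
```

A marshaller like this would be registered under the name returned by `Encoding()`, which is then the value to use for the `encoding` setting.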
98 changes: 98 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller.go
@@ -0,0 +1,98 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
	"bytes"

	"github.com/gogo/protobuf/jsonpb"
	jaegerproto "github.com/jaegertracing/jaeger/model"

	"go.opentelemetry.io/collector/component/componenterror"
	"go.opentelemetry.io/collector/consumer/pdata"
	jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

type jaegerMarshaller struct {
	marshaller jaegerSpanMarshaller
}

var _ Marshaller = (*jaegerMarshaller)(nil)

func (j jaegerMarshaller) Marshal(traces pdata.Traces) ([]Message, error) {
	batches, err := jaegertranslator.InternalTracesToJaegerProto(traces)
	if err != nil {
		return nil, err
	}
	var messages []Message
	var errs []error
	for _, batch := range batches {
		for _, span := range batch.Spans {
			span.Process = batch.Process
			bts, err := j.marshaller.marshall(span)
			// continue to process spans that can be serialized
			if err != nil {
				errs = append(errs, err)
				continue
			}
			messages = append(messages, Message{Value: bts})
		}
	}
	return messages, componenterror.CombineErrors(errs)
}

func (j jaegerMarshaller) Encoding() string {
	return j.marshaller.encoding()
}

type jaegerSpanMarshaller interface {
	marshall(span *jaegerproto.Span) ([]byte, error)
	encoding() string
}

type jaegerProtoSpanMarshaller struct {
}

var _ jaegerSpanMarshaller = (*jaegerProtoSpanMarshaller)(nil)

func (p jaegerProtoSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
	return span.Marshal()
}

func (p jaegerProtoSpanMarshaller) encoding() string {
	return "jaeger_proto"
}

type jaegerJSONSpanMarshaller struct {
	pbMarshaller *jsonpb.Marshaler
}

var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)

func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller {
	return &jaegerJSONSpanMarshaller{
		pbMarshaller: &jsonpb.Marshaler{},
	}
}

func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
	out := new(bytes.Buffer)
	err := p.pbMarshaller.Marshal(out, span)
	return out.Bytes(), err
}

func (p jaegerJSONSpanMarshaller) encoding() string {
	return "jaeger_json"
}
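`jaegerMarshaller.Marshal` emits one message per span and keeps going when a single span fails to serialize, returning the successful messages together with a combined error. The sketch below isolates that control flow using only the standard library. `message`, `span`, `encodeSpan`, and `combineErrors` are hypothetical stand-ins (the real code uses `componenterror.CombineErrors`, whose exact error text is not reproduced here).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// message is a hypothetical stand-in for the exporter's Message type.
type message struct{ Value []byte }

// span is a hypothetical, minimal span that only has a name.
type span struct{ Name string }

// encodeSpan fails for spans without a name to simulate a serialization error.
func encodeSpan(s span) ([]byte, error) {
	if s.Name == "" {
		return nil, errors.New("span has no name")
	}
	return []byte(s.Name), nil
}

// combineErrors folds a slice of errors into one, or returns nil if there are none.
func combineErrors(errs []error) error {
	if len(errs) == 0 {
		return nil
	}
	parts := make([]string, len(errs))
	for i, err := range errs {
		parts[i] = err.Error()
	}
	return fmt.Errorf("[%s]", strings.Join(parts, "; "))
}

// marshalSpans emits one message per span and skips spans that fail to encode,
// collecting their errors instead of aborting.
func marshalSpans(spans []span) ([]message, error) {
	var messages []message
	var errs []error
	for _, s := range spans {
		b, err := encodeSpan(s)
		if err != nil {
			errs = append(errs, err)
			continue
		}
		messages = append(messages, message{Value: b})
	}
	return messages, combineErrors(errs)
}

func main() {
	msgs, err := marshalSpans([]span{{Name: "foo"}, {}, {Name: "bar"}})
	fmt.Println(len(msgs), err) // prints: 2 [span has no name]
}
```

With this shape the caller can receive both a non-empty message slice and a non-nil error, so it has to decide whether to send the partial result or drop the whole batch.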
95 changes: 95 additions & 0 deletions exporter/kafkaexporter/jaeger_marshaller_test.go
@@ -0,0 +1,95 @@
// Copyright 2020 The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafkaexporter

import (
	"bytes"
	"testing"

	"github.com/gogo/protobuf/jsonpb"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"go.opentelemetry.io/collector/consumer/pdata"
	jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

func TestJaegerMarshaller(t *testing.T) {
	td := pdata.NewTraces()
	td.ResourceSpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetName("foo")
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetStartTime(pdata.TimestampUnixNano(10))
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetEndTime(pdata.TimestampUnixNano(20))
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetTraceID(pdata.NewTraceID([]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}))
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().At(0).SetSpanID(pdata.NewSpanID([]byte{1, 2, 3, 4, 5, 6, 7, 8}))
	batches, err := jaegertranslator.InternalTracesToJaegerProto(td)
	require.NoError(t, err)

	batches[0].Spans[0].Process = batches[0].Process
	jaegerProtoBytes, err := batches[0].Spans[0].Marshal()
	require.NoError(t, err)
	require.NotNil(t, jaegerProtoBytes)

	jsonMarshaller := &jsonpb.Marshaler{}
	jsonByteBuffer := new(bytes.Buffer)
	require.NoError(t, jsonMarshaller.Marshal(jsonByteBuffer, batches[0].Spans[0]))

	tests := []struct {
		unmarshaller Marshaller
		encoding     string
		messages     []Message
	}{
		{
			unmarshaller: jaegerMarshaller{
				marshaller: jaegerProtoSpanMarshaller{},
			},
			encoding: "jaeger_proto",
			messages: []Message{{Value: jaegerProtoBytes}},
		},
		{
			unmarshaller: jaegerMarshaller{
				marshaller: jaegerJSONSpanMarshaller{
					pbMarshaller: &jsonpb.Marshaler{},
				},
			},
			encoding: "jaeger_json",
			messages: []Message{{Value: jsonByteBuffer.Bytes()}},
		},
	}
	for _, test := range tests {
		t.Run(test.encoding, func(t *testing.T) {
			messages, err := test.unmarshaller.Marshal(td)
			require.NoError(t, err)
			assert.Equal(t, test.messages, messages)
			assert.Equal(t, test.encoding, test.unmarshaller.Encoding())
		})
	}
}

func TestJaegerMarshaller_error_covert_traceID(t *testing.T) {
	marshaller := jaegerMarshaller{
		marshaller: jaegerProtoSpanMarshaller{},
	}
	td := pdata.NewTraces()
	td.ResourceSpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().Resize(1)
	td.ResourceSpans().At(0).InstrumentationLibrarySpans().At(0).Spans().Resize(1)
	// fails in zero traceID
	messages, err := marshaller.Marshal(td)
	require.Error(t, err)
	assert.Nil(t, messages)
}