Skip to content

Commit

Permalink
Expose marshalers in factory
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Aug 24, 2020
1 parent 4598288 commit 507dd0a
Show file tree
Hide file tree
Showing 20 changed files with 112 additions and 128 deletions.
2 changes: 1 addition & 1 deletion exporter/kafkaexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factory := NewFactory(DefaultMarshallers())
factories.Exporters[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
Expand Down
17 changes: 13 additions & 4 deletions exporter/kafkaexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,16 @@ const (
)

// NewFactory creates Kafka exporter factory.
func NewFactory() component.ExporterFactory {
// encodingMarshaller is a map with supported encodings for this factory.
// See DefaultMarshallers.
func NewFactory(encodingMarshaller map[string]Marshaller) component.ExporterFactory {
k := kafkaExporterFactory{
marshallers: encodingMarshaller,
}
return exporterhelper.NewFactory(
typeStr,
createDefaultConfig,
exporterhelper.WithTraces(createTraceExporter))
exporterhelper.WithTraces(k.createTraceExporter))
}

func createDefaultConfig() configmodels.Exporter {
Expand Down Expand Up @@ -69,13 +74,17 @@ func createDefaultConfig() configmodels.Exporter {
}
}

func createTraceExporter(
type kafkaExporterFactory struct {
marshallers map[string]Marshaller
}

func (f *kafkaExporterFactory) createTraceExporter(
_ context.Context,
params component.ExporterCreateParams,
cfg configmodels.Exporter,
) (component.TraceExporter, error) {
oCfg := cfg.(*Config)
exp, err := newExporter(*oCfg, params)
exp, err := newExporter(*oCfg, params, f.marshallers)
if err != nil {
return nil, err
}
Expand Down
8 changes: 5 additions & 3 deletions exporter/kafkaexporter/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func TestCreateTracesExporter(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// this disables contacting the broker so we can successfully create the exporter
cfg.Metadata.Full = false
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: DefaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
require.NoError(t, err)
assert.NotNil(t, r)
}
Expand All @@ -48,8 +49,9 @@ func TestCreateTracesExporter_err(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
// we get the error because the exporter
r, err := createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
f := kafkaExporterFactory{marshallers: DefaultMarshallers()}
r, err := f.createTraceExporter(context.Background(), component.ExporterCreateParams{}, cfg)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}
19 changes: 6 additions & 13 deletions exporter/kafkaexporter/jaeger_marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,6 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

func init() {
jaegerProtoMarshaller := &jaegerMarshaller{
marshaller: jaegerProtoSpanMarshaller{},
}
RegisterMarshaller(jaegerProtoMarshaller)
jaegerJSONMarshaller := &jaegerMarshaller{
marshaller: jaegerJSONSpanMarshaller{
pbMarshaller: &jsonpb.Marshaler{},
},
}
RegisterMarshaller(jaegerJSONMarshaller)
}

type jaegerMarshaller struct {
marshaller jaegerSpanMarshaller
}
Expand Down Expand Up @@ -94,6 +81,12 @@ type jaegerJSONSpanMarshaller struct {

var _ jaegerSpanMarshaller = (*jaegerJSONSpanMarshaller)(nil)

func newJaegerJSONMarshaller() *jaegerJSONSpanMarshaller {
return &jaegerJSONSpanMarshaller{
pbMarshaller: &jsonpb.Marshaler{},
}
}

func (p jaegerJSONSpanMarshaller) marshall(span *jaegerproto.Span) ([]byte, error) {
out := new(bytes.Buffer)
err := p.pbMarshaller.Marshal(out, span)
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/kafka_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ type kafkaProducer struct {
}

// newExporter creates Kafka exporter.
func newExporter(config Config, params component.ExporterCreateParams) (*kafkaProducer, error) {
marshaller := GetMarshaller(config.Encoding)
func newExporter(config Config, params component.ExporterCreateParams, marshallers map[string]Marshaller) (*kafkaProducer, error) {
marshaller := marshallers[config.Encoding]
if marshaller == nil {
return nil, errUnrecognizedEncoding
}
Expand Down
4 changes: 2 additions & 2 deletions exporter/kafkaexporter/kafka_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ import (

func TestNewExporter_err_version(t *testing.T) {
c := Config{ProtocolVersion: "0.0.0", Encoding: defaultEncoding}
exp, err := newExporter(c, component.ExporterCreateParams{})
exp, err := newExporter(c, component.ExporterCreateParams{}, DefaultMarshallers())
assert.Error(t, err)
assert.Nil(t, exp)
}

func TestNewExporter_err_encoding(t *testing.T) {
c := Config{Encoding: "foo"}
exp, err := newExporter(c, component.ExporterCreateParams{})
exp, err := newExporter(c, component.ExporterCreateParams{}, map[string]Marshaller{})
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
assert.Nil(t, exp)
}
Expand Down
24 changes: 12 additions & 12 deletions exporter/kafkaexporter/marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@ import (
"go.opentelemetry.io/collector/consumer/pdata"
)

var marshallers = map[string]Marshaller{}

// GetMarshaller gets a Marshaller for encoding or nil if no marshaller is registered.
func GetMarshaller(encoding string) Marshaller {
return marshallers[encoding]
}

// RegisterMarshaller register marshaller.
func RegisterMarshaller(marshaller Marshaller) {
marshallers[marshaller.Encoding()] = marshaller
}

// Marshaller marshals traces into Message array.
type Marshaller interface {
// Marshal serializes spans into Messages
Expand All @@ -43,3 +31,15 @@ type Marshaller interface {
type Message struct {
Value []byte
}

// DefaultMarshallers returns map of supported encodings with Marshaller.
func DefaultMarshallers() map[string]Marshaller {
otlp := &protoMarshaller{}
jaegerProto := jaegerMarshaller{marshaller: jaegerProtoSpanMarshaller{}}
jaegerJSON := jaegerMarshaller{marshaller: newJaegerJSONMarshaller()}
return map[string]Marshaller{
otlp.Encoding(): otlp,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
}
}
38 changes: 16 additions & 22 deletions exporter/kafkaexporter/marshaller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,22 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"go.opentelemetry.io/collector/consumer/pdata"
"github.com/stretchr/testify/require"
)

const testEncoding = "test"

func TestRegisterMarshaller(t *testing.T) {
sender := &testMarshaller{}
RegisterMarshaller(sender)
unmarshaller := GetMarshaller(testEncoding)
assert.Equal(t, sender, unmarshaller)
}

type testMarshaller struct {
}

var _ Marshaller = (*testMarshaller)(nil)

func (t testMarshaller) Marshal(pdata.Traces) ([]Message, error) {
panic("implement me")
}

func (t testMarshaller) Encoding() string {
return testEncoding
func TestDefaultMarshallers(t *testing.T) {
expectedEncodings := []string{
"otlp_proto",
"jaeger_proto",
"jaeger_json",
}
marshallers := DefaultMarshallers()
assert.Equal(t, len(expectedEncodings), len(marshallers))
for _, e := range expectedEncodings {
t.Run(e, func(t *testing.T) {
m, ok := marshallers[e]
require.True(t, ok)
assert.NotNil(t, m)
})
}
}
5 changes: 0 additions & 5 deletions exporter/kafkaexporter/otlp_marshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ import (
otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1"
)

func init() {
protoMarshaller := &protoMarshaller{}
RegisterMarshaller(protoMarshaller)
}

type protoMarshaller struct {
}

Expand Down
2 changes: 1 addition & 1 deletion receiver/kafkareceiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestLoadConfig(t *testing.T) {
factories, err := componenttest.ExampleComponents()
assert.NoError(t, err)

factory := NewFactory()
factory := NewFactory(DefaultUnmarshallers())
factories.Receivers[typeStr] = factory
cfg, err := configtest.LoadConfigFile(t, path.Join(".", "testdata", "config.yaml"), factories)
require.NoError(t, err)
Expand Down
15 changes: 11 additions & 4 deletions receiver/kafkareceiver/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,14 @@ const (
)

// NewFactory creates Kafka receiver factory.
func NewFactory() component.ReceiverFactory {
// encodingMarshaller is a map with supported encodings for this factory.
// See DefaultUnmarshallers.
func NewFactory(encodingUnmarshaler map[string]Unmarshaller) component.ReceiverFactory {
f := &kafkaReceiverFactory{unmarshalers: encodingUnmarshaler}
return receiverhelper.NewFactory(
typeStr,
createDefaultConfig,
receiverhelper.WithTraces(createTraceReceiver))
receiverhelper.WithTraces(f.createTraceReceiver))
}

func createDefaultConfig() configmodels.Receiver {
Expand All @@ -70,14 +73,18 @@ func createDefaultConfig() configmodels.Receiver {
}
}

func createTraceReceiver(
type kafkaReceiverFactory struct {
unmarshalers map[string]Unmarshaller
}

func (f *kafkaReceiverFactory) createTraceReceiver(
_ context.Context,
params component.ReceiverCreateParams,
cfg configmodels.Receiver,
nextConsumer consumer.TraceConsumer,
) (component.TraceReceiver, error) {
c := cfg.(*Config)
r, err := newReceiver(*c, params, nextConsumer)
r, err := newReceiver(*c, params, f.unmarshalers, nextConsumer)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions receiver/kafkareceiver/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func TestCreateTraceReceiver(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Brokers = []string{"invalid:9092"}
cfg.ProtocolVersion = "2.0.0"
r, err := createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
f := kafkaReceiverFactory{unmarshalers: DefaultUnmarshallers()}
r, err := f.createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
// no available broker
require.Error(t, err)
assert.Nil(t, r)
}
Expand All @@ -49,7 +51,8 @@ func TestCreateTraceReceiver_error(t *testing.T) {
cfg.ProtocolVersion = "2.0.0"
// disable contacting broker at startup
cfg.Metadata.Full = false
r, err := createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
f := kafkaReceiverFactory{unmarshalers: DefaultUnmarshallers()}
r, err := f.createTraceReceiver(context.Background(), component.ReceiverCreateParams{}, cfg, nil)
require.NoError(t, err)
assert.NotNil(t, r)
}
7 changes: 0 additions & 7 deletions receiver/kafkareceiver/jaeger_unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,6 @@ import (
jaegertranslator "go.opentelemetry.io/collector/translator/trace/jaeger"
)

func init() {
jaegerJSON := &jaegerJSONSpanUnmarshaller{}
RegisterUnmarshaller(jaegerJSON)
jaegerProto := &jaegerProtoSpanUnmarshaller{}
RegisterUnmarshaller(jaegerProto)
}

type jaegerProtoSpanUnmarshaller struct {
}

Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/kafka_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ type kafkaConsumer struct {

var _ component.Receiver = (*kafkaConsumer)(nil)

func newReceiver(config Config, params component.ReceiverCreateParams, nextConsumer consumer.TraceConsumer) (*kafkaConsumer, error) {
unmarshaller := GetUnmarshaller(config.Encoding)
func newReceiver(config Config, params component.ReceiverCreateParams, unmarshalers map[string]Unmarshaller, nextConsumer consumer.TraceConsumer) (*kafkaConsumer, error) {
unmarshaller := unmarshalers[config.Encoding]
if unmarshaller == nil {
return nil, errUnrecognizedEncoding
}
Expand Down
4 changes: 2 additions & 2 deletions receiver/kafkareceiver/kafka_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestNewReceiver_version_err(t *testing.T) {
Encoding: defaultEncoding,
ProtocolVersion: "none",
}
r, err := newReceiver(c, component.ReceiverCreateParams{}, exportertest.NewNopTraceExporter())
r, err := newReceiver(c, component.ReceiverCreateParams{}, DefaultUnmarshallers(), exportertest.NewNopTraceExporter())
assert.Error(t, err)
assert.Nil(t, r)
}
Expand All @@ -49,7 +49,7 @@ func TestNewReceiver_encoding_err(t *testing.T) {
c := Config{
Encoding: "foo",
}
r, err := newReceiver(c, component.ReceiverCreateParams{}, exportertest.NewNopTraceExporter())
r, err := newReceiver(c, component.ReceiverCreateParams{}, DefaultUnmarshallers(), exportertest.NewNopTraceExporter())
require.Error(t, err)
assert.Nil(t, r)
assert.EqualError(t, err, errUnrecognizedEncoding.Error())
Expand Down
5 changes: 0 additions & 5 deletions receiver/kafkareceiver/otlp_unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@ import (
otlptrace "go.opentelemetry.io/collector/internal/data/opentelemetry-proto-gen/collector/trace/v1"
)

func init() {
otlpProto := &protoUnmarshaller{}
RegisterUnmarshaller(otlpProto)
}

type protoUnmarshaller struct {
}

Expand Down
24 changes: 14 additions & 10 deletions receiver/kafkareceiver/unmarshaller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@ type Unmarshaller interface {
Encoding() string
}

var unmarshallers = map[string]Unmarshaller{}

// GetUnmarshaller returns unmarshaller for given encoding or nil if not found.
func GetUnmarshaller(encoding string) Unmarshaller {
return unmarshallers[encoding]
}

// RegisterUnmarshaller registers unmarshaller.
func RegisterUnmarshaller(unmarshaller Unmarshaller) {
unmarshallers[unmarshaller.Encoding()] = unmarshaller
// DefaultUnmarshallers returns map of supported encodings with Unmarshaller.
func DefaultUnmarshallers() map[string]Unmarshaller {
otlp := &protoUnmarshaller{}
jaegerProto := jaegerProtoSpanUnmarshaller{}
jaegerJSON := jaegerJSONSpanUnmarshaller{}
zipkinJSON := zipkinJSONSpanUnmarshaller{}
zipkinThrift := zipkinThriftSpanUnmarshaller{}
return map[string]Unmarshaller{
otlp.Encoding(): otlp,
jaegerProto.Encoding(): jaegerProto,
jaegerJSON.Encoding(): jaegerJSON,
zipkinJSON.Encoding(): zipkinJSON,
zipkinThrift.Encoding(): zipkinThrift,
}
}
Loading

0 comments on commit 507dd0a

Please sign in to comment.