From 12b3d9ccb8d20f072a25737c4b745512d60e2ee0 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Fri, 26 Jun 2020 16:56:18 -0700 Subject: [PATCH] Use default configs for testbed receivers (#1214) Signed-off-by: Bogdan Drutu --- testbed/testbed/receivers.go | 103 ++++++++++++++++++++++------------- 1 file changed, 64 insertions(+), 39 deletions(-) diff --git a/testbed/testbed/receivers.go b/testbed/testbed/receivers.go index dd0136354e9..94704d6c152 100644 --- a/testbed/testbed/receivers.go +++ b/testbed/testbed/receivers.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/config/configprotocol" "go.opentelemetry.io/collector/receiver/jaegerreceiver" "go.opentelemetry.io/collector/receiver/opencensusreceiver" "go.opentelemetry.io/collector/receiver/otlpreceiver" @@ -36,7 +37,7 @@ import ( // an exporter. type DataReceiver interface { Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error - Stop() + Stop() error // Generate a config string to place in exporter part of collector config // so that it can send data to this receiver. @@ -59,7 +60,7 @@ func (mb *DataReceiverBase) ReportFatalError(err error) { } // GetFactory of the specified kind. Returns the factory for a component type. -func (mb *DataReceiverBase) GetFactory(kind component.Kind, componentType configmodels.Type) component.Factory { +func (mb *DataReceiverBase) GetFactory(_ component.Kind, _ configmodels.Type) component.Factory { return nil } @@ -75,7 +76,8 @@ func (mb *DataReceiverBase) GetExporters() map[configmodels.DataType]map[configm // OCDataReceiver implements OpenCensus format receiver. type OCDataReceiver struct { DataReceiverBase - receiver *opencensusreceiver.Receiver + traceReceiver component.TraceReceiver + metricsReceiver component.MetricsReceiver } // Ensure OCDataReceiver implements DataReceiver. @@ -90,18 +92,31 @@ func NewOCDataReceiver(port int) *OCDataReceiver { } func (or *OCDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - addr := fmt.Sprintf("localhost:%d", or.Port) + factory := opencensusreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*opencensusreceiver.Config) + cfg.SetName(or.ProtocolName()) + cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) var err error - or.receiver, err = opencensusreceiver.New("opencensus", "tcp", addr, tc, mc) - if err != nil { + if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc); err != nil { return err } - - return or.receiver.Start(context.Background(), or) + if or.metricsReceiver, err = factory.CreateMetricsReceiver(zap.NewNop(), cfg, mc); err != nil { + return err + } + if err = or.traceReceiver.Start(context.Background(), or); err != nil { + return err + } + return or.metricsReceiver.Start(context.Background(), or) } -func (or *OCDataReceiver) Stop() { - or.receiver.Shutdown(context.Background()) +func (or *OCDataReceiver) Stop() error { + if err := or.traceReceiver.Shutdown(context.Background()); err != nil { + return err + } + if err := or.metricsReceiver.Shutdown(context.Background()); err != nil { + return err + } + return nil } func (or *OCDataReceiver) GenConfigYAMLStr() string { @@ -128,13 +143,16 @@ func NewJaegerDataReceiver(port int) *JaegerDataReceiver { return &JaegerDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - jaegerCfg := jaegerreceiver.Configuration{ - CollectorGRPCPort: jr.Port, +func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { + factory := jaegerreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config) + cfg.SetName(jr.ProtocolName()) + cfg.Protocols["grpc"] = &configprotocol.ProtocolServerSettings{ + Endpoint: fmt.Sprintf("localhost:%d", jr.Port), } var err error params := component.ReceiverCreateParams{Logger: zap.NewNop()} - jr.receiver, err = jaegerreceiver.New("jaeger", &jaegerCfg, tc, params) + jr.receiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc) if err != nil { return err } @@ -142,12 +160,8 @@ func (jr *JaegerDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume return jr.receiver.Start(context.Background(), jr) } -func (jr *JaegerDataReceiver) Stop() { - if jr.receiver != nil { - if err := jr.receiver.Shutdown(context.Background()); err != nil { - log.Printf("Cannot stop Jaeger receiver: %s", err.Error()) - } - } +func (jr *JaegerDataReceiver) Stop() error { + return jr.receiver.Shutdown(context.Background()) } func (jr *JaegerDataReceiver) GenConfigYAMLStr() string { @@ -165,7 +179,8 @@ func (jr *JaegerDataReceiver) ProtocolName() string { // OTLPDataReceiver implements OTLP format receiver. type OTLPDataReceiver struct { DataReceiverBase - receiver *otlpreceiver.Receiver + traceReceiver component.TraceReceiver + metricsReceiver component.MetricsReceiver } // Ensure OTLPDataReceiver implements DataReceiver. @@ -180,18 +195,32 @@ func NewOTLPDataReceiver(port int) *OTLPDataReceiver { } func (or *OTLPDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - addr := fmt.Sprintf("localhost:%d", or.Port) + factory := otlpreceiver.Factory{} + cfg := factory.CreateDefaultConfig().(*otlpreceiver.Config) + cfg.SetName(or.ProtocolName()) + cfg.Endpoint = fmt.Sprintf("localhost:%d", or.Port) var err error - or.receiver, err = otlpreceiver.New("otlp", "tcp", addr, tc, mc) - if err != nil { + params := component.ReceiverCreateParams{Logger: zap.NewNop()} + if or.traceReceiver, err = factory.CreateTraceReceiver(context.Background(), params, cfg, tc); err != nil { return err } - - return or.receiver.Start(context.Background(), or) + if or.metricsReceiver, err = factory.CreateMetricsReceiver(context.Background(), params, cfg, mc); err != nil { + return err + } + if err = or.traceReceiver.Start(context.Background(), or); err != nil { + return err + } + return or.metricsReceiver.Start(context.Background(), or) } -func (or *OTLPDataReceiver) Stop() { - or.receiver.Shutdown(context.Background()) +func (or *OTLPDataReceiver) Stop() error { + if err := or.traceReceiver.Shutdown(context.Background()); err != nil { + return err + } + if err := or.metricsReceiver.Shutdown(context.Background()); err != nil { + return err + } + return nil } func (or *OTLPDataReceiver) GenConfigYAMLStr() string { @@ -209,7 +238,7 @@ func (or *OTLPDataReceiver) ProtocolName() string { // ZipkinDataReceiver implements Zipkin format receiver. type ZipkinDataReceiver struct { DataReceiverBase - receiver *zipkinreceiver.ZipkinReceiver + receiver component.TraceReceiver } const DefaultZipkinAddressPort = 9411 @@ -218,13 +247,13 @@ func NewZipkinDataReceiver(port int) *ZipkinDataReceiver { return &ZipkinDataReceiver{DataReceiverBase: DataReceiverBase{Port: port}} } -func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsumer) error { - var err error +func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, _ *MockMetricConsumer) error { factory := zipkinreceiver.Factory{} cfg := factory.CreateDefaultConfig().(*zipkinreceiver.Config) - cfg.NameVal = "zipkin" + cfg.SetName(zr.ProtocolName()) cfg.Endpoint = fmt.Sprintf("localhost:%d", zr.Port) - zr.receiver, err = zipkinreceiver.New(cfg, tc) + var err error + zr.receiver, err = factory.CreateTraceReceiver(context.Background(), zap.NewNop(), cfg, tc) if err != nil { return err @@ -233,12 +262,8 @@ func (zr *ZipkinDataReceiver) Start(tc *MockTraceConsumer, mc *MockMetricConsume return zr.receiver.Start(context.Background(), zr) } -func (zr *ZipkinDataReceiver) Stop() { - if zr.receiver != nil { - if err := zr.receiver.Shutdown(context.Background()); err != nil { - log.Printf("Cannot stop Zipkin receiver: %s", err.Error()) - } - } +func (zr *ZipkinDataReceiver) Stop() error { + return zr.receiver.Shutdown(context.Background()) } func (zr *ZipkinDataReceiver) GenConfigYAMLStr() string {