From 05c88d98215885f9c404ef72cdbb52f46ab06532 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Thu, 7 Mar 2024 16:52:29 -0800 Subject: [PATCH 1/2] [receiver/opencensus] refactor the code for maintenance and generate lifecycle tests --- .chloggen/opencensusreceiver_lifecycle.yaml | 27 +++ receiver/opencensusreceiver/factory.go | 16 +- receiver/opencensusreceiver/factory_test.go | 17 +- .../generated_component_test.go | 13 + .../internal/ocmetrics/opencensus_test.go | 5 +- receiver/opencensusreceiver/metadata.yaml | 3 +- receiver/opencensusreceiver/opencensus.go | 227 +++++++----------- .../opencensusreceiver/opencensus_test.go | 150 +++++++++--- 8 files changed, 247 insertions(+), 211 deletions(-) create mode 100644 .chloggen/opencensusreceiver_lifecycle.yaml diff --git a/.chloggen/opencensusreceiver_lifecycle.yaml b/.chloggen/opencensusreceiver_lifecycle.yaml new file mode 100644 index 0000000000000..b102d94efe66a --- /dev/null +++ b/.chloggen/opencensusreceiver_lifecycle.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: bug_fix + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: opencensusreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Refactor the opencensusreceiver to pass lifecycle tests and avoid leaking gRPC connections. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [31643] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/opencensusreceiver/factory.go b/receiver/opencensusreceiver/factory.go index 8ee917bcdf4e0..eb04f2c6ee60b 100644 --- a/receiver/opencensusreceiver/factory.go +++ b/receiver/opencensusreceiver/factory.go @@ -47,16 +47,10 @@ func createTracesReceiver( cfg component.Config, nextConsumer consumer.Traces, ) (receiver.Traces, error) { - var err error r := receivers.GetOrAdd(cfg, func() component.Component { rCfg := cfg.(*Config) - var recv *ocReceiver - recv, err = newOpenCensusReceiver(rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, set, rCfg.buildOptions()...) - return recv + return newOpenCensusReceiver(rCfg, nil, nil, set, rCfg.buildOptions()...) }) - if err != nil { - return nil, err - } r.Unwrap().(*ocReceiver).traceConsumer = nextConsumer return r, nil @@ -68,16 +62,10 @@ func createMetricsReceiver( cfg component.Config, nextConsumer consumer.Metrics, ) (receiver.Metrics, error) { - var err error r := receivers.GetOrAdd(cfg, func() component.Component { rCfg := cfg.(*Config) - var recv *ocReceiver - recv, err = newOpenCensusReceiver(rCfg.NetAddr.Transport, rCfg.NetAddr.Endpoint, nil, nil, set, rCfg.buildOptions()...) - return recv + return newOpenCensusReceiver(rCfg, nil, nil, set, rCfg.buildOptions()...) }) - if err != nil { - return nil, err - } r.Unwrap().(*ocReceiver).metricsConsumer = nextConsumer return r, nil diff --git a/receiver/opencensusreceiver/factory_test.go b/receiver/opencensusreceiver/factory_test.go index 32b5602443398..aa55ec5b61068 100644 --- a/receiver/opencensusreceiver/factory_test.go +++ b/receiver/opencensusreceiver/factory_test.go @@ -86,14 +86,12 @@ func TestCreateTracesReceiver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tr, err := createTracesReceiver(ctx, set, tt.cfg, consumertest.NewNop()) + require.NoError(t, err) + err = tr.Start(context.Background(), componenttest.NewNopHost()) if (err != nil) != tt.wantErr { t.Errorf("factory.CreateTracesReceiver() error = %v, wantErr %v", err, tt.wantErr) - return - } - if tr != nil { - require.NoError(t, tr.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, tr.Shutdown(context.Background())) } + require.NoError(t, tr.Shutdown(context.Background())) }) } } @@ -152,14 +150,15 @@ func TestCreateMetricsReceiver(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { tc, err := createMetricsReceiver(context.Background(), set, tt.cfg, consumertest.NewNop()) + require.NoError(t, err) + err = tc.Start(context.Background(), componenttest.NewNopHost()) + defer func() { + require.NoError(t, tc.Shutdown(context.Background())) + }() if (err != nil) != tt.wantErr { t.Errorf("factory.CreateMetricsReceiver() error = %v, wantErr %v", err, tt.wantErr) return } - if tc != nil { - require.NoError(t, tc.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, tc.Shutdown(context.Background())) - } }) } } diff --git a/receiver/opencensusreceiver/generated_component_test.go b/receiver/opencensusreceiver/generated_component_test.go index 55f390324d6d2..5d848da2b2d4b 100644 --- a/receiver/opencensusreceiver/generated_component_test.go +++ b/receiver/opencensusreceiver/generated_component_test.go @@ -8,6 +8,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/confmap/confmaptest" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver" @@ -51,5 +52,17 @@ func TestComponentLifecycle(t *testing.T) { err = c.Shutdown(context.Background()) require.NoError(t, err) }) + t.Run(test.name+"-lifecycle", func(t *testing.T) { + firstRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + host := componenttest.NewNopHost() + require.NoError(t, err) + require.NoError(t, firstRcvr.Start(context.Background(), host)) + require.NoError(t, firstRcvr.Shutdown(context.Background())) + secondRcvr, err := test.createFn(context.Background(), receivertest.NewNopCreateSettings(), cfg) + require.NoError(t, err) + require.NoError(t, secondRcvr.Start(context.Background(), host)) + require.NoError(t, secondRcvr.Shutdown(context.Background())) + }) } } diff --git a/receiver/opencensusreceiver/internal/ocmetrics/opencensus_test.go b/receiver/opencensusreceiver/internal/ocmetrics/opencensus_test.go index 8a4211fd1cb04..e9f14d1f002aa 100644 --- a/receiver/opencensusreceiver/internal/ocmetrics/opencensus_test.go +++ b/receiver/opencensusreceiver/internal/ocmetrics/opencensus_test.go @@ -357,11 +357,8 @@ func ocReceiverOnGRPCServer(t *testing.T, sr consumer.Metrics, set receiver.Crea ln, err := net.Listen("tcp", "localhost:") require.NoError(t, err, "Failed to find an available address to run the gRPC server: %v", err) - doneFnList := []func(){func() { ln.Close() }} done := func() { - for _, doneFn := range doneFnList { - doneFn() - } + _ = ln.Close() } oci, err := New(sr, set) diff --git a/receiver/opencensusreceiver/metadata.yaml b/receiver/opencensusreceiver/metadata.yaml index f90969ca0641c..9df6cb29366fb 100644 --- a/receiver/opencensusreceiver/metadata.yaml +++ b/receiver/opencensusreceiver/metadata.yaml @@ -15,5 +15,4 @@ status: codeowners: active: [open-telemetry/collector-approvers] tests: - config: - skip_lifecycle: true \ No newline at end of file + config: \ No newline at end of file diff --git a/receiver/opencensusreceiver/opencensus.go b/receiver/opencensusreceiver/opencensus.go index d89ce069c8f82..4882791b44aff 100644 --- a/receiver/opencensusreceiver/opencensus.go +++ b/receiver/opencensusreceiver/opencensus.go @@ -30,7 +30,7 @@ import ( // ocReceiver is the type that exposes Trace and Metrics reception. type ocReceiver struct { - mu sync.Mutex + cfg *Config ln net.Listener serverGRPC *grpc.Server serverHTTP *http.Server @@ -45,31 +45,24 @@ type ocReceiver struct { traceConsumer consumer.Traces metricsConsumer consumer.Metrics - startTracesReceiverOnce sync.Once - startMetricsReceiverOnce sync.Once + stopWG sync.WaitGroup - settings receiver.CreateSettings + settings receiver.CreateSettings + multiplexer cmux.CMux } // newOpenCensusReceiver just creates the OpenCensus receiver services. It is the caller's // responsibility to invoke the respective Start*Reception methods as well // as the various Stop*Reception methods to end it. func newOpenCensusReceiver( - transport string, - addr string, + cfg *Config, tc consumer.Traces, mc consumer.Metrics, settings receiver.CreateSettings, opts ...ocOption, -) (*ocReceiver, error) { - // TODO: (@odeke-em) use options to enable address binding changes. - ln, err := net.Listen(transport, addr) - if err != nil { - return nil, fmt.Errorf("failed to bind to address %q: %w", addr, err) - } - +) *ocReceiver { ocr := &ocReceiver{ - ln: ln, + cfg: cfg, corsOrigins: []string{}, // Disable CORS by default. gatewayMux: gatewayruntime.NewServeMux(), traceConsumer: tc, @@ -81,184 +74,134 @@ func newOpenCensusReceiver( opt.withReceiver(ocr) } - return ocr, nil + return ocr } // Start runs the trace receiver on the gRPC server. Currently // it also enables the metrics receiver too. -func (ocr *ocReceiver) Start(_ context.Context, host component.Host) error { +func (ocr *ocReceiver) Start(ctx context.Context, host component.Host) error { + var err error + ocr.serverGRPC, err = ocr.grpcServerSettings.ToServerContext(ctx, host, ocr.settings.TelemetrySettings) + if err != nil { + return err + } + var mux http.Handler = ocr.gatewayMux + if len(ocr.corsOrigins) > 0 { + co := cors.Options{AllowedOrigins: ocr.corsOrigins} + mux = cors.New(co).Handler(mux) + } + ocr.serverHTTP = &http.Server{Handler: mux, ReadHeaderTimeout: 20 * time.Second} hasConsumer := false if ocr.traceConsumer != nil { hasConsumer = true - if err := ocr.registerTraceConsumer(host); err != nil { + ocr.traceReceiver, err = octrace.New(ocr.traceConsumer, ocr.settings) + if err != nil { return err } + agenttracepb.RegisterTraceServiceServer(ocr.serverGRPC, ocr.traceReceiver) } if ocr.metricsConsumer != nil { hasConsumer = true - if err := ocr.registerMetricsConsumer(host); err != nil { + ocr.metricsReceiver, err = ocmetrics.New(ocr.metricsConsumer, ocr.settings) + if err != nil { return err } + agentmetricspb.RegisterMetricsServiceServer(ocr.serverGRPC, ocr.metricsReceiver) } if !hasConsumer { return errors.New("cannot start receiver: no consumers were specified") } - - if err := ocr.startServer(); err != nil { - return err + ocr.ln, err = net.Listen(ocr.cfg.NetAddr.Transport, ocr.cfg.NetAddr.Endpoint) + if err != nil { + return fmt.Errorf("failed to bind to address %q: %w", ocr.cfg.NetAddr.Endpoint, err) } + // Register the grpc-gateway on the HTTP server mux + var c context.Context + c, ocr.cancel = context.WithCancel(context.Background()) - // At this point we've successfully started all the services/receivers. - // Add other start routines here. - return nil -} - -func (ocr *ocReceiver) registerTraceConsumer(host component.Host) error { - var err error - - ocr.startTracesReceiverOnce.Do(func() { - ocr.traceReceiver, err = octrace.New(ocr.traceConsumer, ocr.settings) - if err != nil { - return - } - - var srv *grpc.Server - srv, err = ocr.grpcServer(host) - if err != nil { - return - } - - agenttracepb.RegisterTraceServiceServer(srv, ocr.traceReceiver) + endpoint := ocr.ln.Addr().String() - }) + _, ok := ocr.ln.(*net.UnixListener) + if ok { + endpoint = "unix:" + endpoint + } - return err -} + // Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port. + ocr.multiplexer = cmux.New(ocr.ln) + grpcL := ocr.multiplexer.MatchWithWriters( + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), + cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto")) -func (ocr *ocReceiver) registerMetricsConsumer(host component.Host) error { - var err error + httpL := ocr.multiplexer.Match(cmux.Any()) + ocr.stopWG.Add(1) + startWG := sync.WaitGroup{} + startWG.Add(3) - ocr.startMetricsReceiverOnce.Do(func() { - ocr.metricsReceiver, err = ocmetrics.New(ocr.metricsConsumer, ocr.settings) - if err != nil { - return + go func() { + defer ocr.stopWG.Done() + startWG.Done() + // Check for cmux.ErrServerClosed, because during the shutdown this is not properly close before closing the cmux, + if err := ocr.serverGRPC.Serve(grpcL); !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, cmux.ErrServerClosed) && err != nil { + ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) } - - var srv *grpc.Server - srv, err = ocr.grpcServer(host) - if err != nil { - return + }() + go func() { + startWG.Done() + if err := ocr.serverHTTP.Serve(httpL); !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrServerClosed) && err != nil { + ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) } + }() + go func() { + startWG.Done() + if err := ocr.multiplexer.Serve(); !errors.Is(err, cmux.ErrServerClosed) && err != nil { + ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) + } + }() - agentmetricspb.RegisterMetricsServiceServer(srv, ocr.metricsReceiver) - }) - return err -} + startWG.Wait() -func (ocr *ocReceiver) grpcServer(host component.Host) (*grpc.Server, error) { - ocr.mu.Lock() - defer ocr.mu.Unlock() + opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithBlock()} + if err := agenttracepb.RegisterTraceServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil { + return err + } - if ocr.serverGRPC == nil { - var err error - ocr.serverGRPC, err = ocr.grpcServerSettings.ToServer(host, ocr.settings.TelemetrySettings) - if err != nil { - return nil, err - } + if err := agentmetricspb.RegisterMetricsServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil { + return err } - return ocr.serverGRPC, nil + // At this point we've successfully started all the services/receivers. + // Add other start routines here. + return nil } // Shutdown is a method to turn off receiving. func (ocr *ocReceiver) Shutdown(context.Context) error { - ocr.mu.Lock() - defer ocr.mu.Unlock() - - var err error - if ocr.serverHTTP != nil { - err = ocr.serverHTTP.Close() - } - - if ocr.ln != nil { - _ = ocr.ln.Close() - } - - // TODO: @(odeke-em) investigate what utility invoking (*grpc.Server).Stop() - // gives us yet we invoke (net.Listener).Close(). - // Sure (*grpc.Server).Stop() enables proper shutdown but imposes - // a painful and artificial wait time that goes into 20+seconds yet most of our - // tests and code should be reactive in less than even 1second. - // ocr.serverGRPC.Stop() if ocr.cancel != nil { ocr.cancel() } - return err -} - -func (ocr *ocReceiver) httpServer() *http.Server { - ocr.mu.Lock() - defer ocr.mu.Unlock() - - if ocr.serverHTTP == nil { - var mux http.Handler = ocr.gatewayMux - if len(ocr.corsOrigins) > 0 { - co := cors.Options{AllowedOrigins: ocr.corsOrigins} - mux = cors.New(co).Handler(mux) - } - ocr.serverHTTP = &http.Server{Handler: mux, ReadHeaderTimeout: 20 * time.Second} + if ocr.serverGRPC != nil { + ocr.serverGRPC.Stop() + ocr.stopWG.Wait() } - return ocr.serverHTTP -} - -func (ocr *ocReceiver) startServer() error { - // Register the grpc-gateway on the HTTP server mux - var c context.Context - c, ocr.cancel = context.WithCancel(context.Background()) - opts := []grpc.DialOption{grpc.WithTransportCredentials(insecure.NewCredentials())} - endpoint := ocr.ln.Addr().String() - - _, ok := ocr.ln.(*net.UnixListener) - if ok { - endpoint = "unix:" + endpoint + if ocr.serverHTTP != nil { + _ = ocr.serverHTTP.Close() } - if err := agenttracepb.RegisterTraceServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil { - return err + if ocr.ln != nil { + _ = ocr.ln.Close() } - if err := agentmetricspb.RegisterMetricsServiceHandlerFromEndpoint(c, ocr.gatewayMux, endpoint, opts); err != nil { - return err + if ocr.multiplexer != nil { + ocr.multiplexer.Close() } - // Start the gRPC and HTTP/JSON (grpc-gateway) servers on the same port. - m := cmux.New(ocr.ln) - grpcL := m.MatchWithWriters( - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"), - cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc+proto")) + ocr.traceConsumer = nil + ocr.metricsConsumer = nil - httpL := m.Match(cmux.Any()) - go func() { - // Check for cmux.ErrServerClosed, because during the shutdown this is not properly close before closing the cmux, - // see TODO in Shutdown. - if err := ocr.serverGRPC.Serve(grpcL); !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, cmux.ErrServerClosed) && err != nil { - ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() - go func() { - if err := ocr.httpServer().Serve(httpL); !errors.Is(err, http.ErrServerClosed) && err != nil { - ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() - go func() { - if err := m.Serve(); !errors.Is(err, cmux.ErrServerClosed) && err != nil { - ocr.settings.TelemetrySettings.ReportStatus(component.NewFatalErrorEvent(err)) - } - }() return nil } diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index 1846baae582ac..c722f7fafcb96 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -30,6 +30,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configgrpc" + "go.opentelemetry.io/collector/config/confignet" "go.opentelemetry.io/collector/config/configtls" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumertest" @@ -56,17 +57,20 @@ func TestGrpcGateway_endToEnd(t *testing.T) { // Set the buffer count to 1 to make it flush the test span immediately. sink := new(consumertest.TracesSink) - ocr, err := newOpenCensusReceiver("tcp", addr, sink, nil, receivertest.NewNopCreateSettings()) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, sink, nil, receivertest.NewNopCreateSettings()) - err = ocr.Start(context.Background(), componenttest.NewNopHost()) + err := ocr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err, "Failed to start trace receiver: %v", err) t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) }) - // TODO(songy23): make starting server deterministic - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - url := fmt.Sprintf("http://%s/v1/trace", addr) // Verify that CORS is not enabled by default, but that it gives a method not allowed error. @@ -95,6 +99,7 @@ func TestGrpcGateway_endToEnd(t *testing.T) { req.Header.Set("Content-Type", "application/json") client := &http.Client{} + defer client.CloseIdleConnections() resp, err := client.Do(req) require.NoError(t, err, "Error posting trace to grpc-gateway server: %v", err) @@ -141,12 +146,18 @@ func TestGrpcGateway_endToEnd(t *testing.T) { func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) corsOrigins := []string{"allowed-*.com"} - - ocr, err := newOpenCensusReceiver("tcp", addr, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), withCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create trace receiver: %v", err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, consumertest.NewNop(), nil, receivertest.NewNopCreateSettings(), withCorsOrigins(corsOrigins)) t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) }) - err = ocr.Start(context.Background(), componenttest.NewNopHost()) + err := ocr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err, "Failed to start trace receiver: %v", err) // TODO(songy23): make starting server deterministic @@ -165,12 +176,18 @@ func TestTraceGrpcGatewayCors_endToEnd(t *testing.T) { func TestMetricsGrpcGatewayCors_endToEnd(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) corsOrigins := []string{"allowed-*.com"} - - ocr, err := newOpenCensusReceiver("tcp", addr, nil, consumertest.NewNop(), receivertest.NewNopCreateSettings(), withCorsOrigins(corsOrigins)) - require.NoError(t, err, "Failed to create metrics receiver: %v", err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, nil, consumertest.NewNop(), receivertest.NewNopCreateSettings(), withCorsOrigins(corsOrigins)) t.Cleanup(func() { require.NoError(t, ocr.Shutdown(context.Background())) }) - err = ocr.Start(context.Background(), componenttest.NewNopHost()) + err := ocr.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err, "Failed to start metrics receiver: %v", err) // TODO(songy23): make starting server deterministic @@ -193,6 +210,7 @@ func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wan req.Header.Set("Access-Control-Request-Method", "POST") client := &http.Client{} + defer client.CloseIdleConnections() resp, err := client.Do(req) require.NoError(t, err, "Error sending OPTIONS to grpc-gateway server: %v", err) @@ -219,8 +237,15 @@ func verifyCorsResp(t *testing.T, url string, origin string, wantStatus int, wan func TestStopWithoutStartNeverCrashes(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - ocr, err := newOpenCensusReceiver("tcp", addr, nil, nil, receivertest.NewNopCreateSettings()) - require.NoError(t, err, "Failed to create an OpenCensus receiver: %v", err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, nil, nil, receivertest.NewNopCreateSettings()) // Stop it before ever invoking Start*. require.NoError(t, ocr.Shutdown(context.Background())) } @@ -230,16 +255,31 @@ func TestNewPortAlreadyUsed(t *testing.T) { ln, err := net.Listen("tcp", addr) require.NoError(t, err, "failed to listen on %q: %v", addr, err) defer ln.Close() - - r, err := newOpenCensusReceiver("tcp", addr, nil, nil, receivertest.NewNopCreateSettings()) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + r := newOpenCensusReceiver(cfg, nil, nil, receivertest.NewNopCreateSettings()) + err = r.Start(context.Background(), componenttest.NewNopHost()) require.Error(t, err) - require.Nil(t, r) + require.NoError(t, r.Shutdown(context.Background())) } func TestMultipleStopReceptionShouldNotError(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := newOpenCensusReceiver("tcp", addr, consumertest.NewNop(), consumertest.NewNop(), receivertest.NewNopCreateSettings()) - require.NoError(t, err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + r := newOpenCensusReceiver(cfg, consumertest.NewNop(), consumertest.NewNop(), receivertest.NewNopCreateSettings()) require.NotNil(t, r) require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -248,8 +288,15 @@ func TestMultipleStopReceptionShouldNotError(t *testing.T) { func TestStartWithoutConsumersShouldFail(t *testing.T) { addr := testutil.GetAvailableLocalAddress(t) - r, err := newOpenCensusReceiver("tcp", addr, nil, nil, receivertest.NewNopCreateSettings()) - require.NoError(t, err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + r := newOpenCensusReceiver(cfg, nil, nil, receivertest.NewNopCreateSettings()) require.NotNil(t, r) require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) @@ -267,15 +314,19 @@ func tempSocketName(t *testing.T) string { func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) { socketName := tempSocketName(t) cbts := consumertest.NewNop() - r, err := newOpenCensusReceiver("unix", socketName, cbts, nil, receivertest.NewNopCreateSettings()) - require.NoError(t, err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: socketName, + Transport: "unix", + }, + }, + } + r := newOpenCensusReceiver(cfg, cbts, nil, receivertest.NewNopCreateSettings()) require.NotNil(t, r) require.NoError(t, r.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { require.NoError(t, r.Shutdown(context.Background())) }) - // Wait for the servers to start - <-time.After(10 * time.Millisecond) - span := ` { "node": { @@ -302,10 +353,12 @@ func TestReceiveOnUnixDomainSocket_endToEnd(t *testing.T) { }, }, } + defer c.CloseIdleConnections() response, err := c.Post("http://unix/v1/trace", "application/json", strings.NewReader(span)) require.NoError(t, err) - defer response.Body.Close() + _, _ = io.ReadAll(response.Body) + require.NoError(t, response.Body.Close()) require.Equal(t, 200, response.StatusCode) } @@ -413,8 +466,15 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) { sink := &errOrSinkConsumer{TracesSink: new(consumertest.TracesSink)} var opts []ocOption - ocr, err := newOpenCensusReceiver("tcp", addr, nil, nil, receiver.CreateSettings{ID: exporter.receiverID, TelemetrySettings: testTel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, opts...) - require.NoError(t, err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, nil, nil, receiver.CreateSettings{ID: exporter.receiverID, TelemetrySettings: testTel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, opts...) require.NotNil(t, ocr) ocr.traceConsumer = sink @@ -564,8 +624,15 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { sink := &errOrSinkConsumer{MetricsSink: new(consumertest.MetricsSink)} var opts []ocOption - ocr, err := newOpenCensusReceiver("tcp", addr, nil, nil, receiver.CreateSettings{ID: exporter.receiverID, TelemetrySettings: testTel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, opts...) - require.NoError(t, err) + cfg := &Config{ + ServerConfig: configgrpc.ServerConfig{ + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, + }, + } + ocr := newOpenCensusReceiver(cfg, nil, nil, receiver.CreateSettings{ID: exporter.receiverID, TelemetrySettings: testTel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, opts...) require.NotNil(t, ocr) ocr.metricsConsumer = sink @@ -600,24 +667,27 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { } func TestInvalidTLSCredentials(t *testing.T) { + addr := testutil.GetAvailableLocalAddress(t) cfg := Config{ ServerConfig: configgrpc.ServerConfig{ - TLSSetting: &configtls.TLSServerSetting{ - TLSSetting: configtls.TLSSetting{ + TLSSetting: &configtls.ServerConfig{ + TLSSetting: configtls.Config{ CertFile: "willfail", }, }, + NetAddr: confignet.AddrConfig{ + Endpoint: addr, + Transport: "tcp", + }, }, } opt := cfg.buildOptions() assert.NotNil(t, opt) - addr := testutil.GetAvailableLocalAddress(t) - ocr, err := newOpenCensusReceiver("tcp", addr, nil, nil, receivertest.NewNopCreateSettings(), opt...) - assert.NoError(t, err) + ocr := newOpenCensusReceiver(&cfg, nil, nil, receivertest.NewNopCreateSettings(), opt...) assert.NotNil(t, ocr) - srv, err := ocr.grpcServer(componenttest.NewNopHost()) + srv, err := ocr.grpcServerSettings.ToServerContext(context.Background(), componenttest.NewNopHost(), ocr.settings.TelemetrySettings) assert.EqualError(t, err, `failed to load TLS config: failed to load TLS cert and key: for auth via TLS, provide both certificate and key, or neither`) assert.Nil(t, srv) } From 455e5c3d3a1a4419fe10a3ba2cc38789fcea62c0 Mon Sep 17 00:00:00 2001 From: Antoine Toulme Date: Fri, 15 Mar 2024 08:58:32 -0700 Subject: [PATCH 2/2] remove empty tests section --- receiver/opencensusreceiver/metadata.yaml | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/receiver/opencensusreceiver/metadata.yaml b/receiver/opencensusreceiver/metadata.yaml index 9df6cb29366fb..49a52b22d5516 100644 --- a/receiver/opencensusreceiver/metadata.yaml +++ b/receiver/opencensusreceiver/metadata.yaml @@ -13,6 +13,4 @@ status: - redhat - sumo codeowners: - active: [open-telemetry/collector-approvers] -tests: - config: \ No newline at end of file + active: [open-telemetry/collector-approvers] \ No newline at end of file