diff --git a/CHANGELOG.md b/CHANGELOG.md index 347742c0b11..d1823333836 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Changed - Move the OpenCensus example into `example` directory. (#1359) +- `NewExporter` and `Start` functions in `go.opentelemetry.io/otel/exporters/otlp` now receive `context.Context` as a first parameter. (#1357) ## [0.14.0] - 2020-11-19 diff --git a/example/otel-collector/main.go b/example/otel-collector/main.go index c357209a3ce..a2452f87485 100644 --- a/example/otel-collector/main.go +++ b/example/otel-collector/main.go @@ -49,7 +49,7 @@ func initProvider() func() { // `localhost:30080` address. Otherwise, replace `localhost` with the // address of your cluster. If you run the app inside k8s, then you can // probably connect directly to the service through dns - exp, err := otlp.NewExporter( + exp, err := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithAddress("localhost:30080"), otlp.WithGRPCDialOption(grpc.WithBlock()), // useful for testing diff --git a/exporters/otlp/alignment_test.go b/exporters/otlp/alignment_test.go index f20a0bd1b2d..276625637a1 100644 --- a/exporters/otlp/alignment_test.go +++ b/exporters/otlp/alignment_test.go @@ -26,8 +26,8 @@ import ( func TestMain(m *testing.M) { fields := []ottest.FieldOffset{ { - Name: "Exporter.lastConnectErrPtr", - Offset: unsafe.Offsetof(Exporter{}.lastConnectErrPtr), + Name: "grpcConnection.lastConnectErrPtr", + Offset: unsafe.Offsetof(grpcConnection{}.lastConnectErrPtr), }, } if !ottest.Aligned8Byte(fields, os.Stderr) { diff --git a/exporters/otlp/connection.go b/exporters/otlp/connection.go index 48c799c80e2..283d6d42d55 100644 --- a/exporters/otlp/connection.go +++ b/exporters/otlp/connection.go @@ -15,52 +15,113 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( + "context" + "fmt" "math/rand" + "sync" "sync/atomic" "time" "unsafe" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) -func (e *Exporter) lastConnectError() error { - errPtr := (*error)(atomic.LoadPointer(&e.lastConnectErrPtr)) +type grpcConnection struct { + // Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. + lastConnectErrPtr unsafe.Pointer + + // mu protects the connection as it is accessed by the + // exporter goroutines and background connection goroutine + mu sync.Mutex + cc *grpc.ClientConn + + // these fields are read-only after constructor is finished + c config + metadata metadata.MD + newConnectionHandler func(cc *grpc.ClientConn) error + + // these channels are created once + disconnectedCh chan bool + backgroundConnectionDoneCh chan struct{} + stopCh chan struct{} + + // this is for tests, so they can replace the closing + // routine without a worry of modifying some global variable + // or changing it back to original after the test is done + closeBackgroundConnectionDoneCh func(ch chan struct{}) +} + +func newGRPCConnection(c config, handler func(cc *grpc.ClientConn) error) *grpcConnection { + conn := new(grpcConnection) + conn.newConnectionHandler = handler + if c.collectorAddr == "" { + c.collectorAddr = fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) + } + conn.c = c + if len(conn.c.headers) > 0 { + conn.metadata = metadata.New(conn.c.headers) + } + conn.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { + close(ch) + } + return conn +} + +func (oc *grpcConnection) startConnection(ctx context.Context) { + oc.stopCh = make(chan struct{}) + oc.disconnectedCh = make(chan bool) + oc.backgroundConnectionDoneCh = make(chan struct{}) + + if err := oc.connect(ctx); err == nil { + oc.setStateConnected() + } else { + oc.setStateDisconnected(err) + } + go oc.indefiniteBackgroundConnection() +} + +func (oc *grpcConnection) lastConnectError() error { + errPtr := (*error)(atomic.LoadPointer(&oc.lastConnectErrPtr)) if errPtr == nil { return nil } return *errPtr } -func (e *Exporter) saveLastConnectError(err error) { +func (oc *grpcConnection) saveLastConnectError(err error) { var errPtr *error if err != nil { errPtr = &err } - atomic.StorePointer(&e.lastConnectErrPtr, unsafe.Pointer(errPtr)) + atomic.StorePointer(&oc.lastConnectErrPtr, unsafe.Pointer(errPtr)) } -func (e *Exporter) setStateDisconnected(err error) { - e.saveLastConnectError(err) +func (oc *grpcConnection) setStateDisconnected(err error) { + oc.saveLastConnectError(err) select { - case e.disconnectedCh <- true: + case oc.disconnectedCh <- true: default: } + _ = oc.newConnectionHandler(nil) } -func (e *Exporter) setStateConnected() { - e.saveLastConnectError(nil) +func (oc *grpcConnection) setStateConnected() { + oc.saveLastConnectError(nil) } -func (e *Exporter) connected() bool { - return e.lastConnectError() == nil +func (oc *grpcConnection) connected() bool { + return oc.lastConnectError() == nil } const defaultConnReattemptPeriod = 10 * time.Second -func (e *Exporter) indefiniteBackgroundConnection() { +func (oc *grpcConnection) indefiniteBackgroundConnection() { defer func() { - e.backgroundConnectionDoneCh <- true + oc.closeBackgroundConnectionDoneCh(oc.backgroundConnectionDoneCh) }() - connReattemptPeriod := e.c.reconnectionPeriod + connReattemptPeriod := oc.c.reconnectionPeriod if connReattemptPeriod <= 0 { connReattemptPeriod = defaultConnReattemptPeriod } @@ -79,17 +140,26 @@ func (e *Exporter) indefiniteBackgroundConnection() { // 2. Otherwise block until we are disconnected, and // then retry connecting select { - case <-e.stopCh: + case <-oc.stopCh: return - case <-e.disconnectedCh: + case <-oc.disconnectedCh: + // Quickly check if we haven't stopped at the + // same time. + select { + case <-oc.stopCh: + return + + default: + } + // Normal scenario that we'll wait for } - if err := e.connect(); err == nil { - e.setStateConnected() + if err := oc.connect(context.Background()); err == nil { + oc.setStateConnected() } else { - e.setStateDisconnected(err) + oc.setStateDisconnected(err) } // Apply some jitter to avoid lockstep retrials of other @@ -97,17 +167,110 @@ func (e *Exporter) indefiniteBackgroundConnection() { // innocent DDOS, by clogging the machine's resources and network. jitter := time.Duration(rng.Int63n(maxJitterNanos)) select { - case <-e.stopCh: + case <-oc.stopCh: return case <-time.After(connReattemptPeriod + jitter): } } } -func (e *Exporter) connect() error { - cc, err := e.dialToCollector() +func (oc *grpcConnection) connect(ctx context.Context) error { + cc, err := oc.dialToCollector(ctx) if err != nil { return err } - return e.enableConnections(cc) + oc.setConnection(cc) + return oc.newConnectionHandler(cc) +} + +// setConnection sets cc as the client connection and returns true if +// the connection state changed. +func (oc *grpcConnection) setConnection(cc *grpc.ClientConn) bool { + oc.mu.Lock() + defer oc.mu.Unlock() + + // If previous clientConn is same as the current then just return. + // This doesn't happen right now as this func is only called with new ClientConn. + // It is more about future-proofing. + if oc.cc == cc { + return false + } + + // If the previous clientConn was non-nil, close it + if oc.cc != nil { + _ = oc.cc.Close() + } + oc.cc = cc + return true +} + +func (oc *grpcConnection) dialToCollector(ctx context.Context) (*grpc.ClientConn, error) { + addr := oc.c.collectorAddr + + dialOpts := []grpc.DialOption{} + if oc.c.grpcServiceConfig != "" { + dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(oc.c.grpcServiceConfig)) + } + if oc.c.clientCredentials != nil { + dialOpts = append(dialOpts, grpc.WithTransportCredentials(oc.c.clientCredentials)) + } else if oc.c.canDialInsecure { + dialOpts = append(dialOpts, grpc.WithInsecure()) + } + if oc.c.compressor != "" { + dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(oc.c.compressor))) + } + if len(oc.c.grpcDialOptions) != 0 { + dialOpts = append(dialOpts, oc.c.grpcDialOptions...) + } + + ctx, cancel := oc.contextWithStop(ctx) + defer cancel() + ctx = oc.contextWithMetadata(ctx) + return grpc.DialContext(ctx, addr, dialOpts...) +} + +func (oc *grpcConnection) contextWithMetadata(ctx context.Context) context.Context { + if oc.metadata.Len() > 0 { + return metadata.NewOutgoingContext(ctx, oc.metadata) + } + return ctx +} + +func (oc *grpcConnection) shutdown(ctx context.Context) error { + close(oc.stopCh) + // Ensure that the backgroundConnector returns + select { + case <-oc.backgroundConnectionDoneCh: + case <-ctx.Done(): + return ctx.Err() + } + + close(oc.disconnectedCh) + + oc.mu.Lock() + cc := oc.cc + oc.cc = nil + oc.mu.Unlock() + + if cc != nil { + return cc.Close() + } + + return nil +} + +func (oc *grpcConnection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { + // Unify the parent context Done signal with the connection's + // stop channel. + ctx, cancel := context.WithCancel(ctx) + go func(ctx context.Context, cancel context.CancelFunc) { + select { + case <-ctx.Done(): + // Nothing to do, either cancelled or deadline + // happened. + case <-oc.stopCh: + cancel() + } + }(ctx, cancel) + return ctx, cancel } diff --git a/exporters/otlp/example_test.go b/exporters/otlp/example_test.go index cf3a8eb0103..a34811e25e2 100644 --- a/exporters/otlp/example_test.go +++ b/exporters/otlp/example_test.go @@ -28,12 +28,13 @@ import ( ) func Example_insecure() { - exp, err := otlp.NewExporter(otlp.WithInsecure()) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, otlp.WithInsecure()) if err != nil { log.Fatalf("Failed to create the collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { otel.Handle(err) @@ -54,7 +55,7 @@ func Example_insecure() { tracer := otel.Tracer("test-tracer") // Then use the OpenTelemetry tracing library, like we normally would. - ctx, span := tracer.Start(context.Background(), "CollectorExporter-Example") + ctx, span := tracer.Start(ctx, "CollectorExporter-Example") defer span.End() for i := 0; i < 10; i++ { @@ -72,12 +73,13 @@ func Example_withTLS() { log.Fatalf("failed to create gRPC client TLS credentials: %v", err) } - exp, err := otlp.NewExporter(otlp.WithTLSCredentials(creds)) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, otlp.WithTLSCredentials(creds)) if err != nil { log.Fatalf("failed to create the collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { otel.Handle(err) @@ -98,7 +100,7 @@ func Example_withTLS() { tracer := otel.Tracer("test-tracer") // Then use the OpenTelemetry tracing library, like we normally would. - ctx, span := tracer.Start(context.Background(), "Securely-Talking-To-Collector-Span") + ctx, span := tracer.Start(ctx, "Securely-Talking-To-Collector-Span") defer span.End() for i := 0; i < 10; i++ { diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 16754394ce9..f8c1c5b0e76 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -20,12 +20,9 @@ package otlp // import "go.opentelemetry.io/otel/exporters/otlp" import ( "context" "errors" - "fmt" "sync" - "unsafe" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" colmetricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/metrics/v1" coltracepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/collector/trace/v1" @@ -43,22 +40,16 @@ type Exporter struct { // mu protects the non-atomic and non-channel variables mu sync.RWMutex // senderMu protects the concurrent unsafe sends on the shared gRPC client connection. - senderMu sync.Mutex - started bool - traceExporter coltracepb.TraceServiceClient - metricExporter colmetricpb.MetricsServiceClient - grpcClientConn *grpc.ClientConn - lastConnectErrPtr unsafe.Pointer - - startOnce sync.Once - stopOnce sync.Once - stopCh chan struct{} - disconnectedCh chan bool - - backgroundConnectionDoneCh chan bool - - c config - metadata metadata.MD + senderMu sync.Mutex + started bool + traceExporter coltracepb.TraceServiceClient + metricExporter colmetricpb.MetricsServiceClient + cc *grpcConnection + + startOnce sync.Once + stopOnce sync.Once + + exportKindSelector metricsdk.ExportKindSelector } var _ tracesdk.SpanExporter = (*Exporter)(nil) @@ -82,9 +73,9 @@ func newConfig(opts ...ExporterOption) config { } // NewExporter constructs a new Exporter and starts it. -func NewExporter(opts ...ExporterOption) (*Exporter, error) { +func NewExporter(ctx context.Context, opts ...ExporterOption) (*Exporter, error) { exp := NewUnstartedExporter(opts...) - if err := exp.Start(); err != nil { + if err := exp.Start(ctx); err != nil { return nil, err } return exp, nil @@ -93,19 +84,29 @@ func NewExporter(opts ...ExporterOption) (*Exporter, error) { // NewUnstartedExporter constructs a new Exporter and does not start it. func NewUnstartedExporter(opts ...ExporterOption) *Exporter { e := new(Exporter) - e.c = newConfig(opts...) - if len(e.c.headers) > 0 { - e.metadata = metadata.New(e.c.headers) - } + cfg := newConfig(opts...) + e.exportKindSelector = cfg.exportKindSelector + e.cc = newGRPCConnection(cfg, e.handleNewConnection) return e } +func (e *Exporter) handleNewConnection(cc *grpc.ClientConn) error { + e.mu.Lock() + defer e.mu.Unlock() + if cc != nil { + e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) + e.traceExporter = coltracepb.NewTraceServiceClient(cc) + } else { + e.metricExporter = nil + e.traceExporter = nil + } + return nil +} + var ( - errAlreadyStarted = errors.New("already started") - errNotStarted = errors.New("not started") - errDisconnected = errors.New("exporter disconnected") - errStopped = errors.New("exporter stopped") - errContextCanceled = errors.New("context canceled") + errNoClient = errors.New("no client") + errAlreadyStarted = errors.New("already started") + errDisconnected = errors.New("exporter disconnected") ) // Start dials to the collector, establishing a connection to it. It also @@ -113,108 +114,25 @@ var ( // messages that consist of the node identifier. Start invokes a background // connector that will reattempt connections to the collector periodically // if the connection dies. -func (e *Exporter) Start() error { +func (e *Exporter) Start(ctx context.Context) error { var err = errAlreadyStarted e.startOnce.Do(func() { e.mu.Lock() e.started = true - e.disconnectedCh = make(chan bool, 1) - e.stopCh = make(chan struct{}) - e.backgroundConnectionDoneCh = make(chan bool) e.mu.Unlock() - // An optimistic first connection attempt to ensure that - // applications under heavy load can immediately process - // data. See https://github.com/census-ecosystem/opencensus-go-exporter-ocagent/pull/63 - if err := e.connect(); err == nil { - e.setStateConnected() - } else { - e.setStateDisconnected(err) - } - go e.indefiniteBackgroundConnection() - err = nil + e.cc.startConnection(ctx) }) return err } -func (e *Exporter) prepareCollectorAddress() string { - if e.c.collectorAddr != "" { - return e.c.collectorAddr - } - return fmt.Sprintf("%s:%d", DefaultCollectorHost, DefaultCollectorPort) -} - -func (e *Exporter) enableConnections(cc *grpc.ClientConn) error { - e.mu.RLock() - started := e.started - e.mu.RUnlock() - - if !started { - return errNotStarted - } - - e.mu.Lock() - // If previous clientConn is same as the current then just return. - // This doesn't happen right now as this func is only called with new ClientConn. - // It is more about future-proofing. - if e.grpcClientConn == cc { - e.mu.Unlock() - return nil - } - // If the previous clientConn was non-nil, close it - if e.grpcClientConn != nil { - _ = e.grpcClientConn.Close() - } - e.grpcClientConn = cc - e.traceExporter = coltracepb.NewTraceServiceClient(cc) - e.metricExporter = colmetricpb.NewMetricsServiceClient(cc) - e.mu.Unlock() - - return nil -} - -func (e *Exporter) contextWithMetadata(ctx context.Context) context.Context { - if e.metadata.Len() > 0 { - return metadata.NewOutgoingContext(ctx, e.metadata) - } - return ctx -} - -func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { - addr := e.prepareCollectorAddress() - - dialOpts := []grpc.DialOption{} - if e.c.grpcServiceConfig != "" { - dialOpts = append(dialOpts, grpc.WithDefaultServiceConfig(e.c.grpcServiceConfig)) - } - if e.c.clientCredentials != nil { - dialOpts = append(dialOpts, grpc.WithTransportCredentials(e.c.clientCredentials)) - } else if e.c.canDialInsecure { - dialOpts = append(dialOpts, grpc.WithInsecure()) - } - if e.c.compressor != "" { - dialOpts = append(dialOpts, grpc.WithDefaultCallOptions(grpc.UseCompressor(e.c.compressor))) - } - if len(e.c.grpcDialOptions) != 0 { - dialOpts = append(dialOpts, e.c.grpcDialOptions...) - } - - ctx := e.contextWithMetadata(context.Background()) - return grpc.DialContext(ctx, addr, dialOpts...) -} - -// closeStopCh is used to wrap the exporters stopCh channel closing for testing. -var closeStopCh = func(stopCh chan struct{}) { - close(stopCh) -} - // Shutdown closes all connections and releases resources currently being used // by the exporter. If the exporter is not started this does nothing. func (e *Exporter) Shutdown(ctx context.Context) error { e.mu.RLock() - cc := e.grpcClientConn + cc := e.cc started := e.started e.mu.RUnlock() @@ -225,23 +143,13 @@ func (e *Exporter) Shutdown(ctx context.Context) error { var err error e.stopOnce.Do(func() { - if cc != nil { - // Clean things up before checking this error. - err = cc.Close() - } + // Clean things up before checking this error. + err = cc.shutdown(ctx) // At this point we can change the state variable started e.mu.Lock() e.started = false e.mu.Unlock() - closeStopCh(e.stopCh) - - // Ensure that the backgroundConnector returns - select { - case <-e.backgroundConnectionDoneCh: - case <-ctx.Done(): - err = ctx.Err() - } }) return err @@ -251,16 +159,8 @@ func (e *Exporter) Shutdown(ctx context.Context) error { // interface. It transforms and batches metric Records into OTLP Metrics and // transmits them to the configured collector. func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) error { - // Unify the parent context Done signal with the exporter stopCh. - ctx, cancel := context.WithCancel(parent) + ctx, cancel := e.cc.contextWithStop(parent) defer cancel() - go func(ctx context.Context, cancel context.CancelFunc) { - select { - case <-ctx.Done(): - case <-e.stopCh: - cancel() - } - }(ctx, cancel) // Hardcode the number of worker goroutines to 1. We later will // need to see if there's a way to adjust that number for longer @@ -270,32 +170,31 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e return err } - if !e.connected() { + if !e.cc.connected() { return errDisconnected } - select { - case <-e.stopCh: - return errStopped - case <-ctx.Done(): - return errContextCanceled - default: + err = func() error { e.senderMu.Lock() - _, err := e.metricExporter.Export(e.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ + defer e.senderMu.Unlock() + if e.metricExporter == nil { + return errNoClient + } + _, err := e.metricExporter.Export(e.cc.contextWithMetadata(ctx), &colmetricpb.ExportMetricsServiceRequest{ ResourceMetrics: rms, }) - e.senderMu.Unlock() - if err != nil { - return err - } + return err + }() + if err != nil { + e.cc.setStateDisconnected(err) } - return nil + return err } // ExportKindFor reports back to the OpenTelemetry SDK sending this Exporter // metric telemetry that it needs to be provided in a cumulative format. func (e *Exporter) ExportKindFor(desc *metric.Descriptor, kind aggregation.Kind) metricsdk.ExportKind { - return e.c.exportKindSelector.ExportKindFor(desc, kind) + return e.exportKindSelector.ExportKindFor(desc, kind) } // ExportSpans exports a batch of SpanData. @@ -304,28 +203,31 @@ func (e *Exporter) ExportSpans(ctx context.Context, sds []*tracesdk.SpanData) er } func (e *Exporter) uploadTraces(ctx context.Context, sdl []*tracesdk.SpanData) error { - select { - case <-e.stopCh: + ctx, cancel := e.cc.contextWithStop(ctx) + defer cancel() + + if !e.cc.connected() { return nil - default: - if !e.connected() { - return nil - } + } - protoSpans := transform.SpanData(sdl) - if len(protoSpans) == 0 { - return nil - } + protoSpans := transform.SpanData(sdl) + if len(protoSpans) == 0 { + return nil + } + err := func() error { e.senderMu.Lock() - _, err := e.traceExporter.Export(e.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ + defer e.senderMu.Unlock() + if e.traceExporter == nil { + return errNoClient + } + _, err := e.traceExporter.Export(e.cc.contextWithMetadata(ctx), &coltracepb.ExportTraceServiceRequest{ ResourceSpans: protoSpans, }) - e.senderMu.Unlock() - if err != nil { - e.setStateDisconnected(err) - return err - } + return err + }() + if err != nil { + e.cc.setStateDisconnected(err) } - return nil + return err } diff --git a/exporters/otlp/otlp_integration_test.go b/exporters/otlp/otlp_integration_test.go index 70f32c46571..ced8d103558 100644 --- a/exporters/otlp/otlp_integration_test.go +++ b/exporters/otlp/otlp_integration_test.go @@ -72,12 +72,13 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) } opts = append(opts, additionalOpts...) - exp, err := otlp.NewExporter(opts...) + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, opts...) if err != nil { t.Fatalf("failed to create a new collector exporter: %v", err) } defer func() { - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithTimeout(ctx, time.Second) defer cancel() if err := exp.Shutdown(ctx); err != nil { panic(err) @@ -110,11 +111,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) // Now create few spans m := 4 for i := 0; i < m; i++ { - _, span := tr1.Start(context.Background(), "AlwaysSample") + _, span := tr1.Start(ctx, "AlwaysSample") span.SetAttributes(label.Int64("i", int64(i))) span.End() - _, span = tr2.Start(context.Background(), "AlwaysSample") + _, span = tr2.Start(ctx, "AlwaysSample") span.SetAttributes(label.Int64("i", int64(i))) span.End() } @@ -124,7 +125,6 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) pusher := push.New(processor, exp) pusher.Start() - ctx := context.Background() meter := pusher.MeterProvider().Meter("test-meter") labels := []label.KeyValue{label.Bool("test", true)} @@ -190,7 +190,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.ExporterOption) <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) @@ -307,31 +307,33 @@ func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) { _ = mc.stop() }() - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address)) if err != nil { t.Fatalf("error creating exporter: %v", err) } defer func() { - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { panic(err) } }() // Invoke Start numerous times, should return errAlreadyStarted for i := 0; i < 10; i++ { - if err := exp.Start(); err == nil || !strings.Contains(err.Error(), "already started") { + if err := exp.Start(ctx); err == nil || !strings.Contains(err.Error(), "already started") { t.Fatalf("#%d unexpected Start error: %v", i, err) } } - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to Shutdown the exporter: %v", err) } // Invoke Shutdown numerous times for i := 0; i < 10; i++ { - if err := exp.Shutdown(context.Background()); err != nil { + if err := exp.Shutdown(ctx); err != nil { t.Fatalf(`#%d got error (%v) expected none`, i, err) } } @@ -341,14 +343,16 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { mc := runMockCol(t) reconnectionPeriod := 20 * time.Millisecond - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithAddress(mc.address), otlp.WithReconnectionPeriod(reconnectionPeriod)) if err != nil { t.Fatalf("Unexpected error: %v", err) } defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() // We'll now stop the collector right away to simulate a connection @@ -363,7 +367,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { // No endpoint up. require.Error( t, - exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}}), + exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}}), "transport: Error while dialing dial tcp %s: connect: connection refused", mc.address, ) @@ -377,7 +381,7 @@ func TestNewExporter_collectorConnectionDiesThenReconnects(t *testing.T) { n := 10 for i := 0; i < n; i++ { - require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "Resurrected"}})) + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "Resurrected"}})) } nmaSpans := nmc.getSpans() @@ -412,13 +416,15 @@ func TestNewExporter_collectorOnBadConnection(t *testing.T) { _, collectorPortStr, _ := net.SplitHostPort(ln.Addr().String()) address := fmt.Sprintf("localhost:%s", collectorPortStr) - exp, err := otlp.NewExporter(otlp.WithInsecure(), + ctx := context.Background() + exp, err := otlp.NewExporter(ctx, + otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(address)) if err != nil { t.Fatalf("Despite an indefinite background reconnection, got error: %v", err) } - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) } func TestNewExporter_withAddress(t *testing.T) { @@ -432,11 +438,12 @@ func TestNewExporter_withAddress(t *testing.T) { otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address)) + ctx := context.Background() defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() - if err := exp.Start(); err != nil { + if err := exp.Start(ctx); err != nil { t.Fatalf("Unexpected Start error: %v", err) } } @@ -447,16 +454,17 @@ func TestNewExporter_withHeaders(t *testing.T) { _ = mc.stop() }() - exp, _ := otlp.NewExporter( + ctx := context.Background() + exp, _ := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address), otlp.WithHeaders(map[string]string{"header1": "value1"}), ) - require.NoError(t, exp.ExportSpans(context.Background(), []*exporttrace.SpanData{{Name: "in the midst"}})) + require.NoError(t, exp.ExportSpans(ctx, []*exporttrace.SpanData{{Name: "in the midst"}})) defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() headers := mc.getHeaders() @@ -473,14 +481,15 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(5 * time.Millisecond) - exp, _ := otlp.NewExporter( + ctx := context.Background() + exp, _ := otlp.NewExporter(ctx, otlp.WithInsecure(), otlp.WithReconnectionPeriod(50*time.Millisecond), otlp.WithAddress(mc.address), ) defer func() { - _ = exp.Shutdown(context.Background()) + _ = exp.Shutdown(ctx) }() tp := sdktrace.NewTracerProvider( @@ -492,7 +501,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { sdktrace.WithMaxExportBatchSize(10), ), ) - defer func() { _ = tp.Shutdown(context.Background()) }() + defer func() { _ = tp.Shutdown(ctx) }() tr := tp.Tracer("test-tracer") testKvs := []label.KeyValue{ @@ -504,7 +513,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { label.Bool("Bool", true), label.String("String", "test"), } - _, span := tr.Start(context.Background(), "AlwaysSample") + _, span := tr.Start(ctx, "AlwaysSample") span.SetAttributes(testKvs...) span.End() @@ -520,7 +529,7 @@ func TestNewExporter_withMultipleAttributeTypes(t *testing.T) { <-time.After(40 * time.Millisecond) // Now shutdown the exporter - ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + ctx, cancel := context.WithTimeout(ctx, time.Millisecond) defer cancel() if err := exp.Shutdown(ctx); err != nil { t.Fatalf("failed to stop the exporter: %v", err) diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index 5c2e0e655bf..afcb28ddfea 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -23,21 +23,18 @@ import ( ) func TestExporterShutdownHonorsTimeout(t *testing.T) { - orig := closeStopCh ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer func() { - cancel() - closeStopCh = orig - }() - closeStopCh = func(stopCh chan struct{}) { + defer cancel() + + e := NewUnstartedExporter() + orig := e.cc.closeBackgroundConnectionDoneCh + e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { go func() { <-ctx.Done() - close(stopCh) + orig(ch) }() } - - e := NewUnstartedExporter() - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -51,21 +48,18 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { } func TestExporterShutdownHonorsCancel(t *testing.T) { - orig := closeStopCh ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute) - defer func() { - cancel() - closeStopCh = orig - }() - closeStopCh = func(stopCh chan struct{}) { + defer cancel() + + e := NewUnstartedExporter() + orig := e.cc.closeBackgroundConnectionDoneCh + e.cc.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { go func() { <-ctx.Done() - close(stopCh) + orig(ch) }() } - - e := NewUnstartedExporter() - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -84,7 +78,7 @@ func TestExporterShutdownNoError(t *testing.T) { defer cancel() e := NewUnstartedExporter() - if err := e.Start(); err != nil { + if err := e.Start(ctx); err != nil { t.Fatalf("failed to start exporter: %v", err) } @@ -95,7 +89,7 @@ func TestExporterShutdownNoError(t *testing.T) { func TestExporterShutdownManyTimes(t *testing.T) { ctx := context.Background() - e, err := NewExporter() + e, err := NewExporter(ctx) if err != nil { t.Fatalf("failed to start an exporter: %v", err) }