diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c8988e455f..7277742e8d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `AddEventWithTimestamp` method on the `Span` interface in `go.opentelemetry.io/otel` is removed due to its redundancy. It is replaced by using the `AddEvent` method with a `WithTimestamp` option. (#1254) - Structs `MockSpan` and `MockTracer` are removed from `go.opentelemetry.io/otel/oteltest`. `Tracer` and `Span` from the same module should be used in their place instead. (#1306) +- `WorkerCount` option is removed from `go.opentelemetry.io/otel/exporters/otlp`. (#1350) ### Fixed diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index 737f56b8c08..3fdb9b64d92 100644 --- a/exporters/otlp/options.go +++ b/exporters/otlp/options.go @@ -30,10 +30,6 @@ const ( // DefaultCollectorHost is the host address the Exporter will attempt // connect to if no collector address is provided. DefaultCollectorHost string = "localhost" - // DefaultNumWorkers is the number of goroutines the Exporter will use when - // processing telemetry. - DefaultNumWorkers uint = 1 - // DefaultGRPCServiceConfig is the gRPC service config used if none is // provided by the user. // @@ -84,20 +80,9 @@ type config struct { grpcDialOptions []grpc.DialOption headers map[string]string clientCredentials credentials.TransportCredentials - numWorkers uint exportKindSelector metricsdk.ExportKindSelector } -// WorkerCount sets the number of Goroutines to use when processing telemetry. -func WorkerCount(n uint) ExporterOption { - if n == 0 { - n = DefaultNumWorkers - } - return func(cfg *config) { - cfg.numWorkers = n - } -} - // WithInsecure disables client transport security for the exporter's gRPC connection // just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure // does. Note, by default, client security is required unless WithInsecure is used. diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 8c60aeb8e5f..16754394ce9 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -51,7 +51,8 @@ type Exporter struct { lastConnectErrPtr unsafe.Pointer startOnce sync.Once - stopCh chan bool + stopOnce sync.Once + stopCh chan struct{} disconnectedCh chan bool backgroundConnectionDoneCh chan bool @@ -67,7 +68,6 @@ var _ metricsdk.Exporter = (*Exporter)(nil) // any ExporterOptions provided. func newConfig(opts ...ExporterOption) config { cfg := config{ - numWorkers: DefaultNumWorkers, grpcServiceConfig: DefaultGRPCServiceConfig, // Note: the default ExportKindSelector is specified @@ -119,7 +119,7 @@ func (e *Exporter) Start() error { e.mu.Lock() e.started = true e.disconnectedCh = make(chan bool, 1) - e.stopCh = make(chan bool) + e.stopCh = make(chan struct{}) e.backgroundConnectionDoneCh = make(chan bool) e.mu.Unlock() @@ -206,7 +206,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { } // closeStopCh is used to wrap the exporters stopCh channel closing for testing. -var closeStopCh = func(stopCh chan bool) { +var closeStopCh = func(stopCh chan struct{}) { close(stopCh) } @@ -223,23 +223,26 @@ func (e *Exporter) Shutdown(ctx context.Context) error { } var err error - if cc != nil { - // Clean things up before checking this error. - err = cc.Close() - } - // At this point we can change the state variable started - e.mu.Lock() - e.started = false - e.mu.Unlock() - closeStopCh(e.stopCh) + e.stopOnce.Do(func() { + if cc != nil { + // Clean things up before checking this error. + err = cc.Close() + } - // Ensure that the backgroundConnector returns - select { - case <-e.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } + // At this point we can change the state variable started + e.mu.Lock() + e.started = false + e.mu.Unlock() + closeStopCh(e.stopCh) + + // Ensure that the backgroundConnector returns + select { + case <-e.backgroundConnectionDoneCh: + case <-ctx.Done(): + err = ctx.Err() + } + }) return err } @@ -259,7 +262,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e } }(ctx, cancel) - rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers) + // Hardcode the number of worker goroutines to 1. We later will + // need to see if there's a way to adjust that number for longer + // running operations. + rms, err := transform.CheckpointSet(ctx, e, cps, 1) if err != nil { return err } diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index c4ebca4c905..120b88ad8eb 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -767,15 +767,7 @@ func TestStatelessExportKind(t *testing.T) { // What works single-threaded should work multi-threaded func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { - t.Run("1 goroutine", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(1))...), rs, expected) - }) - t.Run("20 goroutines", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(20))...), rs, expected) - }) -} - -func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []metricpb.ResourceMetrics) { + exp := NewUnstartedExporter(opts...) msc := &metricsServiceClientStub{} exp.metricExporter = msc exp.started = true diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index d2e808ebf65..5c2e0e655bf 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -17,6 +17,7 @@ package otlp import ( "context" "errors" + "sync" "testing" "time" ) @@ -28,7 +29,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { cancel() closeStopCh = orig }() - closeStopCh = func(stopCh chan bool) { + closeStopCh = func(stopCh chan struct{}) { go func() { <-ctx.Done() close(stopCh) @@ -56,7 +57,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { cancel() closeStopCh = orig }() - closeStopCh = func(stopCh chan bool) { + closeStopCh = func(stopCh chan struct{}) { go func() { <-ctx.Done() close(stopCh) @@ -91,3 +92,30 @@ func TestExporterShutdownNoError(t *testing.T) { t.Errorf("shutdown errored: expected nil, got %v", err) } } + +func TestExporterShutdownManyTimes(t *testing.T) { + ctx := context.Background() + e, err := NewExporter() + if err != nil { + t.Fatalf("failed to start an exporter: %v", err) + } + ch := make(chan struct{}) + wg := sync.WaitGroup{} + const num int = 20 + wg.Add(num) + errs := make([]error, num) + for i := 0; i < num; i++ { + go func(idx int) { + defer wg.Done() + <-ch + errs[idx] = e.Shutdown(ctx) + }(i) + } + close(ch) + wg.Wait() + for _, err := range errs { + if err != nil { + t.Fatalf("failed to shutdown exporter: %v", err) + } + } +}