From 6eb68013b5afc4b68f38e6a8e01d4b170dcf1476 Mon Sep 17 00:00:00 2001 From: Krzesimir Nowak Date: Fri, 20 Nov 2020 03:03:25 +0100 Subject: [PATCH] Some cleanups in otlp exporter (#1350) * Drop WorkerCount option: This is not a good option - the user isn't likely to know how many worker goroutines are optimal. This is something the exporter should figure out itself. The second problem with the option is that it is specific to the metric transformation from the SDK export format into protobuf. When the exporter starts supporting other protocols (HTTP/JSON for example), this option may be of no use. So the option should rather belong to the protocol, not to the exporter. Currently both mean the same, but later they will be separated, and this option breaks the separation. * Make stop channel a typical signalling channel: Signalling channels are idiomatically defined as chan struct{}, so let's make it so, to avoid confusion about the meaning of the bool type. * Close a race when the gRPC connection is closed multiple times: If several goroutines call Shutdown at the same time, then the following scenario is possible: (1) goroutine A locks the mutex, reads the started member, unlocks the mutex and gets preempted; (2) goroutine B locks the mutex, reads the started member, unlocks the mutex and gets preempted; (3) goroutine A does not return early in the "if !started" conditional and continues to close the connection and execute the rest of the function (where it finally sets the started member to false), then gets preempted; (4) goroutine B also does not return early, because it got a copy of started before goroutine A set it to false, so it tries to close the connection again. 
* Update changelog --- CHANGELOG.md | 1 + exporters/otlp/options.go | 15 ---------- exporters/otlp/otlp.go | 46 +++++++++++++++++------------- exporters/otlp/otlp_metric_test.go | 10 +------ exporters/otlp/otlp_test.go | 32 +++++++++++++++++++-- 5 files changed, 58 insertions(+), 46 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c8988e455f..7277742e8d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - The `AddEventWithTimestamp` method on the `Span` interface in `go.opentelemetry.io/otel` is removed due to its redundancy. It is replaced by using the `AddEvent` method with a `WithTimestamp` option. (#1254) - Structs `MockSpan` and `MockTracer` are removed from `go.opentelemetry.io/otel/oteltest`. `Tracer` and `Span` from the same module should be used in their place instead. (#1306) +- `WorkerCount` option is removed from `go.opentelemetry.io/otel/exporters/otlp`. (#1350) ### Fixed diff --git a/exporters/otlp/options.go b/exporters/otlp/options.go index 737f56b8c08..3fdb9b64d92 100644 --- a/exporters/otlp/options.go +++ b/exporters/otlp/options.go @@ -30,10 +30,6 @@ const ( // DefaultCollectorHost is the host address the Exporter will attempt // connect to if no collector address is provided. DefaultCollectorHost string = "localhost" - // DefaultNumWorkers is the number of goroutines the Exporter will use when - // processing telemetry. - DefaultNumWorkers uint = 1 - // DefaultGRPCServiceConfig is the gRPC service config used if none is // provided by the user. // @@ -84,20 +80,9 @@ type config struct { grpcDialOptions []grpc.DialOption headers map[string]string clientCredentials credentials.TransportCredentials - numWorkers uint exportKindSelector metricsdk.ExportKindSelector } -// WorkerCount sets the number of Goroutines to use when processing telemetry. 
-func WorkerCount(n uint) ExporterOption { - if n == 0 { - n = DefaultNumWorkers - } - return func(cfg *config) { - cfg.numWorkers = n - } -} - // WithInsecure disables client transport security for the exporter's gRPC connection // just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure // does. Note, by default, client security is required unless WithInsecure is used. diff --git a/exporters/otlp/otlp.go b/exporters/otlp/otlp.go index 8c60aeb8e5f..16754394ce9 100644 --- a/exporters/otlp/otlp.go +++ b/exporters/otlp/otlp.go @@ -51,7 +51,8 @@ type Exporter struct { lastConnectErrPtr unsafe.Pointer startOnce sync.Once - stopCh chan bool + stopOnce sync.Once + stopCh chan struct{} disconnectedCh chan bool backgroundConnectionDoneCh chan bool @@ -67,7 +68,6 @@ var _ metricsdk.Exporter = (*Exporter)(nil) // any ExporterOptions provided. func newConfig(opts ...ExporterOption) config { cfg := config{ - numWorkers: DefaultNumWorkers, grpcServiceConfig: DefaultGRPCServiceConfig, // Note: the default ExportKindSelector is specified @@ -119,7 +119,7 @@ func (e *Exporter) Start() error { e.mu.Lock() e.started = true e.disconnectedCh = make(chan bool, 1) - e.stopCh = make(chan bool) + e.stopCh = make(chan struct{}) e.backgroundConnectionDoneCh = make(chan bool) e.mu.Unlock() @@ -206,7 +206,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) { } // closeStopCh is used to wrap the exporters stopCh channel closing for testing. -var closeStopCh = func(stopCh chan bool) { +var closeStopCh = func(stopCh chan struct{}) { close(stopCh) } @@ -223,23 +223,26 @@ func (e *Exporter) Shutdown(ctx context.Context) error { } var err error - if cc != nil { - // Clean things up before checking this error. - err = cc.Close() - } - // At this point we can change the state variable started - e.mu.Lock() - e.started = false - e.mu.Unlock() - closeStopCh(e.stopCh) + e.stopOnce.Do(func() { + if cc != nil { + // Clean things up before checking this error. 
+ err = cc.Close() + } - // Ensure that the backgroundConnector returns - select { - case <-e.backgroundConnectionDoneCh: - case <-ctx.Done(): - return ctx.Err() - } + // At this point we can change the state variable started + e.mu.Lock() + e.started = false + e.mu.Unlock() + closeStopCh(e.stopCh) + + // Ensure that the backgroundConnector returns + select { + case <-e.backgroundConnectionDoneCh: + case <-ctx.Done(): + err = ctx.Err() + } + }) return err } @@ -259,7 +262,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e } }(ctx, cancel) - rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers) + // Hardcode the number of worker goroutines to 1. We later will + // need to see if there's a way to adjust that number for longer + // running operations. + rms, err := transform.CheckpointSet(ctx, e, cps, 1) if err != nil { return err } diff --git a/exporters/otlp/otlp_metric_test.go b/exporters/otlp/otlp_metric_test.go index c4ebca4c905..120b88ad8eb 100644 --- a/exporters/otlp/otlp_metric_test.go +++ b/exporters/otlp/otlp_metric_test.go @@ -767,15 +767,7 @@ func TestStatelessExportKind(t *testing.T) { // What works single-threaded should work multi-threaded func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) { - t.Run("1 goroutine", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(1))...), rs, expected) - }) - t.Run("20 goroutines", func(t *testing.T) { - runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(20))...), rs, expected) - }) -} - -func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []metricpb.ResourceMetrics) { + exp := NewUnstartedExporter(opts...) 
msc := &metricsServiceClientStub{} exp.metricExporter = msc exp.started = true diff --git a/exporters/otlp/otlp_test.go b/exporters/otlp/otlp_test.go index d2e808ebf65..5c2e0e655bf 100644 --- a/exporters/otlp/otlp_test.go +++ b/exporters/otlp/otlp_test.go @@ -17,6 +17,7 @@ package otlp import ( "context" "errors" + "sync" "testing" "time" ) @@ -28,7 +29,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) { cancel() closeStopCh = orig }() - closeStopCh = func(stopCh chan bool) { + closeStopCh = func(stopCh chan struct{}) { go func() { <-ctx.Done() close(stopCh) @@ -56,7 +57,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) { cancel() closeStopCh = orig }() - closeStopCh = func(stopCh chan bool) { + closeStopCh = func(stopCh chan struct{}) { go func() { <-ctx.Done() close(stopCh) @@ -91,3 +92,30 @@ func TestExporterShutdownNoError(t *testing.T) { t.Errorf("shutdown errored: expected nil, got %v", err) } } + +func TestExporterShutdownManyTimes(t *testing.T) { + ctx := context.Background() + e, err := NewExporter() + if err != nil { + t.Fatalf("failed to start an exporter: %v", err) + } + ch := make(chan struct{}) + wg := sync.WaitGroup{} + const num int = 20 + wg.Add(num) + errs := make([]error, num) + for i := 0; i < num; i++ { + go func(idx int) { + defer wg.Done() + <-ch + errs[idx] = e.Shutdown(ctx) + }(i) + } + close(ch) + wg.Wait() + for _, err := range errs { + if err != nil { + t.Fatalf("failed to shutdown exporter: %v", err) + } + } +}