Skip to content

Commit

Permalink
Some cleanups in otlp exporter (#1350)
Browse files Browse the repository at this point in the history
* Drop WorkerCount option

This is not a good option - the user isn't likely to know how many
worker goroutines are optimal. This is something that an
exporter should figure out itself. The second problem with the option
is that it is specific to the metric transformation from SDK export
format into protobuf. When the exporter starts supporting other
protocols (HTTP/JSON for example), this option may be of no use. So
the option should rather belong to the protocol, not to the
exporter. Currently both mean the same, but later they will be
separated, and this option breaks the separation.

* Make stop channel a typical signalling channel

Signalling channels are idiomatically defined as chan struct{}, so
let's make it so, to avoid confusion about the meaning of the bool
type.

* Close a race when grpc connection is closed multiple times

If several goroutines call Shutdown at the same time, then the
following scenario is possible:

goroutine A locks the mutex, reads the started member, unlocks the mutex
and gets preempted

goroutine B locks the mutex, reads the started member, unlocks the mutex
and gets preempted

goroutine A does not return early in the "if !started" conditional and
continues to close the connection and execute the rest of the function
(where it finally sets the started member to false), then gets preempted

goroutine B also does not return early, because it got a copy of
started before goroutine A set it to false, so it tries to close the
connection again.

* Update changelog
  • Loading branch information
krnowak committed Nov 20, 2020
1 parent 5b5b4ab commit 6eb6801
Show file tree
Hide file tree
Showing 5 changed files with 58 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Expand Up @@ -59,6 +59,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `AddEventWithTimestamp` method on the `Span` interface in `go.opentelemetry.io/otel` is removed due to its redundancy.
It is replaced by using the `AddEvent` method with a `WithTimestamp` option. (#1254)
- Structs `MockSpan` and `MockTracer` are removed from `go.opentelemetry.io/otel/oteltest`. `Tracer` and `Span` from the same module should be used in their place instead. (#1306)
- `WorkerCount` option is removed from `go.opentelemetry.io/otel/exporters/otlp`. (#1350)

### Fixed

Expand Down
15 changes: 0 additions & 15 deletions exporters/otlp/options.go
Expand Up @@ -30,10 +30,6 @@ const (
// DefaultCollectorHost is the host address the Exporter will attempt
// to connect to if no collector address is provided.
DefaultCollectorHost string = "localhost"
// DefaultNumWorkers is the number of goroutines the Exporter will use when
// processing telemetry.
DefaultNumWorkers uint = 1

// DefaultGRPCServiceConfig is the gRPC service config used if none is
// provided by the user.
//
Expand Down Expand Up @@ -84,20 +80,9 @@ type config struct {
grpcDialOptions []grpc.DialOption
headers map[string]string
clientCredentials credentials.TransportCredentials
numWorkers uint
exportKindSelector metricsdk.ExportKindSelector
}

// WorkerCount returns an ExporterOption that records how many goroutines
// should be used when processing telemetry. Passing 0 selects
// DefaultNumWorkers instead.
func WorkerCount(n uint) ExporterOption {
	// Resolve the effective value once, up front, so the returned option
	// always stores a non-zero worker count.
	workers := n
	if workers == 0 {
		workers = DefaultNumWorkers
	}
	return func(cfg *config) {
		cfg.numWorkers = workers
	}
}

// WithInsecure disables client transport security for the exporter's gRPC connection
// just like grpc.WithInsecure() https://pkg.go.dev/google.golang.org/grpc#WithInsecure
// does. Note, by default, client security is required unless WithInsecure is used.
Expand Down
46 changes: 26 additions & 20 deletions exporters/otlp/otlp.go
Expand Up @@ -51,7 +51,8 @@ type Exporter struct {
lastConnectErrPtr unsafe.Pointer

startOnce sync.Once
stopCh chan bool
stopOnce sync.Once
stopCh chan struct{}
disconnectedCh chan bool

backgroundConnectionDoneCh chan bool
Expand All @@ -67,7 +68,6 @@ var _ metricsdk.Exporter = (*Exporter)(nil)
// any ExporterOptions provided.
func newConfig(opts ...ExporterOption) config {
cfg := config{
numWorkers: DefaultNumWorkers,
grpcServiceConfig: DefaultGRPCServiceConfig,

// Note: the default ExportKindSelector is specified
Expand Down Expand Up @@ -119,7 +119,7 @@ func (e *Exporter) Start() error {
e.mu.Lock()
e.started = true
e.disconnectedCh = make(chan bool, 1)
e.stopCh = make(chan bool)
e.stopCh = make(chan struct{})
e.backgroundConnectionDoneCh = make(chan bool)
e.mu.Unlock()

Expand Down Expand Up @@ -206,7 +206,7 @@ func (e *Exporter) dialToCollector() (*grpc.ClientConn, error) {
}

// closeStopCh is used to wrap the exporter's stopCh channel closing for testing.
var closeStopCh = func(stopCh chan bool) {
var closeStopCh = func(stopCh chan struct{}) {
close(stopCh)
}

Expand All @@ -223,23 +223,26 @@ func (e *Exporter) Shutdown(ctx context.Context) error {
}

var err error
if cc != nil {
// Clean things up before checking this error.
err = cc.Close()
}

// At this point we can change the state variable started
e.mu.Lock()
e.started = false
e.mu.Unlock()
closeStopCh(e.stopCh)
e.stopOnce.Do(func() {
if cc != nil {
// Clean things up before checking this error.
err = cc.Close()
}

// Ensure that the backgroundConnector returns
select {
case <-e.backgroundConnectionDoneCh:
case <-ctx.Done():
return ctx.Err()
}
// At this point we can change the state variable started
e.mu.Lock()
e.started = false
e.mu.Unlock()
closeStopCh(e.stopCh)

// Ensure that the backgroundConnector returns
select {
case <-e.backgroundConnectionDoneCh:
case <-ctx.Done():
err = ctx.Err()
}
})

return err
}
Expand All @@ -259,7 +262,10 @@ func (e *Exporter) Export(parent context.Context, cps metricsdk.CheckpointSet) e
}
}(ctx, cancel)

rms, err := transform.CheckpointSet(ctx, e, cps, e.c.numWorkers)
// Hardcode the number of worker goroutines to 1. We later will
// need to see if there's a way to adjust that number for longer
// running operations.
rms, err := transform.CheckpointSet(ctx, e, cps, 1)
if err != nil {
return err
}
Expand Down
10 changes: 1 addition & 9 deletions exporters/otlp/otlp_metric_test.go
Expand Up @@ -767,15 +767,7 @@ func TestStatelessExportKind(t *testing.T) {

// What works single-threaded should work multi-threaded
func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
t.Run("1 goroutine", func(t *testing.T) {
runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(1))...), rs, expected)
})
t.Run("20 goroutines", func(t *testing.T) {
runMetricExportTest(t, NewUnstartedExporter(append(opts[:len(opts):len(opts)], WorkerCount(20))...), rs, expected)
})
}

func runMetricExportTest(t *testing.T, exp *Exporter, rs []record, expected []metricpb.ResourceMetrics) {
exp := NewUnstartedExporter(opts...)
msc := &metricsServiceClientStub{}
exp.metricExporter = msc
exp.started = true
Expand Down
32 changes: 30 additions & 2 deletions exporters/otlp/otlp_test.go
Expand Up @@ -17,6 +17,7 @@ package otlp
import (
"context"
"errors"
"sync"
"testing"
"time"
)
Expand All @@ -28,7 +29,7 @@ func TestExporterShutdownHonorsTimeout(t *testing.T) {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan bool) {
closeStopCh = func(stopCh chan struct{}) {
go func() {
<-ctx.Done()
close(stopCh)
Expand Down Expand Up @@ -56,7 +57,7 @@ func TestExporterShutdownHonorsCancel(t *testing.T) {
cancel()
closeStopCh = orig
}()
closeStopCh = func(stopCh chan bool) {
closeStopCh = func(stopCh chan struct{}) {
go func() {
<-ctx.Done()
close(stopCh)
Expand Down Expand Up @@ -91,3 +92,30 @@ func TestExporterShutdownNoError(t *testing.T) {
t.Errorf("shutdown errored: expected nil, got %v", err)
}
}

// TestExporterShutdownManyTimes calls Shutdown on a single Exporter from many
// goroutines at once and fails if any of those calls returns an error.
func TestExporterShutdownManyTimes(t *testing.T) {
	exp, err := NewExporter()
	if err != nil {
		t.Fatalf("failed to start an exporter: %v", err)
	}

	const callers = 20
	var (
		ctx     = context.Background()
		release = make(chan struct{})
		results = make([]error, callers)
		pending sync.WaitGroup
	)

	pending.Add(callers)
	for i := range results {
		go func(slot int) {
			defer pending.Done()
			// Hold every goroutine here until all of them have been
			// launched, so the Shutdown calls overlap as much as possible.
			<-release
			// Each goroutine writes only to its own slot, so no extra
			// synchronisation is needed on the results slice.
			results[slot] = exp.Shutdown(ctx)
		}(i)
	}
	close(release)
	pending.Wait()

	for _, shutdownErr := range results {
		if shutdownErr != nil {
			t.Fatalf("failed to shutdown exporter: %v", shutdownErr)
		}
	}
}

0 comments on commit 6eb6801

Please sign in to comment.