-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
otlgrpc: provide a way to get otlp client from grpc conn
Hopefully this can be removed with a future upstream change that could make this configurable. The package also needs internal dependency that is copied in. Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
- Loading branch information
1 parent
5479425
commit 9167e98
Showing
8 changed files
with
1,412 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,215 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package otlpgrpc | ||
|
||
import ( | ||
"context" | ||
"math/rand" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
"unsafe" | ||
|
||
"google.golang.org/grpc" | ||
"google.golang.org/grpc/metadata" | ||
) | ||
|
||
type connection struct { | ||
// Ensure pointer is 64-bit aligned for atomic operations on both 32 and 64 bit machines. | ||
lastConnectErrPtr unsafe.Pointer | ||
|
||
// mu protects the connection as it is accessed by the | ||
// exporter goroutines and background connection goroutine | ||
mu sync.Mutex | ||
cc *grpc.ClientConn | ||
|
||
// these fields are read-only after constructor is finished | ||
metadata metadata.MD | ||
newConnectionHandler func(cc *grpc.ClientConn) | ||
|
||
// these channels are created once | ||
disconnectedCh chan bool | ||
backgroundConnectionDoneCh chan struct{} | ||
stopCh chan struct{} | ||
|
||
// this is for tests, so they can replace the closing | ||
// routine without a worry of modifying some global variable | ||
// or changing it back to original after the test is done | ||
closeBackgroundConnectionDoneCh func(ch chan struct{}) | ||
} | ||
|
||
func newConnection(cc *grpc.ClientConn, handler func(cc *grpc.ClientConn)) *connection { | ||
c := new(connection) | ||
c.newConnectionHandler = handler | ||
c.cc = cc | ||
c.closeBackgroundConnectionDoneCh = func(ch chan struct{}) { | ||
close(ch) | ||
} | ||
return c | ||
} | ||
|
||
func (c *connection) startConnection(ctx context.Context) { | ||
c.stopCh = make(chan struct{}) | ||
c.disconnectedCh = make(chan bool, 1) | ||
c.backgroundConnectionDoneCh = make(chan struct{}) | ||
|
||
if err := c.connect(ctx); err == nil { | ||
c.setStateConnected() | ||
} else { | ||
c.setStateDisconnected(err) | ||
} | ||
go c.indefiniteBackgroundConnection() | ||
} | ||
|
||
func (c *connection) lastConnectError() error { | ||
errPtr := (*error)(atomic.LoadPointer(&c.lastConnectErrPtr)) | ||
if errPtr == nil { | ||
return nil | ||
} | ||
return *errPtr | ||
} | ||
|
||
func (c *connection) saveLastConnectError(err error) { | ||
var errPtr *error | ||
if err != nil { | ||
errPtr = &err | ||
} | ||
atomic.StorePointer(&c.lastConnectErrPtr, unsafe.Pointer(errPtr)) | ||
} | ||
|
||
func (c *connection) setStateDisconnected(err error) { | ||
c.saveLastConnectError(err) | ||
select { | ||
case c.disconnectedCh <- true: | ||
default: | ||
} | ||
c.newConnectionHandler(nil) | ||
} | ||
|
||
func (c *connection) setStateConnected() { | ||
c.saveLastConnectError(nil) | ||
} | ||
|
||
func (c *connection) connected() bool { | ||
return c.lastConnectError() == nil | ||
} | ||
|
||
const defaultConnReattemptPeriod = 10 * time.Second | ||
|
||
func (c *connection) indefiniteBackgroundConnection() { | ||
defer func() { | ||
c.closeBackgroundConnectionDoneCh(c.backgroundConnectionDoneCh) | ||
}() | ||
|
||
connReattemptPeriod := defaultConnReattemptPeriod | ||
|
||
// No strong seeding required, nano time can | ||
// already help with pseudo uniqueness. | ||
rng := rand.New(rand.NewSource(time.Now().UnixNano() + rand.Int63n(1024))) | ||
|
||
// maxJitterNanos: 70% of the connectionReattemptPeriod | ||
maxJitterNanos := int64(0.7 * float64(connReattemptPeriod)) | ||
|
||
for { | ||
// Otherwise these will be the normal scenarios to enable | ||
// reconnection if we trip out. | ||
// 1. If we've stopped, return entirely | ||
// 2. Otherwise block until we are disconnected, and | ||
// then retry connecting | ||
select { | ||
case <-c.stopCh: | ||
return | ||
|
||
case <-c.disconnectedCh: | ||
// Quickly check if we haven't stopped at the | ||
// same time. | ||
select { | ||
case <-c.stopCh: | ||
return | ||
|
||
default: | ||
} | ||
|
||
// Normal scenario that we'll wait for | ||
} | ||
|
||
if err := c.connect(context.Background()); err == nil { | ||
c.setStateConnected() | ||
} else { | ||
// this code is unreachable in most cases | ||
// c.connect does not establish connection | ||
c.setStateDisconnected(err) | ||
} | ||
|
||
// Apply some jitter to avoid lockstep retrials of other | ||
// collector-exporters. Lockstep retrials could result in an | ||
// innocent DDOS, by clogging the machine's resources and network. | ||
jitter := time.Duration(rng.Int63n(maxJitterNanos)) | ||
select { | ||
case <-c.stopCh: | ||
return | ||
case <-time.After(connReattemptPeriod + jitter): | ||
} | ||
} | ||
} | ||
|
||
func (c *connection) connect(ctx context.Context) error { | ||
c.newConnectionHandler(c.cc) | ||
return nil | ||
} | ||
|
||
func (c *connection) contextWithMetadata(ctx context.Context) context.Context { | ||
if c.metadata.Len() > 0 { | ||
return metadata.NewOutgoingContext(ctx, c.metadata) | ||
} | ||
return ctx | ||
} | ||
|
||
func (c *connection) shutdown(ctx context.Context) error { | ||
close(c.stopCh) | ||
// Ensure that the backgroundConnector returns | ||
select { | ||
case <-c.backgroundConnectionDoneCh: | ||
case <-ctx.Done(): | ||
return ctx.Err() | ||
} | ||
|
||
c.mu.Lock() | ||
cc := c.cc | ||
c.cc = nil | ||
c.mu.Unlock() | ||
|
||
if cc != nil { | ||
return cc.Close() | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (c *connection) contextWithStop(ctx context.Context) (context.Context, context.CancelFunc) { | ||
// Unify the parent context Done signal with the connection's | ||
// stop channel. | ||
ctx, cancel := context.WithCancel(ctx) | ||
go func(ctx context.Context, cancel context.CancelFunc) { | ||
select { | ||
case <-ctx.Done(): | ||
// Nothing to do, either cancelled or deadline | ||
// happened. | ||
case <-c.stopCh: | ||
cancel() | ||
} | ||
}(ctx, cancel) | ||
return ctx, cancel | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
/* | ||
Package otlpgrpc provides an implementation of otlp.ProtocolDriver | ||
that connects to the collector and sends traces and metrics using | ||
gRPC. | ||
This package is currently in a pre-GA phase. Backwards incompatible | ||
changes may be introduced in subsequent minor version releases as we | ||
work to track the evolving OpenTelemetry specification and user | ||
feedback. | ||
*/ | ||
package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,122 @@ | ||
// Copyright The OpenTelemetry Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package otlpgrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlpgrpc" | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"fmt" | ||
"sync" | ||
|
||
"google.golang.org/grpc" | ||
|
||
transform "github.com/moby/buildkit/util/tracing/otlptransform" | ||
"go.opentelemetry.io/otel/exporters/otlp" | ||
metricsdk "go.opentelemetry.io/otel/sdk/export/metric" | ||
tracesdk "go.opentelemetry.io/otel/sdk/trace" | ||
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" | ||
tracepb "go.opentelemetry.io/proto/otlp/trace/v1" | ||
) | ||
|
||
type driver struct { | ||
tracesDriver tracesDriver | ||
} | ||
|
||
type tracesDriver struct { | ||
connection *connection | ||
|
||
lock sync.Mutex | ||
tracesClient coltracepb.TraceServiceClient | ||
} | ||
|
||
var ( | ||
errNoClient = errors.New("no client") | ||
) | ||
|
||
// NewDriver creates a new gRPC protocol driver. | ||
func NewDriver(cc *grpc.ClientConn) otlp.ProtocolDriver { | ||
d := &driver{} | ||
|
||
d.tracesDriver = tracesDriver{ | ||
connection: newConnection(cc, d.tracesDriver.handleNewConnection), | ||
} | ||
|
||
return d | ||
} | ||
|
||
func (td *tracesDriver) handleNewConnection(cc *grpc.ClientConn) { | ||
td.lock.Lock() | ||
defer td.lock.Unlock() | ||
if cc != nil { | ||
td.tracesClient = coltracepb.NewTraceServiceClient(cc) | ||
} else { | ||
td.tracesClient = nil | ||
} | ||
} | ||
|
||
// Start implements otlp.ProtocolDriver. It establishes a connection | ||
// to the collector. | ||
func (d *driver) Start(ctx context.Context) error { | ||
d.tracesDriver.connection.startConnection(ctx) | ||
return nil | ||
} | ||
|
||
// Stop implements otlp.ProtocolDriver. It shuts down the connection | ||
// to the collector. | ||
func (d *driver) Stop(ctx context.Context) error { | ||
return d.tracesDriver.connection.shutdown(ctx) | ||
} | ||
|
||
func (d *driver) ExportMetrics(ctx context.Context, cps metricsdk.CheckpointSet, selector metricsdk.ExportKindSelector) error { | ||
return errors.New("metrics in not implemented") | ||
} | ||
|
||
// ExportTraces implements otlp.ProtocolDriver. It transforms spans to | ||
// protobuf binary format and sends the result to the collector. | ||
func (d *driver) ExportTraces(ctx context.Context, ss []*tracesdk.SpanSnapshot) error { | ||
if !d.tracesDriver.connection.connected() { | ||
return fmt.Errorf("traces exporter is disconnected: %w", d.tracesDriver.connection.lastConnectError()) | ||
} | ||
ctx, cancel := d.tracesDriver.connection.contextWithStop(ctx) | ||
defer cancel() | ||
ctx, tCancel := context.WithTimeout(ctx, 30) | ||
defer tCancel() | ||
|
||
protoSpans := transform.SpanData(ss) | ||
if len(protoSpans) == 0 { | ||
return nil | ||
} | ||
|
||
return d.tracesDriver.uploadTraces(ctx, protoSpans) | ||
} | ||
|
||
func (td *tracesDriver) uploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { | ||
ctx = td.connection.contextWithMetadata(ctx) | ||
err := func() error { | ||
td.lock.Lock() | ||
defer td.lock.Unlock() | ||
if td.tracesClient == nil { | ||
return errNoClient | ||
} | ||
_, err := td.tracesClient.Export(ctx, &coltracepb.ExportTraceServiceRequest{ | ||
ResourceSpans: protoSpans, | ||
}) | ||
return err | ||
}() | ||
if err != nil { | ||
td.connection.setStateDisconnected(err) | ||
} | ||
return err | ||
} |
Oops, something went wrong.