Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a split protocol driver for otlp exporter #1418

Merged
merged 4 commits into from
Dec 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

- Add the `ReadOnlySpan` and `ReadWriteSpan` interfaces to provide better control for accessing span data. (#1360)
- `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369)
- `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418)

### Changed

Expand Down
106 changes: 106 additions & 0 deletions exporters/otlp/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ import (

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/sdk/metric/controller/push"
"go.opentelemetry.io/otel/sdk/metric/processor/basic"
"go.opentelemetry.io/otel/sdk/metric/selector/simple"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

Expand Down Expand Up @@ -51,6 +55,13 @@ func Example_insecure() {
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

tracer := otel.Tracer("test-tracer")
Expand Down Expand Up @@ -97,6 +108,13 @@ func Example_withTLS() {
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

tracer := otel.Tracer("test-tracer")
Expand All @@ -111,3 +129,91 @@ func Example_withTLS() {
iSpan.End()
}
}

func Example_withDifferentSignalCollectors() {

// Set different endpoints for the metrics and traces collectors
metricsDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithAddress("localhost:30080"),
)
tracesDriver := otlp.NewGRPCDriver(
otlp.WithInsecure(),
otlp.WithAddress("localhost:30082"),
)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
log.Fatalf("failed to create the collector exporter: %v", err)
}

defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()

tp := sdktrace.NewTracerProvider(
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
exp,
// add following two options to ensure flush
sdktrace.WithBatchTimeout(5),
sdktrace.WithMaxExportBatchSize(10),
),
)
defer func() {
ctx, cancel := context.WithTimeout(ctx, time.Second)
defer cancel()
if err := tp.Shutdown(ctx); err != nil {
otel.Handle(err)
}
}()
otel.SetTracerProvider(tp)

pusher := push.New(
basic.New(
simple.NewWithExactDistribution(),
exp,
),
exp,
push.WithPeriod(2*time.Second),
)
otel.SetMeterProvider(pusher.MeterProvider())

pusher.Start()
defer pusher.Stop() // pushes any last exports to the receiver

tracer := otel.Tracer("test-tracer")
meter := otel.Meter("test-meter")

// Recorder metric example
valuerecorder := metric.Must(meter).
NewFloat64Counter(
"an_important_metric",
metric.WithDescription("Measures the cumulative epicness of the app"),
)

// work begins
ctx, span := tracer.Start(
ctx,
"DifferentCollectors-Example")
defer span.End()
for i := 0; i < 10; i++ {
_, iSpan := tracer.Start(ctx, fmt.Sprintf("Sample-%d", i))
log.Printf("Doing really hard work (%d / 10)\n", i+1)
valuerecorder.Add(ctx, 1.0)

<-time.After(time.Second)
iSpan.End()
}

log.Printf("Done!")
}
90 changes: 68 additions & 22 deletions exporters/otlp/otlp_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,25 +97,7 @@ func newGRPCExporter(t *testing.T, ctx context.Context, address string, addition
return exp
}

func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
mc := runMockColAtAddr(t, "localhost:56561")

defer func() {
_ = mc.stop()
}()

<-time.After(5 * time.Millisecond)

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()

func runEndToEndTest(t *testing.T, ctx context.Context, exp *otlp.Exporter, mcTraces, mcMetrics *mockCol) {
pOpts := []sdktrace.TracerProviderOption{
sdktrace.WithConfig(sdktrace.Config{DefaultSampler: sdktrace.AlwaysSample()}),
sdktrace.WithBatcher(
Expand Down Expand Up @@ -239,10 +221,11 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO

// Shutdown the collector too so that we can begin
// verification checks of expected data back.
_ = mc.stop()
_ = mcTraces.stop()
_ = mcMetrics.stop()

// Now verify that we only got two resources
rss := mc.getResourceSpans()
rss := mcTraces.getResourceSpans()
if got, want := len(rss), 2; got != want {
t.Fatalf("resource span count: got %d, want %d\n", got, want)
}
Expand Down Expand Up @@ -273,7 +256,7 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
}
}

metrics := mc.getMetrics()
metrics := mcMetrics.getMetrics()
assert.Len(t, metrics, len(instruments), "not enough metrics exported")
seen := make(map[string]struct{}, len(instruments))
for _, m := range metrics {
Expand Down Expand Up @@ -342,6 +325,28 @@ func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionO
}
}

func newExporterEndToEndTest(t *testing.T, additionalOpts []otlp.GRPCConnectionOption) {
mc := runMockColAtAddr(t, "localhost:56561")

defer func() {
_ = mc.stop()
}()

<-time.After(5 * time.Millisecond)

ctx := context.Background()
exp := newGRPCExporter(t, ctx, mc.address, additionalOpts...)
defer func() {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
if err := exp.Shutdown(ctx); err != nil {
panic(err)
}
}()

runEndToEndTest(t, ctx, exp, mc, mc)
}

func TestNewExporter_invokeStartThenStopManyTimes(t *testing.T) {
mc := runMockCol(t)
defer func() {
Expand Down Expand Up @@ -761,3 +766,44 @@ func TestFailedMetricTransform(t *testing.T) {

assert.Error(t, exp.Export(ctx, failCheckpointSet{}))
}

func TestMultiConnectionDriver(t *testing.T) {
mcTraces := runMockCol(t)
mcMetrics := runMockCol(t)

defer func() {
_ = mcTraces.stop()
_ = mcMetrics.stop()
}()

<-time.After(5 * time.Millisecond)

commonOpts := []otlp.GRPCConnectionOption{
otlp.WithInsecure(),
otlp.WithReconnectionPeriod(50 * time.Millisecond),
otlp.WithGRPCDialOption(grpc.WithBlock()),
}
optsTraces := append([]otlp.GRPCConnectionOption{
otlp.WithAddress(mcTraces.address),
}, commonOpts...)
optsMetrics := append([]otlp.GRPCConnectionOption{
otlp.WithAddress(mcMetrics.address),
}, commonOpts...)

tracesDriver := otlp.NewGRPCDriver(optsTraces...)
metricsDriver := otlp.NewGRPCDriver(optsMetrics...)
splitCfg := otlp.SplitConfig{
ForMetrics: metricsDriver,
ForTraces: tracesDriver,
}
driver := otlp.NewSplitDriver(splitCfg)
ctx := context.Background()
exp, err := otlp.NewExporter(ctx, driver)
if err != nil {
t.Fatalf("failed to create a new collector exporter: %v", err)
}
defer func() {
assert.NoError(t, exp.Shutdown(ctx))
}()
runEndToEndTest(t, ctx, exp, mcTraces, mcMetrics)
}
9 changes: 5 additions & 4 deletions exporters/otlp/otlp_metric_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp
package otlp_test

import (
"context"
Expand All @@ -23,6 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.opentelemetry.io/otel/exporters/otlp"
commonpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/common/v1"
metricpb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/metrics/v1"
resourcepb "go.opentelemetry.io/otel/exporters/otlp/internal/opentelemetry-proto-gen/resource/v1"
Expand Down Expand Up @@ -692,8 +693,8 @@ func TestStatelessExportKind(t *testing.T) {
t.Run(k.name, func(t *testing.T) {
runMetricExportTests(
t,
[]ExporterOption{
WithMetricExportKindSelector(
[]otlp.ExporterOption{
otlp.WithMetricExportKindSelector(
metricsdk.StatelessExportKindSelector(),
),
},
Expand Down Expand Up @@ -740,7 +741,7 @@ func TestStatelessExportKind(t *testing.T) {
}
}

func runMetricExportTests(t *testing.T, opts []ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
func runMetricExportTests(t *testing.T, opts []otlp.ExporterOption, rs []record, expected []metricpb.ResourceMetrics) {
exp, driver := newExporter(t, opts...)

recs := map[label.Distinct][]metricsdk.Record{}
Expand Down
2 changes: 1 addition & 1 deletion exporters/otlp/otlp_span_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package otlp
package otlp_test

import (
"context"
Expand Down
Loading