Skip to content

Commit

Permalink
Add OTLP receiver to collector (#3701)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <github@ysh.us>
Co-authored-by: Albert <26584478+albertteoh@users.noreply.github.com>
  • Loading branch information
yurishkuro and albertteoh committed May 29, 2022
1 parent 88bfe91 commit b708823
Show file tree
Hide file tree
Showing 12 changed files with 575 additions and 95 deletions.
37 changes: 31 additions & 6 deletions cmd/collector/app/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ import (
"time"

"github.com/uber/jaeger-lib/metrics"
"go.opentelemetry.io/collector/component"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/cmd/collector/app/handler"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/cmd/collector/app/sampling/strategystore"
"github.com/jaegertracing/jaeger/cmd/collector/app/server"
Expand All @@ -49,6 +51,7 @@ type Collector struct {
hServer *http.Server
zkServer *http.Server
grpcServer *grpc.Server
otlpReceiver component.TracesReceiver
tlsGRPCCertWatcherCloser io.Closer
tlsHTTPCertWatcherCloser io.Closer
tlsZipkinCertWatcherCloser io.Closer
Expand Down Expand Up @@ -106,7 +109,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
MaxConnectionAgeGrace: options.GRPC.MaxConnectionAgeGrace,
})
if err != nil {
return fmt.Errorf("could not start gRPC collector %w", err)
return fmt.Errorf("could not start gRPC server: %w", err)
}
c.grpcServer = grpcServer

Expand All @@ -120,7 +123,7 @@ func (c *Collector) Start(options *CollectorOptions) error {
Logger: c.logger,
})
if err != nil {
return fmt.Errorf("could not start the HTTP server %w", err)
return fmt.Errorf("could not start HTTP server: %w", err)
}
c.hServer = httpServer

Expand All @@ -138,10 +141,23 @@ func (c *Collector) Start(options *CollectorOptions) error {
MetricsFactory: c.metricsFactory,
})
if err != nil {
return fmt.Errorf("could not start the Zipkin server %w", err)
return fmt.Errorf("could not start Zipkin server: %w", err)
}
c.zkServer = zkServer

otlpReceiver, err := handler.StartOtelReceiver(
handler.OtelReceiverOptions{
GRPCHostPort: options.OTLP.GRPCHostPort,
HTTPHostPort: options.OTLP.HTTPHostPort,
},
c.logger,
c.spanProcessor,
)
if err != nil {
return fmt.Errorf("could not start OTLP receiver: %w", err)
}
c.otlpReceiver = otlpReceiver

c.publishOpts(options)

return nil
Expand All @@ -155,12 +171,12 @@ func (c *Collector) publishOpts(cOpts *CollectorOptions) {

// Close the component and all its underlying dependencies
func (c *Collector) Close() error {
// gRPC server
// Stop gRPC server
if c.grpcServer != nil {
c.grpcServer.GracefulStop()
}

// HTTP server
// Stop HTTP server
if c.hServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.hServer.Shutdown(timeout); err != nil {
Expand All @@ -169,7 +185,7 @@ func (c *Collector) Close() error {
defer cancel()
}

// Zipkin server
// Stop Zipkin server
if c.zkServer != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.zkServer.Shutdown(timeout); err != nil {
Expand All @@ -178,6 +194,15 @@ func (c *Collector) Close() error {
defer cancel()
}

// Stop OpenTelemetry OTLP receiver
if c.otlpReceiver != nil {
timeout, cancel := context.WithTimeout(context.Background(), 5*time.Second)
if err := c.otlpReceiver.Shutdown(timeout); err != nil {
c.logger.Fatal("failed to stop the OTLP receiver", zap.Error(err))
}
defer cancel()
}

if err := c.spanProcessor.Close(); err != nil {
c.logger.Error("failed to close span processor.", zap.Error(err))
}
Expand Down
99 changes: 73 additions & 26 deletions cmd/collector/app/collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/jaeger-lib/metrics/fork"
"github.com/uber/jaeger-lib/metrics/metricstest"
"go.uber.org/zap"
Expand All @@ -33,6 +34,15 @@ import (

var _ (io.Closer) = (*Collector)(nil)

// optionsForEphemeralPorts returns CollectorOptions with every listener
// bound to port 0, so the OS assigns a free ephemeral port to each server
// and parallel tests cannot collide on fixed ports.
func optionsForEphemeralPorts() *CollectorOptions {
	opts := new(CollectorOptions)
	opts.GRPC.HostPort = ":0"
	opts.HTTP.HostPort = ":0"
	opts.OTLP.GRPCHostPort = ":0"
	opts.OTLP.HTTPHostPort = ":0"
	return opts
}

func TestNewCollector(t *testing.T) {
// prepare
hc := healthcheck.New()
Expand All @@ -49,13 +59,51 @@ func TestNewCollector(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{}
collectorOpts := optionsForEphemeralPorts()
require.NoError(t, c.Start(collectorOpts))
assert.NoError(t, c.Close())
}

// test
c.Start(collectorOpts)
// TestCollector_StartErrors verifies that Start surfaces a wrapped, named
// error when any one of the collector's servers (gRPC, HTTP, Zipkin, OTLP)
// fails to bind. Port ":-1" is used to force a deterministic listen failure.
func TestCollector_StartErrors(t *testing.T) {
	run := func(name string, options *CollectorOptions, expErr string) {
		t.Run(name, func(t *testing.T) {
			hc := healthcheck.New()
			logger := zap.NewNop()
			baseMetrics := metricstest.NewFactory(time.Hour)
			spanWriter := &fakeSpanWriter{}
			strategyStore := &mockStrategyStore{}

			c := New(&CollectorParams{
				ServiceName:    "collector",
				Logger:         logger,
				MetricsFactory: baseMetrics,
				SpanWriter:     spanWriter,
				StrategyStore:  strategyStore,
				HealthCheck:    hc,
			})
			err := c.Start(options)
			require.Error(t, err)
			assert.Contains(t, err.Error(), expErr)
		})
	}

	var options *CollectorOptions

	options = optionsForEphemeralPorts()
	options.GRPC.HostPort = ":-1"
	run("gRPC", options, "could not start gRPC server")

	options = optionsForEphemeralPorts()
	options.HTTP.HostPort = ":-1"
	run("HTTP", options, "could not start HTTP server")

	options = optionsForEphemeralPorts()
	options.Zipkin.HTTPHostPort = ":-1"
	run("Zipkin", options, "could not start Zipkin server")

	options = optionsForEphemeralPorts()
	options.OTLP.HTTPHostPort = ":-1"
	run("OTLP", options, "could not start OTLP receiver")
}

type mockStrategyStore struct {
Expand Down Expand Up @@ -83,12 +131,11 @@ func TestCollector_PublishOpts(t *testing.T) {
StrategyStore: strategyStore,
HealthCheck: hc,
})
collectorOpts := &CollectorOptions{
NumWorkers: 24,
QueueSize: 42,
}
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 24
collectorOpts.QueueSize = 42

c.Start(collectorOpts)
require.NoError(t, c.Start(collectorOpts))
defer c.Close()

forkFactory.AssertGaugeMetrics(t, metricstest.ExpectedMetric{
Expand Down Expand Up @@ -119,16 +166,13 @@ func TestAggregator(t *testing.T) {
HealthCheck: hc,
Aggregator: agg,
})
collectorOpts := &CollectorOptions{
QueueSize: 10,
NumWorkers: 10,
}

// test
c.Start(collectorOpts)
collectorOpts := optionsForEphemeralPorts()
collectorOpts.NumWorkers = 10
collectorOpts.QueueSize = 10
require.NoError(t, c.Start(collectorOpts))

// assert that aggregator was added to the collector
_, err := c.spanProcessor.ProcessSpans([]*model.Span{
spans := []*model.Span{
{
OperationName: "y",
Process: &model.Process{
Expand All @@ -145,15 +189,18 @@ func TestAggregator(t *testing.T) {
},
},
},
}, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
}
_, err := c.spanProcessor.ProcessSpans(spans, processor.SpansOptions{SpanFormat: processor.JaegerSpanFormat})
assert.NoError(t, err)

// verify
assert.NoError(t, c.Close())

// assert that aggregator was used
assert.Equal(t, 1, agg.callCount)

// assert that aggregator close was called
assert.Equal(t, 1, agg.closeCount)
// spans are processed by background workers, so we may need to wait
for i := 0; i < 1000; i++ {
if agg.callCount.Load() == 1 && agg.closeCount.Load() == 1 {
break
}
time.Sleep(10 * time.Millisecond)
}
assert.EqualValues(t, 1, agg.callCount.Load(), "aggregator was used")
assert.EqualValues(t, 1, agg.closeCount.Load(), "aggregator close was called")
}
5 changes: 5 additions & 0 deletions cmd/collector/app/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ type CollectorOptions struct {
// See gRPC's keepalive.ServerParameters#MaxConnectionAgeGrace.
MaxConnectionAgeGrace time.Duration
}
// OTLP section defines options for servers accepting OpenTelemetry OTLP format
OTLP struct {
GRPCHostPort string
HTTPHostPort string
}
// Zipkin section defines options for Zipkin HTTP server
Zipkin struct {
// HTTPHostPort is the host:port address that the Zipkin collector service listens in on for http requests
Expand Down
42 changes: 31 additions & 11 deletions cmd/collector/app/handler/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,44 +19,64 @@ import (

"go.uber.org/zap"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip"
_ "google.golang.org/grpc/encoding/gzip" // register zip encoding
"google.golang.org/grpc/status"

"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

// GRPCHandler implements gRPC CollectorService.
type GRPCHandler struct {
	logger *zap.Logger
	// batchConsumer performs the actual span submission; it is pre-configured
	// with gRPC transport / protobuf format span options in NewGRPCHandler.
	batchConsumer batchConsumer
}

// NewGRPCHandler creates a GRPCHandler that forwards received span batches
// to the given SpanProcessor, tagging them as protobuf spans arriving over
// the gRPC transport.
func NewGRPCHandler(logger *zap.Logger, spanProcessor processor.SpanProcessor) *GRPCHandler {
	return &GRPCHandler{
		logger: logger,
		batchConsumer: batchConsumer{
			logger:        logger,
			spanProcessor: spanProcessor,
			spanOptions: processor.SpansOptions{
				InboundTransport: processor.GRPCTransport,
				SpanFormat:       processor.ProtoSpanFormat,
			},
		},
	}
}

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
for _, span := range r.GetBatch().Spans {
batch := &r.Batch
err := g.batchConsumer.consume(batch)
return &api_v2.PostSpansResponse{}, err
}

// batchConsumer submits a span batch to the span processor with a fixed set
// of span options, so the same submission logic can be configured for
// different inbound transports and span formats.
type batchConsumer struct {
	logger        *zap.Logger
	spanProcessor processor.SpanProcessor
	// spanOptions tags every consumed batch with its transport and format.
	spanOptions   processor.SpansOptions
}

func (c *batchConsumer) consume(batch *model.Batch) error {
for _, span := range batch.Spans {
if span.GetProcess() == nil {
span.Process = r.Batch.Process
span.Process = batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, processor.SpansOptions{
_, err := c.spanProcessor.ProcessSpans(batch.Spans, processor.SpansOptions{
InboundTransport: processor.GRPCTransport,
SpanFormat: processor.ProtoSpanFormat,
})
if err != nil {
if err == processor.ErrBusy {
return nil, status.Errorf(codes.ResourceExhausted, err.Error())
return status.Errorf(codes.ResourceExhausted, err.Error())
}
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
c.logger.Error("cannot process spans", zap.Error(err))
return err
}
return &api_v2.PostSpansResponse{}, nil
return nil
}
Loading

0 comments on commit b708823

Please sign in to comment.