/
zipkin_receiver.go
92 lines (84 loc) · 3 KB
/
zipkin_receiver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
// Copyright (c) 2023 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0
package handler
import (
"context"
"fmt"
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
noopmetric "go.opentelemetry.io/otel/metric/noop"
nooptrace "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/zap"
"github.com/jaegertracing/jaeger/cmd/collector/app/flags"
"github.com/jaegertracing/jaeger/cmd/collector/app/processor"
"github.com/jaegertracing/jaeger/pkg/tenancy"
)
// StartZipkinReceiver creates the Zipkin receiver provided by the OTEL
// Collector and starts it.
//
// It obtains the production Zipkin factory and hands it, together with the
// real consumer and receiver constructors, to startZipkinReceiver, which does
// the actual wiring. The started receiver is returned, or the error reported
// by startZipkinReceiver.
func StartZipkinReceiver(
	options *flags.CollectorOptions,
	logger *zap.Logger,
	spanProcessor processor.SpanProcessor,
	tm *tenancy.Manager,
) (receiver.Traces, error) {
	factory := zipkinreceiver.NewFactory()

	// Production constructors; tests substitute mocks for these when calling
	// startZipkinReceiver directly.
	newTraces := consumer.NewTraces
	createReceiver := factory.CreateTracesReceiver

	return startZipkinReceiver(options, logger, spanProcessor, tm, factory, newTraces, createReceiver)
}
// startZipkinReceiver builds the Zipkin receiver from the supplied factory and
// constructors, starts it, and returns it.
//
// Some of OTELCOL constructor functions return errors when passed nil arguments,
// which is a situation we cannot reproduce. To test our own error handling, this
// function allows to mock those constructors.
//
// Parameters:
//   - options: collector options; the Zipkin host:port and TLS settings and the
//     HTTP CORS settings are read from it.
//   - logger: used for the receiver telemetry settings, the consumer delegate
//     and the host passed to Start.
//   - spanProcessor, tm: forwarded to newConsumerDelegate.
//   - zipkinFactory: supplies the default receiver config.
//   - newTraces: wraps the consumer delegate into a consumer.Traces.
//   - createTracesReceiver: builds the receiver from settings, config and consumer.
//
// An error is returned when the factory's default config is not a
// *zipkinreceiver.Config, or when creating the consumer, creating the receiver,
// or starting the receiver fails.
func startZipkinReceiver(
	options *flags.CollectorOptions,
	logger *zap.Logger,
	spanProcessor processor.SpanProcessor,
	tm *tenancy.Manager,
	// from here: params that can be mocked in tests
	zipkinFactory receiver.Factory,
	newTraces func(consume consumer.ConsumeTracesFunc, options ...consumer.Option) (consumer.Traces, error),
	createTracesReceiver func(ctx context.Context, set receiver.CreateSettings,
		cfg component.Config, nextConsumer consumer.Traces) (receiver.Traces, error),
) (receiver.Traces, error) {
	// The factory is injectable, so its default config may be of another type;
	// report that as an error instead of panicking on an unchecked assertion.
	defaultConfig := zipkinFactory.CreateDefaultConfig()
	receiverConfig, ok := defaultConfig.(*zipkinreceiver.Config)
	if !ok {
		return nil, fmt.Errorf("could not create Zipkin receiver config: unexpected type %T", defaultConfig)
	}
	applyHTTPSettings(&receiverConfig.ServerConfig, &flags.HTTPOptions{
		HostPort: options.Zipkin.HTTPHostPort,
		TLS:      options.Zipkin.TLS,
		CORS:     options.HTTP.CORS,
		// TODO keepAlive not supported?
	})
	receiverSettings := receiver.CreateSettings{
		TelemetrySettings: component.TelemetrySettings{
			Logger:         logger,
			TracerProvider: nooptrace.NewTracerProvider(),
			MeterProvider:  noopmetric.NewMeterProvider(), // TODO wire this with jaegerlib metrics?
		},
	}

	consumerAdapter := newConsumerDelegate(logger, spanProcessor, tm)
	// reset Zipkin spanFormat
	consumerAdapter.batchConsumer.spanOptions.SpanFormat = processor.ZipkinSpanFormat

	nextConsumer, err := newTraces(consumerAdapter.consume)
	if err != nil {
		return nil, fmt.Errorf("could not create Zipkin consumer: %w", err)
	}
	rcvr, err := createTracesReceiver(
		context.Background(),
		receiverSettings,
		receiverConfig,
		nextConsumer,
	)
	if err != nil {
		return nil, fmt.Errorf("could not create Zipkin receiver: %w", err)
	}
	if err := rcvr.Start(context.Background(), &otelHost{logger: logger}); err != nil {
		return nil, fmt.Errorf("could not start Zipkin receiver: %w", err)
	}
	return rcvr, nil
}