-
Notifications
You must be signed in to change notification settings - Fork 50
/
builder.go
194 lines (165 loc) · 6.56 KB
/
builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
package telemetry
import (
"context"
"fmt"
"time"
"connectrpc.com/connect"
"connectrpc.com/otelconnect"
"github.com/open-feature/flagd/core/pkg/logger"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/prometheus"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
const (
// metricsExporterOtel is the only non-empty Config.MetricsExporter value accepted by buildMetricReader
metricsExporterOtel = "otel"
// exportInterval is the interval handed to the periodic metric reader that wraps the otel metric exporter
exportInterval = 2 * time.Second
)
// Config of the telemetry runtime. These are expected to be mapped to start-up arguments
type Config struct {
// MetricsExporter selects the metric reader. Empty selects the default (Prometheus) reader, "otel" selects the
// collector-backed periodic reader, and any other value is rejected by buildMetricReader
MetricsExporter string
// CollectorTarget is the grpc target of the otel collector. It is required when MetricsExporter is "otel".
// When empty, BuildTraceProvider skips provider registration and BuildConnectOptions adds no interceptor
CollectorTarget string
}
// RegisterErrorHandling installs an otelErrorsHandler backed by the given logger as the OpenTelemetry error
// handler, using otel.SetErrorHandler.
func RegisterErrorHandling(log *logger.Logger) {
	handler := otelErrorsHandler{logger: log}
	otel.SetErrorHandler(handler)
}
// BuildMetricsRecorder is a helper to build telemetry.MetricsRecorder based on configurations.
//
// The resource identifier is built before the metric reader. Building the reader for the "otel" exporter dials a
// grpc connection to the collector, and this function has no handle to release it, so the step that can strand a
// connection is performed last.
//
// A wrapped error is returned when either the resource identifier or the metric reader cannot be built.
func BuildMetricsRecorder(
	ctx context.Context, svcName string, svcVersion string, config Config,
) (*MetricsRecorder, error) {
	// Build telemetry resource identifier first, so a failure here cannot strand an already built reader
	rsc, err := buildResourceFor(ctx, svcName, svcVersion)
	if err != nil {
		return nil, fmt.Errorf("failed to setup resource identifier: %w", err)
	}

	// Build metric reader based on configurations
	mReader, err := buildMetricReader(ctx, config)
	if err != nil {
		return nil, fmt.Errorf("failed to setup metric reader: %w", err)
	}

	return NewOTelRecorder(mReader, rsc, svcName), nil
}
// BuildTraceProvider builds and registers the trace provider and propagator for the caller runtime. This method
// attempts to register a global TracerProvider backed by a batch SpanProcessor. Config.CollectorTarget can be used
// to provide the grpc collector target. Providing an empty target results in skipping provider & propagator
// registration. This results in tracers having NoopTracerProvider and propagator having No-Op TextMapPropagator
// performing no action.
//
// The resource identifier is built before the exporter: the exporter owns a dialed grpc connection, and this
// function has no handle to release it, so the step that can strand a connection is performed last.
//
// Errors from building the resource identifier or the exporter are returned as-is.
func BuildTraceProvider(ctx context.Context, logger *logger.Logger, svc string, svcVersion string, cfg Config) error {
	if cfg.CollectorTarget == "" {
		logger.Debug("skipping trace provider setup as collector target is not set." +
			" Traces will use NoopTracerProvider provider and propagator will use no-Op TextMapPropagator")
		return nil
	}

	// Resource first, so a failure here cannot strand an already started exporter and its connection
	res, err := buildResourceFor(ctx, svc, svcVersion)
	if err != nil {
		return err
	}

	exporter, err := buildOtlpExporter(ctx, cfg.CollectorTarget)
	if err != nil {
		return err
	}

	provider := trace.NewTracerProvider(
		trace.WithSampler(trace.AlwaysSample()),
		trace.WithSpanProcessor(trace.NewBatchSpanProcessor(exporter)),
		trace.WithResource(res))

	// Register globally: tracers obtained through the otel package use this provider from here on
	otel.SetTracerProvider(provider)
	otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
	return nil
}
// BuildConnectOptions derives connect handler options from the telemetry configuration. With no collector target
// configured the returned list is empty (but non-nil); otherwise it carries a single option wrapping an otelconnect
// interceptor created with WithTrustRemote. An interceptor creation failure is returned wrapped, with a nil list.
func BuildConnectOptions(cfg Config) ([]connect.HandlerOption, error) {
	// no collector configured, hence no interceptor to add
	if cfg.CollectorTarget == "" {
		return []connect.HandlerOption{}, nil
	}

	otelInterceptor, err := otelconnect.NewInterceptor(otelconnect.WithTrustRemote())
	if err != nil {
		return nil, fmt.Errorf("error creating interceptor, %w", err)
	}

	return []connect.HandlerOption{connect.WithInterceptors(otelInterceptor)}, nil
}
// buildMetricReader builds a metric reader based on provided configurations.
//
//   - An empty cfg.MetricsExporter yields the default reader (see buildDefaultMetricReader).
//   - metricsExporterOtel yields a periodic reader exporting over grpc to cfg.CollectorTarget every exportInterval.
//     cfg.CollectorTarget must be set for this option.
//   - Any other exporter value is rejected with an error.
//
// The grpc connection dialed for the otel exporter is closed again if the exporter cannot be created.
func buildMetricReader(ctx context.Context, cfg Config) (metric.Reader, error) {
	if cfg.MetricsExporter == "" {
		return buildDefaultMetricReader()
	}

	// Handle metric reader override
	if cfg.MetricsExporter != metricsExporterOtel {
		return nil, fmt.Errorf("provided metrics operator %s is not supported. currently only support %s",
			cfg.MetricsExporter, metricsExporterOtel)
	}

	// Otel override require target configuration
	if cfg.CollectorTarget == "" {
		return nil, fmt.Errorf("metric exporter is set(%s) without providing otel collector target."+
			" collector target is required for this option", cfg.MetricsExporter)
	}

	// Non-blocking, insecure grpc connection
	conn, err := grpc.DialContext(ctx, cfg.CollectorTarget, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("error creating client connection: %w", err)
	}

	// Otel metric exporter
	otelExporter, err := otlpmetricgrpc.New(ctx, otlpmetricgrpc.WithGRPCConn(conn))
	if err != nil {
		// The connection was dialed here and is not handed to anyone on this path, so release it before
		// returning. The close error is dropped on purpose: the exporter failure is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("error creating otel metric exporter: %w", err)
	}

	return metric.NewPeriodicReader(otelExporter, metric.WithInterval(exportInterval)), nil
}
// buildOtlpExporter is a helper to build grpc backed otlp trace exporter.
//
// It dials collectorTarget over an insecure grpc connection and creates an otlptrace exporter on top of a grpc
// trace client using that connection. If the exporter cannot be created, the connection is closed again before
// the wrapped error is returned.
func buildOtlpExporter(ctx context.Context, collectorTarget string) (*otlptrace.Exporter, error) {
	// Non-blocking, insecure grpc connection
	conn, err := grpc.DialContext(ctx, collectorTarget, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		return nil, fmt.Errorf("error creating client connection: %w", err)
	}

	traceClient := otlptracegrpc.NewClient(otlptracegrpc.WithGRPCConn(conn))
	exporter, err := otlptrace.New(ctx, traceClient)
	if err != nil {
		// The connection was dialed here and is not handed to anyone on this path, so release it before
		// returning. The close error is dropped on purpose: the exporter failure is the one worth reporting.
		_ = conn.Close()
		return nil, fmt.Errorf("error starting otel exporter: %w", err)
	}

	return exporter, nil
}
// buildDefaultMetricReader creates the reader used when no metrics exporter is configured: a Prometheus exporter
// built with its default options. A creation failure is returned wrapped, together with a nil reader.
func buildDefaultMetricReader() (metric.Reader, error) {
	promExporter, err := prometheus.New()
	if err != nil {
		return nil, fmt.Errorf("unable to create default metric reader: %w", err)
	}

	return promExporter, nil
}
// buildResourceFor assembles the telemetry resource identifier for a service. The resource is requested with the
// OS, host, process runtime version and telemetry SDK options, plus the service name and service version as
// attributes. A failure from resource.New is returned wrapped, together with a nil resource.
func buildResourceFor(ctx context.Context, serviceName string, serviceVersion string) (*resource.Resource, error) {
	// service identity attributes
	nameAttr := semconv.ServiceNameKey.String(serviceName)
	versionAttr := semconv.ServiceVersionKey.String(serviceVersion)

	opts := []resource.Option{
		resource.WithOS(),
		resource.WithHost(),
		resource.WithProcessRuntimeVersion(),
		resource.WithTelemetrySDK(),
		resource.WithAttributes(nameAttr, versionAttr),
	}

	res, err := resource.New(ctx, opts...)
	if err != nil {
		return nil, fmt.Errorf("unable to create resource identifier: %w", err)
	}

	return res, nil
}
// otelErrorsHandler is a custom error handler for OpenTelemetry. RegisterErrorHandling installs it through
// otel.SetErrorHandler, and its Handle method writes the received errors to logger
type otelErrorsHandler struct {
// logger receives the errors passed to Handle
logger *logger.Logger
}
// Handle writes err to the handler's logger at debug level, prefixed with "OpenTelemetry Error: " and tagged with
// a component=otel field.
func (h otelErrorsHandler) Handle(err error) {
	component := zap.String("component", "otel")
	h.logger.WithFields(component).Debug("OpenTelemetry Error: " + err.Error())
}