/
tracing.go
156 lines (127 loc) · 3.96 KB
/
tracing.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
package metrics
import (
"context"
cryptorand "crypto/rand"
"encoding/binary"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
"math/rand"
"runtime"
"sync"
"time"
)
func configureTracing(ctx context.Context, client *client, conf *config) error {
provider := conf.tracerProvider
if provider == nil {
var opts []sdktrace.TracerProviderOption
opts = append(opts, sdktrace.WithIDGenerator(newIDGenerator()))
if res := conf.newResource(); res != nil {
opts = append(opts, sdktrace.WithResource(res))
}
if conf.traceSampler != nil {
opts = append(opts, sdktrace.WithSampler(conf.traceSampler))
}
provider = sdktrace.NewTracerProvider(opts...)
otel.SetTracerProvider(provider)
}
exp, err := otlptrace.New(ctx, otlpTraceClient(conf, client.dsn))
if err != nil {
return fmt.Errorf("otlptrace.New failed: %s", err)
}
queueSize := queueSize()
bspOptions := []sdktrace.BatchSpanProcessorOption{
sdktrace.WithMaxQueueSize(queueSize),
sdktrace.WithMaxExportBatchSize(queueSize),
sdktrace.WithBatchTimeout(10 * time.Second),
sdktrace.WithExportTimeout(10 * time.Second),
}
bspOptions = append(bspOptions, conf.bspOptions...)
bsp := sdktrace.NewBatchSpanProcessor(exp, bspOptions...)
provider.RegisterSpanProcessor(bsp)
if conf.prettyPrint {
exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
if err != nil {
fmt.Errorf(err.Error())
} else {
provider.RegisterSpanProcessor(sdktrace.NewSimpleSpanProcessor(exporter))
}
}
client.tp = provider
return nil
}
func otlpTraceClient(conf *config, dsn *DSN) otlptrace.Client {
options := []otlptracegrpc.Option{
otlptracegrpc.WithEndpoint(dsn.OTLPEndpoint()),
otlptracegrpc.WithHeaders(map[string]string{
"origin-dsn": dsn.String(),
}),
otlptracegrpc.WithCompressor(gzip.Name),
}
if conf.tlsConf != nil {
creds := credentials.NewTLS(conf.tlsConf)
options = append(options, otlptracegrpc.WithTLSCredentials(creds))
} else if dsn.Scheme == "https" {
// Create credentials using system certificates.
creds := credentials.NewClientTLSFromCert(nil, "")
options = append(options, otlptracegrpc.WithTLSCredentials(creds))
} else {
options = append(options, otlptracegrpc.WithInsecure())
}
return otlptracegrpc.NewClient(options...)
}
func queueSize() int {
const min = 1000
const max = 16000
n := (runtime.GOMAXPROCS(0) / 2) * 1000
if n < min {
return min
}
if n > max {
return max
}
return n
}
//------------------------------------------------------------------------------
const spanIDPrec = int64(time.Millisecond)
type idGenerator struct {
sync.Mutex
randSource *rand.Rand
}
func newIDGenerator() *idGenerator {
gen := &idGenerator{}
var rngSeed int64
_ = binary.Read(cryptorand.Reader, binary.LittleEndian, &rngSeed)
gen.randSource = rand.New(rand.NewSource(rngSeed))
return gen
}
var _ sdktrace.IDGenerator = (*idGenerator)(nil)
// NewIDs returns a new trace and span ID.
func (gen *idGenerator) NewIDs(ctx context.Context) (trace.TraceID, trace.SpanID) {
unixNano := time.Now().UnixNano()
gen.Lock()
defer gen.Unlock()
tid := trace.TraceID{}
binary.BigEndian.PutUint64(tid[:8], uint64(unixNano))
_, _ = gen.randSource.Read(tid[8:])
sid := trace.SpanID{}
binary.BigEndian.PutUint32(sid[:4], uint32(unixNano/spanIDPrec))
_, _ = gen.randSource.Read(sid[4:])
return tid, sid
}
// NewSpanID returns a ID for a new span in the trace with traceID.
func (gen *idGenerator) NewSpanID(ctx context.Context, traceID trace.TraceID) trace.SpanID {
unixNano := time.Now().UnixNano()
gen.Lock()
defer gen.Unlock()
sid := trace.SpanID{}
binary.BigEndian.PutUint32(sid[:4], uint32(unixNano/spanIDPrec))
_, _ = gen.randSource.Read(sid[4:])
return sid
}