-
Notifications
You must be signed in to change notification settings - Fork 2
/
agent.go
135 lines (115 loc) · 3.51 KB
/
agent.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package tracer
import (
"context"
"fmt"
"io"
"net/url"
"time"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
type (
TraceCollector string
AgentConfig struct {
Collector TraceCollector
Endpoint string
Environment string
Name string
Version string
}
)
const (
OtplCollector TraceCollector = "otpl"
)
var traceProvider *sdktrace.TracerProvider
// start new connection to collector server
// for now only support otpl that using jaeger and prometheus
func StartAgent(c AgentConfig) (func(ctx context.Context) error, error) {
ctx := context.Background()
resOptions := resource.WithAttributes(
semconv.ServiceName(c.Name),
semconv.ServiceVersion(c.Version),
attribute.String("environment", c.Environment),
)
// create new resource
res, err := resource.New(ctx, resOptions)
if err != nil {
return nil, err
}
// create new exporter
traceExporter, err := createExporter(ctx, c)
if err != nil {
return nil, err
}
// stdOutExporter, _ := newStdOutExporter(log.Writer())
// Create new trace provicer
bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
tp := sdktrace.NewTracerProvider(
// sdktrace.WithBatcher(stdOutExporter),
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),
sdktrace.WithSpanProcessor(bsp),
)
traceProvider = tp
otel.SetTracerProvider(traceProvider)
otel.SetTextMapPropagator(propagation.TraceContext{})
return traceProvider.Shutdown, nil
}
// export trace data to std.out
func newStdOutExporter(w io.Writer) (sdktrace.SpanExporter, error) {
return stdouttrace.New(
stdouttrace.WithWriter(w),
// Use human readable output.
stdouttrace.WithPrettyPrint(),
// Do not print timestamps for the demo.
stdouttrace.WithoutTimestamps(),
)
}
// create new exporter base on agent configuration
func createExporter(ctx context.Context, c AgentConfig) (sdktrace.SpanExporter, error) {
switch c.Collector {
case OtplCollector:
return createOtplExported(ctx, c.Endpoint)
}
return nil, fmt.Errorf("unsupported collector : %v", string(c.Collector))
}
// create new otpl exporter base on endpoint
// support http and grpc exporter
func createOtplExported(ctx context.Context, endpoint string) (sdktrace.SpanExporter, error) {
url, err := url.Parse(endpoint)
if err != nil {
return nil, err
}
switch url.Scheme {
case "http", "https":
traceExporter, err := otlptracehttp.New(ctx, otlptracehttp.WithEndpoint(endpoint))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
return traceExporter, nil
default:
ctxTimeout, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
conn, err := grpc.DialContext(ctxTimeout, endpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithBlock(),
)
if err != nil {
return nil, fmt.Errorf("failed to create gRPC connection to collector: %w", err)
}
traceExporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("failed to create trace exporter: %w", err)
}
return traceExporter, nil
}
}