-
Notifications
You must be signed in to change notification settings - Fork 67
/
ingester.go
190 lines (160 loc) · 5.09 KB
/
ingester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
package collector
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/kubeshop/tracetest/agent/event"
"github.com/kubeshop/tracetest/server/otlp"
"go.opencensus.io/trace"
pb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// stoppable is satisfied by any type that provides a Stop method taking no
// arguments and returning nothing.
type stoppable interface {
	// Stop asks the implementation to shut down.
	Stop()
}
// newForwardIngester builds a forwardIngester from the given batch timeout and
// remote configuration. The trace cache and logger are taken from cfg.
//
// When startRemoteServer is true the ingester connects to the remote server
// and launches the batch worker in its own goroutine; a connection failure is
// returned wrapped and no ingester is handed back. When it is false the
// ingester is returned without a client and without a running worker.
func newForwardIngester(ctx context.Context, batchTimeout time.Duration, cfg remoteIngesterConfig, startRemoteServer bool) (otlp.Ingester, error) {
	fi := &forwardIngester{
		BatchTimeout:   batchTimeout,
		RemoteIngester: cfg,
		buffer:         &buffer{},
		done:           make(chan bool),
		traceCache:     cfg.traceCache,
		logger:         cfg.logger,
	}

	// Nothing more to set up when forwarding is disabled.
	if !startRemoteServer {
		return fi, nil
	}

	if err := fi.connectToRemoteServer(ctx); err != nil {
		return nil, fmt.Errorf("could not connect to remote server: %w", err)
	}

	go fi.startBatchWorker()

	return fi, nil
}
// forwardIngester forwards all incoming spans to a remote ingester. It also batches those
// spans to reduce network traffic.
type forwardIngester struct {
	// BatchTimeout is the period of the ticker that drives the batch worker.
	BatchTimeout time.Duration
	// RemoteIngester holds the remote endpoint configuration; its URL is the
	// dial target and its observer is notified when test spans are cached.
	RemoteIngester remoteIngesterConfig
	// client is the OTLP trace service client used to export batches. It is
	// only set by connectToRemoteServer.
	client pb.TraceServiceClient
	// buffer accumulates spans received by Ingest until the next batch runs.
	buffer *buffer
	// done is the channel signalled by Stop.
	done chan bool
	// traceCache is optional; when it is nil Ingest skips span caching.
	traceCache TraceCache
	// logger receives the ingester's debug and error messages.
	logger *zap.Logger
}
// remoteIngesterConfig carries the settings and collaborators a
// forwardIngester is built from.
type remoteIngesterConfig struct {
	// URL is the target passed to grpc.DialContext when connecting.
	URL string
	// Token is not read by the code visible in this file section.
	// NOTE(review): presumably an auth token for the remote server — confirm.
	Token string
	// traceCache is copied into the forwardIngester by newForwardIngester.
	traceCache TraceCache
	// startRemoteServer is not read by the code visible in this file section;
	// newForwardIngester takes its own startRemoteServer argument instead.
	startRemoteServer bool
	// logger is copied into the forwardIngester by newForwardIngester.
	logger *zap.Logger
	// observer is notified before and after test spans are appended to the
	// trace cache.
	observer event.Observer
}
// buffer is the mutex-guarded list of spans waiting to be forwarded.
type buffer struct {
	// mutex is held while spans is appended to (Ingest) or swapped out
	// (executeBatch).
	mutex sync.Mutex
	// spans are the resource spans received since the last batch.
	spans []*v1.ResourceSpans
}
// Ingest queues the request's resource spans for the next forwarding batch
// and, when a trace cache is configured, also offers them to that cache.
//
// ctx and requestType are accepted but not used. The response always reports
// zero rejected spans and the returned error is always nil.
func (i *forwardIngester) Ingest(ctx context.Context, request *pb.ExportTraceServiceRequest, requestType otlp.RequestType) (*pb.ExportTraceServiceResponse, error) {
	received := request.ResourceSpans

	// Only the append needs the lock; logging and caching happen outside it.
	i.buffer.mutex.Lock()
	i.buffer.spans = append(i.buffer.spans, received...)
	i.buffer.mutex.Unlock()

	i.logger.Debug("received spans", zap.Int("count", len(received)))

	if i.traceCache != nil {
		i.logger.Debug("caching test spans")
		// With an OTLP datastore the spans are later polled from this cache
		// rather than from a real datastore.
		i.cacheTestSpans(received)
	}

	response := &pb.ExportTraceServiceResponse{
		PartialSuccess: &pb.ExportTracePartialSuccess{RejectedSpans: 0},
	}
	return response, nil
}
// connectToRemoteServer dials RemoteIngester.URL over gRPC using insecure
// (non-TLS) transport credentials and stores the resulting trace service
// client on the ingester. A dial error is logged and returned wrapped.
func (i *forwardIngester) connectToRemoteServer(ctx context.Context) error {
	transport := grpc.WithTransportCredentials(insecure.NewCredentials())

	conn, err := grpc.DialContext(ctx, i.RemoteIngester.URL, transport)
	if err != nil {
		i.logger.Error("could not connect to remote server", zap.Error(err))
		return fmt.Errorf("could not connect to remote server: %w", err)
	}

	i.client = pb.NewTraceServiceClient(conn)

	return nil
}
// startBatchWorker runs the forwarding loop: every BatchTimeout it flushes the
// buffered spans via executeBatch, and it returns once i.done is signalled
// (a value is received or the channel is closed). It blocks until then, so it
// is meant to be run in its own goroutine.
//
// Batch errors are logged and do not stop the loop.
func (i *forwardIngester) startBatchWorker() {
	i.logger.Debug("starting batch worker", zap.Duration("batch_timeout", i.BatchTimeout))

	ticker := time.NewTicker(i.BatchTimeout)
	// Release the ticker's resources when the worker exits.
	defer ticker.Stop()

	for {
		select {
		// Listen on the ingester's own done channel; a channel created locally
		// here would never be signalled and the worker could not be stopped.
		case <-i.done:
			i.logger.Debug("stopping batch worker")
			return
		case <-ticker.C:
			i.logger.Debug("executing batch")
			if err := i.executeBatch(context.Background()); err != nil {
				i.logger.Error("could not execute batch", zap.Error(err))
				log.Println(err)
			}
		}
	}
}
// executeBatch takes everything currently in the buffer, leaves an empty
// buffer behind, and forwards the taken spans to the remote server.
//
// It returns nil when there was nothing to send or the send succeeded, and
// the forwarding error otherwise. Spans taken from the buffer are not put
// back when forwarding fails.
func (i *forwardIngester) executeBatch(ctx context.Context) error {
	// Swap the buffer out under the lock so the network call runs unlocked.
	i.buffer.mutex.Lock()
	pending := i.buffer.spans
	i.buffer.spans = make([]*v1.ResourceSpans, 0)
	i.buffer.mutex.Unlock()

	if len(pending) == 0 {
		i.logger.Debug("no spans to forward")
		return nil
	}

	if err := i.forwardSpans(ctx, pending); err != nil {
		i.logger.Error("could not forward spans", zap.Error(err))
		return err
	}

	i.logger.Debug("successfully forwarded spans", zap.Int("count", len(pending)))

	return nil
}
// forwardSpans sends the given resource spans to the remote server in a
// single Export call. The export response is discarded; an export error is
// logged and returned wrapped.
func (i *forwardIngester) forwardSpans(ctx context.Context, spans []*v1.ResourceSpans) error {
	exportRequest := &pb.ExportTraceServiceRequest{ResourceSpans: spans}

	if _, err := i.client.Export(ctx, exportRequest); err != nil {
		i.logger.Error("could not forward spans to remote server", zap.Error(err))
		return fmt.Errorf("could not forward spans to remote server: %w", err)
	}

	return nil
}
// cacheTestSpans groups the incoming spans by trace ID and appends each group
// to the trace cache, but only for trace IDs the cache already knows about.
// The observer is notified before and after every append.
func (i *forwardIngester) cacheTestSpans(resourceSpans []*v1.ResourceSpans) {
	i.logger.Debug("caching test spans")

	// Flatten resource -> scope -> span into one list per trace ID.
	byTrace := make(map[string][]*v1.Span)
	for _, rs := range resourceSpans {
		for _, scope := range rs.ScopeSpans {
			for _, s := range scope.Spans {
				id := trace.TraceID(s.TraceId).String()
				byTrace[id] = append(byTrace[id], s)
			}
		}
	}

	// "count" here is the number of distinct trace IDs, not of spans.
	i.logger.Debug("caching test spans", zap.Int("count", len(byTrace)))

	for id, traceSpans := range byTrace {
		_, known := i.traceCache.Get(id)
		if !known {
			// A trace ID missing from the cache does not belong to a test.
			i.logger.Debug("traceID is not part of a test", zap.String("traceID", id))
			continue
		}

		i.RemoteIngester.observer.StartSpanReceive(traceSpans)
		i.traceCache.Append(id, traceSpans)
		i.logger.Debug("caching test spans", zap.String("traceID", id), zap.Int("count", len(traceSpans)))
		i.RemoteIngester.observer.EndSpanReceive(traceSpans, nil)
	}
}
// Stop signals the ingester to shut down by closing the done channel.
//
// The channel is closed rather than sent to: a send on the unbuffered channel
// blocks until something receives, so Stop would hang forever when nothing is
// receiving from done (for example when no batch worker was started) and on
// any repeated call. Closing never blocks and releases every goroutine
// receiving from done.
//
// Repeated sequential calls are safe. Concurrent calls are not synchronised.
func (i *forwardIngester) Stop() {
	i.logger.Debug("stopping forward ingester")

	// Closing a nil channel panics; there is nothing to signal in that case.
	if i.done == nil {
		return
	}

	select {
	case <-i.done:
		// Already closed by an earlier Stop call; closing again would panic.
	default:
		close(i.done)
	}
}