-
Notifications
You must be signed in to change notification settings - Fork 67
/
fetcher_worker.go
109 lines (89 loc) 路 2.69 KB
/
fetcher_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
package tracepollerworker
import (
"context"
"log"
"github.com/kubeshop/tracetest/server/datastore"
"github.com/kubeshop/tracetest/server/executor"
"github.com/kubeshop/tracetest/server/pkg/pipeline"
"github.com/kubeshop/tracetest/server/resourcemanager"
"github.com/kubeshop/tracetest/server/subscription"
"github.com/kubeshop/tracetest/server/tracedb"
"go.opentelemetry.io/otel/trace"
)
type traceFetcherWorker struct {
state *workerState
outputQueue pipeline.Enqueuer[executor.Job]
enabled bool
}
func NewFetcherWorker(
eventEmitter executor.EventEmitter,
newTraceDBFn tracedb.FactoryFunc,
dsRepo resourcemanager.Current[datastore.DataStore],
updater executor.RunUpdater,
subscriptionManager *subscription.Manager,
tracer trace.Tracer,
enabled bool,
) *traceFetcherWorker {
state := &workerState{
eventEmitter: eventEmitter,
newTraceDBFn: newTraceDBFn,
dsRepo: dsRepo,
updater: updater,
subscriptionManager: subscriptionManager,
tracer: tracer,
}
return &traceFetcherWorker{state: state, enabled: enabled}
}
func (w *traceFetcherWorker) SetInputQueue(queue pipeline.Enqueuer[executor.Job]) {
w.state.inputQueue = queue
}
func (w *traceFetcherWorker) SetOutputQueue(queue pipeline.Enqueuer[executor.Job]) {
w.outputQueue = queue
}
func (w *traceFetcherWorker) ProcessItem(ctx context.Context, job executor.Job) {
if !w.enabled {
return
}
select {
default:
case <-ctx.Done():
return
}
ctx, span := w.state.tracer.Start(ctx, "Fetching trace")
defer span.End()
if job.Run.SkipTraceCollection {
w.outputQueue.Enqueue(ctx, job)
return
}
populateSpan(span, job, "", nil)
traceDB, err := getTraceDB(ctx, w.state)
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: GetDataStore error: %s", job.Test.ID, job.Run.ID, err.Error())
handleError(ctx, job, err, w.state, span)
return
}
traceID := job.Run.TraceID.String()
trace, err := traceDB.GetTraceByID(ctx, traceID)
if err != nil {
log.Printf("[TracePoller] Test %s Run %d: GetTraceByID (traceID %s) error: %s", job.Test.ID, job.Run.ID, traceID, err.Error())
if isTraceNotFoundError(err) {
job.Headers.SetBool("traceNotFound", true)
} else {
job.Run.LastError = err
handleDBError(w.state.updater.Update(ctx, job.Run))
}
w.outputQueue.Enqueue(ctx, job)
return
}
spansBefore := 0
if job.Run.Trace != nil {
spansBefore = len(job.Run.Trace.Flat)
}
collectedSpans := len(trace.Flat) - spansBefore
job.Headers.SetInt("collectedSpans", collectedSpans)
job.Headers.SetBool("traceNotFound", false)
trace.ID = job.Run.TraceID
job.Run.Trace = &trace
handleDBError(w.state.updater.Update(ctx, job.Run))
w.outputQueue.Enqueue(ctx, job)
}