-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
worker.go
109 lines (96 loc) · 3.21 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
package metrics
import (
"context"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/resource"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// worker bundles the settings and shared state used by a single
// metrics-generating routine (see simulateMetrics).
type worker struct {
	// running points to a shared flag; generation continues only while it
	// loads true, so clearing it signals that it is time to stop the test.
	running *atomic.Bool

	// metricName is the name given to every generated metric.
	metricName string

	// metricType selects which kind of metric is generated.
	metricType metricType

	// exemplars are attached to every generated data point.
	exemplars []metricdata.Exemplar[int64]

	// numMetrics is how many metrics the worker has to generate; zero means
	// no count limit (only relevant when totalDuration is zero).
	numMetrics int

	// totalDuration is how long to run the test for (overrides numMetrics).
	// NOTE(review): not read by simulateMetrics itself; presumably enforced
	// by the caller through the running flag — confirm against the caller.
	totalDuration time.Duration

	// limitPerSecond is how many metrics per second to generate.
	limitPerSecond rate.Limit

	// wg is notified (Done) when the worker has finished.
	wg *sync.WaitGroup

	// logger receives the worker's progress and error messages.
	logger *zap.Logger

	// index is the worker's index.
	index int
}
// simulateMetrics generates one data point per iteration for the configured
// metric type and pushes it through the exporter obtained from exporterFunc.
//
// Parameters:
//   - res: resource attached to every exported ResourceMetrics.
//   - exporterFunc: factory for the exporter; if it fails, the error is
//     logged and the worker finishes without generating anything.
//   - signalAttrs: attributes attached to every generated data point.
//
// The loop runs while w.running loads true and, when w.numMetrics is
// non-zero, stops once that many data points have been exported. Iterations
// are paced by a rate limiter built from w.limitPerSecond with a burst of 1.
// An unknown metric type, an export failure or a limiter failure is reported
// through logger.Fatal.
//
// w.wg.Done is deferred, so the wait group is released on every return path
// (including the exporter-creation failure) and only after the exporter has
// been shut down.
func (w worker) simulateMetrics(res *resource.Resource, exporterFunc func() (sdkmetric.Exporter, error), signalAttrs []attribute.KeyValue) {
	// Registered first so it runs last: deferred calls execute in LIFO order,
	// which guarantees waiters are released only after exporter shutdown.
	defer w.wg.Done()

	limiter := rate.NewLimiter(w.limitPerSecond, 1)

	exporter, err := exporterFunc()
	if err != nil {
		w.logger.Error("failed to create the exporter", zap.Error(err))
		return
	}

	defer func() {
		w.logger.Info("stopping the exporter")
		if tempError := exporter.Shutdown(context.Background()); tempError != nil {
			w.logger.Error("failed to stop the exporter", zap.Error(tempError))
		}
	}()

	// The attribute set does not change between iterations; build it once.
	attrs := attribute.NewSet(signalAttrs...)

	var i int64
	for w.running.Load() {
		// One timestamp per iteration keeps StartTime and Time consistent.
		now := time.Now()

		var metrics []metricdata.Metrics
		switch w.metricType {
		case metricTypeGauge:
			metrics = append(metrics, metricdata.Metrics{
				Name: w.metricName,
				Data: metricdata.Gauge[int64]{
					DataPoints: []metricdata.DataPoint[int64]{
						{
							Time:       now,
							Value:      i,
							Attributes: attrs,
							Exemplars:  w.exemplars,
						},
					},
				},
			})
		case metricTypeSum:
			metrics = append(metrics, metricdata.Metrics{
				Name: w.metricName,
				Data: metricdata.Sum[int64]{
					IsMonotonic: true,
					Temporality: metricdata.CumulativeTemporality,
					DataPoints: []metricdata.DataPoint[int64]{
						{
							StartTime:  now.Add(-time.Second),
							Time:       now,
							Value:      i,
							Attributes: attrs,
							Exemplars:  w.exemplars,
						},
					},
				},
			})
		default:
			w.logger.Fatal("unknown metric type")
		}

		rm := metricdata.ResourceMetrics{
			Resource:     res,
			ScopeMetrics: []metricdata.ScopeMetrics{{Metrics: metrics}},
		}

		if err := exporter.Export(context.Background(), &rm); err != nil {
			w.logger.Fatal("exporter failed", zap.Error(err))
		}
		// Block until the limiter allows the next data point.
		if err := limiter.Wait(context.Background()); err != nil {
			w.logger.Fatal("limiter wait failed, retry", zap.Error(err))
		}

		i++
		// A non-zero numMetrics caps the number of exported data points.
		if w.numMetrics != 0 && i >= int64(w.numMetrics) {
			break
		}
	}

	w.logger.Info("metrics generated", zap.Int64("metrics", i))
}