-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
collector.go
102 lines (84 loc) · 2.38 KB
/
collector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
package v2
import (
"sync"
"time"
"github.com/influxdata/telegraf"
serializer "github.com/influxdata/telegraf/plugins/serializers/prometheus"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
// Metric pairs one protobuf sample with the metric family it belongs to.
// Its Desc and Write methods let it be sent on a prometheus.Metric channel.
type Metric struct {
// family is the metric family of the sample; Desc reads its name and help
// text.
family *dto.MetricFamily
// metric is the sample itself; Desc reads its labels and Write copies its
// labels, value fields and timestamp.
metric *dto.Metric
}
// Desc builds the Prometheus descriptor for this sample from the family's
// name and help text, with the names of the sample's labels as the variable
// label names. No constant labels are set.
//
// All protobuf fields are read through the generated getters. These return
// the zero value for an unset (nil) optional field or a nil message, so a
// family without help text or a label without a name no longer causes a nil
// pointer dereference here.
func (m *Metric) Desc() *prometheus.Desc {
	labels := m.metric.GetLabel()

	labelNames := make([]string, 0, len(labels))
	for _, label := range labels {
		labelNames = append(labelNames, label.GetName())
	}

	return prometheus.NewDesc(m.family.GetName(), m.family.GetHelp(), labelNames, nil)
}
// Write fills out with the contents of the wrapped sample and always returns
// nil. Fields are assigned by reference (no deep copy), so out shares the
// label slice and value messages with the wrapped sample.
func (m *Metric) Write(out *dto.Metric) error {
	src := m.metric

	// Identity and timestamp.
	out.Label = src.Label
	out.TimestampMs = src.TimestampMs

	// Every value field is copied as-is; fields that are nil on the source
	// are nil on out as well.
	out.Gauge = src.Gauge
	out.Counter = src.Counter
	out.Untyped = src.Untyped
	out.Summary = src.Summary
	out.Histogram = src.Histogram

	return nil
}
// Collector stores metrics in a serializer collection and hands them to
// Prometheus through its Describe and Collect methods.
type Collector struct {
// Embedded mutex; Collect and Add hold it for their entire duration.
sync.Mutex
// expireDuration is the age passed to the collection's Expire method.
// A value of zero disables expiry.
expireDuration time.Duration
// coll is the collection that Add writes to and Collect reads from.
coll *serializer.Collection
}
// NewCollector returns a Collector backed by a newly created serializer
// collection.
//
// expire is the age handed to the collection when expiring metrics; zero
// turns expiry off. stringsAsLabel, exportTimestamp and typeMapping are
// passed through unchanged as the StringAsLabel, ExportTimestamp and
// TypeMappings settings of the collection's format configuration.
func NewCollector(
	expire time.Duration,
	stringsAsLabel bool,
	exportTimestamp bool,
	typeMapping serializer.MetricTypes,
) *Collector {
	collector := &Collector{expireDuration: expire}
	collector.coll = serializer.NewCollection(serializer.FormatConfig{
		StringAsLabel:   stringsAsLabel,
		ExportTimestamp: exportTimestamp,
		TypeMappings:    typeMapping,
	})

	return collector
}
// Describe deliberately sends nothing on the descriptor channel; the
// parameter is unnamed because it is never used.
func (c *Collector) Describe(_ chan<- *prometheus.Desc) {
// Sending no descriptor at all marks the Collector as "unchecked",
// i.e. no checks will be performed at registration time, and the
// Collector may yield any Metric it sees fit in its Collect method.
}
// Collect expires old metrics (when expiry is enabled) and then sends every
// sample of every family in the collection to ch, each wrapped in a Metric.
// The collector's lock is held until the last sample has been sent.
func (c *Collector) Collect(ch chan<- prometheus.Metric) {
	c.Lock()
	defer c.Unlock()

	// Expiring during Collect ensures that stale metrics are removed even
	// when no new metrics are being added to the output.
	if c.expireDuration != 0 {
		c.coll.Expire(time.Now(), c.expireDuration)
	}

	families := c.coll.GetProto()
	for _, fam := range families {
		for _, sample := range fam.Metric {
			ch <- &Metric{family: fam, metric: sample}
		}
	}
}
// Add hands each of the given metrics to the collection together with the
// current time, then expires old metrics when expiry is enabled. The
// collector's lock is held for the whole call. Add always returns nil.
func (c *Collector) Add(metrics []telegraf.Metric) error {
	c.Lock()
	defer c.Unlock()

	for i := range metrics {
		c.coll.Add(metrics[i], time.Now())
	}

	// Expiring during Add ensures that stale metrics are removed even when
	// nobody is querying the data.
	if c.expireDuration == 0 {
		return nil
	}
	c.coll.Expire(time.Now(), c.expireDuration)

	return nil
}