-
Notifications
You must be signed in to change notification settings - Fork 5.6k
/
selfstat.go
207 lines (176 loc) · 5.1 KB
/
selfstat.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
// selfstat is a package for tracking and collecting internal statistics
// about telegraf. Metrics can be registered using this package, and then
// incremented or set within your code. If the inputs.internal plugin is enabled,
// then all registered stats will be collected as they would be by any other input
// plugin.
package selfstat
import (
"hash/fnv"
"log"
"sort"
"sync"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
)
// registry is the package-wide stat registry. It is assigned in init and is
// what Register, RegisterTiming and Metrics operate on.
var registry *Registry
// Stat is the interface through which telegraf records statistics about
// itself.
type Stat interface {
	// Name returns the measurement name.
	Name() string

	// FieldName returns the name of the field within the measurement.
	FieldName() string

	// Tags returns the stat's tags. A new map is allocated on every call.
	Tags() map[string]string

	// Incr adds 'v' to a regular stat.
	// For a timing stat, 'v' is added to the cache of timings instead.
	Incr(v int64)

	// Set overwrites a regular stat with 'v'.
	// For a timing stat, 'v' is added to the cache of timings instead.
	Set(v int64)

	// Get returns the current value of the stat. For a timing stat this is
	// the average of all timings received since the previous call to Get();
	// if none were received, the previous value is returned.
	Get() int64
}
// Register adds a stat for the given measurement, field, and tags to the
// selfstat registry and returns it. The measurement is stored with an
// "internal_" prefix. If a stat with the same measurement, field, and tags has
// already been registered, that existing stat is returned instead.
//
// The caller of Register() may increment or set the returned Stat; its value
// is reported as a telegraf metric whenever Metrics() is called.
func Register(measurement, field string, tags map[string]string) Stat {
	name := "internal_" + measurement
	return registry.register(name, field, tags)
}
// RegisterTiming adds a timing stat for the given measurement, field, and tags
// to the selfstat registry and returns it. The measurement is stored with an
// "internal_" prefix. If a stat with the same measurement, field, and tags has
// already been registered, that existing stat is returned instead.
//
// Unlike regular stats, timing stats accumulate every "timing" added to them
// and report the average when Get() is called. Calling Get() clears the
// accumulated timings, so the next Get() reflects only timings added since the
// previous call. If Get() is called before any new timing has arrived, the
// previous value is reported again.
//
// Put differently: a timing is an averaged metric that is reset on each call
// to Get().
//
// The caller of RegisterTiming() may add timings to the returned Stat; its
// value is reported as a telegraf metric whenever Metrics() is called.
func RegisterTiming(measurement, field string, tags map[string]string) Stat {
	name := "internal_" + measurement
	return registry.registerTiming(name, field, tags)
}
// Metrics returns all registered stats as telegraf metrics.
//
// Stats registered under the same measurement and tag set are combined into a
// single metric with one field per stat; the metric's name and tags are taken
// from the first stat encountered in that group. All metrics share a single
// timestamp taken when Metrics is called. Get() is called on every stat, which
// for timing stats resets the accumulated average.
//
// Groups that contain no stats, and groups for which the metric cannot be
// built (the error is logged), are omitted from the result, so the returned
// slice never contains nil entries.
func Metrics() []telegraf.Metric {
	registry.mu.Lock()
	defer registry.mu.Unlock()

	now := time.Now()

	// Start empty and append: pre-sizing the length and filling by index would
	// leave trailing nil metrics whenever a group is skipped below.
	metrics := make([]telegraf.Metric, 0, len(registry.stats))
	for _, stats := range registry.stats {
		if len(stats) == 0 {
			continue
		}

		var (
			name string
			tags map[string]string
		)
		fields := make(map[string]interface{}, len(stats))
		first := true
		for fieldname, stat := range stats {
			// Every stat in a group shares measurement and tags, so read them
			// from whichever stat comes first.
			if first {
				tags = stat.Tags()
				name = stat.Name()
				first = false
			}
			fields[fieldname] = stat.Get()
		}

		m, err := metric.New(name, tags, fields, now)
		if err != nil {
			log.Printf("E! Error creating selfstat metric: %s", err)
			continue
		}
		metrics = append(metrics, m)
	}
	return metrics
}
// Registry stores registered stats. Stats are grouped first by the hash of
// their measurement and tags (see key) and then by field name.
type Registry struct {
	// mu is locked by register, registerTiming and Metrics while they access
	// stats.
	mu sync.Mutex
	// stats maps key(measurement, tags) -> field name -> stat.
	stats map[uint64]map[string]Stat
}
// register returns the stat stored in r for the given measurement, field, and
// tags, creating and storing a new regular stat if none exists yet.
//
// The tags map is copied before it is stored, so later changes to the caller's
// map do not affect the registered stat. r.mu is held for the whole call.
func (r *Registry) register(measurement, field string, tags map[string]string) Stat {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Operate on the receiver, not the package-level registry: the lock taken
	// above only protects r.
	k := key(measurement, tags)
	if existing, ok := r.get(k, field); ok {
		return existing
	}

	t := make(map[string]string, len(tags))
	for name, value := range tags {
		t[name] = value
	}

	s := &stat{
		measurement: measurement,
		field:       field,
		tags:        t,
	}
	r.set(k, s)
	return s
}
// registerTiming returns the stat stored in r for the given measurement,
// field, and tags, creating and storing a new timing stat if none exists yet.
//
// The tags map is copied before it is stored, so later changes to the caller's
// map do not affect the registered stat. r.mu is held for the whole call.
func (r *Registry) registerTiming(measurement, field string, tags map[string]string) Stat {
	r.mu.Lock()
	defer r.mu.Unlock()

	// Operate on the receiver, not the package-level registry: the lock taken
	// above only protects r.
	k := key(measurement, tags)
	if existing, ok := r.get(k, field); ok {
		return existing
	}

	t := make(map[string]string, len(tags))
	for name, value := range tags {
		t[name] = value
	}

	s := &timingStat{
		measurement: measurement,
		field:       field,
		tags:        t,
	}
	r.set(k, s)
	return s
}
// get looks up the stat stored under key for the given field name. The boolean
// result reports whether such a stat exists. get does no locking itself;
// register and registerTiming call it with r.mu held.
func (r *Registry) get(key uint64, field string) (Stat, bool) {
	// A missing outer key yields a nil inner map, and reading from a nil map
	// is safe, so one chained lookup covers both "no such key" and "no such
	// field".
	s, found := r.stats[key][field]
	if !found {
		return nil, false
	}
	return s, true
}
// set stores s under key, indexed by s.FieldName(), creating the per-key field
// map on first use. An existing stat with the same key and field name is
// replaced. set does no locking itself; register and registerTiming call it
// with r.mu held.
func (r *Registry) set(key uint64, s Stat) {
	byField, exists := r.stats[key]
	if !exists {
		byField = make(map[string]Stat)
		r.stats[key] = byField
	}
	byField[s.FieldName()] = s
}
// key derives the registry lookup key for a measurement and its tag set by
// hashing them with 64-bit FNV-1a.
//
// Tag names are sorted before hashing so the result does not depend on map
// iteration order. Every component (measurement, tag name, tag value) is
// terminated by a NUL byte so that adjacent components cannot run together:
// without the terminator, {"ab": "c"} and {"a": "bc"} would feed identical
// bytes to the hash and always be treated as the same series.
func key(measurement string, tags map[string]string) uint64 {
	sep := []byte{0}

	h := fnv.New64a()
	// hash.Hash.Write is documented to never return an error, so its results
	// are deliberately not checked.
	h.Write([]byte(measurement))
	h.Write(sep)

	names := make([]string, 0, len(tags))
	for name := range tags {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		h.Write([]byte(name))
		h.Write(sep)
		h.Write([]byte(tags[name]))
		h.Write(sep)
	}

	return h.Sum64()
}
func init() {
registry = &Registry{
stats: make(map[uint64]map[string]Stat),
}
}