forked from contiv/vpp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
plugin_impl_statscollector.go
executable file
·354 lines (312 loc) · 10.6 KB
/
plugin_impl_statscollector.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
package statscollector
import (
"strings"
"sync"
"time"
"github.com/contiv/vpp/plugins/contiv"
"github.com/contiv/vpp/plugins/contiv/containeridx"
"github.com/golang/protobuf/proto"
"github.com/ligato/cn-infra/datasync"
"github.com/ligato/cn-infra/infra"
prometheusplugin "github.com/ligato/cn-infra/rpc/prometheus"
"github.com/ligato/cn-infra/servicelabel"
"github.com/ligato/vpp-agent/plugins/vpp/model/interfaces"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
const (
// path of the prometheus registry where the statistics are exposed
prometheusStatsPath = "/stats"
// names of the variable labels attached to every per-interface gauge
podNameLabel = "podName"
podNamespaceLabel = "podNamespace"
interfaceNameLabel = "interfaceName"
// name of the constant label; its value is the agent label of this node
nodeLabel = "node"
// metric names: each one names a gauge vector created in Init and is also
// used as the key of the per-interface gauge map (stats.metrics)
inPacketsMetric = "inPackets"
outPacketsMetric = "outPackets"
inBytesMetric = "inBytes"
outBytesMetric = "outBytes"
dropPacketsMetric = "dropPackets"
puntPacketsMetric = "puntPackets"
ipv4PacketsMetric = "ipv4Packets"
ipv6PacketsMetric = "ipv6Packets"
inNobufPacketsMetric = "inNobufPackets"
inMissPacketsMetric = "inMissPackets"
inErrorPacketsMetric = "inErrorPackets"
outErrorPacketsMetric = "outErrorPackets"
)
// systemIfNames lists the name prefixes of interfaces that are treated as
// contiv system interfaces; isContivSystemInterface matches interface names
// against these prefixes with strings.HasPrefix.
var systemIfNames = []string{"afpacket-vpp2", "vpp2", "tap-vpp2", "vxlanBVI", "loopbackNIC", "GigabitEthernet"}
// Plugin collects the statistics from vpp interfaces and publishes them to prometheus.
// Interface state updates arrive through Put; gauges that belong to deleted pods
// are removed in processPodEvent.
type Plugin struct {
Deps
// The mutex is locked by Put, processPodEvent, PrintStats and RegisterGaugeFunc
// for the duration of their work with the plugin state.
sync.Mutex
// ifStats holds the latest statistics per interface, indexed by the key passed to Put.
ifStats map[string]*stats
// closeCh is closed in Close; PrintStats returns once it observes the close.
closeCh chan interface{}
// gaugeVecs holds one prometheus gauge vector per metric, indexed by metric name.
gaugeVecs map[string]*prometheus.GaugeVec
// podIfs maps pod namespace and pod name to the ifStats keys of the pod's interfaces.
podIfs map[string] /*pod namespace*/ map[string] /*pod name*/ []string /*stats keys*/
}
// stats is the per-interface record kept in Plugin.ifStats.
type stats struct {
// podName and podNamespace identify the pod the interface belongs to;
// addNewEntry sets both to the "--" placeholder for contiv system interfaces.
podName string
podNamespace string
// data is the most recently received state of the interface.
data *interfaces.InterfacesState_Interface
// metrics holds the gauges of this interface, indexed by metric name.
metrics map[string]prometheus.Gauge
}
// Deps groups the dependencies of the Plugin.
type Deps struct {
infra.PluginDeps
// ServiceLabel provides the agent label that is attached to the metrics
// as the value of the "node" label
ServiceLabel servicelabel.ReaderAPI
// Contiv plugin is used to lookup pod related to interfaces statistics
Contiv contiv.API
// Prometheus plugin used to stream statistics; the plugin skips all
// prometheus related setup when this dependency is nil
Prometheus prometheusplugin.API
}
// Init allocates the internal state of the plugin. If the Prometheus
// dependency is set, it also creates the statistics registry, builds one gauge
// vector per interface metric and registers the vectors into that registry.
// Finally it starts the goroutine that periodically logs the collected stats.
// An error is returned when the registry cannot be created or a vector cannot
// be registered.
func (p *Plugin) Init() error {
	p.closeCh = make(chan interface{})
	p.ifStats = make(map[string]*stats)
	p.podIfs = make(map[string]map[string][]string)
	p.gaugeVecs = make(map[string]*prometheus.GaugeVec)

	if p.Prometheus != nil {
		// dedicated registry for the interface statistics
		handlerOpts := promhttp.HandlerOpts{
			ErrorHandling: promhttp.ContinueOnError,
			ErrorLog:      p.Log,
		}
		if regErr := p.Prometheus.NewRegistry(prometheusStatsPath, handlerOpts); regErr != nil {
			return regErr
		}

		// name and help text of every exported metric
		type metricDesc struct {
			name string
			help string
		}
		descriptors := []metricDesc{
			{name: inPacketsMetric, help: "Number of received packets for interface"},
			{name: outPacketsMetric, help: "Number of transmitted packets for interface"},
			{name: inBytesMetric, help: "Number of received bytes for interface"},
			{name: outBytesMetric, help: "Number of transmitted bytes for interface"},
			{name: dropPacketsMetric, help: "Number of dropped packets for interface"},
			{name: puntPacketsMetric, help: "Number of punt packets for interface"},
			{name: ipv4PacketsMetric, help: "Number of ipv4 packets for interface"},
			{name: ipv6PacketsMetric, help: "Number of ipv6 packets for interface"},
			{name: inNobufPacketsMetric, help: "Number of received packets ??? for interface"}, // TODO describe metric
			{name: inMissPacketsMetric, help: "Number of missed packets for interface"},
			{name: inErrorPacketsMetric, help: "Number of received packets with error for interface"},
			{name: outErrorPacketsMetric, help: "Number of transmitted packets with error for interface"},
		}

		// one gauge vector per metric; the node label is constant, the pod
		// and interface labels vary per gauge
		variableLabels := []string{podNameLabel, podNamespaceLabel, interfaceNameLabel}
		for _, desc := range descriptors {
			gaugeOpts := prometheus.GaugeOpts{
				Name: desc.name,
				Help: desc.help,
				ConstLabels: prometheus.Labels{
					nodeLabel: p.ServiceLabel.GetAgentLabel(),
				},
			}
			p.gaugeVecs[desc.name] = prometheus.NewGaugeVec(gaugeOpts, variableLabels)
		}

		// expose all created vectors through the statistics registry
		for metricName, vec := range p.gaugeVecs {
			if regErr := p.Prometheus.Register(prometheusStatsPath, vec); regErr != nil {
				p.Log.Errorf("failed to register %v metric %v", metricName, regErr)
				return regErr
			}
		}
	}

	go p.PrintStats()

	return nil
}
// processPodEvent processes pod delete events; it removes the gauges and the
// stats entries of all interfaces of the deleted pod, so that they are no
// longer reported to Prometheus. The pod's record in the pod - interface
// mapping is dropped as well, otherwise the mapping would grow with every
// deleted pod and would hand stale keys to a later pod of the same name.
// Events other than delete are ignored.
func (p *Plugin) processPodEvent(event containeridx.ChangeEvent) {
	if !event.Del {
		return
	}

	p.Lock()
	defer p.Unlock()

	podNs, podName := event.Value.PodNamespace, event.Value.PodName

	nsmap, exists := p.podIfs[podNs]
	if !exists {
		return
	}
	ifs, exists := nsmap[podName]
	if !exists {
		return
	}

	for _, key := range ifs {
		// entry.data carries the interface name used as a label value;
		// without it the gauges cannot be addressed
		if entry, tracked := p.ifStats[key]; tracked && entry != nil && entry.data != nil {
			labels := prometheus.Labels{
				podNameLabel:       entry.podName,
				podNamespaceLabel:  entry.podNamespace,
				interfaceNameLabel: entry.data.Name,
			}
			// remove gauge with corresponding labels from each vector
			for _, vec := range p.gaugeVecs {
				vec.Delete(labels)
			}
		}
		delete(p.ifStats, key)
	}

	// forget the pod itself, and its namespace once it holds no more pods
	delete(nsmap, podName)
	if len(nsmap) == 0 {
		delete(p.podIfs, podNs)
	}
}
// AfterInit registers the plugin as a watcher of the container index, so that
// processPodEvent is invoked for every change and can drop the gauges of pods
// that have been deleted. The error of the subscription is returned.
func (p *Plugin) AfterInit() error {
	containerIndex := p.Contiv.GetContainerIndex()
	return containerIndex.Watch(p.PluginName, p.processPodEvent)
}
// Close cleans up the plugin resources. It always returns nil.
func (p *Plugin) Close() error {
// closing the channel makes the PrintStats goroutine return
close(p.closeCh)
return nil
}
// PrintStats periodically (every 10 seconds) dumps the statistics of all known
// interfaces to the log at debug level. It blocks until closeCh is closed,
// therefore it is meant to run in its own goroutine (Init starts it).
func (p *Plugin) PrintStats() {
	// a single ticker is reused instead of allocating a new timer per iteration
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-p.closeCh:
			return
		case <-ticker.C:
			p.Lock()
			for _, v := range p.ifStats {
				// an interface state may arrive without statistics attached;
				// dereferencing it would panic while the lock is held
				if v == nil || v.data == nil || v.data.Statistics == nil {
					continue
				}
				p.Log.Debugf("%v %v %v %+v", v.data.Name, v.podName, v.podNamespace, *v.data.Statistics)
			}
			p.Unlock()
		}
	}
}
// Put stores the statistics received for the given key and publishes them to
// prometheus. Only keys with the interface state prefix are processed; data
// that is not an interface state is reported with a warning. Statistics of an
// interface that is neither a contiv system interface nor associated with a
// pod are not stored. The opts are not used and the returned error is always nil.
func (p *Plugin) Put(key string, data proto.Message, opts ...datasync.PutOption) error {
	p.Lock()
	defer p.Unlock()

	p.Log.Debugf("Statistic data with key %v received", key)

	if !strings.HasPrefix(key, interfaces.StatePrefix) {
		return nil
	}

	ifState, isIfState := data.(*interfaces.InterfacesState_Interface)
	if !isIfState {
		p.Log.Warn("Unable to decode received stats")
		return nil
	}

	if known, tracked := p.ifStats[key]; tracked && known.podName != "" {
		// interface is associated with a pod and we're already streaming its statistics
		known.data = ifState
		p.updatePrometheusStats(known)
		return nil
	}

	// adding stats for new interface
	fresh, created := p.addNewEntry(key, ifState)
	if !created {
		return nil
	}
	p.ifStats[key] = fresh
	p.updatePrometheusStats(fresh)
	return nil
}
// RegisterGaugeFunc registers a new gauge with specific name, help string and
// valueFunc to report status when invoked. The gauge is added to the statistics
// registry and carries the node label of this agent. Nothing is registered when
// the Prometheus dependency is not set. A failed registration is logged, since
// the signature offers no way to return it to the caller.
func (p *Plugin) RegisterGaugeFunc(name string, help string, valueFunc func() float64) {
	p.Lock()
	defer p.Unlock()

	p.Log.Debugf("Registering new gauge: %s", name)
	if p.Prometheus == nil {
		return
	}

	promLabels := prometheus.Labels{
		nodeLabel: p.ServiceLabel.GetAgentLabel(),
	}
	// namespace and subsystem are left empty, the metric is named by name alone
	err := p.Prometheus.RegisterGaugeFunc(prometheusStatsPath, "", "", name, help, promLabels, valueFunc)
	if err != nil {
		p.Log.Errorf("failed to register %v gauge %v", name, err)
	}
}
// addNewEntry builds the stats entry for an interface whose statistics were
// received under the given key.
//
// An entry is created only for contiv system interfaces (labelled with the
// "--" placeholder as pod name and namespace) and for interfaces that the
// Contiv plugin maps to a pod. For pod interfaces the key is also recorded in
// the pod - interface mapping, so that the metrics can be removed once the pod
// is undeployed. The gauges of the interface are obtained from the gauge
// vectors and stored in the entry.
//
// It returns the entry and true, or nil and false when no entry was created
// (nil data, or an interface that is neither a system nor a pod interface).
// The entry is not stored into ifStats; that is left to the caller.
func (p *Plugin) addNewEntry(key string, data *interfaces.InterfacesState_Interface) (newEntry *stats, created bool) {
	const contivSystemInterfacePlaceholder = "--"

	if data == nil {
		return nil, false
	}

	var (
		podName, podNs string
		found          bool
	)
	contivInterface := p.isContivSystemInterface(data.Name)
	if contivInterface {
		podName = contivSystemInterfacePlaceholder
		podNs = contivSystemInterfacePlaceholder
	} else {
		podNs, podName, found = p.Contiv.GetPodByIf(data.Name)
	}
	if !found && !contivInterface {
		return nil, false
	}

	if found {
		// add entry into pod - interface mapping, in order to allow
		// deletion of metrics when a pod is undeployed
		nsmap, exists := p.podIfs[podNs]
		if !exists {
			nsmap = map[string][]string{}
			p.podIfs[podNs] = nsmap
		}
		// the same key may be added repeatedly; record it only once
		alreadyMapped := false
		for _, mapped := range nsmap[podName] {
			if mapped == key {
				alreadyMapped = true
				break
			}
		}
		if !alreadyMapped {
			nsmap[podName] = append(nsmap[podName], key)
		}
	}

	entry := &stats{
		podName:      podName,
		podNamespace: podNs,
		data:         data,
		metrics:      map[string]prometheus.Gauge{},
	}
	// add gauges with corresponding labels into vectors
	labels := prometheus.Labels{
		podNameLabel:       podName,
		podNamespaceLabel:  podNs,
		interfaceNameLabel: data.Name,
	}
	for name, vec := range p.gaugeVecs {
		gauge, err := vec.GetMetricWith(labels)
		if err != nil {
			// keep the entry usable, just without the gauge that failed
			p.Log.Error(err)
			continue
		}
		entry.metrics[name] = gauge
	}
	return entry, true
}
// updatePrometheusStats publishes the statistics of the given interface entry
// into prometheus: every gauge stored in the entry is set to the matching
// counter of the interface state. Metrics without a gauge in the entry are
// skipped. The call is a no-op when the entry, its interface state or the
// statistics inside the state are missing.
func (p *Plugin) updatePrometheusStats(entry *stats) {
	if entry == nil || entry.data == nil || entry.data.Statistics == nil {
		// nothing to publish; dereferencing would panic
		return
	}
	counters := entry.data.Statistics

	samples := []struct {
		metric string
		value  float64
	}{
		{inPacketsMetric, float64(counters.InPackets)},
		{outPacketsMetric, float64(counters.OutPackets)},
		{inBytesMetric, float64(counters.InBytes)},
		{outBytesMetric, float64(counters.OutBytes)},
		{dropPacketsMetric, float64(counters.DropPackets)},
		{puntPacketsMetric, float64(counters.PuntPackets)},
		{ipv4PacketsMetric, float64(counters.Ipv4Packets)},
		{ipv6PacketsMetric, float64(counters.Ipv6Packets)},
		{inNobufPacketsMetric, float64(counters.InNobufPackets)},
		{inMissPacketsMetric, float64(counters.InMissPackets)},
		{inErrorPacketsMetric, float64(counters.InErrorPackets)},
		{outErrorPacketsMetric, float64(counters.OutErrorPackets)},
	}
	for _, sample := range samples {
		if gauge, found := entry.metrics[sample.metric]; found && gauge != nil {
			gauge.Set(sample.value)
		}
	}
}
// isContivSystemInterface reports whether the given interface name begins with
// one of the prefixes listed in systemIfNames, i.e. whether the interface is
// not associated with a pod (e.g. interface that interconnects vpp and host stack).
func (p *Plugin) isContivSystemInterface(ifName string) bool {
	isSystem := false
	for idx := 0; idx < len(systemIfNames) && !isSystem; idx++ {
		isSystem = strings.HasPrefix(ifName, systemIfNames[idx])
	}
	return isSystem
}