-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
handler.go
230 lines (201 loc) · 6.51 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package gnmi
import (
"context"
"crypto/tls"
"fmt"
"io"
"net"
"path"
"strings"
"time"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/metric"
gnmiLib "github.com/openconfig/gnmi/proto/gnmi"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/encoding/protojson"
)
type handler struct {
address string
aliases map[string]string
tagsubs []TagSubscription
emptyNameWarnShown bool
tagStore *tagStore
trace bool
log telegraf.Logger
}
func newHandler(addr string, aliases map[string]string, subs []TagSubscription, l telegraf.Logger, trace bool) *handler {
return &handler{
address: addr,
aliases: aliases,
tagsubs: subs,
tagStore: newTagStore(subs),
trace: trace,
log: l,
}
}
// SubscribeGNMI and extract telemetry data
func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, tlscfg *tls.Config, request *gnmiLib.SubscribeRequest) error {
var creds credentials.TransportCredentials
if tlscfg != nil {
creds = credentials.NewTLS(tlscfg)
} else {
creds = insecure.NewCredentials()
}
opts := []grpc.DialOption{
grpc.WithTransportCredentials(creds),
}
client, err := grpc.DialContext(ctx, h.address, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %v", err)
}
defer client.Close()
subscribeClient, err := gnmiLib.NewGNMIClient(client).Subscribe(ctx)
if err != nil {
return fmt.Errorf("failed to setup subscription: %w", err)
}
// If io.EOF is returned, the stream may have ended and stream status
// can be determined by calling Recv.
if err := subscribeClient.Send(request); err != nil && err != io.EOF {
return fmt.Errorf("failed to send subscription request: %w", err)
}
h.log.Debugf("Connection to gNMI device %s established", h.address)
defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
for ctx.Err() == nil {
var reply *gnmiLib.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
if err != io.EOF && ctx.Err() == nil {
return fmt.Errorf("aborted gNMI subscription: %w", err)
}
break
}
if h.trace {
buf, err := protojson.Marshal(reply)
if err != nil {
h.log.Debugf("marshal failed: %v", err)
} else {
t := reply.GetUpdate().GetTimestamp()
h.log.Debugf("update_%v: %s", t, string(buf))
}
}
if response, ok := reply.Response.(*gnmiLib.SubscribeResponse_Update); ok {
h.handleSubscribeResponseUpdate(acc, response)
}
}
return nil
}
// Handle SubscribeResponse_Update message from gNMI and parse contained telemetry data
func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, response *gnmiLib.SubscribeResponse_Update) {
var prefix, prefixAliasPath string
grouper := metric.NewSeriesGrouper()
timestamp := time.Unix(0, response.Update.Timestamp)
prefixTags := make(map[string]string)
if response.Update.Prefix != nil {
var err error
if prefix, prefixAliasPath, err = handlePath(response.Update.Prefix, prefixTags, h.aliases, ""); err != nil {
h.log.Errorf("handling path %q failed: %v", response.Update.Prefix, err)
}
}
prefixTags["source"], _, _ = net.SplitHostPort(h.address)
if prefix != "" {
prefixTags["path"] = prefix
}
// Process and remove tag-updates from the response first so we will
// add all available tags to the metrics later.
var valueUpdates []*gnmiLib.Update
for _, update := range response.Update.Update {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
_, fields := h.handleTelemetryField(update, tags, prefix)
var tagUpdate bool
for _, tagSub := range h.tagsubs {
if !equalPathNoKeys(fullPath, tagSub.fullPath) {
continue
}
h.log.Debugf("Tag-subscription update for %q: %+v", tagSub.Name, update)
if err := h.tagStore.insert(tagSub, fullPath, fields, tags); err != nil {
h.log.Errorf("inserting tag failed: %w", err)
}
tagUpdate = true
break
}
if !tagUpdate {
valueUpdates = append(valueUpdates, update)
}
}
// Parse individual Update message and create measurements
var name, lastAliasPath string
for _, update := range valueUpdates {
fullPath := pathWithPrefix(response.Update.Prefix, update.Path)
// Prepare tags from prefix
tags := make(map[string]string, len(prefixTags))
for key, val := range prefixTags {
tags[key] = val
}
aliasPath, fields := h.handleTelemetryField(update, tags, prefix)
// Add the tags derived via tag-subscriptions
for k, v := range h.tagStore.lookup(fullPath, tags) {
tags[k] = v
}
// Inherent valid alias from prefix parsing
if len(prefixAliasPath) > 0 && len(aliasPath) == 0 {
aliasPath = prefixAliasPath
}
// Lookup alias if alias-path has changed
if aliasPath != lastAliasPath {
name = prefix
if alias, ok := h.aliases[aliasPath]; ok {
name = alias
} else {
h.log.Debugf("No measurement alias for gNMI path: %s", name)
}
lastAliasPath = aliasPath
}
// Check for empty names
if name == "" && !h.emptyNameWarnShown {
h.log.Warnf(emptyNameWarning, response.Update)
h.emptyNameWarnShown = true
}
// Group metrics
for k, v := range fields {
key := k
if len(aliasPath) < len(key) && len(aliasPath) != 0 {
// This may not be an exact prefix, due to naming style
// conversion on the key.
key = key[len(aliasPath)+1:]
} else if len(aliasPath) >= len(key) {
// Otherwise use the last path element as the field key.
key = path.Base(key)
// If there are no elements skip the item; this would be an
// invalid message.
key = strings.TrimLeft(key, "/.")
if key == "" {
h.log.Errorf("invalid empty path: %q", k)
continue
}
}
grouper.Add(name, tags, timestamp, key, v)
}
}
// Add grouped measurements
for _, metricToAdd := range grouper.Metrics() {
acc.AddMetric(metricToAdd)
}
}
// HandleTelemetryField and add it to a measurement
func (h *handler) handleTelemetryField(update *gnmiLib.Update, tags map[string]string, prefix string) (string, map[string]interface{}) {
gpath, aliasPath, err := handlePath(update.Path, tags, h.aliases, prefix)
if err != nil {
h.log.Errorf("handling path %q failed: %v", update.Path, err)
}
fields, err := gnmiToFields(strings.Replace(gpath, "-", "_", -1), update.Val)
if err != nil {
h.log.Errorf("error parsing update value %q: %v", update.Val, err)
}
return aliasPath, fields
}