-
Notifications
You must be signed in to change notification settings - Fork 5.5k
/
intel_pmt.go
402 lines (361 loc) · 11 KB
/
intel_pmt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
//go:generate ../../../tools/readme_config_includer/generator
//go:build linux && amd64
package intel_pmt
import (
_ "embed"
"encoding/binary"
"errors"
"fmt"
"html"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
"sync/atomic"
"github.com/PaesslerAG/gval"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/inputs"
)
// sampleConfig holds the contents of sample.conf, embedded at build time.
//
//go:embed sample.conf
var sampleConfig string
// hexToDecRegex matches hexadecimal literals ("0x" followed by hex digits);
// eval uses it to rewrite them as decimal numbers before evaluation.
var hexToDecRegex = regexp.MustCompile(`0x[0-9a-fA-F]+`)
const (
// defaultPmtBasePath is the sysfs directory scanned for PMT telemetry entries.
defaultPmtBasePath = "/sys/class/intel_pmt"
// pluginName is the name the plugin is registered under; it is also passed
// as the first argument of acc.AddFields for every emitted metric.
pluginName = "intel_pmt"
)
// pmtFileInfo is the list of telemetry files that share one GUID.
type pmtFileInfo []fileInfo
// fileInfo describes a single "telem" file discovered in sysfs.
type fileInfo struct {
// path is the location of the "telem" file.
path string
// numaNode is the trimmed content of the numa_node file read next to the
// telemetry device; it is emitted as the "numa_node" tag.
numaNode string
}
// IntelPMT is the plugin state: user configuration plus the data discovered
// and parsed during Init.
type IntelPMT struct {
// PmtSpec is the path to the PMT spec file; checkPmtSpec requires it to be
// non-empty, readable and to contain at least one "/".
PmtSpec string `toml:"spec"`
// DatatypeFilter is read from "datatypes_enabled".
// NOTE(review): its use is not visible in this file section - presumably
// applied while parsing the XMLs; confirm.
DatatypeFilter []string `toml:"datatypes_enabled"`
// SampleFilter is read from "samples_enabled".
// NOTE(review): its use is not visible in this file section - presumably
// applied while parsing the XMLs; confirm.
SampleFilter []string `toml:"samples_enabled"`
// Log is the plugin logger; it is not part of the TOML configuration.
Log telegraf.Logger `toml:"-"`
// pmtBasePath is PmtSpec up to (not including) its last "/", set by checkPmtSpec.
pmtBasePath string
// reader is set to fileReader{} by checkPmtSpec.
// NOTE(review): presumably used by parseXMLs to load the XML sources; confirm.
reader sourceReader
// pmtTelemetryFiles maps a GUID to the "telem" files found for it in sysfs.
pmtTelemetryFiles map[string]pmtFileInfo
// NOTE(review): pmtMetadata is not referenced in this file section -
// presumably the parsed PMT spec; confirm.
pmtMetadata *pmt
// pmtAggregator maps a GUID to the aggregator whose sample groups are read
// by getSampleValues.
pmtAggregator map[string]aggregator
// pmtAggregatorInterface maps a GUID to the aggregator interface whose
// aggregator samples are iterated by aggregateSamples.
pmtAggregatorInterface map[string]aggregatorInterface
// pmtTransformations maps a GUID to its transformations, keyed by the
// TransformREF of an aggregator sample.
pmtTransformations map[string]map[string]transformation
}
// SampleConfig returns the sample configuration embedded from sample.conf.
func (p *IntelPMT) SampleConfig() string {
return sampleConfig
}
// Init performs the one-time setup of the plugin.
//
// It validates the configured PMT spec, discovers the telemetry files exposed
// in sysfs and finally parses the PMT XMLs. The first failing step aborts
// the setup.
//
// Returns:
//
//	error - error from spec validation, sysfs exploration or XML parsing.
func (p *IntelPMT) Init() error {
	if err := p.checkPmtSpec(); err != nil {
		return err
	}

	if err := p.explorePmtInSysfs(); err != nil {
		return fmt.Errorf("error while exploring pmt sysfs: %w", err)
	}

	return p.parseXMLs()
}
// Gather collects the plugin's metrics.
//
// One goroutine is started per GUID. Each goroutine reads the GUID's "telem"
// files in order and aggregates their samples into the accumulator; on the
// first failure it reports the error to the accumulator and stops processing
// the remaining files of that GUID.
//
// Parameters:
//
//	acc - Telegraf Accumulator receiving metrics and errors.
//
// Returns:
//
//	error - a generic error if any goroutine reported a failure.
func (p *IntelPMT) Gather(acc telegraf.Accumulator) error {
	var (
		failed  atomic.Bool
		workers sync.WaitGroup
	)

	// report flags the whole gather as failed and forwards the cause to the accumulator.
	report := func(cause error) {
		failed.Store(true)
		acc.AddError(fmt.Errorf("gathering metrics failed: %w", cause))
	}

	for guid, files := range p.pmtTelemetryFiles {
		workers.Add(1)
		go func(guid string, files []fileInfo) {
			defer workers.Done()
			for _, f := range files {
				raw, err := os.ReadFile(f.path)
				if err != nil {
					report(err)
					return
				}
				if err := p.aggregateSamples(acc, guid, raw, f.numaNode); err != nil {
					report(err)
					return
				}
			}
		}(guid, files)
	}
	workers.Wait()

	if failed.Load() {
		return errors.New("error(s) occurred while gathering metrics")
	}
	return nil
}
// checkPmtSpec validates the configured PmtSpec and derives the base path from it.
//
// PmtSpec is expected to be an absolute filepath. Note that the check itself
// only requires the path to contain a "/"; everything before the last "/"
// becomes pmtBasePath. On success the reader is set to a fileReader.
//
// Returns:
//
//	error - error if PmtSpec is empty, not readable, or contains no "/".
func (p *IntelPMT) checkPmtSpec() error {
	spec := p.PmtSpec
	switch {
	case spec == "":
		return errors.New("pmt spec is empty")
	case !isFileReadable(spec):
		return fmt.Errorf("provided pmt spec is not readable %q", spec)
	}

	// Everything before the final "/" is the directory holding the spec.
	dirEnd := strings.LastIndex(spec, "/")
	if dirEnd < 0 {
		return errors.New("provided pmt spec is not an absolute path")
	}

	p.pmtBasePath = spec[:dirEnd]
	p.reader = fileReader{}
	return nil
}
// explorePmtInSysfs finds necessary paths in pmt sysfs.
//
// This method scans defaultPmtBasePath for entries whose name starts with
// "telem". For every such directory it reads the GUID, checks that the
// "telem" file is readable (unreadable ones are skipped with a warning),
// resolves the "device" symlink and reads the numa_node file located one
// level above the resolved device. The telemetry file and its NUMA node are
// stored under the corresponding GUID in pmtTelemetryFiles.
//
// Returns:
//
//	error - error if any of the operations failed or no telemetry source was found.
func (p *IntelPMT) explorePmtInSysfs() error {
	pmtDirectories, err := os.ReadDir(defaultPmtBasePath)
	if err != nil {
		return fmt.Errorf("error reading pmt directory: %w", err)
	}

	p.pmtTelemetryFiles = make(map[string]pmtFileInfo)
	for _, dir := range pmtDirectories {
		if !strings.HasPrefix(dir.Name(), "telem") {
			continue
		}

		telemDirPath := filepath.Join(defaultPmtBasePath, dir.Name())
		// os.Stat follows symlinks, so this describes the link target.
		symlinkInfo, err := os.Stat(telemDirPath)
		if err != nil {
			return fmt.Errorf("error resolving symlink for directory %q: %w", telemDirPath, err)
		}
		if !symlinkInfo.IsDir() {
			continue
		}

		pmtGUIDPath := filepath.Join(telemDirPath, "guid")
		rawGUID, err := os.ReadFile(pmtGUIDPath)
		if err != nil {
			return fmt.Errorf("cannot read GUID: %w", err)
		}
		// cut the newline char
		tID := strings.TrimSpace(string(rawGUID))

		telemPath := filepath.Join(telemDirPath, "telem")
		if !isFileReadable(telemPath) {
			p.Log.Warnf("telem file is not readable %q", telemPath)
			continue
		}

		telemDevicePath := filepath.Join(telemDirPath, "device")
		telemDeviceSymlink, err := filepath.EvalSymlinks(telemDevicePath)
		if err != nil {
			// The resolved path is not meaningful when resolution fails,
			// so report the path that was being resolved.
			return fmt.Errorf("error while evaluating symlink %q: %w", telemDevicePath, err)
		}

		numaNodePath := filepath.Join(telemDeviceSymlink, "..", "numa_node")
		numaNode, err := os.ReadFile(numaNodePath)
		if err != nil {
			return fmt.Errorf("error while reading numa_node file %q: %w", numaNodePath, err)
		}
		numaNodeString := strings.TrimSpace(string(numaNode))
		if numaNodeString == "" {
			return fmt.Errorf("numa_node file %q is empty", numaNodePath)
		}

		fi := fileInfo{
			path:     telemPath,
			numaNode: numaNodeString,
		}
		p.pmtTelemetryFiles[tID] = append(p.pmtTelemetryFiles[tID], fi)
	}

	if len(p.pmtTelemetryFiles) == 0 {
		return errors.New("no telemetry sources found - current platform doesn't support PMT or proper permissions needed to read them")
	}
	return nil
}
// isFileReadable reports whether path can be stat'ed and opened for reading.
//
// Parameters:
//
//	path - filesystem path to probe.
//
// Returns:
//
//	bool - true if both os.Stat and os.Open succeed, false otherwise.
func isFileReadable(path string) bool {
	_, statErr := os.Stat(path)
	if statErr != nil {
		return false
	}

	f, openErr := os.Open(path)
	if openErr == nil {
		// The handle was only needed to prove that opening works.
		f.Close()
	}
	return openErr == nil
}
// getSampleValues reads all sample values for all sample groups.
//
// For the aggregator registered under the given GUID, this method walks every
// sample group, extracts each of its samples from data and stores the value
// under the sample's ID.
//
// Parameters:
//
//	guid - GUID saying which Aggregator XML will be read.
//	data - data read from "telem" file.
//
// Returns:
//
//	map[string]uint64 - sample values keyed by sample ID.
//	error - error if getting any of the values failed.
func (p *IntelPMT) getSampleValues(guid string, data []byte) (map[string]uint64, error) {
	values := make(map[string]uint64)
	for _, grp := range p.pmtAggregator[guid].SampleGroup {
		// A sample group is 8 bytes wide, so its ID scaled by 8 is its byte offset.
		groupOffset := grp.SampleID * 8
		for _, smp := range grp.Sample {
			v, err := getTelemSample(smp, data, groupOffset)
			if err != nil {
				return nil, err
			}
			values[smp.SampleID] = v
		}
	}
	return values, nil
}
// getTelemSample extracts a telemetry sample from a given buffer.
//
// This function reads the 8 bytes starting at offset as a little-endian
// 64-bit value, applies the sample's mask and shifts the result right by the
// sample's LSB.
//
// Parameters:
//
//	s - sample from Aggregator XML containing the mask and LSB info.
//	buf - the byte buffer containing the telemetry data.
//	offset - the starting position (in bytes) in the buffer.
//
// Returns:
//
//	uint64 - the extracted sample as a 64-bit unsigned integer.
//	error - error if fewer than 8 bytes are available from offset.
func getTelemSample(s sample, buf []byte, offset uint64) (uint64, error) {
	// Compare in uint64 space without computing offset+8: for very large
	// offsets that sum wraps around (or turns negative once converted to int),
	// which would let the check pass and make the slice expression panic.
	size := uint64(len(buf))
	if offset > size || size-offset < 8 {
		return 0, fmt.Errorf("error reading telemetry sample: insufficient bytes from offset %d in buffer of size %d", offset, len(buf))
	}
	data := binary.LittleEndian.Uint64(buf[offset : offset+8])

	// Apply mask and shift right
	value := (data & s.mask) >> s.Lsb
	return value, nil
}
// aggregateSamples outputs transformed metrics to Telegraf.
//
// This method reads the low-level sample values from data, then, for every
// aggregator sample of the given GUID, evaluates the sample's transformation
// equation with those values as parameters. Each result is added to the
// accumulator as field "value" together with identifying tags; the "core" and
// "cha" tags are only set when the sample carries them.
//
// Parameters:
//
//	acc - Telegraf Accumulator.
//	guid - GUID saying which Aggregator Interface will be read.
//	data - contents of the "telem" file.
//	numaNode - which NUMA node this sample belongs to.
//
// Returns:
//
//	error - error if getting values has failed, if sample IDref is missing or if equation evaluation has failed.
func (p *IntelPMT) aggregateSamples(acc telegraf.Accumulator, guid string, data []byte, numaNode string) error {
	rawValues, err := p.getSampleValues(guid, data)
	if err != nil {
		return err
	}

	transformations := p.pmtTransformations[guid]
	for _, aggSample := range p.pmtAggregatorInterface[guid].AggregatorSamples.AggregatorSample {
		// Bind every variable of the equation to the raw value it references.
		params := make(map[string]interface{})
		for _, in := range aggSample.TransformInputs.TransformInput {
			raw, found := rawValues[in.SampleIDREF]
			if !found {
				return fmt.Errorf("sample with IDREF %q has not been found", in.SampleIDREF)
			}
			params[in.VarName] = raw
		}

		equation := transformEquation(transformations[aggSample.TransformREF].Transform)
		value, err := eval(equation, params)
		if err != nil {
			return fmt.Errorf("error during eval of sample %q: %w", aggSample.SampleName, err)
		}

		tags := map[string]string{
			"guid":           guid,
			"numa_node":      numaNode,
			"sample_name":    aggSample.SampleName,
			"sample_group":   aggSample.SampleGroup,
			"datatype_idref": aggSample.DatatypeIDRef,
		}
		if aggSample.core != "" {
			tags["core"] = aggSample.core
		}
		if aggSample.cha != "" {
			tags["cha"] = aggSample.cha
		}

		acc.AddFields(pluginName, map[string]interface{}{"value": value}, tags)
	}
	return nil
}
// transformEquation prepares an equation string for evaluation.
//
// This function strips every "$" sign, which prefixes each parameter in the
// equations, and then unescapes XML/HTML entities,
// turning "&lt;" into "<", "&amp;" into "&" and "&gt;" into ">",
// so the operators can be used in evaluation.
//
// Parameters:
//
//	eq - string which should be transformed.
//
// Returns:
//
//	string - transformed string.
func transformEquation(eq string) string {
	return html.UnescapeString(strings.ReplaceAll(eq, "$", ""))
}
// eval calculates the value of given equation for given parameters.
//
// This function evaluates arbitrary equations with parameters.
// It substitutes the parameters in the equation with their values
// and calculates its value.
// Example: equation "a + b", with params: a: 2, b: 3.
// a and b will be substituted with their values so the equation becomes "2 + 3".
// If any of the parameters are missing then the equation is invalid and returns an error.
//
// Hexadecimal literals are rewritten as decimals before evaluation. If any
// literal cannot be converted the evaluation is aborted, instead of silently
// dropping the literal from the equation and evaluating what is left.
//
// Parameters:
//
//	eq - equation which should be calculated.
//	params - parameters to substitute in the equation.
//
// Returns:
//
//	interface - the value of calculation.
//	error - error if the equation is empty, if hex to dec conversion failed or if the equation is invalid.
func eval(eq string, params map[string]interface{}) (interface{}, error) {
	if eq == "" {
		return nil, errors.New("no transformation equation found")
	}

	// gval doesn't support hexadecimals
	conversionFailed := false
	eq = hexToDecRegex.ReplaceAllStringFunc(eq, func(hexStr string) string {
		dec := hexToDec(hexStr)
		if dec == "" {
			// hexToDec signals failure with an empty string; remember it,
			// because the replacement alone would just delete the literal.
			conversionFailed = true
		}
		return dec
	})
	if conversionFailed {
		return nil, errors.New("error during hex to decimal conversion")
	}

	result, err := gval.Evaluate(eq, params)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// hexToDec converts a hexadecimal literal into its decimal representation.
//
// The literal is parsed with base auto-detection, so it is expected to carry
// the "0x" prefix. The full unsigned 64-bit range is accepted, which covers
// literals above the signed 64-bit maximum such as 0xFFFFFFFFFFFFFFFF.
// Signed input is still accepted through a signed parse.
//
// Parameters:
//
//	hexStr - literal to convert, e.g. "0x1F".
//
// Returns:
//
//	string - decimal representation, or an empty string if the conversion failed.
func hexToDec(hexStr string) string {
	if dec, err := strconv.ParseUint(hexStr, 0, 64); err == nil {
		return strconv.FormatUint(dec, 10)
	}

	// ParseUint rejects a leading sign; fall back to a signed parse so that
	// input which converted before keeps converting.
	dec, err := strconv.ParseInt(hexStr, 0, 64)
	if err != nil {
		return ""
	}
	return strconv.FormatInt(dec, 10)
}
// init registers the plugin under pluginName via inputs.Add, supplying a
// creator function that returns a new zero-valued IntelPMT.
func init() {
inputs.Add(pluginName, func() telegraf.Input {
return &IntelPMT{}
})
}