forked from grafana/loki
/
logfmt.go
154 lines (132 loc) · 4.25 KB
/
logfmt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package stages
import (
"fmt"
"reflect"
"strings"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/go-logfmt/logfmt"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
)
// Config Errors
//
// Error messages used by validateLogfmtConfig when it rejects a logfmt stage
// configuration. They are plain strings that are wrapped with errors.New at
// the point of use.
const (
	// ErrMappingRequired is used when the configuration has no mapping entries.
	ErrMappingRequired = "logfmt mapping is required"
	// ErrEmptyLogfmtStageConfig is used when the configuration itself is nil.
	ErrEmptyLogfmtStageConfig = "empty logfmt stage configuration"
	// ErrEmptyLogfmtStageSource is used when source is set but is an empty string.
	ErrEmptyLogfmtStageSource = "empty source"
)
// LogfmtConfig represents a logfmt Stage configuration
type LogfmtConfig struct {
	// Mapping pairs the key under which a value is stored in the extracted
	// map (the map key) with the logfmt key it is read from (the map value).
	// An empty value means the logfmt key has the same name as the map key.
	// At least one entry is required.
	Mapping map[string]string `mapstructure:"mapping"`
	// Source optionally names an entry of the extracted map whose value is
	// parsed instead of the log entry. When nil the log entry is parsed; an
	// empty string is rejected by validation.
	Source *string `mapstructure:"source"`
}
// validateLogfmtConfig checks a logfmt stage configuration and, when it is
// valid, returns the configured mapping turned inside out: each key of the
// result is a key expected in the parsed logfmt data and each value is the key
// under which that data is stored in the extracted map. Building the lookup in
// this direction lets the stage resolve a parsed key with one map access.
//
// It returns an error when c is nil, when the mapping is empty, or when a
// source is given but is the empty string.
//
// If two mapping entries resolve to the same logfmt key only one of them ends
// up in the result; which one depends on map iteration order.
func validateLogfmtConfig(c *LogfmtConfig) (map[string]string, error) {
	switch {
	case c == nil:
		return nil, errors.New(ErrEmptyLogfmtStageConfig)
	case len(c.Mapping) == 0:
		return nil, errors.New(ErrMappingRequired)
	case c.Source != nil && *c.Source == "":
		return nil, errors.New(ErrEmptyLogfmtStageSource)
	}

	inverted := make(map[string]string, len(c.Mapping))
	for target, logfmtKey := range c.Mapping {
		// An unset value means the logfmt key is named like the target key.
		if logfmtKey == "" {
			logfmtKey = target
		}
		inverted[logfmtKey] = target
	}
	return inverted, nil
}
// logfmtStage sets extracted data using logfmt parser
type logfmtStage struct {
	// cfg is the decoded stage configuration; Process reads cfg.Source to
	// decide what text to parse.
	cfg *LogfmtConfig
	// inverseMapping maps a key found in the parsed logfmt data to the key
	// under which its value is stored in the extracted map.
	inverseMapping map[string]string
	// logger is used by Process for its debug and error messages.
	logger log.Logger
}
// newLogfmtStage builds a logfmt pipeline stage from an untyped configuration.
//
// The configuration is decoded with parseLogfmtConfig and then checked with
// validateLogfmtConfig; an error from either step is returned unchanged and no
// stage is created. The logger handed to the stage is tagged with
// component=stage and type=logfmt.
func newLogfmtStage(logger log.Logger, config interface{}) (Stage, error) {
	parsed, err := parseLogfmtConfig(config)
	if err != nil {
		return nil, err
	}

	// Validation also yields the lookup table keyed by logfmt key, whose
	// values are the keys used when writing into the extracted map.
	lookup, err := validateLogfmtConfig(parsed)
	if err != nil {
		return nil, err
	}

	stage := &logfmtStage{
		cfg:            parsed,
		inverseMapping: lookup,
		logger:         log.With(logger, "component", "stage", "type", "logfmt"),
	}
	return toStage(stage), nil
}
// parseLogfmtConfig decodes an untyped stage configuration into a
// LogfmtConfig using mapstructure. It returns the decode error, and a nil
// config, when decoding fails. No validation is performed here.
func parseLogfmtConfig(config interface{}) (*LogfmtConfig, error) {
	var decoded LogfmtConfig
	if err := mapstructure.Decode(config, &decoded); err != nil {
		return nil, err
	}
	return &decoded, nil
}
// Process implements Stage
//
// It parses logfmt text and copies the value of every key that appears in the
// stage's inverse mapping into extracted, under the mapped name, as a string.
// The text comes from extracted[source] when a source is configured and from
// entry otherwise. labels and t are not used.
//
// Process returns without changing extracted when the configured source is
// missing from extracted, when its value cannot be converted to a string, or
// when there is no source and entry is nil. Values are written while the
// decoder scans, so pairs read before a decode error stay in extracted.
func (j *logfmtStage) Process(labels model.LabelSet, extracted map[string]interface{}, t *time.Time, entry *string) {
	// Default to the log entry; a configured source overrides it below.
	line := entry
	if src := j.cfg.Source; src != nil {
		raw, found := extracted[*src]
		if !found {
			if Debug {
				level.Debug(j.logger).Log("msg", "source does not exist in the set of extracted values", "source", *src)
			}
			return
		}

		text, err := getString(raw)
		if err != nil {
			if Debug {
				level.Debug(j.logger).Log("msg", "failed to convert source value to string", "source", *src, "err", err, "type", reflect.TypeOf(raw))
			}
			return
		}
		line = &text
	}

	if line == nil {
		if Debug {
			level.Debug(j.logger).Log("msg", "cannot parse a nil entry")
		}
		return
	}

	dec := logfmt.NewDecoder(strings.NewReader(*line))
	matched := 0
	for dec.ScanRecord() {
		for dec.ScanKeyval() {
			target, known := j.inverseMapping[string(dec.Key())]
			if !known {
				// Keys without a configured mapping are ignored.
				continue
			}
			extracted[target] = string(dec.Value())
			matched++
		}
	}

	// Decode failures are reported at error level regardless of Debug.
	if err := dec.Err(); err != nil {
		level.Error(j.logger).Log("msg", "failed to decode logfmt", "err", err)
		return
	}

	if !Debug {
		return
	}
	// matched counts every stored pair, so a key repeated in the input is
	// counted once per occurrence.
	if want := len(j.inverseMapping); matched != want {
		level.Debug(j.logger).Log("msg", fmt.Sprintf("found only %d out of %d configured mappings in logfmt stage", matched, want))
	}
	level.Debug(j.logger).Log("msg", "extracted data debug in logfmt stage", "extracted data", fmt.Sprintf("%v", extracted))
}
// Name implements Stage
//
// It returns StageTypeLogfmt.
func (j *logfmtStage) Name() string {
	return StageTypeLogfmt
}