forked from elastic/apm-server
-
Notifications
You must be signed in to change notification settings - Fork 0
/
processor.go
65 lines (52 loc) · 1.42 KB
/
processor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package error
import (
"github.com/santhosh-tekuri/jsonschema"
"github.com/mitchellh/mapstructure"
pr "github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/utility"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/monitoring"
)
var (
errorMetrics = monitoring.Default.NewRegistry("apm-server.processor.error")
validationCount = monitoring.NewInt(errorMetrics, "validation.count")
validationError = monitoring.NewInt(errorMetrics, "validation.errors")
transformations = monitoring.NewInt(errorMetrics, "transformations")
)
const (
processorName = "error"
)
var schema = pr.CreateSchema(errorSchema, processorName)
func NewProcessor(config *pr.Config) pr.Processor {
return &processor{schema: schema, config: config}
}
type processor struct {
schema *jsonschema.Schema
config *pr.Config
}
func (p *processor) Validate(raw map[string]interface{}) error {
validationCount.Inc()
err := pr.Validate(raw, p.schema)
if err != nil {
validationError.Inc()
}
return err
}
func (p *processor) Transform(raw interface{}) ([]beat.Event, error) {
transformations.Inc()
var pa payload
decoder, _ := mapstructure.NewDecoder(
&mapstructure.DecoderConfig{
DecodeHook: utility.RFC3339DecoderHook,
Result: &pa,
},
)
err := decoder.Decode(raw)
if err != nil {
return nil, err
}
return pa.transform(p.config), nil
}
func (p *processor) Name() string {
return processorName
}