This repository has been archived by the owner on Mar 30, 2020. It is now read-only.
/
mutate.go
88 lines (74 loc) · 1.82 KB
/
mutate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package processor
import (
"regexp"
"strings"
"github.com/mcuadros/harvester/src/intf"
. "github.com/mcuadros/harvester/src/logger"
"github.com/mcuadros/harvester/src/processor/mutate"
)
// FIELDSEP is the character that separates the components of a field path
// in an operation's first token; parseOperation splits that token on it, so
// "a.b" becomes the path ["a", "b"].
const FIELDSEP = '.'
// MutateConfig holds the user-facing settings of the Mutate processor.
type MutateConfig struct {
	// Verbose is copied onto the processor by SetConfig; when true, errors
	// returned by operations are reported as warnings.
	Verbose bool
	// Cast lists raw operation strings. Each entry is parsed into one
	// mutate.CAST operation: the first token is the field path and the
	// remaining tokens are the operation's parameters.
	Cast []string
}
// operationTokenRe splits a raw operation string into tokens. A token is
// either a run of characters containing no space and no single quote, or a
// single-quoted group (which may contain spaces). It is compiled once at
// package scope rather than on every parseOperation call.
var operationTokenRe = regexp.MustCompile("[^' ]+|'[^']+'")

// ParseOperations converts the raw entries of the configuration into
// operations. Every entry of mc.Cast yields one mutate.CAST operation, in
// the order the entries appear. Entries that contain no token (for example
// an empty string or one made only of spaces) describe no operation and are
// skipped.
//
// The result is nil when no operation was produced.
func (mc *MutateConfig) ParseOperations() []*mutate.Operation {
	var operations []*mutate.Operation
	for _, rawparams := range mc.Cast {
		if op := mc.parseOperation(mutate.CAST, rawparams); op != nil {
			operations = append(operations, op)
		}
	}

	return operations
}

// parseOperation builds the operation identified by id from rawparams.
//
// rawparams is tokenized with operationTokenRe and the surrounding single
// quotes of each token are stripped. The first token is the field path,
// split on FIELDSEP; the remaining tokens become the operation's Params.
//
// It returns nil when rawparams contains no token at all.
func (mc *MutateConfig) parseOperation(id mutate.OperationId, rawparams string) *mutate.Operation {
	tokens := operationTokenRe.FindAllString(rawparams, -1)
	if len(tokens) == 0 {
		// Without this guard tokens[0] below panics with an index out of
		// range on an empty or spaces-only entry.
		return nil
	}

	for i, token := range tokens {
		tokens[i] = strings.Trim(token, "'")
	}

	return &mutate.Operation{
		Id:     id,
		Field:  strings.Split(tokens[0], string(FIELDSEP)),
		Params: tokens[1:],
	}
}
// Mutate is a processor that applies a list of mutate operations to each
// record handed to Do.
type Mutate struct {
	// operations are applied by Do, in slice order.
	operations []*mutate.Operation
	// channel is the record channel stored by SetChannel.
	channel chan intf.Record
	// verbose makes Do report operation errors through Warning.
	verbose bool
	// isAlive is set to true by Setup and to false by Teardown.
	isAlive bool
}
// NewMutate builds a Mutate processor from config: the configuration is
// applied through SetConfig and the processor is then started with Setup
// before being returned.
func NewMutate(config *MutateConfig) *Mutate {
	m := &Mutate{operations: []*mutate.Operation{}}
	m.SetConfig(config)
	m.Setup()

	return m
}
// SetConfig applies config to the processor: it copies the verbosity flag
// and replaces the operation list with the operations parsed from config.
func (p *Mutate) SetConfig(config *MutateConfig) {
	p.verbose, p.operations = config.Verbose, config.ParseOperations()
}
// SetChannel stores channel on the processor, replacing any channel set
// previously.
func (p *Mutate) SetChannel(channel chan intf.Record) {
	p.channel = channel
}
// Do runs every configured operation, in order, against record, which is
// handed to each operation as a map[string]interface{}.
//
// An error returned by an operation never stops the remaining operations;
// it is reported through Warning only when the processor is verbose and is
// otherwise dropped. Do always returns true.
func (p *Mutate) Do(record intf.Record) bool {
	fields := map[string]interface{}(record)
	for _, op := range p.operations {
		if err := op.Apply(fields); err != nil && p.verbose {
			Warning(err.Error())
		}
	}

	return true
}
// Setup marks the processor as alive.
func (p *Mutate) Setup() {
	p.isAlive = true
}
// Teardown marks the processor as no longer alive.
func (p *Mutate) Teardown() {
	p.isAlive = false
}