This repository has been archived by the owner on Mar 30, 2020. It is now read-only.
/
metrics.go
119 lines (95 loc) · 2.2 KB
/
metrics.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package processor
import (
"regexp"
"strings"
"sync"
"time"
"github.com/mcuadros/harvester/src/intf"
. "github.com/mcuadros/harvester/src/logger"
. "github.com/mcuadros/harvester/src/processor/metric"
)
type MetricsConfig struct {
Flush int
Metrics string
}
type Metrics struct {
metrics []Metric
flush time.Duration
mutex sync.Mutex
channel chan intf.Record
isAlive bool
}
type Metric interface {
Process(record intf.Record)
GetValue() interface{}
GetField() string
Reset()
}
var configRegExp = regexp.MustCompile("^\\((\\w+)\\)(\\w+)$")
func NewMetrics(config *MetricsConfig) *Metrics {
processor := new(Metrics)
processor.SetConfig(config)
processor.Setup()
return processor
}
func (p *Metrics) SetConfig(config *MetricsConfig) {
p.flush = time.Duration(config.Flush)
p.parseMetricsConfig(config.Metrics)
}
func (p *Metrics) SetChannel(channel chan intf.Record) {
p.channel = channel
}
func (p *Metrics) parseMetricsConfig(metricsConfig string) {
for _, config := range strings.Split(metricsConfig, ",") {
var metric Metric
class, field := p.parseMetric(config)
switch class {
case "terms":
metric = NewTerms(field)
case "histogram":
metric = NewHistogram(field)
default:
Critical("Unknown metric \"%s\", valid: [terms histogram]", class)
}
p.metrics = append(p.metrics, metric)
}
}
func (p *Metrics) parseMetric(metric string) (class string, field string) {
config := configRegExp.FindStringSubmatch(metric)
if len(config) != 3 {
Critical("Malformed metric config \"%s\"", metric)
}
return config[1], config[2]
}
func (p *Metrics) Do(record intf.Record) bool {
p.mutex.Lock()
for _, metric := range p.metrics {
metric.Process(record)
}
p.mutex.Unlock()
return false
}
func (p *Metrics) Setup() {
p.isAlive = true
go p.deliveryRecord()
}
func (p *Metrics) deliveryRecord() {
Debug("Started metrics routine")
for {
time.Sleep(p.flush * time.Second)
if p.isAlive {
p.flushMetrics()
}
}
}
func (p *Metrics) flushMetrics() {
var record = make(map[string]interface{})
for _, metric := range p.metrics {
record[metric.GetField()] = metric.GetValue()
}
p.channel <- record
}
func (p *Metrics) Teardown() {
p.isAlive = false
p.flushMetrics()
}