-
Notifications
You must be signed in to change notification settings - Fork 6
/
distribution_rate_calculator.go
70 lines (62 loc) · 2.07 KB
/
distribution_rate_calculator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package processors
import (
"sync"
gometrics "github.com/rcrowley/go-metrics"
log "github.com/sirupsen/logrus"
"github.com/wavefronthq/go-metrics-wavefront/reporting"
"github.com/wavefronthq/observability-for-kubernetes/collector/internal/metrics"
"github.com/wavefronthq/observability-for-kubernetes/collector/internal/wf"
)
// DuplicateHistogramCounter looks up (or registers, if absent) the counter in
// the go-metrics default registry whose key is "histograms.duplicates" encoded
// together with a "metricname" tag set to name.
func DuplicateHistogramCounter(name string) gometrics.Counter {
	tags := map[string]string{"metricname": name}
	key := reporting.EncodeKey("histograms.duplicates", tags)
	return gometrics.GetOrRegisterCounter(key, gometrics.DefaultRegistry)
}
// DistributionRateCalculator is a batch processor that replaces each
// distribution in a batch with the result of Distribution.Rate computed
// against the previous distribution seen for the same key.
type DistributionRateCalculator struct {
// lock is held for the full duration of Process and guards prevDistributions.
lock sync.Mutex
// prevDistributions maps a distribution's Key() to a clone of the most
// recent distribution that Process stored for that key.
prevDistributions map[wf.DistributionHash]*wf.Distribution
}
// Name returns the fixed label that identifies this processor.
func (rc *DistributionRateCalculator) Name() string {
	const processorName = "distribution rate calculator"
	return processorName
}
// Process rewrites batch.Metrics in place, replacing every *wf.Distribution
// with the value returned by its Rate method when given the previously stored
// distribution for the same key. Metrics of any other type pass through
// unchanged.
//
// A distribution is dropped from the batch when:
//   - its key already appeared earlier in this batch; the duplicate is logged
//     at warn level and counted via DuplicateHistogramCounter, and it does not
//     update the stored previous value; or
//   - Rate returns nil; the distribution is still stored as the previous value
//     for its key, so a later batch can be rated against it.
//
// The whole call runs under rc.lock. The returned batch is the same pointer
// that was passed in, and the returned error is always nil.
func (rc *DistributionRateCalculator) Process(batch *metrics.Batch) (*metrics.Batch, error) {
	rc.lock.Lock()
	defer rc.lock.Unlock()

	// A zero-value calculator (one not created by NewDistributionRateCalculator)
	// has a nil map; assigning into it below would panic, so create it lazily.
	if rc.prevDistributions == nil {
		rc.prevDistributions = map[wf.DistributionHash]*wf.Distribution{}
	}

	// Keys of the distributions already accepted from this batch.
	seen := map[wf.DistributionHash]struct{}{}
	batch.Metrics = filterMapInPlace(func(metric wf.Metric) (wf.Metric, bool) {
		distribution, ok := metric.(*wf.Distribution)
		if !ok {
			return metric, true
		}

		// Compute the key once instead of on every map access.
		// NOTE(review): assumes Key() is unaffected by Rate/Clone — confirm in wf.
		key := distribution.Key()

		if _, isDuplicate := seen[key]; isDuplicate {
			log.Warnf(
				"duplicate histogram series name=%s source=%s tags=%v",
				distribution.Name(), distribution.Source, distribution.Tags(),
			)
			DuplicateHistogramCounter(distribution.Name()).Inc(1)
			return nil, false
		}

		// A missing entry yields a nil previous distribution for Rate.
		rate := distribution.Rate(rc.prevDistributions[key])
		// Store a clone rather than the distribution that travels with the batch.
		rc.prevDistributions[key] = distribution.Clone()
		seen[key] = struct{}{}
		return rate, rate != nil
	}, batch.Metrics)
	return batch, nil
}
// filterMapInPlace calls f on each element of es in order and keeps the mapped
// value whenever f reports true. Kept values are written back into the front of
// es's own backing array, so the caller's slice contents are overwritten; the
// returned slice shares that array and has length equal to the number kept.
// Slots past the kept prefix are set to nil so the array no longer references
// the values that were there.
func filterMapInPlace(f func(wf.Metric) (wf.Metric, bool), es []wf.Metric) []wf.Metric {
	kept := 0
	for _, e := range es {
		// kept never exceeds the current read position, so this write cannot
		// clobber an element that has not been visited yet.
		if mapped, include := f(e); include {
			es[kept] = mapped
			kept++
		}
	}
	for i := kept; i < len(es); i++ {
		es[i] = nil
	}
	return es[:kept]
}
// NewDistributionRateCalculator returns a calculator with an empty, non-nil
// map of previous distributions.
func NewDistributionRateCalculator() *DistributionRateCalculator {
	calculator := new(DistributionRateCalculator)
	calculator.prevDistributions = make(map[wf.DistributionHash]*wf.Distribution)
	return calculator
}