-
Notifications
You must be signed in to change notification settings - Fork 1
/
logprocesscounterset.go
171 lines (152 loc) · 7.47 KB
/
logprocesscounterset.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package base
import (
"github.com/relex/gotils/promexporter/promext"
"github.com/relex/gotils/promexporter/promreg"
"github.com/relex/slog-agent/util"
)
// LogProcessCounterSet tracks metrics for log transform, serialization and chunk making.
//
// LogProcessCounterSet must be accessed through a pointer and is not concurrently usable. Counter-vectors and
// counters created here may duplicate those created elsewhere, as long as the labels match.
//
// It tracks counters by metric keys (ex: vhost+source) that are not part of orchestration keys (ex: level), by
// creating a fixed-length array containing counters for each key-set. The positions of the final counters are
// decided during the registration process (see RegisterCustomCounter).
//
// LogInputCounter's own custom counter registry is ignored here, as a map access per counter update would be very
// inefficient.
type LogProcessCounterSet struct {
// factory creates or fetches all counters and counter-vectors used by this set
factory promreg.MetricCreator
metricKeyExtractor FieldSetExtractor // to extract metric keys from log records
metricKeyNames []string // label names of metric keys (ex: key_vhost)
customCounterVecMap map[string]logProcessCustomCounterVec // map of custom label => counter-vector[label], with unfilled metric key labels
keySetPairs map[string]logKeySetCounterPair // map of merged metric key => (input counter, custom counters)
serializedLengthTotal []valueCounterProvider // an array of per-output metrics counters, accessed by output index
// chunksCountTotal counts created chunks per output, accessed by output index
chunksCountTotal []valueCounterProvider
// chunksLengthTotal sums the data length of created chunks per output, accessed by output index
chunksLengthTotal []valueCounterProvider
currentCustomCounters []*logCustomCounterImpl // custom counter array of the currently selected metric key-set
mergeKeyBuffer []byte // reused buffer to build merged metric key from record
}
// logProcessCustomCounterVec holds the count and length counter-vectors registered for one custom label, together
// with the position of that label's counters in every per-key-set custom counter array.
//
// FIXME: why is logCustomCounterHost not used here?
type logProcessCustomCounterVec struct {
// index is assigned at registration time as the number of labels registered before this one
index int
countMetricVec *promext.LazyRWCounterVec // Use lazy-init counters as there could be many unused metrics for many pipelines
// lengthMetricVec is the counterpart of countMetricVec for record lengths in bytes
lengthMetricVec *promext.LazyRWCounterVec
}
// logKeySetCounterPair groups all the counters that belong to one metric key-set.
type logKeySetCounterPair struct {
// inputCounter is the input counter returned by SelectMetricKeySet for this key-set
inputCounter *LogInputCounterSet
// customCounters holds one counter per registered custom label, positioned by logProcessCustomCounterVec.index
customCounters []*logCustomCounterImpl
}
// NewLogProcessCounter creates a LogProcessCounterSet.
//
// The label names of metric keys are the names of keyLocators in schema, each prefixed by "key_". For every name
// in outputNames, the serialization and chunk counters labelled by that output are created or fetched from
// factory; they are stored in the order of outputNames and later accessed by output index.
func NewLogProcessCounter(factory promreg.MetricCreator, schema LogSchema, keyLocators []LogFieldLocator, outputNames []string) *LogProcessCounterSet {
	keyLabelNames := make([]string, 0, len(keyLocators))
	for _, locator := range keyLocators {
		keyLabelNames = append(keyLabelNames, "key_"+locator.Name(schema))
	}

	numOutputs := len(outputNames)
	set := &LogProcessCounterSet{
		factory:               factory,
		metricKeyExtractor:    *NewFieldSetExtractor(keyLocators),
		metricKeyNames:        keyLabelNames,
		customCounterVecMap:   make(map[string]logProcessCustomCounterVec, 100),
		keySetPairs:           make(map[string]logKeySetCounterPair, 2000),
		serializedLengthTotal: make([]valueCounterProvider, numOutputs),
		chunksCountTotal:      make([]valueCounterProvider, numOutputs),
		chunksLengthTotal:     make([]valueCounterProvider, numOutputs),
		currentCustomCounters: nil,
		mergeKeyBuffer:        make([]byte, 0, 200),
	}

	// newOutputCounter wraps the counter of the given name for one output, starting with nothing unwritten
	newOutputCounter := func(name, help, output string) valueCounterProvider {
		return valueCounterProvider{
			factory.AddOrGetCounter(name, help, []string{"output"}, []string{output}), 0,
		}
	}
	for i, output := range outputNames {
		set.serializedLengthTotal[i] = newOutputCounter("serialized_bytes_total", "Total lengths in bytes of serialized log records", output)
		set.chunksCountTotal[i] = newOutputCounter("chunks_total", "Numbers of created chunks", output)
		set.chunksLengthTotal[i] = newOutputCounter("chunk_bytes_total", "Total length in bytes of created chunks", output)
	}
	return set
}
// RegisterCustomCounter registers a custom counter by label and returns the function to update it.
//
// The first registration of a label creates its count and length counter-vectors and assigns it the next free
// position in the per-key-set custom counter arrays; registering the same label again reuses that position.
//
// The returned function adds one record of the given length to the label's counter in the currently selected
// metric key-set (see SelectMetricKeySet).
//
// This method must not be called in processing stage, when counters are already being selected and updated
func (pcounter *LogProcessCounterSet) RegisterCustomCounter(label string) func(length int) {
	vec, registered := pcounter.customCounterVecMap[label]
	if !registered {
		position := len(pcounter.customCounterVecMap)
		// label names are "label" followed by the metric key names; only the "label" value is filled in here
		countVec := pcounter.factory.AddOrGetLazyCounterVec(
			"labelled_records_total",
			"Numbers of labelled log records",
			append([]string{"label"}, pcounter.metricKeyNames...),
			[]string{label},
		)
		lengthVec := pcounter.factory.AddOrGetLazyCounterVec(
			"labelled_record_bytes_total",
			"Total length in bytes of labelled log records",
			append([]string{"label"}, pcounter.metricKeyNames...),
			[]string{label},
		)
		vec = logProcessCustomCounterVec{
			index:           position,
			countMetricVec:  countVec,
			lengthMetricVec: lengthVec,
		}
		pcounter.customCounterVecMap[label] = vec
	}

	slot := vec.index
	return func(length int) {
		// currentCustomCounters is switched by SelectMetricKeySet, so it is looked up on every call
		selected := pcounter.currentCustomCounters[slot]
		selected.unwrittenCount++
		selected.unwrittenLength += uint64(length)
	}
}
// SelectMetricKeySet switches the current metric key set to that of the given record.
//
//  1. Subsequent transforms would write counter values to the correct key-set.
//
//  2. Returns an input counter for that key-set.
//
// Counters of a key-set are created on its first appearance and reused afterwards. The lookup key is built in a
// reused buffer from all metric key values of the record, each prefixed by its length, so that different key-sets
// can never merge into the same lookup key (ex: "ab"+"c" and "a"+"bc").
func (pcounter *LogProcessCounterSet) SelectMetricKeySet(record *LogRecord) *LogInputCounterSet {
	tempKeys := pcounter.metricKeyExtractor.Extract(record)

	tempMergedKey := pcounter.mergeKeyBuffer
	for _, tkey := range tempKeys {
		// 4-byte little-endian length prefix keeps value boundaries unambiguous without any reserved separator
		keyLen := len(tkey)
		tempMergedKey = append(tempMergedKey, byte(keyLen), byte(keyLen>>8), byte(keyLen>>16), byte(keyLen>>24))
		tempMergedKey = append(tempMergedKey, tkey...)
	}
	// keep the (possibly grown) buffer for the next call; tempMergedKey still holds the full key for this call
	pcounter.mergeKeyBuffer = tempMergedKey[:0]

	// try to get existing counter by temp key, no new key string is created here
	pair, found := pcounter.keySetPairs[string(tempMergedKey)]
	if !found {
		// copy transient field values from record for storing into map and counters
		permKeys := util.DeepCopyStrings(tempKeys)
		permMergedKey := util.DeepCopyStringFromBytes(tempMergedKey)

		// one counter per registered custom label, placed at the position assigned during registration
		customCounters := make([]*logCustomCounterImpl, len(pcounter.customCounterVecMap))
		for _, vec := range pcounter.customCounterVecMap {
			customCounters[vec.index] = &logCustomCounterImpl{
				countMetric:     vec.countMetricVec.WithLabelValues(permKeys...),
				lengthMetric:    vec.lengthMetricVec.WithLabelValues(permKeys...),
				unwrittenCount:  0,
				unwrittenLength: 0,
			}
		}

		pair = logKeySetCounterPair{
			inputCounter:   NewLogInputCounter(pcounter.factory.AddOrGetPrefix("", pcounter.metricKeyNames, permKeys)),
			customCounters: customCounters,
		}
		pcounter.keySetPairs[permMergedKey] = pair
	}

	pcounter.currentCustomCounters = pair.customCounters
	return pair.inputCounter
}
// CountStream adds the length of a serialized stream to the pending serialized-bytes value of the given output.
func (pcounter *LogProcessCounterSet) CountStream(outputIndex int, stream LogStream) { // xx:inline
	streamLength := uint64(len(stream))
	pcounter.serializedLengthTotal[outputIndex].unwrittenValue += streamLength
}
// CountChunk records one created chunk for the given output: one more chunk and the length of its data.
func (pcounter *LogProcessCounterSet) CountChunk(outputIndex int, chunk *LogChunk) { // xx:inline
	pcounter.chunksCountTotal[outputIndex].unwrittenValue += 1
	dataLength := uint64(len(chunk.Data))
	pcounter.chunksLengthTotal[outputIndex].unwrittenValue += dataLength
}
// UpdateMetrics writes all unwritten values held by this counter set to the underlying Prometheus counters: the
// input counter and custom counters of every known key-set first, then the per-output counters.
func (pcounter *LogProcessCounterSet) UpdateMetrics() {
	for _, keySet := range pcounter.keySetPairs {
		keySet.inputCounter.UpdateMetrics()
		customs := keySet.customCounters
		for i := 0; i < len(customs); i++ {
			customs[i].UpdateMetrics()
		}
	}

	// the three per-output slices should have the same length, so a single index walks all of them
	numOutputs := len(pcounter.serializedLengthTotal)
	for outputIndex := 0; outputIndex < numOutputs; outputIndex++ {
		pcounter.serializedLengthTotal[outputIndex].UpdateMetric()
		pcounter.chunksCountTotal[outputIndex].UpdateMetric()
		pcounter.chunksLengthTotal[outputIndex].UpdateMetric()
	}
}