metrics.go
package metrics

import (
	"strconv"
	"sync"

	"github.com/go-kit/kit/log"
	"github.com/go-kit/kit/log/level"
	kitprometheus "github.com/go-kit/kit/metrics/prometheus"
	"github.com/inloco/kafka-elasticsearch-injector/src/logger_builder"
	stdprometheus "github.com/prometheus/client_golang/prometheus"
)
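
// metrics is the Prometheus-backed implementation of MetricsPublisher. Besides
// the collectors themselves, it tracks the highest offset seen per topic and
// partition (guarded by lock) so that partition delay can be derived later.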
type metrics struct {
	logger                   log.Logger
	partitionDelay           *kitprometheus.Gauge
	recordsConsumed          *kitprometheus.Counter
	endpointLatencyHistogram *kitprometheus.Summary
	bufferFullGauge          *kitprometheus.Gauge
	elasticsearchRetries     *kitprometheus.Counter
	elasticsearchConflicts   *kitprometheus.Counter
	elasticsearchBadRequest  *kitprometheus.Counter
	lock                     sync.RWMutex
	topicPartitionToOffset   map[string]map[int32]int64
}
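
// IncrementRecordsConsumed adds count to the counter of records consumed successfully.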
func (m *metrics) IncrementRecordsConsumed(count int) {
	m.recordsConsumed.Add(float64(count))
}
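
// RecordEndpointLatency records a single endpoint latency observation, in seconds.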
func (m *metrics) RecordEndpointLatency(latency float64) {
	m.endpointLatencyHistogram.Observe(latency)
}
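
// UpdateOffset stores offset as the latest processed offset for the given topic
// and partition, but only if it is greater than the offset currently on record.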
func (m *metrics) UpdateOffset(topic string, partition int32, offset int64) {
	m.lock.Lock()
	currentOffset, exists := m.topicPartitionToOffset[topic][partition]
	if !exists || offset > currentOffset {
		_, exists := m.topicPartitionToOffset[topic]
		if !exists {
			m.topicPartitionToOffset[topic] = make(map[int32]int64)
		}
		m.topicPartitionToOffset[topic][partition] = offset
	}
	m.lock.Unlock()
}
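
// PublishOffsetMetrics compares the broker high-water marks against the offsets
// recorded via UpdateOffset and publishes the difference (the consumer delay)
// as a gauge labeled by topic and partition.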
func (m *metrics) PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64) {
	for topic, partitions := range highWaterMarks {
		for partition, maxOffset := range partitions {
			m.lock.RLock()
			offset, ok := m.topicPartitionToOffset[topic][partition]
			m.lock.RUnlock()
			if ok {
				delay := maxOffset - offset
				level.Info(m.logger).Log("message", "updating partition offset metric",
					"partition", partition, "maxOffset", maxOffset, "current", offset, "delay", delay)
				m.partitionDelay.
					With("partition", strconv.Itoa(int(partition)), "topic", topic).
					Set(float64(delay))
			}
		}
	}
}
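
// BufferFull sets the buffer-full gauge to 1 when the application buffer is
// full and to 0 otherwise.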
func (m *metrics) BufferFull(full bool) {
	val := 0.0
	if full {
		val = 1.0
	}
	m.bufferFullGauge.Set(val)
}
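
// ElasticsearchRetries adds count to the counter of events that had to be
// retried when sending to Elasticsearch.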
func (m *metrics) ElasticsearchRetries(count int) {
	m.elasticsearchRetries.Add(float64(count))
}
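
// ElasticsearchConflicts adds count to the counter of documents that already
// existed in Elasticsearch.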
func (m *metrics) ElasticsearchConflicts(count int) {
	m.elasticsearchConflicts.Add(float64(count))
}
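
// ElasticsearchBadRequests adds count to the counter of malformed events
// rejected by Elasticsearch.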
func (m *metrics) ElasticsearchBadRequests(count int) {
	m.elasticsearchBadRequest.Add(float64(count))
}
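
// MetricsPublisher is the interface the rest of the application uses to report
// consumer and Elasticsearch metrics.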
type MetricsPublisher interface {
	PublishOffsetMetrics(highWaterMarks map[string]map[int32]int64)
	UpdateOffset(topic string, partition int32, offset int64)
	IncrementRecordsConsumed(count int)
	RecordEndpointLatency(latency float64)
	BufferFull(full bool)
	ElasticsearchRetries(count int)
	ElasticsearchConflicts(count int)
	ElasticsearchBadRequests(count int)
}
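
// NewMetricsPublisher creates all collectors, registers them with the default
// Prometheus registry, and returns a ready-to-use MetricsPublisher.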
func NewMetricsPublisher() MetricsPublisher {
	logger := logger_builder.NewLogger("metrics_updater")
	recordsConsumed := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Name: "kafka_consumer_records_consumed_successfully",
		Help: "Number of records consumed successfully",
	}, []string{})
	partitionDelay := kitprometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
		Name: "kafka_consumer_partition_delay",
		Help: "Kafka consumer partition delay",
	}, []string{"partition", "topic"})
	endpointLatencySummary := kitprometheus.NewSummaryFrom(stdprometheus.SummaryOpts{
		Name: "kafka_consumer_endpoint_latency_histogram_seconds",
		Help: "Kafka consumer endpoint latency histogram in seconds",
	}, []string{})
	bufferFullGauge := kitprometheus.NewGaugeFrom(stdprometheus.GaugeOpts{
		Name: "kafka_consumer_buffer_full",
		Help: "Kafka consumer boolean indicating if app buffer is full",
	}, []string{})
	elasticsearchRetriesCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Name: "elasticsearch_events_retryed",
		Help: "Number of events that had to be retried when sending to Elasticsearch",
	}, []string{})
	elasticsearchConflictsCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Name: "elasticsearch_document_already_exists",
		Help: "Number of events that were sent to Elasticsearch but already existed",
	}, []string{})
	elasticsearchBadRequestCounter := kitprometheus.NewCounterFrom(stdprometheus.CounterOpts{
		Name: "elasticsearch_bad_request",
		Help: "Number of malformed events",
	}, []string{})
	return &metrics{
		logger:                   logger,
		partitionDelay:           partitionDelay,
		recordsConsumed:          recordsConsumed,
		endpointLatencyHistogram: endpointLatencySummary,
		bufferFullGauge:          bufferFullGauge,
		lock:                     sync.RWMutex{},
		elasticsearchRetries:     elasticsearchRetriesCounter,
		elasticsearchConflicts:   elasticsearchConflictsCounter,
		elasticsearchBadRequest:  elasticsearchBadRequestCounter,
		topicPartitionToOffset:   make(map[string]map[int32]int64),
	}
}
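
// Example usage (a minimal sketch; the consumer wiring referenced below — the
// records slice, elapsed duration, record, and highWaterMarks map — is assumed
// and not part of this file):
//
//	publisher := NewMetricsPublisher()
//	publisher.IncrementRecordsConsumed(len(records))
//	publisher.RecordEndpointLatency(elapsed.Seconds())
//	publisher.UpdateOffset("my-topic", 0, record.Offset)
//	publisher.PublishOffsetMetrics(highWaterMarks)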