Skip to content

Commit

Permalink
prometheus-metrics-kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
nstankov-bg committed Jul 22, 2023
1 parent 96fcd72 commit 50a01a0
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 1 deletion.
10 changes: 9 additions & 1 deletion pkg/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,15 @@ func flushMessageBuffer() {
bufferMutex.Unlock()

operation := func() error {
return writer.WriteMessages(context.Background(), tmpBuffer...)
err := writer.WriteMessages(context.Background(), tmpBuffer...)
if err == nil {
// Increment Prometheus counter
MessagesSent.Add(float64(len(tmpBuffer)))
} else {
// Increment error counter
SendErrors.Inc()
}
return err
}

// Use exponential backoff strategy
Expand Down
18 changes: 18 additions & 0 deletions pkg/kafka/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package kafka

import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

// Initialize Prometheus metrics
var (
MessagesSent = promauto.NewCounter(prometheus.CounterOpts{
Name: "kafka_messages_sent_total",
Help: "The total number of messages sent",
})
SendErrors = promauto.NewCounter(prometheus.CounterOpts{
Name: "kafka_send_errors_total",
Help: "The total number of send errors",
})
)

0 comments on commit 50a01a0

Please sign in to comment.