Skip to content

Commit

Permalink
Improve lag calculation and error handling in lagCmd
Browse files Browse the repository at this point in the history
  • Loading branch information
jack authored and jack committed Jul 2, 2024
1 parent a971bdb commit aaae395
Showing 1 changed file with 30 additions and 17 deletions.
47 changes: 30 additions & 17 deletions cmd/kaf/topic.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package main

import (
"encoding/json"
"fmt"
"os"
"sort"
"text/tabwriter"

"strings"

"encoding/json"
"text/tabwriter"

"github.com/IBM/sarama"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -343,6 +342,8 @@ var lagCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
topic := args[0]
admin := getClusterAdmin()
defer admin.Close()

topicDetails, err := admin.DescribeTopics([]string{args[0]})
if err != nil {
errorExit("Unable to describe topics: %v\n", err)
Expand All @@ -354,35 +355,48 @@ var lagCmd = &cobra.Command{
highWatermarks := getHighWatermarks(args[0], partitions)

var groups []string
rst, err := admin.ListConsumerGroups()
consumerGroups, err := admin.ListConsumerGroups()
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
errorExit("Unable to list consumer groups: %v\n", err)
}
for group, _ := range rst {
for group := range consumerGroups {
groups = append(groups, group)
}
groupsInfo, err := admin.DescribeConsumerGroups(groups)
if err != nil {
errorExit("Unable to list consumer info: %v\n", err)
errorExit("Unable to describe consumer groups: %v\n", err)
}
var lagInfo = make(map[string]int64)
for _, v := range groupsInfo {
var sum int64
for _, member := range v.Members {
assignment, err := member.GetMemberAssignment()
if err != nil || assignment == nil {
continue
}
if _, exist := assignment.Topics[topic]; exist {
var sum int64
resp, _ := admin.ListConsumerGroupOffsets(v.GroupId, assignment.Topics)
for _, v1 := range resp.Blocks {
for pid, v2 := range v1 {
sum += highWatermarks[pid] - v2.Offset
if topicPartitions, exist := assignment.Topics[topic]; exist {
resp, err := admin.ListConsumerGroupOffsets(v.GroupId, map[string][]int32{topic: topicPartitions})
if err != nil {
fmt.Fprintf(os.Stderr, "Error fetching offsets for group %s: %v\n", v.GroupId, err)
continue
}
if blocks, ok := resp.Blocks[topic]; ok {
for pid, block := range blocks {
if hwm, ok := highWatermarks[pid]; ok {
if block.Offset > hwm {
fmt.Fprintf(os.Stderr, "Warning: Consumer offset (%d) is greater than high watermark (%d) for partition %d in group %s\n", block.Offset, hwm, pid, v.GroupId)
// You might choose to set the lag to 0 in this case
// sum += 0
} else {
sum += hwm - block.Offset
}
}
}
}
lagInfo[v.GroupId] = sum
}

}
if sum > 0 {
lagInfo[v.GroupId] = sum
}
}
w := tabwriter.NewWriter(outWriter, tabwriterMinWidth, tabwriterWidth, tabwriterPadding, tabwriterPadChar, tabwriterFlags)
Expand All @@ -393,6 +407,5 @@ var lagCmd = &cobra.Command{
fmt.Fprintf(w, "%v\t%v\n", group, lag)
}
w.Flush()

},
}

0 comments on commit aaae395

Please sign in to comment.