Skip to content

Commit

Permalink
fix: Intermittent failure from Kafka to get consumer offsets (#803)
Browse files Browse the repository at this point in the history
Signed-off-by: Yashash H L <yashashhl25@gmail.com>
  • Loading branch information
yhl25 committed Jun 21, 2023
1 parent 271225a commit 2ce0ac9
Showing 1 changed file with 20 additions and 5 deletions.
25 changes: 20 additions & 5 deletions pkg/sources/kafka/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,15 +246,11 @@ func (r *KafkaSource) Close() error {
// finally, shut down the client
r.cancelfn()
if r.adminClient != nil {
// closes the underlying sarama client as well.
if err := r.adminClient.Close(); err != nil {
r.logger.Errorw("Error in closing kafka admin client", zap.Error(err))
}
}
if r.saramaClient != nil {
if err := r.saramaClient.Close(); err != nil {
r.logger.Errorw("Error in closing kafka sarama client", zap.Error(err))
}
}
<-r.stopch
for _, p := range r.sourcePublishWMs {
if err := p.Close(); err != nil {
Expand All @@ -276,6 +272,10 @@ func (r *KafkaSource) Pending(ctx context.Context) (int64, error) {
totalPending := int64(0)
rep, err := r.adminClient.ListConsumerGroupOffsets(r.groupName, map[string][]int32{r.topic: partitions})
if err != nil {
err := r.refreshAdminClient()
if err != nil {
return isb.PendingNotAvailable, fmt.Errorf("failed to update the admin client, %w", err)
}
return isb.PendingNotAvailable, fmt.Errorf("failed to list consumer group offsets, %w", err)
}
for _, partition := range partitions {
Expand Down Expand Up @@ -374,6 +374,21 @@ func NewKafkaSource(
return kafkasource, nil
}

// refreshAdminClient refreshes the admin client
func (r *KafkaSource) refreshAdminClient() error {
if _, err := r.saramaClient.RefreshController(); err != nil {
return fmt.Errorf("failed to refresh controller, %w", err)
}
// we are not closing the old admin client because it will close the underlying sarama client as well
// it is safe to not close the admin client, since we are using the same sarama client we will not leak any resources(tcp connections)
admin, err := sarama.NewClusterAdminFromClient(r.saramaClient)
if err != nil {
return fmt.Errorf("failed to create new admin client, %w", err)
}
r.adminClient = admin
return nil
}

func configFromOpts(yamlconfig string) (*sarama.Config, error) {
config, err := sharedutil.GetSaramaConfigFromYAMLString(yamlconfig)
if err != nil {
Expand Down

0 comments on commit 2ce0ac9

Please sign in to comment.