From fd6e1766b03b3de4b3ad39a011568edfdb105db8 Mon Sep 17 00:00:00 2001 From: David Reimschussel Date: Wed, 19 Oct 2022 16:20:23 -0600 Subject: [PATCH] fix: switch to sarama's new consumer group rebalance strategy setting --- plugins/inputs/kafka_consumer/kafka_consumer.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index e8df2ea49ccd7..a725139975068 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -112,11 +112,11 @@ func (k *KafkaConsumer) Init() error { switch strings.ToLower(k.BalanceStrategy) { case "range", "": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange} case "roundrobin": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin} case "sticky": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} default: return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy) }