diff --git a/plugins/inputs/kafka_consumer/kafka_consumer.go b/plugins/inputs/kafka_consumer/kafka_consumer.go index e8df2ea49ccd7..a725139975068 100644 --- a/plugins/inputs/kafka_consumer/kafka_consumer.go +++ b/plugins/inputs/kafka_consumer/kafka_consumer.go @@ -112,11 +112,11 @@ func (k *KafkaConsumer) Init() error { switch strings.ToLower(k.BalanceStrategy) { case "range", "": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRange} case "roundrobin": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategyRoundRobin} case "sticky": - cfg.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategySticky + cfg.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{sarama.BalanceStrategySticky} default: return fmt.Errorf("invalid balance strategy %q", k.BalanceStrategy) }