From a929f96b4589c42c1fdeaf32abb5c8b12b46f860 Mon Sep 17 00:00:00 2001 From: Adam Libresco Date: Tue, 21 May 2024 13:39:41 -0400 Subject: [PATCH] Add config option for Kafka version to Kafka output --- pkg/outputs/kafka_output/kafka_output.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/outputs/kafka_output/kafka_output.go b/pkg/outputs/kafka_output/kafka_output.go index c769aa03..9217a76b 100644 --- a/pkg/outputs/kafka_output/kafka_output.go +++ b/pkg/outputs/kafka_output/kafka_output.go @@ -95,6 +95,7 @@ type config struct { SplitEvents bool `mapstructure:"split-events,omitempty"` NumWorkers int `mapstructure:"num-workers,omitempty"` CompressionCodec string `mapstructure:"compression-codec,omitempty"` + KafkaVersion string `mapstructure:"kafka-version,omitempty"` Debug bool `mapstructure:"debug,omitempty"` BufferSize int `mapstructure:"buffer-size,omitempty"` OverrideTimestamps bool `mapstructure:"override-timestamps,omitempty"` @@ -508,6 +509,13 @@ func (k *kafkaOutput) SetTargetsConfig(map[string]*types.TargetConfig) {} func (k *kafkaOutput) createConfig() (*sarama.Config, error) { cfg := sarama.NewConfig() cfg.ClientID = k.cfg.Name + if k.cfg.KafkaVersion != "" { + var err error + cfg.Version, err = sarama.ParseKafkaVersion(k.cfg.KafkaVersion) + if err != nil { + return nil, err + } + } // SASL_PLAINTEXT or SASL_SSL if k.cfg.SASL != nil { cfg.Net.SASL.Enable = true