Skip to content

Commit

Permalink
Merge pull request #449 from alibresco/main
Browse files Browse the repository at this point in the history
Add config option for Kafka version to Kafka output
  • Loading branch information
karimra committed May 22, 2024
2 parents 0d75e08 + a929f96 commit ef4175d
Showing 1 changed file with 8 additions and 0 deletions.
8 changes: 8 additions & 0 deletions pkg/outputs/kafka_output/kafka_output.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ type config struct {
SplitEvents bool `mapstructure:"split-events,omitempty"`
NumWorkers int `mapstructure:"num-workers,omitempty"`
CompressionCodec string `mapstructure:"compression-codec,omitempty"`
KafkaVersion string `mapstructure:"kafka-version,omitempty"`
Debug bool `mapstructure:"debug,omitempty"`
BufferSize int `mapstructure:"buffer-size,omitempty"`
OverrideTimestamps bool `mapstructure:"override-timestamps,omitempty"`
Expand Down Expand Up @@ -508,6 +509,13 @@ func (k *kafkaOutput) SetTargetsConfig(map[string]*types.TargetConfig) {}
func (k *kafkaOutput) createConfig() (*sarama.Config, error) {
cfg := sarama.NewConfig()
cfg.ClientID = k.cfg.Name
if k.cfg.KafkaVersion != "" {
var err error
cfg.Version, err = sarama.ParseKafkaVersion(k.cfg.KafkaVersion)
if err != nil {
return nil, err
}
}
// SASL_PLAINTEXT or SASL_SSL
if k.cfg.SASL != nil {
cfg.Net.SASL.Enable = true
Expand Down

0 comments on commit ef4175d

Please sign in to comment.