Skip to content

Commit

Permalink
output/kafka: support other compression types (#104)
Browse files Browse the repository at this point in the history
  • Loading branch information
parsa97 committed Jun 29, 2024
1 parent 77154e4 commit 23d5e54
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,9 @@ Note that command line arguments are case-insensitive as of v0.9.5
# Compress Kafka connection
--kafkacompress

# Compression Type[gzip, snappy, lz4, zstd] default is snappy
--kafkacompressiontype=snappy

# Use TLS for kafka connection
--kafkasecure

Expand Down
3 changes: 3 additions & 0 deletions config-sample.ini
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ kafkabatchdelay = 1s
; Compress Kafka connection
kafkacompress = false

; Compression Type[gzip, snappy, lz4, zstd] default is snappy
kafkacompressiontype = snappy

; Use TLS for kafka connection
kafkasecure = false

Expand Down
3 changes: 3 additions & 0 deletions docs/content/en/docs/Outputs/kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ KafkaBatchDelay = 1s
; Compress Kafka connection
KafkaCompress = false

; Compression Type[gzip, snappy, lz4, zstd] default is snappy
KafkaCompressiontype = snappy

; Use TLS for kafka connection
KafkaSecure = false

Expand Down
16 changes: 15 additions & 1 deletion internal/output/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ type kafkaConfig struct {
KafkaTimeout uint `long:"kafkatimeout" ini-name:"kafkatimeout" env:"DNSMONSTER_KAFKATIMEOUT" default:"3" description:"Kafka connection timeout in seconds"`
KafkaBatchDelay time.Duration `long:"kafkabatchdelay" ini-name:"kafkabatchdelay" env:"DNSMONSTER_KAFKABATCHDELAY" default:"1s" description:"Interval between sending results to Kafka if Batch size is not filled"`
KafkaCompress bool `long:"kafkacompress" ini-name:"kafkacompress" env:"DNSMONSTER_KAFKACOMPRESS" description:"Compress Kafka connection"`
KafkaCompressionType string `long:"kafkacompressiontype" ini-name:"kafkacompressiontype" env:"DNSMONSTER_KAFKACOMPRESSIONTYPE" default:"snappy" description:"Compression Type Kafka connection [snappy gzip lz4 zstd]; default(snappy)." choice:"snappy" choice:"gzip" choice:"lz4" choice:"zstd"`
KafkaSecure bool `long:"kafkasecure" ini-name:"kafkasecure" env:"DNSMONSTER_KAFKASECURE" description:"Use TLS for kafka connection"`
KafkaCACertificatePath string `long:"kafkacacertificatepath" ini-name:"kafkacacertificatepath" env:"DNSMONSTER_KAFKACACERTIFICATEPATH" default:"" description:"Path of CA certificate that signs Kafka broker certificate"`
KafkaTLSCertificatePath string `long:"kafkatlscertificatepath" ini-name:"kafkatlscertificatepath" env:"DNSMONSTER_KAFKATLSCERTIFICATEPATH" default:"" description:"Path of TLS certificate to present to broker"`
Expand Down Expand Up @@ -148,7 +149,20 @@ func (kafConfig kafkaConfig) getWriter() *kafka.Writer {
}

if kafConfig.KafkaCompress {
kWriter.Compression = kafka.Snappy
switch kafConfig.KafkaCompressionType {
case "gzip":
kWriter.Compression = kafka.Gzip
log.Info("Kafka using compression: gzip")
case "snappy":
kWriter.Compression = kafka.Snappy
log.Info("Kafka using compression: snappy")
case "lz4":
kWriter.Compression = kafka.Lz4
log.Info("Kafka using compression: lz4")
case "zstd":
kWriter.Compression = kafka.Zstd
log.Info("Kafka using compression: zstd")
}
}

return kWriter
Expand Down

0 comments on commit 23d5e54

Please sign in to comment.