Skip to content

Commit

Permalink
Kafka.NewWriter() will be removed and Writer value can be instantiate…
Browse files Browse the repository at this point in the history
…d and configured directly
  • Loading branch information
M4n5ter committed Feb 24, 2022
1 parent 8200942 commit b3de6dc
Showing 1 changed file with 6 additions and 8 deletions.
14 changes: 6 additions & 8 deletions kq/pusher.go
Expand Up @@ -6,7 +6,6 @@ import (
"time"

"github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/snappy"
"github.com/zeromicro/go-zero/core/executors"
"github.com/zeromicro/go-zero/core/logx"
)
Expand All @@ -27,13 +26,12 @@ type (
)

func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
producer := kafka.NewWriter(kafka.WriterConfig{
Brokers: addrs,
Topic: topic,
Balancer: &kafka.LeastBytes{},
CompressionCodec: snappy.NewCompressionCodec(),
})

producer := &kafka.Writer{
Addr: kafka.TCP(addrs...),
Topic: topic,
Balancer: &kafka.LeastBytes{},
Compression: kafka.Snappy,
}
pusher := &Pusher{
produer: producer,
topic: topic,
Expand Down

0 comments on commit b3de6dc

Please sign in to comment.