/
kafka.go
75 lines (63 loc) · 2.75 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package forwarder
import (
"context"
"fmt"
"github.com/Shopify/sarama"
"github.com/spf13/viper"
"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"
"go.opentelemetry.io/otel"
"log"
"os"
"strings"
"time"
)
// KafkaForwarder publishes messages to Kafka through a synchronous,
// OpenTelemetry-instrumented sarama producer.
type KafkaForwarder struct {
	// producer is the otelsarama-wrapped synchronous producer; every
	// SendMessage call blocks until the broker acknowledges the write.
	producer sarama.SyncProducer
	// topicPrefix is prepended verbatim to the topic argument of every
	// Produce call (e.g. environment or tenant namespacing).
	topicPrefix string
}
// NewKafkaForwarder builds a KafkaForwarder from the given viper configuration.
//
// It reads network and producer tuning from the "kafka.producer.*" keys,
// connects a sarama SyncProducer to the comma-separated broker list in
// "kafka.producer.brokers", and wraps the producer with otelsarama so that
// produced messages carry OpenTelemetry trace context.
//
// It returns an error (wrapped with broker context) if the sarama producer
// cannot be created, e.g. when no broker is reachable.
func NewKafkaForwarder(config *viper.Viper) (*KafkaForwarder, error) {
	// Optionally surface sarama's internal logging on stdout for debugging.
	if config.GetBool("kafka.logger.enabled") {
		sarama.Logger = log.New(os.Stdout, "sarama", log.Llongfile)
	}
	kafkaConf := sarama.NewConfig()
	kafkaConf.Net.MaxOpenRequests = config.GetInt("kafka.producer.net.maxOpenRequests")
	kafkaConf.Net.DialTimeout = config.GetDuration("kafka.producer.net.dialTimeout")
	kafkaConf.Net.ReadTimeout = config.GetDuration("kafka.producer.net.readTimeout")
	kafkaConf.Net.WriteTimeout = config.GetDuration("kafka.producer.net.writeTimeout")
	kafkaConf.Net.KeepAlive = config.GetDuration("kafka.producer.net.keepAlive")
	// Both Return flags must be true for a SyncProducer; sarama's
	// NewSyncProducer rejects the config otherwise.
	kafkaConf.Producer.Return.Errors = true
	kafkaConf.Producer.Return.Successes = true
	kafkaConf.Producer.Idempotent = config.GetBool("kafka.producer.idempotent")
	kafkaConf.Producer.MaxMessageBytes = config.GetInt("kafka.producer.maxMessageBytes")
	kafkaConf.Producer.Timeout = config.GetDuration("kafka.producer.timeout")
	kafkaConf.Producer.Flush.Bytes = config.GetInt("kafka.producer.batch.size")
	// linger.ms is stored as a plain integer, so convert it to a Duration here.
	kafkaConf.Producer.Flush.Frequency = time.Duration(config.GetInt("kafka.producer.linger.ms")) * time.Millisecond
	kafkaConf.Producer.Retry.Max = config.GetInt("kafka.producer.retry.max")
	kafkaConf.Producer.RequiredAcks = sarama.WaitForLocal
	kafkaConf.Producer.Compression = sarama.CompressionSnappy
	kafkaConf.ClientID = config.GetString("kafka.producer.clientId")
	kafkaConf.Version = sarama.V2_2_0_0
	brokers := strings.Split(config.GetString("kafka.producer.brokers"), ",")
	producer, err := sarama.NewSyncProducer(brokers, kafkaConf)
	if err != nil {
		// Wrap so callers can see which brokers the connection attempt targeted.
		return nil, fmt.Errorf("creating kafka sync producer for brokers %v: %w", brokers, err)
	}
	topicPrefix := config.GetString("kafka.producer.topicPrefix")
	// Instrument the producer so Produce emits spans linked to the caller's trace.
	tracerProvider := otelsarama.WithTracerProvider(otel.GetTracerProvider())
	propagator := otelsarama.WithPropagators(otel.GetTextMapPropagator())
	wrappedProducer := otelsarama.WrapSyncProducer(kafkaConf, producer, tracerProvider, propagator)
	return &KafkaForwarder{
		producer:    wrappedProducer,
		topicPrefix: topicPrefix,
	}, nil
}
// Produce publishes message on the topic formed by prepending the
// forwarder's topic prefix to topic. The trace context carried by ctx is
// injected into the message headers so the otelsarama-wrapped producer
// links its produce span to the caller's span (e.g. one started by otelgrpc).
// It returns the partition and offset assigned by the broker, or an error.
func (k KafkaForwarder) Produce(ctx context.Context, topic string, message []byte) (int32, int64, error) {
	msg := &sarama.ProducerMessage{
		Topic: k.topicPrefix + topic,
		Value: sarama.ByteEncoder(message),
	}
	// ensure the otelsarama wrapped producer will use the span from otelgrpc
	carrier := otelsarama.NewProducerMessageCarrier(msg)
	otel.GetTextMapPropagator().Inject(ctx, carrier)
	return k.producer.SendMessage(msg)
}