/
kafkaProducer.go
77 lines (68 loc) · 1.8 KB
/
kafkaProducer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package kafka
import (
"context"
"time"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
"github.com/kiga-hub/arc/utils"
)
// createProducer builds the underlying confluent-kafka producer from the
// configured bootstrap servers, client id, and maximum message size, and
// stores it on k. It returns any error reported by kafka.NewProducer.
func (k *Kafka) createProducer() error {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": k.config.BootStrapServers,
		"client.id":         k.config.ClientID,
		"message.max.bytes": k.config.MessageMaxBytes, // e.g. 64*1024*1024
	})
	// Assign unconditionally, mirroring the named-return form: on error the
	// producer value from NewProducer (nil) is still stored.
	k.producer = p
	return err
}
// CreateProducerKeepalived creates the producer and then services its
// delivery-report/event channel until ctx is cancelled or the Kafka
// instance is marked closed. Intended to run in its own goroutine.
//
// Fixes over the previous version:
//   - If producer creation fails, we return instead of falling into the
//     event loop, where k.producer.Events() would panic on a nil producer.
//   - The isClose flag is now polled via a ticker case inside the select;
//     previously it was only checked between events, so a quiescent
//     producer could never observe close and the goroutine leaked.
func (k *Kafka) CreateProducerKeepalived(ctx context.Context) {
	if err := k.createProducer(); err != nil {
		k.logger.Error(err)
		// Without a producer there is nothing to service.
		return
	}
	k.logger.Infow("kafka producer started", "addr", k.config.BootStrapServers)

	// Periodic wakeup so the isClose flag is observed even when no events
	// arrive; a bare blocking select would never see it.
	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

	for {
		select {
		case e := <-k.producer.Events():
			switch ev := e.(type) {
			case *kafka.Message:
				// Per-message delivery report; only failures are logged.
				if ev.TopicPartition.Error != nil {
					k.logger.Errorw("Delivery failed", "topic_partition", ev.TopicPartition)
				}
			case kafka.Error:
				if ev.Code() == kafka.ErrAllBrokersDown {
					k.logger.Errorw("Kafka ErrAllBrokersDown", "code", ev.Code())
				}
			}
		case <-ticker.C:
			if k.isClose.Load() {
				// NOTE(review): original behavior kept — return without
				// Flush/Close on the isClose path. Confirm the producer is
				// closed elsewhere when isClose is set, else this leaks it.
				return
			}
		case <-ctx.Done():
			// Drain pending deliveries (up to 1s), then release resources.
			k.producer.Flush(1000)
			k.producer.Close()
			return
		}
	}
}
// ProduceData enqueues a message on the producer's channel for asynchronous
// delivery to topic, with the given key and value and the current time as
// the message timestamp. Delivery results surface on the Events channel.
func (k *Kafka) ProduceData(topic string, key, value []byte) {
	msg := &kafka.Message{
		TopicPartition: kafka.TopicPartition{
			Topic:     &topic,
			Partition: kafka.PartitionAny, // let librdkafka pick the partition
		},
		Timestamp: time.Now(),
		Key:       key,
		Value:     value,
	}
	k.producer.ProduceChannel() <- msg
}
// ProduceDataWithTimeKey publishes value to topic, using the stringified
// current wall-clock time as the message key.
func (k *Kafka) ProduceDataWithTimeKey(topic string, value []byte) {
	key := utils.Str2byte(time.Now().String())
	k.ProduceData(topic, key, value)
}
// ProduceDataSimple publishes value to the configured default topic
// (k.config.Topic), keyed by the stringified current time.
func (k *Kafka) ProduceDataSimple(value []byte) {
	key := utils.Str2byte(time.Now().String())
	k.ProduceData(k.config.Topic, key, value)
}