-
Notifications
You must be signed in to change notification settings - Fork 17
/
kafka.go
58 lines (48 loc) · 1.14 KB
/
kafka.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package kafka
import (
"strings"
"sync"
"github.com/henrylee2cn/pholcus/common/util"
"github.com/henrylee2cn/pholcus/config"
"github.com/henrylee2cn/pholcus/logs"
"github.com/Shopify/sarama"
)
var (
err error
producer sarama.SyncProducer
lock sync.RWMutex
once sync.Once
)
type KafkaSender struct {
topic string
}
func GetProducer() (sarama.SyncProducer, error) {
return producer, err
}
//刷新producer
func Refresh() {
once.Do(func() {
conf := sarama.NewConfig()
conf.Producer.RequiredAcks = sarama.WaitForAll //等待所有备份返回ack
conf.Producer.Retry.Max = 10 // 重试次数
brokerList := config.KAFKA_BORKERS
producer, err = sarama.NewSyncProducer(strings.Split(brokerList, ","), conf)
if err != nil {
logs.Log.Error("Kafka:%v\n", err)
}
})
}
func New() *KafkaSender {
return &KafkaSender{}
}
func (p *KafkaSender) SetTopic(topic string) {
p.topic = topic
}
func (p *KafkaSender) Push(data map[string]interface{}) error {
val := util.JsonString(data)
_, _, err := producer.SendMessage(&sarama.ProducerMessage{
Topic: p.topic,
Value: sarama.StringEncoder(val),
})
return err
}