-
Notifications
You must be signed in to change notification settings - Fork 7
/
producer.go
154 lines (130 loc) · 3.88 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package databus
import (
"errors"
"github.com/Shopify/sarama"
"github.com/mapgoo-lab/atreus/pkg/log"
"strconv"
"time"
)
// ProducerEvent is the public interface of an asynchronous Kafka producer.
type ProducerEvent interface {
	// SendMessage publishes data to the configured topic; key is also used
	// for partition selection (see the Kafka* partitioner constants).
	SendMessage(data []byte, key string) error
	// Close shuts the producer down.
	Close()
}
// Partitioner selection strategies accepted in ProducerParam.Partitioner.
const (
	// KafkaManual uses a manual partitioner: the partition index carried in
	// the message (`partition` field) is used as-is.
	KafkaManual uint32 = 1
	// KafkaRandom picks a partition at random.
	KafkaRandom uint32 = 2
	// KafkaRoundRobin cycles through all partitions in order.
	KafkaRoundRobin uint32 = 3
	// KafkaHash derives the partition from a hash of the message key.
	KafkaHash uint32 = 4
)
// producerEvent is the concrete ProducerEvent implementation backed by a
// sarama.AsyncProducer.
type producerEvent struct {
	address     []string             // broker addresses, kept from ProducerParam
	topic       string               // target topic for every SendMessage call
	isack       bool                 // whether SendMessage waits for a success/error ack
	producer    sarama.AsyncProducer // underlying async producer
	partlen     int                  // partition count of topic, captured at construction time
	partitioner uint32               // one of the Kafka* partitioner constants
}
// ProducerParam bundles the configuration for NewAsyncProducer.
type ProducerParam struct {
	Address     []string // Kafka broker addresses
	Topic       string   // topic all messages are published to
	IsAck       bool     // if true, SendMessage blocks until an ack (or 10s timeout)
	KafkaVer    string   // Kafka protocol version string, e.g. "2.1.0"; parsed by sarama
	Partitioner uint32   // one of the Kafka* partitioner constants
}
// NewAsyncProducer creates an asynchronous Kafka producer for param.Topic.
// It validates the Kafka version, discovers the topic's partition count with
// a short-lived client, then opens the long-lived async producer.
// Returns an error if the version string is invalid, the brokers are
// unreachable, or the topic's partitions cannot be listed.
func NewAsyncProducer(param ProducerParam) (ProducerEvent, error) {
	config := sarama.NewConfig()
	config.Net.MaxOpenRequests = 10

	// Wait until all in-sync replicas have persisted the message.
	config.Producer.RequiredAcks = sarama.WaitForAll

	// Partition selection strategy.
	switch param.Partitioner {
	case KafkaManual:
		// Use the partition index set explicitly on the message.
		config.Producer.Partitioner = sarama.NewManualPartitioner
	case KafkaRandom:
		config.Producer.Partitioner = sarama.NewRandomPartitioner
	case KafkaRoundRobin:
		// BUG FIX: the original compared against KafkaRandom twice, which
		// made the round-robin branch unreachable.
		config.Producer.Partitioner = sarama.NewRoundRobinPartitioner
	default:
		config.Producer.Partitioner = sarama.NewHashPartitioner
	}

	// Deliver success/error acks only when requested. This only matters
	// because RequiredAcks above is not NoResponse.
	if param.IsAck {
		config.Producer.Return.Successes = true
		config.Producer.Return.Errors = true
	}

	// Kafka protocol version. Below V0_10_0_0 message timestamps have no
	// effect; producer and consumer must be configured consistently.
	// NOTE: a wrong version makes Kafka return confusing errors and
	// messages fail to send.
	version, err := sarama.ParseKafkaVersion(param.KafkaVer)
	if err != nil {
		log.Error("Error parsing Kafka version: %v", err)
		return nil, err
	}
	config.Version = version

	// Short-lived client used only to discover the partition count.
	client, err := sarama.NewClient(param.Address, config)
	if err != nil {
		log.Error("Error sarama.NewClient: %v", err)
		return nil, err
	}
	// BUG FIX: the original leaked this client when Partitions() failed;
	// defer guarantees it is closed on every path.
	defer client.Close()

	partitions, err := client.Partitions(param.Topic)
	if err != nil {
		log.Error("Error client.Partitions: %v", err)
		return nil, err
	}
	partlen := len(partitions)

	producer, err := sarama.NewAsyncProducer(param.Address, config)
	if err != nil {
		log.Error("Error sarama.NewAsyncProducer: %v", err)
		return nil, err
	}

	return &producerEvent{
		address:     param.Address,
		topic:       param.Topic,
		isack:       param.IsAck,
		producer:    producer,
		partlen:     partlen,
		partitioner: param.Partitioner,
	}, nil
}
// SendMessage publishes data to the configured topic, using key both as the
// Kafka message key and — under KafkaManual — as a decimal partition index.
// With IsAck enabled it blocks until the broker acks the send, an error is
// reported, or a 10-second timeout elapses.
func (handle *producerEvent) SendMessage(data []byte, key string) error {
	var partindex int32
	if handle.partitioner == KafkaManual {
		// Manual partitioning: key is expected to be a decimal integer.
		// BUG FIX: unparsable keys already fell back to 0, but a negative
		// key produced a negative partition index; clamp to 0 as well.
		index, err := strconv.Atoi(key)
		if err != nil || index < 0 {
			index = 0
		}
		// BUG FIX: guard the modulo — a topic reporting zero partitions
		// would have caused a division-by-zero panic.
		if handle.partlen > 0 {
			partindex = int32(index % handle.partlen)
		}
	}
	// Note: msg must be a freshly constructed value. Reusing one variable
	// would make batched sends all carry the same content.
	msg := &sarama.ProducerMessage{
		Topic:     handle.topic,
		Key:       sarama.ByteEncoder(key),
		Value:     sarama.ByteEncoder(data),
		Partition: partindex,
	}
	// Hand the message to the producer's input channel.
	handle.producer.Input() <- msg
	if handle.isack {
		// NOTE(review): with concurrent senders this select may consume the
		// ack of a different in-flight message — confirm callers serialize
		// SendMessage when IsAck is set.
		select {
		case <-handle.producer.Successes():
			return nil
		case fail := <-handle.producer.Errors():
			log.Error("SendMessage fail: %v", fail.Err)
			return fail.Err
		case timeout := <-time.After(time.Second * 10):
			// BUG FIX: %p is the wrong verb for a time.Time; use %v.
			log.Error("ack msg error %v.", timeout)
			return errors.New("ack msg timeout")
		}
	}
	return nil
}
// Close shuts the producer down via sarama's AsyncClose, which triggers an
// asynchronous shutdown of the underlying producer.
func (handle *producerEvent) Close() {
	log.Info("Close producer")
	handle.producer.AsyncClose()
}