/
kafkaWorker.go
346 lines (315 loc) · 10.8 KB
/
kafkaWorker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
package kafka
import (
"encoding/json"
"fmt"
"strings"
"time"
proto "github.com/golang/protobuf/proto"
"github.com/kevinyjn/gocom/logger"
"github.com/kevinyjn/gocom/mq/mqenv"
"github.com/kevinyjn/gocom/utils"
)
// Worker is the callback type invoked for each message received on a
// subscribed topic; it returns the raw bytes to use as the reply (if any).
type Worker func(*KafkaPacket) []byte
// KafkaWorker combines a producer and a consumer to implement a
// request/response messaging pattern on top of Kafka.
type KafkaWorker struct {
	Producer            *Producer                        // producer used for all outgoing messages
	Consumer            *Consumer                        // consumer used for all subscribed topics
	consumerRegisters   map[string]*mqenv.MQConsumerProxy // topics already subscribed, keyed by topic name
	methodRegisters     map[string]*mqenv.MQConsumerProxy // handler lookup keyed by "queue-consumerTag"
	PrivateTopic        string                           // private topic on which replies to our own sends arrive
	waitResponseMessage map[string]chan *KafkaPacket     // pending replies: message correlation id -> channel that receives the response
	availableChannels   []chan *KafkaPacket              // free list of reply channels available for reuse
	openTopicChannel    map[string]string                // topics already "opened" (primed) for publishing
	ContentType         string                           // serialization type, e.g. json
	ContentEncoding     string                           // content encoding
	GroupID             string                           // consumer group id, embedded in each outgoing KafkaPacket
	MsgType             string                           // message type
	Stats               Stats                            // producer/consumer statistics
	UseOriginalContent  bool                             // when true serialize packets with JSON instead of protobuf
}
// newChannel allocates a fresh unbuffered channel used to receive a
// single reply packet.
func (worker *KafkaWorker) newChannel() chan *KafkaPacket {
	return make(chan *KafkaPacket)
}
// obtainChannel hands out a reply channel, reusing one from the free
// list when possible; a new channel is created only when the list is
// empty.
func (worker *KafkaWorker) obtainChannel() chan *KafkaPacket {
	n := len(worker.availableChannels)
	if n > 0 {
		// Pop the most recently recycled channel off the end.
		c := worker.availableChannels[n-1]
		worker.availableChannels = worker.availableChannels[:n-1]
		return c
	}
	return worker.newChannel()
}
// recycleChannel returns a used reply channel to the free list so a
// later Send call can reuse it instead of allocating a new one.
func (worker *KafkaWorker) recycleChannel(c chan *KafkaPacket) {
	worker.availableChannels = append(worker.availableChannels, c)
}
// sendWorker wraps Producer.Send: it makes sure the destination topic
// has been "opened" once (see sendOpenChannel) and keeps producer
// statistics up to date.
//
// Fix: statistics were previously incremented before the send, so
// failed publishes still inflated the counters; they are now counted
// only after a successful send.
func (worker *KafkaWorker) sendWorker(topic string, message []byte) error {
	if _, ok := worker.openTopicChannel[topic]; !ok {
		if err := worker.sendOpenChannel(topic); err != nil {
			return err
		}
		worker.openTopicChannel[topic] = "1"
	}
	if err := worker.Producer.Send(topic, message); err != nil {
		return err
	}
	worker.Stats.Producer.Bytes += int64(len(message))
	worker.Stats.Producer.Messages++
	return nil
}
// sendOpenChannel primes a topic (the private topic, or any topic being
// published to for the first time) by repeatedly publishing a small
// marker payload. With Kafka auto-topic-creation enabled, the very
// first publish to a fresh topic can fail, so send errors are logged
// but deliberately not returned.
func (worker *KafkaWorker) sendOpenChannel(topic string) error {
	marker := []byte("{'_register_private': 'open'}")
	for attempt := 0; attempt < 20; attempt++ {
		if err := worker.Producer.Send(topic, marker); err != nil {
			logger.Error.Println(err)
		}
		time.Sleep(20 * time.Millisecond)
	}
	return nil
}
// registerPrivateTopic lazily subscribes to the worker's private topic,
// which carries the replies to messages this worker sends out. It is a
// no-op when no private topic is configured or when the subscription
// already exists.
func (worker *KafkaWorker) registerPrivateTopic() {
	if worker.PrivateTopic == "" {
		return
	}
	if _, registered := worker.consumerRegisters[worker.PrivateTopic]; registered {
		return
	}
	worker.sendOpenChannel(worker.PrivateTopic)
	// Replies on the private topic are intercepted in onMessage and routed
	// back to the waiting sender via waitResponseMessage, so the proxy
	// registered here carries no callback.
	time.Sleep(100 * time.Millisecond)
	worker.Subscribe(worker.PrivateTopic, &mqenv.MQConsumerProxy{})
}
// Send publishes a message to topic. When needReply is true it blocks
// until a response arrives on the worker's private topic (matched by
// correlation id) and returns it; otherwise it returns right after
// publishing.
//
// Fix: the error from sendWorker was previously discarded. In the
// needReply case a failed send left the caller blocked forever on a
// reply channel that could never be written, and leaked both the
// channel and the waitResponseMessage entry.
//
// NOTE(review): the blocking wait still has no timeout — if the peer
// never replies this call blocks indefinitely.
func (worker *KafkaWorker) Send(topic string, publishMsg *mqenv.MQPublishMessage, needReply bool) (*mqenv.MQConsumerMessage, error) {
	worker.registerPrivateTopic()
	headers := make([]*KafkaPacket_Header, 0, len(publishMsg.Headers))
	for k, v := range publishMsg.Headers {
		headers = append(headers, &KafkaPacket_Header{
			Name:  k,
			Value: v,
		})
	}
	// Prefer the worker's private topic as the reply destination; fall
	// back to whatever the message itself specifies.
	replyTo := worker.PrivateTopic
	if worker.PrivateTopic == "" {
		replyTo = publishMsg.ReplyTo
	}
	p := &KafkaPacket{
		ContentType:     publishMsg.ContentType,
		ContentEncoding: worker.ContentEncoding,
		SendTo:          topic,
		GroupId:         worker.GroupID,
		CorrelationId:   publishMsg.CorrelationID,
		ReplyTo:         replyTo,
		Timestamp:       uint64(utils.CurrentMillisecond()),
		Type:            worker.MsgType,
		UserId:          publishMsg.UserID,
		AppId:           publishMsg.AppID,
		StatusCode:      200,
		ErrorMessage:    "success",
		Body:            publishMsg.Body,
		Headers:         headers,
		RoutingKey:      publishMsg.RoutingKey,
		ConsumerTag:     publishMsg.RoutingKey,
		Exchange:        publishMsg.Exchange,
	}
	var sendBytes []byte
	var err error
	if worker.UseOriginalContent {
		sendBytes, err = json.Marshal(p)
	} else {
		sendBytes, err = proto.Marshal(p)
	}
	if err != nil {
		logger.Error.Println(err)
		return nil, err
	}
	if !needReply {
		return nil, worker.sendWorker(topic, sendBytes)
	}
	// Register the reply channel under the correlation id BEFORE sending
	// so a fast response cannot race past the registration.
	ch := worker.obtainChannel()
	worker.waitResponseMessage[p.CorrelationId] = ch
	if err = worker.sendWorker(topic, sendBytes); err != nil {
		// The message never went out: unregister and recycle the channel
		// so nothing waits for a reply that will never come.
		delete(worker.waitResponseMessage, p.CorrelationId)
		worker.recycleChannel(ch)
		return nil, err
	}
	responsePacket := <-ch
	worker.recycleChannel(ch)
	consumerMessage := ConvertKafkaPacketToMQConsumerMessage(responsePacket)
	return &consumerMessage, nil
}
// reply publishes the server-side response for a handled request back
// to the requester's reply topic, tagged with the original message id
// so the sender can correlate it.
//
// Fix: a marshal failure was previously logged but the (nil) payload
// was still published; we now return early. A failed publish is also
// logged instead of silently dropped.
func (worker *KafkaWorker) reply(topic string, message *mqenv.MQPublishMessage, msgID string) {
	headers := make([]*KafkaPacket_Header, 0, len(message.Headers))
	for k, v := range message.Headers {
		headers = append(headers, &KafkaPacket_Header{
			Name:  k,
			Value: v,
		})
	}
	p := &KafkaPacket{
		ContentType:     message.ContentType,
		ContentEncoding: worker.ContentEncoding,
		SendTo:          topic,
		GroupId:         worker.GroupID,
		CorrelationId:   msgID,
		ReplyTo:         worker.PrivateTopic,
		Timestamp:       uint64(time.Now().UTC().Unix()),
		Type:            worker.MsgType,
		UserId:          message.UserID,
		AppId:           message.AppID,
		StatusCode:      200,
		ErrorMessage:    "success",
		Body:            message.Body,
		Headers:         headers,
		RoutingKey:      message.ReplyTo,
		ConsumerTag:     message.ReplyTo,
		Exchange:        topic,
	}
	var sendBytes []byte
	var err error
	if worker.UseOriginalContent {
		sendBytes, err = json.Marshal(p)
	} else {
		sendBytes, err = proto.Marshal(p)
	}
	if err != nil {
		logger.Error.Println(err)
		return
	}
	if err = worker.sendWorker(topic, sendBytes); err != nil {
		logger.Error.Println(err)
	}
}
// onMessage dispatches a decoded packet. Two cases are possible:
//  1. the packet is the reply to one of our own Send calls — a channel
//     is waiting in waitResponseMessage under the correlation id, so
//     hand the packet over;
//  2. the packet arrived on a subscribed topic — look up a handler,
//     first by topic+routing-key (methodRegisters), then by topic alone
//     (consumerRegisters), and invoke its callback, replying when the
//     callback produces a result and the packet names a reply topic.
func (worker *KafkaWorker) onMessage(packet *KafkaPacket) {
	if ch, waiting := worker.waitResponseMessage[packet.CorrelationId]; waiting {
		delete(worker.waitResponseMessage, packet.CorrelationId)
		ch <- packet
		return
	}
	// Prefer the method-level registration; fall back to the topic-level one.
	proxy, found := worker.methodRegisters[fmt.Sprintf("%s-%s", packet.SendTo, packet.RoutingKey)]
	if !found {
		proxy, found = worker.consumerRegisters[packet.SendTo]
	}
	if !found || proxy.Callback == nil {
		return
	}
	defer func() {
		// A panicking handler must not take down the consumer loop.
		if r := recover(); r != nil {
			logger.Error.Println(r)
		}
	}()
	consumerMessage := ConvertKafkaPacketToMQConsumerMessage(packet)
	result := proxy.Callback(consumerMessage)
	if result != nil && packet.ReplyTo != "" {
		worker.reply(packet.ReplyTo, result, packet.CorrelationId)
	}
}
// bindToOnMessage is the raw consumer callback: it drops the synthetic
// "open topic" marker messages, updates consumer statistics, decodes
// the payload into a KafkaPacket (JSON or protobuf depending on
// UseOriginalContent) and forwards it to onMessage.
func (worker *KafkaWorker) bindToOnMessage(data []byte) {
	logger.Debug.Println("bindToOnMessage: " + utils.HumanByteText(data))
	// The private topic may not exist yet, so the worker publishes marker
	// messages to create it; those markers are ignored here.
	if strings.Contains(string(data), "_register_private") {
		return
	}
	worker.Stats.Consumer.Bytes += int64(len(data))
	worker.Stats.Consumer.Messages++
	packet := &KafkaPacket{}
	var err error
	if worker.UseOriginalContent {
		err = json.Unmarshal(data, packet)
	} else {
		err = proto.Unmarshal(data, packet)
	}
	if err != nil {
		logger.Error.Println(err)
		return
	}
	worker.extractRoutingKey(packet)
	worker.onMessage(packet)
}
// Subscribe registers consumeProxy for topic. The underlying consumer
// is attached at most once per topic; the proxy is additionally indexed
// under "queue-consumerTag" so onMessage can dispatch per method.
// Existing registrations are never overwritten.
func (worker *KafkaWorker) Subscribe(topic string, consumeProxy *mqenv.MQConsumerProxy) error {
	if _, registered := worker.consumerRegisters[topic]; !registered {
		logger.Info.Println("Subscribe subscribing topic " + topic)
		worker.Consumer.Receive(topic, worker.bindToOnMessage)
		worker.consumerRegisters[topic] = consumeProxy
	}
	methodKey := fmt.Sprintf("%s-%s", consumeProxy.Queue, consumeProxy.ConsumerTag)
	if _, registered := worker.methodRegisters[methodKey]; !registered {
		worker.methodRegisters[methodKey] = consumeProxy
	}
	return nil
}
// extractRoutingKey 尝试从body 里面提取routing key.
func (worker *KafkaWorker) extractRoutingKey(packet *KafkaPacket) error {
if packet.RoutingKey != "" {
return nil
}
var params struct {
Method string `json:"method"`
RoutingKey string `json:"routingKey"`
}
err := json.Unmarshal(packet.Body, ¶ms)
if nil != err {
// logger.Error.Println(err)
return err
}
if params.Method != "" {
packet.RoutingKey = params.Method
return nil
}
if params.RoutingKey != "" {
packet.RoutingKey = params.RoutingKey
return nil
}
return fmt.Errorf("cannot found method value")
}
// NewKafkaWorker builds a KafkaWorker connected to the given broker
// hosts, with its producer bound to partition and its consumer joined
// to groupID; privateTopic is where replies to this worker's sends
// will arrive.
func NewKafkaWorker(hosts string, partition int, privateTopic, groupID string) *KafkaWorker {
	return &KafkaWorker{
		Producer:            NewProducer(hosts, partition),
		Consumer:            NewConsumer(hosts, groupID),
		PrivateTopic:        privateTopic,
		waitResponseMessage: make(map[string]chan *KafkaPacket),
		availableChannels:   []chan *KafkaPacket{},
		consumerRegisters:   make(map[string]*mqenv.MQConsumerProxy),
		methodRegisters:     make(map[string]*mqenv.MQConsumerProxy),
		openTopicChannel:    make(map[string]string),
	}
}