-
Notifications
You must be signed in to change notification settings - Fork 0
/
rocket.go
153 lines (131 loc) · 3.56 KB
/
rocket.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
package queue
import (
"context"
"fmt"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/producer"
)
// RocketMq bundles the RocketMQ client handles used by this package.
// A value built by RegisterRocketMqProducer has only producerIns set, and a
// value built by RegisterRocketMqConsumer has only consumerIns set.
type RocketMq struct {
// endPoints holds the name server addresses the instance was registered with.
endPoints []string
// producerIns is the producer used by SendByteMsg; it is nil unless a producer was registered.
producerIns rocketmq.Producer
// consumerIns is the push consumer used by ListenReceiveMsgDo; it is nil unless a consumer was registered.
consumerIns rocketmq.PushConsumer
}
// RegisterRocketProducer registers and starts a RocketMQ producer and returns
// it as the Producer interface.
//
// endPoints, groupName and retry are forwarded unchanged to
// RegisterRocketMqProducer. On failure the returned client is a true nil
// interface value rather than an interface wrapping a nil *RocketMq, so
// callers can safely compare client against nil.
func RegisterRocketProducer(endPoints []string, groupName string, retry int) (client Producer, err error) {
	mqIns, err := RegisterRocketMqProducer(endPoints, groupName, retry)
	if err != nil {
		// Return a literal nil: assigning the nil *RocketMq to the interface
		// result would yield a non-nil interface value.
		return nil, err
	}
	return mqIns, nil
}
// RegisterRocketConsumer registers a RocketMQ push consumer and returns it as
// the Consumer interface.
//
// endPoints and groupName are forwarded unchanged to RegisterRocketMqConsumer.
// On failure the returned client is a true nil interface value rather than an
// interface wrapping a nil *RocketMq, so callers can safely compare client
// against nil.
func RegisterRocketConsumer(endPoints []string, groupName string) (client Consumer, err error) {
	mqIns, err := RegisterRocketMqConsumer(endPoints, groupName)
	if err != nil {
		// Return a literal nil: assigning the nil *RocketMq to the interface
		// result would yield a non-nil interface value.
		return nil, err
	}
	return mqIns, nil
}
// SendMsg publishes a string payload to topic. It converts body to bytes and
// delegates to SendByteMsg, returning whatever that call returns.
func (r *RocketMq) SendMsg(topic string, body string) (mqMsg Msg, err error) {
	payload := []byte(body)
	mqMsg, err = r.SendByteMsg(topic, payload)
	return mqMsg, err
}
// SendByteMsg synchronously publishes body to topic through the registered
// producer.
//
// It returns an error when no producer is registered, when SendSync fails, or
// when the send result status is anything other than primitive.SendOK. On
// success the returned Msg carries the topic, the body and the message ID
// from the send result.
func (r *RocketMq) SendByteMsg(topic string, body []byte) (mqMsg Msg, err error) {
	if r.producerIns == nil {
		err = fmt.Errorf("rocketMq producer not register")
		return mqMsg, err
	}

	outgoing := &primitive.Message{Topic: topic, Body: body}
	sendRes, sendErr := r.producerIns.SendSync(context.Background(), outgoing)
	switch {
	case sendErr != nil:
		return mqMsg, sendErr
	case sendRes.Status != primitive.SendOK:
		return mqMsg, fmt.Errorf("rocketMq producer send msg error status:%v", sendRes.Status)
	}

	return Msg{
		RunType: SendMsg,
		Topic:   topic,
		MsgId:   sendRes.MsgID,
		Body:    body,
	}, nil
}
// SendDelayMsg is a placeholder: it ignores its arguments and always returns
// a zero Msg together with an "implement me" error.
func (r *RocketMq) SendDelayMsg(topic string, body string, delaySecond int64) (mqMsg Msg, err error) {
	return mqMsg, fmt.Errorf("implement me")
}
// ListenReceiveMsgDo subscribes the registered push consumer to topic and
// starts it.
//
// Each delivered message is converted to a Msg (RunType ReceiveMsg) and
// passed to receiveDo on its own goroutine. The consume callback reports
// consumer.ConsumeSuccess as soon as those goroutines are launched, i.e.
// without waiting for receiveDo to finish.
//
// It returns an error when no consumer is registered, when receiveDo is nil,
// or when Subscribe or Start fails. If Start fails, the subscription that was
// just added is removed again.
func (r *RocketMq) ListenReceiveMsgDo(topic string, receiveDo func(mqMsg Msg)) (err error) {
	if r.consumerIns == nil {
		return fmt.Errorf("rocketMq consumer not register")
	}
	// A nil callback would only panic later, inside a detached goroutine on
	// the first delivery, so reject it up front.
	if receiveDo == nil {
		return fmt.Errorf("rocketMq consumer receiveDo is nil")
	}

	handler := func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
		for _, item := range msgs {
			go receiveDo(Msg{
				RunType: ReceiveMsg,
				Topic:   item.Topic,
				MsgId:   item.MsgId,
				Body:    item.Body,
			})
		}
		// Success is reported without waiting for the callbacks above.
		return consumer.ConsumeSuccess, nil
	}

	if err = r.consumerIns.Subscribe(topic, consumer.MessageSelector{}, handler); err != nil {
		return err
	}
	if err = r.consumerIns.Start(); err != nil {
		// Best-effort rollback of the subscription; the Start error is the
		// one reported to the caller.
		_ = r.consumerIns.Unsubscribe(topic)
		return err
	}
	return nil
}
// RegisterRocketMqProducer creates a RocketMQ producer for the given name
// server endPoints and group, starts it, and returns it wrapped in a RocketMq.
//
// A retry value below zero is treated as zero. If the addresses cannot be
// parsed, or the producer cannot be created or started, it returns nil and
// the error.
func RegisterRocketMqProducer(endPoints []string, groupName string, retry int) (mqIns *RocketMq, err error) {
	nameSrv, err := primitive.NewNamesrvAddr(endPoints...)
	if err != nil {
		return nil, err
	}

	// Clamp negative retry counts to zero before handing them to the client.
	if retry < 0 {
		retry = 0
	}

	prod, err := rocketmq.NewProducer(
		producer.WithNameServer(nameSrv),
		producer.WithRetry(retry),
		producer.WithGroupName(groupName),
	)
	if err != nil {
		return nil, err
	}
	if err = prod.Start(); err != nil {
		return nil, err
	}

	return &RocketMq{
		endPoints:   endPoints,
		producerIns: prod,
	}, nil
}
// RegisterRocketMqConsumer creates a clustering-mode RocketMQ push consumer
// for the given name server endPoints and group, and returns it wrapped in a
// RocketMq.
//
// The consumer is not started here; ListenReceiveMsgDo starts it after
// subscribing. If the addresses cannot be parsed or the consumer cannot be
// created, it returns nil and the error.
func RegisterRocketMqConsumer(endPoints []string, groupName string) (mqIns *RocketMq, err error) {
	nameSrv, err := primitive.NewNamesrvAddr(endPoints...)
	if err != nil {
		return nil, err
	}

	pushCons, err := rocketmq.NewPushConsumer(
		consumer.WithNameServer(nameSrv),
		consumer.WithConsumerModel(consumer.Clustering),
		consumer.WithGroupName(groupName),
	)
	if err != nil {
		return nil, err
	}

	return &RocketMq{
		endPoints:   endPoints,
		consumerIns: pushCons,
	}, nil
}