-
Notifications
You must be signed in to change notification settings - Fork 0
/
consumer.go
80 lines (71 loc) · 1.72 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
package consumer
import (
"context"
"github.com/apache/rocketmq-client-go/v2"
"github.com/apache/rocketmq-client-go/v2/consumer"
"github.com/apache/rocketmq-client-go/v2/primitive"
"github.com/apache/rocketmq-client-go/v2/rlog"
"github.com/khaosles/go-contrib/core/config"
glog "github.com/khaosles/go-contrib/core/log"
"github.com/khaosles/go-contrib/rocket"
"github.com/khaosles/giz/g"
)
/*
@File: consumer.go
@Author: khaosles
@Time: 2023/8/17 11:34
@Desc: RocketMQ push consumer: created from configuration in init, with a Subscribe helper for topics.
*/
// pushConsumer is the package-level RocketMQ push consumer. It is assigned by
// Consumer.run (called from init) and used by Subscribe.
var pushConsumer rocketmq.PushConsumer
// Consumer is the configuration holder for the push consumer. It embeds
// rocket.Rocketmq; the struct tags request that the embedded fields be
// flattened into this level when decoded (yaml inline / mapstructure squash).
type Consumer struct {
rocket.Rocketmq `yaml:",inline" mapstructure:",squash"`
}
// init populates a Consumer via config.Configuration using the rocket.APP key,
// applies the configured RocketMQ client log level, and creates the
// package-level push consumer. Any failure is reported through glog.Fatal.
func init() {
	cfg := Consumer{}

	// Parse the configuration parameters.
	err := config.Configuration(rocket.APP, &cfg)
	if err != nil {
		glog.Fatal(err)
	}

	rlog.SetLogLevel(cfg.LogLevel)

	// Create the consumer instance.
	err = cfg.run()
	if err != nil {
		glog.Fatal(err)
	}

	glog.Info("Push consumer connect succeed")
}
// run creates the package-level push consumer from the receiver's name server,
// group name, retry setting and credentials.
//
// It returns the error from rocketmq.NewPushConsumer, or nil on success. The
// package-level pushConsumer variable is assigned in either case.
func (c Consumer) run() error {
	credentials := primitive.Credentials{
		AccessKey: c.AccessKey,
		SecretKey: c.SecretKey,
	}

	var err error
	// Push-mode consumer.
	pushConsumer, err = rocketmq.NewPushConsumer(
		consumer.WithNameServer(c.NameServer),
		consumer.WithGroupName(c.GroupName),
		consumer.WithRetry(c.Retry),
		consumer.WithCredentials(credentials),
	)
	return err
}
// Subscribe registers cb as the handler for messages on topic that match
// selector, then starts the package-level push consumer.
//
// Before subscribing, it launches a goroutine running g.Exit with a callback
// that shuts the consumer down and unsubscribes from topic; errors from those
// two calls are discarded.
//
// It returns the first error from pushConsumer.Subscribe or
// pushConsumer.Start, or nil if both succeed.
//
// NOTE(review): the exit callback calls Shutdown before Unsubscribe, and a new
// g.Exit goroutine is started on every call, even when Subscribe fails.
// Confirm both are intended.
func Subscribe(topic string, selector consumer.MessageSelector,
	cb func(context.Context, ...*primitive.MessageExt) (consumer.ConsumeResult, error)) error {
	onExit := func() {
		// Stop consuming.
		_ = pushConsumer.Shutdown()
		// Drop the subscription for this topic.
		_ = pushConsumer.Unsubscribe(topic)
	}
	// Watch for process exit.
	go g.Exit(onExit)

	// Subscribe to the topic.
	err := pushConsumer.Subscribe(topic, selector, cb)
	if err != nil {
		return err
	}

	// Start consuming.
	return pushConsumer.Start()
}