-
Notifications
You must be signed in to change notification settings - Fork 3
/
test2.go
78 lines (69 loc) · 1.88 KB
/
test2.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package main
import (
"context"
rmq_client "github.com/apache/rocketmq-clients/golang/v5"
"github.com/richelieu-yang/chimera/v3/src/component/mq/rocketmq5Kit"
"github.com/richelieu-yang/chimera/v3/src/config/viperKit"
"github.com/richelieu-yang/chimera/v3/src/consts"
"github.com/richelieu-yang/chimera/v3/src/core/pathKit"
"github.com/richelieu-yang/chimera/v3/src/log/logrusKit"
"github.com/sirupsen/logrus"
"time"
)
func init() {
logrusKit.MustSetUp(nil)
}
func main() {
var (
topic = "test"
consumerGroup = "a"
tag = "test"
)
{
wd, err := pathKit.ReviseWorkingDirInTestMode(consts.ProjectName)
if err != nil {
logrus.Fatal(err)
}
logrus.Infof("wd: [%s].", wd)
}
path := "_chimera-lib/config.yaml"
type config struct {
RocketMQ5 *rocketmq5Kit.Config `json:"rocketmq5"`
}
c := &config{}
_, err := viperKit.UnmarshalFromFile(path, nil, c)
if err != nil {
logrus.Fatal(err)
}
rocketmq5Kit.MustSetUp(c.RocketMQ5, "_client.log", nil)
consumer, err := rocketmq5Kit.NewSimpleConsumer(consumerGroup, map[string]*rmq_client.FilterExpression{
topic: rmq_client.NewFilterExpression(tag),
})
if err != nil {
logrus.Fatal(err)
}
for {
time.Sleep(time.Second)
mvs, err := consumer.Receive(context.TODO(), rocketmq5Kit.DefaultMaxMessageNum, rocketmq5Kit.DefaultInvisibleDuration)
if err != nil {
logrus.WithError(err).Error("Fail to receive message.")
continue
}
for _, mv := range mvs {
text := string(mv.GetBody())
err := consumer.Ack(context.TODO(), mv)
if err != nil {
logrus.WithFields(logrus.Fields{
"tag": rocketmq5Kit.GetTagString(mv.GetTag()),
"text": text,
"error": err.Error(),
}).Error("Fail to ack message.")
continue
}
logrus.WithFields(logrus.Fields{
"tag": rocketmq5Kit.GetTagString(mv.GetTag()),
"text": text,
}).Info("Manager to receive and ack a message.")
}
}
}