-
Notifications
You must be signed in to change notification settings - Fork 3
/
consumer.go
130 lines (115 loc) · 2.9 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
package rabbitmq
import (
"context"
"errors"
amqp "github.com/rabbitmq/amqp091-go"
"golang.org/x/sync/errgroup"
)
// Consumer is the message handler passed to RabbitMQConsumer.Working. It is
// called once for every delivery a worker receives. Returning a non-nil error
// makes that worker stop and return the error without acknowledging the
// delivery.
type Consumer func(msg amqp.Delivery) error
// RabbitMQConsumer bundles the configuration and runtime state that Working
// uses to declare a queue, bind it, and consume from it with a pool of worker
// goroutines. Build it with NewRabbitMQConsumer.
type RabbitMQConsumer struct {
// ctx is the context obtained from errgroup.WithContext in
// NewRabbitMQConsumer; each worker returns when it is done.
ctx context.Context
// workers is the number of consuming goroutines Working starts.
workers int64
// eg runs the worker goroutines; Working waits on it.
eg *errgroup.Group
// ErrChan receives the error returned by eg.Wait in Working. It is created
// with a buffer of 10 by NewRabbitMQConsumer.
ErrChan chan error
// clientName is the name passed to Client.GetClient to look up the client.
clientName string
// queueDeclareParams supplies the arguments for the QueueDeclare call.
queueDeclareParams *QueueDeclareParams
// queueBindParams holds one entry per QueueBind call made by Working.
queueBindParams []*QueueBindParams
// consumeParams supplies the arguments for each worker's Consume call.
consumeParams *ConsumeParams
}
// QueueDeclareParams holds the arguments that Working forwards, in field
// order, to the channel's QueueDeclare call.
type QueueDeclareParams struct {
// Queue is the name of the queue to declare.
Queue string
// Durable, AutoDelete, Exclusive and NoWait are passed through unchanged as
// the matching boolean arguments of QueueDeclare.
Durable bool
AutoDelete bool
Exclusive bool
NoWait bool
// Args is the argument table passed to QueueDeclare.
Args amqp.Table
}
// QueueBindParams describes one binding. Working issues one QueueBind call per
// entry, using QueueName, RoutingKey and ExchangeName from this struct.
type QueueBindParams struct {
// QueueName is the queue to bind.
QueueName string
// RoutingKey is the routing key of the binding.
RoutingKey string
// ExchangeName is the exchange the queue is bound to.
ExchangeName string
// NoWait and Args are presumably meant for the matching QueueBind
// arguments.
// NOTE(review): confirm that Working forwards these two fields from this
// struct rather than from the queue declaration parameters.
NoWait bool
Args amqp.Table
}
// ConsumeParams holds the arguments each worker started by Working passes to
// the channel's Consume call.
type ConsumeParams struct {
// QueueName is the queue to consume from.
QueueName string
// ConsumerName is passed as the consumer argument of Consume.
ConsumerName string
// AutoAck, Exclusive, NoLocal and NoWait are passed through unchanged as the
// matching boolean arguments of Consume.
AutoAck bool
Exclusive bool
NoLocal bool
NoWait bool
// Args is presumably the argument table for Consume.
// NOTE(review): confirm that Working forwards this field rather than the
// queue declaration arguments.
Args amqp.Table
}
// NewRabbitMQConsumer returns a RabbitMQConsumer ready for Working.
//
// workSize is the number of worker goroutines Working will start, and
// rabbitModel is the client name later resolved through Client.GetClient.
// queueDeclareParams, consumeParams and the optional queueBindParams are
// stored as given and used by Working for the declare, consume and bind calls.
//
// The consumer gets its own errgroup, whose context is derived from
// context.Background, and an ErrChan buffered to hold 10 errors.
func NewRabbitMQConsumer(workSize int64, rabbitModel string,
	queueDeclareParams *QueueDeclareParams, consumeParams *ConsumeParams, queueBindParams ...*QueueBindParams) *RabbitMQConsumer {
	group, groupCtx := errgroup.WithContext(context.Background())
	return &RabbitMQConsumer{
		ctx:                groupCtx,
		workers:            workSize,
		eg:                 group,
		ErrChan:            make(chan error, 10),
		clientName:         rabbitModel,
		queueDeclareParams: queueDeclareParams,
		queueBindParams:    queueBindParams,
		consumeParams:      consumeParams,
	}
}
// Working declares the configured queue, applies every configured binding and
// then starts f.workers goroutines that consume from the queue, handing each
// delivery to consumer.
//
// Setup failures (nil handler, unknown client, missing parameters, or a
// channel, declare or bind error) are returned directly. Once the workers are
// started, Working blocks until all of them have stopped. The first worker
// error is then sent on f.ErrChan and Working itself returns nil; the send
// blocks if the ErrChan buffer is already full.
//
// A worker stops when the group context is done, when consumer returns an
// error (the delivery is left unacknowledged), when acknowledging a delivery
// fails, or when the delivery channel is closed.
func (f *RabbitMQConsumer) Working(consumer Consumer) (err error) {
	// prefetchCount is the Qos prefetch applied to every worker channel.
	const prefetchCount = 8

	if consumer == nil {
		return errors.New("rabbitmq consumer handler is nil")
	}
	cli, exist := Client.GetClient(f.clientName)
	if !exist {
		return errors.New("rabbitmq client is not exist,client name is " + f.clientName)
	}
	if f.queueDeclareParams == nil || f.consumeParams == nil {
		return errors.New("rabbitmq queue declare params or consume params is nil")
	}

	// setup declares the queue and its bindings on a short-lived channel.
	setup := func() error {
		ch, err := cli.Channel()
		if err != nil {
			return err
		}
		// Release the setup channel on every path, including declare and
		// bind failures.
		defer ch.Close()

		declare := f.queueDeclareParams
		if _, err = ch.QueueDeclare(declare.Queue, declare.Durable, declare.AutoDelete,
			declare.Exclusive, declare.NoWait, declare.Args); err != nil {
			return err
		}
		for _, bind := range f.queueBindParams {
			if bind == nil {
				continue
			}
			// Use the binding's own NoWait/Args, not the queue declaration's.
			if err = ch.QueueBind(bind.QueueName, bind.RoutingKey, bind.ExchangeName,
				bind.NoWait, bind.Args); err != nil {
				return err
			}
		}
		return nil
	}
	if err = setup(); err != nil {
		return err
	}

	// worker consumes on its own channel until it fails or the group context
	// is done.
	worker := func() error {
		ch, err := cli.Channel()
		if err != nil {
			return err
		}
		defer ch.Close()

		if err = ch.Qos(prefetchCount, 0, false); err != nil {
			return err
		}
		params := f.consumeParams
		msgCh, err := ch.Consume(params.QueueName, params.ConsumerName, params.AutoAck,
			params.Exclusive, params.NoLocal, params.NoWait, params.Args)
		if err != nil {
			return err
		}
		for {
			select {
			case <-f.ctx.Done():
				return f.ctx.Err()
			case msg, ok := <-msgCh:
				if !ok {
					// A closed delivery channel would otherwise yield
					// zero-value deliveries forever.
					return errors.New("rabbitmq delivery channel is closed,queue name is " + params.QueueName)
				}
				if err := consumer(msg); err != nil {
					return err
				}
				// With AutoAck the broker has already acknowledged the
				// delivery, so it must not be acknowledged again.
				if !params.AutoAck {
					if err := msg.Ack(false); err != nil {
						return err
					}
				}
			}
		}
	}

	for i := 0; i < int(f.workers); i++ {
		f.eg.Go(worker)
	}
	if err = f.eg.Wait(); err != nil {
		f.ErrChan <- err
	}
	return nil
}