-
Notifications
You must be signed in to change notification settings - Fork 0
/
amqputil.go
executable file
·339 lines (294 loc) · 11.6 KB
/
amqputil.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Package amqputil provides AmqpContext to simplify AMQP interaction
package amqputil
import (
"encoding/json"
"time"
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/science-computing/service-common-golang/apputil"
"github.com/pkg/errors"
)
var log = apputil.InitLogging()
type AmqpAccessor interface {
PublishMessage(queueName string, message interface{}) error
ReceiveMessage(queueName string, message interface{}) (delivery *amqp.Delivery, err error)
Channel() ChannelAccessor
Close() error
Reset() error
LastError() error
SetLastError(err error)
ResetError()
}
// ChannelAccessor is an interface for the necessary methods to access the Channel struct of the AMQP library.
// the library does not define an interface, so we do it here (it helps for mocking)
// this interface only defines those methods that we know we need. See https://pkg.go.dev/github.com/rabbitmq/amqp091-go
// for all possible methods.
type ChannelAccessor interface {
Qos(prefetchCount, prefetchSize int, global bool) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
Close() error
Cancel(consumer string, noWait bool) error
QueueDelete(name string, ifUnused, ifEmpty, noWait bool) (int, error)
QueueInspect(name string) (amqp.Queue, error)
}
// AmqpConnectionHelper helps to get a connection AMQP
type AmqpConnectionHelper struct {
AmqpConnectionURL string
}
// AmqpContext simplifies amqp interaction by providing a context with
// a persistent connection and a channel to simplify message publishing
type AmqpContext struct {
err error
channel ChannelAccessor
connection *amqp.Connection
amqpConnectionURL string
consumerId string
queues map[string]amqp.Queue
deliveryChannels map[string]<-chan amqp.Delivery
}
// ErrNoMessages indicates, that no message were found in a queue
var ErrNoMessage = errors.Errorf("No message found in queue")
// GetAmqpContext creates an AmqpContext for the given amqpConnectionURL
// or returns an already existing AmqpContext for the amqpConnectionURL
// the consumerId identifies the consumer on the channel
func (helper *AmqpConnectionHelper) GetAmqpContext(consumerId string) (amqpContext *AmqpContext) {
log.Debugf("Get AmqpContext for URL [%v] and id [%s]", helper.AmqpConnectionURL, consumerId)
amqpContext = &AmqpContext{}
amqpContext.amqpConnectionURL = helper.AmqpConnectionURL
amqpContext.consumerId = consumerId
log.Debugf("Opening AMQP connection to [%v]", helper.AmqpConnectionURL)
// create connection
if amqpContext.connection, amqpContext.err = amqp.Dial(helper.AmqpConnectionURL); amqpContext.err != nil {
log.Warnf("Cannot open AMPQ connection to '%s', Reason: %s ", helper.AmqpConnectionURL, amqpContext.err.Error())
return nil
}
// create channel
amqpContext.Reset()
return amqpContext
}
func (amqpContext *AmqpContext) Channel() ChannelAccessor {
return amqpContext.channel
}
// Reset resets the channel and queues - asumes that
func (amqpContext *AmqpContext) Reset() error {
if amqpContext.connection == nil || amqpContext.connection.IsClosed() {
log.Debugf("Reopening connection to %s: ", amqpContext.amqpConnectionURL)
if amqpContext.connection, amqpContext.err = amqp.Dial(amqpContext.amqpConnectionURL); amqpContext.err != nil {
log.Warnf("Cannot open AMPQ context, Reason: %s ", amqpContext.err.Error())
return amqpContext.err
}
}
if amqpContext.channel != nil {
amqpContext.channel.Close()
}
// create channel
if amqpContext.channel, amqpContext.err = amqpContext.connection.Channel(); amqpContext.err != nil {
log.Warnf("Cannot open AMPQ channel, Reason: %s ", amqpContext.err.Error())
return amqpContext.err
}
if amqpContext.channel == nil {
log.Error("Channel is nil this should not happen")
}
amqpContext.queues = make(map[string]amqp.Queue)
amqpContext.deliveryChannels = make(map[string]<-chan amqp.Delivery)
return amqpContext.err
}
func (amqpContext *AmqpContext) EnsureQueueExists(queueName string) error {
// get queue from internal map or create new one
_, ok := amqpContext.queues[queueName]
if !ok {
var args = make(amqp.Table)
// args["x-queue-mode"] = "lazy"
amqpContext.queues[queueName], amqpContext.err =
amqpContext.channel.QueueDeclare(queueName, false, false, false, false, args)
if amqpContext.err != nil {
amqpContext.err = errors.Wrapf(amqpContext.err, "Cannot declare AMQP queue [%v]", queueName)
return amqpContext.err
}
}
return nil
}
// PublishMessage sends given message as application/json to queue with given name.
// If the queue does not exist, it is created.
// Errors go to AmqpContext.Err
func (amqpContext *AmqpContext) PublishMessage(queueName string, message interface{}) error {
log.Debugf("Publising message [%v] to queue [%v]", message, queueName)
// get queue from internal map or create new one
amqpContext.err = amqpContext.EnsureQueueExists(queueName)
if amqpContext.err != nil {
return amqpContext.err
}
body, err := json.Marshal(message)
if err != nil {
amqpContext.err = errors.Wrapf(err, "Failed to marshall AMQP message [%v]", message)
return amqpContext.err
}
log.Debugf("Publishing message [%v] to AMQP", string(body))
publishing := amqp.Publishing{ContentType: "application/json", Body: body}
// publish to default exchange ""
if err = amqpContext.channel.Publish("", queueName, false, false, publishing); err != nil {
amqpContext.err = errors.Wrapf(err, "Failed to publish AMQP message [%v]", message)
return amqpContext.err
}
return amqpContext.err
}
func (amqpContext *AmqpContext) registerConsumer(queueName string) {
var deliveryChan <-chan amqp.Delivery
retries := 0
amqpContext.err = amqpContext.channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
for amqpContext.err != nil && retries < 10 {
retries++
log.Warnf("Queue %s ist not available, retrying in 3s: %v", queueName, amqpContext.err)
time.Sleep(3 * time.Second)
amqpContext.Reset()
if amqpContext.err != nil {
continue
}
amqpContext.err = amqpContext.channel.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
}
if amqpContext.err != nil {
amqpContext.err = errors.Wrapf(amqpContext.err, "Failed to set Qos on queue [%v] for consumerId [%v]", queueName, amqpContext.consumerId)
return
}
log.Debugf("Registering consumer [%v] on queue [%v]", amqpContext.consumerId, queueName)
retries = 0
for {
deliveryChan, amqpContext.err = amqpContext.channel.Consume(queueName, amqpContext.consumerId, false, false, false, false, nil)
if amqpContext.err != nil {
// if queue was not found, retry
if notFoundError, ok := amqpContext.err.(*amqp.Error); ok && notFoundError.Code == 404 && retries < 10 {
log.Debugf("Consumer %v did not find queue [%v]. Retrying", amqpContext.consumerId, queueName)
// necessary as Consume() leads to a "channel not open" error after first timed out attempt
amqpContext.Reset()
retries++
time.Sleep(3 * time.Second)
} else {
// if there was another error
amqpContext.err = errors.Wrapf(amqpContext.err, "Cannot consume AMQP queue [%v] for consumerId [%v]", queueName, amqpContext.consumerId)
return
}
} else {
amqpContext.err = nil
break
}
}
amqpContext.deliveryChannels[queueName] = deliveryChan
}
// ReceiveMessage gets next message from queue with given queue name
func (amqpContext *AmqpContext) ReceiveMessage(queueName string, message interface{}) (delivery *amqp.Delivery, err error) {
log.Debugf("Receiving message from queue [%v] for consumerId [%v)", queueName, amqpContext.consumerId)
// get delivery from internal map or create new one
deliveryChan := amqpContext.deliveryChannels[queueName]
if deliveryChan == nil {
amqpContext.registerConsumer(queueName)
if amqpContext.err != nil {
log.Errorf("Unable to register consumer %v", amqpContext.err)
return nil, amqpContext.err
}
deliveryChan = amqpContext.deliveryChannels[queueName]
}
var retDelivery amqp.Delivery
var ok bool
// return false after timeout or non-ok channel read
select {
case <-time.After(10 * time.Second):
amqpContext.err = ErrNoMessage
log.Debugf("No message delivered for consumerId [%v].", amqpContext.consumerId)
// stop consuming
amqpContext.channel.Cancel(amqpContext.consumerId, false)
return nil, amqpContext.err
case retDelivery, ok = <-deliveryChan:
if ok && (retDelivery.Body == nil || len(retDelivery.Body) == 0) {
amqpContext.err = errors.New("Failed to get delivery from delivery chan. Body is empty. ConsumerId [" + amqpContext.consumerId + "]")
return nil, amqpContext.err
} else if !ok {
// chan is closed -> remove consumer
log.Debugf("Chan is closed for consumerId [%v]. ", amqpContext.consumerId)
amqpContext.channel.Cancel(amqpContext.consumerId, false)
/*err := amqpContext.registerConsumer(queueName)
if err != nil {
log.Errorf("Unable to register consumer %v", err)
}*/
return nil, amqpContext.err
}
}
// unmarshal delivery
amqpContext.err = json.Unmarshal(retDelivery.Body, message)
return &retDelivery, amqpContext.err
}
// ReceiveMessage gets next message from queue with given queue name
func (amqpContext *AmqpContext) ReceiveProtoMessage(queueName string, message proto.Message) (delivery *amqp.Delivery, err error) {
log.Debugf("Receiving message from queue [%v] for consumerId [%v)", queueName, amqpContext.consumerId)
// get delivery from internal map or create new one
deliveryChan := amqpContext.deliveryChannels[queueName]
if deliveryChan == nil {
amqpContext.registerConsumer(queueName)
if amqpContext.err != nil {
log.Errorf("Unable to register consumer %v", amqpContext.err)
return nil, amqpContext.err
}
deliveryChan = amqpContext.deliveryChannels[queueName]
}
var retDelivery amqp.Delivery
var ok bool
// return false after timeout or non-ok channel read
select {
case <-time.After(10 * time.Second):
amqpContext.err = ErrNoMessage
log.Debugf("No message delivered for consumerId [%v].", amqpContext.consumerId)
// stop consuming
amqpContext.channel.Cancel(amqpContext.consumerId, false)
return nil, amqpContext.err
case retDelivery, ok = <-deliveryChan:
if ok && (retDelivery.Body == nil || len(retDelivery.Body) == 0) {
amqpContext.err = errors.New("Failed to get delivery from delivery chan. Body is empty. ConsumerId [" + amqpContext.consumerId + "]")
return nil, amqpContext.err
} else if !ok {
// chan is closed -> remove consumer
log.Debugf("Chan is closed for consumerId [%v]. ", amqpContext.consumerId)
amqpContext.channel.Cancel(amqpContext.consumerId, false)
/*err := amqpContext.registerConsumer(queueName)
if err != nil {
log.Errorf("Unable to register consumer %v", err)
}*/
return nil, amqpContext.err
}
}
// unmarshal delivery
amqpContext.err = protojson.Unmarshal(retDelivery.Body, message)
return &retDelivery, amqpContext.err
}
// Close closes the amqp connection
func (amqpContext *AmqpContext) Close() error {
log.Info("Closing AMQP connection and channel")
if amqpContext.channel != nil {
amqpContext.channel.Close()
}
if amqpContext.connection != nil {
amqpContext.err = amqpContext.connection.Close()
return amqpContext.err
} else {
log.Warnf("Connection not available")
return nil
}
}
func (amqpContext *AmqpContext) LastError() error {
return amqpContext.err
}
func (amqpContext *AmqpContext) ResetError() {
amqpContext.err = nil
}
func (amqpContext *AmqpContext) SetLastError(err error) {
amqpContext.err = err
}