-
-
Notifications
You must be signed in to change notification settings - Fork 36
/
channel.go
66 lines (51 loc) · 1.4 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package rabbit
import (
"context"
"fmt"
"sync/atomic"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/shortlink-org/shortlink/pkg/logger"
)
// Channel amqp.Channel wapper
type Channel struct {
log logger.Logger
*amqp.Channel
delay int
closed int32
}
// IsClosed indicate closed by developer
func (ch *Channel) IsClosed() bool {
return atomic.LoadInt32(&ch.closed) == 1
}
// Close ensure closed flag set
func (ch *Channel) Close() error {
if ch.IsClosed() {
return amqp.ErrClosed
}
atomic.StoreInt32(&ch.closed, 1)
return ch.Channel.Close()
}
// Consume wrap amqp.Channel.Consume, the returned delivery will end only when channel closed by developer
func (ch *Channel) Consume(ctx context.Context, queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error) {
deliveries := make(chan amqp.Delivery)
go func() {
for {
d, err := ch.Channel.ConsumeWithContext(ctx, queue, consumer, autoAck, exclusive, noLocal, noWait, args)
if err != nil {
ch.log.Error(fmt.Errorf("consume failed, err: %w", err).Error())
time.Sleep(time.Duration(ch.delay) * time.Second)
continue
}
for msg := range d {
deliveries <- msg
}
// sleep before IsClose call. closed flag may not set before sleep.
time.Sleep(time.Duration(ch.delay) * time.Second)
if ch.IsClosed() {
break
}
}
}()
return deliveries, nil
}