-
Notifications
You must be signed in to change notification settings - Fork 0
/
channel.go
138 lines (111 loc) · 3.92 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package angora
import (
"context"
"fmt"
"sync"
"sync/atomic"
"github.com/streadway/amqp"
)
// confirmChCap is the buffer capacity of the publish confirmation channel
// (notifyConfirmCh) that newChannel creates for a channel in confirm mode.
const confirmChCap = 1
// channel is an extended amqp channel which manages the asynchronous publish
// confirm. It is thread-safe and is only held by one publisher at a time.
//
// Thread safety is achieved through the channel pool.
type channel struct {
	// published is the number of messages published on this channel.
	//
	// It is updated with the 64-bit sync/atomic functions, so it must remain
	// the first field of the struct: on 32-bit platforms only the first word
	// of an allocated struct is guaranteed to be 64-bit aligned, and a
	// misaligned 64-bit atomic operation panics.
	published uint64

	*amqp.Channel

	// confirmPublish decides whether the channel is put in confirm mode.
	confirmPublish bool

	// onPubConfirmFn is the callback invoked when a publish confirmation
	// (ack or nack) arrives for a tracked message.
	onPubConfirmFn OnPubConfirmFunc

	// notifyConfirmCh is the buffered channel from which amqp.Confirmation
	// values are read after a publish.
	notifyConfirmCh chan amqp.Confirmation

	// unconfirmedMsgTracker stores published messages keyed by delivery tag
	// until the server confirms them. It is nil unless confirmPublish is set.
	unconfirmedMsgTracker *sync.Map
}
// channelParams bundles the arguments consumed by newChannel.
type channelParams struct {
// amqpCh is the underlying amqp channel that the new channel embeds.
amqpCh *amqp.Channel
// confirmPublish, when true, makes newChannel put amqpCh into confirm mode.
confirmPublish bool
// onPubConfirmFn is the callback handed each tracked payload together with
// its ack flag; newChannel only stores it when confirmPublish is true.
onPubConfirmFn OnPubConfirmFunc
}
// newChannel wraps params.amqpCh in a channel.
//
// Without confirmPublish the wrapper is returned as is. With confirmPublish
// the amqp channel is switched to confirm mode, a confirmation listener is
// registered through NotifyPublish, the unconfirmed-message tracker is
// created, and a goroutine running asyncPubConfirmReader is started.
//
// A non-nil error is returned (with a nil channel) when the amqp channel
// cannot be switched to confirm mode.
func newChannel(params channelParams) (ch *channel, err error) {
	wrapped := &channel{
		Channel:        params.amqpCh,
		confirmPublish: params.confirmPublish,
	}

	// Nothing more to set up for a plain (non-confirming) channel.
	if !wrapped.confirmPublish {
		return wrapped, nil
	}

	if confirmErr := params.amqpCh.Confirm(false); confirmErr != nil {
		return nil, fmt.Errorf("%s: failed to put channel to publish confirm: %w", projPrefix, confirmErr)
	}

	// Unconfirmed publishings are tracked so that, when the broker answers
	// with ack=false, the message can be acted upon (e.g. republished).
	wrapped.unconfirmedMsgTracker = &sync.Map{}
	wrapped.onPubConfirmFn = params.onPubConfirmFn

	confirmations := make(chan amqp.Confirmation, confirmChCap)
	wrapped.notifyConfirmCh = confirmations
	wrapped.NotifyPublish(confirmations)

	// Start consuming confirmations only once every field is in place.
	go wrapped.asyncPubConfirmReader()

	return wrapped, nil
}
// publish publishes the message described by producerCfg to its exchange.
//
// If ctx is already done, nothing is published and ctx.Err() is returned; no
// sequence number is consumed in that case.
//
// If the channel is in confirm mode, the next publishing sequence number is
// reserved and used as the key under which the payload is stored in the
// unconfirmed-message tracker, so that it can be reprocessed if the broker
// later reports it as not delivered.
//
// A publish failure is returned wrapped with the sequence number, exchange
// and routing key. The sequence number is 0 when the channel is not in
// confirm mode.
func (ch *channel) publish(ctx context.Context, producerCfg ProducerConfig) error {
	// Check for cancellation before reserving a sequence number: reserving
	// first would leave a tracker entry behind and shift every later
	// sequence number for a message that was never sent.
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
	}

	var n uint64
	if ch.confirmPublish {
		n = ch.nextPublishSeqNo()
		ch.unconfirmedMsgTracker.Store(n, UnconfirmedPub{
			ProducerCfg: producerCfg,
		})
	}

	if err := ch.Publish(
		producerCfg.Exchange,
		producerCfg.RoutingKey,
		producerCfg.Mandatory,
		producerCfg.Immediate,
		producerCfg.Publishing,
	); err != nil {
		// The tracker only exists in confirm mode; touching it otherwise
		// would dereference a nil *sync.Map.
		if ch.confirmPublish {
			// The publish failed immediately, so no confirmation will
			// arrive for it: drop the tracked payload.
			ch.unconfirmedMsgTracker.Delete(n)
			// Give the reserved sequence number back (adding ^uint64(0)
			// decrements by one) so later publishings keep matching the
			// delivery tags sent by the broker.
			// NOTE(review): relies on amqp.Channel.Publish not counting a
			// failed publishing — confirm against the library version in use.
			atomic.AddUint64(&ch.published, ^uint64(0))
		}
		return fmt.Errorf(
			"%s: channel failed to publish message: %d Exchange: %s routingKey: %s: %w",
			projPrefix, n, producerCfg.Exchange, producerCfg.RoutingKey, err,
		)
	}

	return nil
}
// asyncPubConfirmReader consumes publish confirmations from notifyConfirmCh
// and is meant to run in its own goroutine.
//
// For every confirmation whose delivery tag is present in the
// unconfirmed-message tracker, the confirm callback is invoked with the
// tracked payload and the ack flag, and the entry is then removed.
// Confirmations with an unknown delivery tag are ignored.
//
// The function returns once notifyConfirmCh is closed.
func (ch *channel) asyncPubConfirmReader() {
	for {
		confirmation, open := <-ch.notifyConfirmCh
		if !open {
			// Listener channel closed: no further confirmations will come.
			return
		}

		tag := confirmation.DeliveryTag
		if payload, tracked := ch.unconfirmedMsgTracker.Load(tag); tracked {
			// Let the callback process the ack/nack first, then stop
			// tracking the payload.
			ch.onPubConfirmFn(payload, confirmation.Ack)
			ch.unconfirmedMsgTracker.Delete(tag)
		}
	}
}
// nextPublishSeqNo atomically bumps the channel's published counter by one
// and returns the new value, which serves as the sequence number of the next
// publishing on this channel.
//
// This method can be removed once PR
// https://github.com/streadway/amqp/pull/478 is merged.
func (ch *channel) nextPublishSeqNo() uint64 {
	const step uint64 = 1
	seqNo := atomic.AddUint64(&ch.published, step)
	return seqNo
}