Returns are not delivered always before ACKs (but they should) #351
This is my (dirty) fix:
diff --git a/channel.go b/channel.go
index dd2552c..f90d823 100644
--- a/channel.go
+++ b/channel.go
@@ -297,7 +297,12 @@ func (ch *Channel) dispatch(msg message) {
}
ch.notifyM.RUnlock()
ch.consumers.cancel(m.ConsumerTag)
-
+ /*
+ # When Will Published Messages Be Confirmed by the Broker?
+ For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue
+ (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client
+ before basic.ack. The same is true for negative acknowledgements (basic.nack).
+ */
case *basicReturn:
ret := newReturn(*m)
ch.notifyM.RLock()
@@ -307,6 +312,14 @@ func (ch *Channel) dispatch(msg message) {
ch.notifyM.RUnlock()
case *basicAck:
+ // send the ACK also on the return channel, so that the sequence there is always preserved
+ ret := newAckReturn(true, m.DeliveryTag, m.Multiple, false)
+ ch.notifyM.RLock()
+ for _, c := range ch.returns {
+ c <- *ret
+ }
+ ch.notifyM.RUnlock()
+
if ch.confirming {
if m.Multiple {
ch.confirms.Multiple(Confirmation{m.DeliveryTag, true})
@@ -316,6 +329,14 @@ func (ch *Channel) dispatch(msg message) {
}
case *basicNack:
+ // send the NACK also on the return channel, so that the sequence there is always preserved
+ ret := newAckReturn(false, m.DeliveryTag, m.Multiple, m.Requeue)
+ ch.notifyM.RLock()
+ for _, c := range ch.returns {
+ c <- *ret
+ }
+ ch.notifyM.RUnlock()
+
if ch.confirming {
if m.Multiple {
ch.confirms.Multiple(Confirmation{m.DeliveryTag, false})
diff --git a/return.go b/return.go
index 10dcedb..feefad8 100644
--- a/return.go
+++ b/return.go
@@ -34,6 +34,23 @@ type Return struct {
AppId string // application use - creating application
Body []byte
+
+ // payload for ACK/NACK
+ ACKType bool
+ ACK bool
+ Multiple bool
+ Requeue bool
+ DeliveryTag uint64
+}
+
+func newAckReturn(isACK bool, deliveryTag uint64, multiple, requeue bool) *Return {
+ return &Return{
+ ACKType: true,
+ ACK: isACK,
+ DeliveryTag: deliveryTag,
+ Multiple: multiple,
+ Requeue: requeue,
+ }
}
func newReturn(msg basicReturn) *Return {
It may be dirty, but at least it makes returns and ACKs arrive in the correct order. I think a proper solution should rework confirmations along these lines (preserving their order relative to RETURNs).
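To make the intent of the patch concrete, here is a minimal sketch of how a consumer might read the merged return channel. It does not use the library: `event` is a hypothetical, simplified mirror of the patched `Return` struct, and `tagOf` is an assumed application-side map from `MessageId` to the delivery tag of each publish. The only thing it relies on is that a return is seen before the ACK or NACK that settles the same message.

```go
package main

import "fmt"

// event is a hypothetical stand-in for the patched Return struct.
// ACKType is false for genuine returns and true for ACK/NACK entries.
type event struct {
	ACKType     bool
	ACK         bool
	Multiple    bool
	DeliveryTag uint64
	MessageId   string // only meaningful on genuine returns
}

// classify walks the merged stream in arrival order and reports, per
// delivery tag, whether the publish was "returned", "acked" or "nacked".
// tagOf is an assumption: the application must have recorded which
// delivery tag each MessageId was published with.
func classify(events []event, tagOf map[string]uint64) map[uint64]string {
	outcome := make(map[uint64]string)
	returned := make(map[uint64]bool)

	// settle fixes the final outcome of one tag the first time it is confirmed.
	settle := func(tag uint64, acked bool) {
		if _, done := outcome[tag]; done {
			return
		}
		switch {
		case returned[tag]:
			outcome[tag] = "returned"
		case acked:
			outcome[tag] = "acked"
		default:
			outcome[tag] = "nacked"
		}
	}

	for _, ev := range events {
		if !ev.ACKType {
			// A genuine return: remember it until its confirmation arrives.
			if tag, ok := tagOf[ev.MessageId]; ok {
				returned[tag] = true
			}
			continue
		}
		if !ev.Multiple {
			settle(ev.DeliveryTag, ev.ACK)
			continue
		}
		// Multiple confirms every unsettled tag up to and including DeliveryTag.
		for _, tag := range tagOf {
			if tag <= ev.DeliveryTag {
				settle(tag, ev.ACK)
			}
		}
	}
	return outcome
}

func main() {
	tagOf := map[string]uint64{"m1": 1, "m2": 2, "m3": 3}
	events := []event{
		{MessageId: "m2"},                                         // return for m2
		{ACKType: true, ACK: true, DeliveryTag: 2},                 // ack for tag 2
		{ACKType: true, ACK: true, Multiple: true, DeliveryTag: 3}, // ack for tags 1 and 3
	}
	fmt.Println(classify(events, tagOf))
}
```

Because everything arrives on one channel, an ACK for a tag means no return for that tag can still be pending, so the outcome can be finalized as soon as the ACK is read.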
Thanks, looks like you are onto something. I don't like how returns and acks are intertwined in your example, but I cannot think of a better solution without reviewing how return handlers are implemented.
Me neither! I was hoping that by showing where the pain is, you could come up with something more elegant. That said, I don't see how it would be possible without some refactoring.
Quoting RabbitMQ documentation (https://www.rabbitmq.com/confirms.html#when-publishes-are-confirmed):
From the above I deduce that a generic client can detect a return because it is always received before the corresponding ACK.
However, with this AMQP client library there is no guarantee that this order is respected, and I am in fact receiving returns after ACKs.
I can produce a test case, but can somebody please confirm whether this is by design?
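One plausible explanation for the reordering, independent of the broker, is that returns and confirmations reach the application on two separate Go channels. The sketch below does not use the library; `frame` and `countFirstSeen` are hypothetical names. It dispatches a return and then an ACK onto two buffered channels and counts which one a `select`-based consumer sees first. When both cases are ready, Go's `select` picks one pseudo-randomly, so the dispatch order is not observable on the receiving side.

```go
package main

import "fmt"

// frame is a hypothetical stand-in for a broker frame; it is not a library type.
type frame struct {
	kind string // "return" or "ack"
	tag  uint64
}

// countFirstSeen simulates n rounds in which a dispatcher delivers a return
// and then an ack on two independent buffered channels, and a consumer
// selects on both. It reports how often each kind was observed first.
func countFirstSeen(n int) (returnFirst, ackFirst int) {
	returns := make(chan frame, 1)
	confirms := make(chan frame, 1)

	for i := 0; i < n; i++ {
		// Dispatch order matches the broker's guarantee: return, then ack.
		returns <- frame{kind: "return"}
		confirms <- frame{kind: "ack", tag: uint64(i + 1)}

		// Both channels are ready, so select chooses pseudo-randomly.
		select {
		case <-returns:
			returnFirst++
			<-confirms // drain the other channel before the next round
		case <-confirms:
			ackFirst++
			<-returns
		}
	}
	return returnFirst, ackFirst
}

func main() {
	r, a := countFirstSeen(1000)
	fmt.Printf("return seen first: %d, ack seen first: %d\n", r, a)
}
```

Any design that keeps the two notification streams on separate channels has this property, which is why preserving the order requires either a single merged stream or some other explicit sequencing.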
My problem is (apparently) simple: send multiple messages concurrently and verify that each of them has been received and routed (with immediate=false, but durable=true). If the server-side guarantee that returns are ordered before ACKs is not respected, this becomes impossible.
I am also suggesting that we add an example of consistent (transactional) message delivery with a concurrent pattern; otherwise the usage and capabilities of all the channels involved, including proper cleanup, might be unclear. We can address that separately in a PR.
Edit: I also noticed that returns have no DeliveryTag, which perhaps makes them impossible to associate with a publish, so I am using MessageId (application-level) to detect the returns in my test.
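The MessageId workaround can be sketched as a small bookkeeping helper. This is an illustration only: `publishTracker` and its methods are hypothetical and not part of the library. It assumes that delivery tags on a channel in confirm mode start at 1 and increase by one per publish, and that the caller records each MessageId in the same order in which it publishes.

```go
package main

import (
	"fmt"
	"sync"
)

// publishTracker is a hypothetical helper that remembers which delivery tag
// each application-level MessageId was published with.
type publishTracker struct {
	mu    sync.Mutex
	next  uint64
	tagOf map[string]uint64
}

func newPublishTracker() *publishTracker {
	// Assumption: the first publish after enabling confirms has tag 1.
	return &publishTracker{next: 1, tagOf: make(map[string]uint64)}
}

// record assigns the next delivery tag to messageID. With concurrent
// publishers, the caller must make record and the actual publish one
// atomic step, otherwise the assumed tags drift from the real ones.
func (t *publishTracker) record(messageID string) uint64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	tag := t.next
	t.next++
	t.tagOf[messageID] = tag
	return tag
}

// lookup resolves the MessageId carried by a return to its delivery tag.
func (t *publishTracker) lookup(messageID string) (uint64, bool) {
	t.mu.Lock()
	defer t.mu.Unlock()
	tag, ok := t.tagOf[messageID]
	return tag, ok
}

func main() {
	tracker := newPublishTracker()
	tracker.record("order-1")
	tracker.record("order-2")

	// A return arrives carrying only the MessageId.
	if tag, ok := tracker.lookup("order-2"); ok {
		fmt.Println("return belongs to delivery tag", tag)
	}
}
```

This only identifies which publish a return refers to; deciding that a message was not returned still depends on seeing the return before the matching ACK.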