Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Application testable Delivery - API Change #65

Merged
merged 1 commit into from
May 29, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1391,3 +1391,48 @@ func (me *Channel) Recover(requeue bool) error {
&basicRecoverOk{},
)
}

/*
Ack acknowledges a delivery by its delivery tag when having been consumed with
Channel.Consume or Channel.Get.

Ack acknowledges all message received prior to the delivery tag when multiple
is true.

See also Delivery.Ack
*/
func (me *Channel) Ack(tag uint64, multiple bool) error {
return me.send(me, &basicAck{
DeliveryTag: tag,
Multiple: multiple,
})
}

/*
Nack negatively acknowledges a delivery by its delivery tag. Prefer this
method to notify the server that you were not able to process this delivery and
it must be redelivered or dropped.

See also Delivery.Nack
*/
func (me *Channel) Nack(tag uint64, multiple bool, requeue bool) error {
return me.send(me, &basicNack{
DeliveryTag: tag,
Multiple: multiple,
Requeue: requeue,
})
}

/*
Reject negatively acknowledges a delivery by its delivery tag. Prefer Nack
over Reject when communicating with a RabbitMQ server because you can Nack
multiple messages, reducing the amount of protocol messages to exchange.

See also Delivery.Reject
*/
func (me *Channel) Reject(tag uint64, requeue bool) error {
return me.send(me, &basicReject{
DeliveryTag: tag,
Requeue: requeue,
})
}
49 changes: 20 additions & 29 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@ import (
"time"
)

// Acknowledger notifies the server of successful or failed consumption of
// delivieries via identifier found in the Delivery.DeliveryTag field.
//
// Applications can provide mock implementations in tests of Delivery handlers.
type Acknowledger interface {
Ack(tag uint64, multiple bool) error
Nack(tag uint64, multiple bool, requeue bool) error
Reject(tag uint64, requeue bool) error
}

// Delivery captures the fields for a previously delivered message resident in
// a queue to be delivered by the server to a consumer from Channel.Consume or
// Channel.Get.
type Delivery struct {
channel *Channel
Acknowledger Acknowledger // the channel from which this delivery arrived

Headers Table // Application or header exchange table

Expand Down Expand Up @@ -49,7 +59,7 @@ func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()

delivery := Delivery{
channel: channel,
Acknowledger: channel,

Headers: props.Headers,
ContentType: props.ContentType,
Expand Down Expand Up @@ -89,7 +99,8 @@ func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
}

/*
Ack acknowledges that the client or server has finished work on a delivery.
Ack delegates an acknowledgement through the Acknowledger interface that the
client or server has finished work on a delivery.

All deliveries in AMQP must be acknowledged. If you called Channel.Consume
with autoAck true then the server will be automatically ack each message and
Expand All @@ -107,14 +118,11 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Ack(multiple bool) error {
return me.channel.send(me.channel, &basicAck{
DeliveryTag: me.DeliveryTag,
Multiple: multiple,
})
return me.Acknowledger.Ack(me.DeliveryTag, multiple)
}

/*
Reject negatively acknowledge processing of this message.
Reject delegates a negatively acknowledgement through the Acknowledger interface.

When requeue is true, queue this message to be delivered to a consumer on a
different channel. When requeue is false or the server is unable to queue this
Expand All @@ -127,28 +135,15 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Reject(requeue bool) error {
return me.channel.send(me.channel, &basicReject{
DeliveryTag: me.DeliveryTag,
Requeue: requeue,
})
}

/*
Cancel delegates to a Channel.Cancel and uses the consumer identity in this delivery to cancel the consumer.

An error indicates the channel is closed.

*/
func (me Delivery) Cancel(noWait bool) error {
return me.channel.Cancel(me.ConsumerTag, noWait)
return me.Acknowledger.Reject(me.DeliveryTag, requeue)
}

/*
Nack negatively acknowledge the delivery of message(s) identified by the
deliveryTag from either the client or server.
delivery tag from either the client or server.

When multiple is true, nack messages up to and including delivered messages up
until the deliveryTag delivered on the same channel.
until the delivery tag delivered on the same channel.

When requeue is true, request the server to deliver this message to a different
consumer. If it is not possible or requeue is false, the message will be
Expand All @@ -162,9 +157,5 @@ Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Nack(multiple, requeue bool) error {
return me.channel.send(me.channel, &basicNack{
DeliveryTag: me.DeliveryTag,
Multiple: multiple,
Requeue: requeue,
})
return me.Acknowledger.Nack(me.DeliveryTag, multiple, requeue)
}