Skip to content

Commit

Permalink
Publisher confirms used for reliable messaging
Browse files Browse the repository at this point in the history
Closes #36

This takes a simple solution of bridging two brokers log topic
exchanges only acknowledging or negatively acknowledging the delivery
when the publishing has been reliable received and handled.
  • Loading branch information
Sean Treadway committed Aug 1, 2012
1 parent 62e0a0b commit dd71072
Showing 1 changed file with 105 additions and 0 deletions.
105 changes: 105 additions & 0 deletions examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,111 @@ import (
"time"
)

func ExampleChannel_Confirm_bridge() {
// This example acts as a bridge, shoveling all messages sent from the source
// exchange "log" to destination exchange "log".

// Confirming publishes can help from overproduction and ensure every message
// is delivered.

// Setup the source of the store and forward
source, err := amqp.Dial("amqp://source/")
if err != nil {
log.Fatalf("connection.open source: %s", err)
}
defer source.Close()

chs, err := source.Channel()
if err != nil {
log.Fatalf("channel.open source: %s", err)
}

if err := chs.ExchangeDeclare("log", "topic", amqp.UntilDeleted, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

if _, err := chs.QueueDeclare("remote-tee", amqp.UntilUnused, false, false, nil); err != nil {
log.Fatalf("queue.declare source: %s", err)
}

if err := chs.QueueBind("remote-tee", "#", "logs", false, nil); err != nil {
log.Fatalf("queue.bind source: %s", err)
}

shovel, err := chs.Consume("remote-tee", "shovel", false, false, false, false, nil)
if err != nil {
log.Fatalf("basic.consume source: %s", err)
}

// Setup the destination of the store and forward
destination, err := amqp.Dial("amqp://destination/")
if err != nil {
log.Fatalf("connection.open destination: %s", err)
}
defer destination.Close()

chd, err := destination.Channel()
if err != nil {
log.Fatalf("channel.open destination: %s", err)
}

if err := chd.ExchangeDeclare("log", "topic", amqp.UntilDeleted, false, false, nil); err != nil {
log.Fatalf("exchange.declare destination: %s", err)
}

pubAcks, pubNacks := chd.NotifyConfirm(make(chan uint64), make(chan uint64))

if err := chd.Confirm(false); err != nil {
log.Fatalf("confirm.select destination: %s", err)
}

// Now pump the messages, one by one, a smarter implementation
// would batch the deliveries and use multiple ack/nacks
for {
msg, ok := <-shovel
if !ok {
log.Fatalf("source channel closed, see the reconnect example for handling this")
}

err = chd.Publish("logs", msg.RoutingKey, false, false, amqp.Publishing{
// Copy all the properties
ContentType: msg.ContentType,
ContentEncoding: msg.ContentEncoding,
DeliveryMode: msg.DeliveryMode,
Priority: msg.Priority,
CorrelationId: msg.CorrelationId,
ReplyTo: msg.ReplyTo,
Expiration: msg.Expiration,
MessageId: msg.MessageId,
Timestamp: msg.Timestamp,
Type: msg.Type,
UserId: msg.UserId,
AppId: msg.AppId,

// Custom headers
Headers: msg.Headers,

// And the body
Body: msg.Body,
})

if err != nil {
msg.Nack(false, false)
log.Fatalf("basic.publish destination: %s", msg)
}

// only ack the source delivery when the destination acks the publishing
// here you could check for delivery order by keeping a local state of
// expected delivery tags
select {
case <-pubAcks:
msg.Ack(false)
case <-pubNacks:
msg.Nack(false, false)
}
}
}

func ExampleChannel_Consume() {
// Connects opens an AMQP connection from the credentials in the URL.
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
Expand Down

0 comments on commit dd71072

Please sign in to comment.