Skip to content

Commit

Permalink
Minor improvements to examples (#86)
Browse files Browse the repository at this point in the history
* Improvements to simple consumer

* Improvements to simple producer
  • Loading branch information
lukebakken committed May 21, 2022
1 parent b39be01 commit ffeb46d
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 33 deletions.
83 changes: 60 additions & 23 deletions _examples/simple-consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"flag"
"fmt"
"log"
"os"
"os/signal"
"time"
"syscall"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -21,6 +24,11 @@ var (
bindingKey = flag.String("key", "test-key", "AMQP binding key")
consumerTag = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
lifetime = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
verbose = flag.Bool("verbose", true, "enable verbose output of message data")
autoAck = flag.Bool("auto_ack", false, "enable message auto-ack")
ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix)
Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix)
deliveryCount int = 0
)

func init() {
Expand All @@ -30,21 +38,23 @@ func init() {
func main() {
c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag)
if err != nil {
log.Fatalf("%s", err)
ErrLog.Fatalf("%s", err)
}

SetupCloseHandler(c)

if *lifetime > 0 {
log.Printf("running for %s", *lifetime)
Log.Printf("running for %s", *lifetime)
time.Sleep(*lifetime)
} else {
log.Printf("running forever")
Log.Printf("running forever")
select {}
}

log.Printf("shutting down")
Log.Printf("shutting down")

if err := c.Shutdown(); err != nil {
log.Fatalf("error during shutdown: %s", err)
ErrLog.Fatalf("error during shutdown: %s", err)
}
}

Expand All @@ -55,6 +65,19 @@ type Consumer struct {
done chan error
}

func SetupCloseHandler(consumer *Consumer) {
c := make(chan os.Signal)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
Log.Printf("Ctrl+C pressed in Terminal")
if err := consumer.Shutdown(); err != nil {
ErrLog.Fatalf("error during shutdown: %s", err)
}
os.Exit(0)
}()
}

func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) {
c := &Consumer{
conn: nil,
Expand All @@ -65,23 +88,23 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (

var err error

log.Printf("dialing %q", amqpURI)
Log.Printf("dialing %q", amqpURI)
c.conn, err = amqp.Dial(amqpURI)
if err != nil {
return nil, fmt.Errorf("Dial: %s", err)
}

go func() {
fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
Log.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
}()

log.Printf("got Connection, getting Channel")
Log.Printf("got Connection, getting Channel")
c.channel, err = c.conn.Channel()
if err != nil {
return nil, fmt.Errorf("Channel: %s", err)
}

log.Printf("got Channel, declaring Exchange (%q)", exchange)
Log.Printf("got Channel, declaring Exchange (%q)", exchange)
if err = c.channel.ExchangeDeclare(
exchange, // name of the exchange
exchangeType, // type
Expand All @@ -94,7 +117,7 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (
return nil, fmt.Errorf("Exchange Declare: %s", err)
}

log.Printf("declared Exchange, declaring Queue %q", queueName)
Log.Printf("declared Exchange, declaring Queue %q", queueName)
queue, err := c.channel.QueueDeclare(
queueName, // name of the queue
true, // durable
Expand All @@ -107,7 +130,7 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (
return nil, fmt.Errorf("Queue Declare: %s", err)
}

log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
Log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
queue.Name, queue.Messages, queue.Consumers, key)

if err = c.channel.QueueBind(
Expand All @@ -120,11 +143,11 @@ func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (
return nil, fmt.Errorf("Queue Bind: %s", err)
}

log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
Log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
deliveries, err := c.channel.Consume(
queue.Name, // name
c.tag, // consumerTag,
false, // noAck
*autoAck, // autoAck
false, // exclusive
false, // noLocal
false, // noWait
Expand All @@ -149,22 +172,36 @@ func (c *Consumer) Shutdown() error {
return fmt.Errorf("AMQP connection close error: %s", err)
}

defer log.Printf("AMQP shutdown OK")
defer Log.Printf("AMQP shutdown OK")

// wait for handle() to exit
return <-c.done
}

func handle(deliveries <-chan amqp.Delivery, done chan error) {
cleanup := func() {
Log.Printf("handle: deliveries channel closed")
done <- nil
}

defer cleanup()

for d := range deliveries {
log.Printf(
"got %dB delivery: [%v] %q",
len(d.Body),
d.DeliveryTag,
d.Body,
)
d.Ack(false)
deliveryCount++
if *verbose == true {
Log.Printf(
"got %dB delivery: [%v] %q",
len(d.Body),
d.DeliveryTag,
d.Body,
)
} else {
if deliveryCount % 65536 == 0 {
Log.Printf("delivery count %d", deliveryCount)
}
}
if *autoAck == false {
d.Ack(false)
}
}
log.Printf("handle: deliveries channel closed")
done <- nil
}
23 changes: 13 additions & 10 deletions _examples/simple-producer/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"flag"
"fmt"
"log"
"os"

amqp "github.com/rabbitmq/amqp091-go"
)
Expand All @@ -18,6 +19,8 @@ var (
routingKey = flag.String("key", "test-key", "AMQP routing key")
body = flag.String("body", "foobar", "Body of message")
reliable = flag.Bool("reliable", true, "Wait for the publisher confirmation before exiting")
ErrLog = log.New(os.Stderr, "[ERROR] ", log.LstdFlags|log.Lmsgprefix)
Log = log.New(os.Stdout, "[INFO] ", log.LstdFlags|log.Lmsgprefix)
)

func init() {
Expand All @@ -26,9 +29,9 @@ func init() {

func main() {
if err := publish(*uri, *exchangeName, *exchangeType, *routingKey, *body, *reliable); err != nil {
log.Fatalf("%s", err)
ErrLog.Fatalf("%s", err)
}
log.Printf("published %dB OK", len(*body))
Log.Printf("published %dB OK", len(*body))
}

func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable bool) error {
Expand All @@ -37,20 +40,20 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
// all in one go. In a real service, you probably want to maintain a
// long-lived connection as state, and publish against that.

log.Printf("dialing %q", amqpURI)
Log.Printf("dialing %q", amqpURI)
connection, err := amqp.Dial(amqpURI)
if err != nil {
return fmt.Errorf("Dial: %s", err)
}
defer connection.Close()

log.Printf("got Connection, getting Channel")
Log.Printf("got Connection, getting Channel")
channel, err := connection.Channel()
if err != nil {
return fmt.Errorf("Channel: %s", err)
}

log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
Log.Printf("got Channel, declaring %q Exchange (%q)", exchangeType, exchange)
if err := channel.ExchangeDeclare(
exchange, // name
exchangeType, // type
Expand All @@ -66,7 +69,7 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
// Reliable publisher confirms require confirm.select support from the
// connection.
if reliable {
log.Printf("enabling publishing confirms.")
Log.Printf("enabling publishing confirms.")
if err := channel.Confirm(false); err != nil {
return fmt.Errorf("Channel could not be put into confirm mode: %s", err)
}
Expand All @@ -76,7 +79,7 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
defer confirmOne(confirms)
}

log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body)
Log.Printf("declared Exchange, publishing %dB body (%q)", len(body), body)
if err = channel.Publish(
exchange, // publish to an exchange
routingKey, // routing to 0 or more queues
Expand All @@ -102,11 +105,11 @@ func publish(amqpURI, exchange, exchangeType, routingKey, body string, reliable
// set of unacknowledged sequence numbers and loop until the publishing channel
// is closed.
func confirmOne(confirms <-chan amqp.Confirmation) {
log.Printf("waiting for confirmation of one publishing")
Log.Printf("waiting for confirmation of one publishing")

if confirmed := <-confirms; confirmed.Ack {
log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
Log.Printf("confirmed delivery with delivery tag: %d", confirmed.DeliveryTag)
} else {
log.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
ErrLog.Printf("failed delivery of delivery tag: %d", confirmed.DeliveryTag)
}
}

0 comments on commit ffeb46d

Please sign in to comment.