Skip to content

Commit

Permalink
Merge pull request #71 from Lunastryke/feat/update-consumer
Browse files Browse the repository at this point in the history
[rmq][consumer] Cleaned up log spam and updated examples
  • Loading branch information
kelchy committed Jul 18, 2023
2 parents 4866810 + 19f8b22 commit 6ec388a
Show file tree
Hide file tree
Showing 7 changed files with 59 additions and 107 deletions.
1 change: 0 additions & 1 deletion rmq/consumer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/kelchy/go-lib/rmq/consumer/internal/connectionmanager"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
1 change: 0 additions & 1 deletion rmq/consumer/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
1 change: 0 additions & 1 deletion rmq/consumer/consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rabbitmq

import (
"github.com/kelchy/go-lib/rmq/consumer/internal/logger"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
7 changes: 5 additions & 2 deletions rmq/consumer/declare.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rabbitmq

import (
"github.com/kelchy/go-lib/log"
"github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager"
"github.com/kelchy/go-lib/rmq/consumer/internal/logger"
)
Expand Down Expand Up @@ -96,7 +97,8 @@ func DeclareExchange(
options ExchangeOptions,
) error {
// Creates a new channel manager
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval)
errLogger, _ := log.New("erroronly")
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval)
if err != nil {
return err
}
Expand All @@ -111,7 +113,8 @@ func DeclareQueue(
options QueueOptions,
) error {
// Creates a new channel manager
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval)
errLogger, _ := log.New("erroronly")
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval)
if err != nil {
return err
}
Expand Down
152 changes: 52 additions & 100 deletions rmq/consumer/example/consumer.example.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

rabbitmq "github.com/kelchy/go-lib/rmq/consumer"
)

/**
* This example demonstrates how to create a consumer with retry and dead letter queue
* The consumer will consume messages from the queue "q_events" with routing key "events.basic"
* If the message is Nacked, it will be sent to the dead letter queue "q_event_retry"
* The dead letter queue will retry the message 3 times with a delay of 10 seconds between each retry
* If the message is Nacked after the 3rd retry, it will be handled by the dead message handler
*/

func main() {
conn, err := rabbitmq.NewConn(
os.Getenv("RMQ_URI"),
Expand All @@ -21,7 +28,7 @@ func main() {
}
defer conn.Close()

// Verify that exchange exists (Not needed if it is declared in NewConsumer)
// Verify that exchange exists
if err := rabbitmq.DeclareExchange(
conn,
rabbitmq.ExchangeOptions{
Expand All @@ -39,117 +46,51 @@ func main() {
return
}

// Verify that queue exists (Not needed if it is declared in NewConsumer)
if err := rabbitmq.DeclareQueue(
// Consumer options can be changed according to your needs
rabbitmq.NewConsumer(
conn,
rabbitmq.QueueOptions{
Name: "my_queue",
Declare: true,
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: map[string]interface{}{
eventHandler,
deadMessageHandler,
"q_events",
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsRoutingKey("events.basic"),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerAutoAck(false),
rabbitmq.WithConsumerOptionsQueueDurable,
rabbitmq.WithConsumerOptionsQueueArgs(
map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
"x-dead-letter-routing-key": "retry.events.basic",
},
},
); err != nil {
fmt.Println(err)
return
}

// Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer)
if err := rabbitmq.DeclareBinding(
conn,
rabbitmq.BindingDeclareOptions{
QueueName: "my_queue",
ExchangeName: "events",
RoutingKey: "test_routing_key",
NoWait: false,
Args: nil,
Declare: true,
}); err != nil {
fmt.Println(err)
return
}
),
)

if err := rabbitmq.DeclareQueue(
// Declares the DLQ and binds it back to the original queue
rabbitmq.DeclareQueue(
conn,
rabbitmq.QueueOptions{
Name: "my_queue_dlx",
Declare: true,
Name: "q_event_retry",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key",
"x-message-ttl": 10000,
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "events.basic",
},
Declare: true,
},
); err != nil {
fmt.Println(err)
return
}

// Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer)
if err := rabbitmq.DeclareBinding(
)
rabbitmq.DeclareBinding(
conn,
rabbitmq.BindingDeclareOptions{
QueueName: "my_queue_dlx",
QueueName: "q_event_retry",
ExchangeName: "events",
RoutingKey: "test_routing_key_dlk",
NoWait: false,
Args: nil,
Declare: true,
}); err != nil {
fmt.Println(err)
return
}

consumer, err := rabbitmq.NewConsumer(
conn,
eventHandler,
deadMessageHandler,
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
}),
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsQueueDurable,
)
if err != nil {
fmt.Println(err)
}
defer consumer.Close()

consumer2, err := rabbitmq.NewConsumer(
conn,
eventHandler,
deadMessageHandler,
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
}),
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsQueueDurable,
RoutingKey: "retry.events.basic",
},
)
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()

// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
Expand All @@ -171,18 +112,29 @@ func main() {

func eventHandler(d rabbitmq.Delivery) error {
fmt.Printf("consumed: %s, %v \n", string(d.MessageId), string(d.Body))
return fmt.Errorf("TEST_ERR")
err := d.Ack(false)
if err != nil {
return fmt.Errorf("nack")
if err := d.Ack(false); err != nil {
return err
}
return nil
}

// Alternatively can use a wrapper for the event handler so you will not need to call ack manually within each different consumer
func eventHandlerWrapper(f rabbitmq.EventHandler) func(msg rabbitmq.Delivery) error {
return func(msg rabbitmq.Delivery) error {
if err := f(msg); err != nil {
return err
}
if err := msg.Ack(false); err != nil {
return err
}
return nil
}
}

func deadMessageHandler(d rabbitmq.Delivery) error {
fmt.Printf("Dead Message Received: %s, %v \n", string(d.MessageId), string(d.Body))
err := d.Ack(false)
if err != nil {
if err := d.Ack(false); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion rmq/consumer/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//v0.1.0
//v0.1.1
module github.com/kelchy/go-lib/rmq/consumer

go 1.19
Expand Down
2 changes: 1 addition & 1 deletion rmq/consumer/internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ type Logger interface {
}

// DefaultLogger is the default logger used by the library.
var DefaultLogger, _ = log.New("standard")
var DefaultLogger, _ = log.New("erroronly")

0 comments on commit 6ec388a

Please sign in to comment.