Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rmq][consumer] Cleaned up log spam and updated examples #71

Merged
merged 1 commit into from
Jul 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion rmq/consumer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"

"github.com/kelchy/go-lib/rmq/consumer/internal/connectionmanager"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
1 change: 0 additions & 1 deletion rmq/consumer/consume.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"sync"

"github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
1 change: 0 additions & 1 deletion rmq/consumer/consumer_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package rabbitmq

import (
"github.com/kelchy/go-lib/rmq/consumer/internal/logger"

amqp "github.com/rabbitmq/amqp091-go"
)

Expand Down
7 changes: 5 additions & 2 deletions rmq/consumer/declare.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rabbitmq

import (
"github.com/kelchy/go-lib/log"
"github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager"
"github.com/kelchy/go-lib/rmq/consumer/internal/logger"
)
Expand Down Expand Up @@ -96,7 +97,8 @@ func DeclareExchange(
options ExchangeOptions,
) error {
// Creates a new channel manager
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval)
errLogger, _ := log.New("erroronly")
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval)
if err != nil {
return err
}
Expand All @@ -111,7 +113,8 @@ func DeclareQueue(
options QueueOptions,
) error {
// Creates a new channel manager
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval)
errLogger, _ := log.New("erroronly")
chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval)
if err != nil {
return err
}
Expand Down
152 changes: 52 additions & 100 deletions rmq/consumer/example/consumer.example.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,21 @@ package main

import (
"fmt"
"log"
"os"
"os/signal"
"syscall"

rabbitmq "github.com/kelchy/go-lib/rmq/consumer"
)

/**
* This example demonstrates how to create a consumer with retry and dead letter queue
* The consumer will consume messages from the queue "q_events" with routing key "events.basic"
* If the message is Nacked, it will be sent to the dead letter queue "q_event_retry"
* The dead letter queue will retry the message 3 times with a delay of 10 seconds between each retry
* If the message is Nacked after the 3rd retry, it will be handled by the dead message handler
*/

func main() {
conn, err := rabbitmq.NewConn(
os.Getenv("RMQ_URI"),
Expand All @@ -21,7 +28,7 @@ func main() {
}
defer conn.Close()

// Verify that exchange exists (Not needed if it is declared in NewConsumer)
// Verify that exchange exists
if err := rabbitmq.DeclareExchange(
conn,
rabbitmq.ExchangeOptions{
Expand All @@ -39,117 +46,51 @@ func main() {
return
}

// Verify that queue exists (Not needed if it is declared in NewConsumer)
if err := rabbitmq.DeclareQueue(
// Consumer options can be changed according to your needs
rabbitmq.NewConsumer(
conn,
rabbitmq.QueueOptions{
Name: "my_queue",
Declare: true,
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: map[string]interface{}{
eventHandler,
deadMessageHandler,
"q_events",
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsRoutingKey("events.basic"),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerAutoAck(false),
rabbitmq.WithConsumerOptionsQueueDurable,
rabbitmq.WithConsumerOptionsQueueArgs(
map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
"x-dead-letter-routing-key": "retry.events.basic",
},
},
); err != nil {
fmt.Println(err)
return
}

// Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer)
if err := rabbitmq.DeclareBinding(
conn,
rabbitmq.BindingDeclareOptions{
QueueName: "my_queue",
ExchangeName: "events",
RoutingKey: "test_routing_key",
NoWait: false,
Args: nil,
Declare: true,
}); err != nil {
fmt.Println(err)
return
}
),
)

if err := rabbitmq.DeclareQueue(
// Declares the DLQ and binds it back to the original queue
rabbitmq.DeclareQueue(
conn,
rabbitmq.QueueOptions{
Name: "my_queue_dlx",
Declare: true,
Name: "q_event_retry",
Durable: true,
AutoDelete: false,
Exclusive: false,
NoWait: false,
Args: map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key",
"x-message-ttl": 10000,
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "events.basic",
},
Declare: true,
},
); err != nil {
fmt.Println(err)
return
}

// Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer)
if err := rabbitmq.DeclareBinding(
)
rabbitmq.DeclareBinding(
conn,
rabbitmq.BindingDeclareOptions{
QueueName: "my_queue_dlx",
QueueName: "q_event_retry",
ExchangeName: "events",
RoutingKey: "test_routing_key_dlk",
NoWait: false,
Args: nil,
Declare: true,
}); err != nil {
fmt.Println(err)
return
}

consumer, err := rabbitmq.NewConsumer(
conn,
eventHandler,
deadMessageHandler,
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
}),
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsQueueDurable,
)
if err != nil {
fmt.Println(err)
}
defer consumer.Close()

consumer2, err := rabbitmq.NewConsumer(
conn,
eventHandler,
deadMessageHandler,
"my_queue",
rabbitmq.WithConsumerOptionsConcurrency(2),
rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"),
rabbitmq.WithConsumerOptionsExchangeName("events"),
rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{
"x-dead-letter-exchange": "events",
"x-dead-letter-routing-key": "test_routing_key_dlk",
}),
rabbitmq.WithConsumerOptionsConsumerRetryLimit(3),
rabbitmq.WithConsumerOptionsConsumerDlxRetry,
rabbitmq.WithConsumerOptionsQueueDurable,
RoutingKey: "retry.events.basic",
},
)
if err != nil {
log.Fatal(err)
}
defer consumer2.Close()

// block main thread - wait for shutdown signal
sigs := make(chan os.Signal, 1)
Expand All @@ -171,18 +112,29 @@ func main() {

func eventHandler(d rabbitmq.Delivery) error {
fmt.Printf("consumed: %s, %v \n", string(d.MessageId), string(d.Body))
return fmt.Errorf("TEST_ERR")
err := d.Ack(false)
if err != nil {
return fmt.Errorf("nack")
if err := d.Ack(false); err != nil {
return err
}
return nil
}

// Alternatively can use a wrapper for the event handler so you will not need to call ack manually within each different consumer
func eventHandlerWrapper(f rabbitmq.EventHandler) func(msg rabbitmq.Delivery) error {
return func(msg rabbitmq.Delivery) error {
if err := f(msg); err != nil {
return err
}
if err := msg.Ack(false); err != nil {
return err
}
return nil
}
}

func deadMessageHandler(d rabbitmq.Delivery) error {
fmt.Printf("Dead Message Received: %s, %v \n", string(d.MessageId), string(d.Body))
err := d.Ack(false)
if err != nil {
if err := d.Ack(false); err != nil {
return err
}
return nil
Expand Down
2 changes: 1 addition & 1 deletion rmq/consumer/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//v0.1.0
//v0.1.1
module github.com/kelchy/go-lib/rmq/consumer

go 1.19
Expand Down
2 changes: 1 addition & 1 deletion rmq/consumer/internal/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ type Logger interface {
}

// DefaultLogger is the default logger used by the library.
var DefaultLogger, _ = log.New("standard")
var DefaultLogger, _ = log.New("erroronly")