From 19f8b228147f6d07ef5dc76d0c5a5746e216efac Mon Sep 17 00:00:00 2001 From: Xuan Hao Date: Tue, 18 Jul 2023 14:24:22 +0800 Subject: [PATCH] Introduced additional functionality and clearer examples --- rmq/consumer/connection.go | 1 - rmq/consumer/consume.go | 1 - rmq/consumer/consumer_options.go | 1 - rmq/consumer/declare.go | 7 +- rmq/consumer/example/consumer.example.go | 152 ++++++++--------------- rmq/consumer/go.mod | 2 +- rmq/consumer/internal/logger/logger.go | 2 +- 7 files changed, 59 insertions(+), 107 deletions(-) diff --git a/rmq/consumer/connection.go b/rmq/consumer/connection.go index c55cedf..06fe1c5 100644 --- a/rmq/consumer/connection.go +++ b/rmq/consumer/connection.go @@ -4,7 +4,6 @@ import ( "fmt" "github.com/kelchy/go-lib/rmq/consumer/internal/connectionmanager" - amqp "github.com/rabbitmq/amqp091-go" ) diff --git a/rmq/consumer/consume.go b/rmq/consumer/consume.go index ef0a08b..cc1a63e 100644 --- a/rmq/consumer/consume.go +++ b/rmq/consumer/consume.go @@ -6,7 +6,6 @@ import ( "sync" "github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager" - amqp "github.com/rabbitmq/amqp091-go" ) diff --git a/rmq/consumer/consumer_options.go b/rmq/consumer/consumer_options.go index 45a1dc6..48601fa 100644 --- a/rmq/consumer/consumer_options.go +++ b/rmq/consumer/consumer_options.go @@ -2,7 +2,6 @@ package rabbitmq import ( "github.com/kelchy/go-lib/rmq/consumer/internal/logger" - amqp "github.com/rabbitmq/amqp091-go" ) diff --git a/rmq/consumer/declare.go b/rmq/consumer/declare.go index 7df1bef..a86b055 100644 --- a/rmq/consumer/declare.go +++ b/rmq/consumer/declare.go @@ -1,6 +1,7 @@ package rabbitmq import ( + "github.com/kelchy/go-lib/log" "github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager" "github.com/kelchy/go-lib/rmq/consumer/internal/logger" ) @@ -96,7 +97,8 @@ func DeclareExchange( options ExchangeOptions, ) error { // Creates a new channel manager - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval) + errLogger, _ := log.New("erroronly") + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval) if err != nil { return err } @@ -111,7 +113,8 @@ func DeclareQueue( options QueueOptions, ) error { // Creates a new channel manager - chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval) + errLogger, _ := log.New("erroronly") + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval) if err != nil { return err } diff --git a/rmq/consumer/example/consumer.example.go b/rmq/consumer/example/consumer.example.go index 332244a..8a3973a 100644 --- a/rmq/consumer/example/consumer.example.go +++ b/rmq/consumer/example/consumer.example.go @@ -2,7 +2,6 @@ package main import ( "fmt" - "log" "os" "os/signal" "syscall" @@ -10,6 +9,14 @@ import ( rabbitmq "github.com/kelchy/go-lib/rmq/consumer" ) +/** + * This example demonstrates how to create a consumer with retry and dead letter queue + * The consumer will consume messages from the queue "q_events" with routing key "events.basic" + * If the message is Nacked, it will be sent to the dead letter queue "q_event_retry" + * The dead letter queue will retry the message 3 times with a delay of 10 seconds between each retry + * If the message is Nacked after the 3rd retry, it will be handled by the dead message handler + */ + func main() { conn, err := rabbitmq.NewConn( os.Getenv("RMQ_URI"), @@ -21,7 +28,7 @@ func main() { } defer conn.Close() - // Verify that exchange exists (Not needed if it is declared in NewConsumer) + // Verify that exchange exists if err := rabbitmq.DeclareExchange( conn, rabbitmq.ExchangeOptions{ @@ -39,117 +46,51 @@ func main() { return } - // Verify that queue exists (Not needed if it is declared in NewConsumer) - if err := rabbitmq.DeclareQueue( + // Consumer options can be changed according to your needs + rabbitmq.NewConsumer( conn, - rabbitmq.QueueOptions{ - Name: "my_queue", - Declare: true, - Durable: true, - AutoDelete: false, - Exclusive: false, - NoWait: false, - Args: map[string]interface{}{ + eventHandler, + deadMessageHandler, + "q_events", + rabbitmq.WithConsumerOptionsExchangeName("events"), + rabbitmq.WithConsumerOptionsRoutingKey("events.basic"), + rabbitmq.WithConsumerOptionsConsumerDlxRetry, + rabbitmq.WithConsumerOptionsConsumerRetryLimit(3), + rabbitmq.WithConsumerOptionsConsumerAutoAck(false), + rabbitmq.WithConsumerOptionsQueueDurable, + rabbitmq.WithConsumerOptionsQueueArgs( + map[string]interface{}{ "x-dead-letter-exchange": "events", - "x-dead-letter-routing-key": "test_routing_key_dlk", + "x-dead-letter-routing-key": "retry.events.basic", }, - }, - ); err != nil { - fmt.Println(err) - return - } - - // Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer) - if err := rabbitmq.DeclareBinding( - conn, - rabbitmq.BindingDeclareOptions{ - QueueName: "my_queue", - ExchangeName: "events", - RoutingKey: "test_routing_key", - NoWait: false, - Args: nil, - Declare: true, - }); err != nil { - fmt.Println(err) - return - } + ), + ) - if err := rabbitmq.DeclareQueue( + // Declares the DLQ and binds it back to the original queue + rabbitmq.DeclareQueue( conn, rabbitmq.QueueOptions{ - Name: "my_queue_dlx", - Declare: true, + Name: "q_event_retry", Durable: true, AutoDelete: false, Exclusive: false, NoWait: false, Args: map[string]interface{}{ - "x-dead-letter-exchange": "events", - "x-dead-letter-routing-key": "test_routing_key", "x-message-ttl": 10000, + "x-dead-letter-exchange": "events", + "x-dead-letter-routing-key": "events.basic", }, + Declare: true, }, - ); err != nil { - fmt.Println(err) - return - } - - // Verify that queue is bound to exchange (Not needed binding is declared in NewConsumer) - if err := rabbitmq.DeclareBinding( + ) + rabbitmq.DeclareBinding( conn, rabbitmq.BindingDeclareOptions{ - QueueName: "my_queue_dlx", + QueueName: "q_event_retry", ExchangeName: "events", - RoutingKey: "test_routing_key_dlk", - NoWait: false, - Args: nil, - Declare: true, - }); err != nil { - fmt.Println(err) - return - } - - consumer, err := rabbitmq.NewConsumer( - conn, - eventHandler, - deadMessageHandler, - "my_queue", - rabbitmq.WithConsumerOptionsConcurrency(2), - rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"), - rabbitmq.WithConsumerOptionsExchangeName("events"), - rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{ - "x-dead-letter-exchange": "events", - "x-dead-letter-routing-key": "test_routing_key_dlk", - }), - rabbitmq.WithConsumerOptionsConsumerRetryLimit(3), - rabbitmq.WithConsumerOptionsConsumerDlxRetry, - rabbitmq.WithConsumerOptionsQueueDurable, - ) - if err != nil { - fmt.Println(err) - } - defer consumer.Close() - - consumer2, err := rabbitmq.NewConsumer( - conn, - eventHandler, - deadMessageHandler, - "my_queue", - rabbitmq.WithConsumerOptionsConcurrency(2), - rabbitmq.WithConsumerOptionsRoutingKey("test_routing_key"), - rabbitmq.WithConsumerOptionsExchangeName("events"), - rabbitmq.WithConsumerOptionsQueueArgs(map[string]interface{}{ - "x-dead-letter-exchange": "events", - "x-dead-letter-routing-key": "test_routing_key_dlk", - }), - rabbitmq.WithConsumerOptionsConsumerRetryLimit(3), - rabbitmq.WithConsumerOptionsConsumerDlxRetry, - rabbitmq.WithConsumerOptionsQueueDurable, + RoutingKey: "retry.events.basic", + }, ) - if err != nil { - log.Fatal(err) - } - defer consumer2.Close() // block main thread - wait for shutdown signal sigs := make(chan os.Signal, 1) @@ -171,18 +112,29 @@ func main() { func eventHandler(d rabbitmq.Delivery) error { fmt.Printf("consumed: %s, %v \n", string(d.MessageId), string(d.Body)) - return fmt.Errorf("TEST_ERR") - err := d.Ack(false) - if err != nil { + return fmt.Errorf("nack") + if err := d.Ack(false); err != nil { return err } return nil } +// Alternatively can use a wrapper for the event handler so you will not need to call ack manually within each different consumer +func eventHandlerWrapper(f rabbitmq.EventHandler) func(msg rabbitmq.Delivery) error { + return func(msg rabbitmq.Delivery) error { + if err := f(msg); err != nil { + return err + } + if err := msg.Ack(false); err != nil { + return err + } + return nil + } +} + func deadMessageHandler(d rabbitmq.Delivery) error { fmt.Printf("Dead Message Received: %s, %v \n", string(d.MessageId), string(d.Body)) - err := d.Ack(false) - if err != nil { + if err := d.Ack(false); err != nil { return err } return nil diff --git a/rmq/consumer/go.mod b/rmq/consumer/go.mod index 8505abd..5c995eb 100644 --- a/rmq/consumer/go.mod +++ b/rmq/consumer/go.mod @@ -1,4 +1,4 @@ -//v0.1.0 +//v0.1.1 module github.com/kelchy/go-lib/rmq/consumer go 1.19 diff --git a/rmq/consumer/internal/logger/logger.go b/rmq/consumer/internal/logger/logger.go index 8a462c4..d87ca31 100644 --- a/rmq/consumer/internal/logger/logger.go +++ b/rmq/consumer/internal/logger/logger.go @@ -10,4 +10,4 @@ type Logger interface { } // DefaultLogger is the default logger used by the library. -var DefaultLogger, _ = log.New("standard") +var DefaultLogger, _ = log.New("erroronly")