From 0bbad8676b2edbb5190ff698579106e5e2519928 Mon Sep 17 00:00:00 2001 From: Xuan Hao Date: Tue, 14 Mar 2023 10:25:40 +0800 Subject: [PATCH 1/3] added functions for verifying the presences of queues and exchanges --- rmq/consumer/Makefile | 1 - rmq/consumer/config.go | 24 ++++++++++++++++++++++++ rmq/consumer/connector.go | 31 ++++++++++++++++++++++++++++++- rmq/consumer/exchange.go | 9 +++++++++ rmq/consumer/go.mod | 2 +- rmq/consumer/queue.go | 9 +++++++++ 6 files changed, 73 insertions(+), 3 deletions(-) create mode 100644 rmq/consumer/exchange.go create mode 100644 rmq/consumer/queue.go diff --git a/rmq/consumer/Makefile b/rmq/consumer/Makefile index ac2ad98..2f71f77 100644 --- a/rmq/consumer/Makefile +++ b/rmq/consumer/Makefile @@ -1,7 +1,6 @@ SHELL := /bin/bash .PHONY: - # this will install binary in ${GOPATH} $(GOPATH)/bin/golint: go install golang.org/x/lint/golint@v0.0.0-20201208152925-83fdc39ff7b5 diff --git a/rmq/consumer/config.go b/rmq/consumer/config.go index b01d6a4..c89837e 100644 --- a/rmq/consumer/config.go +++ b/rmq/consumer/config.go @@ -60,6 +60,30 @@ func DefaultConfig(name string) Config { } } +// ExchangeConfig is the configuration for exchange creation. +type ExchangeConfig struct { + Name string `json:"name" mapstructure:"name"` + Kind string `json:"kind" mapstructure:"kind"` + Durable bool `json:"durable" mapstructure:"durable"` + AutoDelete bool `json:"auto_delete" mapstructure:"auto_delete"` + Internal bool `json:"internal" mapstructure:"internal"` + NoWait bool `json:"no_wait" mapstructure:"no_wait"` + Args map[string]interface{} +} + +// DefaultExchangeConfig returns a default exchange configuration. +func DefaultExchangeConfig(name string, kind string) ExchangeConfig { + return ExchangeConfig{ + Name: name, + Kind: kind, + Durable: true, + AutoDelete: false, + Internal: false, + NoWait: false, + Args: nil, + } +} + // QueueConfig is the configuration for queue creation. type QueueConfig struct { Name string `json:"name" mapstructure:"name"` diff --git a/rmq/consumer/connector.go b/rmq/consumer/connector.go index 246e4ef..e832c80 100644 --- a/rmq/consumer/connector.go +++ b/rmq/consumer/connector.go @@ -30,7 +30,7 @@ type ILogger interface { Error(key string, err error) } -// New creates a new consumer +// New creates a new consumer, should use this by default func New(connConfig ConnectionConfig, queueConfig QueueConfig, queueBindConfig QueueBindConfig, consumerConfig Config, msgRetryConfig MessageRetryConfig, processor IClientHandler, logger ILogger) error { // Set up connection to RabbitMQ c := Consumer{ @@ -82,6 +82,35 @@ func New(connConfig ConnectionConfig, queueConfig QueueConfig, queueBindConfig Q return nil } +// NewConnection creates a new connection to RabbitMQ. For use when you want to initialize queues and exchanges or verify that they are present +func NewConnection(connConfig ConnectionConfig, logger ILogger) (*amqp.Connection, error) { + attempts := 0 + for attempts <= connConfig.ReconnectMaxAttempt { + logger.Out("RMQ-CONSUMER", "Connecting to RabbitMQ") + // Make a connection to RMQ + conn, err := amqp.Dial(connConfig.ConnURIs[0]) + if err != nil { + logger.Error("ERR_RMQ-CONSUMER_FAIL-CONNECT", err) + time.Sleep(connConfig.ReconnectInterval) + // Wait before retrying + continue + } + logger.Out("RMQ-CONSUMER", "Connected to RabbitMQ") + return conn, nil + } + return nil, fmt.Errorf("failed to connect to RabbitMQ after %d attempts", connConfig.ReconnectMaxAttempt) +} + +// OpenChannel opens a new channel on the connection +func OpenChannel(conn *amqp.Connection, logger ILogger) (*amqp.Channel, error) { + amqpchan, err := conn.Channel() + if err != nil { + logger.Error("ERR_RMQ-CONSUMER_FAIL-OPEN-CHANNEL", err) + return nil, err + } + return amqpchan, nil +} + func (c *Consumer) connect(connConfig ConnectionConfig) error { attempts := 0 for attempts <= connConfig.ReconnectMaxAttempt { diff --git a/rmq/consumer/exchange.go b/rmq/consumer/exchange.go new file mode 100644 index 0000000..5668adc --- /dev/null +++ b/rmq/consumer/exchange.go @@ -0,0 +1,9 @@ +package consumer + +import "github.com/streadway/amqp" + +// NewExchange creates a new exchange or ensures an exchange exists that matches the provided configuration +func NewExchange(amqpChannel *amqp.Channel, exchangeConfig ExchangeConfig) error { + err := amqpChannel.ExchangeDeclare(exchangeConfig.Name, exchangeConfig.Kind, exchangeConfig.Durable, exchangeConfig.AutoDelete, exchangeConfig.Internal, exchangeConfig.NoWait, exchangeConfig.Args) + return err +} diff --git a/rmq/consumer/go.mod b/rmq/consumer/go.mod index bdf01fe..b269342 100644 --- a/rmq/consumer/go.mod +++ b/rmq/consumer/go.mod @@ -1,4 +1,4 @@ -//v0.0.6 +//v0.0.7 module github.com/kelchy/go-lib/rmq/consumer go 1.19 diff --git a/rmq/consumer/queue.go b/rmq/consumer/queue.go new file mode 100644 index 0000000..7b24940 --- /dev/null +++ b/rmq/consumer/queue.go @@ -0,0 +1,9 @@ +package consumer + +import "github.com/streadway/amqp" + +// NewQueue declares a new queue or ensures a queue exists that matches the provided configuration +func NewQueue(amqpChannel *amqp.Channel, queueConfig QueueConfig) (amqp.Queue, error) { + q, err := amqpChannel.QueueDeclare(queueConfig.Name, queueConfig.Durable, queueConfig.AutoDelete, queueConfig.Exclusive, queueConfig.NoWait, queueConfig.Args) + return q, err +} From fd50c44307438751f01bc5a8ebafcd49bbab326d Mon Sep 17 00:00:00 2001 From: Xuan Hao Date: Tue, 14 Mar 2023 11:14:42 +0800 Subject: [PATCH 2/3] added example --- rmq/consumer/example/consumer.example.go | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/rmq/consumer/example/consumer.example.go b/rmq/consumer/example/consumer.example.go index 4a2493f..df993cd 100644 --- a/rmq/consumer/example/consumer.example.go +++ b/rmq/consumer/example/consumer.example.go @@ -24,6 +24,27 @@ func main() { fmt.Println("failed to create consumer: ", err) } + // If you want to verify the presence of queue and exchanges + // Not required if consumer is init using New() as above + conn, _ := consumer.NewConnection(consumer.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), consumer.DefaultLogger()) + connChan, _ := conn.Channel() + exDeclareErr := consumer.NewExchange(connChan, consumer.DefaultExchangeConfig("test-exchange", "direct")) + _, queueDeclareErr := consumer.NewQueue(connChan, consumer.QueueConfig{ + Name: "test-queue-logging", + Durable: true, + AutoDelete: true, + Exclusive: false, + NoWait: false, + Args: nil, + }) + if exDeclareErr != nil { + fmt.Println("failed to declare exchange: ", exDeclareErr) + return + } + if queueDeclareErr != nil { + fmt.Println("failed to declare queue: ", queueDeclareErr) + return + } // Leave the consumer running for 30 seconds before exiting, only for example purposes time.Sleep(30 * time.Second) } From 25a6f80bc42d0bd72745ea960db4d9361e851f61 Mon Sep 17 00:00:00 2001 From: Xuan Hao Date: Thu, 16 Mar 2023 11:00:31 +0800 Subject: [PATCH 3/3] abstracted constants --- rmq/consumer/config.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/rmq/consumer/config.go b/rmq/consumer/config.go index c89837e..a104a37 100644 --- a/rmq/consumer/config.go +++ b/rmq/consumer/config.go @@ -7,6 +7,13 @@ import ( "github.com/kelchy/go-lib/log" ) +// Default constants for initialising default configs +const defaultReconnectInterval = 5 * time.Second +const defaultReconnectMaxAttempt = 3 +const defaultPrefetchCount = 1 +const defaultPrefetchSize = 0 +const defaultRetryCountLimit = 2 + // ConnectionConfig is the configuration for connection creation. type ConnectionConfig struct { // ConnURIs: list of connection URIs. @@ -21,8 +28,8 @@ type ConnectionConfig struct { func DefaultConnectionConfig(connURIs []string) ConnectionConfig { return ConnectionConfig{ ConnURIs: connURIs, - ReconnectInterval: 5 * time.Second, - ReconnectMaxAttempt: 3, + ReconnectInterval: defaultReconnectInterval, + ReconnectMaxAttempt: defaultReconnectMaxAttempt, } } @@ -54,8 +61,8 @@ func DefaultConfig(name string) Config { NoWait: false, Args: nil, EnabledPrefetch: true, - PrefetchCount: 1, - PrefetchSize: 0, + PrefetchCount: defaultPrefetchCount, + PrefetchSize: defaultPrefetchSize, Global: false, } } @@ -154,6 +161,6 @@ func DefaultMessageRetryConfig() MessageRetryConfig { return MessageRetryConfig{ Enabled: true, HandleDeadMessage: true, - RetryCountLimit: 2, + RetryCountLimit: defaultRetryCountLimit, } }