diff --git a/Makefile b/Makefile index 6023b93..e96b588 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ SHELL := /bin/bash .PHONY: -DIR = log redis +DIR = log redis http test-%: $(MAKE) GOPATH=$${PWD} test -C $* SUB=${SUB} diff --git a/http/Makefile b/http/Makefile index 7d522ba..30e7bf2 100644 --- a/http/Makefile +++ b/http/Makefile @@ -2,7 +2,17 @@ SHELL := /bin/bash .PHONY: -test: +# this will install binary in ${GOPATH} +$(GOPATH)/bin/golint: + go install golang.org/x/lint/golint@v0.0.0-20201208152925-83fdc39ff7b5 + +lint: $(GOPATH)/bin/golint + ${GOPATH}/bin/golint -set_exit_status ./... + +test: lint + $(MAKE) test -C client + $(MAKE) test -C server + ifndef ${SUB} @echo "SUB not defined defaulting to client" $(MAKE) test -C client diff --git a/http/server/server_test.go b/http/server/server_test.go new file mode 100644 index 0000000..9cb0784 --- /dev/null +++ b/http/server/server_test.go @@ -0,0 +1,81 @@ +package server + +import ( + "github.com/kelchy/go-lib/log" + "net/http" + "net/http/httptest" + "testing" +) + +func TestNew(t *testing.T) { + // Test case with empty origins and headers + router, err := New(nil, nil) + if err != nil { + t.Fatalf("Error creating router: %v", err) + } + if router == nil { + t.Fatal("Router is nil") + } + + // Test case with invalid log configuration + _, err = log.New("invalid config") + if err == nil { + t.Fatal("Expected an error creating logger") + } + + // happy flow + origins := []string{"http://localhost"} + headers := []string{"Accept", "Content-Type"} + + router, err = New(origins, headers) + if err != nil { + t.Fatalf("Error creating router: %v", err) + } + + if router == nil { + t.Fatal("Router is nil") + } + + // Test that the router has the expected fields + if router.log == (log.Log{}) { + t.Fatal("Log is nil/invalid") + } + if router.logRequest != true { + t.Fatal("LogRequest should be true by default") + } + if len(router.logSkipPath) != 1 || router.logSkipPath[0] != "/" { + t.Fatal("LogSkipPath should have the default value") + } + if router.Engine == nil { + t.Fatal("Engine is nil") + } + + // Test that the router middleware has been set correctly + req, err := http.NewRequest("GET", "/", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + router.Engine.ServeHTTP(resp, req) + if resp.Code != http.StatusNotFound { + t.Fatal("catchall middleware should return 404") + } + + // Test case with empty origins and default headers + router, err = New(nil, []string{}) + if err != nil { + t.Fatalf("Error creating router: %v", err) + } + if router == nil { + t.Fatal("Router is nil") + } + + // Test case with default origins and empty headers + router, err = New([]string{}, nil) + if err != nil { + t.Fatalf("Error creating router: %v", err) + } + if router == nil { + t.Fatal("Router is nil") + } +} diff --git a/redis/example/redis_example.go b/redis/example/redis_example.go index a3328d4..1c250a7 100644 --- a/redis/example/redis_example.go +++ b/redis/example/redis_example.go @@ -3,22 +3,23 @@ package main import ( "context" "fmt" - "time" - "github.com/kelchy/go-lib/redis" + "os" + "time" ) func main() { - uri := "" + uri := os.Getenv("REDIS_URI") // path to cert files - clientCertPath := "" - clientKeyPath := `` +// clientCertPath := "" +// clientKeyPath := `` // use redis.New if TLS connection is not required // skipVerifyCondition should only be true when running locally - redisclient, e := redis.NewSecure(uri, clientCertPath, clientKeyPath, false) +// redisclient, e := redis.NewSecure(uri, clientCertPath, clientKeyPath, false) + redisclient, e := redis.New(uri) if e != nil { fmt.Println(e) return diff --git a/rmq/README.md b/rmq/README.md index 0b10d4f..048e772 100644 --- a/rmq/README.md +++ b/rmq/README.md @@ -4,12 +4,15 @@ This is a wrapper to provide functions for easily and safely interacting with Ra Functionlity Supported: -| Functionality | Supported? | -| ----------- | ----------- | -| Publisher | ✅ | -| Consumer | ✅ | -| Dead Letter Queue | ✅ | -| Immediate & Mandatory Options | ❌ | +| Functionality | Supported? | +| ------------------------------- | ---------- | +| Publisher | ✅ | +| Consumer | ✅ | +| Dead Letter Queue | ✅ | +| Auto Reconnect upon disconnects | ✅ | +| Immediate & Mandatory Options | ✅ | +| Message retry count | ✅ | ## Quickstart -Refer to the respective publisher and consumer packages for how to use the library \ No newline at end of file + +Refer to the respective publisher and consumer packages for how to use the library diff --git a/rmq/consumer/LICENSE b/rmq/consumer/LICENSE new file mode 100644 index 0000000..e1ffb92 --- /dev/null +++ b/rmq/consumer/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Lane Wagner + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/rmq/consumer/Makefile b/rmq/consumer/Makefile index 2f71f77..ac2ad98 100644 --- a/rmq/consumer/Makefile +++ b/rmq/consumer/Makefile @@ -1,6 +1,7 @@ SHELL := /bin/bash .PHONY: + # this will install binary in ${GOPATH} $(GOPATH)/bin/golint: go install golang.org/x/lint/golint@v0.0.0-20201208152925-83fdc39ff7b5 diff --git a/rmq/consumer/README.md b/rmq/consumer/README.md index 8c1876d..0dd7ac3 100644 --- a/rmq/consumer/README.md +++ b/rmq/consumer/README.md @@ -1,54 +1,9 @@ # RMQ Consumer -This library is a wrapper around RMQ functions to make interaction with RMQ simpler and safer +This library is a wrapper around RMQ functions to make interaction with RMQ simpler and safer. Functionality is tested with hosted CloudAMQP, which offers automated failover using connection with a single URI. -## Quickstart - -Refer to the below code snippet for how to set up a consumer called `test-consumer` consuming events from a queue `test-queue` bound to an exchange `test-exchange` on a binding key `test-routing-key`. -``` -package main - -import ( - "context" - "fmt" - "os" - "time" - - "github.com/kelchy/go-lib/rmq/consumer" -) - -func main() { - eventProcessor := &EventProcessor{} - err := consumer.New( - consumer.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), - // Queue names should be the same in QueueConfig and QueueBindConfig - consumer.DefaultQueueConfig("test-queue-logging"), - consumer.DefaultQueueBindConfig("test-exchange", "test-queue-logging", "test-routing-key"), - consumer.DefaultConfig("test-consumer"), - consumer.DefaultMessageRetryConfig(), - eventProcessor, - consumer.DefaultLogger()) - if err != nil { - fmt.Println("failed to create consumer: ", err) - } +Adapted from [go-rabbitmq](https://github.com/wagslane/go-rabbitmq) - // Leave the consumer running for 30 seconds before exiting, only for example purposes - time.Sleep(30 * time.Second) -} - -// Declare what to do when messages come in on the queue here -type EventProcessor struct{} - -func (*EventProcessor) ProcessEvent(ctx context.Context, message consumer.IMessage) error { - fmt.Printf("Recieved message: ID: %s, Message: %s\n", message.GetID(), string(message.Body())) - // Fill in what to do with the consumed messages here - return nil -} - -func (*EventProcessor) ProcessDeadMessage(ctx context.Context, message consumer.IMessage, err error) error { - fmt.Printf("Recieved dead message: ID: %s, Message: %s, Error: %v", message.GetID(), string(message.Body()), err) - // Fill in what to do with the dead messages here - return nil -} +## Quickstart -``` \ No newline at end of file +Refer to `example/consumer.example.go` to see how to setup your own consumer diff --git a/rmq/consumer/config.go b/rmq/consumer/config.go deleted file mode 100644 index a104a37..0000000 --- a/rmq/consumer/config.go +++ /dev/null @@ -1,166 +0,0 @@ -package consumer - -import ( - "fmt" - "time" - - "github.com/kelchy/go-lib/log" -) - -// Default constants for initialising default configs -const defaultReconnectInterval = 5 * time.Second -const defaultReconnectMaxAttempt = 3 -const defaultPrefetchCount = 1 -const defaultPrefetchSize = 0 -const defaultRetryCountLimit = 2 - -// ConnectionConfig is the configuration for connection creation. -type ConnectionConfig struct { - // ConnURIs: list of connection URIs. - ConnURIs []string `json:"conn_uris" mapstructure:"conn_uris"` - // ReconnectInterval: interval between reconnect attempts. - ReconnectInterval time.Duration `json:"reconnect_interval" mapstructure:"reconnect_interval"` - // ReconnectMaxAttempt: max number of reconnect attempts. - ReconnectMaxAttempt int `json:"reconnect_max_attempt" mapstructure:"reconnect_max_attempt"` -} - -// DefaultConnectionConfig returns a default connection configuration. -func DefaultConnectionConfig(connURIs []string) ConnectionConfig { - return ConnectionConfig{ - ConnURIs: connURIs, - ReconnectInterval: defaultReconnectInterval, - ReconnectMaxAttempt: defaultReconnectMaxAttempt, - } -} - -// Config is the configuration for consumer creation. -type Config struct { - Enabled bool `json:"enabled" mapstructure:"enabled"` - Name string `json:"name" mapstructure:"name"` - AutoAck bool `json:"auto_ack" mapstructure:"auto_ack"` - Exclusive bool `json:"exclusive" mapstructure:"exclusive"` - NoLocal bool `json:"no_local" mapstructure:"no_local"` - NoWait bool `json:"no_wait" mapstructure:"no_wait"` - Args map[string]interface{} `json:"args" mapstructure:"args"` - - // Fair dispatch - EnabledPrefetch bool `json:"enabled_prefetch" mapstructure:"enabled_prefetch"` - PrefetchCount int `json:"prefetch_count" mapstructure:"prefetch_count"` - PrefetchSize int `json:"prefetch_size" mapstructure:"prefetch_size"` - Global bool `json:"global" mapstructure:"global"` -} - -// DefaultConfig returns a default consumer configuration. -func DefaultConfig(name string) Config { - return Config{ - Enabled: true, - Name: name, - AutoAck: true, - Exclusive: false, - NoLocal: false, - NoWait: false, - Args: nil, - EnabledPrefetch: true, - PrefetchCount: defaultPrefetchCount, - PrefetchSize: defaultPrefetchSize, - Global: false, - } -} - -// ExchangeConfig is the configuration for exchange creation. -type ExchangeConfig struct { - Name string `json:"name" mapstructure:"name"` - Kind string `json:"kind" mapstructure:"kind"` - Durable bool `json:"durable" mapstructure:"durable"` - AutoDelete bool `json:"auto_delete" mapstructure:"auto_delete"` - Internal bool `json:"internal" mapstructure:"internal"` - NoWait bool `json:"no_wait" mapstructure:"no_wait"` - Args map[string]interface{} -} - -// DefaultExchangeConfig returns a default exchange configuration. -func DefaultExchangeConfig(name string, kind string) ExchangeConfig { - return ExchangeConfig{ - Name: name, - Kind: kind, - Durable: true, - AutoDelete: false, - Internal: false, - NoWait: false, - Args: nil, - } -} - -// QueueConfig is the configuration for queue creation. -type QueueConfig struct { - Name string `json:"name" mapstructure:"name"` - // Durable: if true, the queue will survive broker restarts. - Durable bool `json:"durable" mapstructure:"durable"` - // AutoDelete: if true, the queue will be deleted when the last consumer unsubscribes. - AutoDelete bool `json:"auto_delete" mapstructure:"auto_delete"` - // Exclusive: if true, only accessible by the connection that declares it. - Exclusive bool `json:"exclusive" mapstructure:"exclusive"` - // NoWait: if true, the server will not respond to the method. - NoWait bool `json:"no_wait" mapstructure:"no_wait"` - // Additional Arguments - Args map[string]interface{} `json:"args" mapstructure:"args"` -} - -// DefaultQueueConfig returns a default queue configuration. -func DefaultQueueConfig(name string) QueueConfig { - return QueueConfig{ - Name: name, - Durable: true, - AutoDelete: true, - Exclusive: false, - NoWait: false, - Args: nil, - } -} - -// QueueBindConfig is the configuration for queue binding to an exchange with a routing key. -type QueueBindConfig struct { - Queue string `json:"queue" mapstructure:"queue"` - Exchange string `json:"exchange" mapstructure:"exchange"` - // Key which the queue possesses. Routing Key (on message) will be compared to the Binding Key (on route) to decide if message is to be routed - BindingKey string `json:"binding_key" mapstructure:"binding_key"` - NoWait bool `json:"no_wait" mapstructure:"no_wait"` - Args map[string]interface{} `json:"args" mapstructure:"args"` -} - -// DefaultQueueBindConfig returns a default queue binding configuration. -func DefaultQueueBindConfig(exchange string, queue string, bindingKey string) QueueBindConfig { - return QueueBindConfig{ - Queue: queue, - Exchange: exchange, - BindingKey: bindingKey, - NoWait: false, - Args: nil, - } -} - -// DefaultLogger generates a logger to be used with the consumer. -func DefaultLogger() ILogger { - logger, err := log.New("standard") - if err != nil { - fmt.Println("failed to create logger: ", err) - } - return logger -} - -// MessageRetryConfig is the configuration for message retry. -type MessageRetryConfig struct { - // retry - Enabled bool `json:"enabled" mapstructure:"enabled"` - HandleDeadMessage bool `json:"handle_dead_message" mapstructure:"handle_dead_message"` - RetryCountLimit int `json:"retry_count_limit" mapstructure:"retry_count_limit"` -} - -// DefaultMessageRetryConfig returns a default message retry configuration. -func DefaultMessageRetryConfig() MessageRetryConfig { - return MessageRetryConfig{ - Enabled: true, - HandleDeadMessage: true, - RetryCountLimit: defaultRetryCountLimit, - } -} diff --git a/rmq/consumer/connection.go b/rmq/consumer/connection.go new file mode 100644 index 0000000..06fe1c5 --- /dev/null +++ b/rmq/consumer/connection.go @@ -0,0 +1,63 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/kelchy/go-lib/rmq/consumer/internal/connectionmanager" + amqp "github.com/rabbitmq/amqp091-go" +) + +// Conn manages the connection to a rabbit cluster +// it is intended to be shared across publishers and consumers +type Conn struct { + connectionManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + + options ConnectionOptions +} + +// Config wraps amqp.Config +// Config is used in DialConfig and Open to specify the desired tuning +// parameters used during a connection open handshake. The negotiated tuning +// will be stored in the returned connection's Config field. +type Config amqp.Config + +// NewConn creates a new connection manager +func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) { + defaultOptions := getDefaultConnectionOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := manager.NotifyReconnect() + conn := &Conn{ + connectionManager: manager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, + } + + go conn.handleRestarts() + return conn, nil +} + +func (conn *Conn) handleRestarts() { + for err := range conn.reconnectErrCh { + conn.options.Logger.Out("OK_RMQ-CONNECTION_HANDLE-RESTART", fmt.Sprintf("successful connection recovery from: %v", err)) + } +} + +// Close closes the connection, it's not safe for re-use. +// You should also close any consumers and publishers before +// closing the connection +func (conn *Conn) Close() error { + conn.closeConnectionToManagerCh <- struct{}{} + return conn.connectionManager.Close() +} diff --git a/rmq/consumer/connection_options.go b/rmq/consumer/connection_options.go new file mode 100644 index 0000000..379df84 --- /dev/null +++ b/rmq/consumer/connection_options.go @@ -0,0 +1,53 @@ +package rabbitmq + +import ( + "time" + + "github.com/kelchy/go-lib/rmq/consumer/internal/logger" +) + +const defaultReconnectInterval = time.Second * 5 + +// ConnectionOptions are used to describe how a new consumer will be created. +type ConnectionOptions struct { + ReconnectInterval time.Duration + Logger logger.Logger + Config Config +} + +// getDefaultConnectionOptions describes the options that will be used when a value isn't provided +func getDefaultConnectionOptions() ConnectionOptions { + return ConnectionOptions{ + ReconnectInterval: defaultReconnectInterval, + Logger: logger.DefaultLogger, + Config: Config{}, + } +} + +// WithConnectionOptionsReconnectInterval sets the reconnection interval +func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.ReconnectInterval = interval + } +} + +// WithConnectionOptionsLogging sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogging(options *ConnectionOptions) { + options.Logger = logger.DefaultLogger +} + +// WithConnectionOptionsLogger sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogger(log logger.Logger) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Logger = log + } +} + +// WithConnectionOptionsConfig sets the Config used in the connection +func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Config = cfg + } +} diff --git a/rmq/consumer/connectionpool.go b/rmq/consumer/connectionpool.go deleted file mode 100644 index 8012d3c..0000000 --- a/rmq/consumer/connectionpool.go +++ /dev/null @@ -1,43 +0,0 @@ -package consumer - -import ( - "github.com/streadway/amqp" -) - -// IConnectionPool interface -type IConnectionPool interface { - getCon() (*amqp.Connection, error) -} - -type connectionPool struct { - uris []string - currentURIIndex int -} - -func newConnectionPool(uris ...string) IConnectionPool { - return &connectionPool{ - currentURIIndex: 0, - uris: uris, - } -} - -func (connPool *connectionPool) nextURI() (uri string) { - if connPool.currentURIIndex == len(connPool.uris)-1 { - uri = connPool.uris[connPool.currentURIIndex] - connPool.currentURIIndex = 0 - return - } - uri = connPool.uris[connPool.currentURIIndex] - connPool.currentURIIndex++ - return -} - -func (connPool *connectionPool) getCon() (*amqp.Connection, error) { - var err error - uri := connPool.nextURI() - con, err := amqp.Dial(uri) - if err != nil { - return nil, err - } - return con, err -} diff --git a/rmq/consumer/connector.go b/rmq/consumer/connector.go deleted file mode 100644 index e832c80..0000000 --- a/rmq/consumer/connector.go +++ /dev/null @@ -1,167 +0,0 @@ -package consumer - -import ( - "context" - "fmt" - "time" - - "github.com/streadway/amqp" -) - -// Consumer is the struct that contains the consumer configuration -type Consumer struct { - consumerChan *amqp.Channel - consumerConfig Config - queueConfig QueueConfig - queueBindConfig QueueBindConfig - messageRetryConfig MessageRetryConfig - conn *amqp.Connection - connPool IConnectionPool - errorConn chan *amqp.Error - errorConsumerChan chan *amqp.Error - handler IEventHandler - logger ILogger -} - -// ILogger is the interface for the logger required by the package -type ILogger interface { - Debug(key string, message string) - Out(key string, message string) - Error(key string, err error) -} - -// New creates a new consumer, should use this by default -func New(connConfig ConnectionConfig, queueConfig QueueConfig, queueBindConfig QueueBindConfig, consumerConfig Config, msgRetryConfig MessageRetryConfig, processor IClientHandler, logger ILogger) error { - // Set up connection to RabbitMQ - c := Consumer{ - queueConfig: queueConfig, - queueBindConfig: queueBindConfig, - consumerConfig: consumerConfig, - messageRetryConfig: msgRetryConfig, - logger: logger, - } - c.connPool = newConnectionPool(connConfig.ConnURIs...) - c.connect(connConfig) - go c.listenOnChanClose() - - // Creates a new queue if it does not exist - conQueue, qDeclareErr := c.consumerChan.QueueDeclare(c.queueConfig.Name, c.queueConfig.Durable, c.queueConfig.AutoDelete, c.queueConfig.Exclusive, c.queueConfig.NoWait, c.queueConfig.Args) - if qDeclareErr != nil { - logger.Error("ERR_RMQ-CONSUMER_FAIL-DECLARE-QUEUE", qDeclareErr) - return qDeclareErr - } - // Binds the queue to the exchange - qBindErr := c.consumerChan.QueueBind(c.queueBindConfig.Queue, c.queueBindConfig.BindingKey, c.queueBindConfig.Exchange, c.queueBindConfig.NoWait, c.queueBindConfig.Args) - if qBindErr != nil { - logger.Error("ERR_RMQ-CONSUMER_FAIL-BIND-QUEUE", qBindErr) - return qBindErr - } - // Creates a new event handler - c.handler = NewEventHandler(processor, c.logger, &c.messageRetryConfig) - // Creates a consumer on the queue - if c.consumerConfig.EnabledPrefetch { - err := c.consumerChan.Qos(c.consumerConfig.PrefetchCount, c.consumerConfig.PrefetchSize, c.consumerConfig.Global) - if err != nil { - c.logger.Error("ERR_CONSUMER-FAILED-PREFETCH-ADD", err) - return err - } - } - msgs, err := c.consumerChan.Consume(c.queueBindConfig.Queue, c.consumerConfig.Name, c.consumerConfig.AutoAck, - c.consumerConfig.Exclusive, c.consumerConfig.NoLocal, c.consumerConfig.NoWait, c.consumerConfig.Args) - if err != nil { - c.logger.Error(fmt.Sprintf("ERR_CONSUMER-FAILED-CONSUMER-REGISTER-%s", c.consumerConfig.Name), err) - return err - } - // Starts consuming messages - go func(msgs <-chan amqp.Delivery) { - for msg := range msgs { - c.handler.HandleEvent(context.TODO(), NewMessage(msg)) - } - }(msgs) - c.logger.Out("RMQ-CONSUMER", fmt.Sprintf("Consumer %s is listening on queue %s", c.consumerConfig.Name, conQueue.Name)) - return nil -} - -// NewConnection creates a new connection to RabbitMQ. For use when you want to initialize queues and exchanges or verify that they are present -func NewConnection(connConfig ConnectionConfig, logger ILogger) (*amqp.Connection, error) { - attempts := 0 - for attempts <= connConfig.ReconnectMaxAttempt { - logger.Out("RMQ-CONSUMER", "Connecting to RabbitMQ") - // Make a connection to RMQ - conn, err := amqp.Dial(connConfig.ConnURIs[0]) - if err != nil { - logger.Error("ERR_RMQ-CONSUMER_FAIL-CONNECT", err) - time.Sleep(connConfig.ReconnectInterval) - // Wait before retrying - continue - } - logger.Out("RMQ-CONSUMER", "Connected to RabbitMQ") - return conn, nil - } - return nil, fmt.Errorf("failed to connect to RabbitMQ after %d attempts", connConfig.ReconnectMaxAttempt) -} - -// OpenChannel opens a new channel on the connection -func OpenChannel(conn *amqp.Connection, logger ILogger) (*amqp.Channel, error) { - amqpchan, err := conn.Channel() - if err != nil { - logger.Error("ERR_RMQ-CONSUMER_FAIL-OPEN-CHANNEL", err) - return nil, err - } - return amqpchan, nil -} - -func (c *Consumer) connect(connConfig ConnectionConfig) error { - attempts := 0 - for attempts <= connConfig.ReconnectMaxAttempt { - c.logger.Out("RMQ-CONSUMER", "Connecting to RabbitMQ") - // Make a connection to RMQ - conn, err := c.connPool.getCon() - if err != nil { - c.logger.Error("ERR_RMQ-CONSUMER_FAIL-CONNECT", err) - time.Sleep(connConfig.ReconnectInterval) - // Wait before retrying - continue - } - c.conn = conn - c.errorConn = make(chan *amqp.Error) - c.conn.NotifyClose(c.errorConn) - - // Open a channel for publishing - consumerChan, conChanErr := c.openChannel() - if conChanErr != nil { - c.logger.Error("ERR_RMQ-CONSUMER_FAIL-OPEN-CHANNEL", conChanErr) - return conChanErr - } - c.consumerChan = consumerChan - c.errorConsumerChan = make(chan *amqp.Error) - c.consumerChan.NotifyClose(c.errorConsumerChan) - c.logger.Out("RMQ-CONSUMER", "Connected to RabbitMQ") - return nil - } - return nil -} - -func (c *Consumer) openChannel() (*amqp.Channel, error) { - if c.conn == nil || c.conn.IsClosed() { - return nil, fmt.Errorf("connection is not open") - } - return c.conn.Channel() -} - -func (c *Consumer) listenOnChanClose() { - for { - select { - case err := <-c.errorConsumerChan: - if err != nil { - c.logger.Error("ERR_RMQ-PUBLISHER_FAIL-CHANNEL-CLOSE", err) - if !c.conn.IsClosed() { - errClose := c.conn.Close() - if errClose != nil { - c.logger.Error("ERR_RMQ-PUBLISHER_FAIL-CHANNEL-CLOSE", errClose) - } - } - } - } - } -} diff --git a/rmq/consumer/consume.go b/rmq/consumer/consume.go new file mode 100644 index 0000000..cc1a63e --- /dev/null +++ b/rmq/consumer/consume.go @@ -0,0 +1,253 @@ +package rabbitmq + +import ( + "errors" + "fmt" + "sync" + + "github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager" + amqp "github.com/rabbitmq/amqp091-go" +) + +// Action is an action that occurs after processed this delivery +type Action int + +// Handler defines the handler of each Delivery and return Action +type Handler func(d Delivery) (action Action) + +// EventHandler defines the handler of each Delivery and returns error if any +type EventHandler func(d Delivery) (err error) + +const ( + // Ack default ack this msg after you have successfully processed this delivery. + Ack Action = iota + // NackDiscard the message will be dropped or delivered to a server configured dead-letter queue. + NackDiscard + // NackRequeue deliver this message to a different consumer. + NackRequeue + // Manual means message acknowledgement is left to the user using the msg.Ack() method + Manual +) + +// Consumer allows you to create and connect to queues for data consumption. +type Consumer struct { + chanManager *channelmanager.ChannelManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + options ConsumerOptions + + isClosedMux *sync.RWMutex + isClosed bool +} + +// Delivery captures the fields for a previously delivered message resident in +// a queue to be delivered by the server to a consumer from Channel.Consume or +// Channel.Get. +type Delivery struct { + amqp.Delivery +} + +// NewConsumer returns a new Consumer connected to the given rabbitmq server +// it also starts consuming on the given connection with automatic reconnection handling +// Do not reuse the returned consumer for anything other than to close it +func NewConsumer( + conn *Conn, + handler EventHandler, + deadMsgHandler EventHandler, + queue string, + optionFuncs ...func(*ConsumerOptions), +) (*Consumer, error) { + defaultOptions := getDefaultConsumerOptions(queue) + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") + } + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + reconnectErrCh, closeCh := chanManager.NotifyReconnect() + + consumer := &Consumer{ + chanManager: chanManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, + isClosedMux: &sync.RWMutex{}, + isClosed: false, + } + + err = consumer.startGoroutines( + handler, + deadMsgHandler, + *options, + ) + if err != nil { + return nil, err + } + + go func() { + for err := range consumer.reconnectErrCh { + consumer.options.Logger.Out("OK_CONSUME_RECONNECT", fmt.Sprintf("successful consumer recovery from: %v", err)) + err = consumer.startGoroutines( + handler, + deadMsgHandler, + *options, + ) + if err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_RECONNECT", fmt.Errorf("consumer closing, error restarting consumer goroutines after cancel or close: %v", err)) + return + } + } + }() + + return consumer, nil +} + +// Close cleans up resources and closes the consumer. +// It does not close the connection manager, just the subscription +// to the connection manager and the consuming goroutines. +// Only call once. +func (consumer *Consumer) Close() { + consumer.isClosedMux.Lock() + defer consumer.isClosedMux.Unlock() + consumer.isClosed = true + // close the channel so that rabbitmq server knows that the + // consumer has been stopped. + err := consumer.chanManager.Close() + if err != nil { + consumer.options.Logger.Error("WARN_CONSUMER_CLOSE-CHANNEL", fmt.Errorf("error while closing the channel: %v", err)) + } + + consumer.options.Logger.Out("OK_CONSUMER_CLOSE", "closing consumer") + go func() { + consumer.closeConnectionToManagerCh <- struct{}{} + }() +} + +// startGoroutines declares the queue if it doesn't exist, +// binds the queue to the routing key(s), and starts the goroutines +// that will consume from the queue +func (consumer *Consumer) startGoroutines( + handler EventHandler, + deadMsgHandler EventHandler, + options ConsumerOptions, +) error { + err := consumer.chanManager.QosSafe( + options.QOSPrefetch, + 0, + options.QOSGlobal, + ) + if err != nil { + return fmt.Errorf("declare qos failed: %w", err) + } + err = declareExchange(consumer.chanManager, options.ExchangeOptions) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + err = declareQueue(consumer.chanManager, options.QueueOptions) + if err != nil { + return fmt.Errorf("declare queue failed: %w", err) + } + err = declareBindings(consumer.chanManager, options) + if err != nil { + return fmt.Errorf("declare bindings failed: %w", err) + } + + msgs, err := consumer.chanManager.ConsumeSafe( + options.QueueOptions.Name, + options.RabbitConsumerOptions.Name, + options.RabbitConsumerOptions.AutoAck, + options.RabbitConsumerOptions.Exclusive, + false, // no-local is not supported by RabbitMQ + options.RabbitConsumerOptions.NoWait, + tableToAMQPTable(options.RabbitConsumerOptions.Args), + ) + if err != nil { + return err + } + + for i := 0; i < options.Concurrency; i++ { + go handlerGoroutine(consumer, msgs, options, handler, deadMsgHandler) + } + consumer.options.Logger.Out("OK_CONSUMER_GOROUTINE-HANDLER", fmt.Sprintf("Processing messages on %v goroutines", options.Concurrency)) + return nil +} + +func (consumer *Consumer) getIsClosed() bool { + consumer.isClosedMux.RLock() + defer consumer.isClosedMux.RUnlock() + return consumer.isClosed +} + +func handlerGoroutine(consumer *Consumer, msgs <-chan amqp.Delivery, consumeOptions ConsumerOptions, handler EventHandler, deadMsgHandler EventHandler) { + for msg := range msgs { + if consumer.getIsClosed() { + break + } + + // Ack the message before processing if autoAck is enabled, if not the user will have to ack the message manually. + if consumeOptions.RabbitConsumerOptions.AutoAck { + err := msg.Ack(false) + if err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_AUTO-ACK", fmt.Errorf("can't ack message: %v", err)) + } + } + + // Check if message has been redelivered. If headers are present, it means the message has been redelivered at least once. + // We pass the msg to the deadMsgHandler if the retry limit has been exceeded. + headers := msg.Headers + // This is only for instant retry on quorum queues (x-delivery-count) + if deliveryCount, ok := headers["x-delivery-count"].(int64); ok { + if int(deliveryCount) > consumeOptions.RabbitConsumerOptions.RetryLimit { + if err := deadMsgHandler(Delivery{msg}); err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_DEAD-MESSAGE-HANDLER", fmt.Errorf("error in dead message handler: %v", err)) + } + continue + } + } + // This is for dlx retry (x-death) + if xDeathContent, ok := headers["x-death"].([]interface{}); ok { + var retryCount int64 + for _, content := range xDeathContent { + table, _ := content.(amqp.Table) + retryCount = table["count"].(int64) + break + } + if int(retryCount) > consumeOptions.RabbitConsumerOptions.RetryLimit { + if err := deadMsgHandler(Delivery{msg}); err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_DEAD-MESSAGE-HANDLER", fmt.Errorf("error in dead message handler: %v", err)) + } + continue + } + } + + // Attempt to handle message, Ack should be performed by user in handler, we only handle error cases. + if err := handler(Delivery{msg}); err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_HANDLER", fmt.Errorf("error in handler: %v", err)) + // Two options here, requeue directly into queue or requeue via dead letter exchange + if consumeOptions.RabbitConsumerOptions.DlxRetry { + err := msg.Nack(false, false) + if err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_NACK-REQUEUE-DLX", fmt.Errorf("can't nack message: %v", err)) + } + } else if consumeOptions.RabbitConsumerOptions.Retry && consumer.options.QueueOptions.Args["x-queue-type"] == "quorum" { + err := msg.Nack(false, true) + if err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_NACK-REQUEUE", fmt.Errorf("can't nack message: %v", err)) + } + } else { + if err := deadMsgHandler(Delivery{msg}); err != nil { + consumer.options.Logger.Error("ERR_CONSUMER_DEAD-MESSAGE-HANDLER", fmt.Errorf("error in dead message handler: %v", err)) + } + } + continue + } + } + consumer.options.Logger.Out("OK_CONSUMER_GOROUTINE-CLOSED", "rabbit consumer goroutine closed") +} diff --git a/rmq/consumer/consumer_options.go b/rmq/consumer/consumer_options.go new file mode 100644 index 0000000..48601fa --- /dev/null +++ b/rmq/consumer/consumer_options.go @@ -0,0 +1,334 @@ +package rabbitmq + +import ( + "github.com/kelchy/go-lib/rmq/consumer/internal/logger" + amqp "github.com/rabbitmq/amqp091-go" +) + +const defaultPrefetchCount = 1 + +// getDefaultConsumerOptions describes the options that will be used when a value isn't provided +func getDefaultConsumerOptions(queueName string) ConsumerOptions { + return ConsumerOptions{ + RabbitConsumerOptions: RabbitConsumerOptions{ + Name: "", + AutoAck: false, + Exclusive: false, + NoWait: false, + NoLocal: false, + Args: Table{}, + }, + QueueOptions: QueueOptions{ + Name: queueName, + Durable: false, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: true, + }, + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, + Bindings: []Binding{}, + Concurrency: 1, + Logger: logger.DefaultLogger, + QOSPrefetch: defaultPrefetchCount, + QOSGlobal: false, + } +} + +func getDefaultBindingOptions() BindingOptions { + return BindingOptions{ + NoWait: false, + Args: Table{}, + Declare: true, + } +} + +// ConsumerOptions are used to describe how a new consumer will be created. +// If QueueOptions is not nil, the options will be used to declare a queue +// If ExchangeOptions is not nil, it will be used to declare an exchange +// If there are Bindings, the queue will be bound to them +type ConsumerOptions struct { + RabbitConsumerOptions RabbitConsumerOptions + QueueOptions QueueOptions + ExchangeOptions ExchangeOptions + Bindings []Binding + Concurrency int + Logger logger.Logger + QOSPrefetch int + QOSGlobal bool +} + +// RabbitConsumerOptions are used to configure the consumer +// on the rabbit server +type RabbitConsumerOptions struct { + Name string + AutoAck bool + Exclusive bool + NoWait bool + NoLocal bool + DlxRetry bool // if true, the consumer will retry failed messages using the DLX + Retry bool // if true, the consumer will retry failed messages + RetryLimit int // the number of times a message will be retried before being rejected + Args Table +} + +// QueueOptions are used to configure a queue. +// A passive queue is assumed by RabbitMQ to already exist, and attempting to connect +// to a non-existent queue will cause RabbitMQ to throw an exception. +type QueueOptions struct { + Name string + Durable bool + AutoDelete bool + Exclusive bool + NoWait bool + Passive bool // if false, a missing queue will be created on the server + Args Table + Declare bool +} + +// Binding describes the bhinding of a queue to a routing key on an exchange +type Binding struct { + RoutingKey string + BindingOptions +} + +// BindingOptions describes the options a binding can have +type BindingOptions struct { + NoWait bool + Args Table + Declare bool +} + +// BindingDeclareOptions are used to configure a binding declaration +type BindingDeclareOptions struct { + QueueName string + RoutingKey string + ExchangeName string + NoWait bool + Args Table + Declare bool +} + +// WithConsumerOptionsQueueDurable ensures the queue is a durable queue +func WithConsumerOptionsQueueDurable(options *ConsumerOptions) { + options.QueueOptions.Durable = true +} + +// WithConsumerOptionsQueueAutoDelete ensures the queue is an auto-delete queue +func WithConsumerOptionsQueueAutoDelete(options *ConsumerOptions) { + options.QueueOptions.AutoDelete = true +} + +// WithConsumerOptionsQueueExclusive ensures the queue is an exclusive queue +func WithConsumerOptionsQueueExclusive(options *ConsumerOptions) { + options.QueueOptions.Exclusive = true +} + +// WithConsumerOptionsQueueNoWait ensures the queue is a no-wait queue +func WithConsumerOptionsQueueNoWait(options *ConsumerOptions) { + options.QueueOptions.NoWait = true +} + +// WithConsumerOptionsQueuePassive ensures the queue is a passive queue +func WithConsumerOptionsQueuePassive(options *ConsumerOptions) { + options.QueueOptions.Passive = true +} + +// WithConsumerOptionsQueueNoDeclare will turn off the declaration of the queue's +// existance upon startup +func WithConsumerOptionsQueueNoDeclare(options *ConsumerOptions) { + options.QueueOptions.Declare = false +} + +// WithConsumerOptionsQueueArgs adds optional args to the queue +func WithConsumerOptionsQueueArgs(args Table) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.QueueOptions.Args = args + } +} + +// WithConsumerOptionsExchangeName sets the exchange name +func WithConsumerOptionsExchangeName(name string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Name = name + } +} + +// WithConsumerOptionsExchangeKind ensures the queue is a durable queue +func WithConsumerOptionsExchangeKind(kind string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Kind = kind + } +} + +// WithConsumerOptionsExchangeDurable ensures the exchange is a durable exchange +func WithConsumerOptionsExchangeDurable(options *ConsumerOptions) { + options.ExchangeOptions.Durable = true +} + +// WithConsumerOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange +func WithConsumerOptionsExchangeAutoDelete(options *ConsumerOptions) { + options.ExchangeOptions.AutoDelete = true +} + +// WithConsumerOptionsExchangeInternal ensures the exchange is an internal exchange +func WithConsumerOptionsExchangeInternal(options *ConsumerOptions) { + options.ExchangeOptions.Internal = true +} + +// WithConsumerOptionsExchangeNoWait ensures the exchange is a no-wait exchange +func WithConsumerOptionsExchangeNoWait(options *ConsumerOptions) { + options.ExchangeOptions.NoWait = true +} + +// WithConsumerOptionsExchangeDeclare stops this library from declaring the exchanges existance +func WithConsumerOptionsExchangeDeclare(options *ConsumerOptions) { + options.ExchangeOptions.Declare = true +} + +// WithConsumerOptionsExchangePassive ensures the exchange is a passive exchange +func WithConsumerOptionsExchangePassive(options *ConsumerOptions) { + options.ExchangeOptions.Passive = true +} + +// WithConsumerOptionsExchangeArgs adds optional args to the exchange +func WithConsumerOptionsExchangeArgs(args Table) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.ExchangeOptions.Args = args + } +} + +// WithConsumerOptionsRoutingKey binds the queue to a routing key with the default binding options +func WithConsumerOptionsRoutingKey(routingKey string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, Binding{ + RoutingKey: routingKey, + BindingOptions: getDefaultBindingOptions(), + }) + } +} + +// WithConsumerOptionsBinding adds a new binding to the queue which allows you to set the binding options +// on a per-binding basis. Keep in mind that everything in the BindingOptions struct will default to +// the zero value. If you want to declare your bindings for example, be sure to set Declare=true +func WithConsumerOptionsBinding(binding Binding) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Bindings = append(options.Bindings, binding) + } +} + +// WithConsumerOptionsConcurrency returns a function that sets the concurrency, which means that +// many goroutines will be spawned to run the provided handler on messages +func WithConsumerOptionsConcurrency(concurrency int) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Concurrency = concurrency + } +} + +// WithConsumerOptionsConsumerName returns a function that sets the name on the server of this consumer +// if unset a random name will be given +func WithConsumerOptionsConsumerName(consumerName string) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.Name = consumerName + } +} + +// WithConsumerOptionsConsumerAutoAck returns a function that sets the auto acknowledge property on the server of this consumer +// if unset the default will be used (false) +func WithConsumerOptionsConsumerAutoAck(autoAck bool) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.AutoAck = autoAck + } +} + +// WithConsumerOptionsConsumerExclusive sets the consumer to exclusive, which means +// the server will ensure that this is the sole consumer +// from this queue. When exclusive is false, the server will fairly distribute +// deliveries across multiple consumers. +func WithConsumerOptionsConsumerExclusive(options *ConsumerOptions) { + options.RabbitConsumerOptions.Exclusive = true +} + +// WithConsumerOptionsConsumerNoWait sets the consumer to nowait, which means +// it does not wait for the server to confirm the request and +// immediately begin deliveries. If it is not possible to consume, a channel +// exception will be raised and the channel will be closed. +func WithConsumerOptionsConsumerNoWait(options *ConsumerOptions) { + options.RabbitConsumerOptions.NoWait = true +} + +// WithConsumerOptionsConsumerRetry sets the consumer to retry, which means +// it will retry failed messages instantly by queuing them up again on the same routing key +// Only use this if you have a quorum queue since poison messages will be requeued +func WithConsumerOptionsConsumerRetry(options *ConsumerOptions) { + options.RabbitConsumerOptions.Retry = true +} + +// WithConsumerOptionsConsumerDlxRetry sets the consumer to retry using dlx, which means +// it will retry failed messages by queuing them up again on the dlx exchange +// this is useful if you want to retry messages after a delay and have set up a dlx exchange +// with a ttl on the queue +func WithConsumerOptionsConsumerDlxRetry(options *ConsumerOptions) { + options.RabbitConsumerOptions.DlxRetry = true +} + +// WithConsumerOptionsConsumerRetryLimit sets the consumer to the retry limit, which means +// it will retry failed messages up to the limit and then drop them. +// Dropped messages will be sent to the dead message handler and not the dlq +func WithConsumerOptionsConsumerRetryLimit(limit int) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.RabbitConsumerOptions.RetryLimit = limit + } +} + +// WithConsumerOptionsLogging uses a default logger that writes to std out +func WithConsumerOptionsLogging(options *ConsumerOptions) { + options.Logger = logger.DefaultLogger +} + +// WithConsumerOptionsLogger sets logging to a custom interface. +// Use WithConsumerOptionsLogging to just log to stdout. +func WithConsumerOptionsLogger(log logger.Logger) func(options *ConsumerOptions) { + return func(options *ConsumerOptions) { + options.Logger = log + } +} + +// WithConsumerOptionsQOSPrefetch returns a function that sets the prefetch count, which means that +// many messages will be fetched from the server in advance to help with throughput. +// This doesn't affect the handler, messages are still processed one at a time. +func WithConsumerOptionsQOSPrefetch(prefetchCount int) func(*ConsumerOptions) { + return func(options *ConsumerOptions) { + options.QOSPrefetch = prefetchCount + } +} + +// WithConsumerOptionsQOSGlobal sets the qos on the channel to global, which means +// these QOS settings apply to ALL existing and future +// consumers on all channels on the same connection +func WithConsumerOptionsQOSGlobal(options *ConsumerOptions) { + options.QOSGlobal = true +} + +// WithConsumerOptionsQueueQuorum sets the queue a quorum type, which means +// multiple nodes in the cluster will have the messages distributed amongst them +// for higher reliability +func WithConsumerOptionsQueueQuorum(options *ConsumerOptions) { + if options.QueueOptions.Args == nil { + options.QueueOptions.Args = Table{} + } + + options.QueueOptions.Args["x-queue-type"] = "quorum" +} diff --git a/rmq/consumer/declare.go b/rmq/consumer/declare.go new file mode 100644 index 0000000..a86b055 --- /dev/null +++ b/rmq/consumer/declare.go @@ -0,0 +1,148 @@ +package rabbitmq + +import ( + "github.com/kelchy/go-lib/log" + "github.com/kelchy/go-lib/rmq/consumer/internal/channelmanager" + "github.com/kelchy/go-lib/rmq/consumer/internal/logger" +) + +func declareExchange(chanManager *channelmanager.ChannelManager, options ExchangeOptions) error { + if !options.Declare { + return nil + } + if options.Passive { + err := chanManager.ExchangeDeclarePassiveSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil + } + err := chanManager.ExchangeDeclareSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil +} + +func declareQueue(chanManager *channelmanager.ChannelManager, options QueueOptions) error { + if !options.Declare { + return nil + } + if options.Passive { + _, err := chanManager.QueueDeclarePassiveSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil + } + _, err := chanManager.QueueDeclareSafe( + options.Name, + options.Durable, + options.AutoDelete, + options.Exclusive, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil +} + +func declareBindings(chanManager *channelmanager.ChannelManager, options ConsumerOptions) error { + for _, binding := range options.Bindings { + if !binding.Declare { + continue + } + err := chanManager.QueueBindSafe( + options.QueueOptions.Name, + binding.RoutingKey, + options.ExchangeOptions.Name, + binding.NoWait, + tableToAMQPTable(binding.Args), + ) + if err != nil { + return err + } + } + return nil +} + +// DeclareExchange declares all exchanges in the given options +func DeclareExchange( + conn *Conn, + options ExchangeOptions, +) error { + // Creates a new channel manager + errLogger, _ := log.New("erroronly") + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval) + if err != nil { + return err + } + // Close the channel manager when done + defer chanManager.Close() + return declareExchange(chanManager, options) +} + +// DeclareQueue declares the queue in the given options +func DeclareQueue( + conn *Conn, + options QueueOptions, +) error { + // Creates a new channel manager + errLogger, _ := log.New("erroronly") + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, errLogger, conn.connectionManager.ReconnectInterval) + if err != nil { + return err + } + // Close the channel manager when done + defer chanManager.Close() + return declareQueue(chanManager, options) +} + +// DeclareBinding declares the binding in the given options +func DeclareBinding( + conn *Conn, + options BindingDeclareOptions, +) error { + // Creates a new channel manager + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, logger.DefaultLogger, conn.connectionManager.ReconnectInterval) + if err != nil { + return err + } + // Close the channel manager when done + defer chanManager.Close() + if err := chanManager.QueueBindSafe( + options.QueueName, + options.RoutingKey, + options.ExchangeName, + options.NoWait, + tableToAMQPTable(options.Args), + ); err != nil { + return err + } + return nil +} diff --git a/rmq/consumer/eventhandler.go b/rmq/consumer/eventhandler.go deleted file mode 100644 index 76066aa..0000000 --- a/rmq/consumer/eventhandler.go +++ /dev/null @@ -1,91 +0,0 @@ -package consumer - -import ( - "context" - "fmt" - - "github.com/streadway/amqp" -) - -// IEventHandler interface contains methods implemented by the package -type IEventHandler interface { - HandleEvent(context.Context, IMessage) - Retry(context.Context, IMessage, error) - HandleDeadMessage(context.Context, IMessage, error) -} - -// IClientHandler interface contains methods to be implemented by the user of the package -type IClientHandler interface { - ProcessEvent(context.Context, IMessage) error - ProcessDeadMessage(context.Context, IMessage, error) error -} - -// EventHandler struct -type EventHandler struct { - processor IClientHandler - logger ILogger - retryConfig *MessageRetryConfig -} - -// NewEventHandler returns a new EventHandler -func NewEventHandler(processor IClientHandler, logger ILogger, retryConfig *MessageRetryConfig) IEventHandler { - return &EventHandler{ - processor: processor, - logger: logger, - retryConfig: retryConfig, - } -} - -// HandleEvent handles the event received from the queue -func (e *EventHandler) HandleEvent(ctx context.Context, message IMessage) { - err := e.processor.ProcessEvent(ctx, message) - if err != nil { - // Attempts to retry the message if the retry is enabled - if e.retryConfig.Enabled { - e.Retry(ctx, message, err) - return - } - e.HandleDeadMessage(ctx, message, err) - return - } -} - -// Retry retries the message if the retry config is enabled -func (e *EventHandler) Retry(ctx context.Context, message IMessage, err error) { - headers := message.Headers() - if headers == nil { // in case of 1st retry no headers are present - errAck := message.Nack(false, false) - if errAck != nil { - e.logger.Error(fmt.Sprintf("ERR_EVENT_HANDLER-FAIL-MSG-ACK-%s", message.GetID()), errAck) - } - return - } - if xDeathContent, ok := headers["x-death"].([]interface{}); ok { - for _, content := range xDeathContent { - table, _ := content.(amqp.Table) - retryCount, _ := table["count"].(int64) - if int(retryCount) < e.retryConfig.RetryCountLimit { - errAck := message.Nack(false, false) - if errAck != nil { - e.logger.Error(fmt.Sprintf("ERR_EVENT_HANDLER-FAIL-MSG-ACK-%s", message.GetID()), errAck) - return - } - return - } - e.HandleDeadMessage(ctx, message, fmt.Errorf("retry count %d exceeded for msg %s", int(retryCount), message.GetID())) - // Golint flags this return but it is needed - return - } - } - e.HandleDeadMessage(ctx, message, err) -} - -// HandleDeadMessage handles the dead message -func (e *EventHandler) HandleDeadMessage(ctx context.Context, message IMessage, err error) { - if e.retryConfig.HandleDeadMessage { - handleDMErr := e.processor.ProcessDeadMessage(ctx, message, err) - if handleDMErr != nil { - e.logger.Error(fmt.Sprintf("ERR_EVENT_HANDLER-DEAD-MSG-FAIL-MSG-ACK-%s", message.GetID()), handleDMErr) - } - } -} diff --git a/rmq/consumer/example/consumer.example.go b/rmq/consumer/example/consumer.example.go index 3390c91..8a3973a 100644 --- a/rmq/consumer/example/consumer.example.go +++ b/rmq/consumer/example/consumer.example.go @@ -1,69 +1,141 @@ package main import ( - "context" "fmt" "os" - "time" + "os/signal" + "syscall" - "github.com/kelchy/go-lib/rmq/consumer" + rabbitmq "github.com/kelchy/go-lib/rmq/consumer" ) +/** + * This example demonstrates how to create a consumer with retry and dead letter queue + * The consumer will consume messages from the queue "q_events" with routing key "events.basic" + * If the message is Nacked, it will be sent to the dead letter queue "q_event_retry" + * The dead letter queue will retry the message 3 times with a delay of 10 seconds between each retry + * If the message is Nacked after the 3rd retry, it will be handled by the dead message handler + */ + func main() { - eventProcessor := &eventProcessor{} - err := consumer.New( - consumer.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), - // Queue names should be the same in QueueConfig and QueueBindConfig - consumer.DefaultQueueConfig("test-queue-logging"), - consumer.DefaultQueueBindConfig("test-exchange", "test-queue-logging", "test-routing-key"), - consumer.DefaultConfig("test-consumer"), - consumer.DefaultMessageRetryConfig(), - eventProcessor, - consumer.DefaultLogger()) + conn, err := rabbitmq.NewConn( + os.Getenv("RMQ_URI"), + rabbitmq.WithConnectionOptionsLogging, + ) if err != nil { - fmt.Println("failed to create consumer: ", err) - } - - // If you want to verify the presence of queue and exchanges - // Not required if consumer is init using New() as above - conn, _ := consumer.NewConnection(consumer.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), consumer.DefaultLogger()) - connChan, _ := conn.Channel() - exDeclareErr := consumer.NewExchange(connChan, consumer.DefaultExchangeConfig("test-exchange", "direct")) - _, queueDeclareErr := consumer.NewQueue(connChan, consumer.QueueConfig{ - Name: "test-queue-logging", - Durable: true, - AutoDelete: true, - Exclusive: false, - NoWait: false, - Args: nil, - }) - qBindErr := consumer.NewQueueBind(connChan, "test-exchange", "test-queue-logging", "test-routing-key", false, nil) - if exDeclareErr != nil { - fmt.Println("failed to declare exchange: ", exDeclareErr) - return - } - if queueDeclareErr != nil { - fmt.Println("failed to declare queue: ", queueDeclareErr) + fmt.Println(err) return } - if qBindErr != nil { - fmt.Println("failed to bind queue: ", qBindErr) + defer conn.Close() + + // Verify that exchange exists + if err := rabbitmq.DeclareExchange( + conn, + rabbitmq.ExchangeOptions{ + Name: "events", + Kind: "direct", + Declare: true, + Durable: true, + AutoDelete: false, + Internal: false, + NoWait: false, + Args: nil, + }, + ); err != nil { + fmt.Println(err) return } - fmt.Println("queue and exchange verified") - // Leave the consumer running for 30 seconds before exiting, only for example purposes - time.Sleep(30 * time.Second) -} -// EventProcessor is an example of a consumer event processor. -type eventProcessor struct{} + // Consumer options can be changed according to your needs + rabbitmq.NewConsumer( + conn, + eventHandler, + deadMessageHandler, + "q_events", + rabbitmq.WithConsumerOptionsExchangeName("events"), + rabbitmq.WithConsumerOptionsRoutingKey("events.basic"), + rabbitmq.WithConsumerOptionsConsumerDlxRetry, + rabbitmq.WithConsumerOptionsConsumerRetryLimit(3), + rabbitmq.WithConsumerOptionsConsumerAutoAck(false), + rabbitmq.WithConsumerOptionsQueueDurable, + rabbitmq.WithConsumerOptionsQueueArgs( + map[string]interface{}{ + "x-dead-letter-exchange": "events", + "x-dead-letter-routing-key": "retry.events.basic", + }, + ), + ) + + // Declares the DLQ and binds it back to the original queue + rabbitmq.DeclareQueue( + conn, + rabbitmq.QueueOptions{ + Name: "q_event_retry", + Durable: true, + AutoDelete: false, + Exclusive: false, + NoWait: false, + Args: map[string]interface{}{ + "x-message-ttl": 10000, + "x-dead-letter-exchange": "events", + "x-dead-letter-routing-key": "events.basic", + }, + Declare: true, + }, + ) + rabbitmq.DeclareBinding( + conn, + rabbitmq.BindingDeclareOptions{ + QueueName: "q_event_retry", + ExchangeName: "events", + RoutingKey: "retry.events.basic", + }, + ) + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) -func (*eventProcessor) ProcessEvent(ctx context.Context, message consumer.IMessage) error { - fmt.Printf("Recieved message: ID: %s, Message: %s\n", message.GetID(), string(message.Body())) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + <-done + fmt.Println("stopping consumer") +} + +func eventHandler(d rabbitmq.Delivery) error { + fmt.Printf("consumed: %s, %v \n", string(d.MessageId), string(d.Body)) + return fmt.Errorf("nack") + if err := d.Ack(false); err != nil { + return err + } return nil } -func (*eventProcessor) ProcessDeadMessage(ctx context.Context, message consumer.IMessage, err error) error { - fmt.Printf("Recieved dead message: ID: %s, Message: %s, Error: %v", message.GetID(), string(message.Body()), err) +// Alternatively can use a wrapper for the event handler so you will not need to call ack manually within each different consumer +func eventHandlerWrapper(f rabbitmq.EventHandler) func(msg rabbitmq.Delivery) error { + return func(msg rabbitmq.Delivery) error { + if err := f(msg); err != nil { + return err + } + if err := msg.Ack(false); err != nil { + return err + } + return nil + } +} + +func deadMessageHandler(d rabbitmq.Delivery) error { + fmt.Printf("Dead Message Received: %s, %v \n", string(d.MessageId), string(d.Body)) + if err := d.Ack(false); err != nil { + return err + } return nil } diff --git a/rmq/consumer/exchange.go b/rmq/consumer/exchange.go deleted file mode 100644 index 5668adc..0000000 --- a/rmq/consumer/exchange.go +++ /dev/null @@ -1,9 +0,0 @@ -package consumer - -import "github.com/streadway/amqp" - -// NewExchange creates a new exchange or ensures an exchange exists that matches the provided configuration -func NewExchange(amqpChannel *amqp.Channel, exchangeConfig ExchangeConfig) error { - err := amqpChannel.ExchangeDeclare(exchangeConfig.Name, exchangeConfig.Kind, exchangeConfig.Durable, exchangeConfig.AutoDelete, exchangeConfig.Internal, exchangeConfig.NoWait, exchangeConfig.Args) - return err -} diff --git a/rmq/consumer/exchange_options.go b/rmq/consumer/exchange_options.go new file mode 100644 index 0000000..cf5e255 --- /dev/null +++ b/rmq/consumer/exchange_options.go @@ -0,0 +1,16 @@ +package rabbitmq + +// ExchangeOptions are used to configure an exchange. +// If the Passive flag is set the client will only check if the exchange exists on the server +// and that the settings match, no creation attempt will be made. +type ExchangeOptions struct { + Name string + Kind string // possible values: empty string for default exchange or direct, topic, fanout + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Passive bool // if false, a missing exchange will be created on the server + Args Table + Declare bool +} diff --git a/rmq/consumer/go.mod b/rmq/consumer/go.mod index 6753d95..5c995eb 100644 --- a/rmq/consumer/go.mod +++ b/rmq/consumer/go.mod @@ -1,9 +1,10 @@ -//v0.0.9 +//v0.1.1 module github.com/kelchy/go-lib/rmq/consumer go 1.19 require ( - github.com/kelchy/go-lib/log v0.0.10 - github.com/streadway/amqp v1.0.0 + github.com/google/uuid v1.3.0 + github.com/kelchy/go-lib/log v0.0.11 + github.com/rabbitmq/amqp091-go v1.8.0 ) diff --git a/rmq/consumer/go.sum b/rmq/consumer/go.sum index 8d24dad..59a958d 100644 --- a/rmq/consumer/go.sum +++ b/rmq/consumer/go.sum @@ -1,4 +1,22 @@ -github.com/kelchy/go-lib/log v0.0.10 h1:K2ilS1c3pHwzXuQhKbTYagiNPYJYaGJq6BHr8TjPM2g= -github.com/kelchy/go-lib/log v0.0.10/go.mod h1:08sbkvkTs1hFLUcHsOqCUXJBAF1VUrllqKkB3lmDEFM= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/kelchy/go-lib/log v0.0.11 h1:8WkPZNmOk+4UVZe8kI/lYaWOjXNWrdYCS5HD/YFc+T8= +github.com/kelchy/go-lib/log v0.0.11/go.mod h1:iD2C86ZX89OfM91lvvJtDaJm9BX+KqD5i2R9Fq7D404= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM= +github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rmq/consumer/internal/channelmanager/channel_manager.go b/rmq/consumer/internal/channelmanager/channel_manager.go new file mode 100644 index 0000000..063de01 --- /dev/null +++ b/rmq/consumer/internal/channelmanager/channel_manager.go @@ -0,0 +1,150 @@ +package channelmanager + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/kelchy/go-lib/rmq/consumer/internal/connectionmanager" + "github.com/kelchy/go-lib/rmq/consumer/internal/dispatcher" + "github.com/kelchy/go-lib/rmq/consumer/internal/logger" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ChannelManager - +type ChannelManager struct { + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMux *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher +} + +// NewChannelManager creates a new connection manager +func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { + ch, err := getNewChannel(connManager) + if err != nil { + return nil, err + } + + chanManager := ChannelManager{ + logger: log, + connManager: connManager, + channel: ch, + channelMux: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go chanManager.startNotifyCancelOrClosed() + return &chanManager, nil +} + +func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Channel, error) { + conn := connManager.CheckoutConnection() + defer connManager.CheckinConnection() + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + return ch, nil +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (chanManager *ChannelManager) startNotifyCancelOrClosed() { + notifyCloseChan := chanManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chanManager.channel.NotifyCancel(make(chan string, 1)) + + select { + case err := <-notifyCloseChan: + if err != nil { + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", fmt.Errorf("attempting to reconnect to amqp server after close with error: %v", err)) + chanManager.reconnectLoop() + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "successfully reconnected to amqp server after close") + chanManager.dispatcher.Dispatch(err) + } + if err == nil { + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "amqp channel closed gracefully") + } + case err := <-notifyCancelChan: + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", fmt.Errorf("attempting to reconnect to amqp server after cancel with error: %v", err)) + chanManager.reconnectLoop() + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "successfully reconnected to amqp server after cancel") + chanManager.dispatcher.Dispatch(errors.New(err)) + } +} + +// GetReconnectionCount - +func (chanManager *ChannelManager) GetReconnectionCount() uint { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + return chanManager.reconnectionCount +} + +func (chanManager *ChannelManager) incrementReconnectionCount() { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (chanManager *ChannelManager) reconnectLoop() { + for { + time.Sleep(chanManager.reconnectInterval) + err := chanManager.reconnect() + if err != nil { + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_RECONNECT-LOOP", fmt.Errorf("error reconnecting to amqp server: %v", err)) + } else { + chanManager.incrementReconnectionCount() + go chanManager.startNotifyCancelOrClosed() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (chanManager *ChannelManager) reconnect() error { + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + newChannel, err := getNewChannel(chanManager.connManager) + if err != nil { + return err + } + + if err = chanManager.channel.Close(); err != nil { + chanManager.logger.Error("WARN_RMQ-CHAN-MANAGER_RECONNECT", fmt.Errorf("error closing channel while reconnecting: %v", err)) + } + + chanManager.channel = newChannel + return nil +} + +// Close safely closes the current channel and connection +func (chanManager *ChannelManager) Close() error { + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_CLOSE", "closing channel manager...") + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + + err := chanManager.channel.Close() + if err != nil { + return err + } + + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnect to the server +func (chanManager *ChannelManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return chanManager.dispatcher.AddSubscriber() +} diff --git a/rmq/consumer/internal/channelmanager/safe_wraps.go b/rmq/consumer/internal/channelmanager/safe_wraps.go new file mode 100644 index 0000000..ff99a3e --- /dev/null +++ b/rmq/consumer/internal/channelmanager/safe_wraps.go @@ -0,0 +1,237 @@ +package channelmanager + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConsumeSafe safely wraps the (*amqp.Channel).Consume method +func (chanManager *ChannelManager) ConsumeSafe( + queue, + consumer string, + autoAck, + exclusive, + noLocal, + noWait bool, + args amqp.Table, +) (<-chan amqp.Delivery, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Consume( + queue, + consumer, + autoAck, + exclusive, + noLocal, + noWait, + args, + ) +} + +// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method +func (chanManager *ChannelManager) QueueDeclarePassiveSafe( + name string, + durable bool, + autoDelete bool, + exclusive bool, + noWait bool, + args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclarePassive( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method +func (chanManager *ChannelManager) QueueDeclareSafe( + name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclare( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method +func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclarePassive( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method +func (chanManager *ChannelManager) ExchangeDeclareSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclare( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method +func (chanManager *ChannelManager) QueueBindSafe( + name string, key string, exchange string, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueBind( + name, + key, + exchange, + noWait, + args, + ) +} + +// QosSafe safely wraps the (*amqp.Channel).Qos method +func (chanManager *ChannelManager) QosSafe( + prefetchCount int, prefetchSize int, global bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Qos( + prefetchCount, + prefetchSize, + global, + ) +} + +/* +PublishSafe safely wraps the (*amqp.Channel).Publish method. +*/ +func (chanManager *ChannelManager) PublishSafe( + exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithContext( + context.Background(), + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// PublishWithContextSafe safely wraps the (*amqp.Channel).PublishWithContext method. +func (chanManager *ChannelManager) PublishWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// PublishWithDeferredConfirmWithContextSafe safely wraps the (*amqp.Channel).PublishWithDeferredConfirmWithContext method. +func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) (*amqp.DeferredConfirmation, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithDeferredConfirmWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method +func (chanManager *ChannelManager) NotifyReturnSafe( + c chan amqp.Return, +) chan amqp.Return { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyReturn( + c, + ) +} + +// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method +func (chanManager *ChannelManager) ConfirmSafe( + noWait bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Confirm( + noWait, + ) +} + +// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method +func (chanManager *ChannelManager) NotifyPublishSafe( + confirm chan amqp.Confirmation, +) chan amqp.Confirmation { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyPublish( + confirm, + ) +} + +// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method +func (chanManager *ChannelManager) NotifyFlowSafe( + c chan bool, +) chan bool { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyFlow( + c, + ) +} diff --git a/rmq/consumer/internal/connectionmanager/connection_manager.go b/rmq/consumer/internal/connectionmanager/connection_manager.go new file mode 100644 index 0000000..899ed87 --- /dev/null +++ b/rmq/consumer/internal/connectionmanager/connection_manager.go @@ -0,0 +1,140 @@ +package connectionmanager + +import ( + "fmt" + "sync" + "time" + + "github.com/kelchy/go-lib/rmq/consumer/internal/dispatcher" + "github.com/kelchy/go-lib/rmq/consumer/internal/logger" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConnectionManager - +type ConnectionManager struct { + logger logger.Logger + url string + connection *amqp.Connection + amqpConfig amqp.Config + connectionMux *sync.RWMutex + ReconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher +} + +// NewConnectionManager creates a new connection manager +func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { + conn, err := amqp.DialConfig(url, amqp.Config(conf)) + if err != nil { + return nil, err + } + connManager := ConnectionManager{ + logger: log, + url: url, + connection: conn, + amqpConfig: conf, + connectionMux: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go connManager.startNotifyClose() + return &connManager, nil +} + +// Close safely closes the current channel and connection +func (connManager *ConnectionManager) Close() error { + connManager.logger.Out("OK_RMQ-CONN-MANAGER_CLOSE", "closing connection manager...") + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + + err := connManager.connection.Close() + if err != nil { + return err + } + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnected to the server +func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return connManager.dispatcher.AddSubscriber() +} + +// CheckoutConnection - +func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { + connManager.connectionMux.RLock() + return connManager.connection +} + +// CheckinConnection - +func (connManager *ConnectionManager) CheckinConnection() { + connManager.connectionMux.RUnlock() +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (connManager *ConnectionManager) startNotifyClose() { + notifyCloseChan := connManager.connection.NotifyClose(make(chan *amqp.Error, 1)) + + err := <-notifyCloseChan + if err != nil { + connManager.logger.Error("ERR_RMQ-CONN-MANAGER_START-NOTIFY_CLOSE", fmt.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err.Error())) + connManager.reconnectLoop() + connManager.logger.Out("OK_RMQ-CONN-MANAGER_START-NOTIFY-CLOSE", "successfully reconnected to amqp server") + connManager.dispatcher.Dispatch(err) + } + if err == nil { + connManager.logger.Out("OK_RMQ-CONN-MANAGER", "amqp connection closed gracefully") + } +} + +// GetReconnectionCount - +func (connManager *ConnectionManager) GetReconnectionCount() uint { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + return connManager.reconnectionCount +} + +func (connManager *ConnectionManager) incrementReconnectionCount() { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (connManager *ConnectionManager) reconnectLoop() { + for { + time.Sleep(connManager.ReconnectInterval) + err := connManager.reconnect() + if err != nil { + connManager.logger.Error("ERR_RMQ_RECONNECT-LOOP", fmt.Errorf("error reconnecting to amqp server: %v", err)) + } else { + connManager.incrementReconnectionCount() + go connManager.startNotifyClose() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (connManager *ConnectionManager) reconnect() error { + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) + if err != nil { + return err + } + + if err = connManager.connection.Close(); err != nil { + connManager.logger.Error("WARN_RMQ-CONN-MANAGER_RECONNECT", fmt.Errorf("error closing connection while reconnecting: %v", err)) + } + + connManager.connection = newConn + return nil +} diff --git a/rmq/consumer/internal/connectionmanager/safe_wraps.go b/rmq/consumer/internal/connectionmanager/safe_wraps.go new file mode 100644 index 0000000..b6702af --- /dev/null +++ b/rmq/consumer/internal/connectionmanager/safe_wraps.go @@ -0,0 +1,17 @@ +package connectionmanager + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +// NotifyBlockedSafe safely wraps the (*amqp.Connection).NotifyBlocked method +func (connManager *ConnectionManager) NotifyBlockedSafe( + receiver chan amqp.Blocking, +) chan amqp.Blocking { + connManager.connectionMux.RLock() + defer connManager.connectionMux.RUnlock() + + return connManager.connection.NotifyBlocked( + receiver, + ) +} diff --git a/rmq/consumer/internal/dispatcher/dispatcher.go b/rmq/consumer/internal/dispatcher/dispatcher.go new file mode 100644 index 0000000..52385c6 --- /dev/null +++ b/rmq/consumer/internal/dispatcher/dispatcher.go @@ -0,0 +1,72 @@ +package dispatcher + +import ( + "log" + "math" + "math/rand" + "sync" + "time" +) + +// Dispatcher - +type Dispatcher struct { + subscribers map[int]dispatchSubscriber + subscribersMux *sync.Mutex +} + +type dispatchSubscriber struct { + notifyCancelOrCloseChan chan error + closeCh <-chan struct{} +} + +// NewDispatcher - +func NewDispatcher() *Dispatcher { + return &Dispatcher{ + subscribers: make(map[int]dispatchSubscriber), + subscribersMux: &sync.Mutex{}, + } +} + +// Dispatch - +func (d *Dispatcher) Dispatch(err error) error { + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + for _, subscriber := range d.subscribers { + select { + case <-time.After(time.Second * 5): + log.Println("Unexpected rabbitmq error: timeout in dispatch") + case subscriber.notifyCancelOrCloseChan <- err: + } + } + return nil +} + +// AddSubscriber - +func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}) { + const maxRand = math.MaxInt + const minRand = 0 + id := rand.Intn(maxRand-minRand) + minRand + + closeCh := make(chan struct{}) + notifyCancelOrCloseChan := make(chan error) + + d.subscribersMux.Lock() + d.subscribers[id] = dispatchSubscriber{ + notifyCancelOrCloseChan: notifyCancelOrCloseChan, + closeCh: closeCh, + } + d.subscribersMux.Unlock() + + go func(id int) { + <-closeCh + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + sub, ok := d.subscribers[id] + if !ok { + return + } + close(sub.notifyCancelOrCloseChan) + delete(d.subscribers, id) + }(id) + return notifyCancelOrCloseChan, closeCh +} diff --git a/rmq/consumer/internal/dispatcher/dispatcher_test.go b/rmq/consumer/internal/dispatcher/dispatcher_test.go new file mode 100644 index 0000000..afc5509 --- /dev/null +++ b/rmq/consumer/internal/dispatcher/dispatcher_test.go @@ -0,0 +1,34 @@ +package dispatcher + +import ( + "testing" + "time" +) + +func TestNewDispatcher(t *testing.T) { + d := NewDispatcher() + if d.subscribers == nil { + t.Error("Dispatcher subscribers is nil") + } + if d.subscribersMux == nil { + t.Error("Dispatcher subscribersMux is nil") + } +} + +func TestAddSubscriber(t *testing.T) { + d := NewDispatcher() + d.AddSubscriber() + if len(d.subscribers) != 1 { + t.Error("Dispatcher subscribers length is not 1") + } +} + +func TestCloseSubscriber(t *testing.T) { + d := NewDispatcher() + _, closeCh := d.AddSubscriber() + close(closeCh) + time.Sleep(time.Millisecond) + if len(d.subscribers) != 0 { + t.Error("Dispatcher subscribers length is not 0") + } +} diff --git a/rmq/consumer/internal/logger/logger.go b/rmq/consumer/internal/logger/logger.go new file mode 100644 index 0000000..d87ca31 --- /dev/null +++ b/rmq/consumer/internal/logger/logger.go @@ -0,0 +1,13 @@ +package logger + +import "github.com/kelchy/go-lib/log" + +// Logger is a simple interface for logging. +type Logger interface { + Debug(key string, message string) + Out(key string, message string) + Error(key string, err error) +} + +// DefaultLogger is the default logger used by the library. +var DefaultLogger, _ = log.New("erroronly") diff --git a/rmq/consumer/message.go b/rmq/consumer/message.go deleted file mode 100644 index 165da32..0000000 --- a/rmq/consumer/message.go +++ /dev/null @@ -1,63 +0,0 @@ -package consumer - -import ( - "github.com/streadway/amqp" -) - -// IMessage interface contains methods implemented by the package -type IMessage interface { - GetID() string - IsRetry() bool - Ack(multiple bool) error - Nack(multiple bool, requeue bool) error - Reject(requeue bool) error - Headers() map[string]interface{} - Body() []byte -} - -// Message struct -type Message struct { - delivery amqp.Delivery -} - -// NewMessage returns a new Message -func NewMessage(d amqp.Delivery) IMessage { - return &Message{ - delivery: d, - } -} - -// GetID returns the message id -func (m *Message) GetID() string { - return m.delivery.MessageId -} - -// Ack acknowledges the message -func (m *Message) Ack(multiple bool) error { - return m.delivery.Ack(multiple) -} - -// Nack nacks the message with the option to Nack multiple messages -func (m *Message) Nack(multiple bool, requeue bool) error { - return m.delivery.Nack(multiple, requeue) -} - -// Reject rejects the message -func (m *Message) Reject(requeue bool) error { - return m.delivery.Reject(requeue) -} - -// Body returns the message body -func (m *Message) Body() []byte { - return m.delivery.Body -} - -// Headers returns the message headers -func (m *Message) Headers() map[string]interface{} { - return m.delivery.Headers -} - -// IsRetry returns true if the message is a retry of a previous message -func (m *Message) IsRetry() bool { - return m.delivery.Redelivered -} diff --git a/rmq/consumer/queue.go b/rmq/consumer/queue.go deleted file mode 100644 index a222d7a..0000000 --- a/rmq/consumer/queue.go +++ /dev/null @@ -1,15 +0,0 @@ -package consumer - -import "github.com/streadway/amqp" - -// NewQueue declares a new queue or ensures a queue exists that matches the provided configuration -func NewQueue(amqpChannel *amqp.Channel, queueConfig QueueConfig) (amqp.Queue, error) { - q, err := amqpChannel.QueueDeclare(queueConfig.Name, queueConfig.Durable, queueConfig.AutoDelete, queueConfig.Exclusive, queueConfig.NoWait, queueConfig.Args) - return q, err -} - -// NewQueueBind binds a queue to an exchange with a routing key -func NewQueueBind(amqpChannel *amqp.Channel, exchange string, queue string, routingKey string, noWait bool, args amqp.Table) error { - queueBindErr := amqpChannel.QueueBind(queue, routingKey, exchange, false, nil) - return queueBindErr -} diff --git a/rmq/consumer/table.go b/rmq/consumer/table.go new file mode 100644 index 0000000..1b351f4 --- /dev/null +++ b/rmq/consumer/table.go @@ -0,0 +1,41 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +// Table stores user supplied fields of the following types: +// +// bool +// byte +// float32 +// float64 +// int +// int16 +// int32 +// int64 +// nil +// string +// time.Time +// amqp.Decimal +// amqp.Table +// []byte +// []interface{} - containing above types +// +// Functions taking a table will immediately fail when the table contains a +// value of an unsupported type. +// +// The caller must be specific in which precision of integer it wishes to +// encode. +// +// Use a type assertion when reading values from a table for type conversion. +// +// RabbitMQ expects int32 for integer values. +// +type Table map[string]interface{} + +func tableToAMQPTable(table Table) amqp.Table { + new := amqp.Table{} + for k, v := range table { + new[k] = v + } + return new +} diff --git a/rmq/publisher/LICENSE b/rmq/publisher/LICENSE new file mode 100644 index 0000000..e1ffb92 --- /dev/null +++ b/rmq/publisher/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 Lane Wagner + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/rmq/publisher/README.md b/rmq/publisher/README.md index 291e6bd..3589806 100644 --- a/rmq/publisher/README.md +++ b/rmq/publisher/README.md @@ -1,41 +1,9 @@ -# RMQ Consumer +# RMQ Publisher -This library is a wrapper around RMQ functions to make interaction with RMQ simpler and safer +This library is a wrapper around RMQ functions to make interaction with RMQ simpler and safer. Functionality is tested with hosted CloudAMQP, which offers automated failover using connection with a single URI. -## Quickstart - -Refer to the below code snippet for how to set up a publisher called `test-publisher` publishing events to an exchange `test-exchange` with a routing key `test-routing-key`. -``` -package main - -import ( - "context" - "fmt" - "os" +Adapted from [go-rabbitmq](https://github.com/wagslane/go-rabbitmq) - "github.com/kelchy/go-lib/rmq/publisher" -) - -func main() { - // If using the default publisher config, you can just pass your publisher name and exchange name - p, err := publisher.New( - publisher.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), - publisher.DefaultConfig("test-publisher", "test-exchange"), - publisher.DefaultLogger()) - if err != nil { - // Publisher failed to create and connect - panic(err) - } +## Quickstart - // Publish a message - // Message to be published should be in a amqp Publishing object - // Can simply use the default message if you don't need to set any custom properties - message := []byte("test message from publisher") - messageId, pubErr := p.Publish(context.TODO(), "test-routing-key", publisher.DefaultPublishMessage(message)) - if pubErr != nil { - // Failed to publish message - panic(pubErr) - } - fmt.Println("Message published with id: ", messageId) -} -``` \ No newline at end of file +Refer to `example/publisher.example.go` to see how to setup your own publisher diff --git a/rmq/publisher/config.go b/rmq/publisher/config.go deleted file mode 100644 index ab2cd53..0000000 --- a/rmq/publisher/config.go +++ /dev/null @@ -1,81 +0,0 @@ -package publisher - -import ( - "fmt" - "time" - - "github.com/kelchy/go-lib/log" - "github.com/streadway/amqp" -) - -// Config is the configuration for publisher. -type Config struct { - // Name: name of the publisher queue. - Name string `json:"name" mapstructure:"name"` - // Exchange: exchange to publish to. - Exchange string `json:"exchange" mapstructure:"exchange"` - // Mandatory: if true, return an unroutable message with a Return method. - Mandatory bool `json:"mandatory" mapstructure:"mandatory"` - // Immediate: if true, request a delivery confirmation from the server. - Immediate bool `json:"immediate" mapstructure:"immediate"` - // AutoGenerateMessageID: if true, generate a message id for the message. - AutoGenerateMessageID bool `json:"auto_generate_message_id" mapstructure:"auto_generate_message_id"` - // PublisherConfirmed: if true, wait for publisher confirmation. - PublisherConfirmed bool `json:"publisher_confirmed" mapstructure:"publisher_confirmed"` - // Timeout: timeout for waiting for publisher confirmation. - Timeout time.Duration `json:"timeout" mapstructure:"timeout"` - // NoWait: if true, do not wait for the server to confirm the message. - NoWait bool `json:"no_wait" mapstructure:"no_wait"` -} - -// DefaultConfig is the default configuration for publisher. -func DefaultConfig(name string, exchange string) Config { - return Config{ - Name: name, - Exchange: exchange, - Mandatory: false, - Immediate: false, - AutoGenerateMessageID: true, - PublisherConfirmed: false, - Timeout: 5 * time.Second, - NoWait: false, - } -} - -// ConnectionConfig is the configuration for connection. -type ConnectionConfig struct { - // ConnURIs: list of connection URIs. - ConnURIs []string `json:"conn_uris" mapstructure:"conn_uris"` - // ReconnectInterval: interval between reconnect attempts. - ReconnectInterval time.Duration `json:"reconnect_interval" mapstructure:"reconnect_interval"` - // ReconnectMaxAttempt: max number of reconnect attempts. - ReconnectMaxAttempt int `json:"reconnect_max_attempt" mapstructure:"reconnect_max_attempt"` -} - -// DefaultConnectionConfig is the default configuration for connection. -func DefaultConnectionConfig(connURIs []string) ConnectionConfig { - return ConnectionConfig{ - ConnURIs: connURIs, - ReconnectInterval: 5 * time.Second, - ReconnectMaxAttempt: 3, - } -} - -// DefaultPublishMessage is the function to publish a default message. -func DefaultPublishMessage(message []byte) amqp.Publishing { - return amqp.Publishing{ - ContentType: "application/json", - DeliveryMode: amqp.Transient, - Timestamp: time.Now(), - Body: message, - } -} - -// DefaultLogger is the default logger for the package -func DefaultLogger() ILogger { - logger, err := log.New("standard") - if err != nil { - fmt.Println("failed to create logger: ", err) - } - return logger -} diff --git a/rmq/publisher/connection.go b/rmq/publisher/connection.go new file mode 100644 index 0000000..31d77ea --- /dev/null +++ b/rmq/publisher/connection.go @@ -0,0 +1,64 @@ +package rabbitmq + +import ( + "fmt" + + "github.com/kelchy/go-lib/rmq/publisher/internal/connectionmanager" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// Conn manages the connection to a rabbit cluster +// it is intended to be shared across publishers and consumers +type Conn struct { + connectionManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + + options ConnectionOptions +} + +// Config wraps amqp.Config +// Config is used in DialConfig and Open to specify the desired tuning +// parameters used during a connection open handshake. The negotiated tuning +// will be stored in the returned connection's Config field. +type Config amqp.Config + +// NewConn creates a new connection manager +func NewConn(url string, optionFuncs ...func(*ConnectionOptions)) (*Conn, error) { + defaultOptions := getDefaultConnectionOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + manager, err := connectionmanager.NewConnectionManager(url, amqp.Config(options.Config), options.Logger, options.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := manager.NotifyReconnect() + conn := &Conn{ + connectionManager: manager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + options: *options, + } + + go conn.handleRestarts() + return conn, nil +} + +func (conn *Conn) handleRestarts() { + for err := range conn.reconnectErrCh { + conn.options.Logger.Out("OK_RMQ-CONNECTION_HANDLE-RESTART", fmt.Sprintf("successful connection recovery from: %v", err)) + } +} + +// Close closes the connection, it's not safe for re-use. +// You should also close any consumers and publishers before +// closing the connection +func (conn *Conn) Close() error { + conn.closeConnectionToManagerCh <- struct{}{} + return conn.connectionManager.Close() +} diff --git a/rmq/publisher/connection_options.go b/rmq/publisher/connection_options.go new file mode 100644 index 0000000..73a2e73 --- /dev/null +++ b/rmq/publisher/connection_options.go @@ -0,0 +1,53 @@ +package rabbitmq + +import ( + "time" + + "github.com/kelchy/go-lib/rmq/publisher/internal/logger" +) + +const defaultReconnectInterval = time.Second * 5 + +// ConnectionOptions are used to describe how a new consumer will be created. +type ConnectionOptions struct { + ReconnectInterval time.Duration + Logger logger.Logger + Config Config +} + +// getDefaultConnectionOptions describes the options that will be used when a value isn't provided +func getDefaultConnectionOptions() ConnectionOptions { + return ConnectionOptions{ + ReconnectInterval: defaultReconnectInterval, + Logger: logger.DefaultLogger, + Config: Config{}, + } +} + +// WithConnectionOptionsReconnectInterval sets the reconnection interval +func WithConnectionOptionsReconnectInterval(interval time.Duration) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.ReconnectInterval = interval + } +} + +// WithConnectionOptionsLogging sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogging(options *ConnectionOptions) { + options.Logger = logger.DefaultLogger +} + +// WithConnectionOptionsLogger sets logging to true on the consumer options +// and sets the +func WithConnectionOptionsLogger(log logger.Logger) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Logger = log + } +} + +// WithConnectionOptionsConfig sets the Config used in the connection +func WithConnectionOptionsConfig(cfg Config) func(options *ConnectionOptions) { + return func(options *ConnectionOptions) { + options.Config = cfg + } +} diff --git a/rmq/publisher/connectionpool.go b/rmq/publisher/connectionpool.go deleted file mode 100644 index 39783cd..0000000 --- a/rmq/publisher/connectionpool.go +++ /dev/null @@ -1,43 +0,0 @@ -package publisher - -import ( - "github.com/streadway/amqp" -) - -// IConnectionPool is the interface for connection pool -type IConnectionPool interface { - GetCon() (*amqp.Connection, error) -} - -type connectionPool struct { - uris []string - currentURIIndex int -} - -func newConnectionPool(uris ...string) IConnectionPool { - return &connectionPool{ - currentURIIndex: 0, - uris: uris, - } -} - -func (connPool *connectionPool) nextURI() (uri string) { - if connPool.currentURIIndex == len(connPool.uris)-1 { - uri = connPool.uris[connPool.currentURIIndex] - connPool.currentURIIndex = 0 - return - } - uri = connPool.uris[connPool.currentURIIndex] - connPool.currentURIIndex++ - return -} - -func (connPool *connectionPool) GetCon() (*amqp.Connection, error) { - var err error - uri := connPool.nextURI() - con, err := amqp.Dial(uri) - if err != nil { - return nil, err - } - return con, err -} diff --git a/rmq/publisher/connector.go b/rmq/publisher/connector.go deleted file mode 100644 index 2734ac8..0000000 --- a/rmq/publisher/connector.go +++ /dev/null @@ -1,113 +0,0 @@ -package publisher - -import ( - "context" - "fmt" - "time" - - "github.com/streadway/amqp" -) - -// Publisher is the publisher struct -type Publisher struct { - pubChan *amqp.Channel - pubConfig Config - conn *amqp.Connection - connPool IConnectionPool - errorConn chan *amqp.Error - errorPubChan chan *amqp.Error - logger ILogger -} - -// ILogger is the interface for logger -type ILogger interface { - Debug(key string, message string) - Out(key string, message string) - Error(key string, err error) -} - -// New creates a new publisher -func New(connConfig ConnectionConfig, pubConfig Config, logger ILogger) (*Publisher, error) { - publisher := Publisher{ - logger: logger, - pubConfig: pubConfig, - } - publisher.connPool = newConnectionPool(connConfig.ConnURIs...) - publisher.connect(connConfig) - go publisher.listenOnChanClose() - return &publisher, nil -} - -// Publish publishes a message to the queue -// Returns message id and error -func (p *Publisher) Publish(ctx context.Context, routingKey string, toPublish amqp.Publishing) (string, error) { - // Create a publishing object to be sent to RMQ - if p.pubConfig.AutoGenerateMessageID { - uuid, err := NewUUID() - if err != nil { - return "", fmt.Errorf("failed to auto generate message id: %v", err) - } - toPublish.MessageId = uuid - } - // Publish the message - pubErr := p.pubChan.Publish(p.pubConfig.Exchange, routingKey, p.pubConfig.Mandatory, p.pubConfig.Immediate, toPublish) - if pubErr != nil { - p.logger.Error("ERR_RMQ-PUBLISHER_FAIL-PUBLISH", pubErr) - } - return toPublish.MessageId, nil -} - -func (p *Publisher) connect(connConfig ConnectionConfig) error { - attempts := 0 - for attempts <= connConfig.ReconnectMaxAttempt { - p.logger.Out("RMQ-PUBLISHER", "Connecting to RabbitMQ") - // Make a connection to RMQ - conn, err := p.connPool.GetCon() - if err != nil { - p.logger.Error("ERR_RMQ-PUBLISHER_FAIL-CONNECT", err) - time.Sleep(connConfig.ReconnectInterval) - // Wait before retrying - continue - } - p.conn = conn - p.errorConn = make(chan *amqp.Error) - p.conn.NotifyClose(p.errorConn) - - // Open a channel for publishing - pubChan, pubChanErr := p.openChannel() - if pubChanErr != nil { - p.logger.Error("ERR_RMQ-PUBLISHER_FAIL-OPEN-CHANNEL", pubChanErr) - return pubChanErr - } - p.pubChan = pubChan - p.errorPubChan = make(chan *amqp.Error) - p.pubChan.NotifyClose(p.errorPubChan) - p.logger.Out("RMQ-PUBLISHER", "Connected to RabbitMQ") - return nil - } - return nil -} - -func (p *Publisher) openChannel() (*amqp.Channel, error) { - if p.conn == nil || p.conn.IsClosed() { - return nil, fmt.Errorf("connection is not open") - } - return p.conn.Channel() -} - -func (p *Publisher) listenOnChanClose() { - for { - select { - case err := <-p.errorPubChan: - if err != nil { - p.logger.Error("ERR_RMQ-PUBLISHER_FAIL-CHANNEL-CLOSE", err) - if !p.conn.IsClosed() { - errClose := p.conn.Close() - if errClose != nil { - p.logger.Error("ERR_RMQ-PUBLISHER_FAIL-CHANNEL-CLOSE", errClose) - } - } - } - } - } -} diff --git a/rmq/publisher/declare.go b/rmq/publisher/declare.go new file mode 100644 index 0000000..84c57d1 --- /dev/null +++ b/rmq/publisher/declare.go @@ -0,0 +1,37 @@ +package rabbitmq + +import "github.com/kelchy/go-lib/rmq/publisher/internal/channelmanager" + +func declareExchange(chanManager *channelmanager.ChannelManager, options ExchangeOptions) error { + if !options.Declare { + return nil + } + if options.Passive { + err := chanManager.ExchangeDeclarePassiveSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil + } + err := chanManager.ExchangeDeclareSafe( + options.Name, + options.Kind, + options.Durable, + options.AutoDelete, + options.Internal, + options.NoWait, + tableToAMQPTable(options.Args), + ) + if err != nil { + return err + } + return nil +} diff --git a/rmq/publisher/example/publisher.example.go b/rmq/publisher/example/publisher.example.go index 198747c..f6373a1 100644 --- a/rmq/publisher/example/publisher.example.go +++ b/rmq/publisher/example/publisher.example.go @@ -4,29 +4,116 @@ import ( "context" "fmt" "os" + "os/signal" + "syscall" + "time" - "github.com/kelchy/go-lib/rmq/publisher" + rabbitmq "github.com/kelchy/go-lib/rmq/publisher" ) func main() { - // If using the default publisher config, you can just pass your publisher name and exchange name - p, err := publisher.New( - publisher.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), - publisher.DefaultConfig("test-publisher", "test-exchange"), - publisher.DefaultLogger()) + conn, err := rabbitmq.NewConn( + os.Getenv("RMQ_URI"), + rabbitmq.WithConnectionOptionsLogging, + ) if err != nil { - // Publisher failed to create and connect - panic(err) + fmt.Println(err) + return } + defer conn.Close() - // Publish a message - // Message to be published should be in a amqp Publishing object - // Can simply use the default message if you don't need to set any custom properties - message := []byte("test message from publisher") - messageID, pubErr := p.Publish(context.TODO(), "test-routing-key", publisher.DefaultPublishMessage(message)) - if pubErr != nil { - // Failed to publish message - panic(pubErr) + publisher, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDurable, + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + fmt.Println(err) + return + } + defer publisher.Close() + + publisher.NotifyReturn(func(r rabbitmq.Return) { + // To handle returned messages + fmt.Println("err message returned from server: ", string(r.MessageId), (r.Body)) + }) + + publisher.NotifyPublish(func(c rabbitmq.Confirmation) { + fmt.Println("message confirmed from server. ack: ", c.Ack) + }) + + publisher2, err := rabbitmq.NewPublisher( + conn, + rabbitmq.WithPublisherOptionsLogging, + rabbitmq.WithPublisherOptionsExchangeName("events"), + rabbitmq.WithPublisherOptionsExchangeDurable, + rabbitmq.WithPublisherOptionsExchangeDeclare, + ) + if err != nil { + fmt.Println(err) + return + } + defer publisher2.Close() + + publisher2.NotifyReturn(func(r rabbitmq.Return) { + // To handle returned messages + fmt.Println("err message returned from server: ", string(r.MessageId), string(r.Body)) + }) + + publisher2.NotifyPublish(func(c rabbitmq.Confirmation) { + fmt.Println("message confirmed from server. ack: ", c.Ack) + }) + + // block main thread - wait for shutdown signal + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigs + fmt.Println() + fmt.Println(sig) + done <- true + }() + + fmt.Println("awaiting signal") + + ticker := time.NewTicker(time.Second * 5) + for { + select { + case <-ticker.C: + err = publisher.PublishWithContext( + context.Background(), + []byte("hello, world"), + []string{"test_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + rabbitmq.WithPublishOptionsAutoMessageID(), + ) + if err != nil { + fmt.Println(err) + } + err = publisher2.PublishWithContext( + context.Background(), + []byte("hello, world 2"), + []string{"test_routing_key"}, + rabbitmq.WithPublishOptionsContentType("application/json"), + rabbitmq.WithPublishOptionsMandatory, + rabbitmq.WithPublishOptionsPersistentDelivery, + rabbitmq.WithPublishOptionsExchange("events"), + rabbitmq.WithPublishOptionsAutoMessageID(), + ) + if err != nil { + fmt.Println(err) + } + case <-done: + fmt.Println("stopping publisher") + return + } } - fmt.Println("Message published with id: ", messageID) } diff --git a/rmq/publisher/exchange_options.go b/rmq/publisher/exchange_options.go new file mode 100644 index 0000000..cf5e255 --- /dev/null +++ b/rmq/publisher/exchange_options.go @@ -0,0 +1,16 @@ +package rabbitmq + +// ExchangeOptions are used to configure an exchange. +// If the Passive flag is set the client will only check if the exchange exists on the server +// and that the settings match, no creation attempt will be made. +type ExchangeOptions struct { + Name string + Kind string // possible values: empty string for default exchange or direct, topic, fanout + Durable bool + AutoDelete bool + Internal bool + NoWait bool + Passive bool // if false, a missing exchange will be created on the server + Args Table + Declare bool +} diff --git a/rmq/publisher/go.mod b/rmq/publisher/go.mod index 3326d9a..499da05 100644 --- a/rmq/publisher/go.mod +++ b/rmq/publisher/go.mod @@ -1,10 +1,10 @@ -//v0.0.4 +//v0.1.0 module github.com/kelchy/go-lib/rmq/publisher go 1.19 require ( github.com/google/uuid v1.3.0 - github.com/kelchy/go-lib/log v0.0.10 - github.com/streadway/amqp v1.0.0 + github.com/kelchy/go-lib/log v0.0.11 + github.com/rabbitmq/amqp091-go v1.8.0 ) diff --git a/rmq/publisher/go.sum b/rmq/publisher/go.sum index c84092b..59a958d 100644 --- a/rmq/publisher/go.sum +++ b/rmq/publisher/go.sum @@ -1,6 +1,22 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/kelchy/go-lib/log v0.0.10 h1:K2ilS1c3pHwzXuQhKbTYagiNPYJYaGJq6BHr8TjPM2g= -github.com/kelchy/go-lib/log v0.0.10/go.mod h1:08sbkvkTs1hFLUcHsOqCUXJBAF1VUrllqKkB3lmDEFM= -github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo= -github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= +github.com/kelchy/go-lib/log v0.0.11 h1:8WkPZNmOk+4UVZe8kI/lYaWOjXNWrdYCS5HD/YFc+T8= +github.com/kelchy/go-lib/log v0.0.11/go.mod h1:iD2C86ZX89OfM91lvvJtDaJm9BX+KqD5i2R9Fq7D404= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rabbitmq/amqp091-go v1.8.0 h1:GBFy5PpLQ5jSVVSYv8ecHGqeX7UTLYR4ItQbDCss9MM= +github.com/rabbitmq/amqp091-go v1.8.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= +go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/rmq/publisher/internal/channelmanager/channel_manager.go b/rmq/publisher/internal/channelmanager/channel_manager.go new file mode 100644 index 0000000..b447be1 --- /dev/null +++ b/rmq/publisher/internal/channelmanager/channel_manager.go @@ -0,0 +1,150 @@ +package channelmanager + +import ( + "errors" + "fmt" + "sync" + "time" + + "github.com/kelchy/go-lib/rmq/publisher/internal/connectionmanager" + "github.com/kelchy/go-lib/rmq/publisher/internal/dispatcher" + "github.com/kelchy/go-lib/rmq/publisher/internal/logger" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ChannelManager - +type ChannelManager struct { + logger logger.Logger + channel *amqp.Channel + connManager *connectionmanager.ConnectionManager + channelMux *sync.RWMutex + reconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher +} + +// NewChannelManager creates a new connection manager +func NewChannelManager(connManager *connectionmanager.ConnectionManager, log logger.Logger, reconnectInterval time.Duration) (*ChannelManager, error) { + ch, err := getNewChannel(connManager) + if err != nil { + return nil, err + } + + chanManager := ChannelManager{ + logger: log, + connManager: connManager, + channel: ch, + channelMux: &sync.RWMutex{}, + reconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go chanManager.startNotifyCancelOrClosed() + return &chanManager, nil +} + +func getNewChannel(connManager *connectionmanager.ConnectionManager) (*amqp.Channel, error) { + conn := connManager.CheckoutConnection() + defer connManager.CheckinConnection() + + ch, err := conn.Channel() + if err != nil { + return nil, err + } + return ch, nil +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (chanManager *ChannelManager) startNotifyCancelOrClosed() { + notifyCloseChan := chanManager.channel.NotifyClose(make(chan *amqp.Error, 1)) + notifyCancelChan := chanManager.channel.NotifyCancel(make(chan string, 1)) + + select { + case err := <-notifyCloseChan: + if err != nil { + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", fmt.Errorf("attempting to reconnect to amqp server after close with error: %v", err)) + chanManager.reconnectLoop() + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "successfully reconnected to amqp server after close") + chanManager.dispatcher.Dispatch(err) + } + if err == nil { + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "amqp channel closed gracefully") + } + case err := <-notifyCancelChan: + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", fmt.Errorf("attempting to reconnect to amqp server after cancel with error: %v", err)) + chanManager.reconnectLoop() + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_START-NOTIFY-CANCEL-OR-CLOSED", "successfully reconnected to amqp server after cancel") + chanManager.dispatcher.Dispatch(errors.New(err)) + } +} + +// GetReconnectionCount - +func (chanManager *ChannelManager) GetReconnectionCount() uint { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + return chanManager.reconnectionCount +} + +func (chanManager *ChannelManager) incrementReconnectionCount() { + chanManager.reconnectionCountMux.Lock() + defer chanManager.reconnectionCountMux.Unlock() + chanManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (chanManager *ChannelManager) reconnectLoop() { + for { + time.Sleep(chanManager.reconnectInterval) + err := chanManager.reconnect() + if err != nil { + chanManager.logger.Error("ERR_RMQ-CHAN-MANAGER_RECONNECT-LOOP", fmt.Errorf("error reconnecting to amqp server: %v", err)) + } else { + chanManager.incrementReconnectionCount() + go chanManager.startNotifyCancelOrClosed() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (chanManager *ChannelManager) reconnect() error { + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + newChannel, err := getNewChannel(chanManager.connManager) + if err != nil { + return err + } + + if err = chanManager.channel.Close(); err != nil { + chanManager.logger.Error("WARN_RMQ-CHAN-MANAGER_RECONNECT", fmt.Errorf("error closing channel while reconnecting: %v", err)) + } + + chanManager.channel = newChannel + return nil +} + +// Close safely closes the current channel and connection +func (chanManager *ChannelManager) Close() error { + chanManager.logger.Out("OK_RMQ-CHAN-MANAGER_CLOSE", "closing channel manager...") + chanManager.channelMux.Lock() + defer chanManager.channelMux.Unlock() + + err := chanManager.channel.Close() + if err != nil { + return err + } + + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnect to the server +func (chanManager *ChannelManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return chanManager.dispatcher.AddSubscriber() +} diff --git a/rmq/publisher/internal/channelmanager/safe_wraps.go b/rmq/publisher/internal/channelmanager/safe_wraps.go new file mode 100644 index 0000000..ff99a3e --- /dev/null +++ b/rmq/publisher/internal/channelmanager/safe_wraps.go @@ -0,0 +1,237 @@ +package channelmanager + +import ( + "context" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConsumeSafe safely wraps the (*amqp.Channel).Consume method +func (chanManager *ChannelManager) ConsumeSafe( + queue, + consumer string, + autoAck, + exclusive, + noLocal, + noWait bool, + args amqp.Table, +) (<-chan amqp.Delivery, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Consume( + queue, + consumer, + autoAck, + exclusive, + noLocal, + noWait, + args, + ) +} + +// QueueDeclarePassiveSafe safely wraps the (*amqp.Channel).QueueDeclarePassive method +func (chanManager *ChannelManager) QueueDeclarePassiveSafe( + name string, + durable bool, + autoDelete bool, + exclusive bool, + noWait bool, + args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclarePassive( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// QueueDeclareSafe safely wraps the (*amqp.Channel).QueueDeclare method +func (chanManager *ChannelManager) QueueDeclareSafe( + name string, durable bool, autoDelete bool, exclusive bool, noWait bool, args amqp.Table, +) (amqp.Queue, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueDeclare( + name, + durable, + autoDelete, + exclusive, + noWait, + args, + ) +} + +// ExchangeDeclarePassiveSafe safely wraps the (*amqp.Channel).ExchangeDeclarePassive method +func (chanManager *ChannelManager) ExchangeDeclarePassiveSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclarePassive( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// ExchangeDeclareSafe safely wraps the (*amqp.Channel).ExchangeDeclare method +func (chanManager *ChannelManager) ExchangeDeclareSafe( + name string, kind string, durable bool, autoDelete bool, internal bool, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.ExchangeDeclare( + name, + kind, + durable, + autoDelete, + internal, + noWait, + args, + ) +} + +// QueueBindSafe safely wraps the (*amqp.Channel).QueueBind method +func (chanManager *ChannelManager) QueueBindSafe( + name string, key string, exchange string, noWait bool, args amqp.Table, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.QueueBind( + name, + key, + exchange, + noWait, + args, + ) +} + +// QosSafe safely wraps the (*amqp.Channel).Qos method +func (chanManager *ChannelManager) QosSafe( + prefetchCount int, prefetchSize int, global bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Qos( + prefetchCount, + prefetchSize, + global, + ) +} + +/* +PublishSafe safely wraps the (*amqp.Channel).Publish method. +*/ +func (chanManager *ChannelManager) PublishSafe( + exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithContext( + context.Background(), + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// PublishWithContextSafe safely wraps the (*amqp.Channel).PublishWithContext method. +func (chanManager *ChannelManager) PublishWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// PublishWithDeferredConfirmWithContextSafe safely wraps the (*amqp.Channel).PublishWithDeferredConfirmWithContext method. +func (chanManager *ChannelManager) PublishWithDeferredConfirmWithContextSafe( + ctx context.Context, exchange string, key string, mandatory bool, immediate bool, msg amqp.Publishing, +) (*amqp.DeferredConfirmation, error) { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.PublishWithDeferredConfirmWithContext( + ctx, + exchange, + key, + mandatory, + immediate, + msg, + ) +} + +// NotifyReturnSafe safely wraps the (*amqp.Channel).NotifyReturn method +func (chanManager *ChannelManager) NotifyReturnSafe( + c chan amqp.Return, +) chan amqp.Return { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyReturn( + c, + ) +} + +// ConfirmSafe safely wraps the (*amqp.Channel).Confirm method +func (chanManager *ChannelManager) ConfirmSafe( + noWait bool, +) error { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.Confirm( + noWait, + ) +} + +// NotifyPublishSafe safely wraps the (*amqp.Channel).NotifyPublish method +func (chanManager *ChannelManager) NotifyPublishSafe( + confirm chan amqp.Confirmation, +) chan amqp.Confirmation { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyPublish( + confirm, + ) +} + +// NotifyFlowSafe safely wraps the (*amqp.Channel).NotifyFlow method +func (chanManager *ChannelManager) NotifyFlowSafe( + c chan bool, +) chan bool { + chanManager.channelMux.RLock() + defer chanManager.channelMux.RUnlock() + + return chanManager.channel.NotifyFlow( + c, + ) +} diff --git a/rmq/publisher/internal/connectionmanager/connection_manager.go b/rmq/publisher/internal/connectionmanager/connection_manager.go new file mode 100644 index 0000000..3cb9e11 --- /dev/null +++ b/rmq/publisher/internal/connectionmanager/connection_manager.go @@ -0,0 +1,140 @@ +package connectionmanager + +import ( + "fmt" + "sync" + "time" + + "github.com/kelchy/go-lib/rmq/publisher/internal/dispatcher" + "github.com/kelchy/go-lib/rmq/publisher/internal/logger" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// ConnectionManager - +type ConnectionManager struct { + logger logger.Logger + url string + connection *amqp.Connection + amqpConfig amqp.Config + connectionMux *sync.RWMutex + ReconnectInterval time.Duration + reconnectionCount uint + reconnectionCountMux *sync.Mutex + dispatcher *dispatcher.Dispatcher +} + +// NewConnectionManager creates a new connection manager +func NewConnectionManager(url string, conf amqp.Config, log logger.Logger, reconnectInterval time.Duration) (*ConnectionManager, error) { + conn, err := amqp.DialConfig(url, amqp.Config(conf)) + if err != nil { + return nil, err + } + connManager := ConnectionManager{ + logger: log, + url: url, + connection: conn, + amqpConfig: conf, + connectionMux: &sync.RWMutex{}, + ReconnectInterval: reconnectInterval, + reconnectionCount: 0, + reconnectionCountMux: &sync.Mutex{}, + dispatcher: dispatcher.NewDispatcher(), + } + go connManager.startNotifyClose() + return &connManager, nil +} + +// Close safely closes the current channel and connection +func (connManager *ConnectionManager) Close() error { + connManager.logger.Out("OK_RMQ-CONN-MANAGER_CLOSE", "closing connection manager...") + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + + err := connManager.connection.Close() + if err != nil { + return err + } + return nil +} + +// NotifyReconnect adds a new subscriber that will receive error messages whenever +// the connection manager has successfully reconnected to the server +func (connManager *ConnectionManager) NotifyReconnect() (<-chan error, chan<- struct{}) { + return connManager.dispatcher.AddSubscriber() +} + +// CheckoutConnection - +func (connManager *ConnectionManager) CheckoutConnection() *amqp.Connection { + connManager.connectionMux.RLock() + return connManager.connection +} + +// CheckinConnection - +func (connManager *ConnectionManager) CheckinConnection() { + connManager.connectionMux.RUnlock() +} + +// startNotifyCancelOrClosed listens on the channel's cancelled and closed +// notifiers. When it detects a problem, it attempts to reconnect. +// Once reconnected, it sends an error back on the manager's notifyCancelOrClose +// channel +func (connManager *ConnectionManager) startNotifyClose() { + notifyCloseChan := connManager.connection.NotifyClose(make(chan *amqp.Error, 1)) + + err := <-notifyCloseChan + if err != nil { + connManager.logger.Error("ERR_RMQ-CONN-MANAGER_START-NOTIFY_CLOSE", fmt.Errorf("attempting to reconnect to amqp server after connection close with error: %v", err.Error())) + connManager.reconnectLoop() + connManager.logger.Out("OK_RMQ-CONN-MANAGER_START-NOTIFY-CLOSE", "successfully reconnected to amqp server") + connManager.dispatcher.Dispatch(err) + } + if err == nil { + connManager.logger.Out("OK_RMQ-CONN-MANAGER", "amqp connection closed gracefully") + } +} + +// GetReconnectionCount - +func (connManager *ConnectionManager) GetReconnectionCount() uint { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + return connManager.reconnectionCount +} + +func (connManager *ConnectionManager) incrementReconnectionCount() { + connManager.reconnectionCountMux.Lock() + defer connManager.reconnectionCountMux.Unlock() + connManager.reconnectionCount++ +} + +// reconnectLoop continuously attempts to reconnect +func (connManager *ConnectionManager) reconnectLoop() { + for { + time.Sleep(connManager.ReconnectInterval) + err := connManager.reconnect() + if err != nil { + connManager.logger.Error("ERR_RMQ_RECONNECT-LOOP", fmt.Errorf("error reconnecting to amqp server: %v", err)) + } else { + connManager.incrementReconnectionCount() + go connManager.startNotifyClose() + return + } + } +} + +// reconnect safely closes the current channel and obtains a new one +func (connManager *ConnectionManager) reconnect() error { + connManager.connectionMux.Lock() + defer connManager.connectionMux.Unlock() + newConn, err := amqp.DialConfig(connManager.url, amqp.Config(connManager.amqpConfig)) + if err != nil { + return err + } + + if err = connManager.connection.Close(); err != nil { + connManager.logger.Error("WARN_RMQ-CONN-MANAGER_RECONNECT", fmt.Errorf("error closing connection while reconnecting: %v", err)) + } + + connManager.connection = newConn + return nil +} diff --git a/rmq/publisher/internal/connectionmanager/safe_wraps.go b/rmq/publisher/internal/connectionmanager/safe_wraps.go new file mode 100644 index 0000000..b6702af --- /dev/null +++ b/rmq/publisher/internal/connectionmanager/safe_wraps.go @@ -0,0 +1,17 @@ +package connectionmanager + +import ( + amqp "github.com/rabbitmq/amqp091-go" +) + +// NotifyBlockedSafe safely wraps the (*amqp.Connection).NotifyBlocked method +func (connManager *ConnectionManager) NotifyBlockedSafe( + receiver chan amqp.Blocking, +) chan amqp.Blocking { + connManager.connectionMux.RLock() + defer connManager.connectionMux.RUnlock() + + return connManager.connection.NotifyBlocked( + receiver, + ) +} diff --git a/rmq/publisher/internal/dispatcher/dispatcher.go b/rmq/publisher/internal/dispatcher/dispatcher.go new file mode 100644 index 0000000..52385c6 --- /dev/null +++ b/rmq/publisher/internal/dispatcher/dispatcher.go @@ -0,0 +1,72 @@ +package dispatcher + +import ( + "log" + "math" + "math/rand" + "sync" + "time" +) + +// Dispatcher - +type Dispatcher struct { + subscribers map[int]dispatchSubscriber + subscribersMux *sync.Mutex +} + +type dispatchSubscriber struct { + notifyCancelOrCloseChan chan error + closeCh <-chan struct{} +} + +// NewDispatcher - +func NewDispatcher() *Dispatcher { + return &Dispatcher{ + subscribers: make(map[int]dispatchSubscriber), + subscribersMux: &sync.Mutex{}, + } +} + +// Dispatch - +func (d *Dispatcher) Dispatch(err error) error { + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + for _, subscriber := range d.subscribers { + select { + case <-time.After(time.Second * 5): + log.Println("Unexpected rabbitmq error: timeout in dispatch") + case subscriber.notifyCancelOrCloseChan <- err: + } + } + return nil +} + +// AddSubscriber - +func (d *Dispatcher) AddSubscriber() (<-chan error, chan<- struct{}) { + const maxRand = math.MaxInt + const minRand = 0 + id := rand.Intn(maxRand-minRand) + minRand + + closeCh := make(chan struct{}) + notifyCancelOrCloseChan := make(chan error) + + d.subscribersMux.Lock() + d.subscribers[id] = dispatchSubscriber{ + notifyCancelOrCloseChan: notifyCancelOrCloseChan, + closeCh: closeCh, + } + d.subscribersMux.Unlock() + + go func(id int) { + <-closeCh + d.subscribersMux.Lock() + defer d.subscribersMux.Unlock() + sub, ok := d.subscribers[id] + if !ok { + return + } + close(sub.notifyCancelOrCloseChan) + delete(d.subscribers, id) + }(id) + return notifyCancelOrCloseChan, closeCh +} diff --git a/rmq/publisher/internal/dispatcher/dispatcher_test.go b/rmq/publisher/internal/dispatcher/dispatcher_test.go new file mode 100644 index 0000000..afc5509 --- /dev/null +++ b/rmq/publisher/internal/dispatcher/dispatcher_test.go @@ -0,0 +1,34 @@ +package dispatcher + +import ( + "testing" + "time" +) + +func TestNewDispatcher(t *testing.T) { + d := NewDispatcher() + if d.subscribers == nil { + t.Error("Dispatcher subscribers is nil") + } + if d.subscribersMux == nil { + t.Error("Dispatcher subscribersMux is nil") + } +} + +func TestAddSubscriber(t *testing.T) { + d := NewDispatcher() + d.AddSubscriber() + if len(d.subscribers) != 1 { + t.Error("Dispatcher subscribers length is not 1") + } +} + +func TestCloseSubscriber(t *testing.T) { + d := NewDispatcher() + _, closeCh := d.AddSubscriber() + close(closeCh) + time.Sleep(time.Millisecond) + if len(d.subscribers) != 0 { + t.Error("Dispatcher subscribers length is not 0") + } +} diff --git a/rmq/publisher/helper.go b/rmq/publisher/internal/helper/uuid.go similarity index 93% rename from rmq/publisher/helper.go rename to rmq/publisher/internal/helper/uuid.go index 161ba31..9d766d3 100644 --- a/rmq/publisher/helper.go +++ b/rmq/publisher/internal/helper/uuid.go @@ -1,4 +1,4 @@ -package publisher +package uuid import ( "github.com/google/uuid" diff --git a/rmq/publisher/internal/logger/logger.go b/rmq/publisher/internal/logger/logger.go new file mode 100644 index 0000000..8a462c4 --- /dev/null +++ b/rmq/publisher/internal/logger/logger.go @@ -0,0 +1,13 @@ +package logger + +import "github.com/kelchy/go-lib/log" + +// Logger is a simple interface for logging. +type Logger interface { + Debug(key string, message string) + Out(key string, message string) + Error(key string, err error) +} + +// DefaultLogger is the default logger used by the library. +var DefaultLogger, _ = log.New("standard") diff --git a/rmq/publisher/publish.go b/rmq/publisher/publish.go new file mode 100644 index 0000000..9f21ec6 --- /dev/null +++ b/rmq/publisher/publish.go @@ -0,0 +1,344 @@ +package rabbitmq + +import ( + "context" + "errors" + "fmt" + "sync" + + "github.com/kelchy/go-lib/rmq/publisher/internal/channelmanager" + "github.com/kelchy/go-lib/rmq/publisher/internal/connectionmanager" + + amqp "github.com/rabbitmq/amqp091-go" +) + +// DeliveryMode. Transient means higher throughput but messages will not be +// restored on broker restart. The delivery mode of publishings is unrelated +// to the durability of the queues they reside on. Transient messages will +// not be restored to durable queues, persistent messages will be restored to +// durable queues and lost on non-durable queues during server restart. +// +// This remains typed as uint8 to match Publishing.DeliveryMode. Other +// delivery modes specific to custom queue implementations are not enumerated +// here. +const ( + Transient uint8 = amqp.Transient + Persistent uint8 = amqp.Persistent +) + +// Return captures a flattened struct of fields returned by the server when a +// Publishing is unable to be delivered either due to the `mandatory` flag set +// and no route found, or `immediate` flag set and no free consumer. +type Return struct { + amqp.Return +} + +// Confirmation notifies the acknowledgment or negative acknowledgement of a publishing identified by its delivery tag. +// Use NotifyPublish to consume these events. ReconnectionCount is useful in that each time it increments, the DeliveryTag +// is reset to 0, meaning you can use ReconnectionCount+DeliveryTag to ensure uniqueness +type Confirmation struct { + amqp.Confirmation + ReconnectionCount int +} + +// Publisher allows you to publish messages safely across an open connection +type Publisher struct { + chanManager *channelmanager.ChannelManager + connManager *connectionmanager.ConnectionManager + reconnectErrCh <-chan error + closeConnectionToManagerCh chan<- struct{} + + disablePublishDueToFlow bool + disablePublishDueToFlowMux *sync.RWMutex + + disablePublishDueToBlocked bool + disablePublishDueToBlockedMux *sync.RWMutex + + handlerMux *sync.Mutex + notifyReturnHandler func(r Return) + notifyPublishHandler func(p Confirmation) + + options PublisherOptions +} + +// PublisherConfirmation is a slice of Confirmations that can be used to wait for all of them to be confirmed +type PublisherConfirmation []*amqp.DeferredConfirmation + +// NewPublisher returns a new publisher with an open channel to the cluster. +// If you plan to enforce mandatory or immediate publishing, those failures will be reported +// on the channel of Returns that you should setup a listener on. +// Flow controls are automatically handled as they are sent from the server, and publishing +// will fail with an error when the server is requesting a slowdown +func NewPublisher(conn *Conn, optionFuncs ...func(*PublisherOptions)) (*Publisher, error) { + defaultOptions := getDefaultPublisherOptions() + options := &defaultOptions + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + + if conn.connectionManager == nil { + return nil, errors.New("connection manager can't be nil") + } + + chanManager, err := channelmanager.NewChannelManager(conn.connectionManager, options.Logger, conn.connectionManager.ReconnectInterval) + if err != nil { + return nil, err + } + + reconnectErrCh, closeCh := chanManager.NotifyReconnect() + publisher := &Publisher{ + chanManager: chanManager, + connManager: conn.connectionManager, + reconnectErrCh: reconnectErrCh, + closeConnectionToManagerCh: closeCh, + disablePublishDueToFlow: false, + disablePublishDueToFlowMux: &sync.RWMutex{}, + disablePublishDueToBlocked: false, + disablePublishDueToBlockedMux: &sync.RWMutex{}, + handlerMux: &sync.Mutex{}, + notifyReturnHandler: nil, + notifyPublishHandler: nil, + options: *options, + } + + err = publisher.startup() + if err != nil { + return nil, err + } + + go func() { + for err := range publisher.reconnectErrCh { + publisher.options.Logger.Out("OK_PUBLISH_RECONNECT", fmt.Sprintf("successful publisher recovery from: %v", err)) + err := publisher.startup() + if err != nil { + publisher.options.Logger.Error("ERR_PUBLISH_RECONNECT", fmt.Errorf("publisher closing, unable to recover from error on startup for publisher after cancel or close: %v", err)) + return + } + go publisher.startReturnHandler() + go publisher.startPublishHandler() + } + }() + + return publisher, nil +} + +func (publisher *Publisher) startup() error { + err := declareExchange(publisher.chanManager, publisher.options.ExchangeOptions) + if err != nil { + return fmt.Errorf("declare exchange failed: %w", err) + } + go publisher.startNotifyFlowHandler() + go publisher.startNotifyBlockedHandler() + return nil +} + +/* +Publish publishes the provided data to the given routing keys over the connection. +*/ +func (publisher *Publisher) Publish( + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), +) error { + return publisher.PublishWithContext(context.Background(), data, routingKeys, optionFuncs...) +} + +// PublishWithContext publishes the provided data to the given routing keys over the connection. +func (publisher *Publisher) PublishWithContext( + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), +) error { + publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() + if publisher.disablePublishDueToFlow { + return fmt.Errorf("publishing blocked due to high flow on the server") + } + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return fmt.Errorf("publishing blocked due to TCP block on the server") + } + + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient + } + + for _, routingKey := range routingKeys { + message := amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID + + // Actual publish. + err := publisher.chanManager.PublishWithContextSafe( + ctx, + options.Exchange, + routingKey, + options.Mandatory, + options.Immediate, + message, + ) + if err != nil { + return err + } + } + return nil +} + +// PublishWithDeferredConfirmWithContext publishes the provided data to the given routing keys over the connection with a context. +func (publisher *Publisher) PublishWithDeferredConfirmWithContext( + ctx context.Context, + data []byte, + routingKeys []string, + optionFuncs ...func(*PublishOptions), +) (PublisherConfirmation, error) { + publisher.disablePublishDueToFlowMux.RLock() + defer publisher.disablePublishDueToFlowMux.RUnlock() + if publisher.disablePublishDueToFlow { + return nil, fmt.Errorf("publishing blocked due to high flow on the server") + } + + publisher.disablePublishDueToBlockedMux.RLock() + defer publisher.disablePublishDueToBlockedMux.RUnlock() + if publisher.disablePublishDueToBlocked { + return nil, fmt.Errorf("publishing blocked due to TCP block on the server") + } + + options := &PublishOptions{} + for _, optionFunc := range optionFuncs { + optionFunc(options) + } + if options.DeliveryMode == 0 { + options.DeliveryMode = Transient + } + + var deferredConfirmations []*amqp.DeferredConfirmation + + for _, routingKey := range routingKeys { + message := amqp.Publishing{} + message.ContentType = options.ContentType + message.DeliveryMode = options.DeliveryMode + message.Body = data + message.Headers = tableToAMQPTable(options.Headers) + message.Expiration = options.Expiration + message.ContentEncoding = options.ContentEncoding + message.Priority = options.Priority + message.CorrelationId = options.CorrelationID + message.ReplyTo = options.ReplyTo + message.MessageId = options.MessageID + message.Timestamp = options.Timestamp + message.Type = options.Type + message.UserId = options.UserID + message.AppId = options.AppID + + // Actual publish. + conf, err := publisher.chanManager.PublishWithDeferredConfirmWithContextSafe( + ctx, + options.Exchange, + routingKey, + options.Mandatory, + options.Immediate, + message, + ) + if err != nil { + return nil, err + } + deferredConfirmations = append(deferredConfirmations, conf) + } + return deferredConfirmations, nil +} + +// Close closes the publisher and releases resources +// The publisher should be discarded as it's not safe for re-use +// Only call Close() once +func (publisher *Publisher) Close() { + // close the channel so that rabbitmq server knows that the + // publisher has been stopped. + err := publisher.chanManager.Close() + if err != nil { + publisher.options.Logger.Error("WARN_RMQ-PUBLISH_CLOSE", fmt.Errorf("error while closing the channel: %v", err)) + } + publisher.options.Logger.Out("RMQ-PUBLISH_CLOSE", "closing publisher") + go func() { + publisher.closeConnectionToManagerCh <- struct{}{} + }() +} + +// NotifyReturn registers a listener for basic.return methods. +// These can be sent from the server when a publish is undeliverable either from the mandatory or immediate flags. +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (publisher *Publisher) NotifyReturn(handler func(r Return)) { + publisher.handlerMux.Lock() + start := publisher.notifyReturnHandler == nil + publisher.notifyReturnHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startReturnHandler() + } +} + +// NotifyPublish registers a listener for publish confirmations, must set ConfirmPublishings option +// These notifications are shared across an entire connection, so if you're creating multiple +// publishers on the same connection keep that in mind +func (publisher *Publisher) NotifyPublish(handler func(p Confirmation)) { + publisher.handlerMux.Lock() + start := publisher.notifyPublishHandler == nil + publisher.notifyPublishHandler = handler + publisher.handlerMux.Unlock() + + if start { + go publisher.startPublishHandler() + } +} + +func (publisher *Publisher) startReturnHandler() { + publisher.handlerMux.Lock() + if publisher.notifyReturnHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + returns := publisher.chanManager.NotifyReturnSafe(make(chan amqp.Return, 1)) + for ret := range returns { + go publisher.notifyReturnHandler(Return{ret}) + } +} + +func (publisher *Publisher) startPublishHandler() { + publisher.handlerMux.Lock() + if publisher.notifyPublishHandler == nil { + publisher.handlerMux.Unlock() + return + } + publisher.handlerMux.Unlock() + + publisher.chanManager.ConfirmSafe(false) + confirmationCh := publisher.chanManager.NotifyPublishSafe(make(chan amqp.Confirmation, 1)) + for conf := range confirmationCh { + go publisher.notifyPublishHandler(Confirmation{ + Confirmation: conf, + ReconnectionCount: int(publisher.chanManager.GetReconnectionCount()), + }) + } +} diff --git a/rmq/publisher/publish_flow_block.go b/rmq/publisher/publish_flow_block.go new file mode 100644 index 0000000..1698793 --- /dev/null +++ b/rmq/publisher/publish_flow_block.go @@ -0,0 +1,45 @@ +package rabbitmq + +import ( + "fmt" + + amqp "github.com/rabbitmq/amqp091-go" +) + +func (publisher *Publisher) startNotifyFlowHandler() { + notifyFlowChan := publisher.chanManager.NotifyFlowSafe(make(chan bool)) + publisher.disablePublishDueToFlowMux.Lock() + publisher.disablePublishDueToFlow = false + publisher.disablePublishDueToFlowMux.Unlock() + + for ok := range notifyFlowChan { + publisher.disablePublishDueToFlowMux.Lock() + if ok { + publisher.options.Logger.Error("WARN_RMQ-PUBLISH-FLOW_START-NOTIFY-FLOW-HANDLER", fmt.Errorf("pausing publishing due to flow request from server")) + publisher.disablePublishDueToFlow = true + } else { + publisher.disablePublishDueToFlow = false + publisher.options.Logger.Error("WARN_RMQ-PUBLISH-FLOW_START-NOTIFY-FLOW-HANDLER", fmt.Errorf("resuming publishing due to flow request from server")) + } + publisher.disablePublishDueToFlowMux.Unlock() + } +} + +func (publisher *Publisher) startNotifyBlockedHandler() { + blockings := publisher.connManager.NotifyBlockedSafe(make(chan amqp.Blocking)) + publisher.disablePublishDueToBlockedMux.Lock() + publisher.disablePublishDueToBlocked = false + publisher.disablePublishDueToBlockedMux.Unlock() + + for b := range blockings { + publisher.disablePublishDueToBlockedMux.Lock() + if b.Active { + publisher.options.Logger.Error("WARN_RMQ-PUBLISH-FLOW_START-NOTIFY-BLOCKED-HANDLER", fmt.Errorf("pausing publishing due to TCP blocking from server")) + publisher.disablePublishDueToBlocked = true + } else { + publisher.disablePublishDueToBlocked = false + publisher.options.Logger.Error("WARN_RMQ-PUBLISH-FLOW_START-NOTIFY-BLOCKED-HANDLER", fmt.Errorf("resuming publishing due to TCP blocking from server")) + } + publisher.disablePublishDueToBlockedMux.Unlock() + } +} diff --git a/rmq/publisher/publish_options.go b/rmq/publisher/publish_options.go new file mode 100644 index 0000000..f1dd39b --- /dev/null +++ b/rmq/publisher/publish_options.go @@ -0,0 +1,166 @@ +package rabbitmq + +import ( + "time" + + uuid "github.com/kelchy/go-lib/rmq/publisher/internal/helper" +) + +// PublishOptions are used to control how data is published +type PublishOptions struct { + Exchange string + // Mandatory fails to publish if there are no queues + // bound to the routing key + Mandatory bool + // Immediate fails to publish if there are no consumers + // that can ack bound to the queue on the routing key + Immediate bool + // MIME content type + ContentType string + // Transient (0 or 1) or Persistent (2) + DeliveryMode uint8 + // Expiration time in ms that a message will expire from a queue. + // See https://www.rabbitmq.com/ttl.html#per-message-ttl-in-publishers + Expiration string + // MIME content encoding + ContentEncoding string + // 0 to 9 + Priority uint8 + // correlation identifier + CorrelationID string + // address to to reply to (ex: RPC) + ReplyTo string + // message identifier + MessageID string + // message timestamp + Timestamp time.Time + // message type name + Type string + // creating user id - ex: "guest" + UserID string + // creating application id + AppID string + // Application or exchange specific fields, + // the headers exchange will inspect this field. + Headers Table +} + +// WithPublishOptionsExchange returns a function that sets the exchange to publish to +func WithPublishOptionsExchange(exchange string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Exchange = exchange + } +} + +// WithPublishOptionsMandatory makes the publishing mandatory, which means when a queue is not +// bound to the routing key a message will be sent back on the returns channel for you to handle +func WithPublishOptionsMandatory(options *PublishOptions) { + options.Mandatory = true +} + +// WithPublishOptionsImmediate makes the publishing immediate, which means when a consumer is not available +// to immediately handle the new message, a message will be sent back on the returns channel for you to handle +func WithPublishOptionsImmediate(options *PublishOptions) { + options.Immediate = true +} + +// WithPublishOptionsContentType returns a function that sets the content type, i.e. "application/json" +func WithPublishOptionsContentType(contentType string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ContentType = contentType + } +} + +// WithPublishOptionsPersistentDelivery sets the message to persist. Transient messages will +// not be restored to durable queues, persistent messages will be restored to +// durable queues and lost on non-durable queues during server restart. By default publishings +// are transient +func WithPublishOptionsPersistentDelivery(options *PublishOptions) { + options.DeliveryMode = Persistent +} + +// WithPublishOptionsExpiration returns a function that sets the expiry/TTL of a message. As per RabbitMq spec, it must be a +// string value in milliseconds. +func WithPublishOptionsExpiration(expiration string) func(options *PublishOptions) { + return func(options *PublishOptions) { + options.Expiration = expiration + } +} + +// WithPublishOptionsHeaders returns a function that sets message header values, i.e. "msg-id" +func WithPublishOptionsHeaders(headers Table) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Headers = headers + } +} + +// WithPublishOptionsContentEncoding returns a function that sets the content encoding, i.e. "utf-8" +func WithPublishOptionsContentEncoding(contentEncoding string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ContentEncoding = contentEncoding + } +} + +// WithPublishOptionsPriority returns a function that sets the content priority from 0 to 9 +func WithPublishOptionsPriority(priority uint8) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Priority = priority + } +} + +// WithPublishOptionsCorrelationID returns a function that sets the content correlation identifier +func WithPublishOptionsCorrelationID(correlationID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.CorrelationID = correlationID + } +} + +// WithPublishOptionsReplyTo returns a function that sets the reply to field +func WithPublishOptionsReplyTo(replyTo string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.ReplyTo = replyTo + } +} + +// WithPublishOptionsMessageID returns a function that sets the message identifier +func WithPublishOptionsMessageID(messageID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.MessageID = messageID + } +} + +// WithPublishOptionsAutoMessageID returns a function that sets the message identifier to an auto generated value +func WithPublishOptionsAutoMessageID() func(*PublishOptions) { + return func(options *PublishOptions) { + msgID, _ := uuid.NewUUID() + options.MessageID = msgID + } +} + +// WithPublishOptionsTimestamp returns a function that sets the timestamp for the message +func WithPublishOptionsTimestamp(timestamp time.Time) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Timestamp = timestamp + } +} + +// WithPublishOptionsType returns a function that sets the message type name +func WithPublishOptionsType(messageType string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.Type = messageType + } +} + +// WithPublishOptionsUserID returns a function that sets the user id i.e. "user" +func WithPublishOptionsUserID(userID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.UserID = userID + } +} + +// WithPublishOptionsAppID returns a function that sets the application id +func WithPublishOptionsAppID(appID string) func(*PublishOptions) { + return func(options *PublishOptions) { + options.AppID = appID + } +} diff --git a/rmq/publisher/publisher_options.go b/rmq/publisher/publisher_options.go new file mode 100644 index 0000000..e7c336e --- /dev/null +++ b/rmq/publisher/publisher_options.go @@ -0,0 +1,95 @@ +package rabbitmq + +import ( + "github.com/kelchy/go-lib/rmq/publisher/internal/logger" + amqp "github.com/rabbitmq/amqp091-go" +) + +// PublisherOptions are used to describe a publisher's configuration. +// Logger is a custom logging interface. +type PublisherOptions struct { + ExchangeOptions ExchangeOptions + Logger logger.Logger +} + +// getDefaultPublisherOptions describes the options that will be used when a value isn't provided +func getDefaultPublisherOptions() PublisherOptions { + return PublisherOptions{ + ExchangeOptions: ExchangeOptions{ + Name: "", + Kind: amqp.ExchangeDirect, + Durable: false, + AutoDelete: false, + Internal: false, + NoWait: false, + Passive: false, + Args: Table{}, + Declare: false, + }, + Logger: logger.DefaultLogger, + } +} + +// WithPublisherOptionsLogging sets logging to true on the publisher options +func WithPublisherOptionsLogging(options *PublisherOptions) { + options.Logger = logger.DefaultLogger +} + +// WithPublisherOptionsLogger sets logging to a custom interface. +// Use WithPublisherOptionsLogging to just log to stdout. +func WithPublisherOptionsLogger(log logger.Logger) func(options *PublisherOptions) { + return func(options *PublisherOptions) { + options.Logger = log + } +} + +// WithPublisherOptionsExchangeName sets the exchange name +func WithPublisherOptionsExchangeName(name string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Name = name + } +} + +// WithPublisherOptionsExchangeKind ensures the queue is a durable queue +func WithPublisherOptionsExchangeKind(kind string) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Kind = kind + } +} + +// WithPublisherOptionsExchangeDurable ensures the exchange is a durable exchange +func WithPublisherOptionsExchangeDurable(options *PublisherOptions) { + options.ExchangeOptions.Durable = true +} + +// WithPublisherOptionsExchangeAutoDelete ensures the exchange is an auto-delete exchange +func WithPublisherOptionsExchangeAutoDelete(options *PublisherOptions) { + options.ExchangeOptions.AutoDelete = true +} + +// WithPublisherOptionsExchangeInternal ensures the exchange is an internal exchange +func WithPublisherOptionsExchangeInternal(options *PublisherOptions) { + options.ExchangeOptions.Internal = true +} + +// WithPublisherOptionsExchangeNoWait ensures the exchange is a no-wait exchange +func WithPublisherOptionsExchangeNoWait(options *PublisherOptions) { + options.ExchangeOptions.NoWait = true +} + +// WithPublisherOptionsExchangeDeclare stops this library from declaring the exchanges existance +func WithPublisherOptionsExchangeDeclare(options *PublisherOptions) { + options.ExchangeOptions.Declare = true +} + +// WithPublisherOptionsExchangePassive ensures the exchange is a passive exchange +func WithPublisherOptionsExchangePassive(options *PublisherOptions) { + options.ExchangeOptions.Passive = true +} + +// WithPublisherOptionsExchangeArgs adds optional args to the exchange +func WithPublisherOptionsExchangeArgs(args Table) func(*PublisherOptions) { + return func(options *PublisherOptions) { + options.ExchangeOptions.Args = args + } +} diff --git a/rmq/publisher/table.go b/rmq/publisher/table.go new file mode 100644 index 0000000..1b351f4 --- /dev/null +++ b/rmq/publisher/table.go @@ -0,0 +1,41 @@ +package rabbitmq + +import amqp "github.com/rabbitmq/amqp091-go" + +// Table stores user supplied fields of the following types: +// +// bool +// byte +// float32 +// float64 +// int +// int16 +// int32 +// int64 +// nil +// string +// time.Time +// amqp.Decimal +// amqp.Table +// []byte +// []interface{} - containing above types +// +// Functions taking a table will immediately fail when the table contains a +// value of an unsupported type. +// +// The caller must be specific in which precision of integer it wishes to +// encode. +// +// Use a type assertion when reading values from a table for type conversion. +// +// RabbitMQ expects int32 for integer values. +// +type Table map[string]interface{} + +func tableToAMQPTable(table Table) amqp.Table { + new := amqp.Table{} + for k, v := range table { + new[k] = v + } + return new +}