Skip to content

Commit

Permalink
Merge pull request #63 from kelchy/main
Browse files Browse the repository at this point in the history
Release rmq/consumer 0.0.7
  • Loading branch information
kelchy committed Mar 20, 2023
2 parents 3a8c31c + 12330b0 commit 2194c6a
Show file tree
Hide file tree
Showing 7 changed files with 106 additions and 8 deletions.
1 change: 0 additions & 1 deletion rmq/consumer/Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
SHELL := /bin/bash

.PHONY:

# this will install binary in ${GOPATH}
$(GOPATH)/bin/golint:
go install golang.org/x/lint/golint@v0.0.0-20201208152925-83fdc39ff7b5
Expand Down
41 changes: 36 additions & 5 deletions rmq/consumer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"github.com/kelchy/go-lib/log"
)

// Default constants for initialising default configs
const defaultReconnectInterval = 5 * time.Second
const defaultReconnectMaxAttempt = 3
const defaultPrefetchCount = 1
const defaultPrefetchSize = 0
const defaultRetryCountLimit = 2

// ConnectionConfig is the configuration for connection creation.
type ConnectionConfig struct {
// ConnURIs: list of connection URIs.
Expand All @@ -21,8 +28,8 @@ type ConnectionConfig struct {
func DefaultConnectionConfig(connURIs []string) ConnectionConfig {
return ConnectionConfig{
ConnURIs: connURIs,
ReconnectInterval: 5 * time.Second,
ReconnectMaxAttempt: 3,
ReconnectInterval: defaultReconnectInterval,
ReconnectMaxAttempt: defaultReconnectMaxAttempt,
}
}

Expand Down Expand Up @@ -54,12 +61,36 @@ func DefaultConfig(name string) Config {
NoWait: false,
Args: nil,
EnabledPrefetch: true,
PrefetchCount: 1,
PrefetchSize: 0,
PrefetchCount: defaultPrefetchCount,
PrefetchSize: defaultPrefetchSize,
Global: false,
}
}

// ExchangeConfig is the configuration for exchange creation.
type ExchangeConfig struct {
Name string `json:"name" mapstructure:"name"`
Kind string `json:"kind" mapstructure:"kind"`
Durable bool `json:"durable" mapstructure:"durable"`
AutoDelete bool `json:"auto_delete" mapstructure:"auto_delete"`
Internal bool `json:"internal" mapstructure:"internal"`
NoWait bool `json:"no_wait" mapstructure:"no_wait"`
Args map[string]interface{}
}

// DefaultExchangeConfig returns a default exchange configuration.
func DefaultExchangeConfig(name string, kind string) ExchangeConfig {
return ExchangeConfig{
Name: name,
Kind: kind,
Durable: true,
AutoDelete: false,
Internal: false,
NoWait: false,
Args: nil,
}
}

// QueueConfig is the configuration for queue creation.
type QueueConfig struct {
Name string `json:"name" mapstructure:"name"`
Expand Down Expand Up @@ -130,6 +161,6 @@ func DefaultMessageRetryConfig() MessageRetryConfig {
return MessageRetryConfig{
Enabled: true,
HandleDeadMessage: true,
RetryCountLimit: 2,
RetryCountLimit: defaultRetryCountLimit,
}
}
31 changes: 30 additions & 1 deletion rmq/consumer/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ type ILogger interface {
Error(key string, err error)
}

// New creates a new consumer
// New creates a new consumer, should use this by default
func New(connConfig ConnectionConfig, queueConfig QueueConfig, queueBindConfig QueueBindConfig, consumerConfig Config, msgRetryConfig MessageRetryConfig, processor IClientHandler, logger ILogger) error {
// Set up connection to RabbitMQ
c := Consumer{
Expand Down Expand Up @@ -82,6 +82,35 @@ func New(connConfig ConnectionConfig, queueConfig QueueConfig, queueBindConfig Q
return nil
}

// NewConnection creates a new connection to RabbitMQ. For use when you want to initialize queues and exchanges or verify that they are present
func NewConnection(connConfig ConnectionConfig, logger ILogger) (*amqp.Connection, error) {
attempts := 0
for attempts <= connConfig.ReconnectMaxAttempt {
logger.Out("RMQ-CONSUMER", "Connecting to RabbitMQ")
// Make a connection to RMQ
conn, err := amqp.Dial(connConfig.ConnURIs[0])
if err != nil {
logger.Error("ERR_RMQ-CONSUMER_FAIL-CONNECT", err)
time.Sleep(connConfig.ReconnectInterval)
// Wait before retrying
continue
}
logger.Out("RMQ-CONSUMER", "Connected to RabbitMQ")
return conn, nil
}
return nil, fmt.Errorf("failed to connect to RabbitMQ after %d attempts", connConfig.ReconnectMaxAttempt)
}

// OpenChannel opens a new channel on the connection
func OpenChannel(conn *amqp.Connection, logger ILogger) (*amqp.Channel, error) {
amqpchan, err := conn.Channel()
if err != nil {
logger.Error("ERR_RMQ-CONSUMER_FAIL-OPEN-CHANNEL", err)
return nil, err
}
return amqpchan, nil
}

func (c *Consumer) connect(connConfig ConnectionConfig) error {
attempts := 0
for attempts <= connConfig.ReconnectMaxAttempt {
Expand Down
21 changes: 21 additions & 0 deletions rmq/consumer/example/consumer.example.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,27 @@ func main() {
fmt.Println("failed to create consumer: ", err)
}

// If you want to verify the presence of queue and exchanges
// Not required if consumer is init using New() as above
conn, _ := consumer.NewConnection(consumer.DefaultConnectionConfig([]string{os.Getenv("RMQ_URI")}), consumer.DefaultLogger())
connChan, _ := conn.Channel()
exDeclareErr := consumer.NewExchange(connChan, consumer.DefaultExchangeConfig("test-exchange", "direct"))
_, queueDeclareErr := consumer.NewQueue(connChan, consumer.QueueConfig{
Name: "test-queue-logging",
Durable: true,
AutoDelete: true,
Exclusive: false,
NoWait: false,
Args: nil,
})
if exDeclareErr != nil {
fmt.Println("failed to declare exchange: ", exDeclareErr)
return
}
if queueDeclareErr != nil {
fmt.Println("failed to declare queue: ", queueDeclareErr)
return
}
// Leave the consumer running for 30 seconds before exiting, only for example purposes
time.Sleep(30 * time.Second)
}
Expand Down
9 changes: 9 additions & 0 deletions rmq/consumer/exchange.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package consumer

import "github.com/streadway/amqp"

// NewExchange creates a new exchange or ensures an exchange exists that matches the provided configuration
func NewExchange(amqpChannel *amqp.Channel, exchangeConfig ExchangeConfig) error {
err := amqpChannel.ExchangeDeclare(exchangeConfig.Name, exchangeConfig.Kind, exchangeConfig.Durable, exchangeConfig.AutoDelete, exchangeConfig.Internal, exchangeConfig.NoWait, exchangeConfig.Args)
return err
}
2 changes: 1 addition & 1 deletion rmq/consumer/go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
//v0.0.6
//v0.0.7
module github.com/kelchy/go-lib/rmq/consumer

go 1.19
Expand Down
9 changes: 9 additions & 0 deletions rmq/consumer/queue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package consumer

import "github.com/streadway/amqp"

// NewQueue declares a new queue or ensures a queue exists that matches the provided configuration
func NewQueue(amqpChannel *amqp.Channel, queueConfig QueueConfig) (amqp.Queue, error) {
q, err := amqpChannel.QueueDeclare(queueConfig.Name, queueConfig.Durable, queueConfig.AutoDelete, queueConfig.Exclusive, queueConfig.NoWait, queueConfig.Args)
return q, err
}

0 comments on commit 2194c6a

Please sign in to comment.