-
Notifications
You must be signed in to change notification settings - Fork 0
/
rabbit.go
135 lines (111 loc) · 3.63 KB
/
rabbit.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package brokers
import (
"context"
"encoding/json"
"fmt"
"github.com/pkg/errors"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/willdot/sendit/config"
"github.com/willdot/sendit/service"
)
// DestinationType names the kind of RabbitMQ target a message is published to:
// a queue (addressed directly) or an exchange.
type DestinationType string

// Supported destination types.
const (
	// DestinationTypeQueue selects publishing straight to a named queue.
	DestinationTypeQueue = DestinationType("queue")
	// DestinationTypeExchange selects publishing to a named exchange.
	DestinationTypeExchange = DestinationType("exchange")
)
// RabbitPublisher publishes messages to a RabbitMQ server, either to a queue
// or to an exchange depending on how it was constructed.
type RabbitPublisher struct {
	// conn is the AMQP connection; Send opens a channel on it and Shutdown closes it.
	conn *amqp.Connection
	// destinationType decides whether Send publishes to an exchange or to a queue.
	destinationType DestinationType
	// url is the server URL taken from the config the publisher was created with.
	url string
}
// NewRabbitPublisher creates a publisher connected to the RabbitMQ server at cfg.URL.
//
// cfg.Broker selects the destination type: config.RabbitExchangeBroker publishes to
// exchanges and config.RabbitQueueBroker publishes to queues. Any other value is
// rejected with an error.
//
// Shutdown should be called on the returned publisher to close the connection once
// finished.
func NewRabbitPublisher(cfg *config.Config) (*RabbitPublisher, error) {
	// Resolve the destination type before dialing, so that an unsupported broker
	// value is rejected without opening a connection that would never be closed.
	var destinationType DestinationType
	switch cfg.Broker {
	case config.RabbitExchangeBroker:
		destinationType = DestinationTypeExchange
	case config.RabbitQueueBroker:
		destinationType = DestinationTypeQueue
	default:
		return nil, fmt.Errorf("invalid destination type provided '%s'. Should be either Queue or Exchange", cfg.Broker)
	}

	conn, err := amqp.Dial(cfg.URL)
	if err != nil {
		return nil, errors.Wrap(err, "failed to open connection")
	}

	return &RabbitPublisher{
		conn:            conn,
		destinationType: destinationType,
		url:             cfg.URL,
	}, nil
}
// Shutdown closes the publisher's RabbitMQ connection. Any error reported by the
// close is discarded, since the method has no way to return it.
func (r *RabbitPublisher) Shutdown() {
	_ = r.conn.Close()
}
// Send publishes msg to the queue or exchange called destinationName, depending on
// the destination type the publisher was created with.
//
// msg.Headers, when present, must be a JSON object; it is decoded and attached to the
// published message as AMQP headers. msg.Body is sent as the message body.
//
// A new channel is opened on the connection for the call and closed before Send
// returns. An error is returned if the headers cannot be decoded, the channel cannot
// be opened, the publish fails, or the publisher has an unsupported destination type.
func (r *RabbitPublisher) Send(destinationName string, msg service.Message) error {
	headers, err := convertRabbitHeaders(msg.Headers)
	if err != nil {
		return err
	}

	c, err := r.conn.Channel()
	if err != nil {
		return errors.Wrap(err, "failed to open channel")
	}
	defer c.Close()

	switch r.destinationType {
	case DestinationTypeExchange:
		return r.publishToExchange(c, destinationName, msg.Body, headers)
	case DestinationTypeQueue:
		return r.publishToQueue(c, destinationName, msg.Body, headers)
	default:
		// Report the problem instead of returning nil: nothing was published, so
		// the caller must not be told the send succeeded.
		return fmt.Errorf("unsupported destination type '%s'", r.destinationType)
	}
}
// convertRabbitHeaders decodes headerData, a JSON object, into a map suitable for use
// as AMQP message headers.
//
// Nil or empty headerData means "no headers" and yields a nil map with no error.
// Otherwise the data is passed to json.Unmarshal; a decoding failure is returned
// wrapped with the context "failed to convert header data".
func convertRabbitHeaders(headerData []byte) (map[string]interface{}, error) {
	// Check the length rather than comparing against nil so that an empty, non-nil
	// slice is also treated as "no headers" instead of failing JSON decoding.
	if len(headerData) == 0 {
		return nil, nil
	}

	var headers map[string]interface{}
	if err := json.Unmarshal(headerData, &headers); err != nil {
		return nil, fmt.Errorf("failed to convert header data: %w", err)
	}

	return headers, nil
}
// publishToExchange publishes msg, with the given headers, to the exchange called
// exchangeName using channel c.
//
// The exchange is first declared passively with kind "headers"; if that declaration
// fails the error is returned and nothing is published. The message is then published
// with an empty routing key and content type "application/json".
func (r *RabbitPublisher) publishToExchange(c *amqp.Channel, exchangeName string, msg []byte, headers map[string]interface{}) error {
	err := c.ExchangeDeclarePassive(exchangeName, "headers", false, false, false, false, nil)
	if err != nil {
		return errors.Wrap(err, "failed to declare exchange")
	}

	err = c.PublishWithContext(context.Background(), exchangeName, "", false, false, amqp.Publishing{
		Headers:     headers,
		ContentType: "application/json",
		Body:        msg,
	})
	if err != nil {
		// Wrapf appends the cause itself, so the cause is not repeated in the format string.
		return errors.Wrapf(err, "failed to publish message to exchange '%s'", exchangeName)
	}

	return nil
}
// publishToQueue publishes msg, with the given headers, to the queue called queueName
// using channel c.
//
// The queue is first declared passively; if that declaration fails the error is
// returned and nothing is published. The message is then published with an empty
// exchange name and queueName as the routing key, with content type "application/json".
func (r *RabbitPublisher) publishToQueue(c *amqp.Channel, queueName string, msg []byte, headers map[string]interface{}) error {
	// Only the success of the passive declaration matters here; the returned queue
	// details are not needed.
	if _, err := c.QueueDeclarePassive(queueName, false, false, false, false, nil); err != nil {
		return errors.Wrap(err, "failed to declare queue")
	}

	err := c.PublishWithContext(context.Background(), "", queueName, false, false, amqp.Publishing{
		Headers:     headers,
		ContentType: "application/json",
		Body:        msg,
	})
	if err != nil {
		// Wrapf appends the cause itself, so the cause is not repeated in the format string.
		return errors.Wrapf(err, "failed to publish message to queue '%s'", queueName)
	}

	return nil
}