-
Notifications
You must be signed in to change notification settings - Fork 0
/
connection.go
112 lines (99 loc) · 2.82 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package queue
import (
"fmt"
"github.com/streadway/amqp"
"log"
)
// TODO: make these settings configurable instead of hard-coding them.
var (
	// exchangeName is the exchange declared by declareTopology and used as
	// the source exchange for its queue bindings.
	exchangeName = "msgbox"

	// exchangeType is the exchange kind passed to ExchangeDeclare.
	exchangeType = "direct"

	// uri is the broker address dialed by Connect.
	uri = "amqp://guest:guest@localhost:5672/"

	// reliable is not referenced in the visible part of this file.
	// NOTE(review): presumably toggles reliable publishing — confirm
	// against the code that reads it.
	reliable = true
)
// Connect dials the AMQP broker at uri and declares the shared topology
// (exchange, queues and bindings) on the new connection, so the service
// can publish to and consume from the queues afterwards.
//
// On success the caller owns the returned connection and should close it,
// typically with a defer right after the call. On failure the returned
// connection is nil and the error says which step failed.
//
// @return {*amqp.Connection}, {error}
// @api public
func Connect() (*amqp.Connection, error) {
	connection, err := amqp.Dial(uri)
	if err != nil {
		// Hand the failure back to the caller instead of calling
		// log.Fatalf, which would terminate the whole process and make
		// this return unreachable.
		return nil, fmt.Errorf("Dial: %s", err)
	}

	// Declare the AMQP topology up front so the exchange and queues exist
	// before anything publishes to or consumes from them.
	if err := declareTopology(connection); err != nil {
		// The caller never receives the connection on this path, so it
		// has to be released here. A failure to close is only logged:
		// the topology error is the one the caller needs to see.
		if closeErr := connection.Close(); closeErr != nil {
			log.Printf("Close: %s", closeErr)
		}
		return nil, fmt.Errorf("Topology: %s", err)
	}

	return connection, nil
}
// declareTopology makes sure the common AMQP structure exists before
// anything publishes or consumes against it. Using a short-lived channel
// on the given connection it declares the exchangeName exchange, then
// declares each queue and binds it to that exchange:
//
//	incoming_messages (binding key "incoming") - unprocessed incoming
//	    messages received by the relay
//	outgoing_messages (binding key "outgoing") - unprocessed outgoing
//	    messages that have not been sent yet
//
// It returns the first error encountered, prefixed with the step that
// failed, or nil when every declaration and binding succeeded.
//
// @param {*amqp.Connection} connection
// @return {error}
// @api private
func declareTopology(connection *amqp.Connection) error {
	channel, err := connection.Channel()
	if err != nil {
		return fmt.Errorf("Channel: %s", err)
	}
	// The channel is only needed for the declarations below.
	defer channel.Close()

	if err := channel.ExchangeDeclare(
		exchangeName, // name of the exchange
		exchangeType, // type
		true,         // durable
		false,        // delete when complete
		false,        // internal
		false,        // noWait
		nil,          // arguments
	); err != nil {
		return fmt.Errorf("Exchange Declare: %s", err)
	}

	// Queues are declared and bound in this order; each one is bound to
	// the exchange declared above.
	queues := []struct {
		name       string
		bindingKey string
	}{
		{name: "incoming_messages", bindingKey: "incoming"},
		{name: "outgoing_messages", bindingKey: "outgoing"},
	}

	for _, q := range queues {
		// Check the declare result: otherwise a failed declaration only
		// shows up later as a confusing bind error, or not at all.
		if _, err := channel.QueueDeclare(
			q.name, // name of the queue
			true,   // durable
			false,  // delete when unused
			false,  // exclusive
			false,  // noWait
			nil,    // arguments
		); err != nil {
			return fmt.Errorf("Queue Declare: %s", err)
		}

		if err := channel.QueueBind(
			q.name,       // name of the queue
			q.bindingKey, // bindingKey
			exchangeName, // sourceExchange
			false,        // noWait
			nil,          // arguments
		); err != nil {
			return fmt.Errorf("Queue Bind: %s", err)
		}
	}

	return nil
}