This repository has been archived by the owner on Oct 3, 2022. It is now read-only.
/
rabbitmq.go
101 lines (92 loc) · 2.17 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package internal
import (
"fmt"
"github.com/streadway/amqp"
)
// rabbitConnectionString is the AMQP URL NewRabbitMQ dials when it is called
// with local == false: host "messaging", port 5672, user and password "guest".
// NOTE(review): the credentials are hard-coded in source — confirm this is
// acceptable for the target deployment.
const rabbitConnectionString = "amqp://guest:guest@messaging:5672/"
// RabbitMQ bundles the AMQP connection, channel and queue that the methods in
// this file operate on.
type RabbitMQ struct {
// Connection is the broker connection; NewRabbitMQ sets it from amqp.Dial.
Connection *amqp.Connection
// Channel is nil after NewRabbitMQ and is assigned by CreateSubmissionChannel.
Channel *amqp.Channel
// Queue is the zero value after NewRabbitMQ and is overwritten with the
// result of QueueDeclare by DeclareAndBindQueue.
Queue amqp.Queue
}
// NewRabbitMQ dials a RabbitMQ broker and returns a RabbitMQ value wrapping
// the resulting connection.
//
// When local is true the broker at localhost:5672 is dialled; otherwise the
// URL in rabbitConnectionString is used. The dial error (nil or not) is handed
// to FailOnError, which is defined elsewhere. The returned value has a nil
// Channel and a zero-valued Queue; call CreateSubmissionChannel and
// DeclareAndBindQueue to populate them.
func NewRabbitMQ(local bool) RabbitMQ {
	fmt.Println("Attempting to connect to rabbitmq")

	// Start from the production URL and override it for local runs.
	url := rabbitConnectionString
	switch local {
	case true:
		fmt.Printf("Running in LOCAL mode, connecting to localhost...\n")
		url = "amqp://guest:guest@localhost:5672/"
	default:
		fmt.Printf("Running in PRODUCTION mode, connecting to messaging...\n")
	}

	connection, dialErr := amqp.Dial(url)
	FailOnError(dialErr, "Failed to connect to RabbitMQ")
	fmt.Println("Successfully connected!")

	// Channel and Queue are deliberately left at their zero values (nil and
	// an empty amqp.Queue), which is what the explicit literal spelled out.
	return RabbitMQ{Connection: connection}
}
// CreateSubmissionChannel opens a channel on rmq.Connection, declares the
// "submissions" direct exchange on it, and stores the channel in rmq.Channel.
//
// Both the channel-open error and the exchange-declare error are passed to
// FailOnError (defined elsewhere). rmq.Channel is assigned only after both
// calls have been made.
func (rmq *RabbitMQ) CreateSubmissionChannel() {
	const (
		exchangeName = "submissions"
		exchangeKind = "direct"
	)

	ch, openErr := rmq.Connection.Channel()
	FailOnError(openErr, "Failed to open a channel")

	declareErr := ch.ExchangeDeclare(
		exchangeName, // name
		exchangeKind, // kind
		false,        // durable
		false,        // auto-delete
		false,        // internal
		false,        // no-wait
		nil,          // arguments
	)
	FailOnError(declareErr, "Failed to declare an exchange on the channel")

	rmq.Channel = ch
}
// DeclareAndBindQueue declares a queue named n on rmq.Channel, stores the
// declared queue in rmq.Queue, and binds it to the "submissions" exchange
// using n as the routing key as well.
//
// Errors from the declare and bind calls are each passed to FailOnError
// (defined elsewhere). rmq.Channel must already be set, e.g. by
// CreateSubmissionChannel.
func (rmq *RabbitMQ) DeclareAndBindQueue(n string) {
	const exchangeName = "submissions"

	declared, declareErr := rmq.Channel.QueueDeclare(
		n,     // name
		false, // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	// Store the result before reporting the error, exactly as a direct
	// assignment from the call would.
	rmq.Queue = declared
	FailOnError(declareErr, "Failed to declare a queue")

	bindErr := rmq.Channel.QueueBind(
		n,            // queue name
		n,            // routing key
		exchangeName, // exchange
		false,        // no-wait
		nil,          // arguments
	)
	FailOnError(bindErr, "Failed to bind the queue")
}
// SetConsumer registers a consumer on the queue named k via rmq.Channel and
// returns the channel on which deliveries arrive.
//
// The consumer tag is left empty and all boolean options are false, so
// auto-ack is off. The error from Consume is passed to FailOnError (defined
// elsewhere) before the delivery channel is returned.
func (rmq *RabbitMQ) SetConsumer(k string) <-chan amqp.Delivery {
	deliveries, consumeErr := rmq.Channel.Consume(
		k,     // queue
		"",    // consumer tag
		false, // auto-ack
		false, // exclusive
		false, // no-local
		false, // no-wait
		nil,   // arguments
	)
	FailOnError(consumeErr, "Failed to register a consumer")

	return deliveries
}
// PublishMessage publishes b as an "application/json" message to the
// "submissions" exchange with routing key "requests", tagging it with
// correlation ID c.
//
// It returns whatever error rmq.Channel.Publish returns. ReplyTo is not set
// on the message (a `ReplyTo: "responses"` field is disabled in this code).
func (rmq *RabbitMQ) PublishMessage(c string, b []byte) error {
	const (
		exchangeName = "submissions"
		routingKey   = "requests"
	)

	msg := amqp.Publishing{
		ContentType:   "application/json",
		CorrelationId: c,
		Body:          b,
	}

	return rmq.Channel.Publish(
		exchangeName,
		routingKey,
		false, // mandatory
		false, // immediate
		msg,
	)
}