-
Notifications
You must be signed in to change notification settings - Fork 39
/
live_amqp.go
183 lines (161 loc) · 5.13 KB
/
live_amqp.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
package common
import (
"context"
"encoding/json"
"fmt"
"log"
amqp "github.com/rabbitmq/amqp091-go"
)
var (
// AmqpChan is the AMQP channel handle we use for communication with our AMQP backend.
// It is declared here without an initial value, so it stays nil until something assigns it.
AmqpChan *amqp.Channel
// UseAMQP switches between running in AMQP mode (true) or job queue server mode (false).
// It starts out as false.
UseAMQP = false
)
// CloseMQChannel shuts down the given AMQP channel, handing back whatever error channel.Close() reports.
func CloseMQChannel(channel *amqp.Channel) (err error) {
	return channel.Close()
}
// CloseMQConnection shuts down the given AMQP connection, handing back whatever error connection.Close() reports.
func CloseMQConnection(connection *amqp.Connection) (err error) {
	return connection.Close()
}
// MQResponse sends an AMQP response back to its requester, then acknowledges the request.
//
// responseData is JSON encoded and published to the default exchange, routed to msg.ReplyTo and tagged with
// msg.CorrelationId.  If the encoding fails, a payload of the form {"node":"...","error":"..."} is sent instead,
// so the requester still receives a reply.  The request is acknowledged even when publishing fails, so it doesn't
// stick around in the queue.
//
// requestType is only used to label the debug log line.
//
// The returned error is the publish error if there was one, otherwise the acknowledgement error (if any).  A
// failure to encode responseData is logged and reported to the requester, but is not returned.
func MQResponse(requestType string, msg amqp.Delivery, channel *amqp.Channel, nodeName string, responseData interface{}) (err error) {
	body, marshalErr := json.Marshal(responseData)
	if marshalErr != nil {
		log.Println(marshalErr)

		// It's super unlikely we can safely return here without ack-ing the message, so pass back info about the
		// failure instead.  The payload is built with json.Marshal() rather than string formatting, so quotes,
		// backslashes or control characters in the error text can't produce invalid JSON.
		// (The previous implementation describes this payload as a LiveDBErrorResponse structure.)
		var fallbackErr error
		body, fallbackErr = json.Marshal(struct {
			Node  string `json:"node"`
			Error string `json:"error"`
		}{Node: nodeName, Error: marshalErr.Error()})
		if fallbackErr != nil {
			// Not expected for a struct of two strings, but never send an empty body
			log.Println(fallbackErr)
			body = []byte(`{"node":"","error":"unable to encode response"}`)
		}
	}

	// Send the message
	ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
	defer cancel()
	err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
		amqp.Publishing{
			ContentType:   "text/json",
			CorrelationId: msg.CorrelationId,
			Body:          body,
		})
	if err != nil {
		log.Println(err)
	}

	// Acknowledge the request, so it doesn't stick around in the queue.  A publish error takes precedence over an
	// ack error in the return value, so a failed publish is never hidden by a successful ack.
	if ackErr := msg.Ack(false); ackErr != nil {
		log.Println(ackErr)
		if err == nil {
			err = ackErr
		}
	}

	if JobQueueDebug > 0 {
		log.Printf("[%s] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", requestType, nodeName, msg.CorrelationId, msg.ReplyTo)
	}
	return err
}
// MQCreateDBQueue declares the "create_queue" queue on the MQ server, used for "create database" messages, then
// applies channel.Qos(1, 0, false) to the channel.  The first error encountered is returned; the queue value is
// whatever QueueDeclare() handed back.
func MQCreateDBQueue(channel *amqp.Channel) (queue amqp.Queue, err error) {
	if queue, err = channel.QueueDeclare("create_queue", true, false, false, false, nil); err != nil {
		return
	}

	// FIXME: Re-read the docs for this, and work out if this is needed
	err = channel.Qos(1, 0, false)
	return
}
// MQCreateQueryQueue declares a queue named after nodeName on the MQ server, for sending database queries to, then
// applies channel.Qos(0, 0, false) to the channel.  The first error encountered is returned; the queue value is
// whatever QueueDeclare() handed back.
func MQCreateQueryQueue(channel *amqp.Channel, nodeName string) (queue amqp.Queue, err error) {
	if queue, err = channel.QueueDeclare(nodeName, false, false, false, false, nil); err != nil {
		return
	}

	// FIXME: Re-read the docs for this, and work out if this is needed
	err = channel.Qos(0, 0, false)
	return
}
// MQCreateResponse sends a success/failure response back to the requester, then acknowledges the request.
//
// The response is published to the default exchange, routed to msg.ReplyTo and tagged with msg.CorrelationId.  The
// request is acknowledged even when publishing fails.
//
// nodeName and result are inserted into the JSON payload as-is (no escaping), so they must not contain characters
// that need escaping in a JSON string, such as double quotes or backslashes.
//
// The returned error is the publish error if there was one, otherwise the acknowledgement error (if any).
func MQCreateResponse(msg amqp.Delivery, channel *amqp.Channel, nodeName, result string) (err error) {
	// Construct the response. It's such a simple string we just create it directly instead of using json.Marshal()
	resp := fmt.Sprintf(`{"node":"%s","dbowner":"","dbname":"","result":"%s","error":""}`, nodeName, result)

	// Send the message
	ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
	defer cancel()
	err = channel.PublishWithContext(ctx, "", msg.ReplyTo, false, false,
		amqp.Publishing{
			ContentType:   "text/json",
			CorrelationId: msg.CorrelationId,
			Body:          []byte(resp),
		})
	if err != nil {
		log.Println(err)
	}

	// Acknowledge the request.  The ack error used to be silently dropped; now it's logged, and returned when
	// there's no publish error to report instead.
	if ackErr := msg.Ack(false); ackErr != nil {
		log.Println(ackErr)
		if err == nil {
			err = ackErr
		}
	}

	if JobQueueDebug > 0 {
		log.Printf("[CREATE] Live node '%s' responded with ACK to message with correlationID: '%s', msg.ReplyTo: '%s'", nodeName, msg.CorrelationId, msg.ReplyTo)
	}
	return err
}
// MQRequest is the main function used for sending requests to our AMQP backend.
//
// It wraps operation, requestingUser, dbOwner, dbName and data in a LiveDBRequest, publishes that as JSON to the
// given queue (via the default exchange), then waits on a temporary reply queue for a message carrying the same
// correlation ID.  The body of that message is returned as-is, without JSON unmarshalling it.
//
// The temporary reply queue is deleted before returning, on both success and failure paths.  If deleting it fails
// the error is logged, and is returned only when there's no earlier error to report.
//
// An error is returned when encoding, queue declaration, publishing or consuming fails, or when the delivery
// channel is closed before a matching response arrives.
//
// NOTE(review): contextTimeout only bounds the publish call.  The wait for the response itself has no timeout, so
// this blocks for as long as the backend takes to answer - confirm that's intended.
func MQRequest(channel *amqp.Channel, queue, operation, requestingUser, dbOwner, dbName string, data interface{}) (result []byte, err error) {
	// Construct and encode the request first, so a payload which can't be encoded doesn't leave a temporary queue
	// behind on the server
	var body []byte
	body, err = json.Marshal(LiveDBRequest{
		Operation:      operation,
		DBOwner:        dbOwner,
		DBName:         dbName,
		Data:           data,
		RequestingUser: requestingUser,
	})
	if err != nil {
		log.Println(err)
		return nil, err
	}

	// Create a temporary AMQP queue for receiving the response
	var replyQueue amqp.Queue
	replyQueue, err = channel.QueueDeclare("", false, false, true, false, nil)
	if err != nil {
		return nil, err
	}

	// Delete the temporary queue on every exit path from here on, not just the successful one.  An earlier error
	// is never replaced by a clean up failure.
	defer func() {
		if _, delErr := channel.QueueDelete(replyQueue.Name, false, false, false); delErr != nil {
			log.Println(delErr)
			if err == nil {
				err = delErr
			}
		}
	}()

	// Send the request via AMQP
	ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
	defer cancel()
	corrID := RandomString(32)
	err = channel.PublishWithContext(ctx, "", queue, false, false,
		amqp.Publishing{
			ContentType:   "text/json",
			CorrelationId: corrID,
			ReplyTo:       replyQueue.Name,
			Body:          body,
		})
	if err != nil {
		log.Println(err)
		return nil, err
	}

	// Start processing messages from the AMQP response queue
	msgs, err := channel.Consume(replyQueue.Name, "", true, false, false, false, nil)
	if err != nil {
		return nil, err
	}

	// Wait for, then extract the response. Without json unmarshalling it yet.  Messages with a different
	// correlation ID are skipped.
	for d := range msgs {
		if d.CorrelationId == corrID {
			return d.Body, nil
		}
	}

	// The delivery channel was closed without our response turning up, so report that instead of handing back an
	// empty result with no error
	return nil, fmt.Errorf("amqp delivery channel closed before a response arrived for correlation ID %q", corrID)
}