forked from compose/transporter
-
Notifications
You must be signed in to change notification settings - Fork 0
/
writer.go
58 lines (51 loc) · 1.46 KB
/
writer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package rabbitmq
import (
	"bytes"
	"encoding/json"
	"fmt"
	"time"

	"github.com/compose/transporter/client"
	"github.com/compose/transporter/message"
	"github.com/compose/transporter/message/ops"
	"github.com/streadway/amqp"
)
// Default values for the corresponding Writer fields.
const (
// DefaultDeliveryMode is the AMQP delivery mode used when writing messages to an exchange.
// It is set to amqp.Transient.
// NOTE(review): presumably this means the broker does not persist messages — confirm
// against the amqp package documentation.
DefaultDeliveryMode = amqp.Transient
// DefaultRoutingKey is set to an empty string so all messages published to the exchange will
// get routed to whatever queues are bound to it.
// NOTE(review): whether an empty key reaches every bound queue depends on the exchange
// type — confirm for the exchanges this adaptor targets.
DefaultRoutingKey = ""
)
// Compile-time assertion that *Writer satisfies the client.Writer interface.
var _ client.Writer = (*Writer)(nil)
// Writer implements client.Writer by publishing messages to the cluster based on its configuration.
type Writer struct {
// DeliveryMode is copied into the DeliveryMode field of every amqp.Publishing sent by Write.
DeliveryMode uint8
// RoutingKey is either the literal routing key passed to Publish (KeyInField == false)
// or the name of the field in the message data that holds the routing key
// (KeyInField == true).
RoutingKey string
// KeyInField selects how RoutingKey is interpreted: when true, Write looks up
// RoutingKey in the message data and uses that field's string value as the routing key.
KeyInField bool
}
// Write returns a function that publishes msg over the given session's AMQP channel.
//
// Only ops.Insert and ops.Update messages are published; any other operation is
// returned untouched with a nil error. The message data is JSON-encoded into the
// body, msg.Namespace() is used as the exchange name, and the routing key is either
// w.RoutingKey itself or, when w.KeyInField is set, the string value stored under
// the w.RoutingKey field of the message data.
//
// The returned function always hands back msg. It returns a non-nil error when the
// session is not a *Session, when the data cannot be JSON-encoded, when the routing
// key field is missing or not a string, or when the channel publish fails.
func (w *Writer) Write(msg message.Msg) func(client.Session) (message.Msg, error) {
	return func(s client.Session) (message.Msg, error) {
		// Everything other than inserts and updates is intentionally ignored.
		if msg.OP() != ops.Insert && msg.OP() != ops.Update {
			return msg, nil
		}

		// Checked assertion: report a foreign session type instead of panicking.
		sess, ok := s.(*Session)
		if !ok {
			return msg, fmt.Errorf("rabbitmq: unexpected session type %T", s)
		}

		// Do not publish a truncated or empty body if encoding fails.
		var body bytes.Buffer
		if err := json.NewEncoder(&body).Encode(msg.Data()); err != nil {
			return msg, fmt.Errorf("rabbitmq: encoding message data: %v", err)
		}

		routingKey := w.RoutingKey
		if w.KeyInField {
			// The key lives in the document itself; a missing or non-string
			// value is a data error, not a reason to panic.
			key, ok := msg.Data().Get(w.RoutingKey).(string)
			if !ok {
				return msg, fmt.Errorf("rabbitmq: routing key field %q is missing or not a string", w.RoutingKey)
			}
			routingKey = key
		}

		amqpMsg := amqp.Publishing{
			DeliveryMode: w.DeliveryMode,
			Timestamp:    time.Unix(msg.Timestamp(), 0),
			ContentType:  "application/json",
			Body:         body.Bytes(),
		}
		// mandatory and immediate are both false, as before.
		return msg, sess.channel.Publish(msg.Namespace(), routingKey, false, false, amqpMsg)
	}
}