/
rabbitmq.go
142 lines (127 loc) · 3.17 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
package queue
import (
"errors"
"fmt"
"github.com/loafoe/go-rabbitmq"
"github.com/philips-software/go-hsdp-api/logging"
"github.com/streadway/amqp"
)
// Package-level RabbitMQ settings and errors shared by the producer and
// consumer sides of the queue.
var (
// Exchange is the name of the exchange used when creating the producer,
// when publishing in Push, and when creating the consumer in Start.
Exchange = "logproxy"
// RoutingKey is the routing key used when publishing in Push and in the
// consumer configuration built by Start.
RoutingKey = "new.rfc5424"
// ErrInvalidProducer is returned by Push when the queue has no producer.
ErrInvalidProducer = errors.New("RabbitMQ producer is nil or invalid")
)
// RabbitMQ implements Queue backed by RabbitMQ.
// Messages handed to Push are published through producer; resources decoded
// by the consumer started in Start are delivered on resourceChannel.
type RabbitMQ struct {
// producer publishes raw messages to the exchange; Push returns
// ErrInvalidProducer when it is nil.
producer rabbitmq.Producer
// resourceChannel carries parsed resources from the consumer worker to
// readers of Output.
resourceChannel chan logging.Resource
}
// Compile-time check that *RabbitMQ satisfies the Queue interface.
var _ Queue = &RabbitMQ{}
// consumerTag returns the consumer tag passed to the RabbitMQ consumer
// configuration in Start.
func consumerTag() string {
	const tag = "logproxy"
	return tag
}
// RFC5424QueueName returns the name of the queue that the consumer started
// by Start is configured with.
func RFC5424QueueName() string {
	const queueName = "logproxy_rfc5424"
	return queueName
}
// setupProducer creates the default producer: a non-durable "topic" exchange
// named by Exchange. It returns a nil producer together with the error when
// creation fails.
func setupProducer() (rabbitmq.Producer, error) {
	cfg := rabbitmq.Config{
		Exchange:     Exchange,
		ExchangeType: "topic",
		Durable:      false,
	}
	p, err := rabbitmq.NewProducer(cfg)
	if err != nil {
		// Return an untyped nil so callers comparing against nil see a
		// genuinely empty interface value.
		return nil, err
	}
	return p, nil
}
// NewRabbitMQQueue builds a RabbitMQ queue with a fresh unbuffered resource
// channel.
//
// If at least one producer is supplied, the first one is used as-is (any
// others are ignored). Otherwise a default producer is created with
// setupProducer, and its error, if any, is returned with a nil queue.
func NewRabbitMQQueue(producers ...rabbitmq.Producer) (*RabbitMQ, error) {
	queue := &RabbitMQ{
		resourceChannel: make(chan logging.Resource),
	}

	// Caller-provided producer: nothing can fail on this path.
	if len(producers) > 0 {
		queue.producer = producers[0]
		return queue, nil
	}

	p, err := setupProducer()
	if err != nil {
		return nil, err
	}
	queue.producer = p
	return queue, nil
}
// Output returns the receive-only view of the queue's resource channel, on
// which the consumer worker started by Start delivers parsed resources.
func (r RabbitMQ) Output() <-chan logging.Resource {
	return r.resourceChannel
}
// Push publishes raw to Exchange with RoutingKey as a transient
// "application/octet-stream" message.
//
// It returns ErrInvalidProducer when the queue has no producer, or whatever
// error the producer's Publish call reports.
func (r RabbitMQ) Push(raw []byte) error {
	if r.producer == nil {
		return ErrInvalidProducer
	}

	msg := amqp.Publishing{
		Headers:         amqp.Table{},
		ContentType:     "application/octet-stream",
		ContentEncoding: "",
		Body:            raw,
		DeliveryMode:    amqp.Transient, // 1=non-persistent, 2=persistent
		Priority:        0,              // 0-9
	}
	return r.producer.Publish(Exchange, RoutingKey, msg)
}
// Start creates and starts a RabbitMQ consumer whose handler is
// RabbitMQRFC5424Worker, feeding the queue's resource channel.
//
// On success it returns the unbuffered done channel that was handed to the
// worker; a receive from that channel makes the worker return. If the
// consumer cannot be created or started, the error is returned with a nil
// channel.
func (r RabbitMQ) Start() (chan bool, error) {
	done := make(chan bool)

	cfg := rabbitmq.Config{
		RoutingKey:   RoutingKey,
		Exchange:     Exchange,
		ExchangeType: "topic",
		Durable:      false,
		AutoDelete:   true,
		QueueName:    RFC5424QueueName(),
		CTag:         consumerTag(),
		HandlerFunc:  RabbitMQRFC5424Worker(r.resourceChannel, done),
	}

	consumer, err := rabbitmq.NewConsumer(cfg)
	if err != nil {
		return nil, err
	}
	err = consumer.Start()
	if err != nil {
		return nil, err
	}
	return done, nil
}
// ackDelivery acknowledges d, passing true for Ack's "multiple" argument.
// A failed acknowledgement is only logged to stdout; it is not returned to
// the caller.
func ackDelivery(d amqp.Delivery) {
	if err := d.Ack(true); err != nil {
		fmt.Printf("Error Acking delivery: %v\n", err)
	}
}
// RabbitMQRFC5424Worker returns a consumer handler that turns incoming
// deliveries into logging.Resource values and forwards them on
// resourceChannel.
//
// Every delivery is acknowledged, whether or not its body could be parsed by
// BodyToResource; bodies that fail to parse are logged and dropped. The send
// on resourceChannel blocks until a reader takes the resource.
//
// The handler returns when a value is received from done (or done is
// closed). A signal on the handler's own done channel is only logged. If the
// deliveries channel or the handler's done channel is closed, that case is
// disabled so the loop does not spin on a permanently-ready channel; the
// handler then keeps waiting for done.
func RabbitMQRFC5424Worker(resourceChannel chan<- logging.Resource, done <-chan bool) rabbitmq.ConsumerHandlerFunc {
	return func(deliveries <-chan amqp.Delivery, doneChannel <-chan bool) {
		for {
			select {
			case d, ok := <-deliveries:
				if !ok {
					// A closed channel is always ready and yields zero-value
					// deliveries. Setting it to nil removes this case from
					// the select instead of parsing/acking empty deliveries
					// in a busy loop.
					deliveries = nil
					continue
				}
				resource, err := BodyToResource(d.Body)
				// Ack before inspecting err: unparseable messages are
				// dropped rather than left unacknowledged.
				ackDelivery(d)
				if err != nil {
					fmt.Printf("Error processing syslog message: %v\n", err)
					continue
				}
				resourceChannel <- *resource
			case _, ok := <-doneChannel:
				fmt.Printf("Worker received done message (worker)...\n")
				if !ok {
					// Closed: log once, then stop selecting on it so the
					// message is not printed in a tight loop.
					doneChannel = nil
				}
			case <-done:
				fmt.Printf("Worker received done message (master)...\n")
				return
			}
		}
	}
}
// DeadLetter is a placeholder: it currently ignores msg and always returns
// nil.
func (r RabbitMQ) DeadLetter(msg logging.Resource) error {
// TODO: implement
return nil
}