/
rabbitmq.go
138 lines (113 loc) · 2.7 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package sink
import (
"strconv"
"time"
"os"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/streadway/amqp"
)
// RabbitmqSink ...
type RabbitmqSink struct {
conn *amqp.Connection
exchange string
routingKey string
workerCount int
stopCh chan interface{}
putCh chan []byte
}
// NewRabbitmq ...
func NewRabbitmq() (*RabbitmqSink, error) {
connStr := os.Getenv("SINK_AMQP_CONNECTION")
if connStr == "" {
return nil, fmt.Errorf("[sink/amqp] Missing SINK_AMQP_CONNECTION (example: amqp://guest:guest@127.0.0.1:5672/)")
}
exchange := os.Getenv("SINK_AMQP_EXCHANGE")
if exchange == "" {
return nil, fmt.Errorf("[sink/amqp] Missing SINK_AMQP_EXCHANGE")
}
routingKey := os.Getenv("SINK_AMQP_ROUTING_KEY")
if routingKey == "" {
return nil, fmt.Errorf("[sink/amqp] Mising SINK_AMQP_ROUTING_KEY")
}
workerCountStr := os.Getenv("SINK_AMQP_WORKERS")
if workerCountStr == "" {
workerCountStr = "1"
}
workerCount, err := strconv.Atoi(workerCountStr)
if err != nil {
return nil, fmt.Errorf("Invalid SINK_AMQP_WORKERS value, must be an integer")
}
conn, err := amqp.Dial(connStr)
if err != nil {
return nil, fmt.Errorf("[sink/amqp] Failed to connect to AMQP: %s", err)
}
return &RabbitmqSink{
conn: conn,
exchange: exchange,
routingKey: routingKey,
workerCount: workerCount,
stopCh: make(chan interface{}),
putCh: make(chan []byte, 1000),
}, nil
}
// Start ...
func (s *RabbitmqSink) Start() error {
// Stop chan for all tasks to depend on
s.stopCh = make(chan interface{})
for i := 0; i < s.workerCount; i++ {
go s.write(i)
}
// wait forever for a stop signal to happen
for {
select {
case <-s.stopCh:
break
}
break
}
return nil
}
// Stop ...
func (s *RabbitmqSink) Stop() {
log.Infof("[sink/amqp] ensure writer queue is empty (%d messages left)", len(s.putCh))
for len(s.putCh) > 0 {
log.Info("[sink/amqp] Waiting for queue to drain - (%d messages left)", len(s.putCh))
time.Sleep(1 * time.Second)
}
close(s.stopCh)
defer s.conn.Close()
}
// Put ..
func (s *RabbitmqSink) Put(data []byte) error {
s.putCh <- data
return nil
}
func (s *RabbitmqSink) write(id int) {
log.Infof("[sink/amqp/%d] Starting writer", id)
ch, err := s.conn.Channel()
if err != nil {
log.Error(err)
return
}
defer ch.Close()
for {
select {
case data := <-s.putCh:
err = ch.Publish(
s.exchange, // exchange
s.routingKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: data,
})
if err != nil {
log.Errorf("[sink/amqp/%d] %s", id, err)
} else {
log.Debugf("[sink/amqp/%d] publish ok", id)
}
}
}
}