forked from letsencrypt/boulder
/
connection.go
149 lines (134 loc) · 4.59 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package rpc
import (
"fmt"
"sync"
"time"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/streadway/amqp"
"github.com/letsencrypt/boulder/Godeps/_workspace/src/github.com/jmhodges/clock"
"github.com/letsencrypt/boulder/cmd"
"github.com/letsencrypt/boulder/core"
blog "github.com/letsencrypt/boulder/log"
)
func newAMQPConnector(
queueName string,
retryTimeoutBase time.Duration,
retryTimeoutMax time.Duration,
) *amqpConnector {
return &amqpConnector{
queueName: queueName,
routingKey: queueName,
chMaker: defaultChannelMaker{},
clk: clock.Default(),
retryTimeoutBase: retryTimeoutBase,
retryTimeoutMax: retryTimeoutMax,
}
}
// channelMaker encapsulates how to create an AMQP channel.
type channelMaker interface {
makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error)
}
type defaultChannelMaker struct{}
func (d defaultChannelMaker) makeChannel(conf *cmd.AMQPConfig) (amqpChannel, error) {
return makeAmqpChannel(conf)
}
// amqpConnector encapsulates an AMQP channel and a subscription to a specific
// queue, plus appropriate locking for its members. It provides reconnect logic,
// and allows publishing via the channel onto an arbitrary queue.
type amqpConnector struct {
mu sync.RWMutex
chMaker channelMaker
channel amqpChannel
queueName string
// Usually this is the same as queueName, except for Activity Monitor, which
// sets it to "#".
routingKey string
closeChan chan *amqp.Error
msgs <-chan amqp.Delivery
retryTimeoutBase time.Duration
retryTimeoutMax time.Duration
clk clock.Clock
}
func (ac *amqpConnector) messages() <-chan amqp.Delivery {
ac.mu.RLock()
defer ac.mu.RUnlock()
return ac.msgs
}
func (ac *amqpConnector) closeChannel() chan *amqp.Error {
ac.mu.RLock()
defer ac.mu.RUnlock()
return ac.closeChan
}
// connect attempts to connect to a channel and subscribe to the named queue,
// returning error if it fails. This is used at first startup, where we want to
// fail fast if we can't connect.
func (ac *amqpConnector) connect(config *cmd.AMQPConfig) error {
channel, err := ac.chMaker.makeChannel(config)
if err != nil {
return fmt.Errorf("channel connect failed for %s: %s", ac.queueName, err)
}
msgs, err := amqpSubscribe(channel, ac.queueName, ac.routingKey)
if err != nil {
return fmt.Errorf("queue subscribe failed for %s: %s", ac.queueName, err)
}
closeChan := channel.NotifyClose(make(chan *amqp.Error, 1))
ac.mu.Lock()
defer ac.mu.Unlock()
ac.channel = channel
ac.msgs = msgs
ac.closeChan = closeChan
return nil
}
// reconnect attempts repeatedly to connect and subscribe to the named queue. It
// will loop forever until it succeeds. This is used for a running server, where
// we don't want to shut down because we lost our AMQP connection.
func (ac *amqpConnector) reconnect(config *cmd.AMQPConfig, log blog.SyslogWriter) {
for i := 0; ; i++ {
ac.clk.Sleep(core.RetryBackoff(i, ac.retryTimeoutBase, ac.retryTimeoutMax, 2))
log.Info(fmt.Sprintf(" [!] attempting reconnect for %s", ac.queueName))
err := ac.connect(config)
if err != nil {
log.Warning(fmt.Sprintf(" [!] %s", err))
continue
}
break
}
log.Info(fmt.Sprintf(" [!] reconnect success for %s", ac.queueName))
return
}
// cancel cancels the AMQP channel. Used for graceful shutdowns.
func (ac *amqpConnector) cancel() {
ac.mu.RLock()
channel := ac.channel
ac.mu.RUnlock()
channel.Cancel(consumerName, false)
}
// publish publishes a message onto the provided queue. We provide this wrapper
// because it requires locking around the read of ac.channel.
func (ac *amqpConnector) publish(queueName, corrId, expiration, replyTo, msgType string, body []byte) error {
ac.mu.RLock()
channel := ac.channel
ac.mu.RUnlock()
return channel.Publish(
AmqpExchange,
queueName,
AmqpMandatory,
AmqpImmediate,
amqp.Publishing{
Body: body,
CorrelationId: corrId,
Expiration: expiration,
ReplyTo: replyTo,
Type: msgType,
Timestamp: ac.clk.Now(),
})
}
// amqpChannel defines the subset of amqp.Channel methods that we use in this
// package.
type amqpChannel interface {
Cancel(consumer string, noWait bool) error
Consume(queue, consumer string, autoAck, exclusive, noLocal, noWait bool, args amqp.Table) (<-chan amqp.Delivery, error)
NotifyClose(c chan *amqp.Error) chan *amqp.Error
Publish(exchange, key string, mandatory, immediate bool, msg amqp.Publishing) error
QueueBind(name, key, exchange string, noWait bool, args amqp.Table) error
QueueDeclare(name string, durable, autoDelete, exclusive, noWait bool, args amqp.Table) (amqp.Queue, error)
}