forked from NeowayLabs/wabbit
-
Notifications
You must be signed in to change notification settings - Fork 0
/
dial.go
133 lines (106 loc) · 2.75 KB
/
dial.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package amqp
import (
"time"
"github.com/sbcd90/wabbit"
"github.com/sbcd90/wabbit/utils"
"github.com/streadway/amqp"
)
// Conn wraps an AMQP connection together with the state needed to
// re-establish it after an unexpected close.
type Conn struct {
	// Connection is the underlying AMQP connection; its methods are
	// promoted onto Conn through embedding.
	*amqp.Connection

	// dialFn (re)dials the broker and stores the resulting connection in
	// Connection. It is installed by Dial as a closure over the broker URI.
	dialFn func() error

	// attempts counts failed redial attempts; AutoRedial uses it as the
	// number of seconds to wait before the next attempt.
	attempts uint8
}
// Dial connects to the AMQP broker identified by uri and returns the
// wrapping Conn.
//
// The uri is captured in conn.dialFn so that the same broker can be
// redialed later (see AutoRedial). If the initial dial fails, a nil Conn and
// the dial error are returned.
func Dial(uri string) (*Conn, error) {
	conn := &Conn{}

	// Close over the uri so reconnects can reuse it.
	conn.dialFn = func() error {
		fresh, err := amqp.Dial(uri)
		if err != nil {
			// Keep the previous connection in place on failure. Overwriting
			// it with the (typically nil) result of a failed dial would make
			// every promoted method on Conn panic with a nil pointer
			// dereference while a redial is in progress, instead of
			// reporting that the connection is closed.
			return err
		}

		conn.Connection = fresh
		return nil
	}

	if err := conn.dialFn(); err != nil {
		return nil, err
	}

	return conn, nil
}
// NotifyClose registers c as a listener for close events of the connection
// and returns c.
//
// A channel of *amqp.Error with the same capacity as c is registered on the
// underlying connection. A background goroutine converts every error read
// from it with utils.NewError and forwards it to c; once the underlying
// notification channel is closed, c is closed too.
// For more information see: https://godoc.org/github.com/streadway/amqp#Connection.NotifyClose
func (conn *Conn) NotifyClose(c chan wabbit.Error) chan wabbit.Error {
	buffered := make(chan *amqp.Error, cap(c))
	upstream := conn.Connection.NotifyClose(buffered)

	forward := func() {
		for closeErr := range upstream {
			if closeErr == nil {
				// Pass a nil notification through unchanged.
				c <- nil
				continue
			}

			c <- utils.NewError(
				closeErr.Code,
				closeErr.Reason,
				closeErr.Server,
				closeErr.Recover,
			)
		}

		close(c)
	}

	go forward()

	return c
}
// AutoRedial manages the automatic redial of the connection when it is
// closed unexpectedly.
//
// outChan receives the error that closed the connection and, after that, the
// error of every failed reconnect attempt. Sends on outChan are blocking, so
// the caller must keep reading from it. Once the connection has been
// re-established, AutoRedial is armed again on the new connection and true
// is sent to done.
//
// Errors of type *amqp.Error are converted field by field with
// utils.NewError; any other dial error (for example a net/tcp failure) is
// reported with code 0 and its message as the reason.
//
// Redial strategy:
// If the connection is closed in an unexpected way (opposite of
// conn.Close()), AutoRedial tries to reconnect, waiting N seconds before each
// attempt, where N is the number of failed attempts so far. When N exceeds
// 60 it wraps back to zero, and it is reset to zero after every successful
// reconnect so that a later outage starts without delay.
func (conn *Conn) AutoRedial(outChan chan wabbit.Error, done chan bool) {
	closed := conn.NotifyClose(make(chan wabbit.Error))

	go func() {
		err := <-closed
		if err == nil {
			// Graceful connection close: the notification channel was
			// closed without an error, so there is nothing to redial.
			return
		}

		for {
			// Report the close error first, then each failed attempt.
			outChan <- err

			if conn.attempts > 60 {
				conn.attempts = 0
			}

			// Wait n seconds where n == conn.attempts.
			time.Sleep(time.Duration(conn.attempts) * time.Second)

			dialErr := conn.dialFn()
			if dialErr == nil {
				break
			}

			conn.attempts++

			// Surface the reason this attempt failed instead of repeating
			// the original close error.
			if amqpErr, ok := dialErr.(*amqp.Error); ok {
				err = utils.NewError(
					amqpErr.Code,
					amqpErr.Reason,
					amqpErr.Server,
					amqpErr.Recover,
				)
			} else {
				err = utils.NewError(0, dialErr.Error(), false, false)
			}
		}

		// Start the next outage from a clean back-off.
		conn.attempts = 0

		// Enable AutoRedial on the new connection before signalling success.
		conn.AutoRedial(outChan, done)
		done <- true
	}()
}
// Channel opens a new channel on the underlying AMQP connection and returns
// it wrapped in a *Channel. If the channel cannot be opened, the error from
// the underlying connection is returned together with a nil wabbit.Channel.
func (conn *Conn) Channel() (wabbit.Channel, error) {
	amqpCh, openErr := conn.Connection.Channel()
	if openErr != nil {
		return nil, openErr
	}

	wrapped := &Channel{amqpCh}

	return wrapped, nil
}