-
-
Notifications
You must be signed in to change notification settings - Fork 36
/
reconnect.go
116 lines (91 loc) · 2.42 KB
/
reconnect.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
// Copy from: https://github.com/sirius1024/go-amqp-reconnect
package rabbit
import (
"fmt"
"time"
amqp "github.com/rabbitmq/amqp091-go"
"github.com/shortlink-org/shortlink/pkg/logger"
)
// Connection amqp.Connection wrapper
type Connection struct {
log logger.Logger
*amqp.Connection
delay int
}
// Channel wraps amqp.Connection.Channel to provide an auto-reconnecting channel.
func (c *Connection) Channel() (*Channel, error) {
ch, err := c.Connection.Channel()
if err != nil {
return nil, err
}
channel := &Channel{
Channel: ch,
delay: c.delay,
log: c.log,
}
go func() {
for {
reason, ok := <-channel.Channel.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok || channel.IsClosed() {
c.log.Error("channel closed")
err = channel.Close() // close again, ensure closed flag set when connection closed
if err != nil {
c.log.Error(err.Error())
}
break
}
c.log.Error(fmt.Sprintf("channel closed, reason: %v", reason))
// reconnect if not closed by developer
for {
// wait 1s for connection reconnect
time.Sleep(time.Duration(c.delay) * time.Second)
newCh, errConnectToChannel := c.Connection.Channel()
if errConnectToChannel == nil {
c.log.Info("channel recreate success")
channel.Channel = newCh
break
}
c.log.Error(fmt.Sprintf("channel recreate failed, err: %v", errConnectToChannel))
}
}
}()
return channel, nil
}
// Dial wraps amqp.Dial to establish a connection and set up automatic reconnection
// in case the connection is lost.
func (mq *MQ) Dial() error {
conn, err := amqp.Dial(mq.config.URI)
if err != nil {
return err
}
mq.conn = &Connection{
Connection: conn,
delay: mq.config.ReconnectTime,
log: mq.log,
}
go func() {
for {
reason, ok := <-mq.conn.NotifyClose(make(chan *amqp.Error))
// exit this goroutine if closed by developer
if !ok {
mq.log.Error("connection closed")
break
}
mq.log.Error(fmt.Sprintf("connection closed, reason: %v", reason))
// reconnect if not closed by developer
for {
// wait 1s for reconnect
time.Sleep(time.Duration(mq.config.ReconnectTime) * time.Second)
conn, err := amqp.Dial(mq.config.URI)
if err == nil {
mq.conn.Connection = conn
mq.log.Info("reconnect success")
break
}
mq.log.Error(fmt.Sprintf("reconnect failed, err: %v", err))
}
}
}()
return nil
}