/
database_dialer.go
127 lines (109 loc) · 3.06 KB
/
database_dialer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package redis
import (
"fmt"
"sync"
"time"
"github.com/moira-alert/moira"
"github.com/FZambia/sentinel"
"github.com/gomodule/redigo/redis"
)
// PoolDialer hides the details of how pooled connections are created and
// health-checked, so a pool can work with different Redis topologies.
type PoolDialer interface {
	// Dial creates a new connection.
	Dial() (redis.Conn, error)
	// Test checks whether the connection c is still usable; a non-nil error
	// means it is not.
	// NOTE(review): the signature matches redigo's Pool.TestOnBorrow, where t
	// is the time the connection was returned to the pool — confirm at the
	// call site. Neither implementation in this file reads t.
	Test(c redis.Conn, t time.Time) error
}
// DirectPoolDialer connects directly to a single Redis server.
type DirectPoolDialer struct {
	// serverAddress is the address dialed over "tcp".
	serverAddress string
	// db is the database index selected on every new connection.
	db int
	// dialTimeout is the connect timeout applied when dialing.
	dialTimeout time.Duration
}
// Dial opens a TCP connection to the configured server address, selecting the
// configured database and applying the configured connect timeout.
func (dialer *DirectPoolDialer) Dial() (redis.Conn, error) {
	selectDB := redis.DialDatabase(dialer.db)
	connectTimeout := redis.DialConnectTimeout(dialer.dialTimeout)

	return redis.Dial("tcp", dialer.serverAddress, selectDB, connectTimeout)
}
// Test verifies the connection by issuing PING. The reply is discarded; only
// the error returned by the command (if any) is reported. The time argument
// is not used.
func (dialer *DirectPoolDialer) Test(c redis.Conn, t time.Time) error {
	if _, pingErr := c.Do("PING"); pingErr != nil {
		return pingErr
	}
	return nil
}
// SentinelPoolDialerConfig provides options to configure SentinelPoolDialer.
type SentinelPoolDialerConfig struct {
	// MasterName is the name of the master set, passed to the sentinel client.
	MasterName string
	// SentinelAddresses lists the sentinel addresses passed to the sentinel
	// client.
	SentinelAddresses []string
	// DB is the database index selected on connections to the master.
	DB int
	// DialTimeout is the connect timeout used both when dialing sentinels and
	// when dialing the master.
	DialTimeout time.Duration
}
// NewSentinelPoolDialer returns a new SentinelPoolDialer built from config.
//
// Connections to the sentinels themselves are dialed over TCP with
// config.DialTimeout as the connect timeout. Before returning, the constructor
// starts a background goroutine running discoverLoop.
// NOTE(review): that goroutine has no stop mechanism, so it lives for the rest
// of the process.
func NewSentinelPoolDialer(logger moira.Logger, config SentinelPoolDialerConfig) *SentinelPoolDialer {
	// dialSentinel is how the sentinel client reaches an individual sentinel.
	dialSentinel := func(addr string) (redis.Conn, error) {
		connectTimeout := redis.DialConnectTimeout(config.DialTimeout)
		return redis.Dial("tcp", addr, connectTimeout)
	}

	dialer := new(SentinelPoolDialer)
	dialer.logger = logger
	dialer.config = config
	dialer.sentinel = &sentinel.Sentinel{
		Addrs:      config.SentinelAddresses,
		MasterName: config.MasterName,
		Dial:       dialSentinel,
	}

	go dialer.discoverLoop()

	return dialer
}
// SentinelPoolDialer connects to the Redis master whose address is resolved
// through sentinels.
type SentinelPoolDialer struct {
	// logger receives discovery errors and master-change messages.
	logger moira.Logger
	// sentinel resolves the current master address; discoverLoop periodically
	// calls Discover on it.
	sentinel *sentinel.Sentinel
	// config is the configuration the dialer was created with; Dial reads DB
	// and DialTimeout from it.
	config SentinelPoolDialerConfig
	// lastMasterMutex guards lastMaster.
	lastMasterMutex sync.Mutex
	// lastMaster is the master address most recently seen by Dial, kept so
	// that a change of master is logged only once.
	lastMaster string
}
// Dial asks the sentinel client for the current master address and opens a TCP
// connection to it, selecting the configured database and applying the
// configured connect timeout. If the master address cannot be resolved, the
// lookup error is returned unchanged and no connection is attempted.
func (dialer *SentinelPoolDialer) Dial() (redis.Conn, error) {
	address, lookupErr := dialer.sentinel.MasterAddr()
	if lookupErr != nil {
		return nil, lookupErr
	}

	// Record the resolved address so a change of master gets logged.
	dialer.refreshLastMaster(address)

	selectDB := redis.DialDatabase(dialer.config.DB)
	connectTimeout := redis.DialConnectTimeout(dialer.config.DialTimeout)

	return redis.Dial("tcp", address, selectDB, connectTimeout)
}
// Test returns nil when sentinel.TestRole confirms that the connection's role
// is "master", and an error otherwise. The time argument is not used.
func (dialer *SentinelPoolDialer) Test(c redis.Conn, t time.Time) error {
	if sentinel.TestRole(c, "master") {
		return nil
	}
	return fmt.Errorf("failed master role check")
}
// discoverLoop calls Discover on the sentinel client every 30 seconds and logs
// any error it returns. The loop has no exit condition, so this method is
// meant to run in its own goroutine and never returns in practice.
func (dialer *SentinelPoolDialer) discoverLoop() {
	const discoverInterval = 30 * time.Second

	ticker := time.NewTicker(discoverInterval)
	defer ticker.Stop()

	for range ticker.C {
		discoverErr := dialer.sentinel.Discover()
		if discoverErr == nil {
			continue
		}
		dialer.logger.Error(discoverErr)
	}
}
// refreshLastMaster remembers master as the most recently seen master address.
// When it differs from the stored one, the new address is logged at info level
// before being stored. The comparison and update happen under lastMasterMutex.
func (dialer *SentinelPoolDialer) refreshLastMaster(master string) {
	dialer.lastMasterMutex.Lock()
	defer dialer.lastMasterMutex.Unlock()

	if dialer.lastMaster == master {
		// Same master as before: nothing to log or store.
		return
	}

	dialer.logger.Infof("Redis master discovered: %s", master)
	dialer.lastMaster = master
}