-
Notifications
You must be signed in to change notification settings - Fork 316
/
redis.go
136 lines (122 loc) · 3.17 KB
/
redis.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package kvstoremanager
import (
"crypto/tls"
"crypto/x509"
"net/http"
"strconv"
"strings"
"github.com/go-redis/redis"
"github.com/rudderlabs/rudder-server/utils/types"
)
// abortableErrors lists error-message substrings that StatusCode reports as
// HTTP 400 rather than HTTP 500. It starts out empty and is populated by init.
var abortableErrors = make([]string, 0)
// redisManagerT bundles the connection configuration with the go-redis
// client handles that its methods dispatch to. Only one of client and
// clusterClient is assigned by Connect, depending on clusterMode.
type redisManagerT struct {
// clusterMode selects the handle used by every method: clusterClient when
// true, client when false. Connect sets it from config["clusterMode"].
clusterMode bool
// config holds the settings Connect reads, looked up by string key.
config types.ConfigT
// client is the single-node handle; Connect assigns it when clusterMode is false.
client *redis.Client
// clusterClient is the cluster handle; Connect assigns it when clusterMode is true.
clusterClient *redis.ClusterClient
}
// init fills abortableErrors with the error-message fragments that
// StatusCode reports as HTTP 400.
func init() {
	abortableErrors = []string{
		"connection refused",
		"invalid password",
	}
}
// Connect builds the Redis client described by m.config and stores it on m.
//
// Config keys read (each is ignored when missing or of the wrong type):
//   - "clusterMode" (bool): cluster vs. single-node client; defaults to true.
//   - "address" (string): a comma-separated list in cluster mode, otherwise
//     one address; surrounding whitespace is trimmed.
//   - "password" (string)
//   - "secure" (bool): attach a TLS configuration to the client.
//   - "skipVerify" (bool): with "secure", sets InsecureSkipVerify.
//   - "caCertificate" (string): with "secure", PEM data used as the root pool.
//   - "database" (string): single-node only; parsed as an int, 0 on failure.
//
// Connect has no return value; it only constructs the client and assigns
// m.clusterClient or m.client.
func (m *redisManagerT) Connect() {
	// A missing or non-bool "clusterMode" setting means cluster mode.
	configured, found := m.config["clusterMode"].(bool)
	m.clusterMode = !found || configured

	secure, _ := m.config["secure"].(bool)
	address, _ := m.config["address"].(string)
	password, _ := m.config["password"].(string)

	// tlsConf stays nil for plain connections, so the options below carry no
	// TLS configuration unless "secure" is set.
	var tlsConf *tls.Config
	if secure {
		tlsConf = &tls.Config{}
		if skip, _ := m.config["skipVerify"].(bool); skip {
			tlsConf.InsecureSkipVerify = true
		}
		if caPEM, _ := m.config["caCertificate"].(string); strings.TrimSpace(caPEM) != "" {
			pool := x509.NewCertPool()
			// The boolean result of AppendCertsFromPEM is not checked; the
			// pool is installed whether or not any certificate was parsed.
			pool.AppendCertsFromPEM([]byte(caPEM))
			tlsConf.RootCAs = pool
		}
	}

	if !m.clusterMode {
		database := 0
		if raw, isString := m.config["database"].(string); isString {
			// A value that does not parse leaves the database index at 0.
			database, _ = strconv.Atoi(raw)
		}
		m.client = redis.NewClient(&redis.Options{
			Addr:      strings.TrimSpace(address),
			Password:  password,
			DB:        database,
			TLSConfig: tlsConf,
		})
		return
	}

	nodes := strings.Split(address, ",")
	for i, node := range nodes {
		nodes[i] = strings.TrimSpace(node)
	}
	m.clusterClient = redis.NewClusterClient(&redis.ClusterOptions{
		Addrs:     nodes,
		Password:  password,
		TLSConfig: tlsConf,
	})
}
// Close closes whichever client is active for the current mode and returns
// the error reported by that client's Close call.
func (m *redisManagerT) Close() error {
	if !m.clusterMode {
		return m.client.Close()
	}
	return m.clusterClient.Close()
}
// HMSet issues an HMSet call for key with the given fields on the active
// client (cluster or single-node) and returns the command's error, if any.
// The command's value is discarded.
func (m *redisManagerT) HMSet(key string, fields map[string]interface{}) (err error) {
	if !m.clusterMode {
		_, err = m.client.HMSet(key, fields).Result()
		return err
	}
	_, err = m.clusterClient.HMSet(key, fields).Result()
	return err
}
// StatusCode maps the outcome of a Redis operation to an HTTP status code.
//
// It returns http.StatusOK when err is nil, http.StatusBadRequest when the
// error text contains any entry of abortableErrors, and
// http.StatusInternalServerError for every other error.
func (*redisManagerT) StatusCode(err error) int {
	if err == nil {
		return http.StatusOK
	}

	errorString := err.Error()
	for _, s := range abortableErrors {
		if strings.Contains(errorString, s) {
			// Named constant rather than a bare 400, matching the other
			// status codes returned by this method.
			return http.StatusBadRequest
		}
	}
	return http.StatusInternalServerError
}
// DeleteKey issues a Del call for key on the active client (cluster or
// single-node) and returns the command's error, if any. The command's value
// is discarded.
func (m *redisManagerT) DeleteKey(key string) (err error) {
	switch {
	case m.clusterMode:
		_, err = m.clusterClient.Del(key).Result()
	default:
		_, err = m.client.Del(key).Result()
	}
	return err
}
// HMGet issues an HMGet call for the given fields of key on the active
// client (cluster or single-node) and returns the command's result and
// error unchanged.
func (m *redisManagerT) HMGet(key string, fields ...string) (result []interface{}, err error) {
	if m.clusterMode {
		return m.clusterClient.HMGet(key, fields...).Result()
	}
	return m.client.HMGet(key, fields...).Result()
}
// HGetAll issues an HGetAll call for key on the active client (cluster or
// single-node) and returns the command's result map and error unchanged.
func (m *redisManagerT) HGetAll(key string) (result map[string]string, err error) {
	if !m.clusterMode {
		return m.client.HGetAll(key).Result()
	}
	return m.clusterClient.HGetAll(key).Result()
}