/
redirconn.go
309 lines (274 loc) · 7.47 KB
/
redirconn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package redicluster
import (
"context"
"errors"
"strconv"
"strings"
"sync"
"time"
"github.com/gomodule/redigo/redis"
)
// redirconn implements the redis.Conn interface and supports most Redis
// Cluster commands, including automatic redirection handling, multi-key
// commands (MSET/MGET) and pipelining. It also works with a stand-alone Redis
// server.

// Operation kinds stored in redirconn.lastOp to remember which API was used
// most recently.
const (
	// OpNil means no operation has been recorded.
	OpNil = iota
	// OpDO marks an operation issued through the Do family of methods.
	OpDO
	// OpPipeLine marks an operation issued through the pipeline methods
	// (Send/Flush/Receive).
	OpPipeLine
)
// redirconn is the connection type whose methods (Do, DoContext, Send, Flush,
// Receive, Close, Err, ...) are defined in this file. It keeps track of the
// last node connection used by the Do family and of a pipeLiner used by the
// Send/Flush/Receive family.
type redirconn struct {
	// cp is the pointer to the ClusterPool, immutable
	cp *ClusterPool
	// redir decides if the conn should handle redirecting, immutable
	redir bool
	// readOnly tells whether this is a read-only conn, immutable; it is passed
	// to ClusterPool.GetAddrsBySlots when a node address is looked up
	readOnly bool
	// mu protects the following members
	mu sync.Mutex
	// ppl is the pipeLiner, created lazily by the first Send() call
	ppl *pipeLiner
	// lastRc is the last redis.Conn used by Do() API
	lastRc redis.Conn
	// lastAddr is the last node address used by Do() API
	lastAddr string
	// lastOp is the kind of the most recent operation (OpNil, OpDO or
	// OpPipeLine); Err() uses it to decide whose error to report
	lastOp int
}
// RedirInfo is the parsed form of a cluster redirection error, as produced by
// ParseRedirInfo from a message of the form "<KIND> <slot> <addr>".
type RedirInfo struct {
	// Kind indicates the redirection type, MOVED or ASK
	Kind string
	// Slot is the slot number carried by the redirection error
	Slot int
	// Addr is the node address to redirect to
	Addr string
	// Raw is the original error string
	Raw string
}
// ParseRedirInfo parses a cluster redirection error into a RedirInfo.
//
// err, or any error in its wrap chain, must be a redis.Error whose message
// consists of exactly three whitespace-separated fields: the kind ("MOVED" or
// "ASK"), a decimal slot number and the target node address. For any other
// input, including a nil error, ParseRedirInfo returns nil.
func ParseRedirInfo(err error) *RedirInfo {
	// errors.As also finds a redis.Error that a caller wrapped with extra
	// context (e.g. fmt.Errorf("...: %w", err)); a bare type assertion would
	// miss it and the redirection would go unnoticed.
	var re redis.Error
	if !errors.As(err, &re) {
		return nil
	}

	raw := re.Error()
	parts := strings.Fields(raw)
	if len(parts) != 3 {
		return nil
	}
	kind := parts[0]
	if kind != "MOVED" && kind != "ASK" {
		return nil
	}
	slot, convErr := strconv.Atoi(parts[1])
	if convErr != nil {
		return nil
	}

	return &RedirInfo{
		Kind: kind,
		Slot: slot,
		Addr: parts[2],
		Raw:  raw,
	}
}
// hookDo intercepts the commands that are handled specially instead of being
// sent as-is: "MSET" is routed to multiset and "MGET" to multiget. The match
// on cmd is exact (case-sensitive).
//
// hooked reports whether cmd was intercepted; when it is false the other two
// results are nil and the caller is expected to run the command itself.
func (c *redirconn) hookDo(ctx context.Context, cmd string, args ...interface{}) (reply interface{}, err error, hooked bool) {
	if cmd == "MSET" {
		setReply, setErr := multiset(ctx, c, args...)
		return setReply, setErr, true
	}
	if cmd == "MGET" {
		getReply, getErr := multiget(ctx, c, args...)
		return getReply, getErr, true
	}
	return nil, nil, false
}
// connDoContext executes cmd with args on conn. When conn implements
// redis.ConnWithContext the call goes through DoContext with ctx; otherwise it
// falls back to the plain Do and ctx is not used. A nil conn yields an
// "invalid conn" error.
func connDoContext(conn redis.Conn, ctx context.Context, cmd string, args ...interface{}) (interface{}, error) {
	if conn == nil {
		return nil, errors.New("invalid conn")
	}
	if ctxConn, supported := conn.(redis.ConnWithContext); supported {
		return ctxConn.DoContext(ctx, cmd, args...)
	}
	return conn.Do(cmd, args...)
}
// connReceiveWithContext reads one reply from conn. When conn implements
// redis.ConnWithContext the call goes through ReceiveContext with ctx;
// otherwise it falls back to the plain Receive and ctx is not used. A nil conn
// yields an "invalid conn" error.
func connReceiveWithContext(conn redis.Conn, ctx context.Context) (interface{}, error) {
	if conn == nil {
		return nil, errors.New("invalid conn")
	}
	if ctxConn, supported := conn.(redis.ConnWithContext); supported {
		return ctxConn.ReceiveContext(ctx)
	}
	return conn.Receive()
}
// connReceiveWithTimeout reads one reply from conn. When conn implements
// redis.ConnWithTimeout the call goes through ReceiveWithTimeout; otherwise it
// falls back to the plain Receive and timeout is not used. A nil conn yields
// an "invalid conn" error.
func connReceiveWithTimeout(conn redis.Conn, timeout time.Duration) (interface{}, error) {
	if conn == nil {
		return nil, errors.New("invalid conn")
	}
	if timedConn, supported := conn.(redis.ConnWithTimeout); supported {
		return timedConn.ReceiveWithTimeout(timeout)
	}
	return conn.Receive()
}
// Close closes the connection.
//
// It closes the pipeLiner, if one was created by Send, and releases the node
// connection last used by the Do family, if any. Both are always attempted;
// the first error encountered is returned.
func (c *redirconn) Close() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	var firstErr error
	if c.ppl != nil {
		firstErr = c.ppl.close()
	}
	// Release the Do connection even when a pipeLiner exists; returning early
	// after closing the pipeLiner would leak lastRc.
	if c.lastRc != nil {
		if err := c.lastRc.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
		c.lastRc = nil
		c.lastAddr = ""
	}
	return firstErr
}
// Err returns a non-nil value when the connection is not usable.
//
// Which error is reported depends on the most recent operation: after a Do
// call it is the error of the last node connection, after a pipeline call it
// is the pipeLiner's error. In every other case Err returns nil.
func (c *redirconn) Err() error {
	c.mu.Lock()
	defer c.mu.Unlock()

	switch {
	case c.lastOp == OpDO && c.lastRc != nil:
		return c.lastRc.Err()
	case c.lastOp == OpPipeLine && c.ppl != nil:
		return c.ppl.err()
	default:
		return nil
	}
}
// Do sends a command and returns its reply. It is DoContext called with
// context.Background().
func (c *redirconn) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
	reply, err = c.DoContext(context.Background(), cmd, args...)
	return reply, err
}
// DoWithTimeout sends a command and returns its reply. It is DoContext called
// with a context derived from context.Background() that expires after
// timeout; the context is cancelled when the call returns.
func (c *redirconn) DoWithTimeout(timeout time.Duration, cmd string, args ...interface{}) (reply interface{}, err error) {
	timeoutCtx, stop := context.WithTimeout(context.Background(), timeout)
	defer stop()

	reply, err = c.DoContext(timeoutCtx, cmd, args...)
	return reply, err
}
// getConn returns the node connection on which cmd should be executed and
// records lastOp as the most recent operation kind.
//
// The target address is chosen as follows: when CmdSlot reports a negative
// slot, the address of the last used connection is reused if there is one;
// otherwise the address is looked up in the ClusterPool by slot (honouring
// readOnly). The cached connection is reused when it already points at that
// address; if not, a new connection is obtained from the pool, the previously
// cached one is closed and the new one is cached in its place.
//
// An error is returned when the address lookup fails or yields an empty
// address, or when no connection can be obtained for the address.
func (c *redirconn) getConn(ctx context.Context, lastOp int, cmd string, args ...interface{}) (redis.Conn, error) {
	var addr string
	slot := CmdSlot(cmd, args...)
	if slot < 0 {
		c.mu.Lock()
		// if slot=-1, then use the last addr and conn to request
		addr = c.lastAddr
		c.mu.Unlock()
	}
	if len(addr) == 0 {
		addrs, err := c.cp.GetAddrsBySlots([]int{slot}, c.readOnly)
		if err != nil {
			return nil, err
		}
		if len(addrs) == 0 || len(addrs[0]) == 0 {
			return nil, errors.New("empty node address")
		}
		addr = addrs[0]
	}

	c.mu.Lock()
	defer c.mu.Unlock()

	c.lastOp = lastOp
	if addr != c.lastAddr || c.lastRc == nil {
		conn, err := c.cp.getRedisConnByAddrContext(ctx, addr)
		// Report the real failure first; checking for a nil conn before the
		// error would replace it with the generic "invalid conn".
		if err != nil {
			return nil, err
		}
		if conn == nil {
			return nil, errors.New("invalid conn")
		}
		c.lastAddr = addr
		if c.lastRc != nil {
			// Best effort: the old connection is being discarded, so a close
			// failure does not affect the caller.
			_ = c.lastRc.Close()
		}
		c.lastRc = conn
	}
	return c.lastRc, nil
}
// DoContext sends a command to the server and returns the received reply.
//
// Commands intercepted by hookDo (MSET/MGET) are handled there. Any other
// command is executed on the connection chosen by getConn. If that attempt
// fails with a MOVED or ASK redirection error, the ClusterPool is notified
// through onRedir and the command is retried once on the node named in the
// error; for an ASK redirection the ASKING command is sent to that node first,
// as Redis Cluster requires. The redirect target then becomes the cached
// connection and the previously cached one is closed.
//
// If no connection to the redirect target can be obtained, the reply and the
// redirection error from the first attempt are returned unchanged.
//
// NOTE(review): the redir field is not consulted here, so redirections are
// always followed — confirm whether that is intended.
func (c *redirconn) DoContext(ctx context.Context, cmd string, args ...interface{}) (reply interface{}, err error) {
	if hookedReply, hookedErr, hooked := c.hookDo(ctx, cmd, args...); hooked {
		return hookedReply, hookedErr
	}

	conn, err := c.getConn(ctx, OpDO, cmd, args...)
	if err != nil {
		return nil, err
	}

	reply, err = connDoContext(conn, ctx, cmd, args...)
	if err == nil {
		return reply, nil
	}
	ri := ParseRedirInfo(err)
	if ri == nil {
		return reply, err
	}

	c.cp.onRedir(ri)
	target, dialErr := c.cp.getRedisConnByAddrContext(ctx, ri.Addr)
	if dialErr != nil || target == nil {
		// Cannot follow the redirection; surface the original error.
		return reply, err
	}

	// An ASK target only serves the slot for the single command that follows
	// an ASKING; without it the node would answer with MOVED.
	var askErr error
	if ri.Kind == "ASK" {
		_, askErr = connDoContext(target, ctx, "ASKING")
	}
	if askErr != nil {
		reply, err = nil, askErr
	} else {
		reply, err = connDoContext(target, ctx, cmd, args...)
	}

	c.mu.Lock()
	stale := c.lastRc
	c.lastAddr = ri.Addr
	c.lastRc = target
	c.mu.Unlock()

	// Release the connection being replaced; overwriting it without Close
	// would leak it. A close failure on a discarded connection is ignored.
	if stale != nil {
		_ = stale.Close()
	}
	return reply, err
}
// Send writes the command to the pipeLiner, creating the pipeLiner on first
// use, and records a pipeline operation as the most recent one.
func (c *redirconn) Send(cmd string, args ...interface{}) error {
	c.mu.Lock()
	c.lastOp = OpPipeLine
	if c.ppl == nil {
		c.ppl = newPipeliner(c.cp)
	}
	pipeline := c.ppl
	c.mu.Unlock()

	return pipeline.send(cmd, args...)
}
// Flush flushes the output buffer to the Redis server by flushing the
// pipeLiner with context.Background(). When Send has never been called there
// is nothing to flush and Flush returns nil.
func (c *redirconn) Flush() error {
	c.mu.Lock()
	pipeline := c.ppl
	if pipeline != nil {
		c.lastOp = OpPipeLine
	}
	c.mu.Unlock()

	if pipeline == nil {
		return nil
	}
	return pipeline.flush(context.Background())
}
// Receive receives a single reply from the pipeLiner. It returns an error
// when no pipeLiner exists yet, i.e. Send has not been called before.
// Note: per the original author's note this is a non-blocking operation, as
// the pipeLiner only hands over a reply it has already received.
func (c *redirconn) Receive() (reply interface{}, err error) {
	c.mu.Lock()
	pipeline := c.ppl
	if pipeline != nil {
		c.lastOp = OpPipeLine
	}
	c.mu.Unlock()

	if pipeline == nil {
		return nil, errors.New("no send request before")
	}
	return pipeline.receive()
}
// ReceiveWithTimeout receives a single reply from the pipeLiner.
// Note: it exists to implement ConnWithTimeout; timeout is ignored and the
// call behaves exactly like Receive.
func (c *redirconn) ReceiveWithTimeout(timeout time.Duration) (reply interface{}, err error) {
	return c.Receive()
}
// ReceiveContext receives a single reply from the pipeLiner.
// Note: it exists to implement ConnWithContext; ctx is ignored and the call
// behaves exactly like Receive.
func (c *redirconn) ReceiveContext(ctx context.Context) (reply interface{}, err error) {
	return c.Receive()
}