Skip to content

Commit 2516433

Browse files
committed
Fix pool panic on slow connection with MaxRetries > 0.
1 parent 8ca66a5 commit 2516433

File tree

10 files changed

+112
-39
lines changed

10 files changed

+112
-39
lines changed

conn.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,8 @@ func (cn *conn) init(opt *Options) error {
4343
return nil
4444
}
4545

46-
// Use connection to connect to Redis.
47-
pool := newSingleConnPoolConn(cn)
48-
49-
// Client is not closed because we want to reuse underlying connection.
50-
client := newClient(opt, pool)
46+
// Temp client for Auth and Select.
47+
client := newClient(opt, newSingleConnPool(cn))
5148

5249
if opt.Password != "" {
5350
if err := client.Auth(opt.Password).Err(); err != nil {

conn_test.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
package redis_test
2+
3+
import (
4+
"net"
5+
6+
. "github.com/onsi/ginkgo"
7+
. "github.com/onsi/gomega"
8+
9+
"gopkg.in/redis.v3"
10+
)
11+
12+
var _ = Describe("newConnDialer with bad connection", func() {
13+
It("should return an error", func() {
14+
dialer := redis.NewConnDialer(&redis.Options{
15+
Dialer: func() (net.Conn, error) {
16+
return &badConn{}, nil
17+
},
18+
MaxRetries: 3,
19+
Password: "password",
20+
DB: 1,
21+
})
22+
_, err := dialer()
23+
Expect(err).To(MatchError("bad connection"))
24+
})
25+
})

export_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ func (c *baseClient) Pool() pool {
66
return c.connPool
77
}
88

9+
var NewConnDialer = newConnDialer
10+
911
func (cn *conn) SetNetConn(netcn net.Conn) {
1012
cn.netcn = netcn
1113
}

main_test.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,15 @@ func startSentinel(port, masterName, masterPort string) (*redisProcess, error) {
232232

233233
//------------------------------------------------------------------------------
234234

235-
var errTimeout = syscall.ETIMEDOUT
235+
var (
236+
errTimeout = syscall.ETIMEDOUT
237+
)
238+
239+
type badConnError string
240+
241+
func (e badConnError) Error() string { return string(e) }
242+
func (e badConnError) Timeout() bool { return false }
243+
func (e badConnError) Temporary() bool { return false }
236244

237245
type badConn struct {
238246
net.TCPConn
@@ -250,7 +258,7 @@ func (cn *badConn) Read([]byte) (int, error) {
250258
if cn.readErr != nil {
251259
return 0, cn.readErr
252260
}
253-
return 0, net.UnknownNetworkError("badConn")
261+
return 0, badConnError("bad connection")
254262
}
255263

256264
func (cn *badConn) Write([]byte) (int, error) {
@@ -260,5 +268,5 @@ func (cn *badConn) Write([]byte) (int, error) {
260268
if cn.writeErr != nil {
261269
return 0, cn.writeErr
262270
}
263-
return 0, net.UnknownNetworkError("badConn")
271+
return 0, badConnError("bad connection")
264272
}

multi.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (c *Client) Multi() *Multi {
2222
multi := &Multi{
2323
base: &baseClient{
2424
opt: c.opt,
25-
connPool: newSingleConnPool(c.connPool, true),
25+
connPool: newStickyConnPool(c.connPool, true),
2626
},
2727
}
2828
multi.commandable.process = multi.process

pool.go

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,52 @@ func (p *connPool) reaper() {
314314
//------------------------------------------------------------------------------
315315

316316
type singleConnPool struct {
317+
cn *conn
318+
}
319+
320+
func newSingleConnPool(cn *conn) *singleConnPool {
321+
return &singleConnPool{
322+
cn: cn,
323+
}
324+
}
325+
326+
func (p *singleConnPool) First() *conn {
327+
return p.cn
328+
}
329+
330+
func (p *singleConnPool) Get() (*conn, error) {
331+
return p.cn, nil
332+
}
333+
334+
func (p *singleConnPool) Put(cn *conn) error {
335+
if p.cn != cn {
336+
panic("p.cn != cn")
337+
}
338+
return nil
339+
}
340+
341+
func (p *singleConnPool) Remove(cn *conn) error {
342+
if p.cn != cn {
343+
panic("p.cn != cn")
344+
}
345+
return nil
346+
}
347+
348+
func (p *singleConnPool) Len() int {
349+
return 1
350+
}
351+
352+
func (p *singleConnPool) FreeLen() int {
353+
return 0
354+
}
355+
356+
func (p *singleConnPool) Close() error {
357+
return nil
358+
}
359+
360+
//------------------------------------------------------------------------------
361+
362+
type stickyConnPool struct {
317363
pool pool
318364
reusable bool
319365

@@ -322,27 +368,21 @@ type singleConnPool struct {
322368
mx sync.Mutex
323369
}
324370

325-
func newSingleConnPool(pool pool, reusable bool) *singleConnPool {
326-
return &singleConnPool{
371+
func newStickyConnPool(pool pool, reusable bool) *stickyConnPool {
372+
return &stickyConnPool{
327373
pool: pool,
328374
reusable: reusable,
329375
}
330376
}
331377

332-
func newSingleConnPoolConn(cn *conn) *singleConnPool {
333-
return &singleConnPool{
334-
cn: cn,
335-
}
336-
}
337-
338-
func (p *singleConnPool) First() *conn {
378+
func (p *stickyConnPool) First() *conn {
339379
p.mx.Lock()
340380
cn := p.cn
341381
p.mx.Unlock()
342382
return cn
343383
}
344384

345-
func (p *singleConnPool) Get() (*conn, error) {
385+
func (p *stickyConnPool) Get() (*conn, error) {
346386
defer p.mx.Unlock()
347387
p.mx.Lock()
348388

@@ -362,15 +402,13 @@ func (p *singleConnPool) Get() (*conn, error) {
362402
return p.cn, nil
363403
}
364404

365-
func (p *singleConnPool) put() (err error) {
366-
if p.pool != nil {
367-
err = p.pool.Put(p.cn)
368-
}
405+
func (p *stickyConnPool) put() (err error) {
406+
err = p.pool.Put(p.cn)
369407
p.cn = nil
370408
return err
371409
}
372410

373-
func (p *singleConnPool) Put(cn *conn) error {
411+
func (p *stickyConnPool) Put(cn *conn) error {
374412
defer p.mx.Unlock()
375413
p.mx.Lock()
376414
if p.cn != cn {
@@ -382,30 +420,32 @@ func (p *singleConnPool) Put(cn *conn) error {
382420
return nil
383421
}
384422

385-
func (p *singleConnPool) remove() (err error) {
386-
if p.pool != nil {
387-
err = p.pool.Remove(p.cn)
388-
}
423+
func (p *stickyConnPool) remove() (err error) {
424+
err = p.pool.Remove(p.cn)
389425
p.cn = nil
390426
return err
391427
}
392428

393-
func (p *singleConnPool) Remove(cn *conn) error {
429+
func (p *stickyConnPool) Remove(cn *conn) error {
394430
defer p.mx.Unlock()
395431
p.mx.Lock()
396432
if p.cn == nil {
397433
panic("p.cn == nil")
398434
}
399-
if cn != nil && cn != p.cn {
400-
panic("cn != p.cn")
435+
if cn != nil && p.cn != cn {
436+
panic("p.cn != cn")
401437
}
402438
if p.closed {
403439
return errClosed
404440
}
405-
return p.remove()
441+
if cn == nil {
442+
return p.remove()
443+
} else {
444+
return nil
445+
}
406446
}
407447

408-
func (p *singleConnPool) Len() int {
448+
func (p *stickyConnPool) Len() int {
409449
defer p.mx.Unlock()
410450
p.mx.Lock()
411451
if p.cn == nil {
@@ -414,7 +454,7 @@ func (p *singleConnPool) Len() int {
414454
return 1
415455
}
416456

417-
func (p *singleConnPool) FreeLen() int {
457+
func (p *stickyConnPool) FreeLen() int {
418458
defer p.mx.Unlock()
419459
p.mx.Lock()
420460
if p.cn == nil {
@@ -423,7 +463,7 @@ func (p *singleConnPool) FreeLen() int {
423463
return 0
424464
}
425465

426-
func (p *singleConnPool) Close() error {
466+
func (p *stickyConnPool) Close() error {
427467
defer p.mx.Unlock()
428468
p.mx.Lock()
429469
if p.closed {

pool_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"gopkg.in/redis.v3"
1212
)
1313

14-
var _ = Describe("Pool", func() {
14+
var _ = Describe("pool", func() {
1515
var client *redis.Client
1616

1717
var perform = func(n int, cb func()) {

pubsub.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func (c *Client) PubSub() *PubSub {
2929
return &PubSub{
3030
baseClient: &baseClient{
3131
opt: c.opt,
32-
connPool: newSingleConnPool(c.connPool, false),
32+
connPool: newStickyConnPool(c.connPool, false),
3333
},
3434
}
3535
}

redis_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,8 @@ var _ = Describe("Client", func() {
161161
Expect(err).NotTo(HaveOccurred())
162162

163163
cn.SetNetConn(&badConn{})
164-
Expect(client.Pool().Put(cn)).NotTo(HaveOccurred())
164+
err = client.Pool().Put(cn)
165+
Expect(err).NotTo(HaveOccurred())
165166

166167
err = client.Ping().Err()
167168
Expect(err).NotTo(HaveOccurred())

sentinel.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ func (c *sentinelClient) PubSub() *PubSub {
9090
return &PubSub{
9191
baseClient: &baseClient{
9292
opt: c.opt,
93-
connPool: newSingleConnPool(c.connPool, false),
93+
connPool: newStickyConnPool(c.connPool, false),
9494
},
9595
}
9696
}

0 commit comments

Comments
 (0)