Skip to content

Commit 9f9822e

Browse files
committed
Use a lock-free connection pool
1 parent c3ab344 commit 9f9822e

File tree

6 files changed

+117
-109
lines changed

6 files changed

+117
-109
lines changed

command_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,9 @@ var _ = Describe("Command", func() {
150150
wg.Add(n)
151151
for i := 0; i < n; i++ {
152152
go func() {
153+
defer GinkgoRecover()
153154
defer wg.Done()
155+
154156
err := client.Incr(key).Err()
155157
Expect(err).NotTo(HaveOccurred())
156158
}()

commands_test.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ var _ = Describe("Commands", func() {
1717

1818
BeforeEach(func() {
1919
client = redis.NewTCPClient(&redis.Options{
20-
Addr: redisAddr,
20+
Addr: redisAddr,
21+
PoolTimeout: 30 * time.Second,
2122
})
2223
})
2324

@@ -1116,6 +1117,8 @@ var _ = Describe("Commands", func() {
11161117
started := make(chan bool)
11171118
done := make(chan bool)
11181119
go func() {
1120+
defer GinkgoRecover()
1121+
11191122
started <- true
11201123
bLPop := client.BLPop(0, "list")
11211124
Expect(bLPop.Err()).NotTo(HaveOccurred())
@@ -1161,6 +1164,8 @@ var _ = Describe("Commands", func() {
11611164
started := make(chan bool)
11621165
done := make(chan bool)
11631166
go func() {
1167+
defer GinkgoRecover()
1168+
11641169
started <- true
11651170
brpop := client.BRPop(0, "list")
11661171
Expect(brpop.Err()).NotTo(HaveOccurred())
@@ -2190,7 +2195,9 @@ var _ = Describe("Commands", func() {
21902195
wg.Add(1)
21912196

21922197
go func() {
2198+
defer GinkgoRecover()
21932199
defer wg.Done()
2200+
21942201
for {
21952202
cmds, err := safeIncr()
21962203
if err == redis.TxFailedErr {

pool.go

Lines changed: 85 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,20 @@
11
package redis
22

33
import (
4-
"container/list"
54
"errors"
65
"fmt"
76
"log"
87
"net"
98
"sync"
9+
"sync/atomic"
1010
"time"
1111

1212
"gopkg.in/bufio.v1"
1313
)
1414

1515
var (
16-
errClosed = errors.New("redis: client is closed")
16+
errClosed = errors.New("redis: client is closed")
17+
errPoolTimeout = errors.New("redis: connection pool timeout")
1718
)
1819

1920
var (
@@ -37,13 +38,9 @@ type conn struct {
3738
rd *bufio.Reader
3839
buf []byte
3940

40-
inUse bool
41-
usedAt time.Time
42-
41+
usedAt time.Time
4342
readTimeout time.Duration
4443
writeTimeout time.Duration
45-
46-
elem *list.Element
4744
}
4845

4946
func newConnFunc(dial func() (net.Conn, error)) func() (*conn, error) {
@@ -87,19 +84,21 @@ func (cn *conn) Close() error {
8784
return cn.netcn.Close()
8885
}
8986

87+
func (cn *conn) isIdle(timeout time.Duration) bool {
88+
return timeout > 0 && time.Since(cn.usedAt) > timeout
89+
}
90+
9091
//------------------------------------------------------------------------------
9192

9293
type connPool struct {
9394
dial func() (*conn, error)
9495
rl *rateLimiter
9596

96-
opt *options
97+
opt *options
98+
conns chan *conn
9799

98-
cond *sync.Cond
99-
conns *list.List
100-
101-
idleNum int
102-
closed bool
100+
size int32
101+
closed int32
103102

104103
lastDialErr error
105104
}
@@ -109,13 +108,47 @@ func newConnPool(dial func() (*conn, error), opt *options) *connPool {
109108
dial: dial,
110109
rl: newRateLimiter(time.Second, 2*opt.PoolSize),
111110

112-
opt: opt,
111+
opt: opt,
112+
conns: make(chan *conn, opt.PoolSize),
113+
}
114+
}
115+
116+
func (p *connPool) isClosed() bool { return atomic.LoadInt32(&p.closed) > 0 }
113117

114-
cond: sync.NewCond(&sync.Mutex{}),
115-
conns: list.New(),
118+
// First available connection, non-blocking
119+
func (p *connPool) first() *conn {
120+
for {
121+
select {
122+
case cn := <-p.conns:
123+
if !cn.isIdle(p.opt.IdleTimeout) {
124+
return cn
125+
}
126+
p.remove(cn)
127+
default:
128+
return nil
129+
}
116130
}
131+
panic("not reached")
117132
}
118133

134+
// Wait for available connection, blocking
135+
func (p *connPool) wait() (*conn, error) {
136+
deadline := time.After(p.opt.PoolTimeout)
137+
for {
138+
select {
139+
case cn := <-p.conns:
140+
if !cn.isIdle(p.opt.IdleTimeout) {
141+
return cn, nil
142+
}
143+
p.remove(cn)
144+
case <-deadline:
145+
return nil, errPoolTimeout
146+
}
147+
}
148+
panic("not reached")
149+
}
150+
151+
// Establish a new connection
119152
func (p *connPool) new() (*conn, error) {
120153
if !p.rl.Check() {
121154
err := fmt.Errorf(
@@ -132,60 +165,29 @@ func (p *connPool) new() (*conn, error) {
132165
}
133166

134167
func (p *connPool) Get() (*conn, bool, error) {
135-
p.cond.L.Lock()
136-
137-
if p.closed {
138-
p.cond.L.Unlock()
168+
if p.isClosed() {
139169
return nil, false, errClosed
140170
}
141171

142-
if p.opt.IdleTimeout > 0 {
143-
for el := p.conns.Front(); el != nil; el = el.Next() {
144-
cn := el.Value.(*conn)
145-
if cn.inUse {
146-
break
147-
}
148-
if time.Since(cn.usedAt) > p.opt.IdleTimeout {
149-
if err := p.remove(cn); err != nil {
150-
log.Printf("remove failed: %s", err)
151-
}
152-
}
153-
}
154-
}
155-
156-
for p.conns.Len() >= p.opt.PoolSize && p.idleNum == 0 {
157-
p.cond.Wait()
158-
}
159-
160-
if p.idleNum > 0 {
161-
elem := p.conns.Front()
162-
cn := elem.Value.(*conn)
163-
if cn.inUse {
164-
panic("pool: precondition failed")
165-
}
166-
cn.inUse = true
167-
p.conns.MoveToBack(elem)
168-
p.idleNum--
169-
170-
p.cond.L.Unlock()
172+
// Fetch first non-idle connection, if available
173+
if cn := p.first(); cn != nil {
171174
return cn, false, nil
172175
}
173176

174-
if p.conns.Len() < p.opt.PoolSize {
177+
// Try to create a new one
178+
if ref := atomic.AddInt32(&p.size, 1); int(ref) <= p.opt.PoolSize {
175179
cn, err := p.new()
176180
if err != nil {
177-
p.cond.L.Unlock()
181+
atomic.AddInt32(&p.size, -1) // Undo ref increment
178182
return nil, false, err
179183
}
180-
181-
cn.inUse = true
182-
cn.elem = p.conns.PushBack(cn)
183-
184-
p.cond.L.Unlock()
185184
return cn, true, nil
186185
}
186+
atomic.AddInt32(&p.size, -1)
187187

188-
panic("not reached")
188+
// Otherwise, wait for the available connection
189+
cn, err := p.wait()
190+
return cn, false, err
189191
}
190192

191193
func (p *connPool) Put(cn *conn) error {
@@ -195,92 +197,67 @@ func (p *connPool) Put(cn *conn) error {
195197
return p.Remove(cn)
196198
}
197199

200+
if p.isClosed() {
201+
return errClosed
202+
}
198203
if p.opt.IdleTimeout > 0 {
199204
cn.usedAt = time.Now()
200205
}
201-
202-
p.cond.L.Lock()
203-
if p.closed {
204-
p.cond.L.Unlock()
205-
return errClosed
206-
}
207-
cn.inUse = false
208-
p.conns.MoveToFront(cn.elem)
209-
p.idleNum++
210-
p.cond.Signal()
211-
p.cond.L.Unlock()
212-
206+
p.conns <- cn
213207
return nil
214208
}
215209

216210
func (p *connPool) Remove(cn *conn) error {
217-
p.cond.L.Lock()
218-
if p.closed {
219-
// Noop, connection is already closed.
220-
p.cond.L.Unlock()
211+
if p.isClosed() {
221212
return nil
222213
}
223-
err := p.remove(cn)
224-
p.cond.Signal()
225-
p.cond.L.Unlock()
226-
return err
214+
return p.remove(cn)
227215
}
228216

229217
func (p *connPool) remove(cn *conn) error {
230-
p.conns.Remove(cn.elem)
231-
cn.elem = nil
232-
if !cn.inUse {
233-
p.idleNum--
234-
}
218+
atomic.AddInt32(&p.size, -1)
235219
return cn.Close()
236220
}
237221

238222
// Len returns number of idle connections.
239223
func (p *connPool) Len() int {
240-
defer p.cond.L.Unlock()
241-
p.cond.L.Lock()
242-
return p.idleNum
224+
return len(p.conns)
243225
}
244226

245227
// Size returns number of connections in the pool.
246228
func (p *connPool) Size() int {
247-
defer p.cond.L.Unlock()
248-
p.cond.L.Lock()
249-
return p.conns.Len()
229+
return int(atomic.LoadInt32(&p.size))
250230
}
251231

252232
func (p *connPool) Filter(f func(*conn) bool) {
253-
p.cond.L.Lock()
254-
for el, next := p.conns.Front(), p.conns.Front(); el != nil; el = next {
255-
next = el.Next()
256-
cn := el.Value.(*conn)
257-
if !f(cn) {
258-
p.remove(cn)
233+
for {
234+
select {
235+
case cn := <-p.conns:
236+
if !f(cn) {
237+
p.remove(cn)
238+
}
239+
default:
240+
return
259241
}
260242
}
261-
p.cond.L.Unlock()
243+
panic("not reached")
262244
}
263245

264-
func (p *connPool) Close() error {
265-
defer p.cond.L.Unlock()
266-
p.cond.L.Lock()
267-
if p.closed {
246+
func (p *connPool) Close() (err error) {
247+
if !atomic.CompareAndSwapInt32(&p.closed, 0, 1) {
268248
return nil
269249
}
270-
p.closed = true
271250
p.rl.Close()
272-
var retErr error
251+
273252
for {
274-
e := p.conns.Front()
275-
if e == nil {
276-
break
253+
if p.Size() < 1 {
254+
return
277255
}
278-
if err := p.remove(e.Value.(*conn)); err != nil {
279-
log.Printf("cn.Close failed: %s", err)
280-
retErr = err
256+
if e := p.remove(<-p.conns); e != nil {
257+
err = e
281258
}
282259
}
283-
return retErr
260+
panic("not reached")
284261
}
285262

286263
//------------------------------------------------------------------------------

pool_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ var _ = Describe("Pool", func() {
1717
for i := 0; i < n; i++ {
1818
wg.Add(1)
1919
go func() {
20+
defer GinkgoRecover()
2021
defer wg.Done()
22+
2123
cb()
2224
}()
2325
}

0 commit comments

Comments
 (0)