Skip to content

Commit

Permalink
Added pool.GetTimeout to limit how long Get blocks when pool is empty
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshartig authored and Brian Picciano committed Jun 3, 2018
1 parent 0fc168e commit 94360be
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
15 changes: 15 additions & 0 deletions pool/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type opts struct {
createLimitInterval time.Duration
overflowDrainInterval time.Duration
overflowSize int
getTimeout time.Duration
}

// Opt is an optional behavior which can be applied to the NewCustom
Expand Down Expand Up @@ -52,6 +53,20 @@ func OnFullBuffer(size int, drainInterval time.Duration) Opt {
}
}

// GetTimeout effects the Pool's behavior when it is empty. The effect is to
// limit the amount of time Get spends waiting for a new connection before
// timing out and returning ErrGetTimeout.
//
// The timeout does not include the time it takes to dial the new connection
// since we have no way of cancelling the dial once it has begun.
//
// The default is 0, which disables the timeout.
func GetTimeout(timeout time.Duration) Opt {
return func(po *opts) {
po.getTimeout = timeout
}
}

// CreateLimit effects the Pool's create behavior when the pool is empty. The
// effect is to limit any connection creation to at most one every interval
// after headroom has been exhausted. When a request comes in and the pool is
Expand Down
15 changes: 15 additions & 0 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ import (
"github.com/mediocregopher/radix.v2/redis"
)

var (

// ErrGetTimeout is returned from Get when a GetTimeout was specified and
// that time was hit.
ErrGetTimeout = errors.New("get timed out")
)

// Pool is a connection pool for redis Clients. It will create a small pool of
// initial connections, and if more connections are needed they will be
// created on demand. If a connection is Put back and the pool is full it will
Expand Down Expand Up @@ -199,6 +206,12 @@ func (p *Pool) Get() (*redis.Client, error) {
case <-p.stopCh:
return nil, errors.New("pool emptied")
default:
var timeoutCh <-chan time.Time
if p.po.getTimeout > 0 {
timer := time.NewTimer(p.po.getTimeout)
defer timer.Stop()
timeoutCh = timer.C
}
// we need a separate select here since it's indeterminate which case go
// will select and we want to always prefer pools over creating a new
// connection
Expand All @@ -207,6 +220,8 @@ func (p *Pool) Get() (*redis.Client, error) {
return conn, nil
case conn := <-p.reservePool:
return conn, nil
case <-timeoutCh:
return nil, ErrGetTimeout
case <-p.limited:
return p.df(p.Network, p.Addr)
}
Expand Down
48 changes: 48 additions & 0 deletions pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,54 @@ func TestLimitedPool(t *T) {
assert.Equal(t, 0, pool.Avail())
}

func TestGetTimeoutPool(t *T) {
size := 1
pool, err := NewCustom("tcp", "localhost:6379", size, redis.Dial,
CreateLimit(1, 5*time.Second),
GetTimeout(time.Second),
)
require.Nil(t, err)
<-pool.initDoneCh

assert.Equal(t, size, len(pool.limited))

clients := []*redis.Client{}
for i := 0; i < size; i++ {
c, err := pool.Get()
require.NoError(t, err)
clients = append(clients, c)
}
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, size, len(pool.limited))
assert.Equal(t, 0, pool.Avail())

for i := 0; i < size; i++ {
c, err := pool.Get()
require.NoError(t, err)
clients = append(clients, c)
}
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, len(pool.limited))
assert.Equal(t, 0, pool.Avail())

// now try to get a client and it should timeout
c, err := pool.Get()
assert.Nil(t, c)
assert.Equal(t, ErrGetTimeout, err)

// close whatever is left
for i := 1; i < len(clients); i++ {
clients[i].Close()
}

pool.Empty()
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, pool.Avail())
}

func TestCmd(t *T) {
size := 10
pool, err := New("tcp", "localhost:6379", 10)
Expand Down

0 comments on commit 94360be

Please sign in to comment.