
Optional rate limiting of new connections to help prevent churn
jameshartig committed Apr 9, 2018
1 parent 94975c2 commit 4609d14
Showing 4 changed files with 204 additions and 35 deletions.
12 changes: 11 additions & 1 deletion cluster/cluster.go
@@ -79,6 +79,10 @@ type Opts struct {
// The size of the connection pool to use for each host. Default is 10
PoolSize int

// Whether or not to use a pool that rate limits new connections. Default
// is false.
PoolLimited bool

// The time which must elapse between subsequent calls to create a new
// connection pool (on a per redis instance basis) in certain circumstances.
// The default is 500 milliseconds
@@ -170,7 +174,13 @@ func (c *Cluster) newPool(addr string, clearThrottle bool) (clusterPool, error)
df := func(network, addr string) (*redis.Client, error) {
return c.o.Dialer(network, addr)
}
-p, err := pool.NewCustom("tcp", addr, c.o.PoolSize, df)
var p *pool.Pool
var err error
if c.o.PoolLimited {
p, err = pool.NewCustomLimited("tcp", addr, c.o.PoolSize, 0, df)
} else {
p, err = pool.NewCustom("tcp", addr, c.o.PoolSize, df)
}
if err != nil {
c.poolThrottles[addr] = time.After(c.o.PoolThrottle)
return clusterPool{}, err
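As a usage sketch (not part of the diff): this is roughly how a caller might opt into the limited pool through these Opts. Only PoolLimited is new in this commit; cluster.NewWithOpts and the other field names are recalled from the existing radix.v2 cluster API and may differ slightly.

```go
package main

import (
	"log"

	"github.com/mediocregopher/radix.v2/cluster"
)

func main() {
	// PoolLimited is the new knob: when true, each instance's pool rate
	// limits new connections instead of dialing freely on demand.
	c, err := cluster.NewWithOpts(cluster.Opts{
		Addr:        "127.0.0.1:6379",
		PoolSize:    10,
		PoolLimited: true,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	if err := c.Cmd("SET", "foo", "bar").Err; err != nil {
		log.Fatal(err)
	}
}
```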
6 changes: 6 additions & 0 deletions pool/doc.go
@@ -26,6 +26,12 @@
//
// p.Put(conn)
//
// Rate limited pool
//
// A standard pool creates new connections on the fly when necessary. You can
// create a "limited" pool using NewLimited if you want to rate limit those new
// connections to at most `size` new connections every 10 seconds.
//
// Shortcuts
//
// If you're doing multiple operations you may find it useful to defer the Put
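A minimal sketch of the limited pool described above, using the NewCustomLimited constructor added in this commit (NewLimited, mentioned in the doc comment, presumably wraps it with redis.Dial). Passing 0 for dur falls back to the 10 second default.

```go
package main

import (
	"log"

	"github.com/mediocregopher/radix.v2/pool"
	"github.com/mediocregopher/radix.v2/redis"
)

func main() {
	// At most `size` (here 10) brand new connections will be dialed per 10
	// seconds; beyond that, Get blocks until a connection is Put back or a
	// new token is added to the bucket.
	p, err := pool.NewCustomLimited("tcp", "127.0.0.1:6379", 10, 0, redis.Dial)
	if err != nil {
		log.Fatal(err)
	}
	defer p.Empty()

	conn, err := p.Get()
	if err != nil {
		log.Fatal(err)
	}
	defer p.Put(conn)

	if err := conn.Cmd("PING").Err; err != nil {
		log.Fatal(err)
	}
}
```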
119 changes: 99 additions & 20 deletions pool/pool.go
@@ -7,13 +7,19 @@ import (
"github.com/mediocregopher/radix.v2/redis"
)

-// Pool is a simple connection pool for redis Clients. It will create a small
-// pool of initial connections, and if more connections are needed they will be
// Pool is a connection pool for redis Clients. It will create a small pool of
// initial connections, and if more connections are needed they will be
// created on demand. If a connection is Put back and the pool is full it will
-// be closed.
// be closed. A reserve pool is kept alongside the main pool to prevent
// connection churn. If the main pool is filled, a connection is put into the
// reserve pool and they're slowly (over 100 seconds) evicted from the reserve.
type Pool struct {
-pool chan *redis.Client
-df DialFunc
pool chan *redis.Client
reservePool chan *redis.Client
df DialFunc

// limited channel is read whenever we attempt to make a new client
limited chan bool

initDoneCh chan bool // used for tests
stopCh chan bool
@@ -32,12 +38,14 @@ type DialFunc func(network, addr string) (*redis.Client, error)
// authentication for new connections.
func NewCustom(network, addr string, size int, df DialFunc) (*Pool, error) {
p := Pool{
-Network: network,
-Addr: addr,
-pool: make(chan *redis.Client, size),
-df: df,
-initDoneCh: make(chan bool),
-stopCh: make(chan bool),
Network: network,
Addr: addr,
pool: make(chan *redis.Client, size),
reservePool: make(chan *redis.Client, size*10),
limited: make(chan bool, size),
df: df,
initDoneCh: make(chan bool),
stopCh: make(chan bool),
}

if size < 1 {
@@ -46,20 +54,37 @@ func NewCustom(network, addr string, size int, df DialFunc) (*Pool, error) {

// set up a go-routine which will periodically ping connections in the pool.
// if the pool is idle every connection will be hit once every 10 seconds.
// additionally, if there are any connections in the reserve pool, they're closed
// periodically so that within 100 seconds a full reserve will be emptied
// we do some weird defer/wait stuff to ensure this always gets started no
// matter what happens with the rest of the initialization
startTickCh := make(chan struct{})
defer close(startTickCh)
go func() {
-tick := time.NewTicker(10 * time.Second / time.Duration(size))
-defer tick.Stop()
pingTick := time.NewTicker(10 * time.Second / time.Duration(size))
defer pingTick.Stop()
<-startTickCh
for {
select {
case <-p.stopCh:
return
-case <-tick.C:
-p.Cmd("PING")
case <-pingTick.C:
// instead of using Cmd/Get, which might make a new connection,
// we only check from the pool
select {
case conn := <-p.pool:
// we don't care if PING errors since Put will handle that
conn.Cmd("PING")
p.Put(conn)
default:
}

// remove one from the reservePool if there is any
select {
case conn := <-p.reservePool:
conn.Close()
default:
}
}
}
}()
@@ -93,7 +118,41 @@ func NewCustom(network, addr string, size int, df DialFunc) (*Pool, error) {
// connections to have waiting to be used at any given moment. If an error is
// encountered an empty (but still usable) pool is returned alongside that error
func New(network, addr string, size int) (*Pool, error) {
-return NewCustom(network, addr, size, redis.Dial)
p, err := NewCustom(network, addr, size, redis.Dial)
close(p.limited)
return p, err
}

// NewCustomLimited creates a new Pool like NewCustom except that the creation of clients
// is limited to size every dur (defaults to 10 seconds).
func NewCustomLimited(network, addr string, size int, dur time.Duration, df DialFunc) (*Pool, error) {
if dur == 0 {
dur = 10 * time.Second
}
p, err := NewCustom(network, addr, size, df)
// fill up the bucket
for i := 0; i < size; i++ {
p.limited <- true
}

go func() {
fillTick := time.NewTicker(dur / time.Duration(size))
defer fillTick.Stop()
for {
select {
case <-p.stopCh:
return
case <-fillTick.C:
// refill a "token" to the limited bucket
select {
case p.limited <- true:
default:
}
}
}
}()

return p, err
}

// Get retrieves an available redis client. If there are none available it will
@@ -102,10 +161,22 @@ func (p *Pool) Get() (*redis.Client, error) {
select {
case conn := <-p.pool:
return conn, nil
case conn := <-p.reservePool:
return conn, nil
case <-p.stopCh:
return nil, errors.New("pool emptied")
default:
-return p.df(p.Network, p.Addr)
// we need a separate select here since it's indeterminate which case go
// will select and we want to always prefer pools over creating a new
// connection
select {
case conn := <-p.pool:
return conn, nil
case conn := <-p.reservePool:
return conn, nil
case <-p.limited:
return p.df(p.Network, p.Addr)
}
}
}

@@ -125,7 +196,13 @@ func (p *Pool) Put(conn *redis.Client) {
select {
case p.pool <- conn:
default:
-conn.Close()
// we need a separate select here since it's indeterminate which case go
// will select and we want to always prefer the main pool over the reserve
select {
case p.reservePool <- conn:
default:
conn.Close()
}
}
}
}
@@ -159,6 +236,8 @@ func (p *Pool) Empty() {
select {
case conn = <-p.pool:
conn.Close()
case conn = <-p.reservePool:
conn.Close()
default:
return
}
@@ -167,7 +246,7 @@

// Avail returns the number of connections currently available to be gotten from
// the Pool using Get. If the number is zero then subsequent calls to Get will
-// be creating new connections on the fly
// be creating new connections on the fly.
func (p *Pool) Avail() int {
-return len(p.pool)
return len(p.pool) + len(p.reservePool)
}
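To make the timing in this file concrete: the ping/eviction ticker fires every 10s/size, so with the default pool size of 10 it ticks once per second; each tick pings at most one idle connection from the main pool and closes at most one connection from the reserve. Since the reserve holds up to size*10 connections, a completely full reserve drains in roughly (size*10) * (10s/size) = 100 seconds, which is where the "over 100 seconds" figure in the Pool doc comment comes from. Similarly, the limited pool's token bucket is refilled one token every dur/size (dur defaulting to 10 seconds), i.e. at most size new dials per dur.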
102 changes: 88 additions & 14 deletions pool/pool_test.go
@@ -3,44 +3,118 @@ package pool
import (
"sync"
. "testing"
"time"

"github.com/mediocregopher/radix.v2/redis"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestPool(t *T) {
-size := 10
size := 3
pool, err := New("tcp", "localhost:6379", size)
require.Nil(t, err)
<-pool.initDoneCh

-var wg sync.WaitGroup
-for i := 0; i < size*4; i++ {
-wg.Add(1)
-go func() {
-for i := 0; i < 100; i++ {
-conn, err := pool.Get()
-assert.Nil(t, err)
clients := []*redis.Client{}
for i := 0; i < size*11; i++ {
c, err := pool.Get()
require.NoError(t, err)
clients = append(clients, c)
}
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, pool.Avail())

-assert.Nil(t, conn.Cmd("ECHO", "HI").Err)
// put size back
for i := 0; i < size; i++ {
pool.Put(clients[i])
}
assert.Equal(t, size, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))

-pool.Put(conn)
-}
-wg.Done()
-}()
// put the rest back
for i := size; i < len(clients); i++ {
pool.Put(clients[i])
}
-wg.Wait()
assert.Equal(t, size, len(pool.pool))
assert.Equal(t, size*10, len(pool.reservePool))

// wait for the background goroutine to evict from the pool
time.Sleep(10*time.Second/time.Duration(size) + time.Second)
assert.Equal(t, size, len(pool.pool))
assert.True(t, len(pool.reservePool) < size*10, "size: %d, expected less than: %d", len(pool.reservePool), size*10)

pool.Empty()
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, pool.Avail())

c, err := pool.Get()
assert.Nil(t, c)
assert.Error(t, err)
}

func TestLimitedPool(t *T) {
size := 3
pool, err := NewCustomLimited("tcp", "localhost:6379", size, 0, redis.Dial)
require.Nil(t, err)
<-pool.initDoneCh

assert.Equal(t, size, len(pool.limited))

clients := []*redis.Client{}
for i := 0; i < size*2; i++ {
c, err := pool.Get()
require.NoError(t, err)
clients = append(clients, c)
}
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, len(pool.limited))
assert.Equal(t, 0, pool.Avail())

// we should've used up ALL of our limit so start 2 goroutines to test
// putting one back and the token bucket refilling
doneCh := make(chan bool, 2)
for i := 0; i < 2; i++ {
go func() {
c, err := pool.Get()
require.NoError(t, err)
c.Close()
doneCh <- true
}()
}
// sleep a bit for the goroutine to start up and make sure the goroutines are
// still waiting for a connection
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 0, len(doneCh))

// if we put one back that should free up one of the goroutines
pool.Put(clients[0])
time.Sleep(100 * time.Millisecond)
assert.Equal(t, 1, len(doneCh))
assert.Equal(t, 0, len(pool.limited))

// now if we wait 10/size seconds we should get another token added to the
// bucket and the goroutine should be freed up
time.Sleep(10*time.Second/time.Duration(size) + time.Second)
assert.Equal(t, 2, len(doneCh))
// since the goroutine was freed up, it just created one and so the pool
// should be back to 0
assert.Equal(t, 0, len(pool.limited))

// close whatever is left
for i := 1; i < len(clients); i++ {
clients[i].Close()
}

pool.Empty()
assert.Equal(t, 0, len(pool.pool))
assert.Equal(t, 0, len(pool.reservePool))
assert.Equal(t, 0, pool.Avail())
}

func TestCmd(t *T) {
size := 10
pool, err := New("tcp", "localhost:6379", 10)
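Both tests assume a Redis server listening on localhost:6379 (the library's usual test setup), so something like a locally running redis-server followed by go test ./pool should exercise them; note the limited-pool test deliberately sleeps for several seconds while waiting on the ticker.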
