Skip to content

Commit

Permalink
Optional rate limiting of new connections and help prevent churn
Browse files Browse the repository at this point in the history
  • Loading branch information
jameshartig committed May 4, 2018
1 parent 47b63ad commit 2cdb82e
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 46 deletions.
5 changes: 4 additions & 1 deletion cluster/cluster.go
Expand Up @@ -79,6 +79,9 @@ type Opts struct {
// The size of the connection pool to use for each host. Default is 10
PoolSize int

// PoolOpts contain optional Opt's to pass to pool.NewCustom
PoolOpts []pool.Opt

// The time which must elapse between subsequent calls to create a new
// connection pool (on a per redis instance basis) in certain circumstances.
// The default is 500 milliseconds
Expand Down Expand Up @@ -170,7 +173,7 @@ func (c *Cluster) newPool(addr string, clearThrottle bool) (clusterPool, error)
df := func(network, addr string) (*redis.Client, error) {
return c.o.Dialer(network, addr)
}
p, err := pool.NewCustom("tcp", addr, c.o.PoolSize, df)
p, err := pool.NewCustom("tcp", addr, c.o.PoolSize, df, c.o.PoolOpts...)
if err != nil {
c.poolThrottles[addr] = time.After(c.o.PoolThrottle)
return clusterPool{}, err
Expand Down
6 changes: 6 additions & 0 deletions pool/doc.go
Expand Up @@ -26,6 +26,12 @@
//
// p.Put(conn)
//
// Rate limited pool
//
// A standard pool creates new connections on the fly when necessary. You can
// create a "limited" pool using NewLimited if you want to rate limit those new
// connections to at most `size` new connections every 10 seconds.
//
// Shortcuts
//
// If you're doing multiple operations you may find it useful to defer the Put
Expand Down
71 changes: 71 additions & 0 deletions pool/opts.go
@@ -0,0 +1,71 @@
package pool

import "time"

type opts struct {
pingInterval time.Duration
createLimitBuffer int
createLimitInterval time.Duration
overflowDrainInterval time.Duration
overflowSize int
}

// Opt is an optional behavior which can be applied to the NewCustom
// function to effect a Pool's behavior
type Opt func(*opts)

// PingInterval specifies the interval at which a ping event happens. On
// each ping event the Pool calls the PING redis command over one of it's
// available connections.
//
// Since connections are used in LIFO order, the ping interval * pool size is
// the duration of time it takes to ping every connection once when the pool is
// idle.
//
// A shorter interval means connections are pinged more frequently, but also
// means more traffic with the server.
//
// An interval of 0 disables pinging and is highly discouraged.
func PingInterval(d time.Duration) Opt {
return func(po *opts) {
po.pingInterval = d
}
}

// OnFullBuffer effects the Pool's behavior when it is full. The effect is
// to cause any connection which is being put back into a full pool to be put
// instead into an overflow buffer which can hold up to the given number of
// connections. If the overflow buffer is also full then the connection is
// closed and discarded.
//
// drainInterval specifies the interval at which a drain event happens. On each
// drain event a connection is removed from the overflow buffer and put into the
// pool. If the pool is full the connection is closed and discarded.
//
// When Actions are performed with the Pool the connection used may come from
// either the main pool or the overflow buffer. Connections do _not_ have to
// wait to be drained into the main pool before they will be used.
func OnFullBuffer(size int, drainInterval time.Duration) Opt {
return func(po *opts) {
po.overflowSize = size
po.overflowDrainInterval = drainInterval
}
}

// CreateLimit effects the Pool's create behavior when the pool is empty. The
// effect is to limit any connection creation to at most one every interval
// after size has been exhausted. When a request comes in and the pool is empty,
// new connections will be created as fast as necessary until the size is
// depleated and then any new requests will be blocked until interval happens.
//
// Typically you'll want some headroom over the pool size to allow a burst of
// traffic to be satisfied as quickly as possible but then limit creation after
// that initial headroom.
//
// Setting the interval to 0 disables any creation limits.
func CreateLimit(headroom int, createInterval time.Duration) Opt {
return func(po *opts) {
po.createLimitBuffer = headroom
po.createLimitInterval = createInterval
}
}
172 changes: 142 additions & 30 deletions pool/pool.go
Expand Up @@ -7,13 +7,21 @@ import (
"github.com/mediocregopher/radix.v2/redis"
)

// Pool is a simple connection pool for redis Clients. It will create a small
// pool of initial connections, and if more connections are needed they will be
// Pool is a connection pool for redis Clients. It will create a small pool of
// initial connections, and if more connections are needed they will be
// created on demand. If a connection is Put back and the pool is full it will
// be closed.
// be closed. A reserve pool is kept alongside the main pool to prevent
// connection churn. If the main pool is filled, a connection is put into the
// reserve pool and they're slowly (over 100 seconds) evicted from the reserve.
type Pool struct {
pool chan *redis.Client
df DialFunc
pool chan *redis.Client
reservePool chan *redis.Client
df DialFunc

po opts

// limited channel is read whenever we attempt to make a new client
limited chan bool

initDoneCh chan bool // used for tests
stopCh chan bool
Expand All @@ -30,39 +38,117 @@ type DialFunc func(network, addr string) (*redis.Client, error)
// NewCustom is like New except you can specify a DialFunc which will be
// used when creating new connections for the pool. The common use-case is to do
// authentication for new connections.
func NewCustom(network, addr string, size int, df DialFunc) (*Pool, error) {
func NewCustom(network, addr string, size int, df DialFunc, os ...Opt) (*Pool, error) {
defaultPoolOpts := []Opt{
PingInterval(10 * time.Second / time.Duration(size)),
}

var po opts
for _, opt := range append(defaultPoolOpts, os...) {
opt(&po)
}

p := Pool{
Network: network,
Addr: addr,
pool: make(chan *redis.Client, size),
df: df,
initDoneCh: make(chan bool),
stopCh: make(chan bool),
Network: network,
Addr: addr,
po: po,
pool: make(chan *redis.Client, size),
reservePool: make(chan *redis.Client, po.overflowSize),
limited: make(chan bool, po.createLimitBuffer),
df: df,
initDoneCh: make(chan bool),
stopCh: make(chan bool),
}

if size < 1 {
return &p, nil
}

// set up a go-routine which will periodically ping connections in the pool.
// if the pool is idle every connection will be hit once every 10 seconds.
// we do some weird defer/wait stuff to ensure this always gets started no
// we do some weird defer/wait stuff to ensure thsee goroutines alway start no
// matter what happens with the rest of the initialization
startTickCh := make(chan struct{})
defer close(startTickCh)
go func() {
tick := time.NewTicker(10 * time.Second / time.Duration(size))
defer tick.Stop()
<-startTickCh
for {

doEvery := func(i time.Duration, do func()) {
go func() {
tick := time.NewTicker(i)
defer tick.Stop()
<-startTickCh
for {
select {
case <-p.stopCh:
return
case <-tick.C:
do()
}
}
}()
}

// set up a go-routine which will periodically ping connections in the pool.
// if the pool is idle every connection will be hit once every 10 seconds.
if po.pingInterval > 0 {
doEvery(po.pingInterval, func() {
// instead of using Cmd/Get, which might make a new connection,
// we only check from the pool
select {
case <-p.stopCh:
return
case <-tick.C:
p.Cmd("PING")
case conn := <-p.pool:
// we don't care if PING errors since Put will handle that
conn.Cmd("PING")
p.Put(conn)
default:
}
}
}()
})
}

// additionally, if there are any connections in the reserve pool, they're closed
// periodically so that within 100 seconds a full reserve will be emptied
if po.overflowSize > 0 {
doEvery(po.overflowDrainInterval, func() {
// remove one from the reservePool if there is any
select {
case conn := <-p.reservePool:
conn.Close()
default:
}
})
}

if po.createLimitInterval > 0 {
go func() {
// until we're done seeding, allow it to make as fast as possible
seedLoop:
for {
select {
case <-p.stopCh:
return
case <-startTickCh:
break seedLoop
case p.limited <- true:
}
}

// now we only refill the bucket every interval, but we can't use a
// ticker because that'll overflow while we're blocked on writing to
// the limited channel
for {
select {
case <-time.After(po.createLimitInterval):
// now try to fill the bucket but we might block if its already
// filled
select {
case <-p.stopCh:
case p.limited <- true:
return
}
case <-p.stopCh:
return
}
}
}()
} else {
close(p.limited)
}

mkConn := func() error {
client, err := df(network, addr)
Expand Down Expand Up @@ -102,10 +188,22 @@ func (p *Pool) Get() (*redis.Client, error) {
select {
case conn := <-p.pool:
return conn, nil
case conn := <-p.reservePool:
return conn, nil
case <-p.stopCh:
return nil, errors.New("pool emptied")
default:
return p.df(p.Network, p.Addr)
// we need a separate select here since it's indeterminate which case go
// will select and we want to always prefer pools over creating a new
// connection
select {
case conn := <-p.pool:
return conn, nil
case conn := <-p.reservePool:
return conn, nil
case <-p.limited:
return p.df(p.Network, p.Addr)
}
}
}

Expand All @@ -125,7 +223,19 @@ func (p *Pool) Put(conn *redis.Client) {
select {
case p.pool <- conn:
default:
conn.Close()
// if there isn't any overflow allowed, immediately close
if p.po.overflowSize == 0 {
conn.Close()
return
}

// we need a separate select here since it's indeterminate which case go
// will select and we want to always prefer the main pool over the reserve
select {
case p.reservePool <- conn:
default:
conn.Close()
}
}
}
}
Expand Down Expand Up @@ -159,6 +269,8 @@ func (p *Pool) Empty() {
select {
case conn = <-p.pool:
conn.Close()
case conn = <-p.reservePool:
conn.Close()
default:
return
}
Expand All @@ -167,7 +279,7 @@ func (p *Pool) Empty() {

// Avail returns the number of connections currently available to be gotten from
// the Pool using Get. If the number is zero then subsequent calls to Get will
// be creating new connections on the fly
// be creating new connections on the fly.
func (p *Pool) Avail() int {
return len(p.pool)
return len(p.pool) + len(p.reservePool)
}

0 comments on commit 2cdb82e

Please sign in to comment.