Skip to content
This repository has been archived by the owner on Apr 8, 2019. It is now read-only.

Commit

Permalink
Add use before initialization checks to the ObjectPool
Browse files Browse the repository at this point in the history
  • Loading branch information
Rob Skillington committed Feb 7, 2017
1 parent 6c0871e commit 0af0c9c
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 4 deletions.
33 changes: 29 additions & 4 deletions pool/object.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,20 @@
package pool

import (
"errors"
"math"
"sync/atomic"
"time"

"github.com/uber-go/tally"
)

var (
errPoolAlreadyInitialized = errors.New("object pool already initialized")
errPoolGetBeforeInitialized = errors.New("object pool get before initialized")
errPoolPutBeforeInitialized = errors.New("object pool put before initialized")
)

const (
// TODO(r): Use tally sampling when available
sampleObjectPoolLengthEvery = 100
Expand All @@ -40,7 +47,8 @@ type objectPool struct {
size int
refillLowWatermark int
refillHighWatermark int
filling int64
filling int32
initialized int32
metrics objectPoolMetrics
}

Expand Down Expand Up @@ -81,6 +89,12 @@ func NewObjectPool(opts ObjectPoolOptions) ObjectPool {
}

func (p *objectPool) Init(alloc Allocator) {
if !atomic.CompareAndSwapInt32(&p.initialized, 0, 1) {
fn := p.opts.OnPoolAccessError()
fn(errPoolAlreadyInitialized)
return
}

p.alloc = alloc

for i := 0; i < cap(p.values); i++ {
Expand All @@ -89,8 +103,13 @@ func (p *objectPool) Init(alloc Allocator) {
}

func (p *objectPool) Get() interface{} {
var v interface{}
if atomic.LoadInt32(&p.initialized) != 1 {
fn := p.opts.OnPoolAccessError()
fn(errPoolGetBeforeInitialized)
return p.alloc()
}

var v interface{}
select {
case v = <-p.values:
default:
Expand All @@ -108,6 +127,12 @@ func (p *objectPool) Get() interface{} {
}

func (p *objectPool) Put(obj interface{}) {
if atomic.LoadInt32(&p.initialized) != 1 {
fn := p.opts.OnPoolAccessError()
fn(errPoolPutBeforeInitialized)
return
}

select {
case p.values <- obj:
default:
Expand All @@ -129,12 +154,12 @@ func (p *objectPool) setGauges() {
}

func (p *objectPool) tryFill() {
if !atomic.CompareAndSwapInt64(&p.filling, 0, 1) {
if !atomic.CompareAndSwapInt32(&p.filling, 0, 1) {
return
}

go func() {
defer atomic.StoreInt64(&p.filling, 0)
defer atomic.StoreInt32(&p.filling, 0)

for len(p.values) < p.refillHighWatermark {
select {
Expand Down
56 changes: 56 additions & 0 deletions pool/object_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestObjectPoolRefillOnLowWaterMark(t *testing.T) {
Expand Down Expand Up @@ -60,3 +61,58 @@ func TestObjectPoolRefillOnLowWaterMark(t *testing.T) {
// Assert refilled
assert.Equal(t, 75, len(pool.values))
}

func TestObjectPoolInitTwiceError(t *testing.T) {
var accessErr error
opts := NewObjectPoolOptions().SetOnPoolAccessError(func(err error) {
accessErr = err
})

pool := NewObjectPool(opts)
pool.Init(func() interface{} {
return 1
})

require.NoError(t, accessErr)

pool.Init(func() interface{} {
return 1
})

assert.Error(t, accessErr)
assert.Equal(t, errPoolAlreadyInitialized, accessErr)
}

func TestObjectPoolGetBeforeInitError(t *testing.T) {
var accessErr error
opts := NewObjectPoolOptions().SetOnPoolAccessError(func(err error) {
accessErr = err
})

pool := NewObjectPool(opts)

require.NoError(t, accessErr)

assert.Panics(t, func() {
pool.Get()
})

assert.Error(t, accessErr)
assert.Equal(t, errPoolGetBeforeInitialized, accessErr)
}

func TestObjectPoolPutBeforeInitError(t *testing.T) {
var accessErr error
opts := NewObjectPoolOptions().SetOnPoolAccessError(func(err error) {
accessErr = err
})

pool := NewObjectPool(opts)

require.NoError(t, accessErr)

pool.Put(1)

assert.Error(t, accessErr)
assert.Equal(t, errPoolPutBeforeInitialized, accessErr)
}
12 changes: 12 additions & 0 deletions pool/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type objectPoolOptions struct {
refillLowWatermark float64
refillHighWatermark float64
instrumentOpts instrument.Options
onPoolAccessError OnPoolAccessError
}

// NewObjectPoolOptions creates a new set of object pool options
Expand All @@ -40,6 +41,7 @@ func NewObjectPoolOptions() ObjectPoolOptions {
size: defaultSize,
refillLowWatermark: defaultRefillLowWatermark,
instrumentOpts: instrument.NewOptions(),
onPoolAccessError: func(err error) { panic(err) },
}
}

Expand Down Expand Up @@ -82,3 +84,13 @@ func (o *objectPoolOptions) SetInstrumentOptions(value instrument.Options) Objec
func (o *objectPoolOptions) InstrumentOptions() instrument.Options {
return o.instrumentOpts
}

func (o *objectPoolOptions) SetOnPoolAccessError(value OnPoolAccessError) ObjectPoolOptions {
opts := *o
opts.onPoolAccessError = value
return &opts
}

func (o *objectPoolOptions) OnPoolAccessError() OnPoolAccessError {
return o.onPoolAccessError
}
12 changes: 12 additions & 0 deletions pool/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type CheckedObjectPool interface {
Get() checked.ReadWriteRef
}

// OnPoolAccessError is a function to call when a pool access error occurs,
// such as get or put before the pool is initialized.
type OnPoolAccessError func(err error)

// ObjectPoolOptions provides options for an object pool.
type ObjectPoolOptions interface {
// SetSize sets the size of the object pool.
Expand Down Expand Up @@ -85,6 +89,14 @@ type ObjectPoolOptions interface {

// InstrumentOptions returns the instrument options.
InstrumentOptions() instrument.Options

// SetOnPoolAccessError sets the on pool access error callback, by
// default this is a panic.
SetOnPoolAccessError(value OnPoolAccessError) ObjectPoolOptions

// OnPoolAccessError returns the on pool access error callback, by
// default this is a panic.
OnPoolAccessError() OnPoolAccessError
}

// Bucket specifies a pool bucket.
Expand Down

0 comments on commit 0af0c9c

Please sign in to comment.