Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
[buffered encoder pool] Add ability to not return buffers greater tha…
Browse files Browse the repository at this point in the history
…n a certain capacity to the pool
  • Loading branch information
Jerome Froelich committed Aug 28, 2017
1 parent e4cd5c1 commit c023826
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 4 deletions.
13 changes: 10 additions & 3 deletions protocol/msgpack/buffered_encoder_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,16 @@ package msgpack
import "github.com/m3db/m3x/pool"

type bufferedEncoderPool struct {
pool pool.ObjectPool
maxBufferCapacity int
pool pool.ObjectPool
}

// NewBufferedEncoderPool creates a new pool for buffered encoders.
func NewBufferedEncoderPool(opts pool.ObjectPoolOptions) BufferedEncoderPool {
return &bufferedEncoderPool{pool: pool.NewObjectPool(opts)}
func NewBufferedEncoderPool(opts BufferedEncoderPoolOptions) BufferedEncoderPool {
return &bufferedEncoderPool{
maxBufferCapacity: opts.MaxBufferCapacity(),
pool: pool.NewObjectPool(opts.ObjectPoolOptions()),
}
}

func (p *bufferedEncoderPool) Init(alloc BufferedEncoderAlloc) {
Expand All @@ -42,5 +46,8 @@ func (p *bufferedEncoderPool) Get() BufferedEncoder {
}

func (p *bufferedEncoderPool) Put(encoder BufferedEncoder) {
if p.maxBufferCapacity != 0 && cap(encoder.Buffer().Bytes()) > p.maxBufferCapacity {
return
}
p.pool.Put(encoder)
}
33 changes: 32 additions & 1 deletion protocol/msgpack/buffered_encoder_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package msgpack

import (
"fmt"
"testing"

"github.com/m3db/m3x/pool"
Expand All @@ -29,7 +30,11 @@ import (
)

func TestBufferedEncoderPool(t *testing.T) {
p := NewBufferedEncoderPool(pool.NewObjectPoolOptions().SetSize(1))
poolOpts := pool.NewObjectPoolOptions().SetSize(1)
opts := NewBufferedEncoderPoolOptions().
SetObjectPoolOptions(poolOpts)

p := NewBufferedEncoderPool(opts)
p.Init(func() BufferedEncoder {
return NewPooledBufferedEncoder(p)
})
Expand All @@ -50,3 +55,29 @@ func TestBufferedEncoderPool(t *testing.T) {
encoder.Reset()
require.Equal(t, 0, encoder.Buffer().Len())
}

func TestBufferedEncoderPoolMaxCapacity(t *testing.T) {
poolOpts := pool.NewObjectPoolOptions().SetSize(1)
opts := NewBufferedEncoderPoolOptions().
SetMaxBufferCapacity(2).
SetObjectPoolOptions(poolOpts)

p := NewBufferedEncoderPool(opts)
p.Init(func() BufferedEncoder {
return NewPooledBufferedEncoder(p)
})

// Retrieve an encoder from the pool.
encoder := p.Get()
encoder.Buffer().Write([]byte{1, 2, 3})
require.Equal(t, 3, encoder.Buffer().Len())

// Closing the encoder should put it back to the pool.
encoder.Close()

// Retrieve an encoder and assert it's a different encoder since
// the previous one exceeded the maximum capacity of the pool.
encoder = p.Get()
fmt.Println(encoder.Buffer().Len())
require.Equal(t, 0, encoder.Buffer().Len())
}
38 changes: 38 additions & 0 deletions protocol/msgpack/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ package msgpack
import xpool "github.com/m3db/m3x/pool"

const (
// The maximum capacity of buffers that can be returned to the buffered
// encoder pool. A value of 0 indicates no maximum so all buffered
// encoders will be returned to the pool.
defaultBufferedEncoderPoolMaxCapacity = 0

// Whether the iterator should ignore higher-than-supported version
// by default for unaggregated iterator.
defaultUnaggregatedIgnoreHigherVersion = false
Expand All @@ -42,6 +47,39 @@ const (
defaultAggregatedReaderBufferSize = 1440
)

type bufferedEncoderPoolOptions struct {
maxBufferCapacity int
poolOpts xpool.ObjectPoolOptions
}

// NewBufferedEncoderPoolOptions creates a new set of buffered encoder pool options.
func NewBufferedEncoderPoolOptions() BufferedEncoderPoolOptions {
return &bufferedEncoderPoolOptions{
maxBufferCapacity: defaultBufferedEncoderPoolMaxCapacity,
poolOpts: xpool.NewObjectPoolOptions(),
}
}

func (o *bufferedEncoderPoolOptions) SetMaxBufferCapacity(value int) BufferedEncoderPoolOptions {
opts := *o
opts.maxBufferCapacity = value
return &opts
}

func (o *bufferedEncoderPoolOptions) MaxBufferCapacity() int {
return o.maxBufferCapacity
}

func (o *bufferedEncoderPoolOptions) SetObjectPoolOptions(value xpool.ObjectPoolOptions) BufferedEncoderPoolOptions {
opts := *o
opts.poolOpts = value
return &opts
}

func (o *bufferedEncoderPoolOptions) ObjectPoolOptions() xpool.ObjectPoolOptions {
return o.poolOpts
}

type unaggregatedIteratorOptions struct {
ignoreHigherVersion bool
readerBufferSize int
Expand Down
15 changes: 15 additions & 0 deletions protocol/msgpack/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ type BufferedEncoderPool interface {
Put(enc BufferedEncoder)
}

// BufferedEncoderPoolOptions provides options for buffered encoder pools.
type BufferedEncoderPoolOptions interface {
// SetMaxBufferCapacity sets the maximum buffer capacity.
SetMaxBufferCapacity(value int) BufferedEncoderPoolOptions

// MaxBufferCapacity returns the maximum buffer capacity.
MaxBufferCapacity() int

// SetObjectPoolOptions sets the object pool options.
SetObjectPoolOptions(value pool.ObjectPoolOptions) BufferedEncoderPoolOptions

// ObjectPoolOptions returns the object pool options.
ObjectPoolOptions() pool.ObjectPoolOptions
}

// encoderBase is the base encoder interface.
type encoderBase interface {
// Encoder returns the encoder.
Expand Down

0 comments on commit c023826

Please sign in to comment.