Skip to content
This repository has been archived by the owner on Apr 8, 2019. It is now read-only.

Commit

Permalink
Add RegisterCloser to allow cleanup of resources with a Close method (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
robskillington committed May 11, 2018
1 parent f591af0 commit 4eee939
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 76 deletions.
67 changes: 46 additions & 21 deletions context/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@ import (
type ctx struct {
sync.RWMutex

pool contextPool
done bool
wg sync.WaitGroup
finalizers []resource.Finalizer
pool contextPool
done bool
wg sync.WaitGroup
finalizeables []finalizeable
}

type finalizeable struct {
finalizer resource.Finalizer
closer resource.Closer
}

// NewContext creates a new context.
Expand All @@ -55,28 +60,36 @@ func (c *ctx) IsClosed() bool {
}

func (c *ctx) RegisterFinalizer(f resource.Finalizer) {
c.registerFinalizeable(finalizeable{finalizer: f})
}

func (c *ctx) RegisterCloser(f resource.Closer) {
c.registerFinalizeable(finalizeable{closer: f})
}

func (c *ctx) registerFinalizeable(f finalizeable) {
if c.Lock(); c.done {
c.Unlock()
return
}

if c.finalizers != nil {
c.finalizers = append(c.finalizers, f)
if c.finalizeables != nil {
c.finalizeables = append(c.finalizeables, f)
c.Unlock()
return
}

if c.pool != nil {
c.finalizers = append(c.pool.GetFinalizers(), f)
c.finalizeables = append(c.pool.getFinalizeables(), f)
} else {
c.finalizers = append(allocateFinalizers(), f)
c.finalizeables = append(allocateFinalizeables(), f)
}

c.Unlock()
}

func allocateFinalizers() []resource.Finalizer {
return make([]resource.Finalizer, 0, defaultInitFinalizersCap)
func allocateFinalizeables() []finalizeable {
return make([]finalizeable, 0, defaultInitFinalizersCap)
}

func (c *ctx) DependsOn(blocker Context) {
Expand Down Expand Up @@ -119,26 +132,42 @@ func (c *ctx) close(mode closeMode) {
c.done = true
c.Unlock()

if len(c.finalizers) == 0 {
if c.finalizeables == nil {
c.returnToPool()
return
}

// Capture finalizeables to avoid concurrent r/w if Reset
// is used after a caller waits for the finalizers to finish
f := c.finalizeables
c.finalizeables = nil

switch mode {
case closeAsync:
go c.finalize()
go c.finalize(f)
case closeBlock:
c.finalize()
c.finalize(f)
}
}

func (c *ctx) finalize() {
func (c *ctx) finalize(f []finalizeable) {
// Wait for dependencies.
c.wg.Wait()

// Now call finalizers.
for i := range c.finalizers {
c.finalizers[i].Finalize()
for i := range f {
if f[i].finalizer != nil {
f[i].finalizer.Finalize()
f[i].finalizer = nil
}
if f[i].closer != nil {
f[i].closer.Close()
f[i].closer = nil
}
}

if c.pool != nil {
c.pool.putFinalizeables(f)
}

c.returnToPool()
Expand All @@ -147,11 +176,7 @@ func (c *ctx) finalize() {
func (c *ctx) Reset() {
c.Lock()

if c.pool != nil && c.finalizers != nil {
c.pool.PutFinalizers(c.finalizers)
}

c.done, c.finalizers = false, nil
c.done, c.finalizeables = false, nil

c.Unlock()
}
Expand Down
40 changes: 33 additions & 7 deletions context/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,28 @@ func TestRegisterFinalizer(t *testing.T) {
wg.Done()
}))

assert.Equal(t, 1, len(ctx.finalizers))
assert.Equal(t, 1, len(ctx.finalizeables))

ctx.Close()
wg.Wait()

assert.Equal(t, true, closed)
}

func TestRegisterCloser(t *testing.T) {
var (
wg sync.WaitGroup
closed = false
ctx = NewContext().(*ctx)
)

wg.Add(1)
ctx.RegisterCloser(resource.CloserFn(func() {
closed = true
wg.Done()
}))

assert.Equal(t, 1, len(ctx.finalizeables))

ctx.Close()
wg.Wait()
Expand All @@ -57,7 +78,7 @@ func TestDoesNotRegisterFinalizerWhenClosed(t *testing.T) {
ctx.Close()
ctx.RegisterFinalizer(resource.FinalizerFn(func() {}))

assert.Equal(t, 0, len(ctx.finalizers))
assert.Equal(t, 0, len(ctx.finalizeables))
}

func TestDoesNotCloseTwice(t *testing.T) {
Expand All @@ -80,21 +101,26 @@ func TestDoesNotCloseTwice(t *testing.T) {
func TestDependsOnNoCloserAllocation(t *testing.T) {
ctx := NewContext().(*ctx)
ctx.DependsOn(NewContext())
assert.Nil(t, ctx.finalizers)
assert.Nil(t, ctx.finalizeables)
}

func TestDependsOn(t *testing.T) {
ctx := NewContext().(*ctx)
testDependsOn(t, ctx)
}

func TestDependsOnWithReset(t *testing.T) {
ctx := NewContext().(*ctx)

testDependsOn(ctx, t)
testDependsOn(t, ctx)

// Reset and test works again.
ctx.Reset()

testDependsOn(ctx, t)
testDependsOn(t, ctx)
}

func testDependsOn(c *ctx, t *testing.T) {
func testDependsOn(t *testing.T, c *ctx) {
var wg sync.WaitGroup
var closed int32

Expand All @@ -116,7 +142,7 @@ func testDependsOn(c *ctx, t *testing.T) {
assert.Equal(t, int32(0), atomic.LoadInt32(&closed))

// Now close the context ctx is dependent on.
other.Close()
other.BlockingClose()

wg.Wait()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package context

import (
"github.com/m3db/m3x/pool"
"github.com/m3db/m3x/resource"
)

// Copyright (c) 2018 Uber Technologies, Inc.
Expand All @@ -29,78 +28,78 @@ import (
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

// finalizersArrayPool provides a pool for resourceFinalizer slices.
type finalizersArrayPool interface {
// finalizeablesArrayPool provides a pool for finalizeable slices.
type finalizeablesArrayPool interface {
// Init initializes the array pool, it needs to be called
// before Get/Put use.
Init()

// Get returns the a slice from the pool.
Get() []resource.Finalizer
Get() []finalizeable

// Put returns the provided slice to the pool.
Put(elems []resource.Finalizer)
Put(elems []finalizeable)
}

type finalizersFinalizeFn func([]resource.Finalizer) []resource.Finalizer
type finalizeablesFinalizeFn func([]finalizeable) []finalizeable

type finalizersArrayPoolOpts struct {
type finalizeablesArrayPoolOpts struct {
Options pool.ObjectPoolOptions
Capacity int
MaxCapacity int
FinalizeFn finalizersFinalizeFn
FinalizeFn finalizeablesFinalizeFn
}

type finalizersArrPool struct {
opts finalizersArrayPoolOpts
type finalizeablesArrPool struct {
opts finalizeablesArrayPoolOpts
pool pool.ObjectPool
}

func newFinalizersArrayPool(opts finalizersArrayPoolOpts) finalizersArrayPool {
func newFinalizeablesArrayPool(opts finalizeablesArrayPoolOpts) finalizeablesArrayPool {
if opts.FinalizeFn == nil {
opts.FinalizeFn = defaultFinalizersFinalizerFn
opts.FinalizeFn = defaultFinalizeablesFinalizerFn
}
p := pool.NewObjectPool(opts.Options)
return &finalizersArrPool{opts, p}
return &finalizeablesArrPool{opts, p}
}

func (p *finalizersArrPool) Init() {
func (p *finalizeablesArrPool) Init() {
p.pool.Init(func() interface{} {
return make([]resource.Finalizer, 0, p.opts.Capacity)
return make([]finalizeable, 0, p.opts.Capacity)
})
}

func (p *finalizersArrPool) Get() []resource.Finalizer {
return p.pool.Get().([]resource.Finalizer)
func (p *finalizeablesArrPool) Get() []finalizeable {
return p.pool.Get().([]finalizeable)
}

func (p *finalizersArrPool) Put(arr []resource.Finalizer) {
func (p *finalizeablesArrPool) Put(arr []finalizeable) {
arr = p.opts.FinalizeFn(arr)
if max := p.opts.MaxCapacity; max > 0 && cap(arr) > max {
return
}
p.pool.Put(arr)
}

func defaultFinalizersFinalizerFn(elems []resource.Finalizer) []resource.Finalizer {
var empty resource.Finalizer
func defaultFinalizeablesFinalizerFn(elems []finalizeable) []finalizeable {
var empty finalizeable
for i := range elems {
elems[i] = empty
}
elems = elems[:0]
return elems
}

type finalizersArr []resource.Finalizer
type finalizeablesArr []finalizeable

func (elems finalizersArr) grow(n int) []resource.Finalizer {
func (elems finalizeablesArr) grow(n int) []finalizeable {
if cap(elems) < n {
elems = make([]resource.Finalizer, n)
elems = make([]finalizeable, n)
}
elems = elems[:n]
// following compiler optimized memcpy impl
// https://github.com/golang/go/wiki/CompilerOptimizations#optimized-memclr
var empty resource.Finalizer
var empty finalizeable
for i := range elems {
elems[i] = empty
}
Expand Down
11 changes: 5 additions & 6 deletions context/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,19 +22,18 @@ package context

import (
"github.com/m3db/m3x/pool"
"github.com/m3db/m3x/resource"
)

type poolOfContexts struct {
ctxPool pool.ObjectPool
finalizersPool finalizersArrayPool
finalizersPool finalizeablesArrayPool
}

// NewPool creates a new context pool.
func NewPool(opts Options) Pool {
p := &poolOfContexts{
ctxPool: pool.NewObjectPool(opts.ContextPoolOptions()),
finalizersPool: newFinalizersArrayPool(finalizersArrayPoolOpts{
finalizersPool: newFinalizeablesArrayPool(finalizeablesArrayPoolOpts{
Capacity: opts.InitPooledFinalizerCapacity(),
MaxCapacity: opts.MaxPooledFinalizerCapacity(),
Options: opts.FinalizerPoolOptions(),
Expand All @@ -57,10 +56,10 @@ func (p *poolOfContexts) Put(context Context) {
p.ctxPool.Put(context)
}

func (p *poolOfContexts) GetFinalizers() []resource.Finalizer {
func (p *poolOfContexts) getFinalizeables() []finalizeable {
return p.finalizersPool.Get()
}

func (p *poolOfContexts) PutFinalizers(finalizers []resource.Finalizer) {
p.finalizersPool.Put(finalizers)
func (p *poolOfContexts) putFinalizeables(finalizeables []finalizeable) {
p.finalizersPool.Put(finalizeables)
}
11 changes: 7 additions & 4 deletions context/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type Context interface {
// RegisterFinalizer will register a resource finalizer.
RegisterFinalizer(resource.Finalizer)

// RegisterCloser will register a resource closer.
RegisterCloser(resource.Closer)

// DependsOn will register a blocking context that
// must complete first before finalizers can be called.
DependsOn(Context)
Expand Down Expand Up @@ -106,9 +109,9 @@ type Options interface {
type contextPool interface {
Pool

// GetFinalizers provides a finalizer slice from the pool.
GetFinalizers() []resource.Finalizer
// getFinalizeables provides a finalizeables slice from the pool.
getFinalizeables() []finalizeable

// PutFinalizers returns the finalizers to pool.
PutFinalizers([]resource.Finalizer)
// putFinalizeables returns the finalizers to pool.
putFinalizeables([]finalizeable)
}
Loading

0 comments on commit 4eee939

Please sign in to comment.