Skip to content

Commit

Permalink
feat(boost): conditional options with if-option (#300)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 16, 2024
1 parent 8a003d2 commit 8b07060
Show file tree
Hide file tree
Showing 12 changed files with 144 additions and 31 deletions.
1 change: 1 addition & 0 deletions boost/ants-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ type (
Option = ants.Option
Options = ants.Options
PoolFunc = ants.PoolFunc
Sequential = ants.Sequential
TaskFunc = ants.TaskFunc
)

Expand Down
8 changes: 8 additions & 0 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ func (p *functionalPool) Waiting() int {
return p.pool.Waiting()
}

func (p *functionalPool) GetOptions() *Options {
return p.pool.GetOptions()
}

// taskPool
type taskPool struct {
pool *ants.Pool
Expand All @@ -59,6 +63,10 @@ func (p *taskPool) Waiting() int {
return p.pool.Waiting()
}

func (p *taskPool) GetOptions() *Options {
return p.pool.GetOptions()
}

func source[I any](ctx context.Context,
wg WaitGroup, o *ants.Options,
injectable injectable[I],
Expand Down
21 changes: 0 additions & 21 deletions boost/options.go
Original file line number Diff line number Diff line change
@@ -1,22 +1 @@
package boost

import (
"runtime"
)

// withDefaults prepends boost withDefaults to the sequence of options
func withDefaults(options ...Option) []Option {
const (
noDefaults = 1
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o,
WithGenerator(&Sequential{
Format: "ID:%08d",
}),
WithSize(uint(runtime.NumCPU())),
)
o = append(o, options...)

return o
}
2 changes: 1 addition & 1 deletion boost/worker-pool-func-manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func NewManifoldFuncPool[I, O any](ctx context.Context,
var (
oi *outputInfo[O]
wi *outputInfoW[O]
o = ants.LoadOptions(withDefaults(options...)...)
o = ants.NewOptions(options...)
)

if oi = newOutputInfo[O](o); oi != nil {
Expand Down
50 changes: 50 additions & 0 deletions boost/worker-pool-func-manifold_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package boost_test

import (
"context"
"runtime"
"sync"

. "github.com/onsi/ginkgo/v2" //nolint:revive // ginkgo ok
. "github.com/onsi/gomega" //nolint:revive // gomega ok

"github.com/snivilised/lorax/boost"
"github.com/snivilised/lorax/internal/ants"
)

func produce(ctx context.Context,
Expand Down Expand Up @@ -229,5 +231,53 @@ var _ = Describe("WorkerPoolFuncManifold", func() {
})
})
})

Context("IfOption", func() {
When("true", func() {
It("🧪 should: use option", func(specCtx SpecContext) {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

var wg sync.WaitGroup

const (
poolSize = 10
)

pool, _ := boost.NewManifoldFuncPool(
ctx, demoPoolManifoldFunc, &wg,
ants.If(true, ants.WithSize(poolSize)),
boost.WithInput(InputBufferSize),
boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend),
)

options := pool.GetOptions()
Expect(options.Size).To(BeEquivalentTo(poolSize))
})
})

When("false", func() {
It("🧪 should: use option", func(specCtx SpecContext) {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

var wg sync.WaitGroup

const (
poolSize = 10
)

pool, _ := boost.NewManifoldFuncPool(
ctx, demoPoolManifoldFunc, &wg,
ants.If(false, ants.WithSize(poolSize)),
boost.WithInput(InputBufferSize),
boost.WithOutput(10, CheckCloseInterval, TimeoutOnSend),
)

options := pool.GetOptions()
Expect(options.Size).To(BeEquivalentTo(runtime.NumCPU()))
})
})
})
})
})
2 changes: 1 addition & 1 deletion boost/worker-pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func NewFuncPool[I, O any](ctx context.Context,
// allocated for each job, but this is not necessarily
// the case, because each worker has its own job queue.
//
pool, err := ants.NewPoolWithFunc(ctx, pf, withDefaults(options...)...)
pool, err := ants.NewPoolWithFunc(ctx, pf, options...)

return &FuncPool[I, O]{
basePool: basePool[I, O]{
Expand Down
2 changes: 1 addition & 1 deletion boost/worker-pool-task.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func NewTaskPool[I, O any](ctx context.Context,
wg WaitGroup,
options ...Option,
) (*TaskPool[I, O], error) {
pool, err := ants.NewPool(ctx, withDefaults(options...)...)
pool, err := ants.NewPool(ctx, options...)

return &TaskPool[I, O]{
basePool: basePool[I, O]{
Expand Down
42 changes: 42 additions & 0 deletions internal/ants/ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ants_test

import (
"context"
"runtime"
"sync"
"time"

Expand All @@ -28,6 +29,7 @@ var _ = Describe("Ants", func() {
ants.WithSize(poolSize),
ants.WithNonblocking(true),
)

Expect(err).To(Succeed(), "create TimingPool failed")

defer pool.Release(ctx)
Expand Down Expand Up @@ -118,6 +120,46 @@ var _ = Describe("Ants", func() {
})
})
})

Context("IfOption", func() {
When("true", func() {
It("🧪 should: use option", func(specCtx SpecContext) {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

const (
poolSize = 10
)

pool, _ := ants.NewPool(ctx,
ants.If(true, ants.WithSize(poolSize)),
ants.WithNonblocking(true),
)

options := pool.GetOptions()
Expect(options.Size).To(BeEquivalentTo(poolSize))
})
})

When("false", func() {
It("🧪 should: use option", func(specCtx SpecContext) {
ctx, cancel := context.WithCancel(specCtx)
defer cancel()

const (
poolSize = 10
)

pool, _ := ants.NewPool(ctx,
ants.If(false, ants.WithSize(poolSize)),
ants.WithNonblocking(true),
)

options := pool.GetOptions()
Expect(options.Size).To(BeEquivalentTo(runtime.NumCPU()))
})
})
})
})

Context("NewPoolWithFunc", func() {
Expand Down
41 changes: 37 additions & 4 deletions internal/ants/options.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,54 @@
package ants

import (
"runtime"
"time"
)

// Option represents the optional function.
// Option represents the functional option.
type Option func(opts *Options)

func LoadOptions(options ...Option) *Options {
// NewOptions creates new options instance with defaults
// applied.
func NewOptions(options ...Option) *Options {
opts := new(Options)
for _, option := range options {
option(opts)
combined := withDefaults(options...)

for _, option := range combined {
if option != nil { // nil check supports conditional options
option(opts)
}
}

return opts
}

// If enables options to be conditional. If ifo evaluates to true
// then the option is returned, otherwise nil.
func If(condition bool, option Option) Option {
if condition {
return option
}

return nil
}

func withDefaults(options ...Option) []Option {
const (
noDefaults = 1
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o,
WithGenerator(&Sequential{
Format: "ID:%08d",
}),
WithSize(uint(runtime.NumCPU())),
)
o = append(o, options...)

return o
}

// Options contains all options which will be applied when instantiating an ants pool.
type Options struct {
// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
Expand Down
2 changes: 1 addition & 1 deletion internal/ants/pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func NewPoolWithFunc(ctx context.Context,
return nil, ErrLackPoolFunc
}

opts := LoadOptions(options...)
opts := NewOptions(options...)
size := opts.Size

if size == 0 {
Expand Down
2 changes: 1 addition & 1 deletion internal/ants/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (p *Pool) nowTime() time.Time {

// NewPool instantiates a Pool with customized options.
func NewPool(ctx context.Context, options ...Option) (*Pool, error) {
opts := LoadOptions(options...)
opts := NewOptions(options...)
size := opts.Size

if size == 0 {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package boost
package ants

import (
"fmt"
Expand Down

0 comments on commit 8b07060

Please sign in to comment.