Skip to content

Commit

Permalink
feat(ants,boost): expose input channel (#284)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 5, 2024
1 parent 549a6b5 commit 6b51295
Show file tree
Hide file tree
Showing 16 changed files with 346 additions and 34 deletions.
2 changes: 2 additions & 0 deletions boost/ants-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ type (
IDGenerator = ants.IDGenerator
InputParam = ants.InputParam
Option = ants.Option
Options = ants.Options
PoolFunc = ants.PoolFunc
TaskFunc = ants.TaskFunc
)
Expand All @@ -14,6 +15,7 @@ var (
WithDisablePurge = ants.WithDisablePurge
WithExpiryDuration = ants.WithExpiryDuration
WithGenerator = ants.WithGenerator
WithInput = ants.WithInput
WithMaxBlockingTasks = ants.WithMaxBlockingTasks
WithNonblocking = ants.WithNonblocking
WithOptions = ants.WithOptions
Expand Down
7 changes: 4 additions & 3 deletions boost/base-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,19 @@ import (
)

type (
basePool[O any] struct {
basePool[I, O any] struct {
wg *sync.WaitGroup
sequence int32
inputDupCh *Duplex[I]
outputDupCh *Duplex[JobOutput[O]]
ending bool
}
)

func (p *basePool[O]) next() int32 {
func (p *basePool[I, O]) next() int32 {
return atomic.AddInt32(&p.sequence, int32(1))
}

func (p *basePool[O]) Observe() JobOutputStreamR[O] {
func (p *basePool[I, O]) Observe() JobOutputStreamR[O] {
return p.outputDupCh.ReaderCh
}
4 changes: 4 additions & 0 deletions boost/boost-public-api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ const (
)

type (
SourceStream[I any] chan I
SourceStreamR[I any] <-chan I
SourceStreamW[I any] chan<- I

Job[I any] struct {
ID string
SequenceNo int
Expand Down
3 changes: 3 additions & 0 deletions boost/examples/alpha/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ func main() {
fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)

// Note, we don't need to inform the pool of the end of the workload
// since this pool is not emitting output.
wg.Wait()
fmt.Println("🏁 (func-pool) FINISHED")
}
3 changes: 3 additions & 0 deletions boost/examples/beta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func main() {
fmt.Printf("task pool, no of running workers:%d\n",
pool.Running(),
)

// Note, we don't need to inform the pool of the end of the workload
// since this pool is not emitting output.
wg.Wait()
fmt.Println("🏁 (task-pool) FINISHED")
}
99 changes: 99 additions & 0 deletions boost/examples/delta/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package main

import (
"context"
"fmt"
"sync"
"time"

"github.com/snivilised/lorax/boost"
)

// Demonstrates use of manifold func base worker pool where
// the client manifold func returns an output and an error.
// Submission to the pool occurs via an input channel as opposed
// directly invoking Post on the pool.

const (
AntsSize = 1000
n = 100000
InputChSize = 10
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to interval
CheckCloseInterval = time.Second / 10
)

func inject(ctx context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

inputCh := pool.Source(ctx, wg)
for i, n := 0, 100; i < n; i++ {
inputCh <- Param
}

// required to inform the worker pool that no more jobs will be submitted.
// failure to close the input channel will result in a never ending
// worker pool.
//
close(inputCh)
}

func consume(_ context.Context,
pool *boost.ManifoldFuncPool[int, int],
wg *sync.WaitGroup,
) {
defer wg.Done()

// We don't need to use a timeout on the observe channel
// because our producer invokes Conclude, which results in
// the observe channel being closed, terminating the range.
// This aspect is specific to this example and clients may
// have to use different strategies depending on their use-case,
// eg support for context cancellation.
//
for output := range pool.Observe() {
fmt.Printf("🍒 payload: '%v', id: '%v', seq: '%v' (e: '%v')\n",
output.Payload, output.ID, output.SequenceNo, output.Error,
)
}
}

func main() {
var wg sync.WaitGroup

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

pool, err := boost.NewManifoldFuncPool(
ctx, AntsSize, func(input int) (int, error) {
time.Sleep(time.Duration(input) * time.Millisecond)

return n + 1, nil
}, &wg,
boost.WithInput(InputChSize),
boost.WithOutput(OutputChSize, CheckCloseInterval),
)

defer pool.Release(ctx)

if err != nil {
fmt.Printf("🔥 error creating pool: '%v'\n", err)
return
}

wg.Add(1)
go inject(ctx, pool, &wg) //nolint:wsl // pendant

wg.Add(1)
go consume(ctx, pool, &wg) //nolint:wsl // pendant

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
}
21 changes: 12 additions & 9 deletions boost/examples/gamma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ import (
// all generated outputs.

const (
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to interval
interval = time.Second / 10
AntsSize = 1000
n = 100000
OutputChSize = 10
Param = 100
OutputChTimeout = time.Second / 2 // do not use a value that is similar to CheckCloseInterval
CheckCloseInterval = time.Second / 10
)

func produce(ctx context.Context,
Expand Down Expand Up @@ -52,7 +52,10 @@ func produce(ctx context.Context,
_ = pool.Post(ctx, Param)
}

pool.EndWork(ctx, interval)
// required to inform the worker pool that no more jobs will be submitted.
// failure to invoke Conclude will result in a never ending worker pool.
//
pool.Conclude(ctx)
}

func consume(_ context.Context,
Expand All @@ -62,7 +65,7 @@ func consume(_ context.Context,
defer wg.Done()

// We don't need to use a timeout on the observe channel
// because our producer invokes EndWork, which results in
// because our producer invokes Conclude, which results in
// the observe channel being closed, terminating the range.
// This aspect is specific to this example and clients may
// have to use different strategies depending on their use-case,
Expand All @@ -87,7 +90,7 @@ func main() {

return n + 1, nil
}, &wg,
boost.WithOutput(OutputChSize),
boost.WithOutput(OutputChSize, CheckCloseInterval),
)

defer pool.Release(ctx)
Expand Down
32 changes: 32 additions & 0 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
)
Expand Down Expand Up @@ -47,3 +48,34 @@ func (p *taskPool) Running() int {
func (p *taskPool) Waiting() int {
return p.pool.Waiting()
}

func source[I any](ctx context.Context,
wg *sync.WaitGroup, o *ants.Options,
injectable injectable[I],
closable closable,
) *Duplex[I] {
inputDupCh := NewDuplex(make(SourceStream[I], o.Input.BufferSize))

wg.Add(1)
go func(ctx context.Context) { //nolint:wsl // pedant
defer func() {
closable.terminate()
wg.Done()
}()

for {
select {
case <-ctx.Done():
return
case input, ok := <-inputDupCh.ReaderCh:
if !ok {
return
}

_ = injectable.inject(input)
}
}
}(ctx)

return inputDupCh
}
18 changes: 18 additions & 0 deletions boost/options.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package boost

import (
"time"

"github.com/samber/lo"
)

// withDefaults prepends boost withDefaults to the sequence of options
func withDefaults(options ...Option) []Option {
const (
Expand All @@ -13,3 +19,15 @@ func withDefaults(options ...Option) []Option {

return o
}

func GetValidatedCheckCloseInterval(o *Options) time.Duration {
return lo.TernaryF(
o.Output != nil && o.Output.CheckCloseInterval > minimumCheckCloseInterval,
func() time.Duration {
return o.Output.CheckCloseInterval
},
func() time.Duration {
return minimumCheckCloseInterval
},
)
}
38 changes: 38 additions & 0 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
package boost

import (
"time"
)

const (
// TODO: This is just temporary, channel size definition still needs to be
// fine tuned
//
DefaultChSize = 100

// minimumCheckCloseInterval denotes the minimum duration of how long to wait
// in between successive attempts to check wether the output channel can be
// closed when the source of the workload indicates no more jobs will be
// submitted, either by closing the input stream or invoking Conclude on the pool.
//
minimumCheckCloseInterval = time.Millisecond * 10
)

type (
Expand All @@ -23,8 +34,35 @@ type (
}

workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

injectable[I any] interface {
inject(input I) error
}

closable interface {
terminate()
}

// generic represents the common characteristics of all worker
// pools
generic interface {
options() *Options
terminate()
}
)

type injector[I any] func(input I) error

func (f injector[I]) inject(input I) error {
return f(input)
}

type terminator func()

func (f terminator) terminate() {
f()
}

// Worker pool types:
//
// 🍺 ManifoldFuncPool (to be used by traverse):
Expand Down
12 changes: 7 additions & 5 deletions boost/support_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,20 @@ const (
)

const (
Param = 100
AntsSize = 1000
TestSize = 10000
n = 100000
PoolSize = 10
Param = 100
AntsSize = 1000
TestSize = 10000
n = 100000
PoolSize = 10
InputBufferSize = 3
)

const (
RunTimes = 1e6
PoolCap = 5e4
BenchParam = 10
DefaultExpiredTime = 10 * time.Second
CheckCloseInterval = time.Second / 100
)

var curMem uint64
Expand Down
Loading

0 comments on commit 6b51295

Please sign in to comment.