Skip to content

Commit

Permalink
ref(boost): tidy up (#287)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 6, 2024
1 parent 04deaaf commit 0f5a8c3
Show file tree
Hide file tree
Showing 7 changed files with 19 additions and 29 deletions.
7 changes: 7 additions & 0 deletions boost/annotated-wait-group.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type namesCollection map[GoRoutineName]string
// that only allows adding to the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
// Deprecated: use ants base worker-pool instead.
type AnnotatedWgAdder interface {
Add(delta int, name ...GoRoutineName)
}
Expand All @@ -34,6 +35,7 @@ type AnnotatedWgAdder interface {
// that only allows Done signalling on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
// Deprecated: use ants base worker-pool instead.
type AnnotatedWgQuitter interface {
Done(name ...GoRoutineName)
}
Expand All @@ -42,6 +44,7 @@ type AnnotatedWgQuitter interface {
// that allows adding to the wait group and Done signalling with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
// Deprecated: use ants base worker-pool instead.
type AnnotatedWgAQ interface {
AnnotatedWgAdder
AnnotatedWgQuitter
Expand All @@ -51,18 +54,21 @@ type AnnotatedWgAQ interface {
// that only allows waiting on the wait group with the addition of being
// able to specify the name representing the calling go routine. This interface
// can be acquired from the wait group using a standard interface type query.
// Deprecated: use ants base worker-pool instead.
type AnnotatedWgWaiter interface {
Wait(name ...GoRoutineName)
}

// AnnotatedWgCounter is the interface that is a restricted view of a wait group
// that only allows querying the wait group count. This interface
// can be acquired from the wait group using a standard interface type query.
// Deprecated: use ants base worker-pool instead.
type AnnotatedWgCounter interface {
Count() int
}

// WaitGroupAn the extended WaitGroup
// Deprecated: use ants base worker-pool instead.
type WaitGroupAn interface {
AnnotatedWgAdder
AnnotatedWgQuitter
Expand Down Expand Up @@ -132,6 +138,7 @@ type AnnotatedWaitGroup struct {

// NewAnnotatedWaitGroup creates a new AnnotatedWaitGroup instance containing
// the core WaitGroup instance.
// Deprecated: use ants base worker-pool instead.
func NewAnnotatedWaitGroup(name string, log ...*slog.Logger) WaitGroupAn {
logger := lo.TernaryF(len(log) > 0,
func() *slog.Logger {
Expand Down
6 changes: 3 additions & 3 deletions boost/examples/mf-input-injected-via-chan/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func main() {
}

wg.Add(1)
go inject(ctx, pool, &wg)
go inject(ctx, pool, &wg)

wg.Add(1)
go consume(ctx, pool, &wg)
go consume(ctx, pool, &wg)

fmt.Printf("pool with func, no of running workers:%d\n",
pool.Running(),
)
wg.Wait()
fmt.Println("🏁 (manifold-func-pool) FINISHED")
fmt.Println("🏁 (manifold-func-pool, input injected via channel) FINISHED")
}

const (
Expand Down
6 changes: 3 additions & 3 deletions boost/generic-pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func source[I any](ctx context.Context,
inputDupCh := NewDuplex(make(SourceStream[I], o.Input.BufferSize))

wg.Add(1)
go func(ctx context.Context) {
go func(ctx context.Context, inputCh SourceStreamR[I]) {
defer func() {
closable.terminate()
wg.Done()
Expand All @@ -78,15 +78,15 @@ func source[I any](ctx context.Context,
select {
case <-ctx.Done():
return
case input, ok := <-inputDupCh.ReaderCh:
case input, ok := <-inputCh:
if !ok {
return
}

_ = injectable.inject(input)
}
}
}(ctx)
}(ctx, inputDupCh.ReaderCh)

return inputDupCh
}
Expand Down
8 changes: 0 additions & 8 deletions boost/observable.go

This file was deleted.

2 changes: 1 addition & 1 deletion boost/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ func withDefaults(options ...Option) []Option {
)
o := make([]Option, 0, len(options)+noDefaults)
o = append(o, WithGenerator(&Sequential{
Format: "ID:%v",
Format: "ID:%08d",
}))
o = append(o, options...)

Expand Down
14 changes: 0 additions & 14 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ const (
// fine tuned
//
DefaultChSize = 100

// minimumCheckCloseInterval denotes the minimum duration of how long to wait
// in between successive attempts to check wether the output channel can be
// closed when the source of the workload indicates no more jobs will be
// submitted, either by closing the input stream or invoking Conclude on the pool.
//
minimumCheckCloseInterval = time.Millisecond * 10
)

type (
Expand All @@ -42,13 +35,6 @@ type (
closable interface {
terminate()
}

// generic represents the common characteristics of all worker
// pools
generic interface {
options() *Options
terminate()
}
)

type injector[I any] func(input I) error
Expand Down
5 changes: 5 additions & 0 deletions boost/worker-pool-legacy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type privateWpInfoL[I, O any] struct {
// WorkerPoolL owns the resultOut channel, because it is the only entity that knows
// when all workers have completed their work due to the finished channel, which it also
// owns.
// Deprecated: use ants base worker-pool instead.
type WorkerPoolL[I, O any] struct {
private privateWpInfoL[I, O]
outputChTimeout time.Duration
Expand All @@ -57,6 +58,8 @@ type WorkerPoolL[I, O any] struct {
Logger *slog.Logger
}

// NewWorkerPoolParamsL
// Deprecated: use ants base worker-pool instead.
type NewWorkerPoolParamsL[I, O any] struct {
NoWorkers int
OutputChTimeout time.Duration
Expand All @@ -67,6 +70,8 @@ type NewWorkerPoolParamsL[I, O any] struct {
Logger *slog.Logger
}

// NewWorkerPoolL
// Deprecated: use ants base worker-pool instead.
func NewWorkerPoolL[I, O any](params *NewWorkerPoolParamsL[I, O]) *WorkerPoolL[I, O] {
noWorkers := runtime.NumCPU()
if params.NoWorkers > 1 && params.NoWorkers <= MaxWorkers {
Expand Down

0 comments on commit 0f5a8c3

Please sign in to comment.