Skip to content

Commit

Permalink
ref(ants,boost): avoid storing context (#279)
Browse files Browse the repository at this point in the history
  • Loading branch information
plastikfan committed Jun 2, 2024
1 parent c0fa094 commit 212f623
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 75 deletions.
4 changes: 2 additions & 2 deletions boost/examples/alpha/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,11 @@ func main() {
time.Sleep(time.Second)
}, &wg, ants.WithNonblocking(false))

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < 30; i++ { // producer
fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly))
_ = pool.Post(i)
_ = pool.Post(ctx, i)
fmt.Printf("POST: <--- (n: %v) [%v] 🍊 \n", i, time.Now().Format(time.TimeOnly))
}

Expand Down
4 changes: 2 additions & 2 deletions boost/examples/beta/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func main() {

pool, _ := boost.NewTaskPool[int, int](ctx, NoW, &wg)

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < 30; i++ { // producer
fmt.Printf("PRE: <--- (n: %v) [%v] 🍋 \n", i, time.Now().Format(time.TimeOnly))
_ = pool.Post(func() {
_ = pool.Post(ctx, func() {
fmt.Printf("=> running: '%v')\n", pool.Running())
fmt.Printf("<--- (n: %v)🍒 \n", i)
time.Sleep(time.Second)
Expand Down
2 changes: 0 additions & 2 deletions boost/pool-defs-internal.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package boost

import (
"context"
"sync"

"github.com/snivilised/lorax/internal/ants"
Expand Down Expand Up @@ -32,7 +31,6 @@ type (
workersCollectionL[I, O any] map[workerID]*workerWrapperL[I, O]

basePool struct {
ctx context.Context
wg *sync.WaitGroup
idGen IDGenerator
}
Expand Down
1 change: 0 additions & 1 deletion boost/worker-pool-func-manifold.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func NewManifoldFuncPool[I, O any](ctx context.Context,

return &ManifoldFuncPool[I, O]{
basePool: basePool{
ctx: ctx,
idGen: &Sequential{},
wg: wg,
},
Expand Down
9 changes: 4 additions & 5 deletions boost/worker-pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func NewFuncPool[I, O any](ctx context.Context,

return &FuncPool[I, O]{
basePool: basePool{
ctx: ctx,
wg: wg,
idGen: &Sequential{},
},
Expand All @@ -40,14 +39,14 @@ func NewFuncPool[I, O any](ctx context.Context,
}, err
}

func (p *FuncPool[I, O]) Post(job ants.InputParam) error {
return p.pool.Invoke(job)
func (p *FuncPool[I, O]) Post(ctx context.Context, job ants.InputParam) error {
return p.pool.Invoke(ctx, job)
}

func (p *FuncPool[I, O]) Running() int {
return p.pool.Running()
}

func (p *FuncPool[I, O]) Release() {
p.pool.Release()
func (p *FuncPool[I, O]) Release(ctx context.Context) {
p.pool.Release(ctx)
}
18 changes: 9 additions & 9 deletions boost/worker-pool-func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ var _ = Describe("WorkerPoolFunc", func() {

pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg)

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < n; i++ {
_ = pool.Post(Param)
_ = pool.Post(ctx, Param)
}
wg.Wait()
GinkgoWriter.Printf("pool with func, running workers number:%d\n",
Expand All @@ -48,10 +48,10 @@ var _ = Describe("WorkerPoolFunc", func() {

pool, err := boost.NewFuncPool[int, int](ctx, AntsSize, demoPoolFunc, &wg)

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < n; i++ {
_ = pool.Post(Param)
_ = pool.Post(ctx, Param)

if i > 10 {
cancel()
Expand Down Expand Up @@ -82,18 +82,18 @@ var _ = Describe("WorkerPoolFunc", func() {
)

Expect(err).To(Succeed(), "create TimingPool failed")
defer pool.Release()
defer pool.Release(ctx)

By("👾 POOL-CREATED\n")
for i := 0; i < PoolSize-1; i++ {
Expect(pool.Post(Param)).To(Succeed(),
Expect(pool.Post(ctx, Param)).To(Succeed(),
"submit when pool is not full shouldn't return error",
)
}

ch := make(chan struct{})
// pool is full now.
Expect(pool.Post(ch)).To(Succeed(),
Expect(pool.Post(ctx, ch)).To(Succeed(),
"submit when pool is not full shouldn't return error",
)

Expand All @@ -103,15 +103,15 @@ var _ = Describe("WorkerPoolFunc", func() {

go func() {
// should be blocked. blocking num == 1
if err := pool.Post(Param); err != nil {
if err := pool.Post(ctx, Param); err != nil {
errCh <- err
}
By("👾 Producer complete\n")
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
Expect(pool.Post(Param)).To(MatchError(ants.ErrPoolOverload.Error()),
Expect(pool.Post(ctx, Param)).To(MatchError(ants.ErrPoolOverload.Error()),
"blocking submit when pool reach max blocking submit should return ErrPoolOverload",
)

Expand Down
9 changes: 4 additions & 5 deletions boost/worker-pool-task.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ func NewTaskPool[I, O any](ctx context.Context,

return &TaskPool[I, O]{
basePool: basePool{
ctx: ctx,
wg: wg,
idGen: &Sequential{},
},
Expand All @@ -56,14 +55,14 @@ func NewTaskPool[I, O any](ctx context.Context,
}, err
}

func (p *TaskPool[I, O]) Post(task ants.TaskFunc) error {
return p.pool.Submit(p.ctx, task)
func (p *TaskPool[I, O]) Post(ctx context.Context, task ants.TaskFunc) error {
return p.pool.Submit(ctx, task)
}

func (p *TaskPool[I, O]) Running() int {
return p.pool.Running()
}

func (p *TaskPool[I, O]) Release() {
p.pool.Release()
func (p *TaskPool[I, O]) Release(ctx context.Context) {
p.pool.Release(ctx)
}
20 changes: 10 additions & 10 deletions boost/worker-pool-task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ var _ = Describe("WorkerPoolTask", func() {
pool, err := boost.NewTaskPool[int, int](ctx, PoolSize, &wg,
boost.WithNonblocking(true),
)
defer pool.Release()
defer pool.Release(ctx)

Expect(err).To(Succeed())
Expect(pool).NotTo(BeNil())

for i := 0; i < PoolSize-1; i++ {
Expect(pool.Post(longRunningFunc)).To(Succeed(),
Expect(pool.Post(ctx, longRunningFunc)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
}
Expand All @@ -43,16 +43,16 @@ var _ = Describe("WorkerPoolTask", func() {
close(secondCh)
}
// pool is full now.
Expect(pool.Post(fn)).To(Succeed(),
Expect(pool.Post(ctx, fn)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
Expect(pool.Post(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
Expect(pool.Post(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
"nonblocking submit when pool is full should get an ErrPoolOverload",
)
// interrupt fn to get an available worker
close(firstCh)
<-secondCh
Expect(pool.Post(demoFunc)).To(Succeed(),
Expect(pool.Post(ctx, demoFunc)).To(Succeed(),
"nonblocking submit when pool is not full shouldn't return error",
)
})
Expand All @@ -70,11 +70,11 @@ var _ = Describe("WorkerPoolTask", func() {
boost.WithMaxBlockingTasks(1),
)
Expect(err).To(Succeed(), "create TimingPool failed")
defer pool.Release()
defer pool.Release(ctx)

By("👾 POOL-CREATED\n")
for i := 0; i < PoolSize-1; i++ {
Expect(pool.Post(longRunningFunc)).To(Succeed(),
Expect(pool.Post(ctx, longRunningFunc)).To(Succeed(),
"submit when pool is not full shouldn't return error",
)
}
Expand All @@ -83,7 +83,7 @@ var _ = Describe("WorkerPoolTask", func() {
<-ch
}
// pool is full now.
Expect(pool.Post(fn)).To(Succeed(),
Expect(pool.Post(ctx, fn)).To(Succeed(),
"submit when pool is not full shouldn't return error",
)

Expand All @@ -92,7 +92,7 @@ var _ = Describe("WorkerPoolTask", func() {
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := pool.Post(demoFunc); err != nil {
if err := pool.Post(ctx, demoFunc); err != nil {
errCh <- err
}
By("👾 Producer complete\n")
Expand All @@ -103,7 +103,7 @@ var _ = Describe("WorkerPoolTask", func() {
time.Sleep(1 * time.Second)

// already reached max blocking limit
Expect(pool.Post(demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
Expect(pool.Post(ctx, demoFunc)).To(MatchError(ants.ErrPoolOverload.Error()),
"blocking submit when pool reach max blocking submit should return ErrPoolOverload",
)

Expand Down
12 changes: 6 additions & 6 deletions internal/ants/ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ var _ = Describe("Ants", func() {
pool, err := ants.NewPool(ctx, poolSize, ants.WithNonblocking(true))
Expect(err).To(Succeed(), "create TimingPool failed")

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < poolSize-1; i++ {
Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(),
Expand Down Expand Up @@ -66,7 +66,7 @@ var _ = Describe("Ants", func() {
pool, err := ants.NewPool(ctx, poolSize, ants.WithMaxBlockingTasks(1))
Expect(err).To(Succeed(), "create TimingPool failed")

defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < poolSize-1; i++ {
Expect(pool.Submit(ctx, longRunningFunc)).To(Succeed(),
Expand Down Expand Up @@ -126,11 +126,11 @@ var _ = Describe("Ants", func() {
demoPoolFunc(i)
wg.Done()
})
defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < n; i++ {
wg.Add(1)
_ = pool.Invoke(Param)
_ = pool.Invoke(ctx, Param)
}
wg.Wait()
GinkgoWriter.Printf("pool with func, running workers number:%d\n",
Expand All @@ -152,11 +152,11 @@ var _ = Describe("Ants", func() {
demoPoolFunc(i)
wg.Done()
}, ants.WithPreAlloc(true))
defer pool.Release()
defer pool.Release(ctx)

for i := 0; i < n; i++ {
wg.Add(1)
_ = pool.Invoke(Param)
_ = pool.Invoke(ctx, Param)
}
wg.Wait()
GinkgoWriter.Printf("pool with func, running workers number:%d\n",
Expand Down
23 changes: 11 additions & 12 deletions internal/ants/pool-func.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,21 +105,21 @@ func (p *PoolWithFunc) ticktock(ticktockCtx context.Context) {
}
}

func (p *PoolWithFunc) goPurge() {
func (p *PoolWithFunc) goPurge(ctx context.Context) {
if p.o.DisablePurge {
return
}

// Start a goroutine to clean up expired workers periodically.
var purgeCtx context.Context
purgeCtx, p.stopPurge = context.WithCancel(p.ctx)
purgeCtx, p.stopPurge = context.WithCancel(ctx)
go p.purgeStaleWorkers(purgeCtx)
}

func (p *PoolWithFunc) goTicktock() {
func (p *PoolWithFunc) goTicktock(ctx context.Context) {
p.now.Store(time.Now())
var ticktockCtx context.Context
ticktockCtx, p.stopTicktock = context.WithCancel(p.ctx)
ticktockCtx, p.stopTicktock = context.WithCancel(ctx)
go p.ticktock(ticktockCtx)
}

Expand Down Expand Up @@ -157,7 +157,6 @@ func NewPoolWithFunc(ctx context.Context,

p := &PoolWithFunc{
workerPool: workerPool{
ctx: ctx,
capacity: int32(size),
lock: async.NewSpinLock(),
o: opts,
Expand All @@ -181,8 +180,8 @@ func NewPoolWithFunc(ctx context.Context,

p.cond = sync.NewCond(p.lock)

p.goPurge()
p.goTicktock()
p.goPurge(ctx)
p.goTicktock(ctx)

return p, nil
}
Expand All @@ -193,26 +192,26 @@ func NewPoolWithFunc(ctx context.Context,
// but what calls for special attention is that you will get blocked with the last
// Pool.Invoke() call once the current Pool runs out of its capacity, and to avoid this,
// you should instantiate a PoolWithFunc with ants.WithNonblocking(true).
func (p *PoolWithFunc) Invoke(job InputParam) error {
func (p *PoolWithFunc) Invoke(ctx context.Context, job InputParam) error {
if p.IsClosed() {
return ErrPoolClosed
}

w, err := p.retrieveWorker()
if w != nil {
w.sendParam(p.ctx, job)
w.sendParam(ctx, job)
}

return err
}

// Reboot reboots a closed pool.
func (p *PoolWithFunc) Reboot() {
func (p *PoolWithFunc) Reboot(ctx context.Context) {
if atomic.CompareAndSwapInt32(&p.state, CLOSED, OPENED) {
atomic.StoreInt32(&p.purgeDone, 0)
p.goPurge()
p.goPurge(ctx)
atomic.StoreInt32(&p.ticktockDone, 0)
p.goTicktock()
p.goTicktock(ctx)
}
}

Expand Down
Loading

0 comments on commit 212f623

Please sign in to comment.