Merge remote-tracking branch 'origin/dev' into dev
POABOB committed Oct 18, 2023
2 parents e209aae + 0418fee commit 770d4d6
Showing 4 changed files with 36 additions and 25 deletions.
17 changes: 17 additions & 0 deletions .github/release-drafter.yml
@@ -44,6 +44,7 @@ autolabeler:
     title:
       - /fix/i
       - /bug/i
+      - /patch/i
   - label: docs
     files:
       - '*.md'
@@ -65,11 +66,27 @@ autolabeler:
       - /feature/i
       - /implement/i
       - /add/i
+      - /minor/i
   - label: dependencies
     title:
       - /dependencies/i
       - /upgrade/i
       - /bump up/i
+  - label: chores
+    title:
+      - /chore/i
+      - /\bmisc\b/i
+      - /cleanup/i
+      - /clean up/i
+  - label: major
+    title:
+      - /major:/i
+  - label: minor
+    title:
+      - /minor:/i
+  - label: patch
+    title:
+      - /patch:/i
 template: |
   ## Changelogs
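
The autolabeler entries above are case-insensitive regular expressions that release-drafter matches against PR titles (or, for the docs label, against changed file paths). As a rough illustration, here is a minimal Go sketch of that kind of matching; the rule table and the sample title are assumptions for demonstration, not how release-drafter itself is implemented:

package main

import (
	"fmt"
	"regexp"
)

// labelRules mirrors a few of the title patterns configured above.
// Go's (?i) flag stands in for the /.../i syntax used by release-drafter.
var labelRules = map[string][]*regexp.Regexp{
	"bug":   {regexp.MustCompile(`(?i)fix`), regexp.MustCompile(`(?i)bug`), regexp.MustCompile(`(?i)patch`)},
	"patch": {regexp.MustCompile(`(?i)patch:`)},
	"major": {regexp.MustCompile(`(?i)major:`)},
}

func labelsFor(title string) []string {
	var labels []string
	for label, patterns := range labelRules {
		for _, re := range patterns {
			if re.MatchString(title) {
				labels = append(labels, label)
				break
			}
		}
	}
	return labels
}

func main() {
	// "patch: fix worker leak" matches both the bug and patch rules.
	fmt.Println(labelsFor("patch: fix worker leak"))
}
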
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -92,7 +92,7 @@ jobs:
           restore-keys: |
             ${{ runner.os }}-${{ matrix.go }}-go-ci
 
-      - name: Run unit tests
+      - name: Run unit tests and integrated tests
         run: go test -v -race -coverprofile="codecov.report" -covermode=atomic
 
       - name: Upload code coverage report to Codecov
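
The renamed step runs the same single command as before: go test -v -race -coverprofile="codecov.report" -covermode=atomic. The -race flag turns on the race detector, and atomic is the coverage mode that stays reliable when combined with it.
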
21 changes: 9 additions & 12 deletions pool.go
@@ -106,8 +106,8 @@ func (p *Pool) purgeStaleWorkers(ctx context.Context) {
 		staleWorkers[i] = nil
 	}
 
-	// There might be a situation where all workers have been cleaned up(no worker is running),
-	// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
+	// There might be a situation where all workers have been cleaned up (no worker is running),
+	// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
 	if isDormant && p.Waiting() > 0 {
 		p.cond.Broadcast()
 	}
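
The rewritten comment describes the dormant-pool case: the purge goroutine may reclaim every idle worker while callers are still parked in p.cond.Wait(), so it must Broadcast to let them re-check pool state. A self-contained sketch of that wait/broadcast pattern (illustrative names, not the pool's actual fields):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	var mu sync.Mutex
	cond := sync.NewCond(&mu)
	ready := false // stands in for "a worker is available"
	var wg sync.WaitGroup

	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			mu.Lock()
			for !ready {
				cond.Wait() // parked caller, like retrieveWorker blocking in p.cond.Wait()
			}
			mu.Unlock()
			fmt.Printf("caller %d woke up\n", id)
		}(i)
	}

	time.Sleep(50 * time.Millisecond) // let the callers park
	mu.Lock()
	ready = true
	cond.Broadcast() // wake every parked caller, as purgeStaleWorkers does when the pool goes dormant
	mu.Unlock()
	wg.Wait()
}
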
@@ -207,8 +207,6 @@ func NewPool(size int, options ...Option) (*Pool, error) {
 	return p, nil
 }
 
-// ---------------------------------------------------------------------------
-
 // Submit submits a task to this pool.
 //
 // Note that you are allowed to call Pool.Submit() from the current Pool.Submit(),
@@ -219,11 +217,12 @@ func (p *Pool) Submit(task func()) error {
 	if p.IsClosed() {
 		return ErrPoolClosed
 	}
-	if w := p.retrieveWorker(); w != nil {
+
+	w, err := p.retrieveWorker()
+	if w != nil {
 		w.inputFunc(task)
-		return nil
 	}
-	return ErrPoolOverload
+	return err
 }
 
 // Running returns the number of workers currently running.
@@ -321,8 +320,6 @@ func (p *Pool) Reboot() {
 	}
 }
 
-// ---------------------------------------------------------------------------
-
 func (p *Pool) addRunning(delta int) {
 	atomic.AddInt32(&p.running, int32(delta))
 }
@@ -332,7 +329,7 @@ func (p *Pool) addWaiting(delta int) {
 }
 
 // retrieveWorker returns an available worker to run the tasks.
-func (p *Pool) retrieveWorker() (w worker) {
+func (p *Pool) retrieveWorker() (w worker, err error) {
 	p.lock.Lock()
 
 retry:
@@ -354,7 +351,7 @@ retry:
 	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
 	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
 		p.lock.Unlock()
-		return
+		return nil, ErrPoolOverload
 	}
 
 	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
@@ -364,7 +361,7 @@
 
 	if p.IsClosed() {
 		p.lock.Unlock()
-		return
+		return nil, ErrPoolClosed
 	}
 
 	goto retry
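
The net effect of this file's changes: retrieveWorker now reports why no worker was handed out, and Submit passes that error through, so callers receive ErrPoolClosed as well as ErrPoolOverload instead of ErrPoolOverload for every failure. A hedged usage sketch (the import path is the upstream project's and may differ for this fork):

package main

import (
	"errors"
	"fmt"

	"github.com/panjf2000/ants/v2" // assumed import path; adjust for this fork
)

func main() {
	// A nonblocking pool of one worker: Submit fails fast when saturated.
	pool, err := ants.NewPool(1, ants.WithNonblocking(true))
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	for i := 0; i < 3; i++ {
		err := pool.Submit(func() { fmt.Println("working") })
		switch {
		case err == nil:
			// task accepted
		case errors.Is(err, ants.ErrPoolOverload):
			fmt.Println("pool saturated, caller can back off and retry")
		case errors.Is(err, ants.ErrPoolClosed):
			fmt.Println("pool already released")
		}
	}
}
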
21 changes: 9 additions & 12 deletions pool_func.go
@@ -107,8 +107,8 @@ func (p *PoolWithFunc) purgeStaleWorkers(ctx context.Context) {
 		staleWorkers[i] = nil
 	}
 
-	// There might be a situation where all workers have been cleaned up(no worker is running),
-	// while some invokers still are stuck in "p.cond.Wait()", then we need to awake those invokers.
+	// There might be a situation where all workers have been cleaned up (no worker is running),
+	// while some invokers still are stuck in p.cond.Wait(), then we need to awake those invokers.
 	if isDormant && p.Waiting() > 0 {
 		p.cond.Broadcast()
 	}
@@ -213,8 +213,6 @@ func NewPoolWithFunc(size int, pf func(interface{}), options ...Option) (*PoolWithFunc, error) {
 	return p, nil
 }
 
-//---------------------------------------------------------------------------
-
 // Invoke submits a task to pool.
 //
 // Note that you are allowed to call Pool.Invoke() from the current Pool.Invoke(),
@@ -225,11 +223,12 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
 	if p.IsClosed() {
 		return ErrPoolClosed
 	}
-	if w := p.retrieveWorker(); w != nil {
+
+	w, err := p.retrieveWorker()
+	if w != nil {
 		w.inputParam(args)
-		return nil
 	}
-	return ErrPoolOverload
+	return err
 }
 
 // Running returns the number of workers currently running.
@@ -327,8 +326,6 @@ func (p *PoolWithFunc) Reboot() {
 	}
 }
 
-//---------------------------------------------------------------------------
-
 func (p *PoolWithFunc) addRunning(delta int) {
 	atomic.AddInt32(&p.running, int32(delta))
 }
@@ -338,7 +335,7 @@ func (p *PoolWithFunc) addWaiting(delta int) {
 }
 
 // retrieveWorker returns an available worker to run the tasks.
-func (p *PoolWithFunc) retrieveWorker() (w worker) {
+func (p *PoolWithFunc) retrieveWorker() (w worker, err error) {
 	p.lock.Lock()
 
 retry:
@@ -360,7 +357,7 @@ retry:
 	// Bail out early if it's in nonblocking mode or the number of pending callers reaches the maximum limit value.
 	if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
 		p.lock.Unlock()
-		return
+		return nil, ErrPoolOverload
 	}
 
 	// Otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
@@ -370,7 +367,7 @@
 
 	if p.IsClosed() {
 		p.lock.Unlock()
-		return
+		return nil, ErrPoolClosed
 	}
 
 	goto retry
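
pool_func.go mirrors the pool.go change for PoolWithFunc, so Invoke now surfaces the concrete retrieval error too. A matching sketch under the same import-path assumption:

package main

import (
	"fmt"

	"github.com/panjf2000/ants/v2" // assumed import path; adjust for this fork
)

func main() {
	pool, err := ants.NewPoolWithFunc(2, func(arg interface{}) {
		fmt.Println("processing:", arg)
	})
	if err != nil {
		panic(err)
	}
	defer pool.Release()

	if err := pool.Invoke(42); err != nil {
		// err is now either ErrPoolClosed or ErrPoolOverload, depending on the cause.
		fmt.Println("invoke failed:", err)
	}
}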
