Skip to content

Commit

Permalink
Make several optimizations to worker pool (#680)
Browse files Browse the repository at this point in the history
* Use binary-search algorithm to speed up cleaning up workers

* Speed it up when iterating the slice of workerChan

* Use sync.Pool as a more canonical way

* Add benchmark test between binary-search and linear search

* Optimize range to the slice of workerChan, avoiding elements copy

* Perfect the benchmark of work pool

* Make binary-search code inline and remove benchmark test code
  • Loading branch information
panjf2000 authored and erikdubbelboer committed Oct 27, 2019
1 parent f82a646 commit 9f11af2
Showing 1 changed file with 32 additions and 21 deletions.
53 changes: 32 additions & 21 deletions workerpool.go
Expand Up @@ -50,6 +50,11 @@ func (wp *workerPool) Start() {
}
wp.stopCh = make(chan struct{})
stopCh := wp.stopCh
wp.workerChanPool.New = func() interface{} {
return &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
go func() {
var scratch []*workerChan
for {
Expand All @@ -76,8 +81,8 @@ func (wp *workerPool) Stop() {
// serving the connection and noticing wp.mustStop = true.
wp.lock.Lock()
ready := wp.ready
for i, ch := range ready {
ch.ch <- nil
for i := range ready {
ready[i].ch <- nil
ready[i] = nil
}
wp.ready = ready[:0]
Expand All @@ -97,32 +102,43 @@ func (wp *workerPool) clean(scratch *[]*workerChan) {

// Clean least recently used workers if they didn't serve connections
// for more than maxIdleWorkerDuration.
currentTime := time.Now()
criticalTime := time.Now().Add(-maxIdleWorkerDuration)

wp.lock.Lock()
ready := wp.ready
n := len(ready)
i := 0
for i < n && currentTime.Sub(ready[i].lastUseTime) > maxIdleWorkerDuration {
i++
}
*scratch = append((*scratch)[:0], ready[:i]...)
if i > 0 {
m := copy(ready, ready[i:])
for i = m; i < n; i++ {
ready[i] = nil

// Use binary-search algorithm to find out the index of the least recently worker which can be cleaned up.
l, r, mid := 0, n-1, 0
for l <= r {
mid = (l + r) / 2
if criticalTime.After(wp.ready[mid].lastUseTime) {
l = mid + 1
} else {
r = mid - 1
}
wp.ready = ready[:m]
}
i := r
if i == -1 {
wp.lock.Unlock()
return
}

*scratch = append((*scratch)[:0], ready[:i+1]...)
m := copy(ready, ready[i+1:])
for i = m; i < n; i++ {
ready[i] = nil
}
wp.ready = ready[:m]
wp.lock.Unlock()

// Notify obsolete workers to stop.
// This notification must be outside the wp.lock, since ch.ch
// may be blocking and may consume a lot of time if many workers
// are located on non-local CPUs.
tmp := *scratch
for i, ch := range tmp {
ch.ch <- nil
for i := range tmp {
tmp[i].ch <- nil
tmp[i] = nil
}
}
Expand Down Expand Up @@ -174,11 +190,6 @@ func (wp *workerPool) getCh() *workerChan {
return nil
}
vch := wp.workerChanPool.Get()
if vch == nil {
vch = &workerChan{
ch: make(chan net.Conn, workerChanCap),
}
}
ch = vch.(*workerChan)
go func() {
wp.workerFunc(ch)
Expand Down Expand Up @@ -222,7 +233,7 @@ func (wp *workerPool) workerFunc(ch *workerChan) {
if err == errHijacked {
wp.connState(c, StateHijacked)
} else {
c.Close()
_ = c.Close()
wp.connState(c, StateClosed)
}
c = nil
Expand Down

0 comments on commit 9f11af2

Please sign in to comment.