Skip to content

Commit

Permalink
opt: speed up the Least-Connections load-balancing
Browse files Browse the repository at this point in the history
By reducing calls to global mutex of thread-safe min-heap
  • Loading branch information
panjf2000 committed Jun 7, 2020
1 parent 258253c commit b5fbbda
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
9 changes: 0 additions & 9 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package gnet

import (
"net"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/internal/netpoll"
Expand All @@ -28,14 +27,6 @@ type eventloop struct {
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}

func (el *eventloop) adjustConnCount(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConnCount() int32 {
return atomic.LoadInt32(&el.connCount)
}

func (el *eventloop) closeAllConns() {
// Close loops and all outstanding connections
for _, c := range el.connections {
Expand Down
9 changes: 0 additions & 9 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ package gnet

import (
"net"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/pool/bytebuffer"
Expand All @@ -26,14 +25,6 @@ type eventloop struct {
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}

func (el *eventloop) adjustConnCount(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConnCount() int32 {
return atomic.LoadInt32(&el.connCount)
}

func (el *eventloop) loopRun() {
var err error
defer func() {
Expand Down
3 changes: 2 additions & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"runtime"
"strings"
"sync/atomic"
"time"

"github.com/panjf2000/gnet/internal/netpoll"
Expand Down Expand Up @@ -66,7 +67,7 @@ type Server struct {
// CountConnections counts the number of currently active connections and returns it.
func (s Server) CountConnections() (count int) {
s.svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool {
count += int(el.loadConnCount())
count += int(atomic.LoadInt32(&el.connCount))
return true
})
return
Expand Down
49 changes: 35 additions & 14 deletions load_balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gnet
import (
"container/heap"
"sync"
"sync/atomic"
)

// LoadBalancing represents the the type of load-balancing algorithm.
Expand Down Expand Up @@ -44,7 +45,10 @@ type (
// leastConnectionsEventLoopSet with Least-Connections algorithm.
leastConnectionsEventLoopSet struct {
sync.RWMutex
minHeap minEventLoopHeap
minHeap minEventLoopHeap
cachedRoot *eventloop
threshold int32
calibrateConnsThreshold int32
}

// sourceAddrHashEventLoopSet with Hash algorithm.
Expand Down Expand Up @@ -84,12 +88,12 @@ func (set *roundRobinEventLoopSet) len() int {
}

func (set *roundRobinEventLoopSet) calibrate(el *eventloop, delta int32) {
el.adjustConnCount(delta)
atomic.AddInt32(&el.connCount, delta)
}

// ================================= Implementation of Least-Connections load-balancer =================================

// Leverage min-heap to optimize Least-Connections load-balancing
// Leverage min-heap to optimize Least-Connections load-balancing.
type minEventLoopHeap []*eventloop

// Implement heap.Interface: Len, Less, Swap, Push, Pop.
Expand Down Expand Up @@ -126,16 +130,30 @@ func (h *minEventLoopHeap) Pop() interface{} {
func (set *leastConnectionsEventLoopSet) register(el *eventloop) {
set.Lock()
heap.Push(&set.minHeap, el)
if el.idx == 0 {
set.cachedRoot = el
}
set.calibrateConnsThreshold = int32(set.minHeap.Len())
set.Unlock()
}

// next returns the eligible event-loop by taking the root node from minimum heap based on Least-Connections algorithm.
func (set *leastConnectionsEventLoopSet) next(_ int) (el *eventloop) {
set.RLock()
//el = heap.Pop(&set.minHeap).(*eventloop)
el = set.minHeap[0]
set.RUnlock()
return
//set.RLock()
//el = set.minHeap[0]
//set.RUnlock()
//return

// In most cases, `next` method returns the cached event-loop immediately and it only
// reconstruct the min-heap every `calibrateConnsThreshold` times to reduce calls to global mutex.
if atomic.LoadInt32(&set.threshold) >= set.calibrateConnsThreshold {
set.Lock()
atomic.StoreInt32(&set.threshold, 0)
heap.Init(&set.minHeap)
set.cachedRoot = set.minHeap[0]
set.Unlock()
}
return set.cachedRoot
}

func (set *leastConnectionsEventLoopSet) iterate(f func(int, *eventloop) bool) {
Expand All @@ -156,11 +174,14 @@ func (set *leastConnectionsEventLoopSet) len() (size int) {
}

func (set *leastConnectionsEventLoopSet) calibrate(el *eventloop, delta int32) {
//el.adjustConnCount(delta)
set.Lock()
el.connCount += delta
heap.Fix(&set.minHeap, el.idx)
set.Unlock()
//set.Lock()
//el.connCount += delta
//heap.Fix(&set.minHeap, el.idx)
//set.Unlock()
set.RLock()
atomic.AddInt32(&el.connCount, delta)
atomic.AddInt32(&set.threshold, 1)
set.RUnlock()
}

// ======================================= Implementation of Hash load-balancer ========================================
Expand Down Expand Up @@ -189,5 +210,5 @@ func (set *sourceAddrHashEventLoopSet) len() int {
}

func (set *sourceAddrHashEventLoopSet) calibrate(el *eventloop, delta int32) {
el.adjustConnCount(delta)
atomic.AddInt32(&el.connCount, delta)
}

0 comments on commit b5fbbda

Please sign in to comment.