Skip to content

Commit

Permalink
opt: improve the implementation of Least-Connections load-balancing
Browse files Browse the repository at this point in the history
Leverage min-heap to optimize Least-Connections load-balancing
  • Loading branch information
panjf2000 committed Jun 6, 2020
1 parent 5197f18 commit b6a5f56
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 147 deletions.
4 changes: 2 additions & 2 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ func (svr *server) acceptNewConnection(fd int) error {
if err := unix.SetNonblock(nfd, true); err != nil {
return err
}
el := svr.subLoopGroup.next(nfd)
el := svr.subEventLoopSet.next(nfd)
c := newTCPConn(nfd, el, sa)
_ = el.poller.Trigger(func() (err error) {
if err = el.poller.AddRead(nfd); err != nil {
return
}
el.connections[nfd] = c
el.plusConnCount()
el.calibrateCallback(el, 1)
err = el.loopOpen(c)
return
})
Expand Down
4 changes: 2 additions & 2 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (svr *server) listenerRun() {
buf := bytebuffer.Get()
_, _ = buf.Write(packet[:n])

el := svr.subLoopGroup.next(hashCode(addr.String()))
el := svr.subEventLoopSet.next(hashCode(addr.String()))
el.ch <- &udpIn{newUDPConn(el, svr.ln.lnaddr, addr, buf)}
} else {
// Accept TCP socket.
Expand All @@ -47,7 +47,7 @@ func (svr *server) listenerRun() {
err = e
return
}
el := svr.subLoopGroup.next(hashCode(conn.RemoteAddr().String()))
el := svr.subEventLoopSet.next(hashCode(conn.RemoteAddr().String()))
c := newTCPConn(conn, el)
el.ch <- c
go func() {
Expand Down
29 changes: 13 additions & 16 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,19 @@ import (
)

type eventloop struct {
idx int // loop index in the server loops list
svr *server // server in loop
codec ICodec // codec for TCP
packet []byte // read packet buffer
poller *netpoll.Poller // epoll or kqueue
connCount int32 // number of active connections in event-loop
connections map[int]*conn // loop connections fd -> conn
eventHandler EventHandler // user eventHandler
idx int // loop index in the server loops list
svr *server // server in loop
codec ICodec // codec for TCP
packet []byte // read packet buffer
poller *netpoll.Poller // epoll or kqueue
connCount int32 // number of active connections in event-loop
connections map[int]*conn // loop connections fd -> conn
eventHandler EventHandler // user eventHandler
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}

func (el *eventloop) plusConnCount() {
atomic.AddInt32(&el.connCount, 1)
}

func (el *eventloop) minusConnCount() {
atomic.AddInt32(&el.connCount, -1)
func (el *eventloop) adjustConnCount(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConnCount() int32 {
Expand Down Expand Up @@ -80,7 +77,7 @@ func (el *eventloop) loopAccept(fd int) error {
c := newTCPConn(nfd, el, sa)
if err = el.poller.AddRead(c.fd); err == nil {
el.connections[c.fd] = c
el.plusConnCount()
el.calibrateCallback(el, 1)
return el.loopOpen(c)
}
return err
Expand Down Expand Up @@ -179,7 +176,7 @@ func (el *eventloop) loopCloseConn(c *conn, err error) error {
err0, err1 := el.poller.Delete(c.fd), unix.Close(c.fd)
if err0 == nil && err1 == nil {
delete(el.connections, c.fd)
el.minusConnCount()
el.calibrateCallback(el, -1)
switch el.eventHandler.OnClosed(c, err) {
case Shutdown:
return errServerShutdown
Expand Down
27 changes: 12 additions & 15 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,18 @@ import (
)

type eventloop struct {
ch chan interface{} // command channel
idx int // loop index
svr *server // server in loop
codec ICodec // codec for TCP
connCount int32 // number of active connections in event-loop
connections map[*stdConn]struct{} // track all the sockets bound to this loop
eventHandler EventHandler // user eventHandler
ch chan interface{} // command channel
idx int // loop index
svr *server // server in loop
codec ICodec // codec for TCP
connCount int32 // number of active connections in event-loop
connections map[*stdConn]struct{} // track all the sockets bound to this loop
eventHandler EventHandler // user eventHandler
calibrateCallback func(*eventloop, int32) // callback func for re-adjusting connCount
}

func (el *eventloop) plusConnCount() {
atomic.AddInt32(&el.connCount, 1)
}

func (el *eventloop) minusConnCount() {
atomic.AddInt32(&el.connCount, -1)
func (el *eventloop) adjustConnCount(delta int32) {
atomic.AddInt32(&el.connCount, delta)
}

func (el *eventloop) loadConnCount() int32 {
Expand Down Expand Up @@ -79,7 +76,7 @@ func (el *eventloop) loopAccept(c *stdConn) error {
el.connections[c] = struct{}{}
c.localAddr = el.svr.ln.lnaddr
c.remoteAddr = c.conn.RemoteAddr()
el.plusConnCount()
el.calibrateCallback(el, 1)

out, action := el.eventHandler.OnOpened(c)
if out != nil {
Expand Down Expand Up @@ -173,7 +170,7 @@ func (el *eventloop) loopTicker() {
func (el *eventloop) loopError(c *stdConn, err error) (e error) {
if e = c.conn.Close(); e == nil {
delete(el.connections, c)
el.minusConnCount()
el.calibrateCallback(el, -1)
switch el.eventHandler.OnClosed(c, err) {
case Shutdown:
return errServerShutdown
Expand Down
2 changes: 1 addition & 1 deletion gnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Server struct {

// CountConnections counts the number of currently active connections and returns it.
func (s Server) CountConnections() (count int) {
s.svr.subLoopGroup.iterate(func(i int, el *eventloop) bool {
s.svr.subEventLoopSet.iterate(func(i int, el *eventloop) bool {
count += int(el.loadConnCount())
return true
})
Expand Down
164 changes: 114 additions & 50 deletions load_balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@

package gnet

import (
"container/heap"
"sync"
)

// LoadBalancing represents the the type of load-balancing algorithm.
type LoadBalancing int

Expand All @@ -19,111 +24,170 @@ const (
SourceAddrHash
)

// IEventLoopGroup represents a set of event-loops.
type (
IEventLoopGroup interface {
// loadBalancer is a interface which manipulates the event-loops group.
loadBalancer interface {
register(*eventloop)
next(int) *eventloop
iterate(func(int, *eventloop) bool)
len() int
calibrate(*eventloop, int32)
}

// roundRobinEventLoopGroup with RoundRobin algorithm.
roundRobinEventLoopGroup struct {
// roundRobinEventLoopSet with Round-Robin algorithm.
roundRobinEventLoopSet struct {
nextLoopIndex int
eventLoops []*eventloop
size int
}

// leastConnectionsEventLoopGroup with Least-Connections algorithm.
leastConnectionsEventLoopGroup []*eventloop
// leastConnectionsEventLoopSet with Least-Connections algorithm.
leastConnectionsEventLoopSet struct {
sync.RWMutex
minHeap minEventLoopHeap
}

// sourceAddrHashEventLoopGroup with Hash algorithm.
sourceAddrHashEventLoopGroup struct {
// sourceAddrHashEventLoopSet with Hash algorithm.
sourceAddrHashEventLoopSet struct {
eventLoops []*eventloop
size int
}
)

func (g *roundRobinEventLoopGroup) register(el *eventloop) {
g.eventLoops = append(g.eventLoops, el)
g.size++
// ==================================== Implementation of Round-Robin load-balancer ====================================

func (set *roundRobinEventLoopSet) register(el *eventloop) {
el.idx = set.size
set.eventLoops = append(set.eventLoops, el)
set.size++
}

// next returns the eligible event-loop based on Round-Robin algorithm.
func (g *roundRobinEventLoopGroup) next(_ int) (el *eventloop) {
el = g.eventLoops[g.nextLoopIndex]
if g.nextLoopIndex++; g.nextLoopIndex >= g.size {
g.nextLoopIndex = 0
func (set *roundRobinEventLoopSet) next(_ int) (el *eventloop) {
el = set.eventLoops[set.nextLoopIndex]
if set.nextLoopIndex++; set.nextLoopIndex >= set.size {
set.nextLoopIndex = 0
}
return
}

func (g *roundRobinEventLoopGroup) iterate(f func(int, *eventloop) bool) {
for i, el := range g.eventLoops {
func (set *roundRobinEventLoopSet) iterate(f func(int, *eventloop) bool) {
for i, el := range set.eventLoops {
if !f(i, el) {
break
}
}
}

func (g *roundRobinEventLoopGroup) len() int {
return g.size
func (set *roundRobinEventLoopSet) len() int {
return set.size
}

func (g *leastConnectionsEventLoopGroup) register(el *eventloop) {
*g = append(*g, el)
func (set *roundRobinEventLoopSet) calibrate(el *eventloop, delta int32) {
el.adjustConnCount(delta)
}

// next returns the eligible event-loop based on least-connections algorithm.
func (g *leastConnectionsEventLoopGroup) next(_ int) (el *eventloop) {
eventLoops := *g
el = eventLoops[0]
leastConnCount := el.loadConnCount()
var (
curEventLoop *eventloop
curConnCount int32
)
for _, curEventLoop = range eventLoops[1:] {
if curConnCount = curEventLoop.loadConnCount(); curConnCount < leastConnCount {
leastConnCount = curConnCount
el = curEventLoop
}
}
// ================================= Implementation of Least-Connections load-balancer =================================

// Leverage min-heap to optimize Least-Connections load-balancing
type minEventLoopHeap []*eventloop

// Implement heap.Interface: Len, Less, Swap, Push, Pop.
func (h minEventLoopHeap) Len() int {
return len(h)
}

func (h minEventLoopHeap) Less(i, j int) bool {
//return (*h)[i].loadConnCount() < (*h)[j].loadConnCount()
return h[i].connCount < h[j].connCount
}

func (h minEventLoopHeap) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].idx, h[j].idx = i, j
}

func (h *minEventLoopHeap) Push(x interface{}) {
el := x.(*eventloop)
el.idx = len(*h)
*h = append(*h, el)
}

func (h *minEventLoopHeap) Pop() interface{} {
old := *h
i := len(old) - 1
x := old[i]
old[i] = nil // avoid memory leak
x.idx = -1 // for safety
*h = old[:i]
return x
}

func (set *leastConnectionsEventLoopSet) register(el *eventloop) {
set.Lock()
heap.Push(&set.minHeap, el)
set.Unlock()
}

// next returns the eligible event-loop by taking the root node from minimum heap based on least-connections algorithm.
func (set *leastConnectionsEventLoopSet) next(_ int) (el *eventloop) {
set.RLock()
//el = heap.Pop(&set.minHeap).(*eventloop)
el = set.minHeap[0]
set.RUnlock()
return
}

func (g *leastConnectionsEventLoopGroup) iterate(f func(int, *eventloop) bool) {
eventLoops := *g
for i, el := range eventLoops {
func (set *leastConnectionsEventLoopSet) iterate(f func(int, *eventloop) bool) {
set.RLock()
for i, el := range set.minHeap {
if !f(i, el) {
break
}
}
set.RUnlock()
}

func (g *leastConnectionsEventLoopGroup) len() int {
return len(*g)
func (set *leastConnectionsEventLoopSet) len() (size int) {
set.RLock()
size = set.minHeap.Len()
set.RUnlock()
return
}

func (set *leastConnectionsEventLoopSet) calibrate(el *eventloop, delta int32) {
//el.adjustConnCount(delta)
set.Lock()
el.connCount += delta
heap.Fix(&set.minHeap, el.idx)
set.Unlock()
}

func (g *sourceAddrHashEventLoopGroup) register(el *eventloop) {
g.eventLoops = append(g.eventLoops, el)
g.size++
// ======================================= Implementation of Hash load-balancer ========================================

func (set *sourceAddrHashEventLoopSet) register(el *eventloop) {
el.idx = set.size
set.eventLoops = append(set.eventLoops, el)
set.size++
}

// next returns the eligible event-loop by taking the remainder of a given fd as the index of event-loop list.
func (g *sourceAddrHashEventLoopGroup) next(hashCode int) *eventloop {
return g.eventLoops[hashCode%g.size]
func (set *sourceAddrHashEventLoopSet) next(hashCode int) *eventloop {
return set.eventLoops[hashCode%set.size]
}

func (g *sourceAddrHashEventLoopGroup) iterate(f func(int, *eventloop) bool) {
for i, el := range g.eventLoops {
func (set *sourceAddrHashEventLoopSet) iterate(f func(int, *eventloop) bool) {
for i, el := range set.eventLoops {
if !f(i, el) {
break
}
}
}

func (g *sourceAddrHashEventLoopGroup) len() int {
return g.size
func (set *sourceAddrHashEventLoopSet) len() int {
return set.size
}

func (set *sourceAddrHashEventLoopSet) calibrate(el *eventloop, delta int32) {
el.adjustConnCount(delta)
}
Loading

0 comments on commit b6a5f56

Please sign in to comment.