Skip to content

Commit

Permalink
opt: refactor the load-balancing algorithm of source addr hash
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Aug 12, 2020
1 parent 9ab678b commit abbf582
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 43 deletions.
8 changes: 6 additions & 2 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ package gnet
import (
"os"

"github.com/panjf2000/gnet/internal/netpoll"
"golang.org/x/sys/unix"
)

Expand All @@ -39,8 +40,11 @@ func (svr *server) acceptNewConnection(fd int) error {
if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil {
return err
}
el := svr.subEventLoopSet.next(nfd)
c := newTCPConn(nfd, el, sa)

netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa)
el := svr.subEventLoopSet.next(netAddr)
c := newTCPConn(nfd, el, sa, netAddr)

_ = el.poller.Trigger(func() (err error) {
if err = el.poller.AddRead(nfd); err != nil {
return
Expand Down
18 changes: 3 additions & 15 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,7 @@

package gnet

import (
"hash/crc32"
"time"
)

// hashCode hashes a string to a unique hashcode.
func hashCode(s string) int {
v := int(crc32.ChecksumIEEE([]byte(s)))
if v >= 0 {
return v
}
return -v
}
import "time"

func (svr *server) listenerRun() {
var err error
Expand All @@ -48,7 +36,7 @@ func (svr *server) listenerRun() {
return
}

el := svr.subEventLoopSet.next(hashCode(addr.String()))
el := svr.subEventLoopSet.next(addr)
c := newUDPConn(el, svr.ln.lnaddr, addr)
el.ch <- packUDPConn(c, packet[:n])
} else {
Expand All @@ -58,7 +46,7 @@ func (svr *server) listenerRun() {
err = e
return
}
el := svr.subEventLoopSet.next(hashCode(conn.RemoteAddr().String()))
el := svr.subEventLoopSet.next(conn.RemoteAddr())
c := newTCPConn(conn, el)
el.ch <- c
go func() {
Expand Down
13 changes: 11 additions & 2 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package gnet
import (
"net"
"os"
"time"

"github.com/panjf2000/gnet/internal/netpoll"
"github.com/panjf2000/gnet/pool/bytebuffer"
Expand All @@ -49,15 +50,23 @@ type conn struct {
outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client
}

func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn {
return &conn{
func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, remoteAddr net.Addr) (c *conn) {
c = &conn{
fd: fd,
sa: sa,
loop: el,
codec: el.svr.codec,
inboundBuffer: prb.Get(),
outboundBuffer: prb.Get(),
}
c.localAddr = el.ln.lnaddr
c.remoteAddr = remoteAddr
if el.svr.opts.TCPKeepAlive > 0 {
if proto := el.ln.network; proto == "tcp" || proto == "unix" {
_ = netpoll.SetKeepAlive(fd, int(el.svr.opts.TCPKeepAlive/time.Second))
}
}
return
}

func (c *conn) releaseTCP() {
Expand Down
13 changes: 11 additions & 2 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,22 @@ func packUDPConn(c *stdConn, buf []byte) *udpConn {
return packet
}

func newTCPConn(conn net.Conn, el *eventloop) *stdConn {
return &stdConn{
func newTCPConn(conn net.Conn, el *eventloop) (c *stdConn) {
c = &stdConn{
conn: conn,
loop: el,
codec: el.svr.codec,
inboundBuffer: prb.Get(),
}
c.localAddr = el.svr.ln.lnaddr
c.remoteAddr = c.conn.RemoteAddr()
if el.svr.opts.TCPKeepAlive > 0 {
if tc, ok := conn.(*net.TCPConn); ok {
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive)
}
}
return
}

func (c *stdConn) releaseTCP() {
Expand Down
10 changes: 2 additions & 8 deletions eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ func (el *eventloop) loopAccept(fd int) error {
if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil {
return err
}
c := newTCPConn(nfd, el, sa)
netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa)
c := newTCPConn(nfd, el, sa, netAddr)
if err = el.poller.AddRead(c.fd); err == nil {
el.connections[c.fd] = c
return el.loopOpen(c)
Expand All @@ -103,13 +104,6 @@ func (el *eventloop) loopAccept(fd int) error {

func (el *eventloop) loopOpen(c *conn) error {
c.opened = true
c.localAddr = el.ln.lnaddr
c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa)
if el.svr.opts.TCPKeepAlive > 0 {
if proto := el.ln.network; proto == "tcp" || proto == "unix" {
_ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second))
}
}
el.calibrateCallback(el, 1)

out, action := el.eventHandler.OnOpened(c)
Expand Down
12 changes: 2 additions & 10 deletions eventloop_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
package gnet

import (
"github.com/panjf2000/gnet/pool/bytebuffer"
"net"
"time"

"github.com/panjf2000/gnet/pool/bytebuffer"

"github.com/panjf2000/gnet/errors"
)

Expand Down Expand Up @@ -82,14 +82,6 @@ func (el *eventloop) loopRun() {

func (el *eventloop) loopAccept(c *stdConn) error {
el.connections[c] = struct{}{}
c.localAddr = el.svr.ln.lnaddr
c.remoteAddr = c.conn.RemoteAddr()
if el.svr.opts.TCPKeepAlive > 0 {
if c, ok := c.conn.(*net.TCPConn); ok {
_ = c.SetKeepAlive(true)
_ = c.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive)
}
}
el.calibrateCallback(el, 1)

out, action := el.eventHandler.OnOpened(c)
Expand Down
20 changes: 16 additions & 4 deletions load_balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ package gnet

import (
"container/heap"
"hash/crc32"
"net"
"sync"
"sync/atomic"
)
Expand All @@ -45,7 +47,7 @@ type (
// loadBalancer is a interface which manipulates the event-loop set.
loadBalancer interface {
register(*eventloop)
next(int) *eventloop
next(net.Addr) *eventloop
iterate(func(int, *eventloop) bool)
len() int
calibrate(*eventloop, int32)
Expand Down Expand Up @@ -83,7 +85,7 @@ func (set *roundRobinEventLoopSet) register(el *eventloop) {
}

// next returns the eligible event-loop based on Round-Robin algorithm.
func (set *roundRobinEventLoopSet) next(_ int) (el *eventloop) {
func (set *roundRobinEventLoopSet) next(_ net.Addr) (el *eventloop) {
el = set.eventLoops[set.nextLoopIndex]
if set.nextLoopIndex++; set.nextLoopIndex >= set.size {
set.nextLoopIndex = 0
Expand Down Expand Up @@ -154,7 +156,7 @@ func (set *leastConnectionsEventLoopSet) register(el *eventloop) {
}

// next returns the eligible event-loop by taking the root node from minimum heap based on Least-Connections algorithm.
func (set *leastConnectionsEventLoopSet) next(_ int) (el *eventloop) {
func (set *leastConnectionsEventLoopSet) next(_ net.Addr) (el *eventloop) {
// set.RLock()
// el = set.minHeap[0]
// set.RUnlock()
Expand Down Expand Up @@ -208,8 +210,18 @@ func (set *sourceAddrHashEventLoopSet) register(el *eventloop) {
set.size++
}

// hash hashes a string to a unique hash code.
func (set *sourceAddrHashEventLoopSet) hash(s string) int {
v := int(crc32.ChecksumIEEE([]byte(s)))
if v >= 0 {
return v
}
return -v
}

// next returns the eligible event-loop by taking the remainder of a given fd as the index of event-loop list.
func (set *sourceAddrHashEventLoopSet) next(hashCode int) *eventloop {
func (set *sourceAddrHashEventLoopSet) next(netAddr net.Addr) *eventloop {
hashCode := set.hash(netAddr.String())
return set.eventLoops[hashCode%set.size]
}

Expand Down

0 comments on commit abbf582

Please sign in to comment.