From abbf5820c1d264fa22e8bf8d00755d6093a86767 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Wed, 12 Aug 2020 11:03:29 +0800 Subject: [PATCH] opt: refactor the load-balancing algorithm of source addr hash --- acceptor_unix.go | 8 ++++++-- acceptor_windows.go | 18 +++--------------- connection_unix.go | 13 +++++++++++-- connection_windows.go | 13 +++++++++++-- eventloop_unix.go | 10 ++-------- eventloop_windows.go | 12 ++---------- load_balancing.go | 20 ++++++++++++++++---- 7 files changed, 51 insertions(+), 43 deletions(-) diff --git a/acceptor_unix.go b/acceptor_unix.go index b4a2028ba..a3cb441bf 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -25,6 +25,7 @@ package gnet import ( "os" + "github.com/panjf2000/gnet/internal/netpoll" "golang.org/x/sys/unix" ) @@ -39,8 +40,11 @@ func (svr *server) acceptNewConnection(fd int) error { if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil { return err } - el := svr.subEventLoopSet.next(nfd) - c := newTCPConn(nfd, el, sa) + + netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa) + el := svr.subEventLoopSet.next(netAddr) + c := newTCPConn(nfd, el, sa, netAddr) + _ = el.poller.Trigger(func() (err error) { if err = el.poller.AddRead(nfd); err != nil { return diff --git a/acceptor_windows.go b/acceptor_windows.go index eea1b9699..7ce46cd49 100644 --- a/acceptor_windows.go +++ b/acceptor_windows.go @@ -21,19 +21,7 @@ package gnet -import ( - "hash/crc32" - "time" -) - -// hashCode hashes a string to a unique hashcode. -func hashCode(s string) int { - v := int(crc32.ChecksumIEEE([]byte(s))) - if v >= 0 { - return v - } - return -v -} +import "time" func (svr *server) listenerRun() { var err error @@ -48,7 +36,7 @@ func (svr *server) listenerRun() { return } - el := svr.subEventLoopSet.next(hashCode(addr.String())) + el := svr.subEventLoopSet.next(addr) c := newUDPConn(el, svr.ln.lnaddr, addr) el.ch <- packUDPConn(c, packet[:n]) } else { @@ -58,7 +46,7 @@ func (svr *server) listenerRun() { err = e return } - el := svr.subEventLoopSet.next(hashCode(conn.RemoteAddr().String())) + el := svr.subEventLoopSet.next(conn.RemoteAddr()) c := newTCPConn(conn, el) el.ch <- c go func() { diff --git a/connection_unix.go b/connection_unix.go index 47d46b186..ece1aa94a 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -26,6 +26,7 @@ package gnet import ( "net" "os" + "time" "github.com/panjf2000/gnet/internal/netpoll" "github.com/panjf2000/gnet/pool/bytebuffer" @@ -49,8 +50,8 @@ type conn struct { outboundBuffer *ringbuffer.RingBuffer // buffer for data that is ready to write to client } -func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn { - return &conn{ +func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, remoteAddr net.Addr) (c *conn) { + c = &conn{ fd: fd, sa: sa, loop: el, @@ -58,6 +59,14 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr) *conn { inboundBuffer: prb.Get(), outboundBuffer: prb.Get(), } + c.localAddr = el.ln.lnaddr + c.remoteAddr = remoteAddr + if el.svr.opts.TCPKeepAlive > 0 { + if proto := el.ln.network; proto == "tcp" || proto == "unix" { + _ = netpoll.SetKeepAlive(fd, int(el.svr.opts.TCPKeepAlive/time.Second)) + } + } + return } func (c *conn) releaseTCP() { diff --git a/connection_windows.go b/connection_windows.go index fe6133c81..e8c8ccac5 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -72,13 +72,22 @@ func packUDPConn(c *stdConn, buf []byte) *udpConn { return packet } -func newTCPConn(conn net.Conn, el *eventloop) *stdConn { - return &stdConn{ +func newTCPConn(conn net.Conn, el *eventloop) (c *stdConn) { + c = &stdConn{ conn: conn, loop: el, codec: el.svr.codec, inboundBuffer: prb.Get(), } + c.localAddr = el.svr.ln.lnaddr + c.remoteAddr = c.conn.RemoteAddr() + if el.svr.opts.TCPKeepAlive > 0 { + if tc, ok := conn.(*net.TCPConn); ok { + _ = tc.SetKeepAlive(true) + _ = tc.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive) + } + } + return } func (c *stdConn) releaseTCP() { diff --git a/eventloop_unix.go b/eventloop_unix.go index 25372c768..ab8adeedf 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -90,7 +90,8 @@ func (el *eventloop) loopAccept(fd int) error { if err = os.NewSyscallError("fcntl nonblock", unix.SetNonblock(nfd, true)); err != nil { return err } - c := newTCPConn(nfd, el, sa) + netAddr := netpoll.SockaddrToTCPOrUnixAddr(sa) + c := newTCPConn(nfd, el, sa, netAddr) if err = el.poller.AddRead(c.fd); err == nil { el.connections[c.fd] = c return el.loopOpen(c) @@ -103,13 +104,6 @@ func (el *eventloop) loopAccept(fd int) error { func (el *eventloop) loopOpen(c *conn) error { c.opened = true - c.localAddr = el.ln.lnaddr - c.remoteAddr = netpoll.SockaddrToTCPOrUnixAddr(c.sa) - if el.svr.opts.TCPKeepAlive > 0 { - if proto := el.ln.network; proto == "tcp" || proto == "unix" { - _ = netpoll.SetKeepAlive(c.fd, int(el.svr.opts.TCPKeepAlive/time.Second)) - } - } el.calibrateCallback(el, 1) out, action := el.eventHandler.OnOpened(c) diff --git a/eventloop_windows.go b/eventloop_windows.go index 985b53245..cf560db93 100644 --- a/eventloop_windows.go +++ b/eventloop_windows.go @@ -22,10 +22,10 @@ package gnet import ( - "github.com/panjf2000/gnet/pool/bytebuffer" - "net" "time" + "github.com/panjf2000/gnet/pool/bytebuffer" + "github.com/panjf2000/gnet/errors" ) @@ -82,14 +82,6 @@ func (el *eventloop) loopRun() { func (el *eventloop) loopAccept(c *stdConn) error { el.connections[c] = struct{}{} - c.localAddr = el.svr.ln.lnaddr - c.remoteAddr = c.conn.RemoteAddr() - if el.svr.opts.TCPKeepAlive > 0 { - if c, ok := c.conn.(*net.TCPConn); ok { - _ = c.SetKeepAlive(true) - _ = c.SetKeepAlivePeriod(el.svr.opts.TCPKeepAlive) - } - } el.calibrateCallback(el, 1) out, action := el.eventHandler.OnOpened(c) diff --git a/load_balancing.go b/load_balancing.go index da4d5693a..bd5c41567 100644 --- a/load_balancing.go +++ b/load_balancing.go @@ -22,6 +22,8 @@ package gnet import ( "container/heap" + "hash/crc32" + "net" "sync" "sync/atomic" ) @@ -45,7 +47,7 @@ type ( // loadBalancer is a interface which manipulates the event-loop set. loadBalancer interface { register(*eventloop) - next(int) *eventloop + next(net.Addr) *eventloop iterate(func(int, *eventloop) bool) len() int calibrate(*eventloop, int32) @@ -83,7 +85,7 @@ func (set *roundRobinEventLoopSet) register(el *eventloop) { } // next returns the eligible event-loop based on Round-Robin algorithm. -func (set *roundRobinEventLoopSet) next(_ int) (el *eventloop) { +func (set *roundRobinEventLoopSet) next(_ net.Addr) (el *eventloop) { el = set.eventLoops[set.nextLoopIndex] if set.nextLoopIndex++; set.nextLoopIndex >= set.size { set.nextLoopIndex = 0 @@ -154,7 +156,7 @@ func (set *leastConnectionsEventLoopSet) register(el *eventloop) { } // next returns the eligible event-loop by taking the root node from minimum heap based on Least-Connections algorithm. -func (set *leastConnectionsEventLoopSet) next(_ int) (el *eventloop) { +func (set *leastConnectionsEventLoopSet) next(_ net.Addr) (el *eventloop) { // set.RLock() // el = set.minHeap[0] // set.RUnlock() @@ -208,8 +210,18 @@ func (set *sourceAddrHashEventLoopSet) register(el *eventloop) { set.size++ } +// hash hashes a string to a unique hash code. +func (set *sourceAddrHashEventLoopSet) hash(s string) int { + v := int(crc32.ChecksumIEEE([]byte(s))) + if v >= 0 { + return v + } + return -v +} + // next returns the eligible event-loop by taking the remainder of a given fd as the index of event-loop list. -func (set *sourceAddrHashEventLoopSet) next(hashCode int) *eventloop { +func (set *sourceAddrHashEventLoopSet) next(netAddr net.Addr) *eventloop { + hashCode := set.hash(netAddr.String()) return set.eventLoops[hashCode%set.size] }