Skip to content

Commit

Permalink
proxy: limit the number of concurrent inflight requests (#521) (#526)
Browse files Browse the repository at this point in the history
With a default limit of 256 inflight requests, a latency of 3ms to
NextDNS, asking `dnsperf` to run at 100 QPS, there is no change in
latency (which is expected).

The limit can be adjusted using `-max-inflight-requests`. Ideally,
this could be tuned depending on the memory available on the system,
but it seems complex to do that portably in Go. I think 256 is a fine
limit for most systems. We could have a different default limit for
smaller archs (like mips) if we wanted to do some kind of "detection".
  • Loading branch information
vincentbernat committed Jun 28, 2021
1 parent aca0a60 commit d0570f3
Show file tree
Hide file tree
Showing 6 changed files with 63 additions and 10 deletions.
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
BogusPriv bool
UseHosts bool
Timeout time.Duration
MaxInflightRequests uint
SetupRouter bool
AutoActivate bool
}
Expand Down Expand Up @@ -145,6 +146,11 @@ func (c *Config) flagSet(cmd string) flagSet {
fs.BoolVar(&c.UseHosts, "use-hosts", true,
"Lookup /etc/hosts before sending queries to upstream resolver.")
fs.DurationVar(&c.Timeout, "timeout", 5*time.Second, "Maximum duration allowed for a request before failing.")
fs.UintVar(&c.MaxInflightRequests, "max-inflight-requests", 256,
"Maximum number of inflight requests handled by the proxy. No additional\n"+
"requests will not be answered after this threshold is met. Increasing\n"+
"this value can reduce latency in case of burst of requests but it can\n"+
"also increase significantly memory usage.")
fs.BoolVar(&c.SetupRouter, "setup-router", false,
"Automatically configure NextDNS for a router setup.\n"+
"Common types of router are detected to integrate gracefuly. Changes\n"+
Expand Down Expand Up @@ -236,6 +242,13 @@ func (fs flagSet) DurationVar(p *time.Duration, name string, value time.Duration
fs.storage[name] = service.ConfigDuration{Value: p, Default: value}
}

func (fs flagSet) UintVar(p *uint, name string, value uint, usage string) {
if fs.flag != nil {
fs.flag.UintVar(p, name, value, usage)
}
fs.storage[name] = service.ConfigUint{Value: p, Default: value}
}

func (fs flagSet) Var(value flag.Value, name string, usage string) {
if fs.flag != nil {
fs.flag.Var(value, name, usage)
Expand Down
26 changes: 26 additions & 0 deletions host/service/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -103,6 +104,31 @@ func (e ConfigDuration) String() string {
return e.Value.String()
}

type ConfigUint struct {
Value *uint
Default uint
}

func (e ConfigUint) IsDefault() bool {
return e.Value == nil || *e.Value == e.Default
}

func (e ConfigUint) Set(v string) error {
d, err := strconv.ParseUint(v, 10, 16)
if err != nil {
return err
}
*e.Value = uint(d)
return nil
}

func (e ConfigUint) String() string {
if e.Value == nil {
return ""
}
return fmt.Sprintf("%d", *e.Value)
}

type ConfigFileStorer struct {
File string
}
Expand Down
9 changes: 7 additions & 2 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ type Proxy struct {
// being cancelled.
Timeout time.Duration

// Maximum number of inflight requests. Further requests will
// not be answered.
MaxInflightRequests uint

// QueryLog specifies an optional log function called for each received query.
QueryLog func(QueryInfo)

Expand Down Expand Up @@ -101,6 +105,7 @@ func (p Proxy) ListenAndServe(ctx context.Context) error {
errs := make(chan error, expReturns)
var closeAll []func() error
var closeAllMu sync.Mutex
inflightRequests := make(chan struct{}, p.MaxInflightRequests)

for _, addr := range addrs {
go func(addr string) {
Expand All @@ -111,7 +116,7 @@ func (p Proxy) ListenAndServe(ctx context.Context) error {
closeAllMu.Lock()
closeAll = append(closeAll, udp.Close)
closeAllMu.Unlock()
err = p.serveUDP(udp)
err = p.serveUDP(udp, inflightRequests)
}
cancel()
if err != nil {
Expand All @@ -128,7 +133,7 @@ func (p Proxy) ListenAndServe(ctx context.Context) error {
closeAllMu.Lock()
closeAll = append(closeAll, tcp.Close)
closeAllMu.Unlock()
err = p.serveTCP(tcp)
err = p.serveTCP(tcp, inflightRequests)
}
cancel()
if err != nil {
Expand Down
10 changes: 7 additions & 3 deletions proxy/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const maxTCPSize = 65535

func (p Proxy) serveTCP(l net.Listener) error {
func (p Proxy) serveTCP(l net.Listener, inflightRequests chan struct{}) error {
bpool := &sync.Pool{
New: func() interface{} {
b := make([]byte, maxTCPSize)
Expand All @@ -34,7 +34,7 @@ func (p Proxy) serveTCP(l net.Listener) error {
return err
}
go func() {
if err := p.serveTCPConn(c, bpool); err != nil {
if err := p.serveTCPConn(c, inflightRequests, bpool); err != nil {
if p.ErrorLog != nil {
p.ErrorLog(err)
}
Expand All @@ -43,19 +43,22 @@ func (p Proxy) serveTCP(l net.Listener) error {
}
}

func (p Proxy) serveTCPConn(c net.Conn, bpool *sync.Pool) error {
func (p Proxy) serveTCPConn(c net.Conn, inflightRequests chan struct{}, bpool *sync.Pool) error {
defer c.Close()

for {
inflightRequests <- struct{}{}
buf := *bpool.Get().(*[]byte)
qsize, err := readTCP(c, buf)
if err != nil {
<-inflightRequests
if err == io.EOF {
return nil
}
return fmt.Errorf("TCP read: %v", err)
}
if qsize <= 14 {
<-inflightRequests
return fmt.Errorf("query too small: %d", qsize)
}
start := time.Now()
Expand All @@ -77,6 +80,7 @@ func (p Proxy) serveTCPConn(c net.Conn, bpool *sync.Pool) error {
}
bpool.Put(&buf)
bpool.Put(&rbuf)
<-inflightRequests
p.logQuery(QueryInfo{
PeerIP: q.PeerIP,
Protocol: "TCP",
Expand Down
6 changes: 5 additions & 1 deletion proxy/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ var udpOOBSize = func() int {
return len(oob6)
}()

func (p Proxy) serveUDP(l net.PacketConn) error {
func (p Proxy) serveUDP(l net.PacketConn, inflightRequests chan struct{}) error {
bpool := sync.Pool{
New: func() interface{} {
// Use the same buffer size as for TCP and truncate later. UDP and
Expand All @@ -58,9 +58,11 @@ func (p Proxy) serveUDP(l net.PacketConn) error {
}

for {
inflightRequests <- struct{}{}
buf := *bpool.Get().(*[]byte)
qsize, lip, raddr, err := readUDP(c, buf)
if err != nil {
<-inflightRequests
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
bpool.Put(&buf)
continue
Expand All @@ -69,6 +71,7 @@ func (p Proxy) serveUDP(l net.PacketConn) error {
}
if qsize <= 14 {
bpool.Put(&buf)
<-inflightRequests
continue
}
start := time.Now()
Expand All @@ -89,6 +92,7 @@ func (p Proxy) serveUDP(l net.PacketConn) error {
}
bpool.Put(&buf)
bpool.Put(&rbuf)
<-inflightRequests
p.logQuery(QueryInfo{
PeerIP: q.PeerIP,
Protocol: "UDP",
Expand Down
9 changes: 5 additions & 4 deletions run.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,11 @@ func run(args []string) error {
}

p.Proxy = proxy.Proxy{
Addrs: c.Listens,
Upstream: p.resolver,
BogusPriv: c.BogusPriv,
Timeout: c.Timeout,
Addrs: c.Listens,
Upstream: p.resolver,
BogusPriv: c.BogusPriv,
Timeout: c.Timeout,
MaxInflightRequests: c.MaxInflightRequests,
}

discoverHosts := &discovery.Hosts{OnError: func(err error) { log.Errorf("hosts: %v", err) }}
Expand Down

0 comments on commit d0570f3

Please sign in to comment.