Skip to content

Commit

Permalink
Make Signaller HTTP-client more configurable (#100)
Browse files Browse the repository at this point in the history
This commit introduces additional options for tweaking Signaller
HTTP-client:
* adjusting MaxConnsPerHost, MaxIdleConns, MaxIdleConnsPerHost on
  http.Transport fields if they are different from the default value of
  an according command-line argument, but with the possibility to set
  them to 0, because 0 is a valid value for them;
* setting a timeout on Signaller HTTP-requests towards upstream;

Also this commit adds draining of a response body from Signaler's
upstream for improving connection reuse.

Co-authored-by: mqmr <me@mqmr.com>
Co-authored-by: Martin Helmich <m.helmich@mittwald.de>
  • Loading branch information
3 people committed Oct 6, 2021
1 parent adc2210 commit 783b501
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 27 deletions.
35 changes: 27 additions & 8 deletions cmd/kube-httpcache/internal/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,19 @@ type KubeHTTPProxyFlags struct {
PortName string
}
Signaller struct {
Enable bool
Address string
Port int
WorkersCount int
MaxRetries int
RetryBackoffString string
RetryBackoff time.Duration
QueueLength int
Enable bool
Address string
Port int
WorkersCount int
MaxRetries int
RetryBackoffString string
RetryBackoff time.Duration
QueueLength int
MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
UpstreamRequestTimeoutString string
UpstreamRequestTimeout time.Duration
}
Admin struct {
Address string
Expand Down Expand Up @@ -84,6 +89,13 @@ func (f *KubeHTTPProxyFlags) Parse() error {
flag.IntVar(&f.Signaller.MaxRetries, "signaller-retries", 5, "maximum number of attempts for signalling request")
flag.StringVar(&f.Signaller.RetryBackoffString, "signaller-backoff", "30s", "backoff for signalling request attempts")
flag.IntVar(&f.Signaller.QueueLength, "signaller-queue-length", 0, "length of signaller's processing queue")
flag.IntVar(&f.Signaller.MaxConnsPerHost, "signaller-max-conns-per-host", -1,
"set http.Transport.MaxConnsPerHost in signaller http-client, avaliable then upstream connection reuse is enabled")
flag.IntVar(&f.Signaller.MaxIdleConns, "signaller-max-idle-conns", -1,
"set http.Transport.MaxIdleConns in signaller http-client, avaliable then upstream connection reuse is enabled")
flag.IntVar(&f.Signaller.MaxIdleConnsPerHost, "signaller-max-idle-conns-per-host", -1,
"set http.Transport.MaxIdleConnsPerHost in signaller http-client, avaliable then upstream connection reuse is enabled")
flag.StringVar(&f.Signaller.UpstreamRequestTimeoutString, "signaller-request-timeout", "", "timeout for an outgoing signaller request")

flag.StringVar(&f.Admin.Address, "admin-addr", "127.0.0.1", "TCP address for the Varnish admin")
flag.IntVar(&f.Admin.Port, "admin-port", 6082, "TCP port for the Varnish admin")
Expand Down Expand Up @@ -118,5 +130,12 @@ func (f *KubeHTTPProxyFlags) Parse() error {
return err
}

if f.Signaller.UpstreamRequestTimeoutString != "" {
f.Signaller.UpstreamRequestTimeout, err = time.ParseDuration(f.Signaller.UpstreamRequestTimeoutString)
if err != nil {
return err
}
}

return nil
}
4 changes: 4 additions & 0 deletions cmd/kube-httpcache/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ func main() {
opts.Signaller.MaxRetries,
opts.Signaller.RetryBackoff,
opts.Signaller.QueueLength,
opts.Signaller.MaxConnsPerHost,
opts.Signaller.MaxIdleConns,
opts.Signaller.MaxIdleConnsPerHost,
opts.Signaller.UpstreamRequestTimeout,
)
varnishSignallerErrors = varnishSignaller.GetErrors()

Expand Down
24 changes: 24 additions & 0 deletions pkg/signaller/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package signaller
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"net/http"
"strconv"
Expand Down Expand Up @@ -57,6 +58,25 @@ func (b *Signaller) ServeHTTP(w http.ResponseWriter, r *http.Request) {

func (b *Signaller) ProcessSignalQueue() {
client := &http.Client{}
transport := http.DefaultTransport.(*http.Transport).Clone()

if b.MaxConnsPerHost != -1 {
transport.MaxConnsPerHost = b.MaxConnsPerHost
}

if b.MaxIdleConns != -1 {
transport.MaxIdleConns = b.MaxIdleConns
}

if b.MaxIdleConnsPerHost != -1 {
transport.MaxIdleConnsPerHost = b.MaxIdleConnsPerHost
}

client.Transport = transport

if b.UpstreamRequestTimeout != 0 {
client.Timeout = b.UpstreamRequestTimeout
}

for signal := range b.signalQueue {
response, err := client.Do(signal.Request)
Expand All @@ -73,6 +93,10 @@ func (b *Signaller) ProcessSignalQueue() {
}

if response != nil {
if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
glog.Error("error on discarding response body for connection reuse:", err)
}

if err := response.Body.Close(); err != nil {
glog.Error("error on closing response body:", err)
}
Expand Down
50 changes: 31 additions & 19 deletions pkg/signaller/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,20 @@ type Signal struct {
}

type Signaller struct {
Address string
Port int
WorkersCount int
MaxRetries int
RetryBackoff time.Duration
EndpointScheme string
endpoints *watcher.EndpointConfig
signalQueue chan Signal
errors chan error
mutex sync.RWMutex
Address string
Port int
WorkersCount int
MaxRetries int
RetryBackoff time.Duration
MaxConnsPerHost int
MaxIdleConns int
MaxIdleConnsPerHost int
UpstreamRequestTimeout time.Duration
EndpointScheme string
endpoints *watcher.EndpointConfig
signalQueue chan Signal
errors chan error
mutex sync.RWMutex
}

func NewSignaller(
Expand All @@ -34,22 +38,30 @@ func NewSignaller(
maxRetries int,
retryBackoff time.Duration,
queueLength int,
maxConnsPerHost int,
maxIdleConns int,
maxIdleConnsPerHost int,
upstreamRequestTimeout time.Duration,
) *Signaller {
if queueLength < 0 {
queueLength = 0
glog.Warningf("signaller processing queue cannot have a negative length, falling back to default value: %d", queueLength)
}

return &Signaller{
Address: address,
Port: port,
WorkersCount: workersCount,
MaxRetries: maxRetries,
RetryBackoff: retryBackoff,
EndpointScheme: "http",
endpoints: watcher.NewEndpointConfig(),
signalQueue: make(chan Signal, queueLength),
errors: make(chan error),
Address: address,
Port: port,
WorkersCount: workersCount,
MaxRetries: maxRetries,
RetryBackoff: retryBackoff,
MaxConnsPerHost: maxConnsPerHost,
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
UpstreamRequestTimeout: upstreamRequestTimeout,
EndpointScheme: "http",
endpoints: watcher.NewEndpointConfig(),
signalQueue: make(chan Signal, queueLength),
errors: make(chan error),
}
}

Expand Down

0 comments on commit 783b501

Please sign in to comment.