Permalink
Browse files

Read config values from environment for max_conns tuning

- max_idle_conns and max_idle_conns_per_host are now read from
env-vars, each defaulting to 1024
- logging / metrics are collected in the client transaction
rather than via defer (this may impact throughput)
- function cache moved to use RWMutex to try to improve latency
around locking when updating cache
- logging message added to show latency in running GetReplicas
because this was observed to increase in a linear fashion under
high concurrency
- changes tested against 3-node bare-metal 1.13 K8s cluster
with kubeadm

Signed-off-by: Alex Ellis (VMware) <alexellis2@gmail.com>
  • Loading branch information...
alexellis committed Jan 22, 2019
1 parent 52c27e2 commit 299e5a59331f8c2ff3d7fcf71283f739e797e9fe
@@ -61,11 +61,11 @@ func MakeForwardingProxyHandler(proxy *types.HTTPClientReverseProxy, notifiers [
log.Printf("error with upstream request to: %s, %s\n", requestURL, err.Error())
}

defer func() {
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
}
}()
// defer func() {
for _, notifier := range notifiers {
notifier.Notify(r.Method, requestURL, originalURL, statusCode, seconds)
}
// }()

}
}
@@ -6,17 +6,15 @@ package plugin
import (
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"strconv"
"time"

"fmt"

"io/ioutil"

"github.com/openfaas/faas-provider/auth"
"github.com/openfaas/faas/gateway/requests"
"github.com/openfaas/faas/gateway/scaling"
@@ -62,6 +60,8 @@ type ScaleServiceRequest struct {

// GetReplicas replica count for function
func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQueryResponse, error) {
start := time.Now()

var err error
var emptyServiceQueryResponse scaling.ServiceQueryResponse

@@ -92,6 +92,7 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
log.Println(urlPath, err)
}
} else {
log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())
return emptyServiceQueryResponse, fmt.Errorf("server returned non-200 status code (%d) for function, %s", res.StatusCode, serviceName)
}
}
@@ -115,6 +116,8 @@ func (s ExternalServiceQuery) GetReplicas(serviceName string) (scaling.ServiceQu
}
}

log.Printf("GetReplicas took: %fs", time.Since(start).Seconds())

return scaling.ServiceQueryResponse{
Replicas: function.Replicas,
MaxReplicas: maxReplicas,
@@ -25,7 +25,7 @@ func (fm *FunctionMeta) Expired(expiry time.Duration) bool {
// FunctionCache is an expiring, in-memory cache of per-function metadata,
// keyed by function name. All access to Cache must hold Sync; an RWMutex is
// used so concurrent readers do not serialize on cache hits.
type FunctionCache struct {
	// Cache maps function name to its most recently fetched metadata.
	Cache map[string]*FunctionMeta
	// Expiry is how long an entry is considered fresh after its last refresh.
	Expiry time.Duration
	// Sync guards Cache for concurrent readers/writers.
	Sync sync.RWMutex
}

// Set replica count for functionName
@@ -37,23 +37,22 @@ func (fc *FunctionCache) Set(functionName string, serviceQueryResponse ServiceQu
fc.Cache[functionName] = &FunctionMeta{}
}

entry := fc.Cache[functionName]
entry.LastRefresh = time.Now()
entry.ServiceQueryResponse = serviceQueryResponse

fc.Cache[functionName].LastRefresh = time.Now()
fc.Cache[functionName].ServiceQueryResponse = serviceQueryResponse
// entry.LastRefresh = time.Now()
// entry.ServiceQueryResponse = serviceQueryResponse
}

// Get replica count for functionName
func (fc *FunctionCache) Get(functionName string) (ServiceQueryResponse, bool) {

fc.Sync.Lock()
defer fc.Sync.Unlock()

replicas := ServiceQueryResponse{
AvailableReplicas: 0,
}

hit := false
fc.Sync.RLock()
defer fc.Sync.RUnlock()

if val, exists := fc.Cache[functionName]; exists {
replicas = val.ServiceQueryResponse
hit = !val.Expired(fc.Expiry)
@@ -57,7 +57,7 @@ func main() {
exporter.StartServiceWatcher(*config.FunctionsProviderURL, metricsOptions, "func", servicePollInterval)
metrics.RegisterExporter(exporter)

reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout)
reverseProxy := types.NewHTTPClientReverseProxy(config.FunctionsProviderURL, config.UpstreamTimeout, config.MaxIdleConns, config.MaxIdleConnsPerHost)

loggingNotifier := handlers.LoggingNotifier{}
prometheusNotifier := handlers.PrometheusFunctionNotifier{
@@ -11,7 +11,7 @@ import (
)

// NewHTTPClientReverseProxy proxies to an upstream host through the use of a http.Client
func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPClientReverseProxy {
func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration, maxIdleConns, maxIdleConnsPerHost int) *HTTPClientReverseProxy {
h := HTTPClientReverseProxy{
BaseURL: baseURL,
Timeout: timeout,
@@ -23,6 +23,13 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli
return http.ErrUseLastResponse
}

// These overrides for the default client enable re-use of connections and prevent
// CoreDNS from rate limiting the gateway under high traffic
//
// See also two similar projects where this value was updated:
// https://github.com/prometheus/prometheus/pull/3592
// https://github.com/minio/minio/pull/5860

// Taken from http.DefaultTransport in Go 1.11
h.Client.Transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
@@ -31,8 +38,8 @@ func NewHTTPClientReverseProxy(baseURL *url.URL, timeout time.Duration) *HTTPCli
KeepAlive: timeout,
DualStack: true,
}).DialContext,
MaxIdleConns: 20000, // Overriden via https://github.com/errordeveloper/prometheus/commit/1f74477646aea93bebb7c098affa8e05132abb0c
MaxIdleConnsPerHost: 1024, // Overriden via https://github.com/minio/minio/pull/5860
MaxIdleConns: maxIdleConns,
MaxIdleConnsPerHost: maxIdleConnsPerHost,
IdleConnTimeout: 90 * time.Second,
TLSHandshakeTimeout: 10 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
@@ -114,6 +114,29 @@ func (ReadConfig) Read(hasEnv HasEnv) GatewayConfig {
cfg.SecretMountPath = secretPath
cfg.ScaleFromZero = parseBoolValue(hasEnv.Getenv("scale_from_zero"))

cfg.MaxIdleConns = 1024
cfg.MaxIdleConnsPerHost = 1024

maxIdleConns := hasEnv.Getenv("max_idle_conns")
if len(maxIdleConns) > 0 {
val, err := strconv.Atoi(maxIdleConns)
if err != nil {
log.Println("Invalid value for max_idle_conns")
} else {
cfg.MaxIdleConns = val
}
}

maxIdleConnsPerHost := hasEnv.Getenv("max_idle_conns_per_host")
if len(maxIdleConnsPerHost) > 0 {
val, err := strconv.Atoi(maxIdleConnsPerHost)
if err != nil {
log.Println("Invalid value for max_idle_conns_per_host")
} else {
cfg.MaxIdleConnsPerHost = val
}
}

return cfg
}

@@ -155,8 +178,13 @@ type GatewayConfig struct {

// SecretMountPath specifies where to read secrets from for embedded basic auth
SecretMountPath string

// Enable the gateway to scale any service from 0 replicas to its configured "min replicas"
ScaleFromZero bool

MaxIdleConns int

MaxIdleConnsPerHost int
}

// UseNATS Use NATS or not
@@ -4,6 +4,7 @@
package types

import (
"fmt"
"testing"
"time"
)
@@ -260,3 +261,41 @@ func TestRead_BasicAuth_SetTrue(t *testing.T) {
t.Fail()
}
}

// TestRead_MaxIdleConnsDefaults verifies that when neither max_idle_conns
// nor max_idle_conns_per_host is set in the environment, both settings
// fall back to their documented default of 1024.
func TestRead_MaxIdleConnsDefaults(t *testing.T) {
	defaults := NewEnvBucket()

	readConfig := ReadConfig{}

	config := readConfig.Read(defaults)

	// t.Errorf marks the test failed and logs in one idiomatic call,
	// replacing the previous t.Logf + t.Fail pair.
	if config.MaxIdleConns != 1024 {
		t.Errorf("config.MaxIdleConns, want: %d, got: %d", 1024, config.MaxIdleConns)
	}

	if config.MaxIdleConnsPerHost != 1024 {
		t.Errorf("config.MaxIdleConnsPerHost, want: %d, got: %d", 1024, config.MaxIdleConnsPerHost)
	}
}

// TestRead_MaxIdleConns_Override verifies that the max_idle_conns and
// max_idle_conns_per_host env-vars override the 1024 defaults when set
// to valid integer values.
func TestRead_MaxIdleConns_Override(t *testing.T) {
	defaults := NewEnvBucket()

	readConfig := ReadConfig{}
	defaults.Setenv("max_idle_conns", fmt.Sprintf("%d", 100))
	defaults.Setenv("max_idle_conns_per_host", fmt.Sprintf("%d", 2))

	config := readConfig.Read(defaults)

	// t.Errorf marks the test failed and logs in one idiomatic call,
	// replacing the previous t.Logf + t.Fail pair.
	if config.MaxIdleConns != 100 {
		t.Errorf("config.MaxIdleConns, want: %d, got: %d", 100, config.MaxIdleConns)
	}

	if config.MaxIdleConnsPerHost != 2 {
		t.Errorf("config.MaxIdleConnsPerHost, want: %d, got: %d", 2, config.MaxIdleConnsPerHost)
	}
}

0 comments on commit 299e5a5

Please sign in to comment.