diff --git a/apis/voyager/v1beta1/annotations.go b/apis/voyager/v1beta1/annotations.go index e508826c6..8b4e0c7c3 100644 --- a/apis/voyager/v1beta1/annotations.go +++ b/apis/voyager/v1beta1/annotations.go @@ -205,6 +205,7 @@ const ( HSTSIncludeSubDomains = IngressKey + "/hsts-include-subdomains" WhitelistSourceRange = IngressKey + "/whitelist-source-range" + MaxConnections = IngressKey + "/max-connections" ) const ( @@ -308,6 +309,11 @@ func (r Ingress) WhitelistSourceRange() string { return GetString(r.Annotations, WhitelistSourceRange) } +func (r Ingress) MaxConnections() int { + v, _ := GetInt(r.Annotations, MaxConnections) + return v +} + func (r Ingress) ProxyBodySize() string { return GetString(r.Annotations, ProxyBodySize) } diff --git a/hack/docker/voyager/templates/default-backend.cfg b/hack/docker/voyager/templates/default-backend.cfg index e8c6c7009..a32e7d152 100644 --- a/hack/docker/voyager/templates/default-backend.cfg +++ b/hack/docker/voyager/templates/default-backend.cfg @@ -23,6 +23,6 @@ backend {{ .DefaultBackend.Name }} http-request redirect location http://{{$e.ExternalName}}:{{ $e.Port }} code 301 unless https {{ end -}} {{- else }} - server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.Weight }} weight {{ $e.Weight }}{{ end -}} {{ if $.DefaultBackend.Sticky }} cookie {{ $e.Name }}{{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} + server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.MaxConnections }} maxconn {{ $e.MaxConnections }} {{ end -}} {{ if $e.Weight }} weight {{ $e.Weight }}{{ end -}} {{ if $.DefaultBackend.Sticky }} cookie {{ $e.Name }}{{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} {{ end -}} {{ end -}} diff --git a/hack/docker/voyager/templates/global.cfg b/hack/docker/voyager/templates/global.cfg index f577aca0a..85e442f84 100644 --- a/hack/docker/voyager/templates/global.cfg +++ b/hack/docker/voyager/templates/global.cfg @@ -3,7 +3,7 @@ global stats socket /tmp/haproxy server-state-file global server-state-base /var/state/haproxy/ - maxconn 4000 + {{ if .MaxConnections }}maxconn {{ .MaxConnections }}{{ end }} # log using a syslog socket log /dev/log local0 info log /dev/log local0 notice diff --git a/hack/docker/voyager/templates/http-backend.cfg b/hack/docker/voyager/templates/http-backend.cfg index 83cf765de..0f5f35ad2 100644 --- a/hack/docker/voyager/templates/http-backend.cfg +++ b/hack/docker/voyager/templates/http-backend.cfg @@ -22,7 +22,7 @@ backend {{ $path.Backend.Name }} http-request redirect location {{ if $.OffloadSSL }}https://{{ else }}http://{{ end }}{{$e.ExternalName}}:{{ $e.Port }} code 301 {{- end }} {{- else }} - server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.Weight }} weight {{ $e.Weight }} {{ end -}} {{ if $path.Backend.Sticky }} cookie {{ backend_hash $e.Name $index $path.Backend.StickyCookieHash }} {{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} + server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.MaxConnections }} maxconn {{ $e.MaxConnections }} {{ end -}} {{ if $e.Weight }} weight {{ $e.Weight }} {{ end -}} {{ if $path.Backend.Sticky }} cookie {{ backend_hash $e.Name $index $path.Backend.StickyCookieHash }} {{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} {{ end -}} {{ end }} {{ end -}} diff --git a/hack/docker/voyager/templates/tcp-backend.cfg b/hack/docker/voyager/templates/tcp-backend.cfg index ebcd9a2d2..4a3266962 100644 --- a/hack/docker/voyager/templates/tcp-backend.cfg +++ b/hack/docker/voyager/templates/tcp-backend.cfg @@ -14,6 +14,6 @@ backend {{ .Backend.Name }} {{- if $e.ExternalName }} server {{ $e.Name }} {{ $e.ExternalName }}:{{ $e.Port -}} {{ if $e.DNSResolver }} {{ if $e.CheckHealth }} check{{ end }} resolvers {{ $e.DNSResolver }} resolve-prefer ipv4{{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} {{- else }} - server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.Weight }} weight {{ $e.Weight }}{{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} + server {{ $e.Name }} {{ $e.IP }}:{{ $e.Port -}} {{ if $e.MaxConnections }} maxconn {{ $e.MaxConnections }} {{ end -}} {{ if $e.Weight }} weight {{ $e.Weight }}{{ end -}} {{ if $e.TLSOption }} {{ $e.TLSOption }} {{ end -}} {{ end -}} {{ end -}} diff --git a/pkg/haproxy/template_test.go b/pkg/haproxy/template_test.go index 6de80cb34..bb1522f83 100644 --- a/pkg/haproxy/template_test.go +++ b/pkg/haproxy/template_test.go @@ -57,6 +57,7 @@ func TestTemplate(t *testing.T) { {Name: "first", IP: "10.244.2.2", Port: "2324"}, }, }, + MaxConnections: 3000, } testParsedConfig := TemplateData{ SharedInfo: si, @@ -255,6 +256,22 @@ func TestTemplate(t *testing.T) { FrontendName: "with-whitelist-http", OffloadSSL: true, }, + { + SharedInfo: si, + FrontendName: "http-with-backend-maxconn", + Port: 80, + Paths: []*HTTPPath{ + { + Backend: Backend{ + Name: "backend-maxconn", + Endpoints: []*Endpoint{ + {Name: "first", IP: "10.244.2.1", Port: "2323", MaxConnections: 20, Weight: 2}, + {Name: "second", IP: "10.244.2.2", Port: "2323", Weight: 5}, + }, + }, + }, + }, + }, }, TCPService: []*TCPService{ { diff --git a/pkg/haproxy/types.go b/pkg/haproxy/types.go index 326293eb8..bcae7aa16 100644 --- a/pkg/haproxy/types.go +++ b/pkg/haproxy/types.go @@ -29,6 +29,7 @@ type SharedInfo struct { HSTSPreload bool HSTSIncludeSubDomains bool WhitelistSourceRange string + MaxConnections int } type StatsInfo struct { @@ -106,6 +107,7 @@ type Endpoint struct { IP string Port string Weight int + MaxConnections int ExternalName string UseDNSResolver bool DNSResolver string diff --git a/pkg/ingress/parser.go b/pkg/ingress/parser.go index 3e051e782..86b482647 100644 --- a/pkg/ingress/parser.go +++ b/pkg/ingress/parser.go @@ -133,6 +133,9 @@ func (c *controller) getEndpoints(s *apiv1.Service, servicePort *apiv1.ServicePo if val, ok := pod.Annotations[api.BackendWeight]; ok { ep.Weight, _ = strconv.Atoi(val) } + if val, ok := pod.Annotations[api.MaxConnections]; ok { + ep.MaxConnections, _ = strconv.Atoi(val) + } } } } @@ -215,6 +218,7 @@ func (c *controller) generateConfig() error { HSTSPreload: c.Ingress.HSTSPreload(), HSTSIncludeSubDomains: c.Ingress.HSTSIncludeSubDomains(), WhitelistSourceRange: c.Ingress.WhitelistSourceRange(), + MaxConnections: c.Ingress.MaxConnections(), } if c.Opt.CloudProvider == "aws" && c.Ingress.LBType() == api.LBTypeLoadBalancer { si.AcceptProxy = c.Ingress.KeepSourceIP() diff --git a/test/e2e/ingress_ops.go b/test/e2e/ingress_ops.go index af7a61c76..f95270644 100644 --- a/test/e2e/ingress_ops.go +++ b/test/e2e/ingress_ops.go @@ -645,4 +645,220 @@ var _ = Describe("IngressOperations", func() { // TODO @ dipta: how to test if whitelist is actually working? }) }) + + Describe("With Global MaxConnections (1) Specified", func() { + BeforeEach(func() { + ing.Annotations[api.MaxConnections] = "1" + ing.Annotations[api.DefaultsTimeOut] = `{"connect": "300s", "server": "300s"}` + }) + + It("Should Allow 1 Connection Concurrently", func() { + By("Getting HTTP endpoints") + + eps, err := f.Ingress.GetHTTPEndpoints(ing) + Expect(err).NotTo(HaveOccurred()) + Expect(len(eps)).Should(BeNumerically(">=", 1)) + + errChan := make(chan error) + go func() { + // request-1: take 30s to response + errChan <- f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 300, "", ing, eps, "GET", + "/testpath/ok?delay=30s", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + }() + + time.Sleep(time.Second * 5) // to ensure request-1 always hits server before request-2 + + // request-2: responses instantaneously + err = f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 5, "", ing, eps, "GET", + "/testpath/ok", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + + // request-1 should block request-2 since maxconn = 1 + // request-2 should be timeout (sleep: 5s + client-timeout: 5s < request-1: 30s) + Expect(err).To(HaveOccurred()) + Expect(<-errChan).NotTo(HaveOccurred()) // check request-1 + + }) + }) + + Describe("With Global MaxConnections (2) Specified", func() { + BeforeEach(func() { + ing.Annotations[api.MaxConnections] = "2" + ing.Annotations[api.DefaultsTimeOut] = `{"connect": "300s", "server": "300s"}` + }) + + It("Should Allow 2 Connections Concurrently", func() { + By("Getting HTTP endpoints") + + eps, err := f.Ingress.GetHTTPEndpoints(ing) + Expect(err).NotTo(HaveOccurred()) + Expect(len(eps)).Should(BeNumerically(">=", 1)) + + errChan := make(chan error) + go func() { + // request-1: take 30s to response + errChan <- f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 300, "", ing, eps, "GET", + "/testpath/ok?delay=30s", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + }() + + time.Sleep(time.Second * 5) // to ensure request-1 always hits server before request-2 + + // request-2: responses instantaneously + err = f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 5, "", ing, eps, "GET", + "/testpath/ok", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + + Expect(err).NotTo(HaveOccurred()) // request-1 should not block request-2 since maxconn = 2 + Expect(<-errChan).NotTo(HaveOccurred()) // check request-1 + + }) + }) + + Describe("With Pod MaxConnections (1) Specified", func() { + BeforeEach(func() { + meta, err := f.Ingress.CreateResourceWithBackendMaxConn(1) + Expect(err).NotTo(HaveOccurred()) + + ing.Spec.Rules = []api.IngressRule{ + { + IngressRuleValue: api.IngressRuleValue{ + HTTP: &api.HTTPIngressRuleValue{ + Paths: []api.HTTPIngressPath{ + { + Path: "/testpath", + Backend: api.HTTPIngressBackend{ + IngressBackend: api.IngressBackend{ + ServiceName: meta.Name, + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + } + + ing.Annotations[api.DefaultsTimeOut] = `{"connect": "300s", "server": "300s"}` + }) + + It("Should Allow 1 Connection Concurrently", func() { + By("Getting HTTP endpoints") + + eps, err := f.Ingress.GetHTTPEndpoints(ing) + Expect(err).NotTo(HaveOccurred()) + Expect(len(eps)).Should(BeNumerically(">=", 1)) + + errChan := make(chan error) + go func() { + // request-1: take 30s to response + errChan <- f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 300, "", ing, eps, "GET", + "/testpath/ok?delay=30s", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + }() + + time.Sleep(time.Second * 5) // to ensure request-1 always hits server before request-2 + + // request-2: responses instantaneously + err = f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 5, "", ing, eps, "GET", + "/testpath/ok", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + + // request-1 should block request-2 since maxconn = 1 + // request-2 should be timeout (sleep: 5s + client-timeout: 5s < request-1: 30s) + Expect(err).To(HaveOccurred()) + Expect(<-errChan).NotTo(HaveOccurred()) // check request-1 + + }) + }) + + Describe("With Pod MaxConnections (2) Specified", func() { + BeforeEach(func() { + meta, err := f.Ingress.CreateResourceWithBackendMaxConn(2) + Expect(err).NotTo(HaveOccurred()) + + ing.Spec.Rules = []api.IngressRule{ + { + IngressRuleValue: api.IngressRuleValue{ + HTTP: &api.HTTPIngressRuleValue{ + Paths: []api.HTTPIngressPath{ + { + Path: "/testpath", + Backend: api.HTTPIngressBackend{ + IngressBackend: api.IngressBackend{ + ServiceName: meta.Name, + ServicePort: intstr.FromInt(80), + }, + }, + }, + }, + }, + }, + }, + } + + ing.Annotations[api.DefaultsTimeOut] = `{"connect": "300s", "server": "300s"}` + }) + + It("Should Allow 2 Connections Concurrently", func() { + By("Getting HTTP endpoints") + + eps, err := f.Ingress.GetHTTPEndpoints(ing) + Expect(err).NotTo(HaveOccurred()) + Expect(len(eps)).Should(BeNumerically(">=", 1)) + + errChan := make(chan error) + go func() { + // request-1: take 30s to response + errChan <- f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 300, "", ing, eps, "GET", + "/testpath/ok?delay=30s", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + }() + + time.Sleep(time.Second * 5) // to ensure request-1 always hits server before request-2 + + // request-2: responses instantaneously + err = f.Ingress.DoHTTPWithTimeout(framework.NoRetry, 5, "", ing, eps, "GET", + "/testpath/ok", + func(r *testserverclient.Response) bool { + return Expect(r.Status).Should(Equal(http.StatusOK)) && + Expect(r.Method).Should(Equal("GET")) && + Expect(r.Path).Should(Equal("/testpath/ok")) + }) + + Expect(err).NotTo(HaveOccurred()) // request-1 should not block request-2 since maxconn = 2 + Expect(<-errChan).NotTo(HaveOccurred()) // check request-1 + + }) + }) }) diff --git a/test/framework/ingress_suite.go b/test/framework/ingress_suite.go index df73f6ae0..9bd1e40e0 100644 --- a/test/framework/ingress_suite.go +++ b/test/framework/ingress_suite.go @@ -18,7 +18,7 @@ import ( ) const ( - testServerImage = "appscode/test-server:2.0" + testServerImage = "appscode/test-server:2.2" ) var ( @@ -196,6 +196,21 @@ func (i *ingressInvocation) DoHTTP(retryCount int, host string, ing *api_v1beta1 return nil } +func (i *ingressInvocation) DoHTTPWithTimeout(retryCount int, timeout int, host string, ing *api_v1beta1.Ingress, eps []string, method, path string, matcher func(resp *testserverclient.Response) bool) error { + for _, url := range eps { + resp, err := testserverclient.NewTestHTTPClientWithTimeout(url, timeout).WithHost(host).Method(method).Path(path).DoWithRetry(retryCount) + if err != nil { + return err + } + + log.Infoln("HTTP Response received from server", *resp) + if !matcher(resp) { + return errors.New("Failed to match") + } + } + return nil +} + func (i *ingressInvocation) DoHTTPWithHeader(retryCount int, ing *api_v1beta1.Ingress, eps []string, method, path string, h map[string]string, matcher func(resp *testserverclient.Response) bool) error { for _, url := range eps { resp, err := testserverclient.NewTestHTTPClient(url).Method(method).Header(h).Path(path).DoWithRetry(retryCount) diff --git a/test/framework/ingress_util.go b/test/framework/ingress_util.go index 0c917bf83..f1573607c 100644 --- a/test/framework/ingress_util.go +++ b/test/framework/ingress_util.go @@ -6,6 +6,7 @@ import ( "fmt" "net/url" "os/exec" + "strconv" "strings" "text/template" "time" @@ -886,3 +887,85 @@ func (i *ingressInvocation) DeleteResourceWithBackendWeight(meta metav1.ObjectMe OrphanDependents: &orphan, }) } + +func (i *ingressInvocation) CreateResourceWithBackendMaxConn(maxconn int) (metav1.ObjectMeta, error) { + meta := metav1.ObjectMeta{ + Name: i.UniqueName(), + Namespace: i.Namespace(), + } + _, err := i.KubeClient.CoreV1().Services(i.Namespace()).Create(&apiv1.Service{ + ObjectMeta: meta, + Spec: apiv1.ServiceSpec{ + Ports: []apiv1.ServicePort{ + { + Name: "http-1", + Port: 80, + TargetPort: intstr.FromInt(8080), + Protocol: "TCP", + }, + }, + Selector: map[string]string{ + "app": "deployment", + }, + }, + }) + if err != nil { + return meta, err + } + + _, err = i.KubeClient.ExtensionsV1beta1().Deployments(i.Namespace()).Create(&extensions.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "dep-1-" + meta.Name, + Namespace: meta.Namespace, + }, + Spec: extensions.DeploymentSpec{ + Replicas: types.Int32P(1), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + "app": "deployment", + "app-version": "v1", + }, + }, + Template: apiv1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": "deployment", + "app-version": "v1", + }, + Annotations: map[string]string{ + api_v1beta1.MaxConnections: strconv.Itoa(maxconn), + }, + }, + Spec: apiv1.PodSpec{ + Containers: []apiv1.Container{ + { + Name: "server", + Image: "appscode/test-server:2.2", + Env: []apiv1.EnvVar{ + { + Name: "POD_NAME", + ValueFrom: &apiv1.EnvVarSource{ + FieldRef: &apiv1.ObjectFieldSelector{ + FieldPath: "metadata.name", + }, + }, + }, + }, + Ports: []apiv1.ContainerPort{ + { + Name: "http-1", + ContainerPort: 8080, + }, + }, + }, + }, + }, + }, + }, + }) + if err != nil { + return meta, err + } + + return meta, nil +} diff --git a/test/test-server/server.go b/test/test-server/server.go index 7e2a8e460..feec5276a 100644 --- a/test/test-server/server.go +++ b/test/test-server/server.go @@ -9,6 +9,7 @@ import ( "os" "os/signal" "syscall" + "time" ) type Response struct { @@ -27,6 +28,10 @@ type HttpServerHandler struct { } func (h HttpServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if delay, err := time.ParseDuration(r.URL.Query().Get("delay")); err == nil { + time.Sleep(delay) + } + resp := &Response{ Type: "http", PodName: os.Getenv("POD_NAME"), @@ -50,6 +55,10 @@ type HttpsServerHandler struct { } func (h HttpsServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if delay, err := time.ParseDuration(r.URL.Query().Get("delay")); err == nil { + time.Sleep(delay) + } + resp := &Response{ Type: "http", PodName: os.Getenv("POD_NAME"), diff --git a/test/test-server/setup.sh b/test/test-server/setup.sh index b5921d08c..368f67ffb 100755 --- a/test/test-server/setup.sh +++ b/test/test-server/setup.sh @@ -1,6 +1,6 @@ #!/usr/bin/env bash -VERSION=2.0 +VERSION=2.2 build() { rm -rf dist/* diff --git a/test/test-server/testserverclient/http.go b/test/test-server/testserverclient/http.go index c77cfafd0..ca84cabbd 100644 --- a/test/test-server/testserverclient/http.go +++ b/test/test-server/testserverclient/http.go @@ -47,6 +47,14 @@ func NewTestHTTPClient(url string) *httpClient { } } +func NewTestHTTPClientWithTimeout(url string, timeout int) *httpClient { + url = strings.TrimSuffix(url, "/") + return &httpClient{ + client: &http.Client{Timeout: time.Second * time.Duration(timeout)}, + baseURL: url, + } +} + func (t *httpClient) WithCert(cert string) *httpClient { caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM([]byte(cert))