From e0dece48f785fd5dd4c93415fa3894f9febc04b4 Mon Sep 17 00:00:00 2001 From: Elvin Efendi Date: Thu, 24 Dec 2020 11:39:12 -0500 Subject: [PATCH] Add Global Rate Limiting support --- build/test-lua.sh | 1 + .../nginx-configuration/annotations.md | 46 +++- .../nginx-configuration/configmap.md | 22 ++ internal/ingress/annotations/annotations.go | 3 + .../annotations/globalratelimit/main.go | 111 ++++++++ .../annotations/globalratelimit/main_test.go | 179 ++++++++++++ internal/ingress/controller/config/config.go | 88 ++++-- internal/ingress/controller/controller.go | 15 +- .../ingress/controller/template/configmap.go | 1 + .../ingress/controller/template/template.go | 74 +++++ .../controller/template/template_test.go | 83 ++++++ internal/ingress/types.go | 5 + internal/ingress/types_equals.go | 3 + rootfs/etc/nginx/lua/global_throttle.lua | 131 +++++++++ rootfs/etc/nginx/lua/lua_ingress.lua | 3 + .../nginx/lua/test/global_throttle_test.lua | 258 ++++++++++++++++++ rootfs/etc/nginx/template/nginx.tmpl | 1 + test/e2e/annotations/globalratelimit.go | 83 ++++++ test/e2e/framework/framework.go | 12 +- test/e2e/framework/k8s.go | 2 +- test/e2e/settings/globalratelimit.go | 96 +++++++ 21 files changed, 1179 insertions(+), 38 deletions(-) create mode 100644 internal/ingress/annotations/globalratelimit/main.go create mode 100644 internal/ingress/annotations/globalratelimit/main_test.go create mode 100644 rootfs/etc/nginx/lua/global_throttle.lua create mode 100644 rootfs/etc/nginx/lua/test/global_throttle_test.lua create mode 100644 test/e2e/annotations/globalratelimit.go create mode 100644 test/e2e/settings/globalratelimit.go diff --git a/build/test-lua.sh b/build/test-lua.sh index c9b3ec53e57..3b00cf35afb 100755 --- a/build/test-lua.sh +++ b/build/test-lua.sh @@ -34,4 +34,5 @@ resty \ --shdict "balancer_ewma 1M" \ --shdict "balancer_ewma_last_touched_at 1M" \ --shdict "balancer_ewma_locks 512k" \ + --shdict "global_throttle_cache 5M" \ ./rootfs/etc/nginx/lua/test/run.lua ${BUSTED_ARGS} ./rootfs/etc/nginx/lua/test/ ./rootfs/etc/nginx/lua/plugins/**/test diff --git a/docs/user-guide/nginx-configuration/annotations.md b/docs/user-guide/nginx-configuration/annotations.md index 37e1f87616d..bb4e7635670 100755 --- a/docs/user-guide/nginx-configuration/annotations.md +++ b/docs/user-guide/nginx-configuration/annotations.md @@ -56,6 +56,10 @@ You can add these Kubernetes annotations to specific Ingress objects to customiz |[nginx.ingress.kubernetes.io/http2-push-preload](#http2-push-preload)|"true" or "false"| |[nginx.ingress.kubernetes.io/limit-connections](#rate-limiting)|number| |[nginx.ingress.kubernetes.io/limit-rps](#rate-limiting)|number| +|[nginx.ingress.kubernetes.io/global-rate-limit](#global-rate-limiting)|number| +|[nginx.ingress.kubernetes.io/global-rate-limit-window](#global-rate-limiting)|duration| +|[nginx.ingress.kubernetes.io/global-rate-limit-key](#global-rate-limiting)|string| +|[nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs](#global-rate-limiting)|string| |[nginx.ingress.kubernetes.io/permanent-redirect](#permanent-redirect)|string| |[nginx.ingress.kubernetes.io/permanent-redirect-code](#permanent-redirect-code)|number| |[nginx.ingress.kubernetes.io/temporal-redirect](#temporal-redirect)|string| @@ -474,7 +478,7 @@ By default the controller redirects all requests to an existing service that pro !!! note For more information please see [global-auth-url](./configmap.md#global-auth-url). -### Rate limiting +### Rate Limiting These annotations define limits on connections and transmission rates. These can be used to mitigate [DDoS Attacks](https://www.nginx.com/blog/mitigating-ddos-attacks-with-nginx-and-nginx-plus). @@ -492,6 +496,46 @@ To configure settings globally for all Ingress rules, the `limit-rate-after` and The client IP address will be set based on the use of [PROXY protocol](./configmap.md#use-proxy-protocol) or from the `X-Forwarded-For` header value when [use-forwarded-headers](./configmap.md#use-forwarded-headers) is enabled. +### Global Rate Limiting + +**Note:** Be careful when configuring both (Local) Rate Limiting and Global Rate Limiting at the same time. +They are two completely different rate limiting implementations. Whichever limit exceeds first will reject the +requests. It might be a good idea to configure both of them to ease load on Global Rate Limiting backend +in cases of spike in traffic. + +The stock NGINX rate limiting does not share its counters among different NGINX instances. +Given that most ingress-nginx deployments are elastic and number of replicas can change any day +it is impossible to configure a proper rate limit using stock NGINX functionalities. +Global Rate Limiting overcome this by using [lua-resty-global-throttle](https://github.com/ElvinEfendi/lua-resty-global-throttle). `lua-resty-global-throttle` shares its counters via a central store such as `memcached`. +The obvious shortcoming of this is users have to deploy and operate a `memcached` instance +in order to benefit from this functionality. Configure the `memcached` +using [these configmap settings](./configmap.md#memcached). + +**Here are a few remarks for ingress-nginx integration of `lua-resty-global-throttle`:** + +1. We minimize `memcached` access by caching exceeding limit decisions. The expiry of +cache entry is the desired delay `lua-resty-global-throttle` calculates for us. +The Lua Shared Dictionary used for that is `global_throttle_cache`. Currently its size defaults to 10M. +Customize it as per your needs using [lua-shared-dicts](./configmap.md#lua-shared-dicts). +When we fail to cache the exceeding limit decision then we log an NGINX error. You can monitor +for that error to decide if you need to bump the cache size. Without cache the cost of processing a +request is two memcached commands: `GET`, and `INCR`. With the cache it is only `INCR`. +1. Log NGINX variable `$global_rate_limit_exceeding`'s value to have some visibility into +what portion of requests are rejected (value `y`), whether they are rejected using cached decision (value `c`), +or if they are not rejeced (default value `n`). You can use [log-format-upstream](./configmap.md#log-format-upstream) +to include that in access logs. +1. In case of an error it will log the error message and **fail open**. +1. The annotations below creates Global Rate Limiting instance per ingress. +That means if there are multuple paths configured under the same ingress, +the Global Rate Limiting will count requests to all the paths under the same counter. +Extract a path out into its own ingres if you need to isolate a certain path. + + +* `nginx.ingress.kubernetes.io/global-rate-limit`: Configures maximum allowed number of requests per window. Required. +* `nginx.ingress.kubernetes.io/global-rate-limit-window`: Configures a time window (i.e `1m`) that the limit is applied. Required. +* `nginx.ingress.kubernetes.io/global-rate-limit-key`: Configures a key for counting the samples. Defaults to `$remote_addr`. You can also combine multiple NGINX variables here, like `${remote_addr}-${http_x_api_client}` which would mean the limit will be applied to requests coming from the same API client (indicated by `X-API-Client` HTTP request header) with the same source IP address. +* `nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs`: comma separated list of IPs and CIDRs to match client IP against. When there's a match request is not considered for rate limiting. + ### Permanent Redirect This annotation allows to return a permanent redirect (Return Code 301) instead of sending data to the upstream. For example `nginx.ingress.kubernetes.io/permanent-redirect: https://www.google.com` would redirect everything to Google. diff --git a/docs/user-guide/nginx-configuration/configmap.md b/docs/user-guide/nginx-configuration/configmap.md index 82f318874f6..0bead1c34cf 100755 --- a/docs/user-guide/nginx-configuration/configmap.md +++ b/docs/user-guide/nginx-configuration/configmap.md @@ -192,6 +192,12 @@ The following table shows a configuration option's name, type, and the default v |[block-referers](#block-referers)|[]string|""| |[proxy-ssl-location-only](#proxy-ssl-location-only)|bool|"false"| |[default-type](#default-type)|string|"text/html"| +|[global-rate-limit-memcached-host](#global-rate-limit)|string|""| +|[global-rate-limit-memcached-port](#global-rate-limit)|int|11211| +|[global-rate-limit-memcached-connect-timeout](#global-rate-limit)|int|50| +|[global-rate-limit-memcached-max-idle-timeout](#global-rate-limit)|int|10000| +|[global-rate-limit-memcached-pool-size](#global-rate-limit)|int|50| +|[global-rate-limit-status-code](#global-rate-limit)|int|429| ## add-headers @@ -1152,3 +1158,19 @@ _**default:**_ text/html _References:_ [http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type](http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type) + +## global-rate-limit + +* `global-rate-limit-status-code`: configure HTTP status code to return when rejecting requests. Defaults to 429. + +Configure `memcached` client for [Global Rate Limiting](https://github.com/kubernetes/ingress-nginx/blob/master/docs/user-guide/nginx-configuration/annotations.md#global-rate-limiting). + +* `global-rate-limit-memcached-host`: IP/FQDN of memcached server to use. Required to enable Global Rate Limiting. +* `global-rate-limit-memcached-port`: port of memcached server to use. Defaults default memcached port of `11211`. +* `global-rate-limit-memcached-connect-timeout`: configure timeout for connect, send and receive operations. Unit is millisecond. Defaults to 50ms. +* `global-rate-limit-memcached-max-idle-timeout`: configure timeout for cleaning idle connections. Unit is millisecond. Defaults to 50ms. +* `global-rate-limit-memcached-pool-size`: configure number of max connections to keep alive. Make sure your `memcached` server can handle +`global-rate-limit-memcached-pool-size * worker-processes * ` simultaneous connections. + +These settings get used by [lua-resty-global-throttle](https://github.com/ElvinEfendi/lua-resty-global-throttle) +that ingress-nginx includes. Refer to the link to learn more about `lua-resty-global-throttle`. diff --git a/internal/ingress/annotations/annotations.go b/internal/ingress/annotations/annotations.go index 0e9b7f214ea..2c3b3c1cf24 100644 --- a/internal/ingress/annotations/annotations.go +++ b/internal/ingress/annotations/annotations.go @@ -40,6 +40,7 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/customhttperrors" "k8s.io/ingress-nginx/internal/ingress/annotations/defaultbackend" "k8s.io/ingress-nginx/internal/ingress/annotations/fastcgi" + "k8s.io/ingress-nginx/internal/ingress/annotations/globalratelimit" "k8s.io/ingress-nginx/internal/ingress/annotations/http2pushpreload" "k8s.io/ingress-nginx/internal/ingress/annotations/influxdb" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" @@ -94,6 +95,7 @@ type Ingress struct { Proxy proxy.Config ProxySSL proxyssl.Config RateLimit ratelimit.Config + GlobalRateLimit globalratelimit.Config Redirect redirect.Config Rewrite rewrite.Config Satisfy string @@ -142,6 +144,7 @@ func NewAnnotationExtractor(cfg resolver.Resolver) Extractor { "Proxy": proxy.NewParser(cfg), "ProxySSL": proxyssl.NewParser(cfg), "RateLimit": ratelimit.NewParser(cfg), + "GlobalRateLimit": globalratelimit.NewParser(cfg), "Redirect": redirect.NewParser(cfg), "Rewrite": rewrite.NewParser(cfg), "Satisfy": satisfy.NewParser(cfg), diff --git a/internal/ingress/annotations/globalratelimit/main.go b/internal/ingress/annotations/globalratelimit/main.go new file mode 100644 index 00000000000..e4b18bd662c --- /dev/null +++ b/internal/ingress/annotations/globalratelimit/main.go @@ -0,0 +1,111 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package globalratelimit + +import ( + "strings" + "time" + + "github.com/pkg/errors" + networking "k8s.io/api/networking/v1beta1" + + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + ing_errors "k8s.io/ingress-nginx/internal/ingress/errors" + "k8s.io/ingress-nginx/internal/ingress/resolver" + "k8s.io/ingress-nginx/internal/net" + "k8s.io/ingress-nginx/internal/sets" +) + +const defaultKey = "$remote_addr" + +// Config encapsulates all global rate limit attributes +type Config struct { + Namespace string `json:"namespace"` + Limit int `json:"limit"` + WindowSize int `json:"window-size"` + Key string `json:"key"` + IgnoredCIDRs []string `json:"ignored-cidrs"` +} + +// Equal tests for equality between two Config types +func (l *Config) Equal(r *Config) bool { + if l.Namespace != r.Namespace { + return false + } + if l.Limit != r.Limit { + return false + } + if l.WindowSize != r.WindowSize { + return false + } + if l.Key != r.Key { + return false + } + if len(l.IgnoredCIDRs) != len(r.IgnoredCIDRs) || !sets.StringElementsMatch(l.IgnoredCIDRs, r.IgnoredCIDRs) { + return false + } + + return true +} + +type globalratelimit struct { + r resolver.Resolver +} + +// NewParser creates a new globalratelimit annotation parser +func NewParser(r resolver.Resolver) parser.IngressAnnotation { + return globalratelimit{r} +} + +// Parse extracts globalratelimit annotations from the given ingress +// and returns them structured as Config type +func (a globalratelimit) Parse(ing *networking.Ingress) (interface{}, error) { + config := &Config{} + + limit, _ := parser.GetIntAnnotation("global-rate-limit", ing) + rawWindowSize, _ := parser.GetStringAnnotation("global-rate-limit-window", ing) + + if limit == 0 || len(rawWindowSize) == 0 { + return config, nil + } + + windowSize, err := time.ParseDuration(rawWindowSize) + if err != nil { + return config, ing_errors.LocationDenied{ + Reason: errors.Wrap(err, "failed to parse 'global-rate-limit-window' value"), + } + } + + key, _ := parser.GetStringAnnotation("global-rate-limit-key", ing) + if len(key) == 0 { + key = defaultKey + } + + rawIgnoredCIDRs, _ := parser.GetStringAnnotation("global-rate-limit-ignored-cidrs", ing) + ignoredCIDRs, err := net.ParseCIDRs(rawIgnoredCIDRs) + if err != nil { + return nil, err + } + + config.Namespace = strings.Replace(string(ing.UID), "-", "", -1) + config.Limit = limit + config.WindowSize = int(windowSize.Seconds()) + config.Key = key + config.IgnoredCIDRs = ignoredCIDRs + + return config, nil +} diff --git a/internal/ingress/annotations/globalratelimit/main_test.go b/internal/ingress/annotations/globalratelimit/main_test.go new file mode 100644 index 00000000000..38da8f4a9ca --- /dev/null +++ b/internal/ingress/annotations/globalratelimit/main_test.go @@ -0,0 +1,179 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package globalratelimit + +import ( + "encoding/json" + "fmt" + "testing" + + "github.com/pkg/errors" + api "k8s.io/api/core/v1" + networking "k8s.io/api/networking/v1beta1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/ingress-nginx/internal/ingress/annotations/parser" + ing_errors "k8s.io/ingress-nginx/internal/ingress/errors" + "k8s.io/ingress-nginx/internal/ingress/resolver" +) + +const UID = "31285d47-b150-4dcf-bd6f-12c46d769f6e" +const expectedUID = "31285d47b1504dcfbd6f12c46d769f6e" + +func buildIngress() *networking.Ingress { + defaultBackend := networking.IngressBackend{ + ServiceName: "default-backend", + ServicePort: intstr.FromInt(80), + } + + return &networking.Ingress{ + ObjectMeta: meta_v1.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + UID: UID, + }, + Spec: networking.IngressSpec{ + Backend: &networking.IngressBackend{ + ServiceName: "default-backend", + ServicePort: intstr.FromInt(80), + }, + Rules: []networking.IngressRule{ + { + Host: "foo.bar.com", + IngressRuleValue: networking.IngressRuleValue{ + HTTP: &networking.HTTPIngressRuleValue{ + Paths: []networking.HTTPIngressPath{ + { + Path: "/foo", + Backend: defaultBackend, + }, + }, + }, + }, + }, + }, + }, + } +} + +type mockBackend struct { + resolver.Mock +} + +func TestGlobalRateLimiting(t *testing.T) { + ing := buildIngress() + + annRateLimit := parser.GetAnnotationWithPrefix("global-rate-limit") + annRateLimitWindow := parser.GetAnnotationWithPrefix("global-rate-limit-window") + annRateLimitKey := parser.GetAnnotationWithPrefix("global-rate-limit-key") + annRateLimitIgnoredCIDRs := parser.GetAnnotationWithPrefix("global-rate-limit-ignored-cidrs") + + testCases := []struct { + title string + annotations map[string]string + expectedConfig *Config + expectedErr error + }{ + { + "no annotation", + nil, + &Config{}, + nil, + }, + { + "minimum required annotations", + map[string]string{ + annRateLimit: "100", + annRateLimitWindow: "2m", + }, + &Config{ + Namespace: expectedUID, + Limit: 100, + WindowSize: 120, + Key: "$remote_addr", + IgnoredCIDRs: make([]string, 0), + }, + nil, + }, + { + "global-rate-limit-key annotation", + map[string]string{ + annRateLimit: "100", + annRateLimitWindow: "2m", + annRateLimitKey: "$http_x_api_user", + }, + &Config{ + Namespace: expectedUID, + Limit: 100, + WindowSize: 120, + Key: "$http_x_api_user", + IgnoredCIDRs: make([]string, 0), + }, + nil, + }, + { + "global-rate-limit-ignored-cidrs annotation", + map[string]string{ + annRateLimit: "100", + annRateLimitWindow: "2m", + annRateLimitKey: "$http_x_api_user", + annRateLimitIgnoredCIDRs: "127.0.0.1, 200.200.24.0/24", + }, + &Config{ + Namespace: expectedUID, + Limit: 100, + WindowSize: 120, + Key: "$http_x_api_user", + IgnoredCIDRs: []string{"127.0.0.1", "200.200.24.0/24"}, + }, + nil, + }, + { + "incorrect duration for window", + map[string]string{ + annRateLimit: "100", + annRateLimitWindow: "2mb", + annRateLimitKey: "$http_x_api_user", + }, + &Config{}, + ing_errors.LocationDenied{ + Reason: errors.Wrap(fmt.Errorf(`time: unknown unit "mb" in duration "2mb"`), + "failed to parse 'global-rate-limit-window' value"), + }, + }, + } + + for _, testCase := range testCases { + ing.SetAnnotations(testCase.annotations) + + i, actualErr := NewParser(mockBackend{}).Parse(ing) + if (testCase.expectedErr == nil || actualErr == nil) && testCase.expectedErr != actualErr { + t.Errorf("expected error 'nil' but got '%v'", actualErr) + } else if testCase.expectedErr != nil && actualErr != nil && + testCase.expectedErr.Error() != actualErr.Error() { + t.Errorf("expected error '%v' but got '%v'", testCase.expectedErr, actualErr) + } + + actualConfig := i.(*Config) + if !testCase.expectedConfig.Equal(actualConfig) { + expectedJSON, _ := json.Marshal(testCase.expectedConfig) + actualJSON, _ := json.Marshal(actualConfig) + t.Errorf("%v: expected config '%s' but got '%s'", testCase.title, expectedJSON, actualJSON) + } + } +} diff --git a/internal/ingress/controller/config/config.go b/internal/ingress/controller/config/config.go index b8c51bdb711..63bd65aac62 100644 --- a/internal/ingress/controller/config/config.go +++ b/internal/ingress/controller/config/config.go @@ -708,6 +708,31 @@ type Configuration struct { // http://nginx.org/en/docs/http/ngx_http_core_module.html#default_type // Default: text/html DefaultType string `json:"default-type"` + + // GlobalRateLimitMemcachedHost configures memcached host. + GlobalRateLimitMemcachedHost string `json:"global-rate-limit-memcached-host"` + + // GlobalRateLimitMemcachedPort configures memcached port. + GlobalRateLimitMemcachedPort int `json:"global-rate-limit-memcached-port"` + + // GlobalRateLimitMemcachedConnectTimeout configures timeout when connecting to memcached. + // The unit is millisecond. + GlobalRateLimitMemcachedConnectTimeout int `json:"global-rate-limit-memcached-connect-timeout"` + + // GlobalRateLimitMemcachedMaxIdleTimeout configured how long connections + // should be kept alive in idle state. The unit is millisecond. + GlobalRateLimitMemcachedMaxIdleTimeout int `json:"global-rate-limit-memcached-max-idle-timeout"` + + // GlobalRateLimitMemcachedPoolSize configures how many connections + // should be kept alive in the pool. + // Note that this is per NGINX worker. Make sure your memcached server can + // handle `MemcachedPoolSize * * ` + // simultaneous connections. + GlobalRateLimitMemcachedPoolSize int `json:"global-rate-limit-memcached-pool-size"` + + // GlobalRateLimitStatucCode determines the HTTP status code to return + // when limit is exceeding during global rate limiting. + GlobalRateLimitStatucCode int `json:"global-rate-limit-status-code"` } // NewDefault returns the default nginx configuration @@ -829,35 +854,40 @@ func NewDefault() Configuration { ProxyHTTPVersion: "1.1", ProxyMaxTempFileSize: "1024m", }, - UpstreamKeepaliveConnections: 320, - UpstreamKeepaliveTimeout: 60, - UpstreamKeepaliveRequests: 10000, - LimitConnZoneVariable: defaultLimitConnZoneVariable, - BindAddressIpv4: defBindAddress, - BindAddressIpv6: defBindAddress, - ZipkinCollectorPort: 9411, - ZipkinServiceName: "nginx", - ZipkinSampleRate: 1.0, - JaegerCollectorPort: 6831, - JaegerServiceName: "nginx", - JaegerSamplerType: "const", - JaegerSamplerParam: "1", - JaegerSamplerPort: 5778, - JaegerSamplerHost: "http://127.0.0.1", - DatadogServiceName: "nginx", - DatadogEnvironment: "prod", - DatadogCollectorPort: 8126, - DatadogOperationNameOverride: "nginx.handle", - DatadogSampleRate: 1.0, - DatadogPrioritySampling: true, - LimitReqStatusCode: 503, - LimitConnStatusCode: 503, - SyslogPort: 514, - NoTLSRedirectLocations: "/.well-known/acme-challenge", - NoAuthLocations: "/.well-known/acme-challenge", - GlobalExternalAuth: defGlobalExternalAuth, - ProxySSLLocationOnly: false, - DefaultType: "text/html", + UpstreamKeepaliveConnections: 320, + UpstreamKeepaliveTimeout: 60, + UpstreamKeepaliveRequests: 10000, + LimitConnZoneVariable: defaultLimitConnZoneVariable, + BindAddressIpv4: defBindAddress, + BindAddressIpv6: defBindAddress, + ZipkinCollectorPort: 9411, + ZipkinServiceName: "nginx", + ZipkinSampleRate: 1.0, + JaegerCollectorPort: 6831, + JaegerServiceName: "nginx", + JaegerSamplerType: "const", + JaegerSamplerParam: "1", + JaegerSamplerPort: 5778, + JaegerSamplerHost: "http://127.0.0.1", + DatadogServiceName: "nginx", + DatadogEnvironment: "prod", + DatadogCollectorPort: 8126, + DatadogOperationNameOverride: "nginx.handle", + DatadogSampleRate: 1.0, + DatadogPrioritySampling: true, + LimitReqStatusCode: 503, + LimitConnStatusCode: 503, + SyslogPort: 514, + NoTLSRedirectLocations: "/.well-known/acme-challenge", + NoAuthLocations: "/.well-known/acme-challenge", + GlobalExternalAuth: defGlobalExternalAuth, + ProxySSLLocationOnly: false, + DefaultType: "text/html", + GlobalRateLimitMemcachedPort: 11211, + GlobalRateLimitMemcachedConnectTimeout: 50, + GlobalRateLimitMemcachedMaxIdleTimeout: 10000, + GlobalRateLimitMemcachedPoolSize: 50, + GlobalRateLimitStatucCode: 429, } if klog.V(5).Enabled() { diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go index 3e4a91b5b47..f5e3d273351 100644 --- a/internal/ingress/controller/controller.go +++ b/internal/ingress/controller/controller.go @@ -232,6 +232,17 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error { k8s.SetDefaultNGINXPathType(ing) + cfg := n.store.GetBackendConfiguration() + cfg.Resolver = n.resolver + + if len(cfg.GlobalRateLimitMemcachedHost) == 0 { + for key := range ing.ObjectMeta.GetAnnotations() { + if strings.HasPrefix(key, fmt.Sprintf("%s/%s", parser.AnnotationsPrefix, "global-rate-limit")) { + return fmt.Errorf("'global-rate-limit*' annotations require 'global-rate-limit-memcached-host' settings configured in the global configmap") + } + } + } + allIngresses := n.store.ListIngresses() filter := func(toCheck *ingress.Ingress) bool { @@ -244,9 +255,6 @@ func (n *NGINXController) CheckIngress(ing *networking.Ingress) error { ParsedAnnotations: annotations.NewAnnotationExtractor(n.store).Extract(ing), }) - cfg := n.store.GetBackendConfiguration() - cfg.Resolver = n.resolver - _, servers, pcfg := n.getConfiguration(ings) err := checkOverlap(ing, allIngresses, servers) @@ -1253,6 +1261,7 @@ func locationApplyAnnotations(loc *ingress.Location, anns *annotations.Ingress) loc.Proxy = anns.Proxy loc.ProxySSL = anns.ProxySSL loc.RateLimit = anns.RateLimit + loc.GlobalRateLimit = anns.GlobalRateLimit loc.Redirect = anns.Redirect loc.Rewrite = anns.Rewrite loc.UpstreamVhost = anns.UpstreamVhost diff --git a/internal/ingress/controller/template/configmap.go b/internal/ingress/controller/template/configmap.go index a6a6712aa27..93a6b430d66 100644 --- a/internal/ingress/controller/template/configmap.go +++ b/internal/ingress/controller/template/configmap.go @@ -75,6 +75,7 @@ var ( "balancer_ewma_locks": 1, "certificate_servers": 5, "ocsp_response_cache": 5, // keep this same as certificate_servers + "global_throttle_cache": 10, } defaultGlobalAuthRedirectParam = "rd" ) diff --git a/internal/ingress/controller/template/template.go b/internal/ingress/controller/template/template.go index 4fe6ce7a71c..014bdfdb944 100644 --- a/internal/ingress/controller/template/template.go +++ b/internal/ingress/controller/template/template.go @@ -289,6 +289,13 @@ func configForLua(input interface{}) string { hsts_max_age = %v, hsts_include_subdomains = %t, hsts_preload = %t, + + global_throttle = { + memcached = { + host = "%v", port = %d, connect_timeout = %d, max_idle_timeout = %d, pool_size = %d, + }, + status_code = %d, + } }`, all.Cfg.UseForwardedHeaders, all.Cfg.UseProxyProtocol, @@ -301,6 +308,13 @@ func configForLua(input interface{}) string { all.Cfg.HSTSMaxAge, all.Cfg.HSTSIncludeSubdomains, all.Cfg.HSTSPreload, + + all.Cfg.GlobalRateLimitMemcachedHost, + all.Cfg.GlobalRateLimitMemcachedPort, + all.Cfg.GlobalRateLimitMemcachedConnectTimeout, + all.Cfg.GlobalRateLimitMemcachedMaxIdleTimeout, + all.Cfg.GlobalRateLimitMemcachedPoolSize, + all.Cfg.GlobalRateLimitStatucCode, ) } @@ -318,16 +332,28 @@ func locationConfigForLua(l interface{}, a interface{}) string { return "{}" } + ignoredCIDRs, err := convertGoSliceIntoLuaTable(location.GlobalRateLimit.IgnoredCIDRs, false) + if err != nil { + klog.Errorf("failed to convert %v into Lua table: %q", location.GlobalRateLimit.IgnoredCIDRs, err) + ignoredCIDRs = "{}" + } + return fmt.Sprintf(`{ force_ssl_redirect = %t, ssl_redirect = %t, force_no_ssl_redirect = %t, use_port_in_redirects = %t, + global_throttle = { namespace = "%v", limit = %d, window_size = %d, key = %v, ignored_cidrs = %v }, }`, location.Rewrite.ForceSSLRedirect, location.Rewrite.SSLRedirect, isLocationInLocationList(l, all.Cfg.NoTLSRedirectLocations), location.UsePortInRedirects, + location.GlobalRateLimit.Namespace, + location.GlobalRateLimit.Limit, + location.GlobalRateLimit.WindowSize, + parseComplexNginxVarIntoLuaTable(location.GlobalRateLimit.Key), + ignoredCIDRs, ) } @@ -1501,3 +1527,51 @@ func buildServerName(hostname string) string { return `~^(?[\w-]+)\.` + strings.Join(parts, "\\.") + `$` } + +// parseComplexNGINXVar parses things like "$my${complex}ngx\$var" into +// [["$var", "complex", "my", "ngx"]]. In other words, 2nd and 3rd elements +// in the result are actual NGINX variable names, whereas first and 4th elements +// are string literals. +func parseComplexNginxVarIntoLuaTable(ngxVar string) string { + r := regexp.MustCompile(`(\\\$[0-9a-zA-Z_]+)|\$\{([0-9a-zA-Z_]+)\}|\$([0-9a-zA-Z_]+)|(\$|[^$\\]+)`) + matches := r.FindAllStringSubmatch(ngxVar, -1) + components := make([][]string, len(matches)) + for i, match := range matches { + components[i] = match[1:] + } + + luaTable, err := convertGoSliceIntoLuaTable(components, true) + if err != nil { + klog.Errorf("unexpected error: %v", err) + luaTable = "{}" + } + return luaTable +} + +func convertGoSliceIntoLuaTable(goSliceInterface interface{}, emptyStringAsNil bool) (string, error) { + goSlice := reflect.ValueOf(goSliceInterface) + kind := goSlice.Kind() + + switch kind { + case reflect.String: + if emptyStringAsNil && len(goSlice.Interface().(string)) == 0 { + return "nil", nil + } + return fmt.Sprintf(`"%v"`, goSlice.Interface()), nil + case reflect.Int, reflect.Bool: + return fmt.Sprintf(`%v`, goSlice.Interface()), nil + case reflect.Slice, reflect.Array: + luaTable := "{ " + for i := 0; i < goSlice.Len(); i++ { + luaEl, err := convertGoSliceIntoLuaTable(goSlice.Index(i).Interface(), emptyStringAsNil) + if err != nil { + return "", err + } + luaTable = luaTable + luaEl + ", " + } + luaTable += "}" + return luaTable, nil + default: + return "", fmt.Errorf("could not process type: %s", kind) + } +} diff --git a/internal/ingress/controller/template/template_test.go b/internal/ingress/controller/template/template_test.go index 69a8fe9c5dc..abccb0d9c41 100644 --- a/internal/ingress/controller/template/template_test.go +++ b/internal/ingress/controller/template/template_test.go @@ -1472,3 +1472,86 @@ func TestBuildServerName(t *testing.T) { } } } + +func TestParseComplexNginxVarIntoLuaTable(t *testing.T) { + testCases := []struct { + ngxVar string + expectedLuaTable string + }{ + {"foo", `{ { nil, nil, nil, "foo", }, }`}, + {"$foo", `{ { nil, nil, "foo", nil, }, }`}, + {"${foo}", `{ { nil, "foo", nil, nil, }, }`}, + {"\\$foo", `{ { "\$foo", nil, nil, nil, }, }`}, + { + "foo\\$bar$baz${daz}xiyar$pomidor", + `{ { nil, nil, nil, "foo", }, { "\$bar", nil, nil, nil, }, { nil, nil, "baz", nil, }, ` + + `{ nil, "daz", nil, nil, }, { nil, nil, nil, "xiyar", }, { nil, nil, "pomidor", nil, }, }`, + }, + } + + for _, testCase := range testCases { + actualLuaTable := parseComplexNginxVarIntoLuaTable(testCase.ngxVar) + if actualLuaTable != testCase.expectedLuaTable { + t.Errorf("expected %v but returned %v", testCase.expectedLuaTable, actualLuaTable) + } + } +} + +func TestConvertGoSliceIntoLuaTablet(t *testing.T) { + testCases := []struct { + title string + goSlice interface{} + emptyStringAsNil bool + expectedLuaTable string + expectedErr error + }{ + { + "flat string slice", + []string{"one", "two", "three"}, + false, + `{ "one", "two", "three", }`, + nil, + }, + { + "nested string slice", + [][]string{{"one", "", "three"}, {"foo", "bar"}}, + false, + `{ { "one", "", "three", }, { "foo", "bar", }, }`, + nil, + }, + { + "converts empty string to nil when enabled", + [][]string{{"one", "", "three"}, {"foo", "bar"}}, + true, + `{ { "one", nil, "three", }, { "foo", "bar", }, }`, + nil, + }, + { + "boolean slice", + []bool{true, true, false}, + false, + `{ true, true, false, }`, + nil, + }, + { + "integer slice", + []int{4, 3, 6}, + false, + `{ 4, 3, 6, }`, + nil, + }, + } + + for _, testCase := range testCases { + actualLuaTable, err := convertGoSliceIntoLuaTable(testCase.goSlice, testCase.emptyStringAsNil) + if testCase.expectedErr != nil && err != nil && testCase.expectedErr.Error() != err.Error() { + t.Errorf("expected error '%v' but returned '%v'", testCase.expectedErr, err) + } + if testCase.expectedErr == nil && err != nil { + t.Errorf("expected error to be nil but returned '%v'", err) + } + if testCase.expectedLuaTable != actualLuaTable { + t.Errorf("%v: expected '%v' but returned '%v'", testCase.title, testCase.expectedLuaTable, actualLuaTable) + } + } +} diff --git a/internal/ingress/types.go b/internal/ingress/types.go index 9cafd6154e3..fabd66c98f9 100644 --- a/internal/ingress/types.go +++ b/internal/ingress/types.go @@ -28,6 +28,7 @@ import ( "k8s.io/ingress-nginx/internal/ingress/annotations/connection" "k8s.io/ingress-nginx/internal/ingress/annotations/cors" "k8s.io/ingress-nginx/internal/ingress/annotations/fastcgi" + "k8s.io/ingress-nginx/internal/ingress/annotations/globalratelimit" "k8s.io/ingress-nginx/internal/ingress/annotations/influxdb" "k8s.io/ingress-nginx/internal/ingress/annotations/ipwhitelist" "k8s.io/ingress-nginx/internal/ingress/annotations/log" @@ -270,6 +271,10 @@ type Location struct { // The Redirect annotation precedes RateLimit // +optional RateLimit ratelimit.Config `json:"rateLimit,omitempty"` + // GlobalRateLimit similar to RateLimit + // but this is applied globally across multiple replicas. + // +optional + GlobalRateLimit globalratelimit.Config `json:"globalRateLimit,omitempty"` // Redirect describes a temporal o permanent redirection this location. // +optional Redirect redirect.Config `json:"redirect,omitempty"` diff --git a/internal/ingress/types_equals.go b/internal/ingress/types_equals.go index 8e19f088102..ac7bf4bf65d 100644 --- a/internal/ingress/types_equals.go +++ b/internal/ingress/types_equals.go @@ -379,6 +379,9 @@ func (l1 *Location) Equal(l2 *Location) bool { if !(&l1.RateLimit).Equal(&l2.RateLimit) { return false } + if !(&l1.GlobalRateLimit).Equal(&l2.GlobalRateLimit) { + return false + } if !(&l1.Redirect).Equal(&l2.Redirect) { return false } diff --git a/rootfs/etc/nginx/lua/global_throttle.lua b/rootfs/etc/nginx/lua/global_throttle.lua new file mode 100644 index 00000000000..bea8cfd17ba --- /dev/null +++ b/rootfs/etc/nginx/lua/global_throttle.lua @@ -0,0 +1,131 @@ +local resty_global_throttle = require("resty.global_throttle") +local resty_ipmatcher = require("resty.ipmatcher") +local util = require("util") + +local ngx = ngx +local ngx_exit = ngx.exit +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_INFO = ngx.INFO + +local _M = {} + +local DECISION_CACHE = ngx.shared.global_throttle_cache + +-- it does not make sense to cache decision for too little time +-- the benefit of caching likely is negated if we cache for too little time +-- Lua Shared Dict's time resolution for expiry is 0.001. +local CACHE_THRESHOLD = 0.001 + +local DEFAULT_RAW_KEY = "remote_addr" + +local function should_ignore_request(ignored_cidrs) + if not ignored_cidrs or #ignored_cidrs == 0 then + return false + end + + local ignored_cidrs_matcher, err = resty_ipmatcher.new(ignored_cidrs) + if not ignored_cidrs_matcher then + ngx_log(ngx_ERR, "failed to initialize resty-ipmatcher: ", err) + return false + end + + local is_ignored + is_ignored, err = ignored_cidrs_matcher:match(ngx.var.remote_addr) + if err then + ngx_log(ngx_ERR, "failed to match ip: '", + ngx.var.remote_addr, "': ", err) + return false + end + + return is_ignored +end + +local function is_enabled(config, location_config) + if config.memcached.host == "" or config.memcached.port == 0 then + return false + end + if location_config.limit == 0 or + location_config.window_size == 0 then + return false + end + + if should_ignore_request(location_config.ignored_cidrs) then + return false + end + + return true +end + +local function get_namespaced_key_value(namespace, key_value) + return namespace .. key_value +end + +function _M.throttle(config, location_config) + if not is_enabled(config, location_config) then + return + end + + local key_value = util.generate_var_value(location_config.key) + if not key_value or key_value == "" then + key_value = ngx.var[DEFAULT_RAW_KEY] + end + + local namespaced_key_value = + get_namespaced_key_value(location_config.namespace, key_value) + + local is_limit_exceeding = DECISION_CACHE:get(namespaced_key_value) + if is_limit_exceeding then + ngx.var.global_rate_limit_exceeding = "c" + return ngx_exit(config.status_code) + end + + local my_throttle, err = resty_global_throttle.new( + location_config.namespace, + location_config.limit, + location_config.window_size, + { + provider = "memcached", + host = config.memcached.host, + port = config.memcached.port, + connect_timeout = config.memcached.connect_timeout, + max_idle_timeout = config.memcached.max_idle_timeout, + pool_size = config.memcached.pool_size, + } + ) + if err then + ngx.log(ngx.ERR, "faled to initialize resty_global_throttle: ", err) + -- fail open + return + end + + local desired_delay, estimated_final_count + estimated_final_count, desired_delay, err = my_throttle:process(key_value) + if err then + ngx.log(ngx.ERR, "error while processing key: ", err) + -- fail open + return + end + + if desired_delay then + if desired_delay > CACHE_THRESHOLD then + local ok + ok, err = + DECISION_CACHE:safe_add(namespaced_key_value, true, desired_delay) + if not ok then + if err ~= "exists" then + ngx_log(ngx_ERR, "failed to cache decision: ", err) + end + end + end + + ngx.var.global_rate_limit_exceeding = "y" + ngx_log(ngx_INFO, "limit is exceeding for ", + location_config.namespace, "/", key_value, + " with estimated_final_count: ", estimated_final_count) + + return ngx_exit(config.status_code) + end +end + +return _M diff --git a/rootfs/etc/nginx/lua/lua_ingress.lua b/rootfs/etc/nginx/lua/lua_ingress.lua index 3facb14c44d..90042fbd3ea 100644 --- a/rootfs/etc/nginx/lua/lua_ingress.lua +++ b/rootfs/etc/nginx/lua/lua_ingress.lua @@ -2,6 +2,7 @@ local ngx_re_split = require("ngx.re").split local certificate_configured_for_current_request = require("certificate").configured_for_current_request +local global_throttle = require("global_throttle") local ngx = ngx local io = io @@ -160,6 +161,8 @@ function _M.rewrite(location_config) return ngx_redirect(uri, config.http_redirect_code) end + + global_throttle.throttle(config.global_throttle, location_config.global_throttle) end function _M.header() diff --git a/rootfs/etc/nginx/lua/test/global_throttle_test.lua b/rootfs/etc/nginx/lua/test/global_throttle_test.lua new file mode 100644 index 00000000000..b8db740ade3 --- /dev/null +++ b/rootfs/etc/nginx/lua/test/global_throttle_test.lua @@ -0,0 +1,258 @@ +local util = require("util") + +local function assert_request_rejected(config, location_config, opts) + stub(ngx, "exit") + + local global_throttle = require_without_cache("global_throttle") + assert.has_no.errors(function() + global_throttle.throttle(config, location_config) + end) + + assert.stub(ngx.exit).was_called_with(config.status_code) + if opts.with_cache then + assert.are.same("c", ngx.var.global_rate_limit_exceeding) + else + assert.are.same("y", ngx.var.global_rate_limit_exceeding) + end +end + +local function assert_request_not_rejected(config, location_config) + stub(ngx, "exit") + local cache_safe_add_spy = spy.on(ngx.shared.global_throttle_cache, "safe_add") + + local global_throttle = require_without_cache("global_throttle") + assert.has_no.errors(function() + global_throttle.throttle(config, location_config) + end) + + assert.stub(ngx.exit).was_not_called() + assert.is_nil(ngx.var.global_rate_limit_exceeding) + assert.spy(cache_safe_add_spy).was_not_called() +end + +local function assert_short_circuits(f) + local cache_get_spy = spy.on(ngx.shared.global_throttle_cache, "get") + + local resty_global_throttle = require_without_cache("resty.global_throttle") + local resty_global_throttle_new_spy = spy.on(resty_global_throttle, "new") + + local global_throttle = require_without_cache("global_throttle") + + f(global_throttle) + + assert.spy(resty_global_throttle_new_spy).was_not_called() + assert.spy(cache_get_spy).was_not_called() +end + +local function assert_fails_open(config, location_config, ...) + stub(ngx, "exit") + stub(ngx, "log") + + local global_throttle = require_without_cache("global_throttle") + + assert.has_no.errors(function() + global_throttle.throttle(config, location_config) + end) + + assert.stub(ngx.exit).was_not_called() + assert.stub(ngx.log).was_called_with(ngx.ERR, ...) + assert.is_nil(ngx.var.global_rate_limit_exceeding) +end + +local function stub_resty_global_throttle_process(ret1, ret2, ret3, f) + local resty_global_throttle = require_without_cache("resty.global_throttle") + local resty_global_throttle_mock = { + process = function(self, key) return ret1, ret2, ret3 end + } + stub(resty_global_throttle, "new", resty_global_throttle_mock) + + f() + + assert.stub(resty_global_throttle.new).was_called() +end + +local function cache_rejection_decision(namespace, key_value, desired_delay) + local namespaced_key_value = namespace .. key_value + local ok, err = ngx.shared.global_throttle_cache:safe_add(namespaced_key_value, true, desired_delay) + assert.is_nil(err) + assert.is_true(ok) + assert.is_true(ngx.shared.global_throttle_cache:get(namespaced_key_value)) +end + +describe("global_throttle", function() + local snapshot + + local NAMESPACE = "31285d47b1504dcfbd6f12c46d769f6e" + local LOCATION_CONFIG = { + namespace = NAMESPACE, + limit = 10, + window_size = 60, + key = {}, + ignored_cidrs = {}, + } + local CONFIG = { + memcached = { + host = "memc.default.svc.cluster.local", port = 11211, + connect_timeout = 50, max_idle_timeout = 10000, pool_size = 50, + }, + status_code = 429, + } + + before_each(function() + snapshot = assert:snapshot() + + ngx.var = { remote_addr = "127.0.0.1", global_rate_limit_exceeding = nil } + end) + + after_each(function() + snapshot:revert() + + ngx.shared.global_throttle_cache:flush_all() + reset_ngx() + end) + + it("short circuits when memcached is not configured", function() + assert_short_circuits(function(global_throttle) + assert.has_no.errors(function() + global_throttle.throttle({ memcached = { host = "", port = 0 } }, LOCATION_CONFIG) + end) + end) + end) + + it("short circuits when limit or window_size is not configured", function() + assert_short_circuits(function(global_throttle) + local location_config_copy = util.deepcopy(LOCATION_CONFIG) + location_config_copy.limit = 0 + assert.has_no.errors(function() + global_throttle.throttle(CONFIG, location_config_copy) + end) + end) + + assert_short_circuits(function(global_throttle) + local location_config_copy = util.deepcopy(LOCATION_CONFIG) + location_config_copy.window_size = 0 + assert.has_no.errors(function() + global_throttle.throttle(CONFIG, location_config_copy) + end) + end) + end) + + it("short circuits when remote_addr is in ignored_cidrs", function() + local global_throttle = require_without_cache("global_throttle") + local location_config = util.deepcopy(LOCATION_CONFIG) + location_config.ignored_cidrs = { ngx.var.remote_addr } + assert_short_circuits(function(global_throttle) + assert.has_no.errors(function() + global_throttle.throttle(CONFIG, location_config) + end) + end) + end) + + it("rejects when exceeding limit has already been cached", function() + local key_value = "foo" + local location_config = util.deepcopy(LOCATION_CONFIG) + location_config.key = { { nil, nil, nil, key_value } } + cache_rejection_decision(NAMESPACE, key_value, 0.5) + + assert_request_rejected(CONFIG, location_config, { with_cache = true }) + end) + + describe("when resty_global_throttle fails", function() + it("fails open in case of initialization error", function() + local too_long_namespace = "" + for i=1,36,1 do + too_long_namespace = too_long_namespace .. "a" + end + + local location_config = util.deepcopy(LOCATION_CONFIG) + location_config.namespace = too_long_namespace + + assert_fails_open(CONFIG, location_config, "faled to initialize resty_global_throttle: ", "'namespace' can be at most 35 characters") + end) + + it("fails open in case of key processing error", function() + stub_resty_global_throttle_process(nil, nil, "failed to process", function() + assert_fails_open(CONFIG, LOCATION_CONFIG, "error while processing key: ", "failed to process") + end) + end) + end) + + it("initializes resty_global_throttle with the right parameters", function() + local resty_global_throttle = require_without_cache("resty.global_throttle") + local resty_global_throttle_original_new = resty_global_throttle.new + resty_global_throttle.new = function(namespace, limit, window_size, store_opts) + local o, err = resty_global_throttle_original_new(namespace, limit, window_size, store_opts) + if not o then + return nil, err + end + o.process = function(self, key) return 1, nil, nil end + + local expected = LOCATION_CONFIG + assert.are.same(expected.namespace, namespace) + assert.are.same(expected.limit, limit) + assert.are.same(expected.window_size, window_size) + + assert.are.same("memcached", store_opts.provider) + assert.are.same(CONFIG.memcached.host, store_opts.host) + assert.are.same(CONFIG.memcached.port, store_opts.port) + assert.are.same(CONFIG.memcached.connect_timeout, store_opts.connect_timeout) + assert.are.same(CONFIG.memcached.max_idle_timeout, store_opts.max_idle_timeout) + assert.are.same(CONFIG.memcached.pool_size, store_opts.pool_size) + + return o, nil + end + local resty_global_throttle_new_spy = spy.on(resty_global_throttle, "new") + + local global_throttle = require_without_cache("global_throttle") + + assert.has_no.errors(function() + global_throttle.throttle(CONFIG, LOCATION_CONFIG) + end) + + assert.spy(resty_global_throttle_new_spy).was_called() + end) + + it("rejects request and caches decision when limit is exceeding after processing a key", function() + local desired_delay = 0.015 + + stub_resty_global_throttle_process(LOCATION_CONFIG.limit + 1, desired_delay, nil, function() + assert_request_rejected(CONFIG, LOCATION_CONFIG, { with_cache = false }) + + local cache_key = LOCATION_CONFIG.namespace .. ngx.var.remote_addr + assert.is_true(ngx.shared.global_throttle_cache:get(cache_key)) + + -- we assume it won't take more than this after caching + -- until we execute the assertion below + local delta = 0.001 + local ttl = ngx.shared.global_throttle_cache:ttl(cache_key) + assert.is_true(ttl > desired_delay - delta) + assert.is_true(ttl <= desired_delay) + end) + end) + + it("rejects request and skip caching of decision when limit is exceeding after processing a key but desired delay is lower than the threshold", function() + local desired_delay = 0.0009 + + stub_resty_global_throttle_process(LOCATION_CONFIG.limit, desired_delay, nil, function() + assert_request_rejected(CONFIG, LOCATION_CONFIG, { with_cache = false }) + + local cache_key = LOCATION_CONFIG.namespace .. ngx.var.remote_addr + assert.is_nil(ngx.shared.global_throttle_cache:get(cache_key)) + end) + end) + + it("allows the request when limit is not exceeding after processing a key", function() + stub_resty_global_throttle_process(LOCATION_CONFIG.limit - 3, nil, nil, + function() + assert_request_not_rejected(CONFIG, LOCATION_CONFIG) + end + ) + end) + + it("rejects with custom status code", function() + cache_rejection_decision(NAMESPACE, ngx.var.remote_addr, 0.3) + local config = util.deepcopy(CONFIG) + config.status_code = 503 + assert_request_rejected(config, LOCATION_CONFIG, { with_cache = true }) + end) +end) diff --git a/rootfs/etc/nginx/template/nginx.tmpl b/rootfs/etc/nginx/template/nginx.tmpl index b37cfe7b403..9af7c9f5e91 100755 --- a/rootfs/etc/nginx/template/nginx.tmpl +++ b/rootfs/etc/nginx/template/nginx.tmpl @@ -1085,6 +1085,7 @@ stream { set $service_name {{ $ing.Service | quote }}; set $service_port {{ $ing.ServicePort | quote }}; set $location_path {{ $ing.Path | escapeLiteralDollar | quote }}; + set $global_rate_limit_exceeding n; {{ buildOpentracingForLocation $all.Cfg.EnableOpentracing $location }} diff --git a/test/e2e/annotations/globalratelimit.go b/test/e2e/annotations/globalratelimit.go new file mode 100644 index 00000000000..dd985c68c20 --- /dev/null +++ b/test/e2e/annotations/globalratelimit.go @@ -0,0 +1,83 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package annotations + +import ( + "fmt" + "net/http" + "strings" + + "github.com/onsi/ginkgo" + "github.com/stretchr/testify/assert" + + "k8s.io/ingress-nginx/test/e2e/framework" +) + +var _ = framework.DescribeAnnotation("annotation-global-rate-limit", func() { + f := framework.NewDefaultFramework("global-rate-limit") + host := "global-rate-limit-annotation" + + ginkgo.BeforeEach(func() { + f.NewEchoDeployment() + }) + + ginkgo.It("generates correct configuration", func() { + annotations := make(map[string]string) + annotations["nginx.ingress.kubernetes.io/global-rate-limit"] = "5" + annotations["nginx.ingress.kubernetes.io/global-rate-limit-window"] = "2m" + + ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations) + ing = f.EnsureIngress(ing) + namespace := strings.Replace(string(ing.UID), "-", "", -1) + + serverConfig := "" + f.WaitForNginxServer(host, func(server string) bool { + serverConfig = server + return true + }) + assert.Contains(ginkgo.GinkgoT(), serverConfig, + fmt.Sprintf(`global_throttle = { namespace = "%v", `+ + `limit = 5, window_size = 120, key = { { nil, nil, "remote_addr", nil, }, }, `+ + `ignored_cidrs = { } }`, + namespace)) + + f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK) + + ginkgo.By("regenerating the correct configuration after update") + annotations["nginx.ingress.kubernetes.io/global-rate-limit-key"] = "${remote_addr}${http_x_api_client}" + annotations["nginx.ingress.kubernetes.io/global-rate-limit-ignored-cidrs"] = "192.168.1.1, 234.234.234.0/24" + ing.SetAnnotations(annotations) + + f.WaitForReload(func() { + ing = f.UpdateIngress(ing) + }) + + serverConfig = "" + f.WaitForNginxServer(host, func(server string) bool { + serverConfig = server + return true + }) + assert.Contains(ginkgo.GinkgoT(), serverConfig, + fmt.Sprintf(`global_throttle = { namespace = "%v", `+ + `limit = 5, window_size = 120, `+ + `key = { { nil, "remote_addr", nil, nil, }, { nil, "http_x_api_client", nil, nil, }, }, `+ + `ignored_cidrs = { "192.168.1.1", "234.234.234.0/24", } }`, + namespace)) + + f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK) + }) +}) diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 8fb85e051c2..8f5fb3884ff 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -215,7 +215,8 @@ func (f *Framework) updateIngressNGINXPod() error { return err } -// WaitForNginxServer waits until the nginx configuration contains a particular server section +// WaitForNginxServer waits until the nginx configuration contains a particular server section. +// `cfg` passed to matcher is normalized by replacing all tabs and spaces with single space. func (f *Framework) WaitForNginxServer(name string, matcher func(cfg string) bool) { err := wait.Poll(Poll, DefaultTimeout, f.matchNginxConditions(name, matcher)) assert.Nil(ginkgo.GinkgoT(), err, "waiting for nginx server condition/s") @@ -223,6 +224,7 @@ func (f *Framework) WaitForNginxServer(name string, matcher func(cfg string) boo } // WaitForNginxConfiguration waits until the nginx configuration contains a particular configuration +// `cfg` passed to matcher is normalized by replacing all tabs and spaces with single space. func (f *Framework) WaitForNginxConfiguration(matcher func(cfg string) bool) { err := wait.Poll(Poll, DefaultTimeout, f.matchNginxConditions("", matcher)) assert.Nil(ginkgo.GinkgoT(), err, "waiting for nginx server condition/s") @@ -325,7 +327,7 @@ func (f *Framework) SetNginxConfigMapData(cmData map[string]string) { assert.Nil(ginkgo.GinkgoT(), err, "updating configuration configmap") } - f.waitForReload(fn) + f.WaitForReload(fn) } // CreateConfigMap creates a new configmap in the current namespace @@ -356,10 +358,12 @@ func (f *Framework) UpdateNginxConfigMapData(key string, value string) { assert.Nil(ginkgo.GinkgoT(), err, "updating configuration configmap") } - f.waitForReload(fn) + f.WaitForReload(fn) } -func (f *Framework) waitForReload(fn func()) { +// WaitForReload calls the passed function and +// asser it has caused at least 1 reload. +func (f *Framework) WaitForReload(fn func()) { initialReloadCount := getReloadCount(f.pod, f.Namespace, f.KubeClientSet) fn() diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index 7bff65b826c..3ca4a3c4652 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -76,7 +76,7 @@ func (f *Framework) EnsureIngress(ingress *networking.Ingress) *networking.Ingre assert.Nil(ginkgo.GinkgoT(), err, "creating ingress") } - f.waitForReload(fn) + f.WaitForReload(fn) ing := f.GetIngress(f.Namespace, ingress.Name) if ing.Annotations == nil { diff --git a/test/e2e/settings/globalratelimit.go b/test/e2e/settings/globalratelimit.go new file mode 100644 index 00000000000..409cd5d9c9c --- /dev/null +++ b/test/e2e/settings/globalratelimit.go @@ -0,0 +1,96 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package settings + +import ( + "fmt" + "net/http" + "strconv" + "strings" + + "github.com/onsi/ginkgo" + "github.com/stretchr/testify/assert" + "k8s.io/ingress-nginx/test/e2e/framework" +) + +var _ = framework.DescribeSetting("settings-global-rate-limit", func() { + f := framework.NewDefaultFramework("global-rate-limit") + host := "global-rate-limit" + + ginkgo.BeforeEach(func() { + f.NewEchoDeployment() + }) + + ginkgo.It("generates correct NGINX configuration", func() { + annotations := make(map[string]string) + ing := framework.NewSingleIngress(host, "/", host, f.Namespace, framework.EchoService, 80, annotations) + f.EnsureIngress(ing) + + ginkgo.By("generating correct defaults") + + ngxCfg := "" + f.WaitForNginxConfiguration(func(cfg string) bool { + if strings.Contains(cfg, "global_throttle") { + ngxCfg = cfg + return true + } + return false + }) + + assert.Contains(ginkgo.GinkgoT(), ngxCfg, fmt.Sprintf(`global_throttle = { `+ + `memcached = { host = "%v", port = %d, connect_timeout = %d, max_idle_timeout = %d, `+ + `pool_size = %d, }, status_code = %d, }`, + "", 11211, 50, 10000, 50, 429)) + + f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK) + + ginkgo.By("applying customizations") + + memcachedHost := "memc.default.svc.cluster.local" + memcachedPort := 11211 + memcachedConnectTimeout := 100 + memcachedMaxIdleTimeout := 5000 + memcachedPoolSize := 100 + statusCode := 503 + + f.SetNginxConfigMapData(map[string]string{ + "global-rate-limit-memcached-host": memcachedHost, + "global-rate-limit-memcached-port": strconv.Itoa(memcachedPort), + "global-rate-limit-memcached-connect-timeout": strconv.Itoa(memcachedConnectTimeout), + "global-rate-limit-memcached-max-idle-timeout": strconv.Itoa(memcachedMaxIdleTimeout), + "global-rate-limit-memcached-pool-size": strconv.Itoa(memcachedPoolSize), + "global-rate-limit-status-code": strconv.Itoa(statusCode), + }) + + ngxCfg = "" + f.WaitForNginxConfiguration(func(cfg string) bool { + if strings.Contains(cfg, "global_throttle") { + ngxCfg = cfg + return true + } + return false + }) + + assert.Contains(ginkgo.GinkgoT(), ngxCfg, fmt.Sprintf(`global_throttle = { `+ + `memcached = { host = "%v", port = %d, connect_timeout = %d, max_idle_timeout = %d, `+ + `pool_size = %d, }, status_code = %d, }`, + memcachedHost, memcachedPort, memcachedConnectTimeout, memcachedMaxIdleTimeout, + memcachedPoolSize, statusCode)) + + f.HTTPTestClient().GET("/").WithHeader("Host", host).Expect().Status(http.StatusOK) + }) +})