From 56de583cd2fd4ae461381a1a9a3fe0b4e4e1fb66 Mon Sep 17 00:00:00 2001
From: Kir Shatrov
Date: Thu, 2 Feb 2023 18:34:33 +0100
Subject: [PATCH] Implement Consistent Hashing with Bounded Loads

---
 .../annotations/upstreamhashby/main.go        |  10 +-
 internal/ingress/controller/controller.go     |   1 +
 internal/ingress/defaults/main.go             |  12 +
 pkg/apis/ingress/types.go                     |   7 +-
 rootfs/etc/nginx/lua/balancer.lua             |   6 +-
 .../nginx/lua/balancer/chashboundedloads.lua  | 257 +++++++++++++++
 .../test/balancer/chashboundedloads_test.lua  | 294 ++++++++++++++++++
 rootfs/etc/nginx/lua/test/util_test.lua       |  30 ++
 rootfs/etc/nginx/lua/util.lua                 |  49 +++
 9 files changed, 658 insertions(+), 8 deletions(-)
 create mode 100644 rootfs/etc/nginx/lua/balancer/chashboundedloads.lua
 create mode 100644 rootfs/etc/nginx/lua/test/balancer/chashboundedloads_test.lua

diff --git a/internal/ingress/annotations/upstreamhashby/main.go b/internal/ingress/annotations/upstreamhashby/main.go
index e6bbca6c3300..911818d4a1ac 100644
--- a/internal/ingress/annotations/upstreamhashby/main.go
+++ b/internal/ingress/annotations/upstreamhashby/main.go
@@ -29,9 +29,10 @@ type upstreamhashby struct {
 
 // Config contains the Consistent hash configuration to be used in the Ingress
 type Config struct {
-    UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
-    UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
-    UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashBy              string  `json:"upstream-hash-by,omitempty"`
+    UpstreamHashBySubset        bool    `json:"upstream-hash-by-subset,omitempty"`
+    UpstreamHashBySubsetSize    int     `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
 }
 
 // NewParser creates a new UpstreamHashBy annotation parser
@@ -44,10 +45,11 @@ func (a upstreamhashby) Parse(ing *networking.Ingress) (interface{}, error) {
     upstreamHashBy, _ := parser.GetStringAnnotation("upstream-hash-by", ing)
     upstreamHashBySubset, _ := parser.GetBoolAnnotation("upstream-hash-by-subset", ing)
     upstreamHashbySubsetSize, _ := parser.GetIntAnnotation("upstream-hash-by-subset-size", ing)
+    upstreamHashByBalanceFactor, _ := parser.GetFloatAnnotation("upstream-hash-by-balance-factor", ing)
 
     if upstreamHashbySubsetSize == 0 {
         upstreamHashbySubsetSize = 3
     }
 
-    return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize}, nil
+    return &Config{upstreamHashBy, upstreamHashBySubset, upstreamHashbySubsetSize, upstreamHashByBalanceFactor}, nil
 }

diff --git a/internal/ingress/controller/controller.go b/internal/ingress/controller/controller.go
index 5b22fd17a1a0..b907ace5124b 100644
--- a/internal/ingress/controller/controller.go
+++ b/internal/ingress/controller/controller.go
@@ -1023,6 +1023,7 @@ func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.B
             upstreams[name].UpstreamHashBy.UpstreamHashBy = anns.UpstreamHashBy.UpstreamHashBy
             upstreams[name].UpstreamHashBy.UpstreamHashBySubset = anns.UpstreamHashBy.UpstreamHashBySubset
             upstreams[name].UpstreamHashBy.UpstreamHashBySubsetSize = anns.UpstreamHashBy.UpstreamHashBySubsetSize
+            upstreams[name].UpstreamHashBy.UpstreamHashByBalanceFactor = anns.UpstreamHashBy.UpstreamHashByBalanceFactor
 
             upstreams[name].LoadBalancing = anns.LoadBalancing
             if upstreams[name].LoadBalancing == "" {

diff --git a/internal/ingress/defaults/main.go b/internal/ingress/defaults/main.go
index 0aab2ff47873..711017c356b0 100644
--- a/internal/ingress/defaults/main.go
+++ b/internal/ingress/defaults/main.go
@@ -132,6 +132,18 @@ type Backend struct {
     // Default 3
     UpstreamHashBySubsetSize int `json:"upstream-hash-by-subset-size"`
 
+    // Configures the percentage of average cluster load to bound per upstream host.
+    // For example, with a value of 1.5 no upstream host will receive more than 1.5 times
+    // the average load of all the hosts in the cluster.
+    //
+    // This is implemented based on the method described in the paper https://arxiv.org/abs/1608.01350
+    // This is an O(N) algorithm, unlike other load balancers.
+    // Using a lower hash_balance_factor results in more hosts being probed,
+    // so use a higher value if you require better performance.
+    //
+    // Defaults to 2 (meaning a host will receive at most 2x the average load)
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor"`
+
     // Let's us choose a load balancing algorithm per ingress
     LoadBalancing string `json:"load-balance"`
 

diff --git a/pkg/apis/ingress/types.go b/pkg/apis/ingress/types.go
index 9395683ece9f..6ab58cbc90dd 100644
--- a/pkg/apis/ingress/types.go
+++ b/pkg/apis/ingress/types.go
@@ -168,9 +168,10 @@ type CookieSessionAffinity struct {
 
 // UpstreamHashByConfig described setting from the upstream-hash-by* annotations.
 type UpstreamHashByConfig struct {
-    UpstreamHashBy           string `json:"upstream-hash-by,omitempty"`
-    UpstreamHashBySubset     bool   `json:"upstream-hash-by-subset,omitempty"`
-    UpstreamHashBySubsetSize int    `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashBy              string  `json:"upstream-hash-by,omitempty"`
+    UpstreamHashBySubset        bool    `json:"upstream-hash-by-subset,omitempty"`
+    UpstreamHashBySubsetSize    int     `json:"upstream-hash-by-subset-size,omitempty"`
+    UpstreamHashByBalanceFactor float32 `json:"upstream-hash-by-balance-factor,omitempty"`
 }
 
 // Endpoint describes a kubernetes endpoint in a backend

diff --git a/rootfs/etc/nginx/lua/balancer.lua b/rootfs/etc/nginx/lua/balancer.lua
index 00104c89d7f6..0d7492e725d4 100644
--- a/rootfs/etc/nginx/lua/balancer.lua
+++ b/rootfs/etc/nginx/lua/balancer.lua
@@ -6,6 +6,7 @@ local configuration = require("configuration")
 local round_robin = require("balancer.round_robin")
 local chash = require("balancer.chash")
 local chashsubset = require("balancer.chashsubset")
+local chashboundedloads = require("balancer.chashboundedloads")
 local sticky_balanced = require("balancer.sticky_balanced")
 local sticky_persistent = require("balancer.sticky_persistent")
 local ewma = require("balancer.ewma")
@@ -29,6 +30,7 @@ local IMPLEMENTATIONS = {
   round_robin = round_robin,
   chash = chash,
   chashsubset = chashsubset,
+  chashboundedloads = chashboundedloads,
   sticky_balanced = sticky_balanced,
   sticky_persistent = sticky_persistent,
   ewma = ewma,
@@ -55,7 +57,9 @@ local function get_implementation(backend)
   elseif backend["upstreamHashByConfig"] and
          backend["upstreamHashByConfig"]["upstream-hash-by"] then
-    if backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
+    if backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"] then
+      name = "chashboundedloads"
+    elseif backend["upstreamHashByConfig"]["upstream-hash-by-subset"] then
       name = "chashsubset"
     else
       name = "chash"
     end
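Note: the bound described by the UpstreamHashByBalanceFactor comment above can be illustrated with a short, self-contained Lua sketch (hypothetical numbers, not part of the patch):

-- With hash-balance-factor = 2, three endpoints and 7 requests already in flight,
-- the next request may only go to an endpoint currently serving fewer than
-- ceil((7 + 1) * 2 / 3) = 6 requests; busier endpoints are skipped by linear probing.
local balance_factor  = 2
local total_endpoints = 3
local total_requests  = 7
local allowed = math.ceil((total_requests + 1) * balance_factor / total_endpoints)
print(allowed)  -- 6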
diff --git a/rootfs/etc/nginx/lua/balancer/chashboundedloads.lua b/rootfs/etc/nginx/lua/balancer/chashboundedloads.lua
new file mode 100644
index 000000000000..714aecb6991a
--- /dev/null
+++ b/rootfs/etc/nginx/lua/balancer/chashboundedloads.lua
@@ -0,0 +1,257 @@
+-- Implements Consistent Hashing with Bounded Loads based on the paper [1].
+-- For the specified hash-balance-factor, requests to any upstream host are capped
+-- at hash_balance_factor times the average number of requests across the cluster.
+-- When a request arrives for an upstream host that is currently serving at its max capacity,
+-- linear probing is used to identify the next eligible host.
+--
+-- This is an O(N) algorithm, unlike other load balancers. Using a lower hash-balance-factor
+-- results in more hosts being probed, so use a higher value if you require better performance.
+--
+-- [1]: https://arxiv.org/abs/1608.01350
+
+local resty_roundrobin = require("resty.roundrobin")
+local resty_chash = require("resty.chash")
+local setmetatable = setmetatable
+local lrucache = require("resty.lrucache")
+
+local util = require("util")
+local split = require("util.split")
+local reverse_table = util.reverse_table
+
+local string_format = string.format
+local INFO = ngx.INFO
+local ngx_ERR = ngx.ERR
+local ngx_WARN = ngx.WARN
+local ngx_log = ngx.log
+local math_ceil = math.ceil
+local ipairs = ipairs
+local ngx = ngx
+
+local DEFAULT_HASH_BALANCE_FACTOR = 2
+
+local HOST_SEED = util.get_host_seed()
+
+-- Controls how many "tenants" we'll keep track of
+-- to avoid routing them to alternative_backends
+-- as they were already consistently routed to some endpoint.
+-- Lowering this value will increase the chances of more
+-- tenants being routed to alternative_backends.
+-- Similarly, increasing this value will keep more tenants
+-- consistently routed to the same endpoint in the main backend.
+local SEEN_LRU_SIZE = 1000
+
+local _M = {}
+
+local function incr_req_stats(self, endpoint)
+  if not self.requests_by_endpoint[endpoint] then
+    self.requests_by_endpoint[endpoint] = 1
+  else
+    self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] + 1
+  end
+  self.total_requests = self.total_requests + 1
+end
+
+local function decr_req_stats(self, endpoint)
+  if self.requests_by_endpoint[endpoint] then
+    self.requests_by_endpoint[endpoint] = self.requests_by_endpoint[endpoint] - 1
+    if self.requests_by_endpoint[endpoint] == 0 then
+      self.requests_by_endpoint[endpoint] = nil
+    end
+  end
+  self.total_requests = self.total_requests - 1
+end
+
+local function get_hash_by_value(self)
+  if not ngx.ctx.chash_hash_by_value then
+    ngx.ctx.chash_hash_by_value = util.generate_var_value(self.hash_by)
+  end
+
+  local v = ngx.ctx.chash_hash_by_value
+  if v == "" then
+    return nil
+  end
+  return v
+end
+
+local function endpoint_eligible(self, endpoint)
+  -- allowed = ceil(num_requests * hash-balance-factor / num_servers)
+  local allowed = math_ceil(
+    (self.total_requests + 1) * self.balance_factor / self.total_endpoints)
+  local current = self.requests_by_endpoint[endpoint]
+  if current == nil then
+    return true, 0, allowed
+  else
+    return current < allowed, current, allowed
+  end
+end
+
+local function update_balance_factor(self, backend)
+  local balance_factor = backend["upstreamHashByConfig"]["upstream-hash-by-balance-factor"]
+  if balance_factor and balance_factor <= 1 then
+    ngx_log(ngx_WARN,
+      "upstream-hash-by-balance-factor must be > 1. Forcing it to the default value of ",
+      DEFAULT_HASH_BALANCE_FACTOR)
+    balance_factor = DEFAULT_HASH_BALANCE_FACTOR
+  end
+  self.balance_factor = balance_factor or DEFAULT_HASH_BALANCE_FACTOR
+end
+
+local function normalize_endpoints(endpoints)
+  local b = {}
+  for i, endpoint in ipairs(endpoints) do
+    b[i] = string_format("%s:%s", endpoint.address, endpoint.port)
+  end
+  return b
+end
+
+local function update_endpoints(self, endpoints)
+  self.endpoints = endpoints
+  self.endpoints_reverse = reverse_table(endpoints)
+  self.total_endpoints = #endpoints
+  self.ring_seed = util.array_mod(HOST_SEED, self.total_endpoints)
+end
+
+function _M.is_affinitized(self)
+  -- alternative_backends might contain a canary backend that gets a percentage of traffic.
+  -- If a tenant has already been consistently routed to an endpoint, we want to stick to that
+  -- endpoint to keep a higher cache hit ratio, rather than routing it to an alternative backend.
+  -- This means that alternative backends (== canary) mostly see "new" tenants.
+
+  if not self.alternative_backends or not self.alternative_backends[1] then
+    return false
+  end
+
+  local hash_by_value = get_hash_by_value(self)
+  if not hash_by_value then
+    return false
+  end
+
+  return self.seen_hash_by_values:get(hash_by_value) ~= nil
+end
+
+function _M.new(self, backend)
+  local nodes = util.get_nodes(backend.endpoints)
+
+  local complex_val, err =
+    util.parse_complex_value(backend["upstreamHashByConfig"]["upstream-hash-by"])
+  if err ~= nil then
+    ngx_log(ngx_ERR, "could not parse the value of the upstream-hash-by: ", err)
+  end
+
+  local o = {
+    name = "chashboundedloads",
+
+    chash = resty_chash:new(nodes),
+    roundrobin = resty_roundrobin:new(nodes),
+    alternative_backends = backend.alternativeBackends,
+    hash_by = complex_val,
+
+    requests_by_endpoint = {},
+    total_requests = 0,
+    seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
+  }
+
+  update_endpoints(o, normalize_endpoints(backend.endpoints))
+  update_balance_factor(o, backend)
+
+  setmetatable(o, self)
+  self.__index = self
+  return o
+end
+
+function _M.sync(self, backend)
+  self.alternative_backends = backend.alternativeBackends
+
+  update_balance_factor(self, backend)
+
+  local new_endpoints = normalize_endpoints(backend.endpoints)
+
+  if util.deep_compare(self.endpoints, new_endpoints) then
+    ngx_log(INFO, "endpoints did not change for backend ", backend.name)
+    return
+  end
+
+  ngx_log(INFO, string_format("[%s] endpoints have changed for backend %s",
+    self.name, backend.name))
+
+  update_endpoints(self, new_endpoints)
+
+  local nodes = util.get_nodes(backend.endpoints)
+  self.chash:reinit(nodes)
+  self.roundrobin:reinit(nodes)
+
+  self.seen_hash_by_values = lrucache.new(SEEN_LRU_SIZE)
+
+  ngx_log(INFO, string_format("[%s] nodes have changed for backend %s", self.name, backend.name))
+end
+
+function _M.balance(self)
+  local hash_by_value = get_hash_by_value(self)
+
+  -- Tenant key not available, falling back to round-robin
+  if not hash_by_value then
+    local endpoint = self.roundrobin:find()
+    ngx.var.chashbl_debug = "fallback_round_robin"
+    return endpoint
+  end
+
+  self.seen_hash_by_values:set(hash_by_value, true)
+
+  local tried_endpoints
+  if not ngx.ctx.balancer_chashbl_tried_endpoints then
+    tried_endpoints = {}
+    ngx.ctx.balancer_chashbl_tried_endpoints = tried_endpoints
+  else
+    tried_endpoints = ngx.ctx.balancer_chashbl_tried_endpoints
+  end
+
+  local first_endpoint = self.chash:find(hash_by_value)
+  local index = self.endpoints_reverse[first_endpoint]
+
+  -- By design, resty.chash always points to the same element of the ring,
+  -- regardless of the environment. In this algorithm, we want the consistency
+  -- to be "seeded" based on the host where it's running.
+  -- That's how both Envoy and HAProxy implement this.
+  -- For convenience, we keep resty.chash but manually introduce the seed.
+  index = util.array_mod(index + self.ring_seed, self.total_endpoints)
+
+  for i = 0, self.total_endpoints - 1 do
+    local j = util.array_mod(index + i, self.total_endpoints)
+    local endpoint = self.endpoints[j]
+
+    if not tried_endpoints[endpoint] then
+      local eligible, current, allowed = endpoint_eligible(self, endpoint)
+
+      if eligible then
+        ngx.var.chashbl_debug = string_format(
+          "attempt=%d score=%d allowed=%d total_requests=%d hash_by_value=%s",
+          i, current, allowed, self.total_requests, hash_by_value)
+
+        incr_req_stats(self, endpoint)
+        tried_endpoints[endpoint] = true
+        return endpoint
+      end
+    end
+  end
+
+  -- Normally this case should never be reached, because with balance_factor > 1
+  -- there is always at least one eligible endpoint.
+  -- It can only be hit when the number of endpoints is less than or equal to
+  -- the maximum number of Nginx retries and tried_endpoints already contains all endpoints.
+  incr_req_stats(self, first_endpoint)
+  ngx.var.chashbl_debug = "fallback_first_endpoint"
+  return first_endpoint
+end
+
+function _M.after_balance(self)
+  local tried_upstreams = split.split_upstream_var(ngx.var.upstream_addr)
+  if (not tried_upstreams) or (not get_hash_by_value(self)) then
+    return
+  end
+
+  for _, addr in ipairs(tried_upstreams) do
+    decr_req_stats(self, addr)
+  end
+end
+
+return _M
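A minimal sketch of how the new module can be exercised outside nginx (it mirrors the unit tests that follow; the backend table, endpoint addresses and OpenResty environment are assumptions for illustration, not part of the patch):

-- Assumes an OpenResty worker where ngx, resty.chash, resty.roundrobin and
-- resty.lrucache are available and the lua/ directory is on package.path.
local chashboundedloads = require("balancer.chashboundedloads")

local backend = {
  name = "namespace-service-port",
  upstreamHashByConfig = {
    ["upstream-hash-by"] = "$request_uri",
    ["upstream-hash-by-balance-factor"] = 2,
  },
  endpoints = {
    { address = "10.10.10.1", port = "8080" },
    { address = "10.10.10.2", port = "8080" },
  },
}

local instance = chashboundedloads:new(backend)

-- balance() hashes $request_uri onto the ring, then probes forward until it
-- finds an endpoint whose in-flight count is below the allowed bound.
local endpoint = instance:balance()

-- after_balance() parses $upstream_addr and decrements the in-flight counters.
instance:after_balance()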
["upstream-hash-by"] = "$request_uri", ["upstream-hash-by-balance-factor"] = 2.5, ["upstream-hash-by"] = "$request_uri" }, + endpoints = { + { address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, + } + } + + instance = balancer_chashboundedloads:new(backend) + assert.are.equals(2.5, instance.balance_factor) + end) + + it("does not allow balance_factor <= 1", function() + local new_backend = util.deepcopy(backend) + new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 1 + + instance = balancer_chashboundedloads:new(new_backend) + assert.are.equals(2, instance.balance_factor) + + new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 0.1 + instance = balancer_chashboundedloads:new(new_backend) + assert.are.equals(2, instance.balance_factor) + end) + + it("sets default balance factor", function() + backend = { + name = "namespace-service-port", ["load-balance"] = "ewma", + upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", ["upstream-hash-by"] = "$request_uri" }, + endpoints = { + { address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, + } + } + + instance = balancer_chashboundedloads:new(backend) + assert.are.equals(2, instance.balance_factor) + end) + + it("uses round-robin and does not touch counters when hash_by value is missing", function() + ngx.var = { request_uri = nil } + + instance.roundrobin = { + find = function(self) + return "some-round-robin-endpoint" + end + } + + local endpoint = instance:balance() + assert.are.equals("some-round-robin-endpoint", endpoint) + assert.are.same({}, instance.requests_by_endpoint) + assert.are.equals(0, instance.total_requests) + + instance:after_balance() + assert.are.same({}, instance.requests_by_endpoint) + assert.are.equals(0, instance.total_requests) + end) + + it("skips tried endpoint", function() + ngx.var = { request_uri = "/alma/armud" } + + local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud") + local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1) + + ngx.ctx.balancer_chashbl_tried_endpoints = {[expected_first_endpoint]=true} + local endpoint = instance:balance() + + assert.are.equals(expected_second_endpoint, endpoint) + + assert.are.same({[expected_second_endpoint] = 1}, instance.requests_by_endpoint) + assert.are.equals(1, instance.total_requests) + end) + + it("after_balance decrements all tried endpoints", function() + local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud") + local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1) + + instance.requests_by_endpoint[expected_first_endpoint] = 1 + instance.requests_by_endpoint[expected_second_endpoint] = 1 + instance.total_requests = 2 + + ngx.var = { request_uri = "/alma/armud", upstream_addr = expected_first_endpoint .. " : " .. 
expected_second_endpoint } + + instance:after_balance() + + assert.are.same({}, instance.requests_by_endpoint) + assert.are.equals(0, instance.total_requests) + end) + + it("spills over", function() + ngx.var = { request_uri = "/alma/armud" } + local endpoint = instance:balance() + + local expected_first_endpoint = endpoint_for_hash(instance, "/alma/armud") + local expected_second_endpoint = endpoint_for_hash(instance, "/alma/armud", 1) + + assert.are.equals(expected_first_endpoint, endpoint) + + assert.are.same({[expected_first_endpoint] = 1}, instance.requests_by_endpoint) + assert.are.equals(1, instance.total_requests) + + ngx.ctx.balancer_chashbl_tried_endpoints = nil + + local endpoint = instance:balance() + assert.are.equals(expected_first_endpoint, endpoint) + + assert.are.same({[expected_first_endpoint] = 2}, instance.requests_by_endpoint) + assert.are.equals(2, instance.total_requests) + + ngx.ctx.balancer_chashbl_tried_endpoints = nil + + local endpoint = instance:balance() + assert.are.equals(expected_second_endpoint, endpoint) + + assert.are.same({[expected_first_endpoint] = 2, [expected_second_endpoint] = 1}, instance.requests_by_endpoint) + assert.are.equals(3, instance.total_requests) + end) + + it("balances and keeps track of requests", function() + local expected_endpoint = endpoint_for_hash(instance, "/alma/armud") + + ngx.var = { request_uri = "/alma/armud" } + local endpoint = instance:balance() + assert.are.equals(expected_endpoint, endpoint) + + assert.are.same({[expected_endpoint] = 1}, instance.requests_by_endpoint) + assert.are.equals(1, instance.total_requests) + + ngx.var = { upstream_addr = endpoint } + + instance:after_balance() + assert.are.same({}, instance.requests_by_endpoint) + assert.are.equals(0, instance.total_requests) + end) + + it("starts from the beginning of the ring if first_endpoints points to the end of ring", function() + instance.chash = { + find = function(self, key) + return "10.10.10.3:8080" + end + } + instance.requests_by_endpoint["10.10.10.3:8080"] = 2 + instance.total_requests = 2 + instance.ring_seed = 0 + + ngx.var = { request_uri = "/alma/armud" } + local endpoint = instance:balance() + assert.are.equals("10.10.10.1:8080", endpoint) + end) + + it("balances to the first when all endpoints have identical load", function() + instance.requests_by_endpoint["10.10.10.1:8080"] = 2 + instance.requests_by_endpoint["10.10.10.2:8080"] = 2 + instance.requests_by_endpoint["10.10.10.3:8080"] = 2 + instance.total_requests = 6 + + local expected_endpoint = endpoint_for_hash(instance, "/alma/armud") + + ngx.var = { request_uri = "/alma/armud" } + local endpoint = instance:balance() + assert.are.equals(expected_endpoint, endpoint) + end) + + describe("is_affinitized()", function() + it("returns false is alternative_backends is empty", function() + instance.alternative_backends = nil + assert.is_false(instance:is_affinitized()) + + instance.alternative_backends = {} + assert.is_false(instance:is_affinitized()) + end) + + it("returns false if tenant was not seen", function() + ngx.var = { request_uri = "/alma/armud" } + + instance.alternative_backends = {"omglol"} + assert.is_false(instance:is_affinitized()) + end) + + it("returns true if tenant was seen", function() + ngx.var = { request_uri = "/alma/armud" } + + instance.alternative_backends = {"omglol"} + instance.seen_hash_by_values:set("/alma/armud", true) + assert.is_true(instance:is_affinitized()) + end) + end) + + describe("sync()", function() + it("updates endpoints and total_endpoints", 
function() + local new_backend = util.deepcopy(backend) + new_backend.endpoints[4] = { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }, + + assert.are.same({"10.10.10.1:8080", "10.10.10.2:8080", "10.10.10.3:8080"}, instance.endpoints) + assert.are.equal(3, instance.total_endpoints) + assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 2, ["10.10.10.3:8080"] = 3}, instance.endpoints_reverse) + instance:sync(new_backend) + + assert.are.same({"10.10.10.1:8080", "10.10.10.2:8080", "10.10.10.3:8080", "10.10.10.4:8080"}, instance.endpoints) + assert.are.equal(4, instance.total_endpoints) + assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 2, ["10.10.10.3:8080"] = 3, ["10.10.10.4:8080"] = 4}, instance.endpoints_reverse) + + local expected_seed = util.array_mod(util.hash_string(util.get_hostname()), instance.total_endpoints) + assert.are.equal(expected_seed, instance.ring_seed) + end) + + it("updates chash and roundrobin", function() + instance.roundrobin = { + reinit = function(self, nodes) + self.nodes = nodes + end + } + + instance.chash = { + reinit = function(self, nodes) + self.nodes = nodes + end + } + + local new_backend = util.deepcopy(backend) + new_backend.endpoints[4] = { address = "10.10.10.4", port = "8080", maxFails = 0, failTimeout = 0 }, + assert.are.equal(3, instance.total_endpoints) + + instance:sync(new_backend) + assert.are.equal(4, instance.total_endpoints) + assert.are.same({["10.10.10.1:8080"] = 1,["10.10.10.2:8080"] = 1,["10.10.10.4:8080"] = 1,["10.10.10.3:8080"] = 1}, instance.roundrobin.nodes) + assert.are.same(instance.roundrobin.nodes, instance.chash.nodes) + end) + + it("updates balance-factor", function() + local new_backend = util.deepcopy(backend) + new_backend.upstreamHashByConfig["upstream-hash-by-balance-factor"] = 4 + + instance:sync(new_backend) + + assert.are.equal(4, instance.balance_factor) + end) + end) +end) diff --git a/rootfs/etc/nginx/lua/test/chashboundedloads_test.lua b/rootfs/etc/nginx/lua/test/chashboundedloads_test.lua new file mode 100644 index 000000000000..2d3aa3366db6 --- /dev/null +++ b/rootfs/etc/nginx/lua/test/chashboundedloads_test.lua @@ -0,0 +1,294 @@ +local original_ngx = ngx +local function reset_ngx() + _G.ngx = original_ngx +end + +describe("Balancer chashboundedloads", function() + local balancer_chashboundedloads, backend, instance + + local function endpoint_for_hash(instance, hash_by_value, offset) + if offset == nil then offset = 0 end + + local first_endpoint = instance.chash:find(hash_by_value) + local index = instance.endpoints_reverse[first_endpoint] + + index = util.array_mod(index + instance.ring_seed + offset, instance.total_endpoints) + return instance.endpoints[index] + end + + before_each(function() + util = require_without_cache("util") + util.get_hostname = function() + return "test-host" + end + + balancer_chashboundedloads = require_without_cache("balancer.chashboundedloads") + + backend = { + name = "namespace-service-port", ["load-balance"] = "ewma", + upstreamHashByConfig = { ["upstream-hash-by"] = "$request_uri", ["upstream-hash-by-balance-factor"] = 2, ["upstream-hash-by"] = "$request_uri" }, + endpoints = { + { address = "10.10.10.1", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.2", port = "8080", maxFails = 0, failTimeout = 0 }, + { address = "10.10.10.3", port = "8080", maxFails = 0, failTimeout = 0 }, + } + } + + ngx.header = {} + ngx.req = { + get_uri_args = function() + return {} + end + } + + instance = 
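The canary affinity behaviour exercised by the is_affinitized() tests above boils down to an LRU of recently seen hash-by values; roughly as sketched below (illustrative only; the key names and size are examples, not from the patch):

-- resty.lrucache backs seen_hash_by_values (SEEN_LRU_SIZE entries in the patch).
local lrucache = require("resty.lrucache")
local seen = lrucache.new(1000)

-- A tenant that has never been routed is not affinitized, so it may still be
-- diverted to an alternative (canary) backend.
print(seen:get("/tenant-a") ~= nil)  -- false

-- Once balance() has routed the tenant, later requests stick to the main
-- backend to preserve cache locality on the chosen endpoint.
seen:set("/tenant-a", true)
print(seen:get("/tenant-a") ~= nil)  -- true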
diff --git a/rootfs/etc/nginx/lua/test/util_test.lua b/rootfs/etc/nginx/lua/test/util_test.lua
index 1aca67fa18bb..68d4d5411827 100644
--- a/rootfs/etc/nginx/lua/test/util_test.lua
+++ b/rootfs/etc/nginx/lua/test/util_test.lua
@@ -102,4 +102,34 @@ describe("utility", function()
       assert.are.same({ "10.10.10.2:8080" }, removed)
     end)
   end)
+
+  describe("hash_string", function()
+    it("returns same seed for same string", function()
+      local seed1 = util.hash_string("test")
+      local seed2 = util.hash_string("test")
+
+      assert.are.same(seed1, seed2)
+    end)
+
+    it("does not overflow on a typical hostname", function()
+      local seed = util.hash_string("nginx-ingress-controller-6f7f95d6c-kvlvj")
+      assert.is_true(seed < 2^31)
+    end)
+
+    it("returns different seed for different string", function()
+      local seed1 = util.hash_string("test")
+      local seed2 = util.hash_string("test2")
+
+      assert.are_not.same(seed1, seed2)
+    end)
+  end)
+
+  describe("get_host_seed", function()
+    it("returns same seed for subsequent calls", function()
+      local seed1 = util.get_host_seed()
+      local seed2 = util.get_host_seed()
+
+      assert.are.same(seed1, seed2)
+    end)
+  end)
 end)

diff --git a/rootfs/etc/nginx/lua/util.lua b/rootfs/etc/nginx/lua/util.lua
index 7389f322669f..a2e1f0d091d3 100644
--- a/rootfs/etc/nginx/lua/util.lua
+++ b/rootfs/etc/nginx/lua/util.lua
@@ -92,6 +92,14 @@ local function normalize_endpoints(endpoints)
   return normalized_endpoints
 end
 
+function _M.reverse_table(t)
+  local b = {}
+  for k, v in ipairs(t) do
+    b[v] = k
+  end
+  return b
+end
+
 -- diff_endpoints compares old and new
 -- and as a first argument returns what endpoints are in new
 -- but are not in old, and as a second argument it returns
@@ -180,4 +188,45 @@ local function replace_special_char(str, a, b)
 end
 _M.replace_special_char = replace_special_char
 
+local function get_hostname()
+  local f = io.popen("/bin/hostname")
+
+  if f ~= nil then
+    local h = f:read("*a") or ""
+    h = string.gsub(h, "[\n]", "")
+    f:close()
+    return h
+  else
+    return "unknown"
+  end
+end
+
+_M.get_hostname = get_hostname
+
+local MAX_HASH_NUM = 2^31-1
+local function hash_string(str)
+  local hash = 0
+  for i = 1, string.len(str) do
+    hash = 31 * hash + string.byte(str, i)
+    if hash > MAX_HASH_NUM then
+      hash = hash % MAX_HASH_NUM
+    end
+  end
+  return hash
+end
+
+_M.hash_string = hash_string
+
+local function get_host_seed()
+  return hash_string(_M.get_hostname())
+end
+
+_M.get_host_seed = get_host_seed
+
+local function array_mod(i, max)
+  return (i - 1) % max + 1
+end
+
+_M.array_mod = array_mod
+
 return _M
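The 1-based modular arithmetic added to util.lua is easy to get wrong in Lua, so a quick illustration (the values are arbitrary, not from the patch):

-- array_mod maps any positive offset back into the 1..max range of a Lua array,
-- which is what keeps the linear probing inside the endpoints table.
local function array_mod(i, max) return (i - 1) % max + 1 end

print(array_mod(1, 3))  -- 1
print(array_mod(3, 3))  -- 3
print(array_mod(4, 3))  -- 1  (wraps back to the start of the ring)
print(array_mod(7, 3))  -- 1

-- hash_string/get_host_seed give each controller pod a stable offset into the
-- ring, so the same key does not land on the same endpoint on every host.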