From c92a1ed4a24dd2d812013c4fde4907163deeaace Mon Sep 17 00:00:00 2001 From: = <=> Date: Tue, 19 Mar 2024 18:00:18 +0000 Subject: [PATCH] Refactor service load balancer to support different strategies This is a change to add the "power of two random choices" load balancing strategy to the service's load balancer. This change does not actually start using the new strategy, and is backwards incompatible, but intended to start a design-review. To fully add support, many tests will need to be updated, so this change is meant to highlight the core changes to support the new strategy. Currently the only load balancing strategy available within traefik is (weighted) round robin. While this is a good strategy in general, it can be problematic if one or more of the backends is getting overloaded. There are many load balancing strategies, but the "power-of-two-random-choices" algorithm has some good properties that make it a good general use algorithm. Specifically some of the benefits include: - reducing the total number of requests to slower backends - constant time backend picking (in the general case) - reduced "herd behaviour" compared to e.g. "least connections" load balancing. The algorithm works by taking two backends at random, and choosing the backend that has the fewest in-flight requests. In order to do this, we have to track the number of in-flight requests each backend is currently processing. The aim of this change is to demonstrate that this new load balancing strategy can be added with minimal changes, and reusing a lot of the existing load balancing code by factoring out the explicit strategy into an interface. In order to do this, the wrr package was removed, and the existing LoadBalancer was moved to the parent directory: the loadbalancer package. There are many strategies that can be used for load balancing, many of which require "extrinsic" information, such as the CPU load on the backends. 
This change is not meant to open the door for adding such strategies, but attempts to add an effective load balancing strategy with low cost to the codebase. --- .../loadbalancer/{wrr/wrr.go => balancer.go} | 134 ++++---- .../{wrr/wrr_test.go => balancer_test.go} | 289 +++++++++++++----- pkg/server/service/loadbalancer/p2c.go | 105 +++++++ pkg/server/service/loadbalancer/p2c_test.go | 83 +++++ pkg/server/service/loadbalancer/wrr.go | 85 ++++++ pkg/server/service/service.go | 6 +- 6 files changed, 559 insertions(+), 143 deletions(-) rename pkg/server/service/loadbalancer/{wrr/wrr.go => balancer.go} (67%) rename pkg/server/service/loadbalancer/{wrr/wrr_test.go => balancer_test.go} (57%) create mode 100644 pkg/server/service/loadbalancer/p2c.go create mode 100644 pkg/server/service/loadbalancer/p2c_test.go create mode 100644 pkg/server/service/loadbalancer/wrr.go diff --git a/pkg/server/service/loadbalancer/wrr/wrr.go b/pkg/server/service/loadbalancer/balancer.go similarity index 67% rename from pkg/server/service/loadbalancer/wrr/wrr.go rename to pkg/server/service/loadbalancer/balancer.go index 6aa04c8ece..5b30686afb 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr.go +++ b/pkg/server/service/loadbalancer/balancer.go @@ -1,13 +1,13 @@ -package wrr +package loadbalancer import ( - "container/heap" "context" "errors" "hash/fnv" "net/http" "strconv" "sync" + "sync/atomic" "github.com/rs/zerolog/log" "github.com/traefik/traefik/v3/pkg/config/dynamic" @@ -18,6 +18,13 @@ type namedHandler struct { name string weight float64 deadline float64 + inflight atomic.Int64 +} + +func (h *namedHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + h.inflight.Add(1) + defer h.inflight.Add(-1) + h.Handler.ServeHTTP(w, req) } type stickyCookie struct { @@ -41,36 +48,74 @@ func convertSameSite(sameSite string) http.SameSite { } } -// Balancer is a WeightedRoundRobin load balancer based on Earliest Deadline First (EDF). 
-// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) -// Each pick from the schedule has the earliest deadline entry selected. -// Entries have deadlines set at currentDeadline + 1 / weight, -// providing weighted round-robin behavior with floating point weights and an O(log n) pick time. +// strategy is an interface that can be used to implement different load balancing strategies +// for the Balancer +type strategy interface { + // nextServer returns the next server to serve a request, this is called under the handlersMu lock. + // The status param is a map of the currently healthy child services, keyed + // by service name. + nextServer(status map[string]struct{}) *namedHandler + // add adds a handler to the balancing algorithm, this is called under the handlersMu lock. + // Any strategy-specific bookkeeping for the new handler (e.g. its initial + // deadline for WRR) is performed by the implementation. + add(h *namedHandler) + + setUp(name string, up bool) + + name() string + len() int +} + type Balancer struct { stickyCookie *stickyCookie wantsHealthCheck bool handlersMu sync.RWMutex - // References all the handlers by name and also by the hashed value of the name. - handlerMap map[string]*namedHandler - handlers []*namedHandler - curDeadline float64 - // status is a record of which child services of the Balancer are healthy, keyed - // by name of child service. A service is initially added to the map when it is - // created via Add, and it is later removed or added to the map as needed, - // through the SetStatus method. + // References all the handlers by name and also by the hashed value of the + // name. handlerMap map[string]*namedHandler + // status is a record of which child services of the Balancer are healthy, + // keyed by name of child service. 
A service is initially added to the map + // when it is created via Add, and it is later removed or added to the map + // as needed, through the SetStatus method. status map[string]struct{} + + // strategy references the load balancing strategy to be used. The add and + // nextServer methods must be called under the handlersMu lock + strategy strategy + // updaters is the list of hooks that are run (to update the Balancer // parent(s)), whenever the Balancer status changes. updaters []func(bool) } -// New creates a new load balancer. -func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer { +// NewWRR creates a WeightedRoundRobin load balancer based on Earliest Deadline +// First (EDF). +// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) +// Each pick from the schedule has the earliest deadline entry selected. +// Entries have deadlines set at currentDeadline + 1 / weight, +// providing weighted round-robin behavior with floating point weights and an +// O(log n) pick time. +func NewWRR(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer { + return newBalancer(sticky, wantHealthCheck, newStrategyWRR()) +} + +// NewP2C creates a load balancer using the "power-of-two-random-choices" algorithm for load +// balancing. The idea of this is to take two of the backends at random from +// the available backends, and select the backend that has the fewest in-flight +// requests. This is constant time when picking, and has more beneficial "herd" +// behaviour than the "fewest connections" algorithm. 
+func NewP2C(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer { + return newBalancer(sticky, wantHealthCheck, newStrategyP2C()) +} + +func newBalancer(sticky *dynamic.Sticky, wantHealthCheck bool, strategy strategy) *Balancer { + balancer := &Balancer{ status: make(map[string]struct{}), handlerMap: make(map[string]*namedHandler), wantsHealthCheck: wantHealthCheck, + strategy: strategy, } if sticky != nil && sticky.Cookie != nil { balancer.stickyCookie = &stickyCookie{ @@ -85,37 +130,6 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer { return balancer } -// Len implements heap.Interface/sort.Interface. -func (b *Balancer) Len() int { return len(b.handlers) } - -// Less implements heap.Interface/sort.Interface. -func (b *Balancer) Less(i, j int) bool { - return b.handlers[i].deadline < b.handlers[j].deadline -} - -// Swap implements heap.Interface/sort.Interface. -func (b *Balancer) Swap(i, j int) { - b.handlers[i], b.handlers[j] = b.handlers[j], b.handlers[i] -} - -// Push implements heap.Interface for pushing an item into the heap. -func (b *Balancer) Push(x interface{}) { - h, ok := x.(*namedHandler) - if !ok { - return - } - - b.handlers = append(b.handlers, h) -} - -// Pop implements heap.Interface for popping an item from the heap. -// It panics if b.Len() < 1. -func (b *Balancer) Pop() interface{} { - h := b.handlers[len(b.handlers)-1] - b.handlers = b.handlers[0 : len(b.handlers)-1] - return h -} - // SetStatus sets on the balancer that its given child is now of the given // status. balancerName is only needed for logging purposes. 
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) { @@ -137,6 +151,8 @@ func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) { delete(b.status, childName) } + b.strategy.setUp(childName, up) + upAfter := len(b.status) > 0 status = "DOWN" if upAfter { @@ -174,26 +190,13 @@ func (b *Balancer) nextServer() (*namedHandler, error) { b.handlersMu.Lock() defer b.handlersMu.Unlock() - if len(b.handlers) == 0 || len(b.status) == 0 { + if b.strategy.len() == 0 || len(b.status) == 0 { return nil, errNoAvailableServer } - var handler *namedHandler - for { - // Pick handler with closest deadline. - handler = heap.Pop(b).(*namedHandler) - - // curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones. - b.curDeadline = handler.deadline - handler.deadline += 1 / handler.weight - - heap.Push(b, handler) - if _, ok := b.status[handler.name]; ok { - break - } - } + handler := b.strategy.nextServer(b.status) - log.Debug().Msgf("Service selected by WRR: %s", handler.name) + log.Debug().Msgf("Service selected by strategy %q: %s", b.strategy.name(), handler.name) return handler, nil } @@ -263,8 +266,7 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) { h := &namedHandler{Handler: handler, name: name, weight: float64(w)} b.handlersMu.Lock() - h.deadline = b.curDeadline + 1/h.weight - heap.Push(b, h) + b.strategy.add(h) b.status[name] = struct{}{} b.handlerMap[name] = h b.handlerMap[hash(name)] = h diff --git a/pkg/server/service/loadbalancer/wrr/wrr_test.go b/pkg/server/service/loadbalancer/balancer_test.go similarity index 57% rename from pkg/server/service/loadbalancer/wrr/wrr_test.go rename to pkg/server/service/loadbalancer/balancer_test.go index 3708fa618b..552736451e 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr_test.go +++ b/pkg/server/service/loadbalancer/balancer_test.go @@ -1,4 +1,4 @@ -package wrr +package loadbalancer import ( 
"context" @@ -11,7 +11,7 @@ import ( ) func TestBalancer(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") @@ -33,7 +33,7 @@ func TestBalancer(t *testing.T) { } func TestBalancerNoService(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) recorder := httptest.NewRecorder() balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) @@ -42,7 +42,7 @@ func TestBalancerNoService(t *testing.T) { } func TestBalancerOneServerZeroWeight(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") @@ -64,7 +64,7 @@ type key string const serviceName key = "serviceName" func TestBalancerNoServiceUp(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(http.StatusInternalServerError) @@ -84,7 +84,7 @@ func TestBalancerNoServiceUp(t *testing.T) { } func TestBalancerOneServerDown(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") @@ -105,7 +105,7 @@ func TestBalancerOneServerDown(t *testing.T) { } func TestBalancerDownThenUp(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") @@ -134,7 +134,7 @@ func TestBalancerDownThenUp(t *testing.T) { } func TestBalancerPropagate(t *testing.T) { - balancer1 := New(nil, true) + balancer1 := NewWRR(nil, true) balancer1.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req 
*http.Request) { rw.Header().Set("server", "first") @@ -145,7 +145,7 @@ func TestBalancerPropagate(t *testing.T) { rw.WriteHeader(http.StatusOK) }), Int(1)) - balancer2 := New(nil, true) + balancer2 := NewWRR(nil, true) balancer2.Add("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "third") rw.WriteHeader(http.StatusOK) @@ -155,7 +155,7 @@ func TestBalancerPropagate(t *testing.T) { rw.WriteHeader(http.StatusOK) }), Int(1)) - topBalancer := New(nil, true) + topBalancer := NewWRR(nil, true) topBalancer.Add("balancer1", balancer1, Int(1)) _ = balancer1.RegisterStatusUpdater(func(up bool) { topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up) @@ -206,8 +206,8 @@ func TestBalancerPropagate(t *testing.T) { assert.Equal(t, wantStatus, recorder.status) } -func TestBalancerAllServersZeroWeight(t *testing.T) { - balancer := New(nil, false) +func TestWRRBalancerAllServersZeroWeight(t *testing.T) { + balancer := NewWRR(nil, false) balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0)) balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0)) @@ -219,85 +219,113 @@ func TestBalancerAllServersZeroWeight(t *testing.T) { } func TestSticky(t *testing.T) { - balancer := New(&dynamic.Sticky{ - Cookie: &dynamic.Cookie{ - Name: "test", - Secure: true, - HTTPOnly: true, - SameSite: "none", - MaxAge: 42, - }, - }, false) - - balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - rw.Header().Set("server", "first") - rw.WriteHeader(http.StatusOK) - }), Int(1)) - - balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - rw.Header().Set("server", "second") - rw.WriteHeader(http.StatusOK) - }), Int(2)) - - recorder := &responseRecorder{ - ResponseRecorder: httptest.NewRecorder(), - save: map[string]int{}, - cookies: 
make(map[string]*http.Cookie), + balancers := []*Balancer{ + NewWRR(&dynamic.Sticky{ + Cookie: &dynamic.Cookie{ + Name: "test", + Secure: true, + HTTPOnly: true, + SameSite: "none", + MaxAge: 42, + }, + }, false), + NewP2C(&dynamic.Sticky{ + Cookie: &dynamic.Cookie{ + Name: "test", + Secure: true, + HTTPOnly: true, + SameSite: "none", + MaxAge: 42, + }, + }, false), } - req := httptest.NewRequest(http.MethodGet, "/", nil) - for i := 0; i < 3; i++ { - for _, cookie := range recorder.Result().Cookies() { - assert.NotContains(t, "test=first", cookie.Value) - assert.NotContains(t, "test=second", cookie.Value) - req.AddCookie(cookie) - } - recorder.ResponseRecorder = httptest.NewRecorder() - - balancer.ServeHTTP(recorder, req) + // we need to make sure second is chosen + balancers[1].strategy.(*strategyPowerOfTwoChoices).rand = &mockRand{vals: []int{1, 0}} + + for _, balancer := range balancers { + t.Run(balancer.strategy.name(), func(t *testing.T) { + + balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "first") + rw.WriteHeader(http.StatusOK) + }), Int(1)) + + balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "second") + rw.WriteHeader(http.StatusOK) + }), Int(2)) + + recorder := &responseRecorder{ + ResponseRecorder: httptest.NewRecorder(), + save: map[string]int{}, + cookies: make(map[string]*http.Cookie), + } + + req := httptest.NewRequest(http.MethodGet, "/", nil) + for i := 0; i < 3; i++ { + for _, cookie := range recorder.Result().Cookies() { + assert.NotContains(t, "test=first", cookie.Value) + assert.NotContains(t, "test=second", cookie.Value) + req.AddCookie(cookie) + } + recorder.ResponseRecorder = httptest.NewRecorder() + + balancer.ServeHTTP(recorder, req) + } + + assert.Equal(t, 0, recorder.save["first"]) + assert.Equal(t, 3, recorder.save["second"]) + assert.True(t, recorder.cookies["test"].HttpOnly) + assert.True(t, 
recorder.cookies["test"].Secure) + assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite) + assert.Equal(t, 42, recorder.cookies["test"].MaxAge) + }) } - - assert.Equal(t, 0, recorder.save["first"]) - assert.Equal(t, 3, recorder.save["second"]) - assert.True(t, recorder.cookies["test"].HttpOnly) - assert.True(t, recorder.cookies["test"].Secure) - assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite) - assert.Equal(t, 42, recorder.cookies["test"].MaxAge) } func TestSticky_FallBack(t *testing.T) { - balancer := New(&dynamic.Sticky{ - Cookie: &dynamic.Cookie{Name: "test"}, - }, false) + balancers := []*Balancer{ + NewWRR(&dynamic.Sticky{ + Cookie: &dynamic.Cookie{Name: "test"}, + }, false), + NewP2C(&dynamic.Sticky{ + Cookie: &dynamic.Cookie{Name: "test"}, + }, false), + } - balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - rw.Header().Set("server", "first") - rw.WriteHeader(http.StatusOK) - }), Int(1)) + for _, balancer := range balancers { + t.Run(balancer.strategy.name(), func(t *testing.T) { + balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "first") + rw.WriteHeader(http.StatusOK) + }), Int(1)) - balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - rw.Header().Set("server", "second") - rw.WriteHeader(http.StatusOK) - }), Int(2)) + balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "second") + rw.WriteHeader(http.StatusOK) + }), Int(2)) - recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} - req := httptest.NewRequest(http.MethodGet, "/", nil) - req.AddCookie(&http.Cookie{Name: "test", Value: "second"}) - for i := 0; i < 3; i++ { - recorder.ResponseRecorder = 
httptest.NewRecorder() + req := httptest.NewRequest(http.MethodGet, "/", nil) + req.AddCookie(&http.Cookie{Name: "test", Value: "second"}) + for i := 0; i < 3; i++ { + recorder.ResponseRecorder = httptest.NewRecorder() - balancer.ServeHTTP(recorder, req) - } + balancer.ServeHTTP(recorder, req) + } - assert.Equal(t, 0, recorder.save["first"]) - assert.Equal(t, 3, recorder.save["second"]) + assert.Equal(t, 0, recorder.save["first"]) + assert.Equal(t, 3, recorder.save["second"]) + }) + } } // TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start, // and that it does not "over-favor" the high-weighted ones with a biased start-up regime. func TestBalancerBias(t *testing.T) { - balancer := New(nil, false) + balancer := NewWRR(nil, false) balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "A") @@ -339,3 +367,116 @@ func (r *responseRecorder) WriteHeader(statusCode int) { } r.ResponseRecorder.WriteHeader(statusCode) } + +func testHandler(name string, weight float64, inflight int) *namedHandler { + h := &namedHandler{ + name: name, + } + h.inflight.Store(int64(inflight)) + h.weight = weight + h.Handler = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { + rw.Header().Set("server", name) + rw.WriteHeader(http.StatusOK) + }) + return h +} + +func TestStrategies(t *testing.T) { + newStrategies := []func() strategy{ + newStrategyWRR, + newStrategyP2C, + } + + for _, s := range newStrategies { + t.Run(s().name(), testStrategy(s)) + } +} + +func testStrategy(newStrategy func() strategy) func(t *testing.T) { + return func(t *testing.T) { + t.Run("OneHealthyBackend", testStrategyOneHealthyBackend(newStrategy())) + t.Run("TwoHealthyBackends", testStrategyTwoHealthyBackends(newStrategy())) + t.Run("OneHealthyOneUnhealthy", testStrategyOneHealthyOneUnhealthy(newStrategy())) + t.Run("OneHostDownThenUp", testStrategyOneHostDownThenUp(newStrategy())) + } +} + +func 
testStrategyOneHealthyBackend(strategy strategy) func(t *testing.T) { + return func(t *testing.T) { + strategy.add(testHandler("A", 1, 0)) + + healthy := map[string]struct{}{"A": {}} + + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + const requests = 10 + for i := 0; i < requests; i++ { + strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) + } + + assert.Equal(t, requests, recorder.save["A"], "A should have been hit with all requests") + } +} + +func testStrategyTwoHealthyBackends(strategy strategy) func(t *testing.T) { + return func(t *testing.T) { + strategy.add(testHandler("A", 1, 0)) + strategy.add(testHandler("B", 1, 0)) + + healthy := map[string]struct{}{"A": {}, "B": {}} + + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + for i := 0; i < 100; i++ { + strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) + } + + // not all strategies are going to be 50/50, but they shouldn't + // balance to 100/0 if both are healthy. 
+ assert.Greater(t, recorder.save["A"], 0, "A should have been hit") + assert.Greater(t, recorder.save["B"], 0, "B should have been hit") + t.Logf("strategy %s with two backends has a ratio of %d:%d", strategy.name(), recorder.save["A"], recorder.save["B"]) + } +} + +func testStrategyOneHealthyOneUnhealthy(strategy strategy) func(t *testing.T) { + return func(t *testing.T) { + strategy.add(testHandler("A", 1, 0)) + strategy.add(testHandler("B", 1, 0)) + + strategy.setUp("B", false) + healthy := map[string]struct{}{"A": {}} + + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + const requests = 100 + for i := 0; i < requests; i++ { + strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) + } + + assert.Equal(t, requests, recorder.save["A"], "A should have been hit with all requests") + assert.Equal(t, 0, recorder.save["B"], "B should not have been hit") + } +} + +func testStrategyOneHostDownThenUp(strategy strategy) func(t *testing.T) { + return func(t *testing.T) { + strategy.add(testHandler("A", 1, 0)) + strategy.add(testHandler("B", 1, 0)) + + strategy.setUp("A", false) + strategy.setUp("A", true) + + healthy := map[string]struct{}{"A": {}, "B": {}} + + const requests = 100 + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + for i := 0; i < requests; i++ { + strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) + } + + assert.Greater(t, recorder.save["A"], 0, "A should have been hit") + assert.Greater(t, recorder.save["B"], 0, "B should have been hit") + } +} diff --git a/pkg/server/service/loadbalancer/p2c.go b/pkg/server/service/loadbalancer/p2c.go new file mode 100644 index 0000000000..89cd226a5e --- /dev/null +++ b/pkg/server/service/loadbalancer/p2c.go @@ -0,0 +1,105 @@ +package loadbalancer + +import ( + crand "crypto/rand" + "math/rand/v2" +) + +type rnd interface { + IntN(int) 
int +} + +// strategyPowerOfTwoChoices implements "the power-of-two-random-choices" algorithm for load balancing. +// The idea of this is to take two of the backends at random from the available backends, and select +// the backend that has the fewest in-flight requests. This algorithm more effectively balances the +// load than a round-robin approach, while also being constant time when picking. The strategy also +// has more beneficial "herd" behaviour than the "fewest connections" algorithm, especially when the +// load balancer doesn't have perfect knowledge about the global number of connections to the backend, +// for example, when running in a distributed fashion. +type strategyPowerOfTwoChoices struct { + healthy []*namedHandler + unhealthy []*namedHandler + rand rnd +} + +func newStrategyP2C() strategy { + return &strategyPowerOfTwoChoices{ + rand: newRand(), + } +} + +func (s *strategyPowerOfTwoChoices) nextServer(map[string]struct{}) *namedHandler { + if len(s.healthy) == 1 { + return s.healthy[0] + } + // in order to not get the same backend twice, we make the second call to s.rand.IntN one fewer + // than the length of the slice. We then have to shift over the second index if it is equal or + // greater than the first index, wrapping round if needed. 
+ n1, n2 := s.rand.IntN(len(s.healthy)), s.rand.IntN(len(s.healthy)-1) + if n2 >= n1 { + n2 = (n2 + 1) % len(s.healthy) + } + + h1, h2 := s.healthy[n1], s.healthy[n2] + // ensure h1 has fewer inflight requests than h2 + if h2.inflight.Load() < h1.inflight.Load() { + h1, h2 = h2, h1 + } + + return h1 +} + +func (s *strategyPowerOfTwoChoices) add(h *namedHandler) { + s.healthy = append(s.healthy, h) +} + +func (s *strategyPowerOfTwoChoices) setUp(name string, up bool) { + if up { + var healthy *namedHandler + healthy, s.unhealthy = deleteAndPop(s.unhealthy, name) + s.healthy = append(s.healthy, healthy) + return + } + + var unhealthy *namedHandler + unhealthy, s.healthy = deleteAndPop(s.healthy, name) + s.unhealthy = append(s.unhealthy, unhealthy) +} + +func (s *strategyPowerOfTwoChoices) name() string { + return "p2c" +} + +func (s *strategyPowerOfTwoChoices) len() int { + return len(s.healthy) + len(s.unhealthy) +} + +func newRand() *rand.Rand { + var seed [16]byte + _, err := crand.Read(seed[:]) + if err != nil { + panic(err) + } + var seed1, seed2 uint64 + for i := 0; i < 16; i += 8 { + seed1 = seed1<<8 + uint64(seed[i]) + seed2 = seed2<<8 + uint64(seed[i+1]) + } + return rand.New(rand.NewPCG(seed1, seed2)) +} + +// we always overwrite slice that is passed in, so it doesn't matter if we mutate the parameter +func deleteAndPop(handlers []*namedHandler, name string) (deleted *namedHandler, remaining []*namedHandler) { + for i, h := range handlers { + if h.name == name { + // swap positions + handlers[i], handlers[len(handlers)-1] = handlers[len(handlers)-1], handlers[i] + // pop + deleted = handlers[len(handlers)-1] + remaining = handlers[:len(handlers)-1] + return + } + } + // this should never happen + panic("unreachable") +} diff --git a/pkg/server/service/loadbalancer/p2c_test.go b/pkg/server/service/loadbalancer/p2c_test.go new file mode 100644 index 0000000000..77e38ce4f0 --- /dev/null +++ b/pkg/server/service/loadbalancer/p2c_test.go @@ -0,0 +1,83 @@ 
+package loadbalancer + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +type mockRand struct { + vals []int + calls int +} + +func (m *mockRand) IntN(int) int { + defer func() { + m.calls++ + }() + return m.vals[m.calls] +} + +func testHandlers(inflights ...int) []*namedHandler { + var out []*namedHandler + for i, inflight := range inflights { + h := &namedHandler{ + name: fmt.Sprint(i), + } + h.inflight.Store(int64(inflight)) + out = append(out, h) + } + return out +} + +func TestStrategyTwoRandomChoices_AllHealthy(t *testing.T) { + cases := []struct { + name string + handlers []*namedHandler + rand *mockRand + expectHandler string + }{ + { + name: "oneHealthyHandler", + handlers: testHandlers(0), + rand: nil, + expectHandler: "0", + }, + { + name: "twoHandlersZeroInflight", + handlers: testHandlers(0, 0), + rand: &mockRand{vals: []int{1, 0}}, + expectHandler: "1", + }, + { + name: "choosesLowerOfTwo", + handlers: testHandlers(0, 1), + rand: &mockRand{vals: []int{1, 0}}, + expectHandler: "0", + }, + { + name: "choosesLowerOfThree", + handlers: testHandlers(10, 90, 40), + rand: &mockRand{vals: []int{1, 1}}, + expectHandler: "2", + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + strategy := newStrategyP2C() + strategy.(*strategyPowerOfTwoChoices).rand = tc.rand + + status := map[string]struct{}{} + for _, h := range tc.handlers { + strategy.add(h) + status[h.name] = struct{}{} + } + + got := strategy.nextServer(status) + + assert.Equal(t, tc.expectHandler, got.name, "balancer strategy gave unexpected backend handler") + }) + } +} diff --git a/pkg/server/service/loadbalancer/wrr.go b/pkg/server/service/loadbalancer/wrr.go new file mode 100644 index 0000000000..8c1a951774 --- /dev/null +++ b/pkg/server/service/loadbalancer/wrr.go @@ -0,0 +1,85 @@ +package loadbalancer + +import ( + "container/heap" +) + +// strategyWRR is a WeightedRoundRobin load balancer strategy based on Earliest Deadline First (EDF). 
+// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) +// Each pick from the schedule has the earliest deadline entry selected. +// Entries have deadlines set at currentDeadline + 1 / weight, +// providing weighted round-robin behavior with floating point weights and an O(log n) pick time. +type strategyWRR struct { + handlers []*namedHandler + curDeadline float64 + deadline float64 +} + +func newStrategyWRR() strategy { + return &strategyWRR{} +} + +func (s *strategyWRR) nextServer(status map[string]struct{}) *namedHandler { + + var handler *namedHandler + for { + // Pick handler with closest deadline. + handler = heap.Pop(s).(*namedHandler) + + // curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones. + s.curDeadline = handler.deadline + handler.deadline += 1 / handler.weight + + heap.Push(s, handler) + if _, ok := status[handler.name]; ok { + break + } + } + return handler +} + +func (s *strategyWRR) add(h *namedHandler) { + h.deadline = s.curDeadline + 1/h.weight + heap.Push(s, h) +} + +func (s *strategyWRR) setUp(string, bool) {} + +func (s *strategyWRR) name() string { + return "wrr" +} + +func (s *strategyWRR) len() int { + return len(s.handlers) +} + +// Len implements heap.Interface/sort.Interface. +func (s *strategyWRR) Len() int { return s.len() } + +// Less implements heap.Interface/sort.Interface. +func (s *strategyWRR) Less(i, j int) bool { + return s.handlers[i].deadline < s.handlers[j].deadline +} + +// Swap implements heap.Interface/sort.Interface. +func (s *strategyWRR) Swap(i, j int) { + s.handlers[i], s.handlers[j] = s.handlers[j], s.handlers[i] +} + +// Push implements heap.Interface for pushing an item into the heap. +func (s *strategyWRR) Push(x interface{}) { + h, ok := x.(*namedHandler) + if !ok { + return + } + + s.handlers = append(s.handlers, h) +} + +// Pop implements heap.Interface for popping an item from the heap. +// It panics if b.Len() < 1. 
+func (s *strategyWRR) Pop() interface{} { + h := s.handlers[len(s.handlers)-1] + s.handlers = s.handlers[0 : len(s.handlers)-1] + return h +} diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go index b01d2327c1..b3106adc02 100644 --- a/pkg/server/service/service.go +++ b/pkg/server/service/service.go @@ -27,9 +27,9 @@ import ( "github.com/traefik/traefik/v3/pkg/server/cookie" "github.com/traefik/traefik/v3/pkg/server/middleware" "github.com/traefik/traefik/v3/pkg/server/provider" + "github.com/traefik/traefik/v3/pkg/server/service/loadbalancer" "github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/failover" "github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/mirror" - "github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/wrr" ) const defaultMaxBodySize int64 = -1 @@ -220,7 +220,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName) } - balancer := wrr.New(config.Sticky, config.HealthCheck != nil) + balancer := loadbalancer.NewWRR(config.Sticky, config.HealthCheck != nil) for _, service := range shuffle(config.Services, m.rand) { serviceHandler, err := m.BuildHTTP(ctx, service.Name) if err != nil { @@ -283,7 +283,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName return nil, err } - lb := wrr.New(service.Sticky, service.HealthCheck != nil) + lb := loadbalancer.NewWRR(service.Sticky, service.HealthCheck != nil) healthCheckTargets := make(map[string]*url.URL) for _, server := range shuffle(service.Servers, m.rand) {