diff --git a/pkg/server/service/loadbalancer/wrr/wrr.go b/pkg/server/service/loadbalancer/balancer.go
similarity index 67%
rename from pkg/server/service/loadbalancer/wrr/wrr.go
rename to pkg/server/service/loadbalancer/balancer.go
index 6aa04c8ece..5b30686afb 100644
--- a/pkg/server/service/loadbalancer/wrr/wrr.go
+++ b/pkg/server/service/loadbalancer/balancer.go
@@ -1,13 +1,13 @@
-package wrr
+package loadbalancer

 import (
-	"container/heap"
 	"context"
 	"errors"
 	"hash/fnv"
 	"net/http"
 	"strconv"
 	"sync"
+	"sync/atomic"

 	"github.com/rs/zerolog/log"
 	"github.com/traefik/traefik/v3/pkg/config/dynamic"
@@ -18,6 +18,13 @@ type namedHandler struct {
 	name     string
 	weight   float64
 	deadline float64
+	inflight atomic.Int64
+}
+
+func (h *namedHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	h.inflight.Add(1)
+	defer h.inflight.Add(-1)
+	h.Handler.ServeHTTP(w, req)
 }

 type stickyCookie struct {
@@ -41,36 +48,74 @@ func convertSameSite(sameSite string) http.SameSite {
 	}
 }

-// Balancer is a WeightedRoundRobin load balancer based on Earliest Deadline First (EDF).
-// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling)
-// Each pick from the schedule has the earliest deadline entry selected.
-// Entries have deadlines set at currentDeadline + 1 / weight,
-// providing weighted round-robin behavior with floating point weights and an O(log n) pick time.
+// strategy is an interface that can be used to implement different load balancing strategies
+// for the Balancer.
+type strategy interface {
+	// nextServer returns the next server to serve a request; it is called under the handlersMu lock.
+	// The status param is a set of the currently healthy child services.
+	nextServer(status map[string]struct{}) *namedHandler
+
+	// add adds a handler to the balancing algorithm; it is called under the handlersMu lock.
+	add(h *namedHandler)
+
+	setUp(name string, up bool)
+
+	name() string
+	len() int
+}
+
 type Balancer struct {
 	stickyCookie     *stickyCookie
 	wantsHealthCheck bool

 	handlersMu sync.RWMutex
-	// References all the handlers by name and also by the hashed value of the name.
-	handlerMap  map[string]*namedHandler
-	handlers    []*namedHandler
-	curDeadline float64
-	// status is a record of which child services of the Balancer are healthy, keyed
-	// by name of child service. A service is initially added to the map when it is
-	// created via Add, and it is later removed or added to the map as needed,
-	// through the SetStatus method.
+	// References all the handlers by name and also by the hashed value of the
+	// name.
+	handlerMap map[string]*namedHandler
+	// status is a record of which child services of the Balancer are healthy,
+	// keyed by name of child service. A service is initially added to the map
+	// when it is created via Add, and it is later removed or added to the map
+	// as needed, through the SetStatus method.
 	status map[string]struct{}
+
+	// strategy references the load balancing strategy to be used. Its add and
+	// nextServer methods must be called under the handlersMu lock.
+	strategy strategy
+
 	// updaters is the list of hooks that are run (to update the Balancer
 	// parent(s)), whenever the Balancer status changes.
 	updaters []func(bool)
 }

-// New creates a new load balancer.
-func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
+// NewWRR creates a WeightedRoundRobin load balancer based on Earliest Deadline
+// First (EDF).
+// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling)
+// Each pick from the schedule has the earliest deadline entry selected.
+// Entries have deadlines set at currentDeadline + 1 / weight,
+// providing weighted round-robin behavior with floating point weights and an
+// O(log n) pick time.
+func NewWRR(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
+	return newBalancer(sticky, wantHealthCheck, newStrategyWRR())
+}
+
+// NewP2C creates a load balancer that uses the "power-of-two-random-choices"
+// algorithm. The idea is to take two of the available backends at random and
+// select the one with the fewest in-flight requests. This is constant time
+// when picking, and has more beneficial "herd" behaviour than the "fewest
+// connections" algorithm.
+func NewP2C(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
+	return newBalancer(sticky, wantHealthCheck, newStrategyP2C())
+}
+
+func newBalancer(sticky *dynamic.Sticky, wantHealthCheck bool, strategy strategy) *Balancer {
 	balancer := &Balancer{
 		status:           make(map[string]struct{}),
 		handlerMap:       make(map[string]*namedHandler),
 		wantsHealthCheck: wantHealthCheck,
+		strategy:         strategy,
 	}
 	if sticky != nil && sticky.Cookie != nil {
 		balancer.stickyCookie = &stickyCookie{
@@ -85,37 +130,6 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
 	return balancer
 }

-// Len implements heap.Interface/sort.Interface.
-func (b *Balancer) Len() int { return len(b.handlers) }
-
-// Less implements heap.Interface/sort.Interface.
-func (b *Balancer) Less(i, j int) bool {
-	return b.handlers[i].deadline < b.handlers[j].deadline
-}
-
-// Swap implements heap.Interface/sort.Interface.
-func (b *Balancer) Swap(i, j int) {
-	b.handlers[i], b.handlers[j] = b.handlers[j], b.handlers[i]
-}
-
-// Push implements heap.Interface for pushing an item into the heap.
-func (b *Balancer) Push(x interface{}) {
-	h, ok := x.(*namedHandler)
-	if !ok {
-		return
-	}
-
-	b.handlers = append(b.handlers, h)
-}
-
-// Pop implements heap.Interface for popping an item from the heap.
-// It panics if b.Len() < 1.
-func (b *Balancer) Pop() interface{} {
-	h := b.handlers[len(b.handlers)-1]
-	b.handlers = b.handlers[0 : len(b.handlers)-1]
-	return h
-}
-
 // SetStatus sets on the balancer that its given child is now of the given
 // status. balancerName is only needed for logging purposes.
 func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
@@ -137,6 +151,8 @@ func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
 		delete(b.status, childName)
 	}

+	b.strategy.setUp(childName, up)
+
 	upAfter := len(b.status) > 0
 	status = "DOWN"
 	if upAfter {
@@ -174,26 +190,13 @@ func (b *Balancer) nextServer() (*namedHandler, error) {
 	b.handlersMu.Lock()
 	defer b.handlersMu.Unlock()

-	if len(b.handlers) == 0 || len(b.status) == 0 {
+	if b.strategy.len() == 0 || len(b.status) == 0 {
 		return nil, errNoAvailableServer
 	}

-	var handler *namedHandler
-	for {
-		// Pick handler with closest deadline.
-		handler = heap.Pop(b).(*namedHandler)
-
-		// curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones.
-		b.curDeadline = handler.deadline
-		handler.deadline += 1 / handler.weight
-
-		heap.Push(b, handler)
-		if _, ok := b.status[handler.name]; ok {
-			break
-		}
-	}
+	handler := b.strategy.nextServer(b.status)

-	log.Debug().Msgf("Service selected by WRR: %s", handler.name)
+	log.Debug().Msgf("Service selected by strategy %q: %s", b.strategy.name(), handler.name)
 	return handler, nil
 }

@@ -263,8 +266,7 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) {
 	h := &namedHandler{Handler: handler, name: name, weight: float64(w)}

 	b.handlersMu.Lock()
-	h.deadline = b.curDeadline + 1/h.weight
-	heap.Push(b, h)
+	b.strategy.add(h)
 	b.status[name] = struct{}{}
 	b.handlerMap[name] = h
 	b.handlerMap[hash(name)] = h
diff --git a/pkg/server/service/loadbalancer/wrr/wrr_test.go b/pkg/server/service/loadbalancer/balancer_test.go
similarity index 57%
rename from pkg/server/service/loadbalancer/wrr/wrr_test.go
rename to pkg/server/service/loadbalancer/balancer_test.go
index 3708fa618b..552736451e 100644
--- a/pkg/server/service/loadbalancer/wrr/wrr_test.go
+++ b/pkg/server/service/loadbalancer/balancer_test.go
@@ -1,4 +1,4 @@
-package wrr
+package loadbalancer

 import (
 	"context"
@@ -11,7 +11,7 @@ import (
 )

 func TestBalancer(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -33,7 +33,7 @@ func TestBalancer(t *testing.T) {
 }

 func TestBalancerNoService(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	recorder := httptest.NewRecorder()
 	balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
@@ -42,7 +42,7 @@ func TestBalancerNoService(t *testing.T) {
 }

 func TestBalancerOneServerZeroWeight(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -64,7 +64,7 @@ type key string
 const serviceName key = "serviceName"

 func TestBalancerNoServiceUp(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.WriteHeader(http.StatusInternalServerError)
@@ -84,7 +84,7 @@ func TestBalancerNoServiceUp(t *testing.T) {
 }

 func TestBalancerOneServerDown(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -105,7 +105,7 @@ func TestBalancerOneServerDown(t *testing.T) {
 }

 func TestBalancerDownThenUp(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -134,7 +134,7 @@ func TestBalancerDownThenUp(t *testing.T) {
 }

 func TestBalancerPropagate(t *testing.T) {
-	balancer1 := New(nil, true)
+	balancer1 := NewWRR(nil, true)

 	balancer1.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "first")
@@ -145,7 +145,7 @@ func TestBalancerPropagate(t *testing.T) {
 		rw.WriteHeader(http.StatusOK)
 	}), Int(1))

-	balancer2 := New(nil, true)
+	balancer2 := NewWRR(nil, true)
 	balancer2.Add("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "third")
 		rw.WriteHeader(http.StatusOK)
@@ -155,7 +155,7 @@ func TestBalancerPropagate(t *testing.T) {
 		rw.WriteHeader(http.StatusOK)
 	}), Int(1))

-	topBalancer := New(nil, true)
+	topBalancer := NewWRR(nil, true)
 	topBalancer.Add("balancer1", balancer1, Int(1))
 	_ = balancer1.RegisterStatusUpdater(func(up bool) {
 		topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up)
@@ -206,8 +206,8 @@ func TestBalancerPropagate(t *testing.T) {
 	assert.Equal(t, wantStatus, recorder.status)
 }

-func TestBalancerAllServersZeroWeight(t *testing.T) {
-	balancer := New(nil, false)
+func TestWRRBalancerAllServersZeroWeight(t *testing.T) {
+	balancer := NewWRR(nil, false)

 	balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
 	balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
@@ -219,85 +219,113 @@ func TestBalancerAllServersZeroWeight(t *testing.T) {
 }

 func TestSticky(t *testing.T) {
-	balancer := New(&dynamic.Sticky{
-		Cookie: &dynamic.Cookie{
-			Name:     "test",
-			Secure:   true,
-			HTTPOnly: true,
-			SameSite: "none",
-			MaxAge:   42,
-		},
-	}, false)
-
-	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		rw.Header().Set("server", "first")
-		rw.WriteHeader(http.StatusOK)
-	}), Int(1))
-
-	balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		rw.Header().Set("server", "second")
-		rw.WriteHeader(http.StatusOK)
-	}), Int(2))
-
-	recorder := &responseRecorder{
-		ResponseRecorder: httptest.NewRecorder(),
-		save:             map[string]int{},
-		cookies:          make(map[string]*http.Cookie),
+	balancers := []*Balancer{
+		NewWRR(&dynamic.Sticky{
+			Cookie: &dynamic.Cookie{
+				Name:     "test",
+				Secure:   true,
+				HTTPOnly: true,
+				SameSite: "none",
+				MaxAge:   42,
+			},
+		}, false),
+		NewP2C(&dynamic.Sticky{
+			Cookie: &dynamic.Cookie{
+				Name:     "test",
+				Secure:   true,
+				HTTPOnly: true,
+				SameSite: "none",
+				MaxAge:   42,
+			},
+		}, false),
 	}

-	req := httptest.NewRequest(http.MethodGet, "/", nil)
-	for i := 0; i < 3; i++ {
-		for _, cookie := range recorder.Result().Cookies() {
-			assert.NotContains(t, "test=first", cookie.Value)
-			assert.NotContains(t, "test=second", cookie.Value)
-			req.AddCookie(cookie)
-		}
-		recorder.ResponseRecorder = httptest.NewRecorder()
-
-		balancer.ServeHTTP(recorder, req)
+	// we need to make sure second is chosen
+	balancers[1].strategy.(*strategyPowerOfTwoChoices).rand = &mockRand{vals: []int{1, 0}}
+
+	for _, balancer := range balancers {
+		t.Run(balancer.strategy.name(), func(t *testing.T) {
+
+			balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+				rw.Header().Set("server", "first")
+				rw.WriteHeader(http.StatusOK)
+			}), Int(1))
+
+			balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+				rw.Header().Set("server", "second")
+				rw.WriteHeader(http.StatusOK)
+			}), Int(2))
+
+			recorder := &responseRecorder{
+				ResponseRecorder: httptest.NewRecorder(),
+				save:             map[string]int{},
+				cookies:          make(map[string]*http.Cookie),
+			}
+
+			req := httptest.NewRequest(http.MethodGet, "/", nil)
+			for i := 0; i < 3; i++ {
+				for _, cookie := range recorder.Result().Cookies() {
+					assert.NotContains(t, "test=first", cookie.Value)
+					assert.NotContains(t, "test=second", cookie.Value)
+					req.AddCookie(cookie)
+				}
+				recorder.ResponseRecorder = httptest.NewRecorder()
+
+				balancer.ServeHTTP(recorder, req)
+			}
+
+			assert.Equal(t, 0, recorder.save["first"])
+			assert.Equal(t, 3, recorder.save["second"])
+			assert.True(t, recorder.cookies["test"].HttpOnly)
+			assert.True(t, recorder.cookies["test"].Secure)
+			assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite)
+			assert.Equal(t, 42, recorder.cookies["test"].MaxAge)
+		})
 	}
-
-	assert.Equal(t, 0, recorder.save["first"])
-	assert.Equal(t, 3, recorder.save["second"])
-	assert.True(t, recorder.cookies["test"].HttpOnly)
-	assert.True(t, recorder.cookies["test"].Secure)
-	assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite)
-	assert.Equal(t, 42, recorder.cookies["test"].MaxAge)
 }

 func TestSticky_FallBack(t *testing.T) {
-	balancer := New(&dynamic.Sticky{
-		Cookie: &dynamic.Cookie{Name: "test"},
-	}, false)
+	balancers := []*Balancer{
+		NewWRR(&dynamic.Sticky{
+			Cookie: &dynamic.Cookie{Name: "test"},
+		}, false),
+		NewP2C(&dynamic.Sticky{
+			Cookie: &dynamic.Cookie{Name: "test"},
+		}, false),
+	}

-	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		rw.Header().Set("server", "first")
-		rw.WriteHeader(http.StatusOK)
-	}), Int(1))
+	for _, balancer := range balancers {
+		t.Run(balancer.strategy.name(), func(t *testing.T) {
+			balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+				rw.Header().Set("server", "first")
+				rw.WriteHeader(http.StatusOK)
+			}), Int(1))

-	balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
-		rw.Header().Set("server", "second")
-		rw.WriteHeader(http.StatusOK)
-	}), Int(2))
+			balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+				rw.Header().Set("server", "second")
+				rw.WriteHeader(http.StatusOK)
+			}), Int(2))

-	recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+			recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}

-	req := httptest.NewRequest(http.MethodGet, "/", nil)
-	req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
-	for i := 0; i < 3; i++ {
-		recorder.ResponseRecorder = httptest.NewRecorder()
+			req := httptest.NewRequest(http.MethodGet, "/", nil)
+			req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
+			for i := 0; i < 3; i++ {
+				recorder.ResponseRecorder = httptest.NewRecorder()

-		balancer.ServeHTTP(recorder, req)
-	}
+				balancer.ServeHTTP(recorder, req)
+			}

-	assert.Equal(t, 0, recorder.save["first"])
-	assert.Equal(t, 3, recorder.save["second"])
+			assert.Equal(t, 0, recorder.save["first"])
+			assert.Equal(t, 3, recorder.save["second"])
+		})
+	}
 }

 // TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
 // and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
 func TestBalancerBias(t *testing.T) {
-	balancer := New(nil, false)
+	balancer := NewWRR(nil, false)

 	balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
 		rw.Header().Set("server", "A")
@@ -339,3 +367,116 @@ func (r *responseRecorder) WriteHeader(statusCode int) {
 	}
 	r.ResponseRecorder.WriteHeader(statusCode)
 }
+
+func testHandler(name string, weight float64, inflight int) *namedHandler {
+	h := &namedHandler{
+		name: name,
+	}
+	h.inflight.Store(int64(inflight))
+	h.weight = weight
+	h.Handler = http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
+		rw.Header().Set("server", name)
+		rw.WriteHeader(http.StatusOK)
+	})
+	return h
+}
+
+func TestStrategies(t *testing.T) {
+	newStrategies := []func() strategy{
+		newStrategyWRR,
+		newStrategyP2C,
+	}
+
+	for _, s := range newStrategies {
+		t.Run(s().name(), testStrategy(s))
+	}
+}
+
+func testStrategy(newStrategy func() strategy) func(t *testing.T) {
+	return func(t *testing.T) {
+		t.Run("OneHealthyBackend", testStrategyOneHealthyBackend(newStrategy()))
+		t.Run("TwoHealthyBackends", testStrategyTwoHealthyBackends(newStrategy()))
+		t.Run("OneHealthyOneUnhealthy", testStrategyOneHealthyOneUnhealthy(newStrategy()))
+		t.Run("OneHostDownThenUp", testStrategyOneHostDownThenUp(newStrategy()))
+	}
+}
+
+func testStrategyOneHealthyBackend(strategy strategy) func(t *testing.T) {
+	return func(t *testing.T) {
+		strategy.add(testHandler("A", 1, 0))
+
+		healthy := map[string]struct{}{"A": {}}
+
+		recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+
+		const requests = 10
+		for i := 0; i < requests; i++ {
+			strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		}
+
+		assert.Equal(t, requests, recorder.save["A"], "A should have been hit with all requests")
+	}
+}
+
+func testStrategyTwoHealthyBackends(strategy strategy) func(t *testing.T) {
+	return func(t *testing.T) {
+		strategy.add(testHandler("A", 1, 0))
+		strategy.add(testHandler("B", 1, 0))

+		healthy := map[string]struct{}{"A": {}, "B": {}}
+
+		recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+
+		for i := 0; i < 100; i++ {
+			strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		}
+
+		// not all strategies are going to be 50/50, but they shouldn't
+		// balance to 100/0 if both are healthy.
+		assert.Greater(t, recorder.save["A"], 0, "A should have been hit")
+		assert.Greater(t, recorder.save["B"], 0, "B should have been hit")
+		t.Logf("strategy %s with two backends has a ratio of %d:%d", strategy.name(), recorder.save["A"], recorder.save["B"])
+	}
+}
+
+func testStrategyOneHealthyOneUnhealthy(strategy strategy) func(t *testing.T) {
+	return func(t *testing.T) {
+		strategy.add(testHandler("A", 1, 0))
+		strategy.add(testHandler("B", 1, 0))
+
+		strategy.setUp("B", false)
+		healthy := map[string]struct{}{"A": {}}
+
+		recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+
+		const requests = 100
+		for i := 0; i < requests; i++ {
+			strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		}
+
+		assert.Equal(t, requests, recorder.save["A"], "A should have been hit with all requests")
+		assert.Equal(t, 0, recorder.save["B"], "B should not have been hit")
+	}
+}
+
+func testStrategyOneHostDownThenUp(strategy strategy) func(t *testing.T) {
+	return func(t *testing.T) {
+		strategy.add(testHandler("A", 1, 0))
+		strategy.add(testHandler("B", 1, 0))
+
+		strategy.setUp("A", false)
+		strategy.setUp("A", true)
+
+		healthy := map[string]struct{}{"A": {}, "B": {}}
+
+		const requests = 100
+		recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
+
+		for i := 0; i < requests; i++ {
+			strategy.nextServer(healthy).ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
+		}
+
+		assert.Greater(t, recorder.save["A"], 0, "A should have been hit")
+		assert.Greater(t, recorder.save["B"], 0, "B should have been hit")
+	}
+}
diff --git a/pkg/server/service/loadbalancer/p2c.go b/pkg/server/service/loadbalancer/p2c.go
new file mode 100644
index 0000000000..89cd226a5e
--- /dev/null
+++ b/pkg/server/service/loadbalancer/p2c.go
@@ -0,0 +1,105 @@
+package loadbalancer
+
+import (
+	crand "crypto/rand"
+	"math/rand/v2"
+)
+
+type rnd interface {
+	IntN(int) int
+}
+
+// strategyPowerOfTwoChoices implements the "power-of-two-random-choices" algorithm for load balancing.
+// The idea is to take two of the available backends at random and select the one that has the fewest
+// in-flight requests. This algorithm balances the load more effectively than a round-robin approach,
+// while also being constant time when picking. The strategy also has more beneficial "herd" behaviour
+// than the "fewest connections" algorithm, especially when the load balancer doesn't have perfect
+// knowledge about the global number of connections to the backend, for example, when running in a
+// distributed fashion.
+type strategyPowerOfTwoChoices struct {
+	healthy   []*namedHandler
+	unhealthy []*namedHandler
+	rand      rnd
+}
+
+func newStrategyP2C() strategy {
+	return &strategyPowerOfTwoChoices{
+		rand: newRand(),
+	}
+}
+
+func (s *strategyPowerOfTwoChoices) nextServer(map[string]struct{}) *namedHandler {
+	if len(s.healthy) == 1 {
+		return s.healthy[0]
+	}
+
+	// in order to not get the same backend twice, we make the second call to s.rand.IntN one fewer
+	// than the length of the slice. We then have to shift over the second index if it is equal or
+	// greater than the first index, wrapping round if needed.
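+	// For example, with 4 healthy backends: n1 is drawn from {0, 1, 2, 3} and n2 from {0, 1, 2};
+	// if n1 == 2 and n2 >= 2, n2 is shifted to 3, so the two picks are always distinct.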
+	n1, n2 := s.rand.IntN(len(s.healthy)), s.rand.IntN(len(s.healthy)-1)
+	if n2 >= n1 {
+		n2 = (n2 + 1) % len(s.healthy)
+	}
+
+	h1, h2 := s.healthy[n1], s.healthy[n2]
+	// ensure h1 has fewer inflight requests than h2
+	if h2.inflight.Load() < h1.inflight.Load() {
+		h1, h2 = h2, h1
+	}
+
+	return h1
+}
+
+func (s *strategyPowerOfTwoChoices) add(h *namedHandler) {
+	s.healthy = append(s.healthy, h)
+}
+
+func (s *strategyPowerOfTwoChoices) setUp(name string, up bool) {
+	if up {
+		var healthy *namedHandler
+		healthy, s.unhealthy = deleteAndPop(s.unhealthy, name)
+		s.healthy = append(s.healthy, healthy)
+		return
+	}
+
+	var unhealthy *namedHandler
+	unhealthy, s.healthy = deleteAndPop(s.healthy, name)
+	s.unhealthy = append(s.unhealthy, unhealthy)
+}
+
+func (s *strategyPowerOfTwoChoices) name() string {
+	return "p2c"
+}
+
+func (s *strategyPowerOfTwoChoices) len() int {
+	return len(s.healthy) + len(s.unhealthy)
+}
+
+func newRand() *rand.Rand {
+	var seed [16]byte
+	_, err := crand.Read(seed[:])
+	if err != nil {
+		panic(err)
+	}
+	// derive one 64-bit seed from each half of the 16 random bytes
+	var seed1, seed2 uint64
+	for i := 0; i < 8; i++ {
+		seed1 = seed1<<8 + uint64(seed[i])
+		seed2 = seed2<<8 + uint64(seed[i+8])
+	}
+	return rand.New(rand.NewPCG(seed1, seed2))
+}
+
+// deleteAndPop removes the handler with the given name. The caller always overwrites the slice
+// that is passed in, so it doesn't matter that we mutate the parameter.
+func deleteAndPop(handlers []*namedHandler, name string) (deleted *namedHandler, remaining []*namedHandler) {
+	for i, h := range handlers {
+		if h.name == name {
+			// swap positions
+			handlers[i], handlers[len(handlers)-1] = handlers[len(handlers)-1], handlers[i]
+			// pop
+			deleted = handlers[len(handlers)-1]
+			remaining = handlers[:len(handlers)-1]
+			return
+		}
+	}
+	// this should never happen
+	panic("unreachable")
+}
diff --git a/pkg/server/service/loadbalancer/p2c_test.go b/pkg/server/service/loadbalancer/p2c_test.go
new file mode 100644
index 0000000000..77e38ce4f0
--- /dev/null
+++ b/pkg/server/service/loadbalancer/p2c_test.go
@@ -0,0 +1,83 @@
+package loadbalancer
+
+import (
+	"fmt"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type mockRand struct {
+	vals  []int
+	calls int
+}
+
+func (m *mockRand) IntN(int) int {
+	defer func() {
+		m.calls++
+	}()
+	return m.vals[m.calls]
+}
+
+func testHandlers(inflights ...int) []*namedHandler {
+	var out []*namedHandler
+	for i, inflight := range inflights {
+		h := &namedHandler{
+			name: fmt.Sprint(i),
+		}
+		h.inflight.Store(int64(inflight))
+		out = append(out, h)
+	}
+	return out
+}
+
+func TestStrategyTwoRandomChoices_AllHealthy(t *testing.T) {
+	cases := []struct {
+		name          string
+		handlers      []*namedHandler
+		rand          *mockRand
+		expectHandler string
+	}{
+		{
+			name:          "oneHealthyHandler",
+			handlers:      testHandlers(0),
+			rand:          nil,
+			expectHandler: "0",
+		},
+		{
+			name:          "twoHandlersZeroInflight",
+			handlers:      testHandlers(0, 0),
+			rand:          &mockRand{vals: []int{1, 0}},
+			expectHandler: "1",
+		},
+		{
+			name:          "choosesLowerOfTwo",
+			handlers:      testHandlers(0, 1),
+			rand:          &mockRand{vals: []int{1, 0}},
+			expectHandler: "0",
+		},
+		{
+			name:          "choosesLowerOfThree",
+			handlers:      testHandlers(10, 90, 40),
+			rand:          &mockRand{vals: []int{1, 1}},
+			expectHandler: "2",
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			strategy := newStrategyP2C()
+			strategy.(*strategyPowerOfTwoChoices).rand = tc.rand
+
+			status := map[string]struct{}{}
+			for _, h := range tc.handlers {
+				strategy.add(h)
+				status[h.name] = struct{}{}
+			}
+
+			got := strategy.nextServer(status)
+
+			assert.Equal(t, tc.expectHandler, got.name, "balancer strategy gave unexpected backend handler")
+		})
+	}
+}
diff --git a/pkg/server/service/loadbalancer/wrr.go b/pkg/server/service/loadbalancer/wrr.go
new file mode 100644
index 0000000000..8c1a951774
--- /dev/null
+++ b/pkg/server/service/loadbalancer/wrr.go
@@ -0,0 +1,85 @@
+package loadbalancer
+
+import (
+	"container/heap"
+)
+
+// strategyWRR is a WeightedRoundRobin load balancer strategy based on Earliest Deadline First (EDF).
+// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling)
+// Each pick from the schedule has the earliest deadline entry selected.
+// Entries have deadlines set at currentDeadline + 1 / weight,
+// providing weighted round-robin behavior with floating point weights and an O(log n) pick time.
+type strategyWRR struct {
+	handlers    []*namedHandler
+	curDeadline float64
+}
+
+func newStrategyWRR() strategy {
+	return &strategyWRR{}
+}
+
+func (s *strategyWRR) nextServer(status map[string]struct{}) *namedHandler {
+	var handler *namedHandler
+	for {
+		// Pick handler with closest deadline.
+		handler = heap.Pop(s).(*namedHandler)
+
+		// curDeadline should be handler's deadline so that a newly added entry competes fairly with the old ones.
+		s.curDeadline = handler.deadline
+		handler.deadline += 1 / handler.weight
+
+		heap.Push(s, handler)
+		if _, ok := status[handler.name]; ok {
+			break
+		}
+	}
+	return handler
+}
+
+func (s *strategyWRR) add(h *namedHandler) {
+	h.deadline = s.curDeadline + 1/h.weight
+	heap.Push(s, h)
+}
+
+func (s *strategyWRR) setUp(string, bool) {}
+
+func (s *strategyWRR) name() string {
+	return "wrr"
+}
+
+func (s *strategyWRR) len() int {
+	return len(s.handlers)
+}
+
+// Len implements heap.Interface/sort.Interface.
+func (s *strategyWRR) Len() int { return s.len() }
+
+// Less implements heap.Interface/sort.Interface.
+func (s *strategyWRR) Less(i, j int) bool {
+	return s.handlers[i].deadline < s.handlers[j].deadline
+}
+
+// Swap implements heap.Interface/sort.Interface.
+func (s *strategyWRR) Swap(i, j int) {
+	s.handlers[i], s.handlers[j] = s.handlers[j], s.handlers[i]
+}
+
+// Push implements heap.Interface for pushing an item into the heap.
+func (s *strategyWRR) Push(x interface{}) {
+	h, ok := x.(*namedHandler)
+	if !ok {
+		return
+	}
+
+	s.handlers = append(s.handlers, h)
+}
+
+// Pop implements heap.Interface for popping an item from the heap.
+// It panics if s.Len() < 1.
+func (s *strategyWRR) Pop() interface{} {
+	h := s.handlers[len(s.handlers)-1]
+	s.handlers = s.handlers[0 : len(s.handlers)-1]
+	return h
+}
diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go
index b01d2327c1..b3106adc02 100644
--- a/pkg/server/service/service.go
+++ b/pkg/server/service/service.go
@@ -27,9 +27,9 @@ import (
 	"github.com/traefik/traefik/v3/pkg/server/cookie"
 	"github.com/traefik/traefik/v3/pkg/server/middleware"
 	"github.com/traefik/traefik/v3/pkg/server/provider"
+	"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer"
 	"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/failover"
 	"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/mirror"
-	"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/wrr"
 )

 const defaultMaxBodySize int64 = -1
@@ -220,7 +220,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
 		config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
 	}

-	balancer := wrr.New(config.Sticky, config.HealthCheck != nil)
+	balancer := loadbalancer.NewWRR(config.Sticky, config.HealthCheck != nil)
 	for _, service := range shuffle(config.Services, m.rand) {
 		serviceHandler, err := m.BuildHTTP(ctx, service.Name)
 		if err != nil {
@@ -283,7 +283,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
 		return nil, err
 	}

-	lb := wrr.New(service.Sticky, service.HealthCheck != nil)
+	lb := loadbalancer.NewWRR(service.Sticky, service.HealthCheck != nil)
 	healthCheckTargets := make(map[string]*url.URL)

 	for _, server := range shuffle(service.Servers, m.rand) {