Skip to content

Commit

Permalink
Make loadbalancer avoid sending requests to unhealthy nodes
Browse files Browse the repository at this point in the history
Updates #2346

Signed-off-by: Roman Zavodskikh <roman.zavodskikh@zalando.de>
  • Loading branch information
Roman Zavodskikh committed Jun 8, 2023
1 parent 87a23b9 commit 171360a
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 137 deletions.
167 changes: 57 additions & 110 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,98 +67,36 @@ func fadeIn(now time.Time, duration time.Duration, exponent float64, detected ti
return math.Pow(float64(rel)/float64(duration), exponent)
}

// shiftWeighted picks one endpoint of the route at random, where the chance
// of every endpoint is proportional to its fade-in weight at the time now.
//
// w is used as storage for the running totals of the weights. The running
// totals are appended to it and indexed in parallel with the endpoints, so it
// has to be passed with zero length. The grown slice is not handed back to
// the caller.
//
// When no running total exceeds the drawn random value, e.g. when every
// weight is zero, the last endpoint is returned. The function panics when
// the route has no endpoints.
func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, w []float64, now time.Time) routing.LBEndpoint {
	route := ctx.Route
	endpoints := route.LBEndpoints

	cumulative := w
	total := 0.0
	for i := range endpoints {
		total += fadeIn(now, route.LBFadeInDuration, route.LBFadeInExponent, endpoints[i].Detected)
		cumulative = append(cumulative, total)
	}

	// the fallback is resolved before the random value is drawn
	fallback := endpoints[len(cumulative)-1]

	threshold := rnd.Float64() * total
	for i, upper := range cumulative {
		if upper > threshold {
			return endpoints[i]
		}
	}

	return fallback
}

// shiftToRemaining selects a replacement endpoint for a request that was not
// accepted by a fading-in endpoint.
//
// wi holds the indexes of the endpoints that are not fading in. When there is
// at least one, the request goes to one of them, chosen uniformly at random.
// When all endpoints are fading in, the choice is delegated to shiftWeighted,
// which receives wf as its scratch slice.
func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, wf []float64, now time.Time) routing.LBEndpoint {
	endpoints := ctx.Route.LBEndpoints

	if stable := len(wi); stable > 0 {
		// equal distribution between the endpoints that are not fading in
		pick := wi[rnd.Intn(stable)]
		return endpoints[pick]
	}

	// every endpoint is still fading in
	return shiftWeighted(rnd, ctx, wf, now)
}

func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, notFadingIndexes []int, choice int, algo routing.LBAlgorithm) routing.LBEndpoint {
ep := ctx.Route.LBEndpoints
func goodEndpoint(rnd *rand.Rand, ep *routing.LBEndpoint, ctx *routing.LBContext) bool {
now := time.Now()
f := fadeIn(
now,
ctx.Route.LBFadeInDuration,
ctx.Route.LBFadeInExponent,
ctx.Route.LBEndpoints[choice].Detected,
ep.Detected,
)
goodFadeIn := rnd.Float64() < f

if rnd.Float64() < f {
return ep[choice]
}
for i := 0; i < len(ep); i++ {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn {
notFadingIndexes = append(notFadingIndexes, i)
}
failedReqs, totalReqs := (int64)(0), (int64)(0)
if ep.Metrics != nil {
failedReqs, totalReqs = ep.Metrics.GetFailedRequests()
}
goodHealthyEp := totalReqs == 0 || float64(failedReqs)/float64(totalReqs) < 0.95

switch a := algo.(type) {
case *roundRobin:
return shiftToRemaining(a.rnd, ctx, notFadingIndexes, a.fadingWeights, now)
case *random:
return shiftToRemaining(a.rand, ctx, notFadingIndexes, a.fadingWeights, now)
case *consistentHash:
// If all endpoints are fading, normal consistent hash result
if len(notFadingIndexes) == 0 {
return ep[choice]
}
// otherwise calculate consistent hash again using endpoints which are not fading
return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))]
default:
return ep[choice]
}
return goodFadeIn && goodHealthyEp
}

type roundRobin struct {
mx sync.Mutex
index int
rnd *rand.Rand
notFadingIndexes []int
fadingWeights []float64
mx sync.Mutex
index int
rnd *rand.Rand
}

// newRoundRobin creates a round-robin algorithm for the given endpoints. The
// start position is chosen at random from a time-seeded random generator. The
// function panics when endpoints is empty, because a random index cannot be
// drawn from an empty range.
func newRoundRobin(endpoints []string) routing.LBAlgorithm {
	rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
	count := len(endpoints)

	rr := &roundRobin{rnd: rnd}
	rr.index = rnd.Intn(count)

	// frequently used slices, allocated once with room for every endpoint
	rr.notFadingIndexes = make([]int, 0, count)
	rr.fadingWeights = make([]float64, 0, count)

	return rr
}

Expand All @@ -170,30 +108,31 @@ func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint {

r.mx.Lock()
defer r.mx.Unlock()
r.index = (r.index + 1) % len(ctx.Route.LBEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[r.index]
endpoints := []routing.LBEndpoint{}
for _, e := range ctx.Route.LBEndpoints {
e := e
if goodEndpoint(r.rnd, &e, ctx) {
endpoints = append(endpoints, e)
}
}
if len(endpoints) == 0 {
endpoints = ctx.Route.LBEndpoints
}
r.index = (r.index + 1) % len(endpoints)

return withFadeIn(r.rnd, ctx, r.notFadingIndexes, r.index, r)
return endpoints[r.index]
}

type random struct {
rand *rand.Rand
notFadingIndexes []int
fadingWeights []float64
rand *rand.Rand
}

// newRandom creates a random load balancing algorithm for the given
// endpoints, backed by a time-seeded random generator.
func newRandom(endpoints []string) routing.LBAlgorithm {
	seed := time.Now().UnixNano()
	capacity := len(endpoints)
	// #nosec
	return &random{
		rand: rand.New(rand.NewSource(seed)),

		// frequently used slices, allocated once with room for every endpoint
		notFadingIndexes: make([]int, 0, capacity),
		fadingWeights:    make([]float64, 0, capacity),
	}
}

Expand All @@ -203,12 +142,19 @@ func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return ctx.Route.LBEndpoints[0]
}

i := r.rand.Intn(len(ctx.Route.LBEndpoints))
if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[i]
endpoints := []routing.LBEndpoint{}
for _, e := range ctx.Route.LBEndpoints {
e := e
if goodEndpoint(r.rand, &e, ctx) {
endpoints = append(endpoints, e)
}
}
if len(endpoints) == 0 {
endpoints = ctx.Route.LBEndpoints
}
i := r.rand.Intn(len(endpoints))

return withFadeIn(r.rand, ctx, r.notFadingIndexes, i, r)
return endpoints[i]
}

type (
Expand All @@ -217,9 +163,10 @@ type (
hash uint64 // hash of endpoint
}
consistentHash struct {
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
notFadingIndexes []int
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
hashesPerEndpoint int
allHosts map[string]struct{}
}
)

Expand All @@ -232,16 +179,18 @@ func (ch *consistentHash) Swap(i, j int) {
func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
ch := &consistentHash{
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
notFadingIndexes: make([]int, 0, len(endpoints)),
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
allHosts: map[string]struct{}{},
}
for i, ep := range endpoints {
endpointStartIndex := hashesPerEndpoint * i
for j := 0; j < hashesPerEndpoint; j++ {
ch.hashRing[endpointStartIndex+j] = endpointHash{i, hash(fmt.Sprintf("%s-%d", ep, j))}
}
ch.allHosts[ep] = struct{}{}
}
ch.hashesPerEndpoint = hashesPerEndpoint
sort.Sort(ch)
return ch
}
Expand Down Expand Up @@ -309,13 +258,22 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return ctx.Route.LBEndpoints[0]
}

choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[choice]
hosts := map[string]struct{}{}
for _, e := range ctx.Route.LBEndpoints {
e := e
if goodEndpoint(ch.rand, &e, ctx) {
hosts[e.Host] = struct{}{}
}
}
if len(hosts) == 0 {
hosts = ch.allHosts
}

return withFadeIn(ch.rand, ctx, ch.notFadingIndexes, choice, ch)
choice := ch.chooseConsistentHashEndpoint(ctx, func(i int) bool {
_, ok := hosts[ctx.Route.LBEndpoints[i].Host]
return !ok
})
return ctx.Route.LBEndpoints[choice]
}

func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int {
Expand All @@ -334,17 +292,6 @@ func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, s
return choice
}

// skipFadingEndpoints returns a predicate that tells whether the endpoint at
// index i has to be skipped. An endpoint is skipped unless its index is listed
// in notFadingEndpoints, so an empty list skips every endpoint.
func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool {
	return func(i int) bool {
		skip := true
		// stop at the first listed index that matches i
		for k := 0; skip && k < len(notFadingEndpoints); k++ {
			skip = notFadingEndpoints[k] != i
		}
		return skip
	}
}

// noSkippedEndpoints is the skip predicate that keeps every endpoint: it
// reports false for any index.
func noSkippedEndpoints(int) bool { return false }
Expand Down
33 changes: 7 additions & 26 deletions loadbalancer/fadein_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
//go:build !race

package loadbalancer

import (
Expand All @@ -12,7 +14,7 @@ import (
)

const (
fadeInDuration = 100 * time.Millisecond
fadeInDuration = 300 * time.Millisecond
bucketCount = 20
monotonyTolerance = 0.4 // we need to use a high tolerance for CI testing
)
Expand Down Expand Up @@ -76,15 +78,15 @@ func testFadeIn(
})
}

hashKeys := findHashKeys(a, ctx)
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
t.Log("test start", time.Now())
var stats []string
stop := time.After(fadeInDuration)
// Emulate the load balancer loop, sending requests to it with iterative hash keys
// of every endpoint over and over again till fadeIn period is over.
// Emulate the load balancer loop, sending requests to it with random hash keys
// over and over again till fadeIn period is over.
func() {
for {
ctx.Params[ConsistentHashKey] = hashKeys[len(stats)%len(hashKeys)]
ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(100000))
ep := a.Apply(ctx)
stats = append(stats, ep.Host)
select {
Expand Down Expand Up @@ -152,27 +154,6 @@ func testFadeIn(
})
}

// findHashKeys returns one hash key per endpoint of the route, in endpoint
// order, such that the algorithm selects that endpoint for the key. Cycling
// through the keys lets the test emulate round robin, which is useful for
// showing that the increase in requests to each endpoint is monotonic.
//
// Fade-in is switched off while the keys are searched and set to the
// fadeInDuration constant afterwards, and the hash key parameter is removed
// from the context. The candidate keys are the decimal numbers below 1000,
// and the search for an endpoint only ends once one of them selects it.
func findHashKeys(a routing.LBAlgorithm, ctx *routing.LBContext) []string {
	// temporarily disable fadein
	ctx.Route.LBFadeInDuration = 0

	rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
	var keys []string
	for _, want := range ctx.Route.LBEndpoints {
		// keep trying random keys until one of them selects the wanted endpoint
		for found := false; !found; {
			ctx.Params[ConsistentHashKey] = strconv.Itoa(rnd.Intn(1000))
			if found = want == a.Apply(ctx); found {
				keys = append(keys, ctx.Params[ConsistentHashKey].(string))
			}
		}
	}

	delete(ctx.Params, ConsistentHashKey)
	ctx.Route.LBFadeInDuration = fadeInDuration
	return keys
}

func TestFadeIn(t *testing.T) {
old := 2 * fadeInDuration
testFadeIn(t, "round-robin, 0", newRoundRobin, old, old)
Expand Down
7 changes: 7 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,13 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
req = injectClientTrace(req, ctx.proxySpan)

response, err := roundTripper.RoundTrip(req)
if endpoint != nil {
if err != nil {
endpoint.Metrics.AddFailedRequest()
} else {
endpoint.Metrics.AddSucceededRequest()
}
}

ctx.proxySpan.LogKV("http_roundtrip", EndEvent)
if err != nil {
Expand Down
32 changes: 31 additions & 1 deletion routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ type RouteFilter struct {
// LBMetrics contains metrics used by LB algorithms.
//
// The counters are plain int64 values; the methods visible alongside this
// type read and modify them through the sync/atomic functions.
type LBMetrics struct {
// inflightRequests is the number of outstanding requests from the proxy to
// the backend.
inflightRequests int64

// failedRequests is incremented by AddFailedRequest.
failedRequests int64
// totalRequests is incremented by AddFailedRequest and AddSucceededRequest,
// and decremented by DecTotalRequest.
totalRequests int64
}

// IncInflightRequest increments the number of outstanding requests from the proxy to a given backend.
Expand All @@ -160,11 +163,38 @@ func (m *LBMetrics) DecInflightRequest() {
atomic.AddInt64(&m.inflightRequests, -1)
}

// GetInflightRequests decrements the number of outstanding requests from the proxy to a given backend.
// GetInflightRequests loads the number of outstanding requests from the proxy to a given backend.
func (m *LBMetrics) GetInflightRequests() int {
	// atomic read of the counter, converted to int for the caller
	inflight := atomic.LoadInt64(&m.inflightRequests)
	return int(inflight)
}

// AddFailedRequest records a failed request from the proxy to a given backend:
// it increments both the total request counter and the failed request counter.
func (m *LBMetrics) AddFailedRequest() {
// The total is incremented before the failures, while GetFailedRequests loads
// the failures before the total.
// NOTE(review): presumably this ordering keeps a concurrent reader from
// seeing more failed than total requests; confirm before reordering.
atomic.AddInt64(&m.totalRequests, 1)
atomic.AddInt64(&m.failedRequests, 1)
}

// AddSucceededRequest records a succeeded request from the proxy to a given
// backend by incrementing the total request counter. The failed request
// counter is left unchanged.
func (m *LBMetrics) AddSucceededRequest() {
atomic.AddInt64(&m.totalRequests, 1)
}

// GetFailedRequests returns the number of failed requests and the number of
// total requests from the proxy to a given backend, in this order.
//
// Each counter is loaded atomically, but the pair is not read as a single
// snapshot. The failed counter is loaded first.
func (m *LBMetrics) GetFailedRequests() (int64, int64) {
	// operands are evaluated left to right: failed first, then total
	return atomic.LoadInt64(&m.failedRequests), atomic.LoadInt64(&m.totalRequests)
}

// DecTotalRequest decrements the number of total requests from the proxy to a given backend.
// The failed request counter is not changed.
func (m *LBMetrics) DecTotalRequest() {
atomic.AddInt64(&m.totalRequests, -1)
}

// GetTotalRequests returns the current number of total requests from the
// proxy to a given backend. The counter is loaded atomically and converted
// to int.
func (m *LBMetrics) GetTotalRequests() int {
	total := atomic.LoadInt64(&m.totalRequests)
	return int(total)
}

// LBEndpoint represents the scheme and the host of load balanced
// backends.
type LBEndpoint struct {
Expand Down

0 comments on commit 171360a

Please sign in to comment.