Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make loadbalancer avoid sending requests to unhealthy nodes #2363

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 55 additions & 110 deletions loadbalancer/algorithm.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,98 +67,36 @@ func fadeIn(now time.Time, duration time.Duration, exponent float64, detected ti
return math.Pow(float64(rel)/float64(duration), exponent)
}

// shiftWeighted picks one of the route's endpoints at random, weighting every
// endpoint by its current fade-in value as returned by fadeIn for the route's
// fade-in duration and exponent and the endpoint's detection time.
//
// w is a scratch buffer for the cumulative weights; only its backing array is
// reused, any elements it already holds are ignored. The route must have at
// least one endpoint, otherwise the function panics.
func shiftWeighted(rnd *rand.Rand, ctx *routing.LBContext, w []float64, now time.Time) routing.LBEndpoint {
	rt := ctx.Route
	ep := rt.LBEndpoints

	// Always start from length zero: the cumulative sums are indexed in
	// parallel with ep below, so leftover elements in w would shift the
	// choice to the wrong endpoint or index past the end of ep.
	weightSums := w[:0]

	var sum float64
	for i := range ep {
		sum += fadeIn(now, rt.LBFadeInDuration, rt.LBFadeInExponent, ep[i].Detected)
		weightSums = append(weightSums, sum)
	}

	// Draw a point in [0, sum) and select the first endpoint whose
	// cumulative weight lies above it.
	r := rnd.Float64() * sum
	for i, ws := range weightSums {
		if ws > r {
			return ep[i]
		}
	}

	// No cumulative weight exceeded r (e.g. every weight is zero): fall back
	// to the last endpoint.
	return ep[len(ep)-1]
}

// shiftToRemaining selects a replacement endpoint for a request that was not
// accepted by a fading-in endpoint.
//
// wi holds the indexes of the endpoints that are not fading in, wf is the
// scratch buffer handed on to shiftWeighted.
func shiftToRemaining(rnd *rand.Rand, ctx *routing.LBContext, wi []int, wf []float64, now time.Time) routing.LBEndpoint {
	endpoints := ctx.Route.LBEndpoints

	// There are old endpoints: distribute equally between them.
	if len(wi) > 0 {
		pick := wi[rnd.Intn(len(wi))]
		return endpoints[pick]
	}

	// Every endpoint is fading in, so delegate to the weighted selection in
	// shiftWeighted.
	return shiftWeighted(rnd, ctx, wf, now)
}

func withFadeIn(rnd *rand.Rand, ctx *routing.LBContext, notFadingIndexes []int, choice int, algo routing.LBAlgorithm) routing.LBEndpoint {
ep := ctx.Route.LBEndpoints
func goodEndpoint(rnd *rand.Rand, ep *routing.LBEndpoint, ctx *routing.LBContext) bool {
now := time.Now()
f := fadeIn(
now,
ctx.Route.LBFadeInDuration,
ctx.Route.LBFadeInExponent,
ctx.Route.LBEndpoints[choice].Detected,
ep.Detected,
)
goodFadeIn := f == 1 || rnd.Float64() < f

if rnd.Float64() < f {
return ep[choice]
}
for i := 0; i < len(ep); i++ {
if _, fadingIn := fadeInState(now, ctx.Route.LBFadeInDuration, ep[i].Detected); !fadingIn {
notFadingIndexes = append(notFadingIndexes, i)
}
failedReqs, totalReqs := (int64)(0), (int64)(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use the defaults:

var failedReqs, totalReqs int64

if ep.Metrics != nil {
failedReqs, totalReqs = ep.Metrics.GetFailedRequests()
}
goodHealthyEp := totalReqs <= 10 || float64(failedReqs)/float64(totalReqs) < 0.95 || rnd.Float64() < 0.05
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such constants look like good baseline values to me. How can I make them configurable in a good way?

Copy link
Member

@szuecs szuecs Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we would have to store them in a new type, so you would no longer have a "pure" func but a method.
Then you would need a constructor, called from proxy or skipper, that passes the config through.


switch a := algo.(type) {
case *roundRobin:
return shiftToRemaining(a.rnd, ctx, notFadingIndexes, a.fadingWeights, now)
case *random:
return shiftToRemaining(a.rand, ctx, notFadingIndexes, a.fadingWeights, now)
case *consistentHash:
// If all endpoints are fading, normal consistent hash result
if len(notFadingIndexes) == 0 {
return ep[choice]
}
// otherwise calculate consistent hash again using endpoints which are not fading
return ep[a.chooseConsistentHashEndpoint(ctx, skipFadingEndpoints(notFadingIndexes))]
default:
return ep[choice]
}
return goodFadeIn && goodHealthyEp
}

type roundRobin struct {
mx sync.Mutex
index int
rnd *rand.Rand
notFadingIndexes []int
fadingWeights []float64
mx sync.Mutex
index int
rnd *rand.Rand
}

// newRoundRobin creates a round-robin LB algorithm for the given endpoints.
// The starting index is picked at random, from a generator seeded with the
// current time, and the same generator is stored in the returned value.
//
// endpoints must not be empty: rnd.Intn panics when its argument is 0.
func newRoundRobin(endpoints []string) routing.LBAlgorithm {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
return &roundRobin{
index: rnd.Intn(len(endpoints)),
rnd: rnd,

// preallocating the frequently used slices, empty but with capacity
// for one entry per endpoint
notFadingIndexes: make([]int, 0, len(endpoints)),
fadingWeights: make([]float64, 0, len(endpoints)),
}
}

Expand All @@ -170,30 +108,31 @@ func (r *roundRobin) Apply(ctx *routing.LBContext) routing.LBEndpoint {

r.mx.Lock()
defer r.mx.Unlock()
r.index = (r.index + 1) % len(ctx.Route.LBEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[r.index]
endpoints := []routing.LBEndpoint{}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can I avoid the repetition (this looks the same as in random and almost the same as in consistent-hash)?

for _, e := range ctx.Route.LBEndpoints {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apply is in the hot path, and it looks like instead of a single index access into the slice we now loop through all endpoints, so we go from O(1) to O(n). That does not sound good. Am I missing something?

Copy link
Contributor

@lucastt lucastt Jun 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so from O(1) to O(n)

Indeed.

I feel like there are two options here. One is to take this off the hot path and have the slice updated independently by some goroutine. The other is to access the slice the same way as before, but if the endpoint is not healthy, shift to the next one and do the same evaluation. The second one has to have a stop condition to avoid endless loops. I personally think it would be better to have this in a separate goroutine that updates this slice every x seconds.

e := e
if goodEndpoint(r.rnd, &e, ctx) {
endpoints = append(endpoints, e)
}
}
if len(endpoints) == 0 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What will happen if only a very small number of endpoints is healthy? Shouldn't we have some kind of guardrail for this? In this case there are two possibilities:

  1. This algorithm sheds some load;
  2. This algorithm does not mind if skipper overloads some healthy backends;

The first option kind of breaks the separation of concerns of these features. I don't think an LB algorithm should shed load; it seems a bit out of place, and it is not what the author chose to do, which I agree with.

But there is some concern that if very few endpoints are healthy this would make the system collapse. So maybe this algorithm should only be available to routes with some form of load shedding strategy configured. I don't have a well formed opinion about it, what do you think?

Well, at least we should document this risk very well.

endpoints = ctx.Route.LBEndpoints
}
r.index = (r.index + 1) % len(endpoints)

return withFadeIn(r.rnd, ctx, r.notFadingIndexes, r.index, r)
return endpoints[r.index]
}

type random struct {
rand *rand.Rand
notFadingIndexes []int
fadingWeights []float64
rand *rand.Rand
}

// newRandom creates a random LB algorithm for the given endpoints, backed by
// its own generator seeded with the current time.
func newRandom(endpoints []string) routing.LBAlgorithm {
t := time.Now().UnixNano()
// #nosec
return &random{
rand: rand.New(rand.NewSource(t)),

// preallocating the frequently used slices, empty but with capacity
// for one entry per endpoint
notFadingIndexes: make([]int, 0, len(endpoints)),
fadingWeights: make([]float64, 0, len(endpoints)),
}
}

Expand All @@ -203,12 +142,19 @@ func (r *random) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return ctx.Route.LBEndpoints[0]
}

i := r.rand.Intn(len(ctx.Route.LBEndpoints))
if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[i]
endpoints := []routing.LBEndpoint{}
for _, e := range ctx.Route.LBEndpoints {
e := e
if goodEndpoint(r.rand, &e, ctx) {
endpoints = append(endpoints, e)
}
}
if len(endpoints) == 0 {
endpoints = ctx.Route.LBEndpoints
}
i := r.rand.Intn(len(endpoints))

return withFadeIn(r.rand, ctx, r.notFadingIndexes, i, r)
return endpoints[i]
}

type (
Expand All @@ -217,9 +163,9 @@ type (
hash uint64 // hash of endpoint
}
consistentHash struct {
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
notFadingIndexes []int
hashRing []endpointHash // list of endpoints sorted by hash value
rand *rand.Rand
allHosts map[string]struct{}
}
)

Expand All @@ -232,15 +178,16 @@ func (ch *consistentHash) Swap(i, j int) {
func newConsistentHashInternal(endpoints []string, hashesPerEndpoint int) routing.LBAlgorithm {
rnd := rand.New(rand.NewSource(time.Now().UnixNano())) // #nosec
ch := &consistentHash{
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
notFadingIndexes: make([]int, 0, len(endpoints)),
hashRing: make([]endpointHash, hashesPerEndpoint*len(endpoints)),
rand: rnd,
allHosts: map[string]struct{}{},
}
for i, ep := range endpoints {
endpointStartIndex := hashesPerEndpoint * i
for j := 0; j < hashesPerEndpoint; j++ {
ch.hashRing[endpointStartIndex+j] = endpointHash{i, hash(fmt.Sprintf("%s-%d", ep, j))}
}
ch.allHosts[ep] = struct{}{}
}
sort.Sort(ch)
return ch
Expand Down Expand Up @@ -309,13 +256,22 @@ func (ch *consistentHash) Apply(ctx *routing.LBContext) routing.LBEndpoint {
return ctx.Route.LBEndpoints[0]
}

choice := ch.chooseConsistentHashEndpoint(ctx, noSkippedEndpoints)

if ctx.Route.LBFadeInDuration <= 0 {
return ctx.Route.LBEndpoints[choice]
hosts := map[string]struct{}{}
for _, e := range ctx.Route.LBEndpoints {
e := e
if goodEndpoint(ch.rand, &e, ctx) {
hosts[e.Host] = struct{}{}
}
}
if len(hosts) == 0 {
hosts = ch.allHosts
}

return withFadeIn(ch.rand, ctx, ch.notFadingIndexes, choice, ch)
choice := ch.chooseConsistentHashEndpoint(ctx, func(i int) bool {
_, ok := hosts[ctx.Route.LBEndpoints[i].Host]
return !ok
})
return ctx.Route.LBEndpoints[choice]
}

func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, skipEndpoint func(int) bool) int {
Expand All @@ -334,17 +290,6 @@ func (ch *consistentHash) chooseConsistentHashEndpoint(ctx *routing.LBContext, s
return choice
}

// skipFadingEndpoints builds a skip predicate from the indexes of the
// endpoints that are not fading in. The predicate reports true (skip) for
// every index that is absent from notFadingEndpoints and false (keep) for
// every index that is listed there.
func skipFadingEndpoints(notFadingEndpoints []int) func(int) bool {
	skip := func(candidate int) bool {
		for k := 0; k < len(notFadingEndpoints); k++ {
			if notFadingEndpoints[k] == candidate {
				// listed as not fading: keep it
				return false
			}
		}
		return true
	}
	return skip
}

// noSkippedEndpoints is the skip predicate that keeps every endpoint: it
// reports false for any index.
func noSkippedEndpoints(int) bool {
	return false
}
Expand Down
7 changes: 7 additions & 0 deletions proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -870,6 +870,13 @@ func (p *Proxy) makeBackendRequest(ctx *context, requestContext stdlibcontext.Co
req = injectClientTrace(req, ctx.proxySpan)

response, err := roundTripper.RoundTrip(req)
if endpoint != nil {
if err != nil {
endpoint.Metrics.AddFailedRequest()
} else {
endpoint.Metrics.AddSucceededRequest()
}
}

ctx.proxySpan.LogKV("http_roundtrip", EndEvent)
if err != nil {
Expand Down
32 changes: 31 additions & 1 deletion routing/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,9 @@ type RouteFilter struct {
// LBMetrics contains metrics used by LB algorithms.
type LBMetrics struct {
	// inflightRequests is the number of outstanding requests from the proxy
	// to the backend.
	inflightRequests int64

	// failedRequests and totalRequests count the failed and the overall
	// requests from the proxy to the backend.
	failedRequests, totalRequests int64
}

// IncInflightRequest increments the number of outstanding requests from the proxy to a given backend.
Expand All @@ -160,11 +163,38 @@ func (m *LBMetrics) DecInflightRequest() {
atomic.AddInt64(&m.inflightRequests, -1)
}

// GetInflightRequests loads the number of outstanding requests from the proxy to a given backend.
func (m *LBMetrics) GetInflightRequests() int {
	inflight := atomic.LoadInt64(&m.inflightRequests)
	return int(inflight)
}

// AddFailedRequest increments the counters related to failed requests and total requests from the proxy to a given backend.
//
// totalRequests is incremented before failedRequests, so a concurrent reader
// that loads failedRequests first and totalRequests second (as
// GetFailedRequests does) never sees this call counted as failed without
// also seeing it counted in the total.
func (m *LBMetrics) AddFailedRequest() {
atomic.AddInt64(&m.totalRequests, 1)
atomic.AddInt64(&m.failedRequests, 1)
}

// AddSucceededRequest increments the counter of total requests from the proxy to a given backend.
// The counter of failed requests is left unchanged.
func (m *LBMetrics) AddSucceededRequest() {
atomic.AddInt64(&m.totalRequests, 1)
}

// GetFailedRequests loads the number of failed requests and the number of
// total requests from the proxy to a given backend, returned in that order.
//
// The two counters are loaded one after the other, failedRequests first, so
// together they are not a single atomic snapshot.
func (m *LBMetrics) GetFailedRequests() (int64, int64) {
	// Calls in a return list are evaluated left to right, which keeps the
	// failed-before-total load order.
	return atomic.LoadInt64(&m.failedRequests), atomic.LoadInt64(&m.totalRequests)
}

// DecTotalRequest decrements the number of total requests from the proxy to a given backend.
//
// NOTE(review): only totalRequests is changed here; failedRequests is not
// adjusted, so callers presumably have to keep the two counters consistent
// themselves — confirm against the call sites.
func (m *LBMetrics) DecTotalRequest() {
atomic.AddInt64(&m.totalRequests, -1)
}

// GetTotalRequests loads the number of total requests from the proxy to a given backend.
func (m *LBMetrics) GetTotalRequests() int {
	total := atomic.LoadInt64(&m.totalRequests)
	return int(total)
}

// LBEndpoint represents the scheme and the host of load balanced
// backends.
type LBEndpoint struct {
Expand Down