Skip to content

Commit

Permalink
phc: add a metric counter for endpoints that opted out before load ba…
Browse files Browse the repository at this point in the history
…lancing (#3035)

Signed-off-by: Mustafa Abdelrahman <mustafa.abdelrahman@zalando.de>
  • Loading branch information
MustafaSaber committed Apr 24, 2024
1 parent b5b0a32 commit a45de70
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 25 deletions.
6 changes: 6 additions & 0 deletions docs/operation/operation.md
Expand Up @@ -921,6 +921,12 @@ The parameters of `-passive-health-check` option are:
+ `max-drop-probability=<float more than/equal to 0 and less than/equal to 1>` - the maximum possible probability of an unhealthy endpoint not being considered
while choosing the endpoint for the given request

### Metrics

The following metrics are exposed to track the passive health check:

* `passive-health-check.endpoints.dropped`: Total number of endpoints dropped before load balancing a request. The counter is incremented once per dropped endpoint per request, so if M endpoints are dropped for each of N requests, the counter will be N*M.

## Memory consumption

While Skipper is generally not memory bound, some features may require
Expand Down
4 changes: 3 additions & 1 deletion proxy/healthy_endpoints.go
Expand Up @@ -3,6 +3,7 @@ package proxy
import (
"math/rand"

"github.com/zalando/skipper/metrics"
"github.com/zalando/skipper/routing"
)

Expand All @@ -11,7 +12,7 @@ type healthyEndpoints struct {
endpointRegistry *routing.EndpointRegistry
}

func (h *healthyEndpoints) filterHealthyEndpoints(ctx *context, endpoints []routing.LBEndpoint) []routing.LBEndpoint {
func (h *healthyEndpoints) filterHealthyEndpoints(ctx *context, endpoints []routing.LBEndpoint, metrics metrics.Metrics) []routing.LBEndpoint {
if h == nil {
return endpoints
}
Expand All @@ -24,6 +25,7 @@ func (h *healthyEndpoints) filterHealthyEndpoints(ctx *context, endpoints []rout
if p < dropProbability {
ctx.Logger().Infof("Dropping endpoint %q due to passive health check: p=%0.2f, dropProbability=%0.2f",
e.Host, p, dropProbability)
metrics.IncCounter("passive-health-check.endpoints.dropped")
} else {
filtered = append(filtered, e)
}
Expand Down
37 changes: 17 additions & 20 deletions proxy/healthy_endpoints_test.go
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/zalando/skipper/metrics/metricstest"
"github.com/zalando/skipper/routing"
)

Expand Down Expand Up @@ -47,18 +48,23 @@ func sendGetRequests(t *testing.T, ps *httptest.Server) (failed int) {
return
}

func setupProxy(t *testing.T, doc string) (*testProxy, *httptest.Server) {
func setupProxy(t *testing.T, doc string) (*metricstest.MockMetrics, *httptest.Server) {
endpointRegistry := defaultEndpointRegistry()
m := &metricstest.MockMetrics{}

tp, err := newTestProxyWithParams(doc, Params{
EnablePassiveHealthCheck: true,
EndpointRegistry: endpointRegistry,
Metrics: m,
})
require.NoError(t, err)

ps := httptest.NewServer(tp.proxy)

return tp, ps
t.Cleanup(tp.close)
t.Cleanup(ps.Close)

return m, ps
}

func TestPHCWithoutRequests(t *testing.T) {
Expand All @@ -73,10 +79,8 @@ func TestPHCWithoutRequests(t *testing.T) {

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
_, ps := setupProxy(t, fmt.Sprintf(`* -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand All @@ -87,10 +91,8 @@ func TestPHCWithoutRequests(t *testing.T) {
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
rsp := sendGetRequest(t, ps, 0)
assert.Equal(t, http.StatusOK, rsp.StatusCode)
rsp.Body.Close()
Expand Down Expand Up @@ -136,20 +138,16 @@ func TestPHCForMultipleHealthyEndpoints(t *testing.T) {

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
_, ps := setupProxy(t, fmt.Sprintf(`* -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
failedReqs := sendGetRequests(t, ps)
assert.Equal(t, 0, failedReqs)
})
Expand All @@ -172,20 +170,19 @@ func TestPHCForMultipleHealthyAndOneUnhealthyEndpoints(t *testing.T) {

for _, algorithm := range []string{"random", "consistentHash", "roundRobin", "powerOfRandomNChoices"} {
t.Run(algorithm, func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
mockMetrics, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> <%s, "%s", "%s", "%s">`,
algorithm, services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
mockMetrics.WithCounters(func(counters map[string]int64) {
assert.InDelta(t, float64(nRequests), float64(counters["passive-health-check.endpoints.dropped"]), 0.3*float64(nRequests)) // allow 30% error
})
})
}

t.Run("consistent hash with balance factor", func(t *testing.T) {
tp, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
_, ps := setupProxy(t, fmt.Sprintf(`* -> backendTimeout("5ms") -> consistentHashKey("${request.header.ConsistentHashKey}") -> consistentHashBalanceFactor(1.25) -> <consistentHash, "%s", "%s", "%s">`,
services[0].URL, services[1].URL, services[2].URL))
defer tp.close()
defer ps.Close()
failedReqs := sendGetRequests(t, ps)
assert.InDelta(t, 0, failedReqs, 0.1*float64(nRequests))
})
Expand Down
6 changes: 2 additions & 4 deletions proxy/proxy.go
Expand Up @@ -540,7 +540,7 @@ func (p *Proxy) selectEndpoint(ctx *context) *routing.LBEndpoint {
rt := ctx.route
endpoints := rt.LBEndpoints
endpoints = p.fadein.filterFadeIn(endpoints, rt)
endpoints = p.heathlyEndpoints.filterHealthyEndpoints(ctx, endpoints)
endpoints = p.heathlyEndpoints.filterHealthyEndpoints(ctx, endpoints, p.metrics)

lbctx := &routing.LBContext{
Request: ctx.request,
Expand Down Expand Up @@ -1193,9 +1193,7 @@ func (p *Proxy) do(ctx *context, parentSpan ot.Span) (err error) {
if err := p.do(loopCTX, loopSpan); err != nil {
// in case of error we have to copy the response in this recursion unwinding
ctx.response = loopCTX.response
if err != nil {
p.applyFiltersOnError(ctx, processedFilters)
}
p.applyFiltersOnError(ctx, processedFilters)
return err
}

Expand Down

0 comments on commit a45de70

Please sign in to comment.