OCPBUGS-18971: limit number of simultaneous client requests #76

Merged
merged 3 commits into from
Oct 9, 2023
Changes from 2 commits
4 changes: 3 additions & 1 deletion pkg/client/api.go
@@ -128,11 +128,13 @@ func (c *httpAPIClient) Do(ctx context.Context, verb, endpoint string, query url

 // NewGenericAPIClient builds a new generic Prometheus API client for the given base URL and HTTP Client.
 func NewGenericAPIClient(client *http.Client, baseURL *url.URL, headers http.Header) GenericAPIClient {
-	return &httpAPIClient{
+	c := &httpAPIClient{
 		client:  client,
 		baseURL: baseURL,
 		headers: headers,
 	}
+
+	return newRequestLimiter(c)
 }

 const (
86 changes: 86 additions & 0 deletions pkg/client/limiter.go
@@ -0,0 +1,86 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
	"context"
	"net/url"

	"k8s.io/component-base/metrics"
	"k8s.io/component-base/metrics/legacyregistry"
)

// Without a limit, the adapter could flood the Prometheus API with many
// requests when there are many pods running in the cluster because a query for
// getting all pod metrics would translate into (2 x pod number) queries to the
// Prometheus API.
// In the worst case, the Prometheus pods can hit their SOMAXCONN limit leading
// to timed-out requests from other clients. In particular it has been reported
// that it could fail the Kubelet liveness probes and lead to Prometheus pod
// restarts.
// The number has been chosen from empirical data.
const maxConcurrentRequests = 100

Not a power of 2, I'm disappointed :)

At worst, a single Prometheus pod will serve both prom-adapter pods (if the other Prometheus pod is down or the LB missed it), or even 3 or 4 in some weird rolling scenarios. That results in roughly 400 connections, which is below --web.max-connections=512 and leaves room in that queue and the other ones for the other clients.

Author

While testing, I've realized that even with 2 prometheus adapter pods and 2 Prometheus pods running, it can be that all adapter requests go to the same Prometheus because the service is configured with client IP affinity.

Yes, had some similar cases as well while testing, didn't dig into how LB is set.
I think 100 is a good value.


var (
	inflightRequests = metrics.NewGauge(
		&metrics.GaugeOpts{
			Namespace: "prometheus_adapter",
			Subsystem: "prometheus_client",
			Name:      "inflight_requests",
			Help:      "Number of inflight requests to the Prometheus service",
		})

	maxRequests = metrics.NewGauge(
		&metrics.GaugeOpts{
			Namespace: "prometheus_adapter",
			Subsystem: "prometheus_client",
			Name:      "max_requests",
			Help:      "Maximum number of requests to the Prometheus service",
		})
)

func init() {
	legacyregistry.MustRegister(inflightRequests, maxRequests)
	maxRequests.Set(maxConcurrentRequests)
}

type requestLimitClient struct {
	c        GenericAPIClient
	inflight chan struct{}
}

func newRequestLimiter(c GenericAPIClient) GenericAPIClient {
	return &requestLimitClient{
		c:        c,
		inflight: make(chan struct{}, maxConcurrentRequests),
	}
}

func (c *requestLimitClient) Do(ctx context.Context, verb, endpoint string, query url.Values) (APIResponse, error) {
	select {
	case c.inflight <- struct{}{}:
		inflightRequests.Inc()
		defer func() {
			inflightRequests.Dec()
			<-c.inflight
		}()
	case <-ctx.Done():
		return APIResponse{}, ctx.Err()
	}

	return c.c.Do(ctx, verb, endpoint, query)
}
97 changes: 97 additions & 0 deletions pkg/client/limiter_test.go
@@ -0,0 +1,97 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package client

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/url"
	"sync"
	"sync/atomic"
	"testing"
	"time"
)

func TestRequestLimitClient(t *testing.T) {
	var (
		ctx     = context.Background()
		total   atomic.Int64
		unblock = make(chan struct{})
	)
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		total.Add(1)

		w.Write([]byte("{}"))
		if r.URL.Path == "/nonblocking" {

Knowing that c.Do(ctx2, "GET", "/nonblocking", nil) would never reach this, why keep it? To make the test easier to debug?

Author

To ensure that the test catches the issue when/if c.Do(ctx2, "GET", "/nonblocking", nil) isn't blocked on the client side.

Without the /nonblocking branch, the test would in that case be stuck for as long as unblock isn't closed.
But OK, this does make the test easier to debug.

			return
		}

		// Requests will be blocked until the test closes the unblock channel.
		<-unblock
	}))
	defer srv.Close()

	// Make as many requests as the max allowed number + 1.
	var (
		wg      sync.WaitGroup
		errChan = make(chan error, maxConcurrentRequests+1)
		u, _    = url.Parse(srv.URL)
		c       = NewGenericAPIClient(&http.Client{}, u, nil)
	)
	for i := 0; i < maxConcurrentRequests+1; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()

			_, err := c.Do(ctx, "GET", "/", nil)
			if err != nil {
				err = fmt.Errorf("request #%d: %w", i, err)
			}
			errChan <- err
		}(i)
	}

	// Wait for the unblocked requests to hit the server.

Suggested change
-	// Wait for the unblocked requests to hit the server.
+	// Wait for the first maxConcurrentRequests requests to hit the server.

To avoid confusion with /nonblocking.

	for total.Load() != maxConcurrentRequests {
	}

	// Make one more blocked request which should time out before hitting the server.
	ctx2, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()
	_, err := c.Do(ctx2, "GET", "/nonblocking", nil)

should we check that it returns an APIResponse{} as well?

Author

since it returns an error, the first returned value is irrelevant IMHO.

Just a safeguard, in case the receiver doesn't start by checking the error.

Author

Not sure I understand :)

We assume that the function calling this Do won't use the result if there is an error, but what if that function doesn't check the error and uses a faulty response that Do returns?

	switch {
	case err == nil:
		t.Fatalf("expected %dth request to fail", maxConcurrentRequests+2)
	case ctx2.Err() == nil:
		t.Fatalf("expected %dth request to timeout", maxConcurrentRequests+2)
	}

We can also add a total.Load() == maxConcurrentRequests check here. It would prove that the query doesn't reach the server, and also that the equality in line 71 wasn't just transient.

Author

it should be detected by the Do() request returning without error.

The client/Do function may be faulty, it's better to check from server side as well. (+ check that the for total.Load() != maxConcurrentRequests check wasn't transient)

Author

The total.Load() != maxConcurrentRequests can't be transient because the test spawns exactly 100 connections then waits for total == 100. Only after that, it starts the request to /nonblocking.

it spawns maxConcurrentRequests + 1 = 101 (for i := 0; i < maxConcurrentRequests+1; i++) IIUC.

	// Release all inflight requests.
	close(unblock)

	// Wait for all requests to complete.
	wg.Wait()

	// Check that no error was returned.
	close(errChan)
	for err := range errChan {
		if err != nil {
			t.Fatalf("unexpected error: %s", err)
		}
	}
}