From ce00f4ee545af8d7b7519045c3422893e19d43bc Mon Sep 17 00:00:00 2001 From: bjrara Date: Mon, 17 Aug 2020 15:57:15 +0800 Subject: [PATCH 1/2] Fix issue in missing metrics of terminated requests --- .../k8s.io/apiserver/pkg/server/filters/BUILD | 9 + .../server/filters/priority-and-fairness.go | 11 + .../filters/priority-and-fairness_test.go | 261 ++++++++++++++++++ 3 files changed, 281 insertions(+) create mode 100644 staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD index a13e51bf319b..b93a095595b1 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/BUILD @@ -13,19 +13,28 @@ go_test( "cors_test.go", "goaway_test.go", "maxinflight_test.go", + "priority-and-fairness_test.go", "timeout_test.go", ], embed = [":go_default_library"], deps = [ + "//staging/src/k8s.io/api/flowcontrol/v1alpha1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/diff:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap:go_default_library", "//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/filters:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", + 
"//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics:go_default_library", + "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", "//vendor/golang.org/x/net/http2:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 7d1be83f14d8..a8c945b77218 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "net/http" + "sync" "sync/atomic" fcv1a1 "k8s.io/api/flowcontrol/v1alpha1" @@ -62,6 +63,9 @@ var waitingMark = &requestWatermark{ mutatingObserver: fcmetrics.ReadWriteConcurrencyObserverPairGenerator.Generate(1, 1, []string{epmetrics.MutatingKind}).RequestsWaiting, } +// apfStartOnce is used to avoid sharing one-time mutex with maxinflight handler +var apfStartOnce sync.Once + var atomicMutatingExecuting, atomicReadOnlyExecuting int32 var atomicMutatingWaiting, atomicReadOnlyWaiting int32 @@ -78,6 +82,8 @@ func WithPriorityAndFairness( } startOnce.Do(func() { startRecordingUsage(watermark) + }) + apfStartOnce.Do(func() { startRecordingUsage(waitingMark) }) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -144,6 +150,11 @@ func WithPriorityAndFairness( } }, execute) if !served { + if isMutatingRequest { + epmetrics.DroppedRequests.WithLabelValues(epmetrics.MutatingKind).Inc() + } else { + epmetrics.DroppedRequests.WithLabelValues(epmetrics.ReadOnlyKind).Inc() + } epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) tooManyRequests(r, w) } diff --git 
a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go new file mode 100644 index 000000000000..91b381fb676d --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -0,0 +1,261 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "context" + "errors" + "fmt" + "net/http" + "net/http/httptest" + "testing" + "time" + + fctypesv1a1 "k8s.io/api/flowcontrol/v1alpha1" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apiserver/pkg/apis/flowcontrol/bootstrap" + "k8s.io/apiserver/pkg/authentication/user" + apifilters "k8s.io/apiserver/pkg/endpoints/filters" + epmetrics "k8s.io/apiserver/pkg/endpoints/metrics" + apirequest "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/server/mux" + utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" + fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" + fcmetrics "k8s.io/apiserver/pkg/util/flowcontrol/metrics" + "k8s.io/component-base/metrics/legacyregistry" +) + +const ( + decisionNoQueuingExecute = iota + decisionQueuingExecute + decisionCancelWait + decisionReject + decisionSkipFilter +) + +type fakeApfFilter struct { + mockDecision int + postEnqueue func() + postDequeue func() +} + +func (t fakeApfFilter) MaintainObservations(stopCh <-chan struct{}) { +} + +func (t fakeApfFilter) Handle(ctx context.Context, + 
requestDigest utilflowcontrol.RequestDigest, + noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration), + queueNoteFn fq.QueueNoteFn, + execFn func(), +) { + if t.mockDecision == decisionSkipFilter { + panic("Handle should not be invoked") + } + noteFn(bootstrap.SuggestedFlowSchemaGlobalDefault, bootstrap.SuggestedPriorityLevelConfigurationGlobalDefault) + switch t.mockDecision { + case decisionNoQueuingExecute: + execFn() + case decisionQueuingExecute: + queueNoteFn(true) + t.postEnqueue() + queueNoteFn(false) + t.postDequeue() + execFn() + case decisionCancelWait: + queueNoteFn(true) + t.postEnqueue() + queueNoteFn(false) + t.postDequeue() + case decisionReject: + return + } +} + +func (t fakeApfFilter) Run(stopCh <-chan struct{}) error { + return nil +} + +func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { +} + +func newApfServer(decision int, t *testing.T) *httptest.Server { + requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} + longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) + + apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if decision == decisionCancelWait { + t.Errorf("execute should not be invoked") + } + if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { + t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) + } + }), longRunningRequestCheck, fakeApfFilter{ + mockDecision: decision, + postEnqueue: func() { + if atomicReadOnlyWaiting != 1 { + t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) + } + }, + postDequeue: func() { + if atomicReadOnlyWaiting != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + } + }, + }) + + handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r = 
r.WithContext(apirequest.WithUser(r.Context(), &user.DefaultInfo{ + Groups: []string{user.AllUnauthenticated}, + })) + apfHandler.ServeHTTP(w, r) + if atomicReadOnlyExecuting != 0 { + t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting) + } + }), requestInfoFactory) + + apfServer := httptest.NewServer(handler) + return apfServer +} + +func TestApfSkipLongRunningRequest(t *testing.T) { + epmetrics.Register() + + server := newApfServer(decisionSkipFilter, t) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil { + // request should not be rejected + t.Error(err) + } +} + +func TestApfRejectRequest(t *testing.T) { + epmetrics.Register() + + server := newApfServer(decisionReject, t) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { + t.Error(err) + } + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_request_terminations_total", + "apiserver_dropped_requests_total", + }) +} + +func TestApfExemptRequest(t *testing.T) { + epmetrics.Register() + fcmetrics.Register() + + // wait the first sampleAndWaterMark metrics to be collected + time.Sleep(time.Millisecond * 50) + + server := newApfServer(decisionNoQueuingExecute, t) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { + t.Error(err) + } + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_current_inflight_requests", + "apiserver_flowcontrol_read_vs_write_request_count_watermarks", + "apiserver_flowcontrol_read_vs_write_request_count_samples", + }) +} + +func TestApfExecuteRequest(t *testing.T) { + epmetrics.Register() + fcmetrics.Register() + + // wait the first sampleAndWaterMark metrics to be collected + time.Sleep(time.Millisecond * 50) + + server := newApfServer(decisionQueuingExecute, t) + defer 
server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { + t.Error(err) + } + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_current_inflight_requests", + "apiserver_current_inqueue_requests", + "apiserver_flowcontrol_read_vs_write_request_count_watermarks", + "apiserver_flowcontrol_read_vs_write_request_count_samples", + }) +} + +func TestApfCancelWaitRequest(t *testing.T) { + epmetrics.Register() + + server := newApfServer(decisionCancelWait, t) + defer server.Close() + + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { + t.Error(err) + } + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_current_inflight_requests", + "apiserver_request_terminations_total", + "apiserver_dropped_requests_total", + }) +} + +// checkForExpectedMetricsWithRetry re-gathers metrics (up to 5 attempts, 1s apart) because they are collected asynchronously. +func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) { + maxRetries := 5 + var checkErrors []error + for i := 0; i < maxRetries; i++ { + t.Logf("Check for expected metrics with retry %d", i) + metricsFamily, err := legacyregistry.DefaultGatherer.Gather() + if err != nil { + t.Fatalf("Failed to gather metrics %v", err) + } + + metrics := map[string]interface{}{} + for _, mf := range metricsFamily { + mf := mf + metrics[*mf.Name] = mf + } + + checkErrors = checkForExpectedMetrics(expectedMetrics, metrics) + if checkErrors == nil { + return + } + + time.Sleep(1 * time.Second) + } + for _, checkError := range checkErrors { + t.Error(checkError) + } +} + +// checkForExpectedMetrics returns one error for every name in expectedMetrics that is not a key of metrics. +func checkForExpectedMetrics(expectedMetrics []string, metrics map[string]interface{}) []error { + var errs []error + // A single presence check per name is sufficient. + for _, metricName := range expectedMetrics { + if _, ok := metrics[metricName]; !ok { + errs = append(errs, errors.New("Scraped metrics did not include expected metric "+metricName)) + } + } + return errs +} From 833ce487b9fab1650d5aaba2a8b295f8a90e07bd 
Mon Sep 17 00:00:00 2001 From: bjrara Date: Fri, 9 Oct 2020 16:51:19 +0800 Subject: [PATCH 2/2] Add multi request test --- .../filters/priority-and-fairness_test.go | 139 ++++++++++++++---- 1 file changed, 114 insertions(+), 25 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go index 91b381fb676d..d2e130b35760 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness_test.go @@ -22,6 +22,7 @@ import ( "fmt" "net/http" "net/http/httptest" + "sync" "testing" "time" @@ -39,8 +40,10 @@ import ( "k8s.io/component-base/metrics/legacyregistry" ) +type mockDecision int + const ( - decisionNoQueuingExecute = iota + decisionNoQueuingExecute mockDecision = iota decisionQueuingExecute decisionCancelWait decisionReject @@ -48,7 +51,7 @@ const ( ) type fakeApfFilter struct { - mockDecision int + mockDecision mockDecision postEnqueue func() postDequeue func() } @@ -92,29 +95,41 @@ func (t fakeApfFilter) Run(stopCh <-chan struct{}) error { func (t fakeApfFilter) Install(c *mux.PathRecorderMux) { } -func newApfServer(decision int, t *testing.T) *httptest.Server { - requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} - longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) - - apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { +func newApfServerWithSingleRequest(decision mockDecision, t *testing.T) *httptest.Server { + onExecuteFunc := func() { if decision == decisionCancelWait { t.Errorf("execute should not be invoked") } + // atomicReadOnlyExecuting can be either 0 or 1 as we test one request at a time. 
if decision != decisionSkipFilter && atomicReadOnlyExecuting != 1 { t.Errorf("Wanted %d requests executing, got %d", 1, atomicReadOnlyExecuting) } + } + postExecuteFunc := func() {} + // atomicReadOnlyWaiting can be either 0 or 1 as we test one request at a time. + postEnqueueFunc := func() { + if atomicReadOnlyWaiting != 1 { + t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) + } + } + postDequeueFunc := func() { + if atomicReadOnlyWaiting != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + } + } + return newApfServerWithHooks(decision, onExecuteFunc, postExecuteFunc, postEnqueueFunc, postDequeueFunc, t) +} + +func newApfServerWithHooks(decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func(), t *testing.T) *httptest.Server { + requestInfoFactory := &apirequest.RequestInfoFactory{APIPrefixes: sets.NewString("apis", "api"), GrouplessAPIPrefixes: sets.NewString("api")} + longRunningRequestCheck := BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString("proxy")) + + apfHandler := WithPriorityAndFairness(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + onExecute() }), longRunningRequestCheck, fakeApfFilter{ mockDecision: decision, - postEnqueue: func() { - if atomicReadOnlyWaiting != 1 { - t.Errorf("Wanted %d requests in queue, got %d", 1, atomicReadOnlyWaiting) - } - }, - postDequeue: func() { - if atomicReadOnlyWaiting != 0 { - t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) - } - }, + postEnqueue: postEnqueue, + postDequeue: postDequeue, }) handler := apifilters.WithRequestInfo(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -122,6 +137,7 @@ func newApfServer(decision int, t *testing.T) *httptest.Server { Groups: []string{user.AllUnauthenticated}, })) apfHandler.ServeHTTP(w, r) + postExecute() if atomicReadOnlyExecuting != 0 { t.Errorf("Wanted %d requests executing, got %d", 0, atomicReadOnlyExecuting) } @@ 
-134,9 +150,10 @@ func newApfServer(decision int, t *testing.T) *httptest.Server { func TestApfSkipLongRunningRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionSkipFilter, t) + server := newApfServerWithSingleRequest(decisionSkipFilter, t) defer server.Close() + // send a watch request to test skipping long running request if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces?watch=true", server.URL), http.StatusOK); err != nil { // request should not be rejected t.Error(err) @@ -146,7 +163,7 @@ func TestApfSkipLongRunningRequest(t *testing.T) { func TestApfRejectRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionReject, t) + server := newApfServerWithSingleRequest(decisionReject, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { @@ -163,10 +180,11 @@ func TestApfExemptRequest(t *testing.T) { epmetrics.Register() fcmetrics.Register() - // wait the first sampleAndWaterMark metrics to be collected + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. time.Sleep(time.Millisecond * 50) - server := newApfServer(decisionNoQueuingExecute, t) + server := newApfServerWithSingleRequest(decisionNoQueuingExecute, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { @@ -184,10 +202,11 @@ func TestApfExecuteRequest(t *testing.T) { epmetrics.Register() fcmetrics.Register() - // wait the first sampleAndWaterMark metrics to be collected + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. 
time.Sleep(time.Millisecond * 50) - server := newApfServer(decisionQueuingExecute, t) + server := newApfServerWithSingleRequest(decisionQueuingExecute, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { @@ -202,10 +221,81 @@ func TestApfExecuteRequest(t *testing.T) { }) } +func TestApfExecuteMultipleRequests(t *testing.T) { + epmetrics.Register() + fcmetrics.Register() + + // Wait for at least one sampling window to pass since creation of metrics.ReadWriteConcurrencyObserverPairGenerator, + // so that an observation will cause some data to go into the Prometheus metrics. + time.Sleep(time.Millisecond * 50) + + concurrentRequests := 5 + var preStartExecute, postStartExecute, preEnqueue, postEnqueue, preDequeue, postDequeue, finishExecute, clientsDone sync.WaitGroup + for _, wg := range []*sync.WaitGroup{&preStartExecute, &postStartExecute, &preEnqueue, &postEnqueue, &preDequeue, &postDequeue, &finishExecute, &clientsDone} { + wg.Add(concurrentRequests) + } + + onExecuteFunc := func() { + preStartExecute.Done() + preStartExecute.Wait() + if int(atomicReadOnlyExecuting) != concurrentRequests { + t.Errorf("Wanted %d requests executing, got %d", concurrentRequests, atomicReadOnlyExecuting) + } + postStartExecute.Done() + postStartExecute.Wait() + } + + postEnqueueFunc := func() { + preEnqueue.Done() + preEnqueue.Wait() + if int(atomicReadOnlyWaiting) != concurrentRequests { + t.Errorf("Wanted %d requests in queue, got %d", concurrentRequests, atomicReadOnlyWaiting) + } + postEnqueue.Done() + postEnqueue.Wait() + } + + postDequeueFunc := func() { + preDequeue.Done() + preDequeue.Wait() + if atomicReadOnlyWaiting != 0 { + t.Errorf("Wanted %d requests in queue, got %d", 0, atomicReadOnlyWaiting) + } + postDequeue.Done() + postDequeue.Wait() + } + + postExecuteFunc := func() { + finishExecute.Done() + finishExecute.Wait() + } + + server := newApfServerWithHooks(decisionQueuingExecute, onExecuteFunc, postExecuteFunc, postEnqueueFunc, 
postDequeueFunc, t) + defer server.Close() + + // Fire the requests concurrently and wait until every response has been verified. + for i := 0; i < concurrentRequests; i++ { + go func() { + defer clientsDone.Done() + if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusOK); err != nil { + t.Error(err) + } + }() + } + clientsDone.Wait() + + checkForExpectedMetricsWithRetry(t, []string{ + "apiserver_current_inflight_requests", + "apiserver_current_inqueue_requests", + "apiserver_flowcontrol_read_vs_write_request_count_watermarks", + "apiserver_flowcontrol_read_vs_write_request_count_samples", + }) +} + func TestApfCancelWaitRequest(t *testing.T) { epmetrics.Register() - server := newApfServer(decisionCancelWait, t) + server := newApfServerWithSingleRequest(decisionCancelWait, t) defer server.Close() if err := expectHTTPGet(fmt.Sprintf("%s/api/v1/namespaces/default", server.URL), http.StatusTooManyRequests); err != nil { @@ -232,7 +322,6 @@ func checkForExpectedMetricsWithRetry(t *testing.T, expectedMetrics []string) { metrics := map[string]interface{}{} for _, mf := range metricsFamily { - mf := mf metrics[*mf.Name] = mf }