APF e2e test: wait for steady state before proceeding #96984

Merged (1 commit) on Jan 11, 2021
1 change: 1 addition & 0 deletions test/e2e/apimachinery/BUILD
@@ -75,6 +75,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/apihelpers:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cli-runtime/pkg/printers:go_default_library",
"//staging/src/k8s.io/client-go/discovery:go_default_library",
80 changes: 65 additions & 15 deletions test/e2e/apimachinery/flowcontrol.go
@@ -19,6 +19,7 @@ package apimachinery
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
@@ -32,14 +33,21 @@ import (

flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/util/apihelpers"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/test/e2e/framework"
)

const (
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
requestConcurrencyLimitMetricLabelName = "priority_level"
requestConcurrencyLimitMetricName = "apiserver_flowcontrol_request_concurrency_limit"
priorityLevelLabelName = "priority_level"
)

var (
errPriorityLevelNotFound = errors.New("cannot find a metric sample with a matching priority level name label")
)

var _ = SIGDescribe("API priority and fairness", func() {
@@ -59,6 +67,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
defer cleanup()

ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
waitForSteadyState(f, testingFlowSchemaName, testingPriorityLevelName)

var response *http.Response
ginkgo.By("response headers should contain the UID of the appropriate FlowSchema and PriorityLevelConfiguration for a matching user")
response = makeRequest(f, matchingUsername)
@@ -126,11 +137,15 @@ var _ = SIGDescribe("API priority and fairness", func() {
framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
_, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
defer cleanup()

ginkgo.By("waiting for testing FlowSchema and PriorityLevelConfiguration to reach steady state")
waitForSteadyState(f, clients[i].flowSchemaName, clients[i].priorityLevelName)
}

ginkgo.By("getting request concurrency from metrics")
for i := range clients {
realConcurrency := getPriorityLevelConcurrency(f, clients[i].priorityLevelName)
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, clients[i].priorityLevelName)
framework.ExpectNoError(err)
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
if clients[i].concurrency < 1 {
clients[i].concurrency = 1
@@ -185,6 +200,9 @@ var _ = SIGDescribe("API priority and fairness", func() {
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
defer cleanup()

ginkgo.By("waiting for testing flow schema and priority level to reach steady state")
waitForSteadyState(f, flowSchemaName, priorityLevelName)

type client struct {
username string
qps float64
@@ -199,7 +217,8 @@ }
}

framework.Logf("getting real concurrency")
realConcurrency := getPriorityLevelConcurrency(f, priorityLevelName)
realConcurrency, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
framework.ExpectNoError(err)
for i := range clients {
clients[i].concurrency = int32(float64(realConcurrency) * clients[i].concurrencyMultiplier)
if clients[i].concurrency < 1 {
@@ -259,33 +278,35 @@ func createPriorityLevel(f *framework.Framework, priorityLevelName string, assur
}
}

//lint:ignore U1000 function is actually referenced
func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName string) int32 {
resp, err := f.ClientSet.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
framework.ExpectNoError(err)
func getPriorityLevelConcurrency(c clientset.Interface, priorityLevelName string) (int32, error) {
resp, err := c.CoreV1().RESTClient().Get().RequestURI("/metrics").DoRaw(context.TODO())
if err != nil {
return 0, err
}
sampleDecoder := expfmt.SampleDecoder{
Dec: expfmt.NewDecoder(bytes.NewBuffer(resp), expfmt.FmtText),
Opts: &expfmt.DecodeOptions{},
}
for {
var v model.Vector
err := sampleDecoder.Decode(&v)
if err == io.EOF {
break
if err != nil {
if err == io.EOF {
break
}
return 0, err
}
framework.ExpectNoError(err)
for _, metric := range v {
if string(metric.Metric[model.MetricNameLabel]) != requestConcurrencyLimitMetricName {
continue
}
if string(metric.Metric[requestConcurrencyLimitMetricLabelName]) != priorityLevelName {
if string(metric.Metric[priorityLevelLabelName]) != priorityLevelName {
continue
}
return int32(metric.Value)
return int32(metric.Value), nil
}
}
framework.ExpectNoError(fmt.Errorf("cannot find metric %q with matching priority level name label %q", requestConcurrencyLimitMetricName, priorityLevelName))
return 0
return 0, errPriorityLevelNotFound
}
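
For reference, the sample that the decoder above matches in the apiserver's /metrics text output looks roughly like this (the priority level name is only illustrative):

apiserver_flowcontrol_request_concurrency_limit{priority_level="e2e-testing-prioritylevel"} 5

from which the function would return 5.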

// createFlowSchema creates a flow schema referring to a particular priority
@@ -335,6 +356,35 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
}
}

// waitForSteadyState repeatedly polls the API server to check that the newly
// created flow schema and priority level have been seen by the APF controller,
// by verifying (1) the dangling priority level reference condition in the flow
// schema status and (2) that the matching concurrency limit metric is exposed.
// The function times out after 30 seconds.
func waitForSteadyState(f *framework.Framework, flowSchemaName string, priorityLevelName string) {
Review thread:

Member: Returning an error from this function and letting callers handle it as they want would be a bit cleaner.

Member: What is meant by "cleaner" here? The suggested change would make the code more complex.

Member: It will make it a bit longer - I don't agree it's more complex.

I actually think that if something can return an error, I want to leave the decision of how to handle it to the caller, rather than hiding that behavior below.
As an example, take the error returned by FlowSchemas.Get() - I can easily imagine a caller wanting to retry that in some cases. Another example: I may want to handle an error from the API call completely differently from a timeout while waiting for reconciliation.
Making that decision for the caller is not the best approach.

Member: I understand that kind of thinking for non-test code. For test code, it seems like unnecessary generality to me: we do not have an open world of code that will be calling the test code.

I am not sure what complexity measure you are thinking of. I think it is simpler to wrap up things as they are. Returning an error requires duplicating the error-handling at all the call sites, and in this case they all want the same handling. It is only test code; if some day we find the test code wants different error handling in different tests, we can refactor then.

But this is a pretty small detail; I can live with the suggested change.

Member: I think it's better to change, but it's not something I'm going to fight to the death over either :)

Member: Another way to do this is to define the checking function and let the caller invoke it via the framework.ExpectNoError(wait.Poll(...)) line (see the sketch after this function). Given the number and uniformity of the invocations, though, I think that's not necessary yet.

We can always change it later; let's leave it for now.

framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, func() (bool, error) {
fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
if err != nil {
return false, err
}
condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
if condition == nil || condition.Status != flowcontrol.ConditionFalse {
// The absence of the dangling status object implies that the APF
// controller isn't done with syncing the flow schema object. And, of
// course, the condition being anything but false means that steady state
// hasn't been achieved.
return false, nil
}
_, err = getPriorityLevelConcurrency(f.ClientSet, priorityLevelName)
if err != nil {
if err == errPriorityLevelNotFound {
return false, nil
}
return false, err
}
return true, nil
}))
}
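
For illustration, a minimal sketch of the alternative raised in the review thread above: expose the steady-state check as a condition function and let each call site own the polling and error handling. This is hypothetical code, not part of this PR; it assumes the same imports and helpers as flowcontrol.go (wait, apihelpers, getPriorityLevelConcurrency, errPriorityLevelNotFound), and the helper name steadyStateCondition is made up.

// steadyStateCondition (hypothetical) returns a wait.ConditionFunc performing the
// same two checks as waitForSteadyState, so that each call site can decide how to
// poll and how to handle errors.
func steadyStateCondition(f *framework.Framework, flowSchemaName string, priorityLevelName string) wait.ConditionFunc {
	return func() (bool, error) {
		fs, err := f.ClientSet.FlowcontrolV1beta1().FlowSchemas().Get(context.TODO(), flowSchemaName, metav1.GetOptions{})
		if err != nil {
			return false, err
		}
		condition := apihelpers.GetFlowSchemaConditionByType(fs, flowcontrol.FlowSchemaConditionDangling)
		if condition == nil || condition.Status != flowcontrol.ConditionFalse {
			// The APF controller has not finished syncing the flow schema yet.
			return false, nil
		}
		if _, err := getPriorityLevelConcurrency(f.ClientSet, priorityLevelName); err != nil {
			if err == errPriorityLevelNotFound {
				// The concurrency limit metric is not exposed yet; keep polling.
				return false, nil
			}
			return false, err
		}
		return true, nil
	}
}

A call site would then read roughly:

framework.ExpectNoError(wait.Poll(time.Second, 30*time.Second, steadyStateCondition(f, testingFlowSchemaName, testingPriorityLevelName)))

which keeps the uniform ExpectNoError handling the PR already has, while leaving room for a caller that wants to treat an API error differently from a reconciliation timeout.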

// makeRequest creates a request to the API server and returns the response.
func makeRequest(f *framework.Framework, username string) *http.Response {
config := f.ClientConfig()