
Tweak up flaking end-to-end tests of API Priority and Fairness #96874

Merged

Changes from all commits
2 changes: 1 addition & 1 deletion test/e2e/apimachinery/BUILD
@@ -71,7 +71,6 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/version:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/watch:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/authentication/user:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/features:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/storage/names:go_default_library",
@@ -83,6 +82,7 @@ go_library(
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/util/cert:go_default_library",
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/client-go/util/keyutil:go_default_library",
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
85 changes: 43 additions & 42 deletions test/e2e/apimachinery/flowcontrol.go
@@ -32,8 +32,8 @@ import (

flowcontrol "k8s.io/api/flowcontrol/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/client-go/rest"
+ clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
"k8s.io/kubernetes/test/e2e/framework"
)

@@ -43,7 +43,7 @@ const (
)

var _ = SIGDescribe("API priority and fairness", func() {
f := framework.NewDefaultFramework("flowschemas")
f := framework.NewDefaultFramework("apf")

ginkgo.It("should ensure that requests can be classified by testing flow-schemas/priority-levels", func() {
testingFlowSchemaName := "e2e-testing-flowschema"
@@ -56,7 +56,7 @@ var _ = SIGDescribe("API priority and fairness", func() {
defer cleanup()

ginkgo.By("creating a testing flowschema")
- createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, matchingUsername)
+ createdFlowSchema, cleanup := createFlowSchema(f, testingFlowSchemaName, 1000, testingPriorityLevelName, []string{matchingUsername})
defer cleanup()

ginkgo.By("checking response headers contain flow-schema/priority-level uid")
@@ -74,27 +74,31 @@ var _ = SIGDescribe("API priority and fairness", func() {
// higher QPS client cannot drown out the other one despite having higher
// priority.
ginkgo.It("should ensure that requests can't be drowned out (priority)", func() {
flowSchemaNamePrefix := "e2e-testing-flowschema"
priorityLevelNamePrefix := "e2e-testing-prioritylevel"
flowSchemaNamePrefix := "e2e-testing-flowschema-" + f.UniqueName
priorityLevelNamePrefix := "e2e-testing-prioritylevel-" + f.UniqueName
loadDuration := 10 * time.Second
+ highQPSClientName := "highqps-" + f.UniqueName
+ lowQPSClientName := "lowqps-" + f.UniqueName

type client struct {
- username string
- qps float64
- priorityLevelName string
- concurrencyMultiplier float64
- concurrency int32
- flowSchemaName string
- matchingPrecedence int32
- completedRequests int32
+ username string
+ qps float64
+ priorityLevelName string
+ concurrencyMultiplier float64
+ concurrency int32
+ flowSchemaName string
+ matchingPrecedence int32
+ completedRequests int32
+ expectedCompletedPercentage float64
}
clients := []client{
// "highqps" refers to a client that creates requests at a much higher
// QPS than its counter-part and well above its concurrency share limit.
// In contrast, "lowqps" stays under its concurrency shares.
// Additionally, the "highqps" client also has a higher matching
// precedence for its flow schema.
{username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0, matchingPrecedence: 999},
{username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5, matchingPrecedence: 1000},
{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, matchingPrecedence: 999, expectedCompletedPercentage: 0.90},
{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, matchingPrecedence: 1000, expectedCompletedPercentage: 0.90},
}

ginkgo.By("creating test priority levels and flow schemas")
@@ -106,7 +110,7 @@ var _ = SIGDescribe("API priority and fairness", func() {

clients[i].flowSchemaName = fmt.Sprintf("%s-%s", flowSchemaNamePrefix, clients[i].username)
framework.Logf("creating FlowSchema %q", clients[i].flowSchemaName)
- _, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, clients[i].username)
+ _, cleanup = createFlowSchema(f, clients[i].flowSchemaName, clients[i].matchingPrecedence, clients[i].priorityLevelName, []string{clients[i].username})
defer cleanup()
}

@@ -117,7 +121,7 @@ var _ = SIGDescribe("API priority and fairness", func() {
if clients[i].concurrency < 1 {
clients[i].concurrency = 1
}
framework.Logf("request concurrency for %q will be %d (concurrency share = %d)", clients[i].username, clients[i].concurrency, realConcurrency)
framework.Logf("request concurrency for %q will be %d (that is %d times client multiplier)", clients[i].username, clients[i].concurrency, realConcurrency)
}

ginkgo.By(fmt.Sprintf("starting uniform QPS load for %s", loadDuration.String()))
@@ -135,11 +139,11 @@ var _ = SIGDescribe("API priority and fairness", func() {
ginkgo.By("checking completed requests with expected values")
for _, client := range clients {
// Each client should have 95% of its ideal number of completed requests.
- maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
+ maxCompletedRequests := float64(client.concurrency) * client.qps * loadDuration.Seconds()
fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
- if fractionCompleted < 0.95 {
- framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted)
+ if fractionCompleted < client.expectedCompletedPercentage {
+ framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
}
}
})
@@ -150,28 +154,31 @@ var _ = SIGDescribe("API priority and fairness", func() {
// the two clients and not allow one client to drown out the other despite
// having a higher QPS.
ginkgo.It("should ensure that requests can't be drowned out (fairness)", func() {
priorityLevelName := "e2e-testing-prioritylevel"
flowSchemaName := "e2e-testing-flowschema"
priorityLevelName := "e2e-testing-prioritylevel-" + f.UniqueName
flowSchemaName := "e2e-testing-flowschema-" + f.UniqueName
loadDuration := 10 * time.Second

framework.Logf("creating PriorityLevel %q", priorityLevelName)
_, cleanup := createPriorityLevel(f, priorityLevelName, 1)
defer cleanup()

+ highQPSClientName := "highqps-" + f.UniqueName
+ lowQPSClientName := "lowqps-" + f.UniqueName
framework.Logf("creating FlowSchema %q", flowSchemaName)
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, "*")
_, cleanup = createFlowSchema(f, flowSchemaName, 1000, priorityLevelName, []string{highQPSClientName, lowQPSClientName})
defer cleanup()

type client struct {
- username string
- qps float64
- concurrencyMultiplier float64
- concurrency int32
- completedRequests int32
+ username string
+ qps float64
+ concurrencyMultiplier float64
+ concurrency int32
+ completedRequests int32
+ expectedCompletedPercentage float64
}
clients := []client{
{username: "highqps", qps: 100.0, concurrencyMultiplier: 2.0},
{username: "lowqps", qps: 5.0, concurrencyMultiplier: 0.5},
{username: highQPSClientName, qps: 90, concurrencyMultiplier: 2.0, expectedCompletedPercentage: 0.90},
{username: lowQPSClientName, qps: 4, concurrencyMultiplier: 0.5, expectedCompletedPercentage: 0.90},
Review comment (Member):

Isn't this .9 an increase over #96798's .75?

Review comment (Member):

A recent failure:

    Dec 4 15:38:28.824: client "highqps-apf-8126": got 36.6% completed requests, want at least 75.0%

So I think we'll need a followup.

Perhaps we need to use quite large share amounts to throttle the rest of the cluster.

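For illustration, a sketch of what such a followup could look like against the flowcontrol v1beta1 API already imported in this file (plus context): the share value 100 and the queuing numbers are hypothetical, not part of this PR.

    // Hypothetical followup: create the test priority level with a large
    // AssuredConcurrencyShares value, so the test's levels claim most of the
    // apiserver concurrency budget and background cluster traffic cannot
    // starve the test clients.
    _, err := f.ClientSet.FlowcontrolV1beta1().PriorityLevelConfigurations().Create(
        context.TODO(),
        &flowcontrol.PriorityLevelConfiguration{
            ObjectMeta: metav1.ObjectMeta{Name: priorityLevelName},
            Spec: flowcontrol.PriorityLevelConfigurationSpec{
                Type: flowcontrol.PriorityLevelEnablementLimited,
                Limited: &flowcontrol.LimitedPriorityLevelConfiguration{
                    AssuredConcurrencyShares: 100, // "quite large" relative to the defaults
                    LimitResponse: flowcontrol.LimitResponse{
                        Type: flowcontrol.LimitResponseTypeQueue,
                        Queuing: &flowcontrol.QueuingConfiguration{
                            Queues:           1,
                            HandSize:         1,
                            QueueLengthLimit: 50,
                        },
                    },
                },
            },
        },
        metav1.CreateOptions{})
    framework.ExpectNoError(err)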
}

framework.Logf("getting real concurrency")
@@ -202,8 +209,8 @@ var _ = SIGDescribe("API priority and fairness", func() {
maxCompletedRequests := float64(client.concurrency) * client.qps * float64(loadDuration/time.Second)
fractionCompleted := float64(client.completedRequests) / maxCompletedRequests
framework.Logf("client %q completed %d/%d requests (%.1f%%)", client.username, client.completedRequests, int32(maxCompletedRequests), 100*fractionCompleted)
- if fractionCompleted < 0.95 {
- framework.Failf("client %q: got %.1f%% completed requests, want at least 95%%", client.username, 100*fractionCompleted)
+ if fractionCompleted < client.expectedCompletedPercentage {
+ framework.Failf("client %q: got %.1f%% completed requests, want at least %.1f%%", client.username, 100*fractionCompleted, 100*client.expectedCompletedPercentage)
}
}
})
@@ -265,16 +272,9 @@ func getPriorityLevelConcurrency(f *framework.Framework, priorityLevelName strin

// createFlowSchema creates a flow schema referring to a particular priority
// level and matching the username provided.
- func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsername string) (*flowcontrol.FlowSchema, func()) {
+ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPrecedence int32, priorityLevelName string, matchingUsernames []string) (*flowcontrol.FlowSchema, func()) {
var subjects []flowcontrol.Subject
if matchingUsername == "*" {
subjects = append(subjects, flowcontrol.Subject{
Kind: flowcontrol.SubjectKindGroup,
Group: &flowcontrol.GroupSubject{
Name: user.AllAuthenticated,
},
})
} else {
for _, matchingUsername := range matchingUsernames {
subjects = append(subjects, flowcontrol.Subject{
Kind: flowcontrol.SubjectKindUser,
User: &flowcontrol.UserSubject{
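The tail of this hunk is collapsed in this view; from the old else branch shown above, the rewritten subject-building loop plausibly reads as follows (a reconstruction sketch, not verified against the merged file):

    var subjects []flowcontrol.Subject
    // One User subject per username; the old wildcard/group branch is gone.
    for _, matchingUsername := range matchingUsernames {
        subjects = append(subjects, flowcontrol.Subject{
            Kind: flowcontrol.SubjectKindUser,
            User: &flowcontrol.UserSubject{
                Name: matchingUsername,
            },
        })
    }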
@@ -321,6 +321,7 @@ func createFlowSchema(f *framework.Framework, flowSchemaName string, matchingPre
func makeRequest(f *framework.Framework, username string) *http.Response {
config := f.ClientConfig()
config.Impersonate.UserName = username
+ config.RateLimiter = clientsideflowcontrol.NewFakeAlwaysRateLimiter()
config.Impersonate.Groups = []string{"system:authenticated"}
roundTripper, err := rest.TransportFor(config)
framework.ExpectNoError(err)
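The added RateLimiter line is the functional heart of this hunk: without it, client-go's default client-side throttling (typically around 5 QPS with a small burst on a rest.Config) would silently cap the 90 QPS load the test tries to generate. NewFakeAlwaysRateLimiter admits every request immediately, leaving server-side APF as the only throttle. A minimal standalone sketch (the helper name is hypothetical):

    import (
        "k8s.io/client-go/rest"
        clientsideflowcontrol "k8s.io/client-go/util/flowcontrol"
    )

    // newUnthrottledConfig copies a rest.Config and disables client-side
    // rate limiting, so only server-side API Priority and Fairness decides
    // which requests wait.
    func newUnthrottledConfig(base *rest.Config) *rest.Config {
        config := rest.CopyConfig(base)
        config.RateLimiter = clientsideflowcontrol.NewFakeAlwaysRateLimiter()
        return config
    }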
@@ -350,7 +351,7 @@ func testResponseHeaderMatches(f *framework.Framework, impersonatingUser, plUID,
func uniformQPSLoadSingle(f *framework.Framework, username string, qps float64, loadDuration time.Duration) int32 {
var completed int32
var wg sync.WaitGroup
- ticker := time.NewTicker(time.Duration(1e9/qps) * time.Nanosecond)
+ ticker := time.NewTicker(time.Duration(float64(time.Second) / qps))
defer ticker.Stop()
timer := time.NewTimer(loadDuration)
for {
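Both ticker expressions truncate to whole nanoseconds and give the same interval; the new form just avoids the magic 1e9 constant by computing float64(time.Second)/qps directly, e.g. a 250ms tick for qps = 4 and roughly 11.1ms for qps = 90. A simplified, self-contained sketch of the uniform-QPS loop this function implements (doRequest stands in for the real makeRequest call; names here are illustrative):

    package main

    import (
        "fmt"
        "sync"
        "sync/atomic"
        "time"
    )

    // uniformQPSLoad fires doRequest at a fixed rate for loadDuration and
    // returns how many requests completed before the load window closed.
    func uniformQPSLoad(qps float64, loadDuration time.Duration, doRequest func()) int32 {
        var completed int32
        var wg sync.WaitGroup
        ticker := time.NewTicker(time.Duration(float64(time.Second) / qps)) // qps=4 -> 250ms
        defer ticker.Stop()
        timer := time.NewTimer(loadDuration)
        for {
            select {
            case <-ticker.C:
                wg.Add(1)
                go func() {
                    defer wg.Done()
                    doRequest()
                    atomic.AddInt32(&completed, 1)
                }()
            case <-timer.C:
                wg.Wait() // stop issuing new requests; wait for in-flight ones
                return completed
            }
        }
    }

    func main() {
        n := uniformQPSLoad(4, time.Second, func() {})
        fmt.Println("completed:", n) // expect about 4
    }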