Skip to content

Commit

Permalink
priority & fairness: support dynamically configuring work estimator m…
Browse files Browse the repository at this point in the history
…ax seats

Max seats from prioriy & fairness work estimator is now min(0.15 x
nominalCL, nominalCL/handSize)

'Max seats' calculated by work estimator is currently hard coded to 10.
When using lower values for --max-requests-inflight, a single
LIST request taking up 10 seats could end up using all if not most seats in
the priority level. This change updates the default work estimator
config such that 'max seats' is at most 10% of the
maximum concurrency limit for a priority level, with an upper limit of 10.
This ensures seats taken from LIST request is proportional to the total
available seats.

Signed-off-by: Andrew Sy Kim <andrewsy@google.com>

Kubernetes-commit: 4c5ea17576f30d8c2920d12cd77e866a980ccefa
  • Loading branch information
andrewsykim authored and k8s-publishing-bot committed Apr 26, 2023
1 parent 7c9db42 commit 9a86b3e
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 53 deletions.
2 changes: 1 addition & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
if c.FlowControl != nil {
workEstimatorCfg := flowcontrolrequest.DefaultWorkEstimatorConfig()
requestWorkEstimator := flowcontrolrequest.NewWorkEstimator(
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg)
c.StorageObjectCountTracker.Get, c.FlowControl.GetInterestedWatchCount, workEstimatorCfg, c.FlowControl.GetMaxSeats)
handler = filterlatency.TrackCompleted(handler)
handler = genericfilters.WithPriorityAndFairness(handler, c.LongRunningFunc, c.FlowControl, requestWorkEstimator)
handler = filterlatency.TrackStarted(handler, "priorityandfairness")
Expand Down
16 changes: 10 additions & 6 deletions pkg/server/filters/priority-and-fairness_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ type fakeApfFilter struct {
postDequeue func()

utilflowcontrol.WatchTracker
utilflowcontrol.MaxSeatsTracker
}

func (t fakeApfFilter) Handle(ctx context.Context,
Expand Down Expand Up @@ -146,10 +147,11 @@ func newApfServerWithSingleRequest(t *testing.T, decision mockDecision) *httptes

func newApfServerWithHooks(t *testing.T, decision mockDecision, onExecute, postExecute, postEnqueue, postDequeue func()) *httptest.Server {
fakeFilter := fakeApfFilter{
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
WatchTracker: utilflowcontrol.NewWatchTracker(),
mockDecision: decision,
postEnqueue: postEnqueue,
postDequeue: postDequeue,
WatchTracker: utilflowcontrol.NewWatchTracker(),
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
}
return newApfServerWithFilter(t, fakeFilter, onExecute, postExecute)
}
Expand Down Expand Up @@ -349,12 +351,14 @@ type fakeWatchApfFilter struct {
preExecutePanic bool

utilflowcontrol.WatchTracker
utilflowcontrol.MaxSeatsTracker
}

func newFakeWatchApfFilter(capacity int) *fakeWatchApfFilter {
return &fakeWatchApfFilter{
capacity: capacity,
WatchTracker: utilflowcontrol.NewWatchTracker(),
capacity: capacity,
WatchTracker: utilflowcontrol.NewWatchTracker(),
MaxSeatsTracker: utilflowcontrol.NewMaxSeatsTracker(),
}
}

Expand Down
24 changes: 24 additions & 0 deletions pkg/util/flowcontrol/apf_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ import (

const timeFmt = "2006-01-02T15:04:05.999"

const (
// priorityLevelMaxSeatsPercent is the percentage of the nominalCL used as max seats allocatable from work estimator
priorityLevelMaxSeatsPercent = float64(0.15)
)

// This file contains a simple local (to the apiserver) controller
// that digests API Priority and Fairness config objects (FlowSchema
// and PriorityLevelConfiguration) into the data structure that the
Expand Down Expand Up @@ -136,6 +141,12 @@ type configController struct {
// watchTracker implements the necessary WatchTracker interface.
WatchTracker

// MaxSeatsTracker tracks the maximum seats that should be allocatable from the
// work estimator for a given priority level. This controller does not enforce
// any limits on max seats stored in this tracker, it is up to the work estimator
// to set lower/upper limits on max seats (currently min=1, max=10).
MaxSeatsTracker

// the most recent update attempts, ordered by increasing age.
// Consumer trims to keep only the last minute's worth of entries.
// The controller uses this to limit itself to at most six updates
Expand Down Expand Up @@ -214,6 +225,7 @@ func newTestableController(config TestableConfig) *configController {
flowcontrolClient: config.FlowcontrolClient,
priorityLevelStates: make(map[string]*priorityLevelState),
WatchTracker: NewWatchTracker(),
MaxSeatsTracker: NewMaxSeatsTracker(),
}
klog.V(2).Infof("NewTestableController %q with serverConcurrencyLimit=%d, requestWaitLimit=%s, name=%s, asFieldManager=%q", cfgCtlr.name, cfgCtlr.serverConcurrencyLimit, cfgCtlr.requestWaitLimit, cfgCtlr.name, cfgCtlr.asFieldManager)
// Start with longish delay because conflicts will be between
Expand Down Expand Up @@ -628,6 +640,7 @@ func (meal *cfgMeal) processOldPLsLocked() {
// draining and no use is coming from another
// goroutine
klog.V(3).Infof("Removing undesired priority level %q (nilQueues=%v), Type=%v", plName, plState.queues == nil, plState.pl.Spec.Type)
meal.cfgCtlr.MaxSeatsTracker.ForgetPriorityLevel(plName)
continue
}
if !plState.quiescing {
Expand Down Expand Up @@ -675,6 +688,17 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
var waitLimit int
if qCfg := plState.pl.Spec.Limited.LimitResponse.Queuing; qCfg != nil {
waitLimit = int(qCfg.Queues * qCfg.QueueLengthLimit)

// Max seats allocatable from work estimator is calculated as MAX(1, MIN(0.15 * nominalCL, nominalCL/handSize)).
// This is to keep max seats relative to total available concurrency with a minimum value of 1.
// 15% of nominal concurrency was chosen since it preserved the previous max seats of 10 for default priority levels
// when using apiserver's default total server concurrency of 600 (--max-requests-inflight=400, --max-mutating-requests-inflight=200).
// This ensures that clusters with relatively high inflight requests will continue to use a max seats of 10
// while clusters with lower inflight requests will use max seats no greater than nominalCL/handSize.
// Calculated max seats can return arbitrarily high values but work estimator currently limits max seats at 10.
handSize := plState.pl.Spec.Limited.LimitResponse.Queuing.HandSize
maxSeats := uint64(math.Max(1, math.Min(math.Ceil(float64(concurrencyLimit)*priorityLevelMaxSeatsPercent), float64(int32(concurrencyLimit)/handSize))))
meal.cfgCtlr.MaxSeatsTracker.SetMaxSeats(plName, maxSeats)
}
meal.maxWaitingRequests += waitLimit

Expand Down
4 changes: 4 additions & 0 deletions pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ type Interface interface {

// WatchTracker provides the WatchTracker interface.
WatchTracker

// MaxSeatsTracker is invoked from the work estimator to track max seats
// that can be occupied by a request for a priority level.
MaxSeatsTracker
}

// This request filter implements https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/1040-priority-and-fairness/README.md
Expand Down
66 changes: 66 additions & 0 deletions pkg/util/flowcontrol/max_seats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"sync"
)

// MaxSeatsTracker is used to track max seats allocatable per priority level from the work estimator
type MaxSeatsTracker interface {
// GetMaxSeats returns the maximum seats a request should occupy for a given priority level.
GetMaxSeats(priorityLevelName string) uint64

// SetMaxSeats configures max seats for a priority level.
SetMaxSeats(priorityLevelName string, maxSeats uint64)

// ForgetPriorityLevel removes max seats tracking for a priority level.
ForgetPriorityLevel(priorityLevelName string)
}

type maxSeatsTracker struct {
sync.RWMutex

maxSeats map[string]uint64
}

func NewMaxSeatsTracker() MaxSeatsTracker {
return &maxSeatsTracker{
maxSeats: make(map[string]uint64),
}
}

func (m *maxSeatsTracker) GetMaxSeats(plName string) uint64 {
m.RLock()
defer m.RUnlock()

return m.maxSeats[plName]
}

func (m *maxSeatsTracker) SetMaxSeats(plName string, maxSeats uint64) {
m.Lock()
defer m.Unlock()

m.maxSeats[plName] = maxSeats
}

func (m *maxSeatsTracker) ForgetPriorityLevel(plName string) {
m.Lock()
defer m.Unlock()

delete(m.maxSeats, plName)
}
140 changes: 140 additions & 0 deletions pkg/util/flowcontrol/max_seats_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flowcontrol

import (
"testing"
"time"

"k8s.io/api/flowcontrol/v1beta2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fqs "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset"
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/eventclock"
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
"k8s.io/client-go/informers"
clientsetfake "k8s.io/client-go/kubernetes/fake"
)

// Test_GetMaxSeats tests max seats retrieved from MaxSeatsTracker
func Test_GetMaxSeats(t *testing.T) {
testcases := []struct {
name string
nominalCL int
handSize int32
expectedMaxSeats uint64
}{
{
name: "nominalCL=5, handSize=6",
nominalCL: 5,
handSize: 6,
expectedMaxSeats: 1,
},
{
name: "nominalCL=10, handSize=6",
nominalCL: 10,
handSize: 6,
expectedMaxSeats: 1,
},
{
name: "nominalCL=15, handSize=6",
nominalCL: 15,
handSize: 6,
expectedMaxSeats: 2,
},
{
name: "nominalCL=20, handSize=6",
nominalCL: 20,
handSize: 6,
expectedMaxSeats: 3,
},
{
name: "nominalCL=35, handSize=6",
nominalCL: 35,
handSize: 6,
expectedMaxSeats: 5,
},
{
name: "nominalCL=100, handSize=6",
nominalCL: 100,
handSize: 6,
expectedMaxSeats: 15,
},
{
name: "nominalCL=200, handSize=6",
nominalCL: 200,
handSize: 6,
expectedMaxSeats: 30,
},
{
name: "nominalCL=10, handSize=1",
nominalCL: 10,
handSize: 1,
expectedMaxSeats: 2,
},
{
name: "nominalCL=100, handSize=20",
nominalCL: 100,
handSize: 20,
expectedMaxSeats: 5,
},
}

for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
clientset := clientsetfake.NewSimpleClientset()
informerFactory := informers.NewSharedInformerFactory(clientset, time.Second)
flowcontrolClient := clientset.FlowcontrolV1beta2()
startTime := time.Now()
clk, _ := eventclock.NewFake(startTime, 0, nil)
c := newTestableController(TestableConfig{
Name: "Controller",
Clock: clk,
InformerFactory: informerFactory,
FlowcontrolClient: flowcontrolClient,
// for the purposes of this test, serverCL ~= nominalCL since there is
// only 1 PL with large concurrency shares, making mandatory PLs negligible.
ServerConcurrencyLimit: testcase.nominalCL,
RequestWaitLimit: time.Minute,
ReqsGaugeVec: metrics.PriorityLevelConcurrencyGaugeVec,
ExecSeatsGaugeVec: metrics.PriorityLevelExecutionSeatsGaugeVec,
QueueSetFactory: fqs.NewQueueSetFactory(clk),
})

testPriorityLevel := &v1beta2.PriorityLevelConfiguration{
ObjectMeta: metav1.ObjectMeta{
Name: "test-pl",
},
Spec: v1beta2.PriorityLevelConfigurationSpec{
Type: v1beta2.PriorityLevelEnablementLimited,
Limited: &v1beta2.LimitedPriorityLevelConfiguration{
AssuredConcurrencyShares: 10000,
LimitResponse: v1beta2.LimitResponse{
Queuing: &v1beta2.QueuingConfiguration{
HandSize: testcase.handSize,
},
},
},
},
}
c.digestConfigObjects([]*v1beta2.PriorityLevelConfiguration{testPriorityLevel}, nil)
maxSeats := c.GetMaxSeats("test-pl")
if maxSeats != testcase.expectedMaxSeats {
t.Errorf("unexpected max seats, got=%d, want=%d", maxSeats, testcase.expectedMaxSeats)
}
})
}
}
9 changes: 5 additions & 4 deletions pkg/util/flowcontrol/request/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (

const (
minimumSeats = 1
maximumSeats = 10
maximumSeatsLimit = 10
objectsPerSeat = 100.0
watchesPerSeat = 10.0
enableMutatingWorkEstimator = true
Expand All @@ -39,12 +39,13 @@ type WorkEstimatorConfig struct {

// MinimumSeats is the minimum number of seats a request must occupy.
MinimumSeats uint64 `json:"minimumSeats,omitempty"`
// MaximumSeats is the maximum number of seats a request can occupy

// MaximumSeatsLimit is an upper limit on the max seats a request can occupy.
//
// NOTE: work_estimate_seats_samples metric uses the value of maximumSeats
// as the upper bound, so when we change maximumSeats we should also
// update the buckets of the metric.
MaximumSeats uint64 `json:"maximumSeats,omitempty"`
MaximumSeatsLimit uint64 `json:"maximumSeatsLimit,omitempty"`
}

// ListWorkEstimatorConfig holds work estimator parameters related to list requests.
Expand All @@ -66,7 +67,7 @@ type MutatingWorkEstimatorConfig struct {
func DefaultWorkEstimatorConfig() *WorkEstimatorConfig {
return &WorkEstimatorConfig{
MinimumSeats: minimumSeats,
MaximumSeats: maximumSeats,
MaximumSeatsLimit: maximumSeatsLimit,
ListWorkEstimatorConfig: defaultListWorkEstimatorConfig(),
MutatingWorkEstimatorConfig: defaultMutatingWorkEstimatorConfig(),
}
Expand Down
Loading

0 comments on commit 9a86b3e

Please sign in to comment.