Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Factored TimedObserver into less surprising pieces #106432

Merged
merged 1 commit into from
Nov 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 5 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) {
// requestWatermark is used to track maximal numbers of requests in a particular phase of handling
type requestWatermark struct {
phase string
readOnlyObserver, mutatingObserver fcmetrics.TimedObserver
readOnlyObserver, mutatingObserver fcmetrics.RatioedChangeObserver
lock sync.Mutex
readOnlyWatermark, mutatingWatermark int
}

func (w *requestWatermark) recordMutating(mutatingVal int) {
w.mutatingObserver.Set(float64(mutatingVal))
w.mutatingObserver.Observe(float64(mutatingVal))

w.lock.Lock()
defer w.lock.Unlock()
Expand All @@ -78,7 +78,7 @@ func (w *requestWatermark) recordMutating(mutatingVal int) {
}

func (w *requestWatermark) recordReadOnly(readOnlyVal int) {
w.readOnlyObserver.Set(float64(readOnlyVal))
w.readOnlyObserver.Observe(float64(readOnlyVal))

w.lock.Lock()
defer w.lock.Unlock()
Expand Down Expand Up @@ -132,11 +132,11 @@ func WithMaxInFlightLimit(
var mutatingChan chan bool
if nonMutatingLimit != 0 {
nonMutatingChan = make(chan bool, nonMutatingLimit)
watermark.readOnlyObserver.SetX1(float64(nonMutatingLimit))
watermark.readOnlyObserver.SetDenominator(float64(nonMutatingLimit))
}
if mutatingLimit != 0 {
mutatingChan = make(chan bool, mutatingLimit)
watermark.mutatingObserver.SetX1(float64(mutatingLimit))
watermark.mutatingObserver.SetDenominator(float64(mutatingLimit))
}

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ type configController struct {
name string // varies in tests of fighting controllers
clock clock.PassiveClock
queueSetFactory fq.QueueSetFactory
reqsObsPairGenerator metrics.TimedObserverPairGenerator
execSeatsObsGenerator metrics.TimedObserverGenerator
reqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator
execSeatsObsGenerator metrics.RatioedChangeObserverGenerator

// How this controller appears in an ObjectMeta ManagedFieldsEntry.Manager
asFieldManager string
Expand Down Expand Up @@ -193,10 +193,10 @@ type priorityLevelState struct {
numPending int

// Observers tracking number of requests waiting, executing
reqsObsPair metrics.TimedObserverPair
reqsObsPair metrics.RatioedChangeObserverPair

// Observer of number of seats occupied throughout execution
execSeatsObs metrics.TimedObserver
execSeatsObs metrics.RatioedChangeObserver
}

// NewTestableController is extra flexible to facilitate testing
Expand Down Expand Up @@ -693,7 +693,7 @@ func (meal *cfgMeal) finishQueueSetReconfigsLocked() {
// given priority level configuration. Returns nil if that config
// does not call for limiting. Returns nil and an error if the given
// object is malformed in a way that is a problem for this package.
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func queueSetCompleterForPL(qsf fq.QueueSetFactory, queues fq.QueueSet, pl *flowcontrol.PriorityLevelConfiguration, requestWaitLimit time.Duration, reqsIntPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
if (pl.Spec.Type == flowcontrol.PriorityLevelEnablementExempt) != (pl.Spec.Limited == nil) {
return nil, errors.New("broken union structure at the top")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,10 @@ type TestableConfig struct {
RequestWaitLimit time.Duration

// ObsPairGenerator for metrics about requests
ReqsObsPairGenerator metrics.TimedObserverPairGenerator
ReqsObsPairGenerator metrics.RatioedChangeObserverPairGenerator

// TimedObserverPairGenerator for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.TimedObserverGenerator
// RatioedChangeObserverPairGenerator for metrics about seats occupied by all phases of execution
ExecSeatsObsGenerator metrics.RatioedChangeObserverGenerator

// QueueSetFactory for the queuing implementation
QueueSetFactory fq.QueueSetFactory
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type ctlrTestRequest struct {
descr1, descr2 interface{}
}

func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.TimedObserverPair, eso metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (cts *ctlrTestState) BeginConstruction(qc fq.QueuingConfig, rip metrics.RatioedChangeObserverPair, eso metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
return ctlrTestQueueSetCompleter{cts, nil, qc}, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,9 @@ import (
// Integrator computes the moments of some variable X over time as
// read from a particular clock. The integrals start when the
// Integrator is created, and ends at the latest operation on the
// Integrator. As a `metrics.TimedObserver` this fixes X1=1 and
// ignores attempts to change X1.
// Integrator.
type Integrator interface {
metrics.TimedObserver
metrics.ChangeObserver

GetResults() IntegratorResults

Expand Down Expand Up @@ -70,10 +69,7 @@ func NewIntegrator(clock clock.PassiveClock) Integrator {
}
}

func (igr *integrator) SetX1(x1 float64) {
}

func (igr *integrator) Set(x float64) {
func (igr *integrator) Observe(x float64) {
igr.Lock()
igr.setLocked(x)
igr.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestIntegrator(t *testing.T) {
if !results.Equal(&rToo) {
t.Errorf("expected %#+v, got %#+v", results, rToo)
}
igr.Set(2)
igr.Observe(2)
results = igr.GetResults()
if e := (IntegratorResults{Duration: 0, Average: math.NaN(), Deviation: math.NaN(), Min: 2, Max: 3}); !e.Equal(&results) {
t.Errorf("expected %#+v, got %#+v", e, results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ import (
// before committing to a concurrency allotment for the second.
type QueueSetFactory interface {
// BeginConstruction does the first phase of creating a QueueSet.
// The TimedObserverPair observes number of requests,
// The RatioedChangeObserverPair observes number of requests,
// execution covering just the regular phase.
// The TimedObserver observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (QueueSetCompleter, error)
// The RatioedChangeObserver observes number of seats occupied through all phases of execution.
BeginConstruction(QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (QueueSetCompleter, error)
}

// QueueSetCompleter finishes the two-step process of creating or
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ type promiseFactoryFactory func(*queueSet) promiseFactory
// the fields `factory` and `theSet` is non-nil.
type queueSetCompleter struct {
factory *queueSetFactory
reqsObsPair metrics.TimedObserverPair
execSeatsObs metrics.TimedObserver
reqsObsPair metrics.RatioedChangeObserverPair
execSeatsObs metrics.RatioedChangeObserver
theSet *queueSet
qCfg fq.QueuingConfig
dealer *shufflesharding.Dealer
Expand All @@ -81,9 +81,9 @@ type queueSet struct {
clock eventclock.Interface
estimatedServiceDuration time.Duration

reqsObsPair metrics.TimedObserverPair // .RequestsExecuting covers regular phase only
reqsObsPair metrics.RatioedChangeObserverPair // .RequestsExecuting covers regular phase only

execSeatsObs metrics.TimedObserver // for all phases of execution
execSeatsObs metrics.RatioedChangeObserver // for all phases of execution

promiseFactory promiseFactory

Expand Down Expand Up @@ -148,7 +148,7 @@ func newTestableQueueSetFactory(c eventclock.Interface, promiseFactoryFactory pr
}
}

func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.TimedObserverPair, execSeatsObs metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (qsf *queueSetFactory) BeginConstruction(qCfg fq.QueuingConfig, reqsObsPair metrics.RatioedChangeObserverPair, execSeatsObs metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
dealer, err := checkConfig(qCfg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -243,9 +243,9 @@ func (qs *queueSet) setConfiguration(ctx context.Context, qCfg fq.QueuingConfig,
if qll < 1 {
qll = 1
}
qs.reqsObsPair.RequestsWaiting.SetX1(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetX1(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetX1(float64(dCfg.ConcurrencyLimit))
qs.reqsObsPair.RequestsWaiting.SetDenominator(float64(qll))
qs.reqsObsPair.RequestsExecuting.SetDenominator(float64(dCfg.ConcurrencyLimit))
qs.execSeatsObs.SetDenominator(float64(dCfg.ConcurrencyLimit))

qs.dispatchAsMuchAsPossibleLocked()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1461,10 +1461,10 @@ func newFIFO(requests ...*request) fifo {
return l
}

func newObserverPair(clk clock.PassiveClock) metrics.TimedObserverPair {
func newObserverPair(clk clock.PassiveClock) metrics.RatioedChangeObserverPair {
return metrics.PriorityLevelConcurrencyObserverPairGenerator.Generate(1, 1, []string{"test"})
}

func newExecSeatsObserver(clk clock.PassiveClock) metrics.TimedObserver {
func newExecSeatsObserver(clk clock.PassiveClock) metrics.RatioedChangeObserver {
return metrics.PriorityLevelExecutionSeatsObserverGenerator.Generate(1, 1, []string{"test"})
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type noRestraint struct{}

type noRestraintRequest struct{}

func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.TimedObserverPair, metrics.TimedObserver) (fq.QueueSetCompleter, error) {
func (noRestraintFactory) BeginConstruction(fq.QueuingConfig, metrics.RatioedChangeObserverPair, metrics.RatioedChangeObserver) (fq.QueueSetCompleter, error) {
return noRestraintCompleter{}, nil
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
Copyright 2019 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package metrics

// Observer is something that can be given numeric observations.
type Observer interface {
// Observe takes an observation
Observe(float64)
}

// ChangeObserver extends Observer with the ability to take
// an observation that is relative to the previous observation.
type ChangeObserver interface {
Observer

// Observe a new value that differs by the given amount from the previous observation.
Add(float64)
}

// RatioedChangeObserver tracks ratios.
// The numerator is set/changed through the ChangeObserver methods,
// and the denominator can be updated through the SetDenominator method.
// A ratio is tracked whenever the numerator is set/changed.
type RatioedChangeObserver interface {
ChangeObserver

// SetDenominator sets the denominator to use until it is changed again
SetDenominator(float64)
}

// RatioedChangeObserverGenerator creates related observers that are
// differentiated by a series of label values
type RatioedChangeObserverGenerator interface {
Generate(initialNumerator, initialDenominator float64, labelValues []string) RatioedChangeObserver
}

// RatioedChangeObserverPair is a corresponding pair of observers, one for the
// number of requests waiting in queue(s) and one for the number of
// requests being executed
type RatioedChangeObserverPair struct {
// RequestsWaiting is given observations of the number of currently queued requests
RequestsWaiting RatioedChangeObserver

// RequestsExecuting is given observations of the number of requests currently executing
RequestsExecuting RatioedChangeObserver
}

// RatioedChangeObserverPairGenerator generates pairs
type RatioedChangeObserverPairGenerator interface {
Generate(initialWaitingDenominator, initialExecutingDenominator float64, labelValues []string) RatioedChangeObserverPair
}