Skip to content

Commit

Permalink
Merge pull request kubernetes#94146 from MikeSpreitzer/limit-lag
Browse files Browse the repository at this point in the history
Make sampleAndWaterMarkHistograms not fall very far behind
  • Loading branch information
k8s-ci-robot committed Aug 21, 2020
2 parents 61edc6f + 9e89b92 commit 14a1106
Show file tree
Hide file tree
Showing 7 changed files with 43 additions and 4 deletions.
1 change: 1 addition & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
if s.isPostStartHookRegistered(priorityAndFairnessConfigConsumerHookName) {
} else if c.FlowControl != nil {
err := s.AddPostStartHook(priorityAndFairnessConfigConsumerHookName, func(context PostStartHookContext) error {
go c.FlowControl.MaintainObservations(context.StopCh)
go c.FlowControl.Run(context.StopCh)
return nil
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,23 @@ func (cfgCtlr *configController) initializeConfigController(informerFactory kube
}})
}

// MaintainObservations keeps the observers from
// metrics.PriorityLevelConcurrencyObserverPairGenerator from falling
// too far behind
func (cfgCtlr *configController) MaintainObservations(stopCh <-chan struct{}) {
wait.Until(cfgCtlr.updateObservations, 10*time.Second, stopCh)
}

func (cfgCtlr *configController) updateObservations() {
cfgCtlr.lock.Lock()
defer cfgCtlr.lock.Unlock()
for _, plc := range cfgCtlr.priorityLevelStates {
if plc.queues != nil {
plc.queues.UpdateObservations()
}
}
}

func (cfgCtlr *configController) Run(stopCh <-chan struct{}) error {
defer cfgCtlr.configQueue.ShutDown()
klog.Info("Starting API Priority and Fairness config controller")
Expand Down
14 changes: 10 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,23 @@ import (
type Interface interface {
// Handle takes care of queuing and dispatching a request
// characterized by the given digest. The given `noteFn` will be
// invoked with the results of request classification. If Handle
// decides that the request should be executed then `execute()`
// will be invoked once to execute the request; otherwise
// `execute()` will not be invoked.
// invoked with the results of request classification. If the
// request is queued then `queueNoteFn` will be called twice,
// first with `true` and then with `false`; otherwise
// `queueNoteFn` will not be called at all. If Handle decides
// that the request should be executed then `execute()` will be
// invoked once to execute the request; otherwise `execute()` will
// not be invoked.
Handle(ctx context.Context,
requestDigest RequestDigest,
noteFn func(fs *fctypesv1a1.FlowSchema, pl *fctypesv1a1.PriorityLevelConfiguration),
queueNoteFn fq.QueueNoteFn,
execFn func(),
)

// MaintainObservations is a helper for maintaining statistics.
MaintainObservations(stopCh <-chan struct{})

// Run monitors config objects from the main apiservers and causes
// any needed changes to local behavior. This method ceases
// activity and returns after the given channel is closed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func (cqs *ctlrTestQueueSet) BeginConfigChange(qc fq.QueuingConfig) (fq.QueueSet
return ctlrTestQueueSetCompleter{cqs.cts, cqs, qc}, nil
}

func (cqs *ctlrTestQueueSet) UpdateObservations() {
}

func (cqs *ctlrTestQueueSet) Dump(bool) debug.QueueSetDump {
return debug.QueueSetDump{}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ type QueueSet interface {
// exactly once.
StartRequest(ctx context.Context, hashValue uint64, flowDistinguisher, fsName string, descr1, descr2 interface{}, queueNoteFn QueueNoteFn) (req Request, idle bool)

// UpdateObservations makes sure any time-based statistics have
// caught up with the current clock reading
UpdateObservations()

// Dump saves and returns the instant internal state of the queue-set.
// Note that dumping process will stop the queue-set from proceeding
// any requests.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,6 +743,11 @@ func (qs *queueSet) goroutineDoneOrBlocked() {
qs.counter.Add(-1)
}

func (qs *queueSet) UpdateObservations() {
qs.obsPair.RequestsWaiting.Add(0)
qs.obsPair.RequestsExecuting.Add(0)
}

func (qs *queueSet) Dump(includeRequestDetails bool) debug.QueueSetDump {
qs.lock.Lock()
defer qs.lock.Unlock()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (noRestraint) StartRequest(ctx context.Context, hashValue uint64, flowDisti
return noRestraintRequest{}, false
}

func (noRestraint) UpdateObservations() {
}

func (noRestraint) Dump(bool) debug.QueueSetDump {
return debug.QueueSetDump{}
}
Expand Down

0 comments on commit 14a1106

Please sign in to comment.