Skip to content

Commit

Permalink
add context to metrics in util/flowcontrol.
Browse files Browse the repository at this point in the history
  • Loading branch information
YoyinZyc committed Feb 1, 2021
1 parent 266d67b commit 57d0bc3
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 28 deletions.
10 changes: 5 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
noteFn(fs, pl)
if req == nil {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt)
return
Expand All @@ -140,18 +140,18 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque
}()
idle = req.Finish(func() {
if queued {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
metrics.AddDispatch(pl.Name, fs.Name)
metrics.AddDispatch(ctx, pl.Name, fs.Name)
executed = true
startExecutionTime := time.Now()
defer func() {
metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime))
metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, time.Since(startExecutionTime))
}()
execFn()
})
if queued && !executed {
metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime))
}
panicking = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
if qs.qCfg.DesiredNumQueues < 1 {
if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit)
metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit")
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit")
return nil, qs.isIdleLocked()
}
req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2)
Expand All @@ -262,7 +262,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist
// concurrency shares and at max queue length already
if req == nil {
klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2)
metrics.AddReject(qs.qCfg.Name, fsName, "queue-full")
metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full")
return nil, qs.isIdleLocked()
}

Expand Down Expand Up @@ -351,7 +351,7 @@ func (req *request) wait() (bool, bool) {
switch decision {
case decisionReject:
klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2)
metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out")
metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out")
return false, qs.isIdleLocked()
case decisionCancel:
// TODO(aaron-prindle) add metrics for this case
Expand Down Expand Up @@ -448,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
if ok := qs.rejectOrEnqueueLocked(req); !ok {
return nil
}
metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests))
metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests))
return req
}

Expand Down Expand Up @@ -486,7 +486,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s
req.decision.SetLocked(decisionReject)
// get index for timed out requests
timeoutIdx = i
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
} else {
break
Expand Down Expand Up @@ -534,7 +534,7 @@ func (qs *queueSet) enqueueLocked(request *request) {
}
queue.Enqueue(request)
qs.totRequestsWaiting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1)
request.NoteQueued(true)
qs.obsPair.RequestsWaiting.Add(1)
}
Expand Down Expand Up @@ -569,7 +569,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish
}
req.decision.SetLocked(decisionExecute)
qs.totRequestsExecuting++
metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1)
metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(5).Enabled() {
klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting)
Expand Down Expand Up @@ -599,9 +599,9 @@ func (qs *queueSet) dispatchLocked() bool {
qs.totRequestsWaiting--
qs.totRequestsExecuting++
queue.requestsExecuting++
metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1)
metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1)
request.NoteQueued(false)
metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1)
metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1)
qs.obsPair.RequestsWaiting.Add(-1)
qs.obsPair.RequestsExecuting.Add(1)
if klog.V(6).Enabled() {
Expand Down Expand Up @@ -631,7 +631,7 @@ func (qs *queueSet) cancelWait(req *request) {
// remove the request
queue.requests = append(queue.requests[:i], queue.requests[i+1:]...)
qs.totRequestsWaiting--
metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1)
metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1)
req.NoteQueued(false)
qs.obsPair.RequestsWaiting.Add(-1)
break
Expand Down Expand Up @@ -704,7 +704,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool
func (qs *queueSet) finishRequestLocked(r *request) {
now := qs.clock.Now()
qs.totRequestsExecuting--
metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1)
metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1)
qs.obsPair.RequestsExecuting.Add(-1)

if r.queue == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package metrics

import (
"context"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -221,12 +222,12 @@ var (
)

// AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel
func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) {
func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
}

// AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel
func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) {
func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) {
apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta))
}

Expand All @@ -236,26 +237,26 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) {
}

// AddReject increments the # of rejected requests for flow control
func AddReject(priorityLevel, flowSchema, reason string) {
apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) {
apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1)
}

// AddDispatch increments the # of dispatched requests for flow control
func AddDispatch(priorityLevel, flowSchema string) {
apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1)
func AddDispatch(ctx context.Context, priorityLevel, flowSchema string) {
apiserverDispatchedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Add(1)
}

// ObserveQueueLength observes the queue length for flow control
func ObserveQueueLength(priorityLevel, flowSchema string, length int) {
apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
func ObserveQueueLength(ctx context.Context, priorityLevel, flowSchema string, length int) {
apiserverRequestQueueLength.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(length))
}

// ObserveWaitingDuration observes the queue length for flow control
func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) {
apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, execute string, waitTime time.Duration) {
apiserverRequestWaitingSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds())
}

// ObserveExecutionDuration observes the execution duration for flow control
func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) {
apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) {
apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds())
}

0 comments on commit 57d0bc3

Please sign in to comment.