diff --git a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go index 96166e65454d..dc1eb5a7f152 100644 --- a/staging/src/k8s.io/apiserver/pkg/audit/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/audit/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package audit import ( + "context" "fmt" auditinternal "k8s.io/apiserver/pkg/apis/audit" @@ -84,13 +85,13 @@ func init() { } // ObserveEvent updates the relevant prometheus metrics for the generated audit event. -func ObserveEvent() { - eventCounter.Inc() +func ObserveEvent(ctx context.Context) { + eventCounter.WithContext(ctx).Inc() } // ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request. -func ObservePolicyLevel(level auditinternal.Level) { - levelCounter.WithLabelValues(string(level)).Inc() +func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) { + levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc() } // HandlePluginError handles an error that occurred in an audit plugin. This method should only be diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go b/staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go index 0116391409d1..12050a48d7a6 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/request/x509/x509.go @@ -149,7 +149,7 @@ func (a *Authenticator) AuthenticateRequest(req *http.Request) (*authenticator.R } remaining := req.TLS.PeerCertificates[0].NotAfter.Sub(time.Now()) - clientCertificateExpirationHistogram.Observe(remaining.Seconds()) + clientCertificateExpirationHistogram.WithContext(req.Context()).Observe(remaining.Seconds()) chains, err := req.TLS.PeerCertificates[0].Verify(optsCopy) if err != nil { return nil, false, fmt.Errorf( diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go index a10564f04d93..b0d06a231838 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/cached_token_authenticator.go @@ -133,7 +133,7 @@ func (a *cachedTokenAuthenticator) AuthenticateToken(ctx context.Context, token } func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, token string) *cacheRecord { - doneAuthenticating := stats.authenticating() + doneAuthenticating := stats.authenticating(ctx) auds, audsOk := authenticator.AudiencesFrom(ctx) @@ -145,7 +145,7 @@ func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, toke } // Record cache miss - doneBlocking := stats.blocking() + doneBlocking := stats.blocking(ctx) defer doneBlocking() defer doneAuthenticating(false) @@ -153,7 +153,7 @@ func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, toke // always use one place to read and write the output of AuthenticateToken record := &cacheRecord{} - doneFetching := stats.fetching() + doneFetching := stats.fetching(ctx) // We're leaving the request handling stack so we need to handle crashes // ourselves. Log a stack trace and return a 500 if something panics. defer func() { diff --git a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/stats.go b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/stats.go index dbe745718e79..d1b959aa5ed7 100644 --- a/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/stats.go +++ b/staging/src/k8s.io/apiserver/pkg/authentication/token/cache/stats.go @@ -17,6 +17,7 @@ limitations under the License. package cache import ( + "context" "time" "k8s.io/component-base/metrics" @@ -86,7 +87,7 @@ type statsCollector struct{} var stats = statsCollector{} -func (statsCollector) authenticating() func(hit bool) { +func (statsCollector) authenticating(ctx context.Context) func(hit bool) { start := time.Now() return func(hit bool) { var tag string @@ -98,18 +99,18 @@ func (statsCollector) authenticating() func(hit bool) { latency := time.Since(start) - requestCount.WithLabelValues(tag).Inc() - requestLatency.WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000) + requestCount.WithContext(ctx).WithLabelValues(tag).Inc() + requestLatency.WithContext(ctx).WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000) } } -func (statsCollector) blocking() func() { - activeFetchCount.WithLabelValues(fetchBlockedTag).Inc() - return activeFetchCount.WithLabelValues(fetchBlockedTag).Dec +func (statsCollector) blocking(ctx context.Context) func() { + activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Inc() + return activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Dec } -func (statsCollector) fetching() func(ok bool) { - activeFetchCount.WithLabelValues(fetchInFlightTag).Inc() +func (statsCollector) fetching(ctx context.Context) func(ok bool) { + activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Inc() return func(ok bool) { var tag string if ok { @@ -118,8 +119,8 @@ func (statsCollector) fetching() func(ok bool) { tag = fetchFailedTag } - fetchCount.WithLabelValues(tag).Inc() + fetchCount.WithContext(ctx).WithLabelValues(tag).Inc() - activeFetchCount.WithLabelValues(fetchInFlightTag).Dec() + activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Dec() } } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go index 04264230d8dd..d42e18233f15 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency.go @@ -56,8 +56,8 @@ func TrackStarted(handler http.Handler, name string) http.Handler { // TrackCompleted measures the timestamp the given handler has completed execution and then // it updates the corresponding metric with the filter latency duration. func TrackCompleted(handler http.Handler) http.Handler { - return trackCompleted(handler, utilclock.RealClock{}, func(fr *requestFilterRecord, completedAt time.Time) { - metrics.RecordFilterLatency(fr.name, completedAt.Sub(fr.startedTimestamp)) + return trackCompleted(handler, utilclock.RealClock{}, func(ctx context.Context, fr *requestFilterRecord, completedAt time.Time) { + metrics.RecordFilterLatency(ctx, fr.name, completedAt.Sub(fr.startedTimestamp)) }) } @@ -81,7 +81,7 @@ func trackStarted(handler http.Handler, name string, clock utilclock.PassiveCloc }) } -func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(*requestFilterRecord, time.Time)) http.Handler { +func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(context.Context, *requestFilterRecord, time.Time)) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // The previous filter has just completed. completedAt := clock.Now() @@ -90,7 +90,7 @@ func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action f ctx := r.Context() if fr := requestFilterRecordFrom(ctx); fr != nil { - action(fr, completedAt) + action(ctx, fr, completedAt) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go index 41437407ae6c..f12e9456489f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filterlatency/filterlatency_test.go @@ -17,6 +17,7 @@ limitations under the License. package filterlatency import ( + "context" "net/http" "net/http/httptest" "testing" @@ -120,7 +121,7 @@ func TestTrackCompletedContextHasFilterRecord(t *testing.T) { }) requestFilterEndedAt := time.Now() - wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(fr *requestFilterRecord, completedAt time.Time) { + wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(_ context.Context, fr *requestFilterRecord, completedAt time.Time) { actionCallCount++ filterRecordGot = fr filterCompletedAtGot = completedAt @@ -156,7 +157,7 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) { handlerCallCount++ }) - wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ *requestFilterRecord, _ time.Time) { + wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ context.Context, _ *requestFilterRecord, _ time.Time) { actionCallCount++ }) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go index 891d60935498..2f78ff1de642 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go @@ -18,6 +18,7 @@ package filters import ( "bufio" + "context" "errors" "fmt" "net" @@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon } ev.Stage = auditinternal.StageRequestReceived - if processed := processAuditEvent(sink, ev, omitStages); !processed { - audit.ApiserverAuditDroppedCounter.Inc() + if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed { + audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc() responsewriters.InternalError(w, req, errors.New("failed to store audit event")) return } @@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon longRunningSink = sink } } - respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages) + respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages) // send audit event when we leave this func, either via a panic or cleanly. In the case of long // running requests, this will be the second audit event. @@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon Reason: metav1.StatusReasonInternalError, Message: fmt.Sprintf("APIServer panic'd: %v", r), } - processAuditEvent(sink, ev, omitStages) + processAuditEvent(ctx, sink, ev, omitStages) return } @@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon if ev.ResponseStatus == nil && longRunningSink != nil { ev.ResponseStatus = fakedSuccessStatus ev.Stage = auditinternal.StageResponseStarted - processAuditEvent(longRunningSink, ev, omitStages) + processAuditEvent(ctx, longRunningSink, ev, omitStages) } ev.Stage = auditinternal.StageResponseComplete if ev.ResponseStatus == nil { ev.ResponseStatus = fakedSuccessStatus } - processAuditEvent(sink, ev, omitStages) + processAuditEvent(ctx, sink, ev, omitStages) }() handler.ServeHTTP(respWriter, req) }) @@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker } level, omitStages := policy.LevelAndStages(attribs) - audit.ObservePolicyLevel(level) + audit.ObservePolicyLevel(ctx, level) if level == auditinternal.LevelNone { // Don't audit. return req, nil, nil, nil @@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker return req, ev, omitStages, nil } -func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { +func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool { for _, stage := range omitStages { if ev.Stage == stage { return true @@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au } else { ev.StageTimestamp = metav1.NewMicroTime(time.Now()) } - audit.ObserveEvent() + audit.ObserveEvent(ctx) return sink.ProcessEvents(ev) } -func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { +func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter { delegate := &auditResponseWriter{ + ctx: ctx, ResponseWriter: responseWriter, event: ev, sink: sink, @@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{} // create immediately an event (for long running requests). type auditResponseWriter struct { http.ResponseWriter + ctx context.Context event *auditinternal.Event once sync.Once sink audit.Sink @@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) { a.event.Stage = auditinternal.StageResponseStarted if a.sink != nil { - processAuditEvent(a.sink, a.event, a.omitStages) + processAuditEvent(a.ctx, a.sink, a.event, a.omitStages) } }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go index e885644f8bdd..e1531b421033 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go @@ -18,6 +18,7 @@ package filters import ( "bufio" + "context" "net" "net/http" "net/http/httptest" @@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {} func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil } func TestConstructResponseWriter(t *testing.T) { - actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *auditResponseWriter: default: t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v)) } - actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil) + actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil) switch v := actual.(type) { case *fancyResponseWriterDelegator: default: @@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) { func TestDecorateResponseWriterWithoutChannel(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.WriteHeader(42) @@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) { func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil) // write status. This will not block because firstEventSentCh is nil actual.Write([]byte("foo")) @@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) { func TestDecorateResponseWriterChannel(t *testing.T) { sink := &fakeAuditSink{} ev := &auditinternal.Event{} - actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil) + actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil) done := make(chan struct{}) go func() { diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go index c133c66721bb..54e77467c0bd 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authentication.go @@ -47,7 +47,7 @@ func WithAuthentication(handler http.Handler, auth authenticator.Request, failed req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds)) } resp, ok, err := auth.AuthenticateRequest(req) - defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart) + defer recordAuthMetrics(req.Context(), resp, ok, err, apiAuds, authenticationStart) if err != nil || !ok { if err != nil { klog.ErrorS(err, "Unable to authenticate the request") diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go index 09d7db8cc903..2de13f747061 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/authn_audit.go @@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink, ev.ResponseStatus.Message = getAuthMethods(req) ev.Stage = auditinternal.StageResponseStarted - rw := decorateResponseWriter(w, ev, sink, omitStages) + rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages) failedHandler.ServeHTTP(rw, req) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go index 421c0e0a2ba8..c983b5623313 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package filters import ( + "context" "strings" "time" @@ -75,7 +76,7 @@ func init() { legacyregistry.MustRegister(authenticationLatency) } -func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) { +func recordAuthMetrics(ctx context.Context, resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) { var resultLabel string switch { @@ -85,11 +86,11 @@ func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudi resultLabel = failureLabel default: resultLabel = successLabel - authenticatedUserCounter.WithLabelValues(compressUsername(resp.User.GetName())).Inc() + authenticatedUserCounter.WithContext(ctx).WithLabelValues(compressUsername(resp.User.GetName())).Inc() } - authenticatedAttemptsCounter.WithLabelValues(resultLabel).Inc() - authenticationLatency.WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds()) + authenticatedAttemptsCounter.WithContext(ctx).WithLabelValues(resultLabel).Inc() + authenticationLatency.WithContext(ctx).WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds()) } // compressUsername maps all possible usernames onto a small set of categories diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go index 1e43cdab20f1..bba40b8f7bba 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go @@ -126,7 +126,7 @@ func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.Sta ev.ResponseStatus.Message = statusErr.Error() } - rw := decorateResponseWriter(w, ev, sink, omitStages) + rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages) failedHandler.ServeHTTP(rw, req) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go index 22945ccf6b8a..47fe44b78d7a 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go @@ -163,8 +163,8 @@ type WatchServer struct { // or over a websocket connection. func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { kind := s.Scope.Kind - metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() - defer metrics.RegisteredWatchers.WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec() + metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() + defer metrics.RegisteredWatchers.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Dec() w = httplog.Unlogged(req, w) @@ -220,7 +220,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // End of results. return } - metrics.WatchEvents.WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() + metrics.WatchEvents.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Inc() obj := s.Fixup(event.Object) if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil { @@ -233,7 +233,7 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // type unknown.Raw = buf.Bytes() event.Object = &unknown - metrics.WatchEventsSizes.WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) + metrics.WatchEventsSizes.WithContext(req.Context()).WithLabelValues(kind.Group, kind.Version, kind.Kind).Observe(float64(len(unknown.Raw))) *outEvent = metav1.WatchEvent{} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go index d4f6068b40e3..75f244564bae 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/metrics/metrics.go @@ -18,6 +18,7 @@ package metrics import ( "bufio" + "context" "net" "net/http" "net/url" @@ -324,8 +325,8 @@ func UpdateInflightRequestMetrics(phase string, nonmutating, mutating int) { } } -func RecordFilterLatency(name string, elapsed time.Duration) { - requestFilterDuration.WithLabelValues(name).Observe(elapsed.Seconds()) +func RecordFilterLatency(ctx context.Context, name string, elapsed time.Duration) { + requestFilterDuration.WithContext(ctx).WithLabelValues(name).Observe(elapsed.Seconds()) } // RecordRequestAbort records that the request was aborted possibly due to a timeout. @@ -341,7 +342,7 @@ func RecordRequestAbort(req *http.Request, requestInfo *request.RequestInfo) { group := requestInfo.APIGroup version := requestInfo.APIVersion - requestAbortsTotal.WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc() + requestAbortsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope).Inc() } // RecordRequestTermination records that the request was terminated early as part of a resource @@ -361,9 +362,9 @@ func RecordRequestTermination(req *http.Request, requestInfo *request.RequestInf reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) if requestInfo.IsResourceRequest { - requestTerminationsTotal.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc() + requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component, codeToString(code)).Inc() } else { - requestTerminationsTotal.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc() + requestTerminationsTotal.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component, codeToString(code)).Inc() } } @@ -383,9 +384,9 @@ func RecordLongRunning(req *http.Request, requestInfo *request.RequestInfo, comp reportedVerb := cleanVerb(canonicalVerb(strings.ToUpper(req.Method), scope), req) if requestInfo.IsResourceRequest { - g = longRunningRequestGauge.WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component) + g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, requestInfo.APIGroup, requestInfo.APIVersion, requestInfo.Resource, requestInfo.Subresource, scope, component) } else { - g = longRunningRequestGauge.WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component) + g = longRunningRequestGauge.WithContext(req.Context()).WithLabelValues(reportedVerb, "", "", "", requestInfo.Path, scope, component) } g.Inc() defer g.Dec() @@ -404,23 +405,23 @@ func MonitorRequest(req *http.Request, verb, group, version, resource, subresour dryRun := cleanDryRun(req.URL) elapsedSeconds := elapsed.Seconds() cleanContentType := cleanContentType(contentType) - requestCounter.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc() + requestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component, cleanContentType, codeToString(httpCode)).Inc() // MonitorRequest happens after authentication, so we can trust the username given by the request info, ok := request.UserFrom(req.Context()) if ok && info.GetName() == user.APIServerUser { - apiSelfRequestCounter.WithLabelValues(reportedVerb, resource, subresource).Inc() + apiSelfRequestCounter.WithContext(req.Context()).WithLabelValues(reportedVerb, resource, subresource).Inc() } if deprecated { - deprecatedRequestGauge.WithLabelValues(group, version, resource, subresource, removedRelease).Set(1) + deprecatedRequestGauge.WithContext(req.Context()).WithLabelValues(group, version, resource, subresource, removedRelease).Set(1) audit.AddAuditAnnotation(req.Context(), deprecatedAnnotationKey, "true") if len(removedRelease) > 0 { audit.AddAuditAnnotation(req.Context(), removedReleaseAnnotationKey, removedRelease) } } - requestLatencies.WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds) + requestLatencies.WithContext(req.Context()).WithLabelValues(reportedVerb, dryRun, group, version, resource, subresource, scope, component).Observe(elapsedSeconds) // We are only interested in response sizes of read requests. if verb == "GET" || verb == "LIST" { - responseSizes.WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize)) + responseSizes.WithContext(req.Context()).WithLabelValues(reportedVerb, group, version, resource, subresource, scope, component).Observe(float64(respSize)) } } diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go index e873351c70bf..2484bfc76c8f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/maxinflight.go @@ -196,9 +196,9 @@ func WithMaxInFlightLimit( } // We need to split this data between buckets used for throttling. if isMutatingRequest { - metrics.DroppedRequests.WithLabelValues(metrics.MutatingKind).Inc() + metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.MutatingKind).Inc() } else { - metrics.DroppedRequests.WithLabelValues(metrics.ReadOnlyKind).Inc() + metrics.DroppedRequests.WithContext(ctx).WithLabelValues(metrics.ReadOnlyKind).Inc() } metrics.RecordRequestTermination(r, requestInfo, metrics.APIServerComponent, http.StatusTooManyRequests) tooManyRequests(r, w) diff --git a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go index 6b17d4c8462b..6f604243112c 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go +++ b/staging/src/k8s.io/apiserver/pkg/server/filters/priority-and-fairness.go @@ -141,9 +141,9 @@ func WithPriorityAndFairness( }, execute) if !served { if isMutatingRequest { - epmetrics.DroppedRequests.WithLabelValues(epmetrics.MutatingKind).Inc() + epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.MutatingKind).Inc() } else { - epmetrics.DroppedRequests.WithLabelValues(epmetrics.ReadOnlyKind).Inc() + epmetrics.DroppedRequests.WithContext(ctx).WithLabelValues(epmetrics.ReadOnlyKind).Inc() } epmetrics.RecordRequestTermination(r, requestInfo, epmetrics.APIServerComponent, http.StatusTooManyRequests) tooManyRequests(r, w) diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go index a6afb2b24cad..57e601e62b9b 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/apf_filter.go @@ -123,7 +123,7 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque noteFn(fs, pl) if req == nil { if queued { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } klog.V(7).Infof("Handle(%#+v) => fsName=%q, distMethod=%#+v, plName=%q, isExempt=%v, reject", requestDigest, fs.Name, fs.Spec.DistinguisherMethod, pl.Name, isExempt) return @@ -140,18 +140,18 @@ func (cfgCtlr *configController) Handle(ctx context.Context, requestDigest Reque }() idle = req.Finish(func() { if queued { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } - metrics.AddDispatch(pl.Name, fs.Name) + metrics.AddDispatch(ctx, pl.Name, fs.Name) executed = true startExecutionTime := time.Now() defer func() { - metrics.ObserveExecutionDuration(pl.Name, fs.Name, time.Since(startExecutionTime)) + metrics.ObserveExecutionDuration(ctx, pl.Name, fs.Name, time.Since(startExecutionTime)) }() execFn() }) if queued && !executed { - metrics.ObserveWaitingDuration(pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) + metrics.ObserveWaitingDuration(ctx, pl.Name, fs.Name, strconv.FormatBool(req != nil), time.Since(startWaitingTime)) } panicking = false } diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index acdec4bba1f8..6a85bb589900 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -243,7 +243,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist if qs.qCfg.DesiredNumQueues < 1 { if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, fsName, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) - metrics.AddReject(qs.qCfg.Name, fsName, "concurrency-limit") + metrics.AddReject(ctx, qs.qCfg.Name, fsName, "concurrency-limit") return nil, qs.isIdleLocked() } req = qs.dispatchSansQueueLocked(ctx, flowDistinguisher, fsName, descr1, descr2) @@ -262,7 +262,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, hashValue uint64, flowDist // concurrency shares and at max queue length already if req == nil { klog.V(5).Infof("QS(%s): rejecting request %q %#+v %#+v due to queue full", qs.qCfg.Name, fsName, descr1, descr2) - metrics.AddReject(qs.qCfg.Name, fsName, "queue-full") + metrics.AddReject(ctx, qs.qCfg.Name, fsName, "queue-full") return nil, qs.isIdleLocked() } @@ -351,7 +351,7 @@ func (req *request) wait() (bool, bool) { switch decision { case decisionReject: klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, req.descr1, req.descr2) - metrics.AddReject(qs.qCfg.Name, req.fsName, "time-out") + metrics.AddReject(req.ctx, qs.qCfg.Name, req.fsName, "time-out") return false, qs.isIdleLocked() case decisionCancel: // TODO(aaron-prindle) add metrics for this case @@ -448,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte if ok := qs.rejectOrEnqueueLocked(req); !ok { return nil } - metrics.ObserveQueueLength(qs.qCfg.Name, fsName, len(queue.requests)) + metrics.ObserveQueueLength(ctx, qs.qCfg.Name, fsName, len(queue.requests)) return req } @@ -486,7 +486,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue, fsName s req.decision.SetLocked(decisionReject) // get index for timed out requests timeoutIdx = i - metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) } else { break @@ -534,7 +534,7 @@ func (qs *queueSet) enqueueLocked(request *request) { } queue.Enqueue(request) qs.totRequestsWaiting++ - metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, 1) request.NoteQueued(true) qs.obsPair.RequestsWaiting.Add(1) } @@ -569,7 +569,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, flowDistinguish } req.decision.SetLocked(decisionExecute) qs.totRequestsExecuting++ - metrics.AddRequestsExecuting(qs.qCfg.Name, fsName, 1) + metrics.AddRequestsExecuting(ctx, qs.qCfg.Name, fsName, 1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(5).Enabled() { klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %q %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, fsName, descr1, descr2, qs.totRequestsExecuting) @@ -599,9 +599,9 @@ func (qs *queueSet) dispatchLocked() bool { qs.totRequestsWaiting-- qs.totRequestsExecuting++ queue.requestsExecuting++ - metrics.AddRequestsInQueues(qs.qCfg.Name, request.fsName, -1) + metrics.AddRequestsInQueues(request.ctx, qs.qCfg.Name, request.fsName, -1) request.NoteQueued(false) - metrics.AddRequestsExecuting(qs.qCfg.Name, request.fsName, 1) + metrics.AddRequestsExecuting(request.ctx, qs.qCfg.Name, request.fsName, 1) qs.obsPair.RequestsWaiting.Add(-1) qs.obsPair.RequestsExecuting.Add(1) if klog.V(6).Enabled() { @@ -631,7 +631,7 @@ func (qs *queueSet) cancelWait(req *request) { // remove the request queue.requests = append(queue.requests[:i], queue.requests[i+1:]...) qs.totRequestsWaiting-- - metrics.AddRequestsInQueues(qs.qCfg.Name, req.fsName, -1) + metrics.AddRequestsInQueues(req.ctx, qs.qCfg.Name, req.fsName, -1) req.NoteQueued(false) qs.obsPair.RequestsWaiting.Add(-1) break @@ -704,7 +704,7 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) bool func (qs *queueSet) finishRequestLocked(r *request) { now := qs.clock.Now() qs.totRequestsExecuting-- - metrics.AddRequestsExecuting(qs.qCfg.Name, r.fsName, -1) + metrics.AddRequestsExecuting(r.ctx, qs.qCfg.Name, r.fsName, -1) qs.obsPair.RequestsExecuting.Add(-1) if r.queue == nil { diff --git a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go index bdbaa94601f8..4ebe85577ba3 100644 --- a/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go +++ b/staging/src/k8s.io/apiserver/pkg/util/flowcontrol/metrics/metrics.go @@ -17,6 +17,7 @@ limitations under the License. package metrics import ( + "context" "strings" "sync" "time" @@ -221,12 +222,12 @@ var ( ) // AddRequestsInQueues adds the given delta to the gauge of the # of requests in the queues of the specified flowSchema and priorityLevel -func AddRequestsInQueues(priorityLevel, flowSchema string, delta int) { +func AddRequestsInQueues(ctx context.Context, priorityLevel, flowSchema string, delta int) { apiserverCurrentInqueueRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } // AddRequestsExecuting adds the given delta to the gauge of executing requests of the given flowSchema and priorityLevel -func AddRequestsExecuting(priorityLevel, flowSchema string, delta int) { +func AddRequestsExecuting(ctx context.Context, priorityLevel, flowSchema string, delta int) { apiserverCurrentExecutingRequests.WithLabelValues(priorityLevel, flowSchema).Add(float64(delta)) } @@ -236,26 +237,26 @@ func UpdateSharedConcurrencyLimit(priorityLevel string, limit int) { } // AddReject increments the # of rejected requests for flow control -func AddReject(priorityLevel, flowSchema, reason string) { - apiserverRejectedRequestsTotal.WithLabelValues(priorityLevel, flowSchema, reason).Add(1) +func AddReject(ctx context.Context, priorityLevel, flowSchema, reason string) { + apiserverRejectedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, reason).Add(1) } // AddDispatch increments the # of dispatched requests for flow control -func AddDispatch(priorityLevel, flowSchema string) { - apiserverDispatchedRequestsTotal.WithLabelValues(priorityLevel, flowSchema).Add(1) +func AddDispatch(ctx context.Context, priorityLevel, flowSchema string) { + apiserverDispatchedRequestsTotal.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Add(1) } // ObserveQueueLength observes the queue length for flow control -func ObserveQueueLength(priorityLevel, flowSchema string, length int) { - apiserverRequestQueueLength.WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) +func ObserveQueueLength(ctx context.Context, priorityLevel, flowSchema string, length int) { + apiserverRequestQueueLength.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(float64(length)) } // ObserveWaitingDuration observes the queue length for flow control -func ObserveWaitingDuration(priorityLevel, flowSchema, execute string, waitTime time.Duration) { - apiserverRequestWaitingSeconds.WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) +func ObserveWaitingDuration(ctx context.Context, priorityLevel, flowSchema, execute string, waitTime time.Duration) { + apiserverRequestWaitingSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema, execute).Observe(waitTime.Seconds()) } // ObserveExecutionDuration observes the execution duration for flow control -func ObserveExecutionDuration(priorityLevel, flowSchema string, executionTime time.Duration) { - apiserverRequestExecutionSeconds.WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) +func ObserveExecutionDuration(ctx context.Context, priorityLevel, flowSchema string, executionTime time.Duration) { + apiserverRequestExecutionSeconds.WithContext(ctx).WithLabelValues(priorityLevel, flowSchema).Observe(executionTime.Seconds()) }