Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply request context to metrics in apiserver. #98246

Merged
merged 4 commits into from Feb 10, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/audit/metrics.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package audit

import (
"context"
"fmt"

auditinternal "k8s.io/apiserver/pkg/apis/audit"
Expand Down Expand Up @@ -84,13 +85,13 @@ func init() {
}

// ObserveEvent updates the relevant prometheus metrics for the generated audit event.
func ObserveEvent() {
eventCounter.Inc()
func ObserveEvent(ctx context.Context) {
eventCounter.WithContext(ctx).Inc()
}

// ObservePolicyLevel updates the relevant prometheus metrics with the audit level for a request.
func ObservePolicyLevel(level auditinternal.Level) {
levelCounter.WithLabelValues(string(level)).Inc()
func ObservePolicyLevel(ctx context.Context, level auditinternal.Level) {
levelCounter.WithContext(ctx).WithLabelValues(string(level)).Inc()
}

// HandlePluginError handles an error that occurred in an audit plugin. This method should only be
Expand Down
Expand Up @@ -149,7 +149,7 @@ func (a *Authenticator) AuthenticateRequest(req *http.Request) (*authenticator.R
}

remaining := req.TLS.PeerCertificates[0].NotAfter.Sub(time.Now())
clientCertificateExpirationHistogram.Observe(remaining.Seconds())
clientCertificateExpirationHistogram.WithContext(req.Context()).Observe(remaining.Seconds())
chains, err := req.TLS.PeerCertificates[0].Verify(optsCopy)
if err != nil {
return nil, false, fmt.Errorf(
Expand Down
Expand Up @@ -133,7 +133,7 @@ func (a *cachedTokenAuthenticator) AuthenticateToken(ctx context.Context, token
}

func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, token string) *cacheRecord {
doneAuthenticating := stats.authenticating()
doneAuthenticating := stats.authenticating(ctx)

auds, audsOk := authenticator.AudiencesFrom(ctx)

Expand All @@ -145,15 +145,15 @@ func (a *cachedTokenAuthenticator) doAuthenticateToken(ctx context.Context, toke
}

// Record cache miss
doneBlocking := stats.blocking()
doneBlocking := stats.blocking(ctx)
defer doneBlocking()
defer doneAuthenticating(false)

c := a.group.DoChan(key, func() (val interface{}, _ error) {
// always use one place to read and write the output of AuthenticateToken
record := &cacheRecord{}

doneFetching := stats.fetching()
doneFetching := stats.fetching(ctx)
// We're leaving the request handling stack so we need to handle crashes
// ourselves. Log a stack trace and return a 500 if something panics.
defer func() {
Expand Down
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cache

import (
"context"
"time"

"k8s.io/component-base/metrics"
Expand Down Expand Up @@ -86,7 +87,7 @@ type statsCollector struct{}

var stats = statsCollector{}

func (statsCollector) authenticating() func(hit bool) {
func (statsCollector) authenticating(ctx context.Context) func(hit bool) {
start := time.Now()
return func(hit bool) {
var tag string
Expand All @@ -98,18 +99,18 @@ func (statsCollector) authenticating() func(hit bool) {

latency := time.Since(start)

requestCount.WithLabelValues(tag).Inc()
requestLatency.WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000)
requestCount.WithContext(ctx).WithLabelValues(tag).Inc()
requestLatency.WithContext(ctx).WithLabelValues(tag).Observe(float64(latency.Milliseconds()) / 1000)
}
}

func (statsCollector) blocking() func() {
activeFetchCount.WithLabelValues(fetchBlockedTag).Inc()
return activeFetchCount.WithLabelValues(fetchBlockedTag).Dec
func (statsCollector) blocking(ctx context.Context) func() {
activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Inc()
return activeFetchCount.WithContext(ctx).WithLabelValues(fetchBlockedTag).Dec
}

func (statsCollector) fetching() func(ok bool) {
activeFetchCount.WithLabelValues(fetchInFlightTag).Inc()
func (statsCollector) fetching(ctx context.Context) func(ok bool) {
activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Inc()
return func(ok bool) {
var tag string
if ok {
Expand All @@ -118,8 +119,8 @@ func (statsCollector) fetching() func(ok bool) {
tag = fetchFailedTag
}

fetchCount.WithLabelValues(tag).Inc()
fetchCount.WithContext(ctx).WithLabelValues(tag).Inc()

activeFetchCount.WithLabelValues(fetchInFlightTag).Dec()
activeFetchCount.WithContext(ctx).WithLabelValues(fetchInFlightTag).Dec()
}
}
Expand Up @@ -56,8 +56,8 @@ func TrackStarted(handler http.Handler, name string) http.Handler {
// TrackCompleted measures the timestamp the given handler has completed execution and then
// it updates the corresponding metric with the filter latency duration.
func TrackCompleted(handler http.Handler) http.Handler {
return trackCompleted(handler, utilclock.RealClock{}, func(fr *requestFilterRecord, completedAt time.Time) {
metrics.RecordFilterLatency(fr.name, completedAt.Sub(fr.startedTimestamp))
return trackCompleted(handler, utilclock.RealClock{}, func(ctx context.Context, fr *requestFilterRecord, completedAt time.Time) {
metrics.RecordFilterLatency(ctx, fr.name, completedAt.Sub(fr.startedTimestamp))
})
}

Expand All @@ -81,7 +81,7 @@ func trackStarted(handler http.Handler, name string, clock utilclock.PassiveCloc
})
}

func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(*requestFilterRecord, time.Time)) http.Handler {
func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action func(context.Context, *requestFilterRecord, time.Time)) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// The previous filter has just completed.
completedAt := clock.Now()
Expand All @@ -90,7 +90,7 @@ func trackCompleted(handler http.Handler, clock utilclock.PassiveClock, action f

ctx := r.Context()
if fr := requestFilterRecordFrom(ctx); fr != nil {
action(fr, completedAt)
action(ctx, fr, completedAt)
}
})
}
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package filterlatency

import (
"context"
"net/http"
"net/http/httptest"
"testing"
Expand Down Expand Up @@ -120,7 +121,7 @@ func TestTrackCompletedContextHasFilterRecord(t *testing.T) {
})

requestFilterEndedAt := time.Now()
wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(fr *requestFilterRecord, completedAt time.Time) {
wrapped := trackCompleted(handler, utilclock.NewFakeClock(requestFilterEndedAt), func(_ context.Context, fr *requestFilterRecord, completedAt time.Time) {
actionCallCount++
filterRecordGot = fr
filterCompletedAtGot = completedAt
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestTrackCompletedContextDoesNotHaveFilterRecord(t *testing.T) {
handlerCallCount++
})

wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ *requestFilterRecord, _ time.Time) {
wrapped := trackCompleted(handler, utilclock.NewFakeClock(time.Now()), func(_ context.Context, _ *requestFilterRecord, _ time.Time) {
actionCallCount++
})

Expand Down
25 changes: 14 additions & 11 deletions staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit.go
Expand Up @@ -18,6 +18,7 @@ package filters

import (
"bufio"
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -56,8 +57,8 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
}

ev.Stage = auditinternal.StageRequestReceived
if processed := processAuditEvent(sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.Inc()
if processed := processAuditEvent(ctx, sink, ev, omitStages); !processed {
audit.ApiserverAuditDroppedCounter.WithContext(ctx).Inc()
responsewriters.InternalError(w, req, errors.New("failed to store audit event"))
return
}
Expand All @@ -70,7 +71,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
longRunningSink = sink
}
}
respWriter := decorateResponseWriter(w, ev, longRunningSink, omitStages)
respWriter := decorateResponseWriter(ctx, w, ev, longRunningSink, omitStages)

// send audit event when we leave this func, either via a panic or cleanly. In the case of long
// running requests, this will be the second audit event.
Expand All @@ -84,7 +85,7 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
Reason: metav1.StatusReasonInternalError,
Message: fmt.Sprintf("APIServer panic'd: %v", r),
}
processAuditEvent(sink, ev, omitStages)
processAuditEvent(ctx, sink, ev, omitStages)
return
}

Expand All @@ -98,14 +99,14 @@ func WithAudit(handler http.Handler, sink audit.Sink, policy policy.Checker, lon
if ev.ResponseStatus == nil && longRunningSink != nil {
ev.ResponseStatus = fakedSuccessStatus
ev.Stage = auditinternal.StageResponseStarted
processAuditEvent(longRunningSink, ev, omitStages)
processAuditEvent(ctx, longRunningSink, ev, omitStages)
}

ev.Stage = auditinternal.StageResponseComplete
if ev.ResponseStatus == nil {
ev.ResponseStatus = fakedSuccessStatus
}
processAuditEvent(sink, ev, omitStages)
processAuditEvent(ctx, sink, ev, omitStages)
}()
handler.ServeHTTP(respWriter, req)
})
Expand All @@ -125,7 +126,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
}

level, omitStages := policy.LevelAndStages(attribs)
audit.ObservePolicyLevel(level)
audit.ObservePolicyLevel(ctx, level)
if level == auditinternal.LevelNone {
// Don't audit.
return req, nil, nil, nil
Expand All @@ -145,7 +146,7 @@ func createAuditEventAndAttachToContext(req *http.Request, policy policy.Checker
return req, ev, omitStages, nil
}

func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
func processAuditEvent(ctx context.Context, sink audit.Sink, ev *auditinternal.Event, omitStages []auditinternal.Stage) bool {
for _, stage := range omitStages {
if ev.Stage == stage {
return true
Expand All @@ -157,12 +158,13 @@ func processAuditEvent(sink audit.Sink, ev *auditinternal.Event, omitStages []au
} else {
ev.StageTimestamp = metav1.NewMicroTime(time.Now())
}
audit.ObserveEvent()
audit.ObserveEvent(ctx)
return sink.ProcessEvents(ev)
}

func decorateResponseWriter(responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
func decorateResponseWriter(ctx context.Context, responseWriter http.ResponseWriter, ev *auditinternal.Event, sink audit.Sink, omitStages []auditinternal.Stage) http.ResponseWriter {
delegate := &auditResponseWriter{
ctx: ctx,
ResponseWriter: responseWriter,
event: ev,
sink: sink,
Expand All @@ -186,6 +188,7 @@ var _ http.ResponseWriter = &auditResponseWriter{}
// create immediately an event (for long running requests).
type auditResponseWriter struct {
http.ResponseWriter
ctx context.Context
event *auditinternal.Event
once sync.Once
sink audit.Sink
Expand All @@ -205,7 +208,7 @@ func (a *auditResponseWriter) processCode(code int) {
a.event.Stage = auditinternal.StageResponseStarted

if a.sink != nil {
processAuditEvent(a.sink, a.event, a.omitStages)
processAuditEvent(a.ctx, a.sink, a.event, a.omitStages)
}
})
}
Expand Down
11 changes: 6 additions & 5 deletions staging/src/k8s.io/apiserver/pkg/endpoints/filters/audit_test.go
Expand Up @@ -18,6 +18,7 @@ package filters

import (
"bufio"
"context"
"net"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -92,14 +93,14 @@ func (*fancyResponseWriter) Flush() {}
func (*fancyResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { return nil, nil, nil }

func TestConstructResponseWriter(t *testing.T) {
actual := decorateResponseWriter(&simpleResponseWriter{}, nil, nil, nil)
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, nil, nil, nil)
switch v := actual.(type) {
case *auditResponseWriter:
default:
t.Errorf("Expected auditResponseWriter, got %v", reflect.TypeOf(v))
}

actual = decorateResponseWriter(&fancyResponseWriter{}, nil, nil, nil)
actual = decorateResponseWriter(context.Background(), &fancyResponseWriter{}, nil, nil, nil)
switch v := actual.(type) {
case *fancyResponseWriterDelegator:
default:
Expand All @@ -109,7 +110,7 @@ func TestConstructResponseWriter(t *testing.T) {

func TestDecorateResponseWriterWithoutChannel(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)

// write status. This will not block because firstEventSentCh is nil
actual.WriteHeader(42)
Expand All @@ -123,7 +124,7 @@ func TestDecorateResponseWriterWithoutChannel(t *testing.T) {

func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, nil, nil)
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, nil, nil)

// write status. This will not block because firstEventSentCh is nil
actual.Write([]byte("foo"))
Expand All @@ -138,7 +139,7 @@ func TestDecorateResponseWriterWithImplicitWrite(t *testing.T) {
func TestDecorateResponseWriterChannel(t *testing.T) {
sink := &fakeAuditSink{}
ev := &auditinternal.Event{}
actual := decorateResponseWriter(&simpleResponseWriter{}, ev, sink, nil)
actual := decorateResponseWriter(context.Background(), &simpleResponseWriter{}, ev, sink, nil)

done := make(chan struct{})
go func() {
Expand Down
Expand Up @@ -47,7 +47,7 @@ func WithAuthentication(handler http.Handler, auth authenticator.Request, failed
req = req.WithContext(authenticator.WithAudiences(req.Context(), apiAuds))
}
resp, ok, err := auth.AuthenticateRequest(req)
defer recordAuthMetrics(resp, ok, err, apiAuds, authenticationStart)
defer recordAuthMetrics(req.Context(), resp, ok, err, apiAuds, authenticationStart)
if err != nil || !ok {
if err != nil {
klog.ErrorS(err, "Unable to authenticate the request")
Expand Down
Expand Up @@ -52,7 +52,7 @@ func WithFailedAuthenticationAudit(failedHandler http.Handler, sink audit.Sink,
ev.ResponseStatus.Message = getAuthMethods(req)
ev.Stage = auditinternal.StageResponseStarted

rw := decorateResponseWriter(w, ev, sink, omitStages)
rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req)
})
}
Expand Down
9 changes: 5 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/endpoints/filters/metrics.go
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package filters

import (
"context"
"strings"
"time"

Expand Down Expand Up @@ -75,7 +76,7 @@ func init() {
legacyregistry.MustRegister(authenticationLatency)
}

func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) {
func recordAuthMetrics(ctx context.Context, resp *authenticator.Response, ok bool, err error, apiAudiences authenticator.Audiences, authStart time.Time) {
var resultLabel string

switch {
Expand All @@ -85,11 +86,11 @@ func recordAuthMetrics(resp *authenticator.Response, ok bool, err error, apiAudi
resultLabel = failureLabel
default:
resultLabel = successLabel
authenticatedUserCounter.WithLabelValues(compressUsername(resp.User.GetName())).Inc()
authenticatedUserCounter.WithContext(ctx).WithLabelValues(compressUsername(resp.User.GetName())).Inc()
}

authenticatedAttemptsCounter.WithLabelValues(resultLabel).Inc()
authenticationLatency.WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds())
authenticatedAttemptsCounter.WithContext(ctx).WithLabelValues(resultLabel).Inc()
authenticationLatency.WithContext(ctx).WithLabelValues(resultLabel).Observe(time.Since(authStart).Seconds())
}

// compressUsername maps all possible usernames onto a small set of categories
Expand Down
Expand Up @@ -126,7 +126,7 @@ func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.Sta
ev.ResponseStatus.Message = statusErr.Error()
}

rw := decorateResponseWriter(w, ev, sink, omitStages)
rw := decorateResponseWriter(req.Context(), w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req)
})
}
Expand Down