Skip to content

Commit

Permalink
UPSTREAM: 96901: apiserver: plumb context with request deadline
Browse files Browse the repository at this point in the history
- as soon as a request is received by the apiserver, determine the
  timeout of the request and set a new request context with the deadline.
- the timeout filter that times out non-long-running requests should
  use the request context as opposed to a fixed 60s wait today.
- admission and storage layer uses the same request context with the
  deadline specified.

we use the default timeout enforced by the apiserver:
- if the user has specified a timeout of 0s, this implies no timeout on the user's part.
- if the user has specified a timeout that exceeds the maximum deadline allowed by the apiserver.
  • Loading branch information
tkashem committed Feb 3, 2021
1 parent cac2421 commit f536513
Show file tree
Hide file tree
Showing 18 changed files with 763 additions and 92 deletions.
4 changes: 3 additions & 1 deletion pkg/kubeapiserver/server/insecure_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ func BuildInsecureHandlerChain(apiHandler http.Handler, c *server.Config) http.H
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyChecker, c.LongRunningFunc)
handler = genericapifilters.WithAuthentication(handler, server.InsecureSuperuser{}, nil, nil)
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc)
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyChecker,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithWarningRecorder(handler)
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/audit/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,12 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/apis/audit:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
)
Expand Down
7 changes: 5 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,10 +282,13 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
handler := genericapifilters.WithAudit(mux, auditSink, auditpolicy.FakeChecker(auditinternal.LevelRequestResponse, nil), func(r *http.Request, requestInfo *request.RequestInfo) bool {
longRunningCheck := func(r *http.Request, requestInfo *request.RequestInfo) bool {
// simplified long-running check
return requestInfo.Verb == "watch" || requestInfo.Verb == "proxy"
})
}
fakeChecker := auditpolicy.FakeChecker(auditinternal.LevelRequestResponse, nil)
handler := genericapifilters.WithAudit(mux, auditSink, fakeChecker, longRunningCheck)
handler = genericapifilters.WithRequestDeadline(handler, auditSink, fakeChecker, longRunningCheck, codecs, 60*time.Second)
handler = genericapifilters.WithRequestInfo(handler, testRequestInfoResolver())

return &defaultAPIServer{handler, container}
Expand Down
4 changes: 4 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/filters/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_test(
"cachecontrol_test.go",
"impersonation_test.go",
"metrics_test.go",
"request_deadline_test.go",
"request_received_time_test.go",
"requestinfo_test.go",
"warning_test.go",
Expand All @@ -24,6 +25,7 @@ go_test(
deps = [
"//staging/src/k8s.io/api/authentication/v1:go_default_library",
"//staging/src/k8s.io/api/batch/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer:go_default_library",
Expand All @@ -39,6 +41,7 @@ go_test(
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library",
"//staging/src/k8s.io/component-base/metrics/testutil:go_default_library",
"//vendor/github.com/google/go-cmp/cmp:go_default_library",
"//vendor/github.com/google/uuid:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
],
Expand All @@ -56,6 +59,7 @@ go_library(
"doc.go",
"impersonation.go",
"metrics.go",
"request_deadline.go",
"request_received_time.go",
"requestinfo.go",
"storageversion.go",
Expand Down
172 changes: 172 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/endpoints/filters/request_deadline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filters

import (
"context"
"errors"
"fmt"
"net/http"
"time"

apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilclock "k8s.io/apimachinery/pkg/util/clock"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
auditinternal "k8s.io/apiserver/pkg/apis/audit"
"k8s.io/apiserver/pkg/audit"
"k8s.io/apiserver/pkg/audit/policy"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)

const (
// The 'timeout' query parameter in the request URL has an invalid duration specifier
invalidTimeoutInURL = "invalid timeout specified in the request URL"
)

// WithRequestDeadline determines the timeout duration applicable to the given request and sets a new context
// with the appropriate deadline.
// auditWrapper provides an http.Handler that audits a failed request.
// longRunning returns true if he given request is a long running request.
// requestTimeoutMaximum specifies the default request timeout value.
func WithRequestDeadline(handler http.Handler, sink audit.Sink, policy policy.Checker, longRunning request.LongRunningRequestCheck,
negotiatedSerializer runtime.NegotiatedSerializer, requestTimeoutMaximum time.Duration) http.Handler {
return withRequestDeadline(handler, sink, policy, longRunning, negotiatedSerializer, requestTimeoutMaximum, utilclock.RealClock{})
}

func withRequestDeadline(handler http.Handler, sink audit.Sink, policy policy.Checker, longRunning request.LongRunningRequestCheck,
negotiatedSerializer runtime.NegotiatedSerializer, requestTimeoutMaximum time.Duration, clock utilclock.PassiveClock) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()

requestInfo, ok := request.RequestInfoFrom(ctx)
if !ok {
handleError(w, req, http.StatusInternalServerError, fmt.Errorf("no RequestInfo found in context, handler chain must be wrong"))
return
}
if longRunning(req, requestInfo) {
handler.ServeHTTP(w, req)
return
}

userSpecifiedTimeout, ok, err := parseTimeout(req)
if err != nil {
statusErr := apierrors.NewBadRequest(fmt.Sprintf("%s", err.Error()))

klog.Errorf("Error - %s: %#v", err.Error(), req.RequestURI)

failed := failedErrorHandler(negotiatedSerializer, statusErr)
failWithAudit := withFailedRequestAudit(failed, statusErr, sink, policy)
failWithAudit.ServeHTTP(w, req)
return
}

timeout := requestTimeoutMaximum
if ok {
// we use the default timeout enforced by the apiserver:
// - if the user has specified a timeout of 0s, this implies no timeout on the user's part.
// - if the user has specified a timeout that exceeds the maximum deadline allowed by the apiserver.
if userSpecifiedTimeout > 0 && userSpecifiedTimeout < requestTimeoutMaximum {
timeout = userSpecifiedTimeout
}
}

started := clock.Now()
if requestStartedTimestamp, ok := request.ReceivedTimestampFrom(ctx); ok {
started = requestStartedTimestamp
}

ctx, cancel := context.WithDeadline(ctx, started.Add(timeout))
defer cancel()

req = req.WithContext(ctx)
handler.ServeHTTP(w, req)
})
}

// withFailedRequestAudit decorates a failed http.Handler and is used to audit a failed request.
// statusErr is used to populate the Message property of ResponseStatus.
func withFailedRequestAudit(failedHandler http.Handler, statusErr *apierrors.StatusError, sink audit.Sink, policy policy.Checker) http.Handler {
if sink == nil || policy == nil {
return failedHandler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
req, ev, omitStages, err := createAuditEventAndAttachToContext(req, policy)
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to create audit event: %v", err))
responsewriters.InternalError(w, req, errors.New("failed to create audit event"))
return
}
if ev == nil {
failedHandler.ServeHTTP(w, req)
return
}

ev.ResponseStatus = &metav1.Status{}
ev.Stage = auditinternal.StageResponseStarted
if statusErr != nil {
ev.ResponseStatus.Message = statusErr.Error()
}

rw := decorateResponseWriter(w, ev, sink, omitStages)
failedHandler.ServeHTTP(rw, req)
})
}

// failedErrorHandler returns an http.Handler that uses the specified StatusError object
// to render an error response to the request.
func failedErrorHandler(s runtime.NegotiatedSerializer, statusError *apierrors.StatusError) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, found := request.RequestInfoFrom(ctx)
if !found {
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
return
}

gv := schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}
responsewriters.ErrorNegotiated(statusError, s, gv, w, req)
})
}

// parseTimeout parses the given HTTP request URL and extracts the timeout query parameter
// value if specified by the user.
// If a timeout is not specified the function returns false and err is set to nil
// If the value specified is malformed then the function returns false and err is set
func parseTimeout(req *http.Request) (time.Duration, bool, error) {
value := req.URL.Query().Get("timeout")
if value == "" {
return 0, false, nil
}

timeout, err := time.ParseDuration(value)
if err != nil {
return 0, false, fmt.Errorf("%s - %s", invalidTimeoutInURL, err.Error())
}

return timeout, true, nil
}

func handleError(w http.ResponseWriter, r *http.Request, code int, err error) {
errorMsg := fmt.Sprintf("Error - %s: %#v", err.Error(), r.RequestURI)
http.Error(w, errorMsg, code)
klog.Errorf(errorMsg)
}
Loading

0 comments on commit f536513

Please sign in to comment.