Skip to content

Commit

Permalink
UPSTREAM: <carry>: kube-apiserver: wire through isTerminating into ha…
Browse files Browse the repository at this point in the history
…ndler chain

Origin-commit: 5772e7285acbe901762d8cd8cb1fc33d8b459d04
  • Loading branch information
sttts authored and damemi committed Aug 27, 2021
1 parent aba4be7 commit 3388a73
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 19 deletions.
17 changes: 12 additions & 5 deletions cmd/kube-apiserver/app/server.go
Expand Up @@ -33,7 +33,6 @@ import (
"k8s.io/kubernetes/openshift-kube-apiserver/enablement"
"k8s.io/kubernetes/openshift-kube-apiserver/openshiftkubeapiserver"

"github.com/go-openapi/spec"
"github.com/spf13/cobra"
"github.com/spf13/pflag"

Expand Down Expand Up @@ -212,7 +211,7 @@ func Run(completeOptions completedServerRunOptions, stopCh <-chan struct{}) erro

// CreateServerChain creates the apiservers connected via delegation.
func CreateServerChain(completedOptions completedServerRunOptions, stopCh <-chan struct{}) (*aggregatorapiserver.APIAggregator, error) {
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions)
kubeAPIServerConfig, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(completedOptions, stopCh)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -270,15 +269,15 @@ func CreateProxyTransport() *http.Transport {
}

// CreateKubeAPIServerConfig creates all the resources for running the API server, but runs none of them
func CreateKubeAPIServerConfig(s completedServerRunOptions) (
func CreateKubeAPIServerConfig(s completedServerRunOptions, stopCh <-chan struct{}) (
*controlplane.Config,
aggregatorapiserver.ServiceResolver,
[]admission.PluginInitializer,
error,
) {
proxyTransport := CreateProxyTransport()

genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport)
genericConfig, versionedInformers, serviceResolver, pluginInitializers, admissionPostStartHook, storageFactory, err := buildGenericConfig(s.ServerRunOptions, proxyTransport, stopCh)
if err != nil {
return nil, nil, nil, err
}
Expand Down Expand Up @@ -390,7 +389,7 @@ func CreateKubeAPIServerConfig(s completedServerRunOptions) (
func buildGenericConfig(
s *options.ServerRunOptions,
proxyTransport *http.Transport,

stopCh <-chan struct{},
) (
genericConfig *genericapiserver.Config,
versionedInformers clientgoinformers.SharedInformerFactory,
Expand All @@ -401,6 +400,14 @@ func buildGenericConfig(
lastErr error,
) {
genericConfig = genericapiserver.NewConfig(legacyscheme.Codecs)
genericConfig.IsTerminating = func() bool {
select {
case <-stopCh:
return true
default:
return false
}
}
genericConfig.MergedResourceConfig = controlplane.DefaultAPIResourceConfigSource()

if lastErr = s.GenericServerRunOptions.ApplyTo(genericConfig); lastErr != nil {
Expand Down
2 changes: 1 addition & 1 deletion cmd/kube-scheduler/app/server.go
Expand Up @@ -240,7 +240,7 @@ func buildHandlerChain(handler http.Handler, authn authenticator.Request, authz
handler = genericapifilters.WithAuthentication(handler, authn, failedHandler, nil)
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHTTPLogging(handler)
handler = genericfilters.WithHTTPLogging(handler, nil)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)

return handler
Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/server/server.go
Expand Up @@ -918,7 +918,7 @@ var statusesNoTracePred = httplog.StatusIsNot(

// ServeHTTP responds to HTTP requests on the Kubelet.
func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred)
handler := httplog.WithLogging(s.restfulCont, statusesNoTracePred, nil)

// monitor http requests
var serverType string
Expand Down
5 changes: 4 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -248,6 +248,9 @@ type Config struct {

// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager

// A func that returns whether the server is terminating. This can be nil.
IsTerminating func() bool
}

type RecommendedConfig struct {
Expand Down Expand Up @@ -785,7 +788,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithWarningRecorder(handler)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
handler = genericfilters.WithHTTPLogging(handler)
handler = genericfilters.WithHTTPLogging(handler, c.IsTerminating)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
Expand Down
4 changes: 2 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/server/filters/wrap.go
Expand Up @@ -60,8 +60,8 @@ func WithPanicRecovery(handler http.Handler, resolver request.RequestInfoResolve
}

// WithHTTPLogging enables logging of incoming requests.
func WithHTTPLogging(handler http.Handler) http.Handler {
return httplog.WithLogging(handler, httplog.DefaultStacktracePred)
func WithHTTPLogging(handler http.Handler, isTerminating func() bool) http.Handler {
return httplog.WithLogging(handler, httplog.DefaultStacktracePred, isTerminating)
}

func withPanicRecovery(handler http.Handler, crashHandler func(http.ResponseWriter, *http.Request, interface{})) http.Handler {
Expand Down
18 changes: 14 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go
Expand Up @@ -56,6 +56,7 @@ type respLogger struct {
statusStack string
addedInfo strings.Builder
startTime time.Time
isTerminating bool

captureErrorOutput bool

Expand All @@ -82,22 +83,25 @@ func DefaultStacktracePred(status int) bool {
}

// WithLogging wraps the handler with logging.
func WithLogging(handler http.Handler, pred StacktracePred) http.Handler {
func WithLogging(handler http.Handler, pred StacktracePred, isTerminatingFn func() bool) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
if old := respLoggerFromRequest(req); old != nil {
panic("multiple WithLogging calls!")
}

startTime := time.Now()
if receivedTimestamp, ok := request.ReceivedTimestampFrom(ctx); ok {
startTime = receivedTimestamp
}

rl := newLoggedWithStartTime(req, w, startTime).StacktraceWhen(pred)
isTerminating := false
if isTerminatingFn != nil {
isTerminating = isTerminatingFn()
}
rl := newLoggedWithStartTime(req, w, startTime).StacktraceWhen(pred).IsTerminating(isTerminating)
req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl))

if klog.V(3).Enabled() {
if klog.V(3).Enabled() || (rl.isTerminating && klog.V(1).Enabled()) {
defer rl.Log()
}
handler.ServeHTTP(rl, req)
Expand Down Expand Up @@ -156,6 +160,12 @@ func (rl *respLogger) StacktraceWhen(pred StacktracePred) *respLogger {
return rl
}

// IsTerminating informs the logger that the server is terminating.
func (rl *respLogger) IsTerminating(is bool) *respLogger {
rl.isTerminating = is
return rl
}

// StatusIsNot returns a StacktracePred which will cause stacktraces to be logged
// for any status *not* in the given list.
func StatusIsNot(statuses ...int) StacktracePred {
Expand Down
Expand Up @@ -63,7 +63,7 @@ func TestWithLogging(t *testing.T) {
}
var handler http.Handler
handler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {})
handler = WithLogging(WithLogging(handler, DefaultStacktracePred), DefaultStacktracePred)
handler = WithLogging(WithLogging(handler, DefaultStacktracePred, nil), DefaultStacktracePred, nil)

func() {
defer func() {
Expand Down Expand Up @@ -92,7 +92,7 @@ func TestLogOf(t *testing.T) {
}
})
if makeLogger {
handler = WithLogging(handler, DefaultStacktracePred)
handler = WithLogging(handler, DefaultStacktracePred, nil)
want = "*httplog.respLogger"
} else {
want = "*httplog.passthroughLogger"
Expand Down Expand Up @@ -120,7 +120,7 @@ func TestUnlogged(t *testing.T) {
}
})
if makeLogger {
handler = WithLogging(handler, DefaultStacktracePred)
handler = WithLogging(handler, DefaultStacktracePred, nil)
}

handler.ServeHTTP(origWriter, req)
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/controller-manager/app/serve.go
Expand Up @@ -48,7 +48,7 @@ func BuildHandlerChain(apiHandler http.Handler, authorizationInfo *apiserver.Aut
}
handler = genericapifilters.WithRequestInfo(handler, requestInfoResolver)
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHTTPLogging(handler)
handler = genericfilters.WithHTTPLogging(handler, nil)
handler = genericfilters.WithPanicRecovery(handler, requestInfoResolver)

return handler
Expand Down
2 changes: 1 addition & 1 deletion test/integration/framework/test_server.go
Expand Up @@ -135,7 +135,7 @@ func StartTestServer(t *testing.T, stopCh <-chan struct{}, setup TestServerSetup
t.Fatalf("failed to validate ServerRunOptions: %v", utilerrors.NewAggregate(errs))
}

kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions)
kubeAPIServerConfig, _, _, err := app.CreateKubeAPIServerConfig(completedOptions, nil)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 3388a73

Please sign in to comment.