Skip to content

Commit

Permalink
Merge pull request #114925 from tkashem/watch-termination
Browse files Browse the repository at this point in the history
apiserver: terminate watch with rate limiting during shutdown
  • Loading branch information
k8s-ci-robot committed Feb 27, 2023
2 parents 85c09cd + 791fcd6 commit a16fd54
Show file tree
Hide file tree
Showing 10 changed files with 606 additions and 61 deletions.
21 changes: 19 additions & 2 deletions staging/src/k8s.io/apiserver/pkg/endpoints/handlers/watch.go
Expand Up @@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers/negotiation"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/util/wsstream"
)

Expand Down Expand Up @@ -105,6 +106,11 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
embeddedEncoder = scope.Serializer.EncoderForVersion(serializer.Serializer, contentKind.GroupVersion())
}

var serverShuttingDownCh <-chan struct{}
if signals := apirequest.ServerShutdownSignalFrom(req.Context()); signals != nil {
serverShuttingDownCh = signals.ShuttingDown()
}

ctx := req.Context()

server := &WatchServer{
Expand Down Expand Up @@ -132,7 +138,8 @@ func serveWatch(watcher watch.Interface, scope *RequestScope, mediaTypeOptions n
return result
},

TimeoutFactory: &realTimeoutFactory{timeout},
TimeoutFactory: &realTimeoutFactory{timeout},
ServerShuttingDownCh: serverShuttingDownCh,
}

server.ServeHTTP(w, req)
Expand All @@ -156,7 +163,8 @@ type WatchServer struct {
// used to correct the object before we send it to the serializer
Fixup func(runtime.Object) runtime.Object

TimeoutFactory TimeoutFactory
TimeoutFactory TimeoutFactory
ServerShuttingDownCh <-chan struct{}
}

// ServeHTTP serves a series of encoded events via HTTP with Transfer-Encoding: chunked
Expand Down Expand Up @@ -230,6 +238,15 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {

for {
select {
case <-s.ServerShuttingDownCh:
// the server has signaled that it is shutting down (not accepting
// any new request), all active watch request(s) should return
// immediately here. The WithWatchTerminationDuringShutdown server
// filter will ensure that the response to the client is rate
// limited in order to avoid any thundering herd issue when the
// client(s) try to reestablish the WATCH on the other
// available apiserver instance(s).
return
case <-done:
return
case <-timeoutCh:
Expand Down
@@ -0,0 +1,55 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package request

import (
"context"
)

// The serverShutdownSignalKeyType type is unexported to prevent collisions
type serverShutdownSignalKeyType int

// serverShutdownSignalKey is the context key for storing the
// watch termination interface instance for a WATCH request.
const serverShutdownSignalKey serverShutdownSignalKeyType = iota

// ServerShutdownSignal is associated with the request context so
// the request handler logic has access to signals rlated to
// the server shutdown events
type ServerShutdownSignal interface {
// Signaled when the apiserver is not receiving any new request
ShuttingDown() <-chan struct{}
}

// ServerShutdownSignalFrom returns the ServerShutdownSignal instance
// associated with the request context.
// If there is no ServerShutdownSignal asscoaied with the context,
// nil is returned.
func ServerShutdownSignalFrom(ctx context.Context) ServerShutdownSignal {
ev, _ := ctx.Value(serverShutdownSignalKey).(ServerShutdownSignal)
return ev
}

// WithServerShutdownSignal returns a new context that stores
// the ServerShutdownSignal interface instance.
func WithServerShutdownSignal(parent context.Context, window ServerShutdownSignal) context.Context {
if ServerShutdownSignalFrom(parent) != nil {
return parent // Avoid double registering.
}

return context.WithValue(parent, serverShutdownSignalKey, window)
}
44 changes: 36 additions & 8 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -161,6 +161,10 @@ type Config struct {
// handlers associated with non long-running requests
// to complete while the server is shuting down.
NonLongRunningRequestWaitGroup *utilwaitgroup.SafeWaitGroup
// WatchRequestWaitGroup allows us to wait for all chain
// handlers associated with active watch requests to
// complete while the server is shuting down.
WatchRequestWaitGroup *utilwaitgroup.RateLimitedSafeWaitGroup
// DiscoveryAddresses is used to build the IPs pass to discovery. If nil, the ExternalAddress is
// always reported
DiscoveryAddresses discovery.Addresses
Expand Down Expand Up @@ -272,6 +276,23 @@ type Config struct {

// AggregatedDiscoveryGroupManager serves /apis in an aggregated form.
AggregatedDiscoveryGroupManager discoveryendpoint.ResourceManager

// ShutdownWatchTerminationGracePeriod, if set to a positive value,
// is the maximum duration the apiserver will wait for all active
// watch request(s) to drain.
// Once this grace period elapses, the apiserver will no longer
// wait for any active watch request(s) in flight to drain, it will
// proceed to the next step in the graceful server shutdown process.
// If set to a positive value, the apiserver will keep track of the
// number of active watch request(s) in flight and during shutdown
// it will wait, at most, for the specified duration and allow these
// active watch requests to drain with some rate limiting in effect.
// The default is zero, which implies the apiserver will not keep
// track of active watch request(s) in flight and will not wait
// for them to drain, this maintains backward compatibility.
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration
}

type RecommendedConfig struct {
Expand Down Expand Up @@ -371,6 +392,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
Serializer: codecs,
BuildHandlerChainFunc: DefaultBuildHandlerChain,
NonLongRunningRequestWaitGroup: new(utilwaitgroup.SafeWaitGroup),
WatchRequestWaitGroup: &utilwaitgroup.RateLimitedSafeWaitGroup{},
LegacyAPIGroupPrefixes: sets.NewString(DefaultLegacyAPIPrefix),
DisabledPostStartHooks: sets.NewString(),
PostStartHooks: map[string]PostStartHookConfigEntry{},
Expand Down Expand Up @@ -408,9 +430,10 @@ func NewConfig(codecs serializer.CodecFactory) *Config {

// Default to treating watch as a long-running operation
// Generic API servers have no inherent long-running subresources
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: lifecycleSignals,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()),
lifecycleSignals: lifecycleSignals,
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(),
ShutdownWatchTerminationGracePeriod: time.Duration(0),

APIServerID: id,
StorageVersionManager: storageversion.NewDefaultManager(),
Expand Down Expand Up @@ -670,16 +693,18 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
delegationTarget: delegationTarget,
EquivalentResourceRegistry: c.EquivalentResourceRegistry,
NonLongRunningRequestWaitGroup: c.NonLongRunningRequestWaitGroup,
WatchRequestWaitGroup: c.WatchRequestWaitGroup,
Handler: apiServerHandler,
UnprotectedDebugSocket: debugSocket,

listedPathProvider: apiServerHandler,

minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
ShutdownTimeout: c.RequestTimeout,
ShutdownDelayDuration: c.ShutdownDelayDuration,
SecureServingInfo: c.SecureServing,
ExternalAddress: c.ExternalAddress,
minRequestTimeout: time.Duration(c.MinRequestTimeout) * time.Second,
ShutdownTimeout: c.RequestTimeout,
ShutdownDelayDuration: c.ShutdownDelayDuration,
ShutdownWatchTerminationGracePeriod: c.ShutdownWatchTerminationGracePeriod,
SecureServingInfo: c.SecureServing,
ExternalAddress: c.ExternalAddress,

openAPIConfig: c.OpenAPIConfig,
openAPIV3Config: c.OpenAPIV3Config,
Expand Down Expand Up @@ -907,6 +932,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithRequestDeadline(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator,
c.LongRunningFunc, c.Serializer, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.NonLongRunningRequestWaitGroup)
if c.ShutdownWatchTerminationGracePeriod > 0 {
handler = genericfilters.WithWatchTerminationDuringShutdown(handler, c.lifecycleSignals, c.WatchRequestWaitGroup)
}
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
handler = genericfilters.WithProbabilisticGoaway(handler, c.GoawayChance)
}
Expand Down
@@ -0,0 +1,62 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filters

import (
"errors"
"net/http"

"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog/v2"
)

func WithWatchTerminationDuringShutdown(handler http.Handler, termination apirequest.ServerShutdownSignal, wg RequestWaitGroup) http.Handler {
if termination == nil || wg == nil {
klog.Warningf("watch termination during shutdown not attached to the handler chain")
return handler
}
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context"))
return
}
if !watchVerbs.Has(requestInfo.Verb) {
handler.ServeHTTP(w, req)
return
}

if err := wg.Add(1); err != nil {
// When apiserver is shutting down, signal clients to retry
// There is a good chance the client hit a different server, so a tight retry is good for client responsiveness.
waitGroupWriteRetryAfterToResponse(w)
return
}

// attach ServerShutdownSignal to the watch request so that the
// watch handler loop can return as soon as the server signals
// that it is shutting down.
ctx = apirequest.WithServerShutdownSignal(req.Context(), termination)
req = req.WithContext(ctx)

defer wg.Done()
handler.ServeHTTP(w, req)
})
}

0 comments on commit a16fd54

Please sign in to comment.