Skip to content

Commit

Permalink
UPSTREAM: <carry>: optionally enable retry after until apiserver is r…
Browse files Browse the repository at this point in the history
…eady

OpenShift-Rebase-Source: fc3523f
  • Loading branch information
tkashem authored and bertinatto committed Jun 20, 2023
1 parent 83e6551 commit 286fede
Show file tree
Hide file tree
Showing 4 changed files with 272 additions and 0 deletions.
16 changes: 16 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -303,6 +303,18 @@ type Config struct {
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration

// SendRetryAfterWhileNotReadyOnce, if enabled, the apiserver will
// reject all incoming requests with a 503 status code and a
// 'Retry-After' response header until the apiserver has fully
// initialized, except for requests from a designated debugger group.
// This option ensures that the system stays consistent even when
// requests are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and
// orphaned content in case of the namespaces controller.
// NOTE: this option is applicable to Microshift only,
// this should never be enabled for OCP.
SendRetryAfterWhileNotReadyOnce bool
}

// EventSink allows to create events.
Expand Down Expand Up @@ -986,6 +998,10 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithMaxInFlightLimit(handler, c.MaxRequestsInFlight, c.MaxMutatingRequestsInFlight, c.LongRunningFunc)
}

if c.SendRetryAfterWhileNotReadyOnce {
handler = genericfilters.WithNotReady(handler, c.lifecycleSignals.HasBeenReady.Signaled())
}

handler = filterlatency.TrackCompleted(handler)
handler = genericapifilters.WithImpersonation(handler, c.Authorization.Authorizer, c.Serializer)
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "impersonation")
Expand Down
@@ -0,0 +1,92 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package filters

import (
"errors"
"k8s.io/apiserver/pkg/warning"
"net/http"

"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
"k8s.io/apiserver/pkg/endpoints/request"
)

const (
// notReadyDebuggerGroup facilitates debugging if the apiserver takes longer
// to initilize. All request(s) from this designated group will be allowed
// while the apiserver is being initialized.
// The apiserver will reject all incoming requests with a 'Retry-After'
// response header until it has fully initialized, except for
// requests from this special debugger group.
notReadyDebuggerGroup = "system:openshift:risky-not-ready-microshift-debugging-group"
)

// WithNotReady rejects any incoming new request(s) with a 'Retry-After'
// response if the specified hasBeenReadyCh channel is still open, with
// the following exceptions:
// - all request(s) from the designated debugger group is exempt, this
// helps debug the apiserver if it takes longer to initialize.
// - local loopback requests (this exempts system:apiserver)
// - /healthz, /livez, /readyz, /metrics are exempt
//
// It includes new request(s) on a new or an existing TCP connection
// Any new request(s) arriving before hasBeenreadyCh is closed
// are replied with a 503 and the following response headers:
// - 'Retry-After: N` (so client can retry after N seconds)
func WithNotReady(handler http.Handler, hasBeenReadyCh <-chan struct{}) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
select {
case <-hasBeenReadyCh:
handler.ServeHTTP(w, req)
return
default:
}

requestor, exists := request.UserFrom(req.Context())
if !exists {
responsewriters.InternalError(w, req, errors.New("no user found for request"))
return
}

// make sure we exempt:
// - local loopback requests (this exempts system:apiserver)
// - health probes and metric scraping
// - requests from the exempt debugger group.
if requestor.GetName() == user.APIServerUser ||
hasExemptPathPrefix(req) ||
matchesDebuggerGroup(requestor, notReadyDebuggerGroup) {
warning.AddWarning(req.Context(), "", "The apiserver was still initializing, while this request was being served")
handler.ServeHTTP(w, req)
return
}

// Return a 503 status asking the client to try again after 5 seconds
w.Header().Set("Retry-After", "5")
http.Error(w, "The apiserver hasn't been fully initialized yet, please try again later.",
http.StatusServiceUnavailable)
})
}

func matchesDebuggerGroup(requestor user.Info, debugger string) bool {
for _, group := range requestor.GetGroups() {
if group == debugger {
return true
}
}
return false
}
@@ -0,0 +1,143 @@
package filters

import (
"net/http"
"net/http/httptest"
"testing"

"k8s.io/apiserver/pkg/authentication/user"
genericapifilters "k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/request"
)

func TestWithNotReady(t *testing.T) {
const warning = `299 - "The apiserver was still initializing, while this request was being served"`

tests := []struct {
name string
requestURL string
hasBeenReady bool
user *user.DefaultInfo
handlerInvoked int
retryAfterExpected string
warningExpected string
statusCodeexpected int
}{
{
name: "the apiserver is fully initialized",
hasBeenReady: true,
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
},
{
name: "the apiserver is initializing, local loopback",
hasBeenReady: false,
user: &user.DefaultInfo{Name: user.APIServerUser},
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, exempt debugger group",
hasBeenReady: false,
user: &user.DefaultInfo{Groups: []string{"system:authenticated", notReadyDebuggerGroup}},
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, readyz",
requestURL: "/readyz?verbose=1",
user: &user.DefaultInfo{},
hasBeenReady: false,
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, healthz",
requestURL: "/healthz?verbose=1",
user: &user.DefaultInfo{},
hasBeenReady: false,
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, livez",
requestURL: "/livez?verbose=1",
user: &user.DefaultInfo{},
hasBeenReady: false,
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, metrics",
requestURL: "/metrics",
user: &user.DefaultInfo{},
hasBeenReady: false,
handlerInvoked: 1,
statusCodeexpected: http.StatusOK,
warningExpected: warning,
},
{
name: "the apiserver is initializing, non-exempt request",
hasBeenReady: false,
user: &user.DefaultInfo{Groups: []string{"system:authenticated", "system:masters"}},
statusCodeexpected: http.StatusServiceUnavailable,
retryAfterExpected: "5",
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
hasBeenReadyCh := make(chan struct{})
if test.hasBeenReady {
close(hasBeenReadyCh)
} else {
defer close(hasBeenReadyCh)
}

var handlerInvoked int
handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
handlerInvoked++
w.WriteHeader(http.StatusOK)
})

if len(test.requestURL) == 0 {
test.requestURL = "/api/v1/namespaces"
}
req, err := http.NewRequest(http.MethodGet, test.requestURL, nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
if test.user != nil {
req = req.WithContext(request.WithUser(req.Context(), test.user))
}
w := httptest.NewRecorder()

withNotReady := WithNotReady(handler, hasBeenReadyCh)
withNotReady = genericapifilters.WithWarningRecorder(withNotReady)
withNotReady.ServeHTTP(w, req)

if test.handlerInvoked != handlerInvoked {
t.Errorf("expected the handler to be invoked: %d times, but got: %d", test.handlerInvoked, handlerInvoked)
}
if test.statusCodeexpected != w.Code {
t.Errorf("expected Response Status Code: %d, but got: %d", test.statusCodeexpected, w.Code)
}

retryAfterGot := w.Header().Get("Retry-After")
if test.retryAfterExpected != retryAfterGot {
t.Errorf("expected Retry-After: %q, but got: %q", test.retryAfterExpected, retryAfterGot)
}

warningGot := w.Header().Get("Warning")
if test.warningExpected != warningGot {
t.Errorf("expected Warning: %s, but got: %s", test.warningExpected, warningGot)
}

})
}
}
Expand Up @@ -90,6 +90,18 @@ type ServerRunOptions struct {
// This grace period is orthogonal to other grace periods, and
// it is not overridden by any other grace period.
ShutdownWatchTerminationGracePeriod time.Duration

// SendRetryAfterWhileNotReadyOnce, if enabled, the apiserver will
// reject all incoming requests with a 503 status code and a
// 'Retry-After' response header until the apiserver has fully
// initialized, except for requests from a designated debugger group.
// This option ensures that the system stays consistent even when
// requests are received before the server has been initialized.
// In particular, it prevents child deletion in case of GC or/and
// orphaned content in case of the namespaces controller.
// NOTE: this option is applicable to Microshift only,
// this should never be enabled for OCP.
SendRetryAfterWhileNotReadyOnce bool
}

func NewServerRunOptions() *ServerRunOptions {
Expand All @@ -106,6 +118,7 @@ func NewServerRunOptions() *ServerRunOptions {
MaxRequestBodyBytes: defaults.MaxRequestBodyBytes,
EnablePriorityAndFairness: true,
ShutdownSendRetryAfter: false,
SendRetryAfterWhileNotReadyOnce: false,
}
}

Expand All @@ -126,6 +139,7 @@ func (s *ServerRunOptions) ApplyTo(c *server.Config) error {
c.PublicAddress = s.AdvertiseAddress
c.ShutdownSendRetryAfter = s.ShutdownSendRetryAfter
c.ShutdownWatchTerminationGracePeriod = s.ShutdownWatchTerminationGracePeriod
c.SendRetryAfterWhileNotReadyOnce = s.SendRetryAfterWhileNotReadyOnce

return nil
}
Expand Down Expand Up @@ -342,5 +356,12 @@ func (s *ServerRunOptions) AddUniversalFlags(fs *pflag.FlagSet) {
"This option, if set, represents the maximum amount of grace period the apiserver will wait "+
"for active watch request(s) to drain during the graceful server shutdown window.")

// NOTE: this option is applicable to Microshift only, this should never be enabled for OCP.
fs.BoolVar(&s.SendRetryAfterWhileNotReadyOnce, "send-retry-after-while-not-ready-once", s.SendRetryAfterWhileNotReadyOnce, ""+
"If true, incoming request(s) will be rejected with a '503' status code and a 'Retry-After' response header "+
"until the apiserver has initialized, except for requests from a certain group. This option ensures that the system stays "+
"consistent even when requests arrive at the server before it has been initialized. "+
"This option is applicable to Microshift only, this should never be enabled for OCP")

utilfeature.DefaultMutableFeatureGate.AddFlag(fs)
}

0 comments on commit 286fede

Please sign in to comment.