Skip to content

Commit

Permalink
UPSTREAM: <carry>: add conditional shutdown response header
Browse files Browse the repository at this point in the history
  • Loading branch information
tkashem authored and bertinatto committed Jul 25, 2023
1 parent 7801795 commit fd7d9fa
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 172 deletions.
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/pkg/server/config.go
Expand Up @@ -1024,7 +1024,6 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithAudit(handler, c.AuditBackend, c.AuditPolicyRuleEvaluator, c.LongRunningFunc)
handler = filterlatency.TrackStarted(handler, c.TracerProvider, "audit")

handler = genericfilters.WithShutdownLateAnnotation(handler, c.lifecycleSignals.ShutdownInitiated, c.ShutdownDelayDuration)
handler = genericfilters.WithStartupEarlyAnnotation(handler, c.lifecycleSignals.HasBeenReady)

failedHandler := genericapifilters.Unauthorized(c.Serializer)
Expand Down Expand Up @@ -1059,6 +1058,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithRetryAfter(handler, c.lifecycleSignals.NotAcceptingNewRequest.Signaled())
}
handler = genericfilters.WithOptInRetryAfter(handler, c.newServerFullyInitializedFunc())
handler = genericfilters.WithShutdownResponseHeader(handler, c.lifecycleSignals.ShutdownInitiated, c.ShutdownDelayDuration, c.APIServerID)
handler = genericfilters.WithHTTPLogging(handler, c.newIsTerminatingFunc())
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
Expand Down
Expand Up @@ -64,16 +64,20 @@ func exemptIfHealthProbe(r *http.Request) bool {
return false
}

// WithShutdownLateAnnotation, if added to the handler chain, tracks the
// incoming request(s) after the apiserver has initiated the graceful
// shutdown, and annoates the audit event for these request(s) with
// diagnostic information.
// This enables us to identify the actor(s)/load balancer(s) that are sending
// requests to the apiserver late during the server termination.
// It should be placed after (in order of execution) the
// 'WithAuthentication' filter.
func WithShutdownLateAnnotation(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration) http.Handler {
return withShutdownLateAnnotation(handler, shutdownInitiated, delayDuration, exemptIfHealthProbe, clockutils.RealClock{})
// WithShutdownResponseHeader, if added to the handler chain, adds a header
// 'X-OpenShift-Disruption' to the response with the following information:
//
// shutdown={true|false} shutdown-delay-duration=%s elapsed=%s host=%s
// shutdown: whether the server is currently shutting down gracefully.
// shutdown-delay-duration: value of --shutdown-delay-duration server run option
// elapsed: how much time has elapsed since the server received a TERM signal
// host: host name of the server, it is used to identify the server instance
// from the others.
//
// This handler will add the response header only if the client opts in by
// adding the 'X-Openshift-If-Disruption' header to the request.
func WithShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string) http.Handler {
return withShutdownResponseHeader(handler, shutdownInitiated, delayDuration, apiServerID, clockutils.RealClock{})
}

// WithStartupEarlyAnnotation annotates the request with an annotation keyed as
Expand All @@ -84,59 +88,38 @@ func WithStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEven
return withStartupEarlyAnnotation(handler, hasBeenReady, exemptIfHealthProbe)
}

func withShutdownLateAnnotation(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, shouldExemptFn shouldExemptFunc, clock clockutils.PassiveClock) http.Handler {
func withShutdownResponseHeader(handler http.Handler, shutdownInitiated lifecycleEvent, delayDuration time.Duration, apiServerID string, clock clockutils.PassiveClock) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
select {
case <-shutdownInitiated.Signaled():
default:
if len(req.Header.Get("X-Openshift-If-Disruption")) == 0 {
handler.ServeHTTP(w, req)
return
}

if shouldExemptFn(req) {
msgFn := func(shutdown bool, elapsed time.Duration) string {
return fmt.Sprintf("shutdown=%t shutdown-delay-duration=%s elapsed=%s host=%s",
shutdown, delayDuration.Round(time.Second).String(), elapsed.Round(time.Second).String(), apiServerID)
}

select {
case <-shutdownInitiated.Signaled():
default:
w.Header().Set("X-OpenShift-Disruption", msgFn(false, time.Duration(0)))
handler.ServeHTTP(w, req)
return
}

shutdownInitiatedAt := shutdownInitiated.SignaledAt()
if shutdownInitiatedAt == nil {
w.Header().Set("X-OpenShift-Disruption", msgFn(true, time.Duration(0)))
handler.ServeHTTP(w, req)
return
}

elapsedSince := clock.Since(*shutdownInitiatedAt)
// TODO: 80% is the threshold, if requests arrive after 80% of
// shutdown-delay-duration elapses we annotate the request as late=true.
late := lateMsg(delayDuration, elapsedSince, 80)

// NOTE: some upstream unit tests have authentication disabled and will
// fail if we require the requestor to be present in the request
// context. Fixing those unit tests will increase the chance of merge
// conflict during rebase.
// This also implies that this filter must be placed after (in order of
// execution) the 'WithAuthentication' filter.
self := "self="
if requestor, exists := request.UserFrom(req.Context()); exists && requestor != nil {
self = fmt.Sprintf("%s%t", self, requestor.GetName() == user.APIServerUser)
}

message := fmt.Sprintf("%s %s loopback=%t", late, self, isLoopback(req.RemoteAddr))
audit.AddAuditAnnotation(req.Context(), "apiserver.k8s.io/shutdown", message)

w.Header().Set("X-OpenShift-Disruption", msgFn(true, clock.Since(*shutdownInitiatedAt)))
handler.ServeHTTP(w, req)
w.Header().Set("X-OpenShift-Shutdown", message)
})
}

func lateMsg(delayDuration, elapsedSince time.Duration, threshold float64) string {
if delayDuration == time.Duration(0) {
return fmt.Sprintf("elapsed=%s threshold= late=%t", elapsedSince.Round(time.Second).String(), true)
}

percentElapsed := (float64(elapsedSince) / float64(delayDuration)) * 100
return fmt.Sprintf("elapsed=%s threshold=%.2f%% late=%t",
elapsedSince.Round(time.Second).String(), percentElapsed, percentElapsed > threshold)
}

func withStartupEarlyAnnotation(handler http.Handler, hasBeenReady lifecycleEvent, shouldExemptFn shouldExemptFunc) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
select {
Expand Down
Expand Up @@ -19,7 +19,6 @@ package filters
import (
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -32,137 +31,90 @@ import (
clocktesting "k8s.io/utils/clock/testing"
)

func TestWithShutdownLateAnnotation(t *testing.T) {
func TestWithShutdownResponseHeader(t *testing.T) {
var (
shutdownDelayDuration = 100 * time.Second
signaledAt = time.Now()
elapsedAtWithingThreshold = signaledAt.Add(shutdownDelayDuration - 21*time.Second)
elapsedAtBeyondThreshold = signaledAt.Add(shutdownDelayDuration - 19*time.Second)
signaledAt = time.Now()
elapsedAt = signaledAt.Add(20 * time.Second)
)

tests := []struct {
name string
shutdownInitiated func() lifecycleEvent
delayDuration time.Duration
user authenticationuser.Info
clock func() utilsclock.PassiveClock
url string
remoteAddr string
handlerInvoked int
statusCodeExpected int
annotationShouldContain string
name string
optIn bool
shutdownInitiated func() lifecycleEvent
delayDuration time.Duration
clock func() utilsclock.PassiveClock
handlerInvoked int
statusCodeExpected int
responseHeader string
}{
{
name: "shutdown is not initiated",
name: "client did not opt in",
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: make(chan struct{})}
return nil
},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
},
{
name: "shutdown initiated, health probes are not annotated",
name: "client opted in, shutdown not initiated",
optIn: true,
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel()}
return fakeLifecycleSignal{ch: make(chan struct{})}
},
url: "/readyz?verbos=1",
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
delayDuration: 10 * time.Second,
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
},
// use cases where the request will be annotated
{
name: "shutdown initiated, no user in request context",
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "self= loopback=",
},
{
name: "shutdown initiated, self=true",
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.APIServerUser},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "self=true",
},
{
name: "shutdown initiated, self=false",
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "self=false",
responseHeader: "shutdown=false shutdown-delay-duration=10s elapsed=0s host=foo",
},
{
name: "shutdown initiated, loopback=true",
name: "client opted in, shutdown initiated, signaled at is nil",
optIn: true,
delayDuration: 10 * time.Second,
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
return fakeLifecycleSignal{ch: newClosedChannel(), at: nil}
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
remoteAddr: "127.0.0.1:80",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "loopback=true",
},
{
name: "shutdown initiated, loopback=false",
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
remoteAddr: "www.foo.bar:80",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "loopback=false",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=0s host=foo",
},
{
name: "shutdown initiated, shutdown delay duration is zero",
name: "client opted in, shutdown initiated, signaled at is nil",
optIn: true,
delayDuration: 10 * time.Second,
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
delayDuration: time.Duration(0),
clock: func() utilsclock.PassiveClock {
return clocktesting.NewFakeClock(elapsedAtWithingThreshold)
return fakeLifecycleSignal{ch: newClosedChannel(), at: nil}
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "elapsed=1m19s threshold= late=true",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=0s host=foo",
},
{
name: "shutdown initiated, within 80%",
name: "client opted in, shutdown delay duration is zero",
optIn: true,
delayDuration: 0,
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
delayDuration: shutdownDelayDuration,
clock: func() utilsclock.PassiveClock {
return clocktesting.NewFakeClock(elapsedAtWithingThreshold)
return clocktesting.NewFakeClock(elapsedAt)
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "elapsed=1m19s threshold=79.00% late=false self=false loopback=false",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
responseHeader: "shutdown=true shutdown-delay-duration=0s elapsed=20s host=foo",
},
{
name: "shutdown initiated, outside 80%",
name: "client opted in, shutdown initiated, signaled at is valied",
optIn: true,
delayDuration: 10 * time.Second,
shutdownInitiated: func() lifecycleEvent {
return fakeLifecycleSignal{ch: newClosedChannel(), at: &signaledAt}
},
delayDuration: shutdownDelayDuration,
clock: func() utilsclock.PassiveClock {
return clocktesting.NewFakeClock(elapsedAtBeyondThreshold)
return clocktesting.NewFakeClock(elapsedAt)
},
user: &authenticationuser.DefaultInfo{Name: authenticationuser.Anonymous},
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
annotationShouldContain: "elapsed=1m21s threshold=81.00% late=true self=false loopback=false",
handlerInvoked: 1,
statusCodeExpected: http.StatusOK,
responseHeader: "shutdown=true shutdown-delay-duration=10s elapsed=20s host=foo",
},
}

Expand All @@ -179,33 +131,14 @@ func TestWithShutdownLateAnnotation(t *testing.T) {
if test.clock != nil {
clock = test.clock()
}
target := withShutdownLateAnnotation(handler, event, test.delayDuration, exemptIfHealthProbe, clock)
target := withShutdownResponseHeader(handler, event, test.delayDuration, "foo", clock)

url := "/api/v1/namespaces"
if test.url != "" {
url = test.url
}
req, err := http.NewRequest(http.MethodGet, url, nil)
req, err := http.NewRequest(http.MethodGet, "/api/v1/namespaces", nil)
if err != nil {
t.Fatalf("failed to create new http request - %v", err)
}
if test.remoteAddr != "" {
req.RemoteAddr = test.remoteAddr
}

ctx := req.Context()
if test.user != nil {
ctx = apirequest.WithUser(ctx, test.user)
}
ctx = audit.WithAuditContext(ctx)
req = req.WithContext(ctx)

ac := audit.AuditContextFrom(req.Context())
if ac == nil {
t.Fatalf("expected audit context inside the request context")
}
ac.Event = &auditinternal.Event{
Level: auditinternal.LevelMetadata,
if test.optIn {
req.Header.Set("X-Openshift-If-Disruption", "true")
}

w := httptest.NewRecorder()
Expand All @@ -219,19 +152,16 @@ func TestWithShutdownLateAnnotation(t *testing.T) {
t.Errorf("expected status code: %d, but got: %d", test.statusCodeExpected, w.Result().StatusCode)
}

key := "apiserver.k8s.io/shutdown"
key := "X-OpenShift-Disruption"
switch {
case len(test.annotationShouldContain) == 0:
if valueGot, ok := ac.Event.Annotations[key]; ok {
t.Errorf("did not expect annotation to be added, but got: %s", valueGot)
case len(test.responseHeader) == 0:
if valueGot := w.Header().Get(key); len(valueGot) > 0 {
t.Errorf("did not expect header to be added to the response, but got: %s", valueGot)
}
default:
if valueGot, ok := ac.Event.Annotations[key]; !ok || !strings.Contains(valueGot, test.annotationShouldContain) {
if valueGot := w.Header().Get(key); len(valueGot) == 0 || test.responseHeader != valueGot {
t.Logf("got: %s", valueGot)
t.Errorf("expected annotation to match, diff: %s", cmp.Diff(test.annotationShouldContain, valueGot))
}
if header := w.Header().Get("X-OpenShift-Shutdown"); !strings.Contains(header, test.annotationShouldContain) {
t.Errorf("expected response header to match, diff: %s", cmp.Diff(test.annotationShouldContain, header))
t.Errorf("expected response header to match, diff: %s", cmp.Diff(test.responseHeader, valueGot))
}
}
})
Expand Down

0 comments on commit fd7d9fa

Please sign in to comment.