Skip to content

Commit

Permalink
Merge pull request #356 from sttts/sttts-reject-before-ready
Browse files Browse the repository at this point in the history
Bug 1880941: kube-apiserver: log non-probe requests before ready
  • Loading branch information
openshift-merge-robot committed Sep 21, 2020
2 parents f0c398b + 8ca9c03 commit e0a30f6
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 37 deletions.
5 changes: 5 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,9 @@ type Config struct {
// and the kind associated with a given resource. As resources are installed, they are registered here.
EquivalentResourceRegistry runtime.EquivalentResourceRegistry

// hasBeenReadyCh is closed when /readyz succeeds for the first time.
hasBeenReadyCh chan struct{}

// A func that returns whether the server is terminating. This can be nil.
IsTerminating func() bool
}
Expand Down Expand Up @@ -636,6 +639,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G
livezChecks: c.LivezChecks,
readyzChecks: c.ReadyzChecks,
readinessStopCh: make(chan struct{}),
hasBeenReadyCh: c.hasBeenReadyCh,
livezGracePeriod: c.LivezGracePeriod,

DiscoveryGroupManager: discovery.NewRootAPIsHandler(c.DiscoveryAddresses, c.Serializer),
Expand Down Expand Up @@ -761,6 +765,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericfilters.WithCORS(handler, c.CorsAllowedOriginList, nil, nil, nil, "true")
handler = genericfilters.WithTimeoutForNonLongRunningRequests(handler, c.LongRunningFunc, c.RequestTimeout)
handler = genericfilters.WithWaitGroup(handler, c.LongRunningFunc, c.HandlerChainWaitGroup)
handler = WithNonReadyRequestLogging(handler, c.hasBeenReadyCh)
handler = WithLateConnectionFilter(handler)
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
if c.SecureServing != nil && !c.SecureServing.DisableHTTP2 && c.GoawayChance > 0 {
Expand Down
2 changes: 2 additions & 0 deletions staging/src/k8s.io/apiserver/pkg/server/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"time"

"github.com/google/go-cmp/cmp"

"k8s.io/apimachinery/pkg/util/json"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/waitgroup"
Expand Down Expand Up @@ -265,6 +266,7 @@ func TestAuthenticationAuditAnnotationsDefaultChain(t *testing.T) {
return &authenticator.Response{User: &user.DefaultInfo{}}, true, nil
})
backend := &testBackend{}

c := &Config{
Authentication: AuthenticationInfo{Authenticator: authn},
AuditBackend: backend,
Expand Down
4 changes: 3 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ type GenericAPIServer struct {
// the readiness stop channel is used to signal that the apiserver has initiated a shutdown sequence, this
// will cause readyz to return unhealthy.
readinessStopCh chan struct{}
// hasBeenReadyCh is closed when /readyz succeeds for the first time.
hasBeenReadyCh chan struct{}

// auditing. The backend is started after the server starts listening.
AuditBackend audit.Backend
Expand Down Expand Up @@ -354,7 +356,7 @@ func (s preparedGenericAPIServer) Run(stopCh <-chan struct{}) error {
Listener: s.SecureServingInfo.Listener,
lateStopCh: lateStopCh,
}
lateConnectionEventf = s.Eventf
unexpectedRequestsEventf.Store(s.Eventf)

// close socket after delayed stopCh
stoppedCh, err := s.NonBlockingRun(delayedStopCh)
Expand Down
5 changes: 4 additions & 1 deletion staging/src/k8s.io/apiserver/pkg/server/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,10 @@ func (s *GenericAPIServer) installReadyz() {
s.readyzLock.Lock()
defer s.readyzLock.Unlock()
s.readyzChecksInstalled = true
healthz.InstallReadyzHandler(s.Handler.NonGoRestfulMux, s.readyzChecks...)
healthz.InstallReadyzHandlerWithHealthyFunc(s.Handler.NonGoRestfulMux, func() {
// note: InstallReadyzHandlerWithHealthyFunc guarantees that this is called only once
close(s.hasBeenReadyCh)
}, s.readyzChecks...)
}

// installLivez creates the livez endpoint for this server.
Expand Down
38 changes: 30 additions & 8 deletions staging/src/k8s.io/apiserver/pkg/server/healthz/healthz.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ func InstallReadyzHandler(mux mux, checks ...HealthChecker) {
InstallPathHandler(mux, "/readyz", checks...)
}

// InstallReadyzHandlerWithHealthyFunc is like InstallReadyzHandler, but in addition call firstTimeReady
// the first time /readyz succeeds.
func InstallReadyzHandlerWithHealthyFunc(mux mux, firstTimeReady func(), checks ...HealthChecker) {
InstallPathHandlerWithHealthyFunc(mux, "/readyz", firstTimeReady, checks...)
}

// InstallLivezHandler registers handlers for liveness checking on the path
// "/livez" to mux. *All handlers* for mux must be specified in
// exactly one call to InstallHandler. Calling InstallHandler more
Expand All @@ -154,13 +160,20 @@ func InstallLivezHandler(mux mux, checks ...HealthChecker) {
// InstallPathHandler more than once for the same path and mux will
// result in a panic.
func InstallPathHandler(mux mux, path string, checks ...HealthChecker) {
InstallPathHandlerWithHealthyFunc(mux, path, nil, checks...)
}

// InstallPathHandlerWithHealthyFunc is like InstallPathHandler, but calls firstTimeHealthy exactly once
// when the handler succeeds for the first time.
func InstallPathHandlerWithHealthyFunc(mux mux, path string, firstTimeHealthy func(), checks ...HealthChecker) {
if len(checks) == 0 {
klog.V(5).Info("No default health checks specified. Installing the ping handler.")
checks = []HealthChecker{PingHealthz}
}

klog.V(5).Infof("Installing health checkers for (%v): %v", path, formatQuoted(checkerNames(checks...)...))

name := strings.Split(strings.TrimPrefix(path, "/"), "/")[0]
mux.Handle(path,
metrics.InstrumentHandlerFunc("GET",
/* group = */ "",
Expand All @@ -171,7 +184,7 @@ func InstallPathHandler(mux mux, path string, checks ...HealthChecker) {
/* component = */ "",
/* deprecated */ false,
/* removedRelease */ "",
handleRootHealthz(checks...)))
handleRootHealth(name, firstTimeHealthy, checks...)))
for _, check := range checks {
mux.Handle(fmt.Sprintf("%s/%v", path, check.Name()), adaptCheckToHandler(check.Check))
}
Expand Down Expand Up @@ -207,9 +220,11 @@ func getExcludedChecks(r *http.Request) sets.String {
return sets.NewString()
}

// handleRootHealthz returns an http.HandlerFunc that serves the provided checks.
func handleRootHealthz(checks ...HealthChecker) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// handleRootHealth returns an http.HandlerFunc that serves the provided checks.
func handleRootHealth(name string, firstTimeHealthy func(), checks ...HealthChecker) http.HandlerFunc {
var notifyOnce sync.Once

return func(w http.ResponseWriter, r *http.Request) {
excluded := getExcludedChecks(r)
// failedVerboseLogOutput is for output to the log. It indicates detailed failed output information for the log.
var failedVerboseLogOutput bytes.Buffer
Expand Down Expand Up @@ -240,8 +255,8 @@ func handleRootHealthz(checks ...HealthChecker) http.HandlerFunc {
}
// always be verbose on failure
if len(failedChecks) > 0 {
klog.V(2).Infof("healthz check failed: %s\n%v", strings.Join(failedChecks, ","), failedVerboseLogOutput.String())
http.Error(httplog.Unlogged(r, w), fmt.Sprintf("%shealthz check failed", individualCheckOutput.String()), http.StatusInternalServerError)
klog.V(2).Infof("%s check failed: %s\n%v", strings.Join(failedChecks, ","), name, failedVerboseLogOutput.String())
http.Error(httplog.Unlogged(r, w), fmt.Sprintf("%s%s check failed", individualCheckOutput.String(), name), http.StatusInternalServerError)
return
}

Expand All @@ -253,8 +268,15 @@ func handleRootHealthz(checks ...HealthChecker) http.HandlerFunc {
}

individualCheckOutput.WriteTo(w)
fmt.Fprint(w, "healthz check passed\n")
})
fmt.Fprintf(w, "%s check passed\n", name)

// signal first time this is healthy
if firstTimeHealthy != nil {
notifyOnce.Do(func() {
firstTimeHealthy()
})
}
}
}

// adaptCheckToHandler returns an http.HandlerFunc that serves the provided checks.
Expand Down
16 changes: 8 additions & 8 deletions staging/src/k8s.io/apiserver/pkg/server/healthz/healthz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,24 +94,24 @@ func TestInstallPathHandler(t *testing.T) {

}

func testMultipleChecks(path string, t *testing.T) {
func testMultipleChecks(path, name string, t *testing.T) {
tests := []struct {
path string
expectedResponse string
expectedStatus int
addBadCheck bool
}{
{"?verbose", "[+]ping ok\nhealthz check passed\n", http.StatusOK, false},
{"?verbose", fmt.Sprintf("[+]ping ok\n%s check passed\n", name), http.StatusOK, false},
{"?exclude=dontexist", "ok", http.StatusOK, false},
{"?exclude=bad", "ok", http.StatusOK, true},
{"?verbose=true&exclude=bad", "[+]ping ok\n[+]bad excluded: ok\nhealthz check passed\n", http.StatusOK, true},
{"?verbose=true&exclude=dontexist", "[+]ping ok\nwarn: some health checks cannot be excluded: no matches for \"dontexist\"\nhealthz check passed\n", http.StatusOK, false},
{"?verbose=true&exclude=bad", fmt.Sprintf("[+]ping ok\n[+]bad excluded: ok\n%s check passed\n", name), http.StatusOK, true},
{"?verbose=true&exclude=dontexist", fmt.Sprintf("[+]ping ok\nwarn: some health checks cannot be excluded: no matches for \"dontexist\"\n%s check passed\n", name), http.StatusOK, false},
{"/ping", "ok", http.StatusOK, false},
{"", "ok", http.StatusOK, false},
{"?verbose", "[+]ping ok\n[-]bad failed: reason withheld\nhealthz check failed\n", http.StatusInternalServerError, true},
{"?verbose", fmt.Sprintf("[+]ping ok\n[-]bad failed: reason withheld\n%s check failed\n", name), http.StatusInternalServerError, true},
{"/ping", "ok", http.StatusOK, true},
{"/bad", "internal server error: this will fail\n", http.StatusInternalServerError, true},
{"", "[+]ping ok\n[-]bad failed: reason withheld\nhealthz check failed\n", http.StatusInternalServerError, true},
{"", fmt.Sprintf("[+]ping ok\n[-]bad failed: reason withheld\n%s check failed\n", name), http.StatusInternalServerError, true},
}

for i, test := range tests {
Expand Down Expand Up @@ -148,11 +148,11 @@ func testMultipleChecks(path string, t *testing.T) {
}

func TestMultipleChecks(t *testing.T) {
testMultipleChecks("", t)
testMultipleChecks("", "healthz", t)
}

func TestMultiplePathChecks(t *testing.T) {
testMultipleChecks("/ready", t)
testMultipleChecks("/ready", "ready", t)
}

func TestCheckerNames(t *testing.T) {
Expand Down
79 changes: 60 additions & 19 deletions staging/src/k8s.io/apiserver/pkg/server/patch_genericapiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import (
"net/http"
"strings"
"sync"
goatomic "sync/atomic"

"go.uber.org/atomic"

"k8s.io/klog/v2"

corev1 "k8s.io/api/core/v1"
"k8s.io/klog/v2"
)

// terminationLoggingListener wraps the given listener to mark late connections
Expand All @@ -41,10 +41,13 @@ type terminationLoggingListener struct {
lateStopCh <-chan struct{}
}

type eventfFunc func(eventType, reason, messageFmt string, args ...interface{})

var (
lateConnectionRemoteAddrsLock sync.RWMutex
lateConnectionRemoteAddrs map[string]bool = map[string]bool{}
lateConnectionEventf func(eventType, reason, messageFmt string, args ...interface{})
lateConnectionRemoteAddrs = map[string]bool{}

unexpectedRequestsEventf goatomic.Value
)

func (l *terminationLoggingListener) Accept() (net.Conn, error) {
Expand Down Expand Up @@ -74,24 +77,17 @@ func WithLateConnectionFilter(handler http.Handler) http.Handler {
lateConnectionRemoteAddrsLock.RUnlock()

if late {
// ignore connections to local IP. Those clients better know what they are doing.
local := false
host, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
// ignore error and keep going
} else if ip := net.ParseIP(host); ip != nil {
local = ip.IsLoopback()
}

if pth := "/" + strings.TrimLeft(r.URL.Path, "/"); pth != "/readyz" && pth != "/healthz" {
if local {
klog.V(4).Infof("Request from loopback client %s to %q (user agent %q) through connection created very late in the graceful termination process (more than 80%% has passed). This client probably does not watch /readyz and might get failures when termination is over.", r.RemoteAddr, r.URL.Path, r.UserAgent())
if pth := "/" + strings.TrimLeft(r.URL.Path, "/"); pth != "/readyz" && pth != "/healthz" && pth != "/livez" {
if isLocal(r) {
klog.V(4).Infof("Loopback request to %q (user agent %q) through connection created very late in the graceful termination process (more than 80%% has passed). This client probably does not watch /readyz and might get failures when termination is over.", r.URL.Path, r.UserAgent())
} else {
klog.Warningf("Request from %s to %q (user agent %q) through connection created very late in the graceful termination process (more than 80%% has passed), possibly a sign for a broken load balancer setup.", r.RemoteAddr, r.URL.Path, r.UserAgent())
klog.Warningf("Request to %q (source IP %s, user agent %q) through a connection created very late in the graceful termination process (more than 80%% has passed), possibly a sign for a broken load balancer setup.", r.URL.Path, r.RemoteAddr, r.UserAgent())

// create only one event to avoid event spam.
if swapped := lateRequestReceived.CAS(false, true); swapped && lateConnectionEventf != nil {
lateConnectionEventf(corev1.EventTypeWarning, "LateConnections", "The apiserver received connections (e.g. from %q, user agent %q) very late in the graceful termination process, possibly a sign for a broken load balancer setup.", r.RemoteAddr, r.UserAgent())
var eventf eventfFunc
eventf, _ = unexpectedRequestsEventf.Load().(eventfFunc)
if swapped := lateRequestReceived.CAS(false, true); swapped && eventf != nil {
eventf(corev1.EventTypeWarning, "LateConnections", "The apiserver received connections (e.g. from %q, user agent %q) very late in the graceful termination process, possibly a sign for a broken load balancer setup.", r.RemoteAddr, r.UserAgent())
}
}
}
Expand All @@ -100,3 +96,48 @@ func WithLateConnectionFilter(handler http.Handler) http.Handler {
handler.ServeHTTP(w, r)
})
}

// WithNonReadyRequestLogging rejects the request until the process has been ready once.
func WithNonReadyRequestLogging(handler http.Handler, hasBeenReadyCh <-chan struct{}) http.Handler {
var nonReadyRequestReceived atomic.Bool

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if hasBeenReadyCh != nil {
select {
case <-hasBeenReadyCh:
handler.ServeHTTP(w, r)
return
default:
}
}

// ignore connections to local IP. Those clients better know what they are doing.
if pth := "/" + strings.TrimLeft(r.URL.Path, "/"); pth != "/readyz" && pth != "/healthz" && pth != "/livez" {
if isLocal(r) {
klog.V(2).Infof("Loopback request to %q (user agent %q) before server is ready. This client probably does not watch /readyz and might get inconsistent answers.", r.URL.Path, r.UserAgent())
} else {
klog.Warningf("Request to %q (source IP %s, user agent %q) before server is ready, possibly a sign for a broken load balancer setup.", r.URL.Path, r.RemoteAddr, r.UserAgent())

// create only one event to avoid event spam.
var eventf eventfFunc
eventf, _ = unexpectedRequestsEventf.Load().(eventfFunc)
if swapped := nonReadyRequestReceived.CAS(false, true); swapped && eventf != nil {
eventf(corev1.EventTypeWarning, "NonReadyRequests", "The kube-apiserver received requests (e.g. from %q, user agent %q, accessing %s) before it was ready, possibly a sign for a broken load balancer setup.", r.RemoteAddr, r.UserAgent(), r.URL.Path)
}
}
}

handler.ServeHTTP(w, r)
})
}

func isLocal(req *http.Request) bool {
host, _, err := net.SplitHostPort(req.RemoteAddr)
if err != nil {
// ignore error and keep going
} else if ip := net.ParseIP(host); ip != nil {
return ip.IsLoopback()
}

return false
}

0 comments on commit e0a30f6

Please sign in to comment.