Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distinguish logical processing timeout and response body write timeout #79609

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 57 additions & 24 deletions staging/src/k8s.io/apiserver/pkg/server/filters/timeout.go
Expand Up @@ -30,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apiserver/pkg/endpoints/metrics"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/klog"
)

var errConnKilled = fmt.Errorf("killing connection/stream because serving request timed out and response had been started")
Expand All @@ -39,14 +40,14 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apir
if longRunning == nil {
return handler
}
timeoutFunc := func(req *http.Request) (*http.Request, <-chan time.Time, func(), *apierrors.StatusError) {
timeoutFunc := func(req *http.Request) (*http.Request, <-chan time.Time, func(error), *apierrors.StatusError) {
// TODO unify this with apiserver.MaxInFlightLimit
ctx := req.Context()

requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
// if this happens, the handler chain isn't setup correctly because there is no request info
return req, time.After(timeout), func() {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
return req, time.After(timeout), func(error) {}, apierrors.NewInternalError(fmt.Errorf("no request info found for request during timeout"))
}

if longRunning(req, requestInfo) {
Expand All @@ -56,16 +57,28 @@ func WithTimeoutForNonLongRunningRequests(handler http.Handler, longRunning apir
ctx, cancel := context.WithCancel(ctx)
req = req.WithContext(ctx)

postTimeoutFn := func() {
postTimeoutFn := func(err error) {
cancel()
metrics.Record(req, requestInfo, metrics.APIServerComponent, "", http.StatusGatewayTimeout, 0, 0)

if err == nil {
// No error occur, timeout handler write 504 code and body successfully
metrics.Record(req, requestInfo, metrics.APIServerComponent, "", http.StatusGatewayTimeout, 0, 0)
return
}

// Otherwise, inner handler has already tried to write headers and body but timeout to finish to write all body,
// or timeout handler failed to write 504 response.
// So, actually it's not a completed 504 response to the client, the client received EOF.
// We need to distinguish this with logic processing timeout
// TODO(answer1991): if -1 code is ok here
metrics.Record(req, requestInfo, metrics.APIServerComponent, "", -1, 0, 0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that none of the official HTTP response codes are appropriate here. In fact even nginx's 499, client closed the connection does not fit. However I do think we should probably try to use something more like a response code, rather that -1. This seems like a 5xx series error. What about something like a 594?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not think this code is a response code, actually server had already write a response code to the client which maybe is 200 or any other official HTTP response code. So I set -1 here is just for metrics usage. Setting an unassigned HTTP response code(like 594) will cause conflict error if the code become standard official code in the feature.

}
return req, time.After(timeout), postTimeoutFn, apierrors.NewTimeoutError(fmt.Sprintf("request did not complete within %s", timeout), 0)
}
return WithTimeout(handler, timeoutFunc)
}

type timeoutFunc = func(*http.Request) (req *http.Request, timeout <-chan time.Time, postTimeoutFunc func(), err *apierrors.StatusError)
type timeoutFunc = func(*http.Request) (req *http.Request, timeout <-chan time.Time, postTimeoutFunc func(error), err *apierrors.StatusError)

// WithTimeout returns an http.Handler that runs h with a timeout
// determined by timeoutFunc. The new http.Handler calls h.ServeHTTP to handle
Expand Down Expand Up @@ -118,14 +131,35 @@ func (t *timeoutHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
return
case <-after:
postTimeoutFn()
tw.timeout(err)
writeRespErr := tw.timeout(err)
postTimeoutFn(writeRespErr)

// The timeout writer has been used by the inner handler. There is
// no way to timeout the HTTP request at the point. We have to shutdown
// the connection for HTTP1 or reset stream for HTTP2.
//
// Note from: Brad Fitzpatrick
// if the ServeHTTP goroutine panics, that will do the best possible thing for both
// HTTP/1 and HTTP/2. In HTTP/1, assuming you're replying with at least HTTP/1.1 and
// you've already flushed the headers so it's using HTTP chunking, it'll kill the TCP
// connection immediately without a proper 0-byte EOF chunk, so the peer will recognize
// the response as bogus. In HTTP/2 the server will just RST_STREAM the stream, leaving
// the TCP connection open, but resetting the stream to the peer so it'll have an error,
// like the HTTP/1 case.
if writeRespErr != nil {
panic(writeRespErr)
}
}
}

type timeoutWriter interface {
http.ResponseWriter
timeout(*apierrors.StatusError)

// timeout will be invoked by timeout handler to write 504 code and body to response when timeout,
// return nil if the timeout writer has not been used by the inner handler and write 504 response successfully,
// otherwise return the errConnKilled or the error when writing 504 response.
// Whatever error occurs, timeout handler need call panic
timeout(*apierrors.StatusError) error
}

func newTimeoutWriter(w http.ResponseWriter) timeoutWriter {
Expand Down Expand Up @@ -156,6 +190,8 @@ type baseTimeoutWriter struct {
wroteHeader bool
// if this timeout writer has been hijacked
hijacked bool
// wroteStartTime is the time when inner handler calls Write or WriteHeader, it's for logging and metrics
wroteStartTime time.Time
}

func (tw *baseTimeoutWriter) Header() http.Header {
Expand All @@ -181,6 +217,7 @@ func (tw *baseTimeoutWriter) Write(p []byte) (int, error) {
}

tw.wroteHeader = true
tw.wroteStartTime = time.Now()
return tw.w.Write(p)
}

Expand All @@ -206,10 +243,11 @@ func (tw *baseTimeoutWriter) WriteHeader(code int) {
}

tw.wroteHeader = true
tw.wroteStartTime = time.Now()
tw.w.WriteHeader(code)
}

func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) error {
tw.mu.Lock()
defer tw.mu.Unlock()

Expand All @@ -221,22 +259,17 @@ func (tw *baseTimeoutWriter) timeout(err *apierrors.StatusError) {
if !tw.wroteHeader && !tw.hijacked {
tw.w.WriteHeader(http.StatusGatewayTimeout)
enc := json.NewEncoder(tw.w)
enc.Encode(&err.ErrStatus)
} else {
// The timeout writer has been used by the inner handler. There is
// no way to timeout the HTTP request at the point. We have to shutdown
// the connection for HTTP1 or reset stream for HTTP2.
//
// Note from: Brad Fitzpatrick
// if the ServeHTTP goroutine panics, that will do the best possible thing for both
// HTTP/1 and HTTP/2. In HTTP/1, assuming you're replying with at least HTTP/1.1 and
// you've already flushed the headers so it's using HTTP chunking, it'll kill the TCP
// connection immediately without a proper 0-byte EOF chunk, so the peer will recognize
// the response as bogus. In HTTP/2 the server will just RST_STREAM the stream, leaving
// the TCP connection open, but resetting the stream to the peer so it'll have an error,
// like the HTTP/1 case.
panic(errConnKilled)
return enc.Encode(&err.ErrStatus)
}

// The timeout writer has been used by the inner handler.
// Which means server has finished logical processing and has already started to write header and body,
// but failed to write whole response body or something hanged in inner handler,
// or some error occurred at the under-layer of the system such as socket issue, network iops issue, etc.
// TODO(answer1991): add a metric here?
klog.Errorf("timeout to write response body, write response elapsed: %v", time.Now().Sub(tw.wroteStartTime))

return errConnKilled
}

func (tw *baseTimeoutWriter) closeNotify() <-chan bool {
Expand Down
48 changes: 44 additions & 4 deletions staging/src/k8s.io/apiserver/pkg/server/filters/timeout_test.go
Expand Up @@ -36,14 +36,19 @@ import (
)

type recorder struct {
lock sync.Mutex
count int
lock sync.Mutex
count int
errCount int
}

func (r *recorder) Record() {
func (r *recorder) Record(err error) {
r.lock.Lock()
defer r.lock.Unlock()
r.count++

if err != nil {
r.errCount++
}
}

func (r *recorder) Count() int {
Expand All @@ -52,6 +57,12 @@ func (r *recorder) Count() int {
return r.count
}

func (r *recorder) ErrorCount() int {
r.lock.Lock()
defer r.lock.Unlock()
return r.errCount
}

func newHandler(responseCh <-chan string, panicCh <-chan struct{}, writeErrCh chan<- error) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
select {
Expand Down Expand Up @@ -82,7 +93,7 @@ func TestTimeout(t *testing.T) {

handler := newHandler(sendResponse, doPanic, writeErrors)
ts := httptest.NewServer(withPanicRecovery(
WithTimeout(handler, func(req *http.Request) (*http.Request, <-chan time.Time, func(), *apierrors.StatusError) {
WithTimeout(handler, func(req *http.Request) (*http.Request, <-chan time.Time, func(error), *apierrors.StatusError) {
return req, timeout, record.Record, timeoutErr
}), func(w http.ResponseWriter, req *http.Request, err interface{}) {
gotPanic <- err
Expand Down Expand Up @@ -157,3 +168,32 @@ func TestTimeout(t *testing.T) {
t.Fatalf("expected to see a handler panic, but didn't")
}
}

func TestWriteBodyTimeout(t *testing.T) {
gotPanic := make(chan interface{}, 1)
record := &recorder{}
timeoutErr := apierrors.NewServerTimeout(schema.GroupResource{Group: "foo", Resource: "bar"}, "get", 0)
ts := httptest.NewServer(withPanicRecovery(
WithTimeout(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(200)
// mock under-layer hang scene, like socket hang
<-make(chan struct{})
}), func(req *http.Request) (*http.Request, <-chan time.Time, func(error), *apierrors.StatusError) {
return req, time.After(time.Second * 10), record.Record, timeoutErr
}), func(w http.ResponseWriter, req *http.Request, err interface{}) {
gotPanic <- err
http.Error(w, "This request caused apiserver to panic. Look in the logs for details.", http.StatusInternalServerError)
}),
)
defer ts.Close()
_, err := http.Get(ts.URL)
if err == nil {
t.Fatal("unexpect got response without error")
}
if record.Count() != 1 {
t.Errorf("did not invoke record method: %#v", record)
}
if record.ErrorCount() != 1 {
t.Errorf("did not invoke record method with err: %#v", record)
}
}