Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle redirects in apiserver proxy handler #34987

Merged
merged 1 commit into from Nov 5, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions pkg/registry/core/pod/rest/subresources.go
Expand Up @@ -70,7 +70,7 @@ func (r *ProxyREST) Connect(ctx api.Context, id string, opts runtime.Object, res
}
location.Path = path.Join(location.Path, proxyOpts.Path)
// Return a proxy handler that uses the desired transport, wrapped with additional proxy handling (to get URL rewriting, X-Forwarded-* headers, etc)
return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, responder), nil
return newThrottledUpgradeAwareProxyHandler(location, transport, true, false, false, responder), nil
}

// Support both GET and POST methods. We must support GET for browsers that want to use WebSockets.
Expand Down Expand Up @@ -100,7 +100,7 @@ func (r *AttachREST) Connect(ctx api.Context, name string, opts runtime.Object,
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

// NewConnectOptions returns the versioned object that represents exec parameters
Expand Down Expand Up @@ -137,7 +137,7 @@ func (r *ExecREST) Connect(ctx api.Context, name string, opts runtime.Object, re
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

// NewConnectOptions returns the versioned object that represents exec parameters
Expand Down Expand Up @@ -180,11 +180,12 @@ func (r *PortForwardREST) Connect(ctx api.Context, name string, opts runtime.Obj
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, true, responder), nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) *genericrest.UpgradeAwareProxyHandler {
func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired, interceptRedirects bool, responder rest.Responder) *genericrest.UpgradeAwareProxyHandler {
handler := genericrest.NewUpgradeAwareProxyHandler(location, transport, wrapTransport, upgradeRequired, responder)
handler.InterceptRedirects = interceptRedirects
handler.MaxBytesPerSec = capabilities.Get().PerConnectionBandwidthLimitBytesPerSec
return handler
}
6 changes: 6 additions & 0 deletions pkg/registry/generic/rest/BUILD
Expand Up @@ -23,9 +23,11 @@ go_library(
"//pkg/api/errors:go_default_library",
"//pkg/api/rest:go_default_library",
"//pkg/api/unversioned:go_default_library",
"//pkg/util/config:go_default_library",
"//pkg/util/httpstream:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/proxy:go_default_library",
"//pkg/util/runtime:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:github.com/mxk/go-flowrate/flowrate",
],
Expand All @@ -43,8 +45,12 @@ go_test(
deps = [
"//pkg/api:go_default_library",
"//pkg/api/errors:go_default_library",
"//pkg/util/config:go_default_library",
"//pkg/util/httpstream:go_default_library",
"//pkg/util/net:go_default_library",
"//pkg/util/proxy:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:github.com/stretchr/testify/require",
"//vendor:golang.org/x/net/websocket",
],
)
171 changes: 150 additions & 21 deletions pkg/registry/generic/rest/proxy.go
Expand Up @@ -17,7 +17,11 @@ limitations under the License.
package rest

import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"net/http"
"net/http/httputil"
"net/url"
Expand All @@ -26,9 +30,11 @@ import (
"time"

"k8s.io/kubernetes/pkg/api/errors"
utilconfig "k8s.io/kubernetes/pkg/util/config"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/net"
utilnet "k8s.io/kubernetes/pkg/util/net"
"k8s.io/kubernetes/pkg/util/proxy"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"

"github.com/golang/glog"
"github.com/mxk/go-flowrate/flowrate"
Expand All @@ -41,10 +47,13 @@ type UpgradeAwareProxyHandler struct {
// Transport provides an optional round tripper to use to proxy. If nil, the default proxy transport is used
Transport http.RoundTripper
// WrapTransport indicates whether the provided Transport should be wrapped with default proxy transport behavior (URL rewriting, X-Forwarded-* header setting)
WrapTransport bool
FlushInterval time.Duration
MaxBytesPerSec int64
Responder ErrorResponder
WrapTransport bool
// InterceptRedirects determines whether the proxy should sniff backend responses for redirects,
// following them as necessary.
InterceptRedirects bool
FlushInterval time.Duration
MaxBytesPerSec int64
Responder ErrorResponder
}

const defaultFlushInterval = 200 * time.Millisecond
Expand Down Expand Up @@ -131,32 +140,44 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R
return false
}

backendConn, err := proxy.DialURL(h.Location, h.Transport)
var (
backendConn net.Conn
rawResponse []byte
err error
)
if h.InterceptRedirects && utilconfig.DefaultFeatureGate.StreamingProxyRedirects() {
backendConn, rawResponse, err = h.connectBackendWithRedirects(req)
} else {
backendConn, err = h.connectBackend(req.Method, h.Location, req.Header, req.Body)
}
if err != nil {
h.Responder.Error(err)
return true
}
defer backendConn.Close()

requestHijackedConn, _, err := w.(http.Hijacker).Hijack()
if err != nil {
h.Responder.Error(err)
// Once the connection is hijacked, the ErrorResponder will no longer work, so
// hijacking should be the last step in the upgrade.
requestHijacker, ok := w.(http.Hijacker)
if !ok {
h.Responder.Error(fmt.Errorf("request connection cannot be hijacked: %T", w))
return true
}
defer requestHijackedConn.Close()

newReq, err := http.NewRequest(req.Method, h.Location.String(), req.Body)
requestHijackedConn, _, err := requestHijacker.Hijack()
if err != nil {
h.Responder.Error(err)
h.Responder.Error(fmt.Errorf("error hijacking request connection: %v", err))
return true
}
newReq.Header = req.Header
defer requestHijackedConn.Close()

if err = newReq.Write(backendConn); err != nil {
h.Responder.Error(err)
return true
// Forward raw response bytes back to client.
if len(rawResponse) > 0 {
if _, err = requestHijackedConn.Write(rawResponse); err != nil {
utilruntime.HandleError(fmt.Errorf("Error proxying response from backend to client: %v", err))
}
}

// Proxy the connection.
wg := &sync.WaitGroup{}
wg.Add(2)

Expand Down Expand Up @@ -192,6 +213,113 @@ func (h *UpgradeAwareProxyHandler) tryUpgrade(w http.ResponseWriter, req *http.R
return true
}

// connectBackend dials the backend at location and forwards a copy of the client request.
func (h *UpgradeAwareProxyHandler) connectBackend(method string, location *url.URL, header http.Header, body io.Reader) (conn net.Conn, err error) {
defer func() {
if err != nil && conn != nil {
conn.Close()
conn = nil
Copy link
Member

@liggitt liggitt Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assigning nil here doesn't change the returned value (https://play.golang.org/p/q5dnDAYsL6). actually do return nil, ... in error cases.

edit: I'm wrong, named returns make this work correctly. your call whether you want to switch it to clean up inline and return nil

}
}()

beReq, err := http.NewRequest(method, location.String(), body)
if err != nil {
return nil, err
}
beReq.Header = header
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't drop this, let the compiler help us

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The compiler already enforces this since we use it in a net.RoundTripperWrapper context, and the net package name now conflicts with the stdlib net. If you feel strongly I can alias the package name and add it back.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not seeing the type assertion elsewhere, I'd like to keep it

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


conn, err = proxy.DialURL(location, h.Transport)
if err != nil {
return conn, fmt.Errorf("error dialing backend: %v", err)
Copy link
Member

@liggitt liggitt Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return nil in error scenarios? otherwise we try to double close it, right? (in the defer here, and in the defer in connectBackendWithRedirects())

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Done.

}

if err = beReq.Write(conn); err != nil {
Copy link
Member

@liggitt liggitt Nov 1, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this consume and close the original req.Body?

Copy link
Contributor

@sttts sttts Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it proxies the original request's body to the backend. So that's the intention, isn't it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, but connectBackend() is called repeatedly with the same request in cases where the backend returns a redirect

return conn, fmt.Errorf("error sending request: %v", err)
}

return conn, err
}

// connectBackendWithRedirects dials the backend and forwards a copy of the client request. If the
// client responds with a redirect, it is followed. The raw response bytes are returned, and should
// be forwarded back to the client.
func (h *UpgradeAwareProxyHandler) connectBackendWithRedirects(req *http.Request) (net.Conn, []byte, error) {
const (
maxRedirects = 10
maxResponseSize = 4096
)
var (
initialReq = req
rawResponse = bytes.NewBuffer(make([]byte, 0, 256))
location = h.Location
intermediateConn net.Conn
err error
)
defer func() {
if intermediateConn != nil {
intermediateConn.Close()
}
}()

redirectLoop:
for redirects := 0; ; redirects++ {
if redirects > maxRedirects {
return nil, nil, fmt.Errorf("too many redirects (%d)", redirects)
}

if redirects == 0 {
intermediateConn, err = h.connectBackend(req.Method, location, req.Header, req.Body)
} else {
// Redirected requests switch to "GET" according to the HTTP spec:
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.3
intermediateConn, err = h.connectBackend("GET", location, initialReq.Header, nil)
}

if err != nil {
return nil, nil, err
}

// Peek at the backend response.
rawResponse.Reset()
respReader := bufio.NewReader(io.TeeReader(
io.LimitReader(intermediateConn, maxResponseSize), // Don't read more than maxResponseSize bytes.
rawResponse)) // Save the raw response.
resp, err := http.ReadResponse(respReader, req)
Copy link
Member

@liggitt liggitt Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how does this react if maxResponseSize falls in the middle of the headers, or in the middle of the response body where the response also sends a content-length header?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the maxResponseSize is hit, ReadResponse would return an error, which would cause the whole response to be sent to the client (i.e. no redirects followed). We don't read the body, so if it lands in the middle of the body it would be a non-issue.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks like it reads the HTTP protocol line, reads the headers, and wraps the remainder in a Body reader in the returned response, so the only issue would be if maxResponseSize fell in the headers portion of the response

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 1024 bytes sufficient for reading the headers?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm not positive, but I think if the maxResponseSize fell at just the right point between headers, it could look like a complete response and drop some headers. The worst case scenario would be that it was a redirect response (302), and the "Location" header was dropped. In this case, an error would be returned to the client instead: 302 response missing Location header.

I think options here are to ignore this case, or always forward the raw response to the client in the case of an error following a redirect.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably fine, maybe worth a comment explaining the assumptions

depending on how the kubelet redirects, I could see the headers exceeding 1024 bytes. The exec/attach command and params are in query params, so if the kubelet just redirects those, the length is user-controlled. If the kubelet consumes those when setting up the shim and redirects with a session/nonce query param, that could also be longish.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I issued a bunch of exec/attach/portforward calls, and logged the length of the responses. They were all < 200 bytes. I arbitrarily picked 1024 as a permissive upper bound that should still have minimal impact on memory if it's hit.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From out of band conversation: this imposes an artificial limit on the length of the exec command, since it is included in the Location header.

Copy link
Member

@liggitt liggitt Nov 3, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

talked offline, those < 200 byte responses are for the current kublet, without redirects. once the kubelet starts redirecting, if it includes the original query (containing exec/attach command and params) that's arbitrary-length user-data. That probably rules out that method. This is fine for alpha, but might require rethinking as the kubelet pieces get built out.

if err != nil {
// Unable to read the backend response; let the client handle it.
glog.Warningf("Error reading backend response: %v", err)
break redirectLoop
}
resp.Body.Close() // Unused.

switch resp.StatusCode {
case http.StatusFound:
// Redirect, continue.
default:
// Don't redirect.
break redirectLoop
}

// Reset the connection.
intermediateConn.Close()
intermediateConn = nil

// Prepare to follow the redirect.
redirectStr := resp.Header.Get("Location")
if redirectStr == "" {
return nil, nil, fmt.Errorf("%d response missing Location header", resp.StatusCode)
}
location, err = h.Location.Parse(redirectStr)
if err != nil {
return nil, nil, fmt.Errorf("malformed Location header: %v", err)
}
}

backendConn := intermediateConn
intermediateConn = nil // Don't close the connection when we return it.
return backendConn, rawResponse.Bytes(), nil
}

func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalTransport http.RoundTripper) http.RoundTripper {
scheme := url.Scheme
host := url.Host
Expand All @@ -213,21 +341,22 @@ func (h *UpgradeAwareProxyHandler) defaultProxyTransport(url *url.URL, internalT

// corsRemovingTransport is a wrapper for an internal transport. It removes CORS headers
// from the internal response.
// Implements pkg/util/net.RoundTripperWrapper
type corsRemovingTransport struct {
http.RoundTripper
}

func (p *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := p.RoundTripper.RoundTrip(req)
var _ = utilnet.RoundTripperWrapper(&corsRemovingTransport{})

func (rt *corsRemovingTransport) RoundTrip(req *http.Request) (*http.Response, error) {
resp, err := rt.RoundTripper.RoundTrip(req)
if err != nil {
return nil, err
}
removeCORSHeaders(resp)
return resp, nil
}

var _ = net.RoundTripperWrapper(&corsRemovingTransport{})

func (rt *corsRemovingTransport) WrappedRoundTripper() http.RoundTripper {
return rt.RoundTripper
}
Expand Down