Skip to content

Commit

Permalink
portforward: tunnel spdy through websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
seans3 committed Mar 4, 2024
1 parent 89cbd94 commit 8b447d8
Show file tree
Hide file tree
Showing 20 changed files with 1,560 additions and 19 deletions.
9 changes: 9 additions & 0 deletions pkg/features/kube_features.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,13 @@ const (
// Enable users to specify when a Pod is ready for scheduling.
PodSchedulingReadiness featuregate.Feature = "PodSchedulingReadiness"

// owner: @seans3
// kep: http://kep.k8s.io/4006
// alpha: v1.30
//
// Enables PortForward to be proxied with a websocket client
PortForwardWebsockets featuregate.Feature = "PortForwardWebsockets"

// owner: @jessfraz
// alpha: v1.12
//
Expand Down Expand Up @@ -1101,6 +1108,8 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS

PodSchedulingReadiness: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, // GA in 1.30; remove in 1.32

PortForwardWebsockets: {Default: false, PreRelease: featuregate.Alpha},

ProcMountType: {Default: false, PreRelease: featuregate.Alpha},

QOSReserved: {Default: false, PreRelease: featuregate.Alpha},
Expand Down
7 changes: 6 additions & 1 deletion pkg/registry/core/pod/rest/subresources.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,12 @@ func (r *PortForwardREST) Connect(ctx context.Context, name string, opts runtime
if err != nil {
return nil, err
}
return newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder), nil
handler := newThrottledUpgradeAwareProxyHandler(location, transport, false, true, responder)
if utilfeature.DefaultFeatureGate.Enabled(features.PortForwardWebsockets) {
tunnelingHandler := translator.NewTunnelingHandler(handler)
handler = translator.NewTranslatingHandler(handler, tunnelingHandler, wsstream.IsWebSocketRequestWithTunnelingProtocol)
}
return handler, nil
}

func newThrottledUpgradeAwareProxyHandler(location *url.URL, transport http.RoundTripper, wrapTransport, upgradeRequired bool, responder rest.Responder) http.Handler {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"golang.org/x/net/websocket"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/portforward"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
Expand Down Expand Up @@ -106,6 +107,23 @@ func IsWebSocketRequestWithStreamCloseProtocol(req *http.Request) bool {
return false
}

// IsWebSocketRequestWithTunnelingProtocol returns true if the request contains headers
// identifying that it is requesting a websocket upgrade with a tunneling protocol;
// false otherwise.
func IsWebSocketRequestWithTunnelingProtocol(req *http.Request) bool {
if !IsWebSocketRequest(req) {
return false
}
requestedProtocols := strings.TrimSpace(req.Header.Get(WebSocketProtocolHeader))
for _, requestedProtocol := range strings.Split(requestedProtocols, ",") {
if protocolSupportsWebsocketTunneling(strings.TrimSpace(requestedProtocol)) {
return true
}
}

return false
}

// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// read and write deadlines are pushed every time a new message is received.
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
Expand Down Expand Up @@ -301,6 +319,12 @@ func protocolSupportsStreamClose(protocol string) bool {
return protocol == remotecommand.StreamProtocolV5Name
}

// protocolSupportsWebsocketTunneling returns true if the passed protocol
// is a tunneled Kubernetes spdy protocol; false otherwise.
func protocolSupportsWebsocketTunneling(protocol string) bool {
return strings.HasPrefix(protocol, portforward.WebsocketsSPDYTunnelingPrefix) && strings.HasSuffix(protocol, portforward.KubernetesSuffix)
}

// handle implements a websocket handler.
func (conn *Conn) handle(ws *websocket.Conn) {
conn.initialize(ws)
Expand Down
24 changes: 24 additions & 0 deletions staging/src/k8s.io/apimachinery/pkg/util/portforward/constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package portforward

const (
PortForwardV1Name = "portforward.k8s.io"
WebsocketsSPDYTunnelingPrefix = "SPDY/3.1+"
KubernetesSuffix = ".k8s.io"
WebsocketsSPDYTunnelingPortForwardV1 = WebsocketsSPDYTunnelingPrefix + PortForwardV1Name
)
22 changes: 18 additions & 4 deletions staging/src/k8s.io/apimachinery/pkg/util/proxy/upgradeaware.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

"github.com/mxk/go-flowrate/flowrate"

"k8s.io/klog/v2"
)

Expand Down Expand Up @@ -336,6 +337,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
clone.Host = h.Location.Host
}
clone.URL = &location
klog.V(6).Infof("UpgradeAwareProxy: dialing for SPDY upgrade with headers: %v", clone.Header)
backendConn, err = h.DialForUpgrade(clone)
if err != nil {
klog.V(6).Infof("Proxy connection error: %v", err)
Expand Down Expand Up @@ -370,13 +372,13 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
// hijacking should be the last step in the upgrade.
requestHijacker, ok := w.(http.Hijacker)
if !ok {
klog.V(6).Infof("Unable to hijack response writer: %T", w)
klog.Errorf("Unable to hijack response writer: %T", w)
h.Responder.Error(w, req, fmt.Errorf("request connection cannot be hijacked: %T", w))
return true
}
requestHijackedConn, _, err := requestHijacker.Hijack()
if err != nil {
klog.V(6).Infof("Unable to hijack response: %v", err)
klog.Errorf("Unable to hijack response: %v", err)
h.Responder.Error(w, req, fmt.Errorf("error hijacking connection: %v", err))
return true
}
Expand Down Expand Up @@ -420,7 +422,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
} else {
writer = backendConn
}
_, err := io.Copy(writer, requestHijackedConn)
_, err := io.Copy(writer, &loggingReader{name: "client->backend", delegate: requestHijackedConn})
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from client to backend: %v", err)
}
Expand All @@ -434,7 +436,7 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
} else {
reader = backendConn
}
_, err := io.Copy(requestHijackedConn, reader)
_, err := io.Copy(requestHijackedConn, &loggingReader{name: "backend->client", delegate: reader})
if err != nil && !strings.Contains(err.Error(), "use of closed network connection") {
klog.Errorf("Error proxying data from backend to client: %v", err)
}
Expand All @@ -452,6 +454,18 @@ func (h *UpgradeAwareHandler) tryUpgrade(w http.ResponseWriter, req *http.Reques
return true
}

// loggingReader logs the bytes read from the "delegate" with a "name" prefix.
type loggingReader struct {
name string
delegate io.Reader
}

func (l *loggingReader) Read(p []byte) (int, error) {
n, err := l.delegate.Read(p)
klog.V(8).Infof("%s: %d bytes, err=%v, bytes=% X", l.name, n, err, p[:n])
return n, err
}

// FIXME: Taken from net/http/httputil/reverseproxy.go as singleJoiningSlash is not exported to be re-used.
// See-also: https://github.com/golang/go/issues/44290
func singleJoiningSlash(a, b string) string {
Expand Down
2 changes: 1 addition & 1 deletion staging/src/k8s.io/apiserver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ require (
github.com/google/go-cmp v0.6.0
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.5.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822
github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f
Expand Down Expand Up @@ -77,7 +78,6 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.0.1 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
Expand Down

0 comments on commit 8b447d8

Please sign in to comment.