Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add websocket support for port forwarding #33684

Merged
merged 3 commits into from Feb 2, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/openapi-spec/swagger.json
Expand Up @@ -3792,6 +3792,13 @@
"name": "namespace",
"in": "path",
"required": true
},
{
"uniqueItems": true,
"type": "integer",
"description": "List of ports to forward Required when using WebSockets",
"name": "ports",
"in": "query"
}
]
},
Expand Down
16 changes: 16 additions & 0 deletions api/swagger-spec/v1.json
Expand Up @@ -9095,6 +9095,14 @@
"summary": "connect GET requests to portforward of Pod",
"nickname": "connectGetNamespacedPodPortforward",
"parameters": [
{
"type": "integer",
"paramType": "query",
"name": "ports",
"description": "List of ports to forward Required when using WebSockets",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",
Expand Down Expand Up @@ -9125,6 +9133,14 @@
"summary": "connect POST requests to portforward of Pod",
"nickname": "connectPostNamespacedPodPortforward",
"parameters": [
{
"type": "integer",
"paramType": "query",
"name": "ports",
"description": "List of ports to forward Required when using WebSockets",
"required": false,
"allowMultiple": false
},
{
"type": "string",
"paramType": "path",
Expand Down
Expand Up @@ -27,7 +27,7 @@ spec:
command:
- /bin/sh
- -c
- "for i in gcr.io/google_containers/busybox gcr.io/google_containers/busybox:1.24 gcr.io/google_containers/dnsutils:e2e gcr.io/google_containers/eptest:0.1 gcr.io/google_containers/fakegitserver:0.1 gcr.io/google_containers/hostexec:1.2 gcr.io/google_containers/iperf:e2e gcr.io/google_containers/jessie-dnsutils:e2e gcr.io/google_containers/liveness:e2e gcr.io/google_containers/mounttest:0.7 gcr.io/google_containers/mounttest-user:0.3 gcr.io/google_containers/netexec:1.4 gcr.io/google_containers/netexec:1.7 gcr.io/google_containers/nettest:1.7 gcr.io/google_containers/nettest:1.8 gcr.io/google_containers/nginx-slim:0.7 gcr.io/google_containers/nginx-slim:0.8 gcr.io/google_containers/n-way-http:1.0 gcr.io/google_containers/pause:2.0 gcr.io/google_containers/pause-amd64:3.0 gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab gcr.io/google_containers/portforwardtester:1.0 gcr.io/google_containers/redis:e2e gcr.io/google_containers/resource_consumer:beta4 gcr.io/google_containers/resource_consumer/controller:beta4 gcr.io/google_containers/serve_hostname:v1.4 gcr.io/google_containers/test-webserver:e2e gcr.io/google_containers/ubuntu:14.04 gcr.io/google_containers/update-demo:kitten gcr.io/google_containers/update-demo:nautilus gcr.io/google_containers/volume-ceph:0.1 gcr.io/google_containers/volume-gluster:0.2 gcr.io/google_containers/volume-iscsi:0.1 gcr.io/google_containers/volume-nfs:0.6 gcr.io/google_containers/volume-rbd:0.1 gcr.io/google_samples/gb-redisslave:v1 gcr.io/google_containers/redis:v1; do echo $(date '+%X') pulling $i; docker pull $i 1>/dev/null; done; exit 0;"
- "for i in gcr.io/google_containers/busybox gcr.io/google_containers/busybox:1.24 gcr.io/google_containers/dnsutils:e2e gcr.io/google_containers/eptest:0.1 gcr.io/google_containers/fakegitserver:0.1 gcr.io/google_containers/hostexec:1.2 gcr.io/google_containers/iperf:e2e gcr.io/google_containers/jessie-dnsutils:e2e gcr.io/google_containers/liveness:e2e gcr.io/google_containers/mounttest:0.7 gcr.io/google_containers/mounttest-user:0.3 gcr.io/google_containers/netexec:1.4 gcr.io/google_containers/netexec:1.7 gcr.io/google_containers/nettest:1.7 gcr.io/google_containers/nettest:1.8 gcr.io/google_containers/nginx-slim:0.7 gcr.io/google_containers/nginx-slim:0.8 gcr.io/google_containers/n-way-http:1.0 gcr.io/google_containers/pause:2.0 gcr.io/google_containers/pause-amd64:3.0 gcr.io/google_containers/porter:cd5cb5791ebaa8641955f0e8c2a9bed669b1eaab gcr.io/google_containers/portforwardtester:1.2 gcr.io/google_containers/redis:e2e gcr.io/google_containers/resource_consumer:beta4 gcr.io/google_containers/resource_consumer/controller:beta4 gcr.io/google_containers/serve_hostname:v1.4 gcr.io/google_containers/test-webserver:e2e gcr.io/google_containers/ubuntu:14.04 gcr.io/google_containers/update-demo:kitten gcr.io/google_containers/update-demo:nautilus gcr.io/google_containers/volume-ceph:0.1 gcr.io/google_containers/volume-gluster:0.2 gcr.io/google_containers/volume-iscsi:0.1 gcr.io/google_containers/volume-nfs:0.6 gcr.io/google_containers/volume-rbd:0.1 gcr.io/google_samples/gb-redisslave:v1 gcr.io/google_containers/redis:v1; do echo $(date '+%X') pulling $i; docker pull $i 1>/dev/null; done; exit 0;"
securityContext:
privileged: true
volumeMounts:
Expand Down
18 changes: 17 additions & 1 deletion docs/api-reference/v1/operations.html
Expand Up @@ -9047,6 +9047,14 @@ <h4 id="_parameters_63">Parameters</h4>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">QueryParameter</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ports</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">List of ports to forward Required when using WebSockets</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">false</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">integer (int32)</p></td>
<td class="tableblock halign-left valign-top"></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">PathParameter</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">namespace</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">object name and auth scope, such as for teams and projects</p></td>
Expand Down Expand Up @@ -9152,6 +9160,14 @@ <h4 id="_parameters_64">Parameters</h4>
</thead>
<tbody>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">QueryParameter</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">ports</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">List of ports to forward Required when using WebSockets</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">false</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">integer (int32)</p></td>
<td class="tableblock halign-left valign-top"></td>
</tr>
<tr>
<td class="tableblock halign-left valign-top"><p class="tableblock">PathParameter</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">namespace</p></td>
<td class="tableblock halign-left valign-top"><p class="tableblock">object name and auth scope, such as for teams and projects</p></td>
Expand Down Expand Up @@ -33308,7 +33324,7 @@ <h4 id="_tags_247">Tags</h4>
</div>
<div id="footer">
<div id="footer-text">
Last updated 2017-01-06 18:13:51 UTC
Last updated 2017-02-01 12:44:12 UTC
</div>
</div>
</body>
Expand Down
22 changes: 11 additions & 11 deletions pkg/client/tests/portfoward_test.go
Expand Up @@ -42,16 +42,16 @@ import (
type fakePortForwarder struct {
lock sync.Mutex
// stores data expected from the stream per port
expected map[uint16]string
expected map[int32]string
// stores data received from the stream per port
received map[uint16]string
received map[int32]string
// data to be sent to the stream per port
send map[uint16]string
send map[int32]string
}

var _ portforward.PortForwarder = &fakePortForwarder{}

func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16, stream io.ReadWriteCloser) error {
func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port int32, stream io.ReadWriteCloser) error {
defer stream.Close()

// read from the client
Expand All @@ -77,14 +77,14 @@ func (pf *fakePortForwarder) PortForward(name string, uid types.UID, port uint16

// fakePortForwardServer creates an HTTP server that can handle port forwarding
// requests.
func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[uint16]string) http.HandlerFunc {
func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedFromClient map[int32]string) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
pf := &fakePortForwarder{
expected: expectedFromClient,
received: make(map[uint16]string),
received: make(map[int32]string),
send: serverSends,
}
portforward.ServePortForward(w, req, pf, "pod", "uid", 0, 10*time.Second)
portforward.ServePortForward(w, req, pf, "pod", "uid", nil, 0, 10*time.Second, portforward.SupportedProtocols)

for port, expected := range expectedFromClient {
actual, ok := pf.received[port]
Expand All @@ -109,19 +109,19 @@ func fakePortForwardServer(t *testing.T, testName string, serverSends, expectedF
func TestForwardPorts(t *testing.T) {
tests := map[string]struct {
ports []string
clientSends map[uint16]string
serverSends map[uint16]string
clientSends map[int32]string
serverSends map[int32]string
}{
"forward 1 port with no data either direction": {
ports: []string{"5000"},
},
"forward 2 ports with bidirectional data": {
ports: []string{"5001", "6000"},
clientSends: map[uint16]string{
clientSends: map[int32]string{
5001: "abcd",
6000: "ghij",
},
serverSends: map[uint16]string{
serverSends: map[int32]string{
5001: "1234",
6000: "5678",
},
Expand Down
8 changes: 6 additions & 2 deletions pkg/genericapiserver/endpoints/installer.go
Expand Up @@ -1047,10 +1047,14 @@ func typeToJSON(typeName string) string {
return "string"
case "byte", "*byte":
return "string"

// TODO: Fix these when go-restful supports a way to specify an array query param:
// https://github.com/emicklei/go-restful/issues/225
case "[]string", "[]*string":
// TODO: Fix this when go-restful supports a way to specify an array query param:
// https://github.com/emicklei/go-restful/issues/225
return "string"
case "[]int32", "[]*int32":
return "integer"

default:
return typeName
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/kubelet/BUILD
Expand Up @@ -72,6 +72,7 @@ go_library(
"//pkg/kubelet/rkt:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/server/streaming:go_default_library",
Expand Down Expand Up @@ -177,6 +178,7 @@ go_test(
"//pkg/kubelet/prober/results:go_default_library",
"//pkg/kubelet/prober/testing:go_default_library",
"//pkg/kubelet/secret:go_default_library",
"//pkg/kubelet/server/portforward:go_default_library",
"//pkg/kubelet/server/remotecommand:go_default_library",
"//pkg/kubelet/server/stats:go_default_library",
"//pkg/kubelet/status:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/container/runtime.go
Expand Up @@ -130,7 +130,7 @@ type DirectStreamingRuntime interface {
// tty.
ExecInContainer(containerID ContainerID, cmd []string, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size, timeout time.Duration) error
// Forward the specified port from the specified pod to the stream.
PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error
PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error
// ContainerAttach encapsulates the attaching to containers for testability
ContainerAttacher
}
Expand All @@ -141,7 +141,7 @@ type DirectStreamingRuntime interface {
type IndirectStreamingRuntime interface {
GetExec(id ContainerID, cmd []string, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetAttach(id ContainerID, stdin, stdout, stderr, tty bool) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error)
GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error)
}

type ImageService interface {
Expand Down
6 changes: 3 additions & 3 deletions pkg/kubelet/container/testing/fake_runtime.go
Expand Up @@ -73,7 +73,7 @@ type FakeDirectStreamingRuntime struct {
TTY bool
// Port-forward args
Pod *Pod
Port uint16
Port int32
Stream io.ReadWriteCloser
}
}
Expand Down Expand Up @@ -394,7 +394,7 @@ func (f *FakeRuntime) RemoveImage(image ImageSpec) error {
return f.Err
}

func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port uint16, stream io.ReadWriteCloser) error {
func (f *FakeDirectStreamingRuntime) PortForward(pod *Pod, port int32, stream io.ReadWriteCloser) error {
f.Lock()
defer f.Unlock()

Expand Down Expand Up @@ -471,7 +471,7 @@ func (f *FakeIndirectStreamingRuntime) GetAttach(id ContainerID, stdin, stdout,
return &url.URL{Host: FakeHost}, f.Err
}

func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) {
func (f *FakeIndirectStreamingRuntime) GetPortForward(podName, podNamespace string, podUID types.UID, ports []int32) (*url.URL, error) {
f.Lock()
defer f.Unlock()

Expand Down
2 changes: 1 addition & 1 deletion pkg/kubelet/dockershim/docker_streaming.go
Expand Up @@ -64,7 +64,7 @@ func (r *streamingRuntime) PortForward(podSandboxID string, port int32, stream i
if port < 0 || port > math.MaxUint16 {
return fmt.Errorf("invalid port %d", port)
}
return dockertools.PortForward(r.client, podSandboxID, uint16(port), stream)
return dockertools.PortForward(r.client, podSandboxID, port, stream)
}

// ExecSync executes a command in the container, and returns the stdout output.
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/dockertools/docker_manager.go
Expand Up @@ -1353,7 +1353,7 @@ func noPodInfraContainerError(podName, podNamespace string) error {
// - match cgroups of container
// - should we support nsenter + socat on the host? (current impl)
// - should we support nsenter + socat in a container, running with elevated privs and --pid=host?
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream io.ReadWriteCloser) error {
func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port int32, stream io.ReadWriteCloser) error {
podInfraContainer := pod.FindContainerByName(PodInfraContainerName)
if podInfraContainer == nil {
return noPodInfraContainerError(pod.Name, pod.Namespace)
Expand All @@ -1369,7 +1369,7 @@ func (dm *DockerManager) UpdatePodCIDR(podCIDR string) error {
}

// Temporarily export this function to share with dockershim.
func PortForward(client DockerInterface, podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error {
func PortForward(client DockerInterface, podInfraContainerID string, port int32, stream io.ReadWriteCloser) error {
container, err := client.InspectContainer(podInfraContainerID)
if err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions pkg/kubelet/kubelet.go
Expand Up @@ -2171,9 +2171,10 @@ func getStreamingConfig(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps
BaseURL: &url.URL{
Path: "/cri/",
},
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedProtocols: streaming.DefaultConfig.SupportedProtocols,
StreamIdleTimeout: kubeCfg.StreamingConnectionIdleTimeout.Duration,
StreamCreationTimeout: streaming.DefaultConfig.StreamCreationTimeout,
SupportedRemoteCommandProtocols: streaming.DefaultConfig.SupportedRemoteCommandProtocols,
SupportedPortForwardProtocols: streaming.DefaultConfig.SupportedPortForwardProtocols,
}
if kubeDeps.TLSOptions != nil {
config.TLSConfig = kubeDeps.TLSOptions.Config
Expand Down
7 changes: 4 additions & 3 deletions pkg/kubelet/kubelet_pods.go
Expand Up @@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/envvars"
"k8s.io/kubernetes/pkg/kubelet/images"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/kubelet/status"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
Expand Down Expand Up @@ -1394,7 +1395,7 @@ func (kl *Kubelet) AttachContainer(podFullName string, podUID types.UID, contain

// PortForward connects to the pod's port and copies data between the port
// and the stream.
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port uint16, stream io.ReadWriteCloser) error {
func (kl *Kubelet) PortForward(podFullName string, podUID types.UID, port int32, stream io.ReadWriteCloser) error {
streamingRuntime, ok := kl.containerRuntime.(kubecontainer.DirectStreamingRuntime)
if !ok {
return fmt.Errorf("streaming methods not supported by runtime")
Expand Down Expand Up @@ -1467,7 +1468,7 @@ func (kl *Kubelet) GetAttach(podFullName string, podUID types.UID, containerName
}

// GetPortForward gets the URL the port-forward will be served from, or nil if the Kubelet will serve it.
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID) (*url.URL, error) {
func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID, portForwardOpts portforward.V4Options) (*url.URL, error) {
switch streamingRuntime := kl.containerRuntime.(type) {
case kubecontainer.DirectStreamingRuntime:
// Kubelet will serve the attach directly.
Expand All @@ -1484,7 +1485,7 @@ func (kl *Kubelet) GetPortForward(podName, podNamespace string, podUID types.UID
return nil, fmt.Errorf("pod not found (%q)", podFullName)
}

return streamingRuntime.GetPortForward(podName, podNamespace, podUID)
return streamingRuntime.GetPortForward(podName, podNamespace, podUID, portForwardOpts.Ports)
default:
return nil, fmt.Errorf("container runtime does not support port-forward")
}
Expand Down
9 changes: 5 additions & 4 deletions pkg/kubelet/kubelet_pods_test.go
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/api/v1"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
"k8s.io/kubernetes/pkg/kubelet/server/portforward"
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
)

Expand Down Expand Up @@ -1607,7 +1608,7 @@ func TestPortForward(t *testing.T) {
podName = "podFoo"
podNamespace = "nsFoo"
podUID types.UID = "12345678"
port uint16 = 5000
port int32 = 5000
)
var (
stream = &fakeReadWriteCloser{}
Expand Down Expand Up @@ -1646,7 +1647,7 @@ func TestPortForward(t *testing.T) {
podFullName := kubecontainer.GetPodFullName(podWithUidNameNs(podUID, tc.podName, podNamespace))
{ // No streaming case
description := "no streaming - " + tc.description
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.Error(t, err, description)
assert.Nil(t, redirect, description)

Expand All @@ -1658,7 +1659,7 @@ func TestPortForward(t *testing.T) {
fakeRuntime := &containertest.FakeDirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime

redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
assert.NoError(t, err, description)
assert.Nil(t, redirect, description)

Expand All @@ -1677,7 +1678,7 @@ func TestPortForward(t *testing.T) {
fakeRuntime := &containertest.FakeIndirectStreamingRuntime{FakeRuntime: testKubelet.fakeRuntime}
kubelet.containerRuntime = fakeRuntime

redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID)
redirect, err := kubelet.GetPortForward(tc.podName, podNamespace, podUID, portforward.V4Options{})
if tc.expectError {
assert.Error(t, err, description)
} else {
Expand Down
4 changes: 2 additions & 2 deletions pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
Expand Up @@ -237,17 +237,17 @@ func (m *kubeGenericRuntimeManager) getSandboxIDByPodUID(podUID kubetypes.UID, s
}

// GetPortForward gets the endpoint the runtime will serve the port-forward request from.
func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID) (*url.URL, error) {
func (m *kubeGenericRuntimeManager) GetPortForward(podName, podNamespace string, podUID kubetypes.UID, ports []int32) (*url.URL, error) {
sandboxIDs, err := m.getSandboxIDByPodUID(podUID, nil)
if err != nil {
return nil, fmt.Errorf("failed to find sandboxID for pod %s: %v", format.PodDesc(podName, podNamespace, podUID), err)
}
if len(sandboxIDs) == 0 {
return nil, fmt.Errorf("failed to find sandboxID for pod %s", format.PodDesc(podName, podNamespace, podUID))
}
// TODO: Port is unused for now, but we may need it in the future.
req := &runtimeapi.PortForwardRequest{
PodSandboxId: sandboxIDs[0],
Port: ports,
}
resp, err := m.runtimeService.PortForward(req)
if err != nil {
Expand Down