Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WebSocket Client and V5 RemoteCommand Subprotocol #119157

Merged
merged 1 commit into from
Sep 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions hack/unwanted-dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"github.com/gorilla/mux": "unmaintained, archive mode",
"github.com/gorilla/rpc": "unmaintained, archive mode",
"github.com/gorilla/schema": "unmaintained, archive mode",
"github.com/gorilla/websocket": "unmaintained, archive mode",
"github.com/gregjones/httpcache": "unmaintained, archive mode",
"github.com/grpc-ecosystem/go-grpc-prometheus": "unmaintained, archive mode",
"github.com/grpc-ecosystem/grpc-gateway": "use github.com/grpc-ecosystem/grpc-gateway/v2",
Expand Down Expand Up @@ -143,11 +142,6 @@
"cloud.google.com/go/compute",
"cloud.google.com/go/storage"
],
"github.com/gorilla/websocket": [
"github.com/moby/spdystream",
"github.com/tmc/grpc-websocket-proxy",
"go.etcd.io/etcd/server/v3"
],
"github.com/gregjones/httpcache": [
"k8s.io/client-go"
],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"golang.org/x/net/websocket"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/klog/v2"
)
Expand Down Expand Up @@ -234,10 +235,18 @@ func (conn *Conn) Close() error {
return nil
}

// protocolSupportsStreamClose returns true if the passed protocol
// supports the stream close signal (currently only V5 remotecommand);
// false otherwise.
func protocolSupportsStreamClose(protocol string) bool {
return protocol == remotecommand.StreamProtocolV5Name
}

// handle implements a websocket handler.
func (conn *Conn) handle(ws *websocket.Conn) {
defer conn.Close()
conn.initialize(ws)
supportsStreamClose := protocolSupportsStreamClose(conn.selectedProtocol)

for {
conn.resetTimeout()
Expand All @@ -251,6 +260,21 @@ func (conn *Conn) handle(ws *websocket.Conn) {
if len(data) == 0 {
continue
}
if supportsStreamClose && data[0] == remotecommand.StreamClose {
if len(data) != 2 {
klog.Errorf("Single channel byte should follow stream close signal. Got %d bytes", len(data)-1)
break
} else {
channel := data[1]
liggitt marked this conversation as resolved.
Show resolved Hide resolved
if int(channel) >= len(conn.channels) {
klog.Errorf("Close is targeted for a channel %d that is not valid, possible protocol error", channel)
break
}
klog.V(4).Infof("Received half-close signal from client; close %d stream", channel)
conn.channels[channel].Close() // After first Close, other closes are noop.
liggitt marked this conversation as resolved.
Show resolved Hide resolved
}
continue
}
channel := data[0]
if conn.codec == base64Codec {
channel = channel - '0'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,54 @@ limitations under the License.

// Package wsstream contains utilities for streaming content over WebSockets.
// The Conn type allows callers to multiplex multiple read/write channels over
// a single websocket. The Reader type allows an io.Reader to be copied over
// a websocket channel as binary content.
// a single websocket.
//
// "channel.k8s.io"
//
// The Websocket RemoteCommand subprotocol "channel.k8s.io" prepends each binary message with a
// byte indicating the channel number (zero indexed) the message was sent on. Messages in both
// directions should prefix their messages with this channel byte. Used for remote execution,
// the channel numbers are by convention defined to match the POSIX file-descriptors assigned
// to STDIN, STDOUT, and STDERR (0, 1, and 2). No other conversion is performed on the raw
// subprotocol - writes are sent as they are received by the server.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// READ []byte{1, 10} # receive "\n" on channel 1 (STDOUT)
// CLOSE
//
// "v2.channel.k8s.io"
//
// The second Websocket subprotocol version "v2.channel.k8s.io" is the same as version 1,
// but it is the first "versioned" subprotocol.
//
// "v3.channel.k8s.io"
//
// The third version of the Websocket RemoteCommand subprotocol adds another channel
// for terminal resizing events. This channel is prepended with the byte '3', and it
// transmits two window sizes (encoding TerminalSize struct) with integers in the range
// (0,65536].
//
// "v4.channel.k8s.io"
//
// The fourth version of the Websocket RemoteCommand subprotocol adds a channel for
// errors. This channel returns structured errors containing process exit codes. The
// error is "apierrors.StatusError{}".
//
// "v5.channel.k8s.io"
//
// The fifth version of the Websocket RemoteCommand subprotocol adds a CLOSE signal,
// which is sent as the first byte of the message. The second byte is the channel
// id. This CLOSE signal is handled by the websocket server by closing the stream,
// allowing the other streams to complete transmission if necessary, and gracefully
// shutdown the connection.
//
// Example client session:
//
// CONNECT http://server.com with subprotocol "v5.channel.k8s.io"
// WRITE []byte{0, 102, 111, 111, 10} # send "foo\n" on channel 0 (STDIN)
// WRITE []byte{255, 0} # send CLOSE signal (STDIN)
// CLOSE
package wsstream // import "k8s.io/apimachinery/pkg/util/httpstream/wsstream"
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,22 @@ const (
// adds support for exit codes.
StreamProtocolV4Name = "v4.channel.k8s.io"

// The subprotocol "v5.channel.k8s.io" is used for remote command
// attachment/execution. It is the 5th version of the subprotocol and
// adds support for a CLOSE signal.
StreamProtocolV5Name = "v5.channel.k8s.io"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

long term we may think in make it official https://www.iana.org/assignments/websocket/websocket.xml#subprotocol-name, the protocol is not very complicated


NonZeroExitCodeReason = metav1.StatusReason("NonZeroExitCode")
ExitCodeCauseType = metav1.CauseType("ExitCode")

// RemoteCommand stream identifiers. The first three identifiers (for STDIN,
// STDOUT, STDERR) are the same as their file descriptors.
StreamStdIn = 0
StreamStdOut = 1
StreamStdErr = 2
StreamErr = 3
StreamResize = 4
StreamClose = 255
)

var SupportedStreamingProtocols = []string{StreamProtocolV4Name, StreamProtocolV3Name, StreamProtocolV2Name, StreamProtocolV1Name}
1 change: 1 addition & 0 deletions staging/src/k8s.io/cli-runtime/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/google/go-cmp v0.5.9
github.com/google/gofuzz v1.2.0
github.com/google/uuid v1.3.0
github.com/gorilla/websocket v1.4.2
seans3 marked this conversation as resolved.
Show resolved Hide resolved
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7
github.com/imdario/mergo v0.3.6
github.com/peterbourgon/diskv v2.0.1+incompatible
Expand Down
1 change: 1 addition & 0 deletions staging/src/k8s.io/client-go/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

124 changes: 0 additions & 124 deletions staging/src/k8s.io/client-go/tools/remotecommand/remotecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,10 @@ package remotecommand

import (
"context"
"fmt"
"io"
"net/http"
"net/url"

"k8s.io/klog/v2"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/apimachinery/pkg/util/remotecommand"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
)

// StreamOptions holds information pertaining to the current streaming session:
Expand Down Expand Up @@ -63,120 +56,3 @@ type streamCreator interface {
type streamProtocolHandler interface {
stream(conn streamCreator) error
}

// streamExecutor handles transporting standard shell streams over an httpstream connection.
type streamExecutor struct {
upgrader spdy.Upgrader
transport http.RoundTripper

method string
url *url.URL
protocols []string
}

// NewSPDYExecutor connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams.
func NewSPDYExecutor(config *restclient.Config, method string, url *url.URL) (Executor, error) {
wrapper, upgradeRoundTripper, err := spdy.RoundTripperFor(config)
if err != nil {
return nil, err
}
return NewSPDYExecutorForTransports(wrapper, upgradeRoundTripper, method, url)
}

// NewSPDYExecutorForTransports connects to the provided server using the given transport,
// upgrades the response using the given upgrader to multiplexed bidirectional streams.
func NewSPDYExecutorForTransports(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL) (Executor, error) {
return NewSPDYExecutorForProtocols(
transport, upgrader, method, url,
remotecommand.StreamProtocolV4Name,
remotecommand.StreamProtocolV3Name,
remotecommand.StreamProtocolV2Name,
remotecommand.StreamProtocolV1Name,
)
}

// NewSPDYExecutorForProtocols connects to the provided server and upgrades the connection to
// multiplexed bidirectional streams using only the provided protocols. Exposed for testing, most
// callers should use NewSPDYExecutor or NewSPDYExecutorForTransports.
func NewSPDYExecutorForProtocols(transport http.RoundTripper, upgrader spdy.Upgrader, method string, url *url.URL, protocols ...string) (Executor, error) {
return &streamExecutor{
upgrader: upgrader,
transport: transport,
method: method,
url: url,
protocols: protocols,
}, nil
}

// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(options StreamOptions) error {
return e.StreamWithContext(context.Background(), options)
}

// newConnectionAndStream creates a new SPDY connection and a stream protocol handler upon it.
func (e *streamExecutor) newConnectionAndStream(ctx context.Context, options StreamOptions) (httpstream.Connection, streamProtocolHandler, error) {
req, err := http.NewRequestWithContext(ctx, e.method, e.url.String(), nil)
if err != nil {
return nil, nil, fmt.Errorf("error creating request: %v", err)
}

conn, protocol, err := spdy.Negotiate(
e.upgrader,
&http.Client{Transport: e.transport},
req,
e.protocols...,
)
if err != nil {
return nil, nil, err
}

var streamer streamProtocolHandler

switch protocol {
case remotecommand.StreamProtocolV4Name:
streamer = newStreamProtocolV4(options)
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = newStreamProtocolV2(options)
case "":
klog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = newStreamProtocolV1(options)
}

return conn, streamer, nil
}

// StreamWithContext opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects or the context is done.
func (e *streamExecutor) StreamWithContext(ctx context.Context, options StreamOptions) error {
conn, streamer, err := e.newConnectionAndStream(ctx, options)
if err != nil {
return err
}
defer conn.Close()

panicChan := make(chan any, 1)
errorChan := make(chan error, 1)
go func() {
defer func() {
if p := recover(); p != nil {
panicChan <- p
}
}()
errorChan <- streamer.stream(conn)
}()

select {
case p := <-panicChan:
panic(p)
case err := <-errorChan:
return err
case <-ctx.Done():
return ctx.Err()
}
}
Loading