Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support terminal resizing for exec/attach/run #25273

Merged
merged 1 commit into from
Jul 14, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 4 additions & 3 deletions cmd/hyperkube/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,15 @@ limitations under the License.
package main

import (
"os"

"github.com/docker/docker/pkg/term"
"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
)

func NewKubectlServer() *Server {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not a problem on all the clients that have to deal with exec? Why specifically here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does any other code have to take this into account?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it here for hyperkube and also below for kubectl. I did it here because I figured the top-down approach made the most sense, instead of specifically going into the exec/run/attach commands and ignoring what was passed in for stdout and overriding it with the value from term.StdStreams.

stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
localFlags := cmd.LocalFlags()
localFlags.SetInterspersed(false)

Expand Down
6 changes: 4 additions & 2 deletions cmd/kubectl/app/kubectl.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ limitations under the License.
package app

import (
"os"
"github.com/docker/docker/pkg/term"

"k8s.io/kubernetes/pkg/kubectl/cmd"
cmdutil "k8s.io/kubernetes/pkg/kubectl/cmd/util"
Expand All @@ -28,6 +28,8 @@ WARNING: this logic is duplicated, with minor changes, in cmd/hyperkube/kubectl.
Any salient changes here will need to be manually reflected in that file.
*/
func Run() error {
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), os.Stdin, os.Stdout, os.Stderr)
// need to use term.StdStreams to get the right IO refs on Windows
stdin, stdout, stderr := term.StdStreams()
cmd := cmd.NewKubectlCommand(cmdutil.NewFactory(nil), stdin, stdout, stderr)
return cmd.Execute()
}
2 changes: 2 additions & 0 deletions pkg/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -2751,6 +2751,8 @@ const (
StreamTypeData = "data"
// Value for streamType header for error stream
StreamTypeError = "error"
// Value for streamType header for terminal resize stream
StreamTypeResize = "resize"

// Name of header that specifies the port being forwarded
PortHeader = "port"
Expand Down
41 changes: 25 additions & 16 deletions pkg/client/unversioned/remotecommand/remotecommand.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,28 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/httpstream/spdy"
"k8s.io/kubernetes/pkg/util/term"
)

// StreamOptions holds information pertaining to the current streaming session: supported stream
// protocols, input/output streams, if the client is requesting a TTY, and a terminal size queue to
// support terminal resizing.
type StreamOptions struct {
SupportedProtocols []string
Stdin io.Reader
Stdout io.Writer
Stderr io.Writer
Tty bool
TerminalSizeQueue term.TerminalSizeQueue
}

// Executor is an interface for transporting shell-style streams.
type Executor interface {
// Stream initiates the transport of the standard shell streams. It will transport any
// non-nil stream to a remote system, and return an error if a problem occurs. If tty
// is set, the stderr stream is not used (raw TTY manages stdout and stderr over the
// stdout stream).
Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error
Stream(options StreamOptions) error
}

// StreamExecutor supports the ability to dial an httpstream connection and the ability to
Expand Down Expand Up @@ -129,14 +142,18 @@ func (e *streamExecutor) Dial(protocols ...string) (httpstream.Connection, strin
return conn, resp.Header.Get(httpstream.HeaderProtocolVersion), nil
}

type streamCreator interface {
CreateStream(headers http.Header) (httpstream.Stream, error)
}

type streamProtocolHandler interface {
stream(httpstream.Connection) error
stream(conn streamCreator) error
}

// Stream opens a protocol streamer to the server and streams until a client closes
// the connection or the server disconnects.
func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, stdout, stderr io.Writer, tty bool) error {
conn, protocol, err := e.Dial(supportedProtocols...)
func (e *streamExecutor) Stream(options StreamOptions) error {
conn, protocol, err := e.Dial(options.SupportedProtocols...)
if err != nil {
return err
}
Expand All @@ -145,23 +162,15 @@ func (e *streamExecutor) Stream(supportedProtocols []string, stdin io.Reader, st
var streamer streamProtocolHandler

switch protocol {
case remotecommand.StreamProtocolV3Name:
streamer = newStreamProtocolV3(options)
case remotecommand.StreamProtocolV2Name:
streamer = &streamProtocolV2{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV2(options)
case "":
glog.V(4).Infof("The server did not negotiate a streaming protocol version. Falling back to %s", remotecommand.StreamProtocolV1Name)
fallthrough
case remotecommand.StreamProtocolV1Name:
streamer = &streamProtocolV1{
stdin: stdin,
stdout: stdout,
stderr: stderr,
tty: tty,
}
streamer = newStreamProtocolV1(options)
}

return streamer.stream(conn)
Expand Down
17 changes: 12 additions & 5 deletions pkg/client/unversioned/remotecommand/remotecommand_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/server/remotecommand"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/httpstream"
"k8s.io/kubernetes/pkg/util/term"
)

type fakeExecutor struct {
Expand All @@ -52,11 +53,11 @@ type fakeExecutor struct {
exec bool
}

func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) ExecInContainer(name string, uid types.UID, container string, cmd []string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, cmd, in, out, err, tty)
}

func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool) error {
func (ex *fakeExecutor) AttachContainer(name string, uid types.UID, container string, in io.Reader, out, err io.WriteCloser, tty bool, resize <-chan term.Size) error {
return ex.run(name, uid, container, nil, in, out, err, tty)
}

Expand Down Expand Up @@ -253,7 +254,13 @@ func TestStream(t *testing.T) {
t.Errorf("%s: unexpected error: %v", name, err)
continue
}
err = e.Stream(testCase.ClientProtocols, streamIn, streamOut, streamErr, testCase.Tty)
err = e.Stream(StreamOptions{
SupportedProtocols: testCase.ClientProtocols,
Stdin: streamIn,
Stdout: streamOut,
Stderr: streamErr,
Tty: testCase.Tty,
})
hasErr := err != nil

if len(testCase.Error) > 0 {
Expand All @@ -277,13 +284,13 @@ func TestStream(t *testing.T) {

if len(testCase.Stdout) > 0 {
if e, a := strings.Repeat(testCase.Stdout, testCase.MessageCount), localOut; e != a.String() {
t.Errorf("%s: expected stdout data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stdout data %q, got %q", name, e, a)
}
}

if testCase.Stderr != "" {
if e, a := strings.Repeat(testCase.Stderr, testCase.MessageCount), localErr; e != a.String() {
t.Errorf("%s: expected stderr data '%s', got '%s'", name, e, a)
t.Errorf("%s: expected stderr data %q, got %q", name, e, a)
}
}

Expand Down
62 changes: 33 additions & 29 deletions pkg/client/unversioned/remotecommand/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,27 @@ import (
)

// streamProtocolV1 implements the first version of the streaming exec & attach
// protocol. This version has some bugs, such as not being able to detecte when
// protocol. This version has some bugs, such as not being able to detect when
// non-interactive stdin data has ended. See http://issues.k8s.io/13394 and
// http://issues.k8s.io/13395 for more details.
type streamProtocolV1 struct {
stdin io.Reader
stdout io.Writer
stderr io.Writer
tty bool
StreamOptions

errorStream httpstream.Stream
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wasn't the original point of the factoring so that this code wasn't tightly coupled to httpstream, so we could move to http2 at some point?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or is this code already coupled to httpstream anyway, and we'd just have a new protocol?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent some time a few months ago trying to do an http2 implementation of the httpstream interfaces. I'm not sure it's going to be possible, or at least it won't be really easy. It's been a few months since I wrote this PR, so I don't remember the exact reason why I switched from io.Reader/Writer to httpstream.Stream. I'd have to go back and look.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok I remember why the v1 protocol has these as httpstream.Stream and not io.Reader etc - it's because the v1 protocol calls Reset on the streams. I'm updating v3 to use an io.Writer for resizeStream.

remoteStdin httpstream.Stream
remoteStdout httpstream.Stream
remoteStderr httpstream.Stream
}

var _ streamProtocolHandler = &streamProtocolV1{}

func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
func newStreamProtocolV1(options StreamOptions) streamProtocolHandler {
return &streamProtocolV1{
StreamOptions: options,
}
}

func (p *streamProtocolV1) stream(conn streamCreator) error {
doneChan := make(chan struct{}, 2)
errorChan := make(chan error)

Expand All @@ -55,19 +63,15 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}

var (
err error
errorStream, remoteStdin, remoteStdout, remoteStderr httpstream.Stream
)

// set up all the streams first
var err error
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
errorStream, err = conn.CreateStream(headers)
p.errorStream, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer errorStream.Reset()
defer p.errorStream.Reset()

// Create all the streams first, then start the copy goroutines. The server doesn't start its copy
// goroutines until it's received all of the streams. If the client creates the stdin stream and
Expand All @@ -76,38 +80,38 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
// getting processed because the server hasn't started its copying, and it won't do that until it
// gets all the streams. By creating all the streams first, we ensure that the server is ready to
// process data before the client starts sending any. See https://issues.k8s.io/16373 for more info.
if e.stdin != nil {
if p.Stdin != nil {
headers.Set(api.StreamType, api.StreamTypeStdin)
remoteStdin, err = conn.CreateStream(headers)
p.remoteStdin, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdin.Reset()
defer p.remoteStdin.Reset()
}

if e.stdout != nil {
if p.Stdout != nil {
headers.Set(api.StreamType, api.StreamTypeStdout)
remoteStdout, err = conn.CreateStream(headers)
p.remoteStdout, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStdout.Reset()
defer p.remoteStdout.Reset()
}

if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
headers.Set(api.StreamType, api.StreamTypeStderr)
remoteStderr, err = conn.CreateStream(headers)
p.remoteStderr, err = conn.CreateStream(headers)
if err != nil {
return err
}
defer remoteStderr.Reset()
defer p.remoteStderr.Reset()
}

// now that all the streams have been created, proceed with reading & copying

// always read from errorStream
go func() {
message, err := ioutil.ReadAll(errorStream)
message, err := ioutil.ReadAll(p.errorStream)
if err != nil && err != io.EOF {
errorChan <- fmt.Errorf("Error reading from error stream: %s", err)
return
Expand All @@ -118,25 +122,25 @@ func (e *streamProtocolV1) stream(conn httpstream.Connection) error {
}
}()

if e.stdin != nil {
if p.Stdin != nil {
// TODO this goroutine will never exit cleanly (the io.Copy never unblocks)
// because stdin is not closed until the process exits. If we try to call
// stdin.Close(), it returns no error but doesn't unblock the copy. It will
// exit when the process exits, instead.
go cp(api.StreamTypeStdin, remoteStdin, e.stdin)
go cp(api.StreamTypeStdin, p.remoteStdin, p.Stdin)
}

waitCount := 0
completedStreams := 0

if e.stdout != nil {
if p.Stdout != nil {
waitCount++
go cp(api.StreamTypeStdout, e.stdout, remoteStdout)
go cp(api.StreamTypeStdout, p.Stdout, p.remoteStdout)
}

if e.stderr != nil && !e.tty {
if p.Stderr != nil && !p.Tty {
waitCount++
go cp(api.StreamTypeStderr, e.stderr, remoteStderr)
go cp(api.StreamTypeStderr, p.Stderr, p.remoteStderr)
}

Loop:
Expand Down