Skip to content

Commit

Permalink
add stream-timeout option to kubectl exec
Browse files Browse the repository at this point in the history
This patch implements a timeout option for remote streams. This allows a
maximum time to be specified for a remote shell to remain open. If a
`--stream-timeout` value `> 0` is provided, the stream executor will not
wait for the `stdout` and `stderr` streams of a remote shell to finish
copying, but will rather close the stream and exit with error `Timeout
exceeded for this operation`.

Although `kubectl exec` already supports the global
`request-timeout` option as well as the local `--timeout` option, the
former is only for making ther inital request with a `restclient`, and
the latter is the amount of time to wait before a pod is retrieved.
Therefore, it was necessary to introduce a third timeout option for
specifically timing out the command once inside of a remote shell.
  • Loading branch information
juanvallejo committed Dec 12, 2016
1 parent 12147a3 commit 7f7e699
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
23 changes: 22 additions & 1 deletion pkg/client/unversioned/remotecommand/remotecommand.go
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"net/http"
"net/url"
"time"

"github.com/golang/glog"

Expand All @@ -42,6 +43,7 @@ type StreamOptions struct {
Stderr io.Writer
Tty bool
TerminalSizeQueue term.TerminalSizeQueue
StreamTimeout time.Duration
}

// Executor is an interface for transporting shell-style streams.
Expand Down Expand Up @@ -175,5 +177,24 @@ func (e *streamExecutor) Stream(options StreamOptions) error {
streamer = newStreamProtocolV1(options)
}

return streamer.stream(conn)
streamChan := make(chan error)
go func() {
streamChan <- streamer.stream(conn)
}()

if options.StreamTimeout == 0 {
select {
case err, _ := <-streamChan:
return err
}
} else {
select {
case err, _ := <-streamChan:
return err
case <-time.After(5 * time.Second):
return fmt.Errorf("Timeout exceeded for this operation.")
}
}

return nil
}
16 changes: 13 additions & 3 deletions pkg/kubectl/cmd/exec.go
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"io"
"net/url"
"time"

dockerterm "github.com/docker/docker/pkg/term"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -80,18 +81,19 @@ func NewCmdExec(f cmdutil.Factory, cmdIn io.Reader, cmdOut, cmdErr io.Writer) *c
cmd.Flags().StringVarP(&options.ContainerName, "container", "c", "", "Container name. If omitted, the first container in the pod will be chosen")
cmd.Flags().BoolVarP(&options.Stdin, "stdin", "i", false, "Pass stdin to the container")
cmd.Flags().BoolVarP(&options.TTY, "tty", "t", false, "Stdin is a TTY")
cmdutil.AddRemoteStreamFlags(cmd)
return cmd
}

// RemoteExecutor defines the interface accepted by the Exec command - provided for test stubbing
type RemoteExecutor interface {
Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
Execute(method string, url *url.URL, config *restclient.Config, streamTimeout time.Duration, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error
}

// DefaultRemoteExecutor is the standard implementation of remote command execution
type DefaultRemoteExecutor struct{}

func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restclient.Config, streamTimeout time.Duration, stdin io.Reader, stdout, stderr io.Writer, tty bool, terminalSizeQueue term.TerminalSizeQueue) error {
exec, err := remotecommand.NewExecutor(config, method, url)
if err != nil {
return err
Expand All @@ -103,6 +105,7 @@ func (*DefaultRemoteExecutor) Execute(method string, url *url.URL, config *restc
Stderr: stderr,
Tty: tty,
TerminalSizeQueue: terminalSizeQueue,
StreamTimeout: streamTimeout,
})
}

Expand All @@ -120,6 +123,11 @@ type StreamOptions struct {
Out io.Writer
Err io.Writer

// maximum amount of time to wait for stdout/stderr to finish copying
// in the remote shell. If value is "0" (default), the remote shell
// will remain open until stdout and stderr are finished copying.
StreamTimeout time.Duration

// for testing
overrideStreams func() (io.ReadCloser, io.Writer, io.Writer)
isTerminalIn func(t term.TTY) bool
Expand Down Expand Up @@ -159,6 +167,8 @@ func (p *ExecOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, argsIn []s
}
}

p.StreamTimeout = cmdutil.GetFlagDuration(cmd, "stream-timeout")

cmdParent := cmd.Parent()
if cmdParent != nil {
p.FullCmdName = cmdParent.CommandPath()
Expand Down Expand Up @@ -315,7 +325,7 @@ func (p *ExecOptions) Run() error {
TTY: t.Raw,
}, api.ParameterCodec)

return p.Executor.Execute("POST", req.URL(), p.Config, p.In, p.Out, p.Err, t.Raw, sizeQueue)
return p.Executor.Execute("POST", req.URL(), p.Config, p.StreamTimeout, p.In, p.Out, p.Err, t.Raw, sizeQueue)
}

if err := t.Safe(fn); err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubectl/cmd/util/helpers.go
Expand Up @@ -416,6 +416,10 @@ func AddGeneratorFlags(cmd *cobra.Command, defaultGenerator string) {
AddDryRunFlag(cmd)
}

func AddRemoteStreamFlags(cmd *cobra.Command) {
cmd.Flags().Duration("stream-timeout", 0, "Maximum amount of time to keep remote shell open. If 0, shell will remain open until an EOF is received. Defaults to 0.")
}

func ReadConfigDataFromReader(reader io.Reader, source string) ([]byte, error) {
data, err := ioutil.ReadAll(reader)
if err != nil {
Expand Down

0 comments on commit 7f7e699

Please sign in to comment.