/
exec.go
112 lines (93 loc) · 3.14 KB
/
exec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
package kubectl
import (
"bytes"
"io"
"net/http"
"os"
"github.com/covexo/devspace/pkg/util/terminal"
k8sv1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/transport/spdy"
"k8s.io/kubernetes/pkg/api/legacyscheme"
k8sapi "k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubectl/util/term"
)
// ExecStreamWithTransport runs command in the given container of pod through
// the pod's "exec" subresource, using the supplied transport round tripper and
// SPDY upgrader for the connection.
//
// stdin, stdout and stderr are handed to the remote stream as they are; each
// one is only requested from the server when the corresponding argument is
// non-nil. When tty is true the terminal is prepared with terminal.SetupTTY
// and the remote TTY flag follows the Raw field of the resulting term.TTY;
// otherwise no TTY is requested.
//
// It returns the error from creating the SPDY executor, or else the result of
// running the stream.
func ExecStreamWithTransport(transport http.RoundTripper, upgrader spdy.Upgrader, client *kubernetes.Clientset, pod *k8sv1.Pod, container string, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) error {
	// The zero value is used as-is when no TTY is requested.
	var tt term.TTY

	req := client.Core().RESTClient().Post().
		Resource("pods").
		Name(pod.Name).
		Namespace(pod.Namespace).
		SubResource("exec")

	// The three streams are always passed through; the TTY-specific fields
	// are only filled in below when a TTY was asked for.
	opts := remotecommand.StreamOptions{
		Stdin:  stdin,
		Stdout: stdout,
		Stderr: stderr,
	}

	if tty {
		tt = terminal.SetupTTY(stdin, stdout)
		opts.Tty = tt.Raw

		if tt.Raw {
			// this call spawns a goroutine to monitor/update the terminal size
			opts.TerminalSizeQueue = tt.MonitorSize(tt.GetSize())
		}
	}

	execOptions := &k8sapi.PodExecOptions{
		Container: container,
		Command:   command,
		Stdin:     stdin != nil,
		Stdout:    stdout != nil,
		Stderr:    stderr != nil,
		TTY:       tt.Raw,
	}
	req.VersionedParams(execOptions, legacyscheme.ParameterCodec)

	executor, err := remotecommand.NewSPDYExecutorForTransports(transport, upgrader, "POST", req.URL())
	if err != nil {
		return err
	}

	// The stream is run through the TTY's Safe wrapper.
	runStream := func() error {
		return executor.Stream(opts)
	}

	return tt.Safe(runStream)
}
// ExecStream runs command in the given container of pod and connects it to the
// given stdin, stdout and stderr streams.
//
// The SPDY round tripper and upgrader are derived from the client config
// returned by GetClientConfig; the actual exec is delegated to
// ExecStreamWithTransport. Errors from loading the config or building the
// round tripper are returned unchanged.
func ExecStream(client *kubernetes.Clientset, pod *k8sv1.Pod, container string, command []string, tty bool, stdin io.Reader, stdout io.Writer, stderr io.Writer) error {
	config, err := GetClientConfig()
	if err != nil {
		return err
	}

	roundTripper, spdyUpgrader, err := spdy.RoundTripperFor(config)
	if err != nil {
		return err
	}

	return ExecStreamWithTransport(
		roundTripper, spdyUpgrader,
		client, pod, container, command,
		tty, stdin, stdout, stderr,
	)
}
// ExecBuffered runs command in the given container of pod without stdin or a
// TTY and returns everything the command wrote to stdout and stderr.
//
// The first return value is the captured stdout, the second the captured
// stderr. If the exec itself fails, or reading either captured stream fails,
// both byte slices are nil and the error is returned.
func ExecBuffered(kubectlClient *kubernetes.Clientset, pod *k8sv1.Pod, container string, command []string) ([]byte, []byte, error) {
	stdoutReader, stdoutWriter, err := os.Pipe()
	if err != nil {
		return nil, nil, err
	}
	defer stdoutReader.Close()

	stderrReader, stderrWriter, err := os.Pipe()
	if err != nil {
		// Best-effort cleanup of the first pipe's write end; the pipe error
		// is the one worth reporting.
		stdoutWriter.Close()
		return nil, nil, err
	}
	defer stderrReader.Close()

	type capture struct {
		data []byte
		err  error
	}

	// Drain both pipes while the command is running. Reading only after
	// ExecStream has returned would block the remote writer as soon as the
	// output no longer fits into the pipe, and ExecStream would never return.
	drain := func(r io.Reader) <-chan capture {
		done := make(chan capture, 1)
		go func() {
			var buf bytes.Buffer
			_, readErr := buf.ReadFrom(r)
			done <- capture{data: buf.Bytes(), err: readErr}
		}()
		return done
	}
	stdoutDone := drain(stdoutReader)
	stderrDone := drain(stderrReader)

	execErr := ExecStream(kubectlClient, pod, container, command, false, nil, stdoutWriter, stderrWriter)

	// Close the write ends so the drain goroutines see EOF and finish. Close
	// errors are ignored on purpose: all data has already been written.
	stdoutWriter.Close()
	stderrWriter.Close()

	// Always wait for both goroutines so none outlives this call and the
	// deferred reader closes cannot race with a pending read.
	stdoutCapture := <-stdoutDone
	stderrCapture := <-stderrDone

	if execErr != nil {
		return nil, nil, execErr
	}
	if stdoutCapture.err != nil {
		return nil, nil, stdoutCapture.err
	}
	if stderrCapture.err != nil {
		return nil, nil, stderrCapture.err
	}

	return stdoutCapture.data, stderrCapture.data, nil
}