-
Notifications
You must be signed in to change notification settings - Fork 327
/
handler.go
253 lines (222 loc) · 7.25 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
/*
* Tencent is pleased to support the open source community by making TKEStack
* available.
*
* Copyright (C) 2012-2019 Tencent. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* https://opensource.org/licenses/Apache-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package webtty
import (
"encoding/base64"
"fmt"
"net/http"
"net/url"
"strings"
"time"
"github.com/gorilla/websocket"
"k8s.io/client-go/tools/remotecommand"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
"tkestack.io/tke/pkg/gateway/token"
"tkestack.io/tke/pkg/util/log"
)
const (
// endOfTransmission is the ASCII EOT control character (U+0004). Read copies
// it into the stdin buffer whenever it has to abort with an error (websocket
// read failure, undecodable input, or an unknown message type).
endOfTransmission = "\u0004"
// bufferSize specifies the websocket I/O buffer sizes, in bytes, used for both
// ReadBufferSize and WriteBufferSize of wsUpgrader. If a buffer size is zero,
// then buffers allocated by the HTTP server are used. The I/O buffer sizes do
// not limit the size of the messages that can be sent or received.
bufferSize = 1024
// writeWait is the time allowed to write a message to the peer; Write uses it
// to set the websocket write deadline before every message.
writeWait = 10 * time.Second
)
// wsUpgrader upgrades incoming HTTP requests to websocket connections, using
// bufferSize for both the read and the write buffer.
//
// NOTE(review): CheckOrigin returns true for every request, so the Origin
// header is never validated here. ServeHTTP authenticates the request through
// token.RetrieveToken(req); if that token travels in a cookie, accepting any
// origin permits cross-site websocket connections — confirm this is intended.
var wsUpgrader = websocket.Upgrader{
ReadBufferSize: bufferSize,
WriteBufferSize: bufferSize,
CheckOrigin: func(r *http.Request) bool {
return true
},
}
// handler is the http.Handler returned by NewHandler. It turns a webtty
// request into a pod exec session against the backend service.
type handler struct {
// url is the parsed backend service address; ServeHTTP copies its scheme and
// host onto the pod exec request URL.
url *url.URL
// roundTripper is the round tripper returned by spdy.RoundTripperFor in
// NewHandler and handed to the SPDY executor on every request.
roundTripper http.RoundTripper
// upgrader is the SPDY upgrader returned by spdy.RoundTripperFor in NewHandler
// and handed to the SPDY executor on every request.
upgrader spdy.Upgrader
}
// NewHandler returns the webtty http.Handler for the backend service located
// at address.
//
// address is parsed as a URL; its host is used to build a rest.Config from
// which the SPDY round tripper and upgrader are created once and then shared
// by every request served by the handler. The rest.Config sets
// TLSClientConfig.Insecure, i.e. the backend certificate is not verified.
//
// An error is returned when address cannot be parsed or when the SPDY
// transport cannot be created.
func NewHandler(address string) (http.Handler, error) {
	backend, err := url.Parse(address)
	if err != nil {
		log.Error("Failed to parse backend service address", log.String("address", address), log.Err(err))
		return nil, err
	}

	rt, up, err := spdy.RoundTripperFor(&rest.Config{
		Host:            backend.Host,
		TLSClientConfig: rest.TLSClientConfig{Insecure: true},
	})
	if err != nil {
		return nil, err
	}

	h := &handler{
		url:          backend,
		roundTripper: rt,
		upgrader:     up,
	}
	return h, nil
}
// ServeHTTP bridges a websocket client to an interactive exec session inside a
// pod container.
//
// The request is authenticated with token.RetrieveToken. The exec target is
// taken from the query string: clusterName, namespace, podName and
// containerName are required, projectName is optional and command defaults to
// "/bin/sh".
//
// Everything that can fail without the websocket (authentication, parameter
// validation, exec URL construction, executor creation) is done before the
// upgrade so that the failure can still be reported as a regular HTTP error.
// After a successful upgrade the connection is hijacked and w must no longer
// be written to; from then on failures are only logged and the websocket is
// closed when the handler returns.
func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
	t, err := token.RetrieveToken(req)
	if err != nil {
		log.Error("Failed to retrieve token from webtty", log.Err(err))
		http.Error(w, "Unauthorized", http.StatusUnauthorized)
		return
	}

	query := req.URL.Query()
	clusterName := query.Get("clusterName")
	if clusterName == "" {
		log.Error("Failed to get cluster name from webtty request")
		http.Error(w, "Invalid cluster name", http.StatusBadRequest)
		return
	}
	projectName := query.Get("projectName")
	namespace := query.Get("namespace")
	if namespace == "" {
		log.Error("Failed to get namespace from webtty request")
		http.Error(w, "Invalid namespace", http.StatusBadRequest)
		return
	}
	podName := query.Get("podName")
	if podName == "" {
		log.Error("Failed to get pod name from webtty request")
		http.Error(w, "Invalid pod name", http.StatusBadRequest)
		return
	}
	containerName := query.Get("containerName")
	if containerName == "" {
		log.Error("Failed to get container name from webtty request")
		http.Error(w, "Invalid container name", http.StatusBadRequest)
		return
	}
	command := query.Get("command")
	if command == "" {
		command = "/bin/sh"
	}

	// All four values come straight from the client. Escape them so that a
	// value containing '/', '?', '&' or '#' cannot add path segments or extra
	// query parameters (e.g. a second "command" or "container") to the exec
	// request sent to the backend.
	execURL := fmt.Sprintf(
		"/api/v1/namespaces/%s/pods/%s/exec?container=%s&command=%s&stdin=true&stdout=true&stderr=false&tty=true",
		url.PathEscape(namespace),
		url.PathEscape(podName),
		url.QueryEscape(containerName),
		url.QueryEscape(command),
	)
	reqURL, err := url.Parse(execURL)
	if err != nil {
		log.Error("Failed to generate pod exec url", log.Err(err))
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}
	reqURL.Host = h.url.Host
	reqURL.Scheme = h.url.Scheme

	// NOTE(review): created before the upgrade on the assumption that
	// NewSPDYExecutorForTransports only constructs the executor and does not
	// open the exec stream yet — confirm against its definition.
	executor, err := NewSPDYExecutorForTransports(h.roundTripper, h.upgrader, http.MethodPost, reqURL, clusterName, projectName, strings.TrimSpace(t.ID))
	if err != nil {
		log.Error("Failed to create SPDY executor", log.Err(err))
		http.Error(w, "Internal Error", http.StatusInternalServerError)
		return
	}

	conn, err := wsUpgrader.Upgrade(w, req, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error, so
		// writing another response here would be a superfluous write.
		log.Error("Failed initialize websocket connection", log.Err(err))
		return
	}
	// The connection is hijacked from the HTTP server, which will not close it
	// for us. Closing it here also unblocks a stdin copy that is still waiting
	// in streamHandler.Read once the exec session has ended. The close error is
	// ignored because nothing can be done about it at this point.
	defer func() { _ = conn.Close() }()

	stream := &streamHandler{conn: conn, resizeEvent: make(chan remotecommand.TerminalSize)}
	if err := executor.Stream(remotecommand.StreamOptions{
		Stdin:             stream,
		Stdout:            stream,
		Stderr:            stream,
		TerminalSizeQueue: stream,
		Tty:               true,
	}); err != nil {
		// w is hijacked; an HTTP error can no longer be sent.
		log.Error("Failed to stream exec command", log.Err(err))
	}
}
// streamHandler adapts a websocket connection to the stdin, stdout and
// terminal-size-queue roles required by remotecommand.StreamOptions.
type streamHandler struct {
	// conn is the websocket connection to the terminal client.
	conn *websocket.Conn
	// resizeEvent carries terminal sizes from Read (producer) to Next (consumer).
	resizeEvent chan remotecommand.TerminalSize
	// pending holds decoded stdin bytes that did not fit into the buffer of an
	// earlier Read call. It is only accessed by Read.
	pending []byte
}

// xtermMessage is the JSON message received from the terminal client.
type xtermMessage struct {
	// Input is the base64 (standard encoding) stdin payload of an "input" message.
	Input string `json:"input"`
	// MsgType selects the handling in Read: "resize" or "input".
	MsgType string `json:"type"`
	// Rows is the terminal height carried by a "resize" message.
	Rows uint16 `json:"rows"`
	// Cols is the terminal width carried by a "resize" message.
	Cols uint16 `json:"cols"`
}

// Next handles pty->process resize events, called in a loop from remotecommand
// as long as the process is running.
//
// It blocks until Read publishes a new terminal size. Should resizeEvent ever
// be closed, Next returns nil, which is how a remotecommand.TerminalSizeQueue
// reports that monitoring has stopped, instead of returning a pointer to a
// zero size on every call.
func (handler *streamHandler) Next() *remotecommand.TerminalSize {
	size, ok := <-handler.resizeEvent
	if !ok {
		return nil
	}
	return &size
}

// Read handles pty->process messages (stdin, resize), called in a loop from
// remotecommand as long as the process is running.
//
// One websocket JSON message is consumed per call, unless bytes from a
// previous "input" message are still pending, in which case those are
// delivered first. A "resize" message is forwarded to Next and yields
// (0, nil). An "input" message is base64-decoded and copied into p; whatever
// does not fit is kept and returned by the following calls instead of being
// dropped. On any failure endOfTransmission is copied into p and returned
// together with the error.
func (handler *streamHandler) Read(p []byte) (int, error) {
	// Deliver leftovers of an earlier oversized input message first.
	if len(handler.pending) > 0 {
		n := copy(p, handler.pending)
		handler.pending = handler.pending[n:]
		if len(handler.pending) == 0 {
			// Drop the reference so the decoded message can be collected.
			handler.pending = nil
		}
		return n, nil
	}

	var xtermMsg xtermMessage
	if err := handler.conn.ReadJSON(&xtermMsg); err != nil {
		log.Error("Failed to read and unmarshal message from websocket", log.Err(err))
		return copy(p, endOfTransmission), err
	}

	switch xtermMsg.MsgType {
	case "resize":
		// Unbuffered channel: this blocks until Next picks the size up.
		handler.resizeEvent <- remotecommand.TerminalSize{
			Width:  xtermMsg.Cols,
			Height: xtermMsg.Rows,
		}
		return 0, nil
	case "input":
		if len(xtermMsg.Input) == 0 {
			return 0, nil
		}
		input, err := decodeInputMessage(xtermMsg.Input)
		if err != nil {
			return copy(p, endOfTransmission), err
		}
		n := copy(p, input)
		if n < len(input) {
			// p was too small for the whole message; keep the rest for the
			// next call rather than silently truncating the user's input.
			handler.pending = input[n:]
		}
		return n, nil
	default:
		log.Error("Unknown message type from websocket", log.String("type", xtermMsg.MsgType))
		return copy(p, endOfTransmission), fmt.Errorf("unknown message type %s from websocket", xtermMsg.MsgType)
	}
}
// Write handles process->pty stdout, called from remotecommand whenever there
// is any output.
//
// p is base64-encoded and sent to the client as a single websocket text
// message, with a write deadline of writeWait. On success it returns
// (len(p), nil). If the message cannot be sent it returns (0, err): none of p
// reached the peer, so no bytes are reported as written. An empty p is a
// no-op.
func (handler *streamHandler) Write(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	output := encodeOutputMessage(p)
	// Best effort: failing to arm the deadline must not discard terminal
	// output, so the error is deliberately ignored and the write is attempted.
	_ = handler.conn.SetWriteDeadline(time.Now().Add(writeWait))
	if err := handler.conn.WriteMessage(websocket.TextMessage, []byte(output)); err != nil {
		log.Error("Failed write message to websocket connection", log.Err(err))
		return 0, err
	}
	return len(p), nil
}
// Close shuts down the underlying websocket connection and returns the error,
// if any, reported by the connection.
func (handler *streamHandler) Close() error {
	err := handler.conn.Close()
	return err
}
// decodeInputMessage decodes a base64 (standard encoding, padded) string into
// raw bytes. On malformed input it returns the bytes decoded so far together
// with the decoding error.
func decodeInputMessage(input string) ([]byte, error) {
	decoded := make([]byte, base64.StdEncoding.DecodedLen(len(input)))
	n, err := base64.StdEncoding.Decode(decoded, []byte(input))
	return decoded[:n], err
}
// encodeOutputMessage returns the base64 (standard encoding, padded)
// representation of output.
func encodeOutputMessage(output []byte) string {
	encoded := make([]byte, base64.StdEncoding.EncodedLen(len(output)))
	base64.StdEncoding.Encode(encoded, output)
	return string(encoded)
}