forked from kubernetes/kubernetes
-
Notifications
You must be signed in to change notification settings - Fork 0
/
portforward.go
300 lines (255 loc) · 8.47 KB
/
portforward.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package portforward
import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
"github.com/golang/glog"
)
type upgrader interface {
upgrade(*client.Request, *client.Config) (httpstream.Connection, error)
}
type defaultUpgrader struct{}
func (u *defaultUpgrader) upgrade(req *client.Request, config *client.Config) (httpstream.Connection, error) {
return req.Upgrade(config, spdy.NewRoundTripper)
}
// PortForwarder knows how to listen for local connections and forward them to
// a remote pod via an upgraded HTTP request.
type PortForwarder struct {
req *client.Request
config *client.Config
ports []ForwardedPort
stopChan <-chan struct{}
streamConn httpstream.Connection
listeners []io.Closer
upgrader upgrader
Ready chan struct{}
}
// ForwardedPort contains a Local:Remote port pairing.
type ForwardedPort struct {
Local uint16
Remote uint16
}
/*
valid port specifications:
5000
- forwards from localhost:5000 to pod:5000
8888:5000
- forwards from localhost:8888 to pod:5000
0:5000
:5000
- selects a random available local port,
forwards from localhost:<random port> to pod:5000
*/
func parsePorts(ports []string) ([]ForwardedPort, error) {
var forwards []ForwardedPort
for _, portString := range ports {
parts := strings.Split(portString, ":")
var localString, remoteString string
if len(parts) == 1 {
localString = parts[0]
remoteString = parts[0]
} else if len(parts) == 2 {
localString = parts[0]
if localString == "" {
// support :5000
localString = "0"
}
remoteString = parts[1]
} else {
return nil, fmt.Errorf("Invalid port format '%s'", portString)
}
localPort, err := strconv.ParseUint(localString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing local port '%s': %s", localString, err)
}
remotePort, err := strconv.ParseUint(remoteString, 10, 16)
if err != nil {
return nil, fmt.Errorf("Error parsing remote port '%s': %s", remoteString, err)
}
if remotePort == 0 {
return nil, fmt.Errorf("Remote port must be > 0")
}
forwards = append(forwards, ForwardedPort{uint16(localPort), uint16(remotePort)})
}
return forwards, nil
}
// New creates a new PortForwarder.
func New(req *client.Request, config *client.Config, ports []string, stopChan <-chan struct{}) (*PortForwarder, error) {
if len(ports) == 0 {
return nil, errors.New("You must specify at least 1 port")
}
parsedPorts, err := parsePorts(ports)
if err != nil {
return nil, err
}
return &PortForwarder{
req: req,
config: config,
ports: parsedPorts,
stopChan: stopChan,
Ready: make(chan struct{}),
}, nil
}
// ForwardPorts formats and executes a port forwarding request. The connection will remain
// open until stopChan is closed.
func (pf *PortForwarder) ForwardPorts() error {
defer pf.Close()
if pf.upgrader == nil {
pf.upgrader = &defaultUpgrader{}
}
var err error
pf.streamConn, err = pf.upgrader.upgrade(pf.req, pf.config)
if err != nil {
return fmt.Errorf("Error upgrading connection: %s", err)
}
defer pf.streamConn.Close()
return pf.forward()
}
// forward dials the remote host specific in req, upgrades the request, starts
// listeners for each port specified in ports, and forwards local connections
// to the remote host via streams.
func (pf *PortForwarder) forward() error {
var err error
listenSuccess := false
for _, port := range pf.ports {
err = pf.listenOnPort(&port)
if err != nil {
glog.Warningf("Unable to listen on port %d: %v", port, err)
}
listenSuccess = true
}
if !listenSuccess {
return fmt.Errorf("Unable to listen on any of the requested ports: %v", pf.ports)
}
close(pf.Ready)
// wait for interrupt or conn closure
select {
case <-pf.stopChan:
case <-pf.streamConn.CloseChan():
glog.Errorf("Lost connection to pod")
}
return nil
}
// listenOnPort creates a new listener on port and waits for new connections
// in the background.
func (pf *PortForwarder) listenOnPort(port *ForwardedPort) error {
listener, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port.Local))
if err != nil {
return err
}
parts := strings.Split(listener.Addr().String(), ":")
localPort, err := strconv.ParseUint(parts[1], 10, 16)
if err != nil {
return fmt.Errorf("Error parsing local part: %s", err)
}
port.Local = uint16(localPort)
glog.Infof("Forwarding from %d -> %d", localPort, port.Remote)
pf.listeners = append(pf.listeners, listener)
go pf.waitForConnection(listener, *port)
return nil
}
// waitForConnection waits for new connections to listener and handles them in
// the background.
func (pf *PortForwarder) waitForConnection(listener net.Listener, port ForwardedPort) {
for {
conn, err := listener.Accept()
if err != nil {
// TODO consider using something like https://github.com/hydrogen18/stoppableListener?
if !strings.Contains(strings.ToLower(err.Error()), "use of closed network connection") {
glog.Errorf("Error accepting connection on port %d: %v", port.Local, err)
}
return
}
go pf.handleConnection(conn, port)
}
}
// handleConnection copies data between the local connection and the stream to
// the remote server.
func (pf *PortForwarder) handleConnection(conn net.Conn, port ForwardedPort) {
defer conn.Close()
glog.Infof("Handling connection for %d", port.Local)
errorChan := make(chan error)
doneChan := make(chan struct{}, 2)
// create error stream
headers := http.Header{}
headers.Set(api.StreamType, api.StreamTypeError)
headers.Set(api.PortHeader, fmt.Sprintf("%d", port.Remote))
errorStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
glog.Errorf("Error creating error stream for port %d -> %d: %v", port.Local, port.Remote, err)
return
}
defer errorStream.Reset()
go func() {
message, err := ioutil.ReadAll(errorStream)
if err != nil && err != io.EOF {
errorChan <- fmt.Errorf("Error reading from error stream for port %d -> %d: %v", port.Local, port.Remote, err)
}
if len(message) > 0 {
errorChan <- fmt.Errorf("An error occurred forwarding %d -> %d: %v", port.Local, port.Remote, string(message))
}
}()
// create data stream
headers.Set(api.StreamType, api.StreamTypeData)
dataStream, err := pf.streamConn.CreateStream(headers)
if err != nil {
glog.Errorf("Error creating forwarding stream for port %d -> %d: %v", port.Local, port.Remote, err)
return
}
// Send a Reset when this function exits to completely tear down the stream here
// and in the remote server.
defer dataStream.Reset()
go func() {
// Copy from the remote side to the local port. We won't get an EOF from
// the server as it has no way of knowing when to close the stream. We'll
// take care of closing both ends of the stream with the call to
// stream.Reset() when this function exits.
if _, err := io.Copy(conn, dataStream); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error copying from remote stream to local connection: %v", err)
}
doneChan <- struct{}{}
}()
go func() {
// Copy from the local port to the remote side. Here we will be able to know
// when the Copy gets an EOF from conn, as that will happen as soon as conn is
// closed (i.e. client disconnected).
if _, err := io.Copy(dataStream, conn); err != nil && err != io.EOF && !strings.Contains(err.Error(), "use of closed network connection") {
glog.Errorf("Error copying from local connection to remote stream: %v", err)
}
doneChan <- struct{}{}
}()
select {
case err := <-errorChan:
glog.Error(err)
case <-doneChan:
}
}
func (pf *PortForwarder) Close() {
// stop all listeners
for _, l := range pf.listeners {
if err := l.Close(); err != nil {
glog.Errorf("Error closing listener: %v", err)
}
}
}