forked from gravitational/teleport
-
Notifications
You must be signed in to change notification settings - Fork 0
/
client.go
375 lines (313 loc) · 10.9 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
// Teleport
// Copyright (C) 2024 Gravitational, Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package resumption
import (
"context"
"crypto/ecdh"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"io"
"net"
"regexp"
"strconv"
"time"
"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"
"github.com/gravitational/teleport/lib/multiplexer"
)
const (
	// replacementInterval is the period of the ticker that triggers a
	// proactive reconnection while the current underlying connection is still
	// attached.
	replacementInterval = 3 * time.Minute
	// reconnectTimeout bounds how long the reconnection loop keeps retrying
	// after the underlying connection is lost before giving up.
	reconnectTimeout = time.Minute
	// minBackoff is the delay before the first attempt of a reconnection
	// loop; the delay is doubled for each subsequent attempt.
	minBackoff = 50 * time.Millisecond
	// maxBackoff caps the doubling delay between reconnection attempts.
	maxBackoff = 10 * time.Second
)
// resumablePreludeLine matches a complete resumption v1 server version line:
// the literal serverProtocolStringV1, a space, the unpadded standard base64
// encoding of ecdhP256UncompressedSize bytes (first capture group, decoded as
// the server ECDH public key), a space, a host ID made of digits, lowercase
// letters and dashes (second capture group), and a terminating CRLF.
var resumablePreludeLine = regexp.MustCompile(`^` +
	regexp.QuoteMeta(serverProtocolStringV1) +
	` ([0-9A-Za-z+/]{` + strconv.Itoa(base64.RawStdEncoding.EncodedLen(ecdhP256UncompressedSize)) + `}) ` + `([0-9a-z\-]+)\r\n$`)
// readServerVersionExchange peeks at the first line sent by the server on conn
// and, if it is a resumption v1 server version line, discards it from conn and
// returns the ECDH public key and the host ID that it carries. If a line was
// peeked but it is not a resumption v1 line, the line is not discarded and
// (nil, "", nil) is returned. A line that matches the resumption v1 format but
// carries an undecodable or invalid public key results in an error.
func readServerVersionExchange(conn *multiplexer.Conn) (dhPubKey *ecdh.PublicKey, hostID string, err error) {
	const maxVersionIdentifierSize = 255

	peeked, peekErr := peekLine(conn, maxVersionIdentifierSize)
	if peekErr != nil {
		// the peek error is handed back as is, without additional wrapping
		return nil, "", peekErr
	}

	groups := resumablePreludeLine.FindSubmatch(peeked)
	if groups == nil {
		return nil, "", nil
	}

	var rawKey [ecdhP256UncompressedSize]byte
	decoded, decodeErr := base64.RawStdEncoding.Decode(rawKey[:], groups[1])
	switch {
	case decodeErr != nil:
		return nil, "", trace.Wrap(decodeErr)
	case decoded != ecdhP256UncompressedSize:
		return nil, "", trace.Wrap(io.ErrUnexpectedEOF, "short ECDH encoding")
	}

	serverKey, keyErr := ecdh.P256().NewPublicKey(rawKey[:])
	if keyErr != nil {
		return nil, "", trace.Wrap(keyErr)
	}

	// the host ID is copied out of the peeked bytes before they are discarded
	id := string(groups[2])

	// discard is guaranteed to work for the line we just peeked
	_, _ = conn.Discard(len(peeked))

	return serverKey, id, nil
}
// redialFunc should dial the host identified by hostID (the host ID that the
// server advertised in its resumption v1 version line); the returned
// connection is allowed to die with the passed context (to accommodate the
// Teleport gRPC transport).
type redialFunc = func(ctx context.Context, hostID string) (net.Conn, error)
// WrapSSHClientConn probes the server at the other end of nc to find out if it
// is a resumption v1 server; if it is, the returned [net.Conn] transparently
// resumes itself by dialing new connections with the provided redial func.
// When the connection ends up wrapped, ctx governs the whole lifetime of the
// returned connection rather than just the duration of this call.
func WrapSSHClientConn(ctx context.Context, nc net.Conn, redial redialFunc) (net.Conn, error) {
	realClock := clockwork.NewRealClock()
	return wrapSSHClientConn(ctx, nc, redial, realClock)
}
// wrapSSHClientConn is the implementation of [WrapSSHClientConn] with an
// injectable clock, which is handed to the goroutine that drives the resumable
// connection.
//
// It returns nc itself if the local ECDH key cannot be generated, a connection
// that skips the already written SSH version prefix if the server does not
// speak resumption v1 (or the key exchange cannot be completed), and a
// resumable connection otherwise. Failures to write to or read from the
// connection close it and are returned as errors.
func wrapSSHClientConn(ctx context.Context, nc net.Conn, redial redialFunc, clock clockwork.Clock) (net.Conn, error) {
	localKey, err := ecdh.P256().GenerateKey(rand.Reader)
	if err != nil {
		logrus.WithError(err).Error("Failed to generate ECDH key, proceeding without resumption (this is a bug).")
		return nc, nil
	}

	// adds a read buffer with the ability to peek to nc
	conn := ensureMultiplexerConn(nc)

	// Some older versions of the multiplexer sitting in front of the Teleport
	// SSH server only let us through after seeing the first 8 bytes of the
	// version string. Every SSH client has to start with `SSH-2.0-` (RFC 4253,
	// "The Secure Shell (SSH) Transport Layer Protocol", section 4.2), so
	// those are the bytes we send, regardless of the SSH client that will end
	// up using the connection. Stopping after the prefix instead of sending a
	// full version string is noncompliant, but this client is only meant to
	// talk to Teleport-implemented SSH servers, and other clients in the
	// ecosystem do much worse (ssh-keyscan, for example, waits for the server
	// to send data before sending anything).
	if _, err := conn.Write([]byte(sshPrefix)); err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}

	serverKey, hostID, err := readServerVersionExchange(conn)
	if err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}

	// plainSSH builds the non-resumable fallback: sshPrefix is already on the
	// wire, so it has to be skipped from the application side writes.
	plainSSH := func() net.Conn {
		return &sshVersionSkipConn{
			Conn:           conn,
			alreadyWritten: sshPrefix,
		}
	}

	if serverKey == nil {
		// regular SSH connection, conn is about to read the SSH- line from the
		// server
		logrus.Debug("Server does not support resumption, proceeding without.")
		return plainSSH(), nil
	}

	sharedSecret, err := localKey.ECDH(serverKey)
	if err != nil {
		logrus.WithError(err).Warn("Failed to complete ECDH key exchange, proceeding without resumption.")
		return plainSSH(), nil
	}

	// the resumption token is the first half of the hashed shared secret
	digest := sha256.Sum256(sharedSecret)
	token := resumptionToken(digest[:16])

	// client half of the exchange, sent as three separate writes: the rest of
	// the client version line, our public key and the "new connection" tag
	handshake := [][]byte{
		[]byte(clientSuffixV1),
		localKey.PublicKey().Bytes(),
		{newConnClientExchangeTag},
	}
	for _, chunk := range handshake {
		if _, err := conn.Write(chunk); err != nil {
			conn.Close()
			return nil, trace.Wrap(err)
		}
	}

	resumableConn := newResumableConn(conn.LocalAddr(), conn.RemoteAddr())

	// runClientResumableUnlocking expects a brand new, locked *Conn
	resumableConn.mu.Lock()
	go runClientResumableUnlocking(ctx, resumableConn, conn, token, hostID, redial, clock)

	return resumableConn, nil
}
// runClientResumableUnlocking expects firstConn to be ready to be passed to
// runResumeV1Unlocking and resumableConn to be locked (the lock is handed over
// to goAttachResumableUnlocking). It drives resumableConn until the connection
// is impossible to resume further, the reconnection loop times out,
// resumableConn is found to be locally closed, or ctx is done; resumableConn
// is closed on return.
//
// While a connection is attached, a new one is dialed and attached every
// replacementInterval. When the attached connection is lost, reconnection is
// attempted with an exponential backoff (minBackoff doubling up to maxBackoff)
// for up to reconnectTimeout.
//
// NOTE(review): the periodic ticker uses the injected clock, but the
// reconnection deadline and backoff use the time package directly, so a fake
// clock does not control them — confirm whether that is intentional.
func runClientResumableUnlocking(ctx context.Context, resumableConn *Conn, firstConn net.Conn, token resumptionToken, hostID string, redial redialFunc, clock clockwork.Clock) {
	// whatever the reason for returning, the resumable connection is done
	defer resumableConn.Close()
	// detached is held open by the current underlying connection
	const isFirstConn = true
	detached := goAttachResumableUnlocking(resumableConn, firstConn, isFirstConn)
	reconnectTicker := clock.NewTicker(replacementInterval)
	defer reconnectTicker.Stop()
	for {
		// wait for cancellation, for the periodic replacement tick, or for
		// the current underlying connection to go away
		select {
		case <-ctx.Done():
			return
		case <-reconnectTicker.Chan():
			logrus.Debug("Attempting periodic reconnection.")
			newConn, err := dialResumable(ctx, token, hostID, redial)
			if err != nil {
				// a failed periodic attempt is not fatal, the current
				// connection stays attached and we try again at the next tick
				logrus.Warnf("Periodic reconnection: %v.", err.Error())
				continue
			}
			if newConn == nil {
				// nil, nil from dialResumable means that resuming is
				// impossible; stop replacing and just wait for the current
				// connection to end (or for ctx to be done) before returning
				logrus.Warn("Impossible to resume connection, giving up on periodic reconnection.")
				reconnectTicker.Stop()
				select {
				case <-ctx.Done():
				case <-detached:
				}
				return
			}
			// goAttachResumableUnlocking expects resumableConn to be locked
			resumableConn.mu.Lock()
			const isNotFirstConn = false
			// from now on we track the goroutine of the new connection;
			// presumably attaching displaces the previous underlying
			// connection (handled by runResumeV1Unlocking, not visible here)
			detached = goAttachResumableUnlocking(resumableConn, newConn, isNotFirstConn)
			continue
		case <-detached:
		}
		// only reached when the current underlying connection has detached
		logrus.Debug("Connection lost, starting reconnection loop.")
		reconnectDeadline := time.Now().Add(reconnectTimeout)
		backoff := minBackoff
		for {
			// no point in reconnecting if the connection was closed locally
			resumableConn.mu.Lock()
			if resumableConn.localClosed {
				resumableConn.mu.Unlock()
				return
			}
			resumableConn.mu.Unlock()
			if time.Now().After(reconnectDeadline) {
				logrus.Error("Failed to reconnect to server after timeout.")
				return
			}
			// wait out the current backoff (also before the very first
			// attempt), bailing out early if ctx is done
			select {
			case <-ctx.Done():
				return
			case <-time.After(backoff):
			}
			backoff = min(maxBackoff, backoff*2)
			newConn, err := dialResumable(ctx, token, hostID, redial)
			if err != nil {
				// errors are retried until the deadline is hit
				logrus.Warnf("Reconnection attempt: %v.", err.Error())
				continue
			}
			if newConn == nil {
				// nil, nil from dialResumable means that resuming is impossible
				logrus.Error("Impossible to resume connection.")
				return
			}
			// goAttachResumableUnlocking expects resumableConn to be locked
			resumableConn.mu.Lock()
			const isNotFirstConn = false
			detached = goAttachResumableUnlocking(resumableConn, newConn, isNotFirstConn)
			break
		}
		// the connection is fresh, so restart the replacement interval and
		// drop a tick that might have been delivered in the meantime
		reconnectTicker.Reset(replacementInterval)
		select {
		case <-reconnectTicker.Chan():
		default:
		}
	}
}
// goAttachResumableUnlocking runs the resumable protocol over nc in a
// background goroutine, with some client-friendly logging, returning a channel
// that gets closed at the end of the goroutine. resumableConn is expected to be
// locked, like runResumeV1Unlocking. firstConn tells whether nc is the
// connection that was used to set up resumableConn (true) or a connection
// dialed to resume it (false); it is passed through to runResumeV1Unlocking
// and selects the wording of the log messages.
func goAttachResumableUnlocking(resumableConn *Conn, nc net.Conn, firstConn bool) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done)

		if firstConn {
			logrus.Debug("Attaching new resumable connection.")
		} else {
			logrus.Debug("Attaching existing resumable connection.")
		}

		err := runResumeV1Unlocking(resumableConn, nc, firstConn)

		// Calling Error on a nil error panics, and a panic in this goroutine
		// would take down the whole process over a debug log line, so a nil
		// result is logged as "<nil>" instead. The output for non-nil errors
		// is unchanged.
		reason := "<nil>"
		if err != nil {
			reason = err.Error()
		}

		if firstConn {
			logrus.Debugf("Handling new resumable connection: %v", reason)
		} else {
			logrus.Debugf("Handling existing resumable connection: %v", reason)
		}
	}()
	return done
}
// dialResumable dials the host identified by hostID with redial and attempts
// to resume the connection associated with the given token. On success the
// returned connection has completed the resumption handshake.
//
// A return value of nil, nil means that resuming is impossible: the server
// that was reached has no resumption support, or it answered with an error tag
// or with an unknown tag. A non-nil error means that this attempt failed
// (key generation, dialing, I/O or key exchange). The returned connection is
// allowed to not outlive the context.
func dialResumable(ctx context.Context, token resumptionToken, hostID string, redial redialFunc) (*multiplexer.Conn, error) {
	localKey, err := ecdh.P256().GenerateKey(rand.Reader)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	logrus.Debug("Dialing server for connection resumption.")
	nc, err := redial(ctx, hostID)
	if err != nil {
		return nil, trace.Wrap(err)
	}

	// client half of the exchange, sent as three separate writes: the client
	// prelude, our public key and the "existing connection" tag
	handshake := [][]byte{
		[]byte(clientPreludeV1),
		localKey.PublicKey().Bytes(),
		{existingConnClientExchangeTag},
	}
	for _, chunk := range handshake {
		if _, err := nc.Write(chunk); err != nil {
			nc.Close()
			return nil, trace.Wrap(err)
		}
	}

	conn := ensureMultiplexerConn(nc)

	// the host ID in the server line is not needed when resuming
	serverKey, _, err := readServerVersionExchange(conn)
	if err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}
	if serverKey == nil {
		conn.Close()
		logrus.Error("Reached a server without resumption support, giving up.")
		return nil, nil
	}

	sharedSecret, err := localKey.ECDH(serverKey)
	if err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}

	// what goes on the wire is the token XORed with the first 16 bytes of the
	// hash of the secret shared over this very connection
	masked := sha256.Sum256(sharedSecret)
	for i := range masked[:16] {
		masked[i] ^= token[i]
	}
	if _, err := conn.Write(masked[:16]); err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}

	tag, err := conn.ReadByte()
	if err != nil {
		conn.Close()
		return nil, trace.Wrap(err)
	}

	switch tag {
	case successServerExchangeTag:
		return conn, nil
	case notFoundServerExchangeTag, badAddressServerExchangeTag:
		logrus.Errorf("Received error tag %v, giving up.", tag)
		conn.Close()
		return nil, nil
	default:
		logrus.Errorf("Received unknown response tag %v, giving up.", tag)
		conn.Close()
		return nil, nil
	}
}