forked from google/martian
/
relay.go
618 lines (551 loc) · 19.4 KB
/
relay.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
// Copyright 2021 Google Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package h2
import (
"bytes"
"container/list"
"errors"
"fmt"
"io"
"math"
"sync"
"sync/atomic"
"github.com/google/martian/v3/log"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
)
const (
// See: https://httpwg.org/specs/rfc7540.html#SettingValues
initialMaxFrameSize = 16384
initialMaxHeaderTableSize = 4096
// See: https://tools.ietf.org/html/rfc7540#section-6.9.2
defaultInitialWindowSize = 65535
// headersPriorityMetadataLength is the length of the priority metadata that optionally occurs at
// the beginning of the payload of the header frame.
//
// See: https://tools.ietf.org/html/rfc7540#section-6.2
headersPriorityMetadataLength = 5
// pushPromiseMetadataLength is the length of the metadata that is part of the payload of the
// pushPromise frame. This does not include the padding length octet, which isn't needed due to
// the relaxed security constraints of a development proxy.
//
// See: https://tools.ietf.org/html/rfc7540#section-6.6
pushPromiseMetadataLength = 4
// outputChannelSize is the size of the output channel. Roughly, it should be large enough to
// allow a window's worth of frames to minimize synchronization overhead.
outputChannelSize = 15
)
// relay encapsulates a flow of h2 traffic in one direction.
type relay struct {
dir Direction
// srcLabel and destLabel are used only to create debugging messages.
srcLabel, destLabel string
src *http2.Framer
// destMu guards writes to dest, which may occur on from either the `relayFrames` thread of
// this relay or `peer`. `peer` writes WINDOW_UPDATE frames to this relay when it receives
// DATA frames.
destMu sync.Mutex
dest *http2.Framer
// maxFrameSize is set by the peer relay and is accessed atomically.
maxFrameSize uint32
// The decoder and encoder settings can be adjusted by the peer connection so access to these
// fields must be guarded.
decoderMu sync.Mutex
decoder *hpack.Decoder
encoderMu sync.Mutex
encoder *hpack.Encoder
reencoded bytes.Buffer // handle to the output buffer of `encoder`
// headerBuffer collects header fragments that are received across multiple frames, i.e.,
// when there are continuation frames.
headerBuffer bytes.Buffer
continuationState continuationState
// flowMu guards access to flow-control related fields.
flowMu sync.Mutex
initialWindowSize uint32
connectionWindowSize int // "global" connection-level window size
// outputBuffers is output pending available window size per-stream
outputBuffers map[uint32]*outputBuffer
// output stores stream output that is ready to be sent over HTTP/2. It provides a way to
// guarantee frame order without blocking on each frame being sent.
output chan queuedFrame
enableDebugLogs *bool
// The following fields depend on a circular dependency between the relays in opposite directions
// so must be set explicitly after initialization.
// processors stores per HTTP/2 stream processors.
processors *streamProcessors
peer *relay // relay for traffic from the peer
}
// newRelay initializes a relay for the given direction. This performs only partial initialization
// due to circular dependency.
func newRelay(
dir Direction,
srcLabel, destLabel string,
src, dest *http2.Framer,
enableDebugLogs *bool,
) *relay {
ret := &relay{
dir: dir,
srcLabel: srcLabel,
destLabel: destLabel,
src: src,
dest: dest,
maxFrameSize: initialMaxFrameSize,
decoder: hpack.NewDecoder(initialMaxHeaderTableSize, nil),
initialWindowSize: defaultInitialWindowSize,
connectionWindowSize: defaultInitialWindowSize,
outputBuffers: make(map[uint32]*outputBuffer),
output: make(chan queuedFrame, outputChannelSize),
enableDebugLogs: enableDebugLogs,
}
ret.encoder = hpack.NewEncoder(&ret.reencoded)
// These limits seem to be part of the Go implementation of hpack. They exist because in a
// production system, there must be limits on the resources requested by clients. However, this
// is irrevelevant in a development proxy context.
ret.decoder.SetAllowedMaxDynamicTableSize(math.MaxUint32)
ret.encoder.SetMaxDynamicTableSizeLimit(math.MaxUint32)
return ret
}
// relayFrames reads frames from `f.src` to `f.dest` until an error occurs or the connection closes.
func (r *relay) relayFrames(closing chan bool) error {
// Shutting down producer-consumers linked by channels is subtle. In this function, the writer
// goroutine consumes frames from `r.output`, which are populated by the reader goroutine. If
// the writer shuts down before the reader, the reader may deadlock on inserting frames into
// `r.output`. The writer therefore has to keep processing until the reader is done. This is
// coordinated via `readerDone`.
//
// A second subtlely is that errors on the writer goroutine should stop the reader goroutine.
// This is communicated via `writeErr`. To avoid deadlocks, even after the error occurs, the
// writer thread must still wait until `readerDone` has been communicated to stop processing.
// Communicates to the consuming writer goroutine that the reader (the calling goroutine of this
// method) is done.
readerDone := make(chan struct{})
defer func() { readerDone <- struct{}{} }()
// Communicates errors occuring on the writer goroutine to the reader goroutine.
writerErr := make(chan error, 1)
// This writer goroutine consumes the strictly ordered frames in `r.output` and delivers them.
go func() {
var err error
for {
select {
case f := <-r.output:
if err == nil {
r.destMu.Lock()
err = f.send(r.dest)
r.destMu.Unlock()
if err != nil {
writerErr <- err
}
}
// Once an output error has occurred, the remaining frames are drained from the channel
// without sending them.
case <-readerDone:
return
}
}
}()
// This channel is buffered to allow the ReadFrame goroutine to drain on closing.
frameReady := make(chan struct{}, 1)
for {
var frame http2.Frame
var err error
go func() {
// ReadFrame is called in its own goroutine to make this function responsive to closing. It
// does not need to block here to close.
frame, err = r.src.ReadFrame()
frameReady <- struct{}{}
}()
select {
case <-frameReady:
if err != nil {
if err == io.EOF {
return nil
}
return fmt.Errorf("reading frame: %w", err)
}
if err := r.processFrame(frame); err != nil {
return fmt.Errorf("processing frame: %w", err)
}
if *r.enableDebugLogs {
log.Infof("%s--%v-->%s", r.srcLabel, frame, r.destLabel)
}
case err := <-writerErr:
return fmt.Errorf("sending frame: %w", err)
case <-closing:
// The ReadFrame goroutine is abandoned at this point. It completes as soon as the blocking
// ReadFrame call completes, but could potentially leak for an unspecified duration.
return nil
}
}
}
func (r *relay) processFrame(f http2.Frame) error {
var err error
switch f := f.(type) {
case *http2.DataFrame:
// The proxy's window increments as soon as it receives data. This assumes that the proxy has
// ample resources because it is inteded for testing and development.
if err = r.peer.sendWindowUpdates(f); err == nil {
err = r.processor(f.StreamID).Data(f.Data(), f.StreamEnded())
}
case *http2.HeadersFrame:
if !f.HeadersEnded() {
r.headerBuffer.Reset()
r.headerBuffer.Write(f.HeaderBlockFragment())
r.continuationState = &headerContinuation{f.Priority}
} else {
var headers []hpack.HeaderField
headers, err = r.decodeFull(f.HeaderBlockFragment())
if err != nil {
return fmt.Errorf("decoding header %v: %w", f, err)
}
err = r.processor(f.StreamID).Header(headers, f.StreamEnded(), f.Priority)
}
case *http2.PriorityFrame:
err = r.processor(f.StreamID).Priority(f.PriorityParam)
case *http2.RSTStreamFrame:
err = r.processor(f.StreamID).RSTStream(f.ErrCode)
case *http2.SettingsFrame:
if f.IsAck() {
r.destMu.Lock()
err = r.dest.WriteSettingsAck()
r.destMu.Unlock()
} else {
var settings []http2.Setting
if err = f.ForeachSetting(func(s http2.Setting) error {
switch s.ID {
case http2.SettingHeaderTableSize:
r.peer.updateTableSize(s.Val)
case http2.SettingInitialWindowSize:
r.peer.updateInitialWindowSize(s.Val)
case http2.SettingMaxFrameSize:
r.peer.updateMaxFrameSize(s.Val)
}
settings = append(settings, s)
return nil
}); err == nil {
r.destMu.Lock()
err = r.dest.WriteSettings(settings...)
r.destMu.Unlock()
}
}
case *http2.PushPromiseFrame:
if !f.HeadersEnded() {
r.headerBuffer.Reset()
r.headerBuffer.Write(f.HeaderBlockFragment())
r.continuationState = &pushPromiseContinuation{f.PromiseID}
} else {
var headers []hpack.HeaderField
headers, err = r.decodeFull(f.HeaderBlockFragment())
if err != nil {
return fmt.Errorf("decoding push promise %v: %w", f, err)
}
err = r.processor(f.StreamID).PushPromise(f.PromiseID, headers)
}
case *http2.PingFrame:
r.destMu.Lock()
err = r.dest.WritePing(f.IsAck(), f.Data)
r.destMu.Unlock()
case *http2.GoAwayFrame:
r.destMu.Lock()
err = r.dest.WriteGoAway(f.LastStreamID, f.ErrCode, f.DebugData())
r.destMu.Unlock()
case *http2.WindowUpdateFrame:
r.peer.updateWindow(f)
case *http2.ContinuationFrame:
r.headerBuffer.Write(f.HeaderBlockFragment())
if f.HeadersEnded() {
var headers []hpack.HeaderField
headers, err = r.decodeFull(r.headerBuffer.Bytes())
if err != nil {
return fmt.Errorf("decoding headers for continuation %v: %w", f, err)
}
err = r.continuationState.complete(r.processor(f.StreamID), headers)
}
default:
err = errors.New("unrecognized frame type")
}
return err
}
func (r *relay) processor(id uint32) Processor {
return r.processors.Get(id, r.dir)
}
func (r *relay) updateTableSize(v uint32) {
r.decoderMu.Lock()
r.decoder.SetMaxDynamicTableSize(v)
r.decoderMu.Unlock()
r.encoderMu.Lock()
r.encoder.SetMaxDynamicTableSize(v)
r.encoderMu.Unlock()
}
func (r *relay) updateMaxFrameSize(v uint32) {
atomic.StoreUint32(&r.maxFrameSize, v)
}
// updateInitialWindowSize updates the initial window size and updates all stream windows based on
// the difference. Note that this should not include the connection window.
// See: https://tools.ietf.org/html/rfc7540#section-6.9.2
//
// This is called by `peer`, so requires a thread-safe implementation.
func (r *relay) updateInitialWindowSize(v uint32) {
r.flowMu.Lock()
delta := int(v) - int(r.initialWindowSize)
r.initialWindowSize = v
for _, w := range r.outputBuffers {
w.windowSize += delta
}
r.flowMu.Unlock()
// Since all the stream windows may be impacted, all the queues need to be checked for newly
// eligible frames.
r.sendQueuedFramesUnderWindowSize()
}
// updateWindow updates the specified window size and may result in the sending of data frames.
func (r *relay) updateWindow(f *http2.WindowUpdateFrame) {
if f.StreamID == 0 {
// A stream ID of 0 means updating the global connection window size. This may cause any
// queued frame belonging to any stream to become eligible for sending.
r.flowMu.Lock()
r.connectionWindowSize += int(f.Increment)
r.flowMu.Unlock()
r.sendQueuedFramesUnderWindowSize()
}
r.flowMu.Lock()
w := r.outputBuffer(f.StreamID)
w.windowSize += int(f.Increment)
w.emitEligibleFrames(r.output, &r.connectionWindowSize)
r.flowMu.Unlock()
}
func (r *relay) data(id uint32, data []byte, streamEnded bool) error {
// This implementation only allows `WriteData` without padding. Padding is used to improve the
// security against attacks like CRIME, but this isn't relevant for a development proxy.
//
// If padding were allowed, this length would need to vary depending on whether the padding
// length octet is present.
maxPayloadLength := atomic.LoadUint32(&r.maxFrameSize)
r.flowMu.Lock()
w := r.outputBuffer(id)
r.flowMu.Unlock()
// If data is larger than what would be permitted at the current max frame size setting, the data
// is split across multiple frames.
for {
nextPayloadLength := uint32(len(data))
if nextPayloadLength > maxPayloadLength {
nextPayloadLength = maxPayloadLength
}
nextPayload := make([]byte, nextPayloadLength)
copy(nextPayload, data)
data = data[nextPayloadLength:]
f := &queuedDataFrame{id, streamEnded && len(data) == 0, nextPayload}
r.flowMu.Lock()
w.enqueue(f)
w.emitEligibleFrames(r.output, &r.connectionWindowSize)
r.flowMu.Unlock()
// Some protocols send empty data frames with END_STREAM so the check is done here at the end
// of the loop instead of at the beginning of the loop.
if len(data) == 0 {
break
}
}
return nil
}
func (r *relay) header(
id uint32,
headers []hpack.HeaderField,
streamEnded bool,
priority http2.PriorityParam,
) error {
encoded, err := r.encodeFull(headers)
if err != nil {
return fmt.Errorf("encoding headers %v: %w", headers, err)
}
maxPayloadLength := atomic.LoadUint32(&r.maxFrameSize)
// Padding is not implemented because the extra security is not needed for a development proxy.
// If it were used, a single padding length octet should be deducted from the max header fragment
// length.
maxHeaderFragmentLength := maxPayloadLength
if !priority.IsZero() {
maxHeaderFragmentLength -= headersPriorityMetadataLength
}
chunks := splitIntoChunks(int(maxHeaderFragmentLength), int(maxPayloadLength), encoded)
r.enqueueFrame(&queuedHeaderFrame{
streamID: id,
endStream: streamEnded,
priority: priority,
chunks: chunks,
})
return nil
}
func (r *relay) priority(id uint32, priority http2.PriorityParam) {
r.enqueueFrame(&queuedPriorityFrame{
streamID: id,
priority: priority,
})
}
func (r *relay) rstStream(id uint32, errCode http2.ErrCode) {
r.enqueueFrame(&queuedRSTStreamFrame{
streamID: id,
errCode: errCode,
})
}
func (r *relay) pushPromise(id, promiseID uint32, headers []hpack.HeaderField) error {
encoded, err := r.encodeFull(headers)
if err != nil {
return fmt.Errorf("encoding push promise headers %v: %w", headers, err)
}
maxPayloadLength := atomic.LoadUint32(&r.maxFrameSize)
maxHeaderFragmentLength := maxPayloadLength - pushPromiseMetadataLength
chunks := splitIntoChunks(int(maxHeaderFragmentLength), int(maxPayloadLength), encoded)
r.enqueueFrame(&queuedPushPromiseFrame{
streamID: id,
promiseID: promiseID,
chunks: chunks,
})
return nil
}
func (r *relay) enqueueFrame(f queuedFrame) {
// The frame is first added to the appropriate stream.
r.flowMu.Lock()
w := r.outputBuffer(f.StreamID())
w.enqueue(f)
w.emitEligibleFrames(r.output, &r.connectionWindowSize)
r.flowMu.Unlock()
}
func (r *relay) sendQueuedFramesUnderWindowSize() {
r.flowMu.Lock()
for _, w := range r.outputBuffers {
w.emitEligibleFrames(r.output, &r.connectionWindowSize)
}
r.flowMu.Unlock()
}
// outputBuffer returns the outputBuffer instance for the given stream, creating one if needed.
//
// This method is not thread-safe. The caller should be holding `flowMu`.
func (r *relay) outputBuffer(streamID uint32) *outputBuffer {
w, ok := r.outputBuffers[streamID]
if !ok {
w = &outputBuffer{
windowSize: int(r.initialWindowSize),
}
r.outputBuffers[streamID] = w
}
return w
}
// sendWindowUpdates sends WINDOW_UPDATE frames effectively acknowledging consumption of the
// given data frame.
func (r *relay) sendWindowUpdates(f *http2.DataFrame) error {
if len(f.Data()) <= 0 {
return nil
}
r.destMu.Lock()
defer r.destMu.Unlock()
// First updates the connection level window.
if err := r.dest.WriteWindowUpdate(0, uint32(len(f.Data()))); err != nil {
return err
}
// Next updates the stream specific window.
return r.dest.WriteWindowUpdate(f.StreamID, uint32(len(f.Data())))
}
func (r *relay) decodeFull(data []byte) ([]hpack.HeaderField, error) {
r.decoderMu.Lock()
defer r.decoderMu.Unlock()
return r.decoder.DecodeFull(data)
}
func (r *relay) encodeFull(headers []hpack.HeaderField) ([]byte, error) {
r.encoderMu.Lock()
defer r.encoderMu.Unlock()
r.reencoded.Reset()
var buf bytes.Buffer
for _, h := range headers {
if *r.enableDebugLogs {
if h.Name == "content-type" && h.Value == "application/grpc" {
fmt.Fprintf(&buf, " \u001b[1;36m%v\u001b[0m\n", h)
} else {
fmt.Fprintf(&buf, " %v\n", h)
}
}
if err := r.encoder.WriteField(h); err != nil {
return nil, fmt.Errorf("reencoding header field %v in %v: %w", h, headers, err)
}
}
if *r.enableDebugLogs {
log.Infof("sending headers %s -> %s:\n%s", r.srcLabel, r.destLabel, buf.Bytes())
}
return r.reencoded.Bytes(), nil
}
// outputBuffer stores enqueued output frames for a given stream.
type outputBuffer struct {
// windowSize indicates how much data the receiver is ready to process.
windowSize int
queue list.List // contains queuedFrame elements
}
// emitEligibleFrames emits frames that would fit under both the stream window size and the
// given connection window size. It updates the given connectionWindowSize if applicable.
//
// This is not thread-safe. The caller should be holding `relay.flowMu`.
func (w *outputBuffer) emitEligibleFrames(output chan queuedFrame, connectionWindowSize *int) {
for e := w.queue.Front(); e != nil; {
f := e.Value.(queuedFrame)
if f.flowControlSize() > *connectionWindowSize || f.flowControlSize() > w.windowSize {
break
}
output <- f
*connectionWindowSize -= f.flowControlSize()
w.windowSize -= f.flowControlSize()
next := e.Next()
w.queue.Remove(e)
e = next
}
}
// enqueue adds the frame to this stream output. This is not thread-safe. The caller must hold
// relay.flowMu.
func (w *outputBuffer) enqueue(f queuedFrame) {
w.queue.PushBack(f)
}
// continuationState holds the context needed to interpret CONTINUATION frames, specifically whether
// the parents were HEADERS or PUSH_PROMISE frames.
type continuationState interface {
complete(s Processor, headers []hpack.HeaderField) error
}
type headerContinuation struct {
priority http2.PriorityParam
}
func (h *headerContinuation) complete(s Processor, headers []hpack.HeaderField) error {
return s.Header(headers, true, h.priority)
}
type pushPromiseContinuation struct {
promiseID uint32
}
func (p *pushPromiseContinuation) complete(s Processor, headers []hpack.HeaderField) error {
return s.PushPromise(p.promiseID, headers)
}
// splitIntoChunks splits header payloads into chunks that respect frame size limits.
func splitIntoChunks(firstChunkMax, continuationMax int, data []byte) [][]byte {
var chunks [][]byte
firstChunkLength := len(data)
if firstChunkLength > firstChunkMax {
firstChunkLength = firstChunkMax
}
buf := make([]byte, firstChunkLength)
copy(buf, data[:firstChunkLength])
chunks = append(chunks, buf)
remaining := data[firstChunkLength:]
for len(remaining) > 0 {
nextChunkLength := len(remaining)
if nextChunkLength > continuationMax {
nextChunkLength = continuationMax
}
buf = make([]byte, nextChunkLength)
copy(buf, remaining[:nextChunkLength])
chunks = append(chunks, buf)
remaining = remaining[nextChunkLength:]
}
return chunks
}