/
conn.go
138 lines (127 loc) · 4.71 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
package http3
import (
"context"
"sync/atomic"
"github.com/mollyy0514/quic-go"
"github.com/mollyy0514/quic-go/internal/protocol"
"github.com/mollyy0514/quic-go/internal/utils"
"github.com/mollyy0514/quic-go/quicvarint"
)
// connection holds the per-connection HTTP/3 state used while handling the
// peer's unidirectional streams and while exposing the peer's SETTINGS.
type connection struct {
// quicConn is the underlying QUIC connection; unidirectional streams are
// accepted from it and it is closed with an application error on violations.
quicConn quic.Connection
// perspective is consulted when a push stream arrives, to pick the error code
// used for closing the connection.
perspective protocol.Perspective
// logger receives debug messages about stream accept / stream type read failures.
logger utils.Logger
// enableDatagrams is the local datagram setting; once the peer's SETTINGS
// announce datagram support, it is checked against the QUIC connection state.
enableDatagrams bool
// uniStreamHijacker is optional (may be nil). It is invoked for unknown
// unidirectional stream types and when reading the stream type fails;
// returning true means the callback took over the stream.
uniStreamHijacker func(StreamType, quic.Connection, quic.ReceiveStream, error) (hijacked bool)
// settings is populated from the SETTINGS frame read on the control stream.
// It is written before receivedSettings is closed.
settings *Settings
// receivedSettings is closed once settings has been stored.
receivedSettings chan struct{}
}
// newConnection builds the HTTP/3 state for a single QUIC connection.
//
// uniStreamHijacker may be nil. The returned connection owns a fresh, open
// receivedSettings channel; its settings field stays nil until a SETTINGS
// frame has been processed by HandleUnidirectionalStreams.
func newConnection(
	quicConn quic.Connection,
	enableDatagrams bool,
	uniStreamHijacker func(StreamType, quic.Connection, quic.ReceiveStream, error) (hijacked bool),
	perspective protocol.Perspective,
	logger utils.Logger,
) *connection {
	conn := new(connection)
	conn.quicConn = quicConn
	conn.perspective = perspective
	conn.logger = logger
	conn.enableDatagrams = enableDatagrams
	conn.uniStreamHijacker = uniStreamHijacker
	conn.receivedSettings = make(chan struct{})
	return conn
}
// HandleUnidirectionalStreams accepts unidirectional streams from the QUIC
// connection until accepting fails, and processes each one in its own goroutine.
//
// For every stream the type is read first:
//   - control stream: only one is allowed; its first frame must be a SETTINGS
//     frame, which is stored in c.settings before c.receivedSettings is closed.
//   - QPACK encoder / decoder stream: at most one of each; otherwise ignored.
//   - push stream: always closes the connection, with an error code that
//     depends on the perspective.
//   - anything else: offered to the uniStreamHijacker (if set); if it is not
//     hijacked, reading from the stream is cancelled.
//
// Protocol violations close the whole QUIC connection with an application error.
func (c *connection) HandleUnidirectionalStreams() {
	// One flag per stream type that the peer may open at most once.
	var sawControl, sawQPACKEncoder, sawQPACKDecoder atomic.Bool

	for {
		accepted, acceptErr := c.quicConn.AcceptUniStream(context.Background())
		if acceptErr != nil {
			c.logger.Debugf("accepting unidirectional stream failed: %s", acceptErr)
			return
		}

		go func(uni quic.ReceiveStream) {
			typ, readErr := quicvarint.Read(quicvarint.NewReader(uni))
			if readErr != nil {
				// Give the hijacker a chance to deal with the failure; log only if it declines.
				hijacked := c.uniStreamHijacker != nil && c.uniStreamHijacker(StreamType(typ), c.quicConn, uni, readErr)
				if !hijacked {
					c.logger.Debugf("reading stream type on stream %d failed: %s", uni.StreamID(), readErr)
				}
				return
			}

			// Every stream type except the control stream is fully dealt with inside this switch.
			switch typ {
			case streamTypeControlStream:
				// Processed after the switch.
			case streamTypeQPACKEncoderStream:
				if !sawQPACKEncoder.CompareAndSwap(false, true) {
					c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate QPACK encoder stream")
				}
				// The dynamic table isn't used by our QPACK implementation yet, so there is nothing to read.
				return
			case streamTypeQPACKDecoderStream:
				if !sawQPACKDecoder.CompareAndSwap(false, true) {
					c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate QPACK decoder stream")
				}
				// The dynamic table isn't used by our QPACK implementation yet, so there is nothing to read.
				return
			case streamTypePushStream:
				if c.perspective == protocol.PerspectiveClient {
					// The Push ID was never increased, so no push stream is expected.
					c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeIDError), "")
				} else if c.perspective == protocol.PerspectiveServer {
					// Pushing is reserved for the server; a client must not open push streams.
					c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "")
				}
				return
			default:
				if c.uniStreamHijacker == nil || !c.uniStreamHijacker(StreamType(typ), c.quicConn, uni, nil) {
					uni.CancelRead(quic.StreamErrorCode(ErrCodeStreamCreationError))
				}
				return
			}

			// A second control stream is a connection error.
			if !sawControl.CompareAndSwap(false, true) {
				c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeStreamCreationError), "duplicate control stream")
				return
			}

			first, parseErr := parseNextFrame(uni, nil)
			if parseErr != nil {
				c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeFrameError), "")
				return
			}
			peerSettings, isSettings := first.(*settingsFrame)
			if !isSettings {
				// The control stream has to start with a SETTINGS frame.
				c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeMissingSettings), "")
				return
			}

			// Store the settings before signalling, so readers woken by the close see them.
			c.settings = &Settings{
				EnableDatagram:        peerSettings.Datagram,
				EnableExtendedConnect: peerSettings.ExtendedConnect,
				Other:                 peerSettings.Other,
			}
			if c.receivedSettings != nil {
				close(c.receivedSettings)
			}

			// When both the peer and we enabled datagrams on the HTTP/3 layer, the QUIC
			// transport must have negotiated them as well.
			// Note: ConnectionState() will block until the handshake is complete (relevant when using 0-RTT),
			// which is why it is only evaluated once the two cheaper conditions hold.
			if peerSettings.Datagram && c.enableDatagrams && !c.quicConn.ConnectionState().SupportsDatagrams {
				c.quicConn.CloseWithError(quic.ApplicationErrorCode(ErrCodeSettingsError), "missing QUIC Datagram support")
			}
		}(accepted)
	}
}
// ReceivedSettings exposes, as a receive-only channel, the signal that the
// peer's SETTINGS frame has arrived: the channel is closed at that point.
func (c *connection) ReceivedSettings() <-chan struct{} {
	return c.receivedSettings
}
// Settings returns the peer's settings as received on this connection.
// The result is only meaningful once the channel returned by ReceivedSettings
// has been closed; calling it earlier is not valid.
func (c *connection) Settings() *Settings {
	return c.settings
}