-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathparser.go
266 lines (223 loc) · 8.14 KB
/
parser.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
package http
import (
"bufio"
"io"
"net/http"
"github.com/google/gopacket/reassembly"
"github.com/google/uuid"
"github.com/pkg/errors"
"github.com/akitasoftware/akita-libs/akinet"
"github.com/akitasoftware/akita-libs/buffer_pool"
"github.com/akitasoftware/akita-libs/memview"
)
// MaximumHTTPLength is the default upper bound, in bytes, on the size of an
// HTTP request or response the parser will consume (1 MiB).
//
// The CLI may override it as a configuration setting. It must do so before any
// parsing starts: each parser copies the value when it is created, and
// changing it while parsers are being created is a data race.
var MaximumHTTPLength int64 = 1 << 20
// httpParser incrementally parses exactly one HTTP/1.x request or response.
//
// Go's HTTP parser reads synchronously from an io.Reader. To make it usable
// with data that arrives in pieces, the real parsing runs on its own goroutine
// that reads from an io.Pipe. This type holds the write side of that pipe plus
// the channels the goroutine uses to report back.
type httpParser struct {
	// Whether this parser handles a request (true) or a response (false).
	isRequest bool

	// Largest request or response, in bytes, that this parser will consume.
	// Anything bigger may be truncated.
	maxHttpLength int64

	// Running count of bytes from the stream that have been fed to the parser.
	totalBytesConsumed int64

	// Write end of the pipe that delivers incoming bytes to the parser
	// goroutine.
	w *io.PipeWriter

	// Once the parser goroutine has closed its end of the pipe, it sends the
	// error it closed the pipe with on this channel.
	readClosed chan error

	// After a successful parse, the parser goroutine sends the resulting
	// HTTP request or response on this channel.
	resultChan chan akinet.ParsedNetworkContent
}

// Compile-time check that httpParser implements akinet.TCPParser.
var _ akinet.TCPParser = (*httpParser)(nil)
// Name returns a human-readable label that says whether this parser handles
// HTTP/1.x requests or HTTP/1.x responses.
func (p *httpParser) Name() string {
	switch {
	case p.isRequest:
		return "HTTP/1.x Request Parser"
	default:
		return "HTTP/1.x Response Parser"
	}
}
// Parse feeds input to the parser goroutine and reports whether the HTTP
// request or response has been fully parsed.
//
// All of input is written into the pipe read by the parser goroutine. If
// isEnd is true, or if the total input fed to this parser now exceeds
// maxHttpLength, the pipe is closed and Parse waits for the goroutine to
// finish.
//
// Results:
//   - result: the parsed request or response once parsing has completed.
//     It stays at its zero value while more input is needed.
//   - unused: the tail of input that was not part of the parsed message.
//     It is set only when parsing completed.
//   - totalBytesConsumed: bytes fed to this parser across all calls. When
//     parsing completes, the unused tail is subtracted.
//   - err: the parser's own error if parsing failed, or a wrapped error for
//     any other pipe failure. A nil err together with a zero result means
//     the parser needs more input.
func (p *httpParser) Parse(input memview.MemView, isEnd bool) (result akinet.ParsedNetworkContent, unused memview.MemView, totalBytesConsumed int64, err error) {
	// Bytes the parser goroutine accepted from input through the pipe.
	var consumedBytes int64
	// Runs on every return path. It translates the error the parser goroutine
	// closed the pipe with into Parse's results.
	defer func() {
		totalBytesConsumed = p.totalBytesConsumed
		if err == nil {
			return
		}
		// Adjust the number of bytes that were read by the reader but were unused.
		switch e := err.(type) {
		case httpPipeReaderDone:
			// Parsing succeeded. The goroutine sends the result after closing the
			// pipe, so this receive waits until the result is ready.
			result = <-p.resultChan
			// e is the number of bytes the goroutine's bufio.Reader had buffered but
			// not consumed. NOTE(review): this assumes all of those bytes came from
			// this call's input; confirm they can never span earlier calls.
			unused = input.SubView(consumedBytes-int64(e), input.Len())
			totalBytesConsumed -= unused.Len()
			err = nil
		case httpPipeReaderError:
			// Parsing failed. Return the underlying parse error.
			err = e.err
		default:
			// Any other pipe error, presumably io.ErrClosedPipe when writing after
			// the pipe was already closed.
			err = errors.Wrap(err, "encountered unknown HTTP pipe reader error")
		}
	}()
	p.totalBytesConsumed += input.Len()
	// The PipeWriter blocks until the reader is done consuming all the bytes.
	consumedBytes, err = io.Copy(p.w, input.CreateReader())
	if err != nil {
		return
	}
	// The reader might close (aka parse complete) after the write returns, so we
	// need to check. We force an empty write such that:
	// - If the parse is indeed complete, the reader no longer consumes anything,
	//   so this call will block until the reader closes.
	// - If the parse is not done yet, the empty write doesn't change things.
	_, err = p.w.Write([]byte{})
	if err != nil {
		return
	}
	// If the reader has not closed yet, tell it we have no more input. This case
	// happens if there's no content-length and we're reading until connection
	// close.
	//
	// Also, if the HTTP request or response is longer than our maximum length,
	// close the pipe anyway. This will leave the input stream in a state where it
	// probably can't find the next header until the accumulated data in the
	// reassembly buffer is all skipped.
	if isEnd || p.totalBytesConsumed > p.maxHttpLength {
		// Closing the writer gives the parser goroutine EOF. It then reports how
		// it finished on readClosed.
		p.w.Close()
		err = <-p.readClosed
	}
	return
}
// newHTTPParser creates a parser for a single HTTP/1.x request (isRequest is
// true) or response, and starts the goroutine that does the actual parsing.
//
// Go's HTTP parser blocks on its reader, so it runs on its own goroutine that
// reads from an io.Pipe. Parse writes captured bytes into the write end of
// that pipe.
//
// When the goroutine finishes, it:
//   - closes the read end of the pipe with httpPipeReaderDone on success or
//     httpPipeReaderError on failure;
//   - sends the same error on readClosed;
//   - on success only, sends the parsed request or response on resultChan.
//
// bidiID, seq and ack tag the parsed content so that a request and its
// response on the same TCP connection can be paired. pool supplies the buffer
// that receives the message body.
//
// The goroutine exits only when the parse completes or fails. A caller that
// stops feeding input without ever passing isEnd to Parse leaves it blocked
// on the pipe.
func newHTTPParser(isRequest bool, bidiID akinet.TCPBidiID, seq, ack reassembly.Sequence, pool buffer_pool.BufferPool) *httpParser {
	// Unfortunately, Go's HTTP request parser blocks, so it has to run in a
	// separate goroutine. This needs to be addressed as part of
	// https://app.clubhouse.io/akita-software/story/600

	// Carries the parsed HTTP request or response. It is buffered so the
	// goroutine can always hand off its single result and exit. Parse receives
	// from it only when it recognizes the httpPipeReaderDone signal. If that
	// signal reaches Parse in another form (its default error branch), or the
	// parser is dropped, an unbuffered send here would block the goroutine
	// forever.
	resultChan := make(chan akinet.ParsedNetworkContent, 1)

	// Carries the error the pipe reader was closed with. It is buffered for
	// the same reason: Parse drains it only when Parse closes the pipe itself.
	readClosed := make(chan error, 1)

	r, w := io.Pipe()

	go func() {
		var req *http.Request
		var resp *http.Response
		var err error

		br := bufio.NewReader(r)

		// Create a buffer for the body.
		//
		// XXX This is used in a very non-local fashion. Consumers of the body are
		// responsible for resetting the buffer, but there is no way to guarantee
		// that this will happen.
		body := pool.NewBuffer()

		if isRequest {
			req, err = readSingleHTTPRequest(br, body)
		} else {
			resp, err = readSingleHTTPResponse(br, body)
		}

		if err != nil {
			err = httpPipeReaderError{
				err:         err,
				unusedBytes: int64(br.Buffered()),
			}
			r.CloseWithError(err)
			readClosed <- err
			// On failure the body is not handed off, so release it here.
			body.Release()
			return
		}

		// Close the reader to tell the pipe writer that the result is ready.
		// The close error also records how many bytes br read from the pipe
		// without using them.
		err = httpPipeReaderDone(br.Buffered())
		r.CloseWithError(err)
		readClosed <- err

		// This relies on a request being complete before its response is sent.
		// Then the TCP ack number on the first segment of a request equals the
		// TCP seq number on the first segment of the matching response. That
		// shared number tells apart different request/response pairs on the
		// same TCP stream.
		var c akinet.ParsedNetworkContent
		if isRequest {
			c = akinet.FromStdRequest(uuid.UUID(bidiID), int(ack), req, body)
		} else {
			c = akinet.FromStdResponse(uuid.UUID(bidiID), int(seq), resp, body)
		}
		resultChan <- c
	}()

	return &httpParser{
		w:             w,
		resultChan:    resultChan,
		readClosed:    readClosed,
		isRequest:     isRequest,
		maxHttpLength: MaximumHTTPLength,
	}
}
// readSingleHTTPRequest parses exactly one HTTP request from r and copies its
// body into body.
//
// Only the bytes that make up the request and its body are consumed. Extra
// bytes may still sit in r's internal buffer afterwards. A truncated body,
// whether from an unexpected EOF or the buffer pool running out, is not
// treated as an error. The caller receives the request with whatever body
// was captured.
func readSingleHTTPRequest(r *bufio.Reader, body buffer_pool.Buffer) (*http.Request, error) {
	req, err := http.ReadRequest(r)
	if err != nil {
		return nil, err
	}
	if req.Body == nil {
		return req, nil
	}

	// Draining the body advances r to the end of the request.
	_, copyErr := io.Copy(body, req.Body)
	_ = req.Body.Close() // Close's error is intentionally ignored, as before.

	truncated := errors.Is(copyErr, io.ErrUnexpectedEOF) ||
		errors.Is(copyErr, buffer_pool.ErrEmptyPool)
	if truncated {
		// Let the next level try to handle a body that was truncated.
		return req, nil
	}
	return req, copyErr
}
// readSingleHTTPResponse parses exactly one HTTP response from r and copies
// its body into body.
//
// Only the bytes that make up the response and its body are consumed. Extra
// bytes may still sit in r's internal buffer afterwards. A truncated body,
// whether from an unexpected EOF or the buffer pool running out, is not
// treated as an error. The caller receives the response with whatever body
// was captured.
func readSingleHTTPResponse(r *bufio.Reader, body buffer_pool.Buffer) (*http.Response, error) {
	// XXX BUG Because a nil http.Request is provided to ReadResponse, the http
	// library assumes a GET request. If this is actually a response to a HEAD
	// request and the Content-Length header is present, the library will treat
	// the bytes after the end of the response as a response body.
	resp, err := http.ReadResponse(r, nil)
	if err != nil {
		return nil, err
	}
	if resp.Body == nil {
		return resp, nil
	}

	// Draining the body advances r to the end of the response.
	_, copyErr := io.Copy(body, resp.Body)
	_ = resp.Body.Close() // Close's error is intentionally ignored, as before.

	truncated := errors.Is(copyErr, io.ErrUnexpectedEOF) ||
		errors.Is(copyErr, buffer_pool.ErrEmptyPool)
	if truncated {
		// Let the next level try to handle a body that was truncated.
		return resp, nil
	}
	return resp, copyErr
}
// httpPipeReaderDone is the "error" the parser goroutine closes the pipe
// reader with after a successful parse. Its value is the number of bytes that
// were read from the pipe writer but not used by the parse.
type httpPipeReaderDone int64

// Error implements the error interface. This value signals success rather
// than failure, so the message says so.
func (d httpPipeReaderDone) Error() string {
	const msg = "HTTP pipe reader success"
	return msg
}
// httpPipeReaderError is the error the parser goroutine closes the pipe reader
// with when parsing fails.
type httpPipeReaderError struct {
	// The error returned by the underlying HTTP parser.
	err error

	// Number of bytes read from the pipe writer but not used.
	unusedBytes int64
}

// Error reports the message of the underlying parse error unchanged.
func (pe httpPipeReaderError) Error() string {
	inner := pe.err
	return inner.Error()
}