/
Response.pm6
374 lines (286 loc) · 13.6 KB
/
Response.pm6
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
use v6;
unit class HTTP::Supply::Response;
use HTTP::Supply;
use HTTP::Supply::Body;
use HTTP::Supply::Tools;
=begin pod
=NAME HTTP::Supply::Response - A modern HTTP/1.x response parser
=begin SYNOPSIS
use HTTP::Supply::Response;
my @pipeline = <
/stuff
/things
/more-stuff
>;
sub fetch-next($conn) {
my $uri = @pipeline.pop;
with $uri {
$conn.print("GET $uri HTTP/1.1\r\n");
True;
}
else {
$conn.close;
False;
}
}
react {
whenever IO::Socket::Async.connect('localhost', 8080) -> $conn {
fetch-next($conn);
whenever HTTP::Supply::Response.parse-http($conn) -> $res {
if $res[0] == 200 {
$res[2].reduce({ $^a ~ $^b }).decode('utf8').say;
done unless fetch-next($conn);
}
else {
die "Bad things happened: $res[0] {$res[1].grep(*.key eq '::server-reason-phrase'}";
}
}
}
}
=end SYNOPSIS
=begin DESCRIPTION
B<EXPERIMENTAL:> The API for this module is experimental and may change.
This class provides C<method parse-http> that parses incoming data from a
L<Supply> that is expected to emit raw binary data. It returns a Supply that
emits a response for each HTTP/1.x response frame parsed from the incoming data.
Each frame is returned as it arrives asynchronously.
This Supply emits an extended L<P6WAPI> response for use by the caller. If a
problem is detected in the stream, it will quit with an exception.
=end DESCRIPTION
=head1 METHODS
=ehad2 method parse-http
method parse-http(HTTP::Supply::Response: Supply:D() $conn, Bool :$debug = False) returns Supply:D
THe given L<Supply>, C<$conn>, must emit a stream of bytes. Any other data will
result in undefined behavior. The parser assumes that only binary bytes will be
sent and makes no particular effort to verify that assumption.
The returns Supply will emit a response whenever a response frame can be parsed
from the input supply. The response will be emitted as soon as the header has
been read. The response includes a supply containing the body, which will be
emitted as more binary bytes as it arrives.
The response is emitted as an extended L<P6WAPI> response. It will be a
L<Positional> object with three elements:
=over
=item The first element will be the numeric status code from the status line.
=item The second element will be an L<Array> of L<Pair>s for the headers.
=item The thirs will be a sane Supply that will emit the bytes in the message body.
=back
The headers are provided in the order they are received with any repeats
included as-is. The header names will be set in each key using folded case
(which means lower case as the headers will be decoded using ISO-8859-1).
The header will also include two special fields:
=over
=item The C<::server-protocol> key will be set to the server protocol set in the
status line of the response. This will be either "HTTP/1.0" or "HTTP/1.1".
=item The C<::server-reason-phrase> key will be set to the reason phrase set by
the server in the status line of the response after the numeric code. This will
usually be something like "OK" when the status code is 200, "Not Found" when the
status code is 404, etc.
=back
The parser aims at being very liberal in what it accepts. It is possible for the
headers to contain non-sensical values. So long as the format is syntactically
readable and the frames appear to make sense, the parser will continue emitting
responses as they arrive. Ensuring proper HTTP semantics and connection handling
is left up to the caller.
=head1 DIAGNOSTICS
In certain cases, the parser may caues the Supply to quit with an error.
=head2 X::HTTP::Supply::UnsupportedProtocol
This exception will be thrown if the message reports a server protocol that looks like HTTP, but is not HTTP/1.0 or HTTP/1.1.
=head2 X::HTTP::Supply::BadMessage
If there is any syntax error, this message may be thrown. This may also be thrown on certain obvious semantic errors. Even these, however, are primarily oriented toward sanity checking the syntax of each response frame.
=end pod
method !make-header(@header) {
my %header;
for @header {
if %header{ .key } :exists {
%header{ .key } ~= ',' ~ .value;
}
else {
%header{ .key } = .value;
}
}
%header;
}
method parse-http(Supply:D() $conn, Bool :$debug = False --> Supply:D) {
sub debug(*@msg) {
note "# Response [$*PID] [{now.Rat.fmt("%.5f")}] (#$*THREAD.id()) ", |@msg if $debug
}
supply {
my enum <StatusLine Header Body Close>;
my $expect;
my @res;
my buf8 $acc;
my Supplier $body-sink;
my Promise $left-over;
my sub new-response() {
$expect = StatusLine;
$acc = buf8.new;
$acc ~= .result with $left-over;
$left-over = Nil;
$body-sink = Nil;
@res := [ Nil, [], Nil ];
}
new-response();
whenever $conn -> $chunk {
LAST {
debug("server closed the connection");
if $expect ~~ Body {
$body-sink.done;
}
done;
}
# When expected a header add the chunk to the accumulation buffer.
debug("RECV ", $chunk.perl);
$acc ~= $chunk if $expect != Body;
# Otherwise, the chunk will be handled directly below.
CHUNK_PROCESS: loop {
given $expect {
# Ready to receive the status line
when StatusLine {
# Decode the response line
my $line = crlf-line($acc);
# We don't have a complete line yet
last CHUNK_PROCESS without $line;
debug("STATLINE [$line]");
# Break the line up into parts
my ($http-version, $status-code, $status-message) = $line.split(' ', 3);
# Make sure the status code is numeric and sane-ish
if $status-code !~~ /^ <[1..5]> <[0..9]> <[0..9]> $/ {
die X::HTTP::Supply::BadMessage.new(
reason => 'status code is not numeric or not in the 100-599 range',
);
}
# We got what looks like a status-line, let's check it
# just a bit.
if ($http-version//'') eq none('HTTP/1.0', 'HTTP/1.1') {
# Looks like HTTP/*?
if $http-version.defined && $http-version ~~ /^ 'HTTP/' <[0..9]>+ / {
die X::HTTP::Supply::UnsupportedProtocol.new.throw;
}
# It is other.
else {
die X::HTTP::Supply::BadMessage.new(
reason => 'status line contains garbage',
);
}
}
# Save the status line
@res[0] = $status-code.Int;
@res[1].push: '::server-protocol' => $http-version;
@res[1].push: '::server-reason-phrase' => $status-message;
$expect = Header;
}
# Ready to receive a header line
when Header {
# Decode the next line from the header
my $line = crlf-line($acc);
# We don't have a complete line yet
last CHUNK_PROCESS without $line;
# Empty line signals the end of the header
my %header := self!make-header(@res[1]);
if $line eq '' {
debug("HEADER END");
# Setup the body decoder itself
debug("STATUS ", @res[0]);
debug("HEAD ", @res[1].perl);
my $body-decoder-class = do
if %header<transfer-encoding>.defined
&& %header<transfer-encoding> eq 'chunked' {
HTTP::Supply::Body::ChunkedEncoding
}
elsif %header<content-length>.defined {
HTTP::Supply::Body::ContentLength
}
else {
HTTP::Supply::Body::UntilDone
}
debug("DECODER CLASS ", $body-decoder-class.^name);
# Setup the stream we will send to the P6WAPI response
my $body-stream = Supplier::Preserving.new;
@res[2] = $body-stream.Supply;
# If we expect a body to decode, setup the decoder
if $body-decoder-class ~~ HTTP::Supply::Body {
debug("DECODE BODY");
# Setup the stream we will send to the body decoderk
$body-sink = Supplier::Preserving.new;
# Setup the promise the body decoder can use to
# drop the left-overs
$left-over = Promise.new;
# Construst the decoder and tap the body-sink
my $body-decoder = $body-decoder-class.new(
:$body-stream, :$left-over, :%header,
);
$body-decoder.decode($body-sink.Supply);
# Get the existing chunks and put them into the
# body sink
debug("BODY ", $acc);
$body-sink.emit: $acc;
# Emit the resposne, its processing can begin
# while we continue to receive the body.
debug("EMIT ", @res.perl);
emit @res;
# Is the body decoder done already?
# The request finished and the pipeline is ready
# with another response, so begin again.
if $left-over.status == Kept {
new-response();
next CHUNK_PROCESS;
}
# The response is still going. We need more
# chunks.
else {
$expect = Body;
last CHUNK_PROCESS;
}
}
# No body expected. Emit and move on.
else {
# Emit the completed response
$body-stream.done;
emit @res;
# Setup to read the next response.
new-response();
}
}
# Lines starting with whitespace are folded. Append the
# value to the previous header.
elsif $line.starts-with(' '|"\t") {
debug("CONT HEADER ", $line);
# Folding encountered too early
die X::HTTP::Supply::BadMessage.new(
reason => 'header folding encountered before any header was sent',
) if @res[1].elems == 0;
@res[1][*-1].value ~= $line.trim-leading;
}
# We have received a new header. Save it.
else {
debug("START HEADER ", $line);
# Break the header line by the :
my ($name, $value) = $line.split(": ");
# Setup the name for going into the response
$name .= fc;
# Save the value into the response
@res[1].push: $name => $value;
}
}
# Continue to decode the body.
when Body {
# Send the chunk to the body decoder to continue
# decoding.
debug("BODY ", $chunk);
$body-sink.emit: $chunk;
# The response finished and the pipeline is ready with
# another response, so begin again.
if $left-over.status == Kept {
new-response();
next CHUNK_PROCESS;
}
# The response is still going. We need more chunks.
else {
last CHUNK_PROCESS;
}
}
}
}
}
}
}