Skip to content

Commit 94a763e

Browse files
quaesitor-scientiamRichard Wheelerclaude
authored
net.http: stream response callbacks and stop limits over HTTP/2 (#27369)
* net.http: stream response callbacks and stop limits over HTTP/2 (closes #27368) The HTTP/2 fetch path (#27362) buffered the entire response body, so requests using on_progress / on_progress_body / stop_copying_limit / stop_receiving_limit were forced onto HTTP/1.1. This adds real streaming support so they work on the HTTP/2 path too. - New H2ClientRequest fields: on_data (per-DATA-frame callback) and stop_copying_limit / stop_receiving_limit, mirroring the HTTP/1.1 semantics. - H2Conn.read_response now tracks cumulative body bytes, reads Content-Length if present, fires on_data per DATA frame (including chunk, cumulative body_so_far, content-length, and status), respects stop_copying_limit (caps the response body while still firing callbacks and draining the stream), and respects stop_receiving_limit (breaks the read loop early). - The h2_do shim in backend.c.v adapts the Request's on_progress and on_progress_body into a single H2DataFn closure and threads the two stop limits through. The previous gate (uses_response_streaming) is removed, and the enable_http2 docs note that on_progress fires per DATA payload on h2 rather than per raw network read. Tests over the in-memory transport assert: on_data fires per DATA frame with cumulative body_so_far, Content-Length (when present), and status; stop_copying_limit caps the response body while callbacks keep firing across all chunks; stop_receiving_limit breaks the loop early. Verified end-to-end against https://www.google.com/ — http.fetch(enable_http2: true, on_progress_body: f) reports HTTP/2.0, status 200, and the on_progress_body callback fires once per 16 KiB DATA frame with cumulative bytes matching the final body length. Passes under -W -cstrict -cc clang. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> * net.http: RST_STREAM(CANCEL) and mark H2Conn unusable on early termination When stop_receiving_limit triggered, the response stream was left open without sending RST_STREAM. On a reused H2Conn the peer's in-flight DATA frames for the abandoned stream would still arrive, consuming the connection-level receive window and risking starvation of subsequent requests. Fix: when bailing early, send RST_STREAM with error code CANCEL on the request stream (RFC 7540 Section 8.1.4 / 5.4.2) so the peer stops sending more DATA, and set a new H2Conn.aborted flag so subsequent H2Conn.do() calls return a clear error rather than proceeding on a half-drained connection. Strengthens the stop_receiving_limit test to assert the client emitted RST_STREAM(CANCEL) on the request stream and that a second do() on the same connection errors out. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Richard Wheeler <quaesitor.scientiam@gmail.com> Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 0ff44dc commit 94a763e

7 files changed

Lines changed: 259 additions & 48 deletions

File tree

vlib/net/http/backend.c.v

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,10 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri
2222
eprintln('')
2323
}
2424
// Advertise ALPN `h2` (with an `http/1.1` fallback) only when HTTP/2 is
25-
// requested, so existing callers see no change on the wire. Requests that
26-
// rely on streaming response callbacks or stop limits stay on HTTP/1.1,
27-
// since the HTTP/2 path buffers the full response; the ALPN offer is gated
28-
// here (before negotiation) because once a server selects `h2` we cannot
29-
// fall back to HTTP/1.1 framing on the same connection.
30-
use_h2 := req.enable_http2 && !req.uses_response_streaming()
31-
alpn := if use_h2 { ['h2', 'http/1.1'] } else { []string{} }
25+
// requested, so existing callers see no change on the wire. The HTTP/2 read
26+
// path now feeds the same streaming callbacks and honors the stop limits,
27+
// so they no longer force HTTP/1.1.
28+
alpn := if req.enable_http2 { ['h2', 'http/1.1'] } else { []string{} }
3229
for {
3330
mut ssl_conn := ssl.new_ssl_conn(
3431
verify: req.verify
@@ -53,7 +50,7 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri
5350
}
5451
// If the server negotiated HTTP/2 via ALPN, speak it; otherwise fall
5552
// back to the existing HTTP/1.1 path unchanged.
56-
if use_h2 && ssl_conn.negotiated_alpn() == 'h2' {
53+
if req.enable_http2 && ssl_conn.negotiated_alpn() == 'h2' {
5754
return req.h2_do(mut ssl_conn, method, host_name, port, path, data, header)!
5855
}
5956
return req.do_request(req_headers, mut ssl_conn)!
@@ -63,12 +60,40 @@ fn net_ssl_do(req &Request, port int, method Method, host_name string, path stri
6360

6461
// h2_do runs a single request over an HTTP/2 connection on an already-dialled,
6562
// ALPN-negotiated `h2` TLS socket, and returns the response as a net.http
66-
// Response.
63+
// Response. The request's streaming callbacks (on_progress / on_progress_body)
64+
// and stop limits are adapted onto the H2 chunk hook so they fire per DATA
65+
// frame, matching the HTTP/1.1 streaming semantics as closely as is possible
66+
// on the framed wire (on_progress receives DATA payloads rather than raw
67+
// network reads).
6768
fn (req &Request) h2_do(mut ssl_conn ssl.SSLConn, method Method, host_name string, port int, path string, data string, header Header) !Response {
6869
defer {
6970
ssl_conn.shutdown() or {}
7071
}
71-
h2req := req.to_h2_request(method, h2_authority(host_name, port), path, data, header)
72+
base := req.to_h2_request(method, h2_authority(host_name, port), path, data, header)
73+
on_progress := req.on_progress
74+
on_progress_body := req.on_progress_body
75+
mut on_data := H2DataFn(unsafe { nil })
76+
if on_progress != unsafe { nil } || on_progress_body != unsafe { nil } {
77+
on_data = fn [req, on_progress, on_progress_body] (chunk []u8, body_so_far u64, body_expected u64, status int) ! {
78+
if on_progress != unsafe { nil } {
79+
on_progress(req, chunk, body_so_far)!
80+
}
81+
if on_progress_body != unsafe { nil } {
82+
on_progress_body(req, chunk, body_so_far, body_expected, status)!
83+
}
84+
}
85+
}
86+
h2req := H2ClientRequest{
87+
method: base.method
88+
scheme: base.scheme
89+
authority: base.authority
90+
path: base.path
91+
headers: base.headers
92+
body: base.body
93+
on_data: on_data
94+
stop_copying_limit: req.stop_copying_limit
95+
stop_receiving_limit: req.stop_receiving_limit
96+
}
7297
mut conn := new_h2_conn(ssl_conn)
7398
h2resp := conn.do(h2req)!
7499
if req.on_finish != unsafe { nil } {

vlib/net/http/h2_client.v

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -73,15 +73,6 @@ fn (req &Request) to_h2_request(method Method, authority string, path string, da
7373
}
7474
}
7575

76-
// uses_response_streaming reports whether the request relies on streaming
77-
// response callbacks or stop limits. The HTTP/2 path buffers the full response,
78-
// so such requests must not negotiate HTTP/2 and instead use the HTTP/1.1 path,
79-
// which honors these. (Streaming over HTTP/2 is a planned follow-up.)
80-
fn (req &Request) uses_response_streaming() bool {
81-
return req.on_progress != unsafe { nil } || req.on_progress_body != unsafe { nil }
82-
|| req.stop_copying_limit >= 0 || req.stop_receiving_limit >= 0
83-
}
84-
8576
// h2_response_to_http converts an HTTP/2 response into a net.http Response,
8677
// decoding any Content-Encoding the same way the HTTP/1.1 path does.
8778
fn h2_response_to_http(h2resp H2ClientResponse) Response {

vlib/net/http/h2_client_test.v

Lines changed: 0 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -103,26 +103,3 @@ fn test_to_h2_request_authority_from_host_header() {
103103
assert h2req.authority == 'override.example:8443'
104104
assert !h2req.headers.any(it.name == 'host')
105105
}
106-
107-
fn test_uses_response_streaming() {
108-
assert !(Request{}).uses_response_streaming()
109-
assert (Request{
110-
stop_copying_limit: 0
111-
}).uses_response_streaming()
112-
assert (Request{
113-
stop_receiving_limit: 100
114-
}).uses_response_streaming()
115-
progress := fn (request &Request, chunk []u8, read_so_far u64) ! {}
116-
assert (Request{
117-
on_progress: progress
118-
}).uses_response_streaming()
119-
body_progress := fn (request &Request, chunk []u8, body_read_so_far u64, body_expected_size u64, status_code int) ! {}
120-
assert (Request{
121-
on_progress_body: body_progress
122-
}).uses_response_streaming()
123-
// A non-streaming callback (on_finish) does not force HTTP/1.1.
124-
finish := fn (request &Request, final_size u64) ! {}
125-
assert !(Request{
126-
on_finish: finish
127-
}).uses_response_streaming()
128-
}

vlib/net/http/h2_conn.v

Lines changed: 70 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ mut:
4242
max_header_list_size u32 = max_u32
4343
}
4444

45+
// H2DataFn is called for each DATA frame received on the response stream, with
46+
// the chunk's bytes, the cumulative body bytes received (including this chunk),
47+
// the body length from Content-Length if known (else 0), and the response
48+
// status code.
49+
pub type H2DataFn = fn (chunk []u8, body_so_far u64, body_expected u64, status int) !
50+
4551
// H2ClientRequest describes a single HTTP/2 request. Header names in `headers`
4652
// must be lowercase (RFC 7540 Section 8.1.2); the pseudo-headers are filled in
4753
// from the other fields.
@@ -53,6 +59,20 @@ pub:
5359
path string = '/'
5460
headers []H2HeaderField
5561
body []u8
62+
// Optional response chunk callback, called after each DATA frame's payload
63+
// is received. The arguments are the chunk bytes (not yet copied into the
64+
// response body), the cumulative body bytes received so far (including this
65+
// chunk), the body length from Content-Length (0 when not present), and the
66+
// response status code.
67+
on_data H2DataFn = unsafe { nil }
68+
// stop_copying_limit, when >= 0, caps the cumulative body bytes copied into
69+
// the response body; further DATA chunks are dropped but the callback keeps
70+
// firing and the stream is drained to completion.
71+
stop_copying_limit i64 = -1
72+
// stop_receiving_limit, when >= 0, causes the response read loop to break
73+
// once that many body bytes have been received. The callback fires for the
74+
// final chunk; no further callbacks fire after that.
75+
stop_receiving_limit i64 = -1
5676
}
5777

5878
// H2ClientResponse is the result of an HTTP/2 request.
@@ -78,6 +98,12 @@ mut:
7898
stream_send_window i64 // send window for cur_stream_id
7999
handshaked bool
80100
goaway bool
101+
// aborted is set when this connection terminated a stream early
102+
// (RST_STREAM sent without draining the remaining DATA). Subsequent
103+
// requests on the same connection must fail rather than risk being starved
104+
// by leftover DATA frames the peer had already sent for the cancelled
105+
// stream.
106+
aborted bool
81107
}
82108

83109
// new_h2_conn creates a client connection over `transport`. The HTTP/2
@@ -94,6 +120,9 @@ pub fn (mut c H2Conn) do(req H2ClientRequest) !H2ClientResponse {
94120
if c.goaway {
95121
return error('h2: connection is shutting down (GOAWAY)')
96122
}
123+
if c.aborted {
124+
return error('h2: connection is no longer usable after an early stream termination')
125+
}
97126
stream_id := c.next_stream_id
98127
c.next_stream_id += 2
99128
c.cur_stream_id = stream_id
@@ -114,7 +143,7 @@ pub fn (mut c H2Conn) do(req H2ClientRequest) !H2ClientResponse {
114143
if has_body {
115144
c.send_body(stream_id, req.body)!
116145
}
117-
return c.read_response(stream_id)!
146+
return c.read_response(stream_id, req)!
118147
}
119148

120149
fn (mut c H2Conn) handshake() ! {
@@ -135,10 +164,14 @@ fn (mut c H2Conn) handshake() ! {
135164
}
136165

137166
// read_response reads frames until `stream_id` is closed, returning its
138-
// response and servicing connection-level frames along the way.
139-
fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse {
167+
// response and servicing connection-level frames along the way. The streaming
168+
// options on `req` (on_data callback and the two stop limits) are honored
169+
// while reading DATA frames, matching the HTTP/1.1 streaming semantics.
170+
fn (mut c H2Conn) read_response(stream_id u32, req H2ClientRequest) !H2ClientResponse {
140171
mut resp := H2ClientResponse{}
141172
mut got_headers := false
173+
mut body_so_far := u64(0)
174+
mut body_expected := u64(0)
142175
for {
143176
frame := c.next_frame()!
144177
if c.handle_conn_frame(frame)! {
@@ -155,6 +188,9 @@ fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse {
155188
resp.status = f.value.int()
156189
} else if !f.name.starts_with(':') {
157190
resp.headers << f
191+
if f.name == 'content-length' {
192+
body_expected = f.value.u64()
193+
}
158194
}
159195
}
160196
got_headers = true
@@ -166,15 +202,45 @@ fn (mut c H2Conn) read_response(stream_id u32) !H2ClientResponse {
166202
if frame.stream_id != stream_id {
167203
continue
168204
}
169-
resp.body << frame.data
170205
if frame.data.len > 0 {
206+
body_so_far += u64(frame.data.len)
207+
// Append the chunk to the response body unless the copy
208+
// limit has been reached; the callback still fires.
209+
if req.stop_copying_limit < 0
210+
|| i64(body_so_far) - i64(frame.data.len) < req.stop_copying_limit {
211+
if req.stop_copying_limit >= 0 && i64(body_so_far) > req.stop_copying_limit {
212+
remaining := req.stop_copying_limit - (i64(body_so_far) - i64(frame.data.len))
213+
if remaining > 0 {
214+
resp.body << frame.data[..int(remaining)]
215+
}
216+
} else {
217+
resp.body << frame.data
218+
}
219+
}
220+
if req.on_data != unsafe { nil } {
221+
req.on_data(frame.data, body_so_far, body_expected, resp.status)!
222+
}
171223
// Replenish flow control so the peer keeps sending.
172224
c.send_window_update(0, u32(frame.data.len))!
173225
c.send_window_update(stream_id, u32(frame.data.len))!
174226
}
175227
if frame.end_stream {
176228
break
177229
}
230+
if req.stop_receiving_limit >= 0 && i64(body_so_far) >= req.stop_receiving_limit {
231+
// Cancel the stream (RFC 7540 Section 8.1.4 / 5.4.2) so the
232+
// peer stops sending more DATA, and mark the connection
233+
// unusable: in-flight DATA frames that the peer has already
234+
// sent for this stream would otherwise consume the
235+
// connection-level receive window and block subsequent
236+
// requests on the same H2Conn.
237+
c.send_frame(H2RstStreamFrame{
238+
stream_id: stream_id
239+
error_code: u32(H2ErrorCode.cancel)
240+
})!
241+
c.aborted = true
242+
break
243+
}
178244
}
179245
H2RstStreamFrame {
180246
if frame.stream_id == stream_id {

0 commit comments

Comments
 (0)