diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 495fa60ba39ce3..be5d08a0b0a1f5 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -206,6 +206,7 @@ const STREAM_FLAGS_CLOSED = 0x2; const STREAM_FLAGS_HEADERS_SENT = 0x4; const STREAM_FLAGS_HEAD_REQUEST = 0x8; const STREAM_FLAGS_ABORTED = 0x10; +const STREAM_FLAGS_HAS_TRAILERS = 0x20; const SESSION_FLAGS_PENDING = 0x0; const SESSION_FLAGS_READY = 0x1; @@ -330,26 +331,13 @@ function onStreamClose(code) { if (stream.destroyed) return; - const state = stream[kState]; - debug(`Http2Stream ${stream[kID]} [Http2Session ` + `${sessionName(stream[kSession][kType])}]: closed with code ${code}`); - if (!stream.closed) { - // Clear timeout and remove timeout listeners - stream.setTimeout(0); - stream.removeAllListeners('timeout'); + if (!stream.closed) + closeStream(stream, code, false); - // Set the state flags - state.flags |= STREAM_FLAGS_CLOSED; - state.rstCode = code; - - // Close the writable side of the stream - abort(stream); - stream.end(); - } - - state.fd = -1; + stream[kState].fd = -1; // Defer destroy we actually emit end. if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. @@ -504,7 +492,7 @@ function requestOnConnect(headers, options) { // At this point, the stream should have already been destroyed during // the session.destroy() method. Do nothing else. - if (session.destroyed) + if (session === undefined || session.destroyed) return; // If the session was closed while waiting for the connect, destroy @@ -1412,6 +1400,9 @@ class ClientHttp2Session extends Http2Session { if (options.endStream) stream.end(); + if (options.waitForTrailers) + stream[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + const onConnect = requestOnConnect.bind(stream, headersList, options); if (this.connecting) { this.on('connect', onConnect); @@ -1445,8 +1436,11 @@ function afterDoStreamWrite(status, handle) { } function streamOnResume() { - if (!this.destroyed && !this.pending) + if (!this.destroyed && !this.pending) { + if (!this[kState].didRead) + this[kState].didRead = true; this[kHandle].readStart(); + } } function streamOnPause() { @@ -1454,16 +1448,6 @@ function streamOnPause() { this[kHandle].readStop(); } -// If the writable side of the Http2Stream is still open, emit the -// 'aborted' event and set the aborted flag. -function abort(stream) { - if (!stream.aborted && - !(stream._writableState.ended || stream._writableState.ending)) { - stream[kState].flags |= STREAM_FLAGS_ABORTED; - stream.emit('aborted'); - } -} - function afterShutdown() { this.callback(); const stream = this.handle[kOwner]; @@ -1471,6 +1455,51 @@ function afterShutdown() { stream[kMaybeDestroy](); } +function closeStream(stream, code, shouldSubmitRstStream = true) { + const state = stream[kState]; + state.flags |= STREAM_FLAGS_CLOSED; + state.rstCode = code; + + // Clear timeout and remove timeout listeners + stream.setTimeout(0); + stream.removeAllListeners('timeout'); + + const { ending, finished } = stream._writableState; + + if (!ending) { + // If the writable side of the Http2Stream is still open, emit the + // 'aborted' event and set the aborted flag. + if (!stream.aborted) { + state.flags |= STREAM_FLAGS_ABORTED; + stream.emit('aborted'); + } + + // Close the writable side. + stream.end(); + } + + if (shouldSubmitRstStream) { + const finishFn = finishCloseStream.bind(stream, code); + if (!ending || finished || code !== NGHTTP2_NO_ERROR) + finishFn(); + else + stream.once('finish', finishFn); + } +} + +function finishCloseStream(code) { + const rstStreamFn = submitRstStream.bind(this, code); + // If the handle has not yet been assigned, queue up the request to + // ensure that the RST_STREAM frame is sent after the stream ID has + // been determined. + if (this.pending) { + this.push(null); + this.once('ready', rstStreamFn); + return; + } + rstStreamFn(); +} + // An Http2Stream is a Duplex stream that is backed by a // node::http2::Http2Stream handle implementing StreamBase. class Http2Stream extends Duplex { @@ -1490,6 +1519,7 @@ class Http2Stream extends Duplex { this[kTimeout] = null; this[kState] = { + didRead: false, flags: STREAM_FLAGS_PENDING, rstCode: NGHTTP2_NO_ERROR, writeQueueSize: 0, @@ -1756,6 +1786,8 @@ class Http2Stream extends Duplex { throw headersList; this[kSentTrailers] = headers; + this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + const ret = this[kHandle].trailers(headersList); if (ret < 0) this.destroy(new NghttpError(ret)); @@ -1786,38 +1818,13 @@ class Http2Stream extends Duplex { if (callback !== undefined && typeof callback !== 'function') throw new ERR_INVALID_CALLBACK(); - // Clear timeout and remove timeout listeners - this.setTimeout(0); - this.removeAllListeners('timeout'); - - // Close the writable - abort(this); - this.end(); - if (this.closed) return; - const state = this[kState]; - state.flags |= STREAM_FLAGS_CLOSED; - state.rstCode = code; - - if (callback !== undefined) { + if (callback !== undefined) this.once('close', callback); - } - - if (this[kHandle] === undefined) - return; - const rstStreamFn = submitRstStream.bind(this, code); - // If the handle has not yet been assigned, queue up the request to - // ensure that the RST_STREAM frame is sent after the stream ID has - // been determined. - if (this.pending) { - this.push(null); - this.once('ready', rstStreamFn); - return; - } - rstStreamFn(); + closeStream(this, code); } // Called by this.destroy(). @@ -1832,26 +1839,19 @@ class Http2Stream extends Duplex { debug(`Http2Stream ${this[kID] || ''} [Http2Session ` + `${sessionName(session[kType])}]: destroying stream`); const state = this[kState]; - const code = state.rstCode = - err != null ? - NGHTTP2_INTERNAL_ERROR : - state.rstCode || NGHTTP2_NO_ERROR; - if (handle !== undefined) { - // If the handle exists, we need to close, then destroy the handle - this.close(code); - if (!this._readableState.ended && !this._readableState.ending) - this.push(null); + const code = err != null ? + NGHTTP2_INTERNAL_ERROR : (state.rstCode || NGHTTP2_NO_ERROR); + + const hasHandle = handle !== undefined; + + if (!this.closed) + closeStream(this, code, hasHandle); + this.push(null); + + if (hasHandle) { handle.destroy(); session[kState].streams.delete(id); } else { - // Clear timeout and remove timeout listeners - this.setTimeout(0); - this.removeAllListeners('timeout'); - - state.flags |= STREAM_FLAGS_CLOSED; - abort(this); - this.end(); - this.push(null); session[kState].pendingStreams.delete(this); } @@ -1884,13 +1884,23 @@ class Http2Stream extends Duplex { } // TODO(mcollina): remove usage of _*State properties - if (this._readableState.ended && - this._writableState.ended && - this._writableState.pendingcb === 0 && - this.closed) { - this.destroy(); - // This should return, but eslint complains. - // return + if (this._writableState.ended && this._writableState.pendingcb === 0) { + if (this._readableState.ended && this.closed) { + this.destroy(); + return; + } + + // We've submitted a response from our server session, have not attempted + // to process any incoming data, and have no trailers. This means we can + // attempt to gracefully close the session. + const state = this[kState]; + if (this.headersSent && + this[kSession][kType] === NGHTTP2_SESSION_SERVER && + !(state.flags & STREAM_FLAGS_HAS_TRAILERS) && + !state.didRead && + !this._readableState.resumeScheduled) { + this.close(); + } } } } @@ -2095,7 +2105,6 @@ function afterOpen(session, options, headers, streamOptions, err, fd) { } if (this.destroyed || this.closed) { tryClose(fd); - abort(this); return; } state.fd = fd; @@ -2224,8 +2233,10 @@ class ServerHttp2Stream extends Http2Stream { if (options.endStream) streamOptions |= STREAM_OPTION_EMPTY_PAYLOAD; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + state.flags |= STREAM_FLAGS_HAS_TRAILERS; + } headers = processHeaders(headers); const statusCode = headers[HTTP2_HEADER_STATUS] |= 0; @@ -2285,8 +2296,10 @@ class ServerHttp2Stream extends Http2Stream { } let streamOptions = 0; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } if (typeof fd !== 'number') throw new ERR_INVALID_ARG_TYPE('fd', 'number', fd); @@ -2346,8 +2359,10 @@ class ServerHttp2Stream extends Http2Stream { } let streamOptions = 0; - if (options.waitForTrailers) + if (options.waitForTrailers) { streamOptions |= STREAM_OPTION_GET_TRAILERS; + this[kState].flags |= STREAM_FLAGS_HAS_TRAILERS; + } const session = this[kSession]; debug(`Http2Stream ${this[kID]} [Http2Session ` + diff --git a/src/node_http2.cc b/src/node_http2.cc index 1c5c68f09471f3..05d9243ee30ec0 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1364,16 +1364,35 @@ void Http2Session::MaybeScheduleWrite() { // storage for data and metadata that was associated with these writes. void Http2Session::ClearOutgoing(int status) { CHECK_NE(flags_ & SESSION_STATE_SENDING, 0); - flags_ &= ~SESSION_STATE_SENDING; - for (const nghttp2_stream_write& wr : outgoing_buffers_) { - WriteWrap* wrap = wr.req_wrap; - if (wrap != nullptr) - wrap->Done(status); + if (outgoing_buffers_.size() > 0) { + outgoing_storage_.clear(); + + for (const nghttp2_stream_write& wr : outgoing_buffers_) { + WriteWrap* wrap = wr.req_wrap; + if (wrap != nullptr) + wrap->Done(status); + } + + outgoing_buffers_.clear(); } - outgoing_buffers_.clear(); - outgoing_storage_.clear(); + flags_ &= ~SESSION_STATE_SENDING; + + // Now that we've finished sending queued data, if there are any pending + // RstStreams we should try sending again and then flush them one by one. + if (pending_rst_streams_.size() > 0) { + std::vector current_pending_rst_streams; + pending_rst_streams_.swap(current_pending_rst_streams); + + SendPendingData(); + + for (int32_t stream_id : current_pending_rst_streams) { + Http2Stream* stream = FindStream(stream_id); + if (stream != nullptr) + stream->FlushRstStream(); + } + } } // Queue a given block of data for sending. This always creates a copy, @@ -1397,18 +1416,19 @@ void Http2Session::CopyDataIntoOutgoing(const uint8_t* src, size_t src_length) { // chunk out to the i/o socket to be sent. This is a particularly hot method // that will generally be called at least twice be event loop iteration. // This is a potential performance optimization target later. -void Http2Session::SendPendingData() { +// Returns non-zero value if a write is already in progress. +uint8_t Http2Session::SendPendingData() { DEBUG_HTTP2SESSION(this, "sending pending data"); // Do not attempt to send data on the socket if the destroying flag has // been set. That means everything is shutting down and the socket // will not be usable. if (IsDestroyed()) - return; + return 0; flags_ &= ~SESSION_STATE_WRITE_SCHEDULED; // SendPendingData should not be called recursively. if (flags_ & SESSION_STATE_SENDING) - return; + return 1; // This is cleared by ClearOutgoing(). flags_ |= SESSION_STATE_SENDING; @@ -1432,15 +1452,15 @@ void Http2Session::SendPendingData() { // does take care of things like closing the individual streams after // a socket has been torn down, so we still need to call it. ClearOutgoing(UV_ECANCELED); - return; + return 0; } // Part Two: Pass Data to the underlying stream size_t count = outgoing_buffers_.size(); if (count == 0) { - flags_ &= ~SESSION_STATE_SENDING; - return; + ClearOutgoing(0); + return 0; } MaybeStackBuffer bufs; bufs.AllocateSufficientStorage(count); @@ -1471,6 +1491,8 @@ void Http2Session::SendPendingData() { DEBUG_HTTP2SESSION2(this, "wants data in return? %d", nghttp2_session_want_read(session_)); + + return 0; } @@ -1830,12 +1852,25 @@ int Http2Stream::SubmitPriority(nghttp2_priority_spec* prispec, // peer. void Http2Stream::SubmitRstStream(const uint32_t code) { CHECK(!this->IsDestroyed()); + code_ = code; + // If possible, force a purge of any currently pending data here to make sure + // it is sent before closing the stream. If it returns non-zero then we need + // to wait until the current write finishes and try again to avoid nghttp2 + // behaviour where it prioritizes RstStream over everything else. + if (session_->SendPendingData() != 0) { + session_->AddPendingRstStream(id_); + return; + } + + FlushRstStream(); +} + +void Http2Stream::FlushRstStream() { + if (IsDestroyed()) + return; Http2Scope h2scope(this); - // Force a purge of any currently pending data here to make sure - // it is sent before closing the stream. - session_->SendPendingData(); CHECK_EQ(nghttp2_submit_rst_stream(**session_, NGHTTP2_FLAG_NONE, - id_, code), 0); + id_, code_), 0); } diff --git a/src/node_http2.h b/src/node_http2.h index 87c929cdea12f9..da404841450f27 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -591,6 +591,8 @@ class Http2Stream : public AsyncWrap, // Submits an RST_STREAM frame using the given code void SubmitRstStream(const uint32_t code); + void FlushRstStream(); + // Submits a PUSH_PROMISE frame with this stream as the parent. Http2Stream* SubmitPushPromise( nghttp2_nv* nva, @@ -797,7 +799,7 @@ class Http2Session : public AsyncWrap, public StreamListener { bool Ping(v8::Local function); - void SendPendingData(); + uint8_t SendPendingData(); // Submits a new request. If the request is a success, assigned // will be a pointer to the Http2Stream instance assigned. @@ -845,6 +847,11 @@ class Http2Session : public AsyncWrap, public StreamListener { size_t self_size() const override { return sizeof(*this); } + // Schedule an RstStream for after the current write finishes. + inline void AddPendingRstStream(int32_t stream_id) { + pending_rst_streams_.emplace_back(stream_id); + } + // Handle reads/writes from the underlying network transport. void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override; void OnStreamAfterWrite(WriteWrap* w, int status) override; @@ -1049,6 +1056,7 @@ class Http2Session : public AsyncWrap, public StreamListener { std::vector outgoing_buffers_; std::vector outgoing_storage_; + std::vector pending_rst_streams_; void CopyDataIntoOutgoing(const uint8_t* src, size_t src_length); void ClearOutgoing(int status); diff --git a/test/fixtures/person-large.jpg b/test/fixtures/person-large.jpg new file mode 100644 index 00000000000000..3d0d0af42375c3 Binary files /dev/null and b/test/fixtures/person-large.jpg differ diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index 09ca011c736c96..e641335e751287 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -124,3 +124,21 @@ const Countdown = require('../common/countdown'); req.on('error', () => {}); })); } + +// test destroy before connect +{ + const server = h2.createServer(); + server.on('stream', common.mustNotCall()); + + server.listen(0, common.mustCall(() => { + const client = h2.connect(`http://localhost:${server.address().port}`); + + server.on('connection', common.mustCall(() => { + server.close(); + client.close(); + })); + + const req = client.request(); + req.destroy(); + })); +} diff --git a/test/parallel/test-http2-client-rststream-before-connect.js b/test/parallel/test-http2-client-rststream-before-connect.js index 7bace941d1a0af..33e22130aad19b 100644 --- a/test/parallel/test-http2-client-rststream-before-connect.js +++ b/test/parallel/test-http2-client-rststream-before-connect.js @@ -63,8 +63,14 @@ server.listen(0, common.mustCall(() => { message: 'Stream closed with error code NGHTTP2_PROTOCOL_ERROR' })); - req.on('response', common.mustCall()); - req.resume(); + // The `response` event should not fire as the server should receive the + // RST_STREAM frame before it ever has a chance to reply. + req.on('response', common.mustNotCall()); + + // The `end` event should still fire as we close the readable stream by + // pushing a `null` chunk. req.on('end', common.mustCall()); + + req.resume(); req.end(); })); diff --git a/test/parallel/test-http2-client-upload-reject.js b/test/parallel/test-http2-client-upload-reject.js new file mode 100644 index 00000000000000..ece7cbdf233f1f --- /dev/null +++ b/test/parallel/test-http2-client-upload-reject.js @@ -0,0 +1,48 @@ +'use strict'; + +// Verifies that uploading data from a client works + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const http2 = require('http2'); +const fs = require('fs'); +const fixtures = require('../common/fixtures'); + +const loc = fixtures.path('person-large.jpg'); + +assert(fs.existsSync(loc)); + +fs.readFile(loc, common.mustCall((err, data) => { + assert.ifError(err); + + const server = http2.createServer(); + + server.on('stream', common.mustCall((stream) => { + stream.on('close', common.mustCall(() => { + assert.strictEqual(stream.rstCode, 0); + })); + + stream.respond({ ':status': 400 }); + stream.end(); + })); + + server.listen(0, common.mustCall(() => { + const client = http2.connect(`http://localhost:${server.address().port}`); + + const req = client.request({ ':method': 'POST' }); + req.on('response', common.mustCall((headers) => { + assert.strictEqual(headers[':status'], 400); + })); + + req.resume(); + req.on('end', common.mustCall(() => { + server.close(); + client.close(); + })); + + const str = fs.createReadStream(loc); + str.pipe(req); + })); +})); diff --git a/test/parallel/test-http2-client-upload.js b/test/parallel/test-http2-client-upload.js index 70a8ff3ced01c6..78c6d47cbb4f44 100644 --- a/test/parallel/test-http2-client-upload.js +++ b/test/parallel/test-http2-client-upload.js @@ -11,7 +11,7 @@ const fs = require('fs'); const fixtures = require('../common/fixtures'); const Countdown = require('../common/countdown'); -const loc = fixtures.path('person.jpg'); +const loc = fixtures.path('person-large.jpg'); let fileData; assert(fs.existsSync(loc)); diff --git a/test/parallel/test-http2-large-write-close.js b/test/parallel/test-http2-large-write-close.js new file mode 100644 index 00000000000000..f9dee357d6da7b --- /dev/null +++ b/test/parallel/test-http2-large-write-close.js @@ -0,0 +1,44 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const assert = require('assert'); +const fixtures = require('../common/fixtures'); +const http2 = require('http2'); + +const content = Buffer.alloc(1e5, 0x44); + +const server = http2.createSecureServer({ + key: fixtures.readKey('agent1-key.pem'), + cert: fixtures.readKey('agent1-cert.pem') +}); +server.on('stream', common.mustCall((stream) => { + stream.respond({ + 'Content-Type': 'application/octet-stream', + 'Content-Length': (content.length.toString() * 2), + 'Vary': 'Accept-Encoding' + }); + + stream.write(content); + stream.write(content); + stream.end(); + stream.close(); +})); + +server.listen(0, common.mustCall(() => { + const client = http2.connect(`https://localhost:${server.address().port}`, + { rejectUnauthorized: false }); + + const req = client.request({ ':path': '/' }); + req.end(); + + let receivedBufferLength = 0; + req.on('data', common.mustCallAtLeast((buf) => { + receivedBufferLength += buf.length; + }, 1)); + req.on('close', common.mustCall(() => { + assert.strictEqual(receivedBufferLength, content.length * 2); + client.close(); + server.close(); + })); +})); diff --git a/test/parallel/test-http2-perf_hooks.js b/test/parallel/test-http2-perf_hooks.js index 07d9c55ed7e0d2..5dd8ad0f6d883b 100644 --- a/test/parallel/test-http2-perf_hooks.js +++ b/test/parallel/test-http2-perf_hooks.js @@ -26,7 +26,7 @@ const obs = new PerformanceObserver(common.mustCall((items) => { switch (entry.type) { case 'server': assert.strictEqual(entry.streamCount, 1); - assert.strictEqual(entry.framesReceived, 5); + assert(entry.framesReceived >= 3); break; case 'client': assert.strictEqual(entry.streamCount, 1);