diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 92c8c84228e..d0b841e2a01 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -77,6 +77,10 @@ class RequestHandler extends AsyncResource { this.context = context } + onStart (controller) { + this.controller = controller + } + onHeaders (statusCode, rawHeaders, resume, statusMessage) { const { callback, opaque, abort, context, responseHeaders, highWaterMark } = this @@ -92,7 +96,7 @@ class RequestHandler extends AsyncResource { const parsedHeaders = responseHeaders === 'raw' ? util.parseHeaders(rawHeaders) : headers const contentType = parsedHeaders['content-type'] const contentLength = parsedHeaders['content-length'] - const body = new Readable({ resume, abort, contentType, contentLength, highWaterMark }) + const body = new Readable({ resume: this.controller?.resume ?? resume, abort, contentType, contentLength, highWaterMark }) this.callback = null this.res = body @@ -115,8 +119,10 @@ class RequestHandler extends AsyncResource { } onData (chunk) { - const { res } = this - return res.push(chunk) + const { res, controller } = this + if (!res.push(chunk)) { + controller.pause() + } } onComplete (trailers) { diff --git a/lib/core/request.js b/lib/core/request.js index d44a2bcbc7e..6c3eabd8dbc 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -216,6 +216,7 @@ class Request { } onConnect (abort) { + assert(abort) assert(!this.aborted) assert(!this.completed) @@ -227,7 +228,11 @@ class Request { } } - onStart (resume) { + onStart (controller) { + assert(controller) + assert(!this.aborted) + assert(!this.completed) + try { let pause = false @@ -237,7 +242,7 @@ class Request { } if (this[kHandler].onStart) { - pause ||= this[kHandler].onStart(resume) === false + pause ||= this[kHandler].onStart(controller) === false } return !pause diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index b42543efbf0..d5bde0770b0 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -148,6 +148,30 @@ const TIMEOUT_HEADERS = 1 const TIMEOUT_BODY = 2 const TIMEOUT_IDLE = 3 +class Controller { + #parser + #paused = false + + constructor (parser) { + this.#parser = parser + this.resume = this.resume.bind(this) + this.pause = this.pause.bind(this) + } + + pause () { + this.#paused = true + } + + resume () { + this.#paused = false + this.#parser.resume() + } + + get paused () { + return this.#paused + } +} + class Parser { constructor (client, socket, { exports }) { assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0) @@ -169,6 +193,8 @@ class Parser { this.paused = false this.resume = this.resume.bind(this) + this.controller = new Controller(this) + this.bytesRead = 0 this.keepAlive = '' @@ -326,7 +352,11 @@ class Parser { return -1 } - return request.onStart(this.resume) === false ? constants.ERROR.PAUSED : 0 + if (request.onStart(this.controller) === false) { + this.controller.pause() + } + + return this.controller.paused ? constants.ERROR.PAUSED : 0 } onHeaderField (buf) { @@ -505,7 +535,9 @@ class Parser { socket[kReset] = true } - const pause = request.onHeaders(statusCode, headers, this.resume, statusText) === false + if (request.onHeaders(statusCode, headers, this.controller.resume, statusText) === false) { + this.controller.pause() + } if (request.aborted) { return -1 @@ -524,7 +556,7 @@ class Parser { client[kResume]() } - return pause ? constants.ERROR.PAUSED : 0 + return this.controller.paused ? constants.ERROR.PAUSED : 0 } onBody (buf) { @@ -555,8 +587,10 @@ class Parser { this.bytesRead += buf.length if (request.onData(buf) === false) { - return constants.ERROR.PAUSED + this.controller.pause() } + + return this.controller.paused ? constants.ERROR.PAUSED : 0 } onMessageComplete () { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 5ee20bcb2fe..c4a785b62d4 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -42,6 +42,31 @@ try { http2 = { constants: {} } } +class Controller { + #stream + #paused = false + + constructor (stream) { + this.#stream = stream + this.resume = this.resume.bind(this) + this.pause = this.pause.bind(this) + } + + pause () { + this.#paused = true + this.#stream.pause() + } + + resume () { + this.#paused = false + this.#stream.resume() + } + + get paused () { + return this.#paused + } +} + const { constants: { HTTP2_HEADER_AUTHORITY, @@ -392,7 +417,7 @@ function writeH2 (client, request) { const resume = stream.resume.bind(stream) - if (request.onStart(resume) === false) { + if (request.onStart(new Controller(stream)) === false) { stream.pause() } diff --git a/lib/handler/decorator-handler.js b/lib/handler/decorator-handler.js index 9d70a767f1e..433c64aa6d8 100644 --- a/lib/handler/decorator-handler.js +++ b/lib/handler/decorator-handler.js @@ -6,30 +6,34 @@ module.exports = class DecoratorHandler { } onConnect (...args) { - return this.handler.onConnect(...args) + return this.handler.onConnect?.(...args) + } + + onStart (...args) { + return this.handler.onStart?.(...args) } onError (...args) { - return this.handler.onError(...args) + return this.handler.onError?.(...args) } onUpgrade (...args) { - return this.handler.onUpgrade(...args) + return this.handler.onUpgrade?.(...args) } onHeaders (...args) { - return this.handler.onHeaders(...args) + return this.handler.onHeaders?.(...args) } onData (...args) { - return this.handler.onData(...args) + return this.handler.onData?.(...args) } onComplete (...args) { - return this.handler.onComplete(...args) + return this.handler.onComplete?.(...args) } onBodySent (...args) { - return this.handler.onBodySent(...args) + return this.handler.onBodySent?.(...args) } } diff --git a/lib/handler/redirect-handler.js b/lib/handler/redirect-handler.js index 368ef520d76..d1d3b6191f1 100644 --- a/lib/handler/redirect-handler.js +++ b/lib/handler/redirect-handler.js @@ -76,15 +76,19 @@ class RedirectHandler { onConnect (abort) { this.abort = abort - this.handler.onConnect(abort, { history: this.history }) + this.handler.onConnect?.(abort, { history: this.history }) + } + + onStart (controller) { + this.handler.onStart?.(controller) } onUpgrade (statusCode, headers, socket) { - this.handler.onUpgrade(statusCode, headers, socket) + this.handler.onUpgrade?.(statusCode, headers, socket) } onError (error) { - this.handler.onError(error) + this.handler.onError?.(error) } onHeaders (statusCode, headers, resume, statusText) { @@ -107,7 +111,7 @@ class RedirectHandler { } if (!this.location) { - return this.handler.onHeaders(statusCode, headers, resume, statusText) + return this.handler.onHeaders?.(statusCode, headers, resume, statusText) } const { origin, pathname, search } = util.parseURL(new URL(this.location, this.opts.origin && new URL(this.opts.path, this.opts.origin))) @@ -150,7 +154,7 @@ class RedirectHandler { servers and browsers implementors, we ignore the body as there is no specified way to eventually parse it. */ } else { - return this.handler.onData(chunk) + return this.handler.onData?.(chunk) } } @@ -170,7 +174,7 @@ class RedirectHandler { this.dispatch(this.opts, this) } else { - this.handler.onComplete(trailers) + this.handler.onComplete?.(trailers) } } diff --git a/lib/handler/retry-handler.js b/lib/handler/retry-handler.js index 2258801ba58..ac4faf14b42 100644 --- a/lib/handler/retry-handler.js +++ b/lib/handler/retry-handler.js @@ -97,6 +97,10 @@ class RetryHandler { } } + onStart (controller) { + this.handler.onStart?.(controller) + } + onBodySent (chunk) { if (this.handler.onBodySent) return this.handler.onBodySent(chunk) } diff --git a/lib/mock/mock-utils.js b/lib/mock/mock-utils.js index c8c0ed1eef1..33525e25697 100644 --- a/lib/mock/mock-utils.js +++ b/lib/mock/mock-utils.js @@ -285,14 +285,32 @@ function mockDispatch (opts, handler) { const responseHeaders = generateKeyValues(headers) const responseTrailers = generateKeyValues(trailers) + const controller = new Controller() handler.abort = nop - handler.onHeaders(statusCode, responseHeaders, resume, getStatusText(statusCode)) + handler.onStart?.(controller) + handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode)) handler.onData(Buffer.from(responseData)) handler.onComplete(responseTrailers) deleteMockDispatch(mockDispatches, key) } - function resume () {} + class Controller { + #paused = false + + resume () { + this.#paused = false + this.resume = this.resume.bind(this) + this.pause = this.pause.bind(this) + } + + pause () { + this.#paused = true + } + + get paused () { + return this.#paused + } + } return true } diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index 6d29cc635da..fc617734e63 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -881,8 +881,9 @@ test('dispatches in expected order', async (t) => { onBodySent () { dispatches.push('onBodySent') }, - onStart (resume) { - assert(typeof resume === 'function') + onStart (controller) { + assert(typeof controller.resume === 'function') + assert(typeof controller.pause === 'function') dispatches.push('onStart') }, onResponseStarted () { @@ -942,8 +943,9 @@ test('dispatches in expected order for http2', async (t) => { onBodySent () { dispatches.push('onBodySent') }, - onStart (resume) { - assert(typeof resume === 'function') + onStart (controller) { + assert(typeof controller.resume === 'function') + assert(typeof controller.pause === 'function') dispatches.push('onStart') }, onResponseStarted () { diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 7665ae32841..675dae273de 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -215,9 +215,18 @@ declare namespace Dispatcher { context: object; } export type StreamFactory = (data: StreamFactoryData) => Writable; + + export interface Controller { + pause (): void; + resume (): void; + readonly paused: boolean; + } + export interface DispatchHandlers { /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ onConnect?(abort: () => void): void; + /** */ + onStart?(controller: Controller): void; /** Invoked when an error has occurred. */ onError?(err: Error): void; /** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */