diff --git a/lib/api/abort-signal.js b/lib/api/abort-signal.js index f65116ed52c..0fa8f2ce1d7 100644 --- a/lib/api/abort-signal.js +++ b/lib/api/abort-signal.js @@ -10,11 +10,14 @@ function abort (self) { } else if (self.controller?.abort) { self.controller.abort(self[kSignal]?.reason) } else { - self.onError(self[kSignal]?.reason ?? new RequestAbortedError()) + self.reason = self[kSignal]?.reason ?? new RequestAbortedError() } + removeSignal(self) } function addSignal (self, signal) { + self.reason = null + self[kSignal] = null self[kListener] = null diff --git a/lib/api/api-connect.js b/lib/api/api-connect.js index d19e67f69a2..11137771d03 100644 --- a/lib/api/api-connect.js +++ b/lib/api/api-connect.js @@ -1,7 +1,8 @@ 'use strict' +const assert = require('node:assert') const { AsyncResource } = require('node:async_hooks') -const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors') +const { InvalidArgumentError, SocketError } = require('../core/errors') const util = require('../core/util') const { addSignal, removeSignal } = require('./abort-signal') @@ -26,17 +27,20 @@ class ConnectHandler extends AsyncResource { this.opaque = opaque || null this.responseHeaders = responseHeaders || null this.callback = callback - this.abort = null + this.controller = null addSignal(this, signal) } - onConnect (abort, context) { - if (!this.callback) { - throw new RequestAbortedError() + onConnect (controller, context) { + if (this.reason) { + controller.abort(this.reason) + return } - this.abort = abort + assert(this.callback) + + this.controller = controller this.context = context } diff --git a/lib/api/api-pipeline.js b/lib/api/api-pipeline.js index f5112415bf7..b058b456254 100644 --- a/lib/api/api-pipeline.js +++ b/lib/api/api-pipeline.js @@ -88,7 +88,7 @@ class PipelineHandler extends AsyncResource { this.opaque = opaque || null this.responseHeaders = responseHeaders || null this.handler = handler - this.abort = null + this.controller = null this.context = null this.onInfo = onInfo || null @@ -114,14 +114,14 @@ class PipelineHandler extends AsyncResource { } }, destroy: (err, callback) => { - const { body, req, res, ret, abort } = this + const { body, req, res, ret, controller } = this if (!err && !ret._readableState.endEmitted) { err = new RequestAbortedError() } - if (abort && err) { - abort() + if (controller && err) { + controller.abort() } util.destroy(body, err) @@ -144,16 +144,18 @@ class PipelineHandler extends AsyncResource { addSignal(this, signal) } - onConnect (abort, context) { + onConnect (controller, context) { const { ret, res } = this - assert(!res, 'pipeline cannot be retried') - - if (ret.destroyed) { - throw new RequestAbortedError() + if (this.reason) { + controller.abort(this.reason) + return } - this.abort = abort + assert(!res, 'pipeline cannot be retried') + assert(!ret.destroyed) + + this.controller = controller this.context = context } diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 0a82173d0ed..b863c3f56e4 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -1,10 +1,8 @@ 'use strict' +const assert = require('node:assert') const { Readable } = require('./readable') -const { - InvalidArgumentError, - RequestAbortedError -} = require('../core/errors') +const { InvalidArgumentError } = require('../core/errors') const util = require('../core/util') const { getResolveErrorBodyCallback } = require('./util') const { AsyncResource } = require('node:async_hooks') @@ -51,6 +49,7 @@ class RequestHandler extends AsyncResource { this.opaque = opaque || null this.callback = callback this.res = null + this.controller = null this.body = body this.trailers = {} this.context = null @@ -68,10 +67,13 @@ class RequestHandler extends AsyncResource { } onConnect (controller, context) { - if (!this.callback) { - throw new RequestAbortedError() + if (this.reason) { + controller.abort(this.reason) + return } + assert(this.callback) + this.controller = controller this.context = context } @@ -92,8 +94,7 @@ class RequestHandler extends AsyncResource { const contentType = parsedHeaders['content-type'] const contentLength = parsedHeaders['content-length'] const body = new Readable({ - resume: this.controller?.resume ?? resume, - abort: this.controller?.abort ?? this.controller, + controller: this.controller, contentType, contentLength, highWaterMark diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index 0998c9f2da1..e21811a4818 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -1,10 +1,10 @@ 'use strict' +const assert = require('node:assert') const { finished, PassThrough } = require('node:stream') const { InvalidArgumentError, - InvalidReturnValueError, - RequestAbortedError + InvalidReturnValueError } = require('../core/errors') const util = require('../core/util') const { getResolveErrorBodyCallback } = require('./util') @@ -53,7 +53,7 @@ class StreamHandler extends AsyncResource { this.factory = factory this.callback = callback this.res = null - this.abort = null + this.controller = null this.context = null this.trailers = null this.body = body @@ -69,12 +69,15 @@ class StreamHandler extends AsyncResource { addSignal(this, signal) } - onConnect (abort, context) { - if (!this.callback) { - throw new RequestAbortedError() + onConnect (controller, context) { + if (this.reason) { + controller.abort(this.reason) + return } - this.abort = abort + assert(this.callback) + + this.controller = controller this.context = context } @@ -126,7 +129,7 @@ class StreamHandler extends AsyncResource { // TODO: Avoid finished. It registers an unnecessary amount of listeners. finished(res, { readable: false }, (err) => { - const { callback, res, opaque, trailers, abort } = this + const { callback, res, opaque, trailers, controller } = this this.res = null if (err || !res.readable) { @@ -137,7 +140,7 @@ class StreamHandler extends AsyncResource { this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) if (err) { - abort() + controller.abort() } }) } diff --git a/lib/api/api-upgrade.js b/lib/api/api-upgrade.js index bed946fdca9..55c8efcfe69 100644 --- a/lib/api/api-upgrade.js +++ b/lib/api/api-upgrade.js @@ -1,6 +1,6 @@ 'use strict' -const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors') +const { InvalidArgumentError, SocketError } = require('../core/errors') const { AsyncResource } = require('node:async_hooks') const util = require('../core/util') const { addSignal, removeSignal } = require('./abort-signal') @@ -27,18 +27,21 @@ class UpgradeHandler extends AsyncResource { this.responseHeaders = responseHeaders || null this.opaque = opaque || null this.callback = callback - this.abort = null + this.controller = null this.context = null addSignal(this, signal) } - onConnect (abort, context) { - if (!this.callback) { - throw new RequestAbortedError() + onConnect (controller, context) { + if (this.reason) { + controller.abort(this.reason) + return } - this.abort = abort + assert(this.callback) + + this.controller = controller this.context = null } diff --git a/lib/api/readable.js b/lib/api/readable.js index 796c237e889..43f31886e56 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -11,7 +11,7 @@ const { ReadableStreamFrom } = require('../core/util') const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') const kBody = Symbol('kBody') -const kAbort = Symbol('kAbort') +const kController = Symbol('kController') const kContentType = Symbol('kContentType') const kContentLength = Symbol('kContentLength') @@ -19,21 +19,18 @@ const noop = () => {} class BodyReadable extends Readable { constructor ({ - resume, - abort, + controller, contentType = '', contentLength, highWaterMark = 64 * 1024 // Same as nodejs fs streams. }) { - super({ - autoDestroy: true, - read: resume, - highWaterMark - }) + assert(controller) + + super({ autoDestroy: true, highWaterMark }) this._readableState.dataEmitted = false - this[kAbort] = abort + this[kController] = controller this[kConsume] = null this[kBody] = null this[kContentType] = contentType @@ -52,12 +49,16 @@ class BodyReadable extends Readable { } if (err) { - this[kAbort]() + this[kController].abort(err) } return super.destroy(err) } + _read () { + this[kController].resume() + } + _destroy (err, callback) { // Workaround for Node "bug". If the stream is destroyed in same // tick as it is created, then a user who is waiting for a diff --git a/lib/core/request.js b/lib/core/request.js index 5de6846df30..ef0365b1931 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -106,7 +106,7 @@ class Request { this.errorHandler = err => { if (this.abort) { - this.abort(err) + this.controller.abort(err) } else { this.error = err } @@ -196,7 +196,7 @@ class Request { try { return this[kHandler].onBodySent(chunk) } catch (err) { - this.abort(err) + this.controller.abort(err) } } } @@ -210,7 +210,7 @@ class Request { try { return this[kHandler].onRequestSent() } catch (err) { - this.abort(err) + this.controller.abort(err) } } } @@ -220,31 +220,39 @@ class Request { assert(!this.aborted) assert(!this.completed) - if (this.error) { - abort(this.error) - } else { - this.abort = abort - - // Hack for compatibility with the old API. - abort.paused = false - abort.aborted = false - abort.internal = null - abort.abort = function (...args) { - abort.aborted = true - abort(...args) - } - abort.pause = function () { - abort.paused = true - abort.internal?.pause() - } - abort.resume = function () { - abort.paused = false - abort.internal?.resume() - } + // TODO(fix): Hack for compatibility with the old API. + // Remove and use proper class with encapsulation in + // semver major. + abort.paused = false + abort.aborted = false + abort._internal = null + abort._abort = abort + abort.abort = function (err) { + this.aborted = true + this._abort(err) + } + abort.pause = function () { + this.paused = true + this._internal?.pause?.() + } + abort.resume = function () { + this.paused = false + this._internal?.resume?.() + } + const controller = abort - this.controller = abort + // TODO (fix): This should be enabled. Needs some thinking + // in regards to pipeline retries. + // if (this.controller) { + // this.controller.abort() + // this.controller = null + // } - return this[kHandler].onConnect(this.controller) + if (this.error) { + this.controller.abort(this.error) + } else { + this.controller = controller + this[kHandler].onConnect(this.controller) } } @@ -253,24 +261,15 @@ class Request { assert(!this.completed) try { - let pause = false - // Compatibility with the old API. - if (this[kHandler].onResponseStarted) { - pause ||= this[kHandler].onResponseStarted() === false - } - - if (this[kHandler].onStart) { - pause ||= this[kHandler].onStart() === false - } - - return !pause + this[kHandler].onResponseStarted?.() + this[kHandler].onStart?.onStart() } catch (err) { - this.abort(err) + this.controller.abort(err) } } - onHeaders (statusCode, headers, resume, statusText) { + onHeaders (statusCode, headers, statusText) { assert(!this.aborted) assert(!this.completed) @@ -279,9 +278,11 @@ class Request { } try { - return this[kHandler].onHeaders(statusCode, headers, resume, statusText) + if (this[kHandler].onHeaders?.(statusCode, headers, () => this.controller.resume(), statusText) === false) { + this.controller.pause() + } } catch (err) { - this.abort(err) + this.controller.abort(err) } } @@ -290,9 +291,11 @@ class Request { assert(!this.completed) try { - return this[kHandler].onData(chunk) + if (this[kHandler].onData?.(chunk) === false) { + this.controller.pause() + } } catch (err) { - this.abort(err) + this.controller.abort(err) } } @@ -300,7 +303,7 @@ class Request { assert(!this.aborted) assert(!this.completed) - return this[kHandler].onUpgrade(statusCode, headers, socket) + this[kHandler].onUpgrade(statusCode, headers, socket) } onComplete (trailers) { @@ -314,7 +317,7 @@ class Request { } try { - return this[kHandler].onComplete(trailers) + this[kHandler].onComplete(trailers) } catch (err) { // TODO (fix): This might be a bad idea? this.onError(err) @@ -333,7 +336,7 @@ class Request { } this.aborted = true - return this[kHandler].onError(error) + this[kHandler].onError(error) } onFinally () { diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 0637dc3a796..5b4ca89cf03 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -148,30 +148,6 @@ const TIMEOUT_HEADERS = 1 const TIMEOUT_BODY = 2 const TIMEOUT_IDLE = 3 -class Controller { - #parser - #paused = false - - constructor (parser) { - this.#parser = parser - this.resume = this.resume.bind(this) - this.pause = this.pause.bind(this) - } - - pause () { - this.#paused = true - } - - resume () { - this.#paused = false - this.#parser.resume() - } - - get paused () { - return this.#paused - } -} - class Parser { constructor (client, socket, { exports }) { assert(Number.isFinite(client[kMaxHeadersSize]) && client[kMaxHeadersSize] > 0) @@ -193,8 +169,6 @@ class Parser { this.paused = false this.resume = this.resume.bind(this) - this.controller = new Controller(this) - this.bytesRead = 0 this.keepAlive = '' @@ -352,16 +326,11 @@ class Parser { return -1 } - request.controller.internal = this.controller - if (request.controller.paused) { - request.controller.internal.pause() - } + request.controller._internal = this - if (request.onStart() === false) { - this.controller.pause() - } + request.onStart() - return this.controller.paused ? constants.ERROR.PAUSED : 0 + return request.controller.paused ? constants.ERROR.PAUSED : 0 } onHeaderField (buf) { @@ -540,9 +509,7 @@ class Parser { socket[kReset] = true } - if (request.onHeaders(statusCode, headers, this.controller.resume, statusText) === false) { - this.controller.pause() - } + request.onHeaders(statusCode, headers, statusText) if (request.aborted) { return -1 @@ -561,7 +528,7 @@ class Parser { client[kResume]() } - return this.controller.paused ? constants.ERROR.PAUSED : 0 + return request.controller.paused ? constants.ERROR.PAUSED : 0 } onBody (buf) { @@ -591,11 +558,9 @@ class Parser { this.bytesRead += buf.length - if (request.onData(buf) === false) { - this.controller.pause() - } + request.onData(buf) - return this.controller.paused ? constants.ERROR.PAUSED : 0 + return request.controller.paused ? constants.ERROR.PAUSED : 0 } onMessageComplete () { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 3d31db93006..8423e9272b4 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -42,31 +42,6 @@ try { http2 = { constants: {} } } -class Controller { - #stream - #paused = false - - constructor (stream) { - this.#stream = stream - this.resume = this.resume.bind(this) - this.pause = this.pause.bind(this) - } - - pause () { - this.#paused = true - this.#stream.pause() - } - - resume () { - this.#paused = false - this.#stream.resume() - } - - get paused () { - return this.#paused - } -} - const { constants: { HTTP2_HEADER_AUTHORITY, @@ -435,8 +410,6 @@ function writeH2 (client, request) { stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers - const resume = stream.resume.bind(stream) - // Due to the stream nature, it is possible we face a race condition // where the stream has been assigned, but the request has been aborted // the request remains in-flight and headers hasn't been received yet @@ -449,23 +422,17 @@ function writeH2 (client, request) { return } - request.controller.internal = new Controller(stream) + request.controller._internal = stream if (request.controller.paused) { - request.controller.internal.pause() - } - - if (request.onStart() === false) { stream.pause() } - if (request.onHeaders(Number(statusCode), realHeaders, resume, '') === false) { - stream.pause() - } + request.onStart() + // TODO (fix): Don't call onHeaders if paused... + request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), '') stream.on('data', (chunk) => { - if (request.onData(chunk) === false) { - stream.pause() - } + request.onData(chunk) }) }) diff --git a/lib/mock/mock-utils.js b/lib/mock/mock-utils.js index 10922a9c738..6a84ac37d88 100644 --- a/lib/mock/mock-utils.js +++ b/lib/mock/mock-utils.js @@ -293,6 +293,7 @@ function mockDispatch (opts, handler) { super() this.resume = this.resume.bind(this) this.pause = this.pause.bind(this) + this.abort = this.abort.bind(this) } resume () { @@ -303,7 +304,8 @@ function mockDispatch (opts, handler) { this.#paused = true } - abort () { + abort (err) { + handler.onError(err) this.#aborted = true } diff --git a/test/client-request.js b/test/client-request.js index 8e0111de09c..420549e21f5 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -14,6 +14,10 @@ const { promisify } = require('node:util') const { NotSupportedError } = require('../lib/core/errors') const { parseFormDataString } = require('./utils/formdata') +process.on('uncatchedException', (err) => { + console.error('###', err) +}) + test('request dump big', async (t) => { t = tspl(t, { plan: 3 }) @@ -140,7 +144,7 @@ test('request hwm', async (t) => { }) test('request abort before headers', async (t) => { - t = tspl(t, { plan: 6 }) + t = tspl(t, { plan: 4 }) const signal = new EE() const server = createServer((req, res) => { @@ -162,7 +166,6 @@ test('request abort before headers', async (t) => { t.ok(err instanceof errors.RequestAbortedError) t.strictEqual(signal.listenerCount('abort'), 0) }) - t.strictEqual(signal.listenerCount('abort'), 1) client.request({ path: '/', @@ -172,7 +175,6 @@ test('request abort before headers', async (t) => { t.ok(err instanceof errors.RequestAbortedError) t.strictEqual(signal.listenerCount('abort'), 0) }) - t.strictEqual(signal.listenerCount('abort'), 2) }) }) diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index fc617734e63..c5821d65e8e 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -875,15 +875,15 @@ test('dispatches in expected order', async (t) => { method: 'POST', body: 'body' }, { - onConnect () { + onConnect (controller) { + assert(typeof controller.resume === 'function') + assert(typeof controller.pause === 'function') dispatches.push('onConnect') }, onBodySent () { dispatches.push('onBodySent') }, - onStart (controller) { - assert(typeof controller.resume === 'function') - assert(typeof controller.pause === 'function') + onStart () { dispatches.push('onStart') }, onResponseStarted () { @@ -937,15 +937,15 @@ test('dispatches in expected order for http2', async (t) => { method: 'POST', body: 'body' }, { - onConnect () { + onConnect (controller) { + assert(typeof controller.resume === 'function') + assert(typeof controller.pause === 'function') dispatches.push('onConnect') }, onBodySent () { dispatches.push('onBodySent') }, - onStart (controller) { - assert(typeof controller.resume === 'function') - assert(typeof controller.pause === 'function') + onStart () { dispatches.push('onStart') }, onResponseStarted () { diff --git a/test/readable.js b/test/readable.js index dd0631daf8b..c7b2b5f21e9 100644 --- a/test/readable.js +++ b/test/readable.js @@ -12,7 +12,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push(Buffer.from('hello')) @@ -34,7 +34,7 @@ describe('Readable', () => { function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.destroy(new Error('kaboom')) await t.rejects(r.text(), new Error('kaboom')) @@ -48,7 +48,7 @@ describe('Readable', () => { function abort () { } const r = await new Promise(resolve => { - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.destroy(new Error('kaboom')) resolve(r) }) @@ -67,7 +67,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push(Buffer.from('hello world')) @@ -90,7 +90,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push(Buffer.from('{"hello": "world"}')) @@ -110,7 +110,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push(Buffer.from('hello world')) @@ -130,7 +130,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push('\uFEFF') r.push(Buffer.from('hello world')) @@ -151,7 +151,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ resume, abort }) + const r = new Readable({ controller: { resume, abort } }) r.push(Buffer.from('hello world')) diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index 675dae273de..e03c0af9037 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -219,14 +219,16 @@ declare namespace Dispatcher { export interface Controller { pause (): void; resume (): void; + abort (err?: Error): void; readonly paused: boolean; + readonly aborted: boolean; } export interface DispatchHandlers { /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ - onConnect?(abort: () => void): void; + onConnect?(controller: Controller): void; /** */ - onStart?(controller: Controller): void; + onStart?(): void; /** Invoked when an error has occurred. */ onError?(err: Error): void; /** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */