diff --git a/lib/api/api-connect.js b/lib/api/api-connect.js index 11137771d03..f9dbbf64fe8 100644 --- a/lib/api/api-connect.js +++ b/lib/api/api-connect.js @@ -27,20 +27,20 @@ class ConnectHandler extends AsyncResource { this.opaque = opaque || null this.responseHeaders = responseHeaders || null this.callback = callback - this.controller = null + this.abort = null addSignal(this, signal) } - onConnect (controller, context) { + onConnect (abort, context) { if (this.reason) { - controller.abort(this.reason) + abort(this.reason) return } assert(this.callback) - this.controller = controller + this.abort = abort this.context = context } diff --git a/lib/api/api-pipeline.js b/lib/api/api-pipeline.js index b058b456254..e64b3294965 100644 --- a/lib/api/api-pipeline.js +++ b/lib/api/api-pipeline.js @@ -88,7 +88,7 @@ class PipelineHandler extends AsyncResource { this.opaque = opaque || null this.responseHeaders = responseHeaders || null this.handler = handler - this.controller = null + this.abort = null this.context = null this.onInfo = onInfo || null @@ -114,14 +114,14 @@ class PipelineHandler extends AsyncResource { } }, destroy: (err, callback) => { - const { body, req, res, ret, controller } = this + const { body, req, res, ret, abort } = this if (!err && !ret._readableState.endEmitted) { err = new RequestAbortedError() } - if (controller && err) { - controller.abort() + if (abort && err) { + abort() } util.destroy(body, err) @@ -144,18 +144,18 @@ class PipelineHandler extends AsyncResource { addSignal(this, signal) } - onConnect (controller, context) { + onConnect (abort, context) { const { ret, res } = this if (this.reason) { - controller.abort(this.reason) + abort(this.reason) return } assert(!res, 'pipeline cannot be retried') assert(!ret.destroyed) - this.controller = controller + this.abort = abort this.context = context } diff --git a/lib/api/api-request.js b/lib/api/api-request.js index b863c3f56e4..2713d236a7c 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -49,7 +49,7 @@ class RequestHandler extends AsyncResource { this.opaque = opaque || null this.callback = callback this.res = null - this.controller = null + this.abort = null this.body = body this.trailers = {} this.context = null @@ -66,15 +66,15 @@ class RequestHandler extends AsyncResource { addSignal(this, signal) } - onConnect (controller, context) { + onConnect (abort, context) { if (this.reason) { - controller.abort(this.reason) + abort(this.reason) return } assert(this.callback) - this.controller = controller + this.abort = abort this.context = context } @@ -94,7 +94,8 @@ class RequestHandler extends AsyncResource { const contentType = parsedHeaders['content-type'] const contentLength = parsedHeaders['content-length'] const body = new Readable({ - controller: this.controller, + resume, + abort: this.abort, contentType, contentLength, highWaterMark @@ -121,10 +122,8 @@ class RequestHandler extends AsyncResource { } onData (chunk) { - const { res, controller } = this - if (!res.push(chunk)) { - controller.pause() - } + const { res } = this + return res.push(chunk) } onComplete (trailers) { diff --git a/lib/api/api-stream.js b/lib/api/api-stream.js index e21811a4818..5549cec86b0 100644 --- a/lib/api/api-stream.js +++ b/lib/api/api-stream.js @@ -53,7 +53,7 @@ class StreamHandler extends AsyncResource { this.factory = factory this.callback = callback this.res = null - this.controller = null + this.abort = null this.context = null this.trailers = null this.body = body @@ -69,15 +69,15 @@ class StreamHandler extends AsyncResource { addSignal(this, signal) } - onConnect (controller, context) { + onConnect (abort, context) { if (this.reason) { - controller.abort(this.reason) + abort(this.reason) return } assert(this.callback) - this.controller = controller + this.abort = abort this.context = context } @@ -129,7 +129,7 @@ class StreamHandler extends AsyncResource { // TODO: Avoid finished. It registers an unnecessary amount of listeners. finished(res, { readable: false }, (err) => { - const { callback, res, opaque, trailers, controller } = this + const { callback, res, opaque, trailers, abort } = this this.res = null if (err || !res.readable) { @@ -140,7 +140,7 @@ class StreamHandler extends AsyncResource { this.runInAsyncScope(callback, null, err || null, { opaque, trailers }) if (err) { - controller.abort() + abort() } }) } diff --git a/lib/api/api-upgrade.js b/lib/api/api-upgrade.js index 55c8efcfe69..019fe1efa2d 100644 --- a/lib/api/api-upgrade.js +++ b/lib/api/api-upgrade.js @@ -27,21 +27,21 @@ class UpgradeHandler extends AsyncResource { this.responseHeaders = responseHeaders || null this.opaque = opaque || null this.callback = callback - this.controller = null + this.abort = null this.context = null addSignal(this, signal) } - onConnect (controller, context) { + onConnect (abort, context) { if (this.reason) { - controller.abort(this.reason) + abort(this.reason) return } assert(this.callback) - this.controller = controller + this.abort = abort this.context = null } diff --git a/lib/api/readable.js b/lib/api/readable.js index 43f31886e56..796c237e889 100644 --- a/lib/api/readable.js +++ b/lib/api/readable.js @@ -11,7 +11,7 @@ const { ReadableStreamFrom } = require('../core/util') const kConsume = Symbol('kConsume') const kReading = Symbol('kReading') const kBody = Symbol('kBody') -const kController = Symbol('kController') +const kAbort = Symbol('kAbort') const kContentType = Symbol('kContentType') const kContentLength = Symbol('kContentLength') @@ -19,18 +19,21 @@ const noop = () => {} class BodyReadable extends Readable { constructor ({ - controller, + resume, + abort, contentType = '', contentLength, highWaterMark = 64 * 1024 // Same as nodejs fs streams. }) { - assert(controller) - - super({ autoDestroy: true, highWaterMark }) + super({ + autoDestroy: true, + read: resume, + highWaterMark + }) this._readableState.dataEmitted = false - this[kController] = controller + this[kAbort] = abort this[kConsume] = null this[kBody] = null this[kContentType] = contentType @@ -49,16 +52,12 @@ class BodyReadable extends Readable { } if (err) { - this[kController].abort(err) + this[kAbort]() } return super.destroy(err) } - _read () { - this[kController].resume() - } - _destroy (err, callback) { // Workaround for Node "bug". If the stream is destroyed in same // tick as it is created, then a user who is waiting for a diff --git a/lib/core/request.js b/lib/core/request.js index fcc4ae8c2cf..e78b1d04e8f 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -89,7 +89,7 @@ class Request { this.method = method - this.abort = null + this.controller = null if (body == null) { this.body = null @@ -105,7 +105,7 @@ class Request { } this.errorHandler = err => { - if (this.abort) { + if (this.controller) { this.controller.abort(err) } else { this.error = err @@ -223,24 +223,24 @@ class Request { // TODO(fix): Hack for compatibility with the old API. // Remove and use proper class with encapsulation in // semver major. - abort.paused = false - abort.aborted = false - abort._internal = null - abort._abort = abort - abort.abort = function (err) { - this.aborted = true - this._abort(err) - } - abort.pause = function () { - this.paused = true - this._internal?.pause?.() - } - abort.resume = function () { - this.paused = false - this._internal?.resume?.() + this.controller = abort.controller = { + paused: false, + aborted: false, + _internal: null, + _abort: abort, + abort (err) { + this.aborted = true + this._abort(err) + }, + pause () { + this.paused = true + this._internal?.pause?.() + }, + resume () { + this.paused = false + this._internal?.resume?.() + } } - abort.controller = abort - const controller = abort // TODO (fix): This should be enabled. Needs some thinking // in regards to pipeline retries. @@ -250,9 +250,8 @@ class Request { // } if (this.error) { - controller.abort(this.error) + abort(this.error) } else { - this.controller = controller this[kHandler].onConnect(abort) } } @@ -260,6 +259,7 @@ class Request { onStart () { assert(!this.aborted) assert(!this.completed) + assert(this.controller) try { // Compatibility with the old API. @@ -273,6 +273,7 @@ class Request { onHeaders (statusCode, statusText, headers) { assert(!this.aborted) assert(!this.completed) + assert(this.controller) if (channels.headers.hasSubscribers) { channels.headers.publish({ request: this, response: { statusCode, headers, statusText } }) @@ -290,6 +291,7 @@ class Request { onData (chunk) { assert(!this.aborted) assert(!this.completed) + assert(this.controller) try { if (this[kHandler].onData?.(chunk) === false) { @@ -303,6 +305,7 @@ class Request { onUpgrade (statusCode, headers, socket) { assert(!this.aborted) assert(!this.completed) + assert(this.controller) this[kHandler].onUpgrade(statusCode, headers, socket) } @@ -320,7 +323,6 @@ class Request { try { this[kHandler].onComplete(trailers) } catch (err) { - // TODO (fix): This might be a bad idea? this.onError(err) } } diff --git a/lib/handler/redirect-handler.js b/lib/handler/redirect-handler.js index 368ef520d76..16a7b2150a9 100644 --- a/lib/handler/redirect-handler.js +++ b/lib/handler/redirect-handler.js @@ -201,9 +201,9 @@ function shouldRemoveHeader (header, removeContent, unknownOrigin) { if (removeContent && util.headerNameToString(header).startsWith('content-')) { return true } - if (unknownOrigin && (header.length === 13 || header.length === 6)) { + if (unknownOrigin && (header.length === 13 || header.length === 6 || header.length === 19)) { const name = util.headerNameToString(header) - return name === 'authorization' || name === 'cookie' + return name === 'authorization' || name === 'cookie' || name === 'proxy-authorization' } return false } diff --git a/lib/mock/mock-utils.js b/lib/mock/mock-utils.js index 6a84ac37d88..05f5befeb20 100644 --- a/lib/mock/mock-utils.js +++ b/lib/mock/mock-utils.js @@ -322,7 +322,7 @@ function mockDispatch (opts, handler) { handler.abort = nop if (!controller.aborted) { - handler.onConnect?.(controller) + handler.onConnect?.(err => controller.abort(err)) } if (!controller.aborted) { @@ -330,7 +330,7 @@ function mockDispatch (opts, handler) { } if (!controller.aborted) { - handler.onHeaders(statusCode, responseHeaders, controller.resume, getStatusText(statusCode)) + handler.onHeaders(statusCode, responseHeaders, () => controller.resume(), getStatusText(statusCode)) } if (!controller.aborted) { diff --git a/test/client-request.js b/test/client-request.js index 420549e21f5..8e0111de09c 100644 --- a/test/client-request.js +++ b/test/client-request.js @@ -14,10 +14,6 @@ const { promisify } = require('node:util') const { NotSupportedError } = require('../lib/core/errors') const { parseFormDataString } = require('./utils/formdata') -process.on('uncatchedException', (err) => { - console.error('###', err) -}) - test('request dump big', async (t) => { t = tspl(t, { plan: 3 }) @@ -144,7 +140,7 @@ test('request hwm', async (t) => { }) test('request abort before headers', async (t) => { - t = tspl(t, { plan: 4 }) + t = tspl(t, { plan: 6 }) const signal = new EE() const server = createServer((req, res) => { @@ -166,6 +162,7 @@ test('request abort before headers', async (t) => { t.ok(err instanceof errors.RequestAbortedError) t.strictEqual(signal.listenerCount('abort'), 0) }) + t.strictEqual(signal.listenerCount('abort'), 1) client.request({ path: '/', @@ -175,6 +172,7 @@ test('request abort before headers', async (t) => { t.ok(err instanceof errors.RequestAbortedError) t.strictEqual(signal.listenerCount('abort'), 0) }) + t.strictEqual(signal.listenerCount('abort'), 2) }) }) diff --git a/test/node-test/client-dispatch.js b/test/node-test/client-dispatch.js index c5821d65e8e..78a377f83cb 100644 --- a/test/node-test/client-dispatch.js +++ b/test/node-test/client-dispatch.js @@ -875,9 +875,7 @@ test('dispatches in expected order', async (t) => { method: 'POST', body: 'body' }, { - onConnect (controller) { - assert(typeof controller.resume === 'function') - assert(typeof controller.pause === 'function') + onConnect ({ controller }) { dispatches.push('onConnect') }, onBodySent () { @@ -937,9 +935,7 @@ test('dispatches in expected order for http2', async (t) => { method: 'POST', body: 'body' }, { - onConnect (controller) { - assert(typeof controller.resume === 'function') - assert(typeof controller.pause === 'function') + onConnect ({ controller }) { dispatches.push('onConnect') }, onBodySent () { diff --git a/test/readable.js b/test/readable.js index c7b2b5f21e9..dd0631daf8b 100644 --- a/test/readable.js +++ b/test/readable.js @@ -12,7 +12,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push(Buffer.from('hello')) @@ -34,7 +34,7 @@ describe('Readable', () => { function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.destroy(new Error('kaboom')) await t.rejects(r.text(), new Error('kaboom')) @@ -48,7 +48,7 @@ describe('Readable', () => { function abort () { } const r = await new Promise(resolve => { - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.destroy(new Error('kaboom')) resolve(r) }) @@ -67,7 +67,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push(Buffer.from('hello world')) @@ -90,7 +90,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push(Buffer.from('{"hello": "world"}')) @@ -110,7 +110,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push(Buffer.from('hello world')) @@ -130,7 +130,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push('\uFEFF') r.push(Buffer.from('hello world')) @@ -151,7 +151,7 @@ describe('Readable', () => { } function abort () { } - const r = new Readable({ controller: { resume, abort } }) + const r = new Readable({ resume, abort }) r.push(Buffer.from('hello world')) diff --git a/types/dispatcher.d.ts b/types/dispatcher.d.ts index e03c0af9037..327a62b1794 100644 --- a/types/dispatcher.d.ts +++ b/types/dispatcher.d.ts @@ -226,7 +226,7 @@ declare namespace Dispatcher { export interface DispatchHandlers { /** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */ - onConnect?(controller: Controller): void; + onConnect?({ controller: Controller }): void; /** */ onStart?(): void; /** Invoked when an error has occurred. */