Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Apr 3, 2024
1 parent 9ec58f6 commit c65408c
Show file tree
Hide file tree
Showing 12 changed files with 81 additions and 55 deletions.
3 changes: 2 additions & 1 deletion lib/api/abort-signal.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ function abort (self) {
} else if (self.controller?.abort) {
self.controller.abort(self[kSignal]?.reason)
} else {
self.onError(self[kSignal]?.reason ?? new RequestAbortedError())
self.reason = self[kSignal]?.reason ?? new RequestAbortedError()
}
removeSignal(self)
}

function addSignal (self, signal) {
Expand Down
17 changes: 11 additions & 6 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
'use strict'

const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const util = require('../core/util')
const { addSignal, removeSignal } = require('./abort-signal')

Expand All @@ -26,17 +27,21 @@ class ConnectHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.callback = callback
this.abort = null
this.controller = null

this.reason = null
addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = context
}

Expand Down
23 changes: 13 additions & 10 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class PipelineHandler extends AsyncResource {
this.opaque = opaque || null
this.responseHeaders = responseHeaders || null
this.handler = handler
this.abort = null
this.controller = null
this.context = null
this.onInfo = onInfo || null

Expand All @@ -114,14 +114,14 @@ class PipelineHandler extends AsyncResource {
}
},
destroy: (err, callback) => {
const { body, req, res, ret, abort } = this
const { body, req, res, ret, controller } = this

if (!err && !ret._readableState.endEmitted) {
err = new RequestAbortedError()
}

if (abort && err) {
abort()
if (controller && err) {
controller.abort()
}

util.destroy(body, err)
Expand All @@ -141,19 +141,22 @@ class PipelineHandler extends AsyncResource {

this.res = null

this.reason = null
addSignal(this, signal)
}

onConnect (abort, context) {
onConnect (controller, context) {
const { ret, res } = this

assert(!res, 'pipeline cannot be retried')

if (ret.destroyed) {
throw new RequestAbortedError()
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(!res, 'pipeline cannot be retried')
assert(!ret.destroyed)

this.controller = controller
this.context = context
}

Expand Down
15 changes: 9 additions & 6 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError
} = require('../core/errors')
const { InvalidArgumentError } = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
Expand Down Expand Up @@ -57,21 +55,26 @@ class RequestHandler extends AsyncResource {
this.onInfo = onInfo || null
this.throwOnError = throwOnError
this.highWaterMark = highWaterMark
this.controller = null

if (util.isStream(body)) {
body.on('error', (err) => {
this.onError(err)
})
}

this.reason = null
addSignal(this, signal)
}

onConnect (controller, context) {
if (!this.callback) {
throw new RequestAbortedError()
if (this.reason) {
controller.abort(this.reason)
return
}

assert(this.callback)

this.controller = controller
this.context = context
}
Expand Down
22 changes: 13 additions & 9 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
'use strict'

const assert = require('node:assert')
const { finished, PassThrough } = require('node:stream')
const {
InvalidArgumentError,
InvalidReturnValueError,
RequestAbortedError
InvalidReturnValueError
} = require('../core/errors')
const util = require('../core/util')
const { getResolveErrorBodyCallback } = require('./util')
Expand Down Expand Up @@ -53,7 +53,7 @@ class StreamHandler extends AsyncResource {
this.factory = factory
this.callback = callback
this.res = null
this.abort = null
this.controller = null
this.context = null
this.trailers = null
this.body = body
Expand All @@ -66,15 +66,19 @@ class StreamHandler extends AsyncResource {
})
}

this.reason = null
addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = context
}

Expand Down Expand Up @@ -126,7 +130,7 @@ class StreamHandler extends AsyncResource {

// TODO: Avoid finished. It registers an unnecessary amount of listeners.
finished(res, { readable: false }, (err) => {
const { callback, res, opaque, trailers, abort } = this
const { callback, res, opaque, trailers, controller } = this

this.res = null
if (err || !res.readable) {
Expand All @@ -137,7 +141,7 @@ class StreamHandler extends AsyncResource {
this.runInAsyncScope(callback, null, err || null, { opaque, trailers })

if (err) {
abort()
controller.abort()
}
})
}
Expand Down
16 changes: 10 additions & 6 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const { AsyncResource } = require('node:async_hooks')
const util = require('../core/util')
const { addSignal, removeSignal } = require('./abort-signal')
Expand All @@ -27,18 +27,22 @@ class UpgradeHandler extends AsyncResource {
this.responseHeaders = responseHeaders || null
this.opaque = opaque || null
this.callback = callback
this.abort = null
this.controller = null
this.context = null

this.reason = null
addSignal(this, signal)
}

onConnect (abort, context) {
if (!this.callback) {
throw new RequestAbortedError()
onConnect (controller, context) {
if (this.reason) {
controller.abort(this.reason)
return
}

this.abort = abort
assert(this.callback)

this.controller = controller
this.context = null
}

Expand Down
4 changes: 2 additions & 2 deletions lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,9 +229,9 @@ class Request {
abort.paused = false
abort.aborted = false
abort.internal = null
abort.abort = function (...args) {
abort.abort = function (err) {
abort.aborted = true
abort(...args)
abort(err)
}
abort.pause = function () {
abort.paused = true
Expand Down
2 changes: 1 addition & 1 deletion lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ function writeH2 (client, request) {
stream.pause()
}

if (request.onHeaders(Number(statusCode), realHeaders, resume, '') === false) {
if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), resume, '') === false) {
stream.pause()
}

Expand Down
4 changes: 3 additions & 1 deletion lib/mock/mock-utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ function mockDispatch (opts, handler) {
super()
this.resume = this.resume.bind(this)
this.pause = this.pause.bind(this)
this.abort = this.abort.bind(this)
}

resume () {
Expand All @@ -303,7 +304,8 @@ function mockDispatch (opts, handler) {
this.#paused = true
}

abort () {
abort (err) {
handler.onError(err)
this.#aborted = true
}

Expand Down
8 changes: 5 additions & 3 deletions test/client-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ const { promisify } = require('node:util')
const { NotSupportedError } = require('../lib/core/errors')
const { parseFormDataString } = require('./utils/formdata')

process.on('uncatchedException', (err) => {
console.error('###', err)
})

test('request dump big', async (t) => {
t = tspl(t, { plan: 3 })

Expand Down Expand Up @@ -140,7 +144,7 @@ test('request hwm', async (t) => {
})

test('request abort before headers', async (t) => {
t = tspl(t, { plan: 6 })
t = tspl(t, { plan: 4 })

const signal = new EE()
const server = createServer((req, res) => {
Expand All @@ -162,7 +166,6 @@ test('request abort before headers', async (t) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 1)

client.request({
path: '/',
Expand All @@ -172,7 +175,6 @@ test('request abort before headers', async (t) => {
t.ok(err instanceof errors.RequestAbortedError)
t.strictEqual(signal.listenerCount('abort'), 0)
})
t.strictEqual(signal.listenerCount('abort'), 2)
})
})

Expand Down
16 changes: 8 additions & 8 deletions test/node-test/client-dispatch.js
Original file line number Diff line number Diff line change
Expand Up @@ -875,15 +875,15 @@ test('dispatches in expected order', async (t) => {
method: 'POST',
body: 'body'
}, {
onConnect () {
onConnect (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
dispatches.push('onConnect')
},
onBodySent () {
dispatches.push('onBodySent')
},
onStart (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
onStart () {
dispatches.push('onStart')
},
onResponseStarted () {
Expand Down Expand Up @@ -937,15 +937,15 @@ test('dispatches in expected order for http2', async (t) => {
method: 'POST',
body: 'body'
}, {
onConnect () {
onConnect (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
dispatches.push('onConnect')
},
onBodySent () {
dispatches.push('onBodySent')
},
onStart (controller) {
assert(typeof controller.resume === 'function')
assert(typeof controller.pause === 'function')
onStart () {
dispatches.push('onStart')
},
onResponseStarted () {
Expand Down
6 changes: 4 additions & 2 deletions types/dispatcher.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,14 +219,16 @@ declare namespace Dispatcher {
export interface Controller {
pause (): void;
resume (): void;
abort (err?:Error): void;
readonly paused: boolean;
readonly aborted: boolean;
}

export interface DispatchHandlers {
/** Invoked before request is dispatched on socket. May be invoked multiple times when a request is retried when the request at the head of the pipeline fails. */
onConnect?(abort: () => void): void;
onConnect?(controller: Controller): void;
/** */
onStart?(controller: Controller): void;
onStart?(): void;
/** Invoked when an error has occurred. */
onError?(err: Error): void;
/** Invoked when request is upgraded either due to a `Upgrade` header or `CONNECT` method. */
Expand Down

0 comments on commit c65408c

Please sign in to comment.